In [1]:
import os
from pathlib import Path

### Absolute Paths

In [2]:
# Obtain the path to the base of the working directory
# Base path can be used to point to any file or directory in the parent directory

BASE_DIR = Path().parent.absolute().resolve()
print(BASE_DIR)

C:\Users\SethAntanah\Desktop\Data Engineering\workflow-ochestration


In [3]:
# List the files in the base directory
files_in_base_dir = os.listdir(BASE_DIR)

dir_count = 0
file_count = 0

for path in files_in_base_dir:
    # Check for directories
    if(os.path.isdir(BASE_DIR / path)):
        dir_count += 1
    else:
        file_count += 1
        
print(f'The are {dir_count} directories in the base directory')
print(f'The are {file_count} files in the base directory')

The are 2 directories in the base directory
The are 4 files in the base directory


### File IO

** File Modes: **
* "r": Read mode. Opens a file for reading. (default mode)
* "w": Write mode. Opens a file for writing. Creates a new file if it does not exist or truncates the file if it exists.
* "a": Append mode. Opens a file for appending. Creates a new file if it does not exist.
* "b": Binary mode. Opens a file in binary mode.
* "t": Text mode. Opens a file in text mode. (default mode)
* "+": Open a file for updating (reading and writing).

In [4]:
# Open and read contents of  file

path_to_file = BASE_DIR / 'datasets/plan.txt'
file = open(path_to_file, 'r')
content = file.read()
print(content)

# always remember to close the file
file.close()

(1) The only way to get things done is to plan and get things done
(2) Organization is very key in this aspect.


In [5]:
# Open and read contents of a file one line at a time

path_to_file = BASE_DIR / 'datasets/plan.txt'
file = open(path_to_file, 'r')

for line in file.readlines():
    print(line)

# always remember to close the file
file.close()

(1) The only way to get things done is to plan and get things done

(2) Organization is very key in this aspect.


In [6]:
# Open and Write to the file

path_to_file = BASE_DIR / 'datasets/plan.txt'
file = open(path_to_file, 'w')

text = """(1) The only way to get things done is to plan and get things done"""
file.write(text)

# always remember to close the file
file.close()

In [7]:
# Open and Append text to the file

path_to_file = BASE_DIR / 'datasets/plan.txt'
file = open(path_to_file, 'a')

text = """\n(2) Organization is very key in this aspect."""
file.write(text)

# always remember to close the file
file.close()

In [8]:
# Using with Statement:
# Using a with statement ensures that the file is properly closed after its suite finishes, even if an exception is raised.

path_to_file = BASE_DIR / 'README.md'
with open(path_to_file, "r") as file:
    content = file.read()
    print(content)
    # File is automatically closed here


### Orchestration

The process of automating, coordinating and managing the execution of individual tasks within the workflow across various systems and application in a unified manner.

### Usecases of Workflow and Orchestration

* CI/CD Pipelines
* ETL Pipelines
* Decision Automated

### ETL

* Extract
* Transform 
* Load

### AWS Tools for Workflow and Orchestration

* AWS Step Function / Airflow
* CloudFormation
* Lambda 
* Kubernetes


### Terminologies

* Processes
* * Batch 
* * Realtime


In [9]:
# Read and Write at the same time
path_to_file = BASE_DIR / 'datasets/plan.txt'
with open(path_to_file, "r+") as file:
    content = file.read()  # Read the existing content
    file.write("\nAppending this line.")  # Append new content
    # Alternatively, you can also seek to a specific position and write
    # file.seek(0, 2)  # Move the file pointer to the end
    # file.write("\nAppending this line.")

    file.seek(0)  # Move the file pointer to the beginning (optional)
    updated_content = file.read()  # Read the updated content
    print(updated_content)


(1) The only way to get things done is to plan and get things done
(2) Organization is very key in this aspect.
Appending this line.


### Handling File IO Errors

In [10]:
try:
    path_to_file = BASE_DIR / 'plan2.txt'
    file = open(path_to_file, 'r+')
    content = file.read()
    print(content)
    
except FileExistsError:
    print(f"File '{os.path.basename(path_to_file)}' does not exist")
except FileNotFoundError:
    print(f"File '{os.path.basename(path_to_file)}' not found")
except PermissionError:
    print(f"Permission denied to '{os.path.basename(path_to_file)}'")

File 'plan2.txt' not found


### Downloading and handling Files

In [None]:
# install requirements
! pip install requests

In [11]:
import requests

In [12]:
# URL of text file
file_url = 'https://example-files.online-convert.com/document/txt/example.txt'

try:
    response = requests.get(file_url)
    if response.status_code == 200:
        file_content = response.text
        if(file_content):
            try:
                with open(BASE_DIR / 'datasets/random_text.txt', 'r+') as file:
                    file.write(file_content)
            except FileExistsError:
                print('File does not exist')
            except FileNotFoundError:
                print('File was not found in current directory')
                
                
except ConnectionError as e:
    print('Connection to server could not be extablished', str(e))
except Exception as e:
    print(e)
        

### Read a CSV file

In [None]:
! pip install pandas

In [15]:
import requests
import pandas as pd
from io import StringIO

In [17]:
file_url = 'https://www.stats.govt.nz/assets/Uploads/Annual-enterprise-survey/Annual-enterprise-survey-2021-financial-year-provisional/Download-data/annual-enterprise-survey-2021-financial-year-provisional-size-bands-csv.csv'

try:
    response = requests.get(file_url)
    if response.status_code == 200:
        csv_data = StringIO(response.content.decode('utf-8'))
        df = pd.read_csv(csv_data)
        df.to_csv(BASE_DIR / 'datasets/annual-enterprise-survey-2021-financial-year-provisional-size-bands-csv.csv', index=False)
        print(f'File Downloaded.\nFile Size {df.shape}\nFile Description {df.describe}')
except ConnectionError as e:
    print(f'Download Failed \n {str(e)}')

File Downloaded. 
 File Size (17028, 7) 
 File Description <bound method NDFrame.describe of        year industry_code_ANZSIC               industry_name_ANZSIC  \
0      2011                    A  Agriculture, Forestry and Fishing   
1      2011                    A  Agriculture, Forestry and Fishing   
2      2011                    A  Agriculture, Forestry and Fishing   
3      2011                    A  Agriculture, Forestry and Fishing   
4      2011                    A  Agriculture, Forestry and Fishing   
...     ...                  ...                                ...   
17023  2021                  all                     All Industries   
17024  2021                  all                     All Industries   
17025  2021                  all                     All Industries   
17026  2021                  all                     All Industries   
17027  2021                  all                     All Industries   

        rme_size_grp                                  

### Download and Extract Zip files

In [19]:
import io
import requests
import zipfile
import pandas as pd

In [20]:
file_url = 'https://www.stats.govt.nz/assets/Uploads/Business-financial-data/Business-financial-data-December-2023-quarter/Download-data/business-financial-data-december-2023-quarter.zip'

try:
    response = requests.get(file_url)
    
    if response.status_code == 200:
        with zipfile.ZipFile(io.BytesIO(response.content), 'r') as zip_ref:
            print(f'Download Completed')
            
            # List all files in the zip archive
            zip_file_content = zip_ref.namelist()
            
            # Extract files to directory
            print(f'File Extraction Completed')
            zip_ref.extractall(BASE_DIR / 'datasets')
            
            print(f'{str(zip_file_content)}')
          
    if response.status_code == 400:
        print('File cannot be found, check the url') 
              
except ConnectionError:
    print('Cannot connect to sever.')

except Exception as e:
    print(e)
            
            

Download Completed
File Extraction Completed
['business-financial-data-december-2023-quarter-csv.csv']


### Stream Data From a File

In [None]:
# URL of text file
file_url = 'https://example-files.online-convert.com/document/txt/example.txt'
file_content = ''
try:
    response = requests.get(file_url)
    if response.status_code == 200:
        file_content = response.text
except ConnectionError:
    print(f'Could not connect to server. \n {str(e)}')