Import necessary libraries for the ETL workflow

In [11]:
from zipfile import ZipFile
import requests
import os
import glob
import pandas as pd
from datetime import datetime
# from urllib.request import urlretrieve


# Extract Step

Acquire the data using an HTTP request. Download the file to a specific directory and then write the data to a zip file

In [12]:
url = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/source.zip'

response = requests.get(url)

file_path = os.path.join(os.getcwd(), 'ETL_data')
data_file = os.path.join(file_path, 'etl_data.zip')

with open(data_file, 'wb') as file:
    file.write(response.content)

FileNotFoundError: [Errno 2] No such file or directory: 'c:\\Users\\Olu Edward\\Documents\\Data_Engineering_Certificate\\Python_Project_Data_Engineering\\Project\\ETL_data\\etl_data.zip'

Unzip the zip file using the zipfile library

In [None]:
with ZipFile(data_file, 'r') as zip_file:
    zip_file.extractall(path=file_path)


Creat functions to extract the downloaded datafiles from different file formats to a pandas dataframe

In [None]:
def extract_csv(file_to_process):
    return pd.read_csv(file_to_process)


def extract_json(file_to_process):
    return pd.read_json(file_to_process, lines=True)


def extract_xml(file_to_process):
    dataframe = pd.read_xml(file_to_process, xpath="/data/person")
    return dataframe


Create a function to identify which of the extract functions to call on the basis of the filetype of the data file. To call the relevant function, we write a function 'extract', which uses the glob library to identify the filetype.

In [None]:
def extract(file_path):
    # create an empty data frame to hold extracted data
    extracted_data = pd.DataFrame(columns=['name', 'height', 'weight'])

    # process all csv files
    csv_directory = os.path.join(file_path, "*.csv")
    for csvfile in glob.glob(csv_directory):
        extracted_data = pd.concat([extracted_data, pd.DataFrame(
            extract_csv(csvfile))], ignore_index=True)

    # process all json files
    json_directory = os.path.join(file_path, "*.json")
    for jsonfile in glob.glob(json_directory):
        extracted_data = pd.concat([extracted_data, pd.DataFrame(
            extract_json(jsonfile))], ignore_index=True)

    # process all xml files
    xml_directory = os.path.join(file_path,  "*.xml")
    for xmlfile in glob.glob(xml_directory):
        extracted_data = pd.concat([extracted_data, pd.DataFrame(
            extract_xml(xmlfile))], ignore_index=True)

    return extracted_data


# Transform Step

Create a function to transform the 'height' and 'weight' columns to the correct SI Unit, in this case, from inches and pounds to meters and kilograms.

In [None]:
def transform(data):
    # we convert inches to meters. 1 inch = 0.0254 meters.
    data['height'] = round(data.height*0.0254, 2)

    # we convert pounds to kilograms. 1 pound = 0.45359237 kilos.
    data['weight'] = round(data.weight*0.45359237, 2)

    return data

# Load Step

We load and log the data

In [None]:
# Create a function to load the data into csv format.

def load_data(transformed_data, target_file):
    transformed_data.to_csv(target_file)

In [None]:
log_file = "log_file.txt"
target_file = "transformed_data.csv"

Create a function that logs the start and end time of the ETL operation in a text file.

In [None]:
def log_progress(info):
    time_format = '%Y-%B-%d-%H:%M:%S:%f'
    now = datetime.now()
    timestamp = now.strftime(time_format)
    with open(log_file, 'a') as file:
        file.write(f"{timestamp}, {info} \n")

# Run the whole ETL operation and log the progress of the procedure in the log file

In [None]:
# Log the initialization of the ETL process
log_progress("ETL Operation Started")

# Log the start and completion of the Extraction process
log_progress("Extraction step Started")
extracted_data = extract(file_path)
log_progress("Extract step Ended")

# Log the start and completion of the Transformation process
log_progress("Transform step Started")
transformed_data = transform(extracted_data)
transformed_data
log_progress("Transform step Ended")

# Log the start and completion of the Loading process
log_progress("Load step Started")
load_data(transformed_data, target_file)
log_progress("Load step Ended")

# Log the completion of the ETL process
log_progress("ETL Job Completed")