<a href="https://colab.research.google.com/github/shreya-rawal/notebooks/blob/main/ETL_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import glob                         # this module helps in selecting files
import pandas as pd                 # this module helps in processing CSV files
import xml.etree.ElementTree as ET  # this module helps in processing XML files.
from datetime import datetime

## Download Files

In [2]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/source.zip

--2023-08-19 09:23:36--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/source.zip
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2707 (2.6K) [application/zip]
Saving to: ‘source.zip’


2023-08-19 09:23:37 (1.55 GB/s) - ‘source.zip’ saved [2707/2707]



## Unzip Files


In [3]:
!unzip source.zip

Archive:  source.zip
  inflating: source3.json            
  inflating: source1.csv             
  inflating: source2.csv             
  inflating: source3.csv             
  inflating: source1.json            
  inflating: source2.json            
  inflating: source1.xml             
  inflating: source2.xml             
  inflating: source3.xml             


## Set Paths

In [4]:
tmpfile    = "temp.tmp"               # file used to store all extracted data
logfile    = "logfile.txt"            # all event logs will be stored in this file
targetfile = "transformed_data.csv"   # file where transformed data is stored

## EXTRACT

CSV Extract Function

In [15]:
def extract_from_csv(file_to_process):
    dataframe = pd.read_csv(file_to_process)
    return dataframe

JSON Extract Function

In [16]:
def extract_from_json(file_to_process):
    dataframe = pd.read_json(file_to_process,lines=True)
    return dataframe

XML Extract Function

In [17]:
def extract_from_xml(file_to_process):
    dataframe = pd.DataFrame(columns=["name", "height", "weight"])
    tree = ET.parse(file_to_process)
    root = tree.getroot()
    for person in root:
        name = person.find("name").text
        height = float(person.find("height").text)
        weight = float(person.find("weight").text)
        dataframe = dataframe.append({"name":name, "height":height, "weight":weight}, ignore_index=True)
    return dataframe

Extract Function

In [18]:
def extract():
    extracted_data = pd.DataFrame(columns=['name','height','weight']) # create an empty data frame to hold extracted data

    #process all csv files
    for csvfile in glob.glob("*.csv"):
        extracted_data = extracted_data.append(extract_from_csv(csvfile), ignore_index=True)

    #process all json files
    for jsonfile in glob.glob("*.json"):
        extracted_data = extracted_data.append(extract_from_json(jsonfile), ignore_index=True)

    #process all xml files
    for xmlfile in glob.glob("*.xml"):
        extracted_data = extracted_data.append(extract_from_xml(xmlfile), ignore_index=True)

    return extracted_data

## Transform

1. Convert height from inches to milimeters
2. Convert weight from pounds to kilograms

In [19]:
def transform(data):
        #Convert height which is in inches to millimeter
        #Convert the datatype of the column into float
        data.height = data.height.astype(float)
        #Convert inches to meters and round off to two decimals(one inch is 0.0254 meters)
        data['height'] = round(data.height * 0.0254,2)

        #Convert weight which is in pounds to kilograms
        #Convert the datatype of the column into float
        data.weight = data.weight.astype(float)
        #Convert pounds to kilograms and round off to two decimals(one pound is 0.45359237 kilograms)
        data['weight'] = round(data.weight * 0.45359237,2)
        return data

## Loading

In [20]:
def load(targetfile, data_to_load):
    data_to_load.to_csv(targetfile)

## Logging

In [21]:
def log(message):
  timestamp_format = '%Y-%h-%d-%H:%M:%S'
  now = datetime.now() #get current time
  timestamp = now.strftime(timestamp_format)
  with open("logfile.txt", "a") as f:
    f.write(timestamp + ',' + message + '\n')

## Running ETL Process



In [22]:
log("ETL Job Started")

In [23]:
log("Extract phase Started")
extracted_data = extract()
log("Extract phase Ended")
extracted_data

  extracted_data = extracted_data.append(extract_from_csv(csvfile), ignore_index=True)
  extracted_data = extracted_data.append(extract_from_csv(csvfile), ignore_index=True)
  extracted_data = extracted_data.append(extract_from_csv(csvfile), ignore_index=True)
  extracted_data = extracted_data.append(extract_from_json(jsonfile), ignore_index=True)
  extracted_data = extracted_data.append(extract_from_json(jsonfile), ignore_index=True)
  extracted_data = extracted_data.append(extract_from_json(jsonfile), ignore_index=True)
  dataframe = dataframe.append({"name":name, "height":height, "weight":weight}, ignore_index=True)
  dataframe = dataframe.append({"name":name, "height":height, "weight":weight}, ignore_index=True)
  dataframe = dataframe.append({"name":name, "height":height, "weight":weight}, ignore_index=True)
  dataframe = dataframe.append({"name":name, "height":height, "weight":weight}, ignore_index=True)
  extracted_data = extracted_data.append(extract_from_xml(xmlfile), ignore_i

Unnamed: 0,name,height,weight
0,alex,65.78,112.99
1,ajay,71.52,136.49
2,alice,69.4,153.03
3,ravi,68.22,142.34
4,joe,67.79,144.3
5,alex,65.78,112.99
6,ajay,71.52,136.49
7,alice,69.4,153.03
8,ravi,68.22,142.34
9,joe,67.79,144.3


In [24]:
log("Transform phase Started")
transformed_data = transform(extracted_data)
log("Transform phase Ended")
transformed_data

Unnamed: 0,name,height,weight
0,alex,1.67,51.25
1,ajay,1.82,61.91
2,alice,1.76,69.41
3,ravi,1.73,64.56
4,joe,1.72,65.45
5,alex,1.67,51.25
6,ajay,1.82,61.91
7,alice,1.76,69.41
8,ravi,1.73,64.56
9,joe,1.72,65.45


In [25]:
log("Load phase Started")
load(targetfile,transformed_data)
log("Load phase Ended")

In [26]:
log("ETL Job Ended")

## Another ETL Exercise

In [27]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/datasource.zip

--2023-08-19 09:50:40--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/datasource.zip
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4249 (4.1K) [application/zip]
Saving to: ‘datasource.zip’


2023-08-19 09:50:41 (2.04 GB/s) - ‘datasource.zip’ saved [4249/4249]



In [28]:
!unzip datasource.zip -d dealership_data

Archive:  datasource.zip
  inflating: dealership_data/used_car_prices1.csv  
  inflating: dealership_data/used_car_prices2.csv  
  inflating: dealership_data/used_car_prices3.csv  
  inflating: dealership_data/used_car_prices1.json  
  inflating: dealership_data/used_car_prices2.json  
  inflating: dealership_data/used_car_prices3.json  
  inflating: dealership_data/used_car_prices1.xml  
  inflating: dealership_data/used_car_prices2.xml  
  inflating: dealership_data/used_car_prices3.xml  


In [29]:
tmpfile    = "dealership_temp.tmp"               # file used to store all extracted data
logfile    = "dealership_logfile.txt"            # all event logs will be stored in this file
targetfile = "dealership_transformed_data.csv"   # file where transformed data is stored

In [52]:
def extract_from_csv(file_to_process):
    dataframe = pd.read_csv(file_to_process)
    return dataframe

In [53]:
def extract_from_json(file_to_process):
    dataframe = pd.read_json(file_to_process,lines=True)
    return dataframe

In [54]:
def extract_from_xml(file_to_process):
    dataframe = pd.DataFrame(columns=['car_model','year_of_manufacture','price', 'fuel'])
    tree = ET.parse(file_to_process)
    root = tree.getroot()
    for person in root:
        car_model = person.find("car_model").text
        year_of_manufacture = int(person.find("year_of_manufacture").text)
        price = float(person.find("price").text)
        fuel = person.find("fuel").text
        dataframe = dataframe.append({"car_model":car_model, "year_of_manufacture":year_of_manufacture, "price":price, "fuel":fuel}, ignore_index=True)
    return dataframe


In [55]:
def extract():
    extracted_data = pd.DataFrame(columns=['car_model','year_of_manufacture','price', 'fuel']) # create an empty data frame to hold extracted data

    #process all csv files
    for csvfile in glob.glob("dealership_data/*.csv"):
        extracted_data = extracted_data.append(extract_from_csv(csvfile), ignore_index=True)

    #process all json files
    for jsonfile in glob.glob("dealership_data/*.json"):
        extracted_data = extracted_data.append(extract_from_json(jsonfile), ignore_index=True)

    #process all xml files
    for xmlfile in glob.glob("dealership_data/*.xml"):
        extracted_data = extracted_data.append(extract_from_xml(xmlfile), ignore_index=True)

    return extracted_data

In [56]:
#round the price column to 2 decimal places
def transform(dataframe):
    dataframe['price'] = round(dataframe['price'], 2)
    return dataframe

In [57]:
def load(targetfile, dataframe):
    dataframe.to_csv(targetfile)

In [58]:
def log(message):
    timestamp_format = '%H:%M:%S-%h-%d-%Y' #Hour-Minute-Second-MonthName-Day-Year
    now = datetime.now() # get current timestamp
    timestamp = now.strftime(timestamp_format)
    with open("dealership_logfile.txt","a") as f:
        f.write(timestamp + ',' + message + '\n')

## Run the ETL pipeline

In [59]:
# Log that you have started the ETL process
log("ETL begun")

# Log that you have started the Extract step
log("Extracting")
# Call the Extract function
extracted_data = extract()
# Log that you have completed the Extract step
log("Extract complete")

# Log that you have started the Transform step
log("Transforming")
# Call the Transform function
transformed_data = transform(extracted_data)
# Log that you have completed the Transform step
log("Transform complete")

# Log that you have started the Load step
log("Loading data")
# Call the Load function
load(targetfile, transformed_data)
# Log that you have completed the Load step
log("Load complete")

# Log that you have completed the ETL process
log("ETL complete")

  extracted_data = extracted_data.append(extract_from_csv(csvfile), ignore_index=True)
  extracted_data = extracted_data.append(extract_from_csv(csvfile), ignore_index=True)
  extracted_data = extracted_data.append(extract_from_csv(csvfile), ignore_index=True)
  extracted_data = extracted_data.append(extract_from_json(jsonfile), ignore_index=True)
  extracted_data = extracted_data.append(extract_from_json(jsonfile), ignore_index=True)
  extracted_data = extracted_data.append(extract_from_json(jsonfile), ignore_index=True)
  dataframe = dataframe.append({"car_model":car_model, "year_of_manufacture":year_of_manufacture, "price":price, "fuel":fuel}, ignore_index=True)
  dataframe = dataframe.append({"car_model":car_model, "year_of_manufacture":year_of_manufacture, "price":price, "fuel":fuel}, ignore_index=True)
  dataframe = dataframe.append({"car_model":car_model, "year_of_manufacture":year_of_manufacture, "price":price, "fuel":fuel}, ignore_index=True)
  dataframe = dataframe.append({"c