* ***Importación de librerías y módulos***

In [1]:
import glob
import pandas as pd
import xml.etree.ElementTree as ET
from datetime import datetime
from zipfile import ZipFile

* ***Datos utilizados:***

In [2]:
def get_data():
    print('Se inicia la desarga de archivos')
    
    #Descarga el archivo fuente desde la nube
    !wget http://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/datasource.zip
    
    #Extrae el archivo zip
    with ZipFile('datasource.zip', 'r') as zip:
        zip.extractall('dealership_data')
        
#Establecer las rutas para los archivos de destino:
tmpfile = "dealership_temp.tmp" # store all extracted data
logfile = "dealership_logfile.txt" # all event logs will be stored

targetfile = "dealership_transformed_data.csv" # transformed data is stored

 * ***Proceso de Extracción***
    * ***Extracción de datos:*** 

In [3]:
#CSV Extract Function
def extract_from_csv(file_to_process):
    dataframe = pd.read_csv(file_to_process)
    return dataframe
#JSON Extract Function
def extract_from_json(file_to_process):
    dataframe = pd.read_json(file_to_process,lines=True)
    return dataframe
#XML Extract Function
def extract_from_xml(file_to_process):
    dataframe = pd.DataFrame(columns=['car_model','year_of_manufacture','price','fuel'])
    tree = ET.parse(file_to_process)
    root = tree.getroot()
    for person in root:
        car_model = person.find("car_model").text
        year_of_manufacture = int(person.find("year_of_manufacture").text)
        price = float(person.find("price").text)
        fuel = person.find("fuel").text
        dataframe = dataframe.append({"car_model":car_model,"year_of_manufacture":year_of_manufacture, "price":price, "fuel":fuel},ignore_index=True)
        return dataframe

 * ***Proceso de Extraccion***
     * ***Llamado de la función de extracción*** 

In [4]:
def extract():
    extracted_data =pd.DataFrame(columns=['car_model','year_of_manufacture','price', 'fuel'])
    #for csv files
    for csvfile in glob.glob("dealership_data/*.csv"):
        extracted_data = extracted_data.append(extract_from_csv(csvfile),ignore_index=True)
    #for json files
    for jsonfile in glob.glob("dealership_data/*.json"):
        extracted_data = extracted_data.append(extract_from_json(jsonfile),ignore_index=True)
    #for xml files
    for xmlfile in glob.glob("dealership_data/*.xml"):
        extracted_data = extracted_data.append(extract_from_xml(xmlfile),ignore_index=True)
    return extracted_data

 * ***Proceso de Transformación***

In [5]:
def transform(data):
    data['price'] = round(data.price, 2)
    return data

 * ***Proceso de Carga***
     * ***Función de carga***

In [6]:
def load(targetfile,data_to_load):
    data_to_load.to_csv(targetfile)

 * ***Proceso de Carga***
     * ***Función de registro***

In [7]:
def log(message):
    timestamp_format = '%H:%M:%S-%h-%d-%Y'
    #Hour-Minute-Second-MonthName-Day-Year
    now = datetime.now() # get current timestamp
    timestamp = now.strftime(timestamp_format)
    with open(logfile,"a") as f: f.write(timestamp + ',' + message +'n')

***Ejecución del proceso ETL***

In [12]:
#Llamdo de la funcion get_data()
get_data()

#Registro de que ha iniciado el proceso ETL
log("ETL Job Started")

#Registro que ha iniciado y finalizado el paso de extracción
extracted_data = extract()
log("Extract phase Ended")

#Registro que ha iniciado y finalizado el paso de transformación
log("Transform phase Ended")
transformed_data = transform(extracted_data)

#Registro de que ha iniciado y finalizado la fase de carga
log("Load phase Started")
load(targetfile, transformed_data)
log("Load phase Ended")

#Registro de que finaliza el ciclo ETL
log("ETL Job Ended")