<a href="https://www.kaggle.com/code/suriender/etl-with-python?scriptVersionId=179968355" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# ETL with Python

### Description
<font color="maroon" font-family="Times New Roman">The Aim is to perform ETL (Extract, Transform and Load) on different file formats (json, xml, csv); further, the objectives can be concluded in following points: </font>
1. Creation of directory and code structure (data, code and logs) <font color="blue">[functions: createLogFiles, createDir] </font>
2. [__Extract__]
    * Downloading file from the URL and saving it to respective directory <font color="blue">[function: downloadZIPData] </font>
    * Extraction of data files from compressed (zip) file <font color="blue">[function: extractZIP]</font>
    * Reading different files format and creation of dataframes <font color="blue">[function: readFile]</font>
2. [__Transform__] Performing data transformations
3. [__Load__] Storing data to file(s) <font color="blue">[function: copyDFtoCSV]</font>

### Import Required Libraries

In [None]:
import pandas as pd
import requests, zipfile, io, os
import logging

### Functions block

In [None]:
# Function to create directory
def createDir(file):
    logging.info("Entering createDir")

    try:
        # Create dir/folder at current path
        path = os.path.join(os.getcwd(), file)
    
        # Create dir if it doesn't exist
        if not(os.path.exists(path) and os.path.isdir(path)):
            os.mkdir(path)
            logging.info("Directory '% s' created " + file)

    except Exception as ex:
        logging.error("Exception: Failed to create directory: " + file)

    logging.info("Entering createDir")

# Function to download zip file from the url
def downloadZIPData(url, path_to_save):
    logging.info("Extract:: Entering downloadZIPData")

    try:
        #Create temp dir to save the file
        createDir(path_to_save)
    
        with requests.get(url, stream=True) as r:
            r.raise_for_status()
            logging.debug("Extract:: Started downloading ZIP file from "+ url)
            
            logging.info("Extract:: Downloading zip file from " + url)
            with open(os.path.join(path_to_save, 'data.zip'), 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
    
        logging.info("Extract:: Downloaded ZIP file from " + url + " to " + path_to_save + " Successfully!")

    except Exception as ex:
        logging.error("Extract:: Exception: Failed to download zip file")
    
    logging.info("Extract:: Exiting downloadZIPData")
    
    return path_to_save + '/data.zip'

# Function to extract zip file
def extractZIP(filepath, destpath):
    logging.info("Extract:: Entering extractZIP")
    
    # Extract zip file to destination folder
    try:
        with zipfile.ZipFile(filepath) as zf:
            zf.extractall(destpath)
    
        logging.info("Extract:: Extracted " + filepath + " to " + destpath + " successfully!!")

    except Exception as ex:
        logging.error("Extract:: Exception: Failed to extract zip file")

    logging.info("Extract:: Exiting extractZIP")

# Function to read different file format files
def readFile(path, file):
    logging.info("Extract:: Entering readFile")
    file_extension = os.path.splitext(file)[1][1:]
    logging.debug("Extract:: Started Converting file: " + file + " to dataframe.")
    try:
        match (file_extension):
            case 'json':
                return pd.read_json(path + file, lines=True)

            case 'csv':
                return pd.read_csv(path + file)
            
            case 'xml':
                return pd.read_xml(path + file)

        logging.info("Extract:: File: " + file + " converted to dataframe successfully!!")
        
    except Exception as ex:
        logging.error("Extract:: Exception raised while reading file: " + file)
        logging.error(ex)

    logging.info("Extract:: Exiting readFile")

# Function to Load dataframe to CSV file
def copyDFtoCSV(df, dir, filename):

    logging.info("Load:: Entering copyDFtoCSV")
    
    try:
        #Create required dir to save the file
        createDir(dir)
        file = os.getcwd() + "/" + dir + "/" + filename
    
        df.to_csv(file, sep=',', encoding='utf-8', index = False)
        logging.info("Load:: Copied Datframe to " + file + " successfully!!")
        
    except Exception as ex:
        logging.error("Load:: Exception: Failed to copy Dataframe to file")
    
    
    logging.info("Load:: Exiting copyDFtoCSV")

# Create log files under provided path and return file
def createLogFiles(dir):
    logging.info("Entering createLogFiles")

    try:
        # Create directory: Logs 
        createDir(dir)
        
        debugFilepath = os.path.join(os.getcwd(), dir + "/debug.log")
    
        # Create file in append mode, create if file doesn't exist
        debugFile = open(debugFilepath, "a+")    
        
        ## Note: We will close this debug file once whole program completed, else we need to open everytime we write.
        
        logging.info("Created debug file at path: " + debugFilepath + " successfully!!")

    except Exception as ex:
        logging.error("Exception: Failed to create log files")
    
    logging.info("Exiting createLogFiles")
    
    return debugFilepath, debugFile

debugFilepath, debugFile = createLogFiles("Logs")

logging.basicConfig(
        filename = debugFilepath,
        level = logging.DEBUG,
        force = True,
        format='[%(levelname)s] %(asctime)s - %(name)s - %(funcName)s:%(lineno)d - %(message)s',
        datefmt = '%Y-%m-%d %H:%M:%S'
)

### Extract

In [None]:
url = 'https://elasticbeanstalk-us-east-2-340729127361.s3.us-east-2.amazonaws.com/prices.zip'

logging.info("Starting program")

# Downlaod the zip file from the URL
file = downloadZIPData(url, "temp")

# Extract the downloaded zip to 'data' folder
extractZIP(file, "data")

# Get list of files from 'data' dir
all_files = os.listdir('data')

logging.debug("Extract:: List of files extracted: " + str(all_files))

# Convert files data to dataframe and make list of frames
list_of_frames = []
for file in all_files:
    df = readFile("data/", file)
    list_of_frames.append(df)

# Concat all the frames
df = pd.concat(list_of_frames, ignore_index = True)

# Print dataframe
df

### Transform

In [None]:
# Double the price
df['price'] = df['price'].apply(lambda x: x*2)
logging.info("Transform:: Doubled the price!!")

# Round the price to 2 decimal places
df['price'] = df['price'].round(decimals = 2)
logging.info("Transform:: Rounded up the price to two decimal places!!")

#Print the dataframe
df

### Load

In [None]:
# Copy dataframe to output dir
copyDFtoCSV(df, "output", "output.csv")

# Close the log file
debugFile.close()

# Delete the temp folder - Not deleting for this assignment sake, but ideally it should be deleted!
#os.remove("temp")

logging.info("Program Completed Successfully...!!!")