# <ins>"_3M 2024 CBG Data Engineering Internship Take-Home Coding Assessment_"</ins>

- A lightweight simple ETL system to parse files with set expectations, transform as per database expectations and ingest them.
- ETL class is made generic for multiple ETL objects in case of multiprocessing ETL etc (currently just one instance). Same case for SQLConnector class and its objects as well.
- Currently runs to completion but can be enhanced later on as mentioned in future scope section.

In [1]:
# @author: Meghana Hegde
# 02/2024

# bunch of libraries that will come handy
import pandas as pd
import numpy as np
import os
import sys
import sqlite3
from   sqlite3 import Error
from   collections import defaultdict 
import sql_queries

# this is where all data is 
DATA_DIR = os.path.dirname(os.getcwd()) + '/data'

# set this to False if cleaned/processed data need not be saved into files.
PROCESSED_DATA_SAVE = False

PROCESSED_DATA_DIR = os.getcwd() + '/PROCESSED'
if not os.path.exists(PROCESSED_DATA_DIR) and PROCESSED_DATA_SAVE is True:
    os.makedirs(PROCESSED_DATA_DIR)

# set this to False if you need to reduce the output/ETL logging.
DEBUG = False

# set this to False if analysis is not required (prelim analysis dump etc)
ANALYZE = True

# pretty names for logging
DF_MAP_KEY_TO_NAMES = {
    'trans_csv' : 'Transactions Data (CSV)',
    'trans_json': 'Transactions Data (JSON)',
    'trans_txt' : 'Transactions Data  (TEXT)',
    'weather'   : 'Weather Data',
    'location'  : 'Location Data',
    'holiday'   : 'Holiday Data',
    'date'      : 'Date Data (Derived)'
}

# value = [[l1], [l2] [l3]]
# first list is the list of columns on which duplicates/NA values need to be dropped before insertion
# second list is the column mapper of the corresponding tables in sql db.
# third list is the database table name it maps to.
DF_MAP_KEY_TO_FIELDS = {
    'trans'   : [['transaction_id', 'location_id', 'date', 'profit'], ['transaction_id', 'location_id', 'date', 'profit'], ['transactions']],
    'location': [['location_id'], ['location_id', 'elevation', 'population'], ['location']],
    'date'    : [[], ['date', 'day', 'day_of_week', 'month', 'year', 'holiday'], ['date']],
    'weather' : [['location_id','date'], ['date', 'location_id', 'temperature', 'pressure', 'humidity', 'cloudy', 'precipitation'], ['weather']],
}

In [2]:
class ETL:
    """ ETL class for extract, transform, ingest and some analysis.
    """
    def __init__(self, connector):
        """ Initialize a dictionary to keep all dataframes to be processed.
            Initialize sqlite db connection etc
        """
        self.dataFrameMap = {}
        self.sqlConn      = connector
        self.stats        = {'writes': 0, 'reads': 0}
        
    def parseFilesToDf(self):
        """ A parser function that walks over all files in a data directory
            and parses files of interest and stores them in dataframe dict of df's per type of data.
            We are interested in files ending with csv, txt, json. 
            Also, files with transaction, weather, location keywords in filenames
        """
        trans_csv_dfs  = []
        trans_json_dfs = []
        trans_txt_dfs  = []
        weather_dfs    = []
        location_dfs   = []
        holiday_dfs    = []
        
        # lets try to walk over all files of interest.
        try:
            for file in os.listdir(DATA_DIR):
                file_path = DATA_DIR + '/' + file
                
                # parse transactions files.
                if file.startswith('transactions'):
                    
                    # get the extension of the file
                    _, extension = os.path.splitext(file)
                
                    if extension == ".csv":
                        try:
                            # transid, location id have leading 0's - parse as it is as string and not int.
                            df_csv = pd.read_csv(file_path, dtype={'transaction_id': str, 'location_id': str})
                        except Exception as ex:
                            # we should probably log this so that file parsing errors are known
                            continue
                            
                        # storing filename in df temporarily
                        df_csv['file_name'] = file
                        trans_csv_dfs.append(df_csv)
                    
                    elif extension == '.txt':
                        try:
                            df_txt = pd.read_csv(file_path, sep='\t', dtype={'transaction_id': str, 'location_id': str})
                        except Exception as ex:
                            continue
                        df_txt['file_name'] = file
                        trans_txt_dfs.append(df_txt)
                
                    elif extension == '.json':
                        try:
                            df_json = pd.read_json(file_path, dtype={'transaction_id': str, 'location_id': str})
                        except Exception as ex:
                            continue
                        df_json['file_name'] = file
                        trans_json_dfs.append(df_json)
                    else:
                        print("Unsupported file %s" %(file))
                    
                elif file.startswith('location'):
                    # these will always be csv 
                    try:
                        df = pd.read_csv(file_path, dtype={'location_id': str})
                    except Exception as ex:
                        continue
                    df['file_name'] = file
                    location_dfs.append(df)
                
                elif file.startswith('weather'):
                    # these will always be csv
                    try:
                        df = pd.read_csv(file_path, dtype={'location_id': str})
                    except Exception as ex:
                        continue
                    df['file_name'] = file
                    weather_dfs.append(df)
                    
                elif file.startswith('holiday'):
                    try:
                        df = pd.read_csv(file_path)
                    except Exception as ex:
                        continue
                    holiday_dfs.append(df)
                    
            # concat all df's of similar type
            self.dataFrameMap['trans_csv'] = pd.concat(trans_csv_dfs)
            self.dataFrameMap['trans_txt'] = pd.concat(trans_txt_dfs)
            self.dataFrameMap['trans_json'] = pd.concat(trans_json_dfs)
            self.dataFrameMap['weather'] = pd.concat(weather_dfs)
            self.dataFrameMap['location'] = pd.concat(location_dfs)
            self.dataFrameMap['holiday']  = pd.concat(holiday_dfs)
                    
        except Exception as ex:
            print(ex)
            
    def prelimAnalysis(self):
        """ Prints preliminary analysis of a data frame.
        """
        merged_trans_df_list = []
        count = 0
        
        for key, data_frame in self.dataFrameMap.items():
            # text to print
            text = DF_MAP_KEY_TO_NAMES[key]
            print(f"\n\nPreliminary Analysis of {text}")
            print("-----------------------------------------------\n")
            
            try:
                if 'trans' in key:
                    merged_trans_df_list.append(data_frame)
                
                # Top 5 rows to get overview of data
                print(data_frame.head().to_markdown())

                # check information about data whether data is null, how many rows are appended to dataframe
                print("\n=====> Information about data")
                data_frame.info()

                # check how many rows there in original file? how many appended
                print(f"\n=====> There are total {data_frame.shape[0]} rows in {text}")

                # Null check for any rows in dataframe
                null_values = data_frame.columns[data_frame.isnull().any()]
                total_nulls = []


                # check columns which has null values present
                for column in null_values:
                    total_nulls.append(data_frame[column])

                print(f"\n=====> There are {len(total_nulls)} columns which has NULL values")
                
                # Check for duplicate rows
                duplicates = data_frame[data_frame.duplicated()]

                # Display the duplicate rows, if any
                if not duplicates.empty:
                    print("\n=====> Following are Duplicate Rows:")
                    print(duplicates)
                else:
                    print("\n=====> No duplicate rows found.")
                    
            except Exception as ex:
                print(ex)
                continue
        
        try:
            merged_trans_df = pd.concat(merged_trans_df_list)
            location_df = self.dataFrameMap['location']
            weather_df = self.dataFrameMap['weather']

            loc_ids = set(location_df['location_id'])
            trans_loc_ids = set(merged_trans_df['location_id'])

            # this is to find all such locations which dont have transaction data
            missing_ids = loc_ids - trans_loc_ids

            for id in missing_ids:
                print(f"\n\n=====> Location ID {id} has no corresponding transactions data")
        
            # try to find weather data for irrelevant locations (location that dont have trans data)
            # these rows are of no use for our analysis.
            weather_ids = set(weather_df['location_id'])
            for id in [i for i in weather_df['location_id']]:
                if id in list(missing_ids):
                    count += 1
                    
            print(f"\n=====> There are {count} records in weather data for irrelevant locations")

        except Exception as ex:
            print(ex)
                
    def createDateDataFrame(self):
        """ Create a date dateframe to load all values from January 2019 to October 2022 as mentioned in project
            We need to derive date, day, day_of_week, month and year
            To derive holiday column we need to left join with existing date dataframe with 
            holiday_data dataframe to fetch holiday column
        """
        try:
            # Create a date range from Jan 2019 to Oct 2022 (you can change end date and make it current date)
            date_start_end = pd.date_range(start='2019-01-01', end='2022-10-31', freq='D').strftime('%m/%d/%Y')

            # Create a DataFrame with the date range
            date_df = pd.DataFrame({'date': date_start_end})

            date_df['date'] = pd.to_datetime(date_df['date'])

            # Add a new column 'Day' to represent the day of the week
            date_df['day'] = date_df['date'].dt.day
            date_df['day_of_week'] = date_df['date'].dt.dayofweek
            date_df['month'] = date_df['date'].dt.month
            date_df['year'] = date_df['date'].dt.year

            # Fetch Holidays data
            holiday_df = self.dataFrameMap['holiday']
            holiday_df['date'] = pd.to_datetime(holiday_df['date'])

            # Left join of date with holidays_data on date to fetch holiday column
            date_df = date_df.merge(holiday_df, how='left', on='date')

            # If Holiday then True else False
            date_df['holiday'] = date_df['holiday'].notna()

            # update the dataFrameMap with these values
            self.dataFrameMap['date'] = date_df
            
        except Exception as ex:
            print(ex)
        
    def transformData(self):
        """ Does some data transformations based on dataframe type and fields to suit database needs
            - Deals with duplicate rows, rows with N/A or NULL values
            - Generalize data format of different columns as per database expectations.
            - data cleaning for profit values in transactions etc
        """
        for key, data_frame in self.dataFrameMap.items():
            if key in ['holiday']:
                # these wont go into DB
                continue
                
            # this is the key to index the KEY TO FIELDS dict
            subset_key = key.split("_")[0]
            
            # pretty text for print's
            text = DF_MAP_KEY_TO_NAMES[key]
            
            # columns in DF
            df_columns = data_frame.columns.values.tolist()
            
            # columns in DB
            db_columns = DF_MAP_KEY_TO_FIELDS[subset_key][1]
            
            # Retrieve rows before dropping
            nof_rows_before = data_frame.shape[0]
            if DEBUG:
                print(f"Total number of records before transformation = {nof_rows_before} for {text}")

            # Remove rows with null data based on a subset of columns
            if len(DF_MAP_KEY_TO_FIELDS[subset_key][0]) > 0:
                # consider a subset of columns - if they have null values drop it 
                data_frame = data_frame.dropna(subset=DF_MAP_KEY_TO_FIELDS[subset_key][0])
                nof_rows_after = data_frame.shape[0]
                if DEBUG:
                    print(f"No of null rows dropped = {nof_rows_before - nof_rows_after}")

            # Remove duplicates (This is a dimension tables)
            nof_rows_before = data_frame.shape[0]
            if len(DF_MAP_KEY_TO_FIELDS[subset_key][0]) > 0:
                data_frame = data_frame.drop_duplicates(subset=DF_MAP_KEY_TO_FIELDS[subset_key][0])
                nof_rows_after = data_frame.shape[0]
                if DEBUG:
                    print(f"No of duplicate rows dropped = {nof_rows_before - nof_rows_after}")
                
            # some transformations as per db needs
            if 'date' in df_columns:
                # standardize different formats that we have into one format
                data_frame['date'] = pd.to_datetime(data_frame['date']).dt.strftime('%Y-%m-%d')
            if 'temperature' in df_columns:
                # keep upto 2 decimals
                data_frame['temperature'] = data_frame['temperature'].round(2)
            if 'pressure' in df_columns:
                # keep upto 2 decimals
                data_frame['pressure'] = data_frame['pressure'].round(2)
            if 'profit' in df_columns:
                # strip leading $ character, handle negative profit values in different format etc.
                if data_frame['profit'].dtype == 'O':
                    data_frame['profit'] = data_frame['profit'].str.replace('[$, 0]', '', regex=True)
                    data_frame['profit'] = pd.to_numeric(data_frame['profit'].str.replace('[(]', '-', regex=True), errors='coerce')
            
            # Arrange column names as it is in ER diagram for DB insertion
            self.dataFrameMap[key] = data_frame[db_columns]
            
            if DEBUG:
                nof_rows_after = self.dataFrameMap[key].shape[0]
                print(f"Total number of records after transformation = {nof_rows_after} for {text}")
                print(f"Summary of {text}")
                print(self.dataFrameMap[key])
                self.dataFrameMap[key].info()
                
    def saveProcessedData(self):
        """ Optionally save the processed data in cleaned files
            Every iteration the files are truncated first and written
        """
        
        merged_df_list = []
        
        if PROCESSED_DATA_SAVE is False:
            return
        
        for key in ['trans_json', 'trans_csv', 'trans_txt']:
            # gather all transactions df's from 3 systems for transactions
            merged_df_list.append(self.dataFrameMap[key])
            
        merged_df = pd.concat(merged_df_list)
    
        # flush all the dataframes into files (dont write column names to file)
        try:
            merged_df.to_csv(PROCESSED_DATA_DIR + '/transactions.csv', header=False)
            self.dataFrameMap['location'].to_csv(PROCESSED_DATA_DIR + '/location.csv', header=False)
            self.dataFrameMap['weather'].to_csv(PROCESSED_DATA_DIR + '/weather.csv', header=False)
            self.dataFrameMap['date'].to_csv(PROCESSED_DATA_DIR + '/date.csv', header=False)
        except Exception as ex:
            print(ex)
            
    def ingestData(self):
        """ Ingests the data in database
        """
        
        for key, data_frame in self.dataFrameMap.items():
            try:
                if key in ['holiday']:
                    # these wont go into DB
                    continue

                # this is the key to index the KEY TO FIELDS dict
                subset_key = key.split("_")[0]

                # pretty text
                text = DF_MAP_KEY_TO_NAMES[key]

                # table name
                table = DF_MAP_KEY_TO_FIELDS[subset_key][2][0]
                
                # flush the dataframe to db
                nof_writes = self.sqlConn.executeWriteQueries(data_frame, table)
                
                self.stats['writes'] += nof_writes
                print(f"Wrote {nof_writes} rows to table {table}")
            except Exception as ex:
                print(ex)
                
    def readData(self, query, df_columns):
        """ Read query from etl object and print in dataframe
        """
        
        try:
            rows = self.sqlConn.executeReadQueries(query)
            df = pd.DataFrame(rows, columns=df_columns)
            print(df.to_markdown())
        except Exception as ex:
            print(ex)

In [3]:
class SQLConnector:
    """ This class is to interact with sqlite3 database 
    """
    def __init__(self):
        """ Initialize all required variables
        """
        # lets keep it private for now
        self._connection = None
        self._cursor = None
        self._dbName = 'tubing'
        
    def __del__(self):
        """ Destructor function - destructor will close the dangling connections on destroy
        """
        if self._cursor:
            self._cursor.close()
            self._cursor = None
        if self._connection:
            self._connection.close()
            self._connection = None
            
    def connectToDB(self):
        """ Connects to database
        """
        if self._connection is None:
            try:
                self._connection = sqlite3.connect(self._dbName)
                self._cursor = self._connection.cursor()
                return True
            except Error as e:
                print(e)
                
                
        return False
    
    def disconnectFromDB(self):
        """ Disconnects from database
        """
        if self._cursor:
            self._cursor.close()
            self._cursor = None
        if self._connection:
            self._connection.close()
            self._connection = None
    
    def createTables(self):
        """ Create default tables 
        """
        if self._connection is None or self._cursor is None:
            return False
        
        try:
            
            # this is expensive and should be avoided; but since we can 
            # re-run the playbook, its best to drop the old tables 
            # and ensure unique constraint doesnt reject data
            self._cursor.execute(sql_queries.DROP_WEATHER_TABLE)
            self._cursor.execute(sql_queries.DROP_LOCATION_TABLE)
            self._cursor.execute(sql_queries.DROP_DATE_TABLE)
            self._cursor.execute(sql_queries.DROP_TRANSACTIONS_TABLE)
            
            # create tables
            self._cursor.execute(sql_queries.CREATE_DATE_TABLE)
            self._cursor.execute(sql_queries.CREATE_LOCATION_TABLE)
            self._cursor.execute(sql_queries.CREATE_WEATHER_TABLE)
            self._cursor.execute(sql_queries.CREATE_TRANSACTIONS_TABLE)
            
        except Exception as ex:
            print(ex)
            
        return True
            
            
    def getConnectionHandle(self):
        """ Gets the SQL connector object
        """
        return self._connection
    
    def executeWriteQueries(self, df, table):
        """ Flushes the dataframe to database tables.
        """
        if self._connection is None or self._cursor is None:
            return False
                
        try:
            # dont drop the table with multiple transactions data writes - hence if_exists is append
            # chunksize is set to 1000 to stagger the batch flush
            # Do not attempt to write dataframe index to db - so skip index
            nof_writes = df.to_sql(table, self._connection, if_exists='append', index=False, chunksize=1000)
            self._connection.commit()
            return nof_writes
        except Exception as ex:
            print(ex)
            
        return 0
        
    def executeReadQueries(self, query):
        """ Executes read queries
        """
        rows = []
        if self._connection is None:
            return rows
        
        try:
            cursor = self._connection.cursor()
            cursor.execute(query)
            rows = cursor.fetchall()
        except Exception as ex:
            print(ex)
            
        return rows
            

# <ins>SQL Connector object</ins>

- this will initialize tubing database
- it will also execute DDL queries for different tables
- for the sake of ease of use, this drops tables if they are already there to allow multiple rules of playbook.

In [4]:
# get a connection handle object
connector = SQLConnector()

In [5]:
# connect to database
connector.connectToDB()

# create tables
connector.createTables()

True

# <ins>ETL object</ins>

In [6]:
# ETL object
etl = ETL(connector)

In [7]:
# fire ETL - parse the files
etl.parseFilesToDf()

In [8]:
# if prelim analysis is required
if ANALYZE:    
    print("-----------------------------------------------\n")
    etl.prelimAnalysis()

-----------------------------------------------



Preliminary Analysis of Transactions Data (CSV)
-----------------------------------------------

|    |   location_id | date       |   transaction_id | profit   | file_name                    |
|---:|--------------:|:-----------|-----------------:|:---------|:-----------------------------|
|  0 |           008 | 01/02/2019 |              001 | $26.89   | transactions_008_system1.csv |
|  1 |           008 | 01/02/2019 |              002 | $24.74   | transactions_008_system1.csv |
|  2 |           008 | 01/02/2019 |              003 | $31.36   | transactions_008_system1.csv |
|  3 |           008 | 01/02/2019 |              004 | $27.06   | transactions_008_system1.csv |
|  4 |           008 | 01/02/2019 |              005 | $29.51   | transactions_008_system1.csv |

=====> Information about data
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8506 entries, 0 to 8505
Data columns (total 5 columns):
 #   Column          Non-Null Count 

In [9]:
# create date dataframe
# this creates a dataframe with all the dates and their corresponding days
# etc which will be joined with holiday data
etl.createDateDataFrame()

In [10]:
# transform the parsed data for DB ingestion
etl.transformData()

# this is optional controlled by a variable
etl.saveProcessedData()

# <ins>Load the transformed data</ins>

In [11]:
etl.ingestData()

Wrote 8506 rows to table transactions
Wrote 42151 rows to table transactions
Wrote 34663 rows to table transactions
Wrote 17278 rows to table weather
Wrote 13 rows to table location
Wrote 1400 rows to table date


In [12]:
if DEBUG:
    print(f"ETL stats: Writes: {etl.stats['writes']}, Reads: {etl.stats['reads']}")

# <ins>Run the query</ins>

In [13]:
# Run the query
etl.readData(sql_queries.READ_QUERY, sql_queries.READ_QUERY_COLUMNS)

|    |   location_id | date       |   temperature |   daily_sum_proft | income_stmt   |   percent_change |    roll |
|---:|--------------:|:-----------|--------------:|------------------:|:--------------|-----------------:|--------:|
|  0 |           001 | 2019-01-02 |         16.88 |            430.96 | positive      |           nan    |  430.96 |
|  1 |           001 | 2019-01-03 |         17.24 |            416.9  | positive      |            -3.26 |  847.86 |
|  2 |           001 | 2019-01-04 |          9.32 |            280.16 | positive      |           -32.8  | 1128.02 |
|  3 |           001 | 2019-01-05 |         20.12 |            415.98 | positive      |            48.48 | 1544    |
|  4 |           001 | 2019-01-06 |         17.24 |            412.2  | positive      |            -0.91 | 1956.2  |
|  5 |           001 | 2019-01-07 |         14.9  |            165.01 | positive      |           -59.97 | 2121.21 |
|  6 |           001 | 2019-01-08 |          0    |            1

In [14]:
# cleanup
etl.sqlConn.disconnectFromDB()