# Building End To End ETL Pipeline

In [11]:
# import modules
import pandas as pd
import psycopg2
import numpy as np

In [12]:
# creeate initail database connection( and instantiate if it doesn't exist already)
conn = psycopg2.connect(database="chicago.db",
                        host="localhost",
                        user="root",
                        password="root",
                        port="5432")

In [13]:
# initiate cursor object to execute any queries on the database and retrieve data. 
curr = conn.cursor()

In [14]:
# create the schema
curr.execute("""
    CREATE SCHEMA IF NOT EXISTS chicago_dmv;
""")
conn.commit()

In [15]:
# creating vehicle table
vehicle = """
    CREATE TABLE IF NOT EXISTS chicago_dmv.vehicle(
        CRASH_UNIT_ID integer, 
        CRASH_ID text, 
        CRASH_DATE timestamp, 
        VEHICLE_ID integer, 
        VEHICLE_MAKE text, 
        VEHICLE_MODEL text, 
        VEHICLE_YEAR integer, 
        VEHICLE_TYPE text
    );
"""
curr.execute(vehicle)

In [16]:
# creating person table
person = """
    CREATE TABLE IF NOT EXISTS chicago_dmv.person(
        PERSON_ID text, 
        CRASH_ID text, 
        CRASH_DATE timestamp, 
        PERSON_TYPE text, 
        VEHICLE_ID integer, 
        PERSON_SEX text, 
        PERSON_AGE integer
    );
"""
curr.execute(person)

In [17]:
# creating person table
time = """
    CREATE TABLE IF NOT EXISTS chicago_dmv.time(
        CRASH_DATE timestamp, 
        CRASH_ID text, 
        CRASH_HOUR integer, 
        CRASH_DAY_OF_WEEK integer, 
        CRASH_MONTH integer, 
        DATE_POLICE_NOTIFIED timestamp
    );
"""
curr.execute(time)

In [18]:
# creating person table
crash = """
    CREATE TABLE IF NOT EXISTS chicago_dmv.crash(
        CRASH_UNIT_ID integer, 
        CRASH_ID text, 
        PERSON_ID text, 
        VEHICLE_ID integer, 
        NUM_UNITS numeric, 
        TOTAL_INJURIES numeric 
    );
"""
curr.execute(crash)

In [19]:
# creating person table
drop = """
    DROP TABLE chicago_dmv.time;
"""
curr.execute(drop)

In [20]:
# creating person table
drop = """
    SELECT * FROM chicago_dmv.crash LIMIT 10;
"""
curr.execute(drop)

In [22]:
print(curr.execute(drop))

None


In [10]:
# Query to get a list of all tables and their schemas
query_all_tables = """
    SELECT table_schema, table_name
    FROM information_schema.tables
    WHERE table_schema NOT LIKE 'pg_%' AND table_schema != 'information_schema';
"""

# Execute the query
curr.execute(query_all_tables)

# Fetch and print the results
tables = curr.fetchall()
print("List of Tables:")
for table in tables:
    print(f"{table[0]}.{table[1]}")

# Close the cursor and the connection
curr.close()
conn.close()

List of Tables:
chicago_dmv.vehicle
chicago_dmv.person
chicago_dmv.crash


### Extraction

In [None]:
# import dependent modules
import pandas as pd

# extract data
def extract_data(filepath: object) -> object:
    """
       Simple Extract Function in Python with Error Handling
       :param filepath: str, file path to CSV data
       :output: pandas dataframe, extracted from CSV data
    """
    try:
        # Read the CSV file and store it in a dataframe
        df = pd.read_csv(filepath)

    # Handle exception if any of the files are missing
    except FileNotFoundError as e:
        print(f"Error: {e}")

    # Handle any other exceptions
    except Exception as e:
        print(f"Error: {e}")

    return df

### Transformation

In [None]:
# import modules
import pandas as pd

# transform data
def transform_data(df: object) -> object:
    """
       Simple Transformation Function in Python with Error Handling
       :param df: pandas dataframe, extracted data
       :output: pandas dataframe, transformed data
    """

    # drop duplicate rows
    df = df.drop_duplicates()

    # replace missing values in numeric columns with the mean
    df.fillna(df.mean(), inplace=True)

    # replace missing values in categorical columns with the mode
    df.fillna(df.mode().iloc[0], inplace=True)

    # convert columns to appropriate data types
    try:
        df['CRASH_DATE'] = pd.to_datetime(df['CRASH_DATE'], format='%m/%d/%Y')
    except:
        pass

    try:
        df['POSTED_SPEED_LIMIT'] = df ['POSTED_SPEED_LIMIT'].astype('int32')
    except:
        pass

    # merge the three dataframes into a single dataframe
    merge_01_df = pd.merge(df, df2, on='CRASH_RECORD_ID')
    all_data_df = pd.merge(merge_01_df, df3, on='CRASH_RECORD_ID')
    
    # drop unnecessary columns
    df = df[['CRASH_UNIT_ID', 'CRASH_ID', 'CRASH_DATE', 'VEHICLE_ID', 'VEHICLE_MAKE', 'VEHICLE_MODEL',
             'VEHICLE_YEAR', 'VEHICLE_TYPE', 'PERSON_ID', 'PERSON_TYPE', 'PERSON_SEX', 'PERSON_AGE',
             'CRASH_HOUR', 'CRASH_DAY_OF_WEEK', 'CRASH_MONTH', 'DATE_POLICE_NOTIFIED']]
    return df

### Load

In [None]:
# import relevant modules
import duckdb as db

# establish connection to the DuckDB database
conn = db.connect('chicago.db')

# create a cursor object for running SQL queries
cur = conn.cursor()
print('successful creation of cursor object.')


# suggested continued learning: this function can be modified to be fully dynamic
def load_data(df: object, duckdb_table: object, duckdb_schema: object) -> object:
    """
    Load transformed data into respective DuckDB Table
    :param cur: posgre cursor object
    :return: cursor object
    """
    insert_query = f"INSERT INTO {duckdb_table} {duckdb_schema};"

    # insert transformed data into the DuckDB table
    # TODO: REFACTOR TO MAKE SENSE - VERY SLOW / POOR USE OF CPUs
    for index, row in df.iterrows():

        if duckdb_table == 'chicago_dmv.crash':
            insert_values = (row['CRASH_UNIT_ID'],
                              row['CRASH_ID'],
                              row['PERSON_ID'],
                              row['VEHICLE_ID'],
                              row['NUM_UNITS'],
                              row['TOTAL_INJURIES'])

        elif duckdb_table == 'chicago_dmv.vehicle':
            insert_values = (row['CRASH_UNIT_ID'],
                              row['CRASH_ID'],
                              row['CRASH_DATE'],
                              row['VEHICLE_ID'],
                              row['VEHICLE_MAKE'],
                              row['VEHICLE_MODEL'],
                              row['VEHICLE_YEAR'],
                              row['VEHICLE_TYPE'])

        elif duckdb_table == 'chicago_dmv.person':
            insert_values = (row['PERSON_ID'],
                              row['CRASH_ID'],
                              row['CRASH_DATE'],
                              row['PERSON_TYPE'],
                              row['VEHICLE_ID'],
                              row['PERSON_SEX'],
                              row['PERSON_AGE'])

        else:
            raise ValueError(f'DuckDB Data Table {duckdb_table} does not exist in this pipeline.')

        # Insert data int
        cur.execute(insert_query, insert_values)

    # Commit all changes to the database
    conn.commit()

def close_conn(cur):
    """
    Closing DuckDB connection
    :param cur: duckdb cursor object
    :return: none
    """

    # Close the cursor and database connection
    cur.close()
    conn.close()
    print('successful closing of cursor object.')

In [None]:
import duckdb as db

# Establish connection to the DuckDB database
conn = db.connect('chicago.db')

# Create a cursor object for running SQL queries
cur = conn.cursor()
print('Successful creation of cursor object.')


def load_data(df: object, duckdb_table: object, duckdb_schema: object) -> object:
    """
    Load transformed data into respective DuckDB Table
    :param df: pandas DataFrame
    :param duckdb_table: DuckDB table name
    :param duckdb_schema: DuckDB table schema
    :return: None
    """

    # Construct the insert query based on the table name
    insert_query = f"INSERT INTO {duckdb_table} {duckdb_schema}"

    # Convert pandas DataFrame to a list of tuples (suitable for parameterization)
    data_list = df.values.tolist()

    # Execute the parameterized insert query using the data list
    cur.executemany(insert_query, data_list)

    # Commit all changes to the database
    conn.commit()


def close_conn(cur):
    """
    Closing DuckDB connection
    :param cur: duckdb cursor object
    :return: None
    """

    # Close the cursor and database connection
    cur.close()
    conn.close()
    print('Successful closing of cursor object.')


In [None]:
# import relevant modules
import duckdb as db

# establish connection to the DuckDB database
conn = db.connect('chicago.db')

# create a cursor object for running SQL queries
cur = conn.cursor()
print('successful creation of cursor object.')


# suggested continued learning: this function can be modified to be fully dynamic
def load_data(df: object, duckdb_table: object, duckdb_schema: object) -> object:
    """
    Load transformed data into respective DuckDB Table
    :param cur: posgre cursor object
    :return: cursor object
    """
    # Prepare the INSERT INTO query with placeholders for values
    insert_query = f"INSERT INTO {duckdb_table} {duckdb_schema} VALUES ({', '.join(['?'] * len(df.columns))});"

    # Loop through DataFrame rows
    for _, row in df.iterrows():
        # Create a tuple of values from the DataFrame row
        insert_values = tuple(row)

        # Execute the prepared statement with the current row's values
        cur.execute(insert_query, insert_values)

    # Commit all changes to the database
    conn.commit()


def close_conn(cur):
    """
    Closing DuckDB connection
    :param cur: duckdb cursor object
    :return: none
    """
    # Close the cursor and database connection
    cur.close()
    conn.close()
    print('successful closing of cursor object.')

In [None]:
# create a command-line runnable pipeline
from etl.extract import extract_data
from etl.transform import transform_data
import etl.load as load

import yaml

# import pipeline configuration
with open('config.yaml', 'r') as file:
    config_data = yaml.safe_load(file)


def run_pipeline():
    # Step 1: Extract data
    crashes_df = extract_data(config_data['crash_filepath'])
    vehicle_df = extract_data(config_data['vehicle_filepath'])
    people_df = extract_data(config_data['people_filepath'])

    # Step 2: Transform data
    crashes_transformed_df = transform_data(crashes_df)
    vehicle_transformed_df = transform_data(vehicle_df)
    people_transformed_df = transform_data(people_df)

    # Step 3: Load data
    load.load_data(df=crashes_transformed_df,
                   postgre_table=config_data['crash_table_PSQL'],
                   postgre_schema=config_data['crash_insert_PSQL'])
    load.load_data(df=vehicle_transformed_df,
                   postgre_table=config_data['vehicle_table_PSQL'],
                   postgre_schema=config_data['vehicle_insert_PSQL'])
    load.load_data(df=people_transformed_df,
                   postgre_table=config_data['people_table_PSQL'],
                   postgre_schema=config_data['people_insert_PSQL'])


if __name__ == "__main__":
    run_pipeline()