In [14]:
import pandas as pd
from pathlib import Path

In [15]:
# Exploratory, eager load of the three raw extracts (main() re-reads them itself
# through extract_data). Any failure is reported and the notebook keeps running,
# in which case these names stay undefined.
try:
    df_crashes = pd.read_csv("../data/traffic_crashes/traffic_crashes.csv")
    df_vehicles = pd.read_csv("../data/traffic_crashes/traffic_crash_vehicle.csv")
    df_people = pd.read_csv("../data/traffic_crashes/traffic_crash_people.csv")
except Exception as e:
    # FileNotFoundError is an Exception subclass and was handled identically,
    # so a single handler covers both cases.
    print(f"Error: {e}")

In [16]:
# Locations of the raw extracts, relative to the notebook's working directory.
traffic_data_dir = Path("../data/traffic_crashes/")
crash_filepath, vehicle_filepath, people_filepath = (
    traffic_data_dir.joinpath(file_name)
    for file_name in (
        "traffic_crashes.csv",
        "traffic_crash_vehicle.csv",
        "traffic_crash_people.csv",
    )
)

In [49]:
def extract_data(csv):
    """Read one raw CSV extract into a DataFrame.

    Parameters
    ----------
    csv : str, os.PathLike or file-like
        Anything ``pd.read_csv`` accepts.

    Returns
    -------
    pd.DataFrame
        The parsed extract.

    Raises
    ------
    Exception
        Whatever ``pd.read_csv`` raised (e.g. ``FileNotFoundError``). The error is
        printed first and then re-raised, so callers see the real cause. Falling
        through to ``return df`` would only surface an ``UnboundLocalError``.
    """
    try:
        return pd.read_csv(csv)
    except Exception as e:
        # FileNotFoundError is an Exception subclass, so one handler covers both.
        print(f"Error: {e}")
        raise

def transform_data(df, date_format="%m/%d/%Y"):
    """Clean one raw crash-data extract.

    Steps, in order:
      1. drop exact-duplicate rows;
      2. upper-case the column names. This is done first so the CRASH_DATE /
         POSTED_SPEED_LIMIT steps below also match lower-case headers;
      3. fill NaNs in numeric columns with that column's mean;
      4. fill any remaining NaNs with that column's most frequent value;
      5. best-effort conversion of CRASH_DATE to datetime and
         POSTED_SPEED_LIMIT to int32, when those columns are present.

    Parameters
    ----------
    df : pd.DataFrame
        Raw frame, e.g. as returned by ``extract_data``. It is not modified.
    date_format : str, default "%m/%d/%Y"
        ``strptime``-style format used to parse CRASH_DATE.

    Returns
    -------
    pd.DataFrame
        A new cleaned frame with upper-case column names.
    """
    # drop_duplicates returns a new frame, so renaming its columns below does
    # not touch the caller's frame.
    df = df.drop_duplicates()
    df.columns = df.columns.str.upper()

    # A Series passed to fillna is applied per column on its index, so only the
    # numeric columns are filled (no-op when there are none).
    df = df.fillna(df.select_dtypes(include="number").mean())

    # mode() has zero rows for an empty frame or when every column is all-NaN;
    # .iloc[0] would then raise IndexError, so skip the fill in that case.
    modes = df.mode()
    if not modes.empty:
        df = df.fillna(modes.iloc[0])

    if "CRASH_DATE" in df.columns:
        try:
            df["CRASH_DATE"] = pd.to_datetime(df["CRASH_DATE"], format=date_format)
        except (ValueError, TypeError) as e:
            # Best effort: keep the raw values, but report it instead of passing silently.
            print(f"Warning: CRASH_DATE not converted ({e})")

    if "POSTED_SPEED_LIMIT" in df.columns:
        try:
            df["POSTED_SPEED_LIMIT"] = df["POSTED_SPEED_LIMIT"].astype("int32")
        except (ValueError, TypeError) as e:
            print(f"Warning: POSTED_SPEED_LIMIT not converted ({e})")

    return df

def merge_dfs(df1, df2, df3, on='CRASH_RECORD_ID', how='inner', suffixes=('_x', '_y')):
    """Join three frames on a shared key: ``df1`` with ``df2``, then the result with ``df3``.

    In ``main`` these are the crash, vehicle and people frames, in that order.

    Parameters
    ----------
    df1, df2, df3 : pd.DataFrame
        Frames that all contain the ``on`` column(s).
    on : str or list of str, default 'CRASH_RECORD_ID'
        Join key(s) used for both merges.
    how : str, default 'inner'
        Join type passed to ``pd.merge``. With 'inner', keys missing from any frame are dropped.
    suffixes : tuple of str, default ('_x', '_y')
        Suffixes ``pd.merge`` appends to overlapping non-key columns (e.g. RD_NO_x).

    Returns
    -------
    pd.DataFrame

    NOTE(review): joining on the crash id alone is many-to-many. Every ``df2`` row
    of a crash is paired with every ``df3`` row of that crash (e.g. 2 vehicles x 3
    people -> 6 rows). If people belong to a specific vehicle, the second merge
    probably also needs that vehicle key. TODO confirm the intended grain.
    """
    crash_vehicle_df = pd.merge(df1, df2, on=on, how=how, suffixes=suffixes)
    return pd.merge(crash_vehicle_df, df3, on=on, how=how, suffixes=suffixes)

# Analysis columns kept after merging, in output order.
_DEFAULT_COLUMNS_TO_KEEP = (
    'CRASH_RECORD_ID',
    'CRASH_DATE_X',
    'NUM_UNITS',
    'MOST_SEVERE_INJURY',
    'INJURIES_TOTAL',
    'INJURIES_FATAL',
    'VEHICLE_ID_X',
    'MAKE',
    'MODEL',
    'VEHICLE_YEAR',
    'VEHICLE_TYPE',
    'PERSON_ID',
    'PERSON_TYPE',
    'SEX',
    'AGE',
    'CRASH_HOUR',
    'CRASH_DAY_OF_WEEK',
    'CRASH_MONTH',
    'DATE_POLICE_NOTIFIED',
)


def drop_columns(df, columns_to_keep=None):
    """Keep only the requested columns, in the requested order.

    Names are matched exactly first, then case-insensitively. ``pd.merge`` adds
    lower-case ``_x``/``_y`` suffixes (e.g. ``CRASH_DATE_x``), while the keep-list
    uses ``_X``. An exact-only match would silently drop those columns. A
    case-insensitive match is renamed to the requested spelling. Requested names
    with no match are skipped.

    Parameters
    ----------
    df : pd.DataFrame
        Merged frame.
    columns_to_keep : sequence of str, optional
        Columns to keep. Defaults to ``_DEFAULT_COLUMNS_TO_KEEP``.

    Returns
    -------
    pd.DataFrame
        New frame containing only the matched columns.
    """
    wanted = _DEFAULT_COLUMNS_TO_KEEP if columns_to_keep is None else columns_to_keep

    # First column per upper-cased name, used for the case-insensitive fallback.
    by_upper = {}
    for col in df.columns:
        by_upper.setdefault(str(col).upper(), col)

    selected, renames = [], {}
    for name in wanted:
        if name in df.columns:
            selected.append(name)
            continue
        actual = by_upper.get(str(name).upper())
        if actual is None:
            continue
        selected.append(actual)
        renames[actual] = name

    return df[selected].rename(columns=renames)



In [50]:
import psycopg2
from decouple import Config, RepositoryEnv

project_root = Path.cwd().parent
env_path = project_root.joinpath(".env")

# Credentials, host and port are read from the .env one directory above the
# current working directory.
config = Config(RepositoryEnv(env_path))


def _open_crash_db(settings):
    """Return a new psycopg2 connection to ``chicago_vehicle_crash_data``.

    ``settings`` is the decouple ``Config`` used to look up POSTGRES_USER,
    POSTGRES_PASSWORD, POSTGRES_HOST and POSTGRES_PORT, in that order.
    """
    return psycopg2.connect(
        database="chicago_vehicle_crash_data",
        user=settings("POSTGRES_USER"),
        password=settings("POSTGRES_PASSWORD"),
        host=settings("POSTGRES_HOST"),
        port=settings("POSTGRES_PORT"),
    )


# Module-level connection and cursor; load_data() and close_conn() use these globals.
conn = _open_crash_db(config)
cur = conn.cursor()
print("Successful creation of cursor object.")


# DataFrame columns bound, in order, to the placeholders of each table's INSERT.
_LOAD_COLUMNS = {
    'chicago_dmv.Crash': ('CRASH_UNIT_ID', 'CRASH_ID', 'PERSON_ID', 'VEHICLE_ID',
                          'NUM_UNITS', 'TOTAL_INJURIES'),
    'chicago_dmv.Vehicle': ('CRASH_UNIT_ID', 'CRASH_ID', 'CRASH_DATE', 'VEHICLE_ID',
                            'VEHICLE_MAKE', 'VEHICLE_MODEL', 'VEHICLE_YEAR', 'VEHICLE_TYPE'),
    'chicago_dmv.Person': ('PERSON_ID', 'CRASH_ID', 'CRASH_DATE', 'PERSON_TYPE',
                           'VEHICLE_ID', 'PERSON_SEX', 'PERSON_AGE'),
}


def load_data(df, postgres_table, postgres_schema, cursor=None, connection=None):
    """Insert the rows of ``df`` into ``postgres_table`` and commit.

    Executes ``INSERT INTO {postgres_table} {postgres_schema}`` once per row. The
    values are bound as query parameters, taken from the ``_LOAD_COLUMNS`` entry
    for the table.

    Parameters
    ----------
    df : pd.DataFrame
        Must contain every column listed for ``postgres_table``.
    postgres_table : str
        One of the keys of ``_LOAD_COLUMNS``.
    postgres_schema : str
        Remainder of the INSERT statement (column list / VALUES placeholders).
        It is interpolated into the SQL text verbatim, so it must come from a
        trusted source. In this notebook it comes from config.yaml.
    cursor, connection : optional
        DB-API cursor / connection to use. Default to the module-level ``cur``
        and ``conn``.

    Raises
    ------
    ValueError
        If ``postgres_table`` is not handled by this pipeline. This is checked
        before touching the database, even for an empty ``df``.
    KeyError
        If ``df`` lacks a required column.
    Exception
        Any database error. The transaction is rolled back before re-raising,
        so the connection stays usable.
    """
    if postgres_table not in _LOAD_COLUMNS:
        raise ValueError(f"Postgres Data Table {postgres_table} does not exist in this pipeline.")
    columns = list(_LOAD_COLUMNS[postgres_table])

    missing = [col for col in columns if col not in df.columns]
    if missing:
        raise KeyError(f"DataFrame is missing column(s) required by {postgres_table}: {missing}")

    # itertuples on the selected columns yields plain tuples of native Python
    # scalars. NOTE(review): iterrows can yield numpy scalars (e.g. numpy.int64),
    # which psycopg2 is commonly unable to adapt — confirm against your driver version.
    rows = list(df[columns].itertuples(index=False, name=None))

    cursor = cur if cursor is None else cursor
    connection = conn if connection is None else connection
    insert_query = f"INSERT INTO {postgres_table} {postgres_schema}"
    try:
        cursor.executemany(insert_query, rows)
    except Exception:
        connection.rollback()
        raise
    connection.commit()

def close_conn(cur, connection=None):
    """Close the given cursor and the database connection.

    Parameters
    ----------
    cur : cursor
        Cursor to close.
    connection : connection, optional
        Connection to close. Defaults to the module-level ``conn``.

    The connection is closed in ``finally`` so it is not left open when closing
    the cursor raises. In that case the error propagates and no success message
    is printed.
    """
    connection = conn if connection is None else connection
    try:
        cur.close()
    finally:
        connection.close()
    print("Successful closing of cursor object.")

Successful creation of cursor object.


In [None]:
import yaml

# Pipeline settings (CSV paths, target table names, INSERT fragments), read from
# config.yaml relative to the current working directory.
config_data = yaml.safe_load(Path("config.yaml").read_text())
    
def main():
    """Run the crash ETL end to end and return the frame that was loaded.

    Reads the crash / vehicle / people CSVs whose paths are in ``config_data``
    (parsed from config.yaml). Each is cleaned with ``transform_data``, joined
    with ``merge_dfs``, trimmed with ``drop_columns``, and inserted into
    ``config_data["crash_table_PSQL"]`` via ``load_data``.

    Returns
    -------
    pd.DataFrame
        The merged, trimmed frame, so the notebook can inspect it.

    NOTE(review): if ``crash_table_PSQL`` is 'chicago_dmv.Crash', ``load_data``
    reads CRASH_UNIT_ID, CRASH_ID, VEHICLE_ID and TOTAL_INJURIES. None of these
    is in the column list ``drop_columns`` keeps; it keeps CRASH_RECORD_ID,
    VEHICLE_ID_X, INJURIES_TOTAL, and so on. The load will fail on a missing
    column until the names are reconciled.
    """
    df_crashes = extract_data(config_data["crash_filepath"])
    df_vehicle = extract_data(config_data["vehicle_filepath"])
    df_people = extract_data(config_data["people_filepath"])

    df_crashes = transform_data(df_crashes)
    df_vehicle = transform_data(df_vehicle)
    df_people = transform_data(df_people)

    df = merge_dfs(df_crashes, df_vehicle, df_people)
    print(df.columns.tolist())
    df = drop_columns(df)

    load_data(df=df, postgres_table=config_data["crash_table_PSQL"], postgres_schema=config_data["crash_insert_PSQL"])
    # TODO: vehicle and people loads are disabled pending column-name reconciliation:
    #load_data(df=df_vehicle, postgres_table=config_data["vehicle_table_PSQL"], postgres_schema=config_data["vehicle_insert_PSQL"])
    #load_data(df=df_people, postgres_table=config_data["people_table_PSQL"], postgres_schema=config_data["people_insert_PSQL"])

    # Return the frame. A bare df.head() inside a function is evaluated and discarded.
    return df


if __name__ == "__main__":
    # Keep the result so a following cell can display it (e.g. crash_df.head()).
    crash_df = main()

['CRASH_RECORD_ID', 'RD_NO_x', 'CRASH_DATE_EST_I', 'CRASH_DATE_x', 'POSTED_SPEED_LIMIT', 'TRAFFIC_CONTROL_DEVICE', 'DEVICE_CONDITION', 'WEATHER_CONDITION', 'LIGHTING_CONDITION', 'FIRST_CRASH_TYPE', 'TRAFFICWAY_TYPE', 'LANE_CNT', 'ALIGNMENT', 'ROADWAY_SURFACE_COND', 'ROAD_DEFECT', 'REPORT_TYPE', 'CRASH_TYPE', 'INTERSECTION_RELATED_I', 'PRIVATE_PROPERTY_I', 'HIT_AND_RUN_I', 'DAMAGE', 'DATE_POLICE_NOTIFIED', 'PRIM_CONTRIBUTORY_CAUSE', 'SEC_CONTRIBUTORY_CAUSE', 'STREET_NO', 'STREET_DIRECTION', 'STREET_NAME', 'BEAT_OF_OCCURRENCE', 'PHOTOS_TAKEN_I', 'STATEMENTS_TAKEN_I', 'DOORING_I', 'WORK_ZONE_I', 'WORK_ZONE_TYPE', 'WORKERS_PRESENT_I', 'NUM_UNITS', 'MOST_SEVERE_INJURY', 'INJURIES_TOTAL', 'INJURIES_FATAL', 'INJURIES_INCAPACITATING', 'INJURIES_NON_INCAPACITATING', 'INJURIES_REPORTED_NOT_EVIDENT', 'INJURIES_NO_INDICATION', 'INJURIES_UNKNOWN', 'CRASH_HOUR', 'CRASH_DAY_OF_WEEK', 'CRASH_MONTH', 'LATITUDE', 'LONGITUDE', 'LOCATION', 'CRASH_UNIT_ID', 'RD_NO_y', 'CRASH_DATE_y', 'UNIT_NO', 'UNIT_TYPE'