# Crash Data Big Data Processing

In [None]:
# Package Upload

import pandas as pd
from pathlib import Path

# Data Upload

## Three files, all read from the ./data/CT_crashes/ directory.
## The numeric suffix (_0, _1, _2) matches the "dataset 0/1/2" keep-column lists.
df_01 = pd.read_csv("./data/CT_crashes/export_5673_0.csv")
df_02 = pd.read_csv("./data/CT_crashes/export_5673_1.csv")
df_03 = pd.read_csv("./data/CT_crashes/export_5673_2.csv")

## Individual File Processing

In [None]:
# Keep Columns Lists:

## dataset 0:
## Columns retained from the first export. Coded fields are listed next to
## their "<name> Text Format" companion column where one is kept.

keep_columns_0 = [
    # Identification, location and timing
    "CrashId",
    "Fatal Case Status",
    "Latitude",
    "Longitude",
    "Date of Crash",
    "Day of the Week Text Format",
    "Hour of the Day",
    # Severity and counts
    "Most Severe Injury",
    "Number Of Motor Vehicles",
    "Number Of Non-Motorist",
    # Roadway and crash characteristics
    "Route Class",
    "Route Class Text Format",
    "First Harmful Event",
    "Manner of Crash / Collision Impact",
    "Location of First Harmful Event",
    # Environmental conditions
    "Weather Condition",
    "Weather Condition Text Format",
    "Light Condition",
    "Light Condition Text Format",
    "Road Surface Condition",
    "Road Surface Condition Text Format",
    # Site description
    "Crash Specific Location",
    "Crash Specific Location Text Format",
    "Type of Intersection",
    "Type of Intersection Text Format",
]

## dataset 1:
## Columns retained from the second export. Names are reproduced exactly
## as spelled in the list (including "Manuever").

keep_columns_1 = [
    # Identifiers
    "CrashId",
    "VehicleId",
    # Vehicle description
    "Vehicle Unit Type",
    "Make",
    "Vehicle Model Year",
    "Model",
    "Vehicle Color",
    # Event and action
    "Most Harmful Event",
    "Most Harmful Event Text Format",
    "Vehicle Manuever/Action",
    "Vehicle Manuever/Action Text Format",
    # Roadway and damage
    "Roadway Grade",
    "Extent of Damage",
    "Extent of Damage Text Format",
    "Body Type",
    "Body Type Text Format",
]

## dataset 2:
## Columns retained from the third export.

keep_columns_2 = [
    # Identifier and demographics
    "CrashId",
    "Age",
    "Gender",
    "State",
    "Postal Code",
    # Role and injury
    "Person Type",
    "Person Type Text Format",
    "Injury Status",
    # Coded fields with their text companions
    "Air Bag Status",
    "Air Bag Status Text Format",
    "Speeding Related",
    "Speeding Related Text Format",
]


In [None]:
# Precursor functions for Data Cleaning:

## IDENTIFY CODE / TEXT COLUMN PAIRS AND EXPORT THE TEXT VERSION TO THE DECODING RECORDS FOR STORAGE PRESERVATION
def extract_decodings(df, decoding_records):
    """
    Extract unique code -> text mappings from columns ending in ' Text Format'.

    A column named "<name> Text Format" is treated as the text version of
    the column "<name>". For every such pair present in ``df``, each distinct
    (code, text) combination with no missing value is appended to
    ``decoding_records`` and the text column is removed from the result.

    A ' Text Format' column with no matching code column is left in place:
    no mapping can be recorded for it, so dropping it would lose its data.

    Parameters
    ----------
    df : pd.DataFrame
        Input frame; it is not modified.
    decoding_records : list[dict]
        Mutated in place. Each appended dict has the keys
        "variable" (code column name), "code" and "text".

    Returns
    -------
    pd.DataFrame
        A new frame without the text columns whose mappings were extracted.
    """
    suffix = " Text Format"
    extracted_text_cols = []

    for text_col in df.columns:
        # Non-string labels (e.g. integer column names) cannot carry the suffix.
        if not (isinstance(text_col, str) and text_col.endswith(suffix)):
            continue

        # Strip only the trailing suffix, not other occurrences in the name.
        code_col = text_col[: -len(suffix)]
        if code_col not in df.columns:
            continue

        extracted_text_cols.append(text_col)

        pairs = df[[code_col, text_col]].dropna().drop_duplicates()

        # Column-wise iteration keeps each column's own dtype; row-wise
        # iteration would upcast mixed code/text dtypes to a common type.
        decoding_records.extend(
            {"variable": code_col, "code": code, "text": text}
            for code, text in zip(
                pairs[code_col].tolist(), pairs[text_col].tolist()
            )
        )

    # Drop only the text columns that were captured above.
    return df.drop(columns=extracted_text_cols)

## TRIM ONE INPUT DATAFRAME (0,1,2) TO ITS KEEP_COLUMNS, THEN PULL OUT ITS DECODINGS
def reduce_single_df(df, keep_columns, decoding_records):
    """
    Reduce one dataframe to the columns of interest.

    Steps:
    1. Select the columns of ``df`` that also appear in ``keep_columns``
       (names missing from ``df`` are ignored).
    2. Pass the selection to ``extract_decodings``, which appends decoding
       entries to ``decoding_records`` and returns the frame to keep.
    """
    # Intersection keeps only names that exist in the frame.
    present_columns = df.columns.intersection(keep_columns)
    trimmed = df.loc[:, present_columns]

    return extract_decodings(trimmed, decoding_records)

In [None]:
# DATA CLEANING EXECUTION FUNCTION
def clean_and_merge_crash_data(
    dfs,
    keep_columns_list,
    output_dir="../data/clean_crash_data"
):
    """
    Reduce each dataframe, stack the results, and write the outputs to disk.

    Each dataframe is reduced with ``reduce_single_df`` using its matching
    keep-columns list. The reduced frames are concatenated row-wise, columns
    whose values duplicate an earlier column are removed, and two CSV files
    are written to ``output_dir``: ``clean_crash_data.csv`` and
    ``decoding_table.csv``.

    Parameters
    ----------
    dfs : list[pd.DataFrame]
        List of crash dataframes
    keep_columns_list : list[list[str]]
        Matching list of keep-columns for each df
    output_dir : str or Path
        Directory for the output files; created if it does not exist.

    Returns
    -------
    tuple[pd.DataFrame, pd.DataFrame]
        The merged dataframe and the decoding table
        (columns "variable", "code", "text").

    Raises
    ------
    ValueError
        If ``dfs`` and ``keep_columns_list`` differ in length.
    """
    dfs = list(dfs)
    keep_columns_list = list(keep_columns_list)

    # zip() would silently ignore unmatched trailing entries.
    if len(dfs) != len(keep_columns_list):
        raise ValueError(
            "dfs and keep_columns_list must have the same length "
            f"(got {len(dfs)} and {len(keep_columns_list)})"
        )

    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # reduce_single_df appends to this list as a side effect.
    decoding_records = []
    cleaned_dfs = [
        reduce_single_df(df, keep_cols, decoding_records)
        for df, keep_cols in zip(dfs, keep_columns_list)
    ]

    # ---- Merge datasets ----
    # NOTE(review): axis=0 stacks the rows of every input; columns not shared
    # by all inputs are filled with NaN. If a join on a shared key column was
    # intended instead, this step needs revisiting - confirm with the author.
    merged = pd.concat(cleaned_dfs, axis=0, ignore_index=True)

    # ---- Remove duplicate columns by value ----
    # Select columns with a boolean mask rather than transposing the frame
    # back, so the surviving columns keep their original dtypes.
    is_duplicate_column = merged.T.duplicated().to_numpy()
    merged = merged.loc[:, ~is_duplicate_column]

    # ---- Save clean crash data ----
    merged.to_csv(out_dir / "clean_crash_data.csv", index=False)

    # ---- Build and save decoding table ----
    # Explicit columns keep sort_values working when no decodings were found.
    decoding_df = (
        pd.DataFrame(decoding_records, columns=["variable", "code", "text"])
        .drop_duplicates()
        .sort_values(["variable", "code"])
    )
    decoding_df.to_csv(out_dir / "decoding_table.csv", index=False)

    return merged, decoding_df


## Gluing Files Together

In [None]:
# Pair each loaded file with the keep-column list for the same dataset number.
merged_df, decoding_df = clean_and_merge_crash_data(
    dfs=[df_01, df_02, df_03],
    keep_columns_list=[keep_columns_0, keep_columns_1, keep_columns_2]
)

## Building Decoding Files

In [None]:
# First Harmful Event
# Most severe injury
# Manner of Crash / Collision Impact
# Location of First Harmful Event