# Data Cleansing and Transformation in Python

In [216]:
import logging
import pandas as pd
import sys

In [117]:
# Define the paths to the raw CSV data files (relative to the working directory)

crashes_data_file = "../data/raw/traffic_crashes.csv"  # crash records
vehicles_data_file = "../data/raw/traffic_crash_vehicle.csv"  # vehicles involved in crashes

In [228]:
# Configure the root logger so records show up both in the notebook output
# (stdout) and in the pipeline.log file.
logger = logging.getLogger()
# Re-running this cell would otherwise stack handlers and duplicate every
# record. Detach each existing handler AND close it, so a pipeline.log file
# handle opened by an earlier run is released instead of leaked.
for stale_handler in list(logger.handlers):  # copy: the list is mutated below
    logger.removeHandler(stale_handler)
    stale_handler.close()
logger.setLevel(logging.INFO)

# Create handlers: one writing to the log file, one to the notebook output
file_handler = logging.FileHandler("pipeline.log")
stream_handler = logging.StreamHandler(sys.stdout)

# Create format: timestamp, level and message, shared by both handlers
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
file_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)

# Add handlers to logger
logger.addHandler(file_handler)
logger.addHandler(stream_handler)

# logging.info(" Logs will now appear both in Jupyter and in pipeline.log!")

In [None]:
# import data as dataframes

def read_data_sources(source_name):
    """Load a CSV data source into a DataFrame.

    Args:
        source_name: Path of the CSV file (or any other input accepted by
            ``pd.read_csv``).

    Returns:
        The parsed DataFrame. If reading fails, the error is logged together
        with its traceback and an empty DataFrame is returned instead.
    """
    try:
        df = pd.read_csv(source_name)
    except Exception as e:
        # Best-effort load: report the failure and fall back to an empty frame.
        logging.exception(f"Error reading {source_name} : {e}")
        return pd.DataFrame()
    # Returning here, rather than from a ``finally`` clause, lets
    # KeyboardInterrupt / SystemExit propagate instead of being swallowed.
    logging.info(f"Read {source_name} data source")
    return df

# Load both raw sources; a source that cannot be read comes back as an empty DataFrame
df_crashes = read_data_sources(crashes_data_file)
df_vehicles = read_data_sources(vehicles_data_file)

# df_crashes.dropna(axis='columns', how='all', inplace=True)
# print(df_crashes)

## Preliminary Tasks: The Importance of Staging Data

In [None]:
# Preview the first rows of the crashes data
df_crashes.head()

In [None]:
# Summarize the columns: dtypes and non-null counts
df_crashes.info()

In [None]:
# Count the missing values in each column
df_crashes.isnull().sum()

## Clear columns and rows that have no data from the DataFrame

In [186]:
def drop_rows_with_null_values(df):
    """Return a copy of *df* without the columns that hold no data at all.

    Note: despite the function's name, it is the columns (not the rows) whose
    values are all missing that get removed; rows are left untouched. The
    input frame itself is not modified.
    """
    return df.dropna(axis=1, how="all")

In [None]:
# Keep only the rows that have at least two non-missing values
df_crashes = df_crashes.dropna(axis='index', thresh=2, inplace=False)

## Work with missing data

In [None]:
# Get the unique values of the column; this is useful when working with batch data.
# In this table every report type is either 'ON SCENE' or 'DESK REPORT'.
df_crashes['report_type'].unique()

In [189]:
def fill_missing_values(df, column_name='report_type', value='ON SCENE'):
    """Return a copy of *df* with the gaps of a single column filled in.

    Only missing entries in ``column_name`` are replaced by ``value``;
    missing values in every other column stay missing.
    """
    return df.fillna({column_name: value})

In [190]:
# Fill the missing 'report_type' values with 'ON SCENE' (the function's defaults);
# missing values in other columns are left as they are
df_crashes = fill_missing_values(df_crashes)

## Merging Data

In [197]:
def merge_dataframes(df_crashes, df_vehicles, on='crash_record_id', how='left'):
    """Join the vehicles frame onto the crashes frame.

    ``on`` names the key column(s) and ``how`` the join type. Non-key columns
    that exist in both frames are disambiguated with the ``_left`` (crashes)
    and ``_right`` (vehicles) suffixes.
    """
    return df_crashes.merge(
        df_vehicles,
        how=how,
        on=on,
        suffixes=('_left', '_right'),
    )


In [201]:
# Join the two sources and check the (rows, columns) shape of the result
df_merged = merge_dataframes(df_crashes, df_vehicles)
df_merged.shape

(1510, 120)

In [None]:
# Preview the first rows of the merged data
df_merged.head()

In [None]:
# Count the non-null crash_record_id values per vehicle_type;
# reset_index() turns the group key back into a regular column
df_agg = df_merged.groupby('vehicle_type').agg({"crash_record_id": "count"}).reset_index()
df_agg

In [None]:
# Pull the crash count of the 'PASSENGER' vehicle type out of the aggregate
passenger_rows = df_agg['vehicle_type'] == 'PASSENGER'
passenger_counts = df_agg[passenger_rows]['crash_record_id']
number_of_passenger_cars_involved = passenger_counts.array[0]
print(number_of_passenger_cars_involved)

## Data Mapping with Target Data

In [204]:
def rename_columns(df, **kwargs):
    """Return a copy of *df* with some of its columns renamed.

    Each keyword argument maps an existing column (the keyword) to its new
    name (the value), e.g. ``rename_columns(df, old_name='newName')``.
    Keywords that match no column are ignored.
    """
    column_mapping = kwargs
    return df.rename(columns=column_mapping)

In [None]:
# Rename columns to match the names expected in the data output
df_agg_mapping = rename_columns(df_agg,vehicle_type='vehicleTypes')
df_agg_mapping

## Writing a Transformation Function

In [207]:
def get_transformed_data(csv_file_1, csv_file_2):
    """Run the whole cleanse / merge / aggregate pipeline on two CSV files.

    Args:
        csv_file_1: Path of the crashes CSV file.
        csv_file_2: Path of the vehicles CSV file.

    Returns:
        A DataFrame with one row per vehicle type: the ``vehicletypes``
        column holds the type and ``crash_record_id`` holds the number of
        non-null crash record ids merged onto it.
    """
    crashes_df = read_data_sources(csv_file_1)
    vehicles_df = read_data_sources(csv_file_2)

    # Remove the columns that hold no data at all
    crashes_df = drop_rows_with_null_values(crashes_df)
    vehicles_df = drop_rows_with_null_values(vehicles_df)

    # Fill gaps in 'report_type' (the helper's default column) with 'ON SCENE'
    crashes_df = fill_missing_values(crashes_df)
    vehicles_df = fill_missing_values(vehicles_df)

    df = merge_dataframes(crashes_df, vehicles_df)

    df_agg = df.groupby('vehicle_type').agg({'crash_record_id': 'count'}).reset_index()

    # NOTE(review): the exploratory rename cell spells this column
    # 'vehicleTypes'; confirm which spelling the target schema expects.
    return rename_columns(df_agg, vehicle_type='vehicletypes')


In [None]:
# Run the full pipeline on the raw data files and display the aggregated result
get_transformed_data(crashes_data_file, vehicles_data_file)