In [None]:
# findspark.init() must run before `import pyspark`: it makes the local Spark
# installation's pyspark importable (presumably a non-pip Spark install — confirm).
import findspark
findspark.init()
import pyspark
# Last expression: displays the Spark home directory that findspark resolved.
findspark.find()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DecimalType
from pyspark.sql.window import Window

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline

import folium
from folium.plugins import HeatMapWithTime,HeatMap
from folium import plugins

from ipywidgets import interact, widgets
from IPython.display import display

In [None]:
from matplotlib import pyplot as plt
import re
import pandas as pd
import numpy as np
import seaborn as sns
from itertools import chain

from sklearn.metrics import accuracy_score, recall_score, precision_score

import chart_studio.plotly as py
import plotly.graph_objs as go
import chart_studio.dashboard_objs as dashboard
from dash import Dash, html, dash_table, dcc, Input, Output
import plotly.express as px

In [None]:
# Start (or reuse an already running) Spark session for this notebook
spark = (
    SparkSession.builder
    .appName("Motor Crash Analysis")
    .getOrCreate()
)

In [None]:
# Load the collisions CSV export: first row holds the column names, and Spark
# scans the data to infer each column's type.
df = spark.read.options(header=True, inferSchema=True).csv("Motor_Vehicle_Collisions_-_Crashes_20231026.csv")

In [None]:
# Remove rows that are too incomplete for the analysis. Each step is one rule:
#   * no street name at all (on / cross / off street all missing)
#   * any of the key fields missing
#   * no contributing factor recorded for any of the five vehicles
#   * no vehicle type recorded for any of the five vehicles
df = (
    df
    .na.drop(how='all', subset=['ON STREET NAME', 'CROSS STREET NAME', 'OFF STREET NAME'])
    .na.drop(how='any', subset=['CRASH DATE', 'CRASH TIME', 'BOROUGH', 'ZIP CODE', 'LOCATION'])
    .na.drop(how='all', subset=[f'CONTRIBUTING FACTOR VEHICLE {n}' for n in range(1, 6)])
    .na.drop(how='all', subset=[f'VEHICLE TYPE CODE {n}' for n in range(1, 6)])
)

In [None]:
# Keep a handle on the cleaned data as it is *before* the EDA cells below start
# adding and overwriting columns on `df` (e.g. 'CRASH DATE' gets re-parsed).
df_orig = df.alias(alias='df_orig')

# Exploratory Data Analysis

## Number of accidents by month

In [None]:
# Parse 'CRASH DATE' from its MM/dd/yyyy text form into a timestamp. The type check
# keeps the cell safe to re-run: a column that has already been parsed no longer
# matches the text format and must not be parsed a second time.
if dict(df.dtypes)['CRASH DATE'] == 'string':
    df = df.withColumn('CRASH DATE', F.to_timestamp(F.regexp_replace('CRASH DATE', '/', '-'), "MM-dd-yyyy"))

# Month number of each crash (F.month: 1 = January ... 12 = December)
df = df.withColumn('Month', F.month('CRASH DATE'))

# Accidents per month; rows whose date could not be parsed have no month and are left out
accidents_per_month = (
    df.where(F.col('Month').isNotNull())
      .groupBy('Month')
      .count()
      .orderBy('Month')
)

# Collect the 12 monthly counts to pandas for plotting
accidents_per_month_pd = accidents_per_month.toPandas().set_index('Month')

fig1 = px.bar(accidents_per_month_pd, x=accidents_per_month_pd.index, y='count')

# Month numbers are 1..12, so the tick positions must be 1..12 (not 0..11) for each
# label to sit under its own month's bar.
fig1.update_layout(
    xaxis_title='Month',
    yaxis_title='Number of Accidents',
    xaxis=dict(tickmode='array', tickvals=list(range(1, 13)),
               ticktext=['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
                         'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']),
    plot_bgcolor="#31302F",
    paper_bgcolor="#31302F",
    font=dict(family="Open Sans", color='#9467bd')
)

fig1.update_traces(marker_color='#e377c2')

fig1.show()

## How frequently do accidents happen by day of week and hour of day?

In [None]:
# Spark's dayofweek numbers the days 1 (Sunday) .. 7 (Saturday); hour() gives the hour of day.
df = (
    df
    .withColumn('day_of_week', F.dayofweek('CRASH DATE'))
    .withColumn('hour_of_day', F.hour('CRASH TIME'))
)

# Crash count for every (day, hour) combination
accidents_count = df.groupBy('day_of_week', 'hour_of_day').count()

# Reshape into a day x hour grid. The > 0 filter removes the row for an unparsed
# (null) day, the pivot column generated for a null hour is dropped, and empty
# cells become 0.
pivot_df = (
    accidents_count
    .groupBy('day_of_week')
    .pivot('hour_of_day')
    .sum('count')
    .where(F.col('day_of_week') > 0)
    .drop('null')
    .na.fill(0)
)

# Local pandas copy, one row per day in day order
pivot_table_pd = pivot_df.toPandas().set_index('day_of_week').sort_index()

fig2 = go.Figure(data=go.Heatmap(
    z=pivot_table_pd.values,
    x=pivot_table_pd.columns,
    y=pivot_table_pd.index,
    colorscale='matter'
))

# Print each cell's count on top of the heatmap
for day in pivot_table_pd.index:
    for hour in pivot_table_pd.columns:
        fig2.add_annotation(
            x=hour,
            y=day,
            text=str(pivot_table_pd.at[day, hour]),
            showarrow=False,
            font=dict(color="black")
        )

fig2.update_layout(
    xaxis_title='Hour of Day',
    yaxis_title='Day of Week',
    yaxis=dict(
        tickmode='array',
        tickvals=list(range(1, 8)),
        ticktext=['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']
    ),
    plot_bgcolor="#31302F",
    paper_bgcolor="#31302F",
    font=dict(family="Open Sans", color='#9467bd')
)

fig2.show()

## Number of People Injured and Killed

In [None]:
# Daily death totals per victim category, turned into running (cumulative) totals.
killed_cols = ['NUMBER OF PERSONS KILLED', 'NUMBER OF PEDESTRIANS KILLED',
               'NUMBER OF CYCLIST KILLED', 'NUMBER OF MOTORIST KILLED']

# One row per crash date; rows whose date failed to parse are excluded
daily_data = (
    df.where(F.col('CRASH DATE').isNotNull())
      .groupBy('CRASH DATE')
      .agg(*[F.sum(c).alias(c) for c in killed_cols])
)

# Running sum of every category over all dates up to and including the current one
window_spec = Window.orderBy('CRASH DATE')
daily_data = (
    daily_data
    .select('CRASH DATE', *[F.sum(c).over(window_spec).alias(c) for c in killed_cols])
    .sort('CRASH DATE')
)

# One row per day, small enough to plot locally
pdf = daily_data.toPandas()

fig3 = go.Figure()

# Pedestrians, cyclists and motorists are stacked on top of each other. 'Persons' is
# presumably the overall count those three categories break down (confirm against the
# dataset's data dictionary), so stacking it as well would roughly double the area's
# height; it is drawn as a separate, unstacked dashed line instead.
for column, label in [('NUMBER OF PEDESTRIANS KILLED', 'Pedestrians Killed'),
                      ('NUMBER OF CYCLIST KILLED', 'Cyclists Killed'),
                      ('NUMBER OF MOTORIST KILLED', 'Motorists Killed')]:
    fig3.add_trace(go.Scatter(
        x=pdf['CRASH DATE'], y=pdf[column],
        mode='lines', name=label, stackgroup='one'
    ))

fig3.add_trace(go.Scatter(
    x=pdf['CRASH DATE'], y=pdf['NUMBER OF PERSONS KILLED'],
    mode='lines', name='Persons Killed (total)', line=dict(dash='dash')
))

fig3.update_layout(
    xaxis_title='Date',
    yaxis_title='Cumulative Number',
    legend_title='Category',
    hovermode='x',
    plot_bgcolor="#31302F",
    paper_bgcolor="#31302F",
    font=dict(family="Open Sans", color='#9467bd')
)

fig3.show()

In [None]:
# Bucket the number of people killed per crash: counts of 2 or fewer are kept as-is,
# larger counts become '3-5' or 'More than 5'. The bucketed values are compared as
# text further down (e.g. '0').
killed = F.col('NUMBER OF PERSONS KILLED')
killed_df = (
    df.withColumn(
        'NUMBER OF PERSONS KILLED',
        F.when(killed > 5, 'More than 5').when(killed > 2, '3-5').otherwise(killed)
    )
    .groupBy('NUMBER OF PERSONS KILLED')
    .count()
)

# Plotly needs a local pandas frame. Missing values and crashes with nobody
# killed ('0') are left out so the pie covers fatal crashes only.
pandas_df = killed_df.toPandas()
pandas_df = pandas_df[pandas_df['NUMBER OF PERSONS KILLED'].notna()
                      & (pandas_df['NUMBER OF PERSONS KILLED'] != '0')]
sorted_df = pandas_df.sort_values('NUMBER OF PERSONS KILLED')

fig4 = px.pie(sorted_df, values='count', names='NUMBER OF PERSONS KILLED',
              color_discrete_sequence=px.colors.diverging.PRGn)

fig4.update_layout(
    plot_bgcolor="#31302F",
    paper_bgcolor="#31302F",
    font=dict(family="Open Sans", color='#9467bd')
)

fig4.show()

In [None]:
# Daily injury totals per victim category, turned into running (cumulative) totals.
injured_cols = ['NUMBER OF PERSONS INJURED', 'NUMBER OF PEDESTRIANS INJURED',
                'NUMBER OF CYCLIST INJURED', 'NUMBER OF MOTORIST INJURED']

# One row per crash date; rows whose date failed to parse are excluded
daily_data = (
    df.where(F.col('CRASH DATE').isNotNull())
      .groupBy('CRASH DATE')
      .agg(*[F.sum(c).alias(c) for c in injured_cols])
)

# Running sum of every category over all dates up to and including the current one
window_spec = Window.orderBy('CRASH DATE')
daily_data = (
    daily_data
    .select('CRASH DATE', *[F.sum(c).over(window_spec).alias(c) for c in injured_cols])
    .sort('CRASH DATE')
)

# One row per day, small enough to plot locally
pdf = daily_data.toPandas()

fig5 = go.Figure()

# Pedestrians, cyclists and motorists are stacked on top of each other. 'Persons' is
# presumably the overall count those three categories break down (confirm against the
# dataset's data dictionary), so stacking it as well would roughly double the area's
# height; it is drawn as a separate, unstacked dashed line instead.
for column, label in [('NUMBER OF PEDESTRIANS INJURED', 'Pedestrians Injured'),
                      ('NUMBER OF CYCLIST INJURED', 'Cyclists Injured'),
                      ('NUMBER OF MOTORIST INJURED', 'Motorists Injured')]:
    fig5.add_trace(go.Scatter(
        x=pdf['CRASH DATE'], y=pdf[column],
        mode='lines', name=label, stackgroup='one'
    ))

fig5.add_trace(go.Scatter(
    x=pdf['CRASH DATE'], y=pdf['NUMBER OF PERSONS INJURED'],
    mode='lines', name='Persons Injured (total)', line=dict(dash='dash')
))

fig5.update_layout(
    xaxis_title='Date',
    yaxis_title='Cumulative Number',
    legend_title='Category',
    hovermode='x unified',
    plot_bgcolor="#31302F",
    paper_bgcolor="#31302F",
    font=dict(family="Open Sans", color='#9467bd')
)

fig5.show()

In [None]:
# Bucket the number of people injured per crash: counts of 2 or fewer are kept as-is,
# larger counts become '3-5' or 'More than 5'.
injured = F.col('NUMBER OF PERSONS INJURED')
injured_df = (
    df.withColumn(
        'NUMBER OF PERSONS INJURED',
        F.when(injured > 5, 'More than 5').when(injured > 2, '3-5').otherwise(injured)
    )
    .groupBy('NUMBER OF PERSONS INJURED')
    .count()
)

# Plotly needs a local pandas frame; rows with a missing bucket are dropped
pandas_df = injured_df.toPandas().dropna(subset=['NUMBER OF PERSONS INJURED'])
sorted_df = pandas_df.sort_values('NUMBER OF PERSONS INJURED')

fig6 = px.pie(sorted_df, values='count', names='NUMBER OF PERSONS INJURED',
              color_discrete_sequence=px.colors.diverging.PRGn)

fig6.update_layout(
    plot_bgcolor="#31302F",
    paper_bgcolor="#31302F",
    font=dict(family="Open Sans", color='#9467bd')
)

fig6.show()

## Top 10 streets where accidents were reported

In [None]:
# Accidents per street, most frequent first.
# * Names are trimmed so variants that differ only by surrounding whitespace are
#   counted as the same street.
# * Rows without an ON STREET NAME (allowed by the cleaning step when a cross/off
#   street is given) are excluded explicitly. The alternative, dropping the first row
#   of the ranking, assumes the null group ranks first.
# * The street name breaks ties so the top 10 is deterministic.
street_name = F.trim(F.col('ON STREET NAME'))
street_counts = (
    df.where(street_name.isNotNull() & (street_name != ''))
      .groupBy(street_name.alias('ON STREET NAME'))
      .count()
      .orderBy(F.col('count').desc(), F.col('ON STREET NAME'))
)

# Only the 10 top rows are brought to the driver
top_streets_for_plotting = street_counts.limit(10)

top_streets_for_plotting.show()

# Small local copy for plotting
top_streets_data = top_streets_for_plotting.toPandas()

fig7 = px.bar(top_streets_data, x='ON STREET NAME', y='count')

fig7.update_layout(
    xaxis_title='Street Name',
    yaxis_title='Number of Accidents',
    xaxis={'categoryorder': 'total descending'},  # bars sorted by count
    plot_bgcolor="#000000",
    paper_bgcolor="#000000",
    font=dict(family="Open Sans", color='#9467bd')
)

fig7.update_traces(marker_color='#e377c2')
fig7.show()

## Top 10 contributing factors for accidents

In [None]:
# Column holding the primary contributing factor
factors_columns = ['CONTRIBUTING FACTOR VEHICLE 1']

# Rows that actually record a factor
non_null_factors = df.filter(df[factors_columns[0]].isNotNull())

# Top 10 factors by number of crashes. The 'Unspecified' placeholder is removed by
# value rather than by dropping the first row of the ranking, so a real factor is
# never discarded if 'Unspecified' happens not to rank first. Ties are broken by
# name to keep the result deterministic.
top_10_factors = (
    non_null_factors
    .filter(F.col(factors_columns[0]) != 'Unspecified')
    .groupBy(factors_columns[0])
    .count()
    .orderBy(F.col('count').desc(), F.col(factors_columns[0]))
    .limit(10)
)

top_10_factors.show()

# Small local copy for plotting
top_10_data = top_10_factors.toPandas()

fig8 = px.bar(top_10_data, x=factors_columns[0], y='count')

fig8.update_layout(
    title='Top 10 contribution factors for accidents',
    xaxis_title='Contributing Factor',
    yaxis_title='Number of Accidents',
    plot_bgcolor="#000000",
    paper_bgcolor="#000000",
    font=dict(family="Open Sans", color='#9467bd')
)

fig8.update_traces(marker_color='#e377c2')
fig8.show()

## Which vehicle types were involved in the most crashes?

In [None]:
# Stack VEHICLE TYPE CODE 1 and VEHICLE TYPE CODE 2 into a single column and count
# how many times each vehicle type appears, most frequent first.
vehicle_code_combined = (
    df.select(F.col('VEHICLE TYPE CODE 1').alias('vehicle_codes'))
      .union(df.select(F.col('VEHICLE TYPE CODE 2').alias('vehicle_codes')))
      .groupby('vehicle_codes')
      .count()
      .sort(F.col('count'), ascending=False)
)

vehicle_code_data = [[row['vehicle_codes'], row['count']] for row in vehicle_code_combined.collect()]

# Removing empty / placeholder values
empty_value_list = [None, 'None', 'UNKNOWN', 'OTHER']
vehicle_code_data_filtered = [pair for pair in vehicle_code_data if pair[0] not in empty_value_list]


def vehicle_code_merge(vehicle, data):
    """Fold every entry whose name contains ``vehicle`` into one ``[vehicle, total]`` entry.

    The same vehicle type is written several ways in the data (different case,
    extra words), so every entry whose name contains ``vehicle`` -- matched
    case-insensitively and as a literal substring, not as a regex -- is removed
    and its count added to a single merged entry.

    Parameters
    ----------
    vehicle : str
        Canonical vehicle name to merge into.
    data : list of [name, count]
        Entries to merge. The list is not modified.

    Returns
    -------
    list of [name, count]
        The non-matching entries in their original order, followed by
        ``[vehicle, total]`` when at least one entry matched (no zero-count
        placeholder is added when nothing matches).
    """
    pattern = re.compile(re.escape(vehicle), re.IGNORECASE)
    kept = []
    total = 0
    matched = False
    for entry in data:
        if pattern.search(str(entry[0])):
            total += entry[1]
            matched = True
        else:
            kept.append(entry)
    if matched:
        kept.append([vehicle, total])
    return kept


# Merge spelling variants into canonical names. The order matters: an entry that
# contains several of these names is absorbed by the first that matches it.
for canonical in ('Station Wagon', 'Sedan', 'Taxi', 'Bus', 'Truck', 'Van'):
    vehicle_code_data_filtered = vehicle_code_merge(canonical, vehicle_code_data_filtered)

vehicle_code_data_filtered = sorted(vehicle_code_data_filtered, key=lambda pair: pair[1], reverse=True)

# Top 5 vehicle types for the chart
vehicle_types = [str(pair[0]) for pair in vehicle_code_data_filtered[:5]]
counts = [pair[1] for pair in vehicle_code_data_filtered[:5]]

fig9 = px.bar(x=vehicle_types, y=counts,
              labels={'x': 'Vehicle Type', 'y': 'Count of Crashes'})

fig9.update_layout(
    xaxis_tickangle=-45,
    plot_bgcolor="#31302F",
    paper_bgcolor="#31302F",
    font=dict(family="Open Sans", color='#9467bd')
)

fig9.update_traces(marker_color='#e377c2')

fig9.show()

## Number of collisions per borough for each day of the week

In [None]:
# Day numbering produced by Spark's dayofweek (1 = Sunday ... 7 = Saturday)
day_names = {1: 'Sunday', 2: 'Monday', 3: 'Tuesday', 4: 'Wednesday',
             5: 'Thursday', 6: 'Friday', 7: 'Saturday'}

# Crash counts per (borough, day of week). Rows whose date could not be parsed have
# no day; they are skipped so they cannot add an unlabeled row to the heatmap.
weekday_bor_df = (
    df.select(F.col('BOROUGH').alias('borough'), F.col('day_of_week').alias('day'))
      .where(F.col('day').isNotNull())
      .groupBy('borough', 'day')
      .count()
)

weekday_bor_df_heatmap = weekday_bor_df.withColumnRenamed("count", "Crash_Count").toPandas()

# day x borough grid. Missing combinations are 0, and values are cast back to int so the
# annotations read '123' rather than '123.0'.
heatmap_data_pivot = (
    weekday_bor_df_heatmap
    .pivot(index='day', columns='borough', values='Crash_Count')
    .fillna(0)
    .astype(int)
    .sort_index()
)

fig10 = go.Figure(data=go.Heatmap(
    z=heatmap_data_pivot.values,
    x=heatmap_data_pivot.columns,
    y=heatmap_data_pivot.index,
    colorscale='matter'
))

# One text annotation per heatmap cell
annotations = []
for n, row in enumerate(heatmap_data_pivot.values):
    for m, val in enumerate(row):
        annotations.append(
            go.layout.Annotation(
                text=str(val),
                x=heatmap_data_pivot.columns[m],
                y=heatmap_data_pivot.index[n],
                xref='x1', yref='y1',
                showarrow=False,
                font=dict(color="black")
            )
        )

# Tick labels are derived from the days actually present, so they stay aligned
# with the rows even if a day is missing.
fig10.update_layout(
    xaxis_title='Boroughs',
    yaxis_title='Week Day of Crash',
    annotations=annotations,
    yaxis=dict(
        tickmode='array',
        tickvals=list(heatmap_data_pivot.index),
        ticktext=[day_names[int(day)] for day in heatmap_data_pivot.index]
    ),
    yaxis_tickangle=0,
    plot_bgcolor="#000000",
    paper_bgcolor="#000000",
    font=dict(family="Open Sans", color='#9467bd')
)

fig10.show()

## Number of collisions per borough for each hour of the day

In [None]:
# Borough and crash hour. Rows whose crash time could not be parsed have no hour;
# they are excluded so they cannot form an extra, mislabelled row in the heatmap.
bor_hour_df = df.select('BOROUGH', 'hour_of_day').where(F.col('hour_of_day').isNotNull())

# Crash counts per (borough, hour)
bor_hour_df_heatmap = (
    bor_hour_df.groupBy("BOROUGH", "hour_of_day")
               .count()
               .withColumnRenamed("count", "Crash_Count")
)

# hour x borough grid, missing combinations filled with 0
heatmap_data_pivot = (
    bor_hour_df_heatmap.groupBy("hour_of_day")
                       .pivot("BOROUGH")
                       .sum("Crash_Count")
                       .sort(F.col('hour_of_day'))
                       .na.fill(0)
)

# Use the actual hour as the row index, so the y-axis shows real hours rather than
# row positions.
heatmap_data_pd = heatmap_data_pivot.toPandas().set_index('hour_of_day')

# Create a heatmap with Plotly Express
fig11 = px.imshow(heatmap_data_pd,
                  labels=dict(x="Borough", y="Hour of the Day", color="Collisions"),
                  x=heatmap_data_pd.columns,
                  y=heatmap_data_pd.index,
                  color_continuous_scale='curl',
                  text_auto=True)

fig11.update_xaxes(side="bottom")
fig11.update_layout(
    plot_bgcolor="#000000",
    paper_bgcolor="#000000",
    font=dict(family="Open Sans", color='#9467bd')
)

fig11.show()

## Deaths in each borough

In [None]:
# Total deaths per borough for each victim category
victim_cols = ['NUMBER OF PEDESTRIANS KILLED', 'NUMBER OF CYCLIST KILLED', 'NUMBER OF MOTORIST KILLED']
deaths_borough_data = (
    df.groupby('BOROUGH')
      .agg(*[F.sum(c).alias(c) for c in victim_cols])
      .collect()
)

boroughs = [row['BOROUGH'] for row in deaths_borough_data]
# X = pedestrians, Y = cyclists, Z = motorists
X, Y, Z = ([row[c] for row in deaths_borough_data] for c in victim_cols)

# Columns 0 / 1 / 2 = pedestrians / motorists / cyclists, one row per borough
death_bor_df = pd.DataFrame(np.column_stack([X, Z, Y]), index=boroughs)

# Grouped bar chart: one bar per category within each borough
fig12 = go.Figure()
for position, label in enumerate(['Pedestrians', 'Motorists', 'Cyclists']):
    fig12.add_trace(go.Bar(x=death_bor_df.index, y=death_bor_df[position], name=label))

fig12.update_layout(
    barmode='group',
    xaxis_title='Boroughs',
    yaxis_title='Death Counts',
    plot_bgcolor="#31302F",
    paper_bgcolor="#31302F",
    font=dict(family="Open Sans", color='#9467bd'),
    legend=dict(title='Categories', itemclick='toggle')
)

fig12.show()

## Which vehicle types were involved in the most deaths?

In [None]:
# Total persons killed per primary vehicle type (VEHICLE TYPE CODE 1), highest first.
# coalesce: F.sum returns null for a group whose values are all null, and a None total
# would break the integer accumulation and the sort below.
vehicle_code_combined = (
    df.groupby('VEHICLE TYPE CODE 1')
      .agg(F.coalesce(F.sum('NUMBER OF PERSONS KILLED'), F.lit(0)).alias('NUMBER OF PERSONS KILLED'))
      .sort(F.col('NUMBER OF PERSONS KILLED'), ascending=False)
)

vehicle_code_data = [[row['VEHICLE TYPE CODE 1'], row['NUMBER OF PERSONS KILLED']]
                     for row in vehicle_code_combined.collect()]

# Removing empty / placeholder values
empty_value_list = [None, 'None', 'UNKNOWN', 'OTHER']
vehicle_code_data_filtered = [pair for pair in vehicle_code_data if pair[0] not in empty_value_list]

# Merge spelling variants into canonical names. vehicle_code_merge is defined in the
# earlier crash-count-per-vehicle-type cell; it is reused here rather than redefined.
for canonical in ('Station Wagon', 'Sedan', 'Taxi', 'Bus', 'Truck', 'Van'):
    vehicle_code_data_filtered = vehicle_code_merge(canonical, vehicle_code_data_filtered)

vehicle_code_data_filtered = sorted(vehicle_code_data_filtered, key=lambda pair: pair[1], reverse=True)

# Top 5 vehicle types by deaths
top_vehicle_types = [str(pair[0]) for pair in vehicle_code_data_filtered[:5]]
counts = [pair[1] for pair in vehicle_code_data_filtered[:5]]

# The y values are persons killed, not crash counts, so label them accordingly
fig13 = px.bar(
    x=top_vehicle_types,
    y=counts,
    labels={'x': 'Vehicle Type', 'y': 'Number of Persons Killed'}
)

fig13.update_layout(
    xaxis_title='Vehicle Type',
    yaxis_title='Number of Persons Killed',
    xaxis_tickangle=-45,
    plot_bgcolor="#31302F",
    paper_bgcolor="#31302F",
    font=dict(family="Open Sans", color='#9467bd')
)

fig13.update_traces(marker_color='#e377c2')

fig13.show()

## Top contributing factors for accidents, by borough

In [None]:
# Number of contributing factors (heatmap rows) to show
TOP_N_FACTORS = 20

# Crash counts per (borough, primary contributing factor), without the 'Unspecified'
# placeholder (the != comparison also drops null factors)
contr_fact_rows = (
    df.filter(F.col('CONTRIBUTING FACTOR VEHICLE 1') != "Unspecified")
      .groupby('BOROUGH', 'CONTRIBUTING FACTOR VEHICLE 1')
      .count()
      .collect()
)

contr_fact_bor_df = pd.DataFrame({
    'borough': [row['BOROUGH'] for row in contr_fact_rows],
    'factor': [row['CONTRIBUTING FACTOR VEHICLE 1'] for row in contr_fact_rows],
    'count': [row['count'] for row in contr_fact_rows],
}).sort_values('count', ascending=False)

# Choose the factors with the most crashes overall and keep *all* of their borough
# counts. Keeping only the top (borough, factor) pairs instead would leave holes in
# the pivot, which fillna(0) would then show as false zeros.
top_factors = (
    contr_fact_bor_df.groupby('factor')['count'].sum()
    .nlargest(TOP_N_FACTORS)
    .index
)

# factor x borough grid, rows ordered by total crash count. Any remaining zeros are
# real, and values are cast to int for clean annotations.
heatmap_data_pivot = (
    contr_fact_bor_df[contr_fact_bor_df['factor'].isin(top_factors)]
    .pivot(index='factor', columns='borough', values='count')
    .reindex(top_factors)
    .fillna(0)
    .astype(int)
)

fig14 = go.Figure(data=go.Heatmap(
    z=heatmap_data_pivot.values,
    x=heatmap_data_pivot.columns,
    y=heatmap_data_pivot.index,
    colorscale='matter'
))

# Text annotation with the count in every cell
for y in range(heatmap_data_pivot.shape[0]):
    for x in range(heatmap_data_pivot.shape[1]):
        fig14.add_annotation(
            x=heatmap_data_pivot.columns[x],
            y=heatmap_data_pivot.index[y],
            text=str(heatmap_data_pivot.iloc[y, x]),
            showarrow=False,
            font=dict(color='black')
        )

fig14.update_layout(
    xaxis_title='Boroughs',
    yaxis_title='Contributing Factors',
    yaxis=dict(tickmode='array', tickvals=list(heatmap_data_pivot.index)),
    xaxis=dict(side='bottom'),
    plot_bgcolor="#31302F",
    paper_bgcolor="#31302F",
    font=dict(family="Open Sans", color='#9467bd')
)

fig14.show()

# Geospatial Analysis

## Heatmap of Crash Locations over time

In [None]:
# Extract year from Date
df = df.withColumn('year', F.year('CRASH DATE'))

# Extract month from 'CRASH_DATE'
df = df.withColumn('month_year', F.concat_ws('/', df['Month'], df['year']))

# Group by month and collect latitudes and longitudes
df_grouped = df.groupBy('month_year').agg(
    F.collect_list('LATITUDE').alias('latitudes'),
    F.collect_list('LONGITUDE').alias('longitudes'),
    F.collect_list('CONTRIBUTING FACTOR VEHICLE 1').alias('CONTRIBUTING FACTOR VEHICLE 1'),
    F.collect_list('VEHICLE TYPE CODE 1').alias('VEHICLE TYPE CODE 1')
)

# Sort key that orders "M/YYYY" labels chronologically
def sortDateSeries(datesSeries):
    """Map ``"<month>/<year>"`` labels to sortable ``(year, month)`` integer tuples.

    Meant to be passed as the ``key=`` argument of ``sort_values`` so that month/year
    labels sort chronologically instead of lexicographically (where "10/2021" would
    come before "2/2021").

    Parameters
    ----------
    datesSeries : pandas.Series of str
        Labels shaped like ``"12/2021"``.

    Returns
    -------
    pandas.Series
        Same index as the input, each element a ``(year, month)`` tuple of ints.
    """
    def _to_year_month(label):
        parts = label.split('/')
        year, month = int(parts[1]), int(parts[0])
        return year, month

    return datesSeries.apply(_to_year_month)

# Bring the per-month aggregates to the driver, ordered chronologically (year, then month)
df_pandas = df_grouped.toPandas().sort_values('month_year', key=sortDateSeries)

# One heatmap frame per month: a list of [lat, lon] points pairing the collected coordinates
hour_list = [
    [[lat, lon] for lat, lon in zip(lats, lons)]
    for lats, lons in zip(df_pandas['latitudes'], df_pandas['longitudes'])
]

# Frame labels, in the same (chronological) order as hour_list
index = df_pandas['month_year'].unique().tolist()

df_map = folium.Map(location=[40.7128, -74.0060], zoom_start=12)
HeatMapWithTime(hour_list, index=index, auto_play=True, radius=8).add_to(df_map)

# Persist the map: the dashboard cell embeds "Month_year_visualization.html" in an iframe,
# so it must exist after a fresh Restart & Run All
df_map.save("Month_year_visualization.html")
df_map

# Modeling

## Building an imputation model to predict Borough

The idea is to fill in missing Borough names with a predictive model that achieves a measurable level of accuracy, rather than relying on purely descriptive statistics.

In [None]:
# Start the modelling branch from the original (pre-cleaning) DataFrame under its own alias
model_df = df_orig.alias(alias='model_df')

In [None]:
# Split CRASH DATE (MM/dd/yyyy text) into day-of-month, month and year features.
# The date is parsed once into a temporary column instead of three times.
# F.dayofmonth is used rather than F.day, which only exists in PySpark >= 3.5.
model_df = model_df.withColumn(
    '_crash_date_ts',
    F.to_timestamp(F.regexp_replace('CRASH DATE', '/', '-'), "MM-dd-yyyy"),
)

model_df = (
    model_df
    .withColumn('day', F.dayofmonth('_crash_date_ts'))   # Day of month
    .withColumn('month', F.month('_crash_date_ts'))      # Month
    .withColumn('year', F.year('_crash_date_ts'))        # Year
    .drop('_crash_date_ts')
)

In [None]:
# Bucket CRASH TIME into four parts of the day:
#   Morning   06:00-11:59
#   Afternoon 12:00-16:59
#   Evening   17:00-21:59
#   Night     22:00-05:59 (the fallback; rows whose hour is null also land here)

# Cast CRASH TIME to a timestamp so the hour can be extracted
model_df = model_df.withColumn('CRASH TIME', F.col('CRASH TIME').cast('timestamp'))

crash_hour = F.hour('CRASH TIME')

# One hour-range test per named bucket; Night is handled by `otherwise`
conditions = [
    (crash_hour >= 6) & (crash_hour < 12),   # Morning
    (crash_hour >= 12) & (crash_hour < 17),  # Afternoon
    (crash_hour >= 17) & (crash_hour < 22),  # Evening
]
categories = ['Morning', 'Afternoon', 'Evening']

# First matching bucket wins, exactly like a SQL CASE WHEN ... ELSE 'Night'
model_df = model_df.withColumn(
    'time_category',
    F.when(conditions[0], categories[0])
     .when(conditions[1], categories[1])
     .when(conditions[2], categories[2])
     .otherwise('Night'),
)

In [None]:
# Vehicle Type 1: collapse free-text vehicle codes into a small set of categories

# Substrings (matched case-insensitively) that identify a vehicle family ...
common_vehicle_list_pattern = ['Station Wagon', 'Sedan', 'Taxi', 'Bus', 'Van',
                       'Ambu', 'Motor', 'Bike', 'scoot', 'Fire']

# ... and the canonical name each pattern is replaced with (same position)
common_vehicle_list_category = ['Station Wagon', 'Sedan', 'Taxi', 'Bus', 'Van',
                       'Ambulance', 'Motorcycle', 'Bike', 'Scooter', 'Fire Truck']

# Exact codes that are really trucks
truck_wrong = ['Dump', 'Garbage or Refuse', 'Concrete Mixer', 'LARGE COM VEH(6 OR MORE TIRES)', 'Flat Bed',
               'Tanker', 'TOW T', 'BOX T', 'TRACT', 'DUMP', 'FDNY', 'Flat Rack']

# Exact codes that are really delivery vehicles
delivery_wrong = ['LIVERY VEHICLE', 'Carry All', 'USPS', 'DELIV', 'COM', 'DELV']

vehicle_col = F.col('VEHICLE TYPE CODE 1')

# Apply the substring patterns one at a time, in list order
for pattern, category in zip(common_vehicle_list_pattern, common_vehicle_list_category):
    model_df = model_df.withColumn(
        'VEHICLE TYPE CODE 1',
        F.when(vehicle_col.ilike(f"%{pattern}%"), category).otherwise(vehicle_col),
    )

# Map the exact truck / delivery variants onto their family name
model_df = model_df.withColumn(
    'VEHICLE TYPE CODE 1',
    F.when(vehicle_col.isin(truck_wrong), "Truck")
     .when(vehicle_col.isin(delivery_wrong), "Delivery")
     .otherwise(vehicle_col),
)

# Any code seen fewer than 50 times is folded into "Other"
rare_code_rows = (
    model_df.groupby('VEHICLE TYPE CODE 1')
            .count()
            .filter(F.col('count') < 50)
            .collect()
)
other_list = [row['VEHICLE TYPE CODE 1'] for row in rare_code_rows]

model_df = model_df.withColumn(
    'VEHICLE TYPE CODE 1',
    F.when(vehicle_col.isin(other_list), "Other").otherwise(vehicle_col),
)

In [None]:
# Columns the model needs: candidate features plus the BOROUGH target
features_to_keep = [
    'day', 'month', 'year', 'time_category',
    'VEHICLE TYPE CODE 1', 'CONTRIBUTING FACTOR VEHICLE 1', 'NUMBER OF PERSONS KILLED',
    'NUMBER OF PERSONS INJURED', 'LONGITUDE', 'LATITUDE', 'BOROUGH',
]

# Keep only those columns and discard any row with a null in one of them
train_data = model_df.select(*features_to_keep).dropna(how="any")

In [None]:
# Spark ML classifiers need a numeric target, so encode each borough as an integer code 1-5
borough_mapping = {'QUEENS':1, 'BROOKLYN':2, 'BRONX':3, 'MANHATTAN':4, 'STATEN ISLAND':5}

# Creating a map expression (literal key/value pairs) for the borough to transform values
mapping_expr = F.create_map([F.lit(x) for x in chain(*borough_mapping.items())])

# Implementing the mapping on the dataset; boroughs outside the mapping get a null target
train_data = train_data.withColumn('target', mapping_expr[train_data['BOROUGH']])

# Rows per class, computed in a single pass. A null target is excluded: PySpark's sampleBy
# rejects None keys, and strata missing from `fractions` are sampled at 0 (i.e. dropped).
class_counts = {
    row['target']: row['count']
    for row in train_data.groupBy('target').count().collect()
    if row['target'] is not None
}
if not class_counts:
    raise ValueError("No rows with a known borough are left to train on")

# Downsample every class to the size of the smallest one for a balanced dataset. Derived
# from the data rather than hard-coded: sampleBy requires every fraction to be in [0, 1],
# so a fixed size larger than any class would fail.
lowest_class_count = min(class_counts.values())

# Identifying the fraction of data required from each class
fractions = {target: lowest_class_count / count for target, count in class_counts.items()}

# Extracting the fraction of data from each class (seed 42 for reproducibility)
train_data = train_data.stat.sampleBy('target', fractions, 42)

In [None]:
# Injured-persons count as an integer column
train_data = train_data.withColumn(
    'NUMBER OF PERSONS INJURED',
    F.col('NUMBER OF PERSONS INJURED').cast(IntegerType()),
)

# Label-encode the categorical inputs; rows with categories unseen at fit time are skipped
train_string_features = ['time_category', 'VEHICLE TYPE CODE 1', 'CONTRIBUTING FACTOR VEHICLE 1']
indexers = [
    StringIndexer(inputCol=feature, outputCol=f"{feature}_index", handleInvalid="skip")
    for feature in train_string_features
]

# Encoded categoricals followed by the numeric features, packed into one vector column
train_numeric_features = ['day', 'month', 'year', 'NUMBER OF PERSONS INJURED', 'LONGITUDE', 'LATITUDE']
assembler_inputs = [f"{feature}_index" for feature in train_string_features]
assembler_inputs.extend(train_numeric_features)
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

# The classifier's label is the StringIndexer encoding of `target`
label_indexer = StringIndexer(inputCol='target', outputCol="label", handleInvalid="skip")

# NOTE(review): maxBins=62 presumably has to cover the category count of the largest
# categorical feature (CONTRIBUTING FACTOR VEHICLE 1) — confirm if the categories change.
rf_classifier = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=10,
    maxDepth=3,
    maxBins=62,
)

# Stage order: feature indexers, assembler, label indexer, classifier
pipeline = Pipeline(stages=indexers + [assembler, label_indexer, rf_classifier])

# 80/20 train/test split with a fixed seed
training_data, testing_data = train_data.randomSplit([0.8, 0.2], seed=42)

model = pipeline.fit(training_data)
predictions = model.transform(testing_data)

# Columns kept on the driver for later metric calculations
preds_df = predictions.select('target', 'prediction', 'probability').collect()

In [None]:
# The random forest is trained on `label`, the StringIndexer encoding of `target`, so its
# `prediction` column holds label indices (0, 1, ...), not the 1-5 borough codes in `target`.
# Scoring `target` against `prediction` compares two different encodings, so score
# `label` against `prediction` instead.
scored_rows = predictions.select('label', 'prediction').collect()

# Separating true labels and predictions for performance metric calculations
true_labels = [row['label'] for row in scored_rows]
preds = [row['prediction'] for row in scored_rows]

print("Overall Accuracy:", accuracy_score(true_labels, preds))
# Per-class scores are listed in sorted label-index order (0.0, 1.0, ...), i.e. the
# StringIndexer ordering of `target`, not the borough_mapping order
print("Individual Recall scores:", recall_score(true_labels, preds, average=None))
print("Individual Precision scores:", precision_score(true_labels, preds, average=None))

# Dashboard

In [None]:
# Dashboard data: start again from the original DataFrame and derive the time fields
# the charts and the interactive map need.
# CRASH DATE is MM/dd/yyyy text; swap '/' for '-' so it parses with "MM-dd-yyyy"
crash_date_ts = F.to_timestamp(F.regexp_replace('CRASH DATE', '/', '-'), "MM-dd-yyyy")

dashboard_df = (
    df_orig.alias('dashboard_df')
    .withColumn('CRASH DATE', crash_date_ts)
    .withColumn('Month', F.month('CRASH DATE'))
    # day of week: 1=Sunday ... 7=Saturday
    .withColumn('day_of_week', F.dayofweek(F.col('CRASH DATE')))
    .withColumn('hour_of_day', F.hour(F.col('CRASH TIME')))
    .withColumn('year', F.year('CRASH DATE'))
    # "<Month>/<year>" label used by the month-year dropdown
    .withColumn('month_year', F.concat_ws('/', F.col('Month'), F.col('year')))
)

# Pull into pandas and drop rows missing any field the maps filter or plot on
dashboard_df_pd = dashboard_df.toPandas().dropna(
    subset=['LATITUDE', 'LONGITUDE', "ZIP CODE",
            "CONTRIBUTING FACTOR VEHICLE 1", "VEHICLE TYPE CODE 1"]
)

In [None]:
# Order rows chronologically by their "<Month>/<year>" label
dashboard_df_pd = dashboard_df_pd.sort_values(by='month_year', key=sortDateSeries)

# Month-year dropdown options, in chronological order
month_year_list = dashboard_df_pd['month_year'].unique().tolist()

# Only the 25 most frequent contributing factors / vehicle codes; the tail is sparse
contributing_factors_list = (
    dashboard_df_pd['CONTRIBUTING FACTOR VEHICLE 1'].value_counts().head(25).index.tolist()
)
vehicle_code_list = (
    dashboard_df_pd['VEHICLE TYPE CODE 1'].value_counts().head(25).index.tolist()
)

In [None]:
# Create a Dash App
app = Dash(__name__)

app.css.config.serve_locally = True
# Stylesheets in the local `assets/` folder (e.g. assets/main.css) are served by Dash
# automatically, so no machine-specific absolute path is registered as an "external" URL.
app.server.static_folder = "assets"


def _chart_section(title, graph_id, figure):
    """Return a titled dashboard section rendering `figure` at a fixed 600px height.

    Note: `figure.update_layout` mutates the figure in place and returns it.
    """
    return html.Div([
        html.H2(title),
        dcc.Graph(id=graph_id, figure=figure.update_layout(height=600))], className='div-for-charts')


# (section title, figure) pairs; graph ids are graph1..graph14 in this order
chart_specs = [
    ('Month wise plot of number of accidents', fig1),
    ('How frequent the accident happened by day of week and hour of day?', fig2),
    ('Cumulative Daily Total of Traffic Fatalities', fig3),
    ('Number of total people killed in crashes', fig4),
    ('Cumulative Daily Total of Traffic Injuries', fig5),
    ('Number of total people injured in crashes', fig6),
    ('Top 10 Streets with the Most Reported Accidents', fig7),
    ('Top 10 contribution factors for accidents', fig8),
    ('Which vehicle type was involved in most crashes', fig9),
    ('Number of collisions for boroughs within each days of week', fig10),
    ('Number of collisions for boroughs within each hour of the day', fig11),
    ('Deaths occurred within each borough', fig12),
    ('Which vehicle type killed the most', fig13),
    ('Top contribution factors for accidents borough wise', fig14),
]
chart_sections = [
    _chart_section(title, f"graph{position}", figure)
    for position, (title, figure) in enumerate(chart_specs, start=1)
]

# Pre-rendered folium time-lapse map; `with` makes sure the file handle is closed
with open("Month_year_visualization.html", 'r') as map_html_file:
    month_year_map_html = map_html_file.read()

app.layout = html.Div(children=[
    
    html.H1("MOTOR CRASH ANALYSIS - A DEEP DIVE INTO ITS FACTORS AND CAUSES", 
            style={'font-family':'Open Sans Semi Bold', 'font-weight':'bold', 'letter-spacing': '4px',
                  'font-size':'30px'}),

    *chart_sections,

    html.Div([
        html.H2('Map view of Motor Crashes over time'),
        html.Iframe(id='map1', srcDoc = month_year_map_html, 
                    width='1024', height='768', className='div-center')],
    ),

    html.Div(children=[
        html.H2(children='Interactive Selection Map'),

        # Dropdown for selecting Contributing Factor
        dcc.Dropdown(
            id='contributing-factor-dropdown',
            options=[
                {'label': factor, 'value': factor} for factor in contributing_factors_list
            ],
            value=contributing_factors_list[0],
            style={'width': '100%'}
        ),

        # Dropdown for selecting Vehicle Type Code
        dcc.Dropdown(
            id='vehicle-code-dropdown',
            options=[
                {'label': factor, 'value': factor} for factor in vehicle_code_list
            ],
            value=vehicle_code_list[0],
            style={'width': '100%'}
        ),

        # Dropdown for selecting Month/Year
        dcc.Dropdown(
            id='month-year-dropdown',
            options=[
                {'label': factor, 'value': factor} for factor in month_year_list
            ],
            value=month_year_list[0],
            style={'width': '100%'}
        ),

        # Map to display the points
        dcc.Graph(
            id='contributing-factor-map'
        )
        
    ], className='div-for-charts')

])

# Callback: redraw the point map whenever the factor, vehicle or month-year dropdown changes
@app.callback(
    Output('contributing-factor-map', 'figure'),
    [Input('contributing-factor-dropdown', 'value'), Input('vehicle-code-dropdown', 'value'),
    Input('month-year-dropdown', 'value')]
)
def update_map(selected_factor, selected_vehicle, selected_date):
    """Build the crash-location map for one factor / vehicle / month-year selection.

    Parameters
    ----------
    selected_factor : str
        Value of the contributing-factor dropdown.
    selected_vehicle : str
        Value of the vehicle-code dropdown.
    selected_date : str
        Value of the month-year dropdown ("<Month>/<year>").

    Returns
    -------
    plotly.graph_objs.Figure
        Dark-styled scatter map of the matching crashes.
    """
    matches = (
        (dashboard_df_pd['month_year'] == selected_date)
        & (dashboard_df_pd['CONTRIBUTING FACTOR VEHICLE 1'] == selected_factor)
        & (dashboard_df_pd['VEHICLE TYPE CODE 1'] == selected_vehicle)
    )
    filtered_data = dashboard_df_pd[matches]

    if filtered_data.shape[0] == 0:
        # No matching crashes: plot two arbitrary rows fully transparent, presumably so
        # the map still renders with its usual styling instead of an empty figure
        fig = px.scatter_mapbox(
            dashboard_df_pd.iloc[:2],
            lat='LATITUDE',
            lon='LONGITUDE',
            size_max=15,
            zoom=10,
            opacity=0,
        )
    else:
        fig = px.scatter_mapbox(
            filtered_data,
            lat='LATITUDE',
            lon='LONGITUDE',
            size_max=15,
            zoom=10,
        )

    # Dark basemap centred on the mean location of all dashboard crashes
    fig.update_layout(
        mapbox_style='carto-darkmatter',
        mapbox_zoom=10,
        mapbox_center={'lat': dashboard_df_pd['LATITUDE'].mean(), 'lon': dashboard_df_pd['LONGITUDE'].mean()},
        plot_bgcolor = "#31302F", 
        paper_bgcolor = "#31302F",
        font=dict(family="Open Sans", color='#9467bd')
    )

    return fig

# Launch the dashboard in a new browser tab. `run_server` is deprecated (removed in Dash 3),
# so use `app.run`, which accepts the same arguments.
app.run(debug=True, jupyter_mode='tab')