# Predictive Analysis on Motor vehicle Collisions in New York City

Importing all the necessary libraries

In [1]:

import os
import ssl

import findspark
findspark.init()
from pyspark.sql import SparkSession
from google.cloud import storage
from pyspark.sql.functions import col, mean, avg, coalesce, when, to_date, to_timestamp, udf, lit, percentile_approx, sum, hour, year, dayofweek
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, FloatType, DateType, DoubleType
from geopy.geocoders import Nominatim
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import seaborn as sns
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

Creating spark session

In [2]:
# Creating a spark session
# The connector jar and the service-account key file live at machine-specific
# locations. Both can be overridden through environment variables; when the
# variables are unset, the paths this notebook was originally run with are used.
GCS_CONNECTOR_JAR = os.environ.get(
    "GCS_CONNECTOR_JAR",
    "/usr/local/cellar/apache-spark/3.5.0/libexec/jars/gcs-connector-hadoop3-latest.jar",
)
GCS_SERVICE_ACCOUNT_KEYFILE = os.environ.get(
    "GCS_SERVICE_ACCOUNT_KEYFILE",
    "/Users/tejaswinipenneni/Downloads/bigdata-project-603-cbaafc8fbd38.json",
)

spark = (
    SparkSession.builder
    .appName('penneni_603_project_sample')
    # Jar that lets Spark read gs:// paths
    .config("spark.jars", GCS_CONNECTOR_JAR)
    # Service-account key used by the connector to authenticate
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", GCS_SERVICE_ACCOUNT_KEYFILE)
    # Legacy time parser policy, as configured in the original session
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

24/04/10 16:50:39 WARN Utils: Your hostname, Tejaswinis-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.241 instead (on interface en0)
24/04/10 16:50:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/04/10 16:50:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# Reading data from Google Cloud Storage

Accessing the data from Google Cloud Storage

In [3]:
# Creating a client to interact with Google Cloud Storage.
# No credentials are passed explicitly here.
gcs_client = storage.Client()

# Getting the bucket object (the same bucket the CSV is read from via gs:// below).
# NOTE(review): `bucket` is not used in the visible part of the notebook — confirm it is needed.
bucket = gcs_client.bucket('603-bigdata-project-group')


Read the data from Google Cloud Storage using Spark, letting Spark infer the schema from the file

In [4]:
# Load the collisions CSV from the GCS bucket through Spark.
# The first line is treated as the header and column types are inferred from the data.
data = (
    spark.read
    .options(inferSchema="true", header="true")
    .csv("gs://603-bigdata-project-group/Motor_Vehicle_Collisions_-_Crashes.csv")
)

24/04/10 16:50:52 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

Printing the schema of the dataset

In [5]:
# Printing the schema inferred by the CSV reader, to see which columns
# came back as strings and which as numeric types
data.printSchema()

root
 |-- CRASH DATE: string (nullable = true)
 |-- CRASH TIME: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- ZIP CODE: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- ON STREET NAME: string (nullable = true)
 |-- CROSS STREET NAME: string (nullable = true)
 |-- OFF STREET NAME: string (nullable = true)
 |-- NUMBER OF PERSONS INJURED: string (nullable = true)
 |-- NUMBER OF PERSONS KILLED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS INJURED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS KILLED: integer (nullable = true)
 |-- NUMBER OF CYCLIST INJURED: integer (nullable = true)
 |-- NUMBER OF CYCLIST KILLED: string (nullable = true)
 |-- NUMBER OF MOTORIST INJURED: string (nullable = true)
 |-- NUMBER OF MOTORIST KILLED: integer (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 1: string (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 2: strin

In [6]:
# Displaying the dataframe (show() with no arguments prints the first 20 rows)
data.show()

24/04/10 16:50:56 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------+----------+---------+--------+---------+----------+--------------------+--------------------+--------------------+--------------------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+------------+--------------------+--------------------+-------------------+-------------------+-------------------+
|CRASH DATE|CRASH TIME|  BOROUGH|ZIP CODE| LATITUDE| LONGITUDE|            LOCATION|      ON STREET NAME|   CROSS STREET NAME|     OFF STREET NAME|NUMBER OF PERSONS INJURED|NUMBER OF PERSONS KILLED|NUMBER OF PEDESTRIANS INJURED|NUMBER OF PEDESTRIANS KILLED|NUMBER OF CYCLIST INJURED|NUMBER OF CYCLIST KILLED|NUMBER OF MOTORIST INJURED|NUMBER OF MOTORIST KILLED|CONTRIBUTING

In [7]:
# Describe the DataFrame: per-column summary statistics (the `summary` column in the output)
data.describe().show()



+-------+---------------+----------+-------------+------------------+------------------+------------------+----------+--------------------+--------------------+--------------------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|summary|     CRASH DATE|CRASH TIME|      BOROUGH|          ZIP CODE|          LATITUDE|         LONGITUDE|  LOCATION|      ON STREET NAME|   CROSS STREET NAME|     OFF STREET NAME|NUMBER OF PERSONS INJURED|NUMBER OF PERSONS KILLED|NUMBER OF PEDESTRIANS INJURED|NUMBER OF PEDESTRIANS KILLED|NUMBER OF CYCLIST INJURED|NUMBER OF CYCLIST 

                                                                                

# DATA CLEANING AND PREPROCESSING

In [8]:
# Columns that are dropped before the analysis: the secondary street names and
# the contributing-factor / vehicle-type entries for vehicles 2 through 5
columns_to_remove = [
    'CROSS STREET NAME',
    'OFF STREET NAME',
    'CONTRIBUTING FACTOR VEHICLE 2',
    'CONTRIBUTING FACTOR VEHICLE 3',
    'CONTRIBUTING FACTOR VEHICLE 4',
    'CONTRIBUTING FACTOR VEHICLE 5',
    'VEHICLE TYPE CODE 2',
    'VEHICLE TYPE CODE 3',
    'VEHICLE TYPE CODE 4',
    'VEHICLE TYPE CODE 5',
]

# Remove columns from the DataFrame; the list is unpacked into one drop() call.
# `data` is rebound, so from here on it refers to the reduced frame.
data = data.drop(*columns_to_remove)

Dropped unnecessary columns from dataframe for analysis

In [9]:
# Displaying the updated dataframe to confirm the dropped columns are gone
data.show()

+----------+----------+---------+--------+---------+----------+--------------------+--------------------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+------------+--------------------+
|CRASH DATE|CRASH TIME|  BOROUGH|ZIP CODE| LATITUDE| LONGITUDE|            LOCATION|      ON STREET NAME|NUMBER OF PERSONS INJURED|NUMBER OF PERSONS KILLED|NUMBER OF PEDESTRIANS INJURED|NUMBER OF PEDESTRIANS KILLED|NUMBER OF CYCLIST INJURED|NUMBER OF CYCLIST KILLED|NUMBER OF MOTORIST INJURED|NUMBER OF MOTORIST KILLED|CONTRIBUTING FACTOR VEHICLE 1|COLLISION_ID| VEHICLE TYPE CODE 1|
+----------+----------+---------+--------+---------+----------+--------------------+--------------------+-------------------------+------------------------+-----------------------------+----------------------------+-----------------

In [10]:
# Print the DataFrame schema after the drop; some of the NUMBER OF ... count
# columns are still typed as string at this point
data.printSchema()

root
 |-- CRASH DATE: string (nullable = true)
 |-- CRASH TIME: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- ZIP CODE: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- ON STREET NAME: string (nullable = true)
 |-- NUMBER OF PERSONS INJURED: string (nullable = true)
 |-- NUMBER OF PERSONS KILLED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS INJURED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS KILLED: integer (nullable = true)
 |-- NUMBER OF CYCLIST INJURED: integer (nullable = true)
 |-- NUMBER OF CYCLIST KILLED: string (nullable = true)
 |-- NUMBER OF MOTORIST INJURED: string (nullable = true)
 |-- NUMBER OF MOTORIST KILLED: integer (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 1: string (nullable = true)
 |-- COLLISION_ID: integer (nullable = true)
 |-- VEHICLE TYPE CODE 1: string (nullable = true)



Convert datatypes of dataframe

In [11]:
# Converting datatypes of the columns

# CRASH DATE was read as a string: keep only rows that have a date and parse it
# with the MM/dd/yyyy pattern into a proper date column.
data = (
    data
    .filter(col("CRASH DATE").isNotNull())
    .withColumn("CRASH DATE", to_date(col("CRASH DATE"), "MM/dd/yyyy"))
)

# These count columns were inferred as strings by the reader; cast them to integers.
for count_column in (
    "NUMBER OF MOTORIST INJURED",
    "NUMBER OF CYCLIST KILLED",
    "NUMBER OF PERSONS INJURED",
):
    data = data.withColumn(count_column, col(count_column).cast("integer"))

data.printSchema()


root
 |-- CRASH DATE: date (nullable = true)
 |-- CRASH TIME: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- ZIP CODE: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- ON STREET NAME: string (nullable = true)
 |-- NUMBER OF PERSONS INJURED: integer (nullable = true)
 |-- NUMBER OF PERSONS KILLED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS INJURED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS KILLED: integer (nullable = true)
 |-- NUMBER OF CYCLIST INJURED: integer (nullable = true)
 |-- NUMBER OF CYCLIST KILLED: integer (nullable = true)
 |-- NUMBER OF MOTORIST INJURED: integer (nullable = true)
 |-- NUMBER OF MOTORIST KILLED: integer (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 1: string (nullable = true)
 |-- COLLISION_ID: integer (nullable = true)
 |-- VEHICLE TYPE CODE 1: string (nullable = true)



In [12]:
# Count the nulls in every column in a single pass: each column becomes a 0/1
# "is null" flag that is summed up under the column's own name.
# `sum` here is pyspark.sql.functions.sum (imported at the top), not the Python builtin.
null_counts = data.select(
    *[
        sum(col(name).isNull().cast("int")).alias(name)
        for name in data.columns
    ]
)

# Show null counts
null_counts.show()



+----------+----------+-------+--------+--------+---------+--------+--------------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+------------+-------------------+
|CRASH DATE|CRASH TIME|BOROUGH|ZIP CODE|LATITUDE|LONGITUDE|LOCATION|ON STREET NAME|NUMBER OF PERSONS INJURED|NUMBER OF PERSONS KILLED|NUMBER OF PEDESTRIANS INJURED|NUMBER OF PEDESTRIANS KILLED|NUMBER OF CYCLIST INJURED|NUMBER OF CYCLIST KILLED|NUMBER OF MOTORIST INJURED|NUMBER OF MOTORIST KILLED|CONTRIBUTING FACTOR VEHICLE 1|COLLISION_ID|VEHICLE TYPE CODE 1|
+----------+----------+-------+--------+--------+---------+--------+--------------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------

                                                                                

There are notable null values in the columns other than crash date and crash time. We handle these null values in the next steps.

To handle null values in the numerical columns, we replace them with the column's mean value

In [13]:
# Injury / fatality count columns whose nulls are imputed with the column mean
numerical_columns = [
    "NUMBER OF PERSONS INJURED",
    "NUMBER OF PERSONS KILLED",
    "NUMBER OF PEDESTRIANS INJURED",
    "NUMBER OF PEDESTRIANS KILLED",
    "NUMBER OF CYCLIST INJURED",
    "NUMBER OF CYCLIST KILLED",
    "NUMBER OF MOTORIST INJURED",
    "NUMBER OF MOTORIST KILLED",
]

# Function to replace null values with mean for each column
def replace_null_with_mean(data, column_name):
    """Return ``data`` with the nulls in ``column_name`` replaced by the column mean.

    Args:
        data: DataFrame to impute. It is not modified; a new frame is returned.
        column_name: Name of the column to impute.

    Returns:
        The DataFrame with nulls in ``column_name`` filled, or ``data`` itself
        when the column has no mean (no rows, or every value is null).
    """
    # Aggregate straight on the frame; selecting the column first is not needed
    # because the aggregation already names the only column it reads.
    mean_value = data.agg({column_name: "mean"}).collect()[0][0]

    # With no non-null values the aggregate comes back as None, which is not a
    # valid replacement value for fillna. There is nothing to impute with, so
    # the frame is returned as is.
    if mean_value is None:
        return data

    # NOTE(review): if the column is integer-typed, Spark presumably casts the
    # replacement to the column's type, so a fractional mean would be truncated
    # (e.g. 0.3 -> 0). Confirm this is the intended imputation.
    return data.fillna({column_name: mean_value})

# Replace null values with mean for each numerical column.
# Each call collects that column's mean, so this loop runs one aggregation per column.
for column in numerical_columns:
    data = replace_null_with_mean(data, column)

# Show DataFrame with null values replaced by mean
data.show()



+----------+----------+---------+--------+---------+----------+--------------------+--------------------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+------------+--------------------+
|CRASH DATE|CRASH TIME|  BOROUGH|ZIP CODE| LATITUDE| LONGITUDE|            LOCATION|      ON STREET NAME|NUMBER OF PERSONS INJURED|NUMBER OF PERSONS KILLED|NUMBER OF PEDESTRIANS INJURED|NUMBER OF PEDESTRIANS KILLED|NUMBER OF CYCLIST INJURED|NUMBER OF CYCLIST KILLED|NUMBER OF MOTORIST INJURED|NUMBER OF MOTORIST KILLED|CONTRIBUTING FACTOR VEHICLE 1|COLLISION_ID| VEHICLE TYPE CODE 1|
+----------+----------+---------+--------+---------+----------+--------------------+--------------------+-------------------------+------------------------+-----------------------------+----------------------------+-----------------

                                                                                

Replace null values in the string columns with 'unknown'

In [15]:
# Columns whose null values are replaced with a placeholder string.
# LATITUDE and LONGITUDE are doubles in the schema; they are listed here because
# the loop that follows casts every listed column to string.
string_columns = [
    "BOROUGH",
    "ZIP CODE",
    "LATITUDE",
    "LONGITUDE",
    "LOCATION",
    "ON STREET NAME",
    "CONTRIBUTING FACTOR VEHICLE 1",
    "VEHICLE TYPE CODE 1",
]

# Cast every listed column to string first ...
for column in string_columns:
    data = data.withColumn(column, col(column).cast("string"))

# ... then replace the nulls in all of them with 'unknown' in one fillna call
data = data.fillna({name: 'unknown' for name in string_columns})

# Show DataFrame with null values replaced by 'unknown'
data.show()

+----------+----------+---------+--------+---------+----------+--------------------+--------------------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+------------+--------------------+
|CRASH DATE|CRASH TIME|  BOROUGH|ZIP CODE| LATITUDE| LONGITUDE|            LOCATION|      ON STREET NAME|NUMBER OF PERSONS INJURED|NUMBER OF PERSONS KILLED|NUMBER OF PEDESTRIANS INJURED|NUMBER OF PEDESTRIANS KILLED|NUMBER OF CYCLIST INJURED|NUMBER OF CYCLIST KILLED|NUMBER OF MOTORIST INJURED|NUMBER OF MOTORIST KILLED|CONTRIBUTING FACTOR VEHICLE 1|COLLISION_ID| VEHICLE TYPE CODE 1|
+----------+----------+---------+--------+---------+----------+--------------------+--------------------+-------------------------+------------------------+-----------------------------+----------------------------+-----------------

Checking for duplicate rows

In [16]:
# Duplicate rows = total row count minus the row count after de-duplication
total_rows = data.count()
distinct_rows = data.dropDuplicates().count()
duplicate_rows = total_rows - distinct_rows

if duplicate_rows <= 0:
    print("There are no duplicate rows in the DataFrame.")
else:
    print(f"There are {duplicate_rows} duplicate rows in the DataFrame.")

24/04/10 17:00:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/04/10 17:00:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/04/10 17:00:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/04/10 17:00:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/04/10 17:00:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/04/10 17:00:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/04/10 17:00:14 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/04/10 17:00:14 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/04/10 17:00:14 WARN RowBasedKeyValueBatch: Calling spill() on

There are no duplicate rows in the DataFrame.




As observed above, there are no duplicate rows in the DataFrame

In [17]:
# Display the first rows of the DataFrame after the cleaning steps above
data.show()

+----------+----------+---------+--------+---------+----------+--------------------+--------------------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+------------+--------------------+
|CRASH DATE|CRASH TIME|  BOROUGH|ZIP CODE| LATITUDE| LONGITUDE|            LOCATION|      ON STREET NAME|NUMBER OF PERSONS INJURED|NUMBER OF PERSONS KILLED|NUMBER OF PEDESTRIANS INJURED|NUMBER OF PEDESTRIANS KILLED|NUMBER OF CYCLIST INJURED|NUMBER OF CYCLIST KILLED|NUMBER OF MOTORIST INJURED|NUMBER OF MOTORIST KILLED|CONTRIBUTING FACTOR VEHICLE 1|COLLISION_ID| VEHICLE TYPE CODE 1|
+----------+----------+---------+--------+---------+----------+--------------------+--------------------+-------------------------+------------------------+-----------------------------+----------------------------+-----------------