# Machine learning model building section: LogisticsRegression Model to predict delays due to an accident

In [1]:
# For pyspark session creation and using pyspark and SQL functionality for data analysis
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# For performing pandas dataframe calculations
import pandas as pd
import numpy as np

# machine learing MLIB libraries
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [2]:
# start the spark session
# use local[*] to use all the available core of the computer for processing
sparkSession = SparkSession.builder.master("local[*]").config("spark.driver.memory","4g")\
                        .appName("7082CEM_CourseWork_Sushil_ML_Section")\
                        .getOrCreate()

In [43]:
# load the input data file( csv format, stored on local storage)
accidents_df=sparkSession.read.csv("US_Accidents_June20.csv",inferSchema=True,header=True)

type(accidents_df)

pyspark.sql.dataframe.DataFrame

In [44]:
# copy the original dataframe just in case required 
orginal_df = accidents_df

In [45]:
# Drop the fields based on the initial assessment
accidents_df=accidents_df.drop('ID','End_Lat','Start_Lat','Start_Lng','End_Lng','TMC','Number','Civil_Twilight',\
                               'Nautical_Twilight','Astronomical_Twilight',\
                               'Airport_Code','Country','Street','Zipcode','Weather_TimeStamp',\
                               'Side','Pressure(in)','Wind_Chill(F)','Precipitation(in)')

In [46]:
accidents_df=accidents_df.drop('Turning_Loop','Stop','Station','Roundabout','Railway','No_Exit','Give_Way',\
                              'Bump','Amenity')

In [47]:
accidents_df=accidents_df.drop('Description','Wind_Direction','Wind_Speed(mph)','Humidity(%)',\
                               'Temperature(F)','Traffic_Calming','Source','County')

In [48]:
# calculate delays due an accident : based on accident start time and end time. Convert into minutes
accidents_df= accidents_df.withColumn('durationAcc', (accidents_df['End_Time'].\
                                                  cast('long') - accidents_df['Start_Time'].cast('long'))/60)

accidents_df = accidents_df.withColumn('durationAcc',accidents_df['durationAcc'].cast('int'))

In [49]:
#(training_data, test_data) = accidents_df.randomSplit([0.8,0.2])

In [50]:
# find all the datatypes in dataset to decide on categorical features and continuous variable
[i for i in accidents_df.dtypes]

[('Severity', 'int'),
 ('Start_Time', 'timestamp'),
 ('End_Time', 'timestamp'),
 ('Distance(mi)', 'double'),
 ('City', 'string'),
 ('State', 'string'),
 ('Timezone', 'string'),
 ('Visibility(mi)', 'double'),
 ('Weather_Condition', 'string'),
 ('Crossing', 'boolean'),
 ('Junction', 'boolean'),
 ('Traffic_Signal', 'boolean'),
 ('Sunrise_Sunset', 'string'),
 ('durationAcc', 'int')]

In [None]:
# find null value count in data set

In [51]:
data_agg = accidents_df.agg(*[count(when(isnull(c), c)).alias(c) for c in accidents_df.columns])

In [52]:
data_agg.toPandas().T

Unnamed: 0,0
Severity,0
Start_Time,0
End_Time,0
Distance(mi),0
City,112
State,0
Timezone,3880
Visibility(mi),75856
Weather_Condition,76138
Crossing,0


In [13]:
accidents_df.count()

3513617

In [14]:
# as the dataset has many rows so dropping rows with null value is not going to impact much. 
accidents_df=accidents_df.dropna(how='any')

In [15]:
accidents_df.show(5)

+--------+-------------------+-------------------+------------+------------+-----+----------+--------------+-----------------+--------+--------+--------------+--------------+-----------+
|Severity|         Start_Time|           End_Time|Distance(mi)|        City|State|  Timezone|Visibility(mi)|Weather_Condition|Crossing|Junction|Traffic_Signal|Sunrise_Sunset|durationAcc|
+--------+-------------------+-------------------+------------+------------+-----+----------+--------------+-----------------+--------+--------+--------------+--------------+-----------+
|       3|2016-02-08 05:46:00|2016-02-08 11:00:00|        0.01|      Dayton|   OH|US/Eastern|          10.0|       Light Rain|   false|   false|         false|         Night|        314|
|       2|2016-02-08 06:07:59|2016-02-08 06:37:59|        0.01|Reynoldsburg|   OH|US/Eastern|          10.0|       Light Rain|   false|   false|         false|         Night|         30|
|       2|2016-02-08 06:49:27|2016-02-08 07:19:27|        0.01|Wi

In [16]:
# re-calculate the null values after dropping rows with null value
data_agg = accidents_df.agg(*[count(when(isnull(c), c)).alias(c) for c in accidents_df.columns])


In [17]:
data_agg.show()

+--------+----------+--------+------------+----+-----+--------+--------------+-----------------+--------+--------+--------------+--------------+-----------+
|Severity|Start_Time|End_Time|Distance(mi)|City|State|Timezone|Visibility(mi)|Weather_Condition|Crossing|Junction|Traffic_Signal|Sunrise_Sunset|durationAcc|
+--------+----------+--------+------------+----+-----+--------+--------------+-----------------+--------+--------+--------------+--------------+-----------+
|       0|         0|       0|           0|   0|    0|       0|             0|                0|       0|       0|             0|             0|          0|
+--------+----------+--------+------------+----+-----+--------+--------------+-----------------+--------+--------+--------------+--------------+-----------+



In [18]:
# split the original data into two parts : training data- for training the model, test_data - for testing the model
(training_data, test_data) = accidents_df.randomSplit([0.8,0.2])

In [19]:
training_data=training_data.withColumn('durationAcc',when(training_data['durationAcc'] < 0,0).\
                         otherwise(training_data['durationAcc']))

In [20]:
training_data=training_data.withColumn('durationAcc',when(training_data['durationAcc'] >360,400).\
                         otherwise(training_data['durationAcc']))

In [21]:
training_data.describe('durationAcc').show()

+-------+------------------+
|summary|       durationAcc|
+-------+------------------+
|  count|           2745233|
|   mean| 87.46776466696998|
| stddev|104.73335456185325|
|    min|                 0|
|    max|               400|
+-------+------------------+



In [22]:
# convert categorical features into numeric using Indexer and OneHotEncoderEstimatort
stage1=StringIndexer(inputCol='City',outputCol='City_index',handleInvalid='keep',)
stage2=StringIndexer(inputCol='State',outputCol='State_index',handleInvalid='keep',)
stage3=StringIndexer(inputCol='Weather_Condition',outputCol='Weather_index',handleInvalid='keep',)


In [23]:
OHE = OneHotEncoderEstimator(inputCols=['City_index', 'State_index','Weather_index'],\
                             outputCols=['City_OHE', 'State_OHE','Weather_OHE'])

In [24]:
# assemble all the features which are used for model building into a single column
stage_4 = VectorAssembler(inputCols=['Severity', 'City_OHE', 'State_OHE', 'Weather_OHE'],
                          outputCol='features')

In [25]:
# create a logisticRegression model with input and output label column
stage_5 = LogisticRegression(featuresCol='features',labelCol='durationAcc')

In [26]:
# create the pipeline with all the steps required for model building
regression_pipeline = Pipeline(stages= [stage1, stage2, stage3,OHE, stage_4, stage_5])

In [27]:
# fit the model with training data
model = regression_pipeline.fit(training_data)

In [28]:
# transform the trained model
training_data_update = model.transform(training_data)

In [29]:
training_data_update.printSchema()

root
 |-- Severity: integer (nullable = true)
 |-- Start_Time: timestamp (nullable = true)
 |-- End_Time: timestamp (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Visibility(mi): double (nullable = true)
 |-- Weather_Condition: string (nullable = true)
 |-- Crossing: boolean (nullable = true)
 |-- Junction: boolean (nullable = true)
 |-- Traffic_Signal: boolean (nullable = true)
 |-- Sunrise_Sunset: string (nullable = true)
 |-- durationAcc: integer (nullable = true)
 |-- City_index: double (nullable = false)
 |-- State_index: double (nullable = false)
 |-- Weather_index: double (nullable = false)
 |-- City_OHE: vector (nullable = true)
 |-- State_OHE: vector (nullable = true)
 |-- Weather_OHE: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- predi

In [30]:
# check the newly created columns like prediction and probability
training_data_update.select('features','rawPrediction','probability','prediction','durationAcc').show(10)

+--------------------+--------------------+--------------------+----------+-----------+
|            features|       rawPrediction|         probability|prediction|durationAcc|
+--------------------+--------------------+--------------------+----------+-----------+
|(11569,[0,705,114...|[-1.8265070772726...|[1.18393016015430...|      29.0|         30|
|(11569,[0,20,1140...|[-1.7579878772627...|[3.46421546727922...|      29.0|         63|
|(11569,[0,139,114...|[-1.7666481526789...|[3.97320015397987...|      29.0|         60|
|(11569,[0,170,114...|[-1.7663355860556...|[4.29333213253871...|      29.0|         30|
|(11569,[0,302,114...|[-1.8190396701676...|[1.04387394954724...|      29.0|         45|
|(11569,[0,140,114...|[-1.7726112401167...|[3.96603401395655...|      29.0|         90|
|(11569,[0,610,114...|[-1.8334759817796...|[4.36694535432026...|      29.0|         30|
|(11569,[0,633,114...|[-1.7327285346091...|[6.45200131241716...|     360.0|         30|
|(11569,[0,35,1141...|[-1.813221

In [31]:
# drop the durationAcc from the test dataset
# test_data=test_data.drop('durationAcc')

In [32]:
test_data.printSchema()

root
 |-- Severity: integer (nullable = true)
 |-- Start_Time: timestamp (nullable = true)
 |-- End_Time: timestamp (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Visibility(mi): double (nullable = true)
 |-- Weather_Condition: string (nullable = true)
 |-- Crossing: boolean (nullable = true)
 |-- Junction: boolean (nullable = true)
 |-- Traffic_Signal: boolean (nullable = true)
 |-- Sunrise_Sunset: string (nullable = true)
 |-- durationAcc: integer (nullable = true)



In [33]:
# run the model using the test data
predict = model.transform(test_data)

In [34]:
predict.printSchema()

root
 |-- Severity: integer (nullable = true)
 |-- Start_Time: timestamp (nullable = true)
 |-- End_Time: timestamp (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Visibility(mi): double (nullable = true)
 |-- Weather_Condition: string (nullable = true)
 |-- Crossing: boolean (nullable = true)
 |-- Junction: boolean (nullable = true)
 |-- Traffic_Signal: boolean (nullable = true)
 |-- Sunrise_Sunset: string (nullable = true)
 |-- durationAcc: integer (nullable = true)
 |-- City_index: double (nullable = false)
 |-- State_index: double (nullable = false)
 |-- Weather_index: double (nullable = false)
 |-- City_OHE: vector (nullable = true)
 |-- State_OHE: vector (nullable = true)
 |-- Weather_OHE: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- predi

In [35]:
predict.describe().show(2)

+-------+------------------+-------------------+------+------+--------+-----------------+-----------------+--------------+------------------+-----------------+-----------------+----------------+------------------+
|summary|          Severity|       Distance(mi)|  City| State|Timezone|   Visibility(mi)|Weather_Condition|Sunrise_Sunset|       durationAcc|       City_index|      State_index|   Weather_index|        prediction|
+-------+------------------+-------------------+------+------+--------+-----------------+-----------------+--------------+------------------+-----------------+-----------------+----------------+------------------+
|  count|            684268|             684268|684268|684268|  684268|           684268|           684268|        684268|            684268|           684268|           684268|          684268|            684268|
|   mean|2.3387576212828893|0.27849951626251285|  null|  null|    null|9.119851900132694|             null|          null|113.51050027182332|585

In [36]:
# evaluatoion of the model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [38]:
evaluator = MulticlassClassificationEvaluator(labelCol='durationAcc',\
                                                predictionCol='prediction',\
                                            metricName='accuracy')

In [39]:
accuracy = evaluator.evaluate(predict)

In [40]:
print('Test Accuracy = ',accuracy)

Test Accuracy =  0.34020149999707716
