In [5]:
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [6]:
# conf = SparkConf().setAppName("App")
# sc = SparkContext(conf = conf)
# ss = SparkSession.builder.getOrCreate()

# The Road Ahead
---
We break the notebook into separate steps.  Feel free to use the links below to navigate the notebook.

* [Step 1](#step1): Import Datasets From Mongodb
* [Step 2](#step2): Data Preprocessing
* [Step 3](#step3): Data Engineering
* [Step 4](#step4): Modeling
* [Step 5](#step5): Prediction and Model Evaluation

<a id='step1'></a>

## Step 1: Import Datasets From Mongodb

In [1]:
import os
pyspark_submit_args = '--packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0 pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://54.186.235.165/mydb.accident")\
    .getOrCreate()

### Define our own schema

In [8]:
accidentSchema = StructType([ StructField("Accident_Index", StringType(), True), 
                         StructField('1st_Road_Class', StringType(), True),
                         StructField('1st_Road_Number', FloatType(), True), 
                         StructField("2nd_Road_Class", StringType(), True),
                         StructField('2nd_Road_Number', FloatType(), True), 
                         StructField("Accident_Severity", StringType(), True),
                         StructField("Carriageway_Hazards", StringType(), True), 
                         StructField("Date", StringType(), True),
                         StructField('Day_of_Week', StringType(), True), 
                         StructField("Did_Police_Officer_Attend_Scene_of_Accident", FloatType(), True),
                         StructField("Junction_Control", StringType(), True),
                         StructField('Junction_Detail', StringType(), True), 
                         StructField("Latitude", FloatType(), True),
                         StructField("Light_Conditions", StringType(), True),
                         StructField("Local_Authority_(District)", StringType(), True),
                         StructField("Local_Authority_(Highway)", StringType(), True),
                         StructField('Location_Easting_OSGR', FloatType(), True), 
                         StructField("Location_Northing_OSGR", FloatType(), True),
                         StructField("Longitude", FloatType(), True),
                         StructField("LSOA_of_Accident_Location", StringType(), True),
                         StructField("Number_of_Casualties", FloatType(), True),
                         StructField("Number_of_Vehicles", FloatType(), True),
                         StructField('Pedestrian_Crossing-Human_Control', FloatType(), True), 
                         StructField("Pedestrian_Crossing-Physical_Facilities", FloatType(), True),
                         StructField("Police_Force", StringType(), True),
                         StructField("Road_Surface_Conditions", StringType(), True),
                         StructField('Road_Type', StringType(), True), 
                         StructField("Special_Conditions_at_Site", StringType(), True),
                         StructField("Speed_limit", FloatType(), True),
                         StructField('Time', StringType(), True), 
                         StructField("Urban_or_Rural_Area", StringType(), True),
                         StructField("Weather_Conditions", StringType(), True),
                         StructField("Year", FloatType(), True),
                         StructField("InScotland", StringType(), True),
                       ])

In [9]:
carSchema = StructType([StructField("Accident_Index", StringType(), True),
                        StructField("Age_Band_of_Driver", StringType(), True),
                        StructField('Age_of_Vehicle', FloatType(), True),
                        StructField("Driver_Home_Area_Type",StringType(), True),
                        StructField('Driver_IMD_Decile', FloatType(), True),
                        StructField("Engine_Capacity_.CC", FloatType(), True),
                        StructField("Hit_Object_in_Carriageway",StringType(), True),
                        StructField("Hit_Object_off_Carriageway",StringType(), True),
                        StructField('Journey_Purpose_of_Driver',StringType(), True),
                        StructField("Junction_Location", StringType(), True),
                        StructField("make", StringType(), True),
                        StructField('model', StringType(), True),
                        StructField("Propulsion_Code", StringType(), True),
                        StructField("Sex_of_Driver", StringType(), True),
                        StructField("Skidding_and_Overturning",StringType(), True),
                        StructField("Towing_and_Articulation",StringType(), True),
                        StructField('Vehicle_Leaving_Carriageway',StringType(), True),
                        StructField("Vehicle_Location_Restricted_Lane", StringType(), True),
                        StructField("Vehicle_Manoeuvre", StringType(), True),
                        StructField("Vehicle_Reference", StringType(), True),
                        StructField("Vehicle_Type", StringType(), True),
                        StructField("Was_Vehicle_Left_Hand_Drive",StringType(), True),
                        StructField('X1st_Point_of_Impact',StringType(), True),
                        StructField("Year", FloatType(), True),
                        StructField("Accident_Severity", StringType(), True),
                        ])

### Load two datasets

In [7]:
accident_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load(schema=accidentSchema)

In [8]:
accident_df.show(1)

+--------------+------------------+--------------+---------------------+-----------------+----------------+-------------------------+--------------------------+-------------------------+--------------------+---------------+-------------+------------------------+-----------------------+---------------------------+----------------+-----------------+-----------------+------------+---------------------------+--------------------+----+--------------------+----+-----+
|Accident_Index|Age_Band_of_Driver|Age_of_Vehicle|Driver_Home_Area_Type|Driver_IMD_Decile|Engine_Capacity_|Hit_Object_in_Carriageway|Hit_Object_off_Carriageway|Journey_Purpose_of_Driver|   Junction_Location|Propulsion_Code|Sex_of_Driver|Skidding_and_Overturning|Towing_and_Articulation|Vehicle_Leaving_Carriageway|Vehicle_Location|Vehicle_Manoeuvre|Vehicle_Reference|Vehicle_Type|Was_Vehicle_Left_Hand_Drive|X1st_Point_of_Impact|Year|                 _id|make|model|
+--------------+------------------+--------------+----------------

In [4]:
spark2 = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://54.186.235.165/mydb.car")\
    .getOrCreate()

In [5]:
vehicle_df = spark2.read.format("com.mongodb.spark.sql.DefaultSource").load(schema=carSchema)

In [9]:
vehicle_df.show(1)

+--------------+------------------+--------------+---------------------+-----------------+----------------+-------------------------+--------------------------+-------------------------+--------------------+---------------+-------------+------------------------+-----------------------+---------------------------+----------------+-----------------+-----------------+------------+---------------------------+--------------------+----+--------------------+----+-----+
|Accident_Index|Age_Band_of_Driver|Age_of_Vehicle|Driver_Home_Area_Type|Driver_IMD_Decile|Engine_Capacity_|Hit_Object_in_Carriageway|Hit_Object_off_Carriageway|Journey_Purpose_of_Driver|   Junction_Location|Propulsion_Code|Sex_of_Driver|Skidding_and_Overturning|Towing_and_Articulation|Vehicle_Leaving_Carriageway|Vehicle_Location|Vehicle_Manoeuvre|Vehicle_Reference|Vehicle_Type|Was_Vehicle_Left_Hand_Drive|X1st_Point_of_Impact|Year|                 _id|make|model|
+--------------+------------------+--------------+----------------

<a id='step2'></a>
## Step 2: Data Preprocessing

### Select the data and features we are interested in

*For Accident Dataset*

In [11]:
accident_df = accident_df[(accident_df.Number_of_Vehicles == 1) & (
    accident_df.Year <= 2016)]

In [12]:
accident_features = ['Accident_Index',
                     '1st_Road_Class',
                     '1st_Road_Number',
                     'Accident_Severity',
                     'Carriageway_Hazards',
                     'Day_of_Week',
                     'Junction_Detail',
                     'Light_Conditions',
                     'Number_of_Vehicles',
                     'Pedestrian_Crossing-Human_Control',
                     'Pedestrian_Crossing-Physical_Facilities',
                     'Police_Force',
                     'Road_Surface_Conditions',
                     'Road_Type',
                     'Special_Conditions_at_Site',
                     'Speed_limit',
                     'Urban_or_Rural_Area',
                     'Weather_Conditions']

In [13]:
df_1=accident_df[accident_features]

*For Car Dataset*

In [16]:
vehicle_features = ['Accident_Index',
                     'Age_Band_of_Driver',
                     'Age_of_Vehicle',
                     'Driver_Home_Area_Type',
                     'Propulsion_Code',
                     'Sex_of_Driver',
                     'Towing_and_Articulation',
                     'Vehicle_Location_Restricted_Lane',
                     'Vehicle_Manoeuvre',
                     'Vehicle_Type']

In [17]:
df_2 = vehicle_df[vehicle_features]

### Join two tables

In [18]:
joined = df_1.join(df_2,"Accident_Index","left_outer")

In [19]:
joined_labeled = joined.where(
    "Accident_Severity == 'Slight' or Accident_Severity == 'Serious' or Accident_Severity == 'Fatal'")

In [20]:
joined_labeled.columns

['Accident_Index',
 '1st_Road_Class',
 '1st_Road_Number',
 'Accident_Severity',
 'Carriageway_Hazards',
 'Day_of_Week',
 'Junction_Detail',
 'Light_Conditions',
 'Number_of_Vehicles',
 'Pedestrian_Crossing-Human_Control',
 'Pedestrian_Crossing-Physical_Facilities',
 'Police_Force',
 'Road_Surface_Conditions',
 'Road_Type',
 'Special_Conditions_at_Site',
 'Speed_limit',
 'Urban_or_Rural_Area',
 'Weather_Conditions',
 'Age_Band_of_Driver',
 'Age_of_Vehicle',
 'Driver_Home_Area_Type',
 'Propulsion_Code',
 'Sex_of_Driver',
 'Towing_and_Articulation',
 'Vehicle_Location_Restricted_Lane',
 'Vehicle_Manoeuvre',
 'Vehicle_Type']

<a id='step3'></a>
## Step 3: Data Engineering

### Missing Value Imputation

1. For **FloatType** cols, impute with **mean**

2. For **StringType** cols, impute with **"empty"** 

In [21]:
Numeric_col = ['1st_Road_Number',
 'Pedestrian_Crossing-Human_Control',
 'Pedestrian_Crossing-Physical_Facilities',
 'Speed_limit',
 'Age_of_Vehicle']
Numeric_col2 = ['1st_Road_Number_2',
 'Pedestrian_Crossing-Human_Control_2',
 'Pedestrian_Crossing-Physical_Facilities_2',
 'Speed_limit_2',
 'Age_of_Vehicle_2']
String_col = ['1st_Road_Class',
 'Carriageway_Hazards',
 'Day_of_Week',
 'Junction_Detail',
 'Light_Conditions',
 'Police_Force',
 'Road_Surface_Conditions',
 'Road_Type',
 'Special_Conditions_at_Site',
 'Urban_or_Rural_Area',
 'Weather_Conditions',
 'Age_Band_of_Driver',
 'Driver_Home_Area_Type',
 'Propulsion_Code',
 'Sex_of_Driver',
 'Towing_and_Articulation',
 'Vehicle_Location_Restricted_Lane',
 'Vehicle_Manoeuvre',
 'Vehicle_Type','Accident_Severity']

In [22]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols = Numeric_col, outputCols= Numeric_col2)
model = imputer.fit(joined_labeled).transform(joined_labeled)
filled = model.select(Numeric_col2+String_col)

In [23]:
joined_labeled_filled = filled.na.fill("empty", String_col)

### Encode and Assemble Features

In [25]:
from pyspark.ml.feature import StringIndexer

def indexStringColumns(df, cols):
    newdf = df
    for c in cols:
        si = StringIndexer(inputCol=c, outputCol=c+"-num").setHandleInvalid("keep")
        sm = si.fit(newdf)
        newdf = sm.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-num", c)
    return newdf

dfnumeric = indexStringColumns(joined_labeled_filled, String_col)

In [26]:
from pyspark.ml.feature import OneHotEncoder

def oneHotEncodeColumns(df, cols):  
    newdf = df
    for c in cols:
        onehotenc = OneHotEncoder(inputCol=c, outputCol=c+"-onehot")
        newdf = onehotenc.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-onehot", c)
    return newdf

dfhot = oneHotEncodeColumns(dfnumeric,String_col[0:-1])

In [27]:
features = ['1st_Road_Number_2',
 'Pedestrian_Crossing-Human_Control_2',
 'Pedestrian_Crossing-Physical_Facilities_2',
 'Speed_limit_2',
 'Age_of_Vehicle_2',
 '1st_Road_Class',
 'Carriageway_Hazards',
 'Day_of_Week',
 'Junction_Detail',
 'Light_Conditions',
 'Police_Force',
 'Road_Surface_Conditions',
 'Road_Type',
 'Special_Conditions_at_Site',
 'Urban_or_Rural_Area',
 'Weather_Conditions',
 'Age_Band_of_Driver',
 'Driver_Home_Area_Type',
 'Propulsion_Code',
 'Sex_of_Driver',
 'Towing_and_Articulation',
 'Vehicle_Location_Restricted_Lane',
 'Vehicle_Manoeuvre',
 'Vehicle_Type']

In [92]:
dfhot.show()

+-----------------+-----------------------------------+-----------------------------------------+-------------+----------------+-----------------+--------------+-------------------+-------------+---------------+----------------+---------------+-----------------------+-------------+--------------------------+-------------------+------------------+------------------+---------------------+---------------+-------------+-----------------------+--------------------------------+-----------------+--------------+
|1st_Road_Number_2|Pedestrian_Crossing-Human_Control_2|Pedestrian_Crossing-Physical_Facilities_2|Speed_limit_2|Age_of_Vehicle_2|Accident_Severity|1st_Road_Class|Carriageway_Hazards|  Day_of_Week|Junction_Detail|Light_Conditions|   Police_Force|Road_Surface_Conditions|    Road_Type|Special_Conditions_at_Site|Urban_or_Rural_Area|Weather_Conditions|Age_Band_of_Driver|Driver_Home_Area_Type|Propulsion_Code|Sex_of_Driver|Towing_and_Articulation|Vehicle_Location_Restricted_Lane|Vehicle_Manoeu

In [29]:
# Merging the data with Vector Assembler.
from pyspark.ml.feature import VectorAssembler
# except the label col.
va = VectorAssembler(outputCol="features", inputCols=features)
df_va = va.transform(dfhot).select("features", "Accident_Severity").withColumnRenamed("Accident_Severity", "label")

In [30]:
df_va.groupBy("label").count().show()

+-----+------+
|label| count|
+-----+------+
|  0.0|452989|
|  1.0|114444|
|  2.0| 11093|
+-----+------+



We are dealing with **highly imbalanced** data, we need to resampling before building our model

In [31]:
df_va.select("features").show(1)

+--------------------+
|            features|
+--------------------+
|(223,[0,3,4,5,11,...|
+--------------------+
only showing top 1 row



<a id='step4'></a>
## Step 4: Modeling

### Split into training/test

In [68]:
pendtsets = df_va.randomSplit([0.8, 0.2],seed = 697) 
train = pendtsets[0].cache()
test = pendtsets[1].cache()

### Resampling the training set

1. Upsample the minority group
2. Downsample the majority group

In [69]:
down_sampling_ratio = train.groupBy("label").count().collect(
)[1][1]*1.0/(train.groupBy("label").count().collect()[0][1])

In [70]:
over_sampling_ratio = train.groupBy("label").count().collect(
)[1][1]/(train.groupBy("label").count().collect()[2][1])

In [71]:
train_resampled = train.sampleBy("label",fractions={0.0:float(down_sampling_ratio), 1.0:1})

In [72]:
fatal = train.filter("label == 2.0").sample(withReplacement=True,fraction=float(over_sampling_ratio))
train_data = train_resampled.unionAll(fatal)

In [73]:
train_data.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0|91281|
|  1.0|91548|
|  2.0|91907|
+-----+-----+



### 1. RandomForestClassifer

In [75]:
%%time
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(maxDepth=10)
rfmodel = rf.fit(train_data)

CPU times: user 55.8 ms, sys: 8.46 ms, total: 64.3 ms
Wall time: 3min 29s


### 2. DecisionTreeClassifer

In [76]:
%%time
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(maxDepth=10,impurity="entropy", seed=2019)
dtmodel = dt.fit(train_data)

CPU times: user 25.8 ms, sys: 941 µs, total: 26.7 ms
Wall time: 1min


### 3. NaiveBayes

In [77]:
%%time
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes()
nbmodel = nb.fit(train_data)

CPU times: user 16.2 ms, sys: 570 µs, total: 16.8 ms
Wall time: 2.17 s


<a id='step5'></a>
## Step 5: Prediction and Model Evaluation

### 1. F1_Score Analysis

In [78]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
rfpredicts = rfmodel.transform(test)
dtpredicts = dtmodel.transform(test)
nbpredicts = nbmodel.transform(test)

In [79]:
from __future__ import (absolute_import, division,
                        print_function, unicode_literals)

In [80]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1")
F1_rf = evaluator.evaluate(rfpredicts)
F1_dt = evaluator.evaluate(dtpredicts)
F1_nb = evaluator.evaluate(nbpredicts)
print("F1_rf =  %g" % F1_rf)
print("F1_dt =  %g" % F1_dt)
print("F1_nb =  %g" % F1_nb)

F1_rf =  0.553814
F1_dt =  0.582893
F1_nb =  0.43958


In [81]:
rf_train_predicts = rfmodel.transform(train_data)
dt_train_predicts = dtmodel.transform(train_data)
nb_train_predicts = nbmodel.transform(train_data)
F1_rf_train_ = evaluator.evaluate(rf_train_predicts)
F1_dt_train_ = evaluator.evaluate(dt_train_predicts)
F1_nb_train_ = evaluator.evaluate(nb_train_predicts)
print("F1_rf_train_ =  %g" % F1_rf_train_)
print("F1_dt_train_ =  %g" % F1_dt_train_)
print("F1_nb_train_ =  %g" % F1_nb_train_)

F1_rf_train_ =  0.479504
F1_dt_train_ =  0.447107
F1_nb_train_ =  0.393228


### 2. Recall Matrix Analysis

In [82]:
def recall_rate(predictions):
    """Compute recall per class."""
    actual_positives = predictions.groupBy('label').count()
    true_positives = predictions.groupBy('label', 'prediction').count().filter('label == prediction')
    output = (actual_positives.join(true_positives, "label")
                             .withColumn("recall",
                             true_positives['count'] / actual_positives['count'])
                             .select("label", "recall"))
    return output

In [83]:
rf_recall = recall_rate(rfpredicts)
dt_recall = recall_rate(dtpredicts)
nb_recall = recall_rate(nbpredicts)

print("Random_Forest_Recall_Rate")
rf_recall.show()
print("Decision_Tree_Recall_Rate")
dt_recall.show()
print("Naive_Bayes_Recall_Rate")
nb_recall.show()

Random_Forest_Recall_Rate
+-----+------------------+
|label|            recall|
+-----+------------------+
|  0.0|0.5256262341638748|
|  1.0|0.2589660601819454|
|  2.0|0.6083733100741386|
+-----+------------------+

Decision_Tree_Recall_Rate
+-----+-------------------+
|label|             recall|
+-----+-------------------+
|  0.0| 0.5994320072774068|
|  1.0|0.17122988103568929|
|  2.0| 0.5856955952900131|
+-----+-------------------+

Naive_Bayes_Recall_Rate
+-----+-------------------+
|label|             recall|
+-----+-------------------+
|  0.0|0.36734263717246124|
|  1.0|0.22266445066480056|
|  2.0| 0.6467509812472743|
+-----+-------------------+



In [53]:
sc.stop()