**NYPD Patrol Assistance System**

In [2]:

import pandas as pd
from pyspark import SparkContext 
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.ml import feature
from pyspark.ml import classification
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import Bucketizer
from pyspark.ml.feature import StringIndexer
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metrics
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

**Data Source **

The data can be obtained from the following Website:

data.ny.gov in the name of "Crash data"


**Versions of Software**


The version of the software is Python 3.0 and Apache Spark 2.2.0

In [4]:
$$$$$$$$$$$$$$$$$$$$$$$$$$$$#Test data with only one row to check prediction
test_data = sqlContext.read.format("csv").option("header", "true").load("/FileStore/tables/data1.csv")

In [5]:
#Loading data into databricks as a spark dataframe
myData1 = sqlContext.read.format("csv").option("header", "true").load("/FileStore/tables/Category.csv")

In [6]:
#Loading data onto databricks as a spark dataframe
myData = sqlContext.read.format("csv").option("header", "true").load("/FileStore/tables/myData.csv")

In [7]:
#Displaying data as a table
display(myData)

Year,Crash Descriptor,Time,Date,Day of Week,Police Report,Lighting Conditions,Municipality,Collision Type Descriptor,County Name,Road Descriptor,Weather Conditions,Traffic Control Device,Road Surface Conditions,DOT Reference Marker Location,Pedestrian Bicyclist Action,Event Descriptor,Number of Vehicles Involved
2014,Property Damage Accident,8:30,1/3/2014,Friday,Y,Daylight,WATERTOWN,OVERTAKING,JEFFERSON,Straight and Level,Clear,,Snow/Ice,,Not Applicable,"Other Motor Vehicle, Collision With",2
2014,Property Damage & Injury Accident,14:48,1/3/2014,Friday,Y,Daylight,WATERTOWN,REAR END,JEFFERSON,Straight and Level,Cloudy,Traffic Signal,Snow/Ice,3 73022024,Not Applicable,"Other Motor Vehicle, Collision With",2
2014,Property Damage & Injury Accident,11:30,1/4/2014,Saturday,Y,Daylight,WATERTOWN,OTHER,JEFFERSON,Straight and Grade,Clear,,Snow/Ice,,Not Applicable,"Building/Wall, Collision With Fixed Object",1
2014,Property Damage Accident,7:45,1/3/2014,Friday,Y,Daylight,WATERTOWN,RIGHT ANGLE,JEFFERSON,Straight and Level,Clear,Stop Sign,Snow/Ice,,Not Applicable,"Other Motor Vehicle, Collision With",2
2014,Property Damage Accident,15:11,1/4/2014,Saturday,Y,Daylight,WATERTOWN,RIGHT ANGLE,JEFFERSON,Straight and Level,Cloudy,,Wet,3 73022020,Not Applicable,"Other Motor Vehicle, Collision With",2
2014,Property Damage Accident,17:12,1/4/2014,Saturday,Y,Dark-Road Unlighted,FRANKFORT,OTHER,HERKIMER,Straight and Level,Cloudy,,Snow/Ice,,Not Applicable,"Snow Embankment, Collision With Fixed Object",1
2014,Injury Accident,15:15,1/3/2014,Friday,Y,Daylight,WARSAW,REAR END,WYOMING,Straight and Level,Clear,,Snow/Ice,19 46021177,Not Applicable,"Other Motor Vehicle, Collision With",2
2014,Property Damage Accident,11:30,1/3/2014,Friday,Y,Daylight,ROTTERDAM,OTHER,SCHENECTADY,Curve and Level,Snow,,Snow/Ice,890I16013001,Not Applicable,"Median - Not At End, Collision With Fixed Object",1
2014,Property Damage Accident,15:30,1/3/2014,Friday,Y,Daylight,EAST AURORA,RIGHT ANGLE,ERIE,Straight and Level,Clear,,Dry,16 53021191,Not Applicable,"Other Motor Vehicle, Collision With",2
2014,Property Damage & Injury Accident,8:02,1/3/2014,Friday,Y,Daylight,AURORA,OTHER,ERIE,Curve and Grade,Cloudy,,Snow/Ice,400 53011024,Not Applicable,"Guide Rail - Not At End, Collision With Fixed Object",1


**Displaying the dimension of the data**

In [9]:
#print((myData.count(), len(df.columns)))
print 'Number of rows in the dataframe : ', myData.count()
print 'Number of columns in the dataframe :' , len(myData.columns)

In [10]:
#Displaying t the column names
myData.columns

In [11]:
type(myData)

In [12]:
#Converting the dataframe to a pandas dataframe to manipulate the variables
Data_Pandas = myData.toPandas()


In [13]:
#Isolating the target column
Predicted =Data_Pandas['Crash Descriptor']

In [14]:
#Extracting the time column from the dataframe
Time = Data_Pandas['Time']
#Extracting hour information from time
Time = Time.map(lambda x:x.split(':'))
Time = Time.map(lambda x: x[0])

In [15]:
#Extracted the hour alone after eliminating the minutes
Time

In [16]:
#Extracting date column from the dataframe
Date = Data_Pandas['Date']

In [17]:
#Extracting month information from date
Date = Date.map(lambda x:x.split('/'))
Date  = Date.map(lambda x:x[0])

In [18]:
#Deleting date and time column from original dataframe to avoid problems while merging
del Data_Pandas['Date']
del Data_Pandas['Time']

In [19]:
#Displaying columns without date and time
Data_Pandas.columns

In [20]:
#Joining the new date and time columns  to the dataframe
Data_Pandas = Data_Pandas.join(Time,how = "outer")
Data_Pandas = Data_Pandas.join(Date,how = "outer")

In [21]:
#Validating the join by checking a random row value
Data_Pandas.iloc[818078]

In [22]:
#Display columns for validation
Data_Pandas.columns

In [23]:
#Creating the Predicted variable Fatal Accident from Crash descriptor variable
#If the Cr
Data_Pandas['Fatal Accident'] = Data_Pandas['Crash Descriptor'].apply(lambda x: 'Y' if x == 'Fatal Accident' else 'N')

In [24]:
#Validating the join
Data_Pandas.columns

In [25]:
#Renaming the date column as month
Data_Pandas = Data_Pandas.rename(columns = {'Date':'Month'})

In [26]:
#Checking for NULL values in the predicted variable
Data_Pandas['Fatal Accident'].isnull().values.any()

In [27]:
Data_Pandas['Fatal Accident'].describe(include = 'all')

In [28]:
#Converting the pandas dataframe to a spark dataframe
Data = sqlContext.createDataFrame(Data_Pandas)


In [29]:
#Displaying the spark dataframe
Data.columns

**Data Splitting**

The data is split into three parts:

**Training**: To fit the model

**Validation**: For Model Selection

**Testing**:For Testing the Model

In [31]:
#Splitting the data into test and train
training,validation,test = Data.randomSplit([0.7, 0.2,0.1], 0)

In [32]:
#Bucketizing time into various bins
splits = [0,2,4,6,8,10,12,14,16,18,20,22,24]

MODEL 1: **Logistic Regression** (with elastic net Regularization)

The feature Engineering techniques used are:

**StringIndexer**

**OneHotEncoder**

**Bucketizer**(For Time)

All these techniques are pipelined and the Machine Learning models are fitted with the training data and validated on validation data

In [34]:
#Model 1 - Logistic Regression
model1 = Pipeline(stages=[feature.StringIndexer(inputCol='Month', outputCol='encoded_Month'),
                          feature.StringIndexer(inputCol = 'County Name',outputCol = 'encoded_County'),
                          feature.StringIndexer(inputCol = 'Day of Week',outputCol = 'encoded_Day'),
                          feature.StringIndexer(inputCol = 'Fatal Accident',outputCol = 'encoded_Fatalaccident'),
                          feature.StringIndexer(inputCol = 'Time',outputCol = 'encoded_Time'),
                          feature.OneHotEncoder(inputCol = 'encoded_Month',outputCol = 'final_Month'),
                          feature.OneHotEncoder(inputCol = 'encoded_Day',outputCol = 'final_Day'),
                          feature.OneHotEncoder(inputCol = 'encoded_County',outputCol = 'final_County'),
                          feature.Bucketizer(splits=splits, inputCol='encoded_Time', outputCol="bucketed_Time"),
  feature.VectorAssembler(inputCols = ['final_Month','final_Day','bucketed_Time','final_County'],outputCol = 'final_features'),
 classification.LogisticRegression(labelCol='encoded_Fatalaccident', featuresCol='final_features',regParam = 0.1,elasticNetParam = 0.002)])

In [35]:
#Fitting model1 with training data
fit1 = model1.fit(training)

In [36]:
#Transforming model0 on to validation data
dataframe1 = fit1.transform(validation)

In [37]:
#Prediction vs Actual for model0
df1 = dataframe1.select('Prediction','Fatal Accident')

In [38]:
# Print the coefficients and intercept for logistic regression without regularization
lrm = fit1.stages[-1]
lrm.coefficients 

In [39]:
# Print the coefficients and intercept for logistic regression with regularization
#There is a decrease in the magnitude of coefficients
lrm = fit1.stages[-1]
lrm.coefficients

Accuracy is calculated using Multiclass Classification Evaluator and it is found to be 67%

In [41]:
#Accuracy
evaluator = MulticlassClassificationEvaluator(labelCol = 'encoded_Fatalaccident', predictionCol = 'prediction',metricName = 'accuracy')
accuracy1 = evaluator.evaluate(dataframe1)
#Error percentage:
print("Test Error = %g" % (1.0 - accuracy))

MODEL 2: **Random Forest**

The feature Engineering techniques used are:

**StringIndexer**

**OneHotEncoder**

**Bucketizer**(For Time)

All these techniques are pipelined and the Machine Learning models are fitted with the training data and validated on validation data

In [43]:
#Model 2
model1 = Pipeline(stages=[feature.StringIndexer(inputCol='Month', outputCol='encoded_Month'),
                          feature.StringIndexer(inputCol = 'County Name',outputCol = 'encoded_County'),
                          feature.StringIndexer(inputCol = 'Day of Week',outputCol = 'encoded_Day'),
                          feature.StringIndexer(inputCol = 'Fatal Accident',outputCol = 'encoded_Fatalaccident'),
                          feature.StringIndexer(inputCol = 'Time',outputCol = 'encoded_Time'),
                          feature.OneHotEncoder(inputCol = 'encoded_Month',outputCol = 'final_Month'),
                          feature.OneHotEncoder(inputCol = 'encoded_Day',outputCol = 'final_Day'),
                          feature.OneHotEncoder(inputCol = 'encoded_County',outputCol = 'final_County'),
                          feature.Bucketizer(splits=splits, inputCol='encoded_Time', outputCol="bucketed_Time"),
                 feature.VectorAssembler(inputCols = ['final_Month','final_Day','bucketed_Time','final_County'],outputCol = 'final_features'),
  classification.RandomForestClassifier(labelCol='encoded_Fatalaccident', featuresCol='final_features',numTrees = 200,maxDepth = 4)])

In [44]:
#Fitting model1 with training data
fit2 = model1.fit(training)

In [45]:
#Transforming model1 on to validation data
dataframe2 = fit2.transform(validation)

In [46]:
#Prediction vs Actual for model1
display(dataframe1.select('Prediction','Fatal Accident')

Accuracy is calculated using MultiClass Classification Evaluator and it comes to be 61%

In [48]:
#Accuracy
evaluator = MulticlassClassificationEvaluator(labelCol = 'encoded_Fatalaccident', predictionCol = 'prediction',metricName = 'accuracy')
accuracy2 = evaluator.evaluate(dataframe2)
#Error percentage:
print("Test Error = %g" % (1.0 - accuracy2))
print(accuracy2)

In [49]:
confusion_matrix('Fatal Accident','Prediction')

MODEL 3: **Gradient Boosting**

The feature Engineering techniques used are:

**StringIndexer**

**OneHotEncoder**

**Bucketizer**(For Time)

All these techniques are pipelined and the Machine Learning models are fitted with the training data and validated on validation data

In [51]:
#Model 3
model2 = Pipeline(stages=[feature.StringIndexer(inputCol='Month', outputCol='encoded_Month'),
                          feature.StringIndexer(inputCol = 'County Name',outputCol = 'encoded_County'),
                          feature.StringIndexer(inputCol = 'Day of Week',outputCol = 'encoded_Day'),
                          feature.StringIndexer(inputCol = 'Fatal Accident',outputCol = 'encoded_Fatalaccident'),
                          feature.StringIndexer(inputCol = 'Time',outputCol = 'encoded_Time'),
                          feature.OneHotEncoder(inputCol = 'encoded_Month',outputCol = 'final_Month'),
                          feature.OneHotEncoder(inputCol = 'encoded_Day',outputCol = 'final_Day'),
                          feature.OneHotEncoder(inputCol = 'encoded_County',outputCol = 'final_County'),
                          feature.Bucketizer(splits=splits, inputCol='encoded_Time', outputCol="bucketed_Time"),
                 feature.VectorAssembler(inputCols = ['final_Month','final_Day','bucketed_Time','final_County'],outputCol = 'final_features'),
                 classification.GBTClassifier(labelCol='encoded_Fatalaccident', featuresCol='final_features',maxIter = 10)])

In [52]:
#Fitting model2 with training data
fit3 = model2.fit(training)

In [53]:
#Transforming model2 on to validation data
dataframe3 = fit3.transform(validation)

In [54]:
#Prediction vs Actual for model2
df3 = dataframe1.select('Prediction','Fatal Accident')

Accuracy is calculated using MultiClass Classification Evaluator and it is found to be 69%

In [56]:
#Accuracy
evaluator = MulticlassClassificationEvaluator(labelCol = 'encoded_Fatalaccident', predictionCol = 'prediction',metricName = 'accuracy')
accuracy3 = evaluator.evaluate(dataframe3)
#Error percentage:
print("Test Error = %g" % (1.0 - accuracy3))

In [57]:
print('LogisticRegression: ',accuracy1)
print('RandomForest: ' ,accuracy2)
print('GradientBoosting: ' , accuracy3)

**Gradient Boosting** is applied to the final 10% test data since it gave the highest accuracy using Validation Data

In [59]:
#Model3(Gradient Boosting ) will be our final model.
#Applying Gradient Boosting to test data
d4 = fit3.transform(test_data)

Final Accuracy for the test data was calculated using MultiClass Classification Evaluator and it was found to be **68.28%** and the best model is **Gradient Boosting**

In [61]:
#Accuracy
evaluator = MulticlassClassificationEvaluator(labelCol = 'encoded_Fatalaccident', predictionCol = 'prediction',metricName = 'accuracy')
final_accuracy = evaluator.evaluate(d4)
#Error percentage:
print("Test Error = %g" % (1.0 - final_accuracy))
print(final_accuracy)

In [62]:
#Final Accuracy of our model is 68.28% for our test data