________
# __Machine Learning Project (using PySpark):__
## __Accurately Classify "Injury" variable__
#### By Varun Grewal  
<a href="https://www.linkedin.com/in/varungrewal/">LinkedIn</a> | <a href="https://github.com/varungrewal">Github</a>

____
### Relevant Links:        
<a href="https://drive.google.com/file/d/1l2E1zqmXeG9cYv9l7QVbJC11romEIzjV/view?usp=sharing">Data</a> | <a href="https://github.com/varungrewal/Machine-Learning-using-PySpark-/blob/main/Metadata.pdf">Meta Data</a> 
_____

### Tools Used: 
__Tool:__ Jupyter Notebook | __Language:__ Python       
__Packages:__ NumPy | Pandas | PySpark
_____

<a id='TOC'></a>
### <u>Table of Contents:</u> 
1. [Introduction](#1)      
    1.1 [Goal](#1.1)            
2. [Data Description](#2)    
    2.1 [Intialize](#2.1)     
    2.2 [Understanding Raw Data](#2.2)      
3. [ETL (Data Preparation, Cleaning, Wrangling, Manipulation and Check)](#3)                 
4. [Machine Learning](#4)      
    4.1 [Functions](#4.1)       
    4.2 [PySpark Session and SQL](#4.2)       
    4.3 [Model 1 - Logistic Regression](#4.3)           
    4.4 [Model 2 - Random Forest](#4.4)            
    4.5 [Model 3 - Gradient Boosted Trees](#4.5)       
    4.6 [Model 4 - Linear Support Vector](#4.6)    
5. [Conclusion](#5)

<p style="color:green"> Note: Select any cell and press TAB to come back to Table of Contents </p> 

_____
<a id='1'></a>
### 1. Introduction
____

This is the 3rd project in the series. Please see the links below to learn about the previous projects.   
Project 1: <a href="https://github.com/varungrewal/Data-Analytics-Visualization">Data Analytics and Visualization</a>              
Project 2: <a href="https://github.com/varungrewal/Machine-Learning-using-Scikit-">Machine Learning (using Scikit) </a> 

_____
<a id='1.1'></a>
### 1.1 Goal 
_____
The goal of this project is to build following four Classification models to accurately classify "Injury" variable 
   - Logistic Rgeression
   - Random Forest
   - Gradient Boosted Tree
   - Linear Support Vector

<a id='2'></a>
____ 
### 2. Data Description  
____
The raw data under consideration for this project is the 'Collision Data" sourced from the Seattle Police Department for the year 2004-2020. Actual dataset is much larger. But, for this project I have limited the scope of the dataset to focus on one variable "Injury Collision".

Preliminary analysis suggests that data is mostly clean and complete. However, some cleaning might be required to make it ideal for modeling and analysis. Size of the dataset is approx. 195K rows and 38 columns.
Dataset is of medium complexity as there are multiple variables that can potentially impact the severity of the collision. Data is of mixed nature with integer, float, date and categorical variables being present. That means, it will require preprocessing and potentially normalization.  

Note: Data is missing following important variables:
- Age
- Gender
- Make/Model of the vehicle

<a id='2.1'></a>
_____
### 2.1 Intialize:
Import/Load all the required packages and the dataset
_____

In [1]:
import pandas as pd
import numpy as np

In [2]:
!pip install pyspark==2.4.5

Collecting pyspark==2.4.5
[?25l  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
[K     |████████████████████████████████| 217.8MB 85kB/s  eta 0:00:01   |█▎                              | 8.7MB 7.2MB/s eta 0:00:30     |█▉                              | 12.5MB 291kB/s eta 0:11:44/s eta 0:11:37                        | 20.8MB 291kB/s eta 0:11:15 |███▏                            | 21.3MB 291kB/s eta 0:11:14| 23.1MB 6.8MB/s eta 0:00:29     |██████▉                         | 46.4MB 5.9MB/s eta 0:00:30     |███████                         | 47.8MB 5.9MB/s eta 0:00:29     |███████▍                        | 50.0MB 5.9MB/s eta 0:00:29     |███████▌                        | 50.9MB 5.9MB/s eta 0:00:29     |███████▋                        | 51.5MB 5.9MB/s eta 0:00:29     |████████                        | 53.7MB 5.9MB/s eta 0:00:28     |████████                        | 54.5MB 5.9MB/s eta 0:00:2

In [3]:
!pip install pyspark[sql]

Collecting pyarrow>=0.8.0; extra == "sql" (from pyspark[sql])
[?25l  Downloading https://files.pythonhosted.org/packages/51/6b/f67ab1171433d380e60ee4f5dad0c8abe7f1246ff44cabbf45cdb3877dd5/pyarrow-2.0.0-cp36-cp36m-manylinux2010_x86_64.whl (17.9MB)
[K     |████████████████████████████████| 17.9MB 5.4MB/s eta 0:00:01    |████████                        | 4.5MB 6.1MB/s eta 0:00:03     |████████████████████████▋       | 13.8MB 5.4MB/s eta 0:00:01��███████████████▉  | 16.7MB 5.4MB/s eta 0:00:01
Installing collected packages: pyarrow
Successfully installed pyarrow-2.0.0


In [4]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.ml.stat import Correlation, Summarizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, LinearSVC
from pyspark.ml.feature import PCA, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [5]:
%%capture
! pip install seaborn
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline 

In [6]:
mpl.style.use(['ggplot'])

In [7]:
path='https://s3.us.cloud-object-storage.appdomain.cloud/cf-courses-data/CognitiveClass/DP0701EN/version-2/Data-Collisions.csv'
df = pd.read_csv(path)

df.head(1)

  interactivity=interactivity, compiler=compiler, result=result)


Unnamed: 0,SEVERITYCODE,X,Y,OBJECTID,INCKEY,COLDETKEY,REPORTNO,STATUS,ADDRTYPE,INTKEY,...,ROADCOND,LIGHTCOND,PEDROWNOTGRNT,SDOTCOLNUM,SPEEDING,ST_COLCODE,ST_COLDESC,SEGLANEKEY,CROSSWALKKEY,HITPARKEDCAR
0,2,-122.323148,47.70314,1,1307,1307,3502005,Matched,Intersection,37475.0,...,Wet,Daylight,,,,10,Entering at angle,0,0,N


<a id='2.2'></a>
_____
### 2.2 Understanding Raw Data 
To get basic understanding (size,shape, etc.) of the dataset
____

In [8]:
print('Raw Data Dimensions (Rows/Columns):',df.shape)

Raw Data Dimensions (Rows/Columns): (194673, 38)


In [9]:
print ("Column Names: ")
print("-------------------------------")
df.columns.values 

Column Names: 
-------------------------------


array(['SEVERITYCODE', 'X', 'Y', 'OBJECTID', 'INCKEY', 'COLDETKEY',
       'REPORTNO', 'STATUS', 'ADDRTYPE', 'INTKEY', 'LOCATION',
       'EXCEPTRSNCODE', 'EXCEPTRSNDESC', 'SEVERITYCODE.1', 'SEVERITYDESC',
       'COLLISIONTYPE', 'PERSONCOUNT', 'PEDCOUNT', 'PEDCYLCOUNT',
       'VEHCOUNT', 'INCDATE', 'INCDTTM', 'JUNCTIONTYPE', 'SDOT_COLCODE',
       'SDOT_COLDESC', 'INATTENTIONIND', 'UNDERINFL', 'WEATHER',
       'ROADCOND', 'LIGHTCOND', 'PEDROWNOTGRNT', 'SDOTCOLNUM', 'SPEEDING',
       'ST_COLCODE', 'ST_COLDESC', 'SEGLANEKEY', 'CROSSWALKKEY',
       'HITPARKEDCAR'], dtype=object)

In [10]:
df.index.values

array([     0,      1,      2, ..., 194670, 194671, 194672])

In [11]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 194673 entries, 0 to 194672
Data columns (total 38 columns):
 #   Column          Non-Null Count   Dtype  
---  ------          --------------   -----  
 0   SEVERITYCODE    194673 non-null  int64  
 1   X               189339 non-null  float64
 2   Y               189339 non-null  float64
 3   OBJECTID        194673 non-null  int64  
 4   INCKEY          194673 non-null  int64  
 5   COLDETKEY       194673 non-null  int64  
 6   REPORTNO        194673 non-null  object 
 7   STATUS          194673 non-null  object 
 8   ADDRTYPE        192747 non-null  object 
 9   INTKEY          65070 non-null   float64
 10  LOCATION        191996 non-null  object 
 11  EXCEPTRSNCODE   84811 non-null   object 
 12  EXCEPTRSNDESC   5638 non-null    object 
 13  SEVERITYCODE.1  194673 non-null  int64  
 14  SEVERITYDESC    194673 non-null  object 
 15  COLLISIONTYPE   189769 non-null  object 
 16  PERSONCOUNT     194673 non-null  int64  
 17  PEDCOUNT  

<a id='3'></a>
_____
### 3. ETL (Data Preparation, Cleaning, Wrangling, Manipulation and Check)
____


In [12]:
# To be consistent, making all column labels as type string
df.columns = list(map(str, df.columns))

# Fixing datatype for DATETIME variables
df[["INCDATE"]] = df[["INCDATE"]].astype("datetime64")
df[["INCDTTM"]] = df[["INCDTTM"]].astype("datetime64")

# Renaming the Severity variables to improve readability
df["SEVERITYDESC"].replace("Property Damage Only Collision", "Property Damage", inplace=True)
df["SEVERITYDESC"].replace("Injury Collision", "Injury", inplace=True)

# Adding needed columns for analysis of Big Picture
df[["COMB"]] = df['SEVERITYDESC']+"/"+df['COLLISIONTYPE']+"/"+df['JUNCTIONTYPE']
df["COMB"] = df.COMB.astype(str)
df[["COMB-COND"]] = df['WEATHER']+"/"+df['ROADCOND']+"/"+df['LIGHTCOND']
df["COMB-COND"] = df["COMB-COND"].astype(str)

# Adding needed columns for analysis of DATETIME variables
df['DATE'] = pd.to_datetime(df['INCDTTM'], format='%d-%m-%y', errors='coerce').dt.floor('D')
df['YEAR'] = pd.DatetimeIndex(df['INCDTTM']).year
df['MONTH'] = pd.DatetimeIndex(df['INCDTTM']).month
df['DAY'] = pd.DatetimeIndex(df['INCDTTM']).day
df['WEEKDAY'] = df['DATE'].dt.day_name()
df['WEEKDAYNUM'] = df['DATE'].dt.dayofweek
df['TIME'] = pd.DatetimeIndex(df['INCDTTM']).time
df['TIME2']=pd.to_datetime(df['INCDTTM']).dt.strftime('%I:%M %p')
df['TIME3']=pd.to_datetime(df['INCDTTM']).dt.strftime('%p')

# Adding needed columns for Business/Finance inspired metrics
bins = [1,3,6,9,12]
quarter = ["Q1","Q2","Q3","Q4"]
df['QUARTER'] = pd.cut(df['MONTH'], bins, labels=quarter,include_lowest=True)
df['QUARTER'] = df.QUARTER.astype(str)
df['YR-QTR'] = df['YEAR'].astype("str")+ "-" + df['QUARTER']

# Adding needed columns for Seasonal effect metrics
bins2 = [1,2,5,8,11,12]
season = ["WINTER","SPRING","SUMMER","FALL","WINTER"]
df['SEASON'] = pd.cut(df['MONTH'], bins2, labels=season,ordered=False, include_lowest=True)
bins3 = [0,1,2,3,4,5,6,7,8,9,10,11,12]
rainfall = [5.2,3.9,3.3,2.0,1.6,1.4,0.6,0.8,1.7,3.3,5.0,5.4]
df['AVGRAINFALL-INCHES'] = pd.cut(df['MONTH'], bins3, labels=rainfall,ordered=False, include_lowest=True)
temp = [45,48,52,56,64,69,72,73,67,59,51,47]
df['AVGTEMP-F'] = pd.cut(df['MONTH'], bins3, labels=temp,ordered=False, include_lowest=True)
daylight = [9,10,12,14,15,16,16,14,13,11,9,9]
df['AVGDAYLIGHT-HRS'] = pd.cut(df['MONTH'], bins3, labels=daylight,ordered=False, include_lowest=True)
df[['AVGRAINFALL-INCHES']] = df[['AVGRAINFALL-INCHES']].astype("float")
df[['AVGTEMP-F']] = df[["AVGTEMP-F"]].astype("int")
df[['AVGDAYLIGHT-HRS']] = df[["AVGDAYLIGHT-HRS"]].astype("int")

# Adding needed columns for analysis of GPS variable
df["GPS"] = round(df['X'],7).astype("str")+ ","+round(df['Y'],7).astype("str")

# Dropping unnecessary columns
df.drop(['OBJECTID','INCKEY','INTKEY','COLDETKEY','REPORTNO','STATUS','SEVERITYCODE.1','INCDTTM','INCDATE','EXCEPTRSNCODE','EXCEPTRSNDESC', 'SDOTCOLNUM', 'SEGLANEKEY', 'CROSSWALKKEY', 'ST_COLCODE'], axis=1, inplace=True)
df.head(1)

# list of columns after changes
df.columns

# To see if dataset has any missing rows
missing_data = df.isnull()
missing_data.head(1)

# To identiy and list columns with missing values
#for column in missing_data.columns.values.tolist():
  #  print(column)
   # print (missing_data[column].value_counts())
   # print("________________________________________") 

# Dropping missing data rows to make sure data is complete
df.dropna(subset=["X"], axis=0, inplace=True)
df.dropna(subset=["COLLISIONTYPE"], axis=0, inplace=True)
df.dropna(subset=["UNDERINFL"], axis=0, inplace=True)
df.dropna(subset=["ROADCOND"], axis=0, inplace=True)
df.dropna(subset=["JUNCTIONTYPE"], axis=0, inplace=True)
df.dropna(subset=["WEATHER"], axis=0, inplace=True)
df.dropna(subset=["LIGHTCOND"], axis=0, inplace=True)

# Drop incomplete data i.e. Year 2020
df.drop(df[df.YEAR > 2019].index, inplace=True)

# Reset index, because we dropped rows
df.reset_index(drop=True, inplace=True)
print('Data Dimensions (Rows/Columns) after cleaning:',df.shape)
df.head(1)

# Steps to prepare data for future analysis

# Converting Y/N to 1/0
df["UNDERINFL"].replace("N", 0, inplace=True)
df["UNDERINFL"].replace("Y", 1, inplace=True)
df["HITPARKEDCAR"].replace("N", 0, inplace=True)
df["HITPARKEDCAR"].replace("Y", 1, inplace=True)
# Filling missing values
df["PEDROWNOTGRNT"].replace(np.nan, 0, inplace=True)
df["PEDROWNOTGRNT"].replace("Y", 1, inplace=True)
df["SPEEDING"].replace(np.nan, 0, inplace=True)
df["SPEEDING"].replace("Y", 1, inplace=True)
df["INATTENTIONIND"].replace(np.nan, 0, inplace=True)
df["INATTENTIONIND"].replace("Y", 1, inplace=True)

# Correcting datatype
df[["UNDERINFL"]] = df[["UNDERINFL"]].astype("int")
df[["PEDROWNOTGRNT"]] = df[["PEDROWNOTGRNT"]].astype("int")
df[["SPEEDING"]] = df[["SPEEDING"]].astype("int")
df[["INATTENTIONIND"]] = df[["INATTENTIONIND"]].astype("int")
df[["HITPARKEDCAR"]] = df[["HITPARKEDCAR"]].astype("int")
df[['YEAR']] = df[['YEAR']].astype("int")
df[['MONTH']] = df[['MONTH']].astype("int")
df[['DAY']] = df[['DAY']].astype("int")

# adding columns for analysis of state of mind
df[["COMB-MIND"]] = df['INATTENTIONIND']+df['UNDERINFL']+df['SPEEDING']
df["COMB-MIND"] = df["COMB-MIND"].astype(int)
df.head(1)


# Check missing data
missing_data = df.isnull()
#for column in missing_data.columns.values.tolist():
  #  print(column)
   # print (missing_data[column].value_counts())
   # print("________________________________________") 
if missing_data.bool == True:
    print("----There is still missing data----")
else:
    print("----There is no missing data----")

# Print unique values and its count for each column
col_name = df.columns.tolist()
row_num = df.index.tolist()

#for i,x in enumerate(col_name):
   # print ("Unique value count of: ", x)
    #print ("------------------------------------------")
    #print(df[x].value_counts())
   # print ("__________________________________________")


# create dummy variable to split SEVERITYDESC
dummy_var = pd.get_dummies(df["SEVERITYDESC"])
dum_list = dummy_var.columns.values.tolist()
dum_list2 = [x.upper() for x in dum_list]
#print(dum_list2)
dummy_var.columns = dum_list2
#dummy_var.head(1)

# create dummy variable to split COLLISIONTYPE
dummy_var1 = pd.get_dummies(df["COLLISIONTYPE"])
dum_list = dummy_var1.columns.values.tolist()
#dummy_var1.head(1)
dum_list2 = [x.upper() for x in dum_list]
#print(dum_list2)
dummy_var1.columns = dum_list2
dummy_var1.rename(columns={'OTHER':'COLLISIONTYPE-OTHER'}, inplace=True)
#dummy_var1.head(1)

# create dummy variable to split ROADCOND
dummy_var2 = pd.get_dummies(df["ROADCOND"])
dum_list = dummy_var2.columns.values.tolist()
#dummy_var2.head(1)
dum_list2 = [x.upper() for x in dum_list]
#print(dum_list2)
dummy_var2.columns = dum_list2
dummy_var2.rename(columns={'OTHER':'ROADCOND-OTHER'}, inplace=True)
dummy_var2.rename(columns={'UNKNOWN':'ROADCOND-UNKNOWN'}, inplace=True)
#dummy_var2.head(1)

# create dummy variable to split LIGHTCOND
dummy_var3 = pd.get_dummies(df["LIGHTCOND"])
dum_list = dummy_var3.columns.values.tolist()
#dummy_var3.head(1)
dum_list2 = [x.upper() for x in dum_list]
#print(dum_list2)
dummy_var3.columns = dum_list2
dummy_var3.rename(columns={'OTHER':'LIGHTCOND-OTHER'}, inplace=True)
dummy_var3.rename(columns={'UNKNOWN':'LIGHTCOND-UNKNOWN'}, inplace=True)
#dummy_var3.head(1)

# create dummy variable to split WEATHER
dummy_var4 = pd.get_dummies(df["WEATHER"])
dum_list = dummy_var4.columns.values.tolist()
#dummy_var3.head(1)
dum_list2 = [x.upper() for x in dum_list]
#print(dum_list2)
dummy_var4.columns = dum_list2
dummy_var4.rename(columns={'OTHER':'WEATHER-OTHER'}, inplace=True)
dummy_var4.rename(columns={'UNKNOWN':'WEATHER-UNKNOWN'}, inplace=True)
#dummy_var4.head(1)

# create dummy variable to split JUNCTIONTYPE
dummy_var5 = pd.get_dummies(df["JUNCTIONTYPE"])
dum_list = dummy_var5.columns.values.tolist()
#dummy_var3.head(1)
dum_list2 = [x.upper() for x in dum_list]
#print(dum_list2)
dummy_var5.columns = dum_list2
dummy_var5.rename(columns={'UNKNOWN':'JUNCTIONTYPE-UNKNOWN'}, inplace=True)
#dummy_var5.head(1)

## create dummy variable to split ADDRTYPE
dummy_var6 = pd.get_dummies(df["ADDRTYPE"])
dum_list = dummy_var6.columns.values.tolist()
#dummy_var3.head(1)
dum_list2 = [x.upper() for x in dum_list]
#print(dum_list2)
dummy_var6.columns = dum_list2
#dummy_var6.head(1)

# merge dummy variables with df_ds (dataframe intialized for Data Science model)
df_ds = pd.concat([df, dummy_var,dummy_var1,dummy_var2,
                   dummy_var3,dummy_var4,dummy_var5,dummy_var6], axis=1)

# Dropping unnecessary columns
df_ds.drop(['SEVERITYCODE', 'ADDRTYPE','COLLISIONTYPE', 
            'JUNCTIONTYPE', 'SDOT_COLDESC','SDOT_COLCODE',
            'WEATHER', 'ROADCOND', 'LIGHTCOND','ST_COLDESC'],  
             axis=1, inplace=True)
df_ds.head(1)

Data Dimensions (Rows/Columns) after cleaning: (178831, 41)
----There is no missing data----


Unnamed: 0,X,Y,LOCATION,SEVERITYDESC,PERSONCOUNT,PEDCOUNT,PEDCYLCOUNT,VEHCOUNT,INATTENTIONIND,UNDERINFL,...,WEATHER-UNKNOWN,AT INTERSECTION (BUT NOT RELATED TO INTERSECTION),AT INTERSECTION (INTERSECTION RELATED),DRIVEWAY JUNCTION,MID-BLOCK (BUT INTERSECTION RELATED),MID-BLOCK (NOT RELATED TO INTERSECTION),RAMP JUNCTION,JUNCTIONTYPE-UNKNOWN,BLOCK,INTERSECTION
0,-122.323148,47.70314,5TH AVE NE AND NE 103RD ST,Injury,2,0,0,2,0,0,...,0,0,1,0,0,0,0,0,0,1


<a id='4'></a>
_____
### 4. Machine Learning 


<a id='4.1'></a>
____
#### 4.1 Functions 
____

In [21]:
 def spk_classification(classificationmodel,df_spk_train,df_spk_test): 
    #prediction_train = None
    #prediction_test = None
    global pipeline
    pipeline = Pipeline(stages=[vectorAssembler,classificationmodel])
    model_train = pipeline.fit(df_spk_train)
    prediction_train = model_train.transform(df_spk_train)
    #model_test = pipeline.fit(df_spk_test)
    #prediction_test = model_test.transform(df_spk_test)
    prediction_test = model_train.transform(df_spk_test)
    spk_classificationevaluation(prediction_train,prediction_test,model_train)
   
def spk_classificationevaluation(prediction_train,prediction_test,model_train):
    if classificationmodel == lr:
        print("Model: Logistic Regression","\n--------------------------------------" )
    if classificationmodel == rf:
        print("Model: Random Forest","\n--------------------------------------" )
    if classificationmodel == gbt:
        print("Model: Gradient Boosted Tree","\n--------------------------------------" )
    if classificationmodel == lsvc:
        print("Model: Linear Support Vector Machine","\n--------------------------------------" )   
    global evaluation
    evaluation = MulticlassClassificationEvaluator(
        labelCol="INJURY", predictionCol="prediction", metricName="accuracy")
    print("Description:", model_train.stages[1])
    accuracy_train = evaluation.evaluate(prediction_train)
    print_spk_testtrainsetinfo(df_spk_train,df_spk_test)
    print("Trainset Accuracy:", round(accuracy_train,3))
    print("Trainset Error = %g" % round((1.0 - accuracy_train),3))
    accuracy_test = evaluation.evaluate(prediction_test)
    print("Testset Accuracy:",round(evaluation.evaluate(prediction_test),3))
    print("Test Error = %g" % round((1.0 - accuracy_test),3))
    spk_checkmodeloutput(prediction_train,prediction_test)

def spk_checkmodeloutput(prediction_train,prediction_test):
    #prediction_test.createOrReplaceTempView("prediction_test")
    #print("\nCheck Results: \n--------------------------------------" )
    #prediction_test.select("Injury","rawPrediction","probability","prediction").show(2)
    #spark.sql("""SELECT Injury, rawPrediction, probability, prediction 
               #from prediction_test where Injury=0 AND prediction =1""").show(2)
    print("\nTrainset Results: " )
    prediction_train.groupBy("Injury","prediction").count().show()
    print("\nTestset Results: " )
    prediction_test.groupBy("Injury","prediction").count().show()

def print_spk_testtrainsetinfo(df_spk_train,df_spk_test):
    print("Trainset Size (%):",round(spk_trainsize,2),"\nTestset Size (%):", round(spk_testsize,2))
    print("Trainset Count:",df_spk_train.count(),"\nTestset Count:", df_spk_test.count())

In [14]:
def spk_crossvalidation(classificationmodel,pipeline,evaluation,df_spk_train,df_spk_test):   
    if classificationmodel == rf: 
        paramGrid = ParamGridBuilder() \
        .addGrid(classificationmodel.numTrees, [3, 10]) \
        .build()
    if classificationmodel == lr:
        paramGrid = ParamGridBuilder() \
        .addGrid(classificationmodel.regParam, [0.1, 0.01]) \
        .addGrid(classificationmodel.elasticNetParam, [0, 1]).build()
    if classificationmodel == lsvc:
        paramGrid = ParamGridBuilder() \
        .addGrid(classificationmodel.regParam, [0.1, 0.01]) \
        .addGrid(classificationmodel.maxIter, [3, 10]).build()
    if classificationmodel == gbt: 
        paramGrid = ParamGridBuilder() \
        .addGrid(classificationmodel.maxIter, [3, 10]) \
        .build()

    crossval = CrossValidator(estimator=pipeline, \
                              estimatorParamMaps=paramGrid,\
                              evaluator=evaluation, \
                              numFolds=4)  # use 3+ folds in practice
    cv_model = crossval.fit(df_spk_train)
    cv_prediction = cv_model.transform(df_spk_test)
    cv_selected = cv_prediction.select("INJURY","prediction")
    print("Cross Validation Results:\n--------------------------------------")
    cv_selected.groupBy("Injury","prediction").count().show()

<a id='4.2'></a>
____
#### 4.2 PySpark Session and SQL
____

In [15]:
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession \
    .builder \
    .getOrCreate()

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [16]:
df_spark= df_ds[['INATTENTIONIND', 'UNDERINFL',
       'PEDROWNOTGRNT', 'SPEEDING', 'HITPARKEDCAR', 
        'AVGRAINFALL-INCHES','AVGTEMP-F', 'AVGDAYLIGHT-HRS', 'INJURY',
       'PROPERTY DAMAGE', 'ANGLES', 'CYCLES', 'HEAD ON', 'LEFT TURN',
       'COLLISIONTYPE-OTHER', 'PARKED CAR', 'PEDESTRIAN', 'REAR ENDED',
       'RIGHT TURN', 'SIDESWIPE', 'DRY', 'ICE', 'OIL', 'ROADCOND-OTHER',
       'SAND/MUD/DIRT', 'SNOW/SLUSH', 'STANDING WATER', 'ROADCOND-UNKNOWN',
       'WET', 'DARK - NO STREET LIGHTS', 'DARK - STREET LIGHTS OFF',
       'DARK - STREET LIGHTS ON', 'DAWN', 'DAYLIGHT', 'DUSK',
       'LIGHTCOND-OTHER', 'LIGHTCOND-UNKNOWN', 'BLOWING SAND/DIRT', 'CLEAR',
       'FOG/SMOG/SMOKE', 'WEATHER-OTHER', 'OVERCAST', 'RAINING',
       'SEVERE CROSSWIND', 'SLEET/HAIL/FREEZING RAIN', 'SNOWING',
       'WEATHER-UNKNOWN', 'AT INTERSECTION (BUT NOT RELATED TO INTERSECTION)',
       'AT INTERSECTION (INTERSECTION RELATED)', 'DRIVEWAY JUNCTION',
       'MID-BLOCK (BUT INTERSECTION RELATED)',
       'MID-BLOCK (NOT RELATED TO INTERSECTION)', 'RAMP JUNCTION',
       'JUNCTIONTYPE-UNKNOWN', 'BLOCK', 'INTERSECTION']]

df_spark.columns = ['INATTENTIONIND', 'UNDERINFL',
       'PEDROWNOTGRNT', 'SPEEDING', 'HITPARKEDCAR', 
        'AVGRAINFALL_INCHES','AVGTEMP_F', 'AVGDAYLIGHT_HRS', 'INJURY',
       'PROPERTY_DAMAGE', 'ANGLES', 'CYCLES', 'HEAD_ON', 'LEFT_TURN',
       'COLLISIONTYPE_OTHER', 'PARKED_CAR', 'PEDESTRIAN', 'REAR_ENDED',
       'RIGHT_TURN', 'SIDESWIPE', 'DRY', 'ICE', 'OIL', 'ROADCOND_OTHER',
       'SAND_MUD_DIRT', 'SNOW_SLUSH', 'STANDING_WATER', 'ROADCOND_UNKNOWN',
       'WET', 'DARK_NO_STREET_LIGHTS', 'DARK_STREET_LIGHTS_OFF',
       'DARK_STREET_LIGHTS_ON', 'DAWN', 'DAYLIGHT', 'DUSK',
       'LIGHTCOND_OTHER', 'LIGHTCOND_UNKNOWN', 'BLOWING_SAND_DIRT', 'CLEAR',
       'FOG_SMOG_SMOKE', 'WEATHER_OTHER', 'OVERCAST', 'RAINING',
       'SEVERE_CROSSWIND', 'SLEET_HAIL_FREEZING_RAIN', 'SNOWING',
       'WEATHER_UNKNOWN', 'AT_INTERSECTION_BUT_NOT_RELATED_TO_INTERSECTION',
       'AT_INTERSECTION_INTERSECTION_RELATED', 'DRIVEWAY_JUNCTION',
       'MID_BLOCK_BUT_INTERSECTION_RELATED',
       'MID_BLOCK_NOT RELATED_TO_INTERSECTION', 'RAMP_JUNCTION',
       'JUNCTIONTYPE_UNKNOWN', 'BLOCK', 'INTERSECTION']

df_spark.columns = list(map(str, df_spark.columns))

df_spk_use = spark.createDataFrame(df_spark)

df_spk_use.createOrReplaceTempView("df_spk_use")
#spark.sql("SELECT * from df_spk_use").show()

In [None]:
#df_spk_use.printSchema()
#df_spk_use.select("Injury","Property_Damage").show(3)
#df_spk_use.filter((df_spk_use["Injury"] > 0) & (df_spk_use["UNDERINFL"] > 0)).show(1)
#df_spk_use.groupBy("Injury","Property_Damage","Clear", "ANGLES").count().show()
#spark.sql("""SELECT SEVERITYDESC, INJURY,PROPERTY_DAMAGE, ANGLES, 
         # CYCLES, HEAD_ON, LEFT_TURN from df_spk_use""").show(2)
#spark.sql("SELECT INJURY as count from df_spk_use").count()
#spark.sql("SELECT INJURY as cnt from df_spk_use").first().cnt
'''spark.sql("""SELECT count(Injury) as Count, 
          max(Injury) as Max, 
          min(Injury) as Min, 
          round(mean(Injury),3) as Mean,
          round(stddev_pop(Injury),3) as Std,
          round(skewness(Injury),3) as Skewness,
          round(kurtosis(injury),3) as Kurtosis
          from df_spk_use""").show()'''    
#spark.sql("SELECT round(corr(Injury,Clear),3) as Correlation from df_spk_use").show()
#Injury = spark.sql("SELECT Injury, clear, dry, angles FROM df_spk_use WHERE clear > 0 AND dry < 1")
#Injury.show(3)
#df_spk_use["Injury", "Clear", "Dry"].describe().show()

In [17]:
vectorAssembler = VectorAssembler(inputCols= ['INATTENTIONIND', 'UNDERINFL',
       'PEDROWNOTGRNT', 'SPEEDING', 'HITPARKEDCAR', 
        'AVGRAINFALL_INCHES','AVGTEMP_F', 'AVGDAYLIGHT_HRS','INJURY','ANGLES', 
        'CYCLES', 'HEAD_ON', 'LEFT_TURN',  
        'PARKED_CAR', 'PEDESTRIAN', 'REAR_ENDED',
       'RIGHT_TURN', 'SIDESWIPE', 'DRY', 
       'DARK_NO_STREET_LIGHTS', 'DARK_STREET_LIGHTS_OFF',
       'DARK_STREET_LIGHTS_ON',  'DAYLIGHT', 
        'CLEAR','OVERCAST', 'RAINING',
       'AT_INTERSECTION_BUT_NOT_RELATED_TO_INTERSECTION',
       'AT_INTERSECTION_INTERSECTION_RELATED', 
       'MID_BLOCK_BUT_INTERSECTION_RELATED',
       'MID_BLOCK_NOT RELATED_TO_INTERSECTION', 'RAMP_JUNCTION'], 
        outputCol="features")

In [18]:
var_topredict = "INJURY"
lr = LogisticRegression(maxIter = 10, regParam = 0.3, elasticNetParam = 0.8,labelCol=var_topredict)
rf = RandomForestClassifier(labelCol= var_topredict, featuresCol="features", numTrees=10)
gbt = GBTClassifier(labelCol=var_topredict, featuresCol="features", maxIter=10)
lsvc = LinearSVC(maxIter=10, regParam=0.1,labelCol=var_topredict)

<a id='4.3'></a>
____
#### 4.3 Model 1 - Logistic Regression
_____

In [22]:
classificationmodel = lr
spk_trainsize = 0.75
spk_testsize = 1 - spk_trainsize
df_spk_train, df_spk_test = df_spk_use.randomSplit([spk_trainsize, spk_testsize], seed=12345)
spk_classification(classificationmodel,df_spk_train,df_spk_test)
spk_crossvalidation(classificationmodel,pipeline,evaluation,df_spk_train,df_spk_test)

Model: Logistic Regression 
--------------------------------------
Description: LogisticRegressionModel: uid = LogisticRegression_26a0e72cd4ae, numClasses = 2, numFeatures = 31
Trainset Size (%): 0.75 
Testset Size (%): 0.25
Trainset Count: 134155 
Testset Count: 44676
Trainset Accuracy: 1.0
Trainset Error = 0
Testset Accuracy: 1.0
Test Error = 0

Trainset Results: 
+------+----------+-----+
|Injury|prediction|count|
+------+----------+-----+
|     0|       0.0|92671|
|     1|       1.0|41484|
+------+----------+-----+


Testset Results: 
+------+----------+-----+
|Injury|prediction|count|
+------+----------+-----+
|     0|       0.0|30746|
|     1|       1.0|13930|
+------+----------+-----+

Cross Validation Results:
--------------------------------------
+------+----------+-----+
|Injury|prediction|count|
+------+----------+-----+
|     0|       0.0|30746|
|     1|       1.0|13930|
+------+----------+-----+



<a id='4.4'></a>
____
#### 4.4 Model 2 - Random Forest
_____

In [23]:
classificationmodel = rf
spk_trainsize = 0.8
spk_testsize = 1 - spk_trainsize
df_spk_train, df_spk_test = df_spk_use.randomSplit([spk_trainsize, spk_testsize], seed=12345)
spk_classification(classificationmodel,df_spk_train,df_spk_test)
spk_crossvalidation(classificationmodel,pipeline,evaluation,df_spk_train,df_spk_test)

Model: Random Forest 
--------------------------------------
Description: RandomForestClassificationModel (uid=RandomForestClassifier_b3a4c4dfb3b3) with 10 trees
Trainset Size (%): 0.8 
Testset Size (%): 0.2
Trainset Count: 143084 
Testset Count: 35747
Trainset Accuracy: 1.0
Trainset Error = 0
Testset Accuracy: 1.0
Test Error = 0

Trainset Results: 
+------+----------+-----+
|Injury|prediction|count|
+------+----------+-----+
|     0|       0.0|98862|
|     1|       1.0|44222|
+------+----------+-----+


Testset Results: 
+------+----------+-----+
|Injury|prediction|count|
+------+----------+-----+
|     0|       0.0|24555|
|     1|       1.0|11192|
+------+----------+-----+

Cross Validation Results:
--------------------------------------
+------+----------+-----+
|Injury|prediction|count|
+------+----------+-----+
|     0|       0.0|24555|
|     1|       1.0|11192|
+------+----------+-----+



<a id='4.5'></a>
____
#### 4.5 Model 3 - Gradient Boosted Tree
_____

In [24]:
classificationmodel = gbt
spk_trainsize = 0.85
spk_testsize = 1 - spk_trainsize
df_spk_train, df_spk_test = df_spk_use.randomSplit([spk_trainsize, spk_testsize], seed=12345)
spk_classification(classificationmodel,df_spk_train,df_spk_test)
spk_crossvalidation(classificationmodel,pipeline,evaluation,df_spk_train,df_spk_test)

Model: Gradient Boosted Tree 
--------------------------------------
Description: GBTClassificationModel (uid=GBTClassifier_e24b34312d3f) with 10 trees
Trainset Size (%): 0.85 
Testset Size (%): 0.15
Trainset Count: 152104 
Testset Count: 26727
Trainset Accuracy: 1.0
Trainset Error = 0
Testset Accuracy: 1.0
Test Error = 0

Trainset Results: 
+------+----------+------+
|Injury|prediction| count|
+------+----------+------+
|     0|       0.0|104979|
|     1|       1.0| 47125|
+------+----------+------+


Testset Results: 
+------+----------+-----+
|Injury|prediction|count|
+------+----------+-----+
|     0|       0.0|18438|
|     1|       1.0| 8289|
+------+----------+-----+

Cross Validation Results:
--------------------------------------
+------+----------+-----+
|Injury|prediction|count|
+------+----------+-----+
|     0|       0.0|18438|
|     1|       1.0| 8289|
+------+----------+-----+



<a id='4.6'></a>
____
#### 4.6 Model 4 - Linear Support Vector
_____

In [26]:
classificationmodel = lsvc
spk_trainsize = 0.82
spk_testsize = 1 - spk_trainsize
df_spk_train, df_spk_test = df_spk_use.randomSplit([spk_trainsize, spk_testsize], seed=12345)
spk_classification(classificationmodel,df_spk_train,df_spk_test)
spk_crossvalidation(classificationmodel,pipeline,evaluation,df_spk_train,df_spk_test)

Model: Linear Support Vector Machine 
--------------------------------------
Description: LinearSVC_b996fb5ef15a
Trainset Size (%): 0.82 
Testset Size (%): 0.18
Trainset Count: 146756 
Testset Count: 32075
Trainset Accuracy: 0.993
Trainset Error = 0.007
Testset Accuracy: 0.993
Test Error = 0.007

Trainset Results: 
+------+----------+------+
|Injury|prediction| count|
+------+----------+------+
|     0|       1.0|   978|
|     0|       0.0|100348|
|     1|       1.0| 45430|
+------+----------+------+


Testset Results: 
+------+----------+-----+
|Injury|prediction|count|
+------+----------+-----+
|     0|       1.0|  217|
|     0|       0.0|21874|
|     1|       1.0| 9984|
+------+----------+-----+

Cross Validation Results:
--------------------------------------
+------+----------+-----+
|Injury|prediction|count|
+------+----------+-----+
|     0|       1.0|  217|
|     0|       0.0|21874|
|     1|       1.0| 9984|
+------+----------+-----+



<a id='5'></a>
____
### 5. Conclusion

The accuracy score across all four models is very high which is a very good outcome. 

______

 


<strong> <center> Thank You! :)</s>
 _____