## PySpark Setup -- *Do Not Modify!*
**Do not change this.** Extract Spark/Hadoop and install PyPython dependencies

In [23]:
#######################################
###!@0 START INIT ENVIRONMENT
from google.colab import drive
drive.mount('/content/drive')
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!tar xf /content/drive/MyDrive/spark-3.0.3-bin-hadoop2.7.tgz

!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

###!@0 END INIT ENVIRONMENT

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Create a Spark Session & Load Data -- *Do Not Modify!*

**Do not change this.** Standard initialization for the Spark session within colab.

In [24]:
#######################################
###!@1 START OF PYSPARK INIT
# Provides findspark.init() to make pyspark importable as a regular library.
# Resource : https://pypi.org/project/findspark/
import findspark

findspark.init()
findspark.find()

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder\
         .master("local")\
         .appName("Colab")\
         .config('spark.ui.port', '4050')\
         .getOrCreate()
spark
# Spark is ready to go within Colab!
###!@1 END OF PYSPARK INIT

## **Common Imports**

In [25]:
from __future__ import print_function 
from pyspark.context import SparkContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, DecisionTreeClassifier
from pyspark.sql.session import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import IntegerType, DoubleType
import pyspark.sql.functions as f
from pyspark.sql.functions import udf,split
from collections import Counter
import numpy as np
import datetime 

### **Reading the cleaned CSV file**

In [29]:
# filename = "/content/drive/MyDrive/nyc_data/2021_chunk0.csv"
filename = "/content/drive/MyDrive/nyc_data/NYC_2021_clean_data.csv"
df = spark.read.option("header",True).option("inferSchema",True).csv(filename)
df.show()
df.columns

+--------------+--------------+--------+------------------+----------+----------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+---------------+-----------+--------------+------------+--------------+----------------+---------------------------------+------------+--------------------+-------------------+-----------+------------+----------------------+--------------------+------------------+-------------+------------+--------------+-------------------+-------------------+---------------------+-----------+
|Violation_Code|Summons_Number|Plate_ID|Registration_State|Plate_Type|Issue_Date|Vehicle_Body_Type|Vehicle_Make|Issuing_Agency|Street_Code1|Street_Code2|Street_Code3|Vehicle_Expiration_Date|Violation_Location|Issuer_Precinct|Issuer_Code|Issuer_Command|Issuer_Squad|Violation_Time|Violation_County|Violation_In_Front_Of_Or_Opposite|House_Number|         Street_Name|Date_First_Observed|Law_Section|Sub_Division|

['Violation_Code',
 'Summons_Number',
 'Plate_ID',
 'Registration_State',
 'Plate_Type',
 'Issue_Date',
 'Vehicle_Body_Type',
 'Vehicle_Make',
 'Issuing_Agency',
 'Street_Code1',
 'Street_Code2',
 'Street_Code3',
 'Vehicle_Expiration_Date',
 'Violation_Location',
 'Issuer_Precinct',
 'Issuer_Code',
 'Issuer_Command',
 'Issuer_Squad',
 'Violation_Time',
 'Violation_County',
 'Violation_In_Front_Of_Or_Opposite',
 'House_Number',
 'Street_Name',
 'Date_First_Observed',
 'Law_Section',
 'Sub_Division',
 'Days_Parking_In_Effect',
 'From_Hours_In_Effect',
 'To_Hours_In_Effect',
 'Vehicle_Color',
 'Vehicle_Year',
 'Feet_From_Curb',
 'Violation_Post_Code',
 'Issue_Date_Time',
 'Violation_Description',
 'Fine_Amount']

# User Defined Functions and few cleaning operations particular to performing the ML.

In [30]:
#Data preprocessing pipeline
columns_to_drop = ['Plate ID','Issuer Code',
                   'House Number','Street Name',
                   'Date First Observed',
                   'Law Section','Sub Division',
                   'Days Parking In Effect',
                   'From Hours In Effect','To Hours In Effect',
                   'Vehicle Color',
                   'Vehicle Year',
                   'Feet From Curb','Violation Post Code','Violation Description',
                   'Issuer Squad']
df = df.drop(*columns_to_drop)

#UDFS

#Extracting information from date
def day_finder(x):
    return datetime.datetime.strptime(x, '%Y-%m-%d').weekday()

#Bucketizing violation time
def time_bucket(x):
  #Bucketizing the time into 8 buckets
  if x is None:
    return 3
  # if x[-1]!='P' and x[-1]!='A':
  #   return 3
  try:
    hour = int(x[:2])
    min = int(x[3:5])
  except:
    return 3
  # if x[-1]=='P':
  #   time = 1200+time
  time = hour*3600+min
  for i in range(8):
    if time>=3600*i and time<3600*(i+1):
      return i

time_udf = udf(lambda x: time_bucket(x), IntegerType())

day_udf = udf(lambda x: day_finder(x), IntegerType())

In [31]:
############# PREPROCESSING ###########################

#Splitting the issue date into month,year,day
df_new = df.withColumn('Month',split('Issue_Date_Time','-')[1]) \
           .withColumn('Year',split('Issue_Date_Time','-')[0]) \
           .withColumn('Day',day_udf(split('Issue_Date_Time',' ')[0])) \
           .withColumn('Time',time_udf(split('Issue_Date_Time',' ')[1]))

#converting the columns into integers
df_new = df_new.withColumn("Year",df_new["Year"].cast(IntegerType())) \
               .withColumn("Month",df_new["Month"].cast(DoubleType())) \
               .withColumn("Day",df_new["Day"].cast(DoubleType())) \
               .withColumn("Time",df_new["Time"].cast(DoubleType()))

#Removing outliers and some filtering
df_new = df_new.where(f.col("Year") > 2019)

#Dropping columns
df_new =df_new.drop(*['Issue_Date_Time','Year'])

#Filling na
df_new = df_new.fillna({'Time':3})

#Removing na locaions of violation location and violation count
df_new = df_new.dropna(how='any',subset=['Violation_Location','Violation_County'])

df_new=df_new.dropna(how='any')

df_new.persist()

DataFrame[Violation_Code: int, Summons_Number: bigint, Plate_ID: string, Registration_State: string, Plate_Type: string, Issue_Date: string, Vehicle_Body_Type: string, Vehicle_Make: string, Issuing_Agency: string, Street_Code1: int, Street_Code2: int, Street_Code3: int, Vehicle_Expiration_Date: int, Violation_Location: double, Issuer_Precinct: int, Issuer_Code: int, Issuer_Command: string, Issuer_Squad: string, Violation_Time: string, Violation_County: string, Violation_In_Front_Of_Or_Opposite: string, House_Number: string, Street_Name: string, Date_First_Observed: int, Law_Section: int, Sub_Division: string, Days_Parking_In_Effect: string, From_Hours_In_Effect: string, To_Hours_In_Effect: string, Vehicle_Color: string, Vehicle_Year: int, Feet_From_Curb: int, Violation_Post_Code: string, Violation_Description: string, Fine_Amount: int, Month: double, Day: double, Time: double]

In [None]:
df_new.show()

# Predicting the Violation County

**Pipeline Execution**

In [None]:
#Label encoding pipeline
#Split the data

train, test = df_new.randomSplit([0.80,0.20])

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index",handleInvalid='keep').fit(df_new) for column in list(set(df_new.columns)-set(['Month','Day','Time','Summons_Number','Violation_County','Violation_Location']))]
target_indexer = StringIndexer(inputCol="Violation_County", outputCol="label", handleInvalid='keep').fit(df_new)

assembler = VectorAssembler(inputCols=['Registration_State_index','Plate_Type_index',
                                       'Violation_Code_index','Vehicle_Body_Type_index',
                                       'Vehicle_Make_index','Issuing_Agency_index',
                                       'Street_Code1_index','Street_Code2_index',
                                       'Street_Code3_index','Issuer_Precinct_index',
                                       'Issuer_Command_index','Violation_In_Front_Of_Or_Opposite_index',
                                       'Month','Day','Time'],outputCol='features')

labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=target_indexer.labels)

**Logistic Regression**

In [None]:
lr = LogisticRegression(maxIter=100, featuresCol='features', labelCol='label')
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=indexers+[target_indexer,assembler,lr, labelConverter])

model = pipeline.fit(train)
#Transforming train data
predictions = model.transform(test)
predictions.select("predictedLabel", "Violation_County", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Test Accuracy = %g" % (accuracy))


**Random Forest Classifier**

In [None]:
rf = RandomForestClassifier(numTrees=10,maxBins=4000,labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=indexers+[target_indexer,assembler,rf, labelConverter])

model = pipeline.fit(train)
#Transforming train data
predictions = model.transform(test)
predictions.select("predictedLabel", "Violation_County", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Test Accuracy = %g" % (accuracy))


**Decision Tree Classifier**

In [None]:
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth = 10, maxBins=4000)
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=indexers+[target_indexer,assembler,dt, labelConverter])

model = pipeline.fit(train)
#Transforming train data
predictions = model.transform(test)
predictions.select("predictedLabel", "Violation_County", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Test Accuracy = %g" % (accuracy))

**Uisng the violation county prediction as input for predicting the violation location**

#**Violation location prediction**

Using the violation county prediction as input for predicting the violation location

In [33]:
#Label encoding pipeline
#Split the data

train, test = df_new.randomSplit([0.80,0.20])

indexersL = [StringIndexer(inputCol=column, outputCol=column+"_index",handleInvalid='keep').fit(df_new) for column in list(set(df_new.columns)-set(['Month','Day','Time','Violation_Location','Summons_Number']))]
target_indexerL = StringIndexer(inputCol="Violation_Location", outputCol="label", handleInvalid='keep').fit(df_new)
assemblerL = VectorAssembler(inputCols=['Registration_State_index','Plate_Type_index',
                                       'Violation_Code_index','Vehicle_Body_Type_index',
                                       'Vehicle_Make_index','Issuing_Agency_index',
                                       'Street_Code1_index','Street_Code2_index',
                                       'Street_Code3_index','Issuer_Precinct_index',
                                       'Issuer_Command_index','Violation_In_Front_Of_Or_Opposite_index',
                                       'Violation_County_index','Month','Day','Time'],outputCol='features')

# Convert indexed labels back to original labels.
labelConverterL = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=target_indexerL.labels)

**Logistic Regression**

In [34]:
lr = LogisticRegression(maxIter=100, featuresCol='features', labelCol='label')
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=indexersL+[target_indexerL,assemblerL,lr, labelConverterL])

model = pipeline.fit(train)
#Transforming train data
predictions = model.transform(test)
predictions.select("predictedLabel", "Violation_Location", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Test Accuracy = %g" % (accuracy))

+--------------+------------------+--------------------+
|predictedLabel|Violation_Location|            features|
+--------------+------------------+--------------------+
|          13.0|               1.0|[1.0,0.0,13.0,3.0...|
|          13.0|               1.0|[2.0,0.0,13.0,0.0...|
|          13.0|               1.0|[0.0,0.0,13.0,0.0...|
|           7.0|               7.0|[0.0,0.0,13.0,0.0...|
|          17.0|               1.0|[0.0,0.0,13.0,0.0...|
+--------------+------------------+--------------------+
only showing top 5 rows

Test Error = 0.35283
Test Accuracy = 0.64717


**Decision Tree**

In [40]:
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth = 30, maxBins=4000)
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=indexersL+[target_indexerL,assemblerL,dt, labelConverterL])

model = pipeline.fit(train)
#Transforming train data
predictions = model.transform(test)
predictions.select("predictedLabel", "Violation_Location", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Test Accuracy = %g" % (accuracy))

+--------------+------------------+--------------------+
|predictedLabel|Violation_Location|            features|
+--------------+------------------+--------------------+
|           1.0|               1.0|[1.0,0.0,13.0,3.0...|
|           1.0|               1.0|[2.0,0.0,13.0,0.0...|
|           1.0|               1.0|[0.0,0.0,13.0,0.0...|
|           5.0|               7.0|[0.0,0.0,13.0,0.0...|
|           1.0|               1.0|[0.0,0.0,13.0,0.0...|
+--------------+------------------+--------------------+
only showing top 5 rows

Test Error = 0.298807
Test Accuracy = 0.701193


**Random Forest CLassifier**

In [38]:
rf = RandomForestClassifier(numTrees=10,maxBins=6000,labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=indexers+[target_indexer,assembler,rf,labelConverter])

model = pipeline.fit(train)
#Transforming train data
predictions = model.transform(test)

# A = {c.name: c.metadata["ml_attr"]["vals"] for c in df_r1.schema.fields if c.name.endswith("label")}
# print("String Indexed labels:{}".format(A))

# Select example rows to display.
predictions.select("predictedLabel", "Violation_Location", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Test Error = %g" % (accuracy))

+--------------+------------------+--------------------+
|predictedLabel|Violation_Location|            features|
+--------------+------------------+--------------------+
|     Manhattan|               1.0|[1.0,0.0,13.0,3.0...|
|     Manhattan|               1.0|[2.0,0.0,13.0,0.0...|
|     Manhattan|               1.0|[0.0,0.0,13.0,0.0...|
|     Manhattan|               7.0|[0.0,0.0,13.0,0.0...|
|     Manhattan|               1.0|[0.0,0.0,13.0,0.0...|
+--------------+------------------+--------------------+
only showing top 5 rows

Test Error = 0.04029
Test Error = 0.95971
