In [46]:
from time import sleep
from json import dumps
from kafka import KafkaProducer
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json, struct
from pyspark.sql.functions import *
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline, PipelineModel

In [20]:
from google.cloud import storage

# Bucket holding the raw training CSV shards. `bucket` used to be referenced
# here without ever being defined, so this cell failed on a fresh kernel.
bucket = 'bdl2021_final_project'
client = storage.Client()
# Fully qualified gs:// URIs of every CSV shard under the training prefix.
# (The historical name `json_files` is kept; the entries are CSV files.)
json_files = [f"gs://{bucket}/{blob.name}"
              for blob in client.list_blobs(bucket, prefix='nyc_tickets_train.csv')
              if blob.name.endswith('.csv')]

In [6]:
# Link PySpark to Kafka: have spark-submit pull the Kafka connector built for
# Scala 2.12 / Spark 3.1.1. This must be set before the first SparkSession /
# SparkContext is created in this kernel.
# A hand-written PYSPARK_SUBMIT_ARGS must end with `pyspark-shell`; without it
# spark-submit has no primary resource and the Java gateway fails to start.
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 pyspark-shell'
)

In [7]:
# Single SparkSession for the whole notebook.
# The previously requested `com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5` is a
# Scala 2.11 / Spark 2.4 build. It is binary-incompatible with the Scala 2.12 /
# Spark 3.1.1 stack targeted by the Kafka connector, and nothing in this
# notebook uses Spark NLP, so it is no longer requested.
# NOTE(review): if Spark NLP is needed later, add a spark-nlp_2.12 3.x build.
spark = (SparkSession
         .builder
         .appName("NYC_Parking")
         .getOrCreate())

In [9]:
# Load every CSV shard of the training set. Only `header` is set, so every
# column comes back as a nullable string (see the printed schema).
df = spark.read.csv(
    "gs://bdl2021_final_project/nyc_tickets_train.csv/*.csv",
    header="true",
)
df.printSchema()

root
 |-- Summons Number: string (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: string (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: string (nullable = true)
 |-- Street Code2: string (nullable = true)
 |-- Street Code3: string (nullable = true)
 |-- Vehicle Expiration Date: string (nullable = true)
 |-- Issuer Code: string (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation_County: string (nullable = true)
 |-- Violation In Front Of Or Opposite: string (nullable = true)
 |-- House Number: string (nullable = true)
 |-- Street Name

In [11]:
list(df.columns)  # all raw column names, in file order

['Summons Number',
 'Plate ID',
 'Registration State',
 'Plate Type',
 'Issue Date',
 'Violation Code',
 'Vehicle Body Type',
 'Vehicle Make',
 'Issuing Agency',
 'Street Code1',
 'Street Code2',
 'Street Code3',
 'Vehicle Expiration Date',
 'Issuer Code',
 'Issuer Command',
 'Issuer Squad',
 'Violation Time',
 'Time First Observed',
 'Violation_County',
 'Violation In Front Of Or Opposite',
 'House Number',
 'Street Name',
 'Intersecting Street',
 'Date First Observed',
 'Law Section',
 'Sub Division',
 'Violation Legal Code',
 'Days Parking In Effect',
 'From Hours In Effect',
 'To Hours In Effect',
 'Vehicle Color',
 'Unregistered Vehicle?',
 'Vehicle Year',
 'Meter Number',
 'Feet From Curb',
 'Violation Post Code',
 'Violation Description',
 'No Standing or Stopping Violation',
 'Hydrant Violation',
 'Double Parking Violation',
 'Latitude',
 'Longitude',
 'Community Board',
 'Community Council',
 'Census Tract',
 'BIN',
 'BBL',
 'NTA']

In [18]:
# One aggregate per column: how many cells are null or NaN.
# The result is a single row whose column names match df's.
dfnan = df.select([
    count(when(isnull(name) | isnan(name), name)).alias(name)
    for name in df.columns
])

In [19]:
%%time
dfnan.show()

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+-----------+-------------------+-------------------+-----------+------------+--------------------+----------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+---------------------+---------------------------------+-----------------+------------------------+--------+---------+---------------+-----------------+------------+--------+--------+--------+
|Summons Number|Plate ID|Registration State|Plate Type|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Issuing Agency|Street Code1|Street Code2|Street Code3|Vehicle Expiration Date|Issuer Code|Issuer Command|Iss

In [15]:
# Tickets per county (the prediction target), most frequent first.
(df.groupBy("Violation_County")
   .count()
   .orderBy(desc("count"))
   .show())

+----------------+-------+
|Violation_County|  count|
+----------------+-------+
|              NY|7649847|
|               K|4593940|
|               Q|4001832|
|              BX|2263800|
+----------------+-------+



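The target classes are imbalanced (NY has roughly 3.4× as many tickets as BX), so below we weight each training row by the inverse frequency of its county.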

In [167]:
# Columns excluded from modelling. They were dropped because of their high
# share of null values. NOTE(review): confirm against the per-column null
# counts computed above.
to_drop = ['Time First Observed', 'Intersecting Street', 'Law Section',
           'Violation Legal Code', 'To Hours In Effect', 'Unregistered Vehicle?',
           'Meter Number', 'Violation Description',
           'No Standing or Stopping Violation', 'Hydrant Violation',
           'Double Parking Violation', 'Latitude', 'Longitude',
           'Community Board', 'Community Council', 'Census Tract',
           'BIN', 'BBL', 'NTA']
df2 = df.drop(*to_drop)

In [168]:
# Row count per target class, pulled to the driver (one small row per county).
y_collect = df2.groupBy("Violation_County").count().collect()

KeyboardInterrupt: 

In [169]:
# Inverse-frequency ("balanced") class weights:
#   weight(c) = total_rows / (n_classes * rows_in_class(c))
# With these weights every class contributes the same total weight to the loss.
import numpy as np

# groupBy returns a None key for null counties. create_map (next cell) cannot
# use a null map key, so that group is excluded here.
# NOTE(review): rows with a null county would then get a null weight; confirm
# that is acceptable or filter them out before training.
valid_rows = [r for r in y_collect if r["Violation_County"] is not None]
unique_y = [r["Violation_County"] for r in valid_rows]
bin_count = [r["count"] for r in valid_rows]
total_y = np.sum(bin_count)
unique_y_count = len(valid_rows)

# Plain Python floats, so they print cleanly (not as np.float64(...)) and are
# passed straight to lit().
class_weights_spark = {
    label: float(w)
    for label, w in zip(unique_y, total_y / (unique_y_count * np.array(bin_count)))
}
print(class_weights_spark)

{'K': 1.007273658341206, 'Q': 1.1563090979331465, 'BX': 2.0440651780192596, 'NY': 0.6048950717576443}


In [170]:
# Turn {county: weight} into a Spark map literal (k1, v1, k2, v2, ...). Then
# look up each row's county to add the per-row `weight` column that the
# logistic regression uses as its weightCol.
kv_literals = [lit(item) for pair in class_weights_spark.items() for item in pair]
mapping_expr = create_map(*kv_literals)
df2 = df2.withColumn("weight", mapping_expr.getItem(col("Violation_County")))

### Model Training

In [171]:
# Categorical features used by the model, chosen from a CatBoost
# feature-importance run on a subset of the provided data.
cat_cols = [
    'Issuer Command',
    'Days Parking In Effect',
    'From Hours In Effect',
    'Vehicle Year',
]
# Output column names for the StringIndexer and OneHotEncoder stages.
cat_cols_indexed = [f"{name}_Index" for name in cat_cols]
cat_cols_onehot = [f"{name}_Onehot" for name in cat_cols]

In [172]:
# Keep only the model features, the target and the per-row class weight.
df2 = df2.select(*cat_cols, 'Violation_County', 'weight')

In [173]:
# Replace nulls in the string columns with the literal category 'NULL'
# (a string fill value leaves the numeric `weight` column untouched).
df2 = df2.fillna('NULL')

In [174]:
# 70/30 train/test split with a fixed seed. Keep the raw training split in
# `trainingData_original`, because `trainingData` is rebound stage by stage below.
trainingData_original, testData = df2.randomSplit([0.7, 0.3], seed=100)
trainingData = trainingData_original

Label-encode the categorical features

In [176]:
# One StringIndexer per categorical column. handleInvalid="keep" maps
# categories unseen at fit time to an extra index instead of failing.
featureIndexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_Index").setHandleInvalid("keep")
    for name in cat_cols
]

indexer_stages = Pipeline(stages=featureIndexers)
feature_pipeline = indexer_stages.fit(trainingData)

trainingData = feature_pipeline.transform(trainingData)

One-hot encode the indexed features

In [177]:
# Fit the one-hot encoder on the indexed training columns, then apply it.
ohe_estimator = OneHotEncoder(inputCols=cat_cols_indexed, outputCols=cat_cols_onehot)
OHE = ohe_estimator.fit(trainingData)
trainingData = OHE.transform(trainingData)

In [178]:
# Feature columns for the VectorAssembler: only the one-hot encoded categorical
# columns. This is an alias of cat_cols_onehot, not a copy.
columns = cat_cols_onehot

Vector Assembler

In [179]:
# Concatenate the one-hot vectors into a single `features` vector column.
assembler = VectorAssembler(outputCol='features', inputCols=columns)
trainingData = assembler.transform(trainingData)

Label-encode the target column (`Violation_County`)

In [180]:
# Encode the target county as a numeric `label` column. handleInvalid="skip"
# drops rows whose county was not seen when the indexer was fit.
labelIndexer = StringIndexer(
    inputCol='Violation_County',
    outputCol="label",
    handleInvalid="skip",
).fit(trainingData)
trainingData = labelIndexer.transform(trainingData)
print('labels in Order:', labelIndexer.labels)

labels in Order: ['NY', 'K', 'Q', 'BX']


Model fitting

In [181]:
# Class-weighted logistic regression with elastic-net regularisation
# (regParam 0.3, mostly-L1 mix 0.8), capped at 10 iterations.
classifier = LogisticRegression(
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
    weightCol='weight',
).fit(trainingData)

Map predicted label indices back to county names

In [182]:
# Convert the numeric `prediction` back to a county string, using the label
# order learned by labelIndexer.
outputLabel = IndexToString(
    inputCol="prediction",
    outputCol='Violation_County_Prediction',
    labels=labelIndexer.labels,
)

Create Pipeline

In [183]:
# Bundle the already-fitted stages into a single PipelineModel for saving/serving.
# Every stage below is already a fitted Transformer, so Pipeline.fit would not
# train anything; it only wraps them. Build the PipelineModel directly so it is
# clear that no retraining on `trainingData_original` happens here.
# labelIndexer is not part of the saved model; the evaluation cells apply it
# separately to create the `label` column.
pipeline_stages = [feature_pipeline, OHE, assembler, classifier, outputLabel]
pipeline = Pipeline(stages=pipeline_stages)  # unfitted wrapper, kept for compatibility
model = PipelineModel(pipeline_stages)
print('Pipeline model assembled from fitted stages.')

Pipeline has been fit:


In [184]:
!gsutil rm -r gs://bd_project_joe/finalproject/model

Removing gs://bd_project_joe/finalproject/model/metadata/#1621066594510307...
Removing gs://bd_project_joe/finalproject/model/metadata/_SUCCESS#1621066594781544...
Removing gs://bd_project_joe/finalproject/model/metadata/part-00000#1621066593812557...
Removing gs://bd_project_joe/finalproject/model/stages/0_PipelineModel_f320b77b4089/metadata/#1621066597729698...
/ [4 objects]                                                                   
==> NOTE: You are performing a sequence of gsutil operations that may
run significantly faster if you instead use gsutil -m rm ... Please
see the -m section under "gsutil help options" for further information
about when gsutil -m can be advantageous.

Removing gs://bd_project_joe/finalproject/model/stages/0_PipelineModel_f320b77b4089/metadata/_SUCCESS#1621066597937898...
Removing gs://bd_project_joe/finalproject/model/stages/0_PipelineModel_f320b77b4089/metadata/part-00000#1621066597069795...
Removing gs://bd_project_joe/finalproject/model/stages/

In [185]:
# Persist the fitted pipeline. overwrite() replaces any model already at this
# path, so the save no longer depends on the `gsutil rm` cell having run first.
# Reload later with PipelineModel.load(MODEL_PATH).
MODEL_PATH = 'gs://bd_project_joe/finalproject/model'
model.write().overwrite().save(MODEL_PATH)

In [186]:
# Score both splits with the assembled pipeline, then add the numeric `label`
# column the evaluators compare against. Rows with unseen counties are dropped
# by the indexer's "skip" policy.
testPredictions = labelIndexer.transform(model.transform(testData))
trainPredictions = labelIndexer.transform(model.transform(trainingData_original))

#### Evaluation

In [187]:
%%time
evaluatoracc =  MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluatorf1 =  MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
# Train
trainaccuracy = evaluatoracc.evaluate(trainPredictions)
trainf1 = evaluatorf1.evaluate(trainPredictions)

CPU times: user 299 ms, sys: 26.9 ms, total: 326 ms
Wall time: 1min 58s


In [188]:
# Same two metrics on the held-out test split (accuracy first, then F1).
testaccuracy, testf1 = (
    evaluator.evaluate(testPredictions) for evaluator in (evaluatoracc, evaluatorf1)
)

In [189]:
# Report train vs. test metrics side by side to check for overfitting.
for caption, score in [
    ("Train Accuracy:", trainaccuracy),
    ("Train F1 score:", trainf1),
    ("Test Accuracy:", testaccuracy),
    ("Test F1 score:", testf1),
]:
    print(caption, score)

Train Accuracy: 0.7033504234949932
Train F1 score: 0.6917843722596115
Test Accuracy: 0.7030805839094334
Test F1 score: 0.6915001891015524


### Kafka Producer

In [114]:
# Re-read the raw CSV shards as the record source for the Kafka demo.
# NOTE(review): this is the same data the model was trained on; streaming the
# held-out split may be a fairer demonstration.
dfproducer = spark.read.csv(
    "gs://bdl2021_final_project/nyc_tickets_train.csv/*.csv",
    header="true",
)

In [115]:
# Keep the ticket id, the model's input features and the true county,
# in the file's original column order.
producer_cols = {'Summons Number', 'Violation_County', *cat_cols}
dfproducer1 = dfproducer.select([name for name in dfproducer.columns if name in producer_cols])

In [116]:
# Same null handling as the training data: nulls become the category 'NULL'.
dfproducer1 = dfproducer1.fillna('NULL')

In [117]:
list(dfproducer1.columns)  # column order here fixes the field order of each Kafka message

['Summons Number',
 'Issuer Command',
 'Violation_County',
 'Days Parking In Effect',
 'From Hours In Effect',
 'Vehicle Year']

In [118]:
# Serialise each row to a JSON string in a single `value` column, capped at
# 20,000 rows. to_json already returns a string, so no extra cast is needed.
dfproducer1 = (dfproducer1
               .select(to_json(struct("*")).alias("value"))
               .limit(20000))

In [119]:
# Collect the (at most 20,000) JSON rows to the driver as a pandas DataFrame
# with a single `value` column, for the Kafka send loop below.
dfPandas = dfproducer1.toPandas()

In [120]:
import os

# Kafka broker(s) for the demo stream. Override with the KAFKA_BOOTSTRAP_SERVERS
# environment variable (comma-separated host:port list); the default is the
# broker this notebook originally used.
KAFKA_BOOTSTRAP_SERVERS = [
    server.strip()
    for server in os.environ.get('KAFKA_BOOTSTRAP_SERVERS', '10.138.0.4:9092').split(',')
    if server.strip()
]

# Every value is JSON-encoded before sending, so a str payload goes on the wire
# as a quoted JSON string literal. NOTE(review): consumers must json-decode it.
producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
                         value_serializer=lambda x:
                         dumps(x).encode('utf-8'))

In [None]:
import json

# Stream the serialised rows to the `quickstart-events` topic one at a time.
# Each message is the row's field values joined by commas and wrapped in
# leading/trailing commas, e.g. ",<v1>,<v2>,...,". Field order follows the
# JSON key order from to_json, i.e. dfproducer1's column order.
# Iterate the `value` column directly: the old `row[0]` relied on positional
# Series.__getitem__, which pandas has deprecated for label-indexed Series.
# Interrupting the kernel stops the stream cleanly.
for record in dfPandas["value"]:
    try:
        payload = ",".join(str(field) for field in json.loads(record).values())
        producer.send('quickstart-events', value=',' + payload + ',')
        # Flushing after every send waits for that record's request to
        # complete, pacing the stream one record at a time.
        producer.flush()
    except KeyboardInterrupt:
        print('\nKinterrupted')
        break


Kinterrupted
