## CS4830 Project

* Aniruddha (ME18B181)
* Vasudev Gupta (ME18B182)
* Shubham (ME18B183)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline, PipelineModel

In [None]:
# Build (or reuse) the Spark session used by the whole notebook.
# NOTE(review): the spark-nlp artifact below targets Scala 2.11, while the Kafka
# connector requested later in this notebook targets Scala 2.12, and no visible
# cell imports spark-nlp -- confirm this package is still needed.
spark = (
    SparkSession.builder
    .appName("CS4830_project")
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5")
    .getOrCreate()
)

In [None]:
# Glob matching every CSV part file of the training data in the GCS bucket.
DATA_PATH = "gs://big-data-cs4830/project/trainingdatanyc.csv/*.csv"

## Data Exploration Steps

In [None]:
# Load the raw CSV files, taking column names from the header line.
df = spark.read.csv(DATA_PATH, header=True)

In [None]:
# Discard rows that have no target value, then count the distinct precincts.
df = df.where(col("Violation Precinct").isNotNull())
df.select("Violation Precinct").dropDuplicates().count()

In [None]:
# Display the distinct precinct values in sorted order.
df.select("Violation Precinct").dropDuplicates().orderBy("Violation Precinct").show()

In [None]:
# Print the column names and types of the loaded data.
df.printSchema()

In [None]:
# Show, for every column, how many entries are NaN or null.
df.select(
    [
        count(when(isnan(name) | isnull(name), name)).alias(name)
        for name in df.columns
    ]
).show()

In [None]:
# Columns discarded because they contain many null values.
cols_to_drop = [
    'Time First Observed',
    'Intersecting Street',
    'Law Section',
    'Violation Legal Code',
    'To Hours In Effect',
    'Unregistered Vehicle?',
    'Meter Number',
    'Violation Description',
    'No Standing or Stopping Violation',
    'Hydrant Violation',
    'Double Parking Violation',
    'Latitude',
    'Longitude',
    'Community Board',
    'Community Council',
    'Census Tract',
    'BIN',
    'BBL',
    'NTA',
]
# Keep every remaining column, preserving the original column order.
df = df.select([name for name in df.columns if name not in cols_to_drop])

In [None]:
# Replace the remaining nulls with the placeholder string 'NULL'.
df = df.fillna('NULL')

In [None]:
# Print the number of distinct values in every column.
# The loop variable is deliberately not named `col`: a top-level `for col in ...`
# rebinds the `col` function imported from pyspark.sql.functions to a plain
# string, after which any later `col(...)` call in the notebook fails with
# "'str' object is not callable".
for column_name in df.columns:
    print(column_name, df.select(column_name).distinct().count())

In [None]:
# Candidate feature columns retained for modelling.
input_columns = [
    "Feet From Curb",
    "Violation In Front Of Or Opposite",
    "Issuing Agency",
    "Violation County",
    "Plate Type",
    "Violation Code",
    "Registration State",
    "Issuer Squad",
]
# Narrow the frame to the features followed by the prediction target.
df = df.select(*input_columns, "Violation Precinct")

In [None]:
# Confirm that only the selected feature columns and the target remain.
df.printSchema()

## Exporting code for production

In [None]:
# Feature columns consumed by the model pipeline.
INPUT_COLUMNS = [
    "Feet From Curb",
    "Violation In Front Of Or Opposite",
    "Issuing Agency",
    "Violation County",
    "Plate Type",
    "Violation Code",
    "Registration State",
    "Issuer Squad",
]
# Column the model is trained to predict.
TARGET_COLUMN = "Violation Precinct"

In [None]:
def read_and_prepare_data(path):
    """Load a headered CSV dataset and reduce it to the model's columns.

    Rows whose ``TARGET_COLUMN`` is null are dropped, remaining nulls are
    replaced by the placeholder string ``'NULL'``, and only
    ``INPUT_COLUMNS`` plus ``TARGET_COLUMN`` are kept.

    Args:
        path: Location of the CSV data passed to ``spark.read.csv``.

    Returns:
        A Spark DataFrame with the columns ``INPUT_COLUMNS + [TARGET_COLUMN]``.
    """
    df = spark.read.option("header", "true").csv(path)

    # Address the column through the DataFrame instead of the global `col`
    # helper: a top-level loop variable named `col` elsewhere in the notebook
    # rebinds that name to a string and would break this function.
    df = df.filter(df[TARGET_COLUMN].isNotNull())

    df = df.na.fill('NULL')
    df = df.select(INPUT_COLUMNS + [TARGET_COLUMN])

    return df

## Preparing data for training & inference

In [None]:
# Build the modelling dataset with the production helper and inspect its schema.
data = read_and_prepare_data(DATA_PATH)
data.printSchema()

In [None]:
# Split with weights 0.99 / 0.01 (train / validation) using a fixed seed,
# then report the size of each part.
tr_data, val_data = data.randomSplit(weights=[0.99, 0.01], seed=42)
print(dict(train_size=tr_data.count(), val_size=val_data.count()))

In [None]:
# # just for testing purposes
# _, tr_data = data.randomSplit([0.999, 0.001], seed=42)
# val_data = tr_data
# tr_data.count()

## Setting up model pipeline

In [None]:
# Fit the target-string -> numeric "label" mapping on the training split.
labelIndexer = StringIndexer(
    inputCol=TARGET_COLUMN,
    outputCol="label",
).fit(tr_data)
print(f"labels: {labelIndexer.labels}")

In [None]:
# One StringIndexer per feature column, fitted together on the training split.
# handleInvalid="keep" assigns categories that were not seen during fitting to an
# extra index instead of raising an error, so the fitted pipeline can transform
# validation or streamed rows that contain previously unseen values.
feature_indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_index", handleInvalid="keep")
    for name in INPUT_COLUMNS
]
feature_pipeline = Pipeline(stages=feature_indexers).fit(tr_data)

In [None]:
# One-hot encode each "<feature>_index" column into "<feature>_onehot".
OHE = OneHotEncoder(
    inputCols=[f"{name}_index" for name in INPUT_COLUMNS],
    outputCols=[f"{name}_onehot" for name in INPUT_COLUMNS],
)

In [None]:
# Concatenate all one-hot columns into the single "features" vector column.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=[f"{name}_onehot" for name in INPUT_COLUMNS],
)

In [None]:
# Logistic-regression classifier; its numeric prediction is written to "class".
model = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    predictionCol="class",
)
# Map the numeric "class" back to the original target string in "prediction".
index_to_string = IndexToString(
    inputCol="class",
    outputCol="prediction",
    labels=labelIndexer.labels,
)

In [None]:
# Stages in execution order: label indexing, feature indexing, one-hot
# encoding, vector assembly, classification, and label decoding.
stages = [labelIndexer, feature_pipeline, OHE, assembler, model, index_to_string]
pipeline = Pipeline().setStages(stages)

In [None]:
# Fit the pipeline on the training split; `pipeline` is rebound to the fitted
# result, which is what gets saved and used for transform() below.
pipeline = pipeline.fit(tr_data)

In [None]:
# Storage location the fitted pipeline is saved to and loaded from.
MODEL_PATH = "gs://big-data-cs4830/project/final_model"

In [None]:
# Persist the fitted pipeline to MODEL_PATH.
# NOTE(review): no overwrite is requested, so re-running this cell against an
# existing path presumably fails -- confirm whether that is intended.
pipeline.write().save(MODEL_PATH)

## Using model for inference

In [None]:
# Reload the persisted pipeline so the checks below exercise the saved model.
pipeline = PipelineModel.read().load(MODEL_PATH)

In [None]:
# Evaluators comparing the numeric prediction ("class") with the numeric "label".
accuracy_metric = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="class",
    metricName="accuracy",
)
f1_metric = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="class",
    metricName="f1",
)

In [None]:
# Score the training split, keeping only predicted and true label indices.
tr_pred = pipeline.transform(tr_data).select(["class", "label"])
tr_pred.show()

In [None]:
# (accuracy, F1) on the training split.
(
    accuracy_metric.evaluate(tr_pred),
    f1_metric.evaluate(tr_pred),
)

In [None]:
# Score the validation split, keeping only predicted and true label indices.
val_pred = pipeline.transform(val_data).select(["class", "label"])
val_pred.show()

In [None]:
# (accuracy, F1) on the validation split.
(
    accuracy_metric.evaluate(val_pred),
    f1_metric.evaluate(val_pred),
)

### Kafka Producer

In [None]:
# TODO: change the following values for the demo.
# Source of the rows to publish; currently the training data itself.
REAL_TIME_DATA_PATH = DATA_PATH
# Kafka bootstrap server as "host:port".
BROKER = "10.128.0.34:9092"
# Kafka topic the records are published to.
TOPIC = "CS4830-project"
# Maximum number of rows to publish.
LIMIT = 200

In [None]:
# Install the Kafka client library used by the producer cells below.
!pip3 install -U -q kafka-python

import os
# Request the Spark Kafka connector package through the spark-submit arguments.
# NOTE(review): the SparkSession is created earlier in this notebook, so this
# presumably does not affect the already-running session -- confirm. PySpark
# setups commonly also expect this value to end with "pyspark-shell"; verify.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1'

In [None]:
import json
from kafka import KafkaProducer
from tqdm.auto import tqdm

In [None]:
# Prepare the rows to publish with the same helper used for the training data.
real_time_df = read_and_prepare_data(REAL_TIME_DATA_PATH)

In [None]:
# Keep the data small: it is collected with toPandas() below, and a larger
# sample would require allocating a bigger cluster.
real_time_df = real_time_df.limit(LIMIT)

In [None]:
# Kafka producer; every value is JSON-encoded and sent as UTF-8 bytes.
producer = KafkaProducer(
    value_serializer=lambda message: json.dumps(message).encode('utf-8'),
    bootstrap_servers=[BROKER],
)

In [None]:
# Collect the limited rows into a local pandas DataFrame for row-wise sending.
pandas_df = real_time_df.toPandas()

In [None]:
# Publish each row as one comma-separated record wrapped in a leading and a
# trailing comma. The row index is not part of the payload, so it is ignored.
# `total` lets tqdm show real progress, since iterrows() has no length.
for _, row in tqdm(pandas_df.iterrows(), total=len(pandas_df)):
    payload = ",".join(str(x) for x in row.to_dict().values())
    producer.send(TOPIC, value=',' + payload + ',')
# Flush once after the loop: flushing per message blocks on every record and
# prevents the producer from batching sends.
producer.flush()