In [1]:
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import col
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import RandomForestClassifier

***

Spark ML Lib

Use notebook on Databricks for better EDA visualisations and faster processing.

***

In [2]:
sc = SparkContext()
sqlContext = SparkSession.builder.appName('Praful').getOrCreate()

In [3]:
# Load Data
data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/Users/m_2013954/Desktop/Praf/Personal/DS/train.csv')

In [4]:
data.show(10)

+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-------------------+------------------+
|              Dates|      Category|            Descript|DayOfWeek|PdDistrict|    Resolution|             Address|                  X|                 Y|
+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-------------------+------------------+
|2015-05-13 23:53:00|      WARRANTS|      WARRANT ARREST|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / LAGUNA ST|  -122.425891675136|  37.7745985956747|
|2015-05-13 23:53:00|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / LAGUNA ST|  -122.425891675136|  37.7745985956747|
|2015-05-13 23:33:00|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday|  NORTHERN|ARREST, BOOKED|VANNESS AV / GREE...|   -122.42436302145|  37.8004143219856|
|2015-05-13 23:30:00| LARCENY/THEFT|GRAND THEFT FROM ...|Wednesday|  NORTHER

In [5]:
data.columns

['Dates',
 'Category',
 'Descript',
 'DayOfWeek',
 'PdDistrict',
 'Resolution',
 'Address',
 'X',
 'Y']

In [6]:
# Drop unwanted columns
drop_list = ["Dates", "DayOfWeek", "PdDistrict", "Resolution", "Address", "X", "Y"]

data = data.select([column for column in data.columns if column not in drop_list])

data.show()

+--------------+--------------------+
|      Category|            Descript|
+--------------+--------------------+
|      WARRANTS|      WARRANT ARREST|
|OTHER OFFENSES|TRAFFIC VIOLATION...|
|OTHER OFFENSES|TRAFFIC VIOLATION...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
| VEHICLE THEFT|   STOLEN AUTOMOBILE|
| VEHICLE THEFT|   STOLEN AUTOMOBILE|
| LARCENY/THEFT|GRAND THEFT FROM ...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
| LARCENY/THEFT|PETTY THEFT FROM ...|
|OTHER OFFENSES|MISCELLANEOUS INV...|
|     VANDALISM|MALICIOUS MISCHIE...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
|  NON-CRIMINAL|      FOUND PROPERTY|
|  NON-CRIMINAL|      FOUND PROPERTY|
|       ROBBERY|ROBBERY, ARMED WI...|
|       ASSAULT|AGGRAVATED ASSAUL...|
|OTHER OFFENSES|   TRAFFIC VIOLATION|
|  NON-CRIMINAL|      FOUND PROPERTY|
+--------------+--------------------+
only showing top 20 rows



In [7]:
# To View Schema 
data.printSchema()

root
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)



In [8]:
# Top 20 Crime Categories
data.groupBy("Category") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

+--------------------+------+
|            Category| count|
+--------------------+------+
|       LARCENY/THEFT|174900|
|      OTHER OFFENSES|126182|
|        NON-CRIMINAL| 92304|
|             ASSAULT| 76876|
|       DRUG/NARCOTIC| 53971|
|       VEHICLE THEFT| 53781|
|           VANDALISM| 44725|
|            WARRANTS| 42214|
|            BURGLARY| 36755|
|      SUSPICIOUS OCC| 31414|
|      MISSING PERSON| 25989|
|             ROBBERY| 23000|
|               FRAUD| 16679|
|FORGERY/COUNTERFE...| 10609|
|     SECONDARY CODES|  9985|
|         WEAPON LAWS|  8555|
|        PROSTITUTION|  7484|
|            TRESPASS|  7326|
|     STOLEN PROPERTY|  4540|
|SEX OFFENSES FORC...|  4388|
+--------------------+------+
only showing top 20 rows



In [9]:
# Top 20 Crime Categories + Descriptions
data.groupBy(["Category", "Descript"]).count().orderBy(col("count").desc()).show()

+--------------+--------------------+-----+
|      Category|            Descript|count|
+--------------+--------------------+-----+
| LARCENY/THEFT|GRAND THEFT FROM ...|60022|
|  NON-CRIMINAL|       LOST PROPERTY|31729|
|       ASSAULT|             BATTERY|27441|
| VEHICLE THEFT|   STOLEN AUTOMOBILE|26897|
|OTHER OFFENSES|DRIVERS LICENSE, ...|26839|
|      WARRANTS|      WARRANT ARREST|23754|
|SUSPICIOUS OCC|SUSPICIOUS OCCURR...|21891|
|  NON-CRIMINAL|AIDED CASE, MENTA...|21497|
| LARCENY/THEFT|PETTY THEFT FROM ...|19771|
|     VANDALISM|MALICIOUS MISCHIE...|17789|
|OTHER OFFENSES|   TRAFFIC VIOLATION|16471|
| LARCENY/THEFT|PETTY THEFT OF PR...|16196|
|     VANDALISM|MALICIOUS MISCHIE...|15957|
|       ASSAULT|THREATS AGAINST LIFE|14716|
|  NON-CRIMINAL|      FOUND PROPERTY|12146|
|      WARRANTS|ENROUTE TO OUTSID...|11470|
| LARCENY/THEFT|GRAND THEFT OF PR...|11010|
| DRUG/NARCOTIC|POSSESSION OF NAR...|10050|
| LARCENY/THEFT|PETTY THEFT FROM ...|10029|
| LARCENY/THEFT|PETTY THEFT SHOP

In [10]:
# Model Pipeline

# 3 Steps:
# 1. Tokenization
# 2. Remove Stop Words
# 3. Count Vectors

# Using RegexTokenizer
regexTokenizer = RegexTokenizer(inputCol="Descript", outputCol="words", pattern="\\W")

# Select Stop Words to Ignore
add_stopwords = ["http", "https", "amp", "rt", "t", "c", "the"]

# Remove Stop Words
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

# Count Vectorizer Model using Bag of Words
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

In [11]:
# StringIndexer to Create Labels
label_stringIdx = StringIndexer(inputCol="Category", outputCol="label")

# Create a Pipeline - which will be our sequence of stages/steps in the ML process
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

In [12]:
# Fit the Pipeline to Training Documents
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(10)

+--------------+--------------------+--------------------+--------------------+--------------------+-----+
|      Category|            Descript|               words|            filtered|            features|label|
+--------------+--------------------+--------------------+--------------------+--------------------+-----+
|      WARRANTS|      WARRANT ARREST|   [warrant, arrest]|   [warrant, arrest]|(809,[17,32],[1.0...|  7.0|
|OTHER OFFENSES|TRAFFIC VIOLATION...|[traffic, violati...|[traffic, violati...|(809,[11,17,35],[...|  1.0|
|OTHER OFFENSES|TRAFFIC VIOLATION...|[traffic, violati...|[traffic, violati...|(809,[11,17,35],[...|  1.0|
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(809,[0,2,3,4,6],...|  0.0|
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(809,[0,2,3,4,6],...|  0.0|
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(809,[0,2,3,4,104...|  0.0|
| VEHICLE THEFT|   STOLEN AUTOMOBILE|

In [13]:
# Training and Test Datasets
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], 100)

print("Training Dataset Count:" + str(trainingData.count()))
print("Testing Dataset Count:" + str(testData.count()))

Training Dataset Count:614561
Testing Dataset Count:263488


In [14]:
# Logistic Regression using Count Vector Features
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)

predictions.filter(predictions["prediction"]==0) \
           .select("Descript", "Category", "probability", "label", "prediction") \
           .orderBy("probability", ascending=False).show(n=10, truncate=30)

+------------------------------+-------------+------------------------------+-----+----------+
|                      Descript|     Category|                   probability|label|prediction|
+------------------------------+-------------+------------------------------+-----+----------+
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8727453887602625,0.02061...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8727453887602625,0.02061...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8727453887602625,0.02061...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8727453887602625,0.02061...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8727453887602625,0.02061...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8727453887602625,0.02061...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8727453887602625,0.02061...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8

In [16]:
# Accuracy
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

print(f"Accuracy of Logistic Regression Model using Count Vectorizer: {evaluator.evaluate(predictions)}")

Accuracy of Logistic Regression Model using Count Vectorizer: 0.9726104545051534


In [17]:
# Logistic Regression using TF-IDF Features

# HashingTF is a transformer which takes sets of terms and converts those sets into fixed-length vectors
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)

# IDF (Inverse Document Frequency) - used to reflect the importance of a term to a document in the corpurs
# minDocFreq removes sparse words
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)

# Create a Pipeline - which will be our sequence of stages/steps in the ML process
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])

In [18]:
# Fit the Pipeline to Training Documents
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

# Training and Test Datasets
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], 100)

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)

predictions.filter(predictions["prediction"]==0) \
           .select("Descript", "Category", "probability", "label", "prediction") \
           .orderBy("probability", ascending=False).show(n=10, truncate=30)

+------------------------------+-------------+------------------------------+-----+----------+
|                      Descript|     Category|                   probability|label|prediction|
+------------------------------+-------------+------------------------------+-----+----------+
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.882914051690535,0.019173...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.882914051690535,0.019173...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.882914051690535,0.019173...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.882914051690535,0.019173...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.882914051690535,0.019173...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.882914051690535,0.019173...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.882914051690535,0.019173...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8

In [19]:
# Accuracy
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

print(f"Accuracy of Logistic Regression Model using HashingTF: {evaluator.evaluate(predictions)}")

Accuracy of Logistic Regression Model using HashingTF: 0.9726123540373978


In [20]:
# Cross-Validation to tune hyper-parameters

# We can just tune either one of the models 
# We will tune only Count Vectors Logistic Regression

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

# Training and Test Datasets
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], 100)

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5])
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2])
             .addGrid(lr.maxIter, [10, 20, 50])
             .build())

# Create 5-Fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)

# Evaluate Best Model
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print(f"Accuracy after Cross-Validation: {evaluator.evaluate(predictions)}")

Accuracy after Cross-Validation: 0.9919573055956002


In [23]:
# Naive Bayes
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)

predictions = model.transform(testData)
predictions.filter(predictions["prediction"]==0) \
           .select("Descript", "Category", "probability", "label", "prediction") \
           .orderBy("probability", ascending=False) \
           .show(n=10, truncate=30)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print(f"Accuracy of Naive Bayes Model: {evaluator.evaluate(predictions)}")

+----------------------------+-------------+------------------------------+-----+----------+
|                    Descript|     Category|                   probability|label|prediction|
+----------------------------+-------------+------------------------------+-----+----------+
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9999999999861693,1.53944...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9999999999861693,1.53944...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9999999999861693,1.53944...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9999999999861693,1.53944...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9999999999861693,1.53944...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9999999999861693,1.53944...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9999999999861693,1.53944...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9999999999861693,1.5394

In [24]:
# Random Forest 
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100, maxDepth=4, maxBins=32)

# Train Model with Training Data
rfModel = rf.fit(trainingData)

predictions = rfModel.transform(testData)

predictions.filter(predictions["prediction"]==0) \
           .select("Descript", "Category", "probability", "label", "prediction") \
           .orderBy("probability", ascending=False) \
           .show(n=10, truncate=30)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print(f"Accuracy of Random Forest Model: {evaluator.evaluate(predictions)}")

# As the rumor has it - RF is no good for high-dimensional sparse data

+----------------------------+-------------+------------------------------+-----+----------+
|                    Descript|     Category|                   probability|label|prediction|
+----------------------------+-------------+------------------------------+-----+----------+
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.5195727297682495,0.08403...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.5195727297682495,0.08403...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.5195727297682495,0.08403...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.5195727297682495,0.08403...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.5195727297682495,0.08403...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.5195727297682495,0.08403...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.5195727297682495,0.08403...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.5195727297682495,0.0840