Pratham Tiwari Big Data Project

1. Spark Setup and Data Extraction

In [1]:
import pyspark
# Create (or reuse, if one is already running) the notebook's Spark session and
# keep a handle to its underlying SparkContext.
spark = (
    pyspark.sql.SparkSession.builder
    .appName("Pratham Big Data Project")
    .getOrCreate()
)
sc = spark.sparkContext

In [2]:
# Quick look at the raw records: header row supplies column names, column types are inferred.
df = spark.read.options(header=True, inferSchema=True).csv("train.csv")
df.show(n=5)

+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-------------------+------------------+
|              Dates|      Category|            Descript|DayOfWeek|PdDistrict|    Resolution|             Address|                  X|                 Y|
+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-------------------+------------------+
|2015-05-13 23:53:00|      WARRANTS|      WARRANT ARREST|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / LAGUNA ST|  -122.425891675136|  37.7745985956747|
|2015-05-13 23:53:00|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / LAGUNA ST|  -122.425891675136|  37.7745985956747|
|2015-05-13 23:33:00|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday|  NORTHERN|ARREST, BOOKED|VANNESS AV / GREE...|   -122.42436302145|  37.8004143219856|
|2015-05-13 23:30:00| LARCENY/THEFT|GRAND THEFT FROM ...|Wednesday|  NORTHER

In [3]:
from pyspark.sql.functions import col, lower

# Re-read the raw CSV so this cell does not depend on `df` from the previous cell.
df = spark.read.format('csv')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .load('train.csv')

# Keep only the two text columns the classifiers use, lower-cased so that
# category labels and description tokens are case-insensitive.
data = df.select(
    lower(col('Category')).alias('Category'),
    lower(col('Descript')).alias('Description')
)

# cache() is lazy: only an action on `data` itself populates the cache.
data.cache()

print('Dataframe Structure')
print('----------------------------------')
data.printSchema()

print('\nDataframe preview')
print('----------------------------------')
data.show(5)

# Count `data` (same row count as `df`, since select() keeps every row) so this
# full scan materialises the cache for the pipeline fits below. Counting `df`
# would rescan the CSV and leave `data` mostly uncached.
print('\nTotal number of rows:', data.count())

Dataframe Structure
----------------------------------
root
 |-- Category: string (nullable = true)
 |-- Description: string (nullable = true)


Dataframe preview
----------------------------------
+--------------+--------------------+
|      Category|         Description|
+--------------+--------------------+
|      warrants|      warrant arrest|
|other offenses|traffic violation...|
|other offenses|traffic violation...|
| larceny/theft|grand theft from ...|
| larceny/theft|grand theft from ...|
+--------------+--------------------+
only showing top 5 rows


Total number of rows: 878049


In [4]:
from pyspark.sql.functions import col

def top_n_list(df, var, N):
    """Print the number of distinct values in column ``var`` and show its ``N`` most frequent values.

    Args:
        df: Spark DataFrame to profile.
        var: name of the column to summarise.
        N: number of value/count rows to display.
    """
    distinct_count = df.select(var).distinct().count()
    print(f"Total number of unique values in '{var}': {distinct_count}\n")
    print(f"Top {N} most frequent values in '{var}':")

    # One row per value with its occurrence count, most frequent first.
    frequency = (
        df.groupBy(var)
          .count()
          .withColumnRenamed('count', 'totalValue')
    )
    frequency.orderBy(col('totalValue').desc()).show(N, truncate=False)


top_n_list(data, 'Category', 10)
print('\n')
top_n_list(data, 'Description', 10)

Total number of unique values in 'Category': 39

Top 10 most frequent values in 'Category':
+--------------+----------+
|Category      |totalValue|
+--------------+----------+
|larceny/theft |174900    |
|other offenses|126182    |
|non-criminal  |92304     |
|assault       |76876     |
|drug/narcotic |53971     |
|vehicle theft |53781     |
|vandalism     |44725     |
|warrants      |42214     |
|burglary      |36755     |
|suspicious occ|31414     |
+--------------+----------+
only showing top 10 rows



Total number of unique values in 'Description': 879

Top 10 most frequent values in 'Description':
+-----------------------------------------+----------+
|Description                              |totalValue|
+-----------------------------------------+----------+
|grand theft from locked auto             |60022     |
|lost property                            |31729     |
|battery                                  |27441     |
|stolen automobile                        |26897     |
|dri

In [5]:
# Number of distinct crime categories (the number of classes to predict).
data.select('Category').dropDuplicates().count()

39

In [6]:
# 70/30 train/test split. The fixed seed makes the split repeatable as long as
# the input partitioning of `data` is unchanged.
training, test = data.randomSplit(weights=[0.7, 0.3], seed=60)
print(f"Training Dataset Count: {training.count()}")
print(f"Test Dataset Count: {test.count()}")

Training Dataset Count: 614667
Test Dataset Count: 263382


2. Pipeline Setup and Model Definition

In [7]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, OneHotEncoder, StringIndexer, VectorAssembler, HashingTF, IDF, Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, NaiveBayes

# Split each description into tokens wherever a non-word character (regex \W) occurs.
regex_tokenizer = RegexTokenizer(inputCol="Description", outputCol="tokens", pattern='\\W')

# Custom stop-word list. Supplying it as `stopWords` REPLACES Spark's default
# stop-word list rather than extending it, despite the "extra" in the name.
extra_stopwords = ['http', 'amp', 'rt', 't', 'c', 'the']
stopwords_remover = StopWordsRemover(inputCol='tokens', outputCol='filtered_words', stopWords=extra_stopwords)

# Alternative featurisers. Each ultimately writes a "features" column, so only
# one of them belongs in any single Pipeline.

# Bag-of-words counts: vocabulary of at most 10,000 terms, each appearing in at least 5 documents.
count_vectors = CountVectorizer(inputCol="filtered_words", outputCol="features", vocabSize=10000, minDF=5)

# TF-IDF: term frequencies hashed into 10,000 buckets, then IDF weighting in
# which terms seen in fewer than 5 documents get zero weight.
hashingTf = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=10000)
idf = IDF(inputCol="raw_features", outputCol="features", minDocFreq=5)

# 1000-dimensional word embeddings with no minimum word count.
# NOTE(review): not used by any pipeline in the cells shown here.
word2Vec = Word2Vec(inputCol="filtered_words", outputCol="features", vectorSize=1000, minCount=0)

# Map the category strings to numeric class indices in a "label" column.
label_string_idx = StringIndexer(inputCol="Category", outputCol="label")

# Classifiers: L2-regularised logistic regression (elasticNetParam=0) and Naive Bayes with additive smoothing.
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
nb = NaiveBayes(smoothing=1)

def metrics_ev(labels, metrics):
    """Print a confusion matrix plus overall, per-class and weighted classification statistics.

    Args:
        labels: iterable of class labels to report per-class statistics for;
            they are printed in sorted order.
        metrics: presumably a ``pyspark.mllib.evaluation.MulticlassMetrics``
            instance -- the attribute names used below match that API.
    """
    print("---------Confusion matrix-----------------")
    # confusionMatrix is a method; printing it uncalled only shows the bound-method repr.
    print(metrics.confusionMatrix())
    print(' ')

    print('----------Overall statistics-----------')
    # The label-less precision()/recall()/fMeasure() overloads were removed in
    # Spark 3.0. For single-label multiclass data the micro-averaged precision,
    # recall and F1 all equal accuracy, which is what those overloads returned.
    overall = metrics.accuracy
    print("Precision = %s" % overall)
    print("Recall = %s" % overall)
    print("F1 Score = %s" % overall)
    print(' ')

    print('----------Statistics by class----------')
    for label in sorted(labels):
        print("Class %s precision = %s" % (label, metrics.precision(label)))
        print("Class %s recall = %s" % (label, metrics.recall(label)))
        print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, beta=1.0)))
    print(' ')

    print('----------Weighted stats----------------')
    print("Weighted recall = %s" % metrics.weightedRecall)
    print("Weighted precision = %s" % metrics.weightedPrecision)
    print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
    print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
    print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)

3. Build Multiclass Classification Models

3a. Logistic Regression with Count-Vector Features

In [12]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.classification import OneVsRest
from pyspark.ml.evaluation import RegressionEvaluator
import pyspark.sql.functions as F
from pyspark.sql.functions import col

# Tokenise -> drop stop words -> count vectors -> index labels -> logistic regression.
cv_lr_stages = [regex_tokenizer, stopwords_remover, count_vectors, label_string_idx, lr]
pipeline_cv_lr = Pipeline(stages=cv_lr_stages)
model_cv_lr = pipeline_cv_lr.fit(training)
predictions_cv_lr = model_cv_lr.transform(test)

In [13]:
from pyspark.ml.functions import vector_to_array  # available in Spark >= 3.0

print('-----------------------------Check Top 5 predictions----------------------------------')
# Rank rows by model confidence, i.e. the largest entry of each probability
# vector. NOTE(review): ordering by the raw "probability" column compares
# Spark's internal vector struct field by field, which for dense vectors
# appears to sort by the class-0 probability rather than by confidence in the
# predicted class.
predictions_cv_lr.select('Description', 'Category', 'probability', 'label', 'prediction')\
                  .orderBy(F.array_max(vector_to_array('probability')).desc())\
                  .show(n=5, truncate=30)

-----------------------------Check Top 5 predictions----------------------------------
+------------------------------+-------------+------------------------------+-----+----------+
|                   Description|     Category|                   probability|label|prediction|
+------------------------------+-------------+------------------------------+-----+----------+
|theft, bicycle, <$50, no se...|larceny/theft|[0.8738390105611061,0.02048...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8738390105611061,0.02048...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8738390105611061,0.02048...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8738390105611061,0.02048...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8738390105611061,0.02048...|  0.0|       0.0|
+------------------------------+-------------+------------------------------+-----+----------+
only showing top 5 rows



In [17]:
# Multiclass metrics on the held-out test split. Weighted recall is
# mathematically equal to accuracy for single-label data, which is why those
# two numbers coincide.
evaluator_cv_lr = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")

accuracy = evaluator_cv_lr.evaluate(predictions_cv_lr)
print(f'Accuracy: {accuracy}')

precision_evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="weightedPrecision")
precision = precision_evaluator.evaluate(predictions_cv_lr)
print(f'Precision: {precision}')

recall_evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="weightedRecall")
recall = recall_evaluator.evaluate(predictions_cv_lr)
print(f'Recall: {recall}')

f1_evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="f1")
f1_score = f1_evaluator.evaluate(predictions_cv_lr)
print(f'F1 Score: {f1_score}')

# NOTE(review): the regression metrics below treat the StringIndexer label
# indices as numbers. Those indices are nominal category ids (assigned by label
# frequency, not by any magnitude), so RMSE/MSE/MAE/R^2/MAPE depend on that
# arbitrary numbering and do not measure classification quality. Compare
# models with the multiclass metrics above.
rmse_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
rmse = rmse_evaluator.evaluate(predictions_cv_lr)
print(f'RMSE: {rmse}')

mse_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="mse")
mse = mse_evaluator.evaluate(predictions_cv_lr)
print(f'MSE: {mse}')

mae_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="mae")
mae = mae_evaluator.evaluate(predictions_cv_lr)
print(f'MAE: {mae}')

r2_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="r2")
r2 = r2_evaluator.evaluate(predictions_cv_lr)
print(f'R-Squared: {r2}')

def calculate_mape(predictions):
    """Return ``predictions`` with a per-row absolute percentage error column ``mape``.

    The percentage error is undefined when ``label`` is 0, which is the index of
    the most frequent category. Those rows get a null ``mape`` and are therefore
    skipped by ``F.avg``. The explicit ``when`` guard avoids a DIVIDE_BY_ZERO
    error when Spark's ANSI mode is enabled. Without ANSI mode Spark already
    yields null for x / 0, so the result is unchanged there.
    """
    label_is_nonzero = col("label") != 0
    pct_error = F.abs((col("label") - col("prediction")) / col("label")) * 100
    return predictions.withColumn("mape", F.when(label_is_nonzero, pct_error))

predictions_mape = calculate_mape(predictions_cv_lr)
mape = predictions_mape.agg(F.avg("mape")).collect()[0][0]
print(f'MAPE: {mape}')

Accuracy: 0.9780281112604506
Precision: 0.9724534113698968
Recall: 0.9780281112604506
F1 Score: 0.9720379224200315
RMSE: 3.0462195287716347
MSE: 9.27945341746968
MAE: 0.42185494832600556
R-Squared: 0.6943280931074408
MAPE: 2.5339090007843796


3b. Naive Bayes with Count-Vector Features

In [19]:
# Same count-vector preprocessing as 3a, with Naive Bayes as the classifier.
cv_nb_stages = [regex_tokenizer, stopwords_remover, count_vectors, label_string_idx, nb]
pipeline_cv_nb = Pipeline(stages=cv_nb_stages)
model_cv_nb = pipeline_cv_nb.fit(training)
predictions_cv_nb = model_cv_nb.transform(test)

In [20]:
# Score the Naive Bayes (count-vector) predictions with four multiclass metrics.
accuracy_evaluator, precision_evaluator, recall_evaluator, f1_evaluator = [
    MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName=metric)
    for metric in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
]

accuracy, precision, recall, f1_score = [
    evaluator.evaluate(predictions_cv_nb)
    for evaluator in (accuracy_evaluator, precision_evaluator, recall_evaluator, f1_evaluator)
]

print(f'Accuracy: {accuracy}')
print(f'Precision: {precision}')
print(f'Recall: {recall}')
print(f'F1 Score: {f1_score}')

Accuracy: 0.993268332687883
Precision: 0.9941741818988308
Recall: 0.993268332687883
F1 Score: 0.99350875457078


3c. Logistic Regression with TF-IDF Features

In [24]:
# Tokenise -> drop stop words -> hashed TF -> IDF -> index labels -> logistic regression.
idf_lr_stages = [regex_tokenizer, stopwords_remover, hashingTf, idf, label_string_idx, lr]
pipeline_idf_lr = Pipeline(stages=idf_lr_stages)
model_idf_lr = pipeline_idf_lr.fit(training)
predictions_idf_lr = model_idf_lr.transform(test)

In [25]:
from pyspark.ml.functions import vector_to_array  # available in Spark >= 3.0

print('-----------------------------Check Top 5 predictions----------------------------------')
print(' ')
# Rank rows by model confidence, i.e. the largest entry of each probability
# vector. NOTE(review): ordering by the raw "probability" column compares
# Spark's internal vector struct field by field, which for dense vectors
# appears to sort by the class-0 probability rather than by confidence in the
# predicted class.
predictions_idf_lr.select('Description', 'Category', "probability", "label", "prediction")\
                                        .orderBy(F.array_max(vector_to_array("probability")).desc())\
                                        .show(n=5, truncate=30)

-----------------------------Check Top 5 predictions----------------------------------
 
+------------------------------+-------------+------------------------------+-----+----------+
|                   Description|     Category|                   probability|label|prediction|
+------------------------------+-------------+------------------------------+-----+----------+
|theft, bicycle, <$50, no se...|larceny/theft|[0.8845322339589967,0.01879...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8845322339589967,0.01879...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8845322339589967,0.01879...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8845322339589967,0.01879...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8845322339589967,0.01879...|  0.0|       0.0|
+------------------------------+-------------+------------------------------+-----+----------+
only showing top 5 rows



In [29]:
# Score the TF-IDF + logistic-regression predictions with four multiclass metrics.
metric_names = ("accuracy", "weightedPrecision", "weightedRecall", "f1")
accuracy_evaluator, precision_evaluator, recall_evaluator, f1_evaluator = [
    MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName=name)
    for name in metric_names
]

accuracy, precision, recall, f1_score = [
    scorer.evaluate(predictions_idf_lr)
    for scorer in (accuracy_evaluator, precision_evaluator, recall_evaluator, f1_evaluator)
]

print(f'Accuracy: {accuracy}')
print(f'Precision: {precision}')
print(f'Recall: {recall}')
print(f'F1 Score: {f1_score}')

Accuracy: 0.9780167209604301
Precision: 0.9725160742029567
Recall: 0.97801672096043
F1 Score: 0.9719229068954107
