## Problem Statement:
From 1934 to 1963, San Francisco was infamous for housing some of the world's most notorious criminals on the inescapable island of Alcatraz.

Today, the city is known more for its tech scene than its criminal past. But, with rising wealth inequality, housing shortages, and a proliferation of expensive digital toys riding BART to work, there is no scarcity of crime in the city by the bay.

From Sunset to SOMA, and Marina to Excelsior, this competition's dataset provides nearly 12 years of crime reports from across all of San Francisco's neighborhoods. Given time and location, you must predict the category of crime that occurred.

We're also encouraging you to explore the dataset visually. What can we learn about the city through visualizations like this Top Crimes Map? The most up-voted scripts from this competition will receive official Kaggle swag as prizes.

### Data fields
Dates - timestamp of the crime incident

Category - category of the crime incident (only in train.csv). This is the target variable you are going to predict.

Descript - detailed description of the crime incident (only in train.csv)

DayOfWeek - the day of the week

PdDistrict - name of the Police Department District

Resolution - how the crime incident was resolved (only in train.csv)

Address - the approximate street address of the crime incident 

X - Longitude

Y - Latitude
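Since the task is to predict the category from time and location, the `Dates` timestamp usually has to be split into parts before a model can use it. The following is a minimal sketch in plain Python, assuming the `YYYY-MM-DD HH:MM:SS` layout that the data uses; the helper name `split_timestamp` is hypothetical and not part of this notebook.

```python
from datetime import datetime

DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday",
             "Friday", "Saturday", "Sunday"]

def split_timestamp(value):
    """Parse a 'Dates' string and return simple calendar features."""
    ts = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
    return {
        "year": ts.year,
        "month": ts.month,
        "hour": ts.hour,
        # weekday() is 0 for Monday, so it indexes DAY_NAMES directly
        "day_of_week": DAY_NAMES[ts.weekday()],
    }

features = split_timestamp("2015-05-13 23:53:00")
print(features)
```

The derived `day_of_week` should agree with the `DayOfWeek` column, which makes it a cheap consistency check on the parsing.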

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('sf_crime_classification').getOrCreate()

In [4]:
# header=True takes column names from the first row; without inferSchema every column is read as a string
df = spark.read.csv('train.csv', header=True)
df.show()

+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-------------------+------------------+
|              Dates|      Category|            Descript|DayOfWeek|PdDistrict|    Resolution|             Address|                  X|                 Y|
+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-------------------+------------------+
|2015-05-13 23:53:00|      WARRANTS|      WARRANT ARREST|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / LAGUNA ST|  -122.425891675136|  37.7745985956747|
|2015-05-13 23:53:00|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / LAGUNA ST|  -122.425891675136|  37.7745985956747|
|2015-05-13 23:33:00|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday|  NORTHERN|ARREST, BOOKED|VANNESS AV / GREE...|   -122.42436302145|  37.8004143219856|
|2015-05-13 23:30:00| LARCENY/THEFT|GRAND THEFT FROM ...|Wednesday|  NORTHER

In [5]:
print('Dataframe Structure')
print('----------------------------------')
df.printSchema()  # prints the schema itself and returns None, so it is not wrapped in print()
print(' ')
print('Dataframe preview')
df.show(5)  # likewise prints directly and returns None
print(' ')
print('----------------------------------')
print('Total number of rows', df.count())

Dataframe Structure
----------------------------------
root
 |-- Dates: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- PdDistrict: string (nullable = true)
 |-- Resolution: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- X: string (nullable = true)
 |-- Y: string (nullable = true)

 
Dataframe preview
+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-------------------+------------------+
|              Dates|      Category|            Descript|DayOfWeek|PdDistrict|    Resolution|             Address|                  X|                 Y|
+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-------------------+------------------+
|2015-05-13 23:53:00|      WARRANTS|      WARRANT ARREST|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / L

In [6]:
df.columns

['Dates',
 'Category',
 'Descript',
 'DayOfWeek',
 'PdDistrict',
 'Resolution',
 'Address',
 'X',
 'Y']

In [7]:
from pyspark.sql.functions import col, lower

def top_n_list(df, var, N):
    '''
    Print the number of distinct values in column `var`
    and show its N most frequent values with their counts.
    '''
    n_unique = df.select(var).distinct().count()
    print(f"Total number of unique value of {var}: {n_unique}", '\n')
    print(f"Top {N} Crime {var}")

    df.groupBy(var).count().withColumnRenamed('count', 'totalValue')\
      .orderBy(col('totalValue').desc()).show(N)


In [8]:
top_n_list(df, 'Category',10)

Total number of unique value of Category: 39 

Top 10 Crime Category
+--------------+----------+
|      Category|totalValue|
+--------------+----------+
| LARCENY/THEFT|    174900|
|OTHER OFFENSES|    126182|
|  NON-CRIMINAL|     92304|
|       ASSAULT|     76876|
| DRUG/NARCOTIC|     53971|
| VEHICLE THEFT|     53781|
|     VANDALISM|     44725|
|      WARRANTS|     42214|
|      BURGLARY|     36755|
|SUSPICIOUS OCC|     31414|
+--------------+----------+
only showing top 10 rows
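The group, count, and sort-descending pattern used by `top_n_list` can be sketched in plain Python with `collections.Counter`. This is only an illustration of the logic on a toy list; the helper name `top_n` and the sample values are assumptions, not rows taken from `train.csv`.

```python
from collections import Counter

def top_n(values, n):
    """Return the number of distinct values and the n most frequent (value, count) pairs."""
    counts = Counter(values)
    return len(counts), counts.most_common(n)

# Toy values only; these are not rows from train.csv
sample = ["LARCENY/THEFT", "ASSAULT", "LARCENY/THEFT",
          "WARRANTS", "ASSAULT", "LARCENY/THEFT"]

distinct, top = top_n(sample, 2)
print(distinct, top)
```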



In [9]:
top_n_list(df,'Descript',10)

Total number of unique value of Descript: 879 

Top 10 Crime Descript
+--------------------+----------+
|            Descript|totalValue|
+--------------------+----------+
|GRAND THEFT FROM ...|     60022|
|       LOST PROPERTY|     31729|
|             BATTERY|     27441|
|   STOLEN AUTOMOBILE|     26897|
|DRIVERS LICENSE, ...|     26839|
|      WARRANT ARREST|     23754|
|SUSPICIOUS OCCURR...|     21891|
|AIDED CASE, MENTA...|     21497|
|PETTY THEFT FROM ...|     19771|
|MALICIOUS MISCHIE...|     17789|
+--------------------+----------+
only showing top 10 rows



In [10]:
from pyspark.sql.functions import col, when, count
df.select(
   [count(when(col(i).isNull() , i)).alias(i) for i in df.columns]
   ).show()

+-----+--------+--------+---------+----------+----------+-------+---+---+
|Dates|Category|Descript|DayOfWeek|PdDistrict|Resolution|Address|  X|  Y|
+-----+--------+--------+---------+----------+----------+-------+---+---+
|    0|       0|       0|        0|         0|         0|      0|  0|  0|
+-----+--------+--------+---------+----------+----------+-------+---+---+



In [11]:
df.describe().show()



+-------+-------------------+-----------+--------------------+---------+----------+--------------+--------------------+--------------------+-------------------+
|summary|              Dates|   Category|            Descript|DayOfWeek|PdDistrict|    Resolution|             Address|                   X|                  Y|
+-------+-------------------+-----------+--------------------+---------+----------+--------------+--------------------+--------------------+-------------------+
|  count|             878049|     878049|              878049|   878049|    878049|        878049|              878049|              878049|             878049|
|   mean|               null|       null|                null|     null|      null|          null|                null| -122.42261645502985|  37.77102029801277|
| stddev|               null|       null|                null|     null|      null|          null|                null|0.030353622998489052|0.45689310470581795|
|    min|2003-01-06 00:01:00|     

                                                                                

### Data Exploration using SQL

In [12]:
df.createOrReplaceTempView("sf_crime_classification")

In [13]:
df.columns

['Dates',
 'Category',
 'Descript',
 'DayOfWeek',
 'PdDistrict',
 'Resolution',
 'Address',
 'X',
 'Y']

In [14]:
# .show() prints the result and returns None, so there is nothing useful to assign
spark.sql(
    '''
    SELECT COUNT(DISTINCT Category) AS distinct_categories
    FROM sf_crime_classification
    '''
).show()

+-------------------+
|distinct_categories|
+-------------------+
|                 39|
+-------------------+



In [15]:
# .show() prints the result and returns None, so there is nothing useful to assign
spark.sql(
    '''
    SELECT DATE(min(Dates)), DATE(max(Dates))
    FROM sf_crime_classification
    '''
).show()

+----------+----------+
|min(Dates)|max(Dates)|
+----------+----------+
|2003-01-06|2015-05-13|
+----------+----------+



In [16]:
# %%sql
#     Select DATE(min(Dates)), DATE(max(Dates))
#     from
#     sf_crime_classification

In [17]:
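# randomSplit samples row by row, so the 70/30 proportions are approximate rather than exact
training, test = df.randomSplit([0.7, 0.3], seed=60)
# training.cache()  # optional: cache the training set if it is reused across several fits
print("Training Dataset Count:", training.count())
print("Test Dataset Count:", test.count())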

                                                                                

Training Dataset Count: 614457
Test Dataset Count: 263592
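The training count above is about 69.98% of the 878049 rows rather than exactly 70%. That is expected from a split that decides each row independently with a random draw. The sketch below illustrates the idea in plain Python; it is not Spark's implementation, and the helper name `random_split` is hypothetical.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a bucket with one uniform draw against cumulative weights."""
    total = sum(weights)
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point drift in the last bound

    rng = random.Random(seed)
    buckets = [[] for _ in weights]
    for row in rows:
        draw = rng.random()  # uniform in [0, 1)
        for i, bound in enumerate(bounds):
            if draw < bound:
                buckets[i].append(row)
                break
    return buckets

train_rows, test_rows = random_split(range(100_000), [0.7, 0.3], seed=60)
print(len(train_rows), len(test_rows))
```

Every row lands in exactly one bucket, but the bucket sizes only approach the requested weights as the number of rows grows.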


In [18]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, OneHotEncoder, StringIndexer, VectorAssembler, HashingTF, IDF, Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, NaiveBayes 

#----------------Define tokenizer with regextokenizer()------------------
regex_tokenizer = RegexTokenizer(pattern='\\W')\
                  .setInputCol("Descript")\
                  .setOutputCol("tokens")

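#----------------Define stopwords with stopwordsremover()---------------------
# Note: setStopWords() replaces the default English stop-word list with this custom
# list, so only these six tokens are removed
extra_stopwords = ['http','amp','rt','t','c','the']
stopwords_remover = StopWordsRemover()\
                    .setInputCol('tokens')\
                    .setOutputCol('filtered_words')\
                    .setStopWords(extra_stopwords)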
                    

#----------Define bags of words using countVectorizer()---------------------------
count_vectors = CountVectorizer(vocabSize=10000, minDF=5)\
               .setInputCol("filtered_words")\
               .setOutputCol("features")


#-----------Using TF-IDF to vectorise features instead of countVectoriser-----------------
hashingTf = HashingTF(numFeatures=10000)\
            .setInputCol("filtered_words")\
            .setOutputCol("raw_features")
            
#Use minDocFreq to remove sparse terms
idf = IDF(minDocFreq=5)\
        .setInputCol("raw_features")\
        .setOutputCol("features")

#---------------Define dense document vectors using Word2Vec (averaged word embeddings, not a bag of words)---------------------------
word2Vec = Word2Vec(vectorSize=1000, minCount=0)\
           .setInputCol("filtered_words")\
           .setOutputCol("features")

#-----------Encode the Category variable into label using StringIndexer-----------
label_string_idx = StringIndexer()\
                  .setInputCol("Category")\
                  .setOutputCol("label")

#-----------Define classifier structure for logistic Regression--------------
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

#---------Define classifier structure for Naive Bayes----------
nb = NaiveBayes(smoothing=1)

def metrics_ev(labels, metrics):
    '''
    Print the performance metrics held by `metrics`, which is assumed to be a
    pyspark.mllib.evaluation.MulticlassMetrics object
    '''
    # Confusion matrix (confusionMatrix is a method, so it must be called)
    print("---------Confusion matrix-----------------")
    print(metrics.confusionMatrix().toArray())
    print(' ')
    # Overall statistics
    # In Spark 3.x precision(), recall() and fMeasure() require a label;
    # the overall figure for a single-label multiclass problem is accuracy
    print('----------Overall statistics-----------')
    print("Accuracy = %s" % metrics.accuracy)
    print(' ')
    # Statistics by class
    print('----------Statistics by class----------')
    for label in sorted(labels):
        print("Class %s precision = %s" % (label, metrics.precision(float(label))))
        print("Class %s recall = %s" % (label, metrics.recall(float(label))))
        print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(float(label), beta=1.0)))
    print(' ')
    # Weighted stats
    print('----------Weighted stats----------------')
    print("Weighted recall = %s" % metrics.weightedRecall)
    print("Weighted precision = %s" % metrics.weightedPrecision)
    print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
    print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
    print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)

In [19]:
pipeline_cv_lr = Pipeline().setStages([regex_tokenizer,stopwords_remover,count_vectors,label_string_idx, lr])
model_cv_lr = pipeline_cv_lr.fit(training)
predictions_cv_lr = model_cv_lr.transform(test)



                                                                                

22/12/01 20:18:44 WARN InstanceBuilder$JavaBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
22/12/01 20:18:45 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/12/01 20:18:45 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/12/01 20:18:46 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/12/01 20:18:46 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
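The first three stages of this pipeline (tokenize on non-word characters, drop stop words, count terms) can be sketched in plain Python to show what the `features` vector holds. This is a rough illustration, not Spark's implementation: the function names are hypothetical, the vocabulary is simply sorted rather than ordered by frequency, and the last toy description is made up.

```python
import re
from collections import Counter

STOP_WORDS = {'http', 'amp', 'rt', 't', 'c', 'the'}  # the notebook's custom list

def tokenize(text):
    """Lowercase, split on non-word characters, and drop empty tokens."""
    return [tok for tok in re.split(r'\W', text.lower()) if tok]

def remove_stop_words(tokens):
    """Keep only tokens that are not in the stop-word set."""
    return [tok for tok in tokens if tok not in STOP_WORDS]

def bag_of_words(tokens, vocabulary):
    """Count how often each vocabulary term occurs in the token list."""
    counts = Counter(tokens)
    return [counts[term] for term in vocabulary]

# Toy descriptions; the last one is invented for illustration
docs = ["WARRANT ARREST", "LOST PROPERTY", "STOLEN AUTOMOBILE",
        "ARREST FOR THE STOLEN PROPERTY"]

tokenized = [remove_stop_words(tokenize(d)) for d in docs]
vocabulary = sorted({tok for doc in tokenized for tok in doc})
vectors = [bag_of_words(doc, vocabulary) for doc in tokenized]

print(vocabulary)
for doc, vec in zip(docs, vectors):
    print(doc, vec)
```

Note that "for" survives in the last document because the custom list replaces Spark's default English stop words instead of extending them.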


In [20]:
print('-----------------------------Check Top 5 predictions----------------------------------')
print(' ')
predictions_cv_lr.select('Descript','Category',"probability","label","prediction")\
                                        .orderBy("probability", ascending=False)\
                                        .show(n=5, truncate=30)

-----------------------------Check Top 5 predictions----------------------------------
 


+------------------------------+-------------+------------------------------+-----+----------+
|                      Descript|     Category|                   probability|label|prediction|
+------------------------------+-------------+------------------------------+-----+----------+
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8711720612177544,0.02172...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8711720612177544,0.02172...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8711720612177544,0.02172...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8711720612177544,0.02172...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8711720612177544,0.02172...|  0.0|       0.0|
+------------------------------+-------------+------------------------------+-----+----------+
only showing top 5 rows



                                                                                

In [21]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# With no metricName set, the evaluator reports weighted F1 (its default), not accuracy
evaluator_cv_lr = MulticlassClassificationEvaluator().setPredictionCol("prediction").evaluate(predictions_cv_lr)
print(' ')
print('------------------------------F1 score----------------------------------')
print(' ')
print('                       f1:{}:'.format(evaluator_cv_lr))

 
------------------------------F1 score----------------------------------
 
                       f1:0.9722709356739532:
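`MulticlassClassificationEvaluator` reports weighted F1 unless `metricName` is set, and weighted F1 is generally not the same number as accuracy. The sketch below computes both on a small made-up set of labels to show the difference; the function names and the toy labels are assumptions for illustration only.

```python
from collections import Counter

def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true label."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)

def weighted_f1(y_true, y_pred):
    """Per-class F1 averaged with weights equal to each class's share of y_true."""
    pairs = list(zip(y_true, y_pred))
    total = 0.0
    for label, support in Counter(y_true).items():
        tp = sum(t == label and p == label for t, p in pairs)
        fp = sum(t != label and p == label for t, p in pairs)
        fn = sum(t == label and p != label for t, p in pairs)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += f1 * support
    return total / len(y_true)

# Made-up labels, not model output
y_true = [0, 0, 0, 0, 1, 1, 2, 2]
y_pred = [0, 0, 0, 1, 1, 1, 2, 0]

acc = accuracy(y_true, y_pred)
f1 = weighted_f1(y_true, y_pred)
print("accuracy:", acc)
print("weighted F1:", f1)
```

To report accuracy from Spark directly, the evaluator can be created with `metricName="accuracy"`.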


                                                                                

In [22]:
# Secondary model: Naive Bayes on the same CountVectorizer features
pipeline_cv_nb = Pipeline().setStages([regex_tokenizer,stopwords_remover,count_vectors,label_string_idx, nb])
model_cv_nb = pipeline_cv_nb.fit(training)
predictions_cv_nb = model_cv_nb.transform(test)
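For intuition about what `NaiveBayes(smoothing=1)` does with term counts, here is a textbook multinomial Naive Bayes with additive (Laplace) smoothing in plain Python. It is a simplified sketch rather than Spark's implementation (for example, the class priors here are not smoothed), and the function names, documents, and class labels are all made up.

```python
import math
from collections import Counter, defaultdict

def train_nb(docs, labels, smoothing=1.0):
    """Estimate log priors and smoothed log likelihoods from tokenized documents."""
    vocab = sorted({tok for doc in docs for tok in doc})
    class_sizes = Counter(labels)
    term_counts = defaultdict(Counter)
    for tokens, label in zip(docs, labels):
        term_counts[label].update(tokens)

    log_prior = {c: math.log(n / len(labels)) for c, n in class_sizes.items()}
    log_like = {}
    for c in class_sizes:
        # Adding `smoothing` to every count keeps unseen terms from getting probability zero
        denom = sum(term_counts[c].values()) + smoothing * len(vocab)
        log_like[c] = {t: math.log((term_counts[c][t] + smoothing) / denom) for t in vocab}
    return log_prior, log_like

def predict_nb(tokens, log_prior, log_like):
    """Return the class with the highest posterior log score; unknown tokens are ignored."""
    scores = {
        c: log_prior[c] + sum(log_like[c][t] for t in tokens if t in log_like[c])
        for c in log_prior
    }
    return max(scores, key=scores.get)

# Toy training data with placeholder class names
docs = [["warrant", "arrest"], ["warrant", "arrest"],
        ["lost", "property"], ["lost", "wallet"]]
labels = ["class_a", "class_a", "class_b", "class_b"]

log_prior, log_like = train_nb(docs, labels, smoothing=1.0)
print(predict_nb(["warrant"], log_prior, log_like))
print(predict_nb(["lost"], log_prior, log_like))
```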

                                                                                

In [23]:
# The evaluator's default metric is weighted F1, so the value below is an F1 score
evaluator_cv_nb = MulticlassClassificationEvaluator().setPredictionCol("prediction").evaluate(predictions_cv_nb)
print(' ')
print('--------------------------F1 score-----------------------------')
print(' ')
print('                      f1:{}:'.format(evaluator_cv_nb))

 
--------------------------F1 score-----------------------------
 
                      f1:0.9936147254042839:


                                                                                

In [24]:
pipeline_idf_lr = Pipeline().setStages([regex_tokenizer,stopwords_remover,hashingTf, idf, label_string_idx, lr])
model_idf_lr = pipeline_idf_lr.fit(training)
predictions_idf_lr = model_idf_lr.transform(test)
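`HashingTF` skips the vocabulary step: each token is hashed straight to an index in a fixed-length vector. The sketch below shows the idea in plain Python. Spark uses a MurmurHash3-based function; `zlib.crc32` is swapped in here only so the example runs with the standard library, so the indices will not match Spark's. The helper name `hashing_tf` is hypothetical.

```python
import zlib

def hashing_tf(tokens, num_features):
    """Map tokens to term counts in a fixed-length vector via the hashing trick."""
    vec = [0] * num_features
    for tok in tokens:
        # Stand-in hash; two different tokens may collide on the same index
        idx = zlib.crc32(tok.encode("utf-8")) % num_features
        vec[idx] += 1
    return vec

tokens = ["warrant", "arrest", "warrant"]
vec = hashing_tf(tokens, num_features=16)
print(vec)
```

Compared with `CountVectorizer`, no pass over the data is needed to build a vocabulary, at the cost of possible collisions and of losing the mapping from index back to term.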

                                                                                

In [25]:
print('-----------------------------Check Top 5 predictions----------------------------------')
print(' ')
predictions_idf_lr.select('Descript','Category',"probability","label","prediction")\
                                        .orderBy("probability", ascending=False)\
                                        .show(n=5, truncate=30)

-----------------------------Check Top 5 predictions----------------------------------
 




+------------------------------+-------------+------------------------------+-----+----------+
|                      Descript|     Category|                   probability|label|prediction|
+------------------------------+-------------+------------------------------+-----+----------+
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8831671122887976,0.01976...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8831671122887976,0.01976...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8831671122887976,0.01976...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8831671122887976,0.01976...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8831671122887976,0.01976...|  0.0|       0.0|
+------------------------------+-------------+------------------------------+-----+----------+
only showing top 5 rows



                                                                                

In [26]:
# The evaluator's default metric is weighted F1, so the value below is an F1 score
evaluator_idf_lr = MulticlassClassificationEvaluator().setPredictionCol("prediction").evaluate(predictions_idf_lr)
print(' ')
print('-------------------------------F1 score---------------------------------')
print(' ')
print('                        f1:{}:'.format(evaluator_idf_lr))

 
-------------------------------F1 score---------------------------------
 
                        f1:0.9722324196105095:


                                                                                

In [27]:
pipeline_idf_nb = Pipeline().setStages([regex_tokenizer,stopwords_remover,hashingTf, idf, label_string_idx, nb])
model_idf_nb = pipeline_idf_nb.fit(training)
predictions_idf_nb = model_idf_nb.transform(test)
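The `IDF` stage rescales raw term counts so that terms appearing in many documents carry less weight. Spark ML documents the weight as log((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents containing the term, and terms below `minDocFreq` get weight 0. A minimal plain-Python sketch of that formula follows; the function names and toy documents are assumptions.

```python
import math
from collections import Counter

def idf_weights(docs, min_doc_freq=0):
    """Compute log((m + 1) / (df + 1)) per term; terms rarer than min_doc_freq get 0."""
    m = len(docs)
    doc_freq = Counter(term for doc in docs for term in set(doc))
    return {
        term: math.log((m + 1) / (df + 1)) if df >= min_doc_freq else 0.0
        for term, df in doc_freq.items()
    }

def tf_idf(doc, weights):
    """Multiply each term's count in the document by its IDF weight."""
    tf = Counter(doc)
    return {term: count * weights.get(term, 0.0) for term, count in tf.items()}

# Toy tokenized documents
docs = [["warrant", "arrest"], ["lost", "property"], ["stolen", "property"]]

weights = idf_weights(docs)
print(weights)
print(tf_idf(["lost", "property"], weights))
```

With `min_doc_freq=2`, only "property" would keep a non-zero weight in this toy corpus, which mirrors how `minDocFreq=5` filters rare terms in the notebook's pipeline.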

                                                                                

In [28]:
# The evaluator's default metric is weighted F1, so the value below is an F1 score
evaluator_idf_nb = MulticlassClassificationEvaluator().setPredictionCol("prediction").evaluate(predictions_idf_nb)
print(' ')
print('-----------------------------F1 score-----------------------------')
print(' ')
print('                          f1:{}:'.format(evaluator_idf_nb))

22/12/01 20:19:28 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB

 
-----------------------------F1 score-----------------------------
 
                          f1:0.9949693209189158:


                                                                                