In [9]:
# Import libraries to execute python and spark commands and libraries
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("clipper-pyspark").getOrCreate()

sc = spark.sparkContext

In [10]:
# Importing libraries to perform visualization
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline
np.random.seed(60)

In [11]:
#Read the data into spark datafrome
from pyspark.sql.functions import col, lower
df = spark.read.csv('/Users/manje/Documents/MANJEERA/SEM 2/BIG DATA ANALYTICS/TECHNIAL PROJECT/output_file_final.csv', inferSchema=True, header=True)

data = df.select(lower(col('Primary Type')),lower(col('Description')))\
        .withColumnRenamed('lower(Primary Type)','Primary Type')\
        .withColumnRenamed('lower(Description)', 'Description')
data.cache()
print('Dataframe Structure')
print('----------------------------------')
print(data.printSchema())
print(' ')
print('Dataframe preview')
print(data.show(5))
print(' ')
print('----------------------------------')
print('Total number of rows', df.count())

Dataframe Structure
----------------------------------
root
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)

None
 
Dataframe preview
+-------------------+--------------------+
|       Primary Type|         Description|
+-------------------+--------------------+
| deceptive practice|financial identit...|
|crim sexual assault|      non-aggravated|
|           burglary|      unlawful entry|
|              theft|           over $500|
|crim sexual assault|      non-aggravated|
+-------------------+--------------------+
only showing top 5 rows

None
 
----------------------------------
Total number of rows 7032560


In [12]:
#Defining the top categories of crime with the highest total count
def top_n_list(df,var, N):
    '''
    This function determine the top N numbers of the list 
    '''
    print("Total number of unique value of"+' '+var+''+':'+' '+str(df.select(var).distinct().count()))
    print(' ')
    print('Top'+' '+str(N)+' '+'Crime'+' '+var)
    df.groupBy(var).count().withColumnRenamed('count','totalValue')\
    .orderBy(col('totalValue').desc()).show(N)
    
    
top_n_list(data, 'Primary Type',10)
print(' ')
print(' ')
top_n_list(data,'Description',10)
top_n_list(data,'Location Description',10)
print(' ')

Total number of unique value of Primary Type: 35
 
Top 10 Crime Primary Type
+-------------------+----------+
|       Primary Type|totalValue|
+-------------------+----------+
|              theft|   1486432|
|            battery|   1285561|
|    criminal damage|    800484|
|          narcotics|    726658|
|            assault|    440824|
|      other offense|    436869|
|           burglary|    398672|
|motor vehicle theft|    324105|
| deceptive practice|    284366|
|            robbery|    264485|
+-------------------+----------+
only showing top 10 rows

 
 
Total number of unique value of Description: 381
 
Top 10 Crime Description
+--------------------+----------+
|         Description|totalValue|
+--------------------+----------+
|              simple|    829104|
|      $500 and under|    572188|
|domestic battery ...|    542802|
|          to vehicle|    388870|
|         to property|    368875|
|           over $500|    364510|
|poss: cannabis 30...|    278021|
|      forcible

AnalysisException: "cannot resolve '`Location Description`' given input columns: [Primary Type, Description];;\n'Project ['Location Description]\n+- Project [Primary Type#278, lower(Description)#275 AS Description#281]\n   +- Project [lower(Primary Type)#274 AS Primary Type#278, lower(Description)#275]\n      +- Project [lower(Primary Type#231) AS lower(Primary Type)#274, lower(Description#232) AS lower(Description)#275]\n         +- Relation[ID#226,Case Number#227,Date#228,Block#229,IUCR#230,Primary Type#231,Description#232,Location Description#233,Arrest#234,Domestic#235,Beat#236,District#237,Ward#238,Community Area#239,FBI Code#240,X Coordinate#241,Y Coordinate#242,Year#243,Updated On#244,Latitude#245,Longitude#246,Location#247,Crime_Date#248,Crime_Time#249] csv\n"

In [13]:
#Display the distinct crime count based on the type of crime
data.select('Primary Type').distinct().count()


35

We are splitting the input data into Training set and Test set randomly in order to train and test the model respectively

In [14]:
training, test = data.randomSplit([0.7,0.3], seed=60)
#trainingSet.cache()
print("Training Dataset Count:", training.count())
print("Test Dataset Count:", test.count())

Training Dataset Count: 4924319
Test Dataset Count: 2108241


The below code is to import the various text classification libraries and to create a pipeline for the features extracted

In [15]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, OneHotEncoder, StringIndexer, VectorAssembler, HashingTF, IDF, Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, NaiveBayes 

#Defining the tokenizer using regextokenizer function
regex_tokenizer = RegexTokenizer(pattern='\\W')\
                  .setInputCol("Description")\
                  .setOutputCol("tokens")

#Defining the stopwords using stopwordsremover function
extra_stopwords = ['http','amp','rt','t','c','the']
stopwords_remover = StopWordsRemover()\
                    .setInputCol('tokens')\
                    .setOutputCol('filtered_words')\
                    .setStopWords(extra_stopwords)
                    

#Defining a set of words using countVectorizer function
count_vectors = CountVectorizer(vocabSize=10000, minDF=5)\
               .setInputCol("filtered_words")\
               .setOutputCol("features")


#Defining TF-IDF to vectorise features 
hashingTf = HashingTF(numFeatures=10000)\
            .setInputCol("filtered_words")\
            .setOutputCol("raw_features")
            
#Using minDocFreq to remove sparse terms
idf = IDF(minDocFreq=5)\
        .setInputCol("raw_features")\
        .setOutputCol("features")

#Encoding the Category variable into label using StringIndexer
label_string_idx = StringIndexer()\
                  .setInputCol("Primary Type")\
                  .setOutputCol("label")

#Defining classifier structure for logistic Regression imported from the library
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

#Defining classifier structure for Naive Bayes
nb = NaiveBayes(smoothing=1)


Training the model using logistic regression by converting features into count vectors

In [16]:
#Training the model using Logistic Regression after converting the features into vectors
pipeline_cv_lr = Pipeline().setStages([regex_tokenizer,stopwords_remover,count_vectors,label_string_idx, lr])
model_cv_lr = pipeline_cv_lr.fit(training)
predictions_cv_lr = model_cv_lr.transform(test)


In [36]:
#Displaying the top predictions of the model
print('Displaying the TOP Predictions')
predictions_cv_lr.select('Description','Primary Type',"probability","label","prediction")\
                                        .orderBy("probability", ascending=False)\
                                        .show(n=5, truncate=30)

Displaying the TOP Predictions
+---------------------------+------------+------------------------------+-----+----------+
|                Description|Primary Type|                   probability|label|prediction|
+---------------------------+------------+------------------------------+-----+----------+
|from coin-op machine/device|       theft|[0.8425360622808422,0.02801...|  0.0|       0.0|
|from coin-op machine/device|       theft|[0.8425360622808422,0.02801...|  0.0|       0.0|
|from coin-op machine/device|       theft|[0.8425360622808422,0.02801...|  0.0|       0.0|
|from coin-op machine/device|       theft|[0.8425360622808422,0.02801...|  0.0|       0.0|
|from coin-op machine/device|       theft|[0.8425360622808422,0.02801...|  0.0|       0.0|
+---------------------------+------------+------------------------------+-----+----------+
only showing top 5 rows



In [37]:
#Determing the accuracy of the model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator 
evaluator_cv_lr = MulticlassClassificationEvaluator().setPredictionCol("prediction").evaluate(predictions_cv_lr)
print('Accuracy of the model')
print('accuracy:{}%'.format(np.round(evaluator_cv_lr,2)*100))

Accuracy of the model
accuracy:89.0%


In [23]:
### Secondary model using NaiveBayes
pipeline_cv_nb = Pipeline().setStages([regex_tokenizer,stopwords_remover,count_vectors,label_string_idx, nb])
model_cv_nb = pipeline_cv_nb.fit(training)
predictions_cv_nb = model_cv_nb.transform(test)

In [27]:
# Evaluting the accuracy of the model and displaying the accuracy
evaluator_cv_nb = MulticlassClassificationEvaluator().setPredictionCol("prediction").evaluate(predictions_cv_nb)
print('Accuracy of the model')
print('accuracy:{}%'.format(np.round(evaluator_cv_nb,2)*100))

Accuracy of the model
accuracy:92.0%


In [29]:
# Training the model using Logistic regression with the help of TF-IDF
pipeline_idf_lr = Pipeline().setStages([regex_tokenizer,stopwords_remover,hashingTf, idf, label_string_idx, lr])
model_idf_lr = pipeline_idf_lr.fit(training)
predictions_idf_lr = model_idf_lr.transform(test)

In [30]:
#Displaying the top predictions of the model
print('Few predictions of the model')
print(' ')
predictions_idf_lr.select('Description','Primary Type',"probability","label","prediction")\
                                        .orderBy("probability", ascending=False)\
                                        .show(n=5, truncate=30)


Few predictions of the model
 
+---------------------------+------------+------------------------------+-----+----------+
|                Description|Primary Type|                   probability|label|prediction|
+---------------------------+------------+------------------------------+-----+----------+
|from coin-op machine/device|       theft|[0.8424143819961089,0.02800...|  0.0|       0.0|
|from coin-op machine/device|       theft|[0.8424143819961089,0.02800...|  0.0|       0.0|
|from coin-op machine/device|       theft|[0.8424143819961089,0.02800...|  0.0|       0.0|
|from coin-op machine/device|       theft|[0.8424143819961089,0.02800...|  0.0|       0.0|
|from coin-op machine/device|       theft|[0.8424143819961089,0.02800...|  0.0|       0.0|
+---------------------------+------------+------------------------------+-----+----------+
only showing top 5 rows



In [31]:
#Evaluating the model and displaying the accuracy of the model using test data
evaluator_idf_lr = MulticlassClassificationEvaluator().setPredictionCol("prediction").evaluate(predictions_idf_lr)
print('Accuracy of the model')
print('accuracy:{}%'.format(np.round(evaluator_idf_lr,2)*100))

Accuracy of the model
accuracy:89.0%


In [32]:
#Training the model using Naive Bayes algorithm using the TF-IDF function
pipeline_idf_nb = Pipeline().setStages([regex_tokenizer,stopwords_remover,hashingTf, idf, label_string_idx, nb])
model_idf_nb = pipeline_idf_nb.fit(training)
predictions_idf_nb = model_idf_nb.transform(test)

In [34]:
#Evaluating the model and displaying the accuracy of the model
evaluator_idf_nb = MulticlassClassificationEvaluator().setPredictionCol("prediction").evaluate(predictions_idf_nb)
print('Accuracy of the model')
print('accuracy:{}%'.format(np.round(evaluator_idf_nb,2)*100))

Accuracy of the model
accuracy:92.0%
