In [65]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
# getOrCreate() reuses the kernel's existing session, so re-running this cell is
# safe; a bare SparkContext() raises once a context already exists.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
# Legacy entry point kept only for code that still references `sqlContext`;
# prefer `spark` for new code.
sqlContext = SQLContext(sc)

# CSV is a built-in data source since Spark 2.0, so the external
# 'com.databricks.spark.csv' package name is not needed.
# header=True: first line holds column names; inferSchema=True: infer column types.
data = spark.read.csv('train.csv', header=True, inferSchema=True)

In [66]:
# Column names as read from the CSV header
[field.name for field in data.schema.fields]

['Dates',
 'Category',
 'Descript',
 'DayOfWeek',
 'PdDistrict',
 'Resolution',
 'Address',
 'X',
 'Y']

In [67]:
# Keep only the label (Category) and the free-text input (Descript).
drop_list = ['Dates', 'DayOfWeek', 'PdDistrict', 'Resolution', 'Address', 'X', 'Y']

data = data.drop(*drop_list)

In [68]:
data.show(n=5)  # first five rows, long strings truncated

+--------------+--------------------+
|      Category|            Descript|
+--------------+--------------------+
|      WARRANTS|      WARRANT ARREST|
|OTHER OFFENSES|TRAFFIC VIOLATION...|
|OTHER OFFENSES|TRAFFIC VIOLATION...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
+--------------+--------------------+
only showing top 5 rows



In [70]:
# Print the names and types of the columns that remain after the drop above.
data.printSchema()

root
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)



In [71]:
from pyspark.sql.functions import col

# Incidents per category, most frequent first; show() prints 20 rows by default.
(data.groupBy("Category")
     .count()
     .sort("count", ascending=False)
     .show())

+--------------------+-----+
|            Category|count|
+--------------------+-----+
|       LARCENY/THEFT| 1933|
|        NON-CRIMINAL|  929|
|      OTHER OFFENSES|  907|
|             ASSAULT|  634|
|       VEHICLE THEFT|  460|
|           VANDALISM|  346|
|            BURGLARY|  292|
|            WARRANTS|  284|
|      SUSPICIOUS OCC|  242|
|      MISSING PERSON|  223|
|             ROBBERY|  201|
|       DRUG/NARCOTIC|  185|
|               FRAUD|  139|
|     SECONDARY CODES|   96|
|         WEAPON LAWS|   90|
|            TRESPASS|   59|
|SEX OFFENSES FORC...|   44|
|     STOLEN PROPERTY|   37|
|         DRUNKENNESS|   29|
|FORGERY/COUNTERFE...|   28|
+--------------------+-----+
only showing top 20 rows



In [72]:
# Incidents per description, most frequent first (first 20 rows)
descript_counts = data.groupBy("Descript").count()
descript_counts.orderBy(col("count").desc()).show(20)

+--------------------+-----+
|            Descript|count|
+--------------------+-----+
|GRAND THEFT FROM ...|  836|
|   STOLEN AUTOMOBILE|  293|
|AIDED CASE, MENTA...|  275|
|DRIVERS LICENSE, ...|  226|
|             BATTERY|  207|
|PETTY THEFT FROM ...|  191|
|PETTY THEFT OF PR...|  190|
|       LOST PROPERTY|  170|
|MALICIOUS MISCHIE...|  164|
|      WARRANT ARREST|  162|
|      FOUND PROPERTY|  143|
|MALICIOUS MISCHIE...|  122|
|SUSPICIOUS OCCURR...|  121|
|GRAND THEFT FROM ...|  111|
|INVESTIGATIVE DET...|  105|
|PETTY THEFT FROM ...|   97|
|        FOUND PERSON|   96|
|        STOLEN TRUCK|   89|
|PETTY THEFT SHOPL...|   83|
|ENROUTE TO OUTSID...|   83|
+--------------------+-----+
only showing top 20 rows



In [73]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# Split each description on non-word characters; RegexTokenizer lowercases by default.
regexTokenizer = RegexTokenizer(inputCol="Descript", outputCol="words", pattern=r"\W")

# Custom stop-word list. setStopWords REPLACES Spark's built-in English list
# rather than extending it; these tokens (http, rt, amp, ...) look carried over
# from a web/social-media corpus.
# NOTE(review): to extend the defaults instead, use
# StopWordsRemover.loadDefaultStopWords("english") + add_stopwords.
add_stopwords = ["http","https","amp","rt","t","c","the"]

stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

# Bag-of-words counts: at most 10000 terms, each present in at least 5 descriptions.
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

In [74]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
# Map each Category string to a numeric label (0.0 = most frequent category).
label_stringIdx = StringIndexer().setInputCol("Category").setOutputCol("label")

In [75]:
from pyspark.ml import Pipeline

# Featurization: tokenize -> drop stop words -> term counts -> index labels.
pipeline = Pipeline().setStages([regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

# NOTE(review): the pipeline is fitted on all of `data` before the train/test
# split, so the CountVectorizer vocabulary (minDF) and the label index also see
# test rows. Consider fitting on the training split only.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

In [76]:
dataset.show(n=5)  # raw columns plus the pipeline's derived columns

+--------------+--------------------+--------------------+--------------------+--------------------+-----+
|      Category|            Descript|               words|            filtered|            features|label|
+--------------+--------------------+--------------------+--------------------+--------------------+-----+
|      WARRANTS|      WARRANT ARREST|   [warrant, arrest]|   [warrant, arrest]|(289,[18,35],[1.0...|  7.0|
|OTHER OFFENSES|TRAFFIC VIOLATION...|[traffic, violati...|[traffic, violati...|(289,[18,25,47],[...|  2.0|
|OTHER OFFENSES|TRAFFIC VIOLATION...|[traffic, violati...|[traffic, violati...|(289,[18,25,47],[...|  2.0|
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(289,[0,1,2,3,5],...|  0.0|
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(289,[0,1,2,3,5],...|  0.0|
+--------------+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [77]:
# 70/30 random train/test split; the fixed seed makes it reproducible.
trainingData, testData = dataset.randomSplit([0.7, 0.3], seed=100)
print("Training Dataset Count: {}".format(trainingData.count()))
print("Test Dataset Count: {}".format(testData.count()))

Training Dataset Count: 5185
Test Dataset Count: 2104


### Logistic Regression using Count Vector Features

In [78]:
# L2-regularized logistic regression (elasticNetParam=0 means a pure ridge penalty)
lr = (LogisticRegression()
      .setMaxIter(20)
      .setRegParam(0.3)
      .setElasticNetParam(0.0))

# Fit on the training split
lrModel = lr.fit(trainingData)

In [79]:
predictions = lrModel.transform(testData)

# Test rows predicted as label 0.0 (the most frequent category), highest probability first
(predictions
    .where(predictions.prediction == 0)
    .select("Descript", "Category", "probability", "label", "prediction")
    .sort("probability", ascending=False)
    .show(n=10, truncate=30))

+----------------------------+-------------+------------------------------+-----+----------+
|                    Descript|     Category|                   probability|label|prediction|
+----------------------------+-------------+------------------------------+-----+----------+
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.8842314993062758,0.01659...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.8842314993062758,0.01659...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.8842314993062758,0.01659...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.8842314993062758,0.01659...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.8842314993062758,0.01659...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.8842314993062758,0.01659...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.8842314993062758,0.01659...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.8842314993062758,0.0165

In [80]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# metricName="f1" (weighted F1) is the evaluator's default; spelling it out keeps
# the reported score from being mistaken for accuracy.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")
evaluator.evaluate(predictions)

0.9610787444388802

### Logistic Regression using TF-IDF Features

In [81]:
from pyspark.ml.feature import HashingTF, IDF

# TF-IDF features: hash tokens into 10000 buckets, then reweight by inverse document frequency.
hashingTF = HashingTF().setInputCol("filtered").setOutputCol("rawFeatures").setNumFeatures(10000)
# minDocFreq=5: terms found in fewer than 5 documents get an IDF weight of 0.
idf = IDF().setInputCol("rawFeatures").setOutputCol("features").setMinDocFreq(5)

# Same stages as the count-vector pipeline, with HashingTF + IDF in place of CountVectorizer
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])

In [82]:
# Refit the featurization pipeline, now producing TF-IDF features
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

# 70/30 split with the same seed as the count-vector experiment
(trainingData, testData) = dataset.randomSplit(weights=[0.7, 0.3], seed=100)

# Same logistic-regression settings as the count-vector model
lr = LogisticRegression(
    maxIter=20,
    regParam=0.3,
    elasticNetParam=0,
)
lrModel = lr.fit(trainingData)

In [83]:
predictions = lrModel.transform(testData)

# TF-IDF logistic regression: top 10 test rows predicted as label 0.0
label0_rows = predictions.where(predictions["prediction"] == 0)
label0_rows = label0_rows.select(["Descript", "Category", "probability", "label", "prediction"])
label0_rows.sort("probability", ascending=False).show(n=10, truncate=30)

+----------------------------+-------------+------------------------------+-----+----------+
|                    Descript|     Category|                   probability|label|prediction|
+----------------------------+-------------+------------------------------+-----+----------+
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.8842990879226498,0.01659...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.8842990879226498,0.01659...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.8842990879226498,0.01659...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.8842990879226498,0.01659...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.8842990879226498,0.01659...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.8842990879226498,0.01659...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.8842990879226498,0.01659...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.8842990879226498,0.0165

In [84]:
# metricName="f1" (weighted F1) is the evaluator's default; spelling it out keeps
# the reported score from being mistaken for accuracy.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")
evaluator.evaluate(predictions)

0.9616202660247297

### Cross Validation (Logistic Regression, Count Vector Features)

In [85]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Back to count-vector features for the tuned logistic regression
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)

# Base model; regParam and elasticNetParam are overridden by the grid below
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

# 3 x 3 = 9 candidate settings
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5])          # regularization strength
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2])   # L1/L2 mix (Ridge = 0)
             .build())
# lr.maxIter (e.g. [10, 20, 50]) could be added to the grid as well. Featurizer
# settings such as countVectors.vocabSize can only be tuned if the whole Pipeline,
# not just `lr`, is passed to CrossValidator as the estimator.

# Selection metric defined here instead of relying on an `evaluator` left over
# from an earlier cell; "f1" (weighted F1) is also the evaluator's default.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")

# 5-fold CV. An explicit seed makes fold assignment reproducible: PySpark's default
# seed is derived from hash() of the class name, which varies between Python processes.
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=100)

# Fits 9 settings x 5 folds = 45 models plus a final refit of the best one, so this is slow.
cvModel = cv.fit(trainingData)

# cvModel applies the best model found by cross-validation; score it (weighted F1)
# on the held-out test set.
predictions = cvModel.transform(testData)
evaluator.evaluate(predictions)

0.9851796929217101

### Naive Bayes

In [86]:
from pyspark.ml.classification import NaiveBayes

# Naive Bayes with additive smoothing of 1 (Laplace smoothing)
nb = NaiveBayes().setSmoothing(1.0)

# Fit on the current training split
model = nb.fit(trainingData)

In [87]:
predictions = model.transform(testData)

# Naive Bayes: top 10 test rows predicted as label 0.0
predicted_label0 = predictions.filter("prediction = 0")
predicted_label0.select("Descript", "Category", "probability", "label", "prediction") \
    .sort("probability", ascending=False) \
    .show(10, truncate=30)

+----------------------------+-------------+------------------------------+-----+----------+
|                    Descript|     Category|                   probability|label|prediction|
+----------------------------+-------------+------------------------------+-----+----------+
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9999999985178225,1.05533...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9999999985178225,1.05533...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9999999985178225,1.05533...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9999999985178225,1.05533...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9999999985178225,1.05533...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9999999985178225,1.05533...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9999999985178225,1.05533...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9999999985178225,1.0553

In [88]:
# metricName="f1" (weighted F1) is the evaluator's default; spelling it out keeps
# the reported score from being mistaken for accuracy.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")
evaluator.evaluate(predictions)

0.9625414629888848

### Random Forest

In [89]:
from pyspark.ml.classification import RandomForestClassifier

# Initial random forest: 100 shallow trees (depth 4), 32 bins for split candidates.
# An explicit seed makes bootstrap/feature subsampling reproducible; PySpark's default
# seed is derived from hash() of the class name, which varies between Python processes.
rf = RandomForestClassifier(labelCol="label",
                            featuresCol="features",
                            numTrees=100,
                            maxDepth=4,
                            maxBins=32,
                            seed=100)

# Train model with Training Data
rfModel = rf.fit(trainingData)

In [90]:
predictions = rfModel.transform(testData)

# Random forest: top 10 test rows predicted as label 0.0
columns_to_show = ["Descript", "Category", "probability", "label", "prediction"]
(predictions
    .filter(predictions["prediction"] == 0)
    .select(*columns_to_show)
    .orderBy("probability", ascending=False)
    .show(10, 30))

+----------------------------+-------------+------------------------------+-----+----------+
|                    Descript|     Category|                   probability|label|prediction|
+----------------------------+-------------+------------------------------+-----+----------+
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.7544297305704795,0.04025...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.7544297305704795,0.04025...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.7544297305704795,0.04025...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.7544297305704795,0.04025...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.7544297305704795,0.04025...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.7544297305704795,0.04025...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.7544297305704795,0.04025...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.7544297305704795,0.0402

In [91]:
# metricName="f1" (weighted F1) is the evaluator's default; spelling it out keeps
# the reported score from being mistaken for accuracy.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")
evaluator.evaluate(predictions)

0.6600326922344301

### Cross Validation (Random Forest)

In [93]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# 3 x 3 = 9 forest configurations
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [50, 100, 200])  # number of trees
             .addGrid(rf.maxDepth, [3, 4, 5])       # maximum depth
             .build())
# rf.maxBins (e.g. [24, 32, 40]) could be added to the grid as well.

# Selection metric defined here instead of relying on an `evaluator` left over
# from an earlier cell; "f1" (weighted F1) is also the evaluator's default.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")

# 5-fold CV with a fixed seed so fold assignment is reproducible across kernel restarts.
cv = CrossValidator(estimator=rf,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=100)

# 9 configurations x 5 folds = 45 forests (up to 200 trees each) plus a final refit: slow.
cvModel = cv.fit(trainingData)

# Use the held-out test set to measure (weighted F1) how the best model generalizes
predictions = cvModel.transform(testData)

In [94]:
# metricName="f1" (weighted F1) is the evaluator's default; spelling it out keeps
# the reported score from being mistaken for accuracy.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")
evaluator.evaluate(predictions)

0.7312371927336645