# Crime Category Prediction Using Machine Learning

- Spark ML's machine learning models are used to predict the crime "Category" from an input feature vector consisting of ["DayOfWeek", "Month", "Date", "Time", "PdDistrict", "X", "Y"]
- A Random Forest Classifier and a Multilayer Perceptron Classifier are used

In [2]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkContext

In [3]:
# Wrap the SparkContext `sc` in a SQLContext. `sc` is not created in this
# notebook; presumably the notebook runtime (e.g. Databricks) provides it.
# NOTE(review): SQLContext is deprecated in favour of SparkSession in Spark 3.x;
# consider migrating to `spark.read`.
sqlContext = SQLContext(sparkContext=sc)

In [4]:
dataPath = '/FileStore/tables/Police_Department_Incident_Reports__Historical_2003_to_May_2018.csv'

# Explicit schema for the incident CSV, in column order; every column is nullable.
crimeDataSchema = StructType([
    StructField(columnName, columnType, True)
    for columnName, columnType in [
        ("IncidntNum", LongType()),
        ("Category", StringType()),
        ("Descript", StringType()),
        ("DayOfWeek", StringType()),
        ("Date", StringType()),
        ("Time", StringType()),
        ("PdDistrict", StringType()),
        ("Resolution", StringType()),
        ("Address", StringType()),
        ("X", DoubleType()),
        ("Y", DoubleType()),
        ("Location", StringType()),
        ("PdId", LongType()),
    ]
])

# Comma-delimited CSV whose first line is a header row.
crimeDF = (sqlContext.read
           .format('csv')
           .options(delimiter=',', header='true')
           .load(dataPath, schema=crimeDataSchema))

In [5]:
# Drop incidents with no police district; parseDist could not encode a null.
crimeDF = crimeDF.where(crimeDF.PdDistrict.isNotNull())

In [6]:
# Collect the distinct crime categories to the driver (one Row per category).
cats = crimeDF.select(crimeDF.Category).dropDuplicates().collect()

In [7]:
# Map every distinct category name in `cats` to a contiguous integer label 0..K-1.
# The mapping is derived from `cats` rather than a hard-coded count, so it works
# for any number of categories.
# The names are sorted first because distinct().collect() does not guarantee a
# row order; sorting keeps the label assignment stable across runs.
# A null category, if present, sorts last and gets the highest label.
Category_idx = {
    name: label
    for label, name in enumerate(
        sorted((row[0] for row in cats), key=lambda c: (c is None, c or ""))
    )
}

In [8]:
# Integer codes for the categorical string columns, assigned in list order:
# Monday -> 0 ... Sunday -> 6, and MISSION -> 0 ... NORTHERN -> 9.
day_idx = {day: code for code, day in enumerate([
    "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday",
])}

district_idx = {district: code for code, district in enumerate([
    "MISSION", "BAYVIEW", "CENTRAL", "TARAVAL", "TENDERLOIN",
    "INGLESIDE", "PARK", "SOUTHERN", "RICHMOND", "NORTHERN",
])}

For the dataset,
- "Date" column is parsed and decomposed into Month and Date (day of the month)
- "Time" column is parsed to extract the hour
- "PdDistrict" is encoded with district_idx
- "DayOfWeek" is encoded following day_idx
- "Category" is encoded with Category_idx

In [10]:
import datetime

def parseMonth(dateStr):
    """Return the month number from a ``month/day/year`` date string.

    Only the first ``/``-separated field is parsed, e.g. ``"05/14/2018" -> 5``.
    Raises ValueError if that field is not an integer.
    """
    return int(dateStr.split("/")[0])

def parseDate(dateStr):
    """Return the day of the month from a ``month/day/year`` date string.

    Example: ``"05/14/2018" -> 14``.
    """
    fields = dateStr.split("/")
    dayText = fields[1]
    return int(dayText)

def parseTime(timeStr):
    """Return the hour from an ``HH:MM`` time string, e.g. ``"13:45" -> 13``.

    Only the hour is used as a feature, so only the text before the first
    ``:`` is parsed.
    """
    hourText, _, _ = timeStr.partition(":")
    return int(hourText)

def parseDay(dayStr):
    """Encode a weekday name with ``day_idx``; unknown names raise KeyError."""
    code = day_idx[dayStr]
    return code

def parseDist(distStr):
    """Encode a police-district name with ``district_idx``; unknown names raise KeyError."""
    code = district_idx[distStr]
    return code

def parseCategory(catStr):
    """Encode a crime-category name with ``Category_idx``; unknown names raise KeyError."""
    label = Category_idx[catStr]
    return label

from pyspark.sql.functions import udf

def _encodeCrimeColumns(df):
    """Return *df* with its raw string columns replaced by integer encodings.

    The function works in three steps:
    1. Each derived column is added under a temporary name, computed from the
       columns of the input *df*.
    2. The raw columns are dropped.
    3. The temporary columns are renamed to their final names.

    "Date" feeds two outputs: the day of the month ("Date") and the month ("Month").
    """
    # (temporary name, parser, source column, final name)
    encodings = [
        ("Date_tmp", parseDate, "Date", "Date"),
        ("Month_tmp", parseMonth, "Date", "Month"),
        ("Time_tmp", parseTime, "Time", "Time"),
        ("Day_tmp", parseDay, "DayOfWeek", "DayOfWeek"),
        ("District_tmp", parseDist, "PdDistrict", "PdDistrict"),
        ("Cats_tmp", parseCategory, "Category", "Category"),
    ]

    encoded = df
    for tmpName, parser, sourceName, _ in encodings:
        encoded = encoded.withColumn(tmpName, udf(parser, IntegerType())(df[sourceName]))

    for rawName in ["Time", "Date", "DayOfWeek", "PdDistrict", "Category"]:
        encoded = encoded.drop(rawName)

    for tmpName, _, _, finalName in encodings:
        encoded = encoded.withColumnRenamed(tmpName, finalName)

    return encoded


crimeDF = _encodeCrimeColumns(crimeDF).cache()

In [11]:
# Preview the integer-encoded DataFrame.
crimeDF.show()

#### Input Features

In [13]:
from pyspark.ml.feature import VectorAssembler

# Assemble the seven numeric columns into the single "features" vector column
# that both classifiers below read.
vectorizer = VectorAssembler(
    inputCols=["DayOfWeek", "Month", "Date", "Time", "PdDistrict", "X", "Y"],
    outputCol="features",
)


#### Training/Test Dataset Splitting

In [15]:
# Hold out roughly 20% of the rows as the test set and train on the remaining ~80%.
# Without an explicit seed, randomSplit picks a random one on every run. A fixed
# seed makes the split, and therefore the reported accuracies, reproducible.
splitSeed = 42
(split20DF, split80DF) = crimeDF.randomSplit([20.0, 80.0], seed=splitSeed)

testSetDF = split20DF.cache()
trainingSetDF = split80DF.cache()

#### Classification Evaluator

In [17]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Scores the "Prediction_Category" column against the "Category" label by accuracy.
classEval = (MulticlassClassificationEvaluator()
             .setPredictionCol("Prediction_Category")
             .setLabelCol("Category")
             .setMetricName("accuracy"))

## Random Forest Classifier
- 3-fold cross-validation
- Grid search over the "maxBins", "maxDepth", and "numTrees" parameters

In [19]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

# Random forest that reads the assembled "features" vector and the integer
# "Category" label, and writes its predictions to "Prediction_Category".
rf = RandomForestClassifier(featuresCol="features",
                            labelCol="Category",
                            predictionCol="Prediction_Category")

# Assemble the features, then classify.
rfPipeline = Pipeline(stages=[vectorizer, rf])

In [20]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# 2 x 2 x 2 = 8 hyper-parameter combinations. Each is scored by 3-fold
# cross-validation on the training set using `classEval` (accuracy), and the
# best pipeline model is kept.
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxBins, [50, 100])
             .addGrid(rf.maxDepth, [5, 8])
             .addGrid(rf.numTrees, [20, 25])
             .build())

crossval = CrossValidator(estimator=rfPipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=classEval,
                          numFolds=3)

rfModel = crossval.fit(trainingSetDF).bestModel

In [21]:
# Score the held-out test set with the best cross-validated RF pipeline.
resultsDF = rfModel.transform(testSetDF)

In [22]:
# Test-set accuracy of the RF model (classEval is configured with metricName="accuracy").
evalRF = classEval.evaluate(resultsDF)

In [23]:
# Report the RF test accuracy to two decimal places.
print(f"RF Accuracy: {evalRF:.2f}")

## Multilayer Perceptron Classifier
- consists of 3 layers of sizes [7, 50, 39]
  - the input feature vector has 7 elements
  - the output is 1 of 39 different categories

In [25]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml import Pipeline

# Layer sizes: one input unit per assembled feature, one hidden layer of 50
# units, and one output unit per category label. Deriving the outer sizes from
# the assembler's input columns and from Category_idx keeps them in sync if the
# feature list or the category set changes. With 7 features and 39 categories
# this gives [7, 50, 39].
layers = [len(vectorizer.getInputCols()), 50, len(Category_idx)]

mlp = (MultilayerPerceptronClassifier()
       .setLayers(layers)
       .setLabelCol("Category")
       .setFeaturesCol("features")
       .setPredictionCol("Prediction_Category"))

# Assemble the features, then classify; fit on the training split.
mlpPipeline = Pipeline()
mlpPipeline.setStages([vectorizer, mlp])

mlpModel = mlpPipeline.fit(trainingSetDF)

In [26]:
# Score the held-out test set with the fitted MLP pipeline.
resultsMLP = mlpModel.transform(testSetDF)

In [27]:
# Test-set accuracy of the MLP model, using the same evaluator as the RF model.
evalMLP = classEval.evaluate(resultsMLP)

In [28]:
# Report the MLP test accuracy to two decimal places.
print(f"MLP Accuracy: {evalMLP:.2f}")