# E6893 Final Project


## predict the crime

scripts version, used for test.

In [3]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import csv
from datetime import *
from dateutil.parser import parse

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.stat import Statistics
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.util import MLUtils

import numpy as np
import math

sqlContext = SQLContext(sc)

In [4]:
def ParseDate(t):
    try:
        dt = parse(t, fuzzy=True).date()
    except ValueError as e:
        dt = date.today()
    return dt
MINYEAR = 2005
MAXYEAR = 2017

# read the file from loacal
#crimeFile = sc.textFile("file:///home/ubuntu/crime/raw_data/row06-15.csv")
crimeFile = sc.textFile("file:///home/ubuntu/crime/raw_data/row16.csv")

crimeHeader = crimeFile.filter(lambda l: "CMPLNT_NUM" in l)
crimeNoHeader = crimeFile.subtract(crimeHeader)
crimes = crimeNoHeader.mapPartitions(lambda x: csv.reader(x, delimiter=",")).filter(lambda row: len(row) > 15 and row[1] is not None).map(lambda row: (int('0'+row[14]), ParseDate(row[1])))  
crimes = crimes.filter(lambda row: row[1].year>MINYEAR and row[1].year<MAXYEAR and row[0]>0 )


In [5]:
# List of all the PCTs
PCTs = crimes.map(lambda row: (row[0])).distinct().zipWithIndex()

# biuld the index of PCTs
PCTsDict = dict(PCTs.collect())
PCTs.first()

(66, 0)

In [8]:
# Compute the number of weekday of the month crime events for each pcts between 2001 and 2014
# ( (precient, data), number 0f crime)
crimeCounts = crimes.map(lambda row: ((PCTsDict[row[0]], row[1]), 1)).reduceByKey(lambda x,y: x + y)
crimeCounts.top(2)

[((76, datetime.date(2016, 9, 30)), 10), ((76, datetime.date(2016, 9, 29)), 4)]

In [9]:
# Obtain all month-weekday combinations in the dataset
Dates = crimeCounts.map(lambda row: (row[0][1])).distinct()
Dates.top(2)

[datetime.date(2016, 9, 30), datetime.date(2016, 9, 29)]

In [12]:
# Generate all possible PCT-year-week combinations from 2001 to 2014
allPCTDates = PCTs.values().cartesian(Dates)#.map(lambda row: (row[0], row[1]))
allPCTDates.top(2)

[(76, datetime.date(2016, 9, 30)), (76, datetime.date(2016, 9, 29))]

In [13]:
# Determine missing PCT-date combinations in the dataset and insert them with
# count 0 to the crime counts
# RDD ((PCT, date), countCrime)
missingPCTDates = allPCTDates.subtract(crimeCounts.keys()).distinct()
allCrimeCounts = crimeCounts.union(missingPCTDates.map(lambda row: (row, 0)))
allCrimeCounts.top(2)

[((76, datetime.date(2016, 9, 30)), 10), ((76, datetime.date(2016, 9, 29)), 4)]

In [15]:
# Process the temperature
# Load the historical temperature for the city and filter it for the years 2005 to 2015

temperature = sc.textFile("file:///home/ubuntu/crime/raw_data/NYNEWYOR.txt").map(lambda line: [float(i) for i in line.split()]).filter(lambda row: row[2]>2015 ).map(lambda row: (date(int(row[2]), int(row[0]), int(row[1])), row[3]))
temperature.top(2)

[(datetime.date(2016, 12, 4), 44.7), (datetime.date(2016, 12, 3), 46.4)]

In [25]:
# Join the crime counts and average weekday temperature datasets, using month-weekday as key,
# unnest each row to a flat list, drop the year variable and convert to a LabeledPoint object
# joinedData RDD:(countCrime,(weekday, PCT, avg_temperature))
joinedData = allCrimeCounts.map(lambda row: ((row[0][1]), (row[0][0], row[1]))).join(temperature).map(lambda row: ((row[0].weekday(), row[1][0][0], row[1][1]), row[1][0][1])).reduceByKey(lambda x,y: x + y).map(lambda row: LabeledPoint(row[1], [row[0][0], row[0][1], row[0][2]]))
joinedData.top(2)

[LabeledPoint(8.0, [6.0,62.0,55.1]), LabeledPoint(24.0, [1.0,17.0,80.2])]

In [26]:
# Split the crime counts into training and test datasets
(training, test) = joinedData.randomSplit((0.94, 0.1))
test.top(2)

[LabeledPoint(22.0, [4.0,25.0,55.9]), LabeledPoint(17.0, [6.0,46.0,40.0])]

Train the Random Forest model:

In [None]:
# Train a Random Forest model to predict crimes
model = RandomForest.trainRegressor(training, categoricalFeaturesInfo = { 0: 7, 1: len(PCTsDict)},
                                    numTrees = 5, featureSubsetStrategy = "auto",
                                    impurity='variance', maxDepth=10, maxBins = len(PCTsDict))

In [None]:
# Save and load model
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
import shutil, sys
output_dir = 'file:///home/ubuntu/crime/myRandomForestClassificationModel'
shutil.rmtree(output_dir, ignore_errors=True)
model.save(sc, output_dir)

Try to train Naive Bayes model by the out come is not good enough

In [None]:
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
from pyspark.mllib.util import MLUtils

# Train a naive Bayes model.
model = NaiveBayes.train(training, 1.0)

In [31]:
# Save and load model
import shutil, sys
output_dir = 'file:///home/ubuntu/crime/myNaiveBayesModel'
shutil.rmtree(output_dir, ignore_errors=True)
model.save(sc, output_dir)

In [37]:
#sameModel = NaiveBayesModel.load(sc, 'file:///home/ubuntu/crime/myNaiveBayesModel')
model = RandomForestModel.load(sc, 'file:///home/ubuntu/crime/myRandomForestClassificationModel')

predictions = model.predict(test.map(lambda x: x.features))
meanCrimes = test.map(lambda x: x.label).mean()
labelsAndPredictions = test.map(lambda x:  x.label).zip(predictions)
testMSE = labelsAndPredictions.map(lambda (v, p): (v - p)*(v - p)).sum()/float(test.count())
testSSE = labelsAndPredictions.map(lambda (v, p): (v - p)*(v - p)).sum()
testSST = labelsAndPredictions.map(lambda (v, p): (v - meanCrimes)*(v - meanCrimes)).sum()

Rsq = 1 - testSSE / testSST

print "r-square: " + str(Rsq)
print "mean square error: " + str(testMSE)

r-square: 0.5632372276
mean square error: 37.9313995195


In [42]:
#### Predicting crimes for a day####
PCTsDictInverse = dict((v, k) for k, v in PCTsDict.items())

data = []
for weekday in range(7):
    for tempForecast in range(10,100,5):
        # Test dataset for each precient with next week's info
        predictday = sc.parallelize(tuple([(weekday, PCT ,tempForecast) for PCT in range(len(PCTsDict))]))
        predictionsday = model.predict(predictday).zip(predictday.map(lambda row: PCTsDictInverse[row[1]])).sortByKey(False)
        # Obtain the top precients with highest likelihood of crime
        topCrimePCTs = predictionsday.take(10)
        row = [weekday, tempForecast]
        row.extend(topCrimePCTs)
        data.append(row)
for i in data:
    print i

[0, 10, (34.9285265049416, 75), (25.265, 43), (23.29022869022869, 40), (22.562745098039215, 47), (22.201587301587303, 67), (21.185555555555556, 14), (20.8034702307193, 46), (20.773333333333333, 42), (19.89507936507936, 44), (19.50117511520737, 18)]
[0, 15, (34.9285265049416, 75), (25.265, 43), (23.29022869022869, 40), (22.562745098039215, 47), (22.201587301587303, 67), (21.185555555555556, 14), (20.8034702307193, 46), (20.773333333333333, 42), (19.89507936507936, 44), (19.50117511520737, 18)]
[0, 20, (34.9285265049416, 75), (28.40897869022869, 40), (26.28905303030303, 43), (25.05375, 44), (24.807083333333335, 14), (24.748161764705884, 47), (23.94986111111111, 67), (22.412674776173844, 46), (22.156645021645023, 42), (21.88448680351906, 18)]
[0, 25, (34.9285265049416, 75), (28.40897869022869, 40), (26.28905303030303, 43), (25.05375, 44), (24.807083333333335, 14), (24.748161764705884, 47), (23.94986111111111, 67), (22.412674776173844, 46), (22.156645021645023, 42), (21.88448680351906, 18)