In [1]:
from pyspark.sql import SQLContext
from pyspark import SparkContext

# Read the crimes CSV: the first row supplies the column names and column
# types are inferred from the values.
# NOTE(review): `spark` is not created anywhere in this notebook; presumably
# the notebook runtime injects the session -- confirm.
data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('/FileStore/tables/Crimes___2001_to_present-9be1b.csv')
)

In [2]:
# List the column names of the loaded DataFrame.
data.columns

In [3]:
# Column names to exclude from the working DataFrame. Names that are not
# present in `data` are simply ignored by the filter below.
# NOTE(review): none of these names appear in the columns used later in this
# notebook ("Primary Type", "Description", "Location Description", "date") --
# verify the list against data.columns; it may belong to a different dataset.
drop_list = ['Dates', 'DayOfWeek', 'PdDistrict', 'Resolution', 'Address', 'X', 'Y']

# Keep every column whose name is not in drop_list, preserving column order.
data = data.select(*(name for name in data.columns if name not in drop_list))

In [4]:
# Preview the first five rows after the column filter.
data.show(5)

In [5]:
# Print column names and the types inferred while reading the CSV.
data.printSchema()

In [6]:
from pyspark.sql.functions import col

# Record count per primary crime type, most frequent first.
# show() with no arguments prints the first 20 rows.
(
    data.groupBy("Primary Type")
    .count()
    .sort(col("count").desc())
    .show()
)

In [7]:
# Count records per primary crime type. The bar plot further down reads a
# 'Primary Type' column from the collected counts, so the grouping key has to
# be 'Primary Type'; grouping by 'Description' leaves that column out and the
# plot cannot find it.
crime_type_groups = data.groupBy('Primary Type').count()

In [8]:
# pandas is imported here because this cell runs before the notebook's
# plotting-imports cell; without it `pd` is undefined on a fresh kernel.
import pandas as pd

# Order the grouped counts from most to least frequent, then collect them to
# the driver as a pandas DataFrame (one dict per Spark row) for plotting.
crime_type_counts = crime_type_groups.orderBy('count', ascending=False)
counts_pddf = pd.DataFrame(crime_type_counts.rdd.map(lambda l: l.asDict()).collect())

In [9]:

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Apply the seaborn theme first and the notebook-specific overrides after it,
# so the theme call cannot replace them.
# NOTE(review): older seaborn releases reset figure.figsize inside set();
# confirm against the installed version.
sns.set(style="whitegrid")
# Only the "dark" colour codes are set: an earlier "pastel" call was always
# overridden by this one and had no effect on the result.
sns.set_color_codes("dark")
plt.rcParams["figure.figsize"] = [20, 8]

# Horizontal bar chart: one bar per 'Primary Type' value, bar length = count.
type_graph = sns.barplot(x='count', y='Primary Type', data=counts_pddf)
type_graph.set(ylabel="Primary Type", xlabel="Crimes Record Count")
# NOTE(review): `display` is not defined in this notebook; presumably a
# runtime-provided helper that renders the current figure -- confirm.
display()

In [10]:
# Record count per crime description, most frequent first.
# show() with no arguments prints the first 20 rows.
(
    data.groupBy("Description")
    .count()
    .sort(col("count").desc())
    .show()
)

In [11]:
# Rebuild the counts keyed by 'Description', sorted from most to least
# frequent, and collect them into a pandas DataFrame. This overwrites the
# previous crime_type_groups / crime_type_counts / counts_pddf values.
crime_type_groups = data.groupBy('Description').count()
crime_type_counts = crime_type_groups.sort('count', ascending=False)
counts_pddf = pd.DataFrame([row.asDict() for row in crime_type_counts.collect()])

In [12]:
# Record count per location description, most frequent first.
# show() with no arguments prints the first 20 rows.
(
    data.groupBy("Location Description")
    .count()
    .sort(col("count").desc())
    .show()
)

In [13]:

# Rebuild the counts keyed by 'Location Description', sorted from most to
# least frequent, and collect them into a pandas DataFrame. This overwrites
# the previous crime_type_groups / crime_type_counts / counts_pddf values.
crime_type_groups = data.groupBy('Location Description').count()
crime_type_counts = crime_type_groups.sort('count', ascending=False)
counts_pddf = pd.DataFrame([row.asDict() for row in crime_type_counts.collect()])

In [14]:
# Render the per-location counts collected in the previous cell.
display(counts_pddf)

Location Description,count
STREET,1839454
RESIDENCE,1196676
APARTMENT,740756
SIDEWALK,689585
OTHER,269815
PARKING LOT/GARAGE(NON.RESID.),202812
ALLEY,156957
"SCHOOL, PUBLIC, BUILDING",146198
RESIDENCE-GARAGE,135526
SMALL RETAIL STORE,127841


In [15]:
import datetime
# Wildcard import: later cells call to_timestamp, trunc, hour, dayofweek, month,
# dayofmonth, datediff, to_date and lit unqualified and depend on this line.
# NOTE(review): a star import also rebinds any built-in names the module
# exports (e.g. sum, min, max) -- consider switching to explicit imports.
from pyspark.sql.functions import *

In [16]:
# Parse the textual 'date' column into a timestamp, then derive 'month' as the
# first day of that timestamp's month so records can be aggregated per month.
# The previous year-level format ('YYYY') collapsed every record onto the
# first day of its year, which is not a monthly bucket.
data = (
    data
    .withColumn('date_time', to_timestamp('date', 'MM/dd/yyyy hh:mm:ss a'))
    .withColumn('month', trunc('date_time', 'month'))
)

In [17]:

# Spot-check two rows: raw date text next to the parsed timestamp and the
# derived 'month' column, printed without truncating cell contents.
data.select('date', 'date_time', 'month').show(2, False)

In [18]:
import datetime


In [19]:
# Add an 'hour' column holding the hour component of the parsed 'date_time'
# timestamp; the result is kept as a separate DataFrame, df_hour.
df_hour = data.withColumn('hour', hour('date_time'))

In [20]:

# Crime counts per (primary type, hour of day). The 'hour' column lives on
# df_hour, built in the cell above; the previously referenced name
# `data_hour` is never defined and raised a NameError.
# The result is cached because it is the base of the aggregation below.
hourly_count = df_hour.groupBy(['Primary Type', 'hour']).count().cache()
# Collapse across crime types: total count per hour, in column 'sum(count)'.
hourly_total_count = hourly_count.groupBy('hour').sum('count')

In [21]:

# Collect the hourly totals into pandas, renaming the aggregate column
# 'sum(count)' to plain 'count' on the way.
hourly_count_pddf = pd.DataFrame(
    [
        row.asDict()
        for row in hourly_total_count.select(
            hourly_total_count['hour'],
            hourly_total_count['sum(count)'].alias('count'),
        ).collect()
    ]
)

In [22]:
# Put the rows in ascending hour order so the line plot runs left to right.
hourly_count_pddf = hourly_count_pddf.sort_values('hour')

In [23]:
# Line chart of total records for each hour of the day.
fig, ax = plt.subplots()
ax.plot(hourly_count_pddf['hour'], hourly_count_pddf['count'], label='Hourly Count')

ax.set(xlabel='Hour of Day', ylabel='Total records',
       title='Overall hourly crime numbers')
# Pass the on/off flag positionally: the `b=` keyword was renamed to
# `visible=` in Matplotlib 3.5 and later removed, while the positional form
# is accepted by both old and new releases.
ax.grid(True, which='both', axis='y')
ax.legend()
display()

In [24]:
# NOTE(review): this cell repeats the sort and the line chart of the two
# preceding cells; it is kept so the notebook's outputs stay unchanged.
hourly_count_pddf = hourly_count_pddf.sort_values(by='hour')
fig, ax = plt.subplots()
ax.plot(hourly_count_pddf['hour'], hourly_count_pddf['count'], label='Hourly Count')

ax.set(xlabel='Hour of Day', ylabel='Total records',
       title='Overall hourly crime numbers')
# Pass the on/off flag positionally: the `b=` keyword was renamed to
# `visible=` in Matplotlib 3.5 and later removed, while the positional form
# is accepted by both old and new releases.
ax.grid(True, which='both', axis='y')
ax.legend()
display()

In [25]:
# Calendar features derived from the parsed 'date_time' timestamp:
#   week_day    - day of the week
#   year_month  - month of the year (despite the name, no year component)
#   month_day   - day of the month
#   date_number - whole days elapsed since 2001-01-01
# Cached because several later aggregations start from this DataFrame.
df_dates = (
    df_hour
    .withColumn('week_day', dayofweek('date_time'))
    .withColumn('year_month', month('date_time'))
    .withColumn('month_day', dayofmonth('date_time'))
    .withColumn('date_number', datediff('date_time', to_date(lit('2001-01-01'), format='yyyy-MM-dd')))
    .cache()
)

In [26]:

# Number of records for each value of the derived 'week_day' column.
week_day_crime_counts = df_dates.groupBy('week_day').count()

In [27]:

# Collect the weekday counts, ordered by weekday, into a pandas DataFrame.
week_day_crime_counts_pddf = pd.DataFrame(
    [row.asDict() for row in week_day_crime_counts.sort('week_day').collect()]
)

In [28]:
# Bar chart of record counts per weekday. Spark's dayofweek() numbers the
# days 1 (Sunday) through 7 (Saturday).
sns.barplot(x='week_day', y='count', data=week_day_crime_counts_pddf)
display()

In [29]:

# Number of records for each value of the derived 'year_month' column
# (the month-of-year number produced by month()).
year_month_crime_counts = df_dates.groupBy('year_month').count()

In [30]:
# Collect the per-month counts, ordered by month number, into pandas.
year_month_crime_counts_pddf = pd.DataFrame(
    [row.asDict() for row in year_month_crime_counts.sort('year_month').collect()]
)


In [31]:
# Bar chart of record counts per month of the year.
sns.barplot(x='year_month', y='count', data=year_month_crime_counts_pddf)
display()

In [32]:
  from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# Regular-expression tokenizer: split the free-text crime description on
# non-word characters. The label is indexed from "Primary Type" (see the
# StringIndexer stage), so the features must come from a different column;
# tokenising "Primary Type" itself would hand the model its own target and
# make every reported score meaningless.
# NOTE(review): assumes "Description" has no null values -- confirm, or drop
# null rows before fitting the pipeline.
regexTokenizer = RegexTokenizer(inputCol="Description", outputCol="words", pattern="\\W")

# Custom tokens to discard. This list replaces the remover's default stop
# words (it is passed to setStopWords) and consists of web/social-media noise
# tokens plus "the".
add_stopwords = ["http","https","amp","rt","t","c","the"]

stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

# Bag-of-words counts: vocabulary capped at 10000 terms, and a term must
# appear in at least 5 rows to be kept.
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

In [33]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Map each distinct "Primary Type" string to a numeric index stored in "label".
label_stringIdx = StringIndexer().setInputCol("Primary Type").setOutputCol("label")

In [34]:
from pyspark.ml import Pipeline

# Stage order: tokenise -> remove stop words -> count vectors -> index label.
pipeline = Pipeline().setStages([regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

# Fit every stage on the full DataFrame, then apply the fitted pipeline to the
# same DataFrame to obtain the "features" and "label" columns.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

In [35]:
# Preview five rows of the pipeline output, including the added columns.
dataset.show(5)

In [36]:
# 70/30 random split into training and test sets; the fixed seed keeps the
# split reproducible between runs.
trainingData, testData = dataset.randomSplit([0.7, 0.3], seed=100)
print("Training Dataset Count: ", trainingData.count(), sep="")
print("Test Dataset Count: ", testData.count(), sep="")

In [37]:
# ---- Logistic regression on the count-vector features ----
# At most 20 iterations, regularisation strength 0.3, elastic-net mixing 0.
lr = LogisticRegression(
    maxIter=20,
    regParam=0.3,
    elasticNetParam=0,
)

# Fit on the training split.
lrModel = lr.fit(trainingData)

In [38]:
# Score the held-out split with the logistic-regression model.
predictions = lrModel.transform(testData)

# Show ten rows predicted as class index 0, ordered by the probability column
# descending, with cell text cut at 30 characters.
(
    predictions
    .where(predictions['prediction'] == 0)
    .select("Description", "Primary Type", "Location Description", "label", "prediction", "probability")
    .sort("probability", ascending=False)
    .show(n=10, truncate=30)
)

In [39]:
############Accuracy of Logistic Regression using Count Vector Features
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# The evaluator's default metric is F1; request accuracy explicitly so the
# number reported here is what the heading says it is.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
evaluator.evaluate(predictions)

In [40]:
# ---- Naive Bayes classifier ----
from pyspark.ml.classification import NaiveBayes

# Trainer configured with smoothing parameter 1.
nb = NaiveBayes(
    smoothing=1,
)

# Fit on the training split.
model = nb.fit(trainingData)

In [41]:

# Score the held-out split with the Naive Bayes model (replaces the previous
# `predictions`).
predictions = model.transform(testData)

# Show ten rows predicted as class index 0, ordered by the probability column
# descending, with cell text cut at 30 characters.
(
    predictions
    .where(predictions['prediction'] == 0)
    .select("Description", "Primary Type", "Location Description", "label", "prediction", "probability")
    .sort("probability", ascending=False)
    .show(n=10, truncate=30)
)

In [42]:
#####Naive Bayes Accuracy Prediction###
# The evaluator's default metric is F1; request accuracy explicitly so the
# number reported here is what the heading says it is.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
evaluator.evaluate(predictions)

In [43]:
# ---- Random forest classifier ----
from pyspark.ml.classification import RandomForestClassifier

# 100 trees, each limited to depth 4, with at most 32 bins per feature.
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=100,
    maxDepth=4,
    maxBins=32,
)

# Fit on the training split.
rfModel = rf.fit(trainingData)

In [44]:
# Score the held-out split with the random-forest model (replaces the previous
# `predictions`).
predictions = rfModel.transform(testData)

# Show ten rows predicted as class index 0, ordered by the probability column
# descending, with cell text cut at 30 characters.
(
    predictions
    .where(predictions['prediction'] == 0)
    .select("Description", "Primary Type", "Location Description", "label", "prediction", "probability")
    .sort("probability", ascending=False)
    .show(n=10, truncate=30)
)

In [45]:
######### Result of Random Forest#####
# The evaluator's default metric is F1; request accuracy explicitly so this
# result is the same metric the notebook's other model sections are headed
# with ("Accuracy").
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
evaluator.evaluate(predictions)