# Minnesota crime data analysis and modeling

In [2]:
from csv import reader
from pyspark.sql import Row

from pyspark.sql.types import *
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings
import os
os.environ['PYSAPRK_PYTHON']='pyton3'

In [3]:
import urllib.request
#urllib.request.urlretrieve('https://opendata.arcgis.com/datasets/08ff2c3bec594dd2a7a8566b2a81d452_0.csv','/tmp/mn_crime_data.csv') #download data to local
#dbutils.fs.mv('file:/tmp/mn_crime_data.csv','dbfs:/laioffer/spark_hw1/data/mn_15.csv') #move local file to dbfs

display(dbutils.fs.ls('file:/tmp/'))

path,name,size
file:/tmp/.ICE-unix/,.ICE-unix/,4096
file:/tmp/hsperfdata_root/,hsperfdata_root/,4096
file:/tmp/.X11-unix/,.X11-unix/,4096
file:/tmp/chauffeur-daemon.pid,chauffeur-daemon.pid,5
file:/tmp/driver-daemon.pid,driver-daemon.pid,5
file:/tmp/chauffeur-daemon-params,chauffeur-daemon-params,22
file:/tmp/custom-spark.conf,custom-spark.conf,165
file:/tmp/driver-env.sh,driver-env.sh,2634
file:/tmp/chauffeur-env.sh,chauffeur-env.sh,156
file:/tmp/Rserv/,Rserv/,4096


In [4]:
from pyspark.sql import SparkSession
data_path='dbfs:/laioffer/spark_hw1/data/mn_06_20.csv'
#old files (2015-2017)
data_path_old='dbfs:/laioffer/spark_hw1/data/mn_1*.csv'
spark=SparkSession \
    .builder\
    .appName('crime analysis') \
    .config('spark.some.config.option','some-value') \
    .getOrCreate()

df_old=spark.read.format('csv').option('header','true').option("mergeSchema","true").load(data_path_old)


In [5]:
#explore the dataframe for 2015-2017
df_old.createOrReplaceTempView('df_view')
display(spark.sql('select max(ReportedDate),min(ReportedDate),count(*) from df_view'))

max(ReportedDate),min(ReportedDate),count(1)
2017/12/31 11:28:00,2015/01/01 00:04:59,62238


In [6]:

from pyspark.sql.functions import to_date,to_timestamp,hour,date_format
from pyspark.sql.functions import udf,col,upper
from datetime import datetime
#new data from June 2018 to June 2020
df_new=spark.read.format('csv').option('header','true').load(data_path)
df_new=df_new.withColumn('IncDate',to_date(df_new.beginDate,'yyyy/MM/dd'))
time_func=udf(lambda x: ''.join([str(int(x)//100).zfill(2),':',str(int(x)%100).zfill(2)]),StringType())
ucr_map={'01':'MURDER','03':'RAPE','04':'ROBBERY','05':'ASSAULT','06':'BURGLARY','07':'LARCENY','08':'AUTO THEFT','10':'ARSON'}
  
ucr_func=udf(lambda x:ucr_map.get(x.strip(),''))
#assumption: beginTime and reportedTime is local time, reportedDateTime is UTC time
hour_func=udf(lambda x: int(x)//100,IntegerType())
df_new=df_new.withColumn('IncHour',hour_func(col('beginTime')))   #udf return default type is string?
df_new=df_new.withColumn('IncTime',time_func(col('beginTime')))
df_new=df_new.withColumn('DayOfWeek',date_format(df_new.IncDate,'E'))
df_new=df_new.withColumn('Category',ucr_func(col('UCRCode')))
df_new=df_new.withColumn('neighborhood',upper(df_new.neighborhood))

df_new.createOrReplaceTempView('mn_crime')
spark.sql('describe mn_crime').show()

In [7]:

df_old=df_old.withColumn('IncDate',to_date(df_old.BeginDate,'yyyy/MM/dd'))

#assumption: beginTime and reportedTime is local time, reportedDateTime is UTC time
hour_func=udf(lambda x: int(x[:2]),IntegerType())

df_old=df_old.withColumn('IncHour',hour_func(col('Time')))   
df_old=df_old.withColumnRenamed('Time','IncTime')
df_old=df_old.withColumn('DayOfWeek',date_format(df_old.IncDate,'E'))
df_old=df_old.withColumn('Category',ucr_func(col('UCRCode')))
df_old=df_old.withColumn('Neighborhood',upper(df_old.Neighborhood))
display(df_old)


FID,PublicAddress,ControlNbr,CCN,Precinct,ReportedDate,BeginDate,IncTime,Offense,Description,UCRCode,EnteredDate,GBSID,Lat,Long,X,Y,Neighborhood,LastChanged,LastUpdateDate,IncDate,IncHour,DayOfWeek,Category
1,0021XX Broadway AV W,3363209,MP 2014 457317,4,2015/01/01 00:04:59,2014/12/31 20:10:00,20:10:00,AUTOTH,Motor Vehicle Theft,8,2015/01/01 00:05:21,15215.0,45.0030963,-93.30702211,-10386890.1882,5622008.9491,JORDAN,2015/01/01 01:33:55,2017/03/03 13:40:06,2014-12-31,20,Wed,AUTO THEFT
2,Girard AV N / Plymouth AV N,3363210,MP 2014 457574,4,2015/01/01 00:16:00,2014/12/31 23:48:59,23:49:00,AUTOTH,Motor Vehicle Theft,8,2015/01/01 00:13:17,,44.99196,-93.29717,-10385793.49,5620255.548,NEAR - NORTH,2015/02/20 06:12:55,2017/03/03 13:40:06,2014-12-31,23,Wed,AUTO THEFT
3,2 AV N / 4 ST N,3363218,MP 2015 999999,1,2015/01/01 01:13:52,2014/12/31 19:30:00,19:30:00,THEFT,Other Theft,7,2015/01/01 01:13:52,,44.98205,-93.27369,-10383179.94,5618696.632,DOWNTOWN WEST,2015/01/02 07:04:14,2017/03/03 13:40:06,2014-12-31,19,Wed,LARCENY
4,0032XX Lyndale AV N,3363220,MP 2014 457244,4,2015/01/01 01:24:00,2014/12/31 17:00:00,17:00:00,BURGD,Burglary Of Dwelling,6,2015/01/01 01:24:32,11106.0,45.01399955,-93.28802025,-10384774.9117,5623725.6974,MCKINLEY,2015/01/05 07:04:46,2017/03/03 13:40:06,2014-12-31,17,Wed,BURGLARY
5,0055XX 11 AV S,3363223,MP 2015 000138,3,2015/01/01 01:40:00,2014/12/31 15:59:59,16:00:00,BURGD,Burglary Of Dwelling,6,2015/01/01 01:29:11,14453.0,44.90251102,-93.25890427,-10381533.7358,5606186.8759,DIAMOND LAKE,2015/03/07 14:29:54,2017/03/03 13:40:06,2014-12-31,16,Wed,BURGLARY
6,0025XX Cedar AV S,3363228,MP 2015 000151,3,2015/01/01 01:47:00,2015/01/01 01:05:00,01:05:00,ASLT2,Asslt W/dngrs Weapon,5,2015/01/01 01:46:29,16946.0,44.95779751,-93.24733141,-10380245.4502,5614880.0125,EAST PHILLIPS,2015/01/01 04:43:16,2017/03/03 13:40:06,2015-01-01,1,Thu,ASSAULT
7,1 AV N / 3 ST N,3363235,MP 2015 000215,1,2015/01/01 02:50:00,2015/01/01 01:40:00,01:40:00,ROBPER,Robbery Of Person,4,2015/01/01 02:49:04,,44.98207,-93.27139,-10382923.28,5618698.959,DOWNTOWN WEST,2015/01/01 16:44:35,2017/03/03 13:40:06,2015-01-01,1,Thu,ROBBERY
8,0013XX Lagoon AV,3363242,MP 2015 000295,5,2015/01/01 03:30:00,2015/01/01 01:30:00,01:30:00,THEFT,Other Theft,7,2015/01/01 03:19:32,19775.0,44.94925537,-93.29636314,-10385703.6378,5613536.3167,LOWRY HILL EAST,2015/01/05 01:32:05,2017/03/03 13:40:06,2015-01-01,1,Thu,LARCENY
9,0001XX 4 ST N,3363243,MP 2015 000325,1,2015/01/01 03:35:00,2015/01/01 02:58:00,02:58:00,ASLT4,Aslt-police/emerg P,5,2015/01/01 03:31:45,17122.0,44.98164758,-93.27311624,-10383115.8047,5618632.7352,DOWNTOWN WEST,2015/01/02 01:05:16,2017/03/03 13:40:06,2015-01-01,2,Thu,ASSAULT
10,0001XX 4 ST S,3363251,MP 2015 000381,1,2015/01/01 04:37:00,2015/01/01 01:44:59,01:45:00,AUTOTH,Motor Vehicle Theft,8,2015/01/01 04:36:30,17605.0,44.97893095,-93.26775742,-10382519.2634,5618205.2048,DOWNTOWN WEST,2015/01/05 07:51:37,2017/03/03 13:40:06,2015-01-01,1,Thu,AUTO THEFT


In [8]:
from pyspark.sql.functions import year
df_opt1=df_new.select('X','Y','publicaddress','caseNumber','precinct','reportedDate','beginDate','offense','description','UCRCode','neighborhood','IncDate','IncHour','IncTime','DayOfWeek','Category')
df=df_old.select('Long' ,'Lat' ,'PublicAddress','CCN' ,'Precinct' ,'ReportedDate' ,'BeginDate' ,'Offense'\
            ,'Description','UCRCode','Neighborhood','IncDate','IncHour','IncTime','DayOfWeek','Category')
df_opt1=df_opt1.unionAll(df.filter(year(df.IncDate)>=2015))
df_opt1.count()

In [9]:

display(df_new.select('neighborhood').distinct())

neighborhood
LORING PARK
FULTON
DIAMOND LAKE
EAST PHILLIPS
DOWNTOWN WEST
BANCROFT
EAST HARRIET
JORDAN
WINDOM PARK
LYNDALE


In [10]:
#I am interested in which day of the week is more dangerous  -- it's Friday and Sat
count_by_day=df_opt1.groupBy('DayOfWeek').count().orderBy('count',ascending=False)
display(count_by_day)

DayOfWeek,count
Fri,16094
Sat,15943
Mon,15038
Sun,14950
Thu,14728
Wed,14614
Tue,14551


In [11]:

count_by_category=df_opt1.groupBy('category').count().orderBy('count',ascending=False)
display(count_by_category)

category,count
LARCENY,58590
BURGLARY,17306
AUTO THEFT,11327
ASSAULT,8822
ROBBERY,7633
RAPE,1630
ARSON,449
MURDER,161


In [12]:
count_by_district=df_opt1.groupBy('neighborhood').count().orderBy('count',ascending=False)
display(count_by_district)

neighborhood,count
DOWNTOWN WEST,11485
WHITTIER,4509
LONGFELLOW,3312
MARCY HOLMES,3256
LOWRY HILL EAST,3027
JORDAN,2842
NEAR - NORTH,2698
HAWTHORNE,2473
LORING PARK,2435
POWDERHORN PARK,2303


In [13]:
import math

MN_downtown=( -93.258133,44.986656)
#if distance is within 10, then it's downtown
def is_downtown(loc_x,loc_y):
  d_x,d_y=MN_downtown
  distance=math.sqrt((loc_x-d_x)**2+(loc_y-d_y)**2)
  if distance<10:
    return 'Y'
  return 'N'

is_downtown_func=udf(lambda x,y: is_downtown(float(x),float(y)))
df_opt1=df_opt1.withColumn('IsDowntown',is_downtown_func(col('X'),col('Y')))
df_opt1.createOrReplaceTempView('mn_crime')
crimeOnSunday=df_opt1.filter(df_opt1.DayOfWeek=='Sun').filter(df_opt1.IsDowntown=='Y')\
.groupBy(df_opt1.IncDate)\
.count()\
.orderBy('count',ascending=False)

#there is missing data between Jan 2018 and Jun 2018
display(crimeOnSunday)

IncDate,count
2019-11-17,94
2016-10-16,92
2015-08-16,91
2019-07-07,89
2017-06-25,87
2016-09-11,86
2017-01-01,84
2016-09-18,84
2017-08-20,84
2019-08-25,82


In [14]:
from pyspark.sql.functions import year,month,dayofmonth
crimePerMonth=df_opt1.filter(year(df_opt1.IncDate)<2018).groupBy(year(df_opt1.IncDate).alias('Year'),month(df_opt1.IncDate).alias('Month'))\
.count()\
.orderBy('Year','Month')
#crimePerMonth=spark.sql("select year(IncDate) as IncYear,month(IncDate) as IncMonth,count(*) as Count from mn_crime group by IncYear,IncMonth order by IncMonth")
display(crimePerMonth)

Year,Month,count
2015,1,1552
2015,2,1080
2015,3,1508
2015,4,1521
2015,5,1660
2015,6,1803
2015,7,2008
2015,8,1991
2015,9,1868
2015,10,1717


In [15]:
crimePerMonth_2018=df_opt1.filter(year(df_opt1.IncDate)>=2018).groupBy(year(df_opt1.IncDate).alias('Year'),month(df_opt1.IncDate).alias('Month'))\
.count()\
.orderBy('Year','Month')
display(crimePerMonth_2018)

Year,Month,count
2018,6,706
2018,7,1873
2018,8,1832
2018,9,1846
2018,10,1887
2018,11,1646
2018,12,1510
2019,1,1505
2019,2,1291
2019,3,1436


##### Insight from Q4
Since I couldn't find 2018 data from Jan to Jun, I did two charts to show the trend.<br>
One is for historical data, the other one is from June 2018 to June 2020.<br>
From historical data, we can see the trend of each year is the same. Every year, crime peak is between June and Aug. Summer is the most dangerous season for Minneapolis.<br>
Oddly, this year, there is a small peak in March. I would think this small peak is related to COVID-19 and a lot of people lost their jobs in March.<br>
All in all,business should take actions to protect their property such as upgrading/adding anti-theft system.

In [17]:

crimeOnCertainDay=df_opt1.filter(year(df_opt1.IncDate).isin(2018,2019,2020)).filter(month(df_opt1.IncDate)==12).filter(dayofmonth(df_opt1.IncDate)==15)\
.groupBy(year(df_opt1.IncDate).alias('Year'),df_opt1.IncHour)\
.count()\
.orderBy('count',ascending=False)


#crimeOnCertainDay=spark.sql("select year(IncDate) as IncYear,IncHour,count(*) as Count from mn_crime where month(IncDate)=12 and day(IncDate)=15 group by IncYear,IncHour order by IncHour")
display(crimeOnCertainDay)

Year,IncHour,count
2018,0,7
2018,22,6
2019,22,5
2018,23,5
2019,17,5
2019,19,5
2019,18,4
2019,12,4
2018,20,4
2018,21,4


##### Insight from Q5
Incidents number reaches to the peak around midnight on Dec 15th. Tourists are recommended to stay inside during midnight.

In [19]:
districtTopThree=df_opt1.groupBy('neighborhood').count().orderBy('count',ascending=False).limit(3)
#districtTopThree=spark.sql("select neighborhood,count(*) as count from mn_crime group by neighborhood order by count desc limit 3")
crimeCategoryHour=df_opt1.join(districtTopThree,df_opt1.neighborhood==districtTopThree.neighborhood,'inner')\
.groupBy(df_opt1.neighborhood,df_opt1.Category,df_opt1.IncHour)\
.count()\
.orderBy('count',ascending=False)



In [20]:
display(crimeCategoryHour)

neighborhood,Category,IncHour,count
DOWNTOWN WEST,LARCENY,0,832
DOWNTOWN WEST,LARCENY,1,681
DOWNTOWN WEST,LARCENY,23,516
DOWNTOWN WEST,LARCENY,12,505
DOWNTOWN WEST,LARCENY,2,480
DOWNTOWN WEST,LARCENY,17,466
DOWNTOWN WEST,LARCENY,18,452
DOWNTOWN WEST,LARCENY,16,448
DOWNTOWN WEST,LARCENY,15,440
DOWNTOWN WEST,LARCENY,14,407


In [21]:
display(crimeCategoryHour)

neighborhood,Category,IncHour,count
DOWNTOWN WEST,LARCENY,0,832
DOWNTOWN WEST,LARCENY,1,681
DOWNTOWN WEST,LARCENY,23,516
DOWNTOWN WEST,LARCENY,12,505
DOWNTOWN WEST,LARCENY,2,480
DOWNTOWN WEST,LARCENY,17,466
DOWNTOWN WEST,LARCENY,18,452
DOWNTOWN WEST,LARCENY,16,448
DOWNTOWN WEST,LARCENY,15,440
DOWNTOWN WEST,LARCENY,14,407


##### Insight from Q6
6.1 larceny, assault and robbery mostly happen in downtown west but burglary mostly happens in Whittier <br>
6.2 larceny, assault and robbery happen most likely in Downtown West during midnight. Police need to be distributed to Downtown West every night during midnight (11pm to 1am); auto theft is more likely to happen during the day

In [23]:
from pyspark.sql.functions import dayofweek,sum
from pyspark.sql.window import Window

count_by_cat_day=df_opt1.groupBy('Category','DayOfWeek',dayofweek(df_opt1.IncDate).alias('DayOfWeek_num')).count().orderBy('DayOfWeek_num',ascending=False)
window=Window.partitionBy(count_by_cat_day.Category)
count_by_cat_day=count_by_cat_day.withColumn('sum_per_cat',sum('count').over(window))
count_by_cat_day=count_by_cat_day.withColumn('CategoryPercent',col('count')*100/col('sum_per_cat'))
display(count_by_cat_day.orderBy('CategoryPercent',ascending=False))

Category,DayOfWeek,DayOfWeek_num,count,sum_per_cat,CategoryPercent
RAPE,Sat,7,329,1630,20.1840490797546
MURDER,Sun,1,30,161,18.633540372670808
MURDER,Sat,7,30,161,18.633540372670808
RAPE,Sun,1,289,1630,17.73006134969325
ASSAULT,Sat,7,1517,8822,17.195647245522558
ASSAULT,Sun,1,1514,8822,17.161641351167535
RAPE,Fri,6,273,1630,16.74846625766871
ROBBERY,Sat,7,1262,7633,16.53347307742696
ARSON,Sat,7,74,449,16.481069042316257
ROBBERY,Sun,1,1241,7633,16.258351893095767


##### Insight from Q7
First of all, there is no resolution data in MN safety data so I analyzed how each category is distributed to every day.<br>
All categories are evenly distributed to everyday. But highly dangerous crimes like rape and murder tend to happen on weekends so people especially women should be more cautious on weekends. <br>

In [25]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler
df_spatial=df_opt1.groupBy('X','Y').count().select('X','Y',col('count').alias('IncNumber'))
display(df_spatial)

X,Y,IncNumber
-93.2797026468921,44.9438300026939,14
-93.2436263050818,44.9616176925086,7
-93.2779149994767,44.9295793457673,8
-93.2714111968197,44.9778621002801,23
-93.2625358418126,44.9242304623602,22
-93.2626436396467,44.9506329511168,21
-93.2740252942965,44.964579652819,14
-93.2776005891272,44.9881618229986,36
-93.2447941149512,44.9964205966971,2
-93.2740073279908,44.9617893124083,24


In [26]:
import matplotlib.pyplot as plt
FEATURES_COL=['X','Y']
for col in df_spatial.columns:
  if col in FEATURES_COL:
    df_spatial=df_spatial.withColumn(col,df_spatial[col].cast('float'))
assembler=VectorAssembler(inputCols=FEATURES_COL,outputCol='features')
df_kmeans=assembler.transform(df_spatial).select('IncNumber','features')

#optimize choice of k
cost=np.zeros(10)
for k in range(2,10):
  kmeans=KMeans().setK(k).setSeed(5).setFeaturesCol('features')
  #sample without replacement
  model=kmeans.fit(df_kmeans.sample(False,0.1,seed=58))
  cost[k]=model.computeCost(df_kmeans)
fig,ax=plt.subplots(1,1,figsize=(8,6))
ax.plot(range(2,10),cost[2:10])
ax.set_xlabel('k')
ax.set_ylabel('cost')
ax.set_title('The Elbow Method Graph')
display()

In [27]:
#based on the elbow graph, the cluster number to use is 6
#apply k-means to df_kmeans
X=df_kmeans
kmeans=KMeans().setK(6).setSeed(5).setFeaturesCol('features')
model=kmeans.fit(X)
y_kmeans=model.transform(X)
centers=model.clusterCenters()
print('Cluster Centers: ')
for center in centers:
  print(center)

In [28]:
[x[0][0] for x in y_kmeans.filter(y_kmeans.prediction==0).select('features').collect()]

In [29]:
y_kmeans.createOrReplaceTempView('spatial_view')


In [30]:
%sql
select prediction as ClusterNum, sum(IncNumber) as TotalInc from spatial_view group by prediction order by TotalInc desc

ClusterNum,TotalInc
5,42779
3,19618
4,18823
2,16198
0,8336
1,164


In [31]:
y_pdf=y_kmeans.toPandas()

In [32]:
colors=['red','blue','green','cyan','magenta','purple']
for i in range(6):
  if i!=1:
    x=[x[0] for x in y_pdf.loc[y_pdf['prediction']==i,'features']]
    #[x[0][0] for x in y_kmeans.filter(y_kmeans.prediction==i).select('features').collect()]
    y=[x[1] for x in y_pdf.loc[y_pdf['prediction']==i,'features']]
    #[x[0][1] for x in y_kmeans.filter(y_kmeans.prediction==i).select('features').collect()]
    
    plt.scatter(x,y, s=1, c=colors[i], label ='Cluster '+str(i+1))
    plt.scatter(centers[i][0], centers[i][1], s=300, c='black', label = 'Centroids')
#Plot the centroid. This time we're going to use the cluster centres  #attribute that returns here the coordinates of the centroid.

plt.title('Clusters of Districts')
plt.xlabel('X')
plt.ylabel('Y')

plt.show()

I am a resident of Minnesota. Minneapolis is a relatively safe city to live but due to COVID-19 and protests happened early this year, I want to analyze its current safety status. <br>
I retrieved public safty data(Jan 2015 - June 2020) from Minnesota open data website http://opendata.minneapolismn.gov/ and loaded them into spark dataframe. <br>
Because there is some system upgrade in 2018, its metadata before and after 2018 are slightly different. I had to download 4 files and merge them together.<br>
I cleaned the data by converting date/time columns to the format I need and converted UCR code to its description for the following analysis. I also defined a function to label if a location belongs to downtown or not.<br>
After I have all data in good shape, I analyzed the data in the following aspects:
1. number of crimes by category,district and month ---to get an overall impression about "what" safety threats are, "where" these threats are and "when" the city is safe
2. number of crimes by hour to get into hour level ---to see when is the best time to visit the city
3. number of crimes by category and hour in top 3 dangerous districts ---to get insight about where and when to distribute police
4. crime category distribution on each day  ---to get an idea about which day of the week is more dangerous