# SF crime data analysis and modeling
### Data Source: https://www.kaggle.com/c/sf-crime/data

In [1]:
from csv import reader
from pyspark.sql import Row 
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd
import numpy as np
import seaborn as sb
import matplotlib.pyplot as plt
#from ggplot import *
import warnings
from pyspark.sql.functions import col

import os
os.environ["PYSPARK_PYTHON"] = "python3"

In [2]:
#Initializing PySpark
from pyspark import SparkContext, SparkConf

# #Spark Config
conf = SparkConf().setAppName("sample_app")
sc = SparkContext(conf=conf)

In [4]:
# read data from the data storage
# please upload your data into databricks community at first. 
crime_data_lines = sc.textFile('sf_data.csv')
#prepare data 
df_crimes = crime_data_lines.map(lambda line: [x.strip('"') for x in next(reader([line]))])

In [5]:
#get header
header = df_crimes.first()
header

['Dates',
 'Category',
 'Descript',
 'DayOfWeek',
 'PdDistrict',
 'Resolution',
 'Address',
 'X',
 'Y']

In [6]:
#remove the first line of data
crimes = df_crimes.filter(lambda x: x != header)

In [7]:
#get the first line of data
display(crimes.take(5))

[['2015-05-13 23:53:00',
  'WARRANTS',
  'WARRANT ARREST',
  'Wednesday',
  'NORTHERN',
  'ARREST, BOOKED',
  'OAK ST / LAGUNA ST',
  '-122.425891675136',
  '37.7745985956747'],
 ['2015-05-13 23:53:00',
  'OTHER OFFENSES',
  'TRAFFIC VIOLATION ARREST',
  'Wednesday',
  'NORTHERN',
  'ARREST, BOOKED',
  'OAK ST / LAGUNA ST',
  '-122.425891675136',
  '37.7745985956747'],
 ['2015-05-13 23:33:00',
  'OTHER OFFENSES',
  'TRAFFIC VIOLATION ARREST',
  'Wednesday',
  'NORTHERN',
  'ARREST, BOOKED',
  'VANNESS AV / GREENWICH ST',
  '-122.42436302145',
  '37.8004143219856'],
 ['2015-05-13 23:30:00',
  'LARCENY/THEFT',
  'GRAND THEFT FROM LOCKED AUTO',
  'Wednesday',
  'NORTHERN',
  'NONE',
  '1500 Block of LOMBARD ST',
  '-122.42699532676599',
  '37.80087263276921'],
 ['2015-05-13 23:30:00',
  'LARCENY/THEFT',
  'GRAND THEFT FROM LOCKED AUTO',
  'Wednesday',
  'PARK',
  'NONE',
  '100 Block of BRODERICK ST',
  '-122.438737622757',
  '37.771541172057795']]

In [8]:
#get the total number of data 
crimes.count()

878049

#### Solove big data issues via Spark
###### approach 1: use RDD (not recommend)
###### approach 2: use Dataframe, register the RDD to a dataframe (recommend for DE)
###### approach 3: use SQL (recomend for data analysis or DS)

In [9]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("crime analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [10]:
df_opt1 = spark.read.format("csv").option("header", "true").load("sf_data.csv")
display(df_opt1)
df_opt1.createOrReplaceTempView("sf_crime")

DataFrame[Dates: string, Category: string, Descript: string, DayOfWeek: string, PdDistrict: string, Resolution: string, Address: string, X: string, Y: string]

In [11]:
df_opt1.show(10)

+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-------------------+------------------+
|              Dates|      Category|            Descript|DayOfWeek|PdDistrict|    Resolution|             Address|                  X|                 Y|
+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-------------------+------------------+
|2015-05-13 23:53:00|      WARRANTS|      WARRANT ARREST|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / LAGUNA ST|  -122.425891675136|  37.7745985956747|
|2015-05-13 23:53:00|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / LAGUNA ST|  -122.425891675136|  37.7745985956747|
|2015-05-13 23:33:00|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday|  NORTHERN|ARREST, BOOKED|VANNESS AV / GREE...|   -122.42436302145|  37.8004143219856|
|2015-05-13 23:30:00| LARCENY/THEFT|GRAND THEFT FROM ...|Wednesday|  NORTHER

In [12]:
from pyspark.sql import Row

def createRow(keys, values):
  assert len(keys) == len(values)
  mapped = dict(zip(keys, values))
  return Row(**mapped)

rdd_rows = crimes.map(lambda x: createRow(header, x))

df_opt2 = spark.createDataFrame(rdd_rows)
df_opt2.createOrReplaceTempView("sf_crime")

In [13]:
display(df_opt2)

DataFrame[Address: string, Category: string, Dates: string, DayOfWeek: string, Descript: string, PdDistrict: string, Resolution: string, X: string, Y: string]

In [14]:
df_opt2.show(10)

+--------------------+--------------+-------------------+---------+--------------------+----------+--------------+-------------------+------------------+
|             Address|      Category|              Dates|DayOfWeek|            Descript|PdDistrict|    Resolution|                  X|                 Y|
+--------------------+--------------+-------------------+---------+--------------------+----------+--------------+-------------------+------------------+
|  OAK ST / LAGUNA ST|      WARRANTS|2015-05-13 23:53:00|Wednesday|      WARRANT ARREST|  NORTHERN|ARREST, BOOKED|  -122.425891675136|  37.7745985956747|
|  OAK ST / LAGUNA ST|OTHER OFFENSES|2015-05-13 23:53:00|Wednesday|TRAFFIC VIOLATION...|  NORTHERN|ARREST, BOOKED|  -122.425891675136|  37.7745985956747|
|VANNESS AV / GREE...|OTHER OFFENSES|2015-05-13 23:33:00|Wednesday|TRAFFIC VIOLATION...|  NORTHERN|ARREST, BOOKED|   -122.42436302145|  37.8004143219856|
|1500 Block of LOM...| LARCENY/THEFT|2015-05-13 23:30:00|Wednesday|GRAND THE

In [16]:
df_opt3 = crimes.toDF(['Dates', 'Category', 'Descript', 'DayOfWeek', 'PdDistrict', 'Resolution', 'Address', 'X', 'Y'])
display(df_opt3)
df_opt3.createOrReplaceTempView("sf_crime")

DataFrame[Dates: string, Category: string, Descript: string, DayOfWeek: string, PdDistrict: string, Resolution: string, Address: string, X: string, Y: string]

In [17]:
df_opt3.show(10)

+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-------------------+------------------+
|              Dates|      Category|            Descript|DayOfWeek|PdDistrict|    Resolution|             Address|                  X|                 Y|
+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-------------------+------------------+
|2015-05-13 23:53:00|      WARRANTS|      WARRANT ARREST|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / LAGUNA ST|  -122.425891675136|  37.7745985956747|
|2015-05-13 23:53:00|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / LAGUNA ST|  -122.425891675136|  37.7745985956747|
|2015-05-13 23:33:00|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday|  NORTHERN|ARREST, BOOKED|VANNESS AV / GREE...|   -122.42436302145|  37.8004143219856|
|2015-05-13 23:30:00| LARCENY/THEFT|GRAND THEFT FROM ...|Wednesday|  NORTHER

#### Q1 question (OLAP):
#####  Write a Spark program that counts the number of crimes for different category.
Below are some example codes to demonstrate the way to use Spark RDD, DF, and SQL to work with big data. You can follow this example to finish other questions.

In [25]:
catorgory_set_rdd = crimes.map(lambda item: (item[1],1))
from operator import add
result = sorted(catorgory_set_rdd.reduceByKey(add).collect(), key = lambda item: -item[1])
display(result)

[('2019/06/30', 578),
 ('2018/08/01', 556),
 ('2018/08/24', 532),
 ('2019/10/02', 527),
 ('2019/04/03', 521),
 ('2018/08/10', 517),
 ('2019/09/02', 515),
 ('2019/08/10', 514),
 ('2018/04/25', 514),
 ('2019/10/04', 513),
 ('2019/08/07', 512),
 ('2019/02/01', 512),
 ('2019/08/30', 507),
 ('2018/04/11', 504),
 ('2018/04/13', 504),
 ('2018/07/27', 504),
 ('2019/10/28', 501),
 ('2019/07/26', 501),
 ('2019/02/20', 501),
 ('2018/08/03', 499),
 ('2018/08/14', 498),
 ('2018/11/09', 497),
 ('2018/01/01', 496),
 ('2019/09/06', 494),
 ('2019/12/04', 494),
 ('2019/07/24', 491),
 ('2019/07/01', 489),
 ('2018/08/25', 489),
 ('2018/07/21', 489),
 ('2018/10/15', 488),
 ('2018/05/08', 488),
 ('2018/01/26', 488),
 ('2018/07/20', 486),
 ('2018/07/13', 486),
 ('2018/04/20', 486),
 ('2018/06/01', 485),
 ('2019/10/25', 485),
 ('2018/05/18', 485),
 ('2019/05/24', 484),
 ('2018/06/24', 484),
 ('2018/09/08', 484),
 ('2019/08/14', 484),
 ('2018/07/17', 482),
 ('2019/08/01', 482),
 ('2018/10/01', 482),
 ('2018/05

In [18]:
q1_result = df_opt1.groupBy('Category').count().orderBy('count', ascending=False)
display(q1_result)

DataFrame[Category: string, count: bigint]

In [19]:
q1_result.show(n = 30, truncate=False)

+---------------------------+------+
|Category                   |count |
+---------------------------+------+
|LARCENY/THEFT              |174900|
|OTHER OFFENSES             |126182|
|NON-CRIMINAL               |92304 |
|ASSAULT                    |76876 |
|DRUG/NARCOTIC              |53971 |
|VEHICLE THEFT              |53781 |
|VANDALISM                  |44725 |
|WARRANTS                   |42214 |
|BURGLARY                   |36755 |
|SUSPICIOUS OCC             |31414 |
|MISSING PERSON             |25989 |
|ROBBERY                    |23000 |
|FRAUD                      |16679 |
|FORGERY/COUNTERFEITING     |10609 |
|SECONDARY CODES            |9985  |
|WEAPON LAWS                |8555  |
|PROSTITUTION               |7484  |
|TRESPASS                   |7326  |
|STOLEN PROPERTY            |4540  |
|SEX OFFENSES FORCIBLE      |4388  |
|DISORDERLY CONDUCT         |4320  |
|DRUNKENNESS                |4280  |
|RECOVERED VEHICLE          |3138  |
|KIDNAPPING                 |2341  |
|

In [22]:
#Spark SQL based
crimeCategory = spark.sql("SELECT  category, COUNT(*) AS Count FROM sf_crime GROUP BY category ORDER BY count DESC")
display(crimeCategory)

DataFrame[category: string, Count: bigint]

In [24]:
crimeCategory.show(10)

+--------------+------+
|      category| Count|
+--------------+------+
| LARCENY/THEFT|174900|
|OTHER OFFENSES|126182|
|  NON-CRIMINAL| 92304|
|       ASSAULT| 76876|
| DRUG/NARCOTIC| 53971|
| VEHICLE THEFT| 53781|
|     VANDALISM| 44725|
|      WARRANTS| 42214|
|      BURGLARY| 36755|
|SUSPICIOUS OCC| 31414|
+--------------+------+
only showing top 10 rows



In [25]:
# important hints: 
## first step: spark df or sql to compute the statisitc result 
## second step: export your result to a pandas dataframe. 
crimes_pd_df = crimeCategory.toPandas()

In [27]:
# Spark does not support this function, please refer https://matplotlib.org/ for visuliation. You need to use display to show the figure in the databricks community. 
display(crimes_pd_df)

Unnamed: 0,category,Count
0,LARCENY/THEFT,174900
1,OTHER OFFENSES,126182
2,NON-CRIMINAL,92304
3,ASSAULT,76876
4,DRUG/NARCOTIC,53971
5,VEHICLE THEFT,53781
6,VANDALISM,44725
7,WARRANTS,42214
8,BURGLARY,36755
9,SUSPICIOUS OCC,31414


### We can see the top-5 frequency crimes are:
1	LARCENY/THEFT <br>
2	OTHER OFFENSES <br>
3	NON-CRIMINAL <br>
4	ASSAULT <br>
5	DRUG/NARCOTIC <br>

#### Q2 question (OLAP)
Counts the number of crimes for different district, and visualize your results

In [28]:
crimeDistrict = spark.sql("SELECT  PdDistrict, COUNT(*) AS Count FROM sf_crime GROUP BY PdDistrict ORDER BY Count DESC")
display(crimeDistrict)

DataFrame[PdDistrict: string, Count: bigint]

In [30]:
crimeDistrict.show(25)

+----------+------+
|PdDistrict| Count|
+----------+------+
|  SOUTHERN|157182|
|   MISSION|119908|
|  NORTHERN|105296|
|   BAYVIEW| 89431|
|   CENTRAL| 85460|
|TENDERLOIN| 81809|
| INGLESIDE| 78845|
|   TARAVAL| 65596|
|      PARK| 49313|
|  RICHMOND| 45209|
+----------+------+



Southern and Mission distric are most dangerous, there are more than 100K crimes committed between 2003 to 2015. <br>

Compared to other districts, Park and RichMond are public security environmental. <br>

From the map, we can see Northern districs are safer than Southern ones. <br>

#### Q3 question (OLAP)
Count the number of crimes each "Sunday" at "SF downtown". hints: SF downtown is defiend via the range of spatial location. Thus, you need to write your own UDF function to filter data which are located inside certain spatial range. You can follow the example here: https://changhsinlee.com/pyspark-udf/

In [31]:
PdDistrict = spark.sql("SELECT distinct PdDistrict FROM sf_crime")

In [33]:
PdDistrict.show()

+----------+
|PdDistrict|
+----------+
|   MISSION|
|   BAYVIEW|
|   CENTRAL|
|   TARAVAL|
|TENDERLOIN|
| INGLESIDE|
|      PARK|
|  SOUTHERN|
|  RICHMOND|
|  NORTHERN|
+----------+



In [34]:
##sql
crimecount = spark.sql("SELECT  PdDistrict, DayOfWeek, COUNT(*) AS Count FROM sf_crime WHERE PdDistrict in ('MISSION','CENTRAL','TARAVAL','TENDERLOIN','RICHMOND','PARK') and DayOfWeek='Sunday' GROUP BY PdDistrict,DayOfWeek ORDER BY Count DESC")

In [35]:
crimecount.show()

+----------+---------+-----+
|PdDistrict|DayOfWeek|Count|
+----------+---------+-----+
|   MISSION|   Sunday|15874|
|   CENTRAL|   Sunday|12197|
|TENDERLOIN|   Sunday|10178|
|   TARAVAL|   Sunday| 8331|
|      PARK|   Sunday| 6646|
|  RICHMOND|   Sunday| 6089|
+----------+---------+-----+



In [38]:
crimecount_pd=crimecount.toPandas()
display(crimecount_pd)

Unnamed: 0,PdDistrict,DayOfWeek,Count
0,MISSION,Sunday,15874
1,CENTRAL,Sunday,12197
2,TENDERLOIN,Sunday,10178
3,TARAVAL,Sunday,8331
4,PARK,Sunday,6646
5,RICHMOND,Sunday,6089


#### Q4 question (OLAP)
Analysis the number of crime in each month of 2015, 2016, 2017, 2018. Then, give your insights for the output results. What is the business impact for your result?

In [51]:
from datetime import datetime
import pyspark.sql.functions as pysparkSqlFunc
func = pysparkSqlFunc.udf(lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())
df = df_opt3.withColumn('new_date', func(col('Dates')))
df.createOrReplaceTempView('sf_crime1')

In [69]:
display(df)

DataFrame[Dates: string, Category: string, Descript: string, DayOfWeek: string, PdDistrict: string, Resolution: string, Address: string, X: string, Y: string, new_date: date]

In [53]:
##sql
Monthcrime = spark.sql("SELECT YEAR(new_date) as year, Month(new_date) as month, Count(*) from sf_crime1 group by YEAR(new_date), Month(new_date) order by 1,2")

DataFrame[year: int, month: int, count(1): bigint]

In [68]:
display(Monthcrime)

DataFrame[year: int, month: int, count(1): bigint]

#### Q5 question (OLAP)¶
Analysis the number of crime w.r.t the hour in certian day like 2015/12/15, 2016/12/15, 2017/12/15, 2018/10/15. Then, give your travel suggestion to visit SF.



In [70]:
func1 =  pysparkSqlFunc.udf (lambda x: datetime.strptime(x, '%H:%M'), TimestampType())
df2 = df_opt3.withColumn('new_time', func1(col('Time')))
df2.createOrReplaceTempView('sf_crime2')
display(df2)

AnalysisException: "cannot resolve '`Time`' given input columns: [DayOfWeek, Category, Dates, X, Address, PdDistrict, Y, Resolution, Descript];;\n'Project [Dates#113, Category#114, Descript#115, DayOfWeek#116, PdDistrict#117, Resolution#118, Address#119, X#120, Y#121, <lambda>('Time) AS new_time#475]\n+- LogicalRDD [Dates#113, Category#114, Descript#115, DayOfWeek#116, PdDistrict#117, Resolution#118, Address#119, X#120, Y#121], false\n"

#### For different category of crime, find the percentage of resolution. Based on the output, give your hints to adjust the policy.

In [71]:
resolution=spark.sql('select PdDistrict, Resolution, count(*) as CT from sf_crime1 group by 1,2 order by 1,2')
resolution.createOrReplaceTempView('resolution')

display(resolution)

DataFrame[PdDistrict: string, Resolution: string, CT: bigint]

In [72]:
resolution.show()

+----------+--------------------+-----+
|PdDistrict|          Resolution|   CT|
+----------+--------------------+-----+
|   BAYVIEW|      ARREST, BOOKED|19492|
|   BAYVIEW|       ARREST, CITED| 9795|
|   BAYVIEW|CLEARED-CONTACT J...|   42|
|   BAYVIEW|COMPLAINANT REFUS...|  678|
|   BAYVIEW|DISTRICT ATTORNEY...|  659|
|   BAYVIEW|EXCEPTIONAL CLEAR...|  128|
|   BAYVIEW| JUVENILE ADMONISHED|  177|
|   BAYVIEW|     JUVENILE BOOKED|  855|
|   BAYVIEW|      JUVENILE CITED|  352|
|   BAYVIEW|   JUVENILE DIVERTED|   74|
|   BAYVIEW|             LOCATED| 3180|
|   BAYVIEW|                NONE|51785|
|   BAYVIEW|      NOT PROSECUTED|  280|
|   BAYVIEW|PROSECUTED BY OUT...|  186|
|   BAYVIEW|PROSECUTED FOR LE...|   11|
|   BAYVIEW|   PSYCHOPATHIC CASE|  806|
|   BAYVIEW|           UNFOUNDED|  931|
|   CENTRAL|      ARREST, BOOKED|13830|
|   CENTRAL|       ARREST, CITED| 5964|
|   CENTRAL|CLEARED-CONTACT J...|    9|
+----------+--------------------+-----+
only showing top 20 rows



In [73]:
resolution2=spark.sql('select PdDistrict, solved, unsolved, solved/(solved+unsolved) as percentageofsolved from (select PdDistrict, sum(case when Resolution in ("NONE","UNFOUNDED") then CT else 0 end) as unsolved, sum(case when PdDistrict not in ("NONE","UNFOUNDED") then CT else 0 end) as solved from resolution group by PdDistrict) order by percentageofsolved DESC')
resolution2.show()

+----------+------+--------+------------------+
|PdDistrict|solved|unsolved|percentageofsolved|
+----------+------+--------+------------------+
|TENDERLOIN| 81809|   28406|0.7422673864718958|
|   MISSION|119908|   64336|0.6508108812227263|
|   BAYVIEW| 89431|   52716| 0.629144477196142|
|  SOUTHERN|157182|   95504|0.6220447511931805|
|      PARK| 49313|   31394|0.6110126754804416|
| INGLESIDE| 78845|   51939|0.6028642647418645|
|  NORTHERN|105296|   71344|0.5961050724637681|
|   TARAVAL| 65596|   45785|0.5889334805756816|
|   CENTRAL| 85460|   61458|0.5816850215766618|
|  RICHMOND| 45209|   33493|0.5744326700719169|
+----------+------+--------+------------------+



#### Q8 question (Apply Spark ML clustering for spatial data analysis)
Extra: visualize the spatial distribution of crimes and run a kmeans clustering algorithm (please use Spark ML kmeans)
You can refer Spark ML Kmeans a example: https://spark.apache.org/docs/latest/ml-clustering.html#k-means

In [75]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator 
from pyspark.ml.feature import VectorAssembler
import matplotlib.pyplot as plt
import matplotlib.cm as cm
#loads data
df8=df_opt1.select(['Count','X','Y'])
Feature_Col=['X','Y']
for col in df8.columns:
  if col in Feature_Col:
    df8=df8.withColumn(col,df8[col].cast('float'))
vecAssembler=VectorAssembler(inputCols=Feature_Col,outputCol="features")
df_kmeans=vecAssembler.transform(df8).select('IncidntNum','features')

AnalysisException: "cannot resolve '`Count`' given input columns: [Address, Descript, DayOfWeek, Category, X, Y, Resolution, PdDistrict, Dates];;\n'Project ['Count, X#17, Y#18]\n+- Relation[Dates#10,Category#11,Descript#12,DayOfWeek#13,PdDistrict#14,Resolution#15,Address#16,X#17,Y#18] csv\n"