In [0]:
import urllib.request
urllib.request.urlretrieve("https://data.sfgov.org/api/views/tmnf-yvry/rows.csv?accessType=DOWNLOAD", "/tmp/sf_03_18.csv")
dbutils.fs.mv("file:/tmp/sf_03_18.csv", "dbfs:/spark_proj1_sfcrime/data/sf_03_18.csv")
display(dbutils.fs.ls("dbfs:/spark_proj1_sfcrime/data/"))

path,name,size
dbfs:/spark_proj1_sfcrime/data/sf_03_18.csv,sf_03_18.csv,559169754


In [0]:
data_path = "dbfs:/spark_proj1_sfcrime/data/sf_03_18.csv"

In [0]:
# Import package 
from csv import reader
from pyspark.sql import Row 
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd
import numpy as np
import seaborn as sb
import matplotlib.pyplot as plt
#from ggplot import *
import warnings

import os
os.environ["PYSPARK_PYTHON"] = "python3"

In [0]:
# Data Preprocessing
crime_data_lines = sc.textFile(data_path)

In [0]:
df_crimes = crime_data_lines.map(lambda line: [x.strip('"') for x in next(reader([line]))])
#get header
header = df_crimes.first()
print(header)

In [0]:
#remove the first line of data
crimes = df_crimes.filter(lambda x: x != header)

In [0]:
#get the total number of data 
print(crimes.count())

In [0]:
crimes

In [0]:
# Get dataframe and sql

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("crime analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df_opt1 = spark.read.format("csv").option("header", "true").load(data_path)
display(df_opt1)
df_opt1.createOrReplaceTempView("sf_crime")

PdId,IncidntNum,Incident Code,Category,Descript,DayOfWeek,Date,Time,PdDistrict,Resolution,Address,X,Y,location,SF Find Neighborhoods 2 2,Current Police Districts 2 2,Current Supervisor Districts 2 2,Analysis Neighborhoods 2 2,DELETE - Fire Prevention Districts 2 2,DELETE - Police Districts 2 2,DELETE - Supervisor Districts 2 2,DELETE - Zip Codes 2 2,DELETE - Neighborhoods 2 2,DELETE - 2017 Fix It Zones 2 2,Civic Center Harm Reduction Project Boundary 2 2,Fix It Zones as of 2017-11-06 2 2,DELETE - HSOC Zones 2 2,Fix It Zones as of 2018-02-07 2 2,"CBD, BID and GBD Boundaries as of 2017 2 2","Areas of Vulnerability, 2016 2 2",Central Market/Tenderloin Boundary 2 2,Central Market/Tenderloin Boundary Polygon - Updated 2 2,HSOC Zones as of 2018-06-05 2 2,OWED Public Spaces 2 2,Neighborhoods 2
11087941463010,110879414,63010,WARRANTS,WARRANT ARREST,Friday,07/01/2011,08:00,NORTHERN,"ARREST, BOOKED",3400 Block of PIERCE ST,-122.439757857075,37.8021507619169,POINT (-122.43975785707501 37.8021507619169),17.0,4,6,13,13.0,9,1,57,17,,,,,,,1,,,,,17.0
5117770316010,51177703,16010,DRUG/NARCOTIC,POSSESSION OF MARIJUANA,Tuesday,10/18/2005,14:30,TENDERLOIN,"ARREST, BOOKED",0 Block of UNITEDNATIONS PZ,-122.414317857881,37.7799444052046,POINT (-122.414317857881 37.7799444052046),21.0,5,10,36,7.0,10,9,28852,36,3.0,1.0,3.0,1.0,3.0,6.0,2,1.0,1.0,1.0,39.0,21.0
5011209707055,50112097,7055,VEHICLE THEFT,RECOVERED VEHICLE - STOLEN OUTSIDE SF,Saturday,01/29/2005,13:45,BAYVIEW,NONE,1500 Block of KIRKWOOD AV,-122.388798895151,37.7375755833256,POINT (-122.38879889515101 37.7375755833256),86.0,2,9,1,10.0,3,8,58,1,,,,,,,2,,,,,86.0
11044468164020,110444681,64020,NON-CRIMINAL,"AIDED CASE, MENTAL DISTURBED",Thursday,06/02/2011,02:52,CENTRAL,PSYCHOPATHIC CASE,900 Block of COLUMBUS AV,-122.414354301151,37.8031089840376,POINT (-122.41435430115101 37.8031089840376),107.0,6,3,32,3.0,1,10,308,32,,,,,,,1,,,,,107.0
3038311107024,30383111,7024,VEHICLE THEFT,STOLEN TRAILER,Saturday,02/01/2003,08:00,BAYVIEW,NONE,1500 Block of BAY SHORE BL,-122.401096851568,37.7245556697717,POINT (-122.401096851568 37.7245556697717),86.0,2,9,1,10.0,3,8,58,1,,,,,,,2,,,,,86.0
6018621406243,60186214,6243,LARCENY/THEFT,PETTY THEFT FROM LOCKED AUTO,Sunday,02/12/2006,17:00,CENTRAL,NONE,400 Block of DAVIS CT,-122.398187664281,37.7967148853927,POINT (-122.39818766428101 37.7967148853927),77.0,6,3,8,3.0,1,10,28860,6,,,,,,,1,,,,,77.0
9606640606374,96066406,6374,LARCENY/THEFT,GRAND THEFT OF PROPERTY,Thursday,08/06/2009,22:00,RICHMOND,NONE,2000 Block of BAKER ST,-122.444143357971,37.7899090887789,POINT (-122.44414335797102 37.7899090887789),102.0,8,6,30,13.0,6,1,29490,27,,,,,,,1,,,,,102.0
10037601606304,100376016,6304,LARCENY/THEFT,GRAND THEFT FROM A BUILDING,Friday,04/23/2010,16:30,MISSION,NONE,2600 Block of MISSION ST,-122.41860030589,37.7546255872838,POINT (-122.41860030589001 37.7546255872838),53.0,3,2,20,2.0,4,7,28859,19,,,,3.0,,,2,,,3.0,,53.0
5118607706244,51186077,6244,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,Thursday,10/20/2005,11:00,SOUTHERN,NONE,800 Block of BRYANT ST,-122.403404791479,37.775420706711,POINT (-122.40340479147902 37.775420706711),32.0,1,10,34,14.0,2,9,28853,34,,,,,,,2,,,,,32.0
12038512205053,120385122,5053,BURGLARY,"BURGLARY OF STORE, UNLAWFUL ENTRY",Thursday,05/03/2012,10:00,SOUTHERN,NONE,600 Block of 2ND ST,-122.39123835272,37.7812532736369,POINT (-122.39123835272001 37.7812532736369),31.0,1,10,8,6.0,2,9,28856,6,,,,,,,1,,,,,31.0


In [0]:
df_opt1.printSchema()

In [0]:
%sql
select count(*)
from sf_crime

count(1)
2160953


In [0]:
## find number of crimes in different categories

# dataframe based solution
q1_result = df_opt1.groupBy('category').count().orderBy('count', ascending=False)
display(q1_result)

category,count
LARCENY/THEFT,477975
OTHER OFFENSES,303027
NON-CRIMINAL,236937
ASSAULT,191384
VEHICLE THEFT,126228
DRUG/NARCOTIC,117875
VANDALISM,114718
WARRANTS,99821
BURGLARY,91067
SUSPICIOUS OCC,79740


In [0]:
# SQL based solution
crime_category_num = spark.sql("SELECT  category, COUNT(*) AS Count FROM sf_crime GROUP BY category ORDER BY Count DESC")
display(crime_category_num)

category,Count
LARCENY/THEFT,477975
OTHER OFFENSES,303027
NON-CRIMINAL,236937
ASSAULT,191384
VEHICLE THEFT,126228
DRUG/NARCOTIC,117875
VANDALISM,114718
WARRANTS,99821
BURGLARY,91067
SUSPICIOUS OCC,79740


In [0]:
## Count number of crimes of different districts
crimeDistrict = spark.sql("SELECT PdDistrict, COUNT(*) as Count FROM sf_crime GROUP BY PdDistrict ORDER BY Count DESC")
display(crimeDistrict)

PdDistrict,Count
SOUTHERN,394234
MISSION,293072
NORTHERN,269229
CENTRAL,223962
BAYVIEW,210729
TENDERLOIN,189034
INGLESIDE,186645
TARAVAL,158710
PARK,121070
RICHMOND,114267


In [0]:
## Count number of crimes each "Sunday" at "SF downtown"
q3_result = spark.sql("""
                      with Sunday_dt_crime as(
                      select substring(Date,1,5) as Date,
                             substring(Date,7) as Year
                      from sf_crime
                      where (DayOfWeek = 'Sunday'
                             and -122.423671 <= X
                             and X <= 122.412497
                             and 37.773510 <= Y
                             and Y <= 37.782137)
                             )
                             
                      select Year, Date, COUNT(*) as Count
                      from Sunday_dt_crime
                      group by Year, Date
                      order by Year, Date
                      """)
display(q3_result)

Year,Date,Count
2003,01/05,28
2003,01/12,33
2003,01/19,19
2003,01/26,32
2003,02/02,45
2003,02/09,46
2003,02/16,50
2003,02/23,49
2003,03/02,40
2003,03/09,49


In [0]:
## Analysis number of crime in each month of 2015-2018
q4_result = spark.sql("""
                      with Monthly_crime as(
                      select Date,
                             substring(Date,7) as Year,
                             substring(Date,1,2) as Month
                      from sf_crime
                      )

                      select Month, COUNT(*) as Count
                      from Monthly_crime
                      where Year in ('2015', '2016', '2017', '2018')
                      group by Month
                      order by Month
                      """)
display(q4_result)


Month,Count
1,50851
2,45666
3,49831
4,47598
5,42567
6,37376
7,38043
8,38458
9,37373
10,39122


In [0]:
q4_result2 = spark.sql("""
                      with Monthly_crime as(
                      select Date,
                             substring(Date,7) as Year,
                             substring(Date,1,2) as Month
                      from sf_crime
                      )
                      
                      select Year, Month, COUNT(*) as Count
                      from Monthly_crime
                      where Year in ('2015', '2016', '2017', '2018')
                      group by Year, Month
                      order by Year, Month
                      """)
display(q4_result2)

Year,Month,Count
2015,1,13370
2015,2,12055
2015,3,13674
2015,4,12705
2015,5,13511
2015,6,13073
2015,7,13158
2015,8,13523
2015,9,12666
2015,10,12889


In [0]:
## show number of crimes by hour 
q5_result = spark.sql("""
                      select substring(Time,1,2) as Hour,
                      count(*) as Count
                      from sf_crime
                      group by Hour
                      order by Hour
                      """)
display(q5_result)

Hour,Count
0,111449
1,64426
2,53963
3,35271
4,24997
5,22178
6,32898
7,53103
8,79464
9,86795


In [0]:
# Show number of crime by hour for records in Christmas
q5_result_ch = spark.sql("""
                      select substring(Time,1,2) as Hour,
                      count(*) as Count
                      from sf_crime
                      where Date like '12/25/%'
                      group by Hour
                      order by Hour
                      """)
display(q5_result_ch)

Hour,Count
0,215
1,138
2,114
3,79
4,60
5,49
6,57
7,62
8,110
9,83


In [0]:
# Show number of crime by hour for records in New Year Eve (12/31 - 01/01)
q5_result_ne = spark.sql("""
                      select substring(Time,1,2) as Hour,
                             substring(Date,1,5) as Date_in_year,
                             count(*) as Count
                      from sf_crime
                      where Date like '12/31/%' or Date like '01/01/%'
                      group by Date_in_year, Hour
                      order by Date_in_year desc, Hour
                      """)
display(q5_result_ne)

Hour,Date_in_year,Count
0,12/31,286
1,12/31,162
2,12/31,138
3,12/31,72
4,12/31,58
5,12/31,59
6,12/31,72
7,12/31,98
8,12/31,179
9,12/31,173


In [0]:
## (1) Step1: Find out the top-3 danger disrict
## (2) Step2: find out the crime event w.r.t category and time (hour) from the result of step 1
## (3) give your advice to distribute the police based on your analysis results.

In [0]:
crimeDistrict.createOrReplaceTempView("crimeDistrict")

Dist_rank = spark.sql("""
                      select PdDistrict, Count as Crime_Number,
                             rank() over(order by Count desc) as Rank
                      from crimeDistrict
                      """)
Dist_rank.createOrReplaceTempView("Dist_rank")

q6_s1_result = spark.sql("""
                       select PdDistrict, Crime_Number
                       from Dist_rank
                       where Rank <= 3
                       """)
q6_s1_result.createOrReplaceTempView("q6_s1_result")

display(q6_s1_result)

PdDistrict,Crime_Number
SOUTHERN,394234
MISSION,293072
NORTHERN,269229


In [0]:
# Step 2 - Category

Top3_Dist = spark.sql("""
                      select sf_crime.PdDistrict, sf_crime.category, sf_crime.Time
                      from sf_crime
                      inner join q6_s1_result on sf_crime.PdDistrict = q6_s1_result.PdDistrict
                      """)
Top3_Dist.createOrReplaceTempView("Top3_Dist")

q6_s2_result1 = spark.sql("""
                         select category,
                                count(*) as Crime_Number
                         from Top3_Dist
                         group by category
                         order by Crime_Number desc
                         """)
display(q6_s2_result1)

category,Crime_Number
LARCENY/THEFT,242382
OTHER OFFENSES,127563
NON-CRIMINAL,107688
ASSAULT,80080
DRUG/NARCOTIC,50300
WARRANTS,48628
VANDALISM,45591
VEHICLE THEFT,43482
BURGLARY,36364
SUSPICIOUS OCC,31554


In [0]:
# larceny/theft
q6_s2_result2 = spark.sql("""
                         select substring(Time,1,2) as Hour,
                                count(*) as Crime_Number
                         from Top3_Dist
                         where category = 'LARCENY/THEFT'
                         group by Hour
                         order by Hour
                         """)
display(q6_s2_result2)

Hour,Crime_Number
0,10082
1,6644
2,4065
3,2419
4,1542
5,1490
6,2478
7,3780
8,6278
9,7661


In [0]:
# burglary
q6_s2_result2 = spark.sql("""
                         select substring(Time,1,2) as Hour,
                                count(*) as Crime_Number
                         from Top3_Dist
                         where category = 'BURGLARY'
                         group by Hour
                         order by Hour
                         """)
display(q6_s2_result2)

Hour,Crime_Number
0,1585
1,1064
2,1152
3,1229
4,1032
5,948
6,858
7,998
8,1669
9,1379


In [0]:
# assault
q6_s2_result2 = spark.sql("""
                         select substring(Time,1,2) as Hour,
                                count(*) as Crime_Number
                         from Top3_Dist
                         where category = 'ASSAULT'
                         group by Hour
                         order by Hour
                         """)
display(q6_s2_result2)


Hour,Crime_Number
0,4675
1,4310
2,3837
3,1846
4,1127
5,889
6,1216
7,1743
8,2578
9,2885


In [0]:
## for different categories of crime, find the percentage of resolution
res_num = spark.sql("""select category, resolution, count(*) as N_res from sf_crime group by category, resolution""")
res_num.createOrReplaceTempView("res_num")

cate_num = spark.sql("""select category, count(*) as N_cate from sf_crime group by category""")
cate_num.createOrReplaceTempView("cate_num")

q7_result = spark.sql("""
                      select distinct sf_crime.category, sf_crime.resolution, N_res/N_cate as Percentage
                      from (sf_crime left join res_num on sf_crime.category = res_num.category and sf_crime.resolution = res_num.resolution)
                      left join cate_num on sf_crime.category = cate_num.category
                      order by category, resolution""")
q7_result.createOrReplaceTempView("q7_result")
# Percentage of resolution for LARCENY/THEFT
q7 = spark.sql("""
               select Resolution, Percentage
               from q7_result
               where category = 'LARCENY/THEFT'
               order by Percentage desc
               """)
display(q7)

Resolution,Percentage
NONE,0.9162131910664784
"ARREST, BOOKED",0.0525885245044196
"ARREST, CITED",0.0218902662273131
NOT PROSECUTED,0.0033474554108478
UNFOUNDED,0.0025545269104032
COMPLAINANT REFUSES TO PROSECUTE,0.0009854071865683352
DISTRICT ATTORNEY REFUSES TO PROSECUTE,0.0009770385480412154
EXCEPTIONAL CLEARANCE,0.000924734557246718
PROSECUTED BY OUTSIDE AGENCY,0.0003263769025576652
PSYCHOPATHIC CASE,9.833150269365552e-05


In [0]:
# Percentage of resolution for BURGLARY
q7 = spark.sql("""
               select Resolution, Percentage
               from q7_result
               where category = 'BURGLARY'
               order by Percentage desc
               """)
display(q7)

Resolution,Percentage
NONE,0.8417209307432989
"ARREST, BOOKED",0.140171522066171
UNFOUNDED,0.0059846047415639
DISTRICT ATTORNEY REFUSES TO PROSECUTE,0.0045351224922309
COMPLAINANT REFUSES TO PROSECUTE,0.0032723159871303
"ARREST, CITED",0.0016032152151712
EXCEPTIONAL CLEARANCE,0.0014714441015955
NOT PROSECUTED,0.0006039509372220453
PROSECUTED BY OUTSIDE AGENCY,0.0003513896362019173
LOCATED,0.00016471389196964873


In [0]:
# Percentage of resolution for ASSAULT
q7 = spark.sql("""
               select Resolution, Percentage
               from q7_result
               where category = 'ASSAULT'
               order by Percentage desc
               """)
display(q7)

Resolution,Percentage
NONE,0.5943391297078126
"ARREST, BOOKED",0.3281831292062032
"ARREST, CITED",0.0375005225097186
DISTRICT ATTORNEY REFUSES TO PROSECUTE,0.0136479538519416
COMPLAINANT REFUSES TO PROSECUTE,0.0111085566191531
UNFOUNDED,0.0066619989131797
EXCEPTIONAL CLEARANCE,0.0038456715294904
PSYCHOPATHIC CASE,0.0022415666931404
NOT PROSECUTED,0.0011442962839108
PROSECUTED BY OUTSIDE AGENCY,0.0007315136061530745
