# Crime Statistics based on PySpark

### Modification History
\>\>\> ZJ Zhang (Feb 11th, 2018)

I use PySpark to study crime statistics in San Francisco. The data ("sf_data.csv") were downloaded from the [Police Department Incidents](https://data.sfgov.org/Public-Safety/Police-Department-Incidents/tmnf-yvry) dataset.

In [1]:
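## import spark env (assumes `sc`, the SparkContext, is already provided by the PySpark notebook/shell)
from csv import reader
from pyspark.sql import SQLContext
from pyspark.sql.types import StructField, StringType, StructType
sqlContext = SQLContext(sc)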

In [2]:
## load data
crime_file = sc.textFile('sf_data.csv')
print("There are %d lines in the file."%(crime_file.count()))
## take a look at data
print("- quick look at the data:")
crime_file.take(5)

There are 2177995 lines in the file.
- quick look at the data:


['IncidntNum,Category,Descript,DayOfWeek,Date,Time,PdDistrict,Resolution,Address,X,Y,Location,PdId',
 '150060275,NON-CRIMINAL,LOST PROPERTY,Monday,01/19/2015,14:00,MISSION,NONE,18TH ST / VALENCIA ST,-122.42158168137,37.7617007179518,"(37.7617007179518, -122.42158168137)",15006027571000',
 '150098210,ROBBERY,"ROBBERY, BODILY FORCE",Sunday,02/01/2015,15:45,TENDERLOIN,NONE,300 Block of LEAVENWORTH ST,-122.414406029855,37.7841907151119,"(37.7841907151119, -122.414406029855)",15009821003074',
 '150098210,ASSAULT,AGGRAVATED ASSAULT WITH BODILY FORCE,Sunday,02/01/2015,15:45,TENDERLOIN,NONE,300 Block of LEAVENWORTH ST,-122.414406029855,37.7841907151119,"(37.7841907151119, -122.414406029855)",15009821004014',
 '150098210,SECONDARY CODES,DOMESTIC VIOLENCE,Sunday,02/01/2015,15:45,TENDERLOIN,NONE,300 Block of LEAVENWORTH ST,-122.414406029855,37.7841907151119,"(37.7841907151119, -122.414406029855)",15009821015200']
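
Some of the sample rows above contain commas inside double-quoted fields (for example `"ROBBERY, BODILY FORCE"` and the `Location` pair), so a plain `split(',')` would produce too many fields. The following is a minimal sketch, using only the standard library and one row copied from the output above, of how `csv.reader` handles this:

```python
from csv import reader

# One of the sample rows shown above, copied verbatim.
line = ('150098210,ROBBERY,"ROBBERY, BODILY FORCE",Sunday,02/01/2015,15:45,'
        'TENDERLOIN,NONE,300 Block of LEAVENWORTH ST,-122.414406029855,'
        '37.7841907151119,"(37.7841907151119, -122.414406029855)",15009821003074')

naive = line.split(',')        # also splits inside the quoted fields
parsed = next(reader([line]))  # respects the double quotes

print(len(naive), len(parsed))  # number of fields from each approach
print(parsed[2])                # the description field, kept whole
```

Only the `csv.reader` result matches the 13 column names in the header line.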

In [4]:
## define fields and build a schema
schemaString = crime_file.first()
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(',')]
schema = StructType(fields)
## step prints
print("There are %d fields:"%(len(fields)))
fields

There are 13 fields:


[StructField(IncidntNum,StringType,true),
 StructField(Category,StringType,true),
 StructField(Descript,StringType,true),
 StructField(DayOfWeek,StringType,true),
 StructField(Date,StringType,true),
 StructField(Time,StringType,true),
 StructField(PdDistrict,StringType,true),
 StructField(Resolution,StringType,true),
 StructField(Address,StringType,true),
 StructField(X,StringType,true),
 StructField(Y,StringType,true),
 StructField(Location,StringType,true),
 StructField(PdId,StringType,true)]
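
Because every column is declared as a string, numeric and date fields need converting before any arithmetic or time-based analysis. The sketch below shows one possible conversion in plain Python on a single record; the helper name `to_typed` and the `Timestamp` key are hypothetical, and the `MM/DD/YYYY` date format is an assumption inferred from the sample rows (02/01/2015 is listed as a Sunday).

```python
from datetime import datetime

# Values copied from one of the sample rows; every field arrives as a string.
row = {"DayOfWeek": "Sunday", "Date": "02/01/2015", "Time": "15:45",
       "X": "-122.414406029855", "Y": "37.7841907151119"}

def to_typed(row):
    """Return a copy of the row with a combined timestamp and float coordinates."""
    typed = dict(row)
    # Assumed format: month/day/year followed by a 24-hour time.
    typed["Timestamp"] = datetime.strptime(row["Date"] + " " + row["Time"],
                                           "%m/%d/%Y %H:%M")
    typed["X"] = float(row["X"])  # longitude
    typed["Y"] = float(row["Y"])  # latitude
    return typed

typed = to_typed(row)
print(typed["Timestamp"], typed["X"], typed["Y"])
```

The parsed date falls on a Sunday, which agrees with the row's `DayOfWeek` value and supports the assumed format.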

In [5]:
## strip header
header = crime_file.filter(lambda l: "IncidntNum" in l)
crime_nh_file = crime_file.subtract(header)
# step prints
print("There are %d crime entries."%(crime_nh_file.count()))

There are 2177994 crime entries.
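
The cell above removes the header by subtracting one RDD from another, after selecting the header with a substring test. An alternative worth considering is a single `filter` against the exact header line, which avoids comparing two RDDs and cannot drop a data row that merely contains the text "IncidntNum". The sketch below demonstrates the predicate on a small in-memory list so that it runs without Spark; `is_data_line` is a hypothetical helper and the rows are abbreviated samples.

```python
def is_data_line(line, header_line):
    """True for every line except an exact copy of the header."""
    return line != header_line

# Small in-memory stand-in for the file (abbreviated, illustrative rows).
lines = [
    "IncidntNum,Category,Descript",
    "150060275,NON-CRIMINAL,LOST PROPERTY",
    '150098210,ROBBERY,"ROBBERY, BODILY FORCE"',
]
header_line = lines[0]
data_lines = [l for l in lines if is_data_line(l, header_line)]
print(len(data_lines))

# On the RDD loaded earlier, the same predicate could be applied as (not run here):
#   header_line = crime_file.first()
#   crime_nh_file = crime_file.filter(lambda l: is_data_line(l, header_line))
```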


In [6]:
## build data frame
crime_temp = crime_nh_file.map(lambda l: [x.strip('"') for x in next(reader([l]))])
crime_df = sqlContext.createDataFrame(crime_temp, schema)
# step prints
crime_df.head(5)

[Row(IncidntNum='150098561', Category='NON-CRIMINAL', Descript='AIDED CASE, MENTAL DISTURBED', DayOfWeek='Sunday', Date='02/01/2015', Time='17:58', PdDistrict='BAYVIEW', Resolution='EXCEPTIONAL CLEARANCE', Address='2500 Block of GRIFFITH ST', X='-122.386437671186', Y='37.7191834462785', Location='(37.7191834462785, -122.386437671186)', PdId='15009856164020'),
 Row(IncidntNum='150098652', Category='ASSAULT', Descript='STALKING', DayOfWeek='Monday', Date='01/05/2015', Time='00:01', PdDistrict='CENTRAL', Resolution='NONE', Address='700 Block of POWELL ST', X='-122.409144009353', Y='37.7915661006349', Location='(37.7915661006349, -122.409144009353)', PdId='15009865215201'),
 Row(IncidntNum='150098674', Category='DRIVING UNDER THE INFLUENCE', Descript='DRIVING WHILE UNDER THE INFLUENCE OF DRUGS', DayOfWeek='Sunday', Date='02/01/2015', Time='18:39', PdDistrict='NORTHERN', Resolution='ARREST, BOOKED', Address='CALIFORNIA ST / POLK ST', X='-122.420691680792', Y='37.7905770710537', Location='(3

In [10]:
## the number of crimes in each category:
crime_df.groupBy("Category").count().show(50, False)

+---------------------------+------+
|Category                   |count |
+---------------------------+------+
|FRAUD                      |40849 |
|SUICIDE                    |1279  |
|LIQUOR LAWS                |4073  |
|SECONDARY CODES            |25306 |
|FAMILY OFFENSES            |1173  |
|MISSING PERSON             |63908 |
|OTHER OFFENSES             |304955|
|DRIVING UNDER THE INFLUENCE|5603  |
|WARRANTS                   |100043|
|ARSON                      |3858  |
|FORGERY/COUNTERFEITING     |22873 |
|GAMBLING                   |342   |
|BRIBERY                    |799   |
|ASSAULT                    |191003|
|DRUNKENNESS                |9759  |
|EXTORTION                  |728   |
|TREA                       |14    |
|WEAPON LAWS                |21774 |
|LOITERING                  |2420  |
|SUSPICIOUS OCC             |79083 |
|ROBBERY                    |55009 |
|SEX OFFENSES, FORCIBLE     |11447 |
|PROSTITUTION               |16660 |
|EMBEZZLEMENT               |2938  |
|

In [8]:
## the number of crimes in each district:
crime_df.groupBy("PdDistrict").count().show()

+----------+------+
|PdDistrict| count|
+----------+------+
|   MISSION|294915|
|   BAYVIEW|217828|
|   CENTRAL|221330|
|   TARAVAL|164106|
|TENDERLOIN|189437|
| INGLESIDE|191149|
|      PARK|123497|
|  SOUTHERN|392965|
|  RICHMOND|114854|
|          |     1|
|  NORTHERN|267912|
+----------+------+
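
The table above is unordered and includes one row with a blank district. As a rough illustration of what the grouping computes, the plain-Python sketch below counts districts with a `Counter`, skips blank values, and sorts by frequency. The district values come from the sample rows shown earlier; the blank entry is a hypothetical stand-in for the empty row in the table.

```python
from collections import Counter

# District values from the sample rows shown earlier, plus one
# hypothetical blank entry standing in for the empty row in the table.
districts = ["MISSION", "TENDERLOIN", "TENDERLOIN", "TENDERLOIN",
             "BAYVIEW", "CENTRAL", "NORTHERN", ""]

counts = Counter(d for d in districts if d.strip())  # skip blank districts
for district, n in counts.most_common():             # most frequent first
    print("%-10s %d" % (district, n))

# A possible DataFrame equivalent (not run here):
#   crime_df.filter(crime_df.PdDistrict != "") \
#           .groupBy("PdDistrict").count() \
#           .orderBy("count", ascending=False).show()
```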



In [12]:
# count the crimes recorded on Sundays (all districts; no location filter is applied)
print("There are %d crime events recorded on Sundays across all districts."%(crime_df.filter(crime_df.DayOfWeek == "Sunday").count()))

There are 289823 crime events recorded on Sundays across all districts.


A useful guide to building Spark data frames from CSV files, including handling headers and column types, is [here](https://www.nodalpoint.com/spark-data-frames-from-csv-files-handling-headers-column-types/).