In [1]:
import numpy as np
import pandas as pd
import scipy.stats as stats
import pyspark as ps
import pyspark.sql.functions as F


In [213]:
# Helper functions put here. Errors with using pyspark functions in helper.py

def auto_theft_filter(df):
    """ Returns pandas dataframe filtered on grand and petty theft from vehicles
    
    Inputs:
    spark dataframe
    
    Returns:
    list containing various metrics, including:
    - total crime counts
    - petty and grand theft counts (from autos)
    - total car theft counts
    - fraction of car thefts relative to all crime
    - total resolved crime counts
    - petty and grand theft resolved counts
    - fraction of resolved car thefts relative to all resolved crime
        
    """    
    total_counts = df.count()
    
    # Car info
    petty, petty_counts, grand, grand_counts = auto_counts(df)
    
    total_car_counts = petty_counts + grand_counts
    
    auto_fraction = round((petty_counts + grand_counts)/df.count(),5)
    
    total_resolved = resolution_counts(df)
    
    petty_resolved = resolution_counts(petty)
    
    grand_resolved = resolution_counts(grand)
    
    auto_fract_resolved = round((petty_resolved+grand_resolved)/total_resolved, 5)
    
    
    series = list([total_counts, petty_counts, grand_counts, total_car_counts, 
                   auto_fraction, total_resolved, petty_resolved, grand_resolved, 
                   auto_fract_resolved])
        
    return series



def auto_counts(df):
    """ Returns counts for petty and grand theft from autos"""

    petty = df.filter((F.col('Descript') == "PETTY THEFT FROM LOCKED AUTO") |
                         (F.col('Descript') == "PETTY THEFT FROM UNLOCKED AUTO"))
    petty_counts = petty.count()
    
    
    grand = df.filter((F.col('Descript') == "GRAND THEFT FROM LOCKED AUTO") |
                             (F.col('Descript') == "GRAND THEFT FROM UNLOCKED AUTO"))
    grand_counts = grand.count()

    return petty, petty_counts, grand, grand_counts


def resolution_counts(df):
    """Counts the 'resolution' of crimes given a dataframe. Also computes
    the fraction of resolved crime for auto theft realtive to other crimes
    
    Inputs
    spark dataframe
    
    Returns
    counts of "resolved" crime
         
    """
    
    return df.filter((F.col('Resolution') != "NONE")).count()
    


def auto_to_pandas(df):
    """Returns a panda dataframe"""
    
    df_filtered = df.filter((F.col('Descript') == "GRAND THEFT FROM LOCKED AUTO") | 
                  (F.col('Descript') == "PETTY THEFT FROM LOCKED AUTO") |
                  (F.col('Descript') == "GRAND THEFT FROM UNLOCKED AUTO") | 
                  (F.col('Descript') == "PETTY THEFT FROM UNLOCKED AUTO"))
    
    return df_filtered.toPandas()    
    
    
    
def mission_filter(df):
    """Filters out regions inside or outside the Mission area
    
    Inputs
    spark dataframe
    
    Returns
    counts in the mission vs outside the mission
    """
    # Define boundary of Mission District (16th & Guerro to 24th & Potrero)
    x_min = -122.424047
    x_max = -122.406339
    
    y_min = 37.753016
    y_max = 37.764775
    
    
    mission = df.filter((F.col('X') >= x_min) &
                           (F.col('X') <= x_max) &
                           (F.col('Y') >= y_min) &
                           (F.col('Y') <= y_max))
    
    in_mission = mission.count()
    out_mission = df.count() - in_mission
    total_counts = df.count()
    
    # Looking at violent counts in mission vs outside mission
    mv_count, mnon_v_counts, mfraction = violent_counts(mission)
    
    v_count, non_v_counts, fraction = violent_counts(df)
    
    v_ex_mission = v_count - mv_count
    nonv_ex_mission = non_v_counts - mnon_v_counts
       
    
    return [total_counts, in_mission, mv_count, mnon_v_counts, v_ex_mission, 
           nonv_ex_mission, round(mfraction,5), round(fraction,5)] 

    
def violent_counts(df):
    """ Returns the violent and non-violent crime counts
    
    Inputs
    spark dataframe
    
    Returns
    counts for violent, non-violent, and ratio    
    
    """
    
    total_counts = df.count()
    
    violent = df.filter((F.col('Category') == 'ASSAULT') |
                        (F.col('Category') == 'SEX OFFENSES, FORCIBLE') |
                        (F.col('Category') == 'ROBBERY') |
                        (F.col('Category') == 'KIDNAPPING') |
                        (F.col('Category') == 'ARSON'))

    violent_count = violent.count()
    non_violent_counts = df.count() - violent_count   
    fraction_violent = violent_count/total_counts

    return violent_count, non_violent_counts, fraction_violent  

        
def save_csv(df, filename):
    """Saves spark df as a csv so that I can read it into a pandas df in another notebook
    
    Inputs:
    spark dataframe
    
    Returns:
    saved csv file
    
    """
    path = 'year_data/'+filename
    df_pd = df.toPandas()
    df_pd.to_csv(path)
    
    return None
    
    

    
def time_counts(df):
    """ Returns a counts of crimes committed within a particular time
    
    Inputs
    dataframe, time
    
    """
    timelist = range(0,24)
    time_count = []
    
    df = df.withColumn('Time',from_unixtime(unix_timestamp('Time', 'HH:MM')).alias('time'))
    
    for hour in timelist:
        time_count.append(df.filter(F.hour('Time') == hour).count())
    
    
    return time_count    

In [31]:
spark = (ps.sql.SparkSession
         .builder
         .master('local[4]')
         .appName('lecture')
         .getOrCreate()
        )
sc = spark.sparkContext

In [4]:
# Read in as CSV...it's A LOT of data and takes a long time in pandas
crime_raw = pd.read_csv('Police_Department_Incident_Reports__Historical_2003_to_May_2018.csv')
# crime_raw


In [5]:
crime_raw.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2215024 entries, 0 to 2215023
Data columns (total 33 columns):
IncidntNum                                              int64
Category                                                object
Descript                                                object
DayOfWeek                                               object
Date                                                    object
Time                                                    object
PdDistrict                                              object
Resolution                                              object
Address                                                 object
X                                                       float64
Y                                                       float64
Location                                                object
PdId                                                    int64
SF Find Neighborhoods                                   float64
Curr

In [108]:
crime_raw['Category'].value_counts()
#crime_raw['Y'].isnull().sum()

LARCENY/THEFT                  480448
OTHER OFFENSES                 309358
NON-CRIMINAL                   238323
ASSAULT                        194694
VEHICLE THEFT                  126602
DRUG/NARCOTIC                  119628
VANDALISM                      116059
WARRANTS                       101379
BURGLARY                        91543
SUSPICIOUS OCC                  80444
MISSING PERSON                  64961
ROBBERY                         55867
FRAUD                           41542
SECONDARY CODES                 25831
FORGERY/COUNTERFEITING          23050
WEAPON LAWS                     22234
TRESPASS                        19449
PROSTITUTION                    16701
STOLEN PROPERTY                 11891
SEX OFFENSES, FORCIBLE          11742
DISORDERLY CONDUCT              10040
DRUNKENNESS                      9826
RECOVERED VEHICLE                8716
DRIVING UNDER THE INFLUENCE      5672
KIDNAPPING                       5346
RUNAWAY                          4440
LIQUOR LAWS 

In [7]:
# Check out some value counts for each columns
columns = crime_raw.columns
# for col in columns:
#     print(col)
#     print(crime_raw[col].value_counts(dropna=False))

## Read data in as Spark DF and remove extraneous columns

Is there any useful data in SF Find Neighborhoods or Analysis Neighborhoods?

In [8]:
crime = spark.read.csv('Police_Department_Incident_Reports__Historical_2003_to_May_2018.csv',
                        header=True,
                        quote='"',
                        sep=',',
                        inferSchema=True)


In [9]:
crime.printSchema()

root
 |-- IncidntNum: integer (nullable = true)
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- PdDistrict: string (nullable = true)
 |-- Resolution: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- PdId: long (nullable = true)
 |-- SF Find Neighborhoods: integer (nullable = true)
 |-- Current Police Districts: integer (nullable = true)
 |-- Current Supervisor Districts: integer (nullable = true)
 |-- Analysis Neighborhoods: integer (nullable = true)
 |-- DELETE - Fire Prevention Districts: integer (nullable = true)
 |-- DELETE - Police Districts: integer (nullable = true)
 |-- DELETE - Supervisor Districts: integer (nullable = true)
 |-- DELETE - Zip Codes: integer (nullable = true)
 |-- DELETE - Neighborhoods: in

In [10]:
# Drop additional columns
print(columns)
columns_to_drop = list(columns[17:])
columns_to_drop += ['IncidntNum', 'PdId', 'Current Police Districts',
                    'Current Supervisor Districts','Location', 'SF Find Neighborhoods',
                    'Analysis Neighborhoods']
columns_to_drop

Index(['IncidntNum', 'Category', 'Descript', 'DayOfWeek', 'Date', 'Time',
       'PdDistrict', 'Resolution', 'Address', 'X', 'Y', 'Location', 'PdId',
       'SF Find Neighborhoods', 'Current Police Districts',
       'Current Supervisor Districts', 'Analysis Neighborhoods',
       'DELETE - Fire Prevention Districts', 'DELETE - Police Districts',
       'DELETE - Supervisor Districts', 'DELETE - Zip Codes',
       'DELETE - Neighborhoods', 'DELETE - 2017 Fix It Zones',
       'Civic Center Harm Reduction Project Boundary',
       'Fix It Zones as of 2017-11-06 ', 'DELETE - HSOC Zones',
       'Fix It Zones as of 2018-02-07',
       'CBD, BID and GBD Boundaries as of 2017',
       'Areas of Vulnerability, 2016', 'Central Market/Tenderloin Boundary',
       'Central Market/Tenderloin Boundary Polygon - Updated',
       'HSOC Zones as of 2018-06-05', 'OWED Public Spaces'],
      dtype='object')


['DELETE - Fire Prevention Districts',
 'DELETE - Police Districts',
 'DELETE - Supervisor Districts',
 'DELETE - Zip Codes',
 'DELETE - Neighborhoods',
 'DELETE - 2017 Fix It Zones',
 'Civic Center Harm Reduction Project Boundary',
 'Fix It Zones as of 2017-11-06 ',
 'DELETE - HSOC Zones',
 'Fix It Zones as of 2018-02-07',
 'CBD, BID and GBD Boundaries as of 2017',
 'Areas of Vulnerability, 2016',
 'Central Market/Tenderloin Boundary',
 'Central Market/Tenderloin Boundary Polygon - Updated',
 'HSOC Zones as of 2018-06-05',
 'OWED Public Spaces',
 'IncidntNum',
 'PdId',
 'Current Police Districts',
 'Current Supervisor Districts',
 'Location',
 'SF Find Neighborhoods',
 'Analysis Neighborhoods']

In [11]:
for col in columns_to_drop:
    crime = crime.drop(col)
crime.printSchema()

root
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- PdDistrict: string (nullable = true)
 |-- Resolution: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)



## Now drop all rows with a NaN in them and turn string date into datetime. 
Apparently there aren't values with nan in them...Already cleaned after removing unnecessary columns

In [12]:
crime.count()

In [13]:
crime = crime.dropna(how='any')
crime.count()

2215024

In [None]:
# Turn date into something useable
from pyspark.sql.functions import unix_timestamp, from_unixtime

crime2 = crime.withColumn('Date',from_unixtime(unix_timestamp('Date', 'MM/dd/yyy')).alias('date'))
crime2.count()

## Figure out which categories and descriptions fit auto theft and explore data a little more
Below is a spark datafram playground. The SQL playground is at the bottom.

In [107]:
#crime.groupBy('Category').count().orderBy('count', ascending=False).show()
#crime.groupBy('Descript').count().orderBy('count', ascending=False).show()
crime.filter(F.col('Category') == 'ASSAULT').select('Descript').show(20)

+--------------------+
|            Descript|
+--------------------+
|             BATTERY|
|             BATTERY|
|             BATTERY|
|             BATTERY|
|             BATTERY|
|             BATTERY|
|             BATTERY|
|             BATTERY|
|THREATS AGAINST LIFE|
|ATTEMPTED HOMICID...|
|THREATS AGAINST LIFE|
|             BATTERY|
|CHILD ABUSE (PHYS...|
|             BATTERY|
|             BATTERY|
|             BATTERY|
|THREATS AGAINST LIFE|
|             BATTERY|
|THREATS AGAINST LIFE|
|             BATTERY|
+--------------------+
only showing top 20 rows



## Create a dataframe for each year to make the processing a little easier.
Because I'm brand new to spark, it was easier for me to process the data year by year, extracting relavent information from each dataframe and putting it all together in a pandas dataframe. 

2215024

In [193]:
# Create dataframe for each year
crime_2003 = crime2.filter((F.col('Date') > '2002-12-31') & (F.col('Date') < '2004-01-01'))
crime_2004 = crime2.filter((F.col('Date') > '2003-12-31') & (F.col('Date') < '2005-01-01'))
crime_2005 = crime2.filter((F.col('Date') > '2004-12-31') & (F.col('Date') < '2006-01-01'))
crime_2006 = crime2.filter((F.col('Date') > '2005-12-31') & (F.col('Date') < '2007-01-01'))
crime_2007 = crime2.filter((F.col('Date') > '2006-12-31') & (F.col('Date') < '2008-01-01'))
crime_2008 = crime2.filter((F.col('Date') > '2007-12-31') & (F.col('Date') < '2009-01-01'))
crime_2009 = crime2.filter((F.col('Date') > '2008-12-31') & (F.col('Date') < '2010-01-01'))
crime_2010 = crime2.filter((F.col('Date') > '2009-12-31') & (F.col('Date') < '2011-01-01'))
crime_2011 = crime2.filter((F.col('Date') > '2010-12-31') & (F.col('Date') < '2012-01-01'))
crime_2012 = crime2.filter((F.col('Date') > '2011-12-31') & (F.col('Date') < '2013-01-01'))
crime_2013 = crime2.filter((F.col('Date') > '2012-12-31') & (F.col('Date') < '2014-01-01'))
crime_2014 = crime2.filter((F.col('Date') > '2013-12-31') & (F.col('Date') < '2015-01-01'))
crime_2015 = crime2.filter((F.col('Date') > '2014-12-31') & (F.col('Date') < '2016-01-01'))
crime_2016 = crime2.filter((F.col('Date') > '2015-12-31') & (F.col('Date') < '2017-01-01'))
crime_2017 = crime2.filter((F.col('Date') > '2016-12-31') & (F.col('Date') < '2018-01-01'))


In [17]:
crime_years = [crime_2003, crime_2004, crime_2005, crime_2006, crime_2007, crime_2008, 
               crime_2009, crime_2010, crime_2011, crime_2012, crime_2013, crime_2014, 
               crime_2015, crime_2016, crime_2017]

years = ['2003', '2004', '2005', '2006', '2007', '2008', '2009', '2010', '2011', '2012',
         '2013', '2014', '2015', '2016', '2017']

crimes_per_year = {}

for dataset, year in zip(crime_years,years):
    crimes_per_year[year] = dataset.count()

crimes_per_year

{'2003': 149176,
 '2004': 148471,
 '2005': 142591,
 '2006': 138247,
 '2007': 138006,
 '2008': 141670,
 '2009': 140215,
 '2010': 133868,
 '2011': 133094,
 '2012': 141267,
 '2013': 153166,
 '2014': 150518,
 '2015': 156927,
 '2016': 151297,
 '2017': 155254}

## Change the Time format to an integer representing the hour
This makes it easier to manipulate than when it's a string

In [None]:
# This is done in the time_counts() function

## Playground for testing functions
Using this space to test small functions before putting them into the helper code block

In [128]:
auto_theft_filter(crime_2013)

[153166, 4126, 14578, 18704, 0.12212, 62496, 170, 74, 0.0039]

In [125]:
crime_2013

(18356, 134810, 0.13616200578592091)

In [138]:
mission_filter(crime_2013)

[153166,
 8419,
 1235,
 7184,
 17121,
 127626,
 0.14669200617650552,
 0.11984382957053132]

## Collect aggregate crime statistics 

In [47]:
# Go through all years and calculate grand/petty auto thefts
# crime_data_by_year = []
# for year, dataset in zip(years, crime_years):
#     data = auto_theft_filter(dataset)
#     data.insert(0,year)
#     crime_data_by_year.append(data)
    
crime_data_by_year    

[['2003', 149176, 5276, 6895, 12171, 0.08159, 61933, 138, 179, 0.00512],
 ['2004', 148471, 4922, 6735, 11657, 0.07851, 58232, 137, 161, 0.00512],
 ['2005', 142591, 4045, 8989, 13034, 0.09141, 52282, 79, 129, 0.00398],
 ['2006', 138247, 4604, 10835, 15439, 0.11168, 52823, 156, 158, 0.00594],
 ['2007', 138006, 3578, 10196, 13774, 0.09981, 57347, 138, 211, 0.00609],
 ['2008', 141670, 3225, 9273, 12498, 0.08822, 60706, 77, 204, 0.00463],
 ['2009', 140215, 2587, 9197, 11784, 0.08404, 63067, 66, 181, 0.00392],
 ['2010', 133868, 1697, 8301, 9998, 0.07469, 58162, 80, 93, 0.00297],
 ['2011', 133094, 1719, 8890, 10609, 0.07971, 55727, 84, 136, 0.00395],
 ['2012', 141267, 3058, 10456, 13514, 0.09566, 51030, 100, 68, 0.00329],
 ['2013', 153166, 4126, 14578, 18704, 0.12212, 62496, 170, 74, 0.0039],
 ['2014', 150518, 4232, 17375, 21607, 0.14355, 54631, 139, 76, 0.00394],
 ['2015', 156927, 4968, 20562, 25530, 0.16269, 45798, 134, 74, 0.00454],
 ['2016', 151297, 4590, 19714, 24304, 0.16064, 42921, 122

## Now Collect information about violent crime in the Mission

In [140]:
# Go through all the data and also calculate counts in the Mission vs Outside the mission
# mission_crime_by_year = []
# for year, dataset in zip(years, crime_years):
#     data = mission_filter(dataset)
#     data.insert(0,year)
#     mission_crime_by_year.append(data)
    
# mission_crime_by_year

[['2003', 149176, 8976, 1008, 7968, 16967, 123233, 0.1123, 0.1205],
 ['2004', 148471, 8906, 1085, 7821, 16497, 123068, 0.12183, 0.11842],
 ['2005', 142591, 8523, 999, 7524, 15349, 118719, 0.11721, 0.11465],
 ['2006', 138247, 8298, 1094, 7204, 16698, 113251, 0.13184, 0.1287],
 ['2007', 138006, 8260, 1068, 7192, 16735, 113011, 0.1293, 0.129],
 ['2008', 141670, 9019, 1170, 7849, 17066, 115585, 0.12973, 0.12872],
 ['2009', 140215, 8441, 1088, 7353, 16212, 115562, 0.12889, 0.12338],
 ['2010', 133868, 8009, 934, 7075, 16107, 109752, 0.11662, 0.1273],
 ['2011', 133094, 7662, 978, 6684, 16078, 109354, 0.12764, 0.12815],
 ['2012', 141267, 7718, 1107, 6611, 16401, 117148, 0.14343, 0.12394],
 ['2013', 153166, 8419, 1235, 7184, 17121, 127626, 0.14669, 0.11984],
 ['2014', 150518, 8081, 1089, 6992, 16403, 126034, 0.13476, 0.11621],
 ['2015', 156927, 7404, 1194, 6210, 17302, 132221, 0.16126, 0.11786],
 ['2016', 151297, 7649, 1123, 6526, 17319, 126329, 0.14682, 0.12189],
 ['2017', 155254, 8150, 1260, 

## Finally let's look at the hour data
It might be nice to have a pretty histogram with the crime rate based on hour.

In [214]:
time_of_crime = []
for year, dataset in zip(years, crime_years):
    crime_data = time_counts(dataset)
    crime_data.insert(0,year)
    time_of_crime.append(crime_data)
time_of_crime

[['2003',
  6785,
  3937,
  3519,
  2288,
  1572,
  1326,
  2104,
  3815,
  5844,
  6376,
  6739,
  6411,
  9168,
  7386,
  7508,
  8049,
  8318,
  9107,
  9722,
  8563,
  7574,
  7553,
  7954,
  7558],
 ['2004',
  7308,
  3966,
  3382,
  2227,
  1565,
  1426,
  2152,
  3869,
  5932,
  6143,
  6743,
  6812,
  8749,
  7407,
  7790,
  7847,
  8096,
  9112,
  9398,
  8058,
  7446,
  7551,
  8007,
  7485],
 ['2005',
  6993,
  3965,
  3391,
  2230,
  1637,
  1377,
  2135,
  3817,
  5964,
  6088,
  6305,
  6460,
  8414,
  6818,
  7156,
  7495,
  7847,
  8797,
  8876,
  8007,
  7132,
  7032,
  7560,
  7095],
 ['2006',
  7001,
  3941,
  3431,
  2166,
  1486,
  1405,
  1998,
  3579,
  5526,
  5947,
  6277,
  6057,
  8635,
  6712,
  7012,
  7544,
  7933,
  8357,
  8510,
  7428,
  6746,
  6695,
  7117,
  6744],
 ['2007',
  6851,
  4101,
  3449,
  2110,
  1516,
  1364,
  2065,
  3419,
  5259,
  5688,
  6041,
  6028,
  8392,
  6854,
  7224,
  7459,
  7754,
  8424,
  8558,
  7617,
  6969,
  6834,
  

In [173]:
time = np.arange(0,24)
time_dict = {}
for dataset in crime_years:
    print(time_counts(dataset, 0))

0
0
0
0
0
0
0
0
0
0
0
0
0
0
0


## Working on heat map
Need aggregate 

## Use SQL to look for similar descriptions and double check counts

In [145]:
crime_2013.createOrReplaceTempView('crime')

In [146]:
violent_crime = ['ASSUALT', 'BATTERY', 'ROBBERY', 'VIOLENCE', 'BOMBING', 'MURDER', 
                 'MANSLAUGHTER',]

In [147]:
query = '''SELECT DISTINCT(Descript)
           FROM crime
           WHERE Descript LIKE '%BATTERY%'
           '''
results_battery = spark.sql(query)

In [148]:
query_all_theft = '''SELECT COUNT(*)
                     FROM crime
                     WHERE Descript LIKE '%THEFT%'
                     '''
results_all_theft = spark.sql(query_all_theft)

In [149]:
query_auto_theft = '''SELECT COUNT(*)
                 FROM crime
                 WHERE Descript LIKE '%GRAND THEFT FROM LOCKED AUTO%'
                 OR Descript LIKE '%PETTY THEFT FROM LOCKED AUTO%'
                 OR Descript LIKE '%GRAND THEFT FROM UNLOCKED AUTO%'
                 OR Descript LIKE '%PETTY THEFT FROM UNLOCKED AUTO%'
                 '''
results_auto_theft = spark.sql(query_auto_theft)

In [152]:
query_time = '''SELECT DISTINCT(Time)
                FROM crime
                '''
spark.sql(query_time).show()


+-----+
| Time|
+-----+
|03:15|
|02:39|
|18:47|
|17:58|
|07:24|
|21:02|
|01:45|
|04:02|
|18:05|
|00:16|
|19:03|
|17:35|
|11:19|
|06:43|
|09:10|
|17:23|
|11:01|
|06:27|
|02:52|
|05:34|
+-----+
only showing top 20 rows



In [263]:
#results_petty.take(30)
#results_battery.take(30)
results_theft.show()
#results_all_theft.show()

+--------+
|count(1)|
+--------+
|   18704|
+--------+



+-------+------+
|summary|  Time|
+-------+------+
|  count|153166|
|   mean|  null|
| stddev|  null|
|    min| 00:01|
|    max| 23:59|
+-------+------+

