Plan: 
1. extract year, district
2. filter and extract year and district columns from year 2001 - 2015
3. regular expression match all rows where year is 4-digit integers, district is 3-digit integers
3. create key, value pair: block, 1 (1 is to count the crime record once)
4. calculate count(crime) by {year,district} 
5. split counts RDD into two seperate RDD based on years under Daley and Emanuel
6. map {district, number of crime} for two RDD
7. reduceByKey to get count(crime) by district for Delay and Emanuel
8. join Delay and Emanuel by district
9. create new field of difference between count(crime) of each district 
10. perform student-t hyphothesis test on: Daley count - Emanuel count >0

In [1]:
#from pyspark import SparkContext
from pyspark import SparkContext
import csv
import re
from operator import add
from pyspark.mllib.stat import Statistics
import math
from scipy import stats
#from pyspark.sql import SQLContext
from geopy.geocoders import Nominatim

sc = SparkContext.getOrCreate()

In [2]:
#use when done with current pyspark context
sc.stop()

In [2]:
crimeFile = sc.textFile("Crimes_-_2001_to_present.csv")
crimeFile.take(5)

['ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location',
 '10078659,HY267429,05/19/2015 11:57:00 PM,010XX E 79TH ST,143A,WEAPONS VIOLATION,UNLAWFUL POSS OF HANDGUN,STREET,true,false,0624,006,8,44,15,1184626,1852799,2015,05/26/2015 12:42:06 PM,41.751242944,-87.599004724,"(41.751242944, -87.599004724)"',
 '10078598,HY267408,05/19/2015 11:50:00 PM,067XX N SHERIDAN RD,3731,INTERFERENCE WITH PUBLIC OFFICER,OBSTRUCTING IDENTIFICATION,STREET,true,false,2432,024,49,1,24,1167071,1944859,2015,05/26/2015 12:42:06 PM,42.004255918,-87.660691083,"(42.004255918, -87.660691083)"',
 '10078625,HY267417,05/19/2015 11:47:00 PM,026XX E 77TH ST,2170,NARCOTICS,POSSESSION OF DRUG EQUIPMENT,STREET,true,false,0421,004,7,43,18,1195299,1854463,2015,05/26/2015 12:42:06 PM,41.755552462,-87.559839339,"(41.755552462, -87.559839339)"',
 '10078662,HY267423,05/19/2015 1

In [3]:
schemaString=crimeFile.first()
schemaString

'ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location'

In [35]:
#split by "," to create columns
fields = crimeFile.map(lambda line: line.split(","))
fields.count()

5801845

In [38]:
(5801845-4438686)/5801845

0.23495267453715155

In [37]:
#remove rows where columns 11: district, 18: year is empty
fields = fields.filter(lambda x: len(x[11])==3 and len(x[17])==4) 
fields.count()
#lose about 23.5% data

4438686

In [39]:
fields.take(10)

[['10078659',
  'HY267429',
  '05/19/2015 11:57:00 PM',
  '010XX E 79TH ST',
  '143A',
  'WEAPONS VIOLATION',
  'UNLAWFUL POSS OF HANDGUN',
  'STREET',
  'true',
  'false',
  '0624',
  '006',
  '8',
  '44',
  '15',
  '1184626',
  '1852799',
  '2015',
  '05/26/2015 12:42:06 PM',
  '41.751242944',
  '-87.599004724',
  '"(41.751242944',
  ' -87.599004724)"'],
 ['10078598',
  'HY267408',
  '05/19/2015 11:50:00 PM',
  '067XX N SHERIDAN RD',
  '3731',
  'INTERFERENCE WITH PUBLIC OFFICER',
  'OBSTRUCTING IDENTIFICATION',
  'STREET',
  'true',
  'false',
  '2432',
  '024',
  '49',
  '1',
  '24',
  '1167071',
  '1944859',
  '2015',
  '05/26/2015 12:42:06 PM',
  '42.004255918',
  '-87.660691083',
  '"(42.004255918',
  ' -87.660691083)"'],
 ['10078625',
  'HY267417',
  '05/19/2015 11:47:00 PM',
  '026XX E 77TH ST',
  '2170',
  'NARCOTICS',
  'POSSESSION OF DRUG EQUIPMENT',
  'STREET',
  'true',
  'false',
  '0421',
  '004',
  '7',
  '43',
  '18',
  '1195299',
  '1854463',
  '2015',
  '05/26/2015 

In [40]:
#use regular expression to filter and get rows with district - 3 digit number, year - 4 digit number
fields= fields.filter( lambda x: re.findall("[0-9]+", x[11]) and re.findall("[0-9]+", x[17]))
fields.count()

#test= fields.map( lambda x: re.findall("[0-9]+", x[11]) and re.findall("[0-9]+", x[18]))
#rdd.filter(lambda x:len(re.findall("[A-Za-z]+", x, 0)) > 1).map(lambda x: re.findall("[A-Za-z]+", x, 0)[1])

4438686

In [41]:
fields.take(100)

[['10078659',
  'HY267429',
  '05/19/2015 11:57:00 PM',
  '010XX E 79TH ST',
  '143A',
  'WEAPONS VIOLATION',
  'UNLAWFUL POSS OF HANDGUN',
  'STREET',
  'true',
  'false',
  '0624',
  '006',
  '8',
  '44',
  '15',
  '1184626',
  '1852799',
  '2015',
  '05/26/2015 12:42:06 PM',
  '41.751242944',
  '-87.599004724',
  '"(41.751242944',
  ' -87.599004724)"'],
 ['10078598',
  'HY267408',
  '05/19/2015 11:50:00 PM',
  '067XX N SHERIDAN RD',
  '3731',
  'INTERFERENCE WITH PUBLIC OFFICER',
  'OBSTRUCTING IDENTIFICATION',
  'STREET',
  'true',
  'false',
  '2432',
  '024',
  '49',
  '1',
  '24',
  '1167071',
  '1944859',
  '2015',
  '05/26/2015 12:42:06 PM',
  '42.004255918',
  '-87.660691083',
  '"(42.004255918',
  ' -87.660691083)"'],
 ['10078625',
  'HY267417',
  '05/19/2015 11:47:00 PM',
  '026XX E 77TH ST',
  '2170',
  'NARCOTICS',
  'POSSESSION OF DRUG EQUIPMENT',
  'STREET',
  'true',
  'false',
  '0421',
  '004',
  '7',
  '43',
  '18',
  '1195299',
  '1854463',
  '2015',
  '05/26/2015 

In [42]:
#select columns 11: district, 18: year
fields2 = fields.map(lambda f: (f[11], f[17]))
fields2.take(10)

[('006', '2015'),
 ('024', '2015'),
 ('004', '2015'),
 ('003', '2015'),
 ('009', '2015'),
 ('010', '2015'),
 ('007', '2015'),
 ('014', '2015'),
 ('011', '2015'),
 ('008', '2015')]

In [43]:
#map to count each record once: (district, year), count(crime)
pairs = fields2.map(lambda x: (x[0] + ',' + x[1],1)) 
pairs.take(10)

[('006,2015', 1),
 ('024,2015', 1),
 ('004,2015', 1),
 ('003,2015', 1),
 ('009,2015', 1),
 ('010,2015', 1),
 ('007,2015', 1),
 ('014,2015', 1),
 ('011,2015', 1),
 ('008,2015', 1)]

In [44]:
#reduceByKey to generate key, value pair: (district, year), count(crime)
counts = pairs.reduceByKey(lambda x,y: int(x) + int(y))
counts.take(10)

[('011,2013', 21279),
 ('011,2001', 151),
 ('006,2011', 19740),
 ('010,2015', 3907),
 ('010,2012', 14639),
 ('017,2003', 7743),
 ('004,2011', 20298),
 ('017,2001', 53),
 ('014,2009', 15194),
 ('019,2015', 3573)]

In [45]:
#top10 = counts.map(lambda x: (x[1], x[0])).sortByKey(ascending = False).take(100)

In [46]:
#convert to RDD
#top10rdd =  sc.parallelize(top10)
#top10rdd.take(10)

[(30857, '008,2006'),
 (29959, '008,2004'),
 (29885, '008,2007'),
 (29837, '008,2005'),
 (29420, '008,2008'),
 (28022, '011,2004'),
 (27918, '011,2005'),
 (27434, '007,2004'),
 (27347, '008,2009'),
 (27050, '011,2006')]

In [58]:
#map key,value pair back to district, year, count(crime)
#counts2 = counts.map(lambda x: (re.split(',', x[1]), x[0]))

counts2 = counts.map(lambda x: (x[0].split(',')[0], x[0].split(',')[1], x[1]))
counts2.count()

347

In [60]:
counts2.take(10)

[('011', '2013', 21279),
 ('011', '2001', 151),
 ('006', '2011', 19740),
 ('010', '2015', 3907),
 ('010', '2012', 14639),
 ('017', '2003', 7743),
 ('004', '2011', 20298),
 ('017', '2001', 53),
 ('014', '2009', 15194),
 ('019', '2015', 3573)]

In [62]:
#split into two RDD objects by year for Daly (2001 - 2010), Emanuel (2011 - 2015)
Daley = counts2.filter(lambda x: int(x[1])<=2010).map(lambda y: (y[0], y[2]))
Daley.take(10)

[('011', 151),
 ('017', 7743),
 ('017', 53),
 ('014', 15194),
 ('015', 19403),
 ('019', 18987),
 ('007', 26505),
 ('018', 15542),
 ('003', 19608),
 ('022', 11763)]

In [63]:
Emanuel = counts2.filter(lambda x: int(x[1])>=2011).map(lambda y: (y[0], y[2]))
Emanuel.take(10)

[('011', 21279),
 ('006', 19740),
 ('010', 3907),
 ('010', 14639),
 ('004', 20298),
 ('019', 3573),
 ('010', 11948),
 ('016', 3104),
 ('004', 15859),
 ('009', 14351)]

In [77]:
#key value pair of {district, count of crime}, return 26 pairs
#divide count of crime by number of years to get average annual number of crime
Daley2 = Daley.reduceByKey(lambda x,y: int(x) + int(y))
Daley2 = Daley2.map(lambda x: (x[0], int(x[1])/10))
Daley2.take(26) 

[('017', 9022.9),
 ('007', 19300.9),
 ('005', 13758.8),
 ('016', 10238.7),
 ('014', 12517.1),
 ('009', 15985.0),
 ('024', 9719.6),
 ('004', 17453.5),
 ('002', 14313.9),
 ('012', 15008.7),
 ('031', 6.7),
 ('022', 10089.3),
 ('010', 13096.0),
 ('018', 13292.2),
 ('003', 16327.8),
 ('008', 22027.2),
 ('025', 18510.2),
 ('023', 0.3),
 ('019', 13833.0),
 ('021', 0.4),
 ('015', 14155.8),
 ('001', 11561.3),
 ('011', 19439.1),
 ('020', 5383.3),
 ('013', 0.1),
 ('006', 18047.7)]

In [78]:
#key value pair of {district, count of crime}, return 24 pairs
Emanuel2 = Emanuel.reduceByKey(lambda x,y: int(x) + int(y))
Emanuel2 = Emanuel2.map(lambda x: (x[0], int(x[1])/5))
Emanuel2.take(24)

[('017', 7416.6),
 ('007', 15609.8),
 ('016', 8754.4),
 ('005', 11760.0),
 ('014', 9588.4),
 ('009', 13162.4),
 ('024', 7480.0),
 ('004', 15586.0),
 ('002', 10750.8),
 ('012', 12057.0),
 ('031', 10.6),
 ('022', 8402.2),
 ('010', 11648.4),
 ('018', 11129.2),
 ('003', 13669.4),
 ('008', 17750.2),
 ('025', 15450.4),
 ('019', 11868.0),
 ('015', 11833.6),
 ('001', 9971.6),
 ('011', 17947.2),
 ('020', 4212.8),
 ('013', 0.2),
 ('006', 15499.0)]

In [79]:
#inner join Daley2 and Emanuel by district: {district, (Delay count, Emanuel count)}
joined = Daley2.join(Emanuel2)
joined.count()

24

In [80]:
joined.take(24)

[('017', (9022.9, 7416.6)),
 ('009', (15985.0, 13162.4)),
 ('024', (9719.6, 7480.0)),
 ('012', (15008.7, 12057.0)),
 ('010', (13096.0, 11648.4)),
 ('019', (13833.0, 11868.0)),
 ('015', (14155.8, 11833.6)),
 ('001', (11561.3, 9971.6)),
 ('006', (18047.7, 15499.0)),
 ('007', (19300.9, 15609.8)),
 ('005', (13758.8, 11760.0)),
 ('016', (10238.7, 8754.4)),
 ('014', (12517.1, 9588.4)),
 ('004', (17453.5, 15586.0)),
 ('002', (14313.9, 10750.8)),
 ('031', (6.7, 10.6)),
 ('022', (10089.3, 8402.2)),
 ('018', (13292.2, 11129.2)),
 ('003', (16327.8, 13669.4)),
 ('008', (22027.2, 17750.2)),
 ('025', (18510.2, 15450.4)),
 ('011', (19439.1, 17947.2)),
 ('020', (5383.3, 4212.8)),
 ('013', (0.1, 0.2))]

In [93]:
#map to get {district, Delay count - Emanuel count}
crime_diff_sample = joined.map(lambda x: (x[0], float(x[1][0])-float(x[1][1])))
crime_diff_sample.take(24)

[('017', 1606.2999999999993),
 ('009', 2822.6000000000004),
 ('024', 2239.6000000000004),
 ('012', 2951.7000000000007),
 ('010', 1447.6000000000004),
 ('019', 1965.0),
 ('015', 2322.199999999999),
 ('001', 1589.699999999999),
 ('006', 2548.7000000000007),
 ('007', 3691.100000000002),
 ('005', 1998.7999999999993),
 ('016', 1484.300000000001),
 ('014', 2928.7000000000007),
 ('004', 1867.5),
 ('002', 3563.1000000000004),
 ('031', -3.8999999999999995),
 ('022', 1687.0999999999985),
 ('018', 2163.0),
 ('003', 2658.3999999999996),
 ('008', 4277.0),
 ('025', 3059.800000000001),
 ('011', 1491.8999999999978),
 ('020', 1170.5),
 ('013', -0.1)]

In [97]:
#calculate sample mean, sample standard deviation
#calculate t-statistics and perform 1-sided student-t test to test hypothesis null: (Delay count - Emanuel count) >0
#https://math.stackexchange.com/questions/1287990/estimating-population-standard-deviation-with-sample-standard-deviation
crime_diff_sample.persist()
mean = crime_diff_sample.map(lambda x: x[1]).mean()
sd = crime_diff_sample.map(lambda x: x[1]).stdev()
n = crime_diff_sample.map(lambda x: x[1]).count()


In [118]:
t = (mean-0)/(sd/math.sqrt(n))
t


10.425179223281729

In [119]:
p_value = stats.t.sf(float(t), n-1)  # one-sided pvalue = Prob(mean>0)
p_value


1.7323259949349822e-10

Here p_value is very small approximatedly zero. 
Hence, we reject the hypothesis and conclude that average amount of crime per year during Daley's office is statistically higher than that during Emanuel's office.