In [113]:
sys.executable

'/Users/mhendrickson/anaconda/bin/python'

In [208]:
sc

<pyspark.context.SparkContext at 0x107ad3510>

In [115]:
import geopandas as gpd
import pyproj
import shapely.geometry as geom
import csv
import datetime

In [116]:
proj = pyproj.Proj(init='epsg:2263', preserve_units=True)
COLLISIONS_FN = 'data/collisions_cleaned.csv'
JANUARY_FN = 'data/1january.csv'

In [117]:
collisions = sc.textFile(COLLISIONS_FN,use_unicode=False).cache()
january = sc.textFile(JANUARY_FN,use_unicode=False).cache()

In [118]:
list(enumerate(collisons.first().split(',')))


[(0, 'DATE'),
 (1, 'TIME'),
 (2, 'BOROUGH'),
 (3, 'ZIP CODE'),
 (4, 'LATITUDE'),
 (5, 'LONGITUDE'),
 (6, 'LOCATION'),
 (7, 'ON STREET NAME'),
 (8, 'CROSS STREET NAME'),
 (9, 'OFF STREET NAME'),
 (10, 'NUMBER OF PERSONS INJURED'),
 (11, 'NUMBER OF PERSONS KILLED'),
 (12, 'NUMBER OF PEDESTRIANS INJURED'),
 (13, 'NUMBER OF PEDESTRIANS KILLED'),
 (14, 'NUMBER OF CYCLIST INJURED'),
 (15, 'NUMBER OF CYCLIST KILLED'),
 (16, 'NUMBER OF MOTORIST INJURED'),
 (17, 'NUMBER OF MOTORIST KILLED'),
 (18, 'CONTRIBUTING FACTOR VEHICLE 1'),
 (19, 'CONTRIBUTING FACTOR VEHICLE 2'),
 (20, 'CONTRIBUTING FACTOR VEHICLE 3'),
 (21, 'CONTRIBUTING FACTOR VEHICLE 4'),
 (22, 'CONTRIBUTING FACTOR VEHICLE 5'),
 (23, 'UNIQUE KEY'),
 (24, 'VEHICLE TYPE CODE 1'),
 (25, 'VEHICLE TYPE CODE 2'),
 (26, 'VEHICLE TYPE CODE 3'),
 (27, 'VEHICLE TYPE CODE 4'),
 (28, 'VEHICLE TYPE CODE 5'),
 (29, 'Counts'),
 (30, 'Collision Rate'),
 (31, 'LAT'),
 (32, 'LONG')]

# Injury Totals by Year

In [119]:
def extractInjuries(partitionId,partition):
    if partitionId == 0:
        partition.next()
    import csv
    reader = csv.reader(partition)
    for row in reader:
        if row[10]!= '0':
            injury_count = int(row[10])
            year = row[0].split('/')[2]            
            yield (year,injury_count)

injuryRecords = collisions.mapPartitionsWithIndex(extractInjuries).reduceByKey(lambda x, y: x + y)
injuryRecords.take(20)

[('2017', 10322),
 ('2016', 36552),
 ('2013', 46939),
 ('2015', 43263),
 ('2012', 23564),
 ('2014', 43312)]

# Death Totals by Year

In [120]:
def extractDeaths(partitionId,partition):
    if partitionId == 0:
        partition.next()
    import csv
    reader = csv.reader(partition)
    for row in reader:
        if row[11]!= '0':
            death_count = int(row[11])
            year = row[0].split('/')[2]            
            yield (year,death_count)

deathRecords = collisions.mapPartitionsWithIndex(extractDeaths).reduceByKey(lambda x, y: x + y)
deathRecords.take(20)

[('2017', 33),
 ('2016', 128),
 ('2013', 243),
 ('2015', 201),
 ('2012', 122),
 ('2014', 215)]

# Contributing Factors

In [121]:
def accidentFactors(partitionId,partition):
    if partitionId == 0:
        partition.next()
    import csv
    reader = csv.reader(partition)
    for row in reader:
        if row[18]!= 'Unspecified' and row[18] != '':
            cause = row[18]
            year = row[0].split('/')[2]            
            yield ((cause,year),1)

accidentFactors = collisions.mapPartitionsWithIndex(accidentFactors).reduceByKey(lambda x,y: x+y) #.max(lambda x:x[1])
sorted(accidentFactors.collect())

[(('Accelerator Defective', '2012'), 45),
 (('Accelerator Defective', '2013'), 64),
 (('Accelerator Defective', '2014'), 77),
 (('Accelerator Defective', '2015'), 66),
 (('Accelerator Defective', '2016'), 65),
 (('Accelerator Defective', '2017'), 23),
 (('Aggressive Driving/Road Rage', '2012'), 304),
 (('Aggressive Driving/Road Rage', '2013'), 463),
 (('Aggressive Driving/Road Rage', '2014'), 505),
 (('Aggressive Driving/Road Rage', '2015'), 689),
 (('Aggressive Driving/Road Rage', '2016'), 383),
 (('Aggressive Driving/Road Rage', '2017'), 189),
 (('Alcohol Involvement', '2012'), 630),
 (('Alcohol Involvement', '2013'), 1273),
 (('Alcohol Involvement', '2014'), 1490),
 (('Alcohol Involvement', '2015'), 1530),
 (('Alcohol Involvement', '2016'), 1029),
 (('Alcohol Involvement', '2017'), 468),
 (('Animals Action', '2012'), 34),
 (('Animals Action', '2013'), 69),
 (('Animals Action', '2014'), 83),
 (('Animals Action', '2015'), 88),
 (('Animals Action', '2016'), 111),
 (('Animals Action', '

# Accidents Involving Cyclists

In [158]:
def extractCyclists(partitionId,partition):
    if partitionId == 0:
        partition.next()
    import csv
    reader = csv.reader(partition)
    for row in reader:
        if row[14]!= '' and row[14] != '0':
#             location = row[6]
            lat = row[6].split(',')[0][1:9] + '' , '' + row[6].split(',')[1][0:9]
#             lng = row[6].split(',')[1][0:9]
            injured = int(row[14]) + int(row[15])    # includes injury and death         
            yield (lat),injured

cyclistAccidents = collisions.mapPartitionsWithIndex(extractCyclists).reduceByKey(lambda x,y: x+y) #.max(lambda x:x[1])
cyclistAccidents.collect()

[(('40.68280', '-73.96071'), 1),
 (('40.87617', '-73.84902'), 1),
 (('40.71253', '-74.00766'), 2),
 (('40.71632', '-73.94806'), 1),
 (('40.78544', '-73.77867'), 1),
 (('40.68340', '-73.88072'), 1),
 (('40.61114', '-73.74849'), 1),
 (('40.59307', '-73.94726'), 1),
 (('40.76981', '-73.90176'), 1),
 (('40.75837', '-73.87904'), 1),
 (('40.68427', '-73.77501'), 1),
 (('40.76698', '-73.91689'), 4),
 (('40.74039', '-73.97907'), 1),
 (('40.67874', '-73.79399'), 1),
 (('40.69214', '-73.91946'), 2),
 (('40.70703', '-73.83165'), 1),
 (('40.81909', '-73.94092'), 2),
 (('40.53979', '-74.14812'), 1),
 (('40.67065', '-73.95797'), 2),
 (('40.72146', '-73.98852'), 2),
 (('40.69854', '-73.94115'), 1),
 (('40.59085', '-73.96744'), 1),
 (('40.63215', '-74.02750'), 2),
 (('40.64651', '-73.95810'), 1),
 (('40.67815', '-73.94415'), 1),
 (('40.69558', '-73.92076'), 3),
 (('40.68142', '-73.83366'), 1),
 (('40.78851', '-73.97696'), 1),
 (('40.67342', '-73.95016'), 4),
 (('40.59238', '-73.78004'), 1),
 (('40.808

# Taxi Trips by Location

In [131]:
list(enumerate(january.first().split(',')))

[(0, 'pickup_datetime'),
 (1, 'pickup_latitude'),
 (2, 'pickup_longitude'),
 (3, 'trip_distance'),
 (4, 'passenger_count'),
 (5, 'dropoff_datetime'),
 (6, 'dropoff_latitude'),
 (7, 'dropoff_longitude'),
 (8, 'fare_amount'),
 (9, 'tolls_amount'),
 (10, 'taxes_amount'),
 (11, 'tip_amount'),
 (12, 'payment_amount'),
 (13, 'payment_type')]

In [197]:
taxiTrips.count()

10482803

### Location, (taxi trips, injuries)

In [180]:
sorted(taxiTrips.join(cyclistAccidents).take(20))

[(('40.70309', '-73.98664'), (1, 1)),
 (('40.70562', '-74.00676'), (1, 1)),
 (('40.71653', '-74.00667'), (4, 1)),
 (('40.71885', '-74.00484'), (1, 2)),
 (('40.72746', '-73.97947'), (10, 3)),
 (('40.73026', '-73.95347'), (1, 1)),
 (('40.73109', '-73.98278'), (5, 2)),
 (('40.74706', '-73.97923'), (8, 2)),
 (('40.74843', '-73.99619'), (2, 2)),
 (('40.75587', '-73.97282'), (10, 2)),
 (('40.75714', '-73.99356'), (4, 3)),
 (('40.75736', '-73.98409'), (3, 1)),
 (('40.75957', '-73.96802'), (3, 3)),
 (('40.75973', '-73.99168'), (4, 5)),
 (('40.76649', '-73.95695'), (9, 2)),
 (('40.76689', '-73.92139'), (1, 4)),
 (('40.77169', '-73.95918'), (7, 1)),
 (('40.78423', '-73.94706'), (2, 3)),
 (('40.79099', '-73.96517'), (1, 2)),
 (('40.79545', '-73.96562'), (2, 5))]

### Location, (injuries, taxitrips)

In [181]:
sorted(cyclistAccidents.join(taxiTrips).take(20))

[(('40.70309', '-73.98664'), (1, 1)),
 (('40.70562', '-74.00676'), (1, 1)),
 (('40.71653', '-74.00667'), (1, 4)),
 (('40.71885', '-74.00484'), (2, 1)),
 (('40.72746', '-73.97947'), (3, 10)),
 (('40.73026', '-73.95347'), (1, 1)),
 (('40.73109', '-73.98278'), (2, 5)),
 (('40.74152', '-73.99379'), (1, 9)),
 (('40.74706', '-73.97923'), (2, 8)),
 (('40.74843', '-73.99619'), (2, 2)),
 (('40.75714', '-73.99356'), (3, 4)),
 (('40.75736', '-73.98409'), (1, 3)),
 (('40.75957', '-73.96802'), (3, 3)),
 (('40.75973', '-73.99168'), (5, 4)),
 (('40.76274', '-73.99691'), (2, 1)),
 (('40.76649', '-73.95695'), (2, 9)),
 (('40.76689', '-73.92139'), (4, 1)),
 (('40.77169', '-73.95918'), (1, 7)),
 (('40.78423', '-73.94706'), (3, 2)),
 (('40.79545', '-73.96562'), (5, 2))]

In [182]:
cyclistAccidents.join(taxiTrips).count()

1306

### The following is hardly statistically significant or accurate as we're only dividing the number of cyclists injured by the number of cabs at that intersection in one month

In [201]:
sorted(taxiTrips.join(cyclistAccidents).mapValues(lambda x: float(x[1])/float(x[0])).take(20))
#.mapValues(lambda x: x[0]/x[1])

[(('40.70309', '-73.98664'), 1.0),
 (('40.70562', '-74.00676'), 1.0),
 (('40.71653', '-74.00667'), 0.25),
 (('40.71885', '-74.00484'), 2.0),
 (('40.72746', '-73.97947'), 0.3),
 (('40.73026', '-73.95347'), 1.0),
 (('40.73109', '-73.98278'), 0.4),
 (('40.74706', '-73.97923'), 0.25),
 (('40.74843', '-73.99619'), 1.0),
 (('40.75587', '-73.97282'), 0.2),
 (('40.75714', '-73.99356'), 0.75),
 (('40.75736', '-73.98409'), 0.3333333333333333),
 (('40.75957', '-73.96802'), 1.0),
 (('40.75973', '-73.99168'), 1.25),
 (('40.76649', '-73.95695'), 0.2222222222222222),
 (('40.76689', '-73.92139'), 4.0),
 (('40.77169', '-73.95918'), 0.14285714285714285),
 (('40.78423', '-73.94706'), 1.5),
 (('40.79099', '-73.96517'), 2.0),
 (('40.79545', '-73.96562'), 2.5)]

In [361]:
bike_taxi = taxiTrips.join(cyclistAccidents) #.collect() #.mapValues(lambda x: float(x[1])/float(x[0]))

In [206]:
with open('bike_taxi.csv', 'wb') as myfile:
    wr = csv.writer(myfile, quoting=csv.QUOTE_ALL)
    for i in range (0,len(bike_taxi)):
        wr.writerow(bike_taxi[i])

In [209]:
allmonths = ('data/bike_taxi_full.csv')
allmonths = sc.textFile(allmonths,use_unicode=False).cache()

In [211]:
list(enumerate(allmonths.first().split(',')))

[(0, '"LATITUDE AND LONGITUDE"'), (1, '"TAXITRIP WITH ACCIDENTS"')]

In [352]:
def extractAll(partitionId,partition):
    if partitionId == 0:
        partition.next()
    import csv
    reader = csv.reader(partition)
    for row in reader:
        if row[1][1] and row[1][4] != ' ':
            location = row[0].split("'")[1] +" , "+ row[0].split("'")[3]
            taxi_count = int(row[1][1])
            bike_injury = int(row[1][4])
            yield ((location, bike_injury), taxi_count)

allRecords = allmonths.mapPartitionsWithIndex(extractAll).reduceByKey(lambda x,y: x+y)
allRecords.take(10)
# allRecords.mapValues(lambda x: x*100).collect()
#mylist[1200:]

[(('40.69339 , -73.92511', 1), 1),
 (('40.73643 , -74.00663', 2), 9),
 (('40.75827 , -73.93335', 1), 5),
 (('40.72364 , -73.98528', 5), 26),
 (('40.68417 , -73.99219', 1), 5),
 (('40.80202 , -73.94968', 5), 4),
 (('40.75303 , -73.94061', 1), 2),
 (('40.72957 , -73.99326', 2), 15),
 (('40.77207 , -73.94993', 2), 18),
 (('40.73101 , -73.98591', 7), 10)]

In [358]:
february = 'data/2february.csv'
february = sc.textFile(february,use_unicode=False).cache()

In [359]:
list(enumerate(february_taxi.first().split(',')))

[(0, 'pickup_datetime'),
 (1, 'pickup_latitude'),
 (2, 'pickup_longitude'),
 (3, 'trip_distance'),
 (4, 'passenger_count'),
 (5, 'dropoff_datetime'),
 (6, 'dropoff_latitude'),
 (7, 'dropoff_longitude'),
 (8, 'fare_amount'),
 (9, 'tolls_amount'),
 (10, 'taxes_amount'),
 (11, 'tip_amount'),
 (12, 'payment_amount'),
 (13, 'payment_type')]

In [196]:
def extractTrips(partitionId,partition):
    if partitionId == 0:
        partition.next()
    import csv
    reader = csv.reader(partition)
    for row in reader:
#         if row[1] and row[2] !='0.0' and row[6] and row [7] != '0.0':
        pickup = row[1],row[2]
        dropoff = row[6],row[7]       
        yield (pickup,1)
        yield (dropoff,1)

taxiTrips = january.mapPartitionsWithIndex(extractTrips) #.reduceByKey(lambda x,y: x+y) #.max(lambda x:x[1])
taxiTrips.take(20)


[(('40.75041', '-73.98632'), 2),
 (('40.77246', '-73.94674'), 4),
 (('40.76126', '-73.81888'), 1),
 (('40.80658', '-73.91396'), 1),
 (('40.73625', '-74.00101'), 1),
 (('40.75901', '-73.99017'), 3),
 (('40.70985', '-74.01522'), 3),
 (('40.71459', '-74.00237'), 2),
 (('40.77659', '-73.98288'), 1),
 (('40.75967', '-73.98653'), 1),
 (('40.78306', '-73.97391'), 2),
 (('40.76025', '-73.98076'), 1),
 (('40.7531', '-73.97783'), 3),
 (('40.76011', '-73.96691'), 1),
 (('40.7339', '-73.99102'), 4),
 (('40.75432', '-73.97818'), 3),
 (('40.74292', '-74.00025'), 4),
 (('40.76564', '-73.9755'), 1),
 (('40.73109', '-73.98609'), 4),
 (('40.67198', '-73.98664'), 1)]

In [360]:
def extractTrips(partitionId,partition):
    if partitionId == 0:
        partition.next()
    import csv
    reader = csv.reader(partition)
    for row in reader:
#         if row[1] and row[2] !='0.0' and row[6] and row [7] != '0.0':
        pickup = row[1],row[2]
        dropoff = row[6],row[7]       
        yield (pickup,1)
        yield (dropoff,1)

febTrips = february.mapPartitionsWithIndex(extractTrips) #.reduceByKey(lambda x,y: x+y) #.max(lambda x:x[1])
febTrips.take(20)

[(('40.80687', '-73.96497'), 3),
 (('40.76021', '-73.97854'), 1),
 (('40.8142', '-73.91927'), 1),
 (('40.75415', '-73.98847'), 3),
 (('40.75283', '-73.96607'), 1),
 (('40.75026', '-73.9694'), 1),
 (('40.78316', '-73.95266'), 1),
 (('40.75793', '-73.97179'), 1),
 (('40.7782', '-73.96241'), 1),
 (('40.76013', '-73.99418'), 1),
 (('40.71463', '-73.98412'), 1),
 (('40.7282', '-74.00306'), 1),
 (('40.77359', '-73.9643'), 1),
 (('40.66908', '-73.97983'), 1),
 (('40.77972', '-73.98724'), 1),
 (('40.7034', '-73.99378'), 1),
 (('40.7888', '-73.95519'), 1),
 (('40.76076', '-73.96429'), 1),
 (('40.7202', '-74.00984'), 2),
 (('40.79331', '-73.97079'), 1)]

In [374]:
jan_feb = febTrips.join(taxiTrips).reduceByKey(lambda x,y: x+y).mapValues(lambda x: x[0]+x[1])

In [377]:
marchTrips = 'data/3march.csv'
aprilTrips = 'data/4april.csv'
mayTrips = 'data/5may.csv'
juneTrips = 'data/6june.csv'

marchTrips = sc.textFile(marchTrips,use_unicode=False).cache()
aprilTrips = sc.textFile(aprilTrips,use_unicode=False).cache()
mayTrips = sc.textFile(mayTrips,use_unicode=False).cache()
juneTrips = sc.textFile(juneTrips,use_unicode=False).cache()

In [379]:
janTrips = january.mapPartitionsWithIndex(extractTrips)
febTrips = february.mapPartitionsWithIndex(extractTrips)
marchTrips = marchTrips.mapPartitionsWithIndex(extractTrips)
aprilTrips = aprilTrips.mapPartitionsWithIndex(extractTrips)
mayTrips = mayTrips.mapPartitionsWithIndex(extractTrips)
juneTrips = juneTrips.mapPartitionsWithIndex(extractTrips)

jan_feb = febTrips.join(taxiTrips).reduceByKey(lambda x,y: x+y).mapValues(lambda x: x[0]+x[1])
jan_march = marchTrips.join(jan_feb).reduceByKey(lambda x,y: x+y).mapValues(lambda x: x[0]+x[1])
jan_april = aprilTrips.join(jan_march).reduceByKey(lambda x,y: x+y).mapValues(lambda x: x[0]+x[1])
jan_may = mayTrips.join(jan_april).reduceByKey(lambda x,y: x+y).mapValues(lambda x: x[0]+x[1])
jan_june = juneTrips.join(jan_may).reduceByKey(lambda x,y: x+y).mapValues(lambda x: x[0]+x[1])

In [None]:
jan_june.take(15)