-
Notifications
You must be signed in to change notification settings - Fork 0
/
BDM_HW7_ch3019.py
105 lines (83 loc) · 3.78 KB
/
BDM_HW7_ch3019.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from pyspark import SparkContext
from datetime import datetime as dt
def distance(origin, destination):
    '''
    Return the haversine (great-circle) distance in miles between two points.

    Original code from: https://gist.github.com/rochacbruno/2883505

    :param origin: ``(latitude, longitude)`` pair in decimal degrees.
    :param destination: ``(latitude, longitude)`` pair in decimal degrees.
    :returns: distance in miles, computed on a sphere of radius 6371 km and
        converted with 0.621371 miles per km.
    '''
    import math
    lat1, lon1 = origin
    lat2, lon2 = destination
    radius = 6371  # earth radius in km
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    sin_dlat = math.sin(dlat / 2)
    sin_dlon = math.sin(dlon / 2)
    a = sin_dlat * sin_dlat + math.cos(math.radians(lat1)) \
        * math.cos(math.radians(lat2)) * sin_dlon * sin_dlon
    # Float rounding can push `a` a few ulps outside [0, 1] for (nearly)
    # antipodal points; math.sqrt(1 - a) would then raise a domain error.
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    d = radius * c  # distance in km
    d = d * 0.621371  # distance in miles
    return d
def filter_yellow(partId, list_of_records):
    '''
    Yield dropoff times of yellow-taxi trips that ended on 2015-02-01 within
    0.25 miles of the "Greenwich Ave & 8 Ave" station.

    Intended for ``RDD.mapPartitionsWithIndex``.

    :param partId: partition index; partition 0 is assumed to begin with the
        CSV header line, which is skipped.
    :param list_of_records: iterable of raw CSV lines for one partition.
        Column 1 is the dropoff timestamp; columns 4 and 5 are the dropoff
        latitude and longitude.
    :yields: dropoff time strings shaped ``yyyy-mm-dd hh:mm:ss``
        (anything after the first '.' is dropped).
    '''
    import csv
    sla = 40.73901691  # latitude of the station
    slon = -74.00263761  # longitude of the station
    records = iter(list_of_records)
    if partId == 0:
        # Skip the header. The default avoids StopIteration (which becomes a
        # RuntimeError inside a generator on Python 3.7+) on an empty partition.
        next(records, None)
    for row in csv.reader(records):
        # Blank or truncated lines lack the columns read below; skip them
        # rather than failing the whole Spark task with an IndexError.
        if len(row) < 6:
            continue
        day_time = row[1].split('.')[0]  # uniform the time format to yyyy-mm-dd hh:mm:ss
        if day_time.split(' ')[0] != '2015-02-01':  # keep only trips on February 1st 2015
            continue
        # Some trips are missing their dropoff location.
        try:
            yla = float(row[4])
            ylon = float(row[5])
        except ValueError:
            continue
        if distance((sla, slon), (yla, ylon)) <= 0.25:  # keep trips within 0.25 miles
            yield day_time
def filter_citibike(partId, list_of_records):
    '''
    Yield start times of citibike trips that started at the
    "Greenwich Ave & 8 Ave" station on 2015-02-01.

    Intended for ``RDD.mapPartitionsWithIndex``.

    :param partId: partition index; partition 0 is assumed to begin with the
        CSV header line, which is skipped.
    :param list_of_records: iterable of raw CSV lines for one partition.
        Column 3 is the start timestamp and column 6 the start station name.
    :yields: start time strings shaped ``yyyy-mm-dd hh:mm:ss``
        (anything from the first '+' onward is dropped).
    '''
    import csv
    records = iter(list_of_records)
    if partId == 0:
        # Skip the header. The default avoids StopIteration (which becomes a
        # RuntimeError inside a generator on Python 3.7+) on an empty partition.
        next(records, None)
    for row in csv.reader(records):
        # Blank or truncated lines lack the columns read below; skip them
        # rather than failing the whole Spark task with an IndexError.
        if len(row) < 7:
            continue
        day_time = row[3].split('+')[0]  # uniform the time format to yyyy-mm-dd hh:mm:ss
        # Keep only trips starting at the station on February 1st 2015.
        if row[6] == 'Greenwich Ave & 8 Ave' and day_time.split(' ')[0] == '2015-02-01':
            yield day_time
def filter_pair(list_of_records, max_gap_seconds=600):
    '''
    Yield 1 for every (taxi dropoff, citibike start) pair that matches.

    A pair matches when the citibike trip starts strictly after the taxi trip
    ended and no more than ``max_gap_seconds`` later (default 600 s, i.e.
    10 minutes; the bound is inclusive).

    :param list_of_records: iterable of ``(yt, ct)`` datetime pairs, where
        ``yt`` is the taxi dropoff time and ``ct`` the citibike start time.
    :param max_gap_seconds: largest accepted gap in seconds.
    :yields: the integer 1 per matching pair, so callers can ``count()``.
    '''
    for yt, ct in list_of_records:
        # total_seconds() includes the days component; timedelta.seconds
        # alone wraps every 24 h and would accept pairs a day or more apart.
        gap = (ct - yt).total_seconds()
        if 0 < gap <= max_gap_seconds:
            yield 1
if __name__ == '__main__':
    sc = SparkContext()
    # Read the data from the HDFS /tmp folder.
    yellow = sc.textFile('/tmp/yellow.csv.gz')
    citibike = sc.textFile('/tmp/citibike.csv')
    # Potential yellow-taxi trips' dropoff times, converted to datetime.
    yellowtrips = yellow.mapPartitionsWithIndex(filter_yellow) \
        .map(lambda x: dt.strptime(x, '%Y-%m-%d %H:%M:%S'))
    # Potential citibike trips' start times, converted to datetime.
    citibiketrips = citibike.mapPartitionsWithIndex(filter_citibike) \
        .map(lambda x: dt.strptime(x, '%Y-%m-%d %H:%M:%S'))
    # Pair every taxi dropoff with every bike start and count the matches.
    S = yellowtrips.cartesian(citibiketrips).mapPartitions(filter_pair).count()
    # print(...) with a single argument is valid on both Python 2 and 3.
    print(S)
    sc.stop()