In [3]:
!pip3 install shapely

Collecting shapely
  Downloading Shapely-1.6.1-cp35-cp35m-macosx_10_6_intel.whl (2.6MB)
[K    100% |████████████████████████████████| 2.6MB 43kB/s ta 0:00:017
[?25hInstalling collected packages: shapely
Successfully installed shapely-1.6.1


# Geospatial analysis 


- 14 million trip in NYC


## Goal Compute the utilization 

In [1]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.rdd import portable_hash
from pyspark.statcounter import StatCounter
import shapely


import os
import json

from datetime import datetime
from operator import itemgetter
#from itertools import chain, imap
from shapely.geometry import shape, Point

from matplotlib import pyplot as plt

spark = SparkSession.builder.appName("Taxi")\
        .config("spark.driver.memory", "6g")\
        .config("spark.driver.cores", "8")\
        .getOrCreate()
sc=spark.sparkContext


In [2]:
from  pprint import pprint
def title(s):
    pprint("---- %s -----" %s)    
    
def see(s, v):
    pprint("---- %s -----" %s)
    pprint(v)

# 1. Sample the Data
- Save the sample to a file, because the sampling itself is time-consuming

In [5]:
taxiRawAll = sc.textFile("../sample.csv")
header = sc.parallelize(taxiRawAll.take(1))
taxiRaw = taxiRawAll.sample(withReplacement=False, fraction=0.0001 ,seed=17)
taxiRaw.coalesce(1) #Makes 1 file as an output, since it reduced the # of partitions to 1
#header.union(taxiRaw).saveAsTextFile("../data/ch08-geospatial/trip_data_sample.csv")

CoalescedRDD[18] at coalesce at NativeMethodAccessorImpl.java:-2

# 2. Read the sample Data
- Save the sample to a file, because the sampling itself is time-consuming

In [6]:
#read the sample
taxiRaw = sc.textFile("../sample.csv")

In [7]:
taxiRaw.take(3)

['medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude',
 '89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,2013-01-01 15:11:48,2013-01-01 15:18:10,4,382,1.00,-73.978165,40.757977,-73.989838,40.751171',
 '0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-06 00:18:35,2013-01-06 00:22:54,1,259,1.50,-74.006683,40.731781,-73.994499,40.75066']

In [8]:
df = spark.read.csv("../sample.csv", header=True)
taxi = df.toPandas()
# df.select("hack_license", "pickup_datetime", "pickup_longitude", "pickup_latitude", "dropoff_datetime", "dropoff_longitude", "dropoff_latitude").toPandas()
# from pyspark.sql import functions as F
# def toPoint(x,y):
#     return Point(float(x), float(x))
# df= df.withColumn("startT", F.to_timestamp(df.pickup_datetime))
# df= df.withColumn("endT", F.to_timestamp(df.dropoff_datetime))
# df.select("hack_license", "startT", "endT").limit(10).toPandas()

# 3. Parse the data handling Time and Geo  types
- We are mainly intersted in each Trip's:
    - Some Unique ID for the car (license)
    - Pick-up location
    - Pick-up time
    - Drop-off location
    - Drop-off time
- We use python datetime class to store dates
- We use shapely to store geo coordinates

In [9]:
def parse(fields):
    license = fields[1]
    pickupTime = datetime.strptime(fields[5], '%Y-%m-%d %H:%M:%S')
    dropoffTime = datetime.strptime(fields[6], '%Y-%m-%d %H:%M:%S')
    try:
        pickupLoc = Point(float(fields[10]), float(fields[11]))
        dropoffLoc = Point(float(fields[12]), float(fields[13]))
    except ValueError:
        pickupLoc = Point(0.0, 0.0)
        dropoffLoc= Point(0.0, 0.0)
    trip = {'pickupTime':pickupTime, 'dropoffTime':dropoffTime, 'pickupLoc':pickupLoc, 'dropoffLoc':dropoffLoc}
    return (license, trip)


taxiParsed = taxiRaw\
        .map(lambda line: line.split(','))\
        .filter(lambda fields: len(fields) == 14 and fields[0] != "medallion")\
        .map(parse)
taxiParsed.cache()

see("taxiParsed", taxiParsed.take(5))

'---- taxiParsed -----'
[('BA96DE419E711691B9445D6A6307C170',
  {'dropoffLoc': <shapely.geometry.point.Point object at 0x10e33d940>,
   'dropoffTime': datetime.datetime(2013, 1, 1, 15, 18, 10),
   'pickupLoc': <shapely.geometry.point.Point object at 0x10e33d9b0>,
   'pickupTime': datetime.datetime(2013, 1, 1, 15, 11, 48)}),
 ('9FD8F69F0804BDB5549F40E9DA1BE472',
  {'dropoffLoc': <shapely.geometry.point.Point object at 0x10e33d5c0>,
   'dropoffTime': datetime.datetime(2013, 1, 6, 0, 22, 54),
   'pickupLoc': <shapely.geometry.point.Point object at 0x10e33da20>,
   'pickupTime': datetime.datetime(2013, 1, 6, 0, 18, 35)}),
 ('9FD8F69F0804BDB5549F40E9DA1BE472',
  {'dropoffLoc': <shapely.geometry.point.Point object at 0x10e33da90>,
   'dropoffTime': datetime.datetime(2013, 1, 5, 18, 54, 23),
   'pickupLoc': <shapely.geometry.point.Point object at 0x10e33db00>,
   'pickupTime': datetime.datetime(2013, 1, 5, 18, 49, 41)}),
 ('51EE87E3205C985EF8431D850C786310',
  {'dropoffLoc': <shapely.geometry

# 4. Process Time: 
## 4.1 Explore the distribution of trip duration (hours)

In [10]:
def hours(trip):
    d= trip['dropoffTime'] - trip['pickupTime']
    return int( ((d.days)*24) + (d.seconds/3600))

hoursCount = taxiParsed.values().map(hours).countByValue().items()
sortedHoursCount = sorted(hoursCount, key=itemgetter(0), reverse=False)

for val in sortedHoursCount:
    print(val)    


(0, 99946)
(1, 50)
(2, 3)


## 4.2 Remove trips with -ve durations and longer than 3 hours

In [11]:
def goodHour(hrs):
    return 0 <= hrs and hrs < 3

taxiClean = taxiParsed.filter(lambda x: goodHour( hours(x[1]) )).cache()
taxiParsed.unpersist()

see("taxiClean", taxiClean.take(5))

'---- taxiClean -----'
[('BA96DE419E711691B9445D6A6307C170',
  {'dropoffLoc': <shapely.geometry.point.Point object at 0x10e33d9e8>,
   'dropoffTime': datetime.datetime(2013, 1, 1, 15, 18, 10),
   'pickupLoc': <shapely.geometry.point.Point object at 0x10e387a90>,
   'pickupTime': datetime.datetime(2013, 1, 1, 15, 11, 48)}),
 ('9FD8F69F0804BDB5549F40E9DA1BE472',
  {'dropoffLoc': <shapely.geometry.point.Point object at 0x10e3876a0>,
   'dropoffTime': datetime.datetime(2013, 1, 6, 0, 22, 54),
   'pickupLoc': <shapely.geometry.point.Point object at 0x10e387b00>,
   'pickupTime': datetime.datetime(2013, 1, 6, 0, 18, 35)}),
 ('9FD8F69F0804BDB5549F40E9DA1BE472',
  {'dropoffLoc': <shapely.geometry.point.Point object at 0x10e387b70>,
   'dropoffTime': datetime.datetime(2013, 1, 5, 18, 54, 23),
   'pickupLoc': <shapely.geometry.point.Point object at 0x10e387c18>,
   'pickupTime': datetime.datetime(2013, 1, 5, 18, 49, 41)}),
 ('51EE87E3205C985EF8431D850C786310',
  {'dropoffLoc': <shapely.geometry.

# 5.Process Geo data
## 5.1 Geet a list of NY Zones (boroughs)

In [17]:
with open('../work/boroughs.geojson', 'r') as f:
        geo = json.load(f)
features = geo['features']
for f in features:
    f["shape"] = shape(f['geometry'])

see("features", features[:3])

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.


## 5.2 Sort the zones by area and broadcast to executors

In [18]:
areaSortedFeatures = sorted(features, key=lambda f: (int(f['properties']["boroughCode"]), -f["shape"].area), reverse=False)
bFeatures = sc.broadcast(areaSortedFeatures)
see("areaSortedFeatures", areaSortedFeatures[:3])

KeyError: 'boroughCode'

## 5.3 Convert long/lat to  zone

In [19]:
def borough(trip):
    for f in bFeatures.value:
        #contains checks if the 
        if f['shape'].contains(trip["dropoffLoc"]):
            return str(f['properties']["borough"])
    return None

boroughCount = taxiClean.values().map(borough).countByValue().items()

see("boroughCount", list(boroughCount))

'---- boroughCount -----'
[('Manhattan', 88164),
 ('Queens', 5475),
 ('Brooklyn', 3595),
 (None, 2357),
 ('Bronx', 395),
 ('Staten Island', 13)]


## 5.4 Remove trips that started/ended in unknown zones

In [20]:
def hasZero(trip):
    zero = Point(0.0, 0.0)
    return (zero == trip["pickupLoc"] or zero == trip["dropoffLoc"])

taxiDone = taxiClean.filter(lambda x: not hasZero(x[1])).cache()

In [21]:
boroughCount = taxiDone.values().map(borough).countByValue().items()
see("boroughCount", list(boroughCount))

'---- boroughCount -----'
[('Manhattan', 88139),
 ('Queens', 5470),
 ('Brooklyn', 3594),
 (None, 456),
 ('Bronx', 395),
 ('Staten Island', 13)]


## 6 Sessionize trips
- Look at sorted list of trips for each driver
- Once the difference between any 2 consecurtive trips is longer than 4 hours, we consider that a new session has started

In [22]:
epoch = datetime.utcfromtimestamp(0)

def getMillis(time):
    return (time - epoch).total_seconds() * 1000.0

def partitioner(n):
    def partitioner_(x):
        return portable_hash(x[0]) % n
    return partitioner_


def groupSorted(it, splitFunc):
    cur={'lic': None, 'trips': []}
    def mapper(x):
        lic = x[0][0]
        trip = x[1]
        if(lic != cur['lic'] or splitFunc(cur['trips'][-1], trip)):
            result = (cur['lic'], cur['trips'])
            cur['lic'] = lic
            cur['trips'] = [trip]
            if(len(result[1]) == 0):
                return None
            else:
                return result
        else:
            cur['trips'].append(trip)
            return None
    m = list(map(mapper, it))
    m.append((cur['lic'], cur['trips']))
    return filter(lambda f: f is not None, m)

def groupByKeyAndSortValues(rdd, secondaryKeyFunc, splitFunc):
    presess = rdd.map(lambda x: ((x[0], secondaryKeyFunc(x[1])), x[1]) )
    numPartitions = presess.getNumPartitions()
    return presess.repartitionAndSortWithinPartitions(partitionFunc=partitioner(numPartitions))\
        .mapPartitions(lambda partition: groupSorted(partition, splitFunc))
        
def split(t1, t2):
    d = t2['pickupTime'] - t1['pickupTime']
    return ((d.days*24) + (d.seconds/3600)) >= 4

def secondaryKeyFunc(trip):
    return getMillis(trip["pickupTime"])

sessions = groupByKeyAndSortValues(taxiDone, secondaryKeyFunc, split).cache()

see("sesions",sessions.take(10))

'---- sesions -----'
[('001C8AAB90AEE49F36FCAA7B4136C81A',
  [{'dropoffLoc': <shapely.geometry.point.Point object at 0x1103a61d0>,
    'dropoffTime': datetime.datetime(2013, 1, 13, 2, 32),
    'pickupLoc': <shapely.geometry.point.Point object at 0x1103a6160>,
    'pickupTime': datetime.datetime(2013, 1, 13, 2, 22)},
   {'dropoffLoc': <shapely.geometry.point.Point object at 0x1103a62b0>,
    'dropoffTime': datetime.datetime(2013, 1, 13, 4, 41),
    'pickupLoc': <shapely.geometry.point.Point object at 0x1103a6240>,
    'pickupTime': datetime.datetime(2013, 1, 13, 4, 19)},
   {'dropoffLoc': <shapely.geometry.point.Point object at 0x1103a6390>,
    'dropoffTime': datetime.datetime(2013, 1, 13, 6, 30),
    'pickupLoc': <shapely.geometry.point.Point object at 0x1103a6320>,
    'pickupTime': datetime.datetime(2013, 1, 13, 6, 20)},
   {'dropoffLoc': <shapely.geometry.point.Point object at 0x1103a6470>,
    'dropoffTime': datetime.datetime(2013, 1, 13, 6, 44),
    'pickupLoc': <shapely.geometry

In [23]:
sessions.map(lambda x: (x[0], len(x[1]))).take(20)

[('001C8AAB90AEE49F36FCAA7B4136C81A', 4),
 ('0025133AD810DBE80D35FCA8BF0BCA1F', 2),
 ('00374328FBA75FBFCA7522671250F573', 1),
 ('00447A6197DBB329FBF764139ACA6EC4', 5),
 ('00567B1CBFD51DDFAC73359B09238922', 21),
 ('006114F940CB87B3ABDCE9BF6DF6FCC4', 25),
 ('00711D0CC3FB5BC905BB62D9B62296D6', 2),
 ('00711D0CC3FB5BC905BB62D9B62296D6', 3),
 ('007357E7FFE212879B9B85C7F4681AE5', 19),
 ('007439EEDB510EF8277C567BD200B08F', 2),
 ('0078BA33E03313B58559834168E7495F', 1),
 ('008BE4F3FF93935048998BB183D27263', 1),
 ('00B442110FA2D04A167DB0C9E7C73545', 15),
 ('00E40328ECC4327E912FE820506B38D7', 15),
 ('010149C4A7812863669DB7DBFC91ED60', 1),
 ('010149C4A7812863669DB7DBFC91ED60', 1),
 ('01060D63D29CE42C8CBBB5412B6FCE56', 2),
 ('010BDF76921D5A266B8F8AC7CDECE8F4', 26),
 ('011EB4B6E7DE7B08CDF1149D7FC7E4A2', 2),
 ('011EDCD6D636C9F1B0D9AD291AEECC37', 17)]

In [24]:
sessions.countByKey()

defaultdict(int,
            {'001C8AAB90AEE49F36FCAA7B4136C81A': 1,
             '0025133AD810DBE80D35FCA8BF0BCA1F': 1,
             '00374328FBA75FBFCA7522671250F573': 1,
             '00447A6197DBB329FBF764139ACA6EC4': 1,
             '00567B1CBFD51DDFAC73359B09238922': 1,
             '006114F940CB87B3ABDCE9BF6DF6FCC4': 1,
             '00711D0CC3FB5BC905BB62D9B62296D6': 2,
             '007357E7FFE212879B9B85C7F4681AE5': 1,
             '007439EEDB510EF8277C567BD200B08F': 1,
             '0078BA33E03313B58559834168E7495F': 1,
             '008BE4F3FF93935048998BB183D27263': 1,
             '00B442110FA2D04A167DB0C9E7C73545': 1,
             '00E40328ECC4327E912FE820506B38D7': 1,
             '010149C4A7812863669DB7DBFC91ED60': 2,
             '01060D63D29CE42C8CBBB5412B6FCE56': 1,
             '010BDF76921D5A266B8F8AC7CDECE8F4': 1,
             '011EB4B6E7DE7B08CDF1149D7FC7E4A2': 1,
             '011EDCD6D636C9F1B0D9AD291AEECC37': 1,
             '01202D837DD4454C771A2F4D6E9ABA17'

In [38]:

# def flatmap(f, items):
#     return chain.from_iterable(imap(f, items))

def sliding(num, l):
    return [l[i:i + num] for i in range(len(l) - (num-1))]

def boroughDuration(t1, t2):
    b = borough(t1)
    d = t2["pickupTime"] - t1["dropoffTime"]
    return (b, d)

def mapper(trips):
    iter = sliding(2, trips)
    viter = filter(lambda x: len(x) == 2, iter)
    return map(lambda p: boroughDuration(p[0], p[1]), viter)

boroughDurations = sessions.values().flatMap(mapper).cache()

boroughDurationsCount = boroughDurations.values().map(lambda d: int((d.days*24) + (d.seconds/3600))).countByValue().items()

sortedBoroughDurationsCount = sorted(boroughDurationsCount, key=itemgetter(1), reverse=True)

for val in sortedBoroughDurationsCount:
    print(val)

taxiDone.unpersist()

(0, 80875)
(1, 4566)
(2, 984)
(3, 200)


PythonRDD[18] at RDD at PythonRDD.scala:48

In [39]:
def stats(d):
    s = StatCounter()
    return s.merge(((d.days*24*3600) + d.seconds))

boroughCollected = boroughDurations\
    .filter(lambda x: ((x[1].days*24*3600) + x[1].seconds) >= 0)\
    .mapValues(stats)\
    .reduceByKey(lambda a, b: a.mergeStats(b))\
    .collect()

for val in boroughCollected:
    print(val)

boroughDurations.unpersist()

('Manhattan', (count: 79209, mean: 901.9571260841541, stdev: 1406.68763878, max: 14040.0, min: 0.0))
('Brooklyn', (count: 2592, mean: 2058.6458333333353, stdev: 2137.4312626, max: 12540.0, min: 0.0))
(None, (count: 358, mean: 2365.3072625698323, stdev: 2121.50357519, max: 12900.0, min: 0.0))
('Queens', (count: 4132, mean: 2641.884559535333, stdev: 2238.15972011, max: 13260.0, min: 0.0))
('Bronx', (count: 301, mean: 2196.4784053156145, stdev: 2009.55513755, max: 11280.0, min: 60.0))
('Staten Island', (count: 7, mean: 3531.4285714285716, stdev: 2020.60407213, max: 6300.0, min: 660.0))


PythonRDD[60] at RDD at PythonRDD.scala:48

In [1]:
rdd = sc.parallelize([("b", (2,3)), ("a", (1,2)), ("a", (2,7)), ("b", (4,2))],3)
rdd2 = rdd.map(lambda x: ((x[0], x[1][0]), x[1]))
rdd2.collect()



NameError: name 'sc' is not defined

In [36]:
rdd3 = rdd2.sortByKey()
rdd3.collect()

[(('a', 1), (1, 2)),
 (('a', 2), (2, 7)),
 (('b', 2), (2, 3)),
 (('b', 4), (4, 2))]

[(('b', 2), (2, 3)),
 (('b', 4), (4, 2)),
 (('a', 1), (1, 2)),
 (('a', 2), (2, 7))]