# Data Prep for Destination Visualization & Prediction

In [25]:
import numpy as np
import scipy as sp
import pandas as pd
from pyspark.sql import SQLContext
from pyspark.sql.types import *

import geohash

from datetime import *
from dateutil.parser import parse

# Widen pandas output so wide frames are neither wrapped nor column-elided.
for _option, _value in (('display.width', 500),
                        ('display.max_columns', 100),
                        ('display.notebook_repr_html', True)):
    pd.set_option(_option, _value)
del _option, _value

# `sc` is never created in this notebook -- presumably the SparkContext injected
# by the PySpark kernel; evaluating it just displays the context object.
sc

<pyspark.context.SparkContext at 0x7fca6913cc10>

In [26]:
# Distribute geohash.py to the Spark executors so the `geohash` module used
# inside the RDD map functions (e.g. prepData) is importable there too.
sc.addPyFile("geohash.py")

## 1. Parse the CSV and construct RDDs

In [38]:
"""
    2: pickup dattime
    6: pickup long
    7: pickup lat
    10: dropoff long
    11: dropoff lat
"""
def yCabParse(strRecord):    
    return (parse(strRecord[1]), float(strRecord[5]), float(strRecord[6]), float(strRecord[7]), float(strRecord[8]))

# Read the raw green-cab CSV, split each line on commas, then parse the fields.
yCabRDD = (sc.textFile("./data/nyc/green_tripdata_2013-08.csv")
           .map(lambda line: tuple(line.split(',')))
           .map(yCabParse))

In [39]:
# Only the green-cab data is parsed above, so the "combined" RDD is just that RDD.
combinedRDD = yCabRDD
# Inspect a small sample. collect() would ship every trip of the month to the
# driver (risking driver OOM), and the bare count() that preceded it was
# computed and then discarded (only a cell's last expression is displayed).
combinedRDD.take(20)

[(datetime.datetime(2013, 8, 1, 8, 14, 37), 0.0, 0.0, 0.0, 0.0),
 (datetime.datetime(2013, 8, 1, 9, 13), 0.0, 0.0, 0.0, 0.0),
 (datetime.datetime(2013, 8, 1, 9, 48), 0.0, 0.0, 0.0, 0.0),
 (datetime.datetime(2013, 8, 1, 10, 38, 35), 0.0, 0.0, 0.0, 0.0),
 (datetime.datetime(2013, 8, 1, 11, 51, 45), 0.0, 0.0, 0.0, 0.0),
 (datetime.datetime(2013, 8, 1, 14, 33, 39), 0.0, 0.0, 0.0, 0.0),
 (datetime.datetime(2013, 8, 1, 17, 19), 0.0, 0.0, 0.0, 0.0),
 (datetime.datetime(2013, 8, 1, 17, 22),
  -73.9377670288086,
  40.758480072021484,
  -73.9377670288086,
  40.758480072021484),
 (datetime.datetime(2013, 8, 1, 17, 24),
  -73.93792724609375,
  40.757843017578125,
  -73.93792724609375,
  40.757843017578125),
 (datetime.datetime(2013, 8, 1, 19, 21, 9), 0.0, 0.0, 0.0, 0.0),
 (datetime.datetime(2013, 8, 1, 19, 29, 27), 0.0, 0.0, 0.0, 0.0),
 (datetime.datetime(2013, 8, 1, 19, 33, 28), 0.0, 0.0, 0.0, 0.0),
 (datetime.datetime(2013, 8, 2, 9, 37, 44), 0.0, 0.0, 0.0, 0.0),
 (datetime.datetime(2013, 8, 2, 9

In [40]:
"""
    2: pickup dattime
    6: pickup long
    7: pickup lat
    8: dropoff long
    9: dropoff lat
"""
def prepData(record, onlyLocationAgg = False):
    
    geohashAccuracy = 6
    minsPerBin = 48
    
    pickupDatetime = record[0] 
    pickupLong = record[1]
    pickupLat = record[2]
    dropOffLong = record[3]
    dropOffLat = record[4]
    
    if pickupLat < 50 and pickupLat > 35 and pickupLong < -50 and pickupLong > -80:
        pickupGeohash = geohash.encode(pickupLat,pickupLong, geohashAccuracy)
    else:
        return None
    
    if dropOffLat < 50 and dropOffLat > 35 and dropOffLong < -50 and dropOffLong > -80:
        dropOffGeohash = geohash.encode(dropOffLat,dropOffLong, geohashAccuracy)
    else:
        return None
    
    
    #time_cat
    d = pickupDatetime
    
    totalMinsPerDay = 1440
    totalBins = totalMinsPerDay/minsPerBin
    
    elapsMins = (d.hour)*60 + d.minute
    #minsPerBin = totalMinsPerDay/totalBins
    currentBin = elapsMins/minsPerBin
    binnedHour = d.hour #elapsMins/60
    binnedMin = (currentBin * minsPerBin)- (binnedHour * 60)
    
    binStr = ""
    
    if (binnedHour/10>0):
        binStr = str(binnedHour)
    else:
        binStr = "0"+str(binnedHour)
    
    binStr = binStr + ":"
    
    if (binnedMin/10>0):
        binStr = binStr + str(binnedMin)
    else:
        binStr = binStr + "0"+str(binnedMin)
    
    time_num = (binnedHour*60 + binnedMin + minsPerBin / 2.0)/(60*24)  
    
    #day of week
    
    
    #weekend
    if d.weekday() in [5,6]:
        weekend = 1
    else:
        weekend = 0
    
    if onlyLocationAgg is False:
        return ((pickupGeohash, dropOffGeohash,time_num,d.weekday()),1)
    else:
        return ((pickupGeohash, dropOffGeohash, weekend),1)

## 2. Create aggregations across time & locations

#### The output of this section is used in the predictions notebook. Here we aggregate the data by pickup location, drop-off location, day of week & time slot.

In [44]:
# Key each trip by (pickup, drop-off, time slot, weekday); discard out-of-area trips.
combinedCleanRDD = combinedRDD.map(prepData).filter(lambda pair: pair is not None)

In [45]:
# Sum the per-trip 1s into a trip count per (pickup, drop-off, slot, weekday) key.
groupedRDD = combinedCleanRDD.reduceByKey(lambda left, right: left + right)

In [46]:
# Keep the aggregated counts in memory for the CSV export that follows.
groupedRDD.cache()

PythonRDD[74] at RDD at PythonRDD.scala:48

In [47]:
def toCSVLine(record):
    """Render a ``((pickup, dropoff, time_num, weekday), count)`` pair as CSV.

    Only the first four key fields are written, followed by the count.
    """
    key, count = record[0], record[1]
    return ','.join(map(str, (key[0], key[1], key[2], key[3], count)))

# Format each aggregate as a CSV line, then shuffle into a single partition so
# the output directory contains exactly one part file.
csvRDD = groupedRDD.map(toCSVLine)
(csvRDD
    .repartition(1)
    .saveAsTextFile('./data/jason_destination1'))

## 3. Aggregations only by locations

#### Here we aggregate all the data by pickup & drop-off location only (plus a weekday/weekend flag)

In [48]:
# Key each trip by (pickup, drop-off, weekend flag) only; discard out-of-area trips.
combinedCleanRDD = (combinedRDD
                    .map(lambda trip: prepData(trip, onlyLocationAgg=True))
                    .filter(lambda pair: pair is not None))

In [49]:
# Sum the per-trip 1s into a trip count per (pickup, drop-off, weekend) key.
groupedRDD = combinedCleanRDD.reduceByKey(lambda left, right: left + right)

In [50]:
# Cache the location-only counts; the timed count() is an action, so it
# evaluates (and fills the cache for) the distinct (pickup, drop-off, weekend) keys.
groupedRDD.cache()
%time groupedRDD.count()

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 1.11 s


4702

In [51]:
# Peek at two aggregates: ((pickupGeohash, dropOffGeohash, weekend), count).
groupedRDD.take(2)

[(('dr5rtp', 'dr5ru5', 1), 1), (('dr72rh', 'dr72xs', 0), 1)]

In [52]:
def toCSVLineOnlyLocnAgg(record):
    """Render a ``((pickupGeohash, dropOffGeohash, weekend), count)`` pair as CSV.

    Only the first three key fields are written, followed by the count.
    """
    key, count = record[0], record[1]
    return ','.join(map(str, (key[0], key[1], key[2], count)))

# Format each location-only aggregate as a CSV line and write one part file.
csvRDD = groupedRDD.map(toCSVLineOnlyLocnAgg)
(csvRDD
    .repartition(1)
    .saveAsTextFile('./data/jason_destination2'))

In [53]:
sc.textFile("./data/jason_destination2/p*").map(lambda line: line).take(5)

[u'dr5rtp,dr5ru5,1,1',
 u'dr72rh,dr72xs,0,1',
 u'dr72mc,dr72jw,0,1',
 u'dr5ryj,dr5rvc,1,1',
 u'dr5ryh,dr5rue,0,1']

In [None]:
# The aggregate CSV stores locations as geohashes; load it so the geohashes can
# be decoded to lat/long for plotting.
# NOTE(review): this path differs from './data/jason_destination2' written
# above -- confirm it holds the same location-only aggregate.
names = [
    "pickup_geohash",
    "dropoff_geohash",
    "weekend",
    "count",
]
df = pd.read_csv(
    "./tmplocaldata/final/groupbydestn_only_locn/singlefile/part-00000",
    header=None,
    names=names,
)

def _decodeLatLong(geo):
    """Decode a geohash to ``(lat, long)``; hashes shorter than 6 chars give ``(0, 0)``."""
    if len(geo) >= 6:
        decoded = geohash.decode(geo)
        return decoded[0], decoded[1]
    return 0, 0


def decodegeo(geo, which):
    """Return element ``which`` of ``geohash.decode(geo)`` (0 = lat, 1 = long).

    Geohashes shorter than 6 characters are not decoded; 0 is returned instead.
    """
    if len(geo) < 6:
        return 0
    return geohash.decode(geo)[which]


def further_data_prep(df):
    """Add decoded coordinate columns for the pickup and drop-off geohashes.

    Adds ``pickup_lat``, ``pickup_long``, ``dropoff_lat`` and ``dropoff_long``
    (in that order) to ``df`` in place and returns the same frame.  Each
    geohash is decoded once and both coordinates are taken from that result,
    instead of decoding every hash separately for latitude and longitude.
    """
    for prefix in ("pickup", "dropoff"):
        latLongs = df[prefix + "_geohash"].map(_decodeLatLong)
        df[prefix + "_lat"] = latLongs.map(lambda pair: pair[0])
        df[prefix + "_long"] = latLongs.map(lambda pair: pair[1])
    return df

# Decode the geohashes, then keep only the coordinate/weekend/count columns.
# The column selection already discards the raw geohash columns, so the
# separate in-place drops were redundant.
df = further_data_prep(df)
df = df[["pickup_lat", "pickup_long", "dropoff_lat", "dropoff_long", "weekend", "count"]]
df.to_csv("pickup_dropoff_aggregated.csv", index=False)

#### The Tableau visualization built from the above CSV is available [here](https://public.tableau.com/profile/publish/pickup-destination-coupling/Dashboard1#!/publish-confirm)

In [8]:
%%html
<script type='text/javascript' src='https://public.tableau.com/javascripts/api/viz_v1.js'></script><div class='tableauPlaceholder' style='width: 804px; height: 519px;'><noscript><a href='#'><img alt='Where do people go from where? ' src='https:&#47;&#47;public.tableau.com&#47;static&#47;images&#47;pi&#47;pickup-destination-coupling&#47;Dashboard1&#47;1_rss.png' style='border: none' /></a></noscript><object class='tableauViz' width='804' height='519' style='display:none;'><param name='host_url' value='https%3A%2F%2Fpublic.tableau.com%2F' /> <param name='site_root' value='' /><param name='name' value='pickup-destination-coupling&#47;Dashboard1' /><param name='tabs' value='no' /><param name='toolbar' value='yes' /><param name='static_image' value='https:&#47;&#47;public.tableau.com&#47;static&#47;images&#47;pi&#47;pickup-destination-coupling&#47;Dashboard1&#47;1.png' /> <param name='animate_transition' value='yes' /><param name='display_static_image' value='yes' /><param name='display_spinner' value='yes' /><param name='display_overlay' value='yes' /><param name='display_count' value='yes' /><param name='showVizHome' value='no' /><param name='showTabs' value='y' /><param name='bootstrapWhenNotified' value='true' /></object></div>

# Appendix

## Machine learning sandbox (for reference only - please ignore)

In [None]:

from pyspark.mllib.tree import RandomForest, RandomForestModel


In [None]:
sc.textFile("s3://testsetu/nyc/final/groupbydestn/singlefile/pa*").map(lambda line: line).take(5)

In [None]:
"""
pickupGeohash, dropOffGeohash,time_num,day_of_week, count
"""
def groupedParse(strRecord):    
    return (strRecord[0], strRecord[1], float(strRecord[2]), int(strRecord[3]), int(strRecord[4]))

# Load the grouped CSV from S3 and parse each comma-split line into typed fields.
gpRDD = (sc.textFile("s3://testsetu/nyc/final/groupbydestn/singlefile/pa*")
         .map(lambda line: tuple(line.split(',')))
         .map(groupedParse))

In [None]:
# Cache the parsed records; count() is an action, so it evaluates gpRDD and
# populates the cache.
gpRDD.cache()
gpRDD.count()

# Give every distinct drop-off geohash an integer class label 0..N-1.
# distinct() returns keys in no fixed order, so sort them to make the label
# assignment reproducible between runs.
dropoffGeohashes = sorted(gpRDD.map(lambda rec: rec[1]).distinct().collect())
dictLength = len(dropoffGeohashes)
dropoffGeohashDict = {gh: label for label, gh in enumerate(dropoffGeohashes)}

# Ship the label lookup to the executors once rather than with every task.
broadcastGH = sc.broadcast(dropoffGeohashDict)

In [None]:
from pyspark.mllib.regression import LabeledPoint
import math
#Create features as labeledpoint

"""
0:pickupGeohash
1:dropOffGeohash
2:time_num
3:day_of_week
4: count
"""
def extractFeaturesforML(record):
    #np.array([1.0, 0.0, 3.0])
    
    count = record[4]
    timeCos = math.cos(record[2] * 2 * math.pi)
    timeSin = math.sin(record[2] * 2 * math.pi)
    
    #dayCos = math.cos(key[3] * 2 * math.pi)
    #daySin = math.sin(key[3] * 2 * math.pi)
    
    pickupLatLong = geohash.decode(record[0])
    pickupLat=pickupLatLong[0]
    pickupLong=pickupLatLong[1]    
    
    features_ = np.array([record[2], record[3], count, timeCos, timeSin, pickupLat, pickupLong])
    
    return LabeledPoint(broadcastGH.value[record[1]], features_)

gpRDD.map(lambda x: extractFeaturesforML(x)).take(2)
featuresLP = gpRDD.map(lambda x: extractFeaturesforML(x)).cache()
%time featuresLP.count()

In [None]:
# Sandbox code: not optimal
(trainingData, testData) = featuresLP.randomSplit([0.8, 0.2])

model = RandomForest.trainClassifier(trainingData, numClasses=dictLength, categoricalFeaturesInfo={},
                                     numTrees=10, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=10, maxBins=32)

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
#print(model.toDebugString())
