In [1]:
import numpy as np
import scipy as sp
import pandas as pd
from pyspark.sql import SQLContext
from pyspark.sql.types import *

import TaxiSparkSchema
import geohash

from datetime import *
from dateutil.parser import parse

# Notebook display settings for pandas output.
pd.set_option('display.width', 500)
pd.set_option('display.max_columns', 100)
pd.set_option('display.notebook_repr_html', True)

# `sc` is not created anywhere in this notebook; it is expected to be supplied by the
# pyspark session. Evaluating it here simply confirms that the SparkContext exists.
sc

<pyspark.context.SparkContext at 0x7f3e03a67e10>

## 0. Prerequisites

1. Setup the spark cluster
2. Upload (a) this notebook (b) TaxiSparkSchema.py (c) geohash.py
3. When you start pyspark, use the following command instead. This is required for loading the spark-csv framework used by data frames. This library is used in the last section for sanity checking.
>pyspark --packages com.databricks:spark-csv_2.10:1.3.0

## 1. Setup the schema and load required libraries

In [2]:
# Build the Spark schemas for the yellow and green cab data using the TaxiSparkSchema
# helper module (each call also prints the number of columns, see the output below).
yCabSchema = TaxiSparkSchema.getYellowCabSchema()
gCabSchema = TaxiSparkSchema.getGreenCabSchema()

# of columns:  20
# of columns:  22


In [3]:
# Register the helper modules with the SparkContext -- presumably so that the schema and
# geohash code can be imported by tasks running on the executors (TODO confirm).
sc.addPyFile("TaxiSparkSchema.py")
sc.addPyFile("geohash.py")

In [4]:
sqlContext = SQLContext(sc)

# Yellow cab trips: headerless CSV parts read through spark-csv with the explicit schema.
yCabDF = (sqlContext.read
          .format('com.databricks.spark.csv')
          .options(mode="PERMISSIVE", header='false')
          .load('s3://testsetu/nyc/final/yellow/consolidated/part*', schema=yCabSchema))

# Green cab trips: same reader settings, green schema.
gCabDF = (sqlContext.read
          .format('com.databricks.spark.csv')
          .options(mode="PERMISSIVE", header='false')
          .load('s3://testsetu/nyc/final/green/consolidated/pa*', schema=gCabSchema))

In [5]:
# Select the fields available in both datasets from the green cab frame, then append
# the result to the yellow cab frame.
gCabMinimum = gCabDF.select(
    "cab_company", "vendor_id", "pickup_datetime", "dropoff_datetime",
    "passenger_count", "trip_distance", "pickup_longitude", "pickup_latitude",
    "rate_code_id", "store_and_fwd_flag", "dropoff_longitude", "dropoff_latitude",
    "payment_type", "fare_amount", "extra", "mta_tax",
    "tip_amount", "tolls_amount", "improvement_surcharge", "total_amount",
)
combinedDF = yCabDF.unionAll(gCabMinimum)

## 2. Some basic cleanup

There seem to be 325 records in the Yellow Cab data with an **incorrect latitude**. The code below filters them out.

In [6]:
# Keep only the rows whose pickup latitude lies strictly between -90 and 90 degrees.
yCabDF = yCabDF.filter(
    (yCabDF.pickup_latitude > -90.0) &
    (yCabDF.pickup_latitude < 90.0)
)

In [22]:
totalRecordsYellowCab = 407403053
print "# dirty latitude records: ", totalRecordsYellowCab - yFil.count()

# dirty latitude recoerds:  325


## 3. Checking out the data structure

In [64]:
# Inspect the column names and types of the combined (yellow + green) DataFrame.
combinedDF.printSchema()

root
 |-- cab_company: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- rate_code_id: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- extra: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- improvement_surcharge: float (nullable = true)
 |-- total_amount: float (nullable = true)



In [66]:
#Number of Yellow cab records (timed; the count runs a job over the whole dataset)
%time yCabDF.count()

CPU times: user 16 ms, sys: 8 ms, total: 24 ms
Wall time: 4min 9s


407402728

In [6]:
#Number of Green cab records (timed; the count runs a job over the whole dataset)
%time print "Count of # records from data frame: ", gCabDF.count()

Count of # records from data frame:  26869879
CPU times: user 4 ms, sys: 4 ms, total: 8 ms
Wall time: 30.9 s


In [63]:
# Number of records in the combined yellow + green DataFrame (timed).
%time print "Count of # combine records from data frame: ", combinedDF.count()

Count of # combine records from data frame:  434272607
CPU times: user 108 ms, sys: 36 ms, total: 144 ms
Wall time: 7min 22s


## 4. Checking out the GeoSpatial data

In [9]:
precision = 6
gCabPickupGeoHash = gCabDF.select("pickup_latitude", "pickup_longitude").map(lambda latLong: geohash.encode(latLong[0], latLong[1], precision))
%time print gCabPickupGeoHash.distinct().count()

3968
CPU times: user 28 ms, sys: 4 ms, total: 32 ms
Wall time: 1min 2s


In [27]:
precision = 6
yCabPickupGeoHash = yCabDF.select("pickup_latitude", "pickup_longitude").map(lambda latLong: geohash.encode(latLong[0], latLong[1], precision))
%time print yCabPickupGeoHash.distinct().count()

30365
CPU times: user 120 ms, sys: 64 ms, total: 184 ms
Wall time: 14min 13s


## 5. Feature Extraction

In [7]:
from pyspark.sql.functions import udf
from pyspark.sql import functions as F

In [8]:
#Pointer to the original dataframe
# gCabDF is rebound with extra feature columns in the cells below, so keep a reference
# to the frame as it was loaded.
gCabDFOrginal = gCabDF

### 5.1 Add geohash features

In [23]:
# Add pickup_geohash6 and pickup_geohash7: geohash.encode wrapped as a string-returning
# UDF, applied to the pickup coordinates with the precision passed as a literal column.
geohashUDF = udf(geohash.encode, StringType())

gCabDF = gCabDF.withColumn(
    "pickup_geohash6",
    geohashUDF(gCabDF.pickup_latitude, gCabDF.pickup_longitude, F.lit(6)),
)
gCabDF = gCabDF.withColumn(
    "pickup_geohash7",
    geohashUDF(gCabDF.pickup_latitude, gCabDF.pickup_longitude, F.lit(7)),
)

### 5.2 Add date/time features

In [24]:
def timeBinString(d, minsPerBin):
    """Return the start of the time-of-day bin containing ``d`` as an "HH:MM" string.

    The day is divided into consecutive bins of ``minsPerBin`` minutes starting at
    midnight; seconds are ignored.

    Parameters:
        d: object exposing ``hour`` and ``minute`` attributes (e.g. a datetime).
        minsPerBin: bin width in minutes (positive integer).

    Returns:
        Zero-padded "HH:MM" label of the bin's first minute, e.g. "01:15" for
        01:25 with 15-minute bins.
    """
    elapsedMins = d.hour * 60 + d.minute
    # Floor division keeps the bin index an integer regardless of how `/` behaves
    # (Python 3 or `from __future__ import division`).
    binStartMins = (elapsedMins // minsPerBin) * minsPerBin
    # Derive hour AND minute from the bin start. Taking the hour from `d` breaks
    # whenever a bin straddles an hour boundary (bin sizes that do not divide 60).
    binnedHour, binnedMin = divmod(binStartMins, 60)
    return "%02d:%02d" % (binnedHour, binnedMin)

In [25]:
#register it as a UDF (the bin label is returned as a string)
timeBinStringUDF=udf(timeBinString, StringType())

In [26]:
#Add feature: date_cat -- the time-of-day bin label of the pickup time, computed by
# timeBinString using 15-minute bins
minsPerBin = 15
gCabDF =  gCabDF.withColumn("date_cat",timeBinStringUDF(gCabDF.pickup_datetime, F.lit(minsPerBin)))

In [27]:
#Add feature: day_cat
def dayOfWeek(d):
    """Return the three-letter weekday abbreviation ("Mon" .. "Sun") for ``d``.

    ``d`` must expose ``weekday()`` returning 0 for Monday through 6 for Sunday.
    """
    # Mapping originally from Sam's code; position in the tuple mirrors weekday().
    dayNames = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
    return dayNames[d.weekday()]

#register it as a UDF (dayOfWeek returns a string, matching the declared StringType)
dayOfWeekUDF=udf(dayOfWeek, StringType())

In [28]:
# Add feature: day_cat (weekday abbreviation of the pickup time)
gCabDF =  gCabDF.withColumn("day_cat",dayOfWeekUDF(gCabDF.pickup_datetime))

In [29]:
def isWeekend(d):
    """Return 1 when ``d`` falls on a Saturday or Sunday, otherwise 0.

    ``d`` must expose ``weekday()`` (Saturday == 5, Sunday == 6).
    """
    return 1 if d.weekday() in (5, 6) else 0

#register it as a UDF
# NOTE(review): isWeekend returns the ints 1/0, but the UDF is declared with StringType() --
# confirm whether a string column is intended or IntegerType() was meant.
isWeekendUDF=udf(isWeekend, StringType())

In [30]:
# Add feature: weekend (weekend flag of the pickup time, produced by isWeekend)
gCabDF =  gCabDF.withColumn("weekend",isWeekendUDF(gCabDF.pickup_datetime))

## Build aggregate functions

In [208]:
# Count records per pickup_geohash6 value and collect the result to the driver.
# NOTE(review): date_cat and day_cat are selected but not used by the groupby/count --
# confirm whether the grouping was meant to include them.
# NOTE(review): the recorded run of this cell aborted with ExecutorLostFailure (see the
# traceback in the output).
gGeoHashes = gCabDF.select("pickup_geohash6","date_cat","day_cat").groupby("pickup_geohash6").count().collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 55.0 failed 4 times, most recent failure: Lost task 13.3 in stage 55.0 (TID 3053, ip-172-31-48-44.ec2.internal): ExecutorLostFailure (executor 20 lost)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:909)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:908)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)


## Appendix: Test snippets

#### Optimal way of adding columns in dataframe

In [None]:
# Add the geohash column through the DataFrame UDF and count the distinct values, timing the job.
%time gCabDF.withColumn("geohash6",geohashUDF(gCabDF.pickup_latitude, gCabDF.pickup_longitude, F.lit(6))).select("geohash6").distinct().count()

#### Non optimal way of adding columns in dataframe

In [28]:
#Add extra column: geohash
# Alternative approach: map over the rows, rebuild each one as a Row with an extra
# geohash6 field, and convert the result back to a DataFrame.
# record[7] / record[6] are positional lookups -- presumably pickup_latitude and
# pickup_longitude; TODO confirm against the green cab schema.
#gCabDF.withColumn("pickup_geohash", geohash.encode(col(gCabDF.pickup_latitude, col(gCabDF.pickup_longitude), precision))
#gCabDF.map(lambda record: )
gCabGeoDF = gCabDF.map(lambda record: Row(**dict(record.asDict(), geohash6=geohash.encode(record[7], record[6], precision=6)))).toDF()
gCabGeoDF.select("pickup_latitude", "pickup_longitude", "geohash6").take(3)

[Row(pickup_latitude=40.74317169189453, pickup_longitude=-73.88326263427734, geohash6=u'dr5ry9'),
 Row(pickup_latitude=40.68368148803711, pickup_longitude=-73.96488189697266, geohash6=u'dr5rky'),
 Row(pickup_latitude=40.75718688964844, pickup_longitude=-73.95572662353516, geohash6=u'dr5rv5')]

In [43]:
#Reference code: custom UDF
def geohash6(latitude, longitude):
    """Encode a latitude/longitude pair with geohash.encode, precision fixed at 6."""
    fixedPrecision = 6
    return geohash.encode(latitude, longitude, fixedPrecision)

geohash6UDF=udf(geohash6, StringType())
# NOTE(review): geohashUDF is also defined in section 5.1 and again in a later cell; the
# active code in this cell does not use it (only the commented-out line at the bottom does).
geohashUDF=udf(geohash.encode, StringType())
%time gCabDF.select(geohash6UDF(gCabDF.pickup_latitude, gCabDF.pickup_longitude)).distinct().count()

# Left disabled: passes the bare literal 6 rather than F.lit(6) as the other cells do.
#gCabDF.select(geohashUDF(gCabDF.pickup_latitude, gCabDF.pickup_longitude,6)).show(2)

CPU times: user 20 ms, sys: 12 ms, total: 32 ms
Wall time: 1min 18s


3968

### Other test snippets

In [None]:
# Distinct pickup_geohash6 values, using the column added in section 5.1 (timed).
%time print gCabDF.select("pickup_geohash6").distinct().count()
#gCabGeoDF.select("pickup_latitude", "pickup_longitude", "geohash6").take(3)

In [None]:
# RDD-based geohash encoding; relies on the notebook-level `precision` variable.
# NOTE(review): no action (count/collect/take) is called on the result in this cell.
gCabDF.select("pickup_latitude", "pickup_longitude").map(lambda latLong: geohash.encode(latLong[0], latLong[1], precision))

In [45]:
#Reference code: custom UDF
# NOTE(review): the import of F and the geohashUDF definition below repeat earlier cells
# (section 5).
from pyspark.sql import functions as F

geohashUDF=udf(geohash.encode, StringType())

# The precision is passed as a literal column (F.lit) so it can be a UDF argument.
%time gCabDF.select(geohashUDF(gCabDF.pickup_latitude, gCabDF.pickup_longitude, F.lit(6))).distinct().count()

CPU times: user 24 ms, sys: 12 ms, total: 36 ms
Wall time: 1min 19s


3968

In [73]:
# Grab the current timestamp as a sample value for the scratch cells.
d = datetime.now()
print d

2015-12-05 08:56:17.190601


In [88]:
def timeBucket(d):
    """Return the number of whole minutes elapsed since midnight for ``d``.

    ``d`` must expose ``hour`` and ``minute`` attributes (e.g. a datetime).
    Previously the value was computed but never returned, so the function
    always yielded None.
    """
    return d.hour * 60 + d.minute

536


In [169]:
# Spot-check timeBinString with a fixed timestamp and 15-minute bins.
d = parse("2015-12-05 01:25:45")
print timeBinString(d, 15)

5
01:15


In [160]:
# Scratch check of the integer-division comparison used for zero-padding in timeBinString.
1/10>0

False

In [93]:
# NOTE(review): `b` is not defined anywhere in the visible notebook; this cell relies on
# leftover kernel state and will fail on a fresh kernel.
b

22

In [94]:
# Minutes in a day (the value of totalMinsPerDay in timeBinString).
24*60

1440