In [1]:
# define display options
import pandas as pd
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', 200)

In [2]:
# instantiate the spark instance
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .config('spark.executor.memory', '4GB') \
    .getOrCreate()

In [3]:
# read parquet file
green_tripdata0 = spark.read.parquet('/opt/tutorial/starter/data/output/green_tripdata.parquet')
green_tripdata0.createOrReplaceTempView('green_tripdata0')

In [11]:
# enrich the data by:
# - filtering bad data
# - one-hot encode hour component of lpep_pickup_datetime
# - one-hot encode dayofweek component of lpep_pickup_datetime
# - calculate duration in seconds
# - adding flag to indicate whether pickup/dropoff within jfk airport bounding box
green_tripdata_enriched = spark.sql('''
SELECT 
    *
    ,CAST(HOUR(lpep_pickup_datetime) = 0 AS INT) AS pickup_hour_0
    ,CAST(HOUR(lpep_pickup_datetime) = 1 AS INT) AS pickup_hour_1
    ,CAST(HOUR(lpep_pickup_datetime) = 2 AS INT) AS pickup_hour_2
    ,CAST(HOUR(lpep_pickup_datetime) = 3 AS INT) AS pickup_hour_3
    ,CAST(HOUR(lpep_pickup_datetime) = 4 AS INT) AS pickup_hour_4
    ,CAST(HOUR(lpep_pickup_datetime) = 5 AS INT) AS pickup_hour_5
    ,CAST(HOUR(lpep_pickup_datetime) = 6 AS INT) AS pickup_hour_6
    ,CAST(HOUR(lpep_pickup_datetime) = 7 AS INT) AS pickup_hour_7
    ,CAST(HOUR(lpep_pickup_datetime) = 8 AS INT) AS pickup_hour_8
    ,CAST(HOUR(lpep_pickup_datetime) = 9 AS INT) AS pickup_hour_9
    ,CAST(HOUR(lpep_pickup_datetime) = 10 AS INT) AS pickup_hour_10
    ,CAST(HOUR(lpep_pickup_datetime) = 11 AS INT) AS pickup_hour_11
    ,CAST(HOUR(lpep_pickup_datetime) = 12 AS INT) AS pickup_hour_12
    ,CAST(HOUR(lpep_pickup_datetime) = 13 AS INT) AS pickup_hour_13
    ,CAST(HOUR(lpep_pickup_datetime) = 14 AS INT) AS pickup_hour_14
    ,CAST(HOUR(lpep_pickup_datetime) = 15 AS INT) AS pickup_hour_15
    ,CAST(HOUR(lpep_pickup_datetime) = 16 AS INT) AS pickup_hour_16
    ,CAST(HOUR(lpep_pickup_datetime) = 17 AS INT) AS pickup_hour_17
    ,CAST(HOUR(lpep_pickup_datetime) = 18 AS INT) AS pickup_hour_18
    ,CAST(HOUR(lpep_pickup_datetime) = 19 AS INT) AS pickup_hour_19
    ,CAST(HOUR(lpep_pickup_datetime) = 20 AS INT) AS pickup_hour_20
    ,CAST(HOUR(lpep_pickup_datetime) = 21 AS INT) AS pickup_hour_21
    ,CAST(HOUR(lpep_pickup_datetime) = 22 AS INT) AS pickup_hour_22
    ,CAST(HOUR(lpep_pickup_datetime) = 23 AS INT) AS pickup_hour_23
    ,CAST(DAYOFWEEK(lpep_pickup_datetime) = 0 AS INT) AS pickup_dayofweek_0
    ,CAST(DAYOFWEEK(lpep_pickup_datetime) = 1 AS INT) AS pickup_dayofweek_1
    ,CAST(DAYOFWEEK(lpep_pickup_datetime) = 2 AS INT) AS pickup_dayofweek_2
    ,CAST(DAYOFWEEK(lpep_pickup_datetime) = 3 AS INT) AS pickup_dayofweek_3
    ,CAST(DAYOFWEEK(lpep_pickup_datetime) = 4 AS INT) AS pickup_dayofweek_4
    ,CAST(DAYOFWEEK(lpep_pickup_datetime) = 5 AS INT) AS pickup_dayofweek_5
    ,CAST(DAYOFWEEK(lpep_pickup_datetime) = 6 AS INT) AS pickup_dayofweek_6
    ,UNIX_TIMESTAMP(lpep_dropoff_datetime) - UNIX_TIMESTAMP(lpep_pickup_datetime) AS duration
    ,CASE
        WHEN 
            (pickup_latitude < 40.651381 
            AND pickup_latitude > 40.640668
            AND pickup_longitude < -73.776283
            AND pickup_longitude > -73.794694)
            OR
            (dropoff_latitude < 40.651381 
            AND dropoff_latitude > 40.640668
            AND dropoff_longitude < -73.776283
            AND dropoff_longitude > -73.794694)           
        THEN 1 
        ELSE 0
    END AS jfk
FROM green_tripdata0
WHERE trip_distance > 0
AND pickup_longitude IS NOT NULL
AND pickup_latitude IS NOT NULL
AND dropoff_longitude IS NOT NULL
AND dropoff_latitude IS NOT NULL
''')
green_tripdata_enriched.createOrReplaceTempView('green_tripdata_enriched')

In [12]:
green_tripdata_enriched.limit(5).toPandas()

Unnamed: 0,vendor_id,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,total_amount,payment_type,trip_type,_filename,_index,_errors,pickup_hour_0,pickup_hour_1,pickup_hour_2,pickup_hour_3,pickup_hour_4,pickup_hour_5,pickup_hour_6,pickup_hour_7,pickup_hour_8,pickup_hour_9,pickup_hour_10,pickup_hour_11,pickup_hour_12,pickup_hour_13,pickup_hour_14,pickup_hour_15,pickup_hour_16,pickup_hour_17,pickup_hour_18,pickup_hour_19,pickup_hour_20,pickup_hour_21,pickup_hour_22,pickup_hour_23,pickup_dayofweek_0,pickup_dayofweek_1,pickup_dayofweek_2,pickup_dayofweek_3,pickup_dayofweek_4,pickup_dayofweek_5,pickup_dayofweek_6,duration,jfk
0,2,2013-08-07 18:12:46,2013-08-07 18:15:16,False,1,-73.93748474121094,40.75839233398438,-73.93748474121094,40.75820922851563,1,0.01,3.5,0.0,0.5,0.0,0.0,,4.0,2,,file:///opt/tutorial/starter/data/input/green_tripdata/0/green_tripdata_2013-08.csv,167,[],0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,0,0,150,0
1,2,2013-08-08 15:50:09,2013-08-08 16:05:32,False,1,-73.87947845458984,40.8068962097168,-73.88209533691406,40.84098815917969,5,3.03,14.0,0.0,0.5,0.0,0.0,,14.5,2,,file:///opt/tutorial/starter/data/input/green_tripdata/0/green_tripdata_2013-08.csv,246,[],0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,923,0
2,2,2013-08-08 18:25:39,2013-08-08 19:06:25,False,1,-73.88729858398438,40.85950469970703,-74.00154113769531,40.71981048583984,1,16.87,49.5,0.0,0.5,0.0,0.0,,50.0,2,,file:///opt/tutorial/starter/data/input/green_tripdata/0/green_tripdata_2013-08.csv,275,[],0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,2446,0
3,2,2013-08-08 20:07:41,2013-08-08 20:22:23,False,1,-73.92064666748047,40.86028671264648,-73.9046401977539,40.86942291259766,5,1.84,11.0,1.0,0.5,0.0,0.0,,12.5,2,,file:///opt/tutorial/starter/data/input/green_tripdata/0/green_tripdata_2013-08.csv,282,[],0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,0,882,0
4,2,2013-08-08 21:28:13,2013-08-08 22:02:23,False,1,-73.94114685058594,40.83941650390625,-73.98725891113281,40.75222396850586,5,7.68,29.0,1.0,0.5,0.0,0.0,,30.5,2,,file:///opt/tutorial/starter/data/input/green_tripdata/0/green_tripdata_2013-08.csv,288,[],0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,1,0,2050,0


In [13]:
from pyspark.ml import *
from pyspark.ml.feature import *
from pyspark.ml.tuning import *
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *

# create a vector with the input predictor columns
vectorAssembler = VectorAssembler(
    inputCols=["trip_distance", "passenger_count", "pickup_hour_0", "pickup_hour_1", "pickup_hour_2", "pickup_hour_3", "pickup_hour_4", "pickup_hour_5", "pickup_hour_6", "pickup_hour_7", "pickup_hour_8", "pickup_hour_9", "pickup_hour_10", "pickup_hour_11", "pickup_hour_12", "pickup_hour_13", "pickup_hour_14", "pickup_hour_15", "pickup_hour_16", "pickup_hour_17", "pickup_hour_18", "pickup_hour_19", "pickup_hour_20", "pickup_hour_21", "pickup_hour_22", "pickup_hour_23", "pickup_dayofweek_0", "pickup_dayofweek_1", "pickup_dayofweek_2", "pickup_dayofweek_3", "pickup_dayofweek_4", "pickup_dayofweek_5", "pickup_dayofweek_6", "duration", "jfk"],
    outputCol="features")

gbt = GBTRegressor(
    featuresCol='features', 
    labelCol='total_amount', 
    predictionCol='prediction', 
    maxDepth=5, 
    maxBins=32, 
    minInstancesPerNode=1, 
    minInfoGain=0.0, 
    maxMemoryInMB=512, 
    cacheNodeIds=False, 
    subsamplingRate=1.0, 
    checkpointInterval=10, 
    lossType='squared',
    maxIter=10,
    stepSize=0.1,
    seed=None)

# define a sequence of stages
pipeline = Pipeline(stages=[
    vectorAssembler, \
    gbt \
    ])

# create a matrix of parameters to try whilst training
paramGrid = ParamGridBuilder() \
    .addGrid(param=gbt.maxIter, values=[20, 30]) \
    .build()

# define the evaluation
regressionEvaluator = RegressionEvaluator(
    predictionCol='prediction', 
    labelCol='total_amount', 
    metricName='rmse')

# set up the model for running
crossValidator = CrossValidator(
    estimator = pipeline,
    estimatorParamMaps = paramGrid,
    evaluator = regressionEvaluator,
    numFolds = 2)

In [14]:
# perform a random split into two datasets [train_df, test_df]
splits = green_tripdata_enriched.randomSplit([3.0, 1.0], 42)
train_df = splits[0]
test_df = splits[1]

In [15]:
# run the training and select the best/most generalised model
crossValidatorModel = crossValidator.fit(train_df)
pipelineModel = crossValidatorModel.bestModel

In [16]:
# transform the test dataset
prediction = pipelineModel.transform(test_df)
regressionEvaluator.evaluate(prediction)

7.187219489221803

In [18]:
# write model for later use
pipelineModel \
    .write() \
    .overwrite() \
    .save('/opt/tutorial/starter/job/1/green_tripdata_enriched.model')

In [13]:
# read predictions parquet file
green_tripdata0_prediction = spark.read.parquet('/opt/tutorial/starter/data/output/green_tripdata0_prediction.parquet')
green_tripdata0_prediction.select("fare_amount", "prediction").limit(5).toPandas()

Unnamed: 0,fare_amount,prediction
0,22.5,27.877014
1,5.5,6.948871
2,24.0,31.107692
3,5.5,6.466168
4,18.0,21.043908
