In [1]:
# define display options
import pandas as pd
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', 200)

In [2]:
# instantiate the spark instance
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.driver.memory", "4G") \
    .getOrCreate()

In [3]:
ETL_CONF_JOB_URL="/home/jovyan/examples/tutorial/5"

In [5]:
# read delta file
# this will return the most recent version of the data
trips = spark.read.format("delta").load(f"{ETL_CONF_JOB_URL}/output/trips.delta")
trips.createOrReplaceTempView("trips")

In [6]:
# show the first five rows as table
trips.limit(5).toPandas()

Unnamed: 0,cab_type_id,vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type_id,pickup_location_id,dropoff_location_id
0,2,2,2013-08-01 12:14:37,2013-08-01 13:09:06,False,1,,,,,1,0.0,21.25,0.0,0.0,0.0,0.0,,,21.25,2,,
1,2,2,2013-08-01 13:13:00,2013-08-01 15:38:00,False,1,,,,,2,0.0,74.5,0.0,0.5,0.0,0.0,,,75.0,2,,
2,2,2,2013-08-01 13:48:00,2013-08-01 13:49:00,False,5,,,,,1,0.0,1.0,0.1,0.0,0.0,1.0,,,2.1,2,,
3,2,2,2013-08-01 14:38:35,2013-08-01 14:38:51,False,1,,,,,1,0.0,3.25,0.0,0.0,0.0,0.0,,,3.25,2,,
4,2,2,2013-08-01 15:51:45,2013-08-01 16:03:52,False,1,,,,,1,0.0,8.5,0.0,0.5,0.0,0.0,,,9.0,2,,


In [7]:
# read the sql file to ensure same execution in both training and prediction
f = open(f"{ETL_CONF_JOB_URL}/trips_enriched.sql", "r")
sql = f.read()
f.close()
print(sql)

-- enrich the data by:
-- - filtering bad data
-- - one-hot encode hour component of pickup_datetime
-- - one-hot encode dayofweek component of pickup_datetime
-- - calculate duration in seconds
-- - adding flag to indicate whether pickup/dropoff within jfk airport bounding box
SELECT 
    *
    ,CAST(HOUR(pickup_datetime) = 0 AS INT) AS pickup_hour_0
    ,CAST(HOUR(pickup_datetime) = 1 AS INT) AS pickup_hour_1
    ,CAST(HOUR(pickup_datetime) = 2 AS INT) AS pickup_hour_2
    ,CAST(HOUR(pickup_datetime) = 3 AS INT) AS pickup_hour_3
    ,CAST(HOUR(pickup_datetime) = 4 AS INT) AS pickup_hour_4
    ,CAST(HOUR(pickup_datetime) = 5 AS INT) AS pickup_hour_5
    ,CAST(HOUR(pickup_datetime) = 6 AS INT) AS pickup_hour_6
    ,CAST(HOUR(pickup_datetime) = 7 AS INT) AS pickup_hour_7
    ,CAST(HOUR(pickup_datetime) = 8 AS INT) AS pickup_hour_8
    ,CAST(HOUR(pickup_datetime) = 9 AS INT) AS pickup_hour_9
    ,CAST(HOUR(pickup_datetime) = 10 AS INT) AS pickup_hour_10
    ,CAST(HOUR(pickup_datetime) = 

In [8]:
# execute the sql against spark to generate the new dataframe
trips_enriched = spark.sql(sql)
trips_enriched.createOrReplaceTempView('trips_enriched')

In [9]:
# show the first five rows as table
trips_enriched.limit(5).toPandas()

Unnamed: 0,cab_type_id,vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type_id,pickup_location_id,dropoff_location_id,pickup_hour_0,pickup_hour_1,pickup_hour_2,pickup_hour_3,pickup_hour_4,pickup_hour_5,pickup_hour_6,pickup_hour_7,pickup_hour_8,pickup_hour_9,pickup_hour_10,pickup_hour_11,pickup_hour_12,pickup_hour_13,pickup_hour_14,pickup_hour_15,pickup_hour_16,pickup_hour_17,pickup_hour_18,pickup_hour_19,pickup_hour_20,pickup_hour_21,pickup_hour_22,pickup_hour_23,pickup_dayofweek_0,pickup_dayofweek_1,pickup_dayofweek_2,pickup_dayofweek_3,pickup_dayofweek_4,pickup_dayofweek_5,pickup_dayofweek_6,duration,jfk
0,2,2,2013-08-07 18:12:46,2013-08-07 18:15:16,False,1,-73.93748474121094,40.75839233398438,-73.93748474121094,40.75820922851563,1,0.01,3.5,0.0,0.5,0.0,0.0,,,4.0,2,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,0,0,150,0
1,2,2,2013-08-08 15:50:09,2013-08-08 16:05:32,False,1,-73.87947845458984,40.8068962097168,-73.88209533691406,40.84098815917969,5,3.03,14.0,0.0,0.5,0.0,0.0,,,14.5,2,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,923,0
2,2,2,2013-08-08 18:25:39,2013-08-08 19:06:25,False,1,-73.88729858398438,40.85950469970703,-74.00154113769531,40.71981048583984,1,16.87,49.5,0.0,0.5,0.0,0.0,,,50.0,2,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,2446,0
3,2,2,2013-08-08 20:07:41,2013-08-08 20:22:23,False,1,-73.92064666748047,40.86028671264648,-73.9046401977539,40.86942291259766,5,1.84,11.0,1.0,0.5,0.0,0.0,,,12.5,2,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,0,882,0
4,2,2,2013-08-08 21:28:13,2013-08-08 22:02:23,False,1,-73.94114685058594,40.83941650390625,-73.98725891113281,40.75222396850586,5,7.68,29.0,1.0,0.5,0.0,0.0,,,30.5,2,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,1,0,2050,0


In [20]:
# random split [75%, 25%] the enriched dataset into two datasets [train_df, test_df]
# it is important that the test_df is reserved to test the model against unseen data to test if the model is generalised
splits = trips_enriched.randomSplit([3.0, 1.0], 42)
train_df = splits[0]
test_df = splits[1]
train_df.cache()
test_df.cache()

DataFrame[cab_type_id: bigint, vendor_id: bigint, pickup_datetime: timestamp, dropoff_datetime: timestamp, store_and_fwd_flag: boolean, rate_code_id: int, pickup_longitude: decimal(18,14), pickup_latitude: decimal(18,14), dropoff_longitude: decimal(18,14), dropoff_latitude: decimal(18,14), passenger_count: int, trip_distance: decimal(25,15), fare_amount: decimal(10,2), extra: decimal(10,2), mta_tax: decimal(10,2), tip_amount: decimal(10,2), tolls_amount: decimal(10,2), ehail_fee: decimal(10,2), improvement_surcharge: decimal(10,2), total_amount: decimal(10,2), payment_type_id: bigint, pickup_location_id: int, dropoff_location_id: int, pickup_hour_0: int, pickup_hour_1: int, pickup_hour_2: int, pickup_hour_3: int, pickup_hour_4: int, pickup_hour_5: int, pickup_hour_6: int, pickup_hour_7: int, pickup_hour_8: int, pickup_hour_9: int, pickup_hour_10: int, pickup_hour_11: int, pickup_hour_12: int, pickup_hour_13: int, pickup_hour_14: int, pickup_hour_15: int, pickup_hour_16: int, pickup_hou

In [11]:
from pyspark.ml import *
from pyspark.ml.feature import *
from pyspark.ml.tuning import *
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *

# create a vector with the input predictor columns
vectorAssembler = VectorAssembler(
    inputCols=["trip_distance", "passenger_count", "pickup_hour_0", "pickup_hour_1", "pickup_hour_2", "pickup_hour_3", "pickup_hour_4", "pickup_hour_5", "pickup_hour_6", "pickup_hour_7", "pickup_hour_8", "pickup_hour_9", "pickup_hour_10", "pickup_hour_11", "pickup_hour_12", "pickup_hour_13", "pickup_hour_14", "pickup_hour_15", "pickup_hour_16", "pickup_hour_17", "pickup_hour_18", "pickup_hour_19", "pickup_hour_20", "pickup_hour_21", "pickup_hour_22", "pickup_hour_23", "pickup_dayofweek_0", "pickup_dayofweek_1", "pickup_dayofweek_2", "pickup_dayofweek_3", "pickup_dayofweek_4", "pickup_dayofweek_5", "pickup_dayofweek_6", "duration", "jfk"],
    outputCol="features")

# define the model type to train - in this case a regression model to predict a continuous variable
gbt = GBTRegressor(
    featuresCol='features', 
    labelCol='total_amount', 
    predictionCol='prediction', 
    maxDepth=5, 
    maxBins=32, 
    minInstancesPerNode=1, 
    minInfoGain=0.0, 
    maxMemoryInMB=512, 
    cacheNodeIds=False, 
    subsamplingRate=1.0, 
    checkpointInterval=10, 
    lossType='squared',
    maxIter=10,
    stepSize=0.1,
    seed=None)

# define a sequence of stages
pipeline = Pipeline(stages=[
    vectorAssembler, \
    gbt \
    ])

# create a matrix of parameters to try whilst training
# parameter grids al
paramGrid = ParamGridBuilder() \
    .addGrid(param=gbt.maxIter, values=[20, 30]) \
    .addGrid(param=gbt.maxBins, values=[32, 64]) \
    .build()

# define the evaluation
# this is testing prediction vs total_amount difference using the root mean square error metric
regressionEvaluator = RegressionEvaluator(
    predictionCol='prediction', 
    labelCol='total_amount', 
    metricName='rmse')

# set up the model for running
crossValidator = CrossValidator(
    estimator = pipeline,
    estimatorParamMaps = paramGrid,
    evaluator = regressionEvaluator,
    numFolds = 3,
    collectSubModels=True)

In [12]:
# run the training on the train_df
crossValidatorModel = crossValidator.fit(train_df)

In [13]:
# to demonstrate how the training works loop through all the sub models and print parameters and result
# you can see that this is a brute force parameter search. This means the more .addGrid() parameters tested will result in longer training time
for fold, foldModel in enumerate(crossValidatorModel.subModels, start=1):
    for grid, gridModel in enumerate(foldModel, start=1):
        prediction = gridModel.transform(train_df)
        rmse = regressionEvaluator.evaluate(prediction)
        maxBins = gridModel.stages[-1]._java_obj.getMaxBins()
        maxIter = gridModel.stages[-1]._java_obj.getMaxIter()
        print(f'{{"fold": {fold}, "grid": {grid}, "maxBins": {maxBins}, "maxIter": {maxIter} "rmse": {rmse}}}')

{"fold": 1, "grid": 1, "maxBins": 32, "maxIter": 20 "rmse": 3.8382473993744126}
{"fold": 1, "grid": 2, "maxBins": 64, "maxIter": 20 "rmse": 3.7856268121831125}
{"fold": 1, "grid": 3, "maxBins": 32, "maxIter": 30 "rmse": 3.7693787362268742}
{"fold": 1, "grid": 4, "maxBins": 64, "maxIter": 30 "rmse": 3.705350554174966}
{"fold": 2, "grid": 1, "maxBins": 32, "maxIter": 20 "rmse": 3.852827621805609}
{"fold": 2, "grid": 2, "maxBins": 64, "maxIter": 20 "rmse": 3.742760776668639}
{"fold": 2, "grid": 3, "maxBins": 32, "maxIter": 30 "rmse": 3.7824453112788192}
{"fold": 2, "grid": 4, "maxBins": 64, "maxIter": 30 "rmse": 3.665460640323096}
{"fold": 3, "grid": 1, "maxBins": 32, "maxIter": 20 "rmse": 3.821493594852989}
{"fold": 3, "grid": 2, "maxBins": 64, "maxIter": 20 "rmse": 3.7383330579729273}
{"fold": 3, "grid": 3, "maxBins": 32, "maxIter": 30 "rmse": 3.7443189715692706}
{"fold": 3, "grid": 4, "maxBins": 64, "maxIter": 30 "rmse": 3.653482410504139}


In [14]:
# select the best/most generalised model
pipelineModel = crossValidatorModel.bestModel

In [15]:
# transform the test dataset to test the model on unseen data
prediction = pipelineModel.transform(test_df)

In [16]:
# extract the best model parameters from the pipelineModel to help reduce future number of training runs
rmse = regressionEvaluator.evaluate(prediction)
maxBins = pipelineModel.stages[-1]._java_obj.getMaxBins()
maxIter = pipelineModel.stages[-1]._java_obj.getMaxIter()
f'{{"maxBins": {maxBins}, "maxIter": {maxIter} "rmse": {rmse}}}'

'{"maxBins": 64, "maxIter": 30 "rmse": 3.899400391825619}'

In [17]:
# write model for inference via arc
pipelineModel \
    .write() \
    .overwrite() \
    .save(f"{ETL_CONF_JOB_URL}/trips_enriched.model")

In [18]:
# load model for inference via python
pipelineModel = PipelineModel.load(f'{ETL_CONF_JOB_URL}/trips_enriched.model')

In [19]:
# transform a dataset and show some results
trips_prediction = pipelineModel.transform(trips_enriched)
trips_prediction.select("fare_amount", "prediction").limit(5).toPandas()

Unnamed: 0,fare_amount,prediction
0,3.5,5.888963
1,14.0,14.478513
2,49.5,58.140575
3,11.0,12.005386
4,29.0,34.007778
