In [None]:
# define display options
import pandas as pd
# never truncate the column list when a DataFrame is rendered (None = no limit)
pd.options.display.max_columns = None
# show up to 200 characters of each cell before the value is elided
pd.options.display.max_colwidth = 200

In [None]:
# instantiate the spark instance
from pyspark.sql import SparkSession
# build the session in local mode using every available core ("local[*]");
# getOrCreate() hands back an already running session instead of starting a second one
spark = (
    SparkSession.builder
    .master("local[*]")
    .config('spark.executor.memory', '4GB')
    .getOrCreate()
)

In [None]:
# load the trips parquet dataset and register it as the temp view 'trips'
# so that it can be referenced by name from Spark SQL
trips_path = '/home/jovyan/tutorial/data/output/trips.parquet'
trips = spark.read.parquet(trips_path)
trips.createOrReplaceTempView('trips')

In [None]:
# show the first five rows as a table
# limit(5) is applied in Spark before toPandas(), so only five rows are converted to pandas
trips.limit(5).toPandas()

In [None]:
# read the sql file to ensure same execution in both training and prediction
# the context manager closes the file handle even if read() raises, and the
# explicit encoding keeps the result independent of the kernel's locale
with open("/home/jovyan/tutorial/job/6/trips_enriched.sql", "r", encoding="utf-8") as sql_file:
    sql = sql_file.read()
# echo the statement so the reader can see exactly what will be executed
print(sql)

In [None]:
# execute the sql against spark to generate the new dataframe
# then register the result as the temp view 'trips_enriched' so later SQL can refer to it by name
trips_enriched = spark.sql(sql)
trips_enriched.createOrReplaceTempView('trips_enriched')

In [None]:
# show the first five rows of the enriched dataset as a table
# limit(5) is applied in Spark before toPandas(), so only five rows are converted to pandas
trips_enriched.limit(5).toPandas()

In [None]:
# split the enriched dataset at random into train_df and test_df using weights 3:1 (75% / 25%)
# test_df is held back so the model can be scored on rows it never saw while training,
# which is how we check that it generalises; the fixed seed (42) is passed to randomSplit
splits = trips_enriched.randomSplit(weights=[3.0, 1.0], seed=42)
train_df, test_df = splits

In [None]:
from pyspark.ml import *
from pyspark.ml.feature import *
from pyspark.ml.tuning import *
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *

# create a vector with the input predictor columns
# the 24 pickup_hour_* and 7 pickup_dayofweek_* column names are generated rather than typed out;
# the resulting order is: trip_distance, passenger_count, hours 0-23, days 0-6, duration, jfk
pickup_hour_cols = [f"pickup_hour_{hour}" for hour in range(24)]
pickup_dayofweek_cols = [f"pickup_dayofweek_{day}" for day in range(7)]
feature_cols = (
    ["trip_distance", "passenger_count"]
    + pickup_hour_cols
    + pickup_dayofweek_cols
    + ["duration", "jfk"]
)

vectorAssembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# define the model type to train - a gradient-boosted trees regressor predicting the continuous total_amount
# it reads the assembled 'features' vector and writes its output to the 'prediction' column
# note: maxIter and maxBins are also listed in the parameter grid further down, which supplies the values used per candidate
gbt_params = dict(
    featuresCol='features',
    labelCol='total_amount',
    predictionCol='prediction',
    maxDepth=5,
    maxBins=32,
    minInstancesPerNode=1,
    minInfoGain=0.0,
    maxMemoryInMB=512,
    cacheNodeIds=False,
    subsamplingRate=1.0,
    checkpointInterval=10,
    lossType='squared',
    maxIter=10,
    stepSize=0.1,
    seed=None,
)
gbt = GBTRegressor(**gbt_params)

# define a sequence of stages: assemble the feature vector first, then run the regressor on it
pipeline_stages = [vectorAssembler, gbt]
pipeline = Pipeline(stages=pipeline_stages)

# create a matrix of parameters to try whilst training
# the grid holds every combination of the listed values: 2 maxIter x 2 maxBins = 4 candidates
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.maxIter, [20, 30])
    .addGrid(gbt.maxBins, [32, 64])
    .build()
)

# define the evaluation
# scores the 'prediction' column against the 'total_amount' label using root mean square error
evaluator_params = {
    'metricName': 'rmse',
    'labelCol': 'total_amount',
    'predictionCol': 'prediction',
}
regressionEvaluator = RegressionEvaluator(**evaluator_params)

# set up the model for running
# 3-fold cross validation over every entry in paramGrid, scored with regressionEvaluator
# seed fixes the random assignment of rows to folds so the metrics and the selected
# best model are reproducible between kernel sessions (same seed as the train/test split)
# collectSubModels keeps every fitted fold/grid model so they can be inspected after fit()
crossValidator = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=regressionEvaluator,
    numFolds=3,
    seed=42,
    collectSubModels=True)

In [None]:
# run the training on the train_df
# this fits the pipeline for each parameter grid entry on each fold, so all model fitting happens in this cell
crossValidatorModel = crossValidator.fit(train_df)

In [None]:
# to demonstrate how the training works loop through all the sub models and print parameters and result
# you can see that this is a brute force parameter search. This means the more .addGrid() parameters tested will result in longer training time
# note: each sub model is scored here against the whole of train_df, so the printed rmse is
# illustrative only and is not the held-out fold metric used to pick the best model
for fold, foldModel in enumerate(crossValidatorModel.subModels, start=1):
    for grid, gridModel in enumerate(foldModel, start=1):
        prediction = gridModel.transform(train_df)
        rmse = regressionEvaluator.evaluate(prediction)
        # the fitted regressor is the last pipeline stage; read back the hyperparameters it was trained with
        gbtModel = gridModel.stages[-1]
        maxBins = gbtModel._java_obj.getMaxBins()
        maxIter = gbtModel._java_obj.getMaxIter()
        # one JSON-style record per sub model (a comma was missing before "rmse")
        print(f'{{"fold": {fold}, "grid": {grid}, "maxBins": {maxBins}, "maxIter": {maxIter}, "rmse": {rmse}}}')

In [None]:
# select the best/most generalised model
# bestModel is the fitted pipeline the cross validator chose using regressionEvaluator (rmse)
pipelineModel = crossValidatorModel.bestModel

In [None]:
# transform the test dataset to test the model on unseen data
# the regressor stage is configured with predictionCol='prediction', so that is the column it writes
prediction = pipelineModel.transform(test_df)

In [None]:
# extract the best model parameters from the pipelineModel to help reduce future number of training runs
# rmse here is measured on the test_df predictions from the previous cell
rmse = regressionEvaluator.evaluate(prediction)
# the fitted regressor is the last pipeline stage
maxBins = pipelineModel.stages[-1]._java_obj.getMaxBins()
maxIter = pipelineModel.stages[-1]._java_obj.getMaxIter()
# display a JSON-style summary as the cell output (a comma was missing before "rmse")
f'{{"maxBins": {maxBins}, "maxIter": {maxIter}, "rmse": {rmse}}}'

In [None]:
# write model for inference via arc
# overwrite() replaces any model already saved at this path
model_writer = pipelineModel.write().overwrite()
model_writer.save('/home/jovyan/tutorial/job/6/trips_enriched.model')

In [None]:
# load model for inference via python
# rebinds pipelineModel to the copy read back from the same path the previous cell saved to
pipelineModel = PipelineModel.load('/home/jovyan/tutorial/job/6/trips_enriched.model')

In [None]:
# transform a dataset and show some results
trips_prediction = pipelineModel.transform(trips_enriched)
# the model was trained with total_amount as its label, so show that column next to
# the prediction; fare_amount is kept for reference as in the original output
trips_prediction.select("fare_amount", "total_amount", "prediction").limit(5).toPandas()