In [1]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook under a descriptive application
# name; getOrCreate() hands back an already-running session when there is one.
spark = (
    SparkSession.builder
    .appName("Hyperparameter optimization")
    .getOrCreate()
)

24/10/15 23:02:48 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
# Load the hotel reservations CSV: the first row is treated as the header and
# column types are inferred from the file contents.
data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("data/hotel-reservations.csv")
)

In [3]:
# Split the data with 0.8 / 0.2 weights; the seed is fixed so the split is
# repeatable. Then report how many rows landed in each part.
split_weights = [0.8, 0.2]
train_data, test_data = data.randomSplit(split_weights, seed=42)

for label, subset in (("Train size: ", train_data), ("Test size: ", test_data)):
    print(label, subset.count())

Train size:  29040
Test size:  7235


In [6]:
# Ordered list of pipeline stages; the following cells add to it in place.
stages = list()

In [7]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder 

# Categorical feature columns; each one is indexed and then one-hot encoded.
categorical_cols = ['type_of_meal_plan', 'room_type_reserved', 'arrival_month', 'market_segment_type']

for col in categorical_cols:
    # handleInvalid='keep' sends categories (and nulls) that were not present
    # when the indexer was fitted to an extra index instead of raising at
    # transform time. Rare categories can easily end up only in the
    # validation or test split, which would otherwise abort the run.
    string_indexer = StringIndexer(
        inputCol=col,
        outputCol=col + '_index',
        handleInvalid='keep',
    )
    print(f'StringIndexer {string_indexer.getInputCol()} -> {string_indexer.getOutputCol()}')

    # dropLast=False keeps one vector slot per category index.
    encoder = OneHotEncoder(inputCol=string_indexer.getOutputCol(), outputCol=col + '_vec', dropLast=False)
    print(f'OneHotEncoder {encoder.getInputCol()} -> {encoder.getOutputCol()}')
    print()

    # Indexer must come before its encoder: the encoder reads the index column.
    stages += [string_indexer, encoder]

StringIndexer type_of_meal_plan -> type_of_meal_plan_index
OneHotEncoder type_of_meal_plan_index -> type_of_meal_plan_vec

StringIndexer room_type_reserved -> room_type_reserved_index
OneHotEncoder room_type_reserved_index -> room_type_reserved_vec

StringIndexer arrival_month -> arrival_month_index
OneHotEncoder arrival_month_index -> arrival_month_vec

StringIndexer market_segment_type -> market_segment_type_index
OneHotEncoder market_segment_type_index -> market_segment_type_vec



In [8]:
# Index the target column; the classifier reads 'booking_status_index' as its label.
label_indexer = StringIndexer(inputCol='booking_status', outputCol='booking_status_index')
stages.append(label_indexer)

In [9]:
# Names of the one-hot encoded output columns, one per categorical column.
encoded_categorical_cols = [f"{name}_vec" for name in categorical_cols]
encoded_categorical_cols

['type_of_meal_plan_vec',
 'room_type_reserved_vec',
 'arrival_month_vec',
 'market_segment_type_vec']

In [10]:
# Numeric columns that are passed to the feature vector as they are.
numeric_cols = [
    'no_of_adults',
    'no_of_children',
    'no_of_weekend_nights',
    'no_of_week_nights',
    'required_car_parking_space',
    'lead_time',
    'repeated_guest',
    'no_of_previous_cancellations',
    'no_of_previous_bookings_not_canceled',
    'avg_price_per_room',
    'no_of_special_requests',
]

In [11]:
# Full list of model inputs: encoded categorical columns first, then the numeric ones.
input_columns = [*encoded_categorical_cols, *numeric_cols]
input_columns

['type_of_meal_plan_vec',
 'room_type_reserved_vec',
 'arrival_month_vec',
 'market_segment_type_vec',
 'no_of_adults',
 'no_of_children',
 'no_of_weekend_nights',
 'no_of_week_nights',
 'required_car_parking_space',
 'lead_time',
 'repeated_guest',
 'no_of_previous_cancellations',
 'no_of_previous_bookings_not_canceled',
 'avg_price_per_room',
 'no_of_special_requests']

In [12]:
from pyspark.ml.feature import VectorAssembler

# Combine all input columns into the single 'features' vector column and
# register the assembler as the next pipeline stage.
assembler = VectorAssembler(outputCol='features', inputCols=input_columns)

stages += [assembler]

In [13]:
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted trees classifier reading the assembled 'features' vector
# and the indexed booking status label. maxIter here is only the starting
# value; the parameter grid built later overrides it per candidate.
# The seed is pinned to the same value used for the data split and the
# train/validation split so that no part of training depends on the
# library's per-process default seed.
gbt = GBTClassifier(
    featuresCol='features',
    labelCol='booking_status_index',
    maxIter=10,
    seed=42,
)

# The classifier is the final stage, after all feature preparation.
stages.append(gbt)

In [14]:
from pyspark.ml import Pipeline

# Chain every collected stage (feature preparation, then the classifier)
# into a single estimator.
pipeline = Pipeline().setStages(stages)

In [15]:
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Candidate hyperparameters: two values for each of three parameters,
# giving 2 x 2 x 2 = 8 combinations.
grid_builder = ParamGridBuilder()
grid_builder.addGrid(gbt.maxDepth, [5, 10])
grid_builder.addGrid(gbt.maxIter, [10, 20])
grid_builder.addGrid(gbt.stepSize, [0.1, 0.2])
paramGrid = grid_builder.build()

# Candidates are scored by accuracy of 'prediction' against the indexed label.
evaluator = MulticlassClassificationEvaluator(
    metricName='accuracy',
    labelCol='booking_status_index',
    predictionCol='prediction',
)

# NOTE: despite its name, tvs_model is the unfitted TrainValidationSplit
# estimator; calling fit() on it produces the fitted result in best_model.
tvs_model = TrainValidationSplit(
    seed=42,
    estimator=pipeline,
    evaluator=evaluator,
    estimatorParamMaps=paramGrid,
)

best_model = tvs_model.fit(train_data)

24/10/15 23:03:12 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/10/15 23:03:12 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
24/10/15 23:03:43 WARN DAGScheduler: Broadcasting large task binary with size 1027.2 KiB
24/10/15 23:03:43 WARN DAGScheduler: Broadcasting large task binary with size 1020.0 KiB
24/10/15 23:03:43 WARN DAGScheduler: Broadcasting large task binary with size 1020.5 KiB
24/10/15 23:03:43 WARN DAGScheduler: Broadcasting large task binary with size 1021.0 KiB
24/10/15 23:03:44 WARN DAGScheduler: Broadcasting large task binary with size 1022.3 KiB
24/10/15 23:03:44 WARN DAGScheduler: Broadcasting large task binary with size 1024.5 KiB
24/10/15 23:03:44 WARN DAGScheduler: Broadcasting large task binary with size 1028.3 KiB
24/10/15 23:03:44 WARN DAGScheduler: Broadcasting large task binary with size 1035.7 KiB
24/10/15 23:03:44 WARN DAGScheduler: Broadcasting large task binary wit

In [16]:
# Apply the fitted model to the held-out test split.
predictions = best_model.transform(dataset=test_data)

In [17]:
# Test-set accuracy, converted from a fraction to a percentage for display.
test_score = evaluator.evaluate(predictions)
accuracy = test_score * 100
print('Accuracy = {:.2f}%'.format(accuracy))

24/10/15 23:04:14 WARN DAGScheduler: Broadcasting large task binary with size 1920.5 KiB


Accuracy = 87.99%


In [18]:
# Print every hyperparameter combination next to the validation metric it
# obtained; the metrics are paired with the grid entries position by position.
grid_results = zip(paramGrid, best_model.validationMetrics)
for candidate, score in grid_results:
    for hyperparam in candidate:
        print(f'{hyperparam.name} = {candidate[hyperparam]}')
    print(f'Accuracy -> {score}')


maxDepth = 5
maxIter = 10
stepSize = 0.1
Accuracy -> 0.8435946990612921
maxDepth = 5
maxIter = 10
stepSize = 0.2
Accuracy -> 0.8513252346769741
maxDepth = 5
maxIter = 20
stepSize = 0.1
Accuracy -> 0.8558807288790723
maxDepth = 5
maxIter = 20
stepSize = 0.2
Accuracy -> 0.8601601325234677
maxDepth = 10
maxIter = 10
stepSize = 0.1
Accuracy -> 0.8716178906681391
maxDepth = 10
maxIter = 10
stepSize = 0.2
Accuracy -> 0.8725842076200994
maxDepth = 10
maxIter = 20
stepSize = 0.1
Accuracy -> 0.8738266151297626
maxDepth = 10
maxIter = 20
stepSize = 0.2
Accuracy -> 0.8728602981778023
