In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

spark = SparkSession.builder.getOrCreate()

df = spark.read.csv('hotel_bookings.csv', header=True, inferSchema=True)
#Fill na and nulls
df = df.na.fill("")
df = df.na.fill(0)

In [2]:
df.printSchema()

root
 |-- hotel: string (nullable = false)
 |-- is_canceled: integer (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- arrival_date_year: integer (nullable = true)
 |-- arrival_date_month: string (nullable = false)
 |-- arrival_date_week_number: integer (nullable = true)
 |-- arrival_date_day_of_month: integer (nullable = true)
 |-- stays_in_weekend_nights: integer (nullable = true)
 |-- stays_in_week_nights: integer (nullable = true)
 |-- adults: integer (nullable = true)
 |-- children: string (nullable = false)
 |-- babies: integer (nullable = true)
 |-- meal: string (nullable = false)
 |-- country: string (nullable = false)
 |-- market_segment: string (nullable = false)
 |-- distribution_channel: string (nullable = false)
 |-- is_repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- reserved_room_type: string (nullable = false)
 |-- assigned_room_type: string

In [3]:
columns_to_index = ['arrival_date_month', 'meal', 'country', 'market_segment', 'distribution_channel', 'reserved_room_type', 'assigned_room_type', 'deposit_type', 'customer_type']
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in columns_to_index ]
pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(df).transform(df)
df_r=df_r.withColumn('children',df_r['children'].cast("integer").alias('children'))

In [4]:
df_r.printSchema()

root
 |-- hotel: string (nullable = false)
 |-- is_canceled: integer (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- arrival_date_year: integer (nullable = true)
 |-- arrival_date_month: string (nullable = false)
 |-- arrival_date_week_number: integer (nullable = true)
 |-- arrival_date_day_of_month: integer (nullable = true)
 |-- stays_in_weekend_nights: integer (nullable = true)
 |-- stays_in_week_nights: integer (nullable = true)
 |-- adults: integer (nullable = true)
 |-- children: integer (nullable = true)
 |-- babies: integer (nullable = true)
 |-- meal: string (nullable = false)
 |-- country: string (nullable = false)
 |-- market_segment: string (nullable = false)
 |-- distribution_channel: string (nullable = false)
 |-- is_repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- reserved_room_type: string (nullable = false)
 |-- assigned_room_type: string

In [5]:
df_r = df_r.drop(*columns_to_index)
df_r = df_r.drop("hotel","country_index", "company", "agent" ,"reservation_status", "reservation_status_date", "deposit_type_index","customer_type_index")

In [6]:
df_r.printSchema()

root
 |-- is_canceled: integer (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- arrival_date_year: integer (nullable = true)
 |-- arrival_date_week_number: integer (nullable = true)
 |-- arrival_date_day_of_month: integer (nullable = true)
 |-- stays_in_weekend_nights: integer (nullable = true)
 |-- stays_in_week_nights: integer (nullable = true)
 |-- adults: integer (nullable = true)
 |-- children: integer (nullable = true)
 |-- babies: integer (nullable = true)
 |-- is_repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- booking_changes: integer (nullable = true)
 |-- days_in_waiting_list: integer (nullable = true)
 |-- adr: double (nullable = false)
 |-- required_car_parking_spaces: integer (nullable = true)
 |-- total_of_special_requests: integer (nullable = true)
 |-- arrival_date_month_index: double (nullable = false)
 |-- meal_index: double (nullable = 

In [7]:
input_cols = list(set(df_r.columns)-set(['is_canceled']))


In [8]:
input_cols

['arrival_date_week_number',
 'distribution_channel_index',
 'babies',
 'market_segment_index',
 'stays_in_week_nights',
 'previous_cancellations',
 'reserved_room_type_index',
 'is_repeated_guest',
 'total_of_special_requests',
 'required_car_parking_spaces',
 'booking_changes',
 'arrival_date_month_index',
 'stays_in_weekend_nights',
 'arrival_date_year',
 'children',
 'adults',
 'days_in_waiting_list',
 'meal_index',
 'arrival_date_day_of_month',
 'lead_time',
 'previous_bookings_not_canceled',
 'assigned_room_type_index',
 'adr']

In [9]:
vectorizer = VectorAssembler(inputCols=input_cols, outputCol='features')
df_r = vectorizer.setHandleInvalid("keep").transform(df_r)
df_r.show(1)

+-----------+---------+-----------------+------------------------+-------------------------+-----------------------+--------------------+------+--------+------+-----------------+----------------------+------------------------------+---------------+--------------------+---+---------------------------+-------------------------+------------------------+----------+--------------------+--------------------------+------------------------+------------------------+--------------------+
|is_canceled|lead_time|arrival_date_year|arrival_date_week_number|arrival_date_day_of_month|stays_in_weekend_nights|stays_in_week_nights|adults|children|babies|is_repeated_guest|previous_cancellations|previous_bookings_not_canceled|booking_changes|days_in_waiting_list|adr|required_car_parking_spaces|total_of_special_requests|arrival_date_month_index|meal_index|market_segment_index|distribution_channel_index|reserved_room_type_index|assigned_room_type_index|            features|
+-----------+---------+-----------

In [10]:
df_train, df_test = df_r.randomSplit([0.8,0.2], seed = 0, )
rf_clf = RandomForestClassifier(featuresCol='features', labelCol='is_canceled')
rf_clf = rf_clf.fit(df_train)
df_test = rf_clf.transform(df_test)
df_test = df_test.select('features', 'is_canceled', 'rawPrediction', 'probability', 'prediction')

criterion = MulticlassClassificationEvaluator(labelCol='is_canceled')
acc = criterion.evaluate(df_test)

In [11]:
from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

+-----+-----------+---------+-----------------+------------------+------------------------+-------------------------+-----------------------+--------------------+------+--------+------+----+-------+--------------+--------------------+-----------------+----------------------+------------------------------+------------------+------------------+---------------+------------+-----+-------+--------------------+-------------+---+---------------------------+-------------------------+------------------+-----------------------+
|hotel|is_canceled|lead_time|arrival_date_year|arrival_date_month|arrival_date_week_number|arrival_date_day_of_month|stays_in_weekend_nights|stays_in_week_nights|adults|children|babies|meal|country|market_segment|distribution_channel|is_repeated_guest|previous_cancellations|previous_bookings_not_canceled|reserved_room_type|assigned_room_type|booking_changes|deposit_type|agent|company|days_in_waiting_list|customer_type|adr|required_car_parking_spaces|total_of_special_reque

In [12]:
for c in df_r.columns:
    print(c, df_r.select(c).distinct().count())

is_canceled 2
lead_time 479
arrival_date_year 3
arrival_date_week_number 53
arrival_date_day_of_month 31
stays_in_weekend_nights 17
stays_in_week_nights 35
adults 14
children 6
babies 5
is_repeated_guest 2
previous_cancellations 15
previous_bookings_not_canceled 73
booking_changes 21
days_in_waiting_list 128
adr 8879
required_car_parking_spaces 5
total_of_special_requests 6
arrival_date_month_index 12
meal_index 5
market_segment_index 8
distribution_channel_index 5
reserved_room_type_index 10
assigned_room_type_index 12
features 84568


In [13]:
acc

0.7488295237125038

In [16]:
df_r.select('is_canceled').count()

119390

In [17]:
df_r.groupBy("is_canceled").count().show()

+-----------+-----+
|is_canceled|count|
+-----------+-----+
|          1|44224|
|          0|75166|
+-----------+-----+

