# Machine learning with PySpark
##### Applying machine learning in a distributed computing environment.
***
The goal of this project is to use PySpark and a decision tree to predict whether or not a guest will cancel their stay at a hotel.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

In [2]:
# Start (or reuse) a Spark session named "spark_analysis".
# master("local[*]") runs Spark locally using all available cores.
# The builder chain is wrapped in parentheses, so no line-continuation
# backslashes are needed.

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("spark_analysis")
    .getOrCreate()
)

In [None]:
# Downloading the cleaned hotel bookings dataset

!kaggle datasets download -d rpereiracruz/cleaned-hotel-bookings
!unzip cleaned-hotel-bookings.zip
!rm cleaned-hotel-bookings.zip

In [4]:
# Load the bookings CSV into a Spark DataFrame

file_path = "cleaned_hotel_bookings.csv"

# Reader options:
#   header      - take the column names from the first row
#   sep         - fields are comma-separated
#   inferSchema - let Spark infer each column's type from the data
#   nullValue   - read the literal string "NA" as null
csv_options = {
    "header": True,
    "sep": ',',
    "inferSchema": True,
    "nullValue": "NA",
}

# "_c0" is dropped right after reading (presumably an unnamed index column
# left over from how the CSV was written -- confirm against the file).
hotel_bookings = spark.read.csv(file_path, **csv_options).drop("_c0")

# Inspect the resulting column names and types
hotel_bookings.printSchema()
#hotel_bookings.show()

# DEBUG helpers (uncomment when needed)
#print(f"Number of records: {hotel_bookings.count()}")
#print(f"Column types: {hotel_bookings.dtypes}")

# DEBUG: number of null values per column
#null_values = {name: hotel_bookings.filter(hotel_bookings[name].isNull()).count() for name in hotel_bookings.columns}
#null_values

root
 |-- hotel: string (nullable = true)
 |-- is_canceled: integer (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- arrival_date: string (nullable = true)
 |-- arrival_date_week_number: integer (nullable = true)
 |-- stays_in_weekend_nights: integer (nullable = true)
 |-- stays_in_week_nights: integer (nullable = true)
 |-- adults: integer (nullable = true)
 |-- children: integer (nullable = true)
 |-- babies: integer (nullable = true)
 |-- meal: string (nullable = true)
 |-- country: string (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- distribution_channel: string (nullable = true)
 |-- is_repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- reserved_room_type: string (nullable = true)
 |-- assigned_room_type: string (nullable = true)
 |-- booking_changes: integer (nullable = true)
 |-- deposit_type: string (nullable = true)
 |-- agent

In [81]:
# Encode the categorical variables that will be used.
# Each StringIndexer writes the encoded values of one column to a new
# "<column>_index" column; the original columns are kept.
#
# NOTE(review): "days_in_waiting_list" is read as an integer column, so indexing
# it treats a numeric count as a set of categories -- confirm this is intended.
# NOTE(review): "reservation_status" looks like it may encode the outcome the
# model is meant to predict (is_canceled); verify before using
# "reservation_status_index" as a feature.

cols = ["hotel", "deposit_type", "days_in_waiting_list", "reservation_status", "reserved_room_type", "assigned_room_type",
        "customer_type", "country"]

# The indexers are handed to the Pipeline unfitted: Pipeline.fit() fits every
# estimator stage itself, so fitting each indexer by hand beforehand is redundant.
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index") for column in cols]

pipeline = Pipeline(stages=indexers)
hotel_bookings_cl = pipeline.fit(hotel_bookings).transform(hotel_bookings)

In [94]:
# Build the "features" vector column that the model is trained on

# Columns whose names end in "_index" (the encoded categorical columns).
# NOTE(review): idx_cols is collected here but is not passed to the assembler
# below, so the encoded categorical columns are not part of "features" --
# confirm whether that is intended.
idx_cols = [name for name in hotel_bookings_cl.columns if name.endswith("_index")]

# Numeric columns that are assembled into the feature vector
feature_cols = [
    "is_repeated_guest",
    "booking_changes",
    "adr",
    "agent",
    "company",
    "previous_cancellations",
    "previous_bookings_not_canceled",
]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Adds the "features" column to every row of the encoded bookings DataFrame
training_data = assembler.transform(hotel_bookings_cl)

In [95]:
# Split the assembled data into a training set and a test set;
# the seed is fixed so the random split can be repeated

split_weights = [0.75, 0.25]  # train weight, test weight
hotel_train, hotel_test = training_data.randomSplit(split_weights, seed=40)

hotel_train

DataFrame[hotel: string, is_canceled: int, lead_time: int, arrival_date: string, arrival_date_week_number: int, stays_in_weekend_nights: int, stays_in_week_nights: int, adults: int, children: int, babies: int, meal: string, country: string, market_segment: string, distribution_channel: string, is_repeated_guest: int, previous_cancellations: int, previous_bookings_not_canceled: int, reserved_room_type: string, assigned_room_type: string, booking_changes: int, deposit_type: string, agent: int, company: int, days_in_waiting_list: int, customer_type: string, adr: double, required_car_parking_spaces: int, total_of_special_requests: int, reservation_status: string, reservation_status_date: string, hotel_index: double, deposit_type_index: double, days_in_waiting_list_index: double, reservation_status_index: double, reserved_room_type_index: double, assigned_room_type_index: double, customer_type_index: double, country_index: double, features: vector]

In [108]:
# Train a decision tree on the training split, using "is_canceled" as the label

tree = DecisionTreeClassifier(labelCol="is_canceled")
tree_model = tree.fit(hotel_train)

# Score the held-out test rows
predict = tree_model.transform(hotel_test)

# Preview five predictions next to the true label, without truncating the cells
preview_cols = ["hotel", "is_canceled", "prediction", "probability"]
predict.select(*preview_cols).show(n=5, truncate=False)


+------------+-----------+----------+--------------------------------------+
|hotel       |is_canceled|prediction|probability                           |
+------------+-----------+----------+--------------------------------------+
|Resort Hotel|0          |0.0       |[0.66181174387725,0.33818825612275005]|
|Resort Hotel|0          |0.0       |[0.66181174387725,0.33818825612275005]|
|Resort Hotel|0          |0.0       |[0.66181174387725,0.33818825612275005]|
|Resort Hotel|0          |0.0       |[0.66181174387725,0.33818825612275005]|
|Resort Hotel|0          |0.0       |[0.66181174387725,0.33818825612275005]|
+------------+-----------+----------+--------------------------------------+
only showing top 5 rows



In [None]:
# Stop the Spark session created at the start of the notebook now that the analysis is done

spark.stop()