In [1]:
# Ask Databricks to refresh its mount information before /mnt/s3data is
# listed and read by the cells below.
dbutils.fs.refreshMounts()

In [2]:
# List what is stored under the /mnt/s3data mount point.
s3_mount_listing = dbutils.fs.ls('/mnt/s3data')
display(s3_mount_listing)

path,name,size
dbfs:/mnt/s3data/hotel_booking_cleaned.parquet/,hotel_booking_cleaned.parquet/,0
dbfs:/mnt/s3data/hotel_bookings.csv,hotel_bookings.csv,16855599


In [3]:
# Load the cleaned hotel-bookings dataset from the mounted bucket.
# The CSV-only reader options ("header", "inferSchema") and the unused
# `delimiter` variable were dropped: Parquet files carry their own schema,
# so none of them had any effect on this read.
new_hotelDF = spark.read.parquet("dbfs:/mnt/s3data/hotel_booking_cleaned.parquet/")

display(new_hotelDF)

hotel,is_canceled,lead_time,arrival_date_year,arrival_date_month,arrival_date_week_number,arrival_date_day_of_month,stays_in_weekend_nights,stays_in_week_nights,adults,children,babies,meal,country,market_segment,is_repeated_guest,previous_cancellations,previous_bookings_not_canceled,reserved_room_type,assigned_room_type,booking_changes,deposit_type,agent,company,days_in_waiting_list,customer_type,adr,required_car_parking_spaces,total_of_special_requests
Resort Hotel,0,7,2015,July,27,1,0,1,1,0,0,BB,GBR,Direct,0,0,0,A,C,0,No Deposit,,,0,Transient,75.0,0,0
Resort Hotel,0,13,2015,July,27,1,0,1,1,0,0,BB,GBR,Corporate,0,0,0,A,A,0,No Deposit,304.0,,0,Transient,75.0,0,0
Resort Hotel,0,14,2015,July,27,1,0,2,2,0,0,BB,GBR,Online TA,0,0,0,A,A,0,No Deposit,240.0,,0,Transient,98.0,0,1
Resort Hotel,0,14,2015,July,27,1,0,2,2,0,0,BB,GBR,Online TA,0,0,0,A,A,0,No Deposit,240.0,,0,Transient,98.0,0,1
Resort Hotel,0,0,2015,July,27,1,0,2,2,0,0,BB,PRT,Direct,0,0,0,C,C,0,No Deposit,,,0,Transient,107.0,0,0
Resort Hotel,0,9,2015,July,27,1,0,2,2,0,0,FB,PRT,Direct,0,0,0,C,C,0,No Deposit,303.0,,0,Transient,103.0,0,1
Resort Hotel,1,85,2015,July,27,1,0,3,2,0,0,BB,PRT,Online TA,0,0,0,A,A,0,No Deposit,240.0,,0,Transient,82.0,0,1
Resort Hotel,1,75,2015,July,27,1,0,3,2,0,0,HB,PRT,Offline TA/TO,0,0,0,D,D,0,No Deposit,15.0,,0,Transient,105.5,0,0
Resort Hotel,1,23,2015,July,27,1,0,4,2,0,0,BB,PRT,Online TA,0,0,0,E,E,0,No Deposit,240.0,,0,Transient,123.0,0,0
Resort Hotel,0,35,2015,July,27,1,0,4,2,0,0,HB,PRT,Online TA,0,0,0,D,D,0,No Deposit,240.0,,0,Transient,145.0,0,0


#### Convert the datatype of children

In [5]:
from pyspark.sql.types import DoubleType
# Re-create `children` as a double-typed column named `children_num`,
# then remove the original `children` column.
children_as_double = new_hotelDF['children'].cast(DoubleType())
changedtype_hotelDF = (
    new_hotelDF
    .withColumn('children_num', children_as_double)
    .drop('children')
)
display(changedtype_hotelDF)

hotel,is_canceled,lead_time,arrival_date_year,arrival_date_month,arrival_date_week_number,arrival_date_day_of_month,stays_in_weekend_nights,stays_in_week_nights,adults,babies,meal,country,market_segment,is_repeated_guest,previous_cancellations,previous_bookings_not_canceled,reserved_room_type,assigned_room_type,booking_changes,deposit_type,agent,company,days_in_waiting_list,customer_type,adr,required_car_parking_spaces,total_of_special_requests,children_num
Resort Hotel,0,7,2015,July,27,1,0,1,1,0,BB,GBR,Direct,0,0,0,A,C,0,No Deposit,,,0,Transient,75.0,0,0,0.0
Resort Hotel,0,13,2015,July,27,1,0,1,1,0,BB,GBR,Corporate,0,0,0,A,A,0,No Deposit,304.0,,0,Transient,75.0,0,0,0.0
Resort Hotel,0,14,2015,July,27,1,0,2,2,0,BB,GBR,Online TA,0,0,0,A,A,0,No Deposit,240.0,,0,Transient,98.0,0,1,0.0
Resort Hotel,0,14,2015,July,27,1,0,2,2,0,BB,GBR,Online TA,0,0,0,A,A,0,No Deposit,240.0,,0,Transient,98.0,0,1,0.0
Resort Hotel,0,0,2015,July,27,1,0,2,2,0,BB,PRT,Direct,0,0,0,C,C,0,No Deposit,,,0,Transient,107.0,0,0,0.0
Resort Hotel,0,9,2015,July,27,1,0,2,2,0,FB,PRT,Direct,0,0,0,C,C,0,No Deposit,303.0,,0,Transient,103.0,0,1,0.0
Resort Hotel,1,85,2015,July,27,1,0,3,2,0,BB,PRT,Online TA,0,0,0,A,A,0,No Deposit,240.0,,0,Transient,82.0,0,1,0.0
Resort Hotel,1,75,2015,July,27,1,0,3,2,0,HB,PRT,Offline TA/TO,0,0,0,D,D,0,No Deposit,15.0,,0,Transient,105.5,0,0,0.0
Resort Hotel,1,23,2015,July,27,1,0,4,2,0,BB,PRT,Online TA,0,0,0,E,E,0,No Deposit,240.0,,0,Transient,123.0,0,0,0.0
Resort Hotel,0,35,2015,July,27,1,0,4,2,0,HB,PRT,Online TA,0,0,0,D,D,0,No Deposit,240.0,,0,Transient,145.0,0,0,0.0


#### Convert categorical columns to indices

In [7]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
# Every column whose Spark dtype string starts with 'string' is treated as categorical.
categoricalColumns = [
    col_name
    for col_name, col_type in changedtype_hotelDF.dtypes
    if col_type.startswith('string')
]

# One StringIndexer stage per categorical column; each writes "<column>_Index".
index_stages = [
    StringIndexer(inputCol=col_name, outputCol=col_name + '_Index')
    for col_name in categoricalColumns
]

# Bundle the stages into one pipeline, fit it on the DataFrame, and apply the
# fitted model to that same DataFrame.
index_pipeline = Pipeline(stages=index_stages)
index_pipelineModel = index_pipeline.fit(changedtype_hotelDF)
indexed_hotelDF = index_pipelineModel.transform(changedtype_hotelDF)
display(indexed_hotelDF)

hotel,is_canceled,lead_time,arrival_date_year,arrival_date_month,arrival_date_week_number,arrival_date_day_of_month,stays_in_weekend_nights,stays_in_week_nights,adults,babies,meal,country,market_segment,is_repeated_guest,previous_cancellations,previous_bookings_not_canceled,reserved_room_type,assigned_room_type,booking_changes,deposit_type,agent,company,days_in_waiting_list,customer_type,adr,required_car_parking_spaces,total_of_special_requests,children_num,hotel_Index,arrival_date_month_Index,meal_Index,country_Index,market_segment_Index,reserved_room_type_Index,assigned_room_type_Index,deposit_type_Index,agent_Index,company_Index,customer_type_Index
Resort Hotel,0,7,2015,July,27,1,0,1,1,0,BB,GBR,Direct,0,0,0,A,C,0,No Deposit,,,0,Transient,75.0,0,0,0.0,1.0,1.0,0.0,1.0,3.0,0.0,5.0,0.0,1.0,0.0,0.0
Resort Hotel,0,13,2015,July,27,1,0,1,1,0,BB,GBR,Corporate,0,0,0,A,A,0,No Deposit,304.0,,0,Transient,75.0,0,0,0.0,1.0,1.0,0.0,1.0,4.0,0.0,0.0,0.0,316.0,0.0,0.0
Resort Hotel,0,14,2015,July,27,1,0,2,2,0,BB,GBR,Online TA,0,0,0,A,A,0,No Deposit,240.0,,0,Transient,98.0,0,1,0.0,1.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,2.0,0.0,0.0
Resort Hotel,0,14,2015,July,27,1,0,2,2,0,BB,GBR,Online TA,0,0,0,A,A,0,No Deposit,240.0,,0,Transient,98.0,0,1,0.0,1.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,2.0,0.0,0.0
Resort Hotel,0,0,2015,July,27,1,0,2,2,0,BB,PRT,Direct,0,0,0,C,C,0,No Deposit,,,0,Transient,107.0,0,0,0.0,1.0,1.0,0.0,0.0,3.0,5.0,5.0,0.0,1.0,0.0,0.0
Resort Hotel,0,9,2015,July,27,1,0,2,2,0,FB,PRT,Direct,0,0,0,C,C,0,No Deposit,303.0,,0,Transient,103.0,0,1,0.0,1.0,1.0,4.0,0.0,3.0,5.0,5.0,0.0,261.0,0.0,0.0
Resort Hotel,1,85,2015,July,27,1,0,3,2,0,BB,PRT,Online TA,0,0,0,A,A,0,No Deposit,240.0,,0,Transient,82.0,0,1,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,0.0,0.0
Resort Hotel,1,75,2015,July,27,1,0,3,2,0,HB,PRT,Offline TA/TO,0,0,0,D,D,0,No Deposit,15.0,,0,Transient,105.5,0,0,0.0,1.0,1.0,1.0,0.0,1.0,1.0,1.0,0.0,32.0,0.0,0.0
Resort Hotel,1,23,2015,July,27,1,0,4,2,0,BB,PRT,Online TA,0,0,0,E,E,0,No Deposit,240.0,,0,Transient,123.0,0,0,0.0,1.0,1.0,0.0,0.0,0.0,2.0,2.0,0.0,2.0,0.0,0.0
Resort Hotel,0,35,2015,July,27,1,0,4,2,0,HB,PRT,Online TA,0,0,0,D,D,0,No Deposit,240.0,,0,Transient,145.0,0,0,0.0,1.0,1.0,1.0,0.0,0.0,1.0,1.0,0.0,2.0,0.0,0.0


In [8]:
# Original string-typed columns; they are dropped here while their
# "<column>_Index" counterparts stay in the DataFrame.
raw_categorical_columns = [
    'hotel', 'arrival_date_month', 'meal', 'country', 'market_segment',
    'reserved_room_type', 'assigned_room_type', 'deposit_type',
    'agent', 'company', 'customer_type',
]
new_indexed_hotelDF = indexed_hotelDF.drop(*raw_categorical_columns)

#### Save the indexed DataFrame to S3

In [10]:
# Write the indexed DataFrame to the mounted bucket.
# `path` is never assigned in any cell of this notebook, so a fresh run raised a
# NameError here; the mount location used by the Parquet read is spelled out instead.
# NOTE(review): assumes `path` was meant to be the /mnt/s3data mount — confirm.
indexed_output_dir = "dbfs:/mnt/s3data"
new_indexed_hotelDF.write.parquet(indexed_output_dir + "/hotel_booking_Indexcleaned.parquet")

#### Convert the index columns to one-hot encoding

In [12]:
# Preview the index columns that will be one-hot encoded. They are picked by
# their "_Index" suffix instead of the positional slice [18:], so the list stays
# correct if the number of leading numeric columns ever changes.
[name for name in new_indexed_hotelDF.columns if name.endswith('_Index')]

In [13]:
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml import Pipeline
# Columns produced by the StringIndexer stages, selected by their "_Index"
# suffix rather than by position (previously columns[18:], which silently picks
# the wrong columns when one is added or removed upstream).
indexColumns = [name for name in new_indexed_hotelDF.columns if name.endswith('_Index')]

# One encoder stage per index column; each writes "<column>_encoded".
encoder_stages = [
    OneHotEncoderEstimator(inputCols=[indexCol], outputCols=[indexCol + '_encoded'])
    for indexCol in indexColumns
]

# Bundle the stages into one pipeline, fit it on the DataFrame, and apply the
# fitted model to that same DataFrame.
encoder_pipeline = Pipeline(stages=encoder_stages)
encoder_pipelineModel = encoder_pipeline.fit(new_indexed_hotelDF)
encoded_hotelDF = encoder_pipelineModel.transform(new_indexed_hotelDF)
display(encoded_hotelDF)

is_canceled,lead_time,arrival_date_year,arrival_date_week_number,arrival_date_day_of_month,stays_in_weekend_nights,stays_in_week_nights,adults,babies,is_repeated_guest,previous_cancellations,previous_bookings_not_canceled,booking_changes,days_in_waiting_list,adr,required_car_parking_spaces,total_of_special_requests,children_num,hotel_Index,arrival_date_month_Index,meal_Index,country_Index,market_segment_Index,reserved_room_type_Index,assigned_room_type_Index,deposit_type_Index,agent_Index,company_Index,customer_type_Index,hotel_Index_encoded,arrival_date_month_Index_encoded,meal_Index_encoded,country_Index_encoded,market_segment_Index_encoded,reserved_room_type_Index_encoded,assigned_room_type_Index_encoded,deposit_type_Index_encoded,agent_Index_encoded,company_Index_encoded,customer_type_Index_encoded
0,7,2015,27,1,0,1,1,0,0,0,0,0,0,75.0,0,0,0.0,1.0,1.0,0.0,1.0,3.0,0.0,5.0,0.0,1.0,0.0,0.0,"List(0, 1, List(), List())","List(0, 11, List(1), List(1.0))","List(0, 4, List(0), List(1.0))","List(0, 177, List(1), List(1.0))","List(0, 6, List(3), List(1.0))","List(0, 8, List(0), List(1.0))","List(0, 10, List(5), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 332, List(1), List(1.0))","List(0, 343, List(0), List(1.0))","List(0, 3, List(0), List(1.0))"
0,13,2015,27,1,0,1,1,0,0,0,0,0,0,75.0,0,0,0.0,1.0,1.0,0.0,1.0,4.0,0.0,0.0,0.0,316.0,0.0,0.0,"List(0, 1, List(), List())","List(0, 11, List(1), List(1.0))","List(0, 4, List(0), List(1.0))","List(0, 177, List(1), List(1.0))","List(0, 6, List(4), List(1.0))","List(0, 8, List(0), List(1.0))","List(0, 10, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 332, List(316), List(1.0))","List(0, 343, List(0), List(1.0))","List(0, 3, List(0), List(1.0))"
0,14,2015,27,1,0,2,2,0,0,0,0,0,0,98.0,0,1,0.0,1.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,2.0,0.0,0.0,"List(0, 1, List(), List())","List(0, 11, List(1), List(1.0))","List(0, 4, List(0), List(1.0))","List(0, 177, List(1), List(1.0))","List(0, 6, List(0), List(1.0))","List(0, 8, List(0), List(1.0))","List(0, 10, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 332, List(2), List(1.0))","List(0, 343, List(0), List(1.0))","List(0, 3, List(0), List(1.0))"
0,14,2015,27,1,0,2,2,0,0,0,0,0,0,98.0,0,1,0.0,1.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,2.0,0.0,0.0,"List(0, 1, List(), List())","List(0, 11, List(1), List(1.0))","List(0, 4, List(0), List(1.0))","List(0, 177, List(1), List(1.0))","List(0, 6, List(0), List(1.0))","List(0, 8, List(0), List(1.0))","List(0, 10, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 332, List(2), List(1.0))","List(0, 343, List(0), List(1.0))","List(0, 3, List(0), List(1.0))"
0,0,2015,27,1,0,2,2,0,0,0,0,0,0,107.0,0,0,0.0,1.0,1.0,0.0,0.0,3.0,5.0,5.0,0.0,1.0,0.0,0.0,"List(0, 1, List(), List())","List(0, 11, List(1), List(1.0))","List(0, 4, List(0), List(1.0))","List(0, 177, List(0), List(1.0))","List(0, 6, List(3), List(1.0))","List(0, 8, List(5), List(1.0))","List(0, 10, List(5), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 332, List(1), List(1.0))","List(0, 343, List(0), List(1.0))","List(0, 3, List(0), List(1.0))"
0,9,2015,27,1,0,2,2,0,0,0,0,0,0,103.0,0,1,0.0,1.0,1.0,4.0,0.0,3.0,5.0,5.0,0.0,261.0,0.0,0.0,"List(0, 1, List(), List())","List(0, 11, List(1), List(1.0))","List(0, 4, List(), List())","List(0, 177, List(0), List(1.0))","List(0, 6, List(3), List(1.0))","List(0, 8, List(5), List(1.0))","List(0, 10, List(5), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 332, List(261), List(1.0))","List(0, 343, List(0), List(1.0))","List(0, 3, List(0), List(1.0))"
1,85,2015,27,1,0,3,2,0,0,0,0,0,0,82.0,0,1,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,0.0,0.0,"List(0, 1, List(), List())","List(0, 11, List(1), List(1.0))","List(0, 4, List(0), List(1.0))","List(0, 177, List(0), List(1.0))","List(0, 6, List(0), List(1.0))","List(0, 8, List(0), List(1.0))","List(0, 10, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 332, List(2), List(1.0))","List(0, 343, List(0), List(1.0))","List(0, 3, List(0), List(1.0))"
1,75,2015,27,1,0,3,2,0,0,0,0,0,0,105.5,0,0,0.0,1.0,1.0,1.0,0.0,1.0,1.0,1.0,0.0,32.0,0.0,0.0,"List(0, 1, List(), List())","List(0, 11, List(1), List(1.0))","List(0, 4, List(1), List(1.0))","List(0, 177, List(0), List(1.0))","List(0, 6, List(1), List(1.0))","List(0, 8, List(1), List(1.0))","List(0, 10, List(1), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 332, List(32), List(1.0))","List(0, 343, List(0), List(1.0))","List(0, 3, List(0), List(1.0))"
1,23,2015,27,1,0,4,2,0,0,0,0,0,0,123.0,0,0,0.0,1.0,1.0,0.0,0.0,0.0,2.0,2.0,0.0,2.0,0.0,0.0,"List(0, 1, List(), List())","List(0, 11, List(1), List(1.0))","List(0, 4, List(0), List(1.0))","List(0, 177, List(0), List(1.0))","List(0, 6, List(0), List(1.0))","List(0, 8, List(2), List(1.0))","List(0, 10, List(2), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 332, List(2), List(1.0))","List(0, 343, List(0), List(1.0))","List(0, 3, List(0), List(1.0))"
0,35,2015,27,1,0,4,2,0,0,0,0,0,0,145.0,0,0,0.0,1.0,1.0,1.0,0.0,0.0,1.0,1.0,0.0,2.0,0.0,0.0,"List(0, 1, List(), List())","List(0, 11, List(1), List(1.0))","List(0, 4, List(1), List(1.0))","List(0, 177, List(0), List(1.0))","List(0, 6, List(0), List(1.0))","List(0, 8, List(1), List(1.0))","List(0, 10, List(1), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 332, List(2), List(1.0))","List(0, 343, List(0), List(1.0))","List(0, 3, List(0), List(1.0))"


In [14]:
# Show every column name of the encoded DataFrame; the output is used to pick
# the "_Index" columns dropped in the next cell.
encoded_hotelDF.columns

In [15]:
# The intermediate "<column>_Index" columns are removed; their "_encoded"
# counterparts remain in the result.
index_columns_to_drop = [
    'hotel_Index',
    'arrival_date_month_Index',
    'meal_Index',
    'country_Index',
    'market_segment_Index',
    'reserved_room_type_Index',
    'assigned_room_type_Index',
    'deposit_type_Index',
    'agent_Index',
    'company_Index',
    'customer_type_Index',
]
hoteldf = encoded_hotelDF.drop(*index_columns_to_drop)

#### Save the encoded DataFrame to S3

In [17]:
# Write the one-hot-encoded DataFrame to the mounted bucket.
# `path` is never assigned in any cell of this notebook, so a fresh run raised a
# NameError here; the mount location used by the Parquet read is spelled out instead.
# NOTE(review): assumes `path` was meant to be the /mnt/s3data mount — confirm.
encoded_output_dir = "dbfs:/mnt/s3data"
hoteldf.write.parquet(encoded_output_dir + "/hotel_booking_indexencoded.parquet")

In [18]:
# Best-effort cleanup: detach the mount once the notebook is finished.
try:
  dbutils.fs.unmount('/mnt/s3data')
except Exception as unmount_error:
  # A failure is reported rather than raised so the notebook still completes.
  # The previous bare `except:` also trapped KeyboardInterrupt/SystemExit and
  # printed that the mount "has been unmounted" even though the call had failed.
  print("/mnt/s3data was not unmounted (it may already be unmounted): {}".format(unmount_error))