#### Feature Engineering

#### 01 - Load Data

In [0]:
import os
# Build the local-file URI for the CSV that lives in <parent of cwd>/data/.
project_root = os.path.dirname(os.getcwd())
booking_csv_uri = f"file:{project_root}/data/booking.csv"

# Read with a header row and let Spark infer the column types.
booking_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(booking_csv_uri)
)
display(booking_df)

In [0]:
# Replace spaces in column names with underscores.
# The whole rename is done in a single `toDF` call instead of one
# `withColumnRenamed` per column, and the comprehension variable is
# deliberately NOT called `col`: that name is imported from
# pyspark.sql.functions further down, and rebinding it to a string here
# would break every later `col(...)` call if this cell were re-run.
booking_df = booking_df.toDF(
    *[column_name.replace(' ', '_') for column_name in booking_df.columns]
)

display(booking_df)

In [0]:
# Print the schema (column names and inferred types) of the renamed DataFrame
booking_df.printSchema()

#### 02 - Feature Engineering Steps

In [0]:
import pyspark.pandas as ps

# Convert Spark DataFrame to pandas-on-Spark DataFrame
# so the pandas-style string/datetime helpers below can be used.
psdf = booking_df.pandas_api()

# Strip whitespace and convert to datetime, coerce errors to NaT
# NOTE(review): `.str.strip()` assumes `date_of_reservation` was read as a
# string column by inferSchema — TODO confirm against the printed schema.
psdf['date_of_reservation'] = psdf['date_of_reservation'].str.strip()
psdf['date_of_reservation'] = ps.to_datetime(psdf['date_of_reservation'], errors='coerce', infer_datetime_format=True)

# Fill NaT with default date
# "1970-01-01" acts as a sentinel for missing/unparseable dates; the
# year/month/day features derived later from this column will therefore
# read 1970 / 1 / 1 for those rows.
psdf['date_of_reservation'] = psdf['date_of_reservation'].fillna("1970-01-01")

# Convert back to Spark DataFrame
# and rebind `booking_df` so the following cells keep working on Spark.
booking_df = psdf.to_spark()

In [0]:
from pyspark.sql.functions import year, month, dayofmonth, col

# Derive calendar features (year, month, day of month) from the parsed
# reservation date; the source column expression is built once and reused.
reservation_date = col("date_of_reservation")
booking_df = (
    booking_df
    .withColumn("reservation_year", year(reservation_date))
    .withColumn("reservation_month", month(reservation_date))
    .withColumn("reservation_day", dayofmonth(reservation_date))
)



In [0]:
# Add the two derived numeric features in a single chained expression.
weekend_nights = col("number_of_weekend_nights")
week_nights = col("number_of_week_nights")

booking_df = (
    booking_df
    # Aggregate feature: total length of stay (weekend + week nights)
    .withColumn("total_nights", weekend_nights + week_nights)
    # Interaction feature: lead time multiplied by the number of adults
    .withColumn("lead_time_adults", col("lead_time") * col("number_of_adults"))
)


In [0]:
# These estimators are not imported anywhere else in the notebook; import
# them here so the cell works on a fresh kernel (Restart & Run All).
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Encode categorical variables
# Each categorical column gets a StringIndexer (string -> "<name>_index")
# followed by a OneHotEncoder ("<name>_index" -> "<name>_encoded").
# The stages are only constructed here; they are fitted in the pipeline cell.
categorical_columns = ["type_of_meal", "room_type", "market_segment_type"]

indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    for column in categorical_columns
]
encoders = [
    OneHotEncoder(inputCol=column + "_index", outputCol=column + "_encoded")
    for column in categorical_columns
]


In [0]:
# `when` is not imported anywhere else in the notebook; import it (and `col`)
# here so the cell works on a fresh kernel (Restart & Run All).
from pyspark.sql.functions import col, when

# Binarize target variable if necessary
# "Canceled" -> 0, every other value -> 1. A null booking_status also ends
# up as 1, because the equality test is not true for nulls and the row
# falls through to `otherwise`.
booking_df = booking_df.withColumn(
    "booking_status_binary",
    when(col("booking_status") == "Canceled", 0).otherwise(1),
)

In [0]:
# `Pipeline` is not imported anywhere else in the notebook; import it here
# so the cell works on a fresh kernel (Restart & Run All).
from pyspark.ml import Pipeline

# Remove output columns left over from a previous run of this cell.
# Both the "<name>_index" and "<name>_encoded" columns are produced by the
# pipeline stages, so either kind would make a re-run fail if still present.
# DataFrame.drop silently ignores names that are not in the schema, so no
# explicit existence check is needed.
stale_output_columns = [
    f"{column}{suffix}"
    for column in ["type_of_meal", "room_type", "market_segment_type"]
    for suffix in ("_index", "_encoded")
]
booking_df = booking_df.drop(*stale_output_columns)

# Create a pipeline for the transformations (all indexers first, then all
# encoders) and apply it.
# NOTE(review): the pipeline is fitted on the full dataset, i.e. before the
# train/validate/test split assigned later in this notebook — confirm this
# is intended.
pipeline = Pipeline(stages=indexers + encoders)
booking_df = pipeline.fit(booking_df).transform(booking_df)

display(booking_df)

In [0]:
# The "<name>_index" columns were only needed as input to the one-hot
# encoders; remove them now that the encoded columns exist.
intermediate_columns = (
    "type_of_meal_index",
    "room_type_index",
    "market_segment_type_index",
)
booking_df = booking_df.drop(*intermediate_columns)


In [0]:
# Preview the engineered DataFrame before it is written out
display(booking_df)

#### 03 - Write Table

In [0]:
# Create the "booking" schema if it does not already exist
spark.sql("CREATE SCHEMA IF NOT EXISTS booking")


In [0]:
# Use the booking schema
# (makes it the current schema, so the unqualified table name used when
# saving below resolves inside `booking`)
spark.sql("USE booking")


In [0]:
# `F` is not imported anywhere else in the notebook; import it here so the
# cell works on a fresh kernel (Restart & Run All).
from pyspark.sql import functions as F

# Specify train-val-test split
train_ratio, val_ratio, test_ratio = 0.7, 0.2, 0.1

# The "test" share is whatever the two thresholds below leave over, so make
# sure the three declared ratios actually describe the whole dataset.
assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-9, \
    "train/val/test ratios must sum to 1"

# Draw one seeded uniform random number per row and bucket it:
#   [0, train_ratio)                       -> "train"
#   [train_ratio, train_ratio + val_ratio) -> "validate"
#   everything else                        -> "test"
# The helper column is dropped again once the label has been assigned.
booking_df = (booking_df.withColumn("random", F.rand(seed=42))
                                .withColumn("split",
                                            F.when(F.col("random") < train_ratio, "train")
                                            .when(F.col("random") < train_ratio + val_ratio, "validate")
                                            .otherwise("test"))
                                .drop("random"))

# Write table for training
# Overwrites any existing table of the same name, including its schema.
(booking_df.write.mode("overwrite")
               .option("overwriteSchema", "true")
               .saveAsTable("mlops_booking_training"))