In [0]:
# 1. Import all necessary functions from PySpark
from pyspark.sql import functions as F

# ----------------- LOAD DATA -----------------
# IMPORTANT: Replace these paths with your actual file paths from Databricks
path_customers = "/Volumes/workspace/big_data_project/big_data/customers.csv"
path_hotels = "/Volumes/workspace/big_data_project/big_data/hotels.csv"
path_bookings = "/Volumes/workspace/big_data_project/big_data/bookings.csv"

# Read the CSV files into DataFrames
customers_df = spark.read.option("header", "true").csv(path_customers)
hotels_df = spark.read.option("header", "true").csv(path_hotels)
bookings_df = spark.read.option("header", "true").csv(path_bookings)

# ----------------- DATA PREPARATION -----------------
# Convert columns to the correct data types for calculations
bookings_df = bookings_df.withColumn("stay_duration", F.col("stay_duration").cast("int")) \
    .withColumn("amount_spent", F.col("amount_spent").cast("double")) \
    .withColumn("booking_date", F.col("booking_date").cast("date"))

# Create a single master DataFrame by joining the tables
master_df = bookings_df.join(hotels_df, "hotel_id").join(customers_df, "customer_id")

display(master_df)

# ----------------- PYSPARK CORE TASKS -----------------
# Task 1: Compute total revenue per hotel (only for 'Booked' status)
revenue_per_hotel_df = master_df.filter(F.col("status") == "Booked") \
    .groupBy("hotel_name") \
    .agg(F.round(F.sum("amount_spent"), 2).alias("total_revenue")) \
    .orderBy(F.col("total_revenue").desc())
display(revenue_per_hotel_df)

# Task 2: Calculate average stay duration by location
avg_stay_by_location_df = master_df.groupBy("location") \
    .agg(F.round(F.avg("stay_duration"), 2).alias("average_stay_duration")) \
    .orderBy(F.col("average_stay_duration").desc())
display(avg_stay_by_location_df)

# ----------------- PYSPARK SQL TASKS -----------------
# Task 1: Identify hotels with the highest cancellation rates
cancellation_rates_df = master_df.groupBy("hotel_name") \
    .agg(
        F.count("*").alias("total_bookings"),
        F.sum(F.when(F.col("status") == "Cancelled", 1).otherwise(0)).alias("cancelled_bookings")
    ) \
    .withColumn("cancellation_rate_percent", F.round((F.col("cancelled_bookings") / F.col("total_bookings")) * 100, 2)) \
    .orderBy(F.col("cancellation_rate_percent").desc())
display(cancellation_rates_df)

# Task 2: Find seasonal trends (revenue by month and location for 'Booked' status)
seasonal_trends_df = master_df.filter(F.col("status") == "Booked") \
    .withColumn("booking_month", F.month("booking_date")) \
    .groupBy("booking_month", "location") \
    .agg(F.round(F.sum("amount_spent"), 2).alias("total_revenue")) \
    .orderBy("booking_month", "location")
display(seasonal_trends_df)

# ----------------- SAVE OUTPUTS -----------------
# Define a base path for all your output files
output_path_base = "/dbfs/tmp/my_volume/"

# Save each result DataFrame to its own folder as a single CSV file
revenue_per_hotel_df.coalesce(1).write.mode("overwrite").option("header","true").csv(output_path_base + "revenue_per_hotel")
avg_stay_by_location_df.coalesce(1).write.mode("overwrite").option("header","true").csv(output_path_base + "avg_stay_by_location")
cancellation_rates_df.coalesce(1).write.mode("overwrite").option("header","true").csv(output_path_base + "cancellation_rates")
seasonal_trends_df.coalesce(1).write.mode("overwrite").option("header","true").csv(output_path_base + "seasonal_trends")

print("All output files have been successfully saved to your S3 bucket.")

customer_id,hotel_id,booking_id,location,booking_date,stay_duration,amount_spent,status,rating,hotel_name,city,customer_name,city.1
121,215,1,Hyderabad,2025-01-15,3,21500.0,Booked,5,Park Hyatt,Hyderabad,Rina Kapoor,Hyderabad
142,207,2,Mumbai,2025-03-22,2,19800.0,Booked,4,Four Seasons,Mumbai,Kareena Kapoor,Kolkata
183,210,3,Bangalore,2025-08-10,5,35000.0,Cancelled,0,ITC Gardenia,Bangalore,Pankaj Tripathi,Bangalore
104,213,4,Hyderabad,2025-07-04,4,45000.0,Booked,5,Taj Falaknuma Palace,Hyderabad,Rohan Mehra,Hyderabad
165,211,5,Bangalore,2025-02-18,1,8500.0,Booked,3,Shangri-La,Bangalore,Kajol Devgan,Bangalore
196,219,6,Chennai,2025-06-30,3,24000.0,Booked,4,Taj Coromandel,Chennai,Zaira Wasim,Kolkata
117,209,7,Bangalore,2025-09-01,2,22000.0,Booked,5,The Ritz-Carlton,Bangalore,Priya Sharma,Bangalore
138,206,8,Mumbai,2025-04-12,7,65000.0,Cancelled,0,Trident Nariman Point,Mumbai,Rani Mukerji,Mumbai
159,204,9,Delhi,2025-05-05,3,18000.0,Booked,4,ITC Maurya,Delhi,Sridevi Kapoor,Bangalore
170,218,10,Chennai,2025-08-25,4,29000.0,Booked,4,The Leela Palace,Chennai,Raveena Tandon,Chennai


Databricks visualization. Run in Databricks to view.

hotel_name,total_revenue
Taj Falaknuma Palace,272000.0
The Ritz-Carlton,232800.0
St. Regis,209000.0
The Taj Mahal Palace,188000.0
Leela Palace,159000.0
ITC Maurya,145000.0
Park Hyatt,137000.0
The Leela Palace,136000.0
Four Seasons,118800.0
ITC Grand Chola,113000.0


location,average_stay_duration
Mumbai,3.84
Hyderabad,3.79
Chennai,3.62
Bangalore,3.26
Delhi,3.22


hotel_name,total_bookings,cancelled_bookings,cancellation_rate_percent
Shangri-La,5,3,60.0
Conrad Bengaluru,5,3,60.0
Trident Nariman Point,5,3,60.0
Hyatt Regency,5,2,40.0
The Westin Mindspace,5,2,40.0
ITC Kohenur,5,2,40.0
ITC Gardenia,5,2,40.0
Taj Coromandel,6,2,33.33
The Ritz-Carlton,8,1,12.5
ITC Maurya,5,0,0.0


booking_month,location,total_revenue
1,Bangalore,84000.0
1,Chennai,121000.0
1,Delhi,62000.0
1,Hyderabad,51000.0
2,Bangalore,27000.0
2,Hyderabad,134000.0
2,Mumbai,113000.0
3,Bangalore,61800.0
3,Delhi,29000.0
3,Hyderabad,86000.0


[0;31m---------------------------------------------------------------------------[0m
[0;31mUnsupportedOperationException[0m             Traceback (most recent call last)
File [0;32m<command-5866084564046110>, line 64[0m
[1;32m     61[0m output_path_base [38;5;241m=[39m [38;5;124m"[39m[38;5;124m/dbfs/tmp/my_volume/[39m[38;5;124m"[39m
[1;32m     63[0m [38;5;66;03m# Save each result DataFrame to its own folder as a single CSV file[39;00m
[0;32m---> 64[0m revenue_per_hotel_df[38;5;241m.[39mcoalesce([38;5;241m1[39m)[38;5;241m.[39mwrite[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m)[38;5;241m.[39moption([38;5;124m"[39m[38;5;124mheader[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124mtrue[39m[38;5;124m"[39m)[38;5;241m.[39mcsv(output_path_base [38;5;241m+[39m [38;5;124m"[39m[38;5;124mrevenue_per_hotel[39m[38;5;124m"[39m)
[1;32m     65[0m avg_stay_by_location_df[38;5;241m.[39mcoalesce([38;5;241m1[39m)[38;5;