# BUILDING DATA WAREHOUSE

In [0]:
spark

## Extracts data from your chosen sources

In [0]:
# Import Spark libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Create a SparkSession (Databricks typically provides one by default)
spark = SparkSession.builder.appName("TourismDataLoad").getOrCreate()

# Define the file paths (assuming they are uploaded to Databricks DBFS or local paths in Databricks environment)
attractions_visited_path = '/FileStore/tables/attractions_visited.csv'
flight_bookings_path = '/FileStore/tables/flight_bookings.csv'
hotel_stays_path = '/FileStore/tables/hotel_stays.csv'
tourist_demographics_path = 'dbfs:/FileStore/shared_uploads/swatisoni8899@gmail.com/tourist_demographics_lines-1.json'


# ----------------------------
# 1. Load CSV files
# ----------------------------
df_attractions = spark.read.format('csv') \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .load(attractions_visited_path)

df_flight_bookings = spark.read.format('csv') \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .load(flight_bookings_path)

df_hotel_stays = spark.read.format('csv') \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .load(hotel_stays_path)

# ----------------------------
# 2. Define Schema for JSON and Load
# ----------------------------
demographics_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("nationality", StringType(), True),
    StructField("travel_purpose", StringType(), True)
])

df_tourist_demographics = spark.read \
    .schema(demographics_schema) \
    .json(tourist_demographics_path)

# ----------------------------
# 3. Save DataFrames as Delta Tables
# ----------------------------

df_attractions.write.format("delta").mode("overwrite").saveAsTable("attractions_visited")
df_flight_bookings.write.format("delta").mode("overwrite").saveAsTable("flight_bookings")
df_hotel_stays.write.format("delta").mode("overwrite").saveAsTable("hotel_stays")
df_tourist_demographics.write.format("delta").mode("overwrite").saveAsTable("tourist_demographics")

# ----------------------------
# 4. Verify the tables
# ----------------------------

# Show all tables in your current database
spark.sql("SHOW TABLES").show()

# Optional: Preview the data in tables
spark.sql("SELECT * FROM attractions_visited LIMIT 10").show()
spark.sql("SELECT * FROM flight_bookings LIMIT 10").show()
spark.sql("SELECT * FROM hotel_stays LIMIT 10").show()
spark.sql("SELECT * FROM tourist_demographics LIMIT 10").show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default| attractions_visited|      false|
| default|     flight_bookings|      false|
| default|         hotel_stays|      false|
| default|tourist_demographics|      false|
+--------+--------------------+-----------+

+-------------+-----------------+--------+----------+--------------+------------------+--------------+
|attraction_id|  attraction_name|    city|visit_date|visitors_count|           revenue|average_rating|
+-------------+-----------------+--------+----------+--------------+------------------+--------------+
|            1|     Eiffel Tower|   Paris|2024-11-10|            64|           4195.84|          3.35|
|            2|Statue of Liberty|New York|2024-05-14|           884|           40133.6|          3.68|
|            3|     Eiffel Tower|   Paris|2025-02-17|           192|          17383.68|          4.02|
|            4|     Burj Khal

##Transforms the data into a normalized structure

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import trim, monotonically_increasing_id, col

# Initialize Spark Session
spark = SparkSession.builder.appName("Normalization3NF").getOrCreate()

# --------------------------
# Load Raw Datasets (Staging Layer)
# --------------------------
tourist_df = spark.read.json('dbfs:/FileStore/shared_uploads/swatisoni8899@gmail.com/tourist_demographics_lines-1.json')
flight_bookings_df = spark.read.option("header", "true").option("inferSchema", "true").csv('/FileStore/tables/flight_bookings.csv')
hotel_stays_df = spark.read.option("header", "true").option("inferSchema", "true").csv('/FileStore/tables/hotel_stays.csv')
attractions_visited_df = spark.read.option("header", "true").option("inferSchema", "true").csv('/FileStore/tables/attractions_visited.csv')

# --------------------------
# Dimension Tables (Normalized)
# --------------------------

# Nationality Dimension
nationality_df = tourist_df.select(trim(col("nationality")).alias("nationality")).distinct() \
    .withColumn("nationality_id", monotonically_increasing_id())

# Travel Purpose Dimension
travel_purpose_df = tourist_df.select(trim(col("travel_purpose")).alias("travel_purpose")).distinct() \
    .withColumn("travel_purpose_id", monotonically_increasing_id())

# Cities Dimension
cities_from_flights = flight_bookings_df.select(trim(col("origin_city")).alias("city")).union(
    flight_bookings_df.select(trim(col("destination_city")).alias("city"))
).distinct()

cities_from_hotels = hotel_stays_df.select(trim(col("city")).alias("city")).distinct()
cities_from_attractions = attractions_visited_df.select(trim(col("city")).alias("city")).distinct()

cities_df = cities_from_flights.union(cities_from_hotels).union(cities_from_attractions).distinct() \
    .withColumn("city_id", monotonically_increasing_id())

# Hotels Dimension
hotels_df = hotel_stays_df.select(trim(col("hotel_name")).alias("hotel_name")).distinct() \
    .withColumn("hotel_id", monotonically_increasing_id())

# Attractions Dimension
attractions_df = attractions_visited_df.select(trim(col("attraction_name")).alias("attraction_name")).distinct() \
    .withColumn("attraction_id", monotonically_increasing_id())

# --------------------------
# Tourist Table (Normalized)
# --------------------------
tourist_clean_df = tourist_df.alias("tourist") \
    .join(nationality_df.alias("nationality"), trim(col("tourist.nationality")) == col("nationality.nationality"), "left") \
    .join(travel_purpose_df.alias("purpose"), trim(col("tourist.travel_purpose")) == col("purpose.travel_purpose"), "left") \
    .select(
        col("tourist.customer_id"),
        trim(col("tourist.first_name")).alias("first_name"),
        trim(col("tourist.last_name")).alias("last_name"),
        col("tourist.gender"),
        col("tourist.age"),
        col("nationality.nationality_id"),
        col("purpose.travel_purpose_id")
    )

# --------------------------
# Flight Bookings Table (Normalized)
# --------------------------
cities_df_origin = cities_df.alias("origin_city")
cities_df_destination = cities_df.alias("destination_city")

flight_bookings_clean_df = flight_bookings_df.alias("flight") \
    .join(cities_df_origin, trim(col("flight.origin_city")) == trim(col("origin_city.city")), "left") \
    .join(cities_df_destination, trim(col("flight.destination_city")) == trim(col("destination_city.city")), "left") \
    .select(
        col("flight.booking_id"),
        col("flight.customer_id"),
        col("flight.flight_number"),
        col("flight.airline"),
        col("origin_city.city_id").alias("origin_city_id"),
        col("destination_city.city_id").alias("destination_city_id"),
        col("flight.departure_date"),
        col("flight.arrival_date"),
        col("flight.booking_date"),
        col("flight.ticket_price")
    )

# --------------------------
# Hotel Stays Table (Normalized)
# --------------------------
hotels_df_alias = hotels_df.alias("hotels")
cities_df_alias = cities_df.alias("hotel_city")

hotel_stays_clean_df = hotel_stays_df.alias("hotel_stay") \
    .join(hotels_df_alias, trim(col("hotel_stay.hotel_name")) == trim(col("hotels.hotel_name")), "left") \
    .join(cities_df_alias, trim(col("hotel_stay.city")) == trim(col("hotel_city.city")), "left") \
    .select(
        col("hotel_stay.booking_id").alias("stay_id"),               # Renamed booking_id to stay_id
        col("hotel_stay.customer_id"),
        col("hotels.hotel_id"),
        col("hotel_city.city_id"),
        col("hotel_stay.check_in_date").alias("check_in"),           # Aliased column for clarity
        col("hotel_stay.check_out_date").alias("check_out"),         # Aliased column for clarity
        col("hotel_stay.total_amount").alias("booking_amount")       # Aliased column for clarity
    )

# --------------------------
# Attractions Visited Table (Normalized)
# --------------------------
# Add surrogate visit_id column (since the raw data doesn't have one)
attractions_visited_df = attractions_visited_df \
    .withColumn("visit_id", monotonically_increasing_id())

attractions_df_alias = attractions_df.alias("attractions")
cities_df_alias_2 = cities_df.alias("attraction_city")

attractions_visited_clean_df = attractions_visited_df.alias("visit") \
    .join(attractions_df_alias, trim(col("visit.attraction_name")) == trim(col("attractions.attraction_name")), "left") \
    .join(cities_df_alias_2, trim(col("visit.city")) == trim(col("attraction_city.city")), "left") \
    .select(
        col("visit.visit_id"),
        col("visit.attraction_id"),        # existing column in dataset
        col("attractions.attraction_id").alias("normalized_attraction_id"),
        col("attraction_city.city_id"),
        col("visit.visit_date"),
        col("visit.visitors_count"),
        col("visit.revenue"),
        col("visit.average_rating")
    )

# --------------------------
# Save Normalized Tables as Delta Tables
# --------------------------

# Dimension Tables
nationality_df.write.format("delta").mode("overwrite").saveAsTable("normalized_nationality")
travel_purpose_df.write.format("delta").mode("overwrite").saveAsTable("normalized_travel_purpose")
cities_df.write.format("delta").mode("overwrite").saveAsTable("normalized_cities")
hotels_df.write.format("delta").mode("overwrite").saveAsTable("normalized_hotels")
attractions_df.write.format("delta").mode("overwrite").saveAsTable("normalized_attractions")

# Fact Tables (Normalized)
tourist_clean_df.write.format("delta").mode("overwrite").saveAsTable("normalized_tourist")
flight_bookings_clean_df.write.format("delta").mode("overwrite").saveAsTable("normalized_flight_bookings")
hotel_stays_clean_df.write.format("delta").mode("overwrite").saveAsTable("normalized_hotel_stays")
attractions_visited_clean_df.write.format("delta").mode("overwrite").saveAsTable("normalized_attractions_visited")

# --------------------------
# Verify Tables Created
# --------------------------
spark.sql("SHOW TABLES").show()

# OPTIONAL: Preview the data in tables
spark.sql("SELECT * FROM normalized_tourist LIMIT 10").show()
spark.sql("SELECT * FROM normalized_flight_bookings LIMIT 10").show()
spark.sql("SELECT * FROM normalized_hotel_stays LIMIT 10").show()
spark.sql("SELECT * FROM normalized_attractions_visited LIMIT 10").show()


+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default| attractions_visited|      false|
| default|     flight_bookings|      false|
| default|         hotel_stays|      false|
| default|normalized_attrac...|      false|
| default|normalized_attrac...|      false|
| default|   normalized_cities|      false|
| default|normalized_flight...|      false|
| default|normalized_hotel_...|      false|
| default|   normalized_hotels|      false|
| default|normalized_nation...|      false|
| default|  normalized_tourist|      false|
| default|normalized_travel...|      false|
| default|tourist_demographics|      false|
+--------+--------------------+-----------+

+-----------+----------+---------+------+---+--------------+-----------------+
|customer_id|first_name|last_name|gender|age|nationality_id|travel_purpose_id|
+-----------+----------+---------+------+---+--------------+-----------------+
|          1| 

##Loads the normalized data into staging tables 

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import trim, col, current_date, lit, monotonically_increasing_id

# Start Spark Session
spark = SparkSession.builder.appName("StarSchemaWithSCD").getOrCreate()

# Load the staging data (already normalized)
tourist_df = spark.read.format("delta").table("normalized_tourist")
cities_df = spark.read.format("delta").table("normalized_cities")
hotels_df = spark.read.format("delta").table("normalized_hotels")
attractions_df = spark.read.format("delta").table("normalized_attractions")
flight_bookings_df = spark.read.format("delta").table("normalized_flight_bookings")
hotel_stays_df = spark.read.format("delta").table("normalized_hotel_stays")
attractions_visited_df = spark.read.format("delta").table("normalized_attractions_visited")



##Data Quality Check

In [0]:
# 1. Row Count
row_count_tourist = spark.sql("SELECT COUNT(*) FROM stg_tourist_demographics").first()[0]
print(f"Row count for stg_tourist_demographics: {row_count_tourist}")

# 2. Null Checks on Primary Fields
spark.sql("""
    SELECT *
    FROM stg_tourist_demographics
    WHERE customer_id IS NULL
""").show()

# 3. Duplicate Check on customer_id
spark.sql("""
    SELECT customer_id, COUNT(*)
    FROM stg_tourist_demographics
    GROUP BY customer_id
    HAVING COUNT(*) > 1
""").show()

# 4. Data Domain Checks (Gender field)
spark.sql("""
    SELECT DISTINCT gender
    FROM stg_tourist_demographics
""").show()


Row count for stg_tourist_demographics: 1000
+---+-----------+----------+------+---------+-----------+--------------+
|age|customer_id|first_name|gender|last_name|nationality|travel_purpose|
+---+-----------+----------+------+---------+-----------+--------------+
+---+-----------+----------+------+---------+-----------+--------------+

+-----------+--------+
|customer_id|count(1)|
+-----------+--------+
+-----------+--------+

+------+
|gender|
+------+
|Female|
| Other|
|  Male|
+------+



In [0]:
# 1. Row Count
row_count_flight = spark.sql("SELECT COUNT(*) FROM stg_flight_bookings").first()[0]
print(f"Row count for stg_flight_bookings: {row_count_flight}")

# 2. Null Checks
spark.sql("""
    SELECT *
    FROM stg_flight_bookings
    WHERE booking_id IS NULL OR customer_id IS NULL
""").show()

# 3. Duplicate Check on booking_id
spark.sql("""
    SELECT booking_id, COUNT(*)
    FROM stg_flight_bookings
    GROUP BY booking_id
    HAVING COUNT(*) > 1
""").show()

# 4. Domain Checks (Ticket Price >= 0)
spark.sql("""
    SELECT *
    FROM stg_flight_bookings
    WHERE ticket_price < 0
""").show()


Row count for stg_flight_bookings: 1000
+----------+-----------+-------------+-------+-----------+----------------+--------------+------------+------------+------------+
|booking_id|customer_id|flight_number|airline|origin_city|destination_city|departure_date|arrival_date|booking_date|ticket_price|
+----------+-----------+-------------+-------+-----------+----------------+--------------+------------+------------+------------+
+----------+-----------+-------------+-------+-----------+----------------+--------------+------------+------------+------------+

+----------+--------+
|booking_id|count(1)|
+----------+--------+
+----------+--------+

+----------+-----------+-------------+-------+-----------+----------------+--------------+------------+------------+------------+
|booking_id|customer_id|flight_number|airline|origin_city|destination_city|departure_date|arrival_date|booking_date|ticket_price|
+----------+-----------+-------------+-------+-----------+----------------+--------------+

In [0]:
# 1. Row Count
row_count_hotel = spark.sql("SELECT COUNT(*) FROM stg_hotel_stays").first()[0]
print(f"Row count for stg_hotel_stays: {row_count_hotel}")

# 2. Null Checks
spark.sql("""
    SELECT *
    FROM stg_hotel_stays
    WHERE booking_id IS NULL OR customer_id IS NULL
""").show()

# 3. Duplicate Check on booking_id
spark.sql("""
    SELECT booking_id, COUNT(*)
    FROM stg_hotel_stays
    GROUP BY booking_id
    HAVING COUNT(*) > 1
""").show()

# 4. Domain Check (Booking Amount >= 0)
spark.sql("""
    SELECT *
    FROM stg_hotel_stays
    WHERE total_amount < 0
""").show()


Row count for stg_hotel_stays: 1000
+----------+-----------+----------+----+---------+-------------+--------------+------------+------------+
|booking_id|customer_id|hotel_name|city|room_type|check_in_date|check_out_date|booking_date|total_amount|
+----------+-----------+----------+----+---------+-------------+--------------+------------+------------+
+----------+-----------+----------+----+---------+-------------+--------------+------------+------------+

+----------+--------+
|booking_id|count(1)|
+----------+--------+
+----------+--------+

+----------+-----------+----------+----+---------+-------------+--------------+------------+------------+
|booking_id|customer_id|hotel_name|city|room_type|check_in_date|check_out_date|booking_date|total_amount|
+----------+-----------+----------+----+---------+-------------+--------------+------------+------------+
+----------+-----------+----------+----+---------+-------------+--------------+------------+------------+



In [0]:
# 1. Row Count
row_count_attractions = spark.sql("SELECT COUNT(*) FROM stg_attractions_visited").first()[0]
print(f"Row count for stg_attractions_visited: {row_count_attractions}")

# 2. Null Checks
spark.sql("""
    SELECT *
    FROM stg_attractions_visited
    WHERE attraction_id IS NULL
""").show()

# 3. Duplicate Check on attraction_id + visit_date (or combination key)
spark.sql("""
    SELECT attraction_id, visit_date, COUNT(*)
    FROM stg_attractions_visited
    GROUP BY attraction_id, visit_date
    HAVING COUNT(*) > 1
""").show()

# 4. Domain Check (Visitors count >= 0)
spark.sql("""
    SELECT *
    FROM stg_attractions_visited
    WHERE visitors_count < 0
""").show()


Row count for stg_attractions_visited: 1000
+-------------+---------------+----+----------+--------------+-------+--------------+
|attraction_id|attraction_name|city|visit_date|visitors_count|revenue|average_rating|
+-------------+---------------+----+----------+--------------+-------+--------------+
+-------------+---------------+----+----------+--------------+-------+--------------+

+-------------+----------+--------+
|attraction_id|visit_date|count(1)|
+-------------+----------+--------+
+-------------+----------+--------+

+-------------+---------------+----+----------+--------------+-------+--------------+
|attraction_id|attraction_name|city|visit_date|visitors_count|revenue|average_rating|
+-------------+---------------+----+----------+--------------+-------+--------------+
+-------------+---------------+----+----------+--------------+-------+--------------+



##Build Dimensional Modeling

In [0]:
from pyspark.sql import Row

dq_checks = [
    Row(table="stg_tourist_demographics", check="Row Count", result=row_count_tourist),
    Row(table="stg_flight_bookings", check="Row Count", result=row_count_flight),
    Row(table="stg_hotel_stays", check="Row Count", result=row_count_hotel),
    Row(table="stg_attractions_visited", check="Row Count", result=row_count_attractions)
]

dq_df = spark.createDataFrame(dq_checks)
dq_df.show()

# Save DQ Summary
dq_df.write.format("delta").mode("overwrite").saveAsTable("staging_dq_summary")


+--------------------+---------+------+
|               table|    check|result|
+--------------------+---------+------+
|stg_tourist_demog...|Row Count|  1000|
| stg_flight_bookings|Row Count|  1000|
|     stg_hotel_stays|Row Count|  1000|
|stg_attractions_v...|Row Count|  1000|
+--------------------+---------+------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import trim, monotonically_increasing_id, col, lit, current_date

# Initialize Spark Session
spark = SparkSession.builder.appName("DimensionalModelSCD").getOrCreate()

# --------------------------
# Load Raw Datasets (Staging Layer)
# --------------------------
tourist_df = spark.read.json("dbfs:/FileStore/shared_uploads/swatisoni8899@gmail.com/tourist_demographics_lines-1.json")
flight_bookings_df = spark.read.option("header", "true").option("inferSchema", "true").csv("/FileStore/tables/flight_bookings.csv")
hotel_stays_df = spark.read.option("header", "true").option("inferSchema", "true").csv("/FileStore/tables/hotel_stays.csv")
attractions_visited_df = spark.read.option("header", "true").option("inferSchema", "true").csv("/FileStore/tables/attractions_visited.csv")

# --------------------------
# Dimension Tables
# --------------------------

# Dim_Nationality (Static)
nationality_df = tourist_df.select(trim(col("nationality")).alias("nationality")).distinct() \
    .withColumn("nationality_sk", monotonically_increasing_id())

# Dim_Travel_Purpose (Static)
travel_purpose_df = tourist_df.select(trim(col("travel_purpose")).alias("travel_purpose")).distinct() \
    .withColumn("travel_purpose_sk", monotonically_increasing_id())

# Dim_City (Static)
cities_from_flights = flight_bookings_df.select(trim(col("origin_city")).alias("city")) \
    .union(flight_bookings_df.select(trim(col("destination_city")).alias("city"))).distinct()

cities_from_hotels = hotel_stays_df.select(trim(col("city")).alias("city")).distinct()
cities_from_attractions = attractions_visited_df.select(trim(col("city")).alias("city")).distinct()

cities_df = cities_from_flights.union(cities_from_hotels).union(cities_from_attractions).distinct() \
    .withColumn("city_sk", monotonically_increasing_id())

# Dim_Hotel
hotels_df = hotel_stays_df.select(trim(col("hotel_name")).alias("hotel_name")).distinct() \
    .withColumn("hotel_sk", monotonically_increasing_id())

# Dim_Attraction
attractions_df = attractions_visited_df.select(trim(col("attraction_name")).alias("attraction_name")).distinct() \
    .withColumn("attraction_sk", monotonically_increasing_id())

# --------------------------
# SCD2 - Dim_Tourist
# --------------------------

tourist_dim_df = tourist_df.alias("staging") \
    .join(nationality_df.alias("nat"), trim(col("staging.nationality")) == col("nat.nationality"), "left") \
    .join(travel_purpose_df.alias("tp"), trim(col("staging.travel_purpose")) == col("tp.travel_purpose"), "left") \
    .select(
        col("staging.customer_id"),
        trim(col("staging.first_name")).alias("first_name"),
        trim(col("staging.last_name")).alias("last_name"),
        col("staging.gender"),
        col("staging.age"),
        col("nat.nationality_sk"),
        col("tp.travel_purpose_sk"),
        lit(current_date()).alias("start_date"),
        lit("9999-12-31").alias("end_date"),
        lit("Y").alias("is_current")
    ).withColumn("tourist_sk", monotonically_increasing_id())

# --------------------------
# Fact_Flight_Bookings
# --------------------------
cities_df_origin = cities_df.alias("origin_city")
cities_df_destination = cities_df.alias("destination_city")

fact_flight_bookings_df = flight_bookings_df.alias("flight") \
    .join(cities_df_origin, trim(col("flight.origin_city")) == trim(col("origin_city.city")), "left") \
    .join(cities_df_destination, trim(col("flight.destination_city")) == trim(col("destination_city.city")), "left") \
    .select(
        col("flight.booking_id"),
        col("flight.customer_id"),
        col("origin_city.city_sk").alias("origin_city_sk"),
        col("destination_city.city_sk").alias("destination_city_sk"),
        col("flight.flight_number"),
        col("flight.airline"),
        col("flight.departure_date"),
        col("flight.arrival_date"),
        col("flight.booking_date"),
        col("flight.ticket_price")
    )

# --------------------------
# Fact_Hotel_Stays
# --------------------------
hotels_df_alias = hotels_df.alias("hotels")
cities_df_alias = cities_df.alias("hotel_city")

fact_hotel_stays_df = hotel_stays_df.alias("hotel") \
    .join(hotels_df_alias, trim(col("hotel.hotel_name")) == trim(col("hotels.hotel_name")), "left") \
    .join(cities_df_alias, trim(col("hotel.city")) == trim(col("hotel_city.city")), "left") \
    .select(
        col("hotel.booking_id").alias("stay_id"),
        col("hotel.customer_id"),
        col("hotels.hotel_sk"),
        col("hotel_city.city_sk"),
        col("hotel.check_in_date").alias("check_in"),
        col("hotel.check_out_date").alias("check_out"),
        col("hotel.total_amount").alias("booking_amount")
    )

# --------------------------
# Fact_Attractions_Visits (No customer_id)
# --------------------------
attractions_df_alias = attractions_df.alias("attractions")
cities_df_alias_2 = cities_df.alias("attraction_city")

fact_attractions_visits_df = attractions_visited_df.alias("visit") \
    .join(attractions_df_alias, trim(col("visit.attraction_name")) == trim(col("attractions.attraction_name")), "left") \
    .join(cities_df_alias_2, trim(col("visit.city")) == trim(col("attraction_city.city")), "left") \
    .withColumn("visit_sk", monotonically_increasing_id()) \
    .select(
        col("visit_sk"),
        col("attractions.attraction_sk"),
        col("attraction_city.city_sk"),
        col("visit.visit_date"),
        col("visit.visitors_count"),
        col("visit.revenue"),
        col("visit.average_rating")
    )

# --------------------------
# Save Dimension Tables (with SCD2 tourist)
# --------------------------
nationality_df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable("dim_nationality")
travel_purpose_df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable("dim_travel_purpose")
cities_df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable("dim_city")
hotels_df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable("dim_hotel")
attractions_df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable("dim_attraction")
tourist_dim_df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable("dim_tourist")

# --------------------------
# Save Fact Tables
# --------------------------
fact_flight_bookings_df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable("fact_flight_bookings")
fact_hotel_stays_df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable("fact_hotel_stays")
fact_attractions_visits_df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable("fact_attractions_visits")

# --------------------------
# Verify
# --------------------------

spark.sql("SELECT * FROM dim_tourist LIMIT 10").show()
spark.sql("SELECT * FROM fact_flight_bookings LIMIT 10").show()
spark.sql("SELECT * FROM fact_hotel_stays LIMIT 10").show()
spark.sql("SELECT * FROM fact_attractions_visits LIMIT 10").show()


+-----------+----------+---------+------+---+--------------+-----------------+----------+----------+----------+----------+
|customer_id|first_name|last_name|gender|age|nationality_sk|travel_purpose_sk|start_date|  end_date|is_current|tourist_sk|
+-----------+----------+---------+------+---+--------------+-----------------+----------+----------+----------+----------+
|          1|      John| Phillips|  Male| 55|             7|                0|2025-03-14|9999-12-31|         Y|         0|
|          2|     David|   Sawyer|Female| 55|            74|                1|2025-03-14|9999-12-31|         Y|         1|
|          3|      Juan|     Bell|  Male| 56|           203|                1|2025-03-14|9999-12-31|         Y|         2|
|          4|     Paula|  Sherman|Female| 46|           216|                3|2025-03-14|9999-12-31|         Y|         3|
|          5|   Anthony|   Briggs|Female| 27|           174|                3|2025-03-14|9999-12-31|         Y|         4|
|          6|   

## Analytical queries 

### Query 1: Top 5 Cities by Tourist Flight Bookings and Revenue

In [0]:
%sql
SELECT 
    dc.city AS destination_city,
    COUNT(ffb.booking_id) AS total_bookings,
    SUM(ffb.ticket_price) AS total_revenue
FROM 
    fact_flight_bookings ffb
JOIN 
    dim_city dc ON ffb.destination_city_sk = dc.city_sk
WHERE 
    ffb.booking_date >= '2024-01-01'  -- Filter: Recent bookings (adjust as needed)
GROUP BY 
    dc.city
ORDER BY 
    total_bookings DESC
LIMIT 5;


destination_city,total_bookings,total_revenue
Toronto,160,130129.57000000004
Tokyo,147,123212.80000000005
Paris,144,106830.38
Dubai,144,111831.91000000005
Sydney,141,106664.22000000004


Databricks visualization. Run in Databricks to view.

### Query 2: Average Stay Duration and Spending by Nationality

In [0]:
%sql
SELECT 
    dn.nationality,
    COUNT(fhs.stay_id) AS total_stays,
    ROUND(AVG(DATEDIFF(fhs.check_out, fhs.check_in)), 2) AS avg_stay_duration_days,
    ROUND(SUM(fhs.booking_amount), 2) AS total_spent
FROM 
    fact_hotel_stays fhs
JOIN 
    dim_tourist dt 
    ON fhs.customer_id = dt.customer_id
JOIN 
    dim_nationality dn 
    ON dt.nationality_sk = dn.nationality_sk
WHERE 
    dt.is_current = 'Y'  -- SCD2: Only current records
GROUP BY 
    dn.nationality
ORDER BY 
    total_spent DESC
LIMIT 20;


nationality,total_stays,avg_stay_duration_days,total_spent
Palau,18,5.83,7744.9
Mauritania,11,5.09,7281.38
Bouvet Island (Bouvetoya),14,6.14,7132.75
Turkey,12,5.58,6641.65
Mayotte,9,5.22,6398.43
Serbia,12,5.75,6258.3
Djibouti,9,5.22,6182.44
Bangladesh,12,4.92,6173.21
French Southern Territories,12,5.25,6169.14
Sudan,10,6.3,5558.53


Databricks visualization. Run in Databricks to view.

### Query 3 (Complex): High-Value Tourists and Their Multi-Activity Engagement

In [0]:
%sql
WITH tourist_spending AS (
    SELECT 
        dt.tourist_sk,
        dt.first_name,
        dt.last_name,
        SUM(ffb.ticket_price) AS flight_spending,
        SUM(fhs.booking_amount) AS hotel_spending,
        SUM(fav.revenue) AS attraction_spending,
        (SUM(ffb.ticket_price) + SUM(fhs.booking_amount) + SUM(fav.revenue)) AS total_spending
    FROM 
        dim_tourist dt
    LEFT JOIN 
        fact_flight_bookings ffb ON dt.customer_id = ffb.customer_id
    LEFT JOIN 
        fact_hotel_stays fhs ON dt.customer_id = fhs.customer_id
    LEFT JOIN 
        fact_attractions_visits fav ON fav.attraction_sk IS NOT NULL  -- Cross-service inclusion
    WHERE 
        dt.is_current = 'Y'
    GROUP BY 
        dt.tourist_sk, dt.first_name, dt.last_name
)
SELECT 
    tourist_sk,
    first_name,
    last_name,
    ROUND(flight_spending, 2) AS flight_spending,
    ROUND(hotel_spending, 2) AS hotel_spending,
    ROUND(attraction_spending, 2) AS attraction_spending,
    ROUND(total_spending, 2) AS total_spending
FROM 
    tourist_spending
WHERE 
    total_spending > 10000  -- High-value threshold
ORDER BY 
    total_spending DESC
LIMIT 10;


tourist_sk,first_name,last_name,flight_spending,hotel_spending,attraction_spending,total_spending
441,Sarah,Reyes,11425560.0,6630520.0,470187794.08,488243874.08
182,Anne,Cross,8588820.0,4721000.0,352640845.56,365950665.56
434,Christopher,Cruz,10464640.0,2986850.0,293867371.3,307318861.3
817,Daniel,Nixon,9750480.0,2564950.0,293867371.3,306182801.3
556,Whitney,Cantu,7197240.0,4312080.0,264480634.17,275989954.17
832,Deborah,Rosario,5271780.0,5697570.0,264480634.17,275449984.17
115,Cory,Richardson,4965270.0,4466760.0,264480634.17,273912664.17
937,Amy,Mckee,6216330.0,2631780.0,264480634.17,273328744.17
755,Rodney,Hale,10819160.0,4884720.0,235093897.04,250797777.04
277,David,Huff,9345520.0,6044440.0,235093897.04,250483857.04


Databricks visualization. Run in Databricks to view.

### Monthly Flight Revenue and Top Airlines

In [0]:
%sql
SELECT
    DATE_FORMAT(ffb.booking_date, 'yyyy-MM') AS booking_month,
    ffb.airline,
    COUNT(ffb.booking_id) AS total_bookings,
    ROUND(SUM(ffb.ticket_price), 2) AS total_revenue
FROM
    fact_flight_bookings ffb
JOIN
    dim_city dc_origin ON ffb.origin_city_sk = dc_origin.city_sk
JOIN
    dim_city dc_dest ON ffb.destination_city_sk = dc_dest.city_sk
WHERE
    ffb.booking_date >= '2024-01-01'
GROUP BY
    DATE_FORMAT(ffb.booking_date, 'yyyy-MM'),
    ffb.airline
ORDER BY
    booking_month ASC,
    total_revenue DESC;


booking_month,airline,total_bookings,total_revenue
2024-03,Delta,18,13807.1
2024-03,Emirates,13,12804.91
2024-03,Lufthansa,12,11280.24
2024-03,United Airlines,11,7987.95
2024-03,Qatar Airways,8,6164.99
2024-04,United Airlines,18,17090.78
2024-04,Delta,20,15758.51
2024-04,Qatar Airways,18,14385.42
2024-04,Lufthansa,16,11823.63
2024-04,Emirates,14,10220.32


Databricks visualization. Run in Databricks to view.

###Top Attractions by Revenue and Visitor Ratings 

In [0]:

%sql
SELECT
    da.attraction_name,
    dc.city AS attraction_city,
    ROUND(SUM(fav.revenue), 2) AS total_revenue,
    ROUND(AVG(fav.average_rating), 2) AS avg_rating,
    SUM(fav.visitors_count) AS total_visitors
FROM
    fact_attractions_visits fav
JOIN
    dim_attraction da ON fav.attraction_sk = da.attraction_sk
JOIN
    dim_city dc ON fav.city_sk = dc.city_sk
GROUP BY
    da.attraction_name,
    dc.city
ORDER BY
    total_revenue DESC,
    avg_rating DESC
LIMIT 100;


attraction_name,attraction_city,total_revenue,avg_rating,total_visitors
Statue of Liberty,New York,6643657.88,3.99,125911
Great Wall,Beijing,6297677.77,4.0,118007
Eiffel Tower,Paris,6295144.53,3.96,111755
Burj Khalifa,Dubai,5609753.97,4.04,103792
Sydney Opera House,Sydney,4540502.98,3.93,82627


Databricks visualization. Run in Databricks to view.