In [0]:
# Ensure the `gold` schema exists before any saveAsTable("gold.<name>") call in
# later cells; IF NOT EXISTS makes the cell safe to re-run.
gold_db_ddl = "CREATE DATABASE IF NOT EXISTS gold"
spark.sql(gold_db_ddl)

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

def _build_dimension(source_table, sk_col, columns, target_table):
    """Build one gold dimension from a silver table and persist it.

    Adds a surrogate-key column, keeps ``sk_col`` followed by ``columns``,
    overwrites ``target_table`` and returns the *stored* table.

    Args:
        source_table: Fully qualified silver table to read.
        sk_col: Name of the surrogate-key column to add.
        columns: Natural-key and attribute columns to keep, in output order.
        target_table: Fully qualified gold table to overwrite.

    Returns:
        A DataFrame that reads ``target_table`` back from the catalog.
    """
    dim = (
        spark.table(source_table)
        # monotonically_increasing_id() yields unique but non-consecutive ids
        # and is non-deterministic: a lazy plan re-evaluates it on every action.
        .withColumn(sk_col, monotonically_increasing_id())
        .select(sk_col, *columns)
    )
    dim.write.mode("overwrite").saveAsTable(target_table)
    # Return the persisted table rather than the lazy `dim` plan. Downstream
    # joins must see exactly the surrogate keys that were written;
    # re-evaluating `dim` could generate different ones.
    return spark.table(target_table)


# dim_flights
df_dim_flights = _build_dimension(
    "silver.silver_flights",
    "flight_sk",
    ["flight_id", "airline", "origin", "destination", "flight_date"],
    "gold.dim_flights",
)

# dim_airports
df_dim_airports = _build_dimension(
    "silver.silver_airports",
    "airport_sk",
    ["airport_id", "airport_name", "city", "country"],
    "gold.dim_airports",
)

# dim_passengers
df_dim_passengers = _build_dimension(
    "silver.silver_passengers",
    "passenger_sk",
    ["passenger_id", "name", "gender", "nationality"],
    "gold.dim_passengers",
)

In [0]:
# Build the fact table against the *persisted* gold dimensions, so every
# surrogate key matches the value stored in gold.dim_*. Reusing lazy dimension
# plans would re-run the non-deterministic monotonically_increasing_id() and
# could yield keys that do not exist in the dimension tables. Reading from the
# catalog also lets this cell run without the previous cell's Python variables.
# Only the natural key and surrogate key are taken from each dimension, so
# dimension attributes cannot collide with booking columns.
dim_passenger_keys = spark.table("gold.dim_passengers").select("passenger_id", "passenger_sk")
dim_flight_keys = spark.table("gold.dim_flights").select("flight_id", "flight_sk")
dim_airport_keys = spark.table("gold.dim_airports").select("airport_id", "airport_sk")

df_fact_bookings = (
    spark.table("silver.silver_bookings")
    # Left joins keep bookings with no dimension match (their *_sk is null).
    # NOTE(review): a duplicated natural key in a dimension would duplicate
    # bookings here — presumably silver keys are unique; confirm.
    # join with dim_passengers
    .join(dim_passenger_keys, "passenger_id", "left")
    # join with dim_flights
    .join(dim_flight_keys, "flight_id", "left")
    # join with dim_airports
    # NOTE(review): assumes silver_bookings carries an airport_id column — confirm.
    .join(dim_airport_keys, "airport_id", "left")
    .select(
        "booking_id",
        "passenger_sk",
        "flight_sk",
        "airport_sk",
        "amount",
        "booking_date"
    )
)

df_fact_bookings.write.mode("overwrite").saveAsTable("gold.fact_bookings")

In [0]:
# Sanity check: list what was written to the gold schema, then preview the
# first rows of the fact table.
gold_tables = spark.sql("SHOW TABLES IN gold")
gold_tables.show()

fact_preview = spark.sql("SELECT * FROM gold.fact_bookings LIMIT 10")
fact_preview.show()
