# Gold Layer: Star Schema (dbt Logic as PySpark)

**Goal:** Build a Power BI-ready star schema from the clean tables in the Silver layer.

| Table | Type | Description |
|-------|-----|----------|
| `gold_fact_bookings` | Fact | Booking transaction + EUR KPI |
| `gold_dim_city` | Dim | City/Country + weather + exchange rates |
| `gold_dim_hotel` | Dim | Hotel attributes + tier |
| `gold_dim_date` | Dim | Calendar (Time Intelligence) |

**Note:** This notebook is the PySpark equivalent of the SQL models in `dbt_project/`.

In [1]:
from pyspark.sql.functions import (col, year, month, dayofmonth, quarter, dayofweek,
                                    weekofyear, dayofyear, date_format, when, lit,
                                    coalesce, round as spark_round, first, row_number,
                                    explode, sequence, to_date, expr)
from pyspark.sql.window import Window
from pyspark.sql.types import DateType
from pyspark.sql.functions import sum as spark_sum, avg as spark_avg, count as spark_count, countDistinct, trim

print("🥇 Starting Gold Layer...")

🥇 Starting Gold Layer...


## 1. FACT_BOOKINGS

In [2]:
df_silver = spark.read.table("silver_bookings")

# Filter out 1900 sentinel dates and rows with a NULL hotel_id or booking_id
df_fact = df_silver.filter(
    (col("booking_date") > lit("1950-01-01").cast("date")) &
    col("hotel_id").isNotNull() &
    col("booking_id").isNotNull()
)

df_fact = df_fact.select(
    # Keys
    col("booking_id"),
    col("hotel_id"),
    col("customer_id"),
    
    # Date
    col("booking_date"),
    year("booking_date").alias("booking_year"),
    month("booking_date").alias("booking_month"),
    
    # Location
    col("city_clean").alias("city"),
    col("country"),
    
    # Hotel
    col("hotel_name"),
    col("hotel_type"),
    col("star_rating"),
    
    # Booking details
    col("room_type"),
    col("nights"),
    col("adults"),
    col("children"),
    col("booking_status"),
    col("source_system"),
    
    # Financial
    col("currency"),
    col("total_amount"),
    col("room_price"),
    coalesce(col("exchange_rate_to_eur"), lit(1.0)).alias("exchange_rate_to_eur"),
    
    # KPI: Revenue in EUR
    spark_round(
        col("total_amount") * coalesce(col("exchange_rate_to_eur"), lit(1.0)), 2
    ).alias("total_amount_eur"),
    
    # KPI: Avg nightly rate
    when(col("nights") > 0,
         spark_round(col("total_amount") / col("nights"), 2)
    ).otherwise(col("total_amount")).alias("avg_nightly_rate"),
    
    # KPI: Avg nightly rate EUR
    when(col("nights") > 0,
         spark_round(
             (col("total_amount") * coalesce(col("exchange_rate_to_eur"), lit(1.0))) / col("nights"), 2
         )
    ).otherwise(
         spark_round(col("total_amount") * coalesce(col("exchange_rate_to_eur"), lit(1.0)), 2)
    ).alias("avg_nightly_rate_eur"),
    
    # Weather
    col("temperature_c"),
    col("weather_code")
)

df_fact.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("gold_fact_bookings")

count = df_fact.count()
print(f"✅ gold_fact_bookings: {count:,} rows")
df_fact.select("booking_id", "city", "currency", "total_amount", "total_amount_eur", "avg_nightly_rate_eur").show(5, truncate=False)

✅ gold_fact_bookings: 1,239,248 rows
+-------------+---------+--------+------------+----------------+--------------------+
|booking_id   |city     |currency|total_amount|total_amount_eur|avg_nightly_rate_eur|
+-------------+---------+--------+------------+----------------+--------------------+
|Bkg_000000001|Ostrava  |EUR     |244.31      |244.31          |30.54               |
|Bkg_000000067|Ankara   |EUR     |1701.67     |1701.67         |1701.67             |
|Bkg_000000073|Rotterdam|EUR     |1168.27     |1168.27         |233.65              |
|Bkg_000000111|Surabaya |EUR     |1208.91     |1208.91         |134.32              |
|Bkg_000000227|Turku    |EUR     |1496.65     |1496.65         |124.72              |
+-------------+---------+--------+------------+----------------+--------------------+
only showing top 5 rows
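
The EUR conversion and the nightly-rate fallback in the cell above can be traced by hand. The plain-Python sketch below mirrors that logic outside Spark. The function names `to_eur` and `nightly_rate_eur` and the `nights=8` value are illustrative assumptions, not taken from the notebook. `ROUND_HALF_UP` is used because Spark's `round` rounds half up.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Optional

CENT = Decimal("0.01")
DEFAULT_RATE = Decimal("1.0")  # same fallback as coalesce(exchange_rate_to_eur, lit(1.0))


def to_eur(total_amount: Decimal, rate: Optional[Decimal]) -> Decimal:
    """Convert an amount to EUR; a missing rate is treated as 1.0."""
    effective_rate = rate if rate is not None else DEFAULT_RATE
    return (total_amount * effective_rate).quantize(CENT, rounding=ROUND_HALF_UP)


def nightly_rate_eur(total_amount: Decimal,
                     rate: Optional[Decimal],
                     nights: Optional[int]) -> Decimal:
    """EUR amount per night; falls back to the full EUR amount when nights is 0 or missing."""
    effective_rate = rate if rate is not None else DEFAULT_RATE
    amount_eur = total_amount * effective_rate
    if nights is not None and nights > 0:
        return (amount_eur / nights).quantize(CENT, rounding=ROUND_HALF_UP)
    return amount_eur.quantize(CENT, rounding=ROUND_HALF_UP)


# Hypothetical inputs: the nights values are assumed, only the amounts appear in the output above.
print(nightly_rate_eur(Decimal("244.31"), Decimal("1.0"), 8))
print(nightly_rate_eur(Decimal("1701.67"), None, 0))
```

The fallback explains rows where `avg_nightly_rate_eur` equals `total_amount_eur`: those bookings have zero or missing `nights`, so the full amount is reported instead of a per-night value.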



## 2. DIM_CITY

In [3]:
df_city_dim = spark.read.table("silver_city_dim")

df_dim_city = df_city_dim.filter(
    col("city").isNotNull() & (col("city") != "Unknown")
).dropDuplicates(["city"]).select(
    row_number().over(Window.orderBy("city")).alias("city_key"),
    col("city"),
    col("country"),
    col("temperature_c"),
    col("weather_code"),
    coalesce(col("rate_eur"), lit(0)).alias("rate_eur"),
    coalesce(col("rate_usd"), lit(0)).alias("rate_usd"),
    coalesce(col("rate_gbp"), lit(0)).alias("rate_gbp"),
    coalesce(col("rate_jpy"), lit(0)).alias("rate_jpy"),
    coalesce(col("rate_try"), lit(0)).alias("rate_try"),
    coalesce(col("rate_aed"), lit(0)).alias("rate_aed"),
    coalesce(col("rate_cny"), lit(0)).alias("rate_cny")
)

df_dim_city.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("gold_dim_city")

print(f"✅ gold_dim_city: {df_dim_city.count()} cities (unique)")
df_dim_city.show(5, truncate=False)

✅ gold_dim_city: 247 cities (unique)
+--------+---------+--------+-------------+------------+--------+--------+--------+----------+--------+--------+--------+
|city_key|city     |country |temperature_c|weather_code|rate_eur|rate_usd|rate_gbp|rate_jpy  |rate_try|rate_aed|rate_cny|
+--------+---------+--------+-------------+------------+--------+--------+--------+----------+--------+--------+--------+
|1       |Aalborg  |Kenya   |0.4          |51          |1.0     |1.177139|0.874196|182.436915|51.56482|4.323083|8.135807|
|2       |Aarhus   |Nigeria |1.0          |51          |1.0     |1.177139|0.874196|182.436915|51.56482|4.323083|8.135807|
|3       |Abu Dhabi|Colombia|21.7         |0           |1.0     |1.177139|0.874196|182.436915|51.56482|4.323083|8.135807|
|4       |Abuja    |India   |22.7         |0           |1.0     |1.177139|0.874196|182.436915|51.56482|4.323083|8.135807|
|5       |Adelaide |Ireland |25.7         |1           |1.0     |1.177139|0.874196|182.436915|51.56482|4.32
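
The dimension above is built in three steps: drop NULL and `"Unknown"` cities, keep one row per city, then number the cities alphabetically to get `city_key`. A minimal plain-Python sketch of those steps follows. `build_city_dim` is a hypothetical helper, and it keeps the first row seen per city, whereas Spark's `dropDuplicates` makes no promise about which duplicate survives.

```python
from typing import Dict, Iterable, List


def build_city_dim(rows: Iterable[dict]) -> List[dict]:
    """Deduplicate by city and assign a 1-based surrogate key in alphabetical order."""
    first_per_city: Dict[str, dict] = {}
    for row in rows:
        city = row.get("city")
        if city is None or city == "Unknown":
            continue  # same filter as the notebook cell
        first_per_city.setdefault(city, row)  # assumption: keep the first occurrence
    return [
        {"city_key": key, **first_per_city[city]}
        for key, city in enumerate(sorted(first_per_city), start=1)
    ]


# Hypothetical sample rows, not taken from silver_city_dim.
sample = [
    {"city": "Turku", "country": "Finland"},
    {"city": None, "country": "Nowhere"},
    {"city": "Unknown", "country": "Nowhere"},
    {"city": "Ankara", "country": "Turkey"},
    {"city": "Turku", "country": "Sweden"},
]
for row in build_city_dim(sample):
    print(row)
```

Because the key comes from the alphabetical position, it can shift whenever a new city is added, so it is only stable within a single overwrite of the table.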

## 3. DIM_HOTEL

In [4]:
df_silver = spark.read.table("silver_bookings")

w = Window.partitionBy("hotel_id").orderBy(col("booking_date").desc())

df_hotels = df_silver.filter(col("hotel_id").isNotNull()) \
    .withColumn("rn", row_number().over(w)) \
    .filter(col("rn") == 1) \
    .select(
        col("hotel_id"),
        col("hotel_name"),
        col("city_clean").alias("city"),
        col("country"),
        col("hotel_type"),
        col("star_rating"),
        col("latitude"),
        col("longitude"),
        when(col("star_rating") >= 4, lit("Premium"))
            .when(col("star_rating") >= 3, lit("Standard"))
            .otherwise(lit("Budget")).alias("hotel_tier")
    )

df_hotels.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("gold_dim_hotel")

print(f"✅ gold_dim_hotel: {df_hotels.count()} hotels")
df_hotels.groupBy("hotel_tier").count().show()

✅ gold_dim_hotel: 1001 hotels
+----------+-----+
|hotel_tier|count|
+----------+-----+
|   Premium|  400|
|    Budget|  424|
|  Standard|  177|
+----------+-----+
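
The hotel dimension combines two ideas: take the most recent booking row per `hotel_id`, then derive `hotel_tier` from `star_rating`. The sketch below restates both in plain Python. `hotel_tier` and `latest_row_per_hotel` are hypothetical helper names, and the sketch assumes every row has a non-NULL `booking_date`.

```python
from datetime import date
from typing import Dict, Iterable, Optional


def hotel_tier(star_rating: Optional[float]) -> str:
    """Same thresholds as the notebook: >= 4 Premium, >= 3 Standard, otherwise Budget."""
    if star_rating is not None and star_rating >= 4:
        return "Premium"
    if star_rating is not None and star_rating >= 3:
        return "Standard"
    return "Budget"  # also reached for a missing rating, like the otherwise() branch


def latest_row_per_hotel(rows: Iterable[dict]) -> Dict[str, dict]:
    """Keep the row with the latest booking_date for each non-NULL hotel_id."""
    latest: Dict[str, dict] = {}
    for row in rows:
        hotel_id = row["hotel_id"]
        if hotel_id is None:
            continue
        current = latest.get(hotel_id)
        if current is None or row["booking_date"] > current["booking_date"]:
            latest[hotel_id] = row
    return latest


# Hypothetical sample rows.
sample = [
    {"hotel_id": "H1", "booking_date": date(2024, 1, 5), "star_rating": 3},
    {"hotel_id": "H1", "booking_date": date(2025, 3, 1), "star_rating": 4},
    {"hotel_id": "H2", "booking_date": date(2023, 7, 9), "star_rating": None},
]
for hotel_id, row in latest_row_per_hotel(sample).items():
    print(hotel_id, hotel_tier(row["star_rating"]))
```

Note that a missing `star_rating` lands in `Budget`, which may inflate that tier's count if ratings are sparsely populated.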



## 4. DIM_DATE

In [5]:
# Generate date spine: 2020-01-01 → 2026-12-31
df_dates = spark.sql("""
    SELECT explode(sequence(
        to_date('2020-01-01'),
        to_date('2026-12-31'),
        interval 1 day
    )) AS date_key
""")

df_dim_date = df_dates.select(
    col("date_key"),
    year("date_key").alias("year"),
    month("date_key").alias("month"),
    dayofmonth("date_key").alias("day"),
    quarter("date_key").alias("quarter"),
    date_format("date_key", "MMMM").alias("month_name"),
    date_format("date_key", "MMM").alias("month_short"),
    date_format("date_key", "EEEE").alias("day_name"),
    date_format("date_key", "EEE").alias("day_short"),
    weekofyear("date_key").alias("week_of_year"),
    dayofweek("date_key").alias("day_of_week"),
    dayofyear("date_key").alias("day_of_year"),
    when(dayofweek("date_key").isin(1, 7), lit(True))
        .otherwise(lit(False)).alias("is_weekend"),
    expr("concat(year(date_key), '-Q', quarter(date_key))").alias("year_quarter"),
    date_format("date_key", "yyyy-MM").alias("year_month")
)

df_dim_date.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("gold_dim_date")

print(f"✅ gold_dim_date: {df_dim_date.count()} days (2020-2026)")
df_dim_date.show(5, truncate=False)

✅ gold_dim_date: 2557 days (2020-2026)
+----------+----+-----+---+-------+----------+-----------+---------+---------+------------+-----------+-----------+----------+------------+----------+
|date_key  |year|month|day|quarter|month_name|month_short|day_name |day_short|week_of_year|day_of_week|day_of_year|is_weekend|year_quarter|year_month|
+----------+----+-----+---+-------+----------+-----------+---------+---------+------------+-----------+-----------+----------+------------+----------+
|2020-01-01|2020|1    |1  |1      |January   |Jan        |Wednesday|Wed      |1           |4          |1          |false     |2020-Q1     |2020-01   |
|2020-01-02|2020|1    |2  |1      |January   |Jan        |Thursday |Thu      |1           |5          |2          |false     |2020-Q1     |2020-01   |
|2020-01-03|2020|1    |3  |1      |January   |Jan        |Friday   |Fri      |1           |6          |3          |false     |2020-Q1     |2020-01   |
|2020-01-04|2020|1    |4  |1      |January   |Jan    
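
The calendar attributes can be cross-checked without Spark. The sketch below rebuilds a few of them with the standard library, assuming the Spark conventions that `dayofweek` numbers Sunday as 1 through Saturday as 7 and that `weekofyear` follows ISO 8601. `date_row` and `date_spine` are hypothetical helper names.

```python
from datetime import date, timedelta
from typing import List


def date_row(d: date) -> dict:
    """Derive a subset of the dim_date attributes for one calendar day."""
    spark_dow = d.isoweekday() % 7 + 1  # ISO Monday=1..Sunday=7 -> Sunday=1..Saturday=7
    quarter = (d.month - 1) // 3 + 1
    return {
        "date_key": d,
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "quarter": quarter,
        "week_of_year": d.isocalendar()[1],
        "day_of_week": spark_dow,
        "day_of_year": d.timetuple().tm_yday,
        "is_weekend": spark_dow in (1, 7),
        "year_quarter": f"{d.year}-Q{quarter}",
        "year_month": f"{d.year}-{d.month:02d}",
    }


def date_spine(start: date, end: date) -> List[dict]:
    """One row per day from start to end, both inclusive."""
    n_days = (end - start).days + 1
    return [date_row(start + timedelta(days=i)) for i in range(n_days)]


spine = date_spine(date(2020, 1, 1), date(2026, 12, 31))
print(len(spine))
print(spine[0])
```

The spine covers seven years, two of them leap years (2020 and 2024), which gives 7 × 365 + 2 = 2557 rows and matches the count reported by the notebook.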

## KPI

In [6]:
df_fact = spark.read.table("gold_fact_bookings")
df_city = spark.read.table("gold_dim_city").dropDuplicates(["city"])

df_kpi = df_fact.groupBy(
    "city", "country", "booking_year", "booking_month", "hotel_type"
).agg(
    spark_count("*").alias("total_bookings"),
    spark_round(spark_sum("total_amount_eur"), 2).alias("total_revenue_eur"),
    spark_round(spark_avg("total_amount_eur"), 2).alias("avg_booking_value_eur"),
    spark_round(spark_avg("avg_nightly_rate_eur"), 2).alias("avg_nightly_rate_eur"),
    spark_round(spark_avg("nights"), 1).alias("avg_nights"),
    spark_sum(col("adults") + coalesce(col("children"), lit(0))).alias("total_guests"),
    spark_round(
        spark_sum(when(col("booking_status") == "Canceled", 1).otherwise(0)) * 100.0 / spark_count("*"), 1
    ).alias("cancellation_rate")
)

df_kpi_final = df_kpi.join(
    df_city.select("city", col("temperature_c").alias("city_temperature_c"), col("weather_code").alias("city_weather_code")),
    on="city", how="left"
)

df_kpi_final.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("gold_kpi_revenue")

print(f"✅ gold_kpi_revenue: {df_kpi_final.count():,} rows")
df_kpi_final.orderBy(col("total_revenue_eur").desc()).show(10, truncate=False)

✅ gold_kpi_revenue: 135,380 rows
+------------+--------------+------------+-------------+--------------+--------------+-----------------+---------------------+--------------------+----------+------------+-----------------+------------------+-----------------+
|city        |country       |booking_year|booking_month|hotel_type    |total_bookings|total_revenue_eur|avg_booking_value_eur|avg_nightly_rate_eur|avg_nights|total_guests|cancellation_rate|city_temperature_c|city_weather_code|
+------------+--------------+------------+-------------+--------------+--------------+-----------------+---------------------+--------------------+----------+------------+-----------------+------------------+-----------------+
|Buenos Aires|Argentina     |2025        |9            |Resort        |23            |1.739760476E7    |756417.6             |377.44              |875.4     |83.0        |0.0              |22.7              |0                |
|Bucharest   |Romania       |2024        |5            |B
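
The aggregation above groups bookings by city, country, year, month and hotel type, then derives totals, averages and a cancellation percentage per group. A small plain-Python sketch of the same arithmetic is shown below. `revenue_kpis` is a hypothetical helper, the sample rows are invented, and the sketch assumes no NULL amounts (Spark's `avg` and `sum` would skip them).

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

GROUP_COLUMNS = ("city", "country", "booking_year", "booking_month", "hotel_type")


def revenue_kpis(bookings: Iterable[dict]) -> Dict[Tuple, dict]:
    """Aggregate bookings per group into count, revenue, average value and cancellation rate."""
    groups: Dict[Tuple, List[dict]] = defaultdict(list)
    for booking in bookings:
        groups[tuple(booking[c] for c in GROUP_COLUMNS)].append(booking)

    result = {}
    for key, rows in groups.items():
        total = len(rows)
        revenue = sum(r["total_amount_eur"] for r in rows)
        # Status spelling "Canceled" mirrors the notebook cell.
        canceled = sum(1 for r in rows if r["booking_status"] == "Canceled")
        result[key] = {
            "total_bookings": total,
            "total_revenue_eur": round(revenue, 2),
            "avg_booking_value_eur": round(revenue / total, 2),
            "cancellation_rate": round(canceled * 100.0 / total, 1),
        }
    return result


# Hypothetical sample: four bookings in one group, one of them canceled.
base = {"city": "Turku", "country": "Finland", "booking_year": 2025,
        "booking_month": 9, "hotel_type": "Resort"}
sample = [
    {**base, "total_amount_eur": 100.0, "booking_status": "Confirmed"},
    {**base, "total_amount_eur": 200.0, "booking_status": "Canceled"},
    {**base, "total_amount_eur": 300.0, "booking_status": "Confirmed"},
    {**base, "total_amount_eur": 400.0, "booking_status": "Confirmed"},
]
print(revenue_kpis(sample))
```

Canceled bookings still count toward `total_revenue_eur` in this logic, as they do in the notebook cell, so revenue figures here are gross of cancellations.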

## 5. Summary

In [7]:
print("\n" + "="*60)
print("🥇 GOLD LAYER COMPLETE")
print("="*60)

gold_tables = ["gold_fact_bookings", "gold_dim_city", "gold_dim_hotel", "gold_dim_date"]
for t in gold_tables:
    try:
        c = spark.read.table(t).count()
        print(f"  ✅ {t:<25} → {c:>10,} rows")
    except Exception:  # avoid a bare except, which would also swallow KeyboardInterrupt
        print(f"  ❌ {t:<25} → NOT FOUND")

print("="*60)
print("\n🎯 Star schema in Power BI:")
print("   fact_bookings → dim_city  (city = city)")
print("   fact_bookings → dim_hotel (hotel_id = hotel_id)")
print("   fact_bookings → dim_date  (booking_date = date_key)")

🥇 GOLD LAYER COMPLETE
  ✅ gold_fact_bookings        →  1,239,248 rows
  ✅ gold_dim_city             →        247 rows
  ✅ gold_dim_hotel            →      1,001 rows
  ✅ gold_dim_date             →      2,557 rows

🎯 Star schema in Power BI:
   fact_bookings → dim_city  (city = city)
   fact_bookings → dim_hotel (hotel_id = hotel_id)
   fact_bookings → dim_date  (booking_date = date_key)
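
These relationships only work in Power BI if every fact key has a matching dimension row. One gap is visible from the cells themselves: the fact filter keeps any `booking_date` after 1950-01-01, while the date spine starts at 2020-01-01. The sketch below shows one way such orphans could be detected. `orphan_keys` is a hypothetical helper and the sample booking dates are invented.

```python
from datetime import date, timedelta
from typing import Iterable, Set


def orphan_keys(fact_keys: Iterable, dim_keys: Iterable) -> Set:
    """Return fact-side keys with no matching dimension row (NULL keys are ignored)."""
    return {k for k in fact_keys if k is not None} - set(dim_keys)


# Date spine as generated in the notebook: 2020-01-01 through 2026-12-31.
spine_start, spine_end = date(2020, 1, 1), date(2026, 12, 31)
dim_dates = {
    spine_start + timedelta(days=i)
    for i in range((spine_end - spine_start).days + 1)
}

# Hypothetical booking dates: 2019-12-31 passes the "> 1950-01-01" filter
# but has no row in the date dimension.
sample_booking_dates = [date(2019, 12, 31), date(2023, 6, 15), None]
missing = orphan_keys(sample_booking_dates, dim_dates)
print(sorted(missing))
```

The same check applies to `city` and `hotel_id`. In Spark, a `left_anti` join between the fact table and each dimension would be one way to run it at scale.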
