# Gold Layer - Data Modelling

## 1) Create Connection to Azure Storage account

In [0]:
# Service principal (app registration) used to reach the ADLS Gen2 account via OAuth.
storage_account = "team04sa"
application_id = "7ab46e7b-cc68-4f3f-9903-9a6bae8e347a"
directory_id = "b7a954b3-aa07-453e-b8a3-97101aeffcad"

# The client secret must not live in the notebook source: anyone who can read the
# notebook (or its version history) could authenticate as the service principal.
# It is fetched from a Databricks secret scope at run time instead.
# NOTE(review): the scope/key names below are placeholders - point them at the scope
# that holds this secret, and rotate the secret that was previously committed here.
client_secret = dbutils.secrets.get(scope="team04-kv", key="sp-client-secret")

# All ABFS settings are keyed by the account's DFS endpoint host name.
abfs_host = f"{storage_account}.dfs.core.windows.net"
token_endpoint = f"https://login.microsoftonline.com/{directory_id}/oauth2/token"

spark.conf.set(f"fs.azure.account.auth.type.{abfs_host}", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{abfs_host}", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{abfs_host}", application_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{abfs_host}", client_secret)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{abfs_host}", token_endpoint)

## 2) Read the Delta table from the Silver Layer

In [0]:
# Target (Gold) and source (Silver) locations inside the "fooddata" container.
GOLD_BASE = "abfss://fooddata@team04sa.dfs.core.windows.net/gold/"
SILVER_PATH = "abfss://fooddata@team04sa.dfs.core.windows.net/silver/"

# Load the Silver Delta table and mark it for caching; every table built in
# this notebook is derived from this DataFrame.
silver_df = spark.read.format("delta").load(SILVER_PATH).cache()

# count() is the first action on the frame, so it also fills the cache.
silver_row_count = silver_df.count()
print("rows:", silver_row_count)
silver_df.printSchema()

## 3) Create DIM Tables

In [0]:
from pyspark.sql import functions as F, Window

# Country dimension: one row per distinct (adm0_id, adm0_name) pair.
#
# The surrogate key is a row_number over a total ordering of the row instead of
# monotonically_increasing_id(): the latter depends on how the data happens to be
# partitioned, and this DataFrame is lazy - it is evaluated once for the Delta
# write and again for every later join, so the keys stored in a fact table could
# differ from the keys stored in the dimension. row_number over a fixed ordering
# yields the same dense 1..N key on every evaluation of the same data.
# The un-partitioned window moves all rows to one partition, which is assumed to
# be fine for a dimension of this size.
country_key_order = Window.orderBy("country_id", "country")

dim_country = (silver_df
               .select(F.col("adm0_id").alias("country_id"),
                       F.col("adm0_name").alias("country"))
               .distinct()
               # cast keeps the column a bigint, matching what was written before
               .withColumn("country_sk", F.row_number().over(country_key_order).cast("long"))
               .select("country_sk", "country_id", "country")
               )

# Product dimension: one row per distinct (cm_id, cm_name, cm_group) combination.
#
# The surrogate key is a row_number over a total ordering of the row instead of
# monotonically_increasing_id(): the latter depends on partitioning, and this lazy
# DataFrame is re-evaluated for the Delta write and for each later join that reads
# product_sk, so fact rows could otherwise carry keys that do not exist in the
# persisted dimension. row_number over a fixed ordering is reproducible.
# The un-partitioned window moves all rows to one partition, which is assumed to
# be fine for a dimension of this size.
product_key_order = Window.orderBy("product_id", "product_name", "product_group")

dim_product = (silver_df
               .select(F.col("cm_id").alias("product_id"),
                       F.col("cm_name").alias("product_name"),
                       F.col("cm_group").alias("product_group"))
               .distinct()
               # cast keeps the column a bigint, matching what was written before
               .withColumn("product_sk", F.row_number().over(product_key_order).cast("long"))
               .select("product_sk", "product_id", "product_name", "product_group")
               )

from pyspark.sql import functions as F

# Time dimension: one row per distinct month combination found in Silver,
# extended with a quarter label.
month_number = F.col("mp_month").cast("int")
# Quarter number = floor((month - 1) / 3) + 1, i.e. 1..4 for months 1..12.
quarter_number = F.floor((month_number - F.lit(1)) / F.lit(3)) + F.lit(1)
quarter_label = F.concat(F.lit("Q"), quarter_number.cast("string"))

dim_time = (
    silver_df
    .select("date_month", "mp_year", "mp_month", "year_month")
    .distinct()
    # Rename the Silver columns to their dimension names and append the quarter.
    .select(
        F.col("date_month").alias("date"),
        F.col("mp_year").alias("year"),
        F.col("mp_month").alias("month"),
        F.col("year_month"),
        quarter_label.alias("qtr"),
    )
)

# Unit dimension: one row per distinct standardised unit group.
#
# The surrogate key is a row_number over the (unique) unit group instead of
# monotonically_increasing_id(): the latter depends on partitioning, and this lazy
# DataFrame is re-evaluated for the Delta write and for the later fact join that
# reads unit_sk, so the fact could otherwise reference keys that differ from the
# persisted dimension. row_number over a fixed ordering is reproducible.
# The un-partitioned window moves all rows to one partition, which is assumed to
# be fine for a dimension of this size.
unit_key_order = Window.orderBy("std_unit_group")

dim_unit = (silver_df
            .select(F.col("std_unit_group"))
            .distinct()
            # cast keeps the column a bigint, matching what was written before
            .withColumn("unit_sk", F.row_number().over(unit_key_order).cast("long"))
            .select("unit_sk", "std_unit_group")
            )

# Market dimension: one row per distinct (mkt_name, mkt_id) pair.
#
# The surrogate key is a row_number over a total ordering of the row instead of
# monotonically_increasing_id(): the latter depends on partitioning, so the key
# assigned to a market could change between evaluations of this lazy DataFrame.
# row_number over a fixed ordering gives the same dense 1..N key for the same data.
# The un-partitioned window moves all rows to one partition, which is assumed to
# be fine for a dimension of this size.
market_key_order = Window.orderBy("mkt_id", "mkt_name")

dim_market = (silver_df
              .select("mkt_name","mkt_id").distinct()
              # cast keeps the column a bigint, matching what was written before
              .withColumn("market_sk", F.row_number().over(market_key_order).cast("long"))
              .select("market_sk", "mkt_name","mkt_id")
              )

# Persist every dimension as a Delta table below GOLD_BASE, replacing whatever
# is stored there. Only dim_market is written with the mergeSchema option.
dim_write_plan = [
    (dim_country, "dim_country", {}),
    (dim_product, "dim_product", {}),
    (dim_time, "dim_time", {}),
    (dim_unit, "dim_unit", {}),
    (dim_market, "dim_market", {"mergeSchema": "true"}),
]

for dim_frame, dim_name, extra_options in dim_write_plan:
    (dim_frame.write
        .mode("overwrite")
        .format("delta")
        .options(**extra_options)
        .save(GOLD_BASE + dim_name))

## 4) Create Fact table - fact_Food_Price_Comparison

In [0]:
# ---------- FACT: Country Difference (KAZ vs AFG) on ProductGroup ----------
from pyspark.sql import functions as F

# Grain of the comparison: product group x month.
keys = ["cm_group", "year_month"]

# Upper-cased country name so the country filters are case-insensitive.
df = silver_df.withColumn("adm0_up", F.upper(F.col("adm0_name")))


def _group_month_stats(country_upper, tag):
    """Aggregate one country's rows per (cm_group, year_month).

    country_upper: upper-case country name compared against ``adm0_up``.
    tag: short country tag used in the output column names.

    Returns a DataFrame with the key columns plus ``avg_<tag>_eur`` (mean of
    mp_price_eur) and ``n_<tag>`` (number of rows in the group).
    """
    country_rows = df.filter(F.col("adm0_up") == country_upper)
    return country_rows.groupBy(*keys).agg(
        F.avg("mp_price_eur").alias(f"avg_{tag}_eur"),
        F.count("*").alias(f"n_{tag}"),
    )


kaz = _group_month_stats("KAZAKHSTAN", "kaz")
afg = _group_month_stats("AFGHANISTAN", "afg")

# Keep only group/month combinations observed in both countries, then compute
# the absolute gap and the gap relative to the Afghanistan average.
afg_avg = F.col("avg_afg_eur")
relative_gap = (
    F.when(afg_avg == 0, F.lit(None).cast("double"))  # avoid dividing by zero
     .otherwise(F.col("delta") / afg_avg)
)

both = (
    kaz.join(afg, keys, "inner")
       .withColumn("delta", F.col("avg_kaz_eur") - afg_avg)
       .withColumn("delta_pct", relative_gap)
)

# Enrich the comparison with product attributes and the two country ids taken
# from dim_country (columns: country_sk, country_id, country).
#
# NOTE(review): the product join matches on the product *group*, so each
# (cm_group, year_month) row is repeated once per product of that group while
# the averages stay group-level - confirm this grain is intended.
cmp_product_on = (
    F.col("p.cm_group").cast("string") == F.col("dp.product_group").cast("string")
)
# The two country conditions only look at the dimension side, so every row
# receives the dimension row(s) whose name is Kazakhstan / Afghanistan.
cmp_kaz_on = F.upper(F.trim(F.col("dc_kaz.country"))) == F.lit("KAZAKHSTAN")
cmp_afg_on = F.upper(F.trim(F.col("dc_afg.country"))) == F.lit("AFGHANISTAN")

cmp_with_dims = (
    both.alias("p")
    .join(dim_product.alias("dp"), cmp_product_on, "left")
    .join(dim_country.alias("dc_kaz"), cmp_kaz_on, "left")
    .join(dim_country.alias("dc_afg"), cmp_afg_on, "left")
)

fcd = cmp_with_dims.select(
    # keys
    F.col("p.cm_group"),
    F.col("p.year_month"),
    # measures
    F.col("p.avg_kaz_eur").alias("avg_kaz"),
    F.col("p.avg_afg_eur").alias("avg_afg"),
    *[F.col(f"p.{measure}") for measure in ("delta", "delta_pct", "n_kaz", "n_afg")],
    # product attributes
    F.col("dp.product_id").cast("string").alias("product_id"),
    F.col("dp.product_sk").alias("product_sk"),
    F.col("dp.product_name").alias("product_name"),
    # country ids
    F.col("dc_kaz.country_id").alias("country_id_kaz"),
    F.col("dc_afg.country_id").alias("country_id_afg"),
)

display(fcd)

(fcd.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .save(GOLD_BASE + "fact_Food_Price_Comparison"))

## 5) Create Fact table - fact_price_monthly

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window

# ---------- BASE ----------
# Monthly statistics per country x product x unit group.
series_keys = ["adm0_name", "cm_id", "std_unit_group"]

base = (
    silver_df
    .groupBy(*series_keys, "date_month")
    .agg(
        F.avg("price_per_unit_eur").alias("avg_price"),
        F.stddev_pop("price_per_unit_eur").alias("sd_price"),
        F.count("*").alias("n_obs"),
        F.avg(F.col("is_outlier_iqr").cast("int")).alias("outlier_rate")
    )
)

# ---------- Lags / Rates ----------
# The reference prices are looked up by calendar distance, not by row offset.
# A row-based lag(…, 1) / lag(…, 12) silently compares against the wrong period
# as soon as a series has a missing month (the "12 rows back" row is then more
# than a year old). Ordering by a running month index and using a RANGE frame
# of exactly -1 / -12 returns the price of the previous month / same month last
# year, or NULL when that month has no observation.
# assumes date_month is a date (or date-castable) value with one value per
# calendar month - TODO confirm against the Silver schema
w = Window.partitionBy(*series_keys).orderBy("month_idx")
w_prev_month = w.rangeBetween(-1, -1)
w_prev_year = w.rangeBetween(-12, -12)

# Rows without a usable date get no reference price at all.
has_month = F.col("month_idx").isNotNull()


def _pct_change_vs(reference_col):
    """Relative change of avg_price against ``reference_col``.

    Returns a Column that is NULL when the reference price is NULL or 0,
    otherwise (avg_price - reference) / reference.
    """
    reference = F.col(reference_col)
    return (
        F.when(reference.isNull() | (reference == 0), F.lit(None).cast("double"))
         .otherwise((F.col("avg_price") - reference) / reference)
    )


fact_price_monthly = (
    base
    # Running month number; consecutive calendar months differ by exactly 1.
    .withColumn("month_idx", F.year("date_month") * 12 + F.month("date_month"))
    # Each frame holds at most one row under the assumption above; max() just
    # extracts its value deterministically.
    .withColumn("avg_price_lag1",  F.when(has_month, F.max("avg_price").over(w_prev_month)))
    .withColumn("avg_price_lag12", F.when(has_month, F.max("avg_price").over(w_prev_year)))
    .withColumn("mom_pct", _pct_change_vs("avg_price_lag1"))
    .withColumn("yoy_pct", _pct_change_vs("avg_price_lag12"))
    # helper column only; keeps the output schema unchanged
    .drop("month_idx")
)

# ---------- DIM Joins ----------
# Every dimension is attached with a left join, so a fact row is kept even when
# a dimension has no matching entry.

# Country: compared on the name, normalised for case and surrounding whitespace.
fpm_country_on = (
    F.upper(F.trim(F.col("f.adm0_name"))) == F.upper(F.trim(F.col("dc.country")))
)
# Product: f.cm_id against dp.product_id, both compared as strings.
fpm_product_on = F.col("f.cm_id").cast("string") == F.col("dp.product_id").cast("string")
# Time: f.date_month against dt.date (dim_time renamed date_month to date).
fpm_time_on = F.col("f.date_month") == F.col("dt.date")
# Unit: std_unit_group is compared as-is on both sides.
fpm_unit_on = F.col("f.std_unit_group") == F.col("du.std_unit_group")

fpm_with_dims = (
    fact_price_monthly.alias("f")
    .join(dim_country.alias("dc"), fpm_country_on, "left")
    .join(dim_product.alias("dp"), fpm_product_on, "left")
    .join(dim_time.alias("dt"), fpm_time_on, "left")
    .join(dim_unit.alias("du"), fpm_unit_on, "left")
)

fpm = fpm_with_dims.select(
    F.col("dc.country_sk").alias("country_sk"),
    # The fact carries the natural product id; dp.product_sk could be selected
    # instead of, or in addition to, it.
    F.col("dp.product_id").alias("product_id"),
    F.col("du.unit_sk").alias("unit_sk"),
    F.col("dt.date").alias("date"),
    # measures
    *[
        F.col(f"f.{measure}")
        for measure in ("avg_price", "sd_price", "n_obs", "outlier_rate", "mom_pct", "yoy_pct")
    ],
)

display(fpm)

fpm.write.format("delta").mode("overwrite").save(GOLD_BASE + "fact_price_monthly")

## 6) Create Fact table - fact_corr_cross_kaz_afg_mkt_prod

In [0]:
# Monthly mean price per country x product x market.
price_series_keys = ["adm0_name", "cm_id", "cm_group", "mkt_id", "mkt_name", "year_month"]
base_p = silver_df.groupBy(*price_series_keys).agg(
    F.avg("price_per_unit_eur").alias("avg_price")
)

# Log return of each series against its previous observation.
# NOTE(review): lag() steps back one *row*, so after a missing month the return
# spans more than one month - confirm this is acceptable for the correlation.
w_prod = Window.partitionBy("adm0_name","cm_id","mkt_id").orderBy("year_month")

previous_price = F.lag("avg_price", 1).over(w_prod)
# The logarithm is only taken when both prices are strictly positive;
# otherwise the return is NULL and the row is dropped below.
both_positive = (F.col("avg_price") > 0) & (F.col("avg_price_lag1") > 0)
log_return = F.log(F.col("avg_price") / F.col("avg_price_lag1"))

rets_prod = (
    base_p
    .withColumn("avg_price_lag1", previous_price)
    .withColumn("ret", F.when(both_positive, log_return))
    .where(F.col("ret").isNotNull())
)

def _country_returns(country_upper, tag):
    """Select one country's return series with country-tagged column names.

    country_upper: upper-case country name matched against upper(adm0_name).
    tag: short country tag embedded in the output column names.

    Returns the columns pg_<tag>, product_id_<tag>, mkt_<tag>_id,
    mkt_<tag>_name, year_month and ret_<tag>.
    """
    country_rows = rets_prod.filter(F.upper("adm0_name") == country_upper)
    return country_rows.select(
        F.col("cm_group").alias(f"pg_{tag}"),
        F.col("cm_id").alias(f"product_id_{tag}"),
        F.col("mkt_id").alias(f"mkt_{tag}_id"),
        F.col("mkt_name").alias(f"mkt_{tag}_name"),
        "year_month",
        F.col("ret").alias(f"ret_{tag}"),
    )


afg_p = _country_returns("AFGHANISTAN", "afg")

kaz_p = _country_returns("KAZAKHSTAN", "kaz")

# Minimum number of months a pair of series must share before its correlation
# is kept. The name was used below without ever being defined in this notebook,
# which raises a NameError on a fresh kernel. A value that is already set
# (e.g. by a parameter cell) is respected; otherwise a default is used.
# NOTE(review): 12 (one year of monthly returns) is a chosen default - tune as needed.
MIN_PAIRS = globals().get("MIN_PAIRS", 12)

# Pair every Afghanistan series with every Kazakhstan series month by month.
pairs_p = afg_p.join(kaz_p, "year_month", "inner")

# One row per (AFG product/market, KAZ product/market) pair with the correlation
# of their returns and descriptive statistics over the shared months.
fact_corr_cross_kaz_afg_mkt_prod = (
    pairs_p
    .groupBy("pg_afg","product_id_afg","mkt_afg_id","mkt_afg_name",
             "pg_kaz","product_id_kaz","mkt_kaz_id","mkt_kaz_name")
    .agg(
        F.corr("ret_afg","ret_kaz").alias("corr_returns"),
        F.count("*").alias("n_pairs"),
        F.stddev_pop("ret_afg").alias("sd_ret_afg"),
        F.stddev_pop("ret_kaz").alias("sd_ret_kaz"),
        F.avg("ret_afg").alias("mean_ret_afg"),
        F.avg("ret_kaz").alias("mean_ret_kaz"),
        F.min("year_month").alias("start_month"),
        F.max("year_month").alias("end_month")
    )
    # Flags pairs whose two products belong to the same product group.
    .withColumn("same_group", F.col("pg_afg")==F.col("pg_kaz"))
    # Drop pairs with too few shared months.
    .filter(F.col("n_pairs") >= MIN_PAIRS)
)

# Attach product surrogate keys and names from dim_product to both sides of
# each pair. product_id is cast to string so it compares cleanly with the
# string-cast ids of the fact.
dp = dim_product.select(
    F.col("product_id").cast("string").alias("product_id"),
    "product_sk",
    "product_name",
    "product_group",
)

# The same lookup is joined twice under different aliases: once for the
# Afghanistan product, once for the Kazakhstan product.
afg_product_on = F.col("product_id_afg").cast("string") == F.col("dpa.product_id")
kaz_product_on = F.col("product_id_kaz").cast("string") == F.col("dpk.product_id")

corr_with_products = (
    fact_corr_cross_kaz_afg_mkt_prod
    .join(dp.alias("dpa"), afg_product_on, "left")
    .join(dp.alias("dpk"), kaz_product_on, "left")
)

fact_corr_cross_kaz_afg_mkt_prod = corr_with_products.select(
    # Afghanistan side
    "pg_afg", "product_id_afg",
    F.col("dpa.product_sk").alias("product_sk_afg"),
    F.col("dpa.product_name").alias("product_name_afg"),
    "mkt_afg_id", "mkt_afg_name",
    # Kazakhstan side
    "pg_kaz", "product_id_kaz",
    F.col("dpk.product_sk").alias("product_sk_kaz"),
    F.col("dpk.product_name").alias("product_name_kaz"),
    "mkt_kaz_id", "mkt_kaz_name",
    # statistics
    "corr_returns", "n_pairs",
    "sd_ret_afg", "sd_ret_kaz", "mean_ret_afg", "mean_ret_kaz",
    "start_month", "end_month", "same_group",
)

display(fact_corr_cross_kaz_afg_mkt_prod)

(fact_corr_cross_kaz_afg_mkt_prod.write
    .format("delta")
    .mode("overwrite")
    .save(GOLD_BASE + "fact_corr_cross_kaz_afg_mkt_prod"))