# DE1 — Lab 2: PostgreSQL → Star Schema ETL
> Author : Badr TAJINI - Data Engineering I - ESIEE 2025-2026
---


Execute all cells in order. Attach the generated evidence (the plan files saved under `proof/`) and fill in the metrics in `lab2_metrics_log.csv`.

## 0. Setup and schemas

In [None]:
from pyspark.sql import SparkSession, functions as F, types as T
# Start (or reuse) the Spark session used by every cell of this lab.
spark = SparkSession.builder.appName("de1-lab2").getOrCreate()

# Folder prefix prepended to every CSV file name at ingestion time.
base = "data/"

# Explicit schemas, one per operational table, so column types are declared
# up front instead of being inferred. Each .add() call is
# (column name, data type, nullable).
customers_schema = (
    T.StructType()
    .add("customer_id", T.IntegerType(), False)
    .add("name", T.StringType(), True)
    .add("email", T.StringType(), True)
    .add("created_at", T.TimestampType(), True)
)

brands_schema = (
    T.StructType()
    .add("brand_id", T.IntegerType(), False)
    .add("brand_name", T.StringType(), True)
)

categories_schema = (
    T.StructType()
    .add("category_id", T.IntegerType(), False)
    .add("category_name", T.StringType(), True)
)

products_schema = (
    T.StructType()
    .add("product_id", T.IntegerType(), False)
    .add("product_name", T.StringType(), True)
    .add("brand_id", T.IntegerType(), True)
    .add("category_id", T.IntegerType(), True)
    .add("price", T.DoubleType(), True)
)

orders_schema = (
    T.StructType()
    .add("order_id", T.IntegerType(), False)
    .add("customer_id", T.IntegerType(), True)
    .add("order_date", T.TimestampType(), True)
)

order_items_schema = (
    T.StructType()
    .add("order_item_id", T.IntegerType(), False)
    .add("order_id", T.IntegerType(), True)
    .add("product_id", T.IntegerType(), True)
    .add("quantity", T.IntegerType(), True)
    .add("unit_price", T.DoubleType(), True)
)


## 1. Ingest operational tables (from CSV exports)

In [None]:
def _read_export(table, schema):
    """Read the CSV export `lab2_<table>.csv` under `base` with an explicit schema.

    The first line of each file is treated as a header row.
    """
    reader = spark.read.schema(schema).option("header", "true")
    return reader.csv(base + f"lab2_{table}.csv")


customers = _read_export("customers", customers_schema)
brands = _read_export("brands", brands_schema)
categories = _read_export("categories", categories_schema)
products = _read_export("products", products_schema)
orders = _read_export("orders", orders_schema)
order_items = _read_export("order_items", order_items_schema)

# Print the row count of every ingested table (each count() runs a Spark job).
for name, df in zip(
    ("customers", "brands", "categories", "products", "orders", "order_items"),
    (customers, brands, categories, products, orders, order_items),
):
    print(name, df.count())


### Evidence: ingestion plan

In [None]:
# Evidence query: orders joined to order_items, reduced to the distinct order ids.
ingest = orders.join(order_items, "order_id").select("order_id").distinct()
ingest.explain("formatted")

from datetime import datetime as _dt
import pathlib

# Make sure the evidence folder exists, then save a timestamp line followed by
# the executed physical plan of the query above.
pathlib.Path("proof").mkdir(exist_ok=True)
with pathlib.Path("proof/plan_ingest.txt").open("w") as plan_file:
    plan_file.write(f"{_dt.now()}\n")
    plan_file.write(ingest._jdf.queryExecution().executedPlan().toString())
print("Saved proof/plan_ingest.txt")


## 2. Surrogate key function

In [None]:
def sk(cols):
    """Build a surrogate-key column expression from natural-key columns.

    The key is ``abs(xxhash64(col_1, ..., col_n))``: the same natural-key
    values always produce the same key, so dimensions and facts can compute
    it independently and still join on it.

    Args:
        cols: A single column name, or an iterable of column names, that
            together form the natural key.

    Returns:
        A Spark ``Column`` holding the hashed key.

    Raises:
        ValueError: If no column name is given.
    """
    # A bare string would otherwise be iterated character by character and
    # hashed as columns "c", "u", "s", ... instead of as one column name.
    if isinstance(cols, str):
        cols = [cols]
    names = list(cols)
    if not names:
        raise ValueError("sk() requires at least one natural-key column")
    # NOTE(review): abs() cannot turn the minimum 64-bit value into a positive
    # number (the outcome depends on Spark's ANSI setting) -- confirm whether
    # this one-in-2^64 edge case matters for the lab.
    # NOTE(review): null natural keys presumably still hash to a shared,
    # non-null key -- verify before relying on nulls in key columns.
    return F.abs(F.xxhash64(*[F.col(name) for name in names]))


## 3. Build dimensions

In [None]:
def _dimension(source, entity, *attributes):
    """Project `source` into a dimension table for `entity`.

    The result starts with `<entity>_sk` (surrogate key hashed from
    `<entity>_id`), then `<entity>_id`, then `attributes` in the given order.
    Attributes may be column names or Column expressions.
    """
    natural_key = f"{entity}_id"
    surrogate = sk([natural_key]).alias(f"{entity}_sk")
    return source.select(surrogate, natural_key, *attributes)


dim_customer = _dimension(customers, "customer", "name", "email", "created_at")

dim_brand = _dimension(brands, "brand", "brand_name")

dim_category = _dimension(categories, "category", "category_name")

# The product dimension also carries the surrogate keys of its brand and
# category, hashed from the same natural keys used by dim_brand/dim_category.
dim_product = _dimension(
    products,
    "product",
    "product_name",
    sk(["brand_id"]).alias("brand_sk"),
    sk(["category_id"]).alias("category_sk"),
    "price",
)


## 4. Build date dimension

In [None]:
from pyspark.sql import Window as W
# One row per distinct calendar date found in orders.order_date.
dates = orders.select(F.to_date("order_date").alias("date")).distinct()

# Date dimension: surrogate key hashed from the date, plus its calendar parts.
_date = F.col("date")
dim_date = dates.select(
    sk(["date"]).alias("date_sk"),
    "date",
    F.year(_date).alias("year"),
    F.month(_date).alias("month"),
    F.dayofmonth(_date).alias("day"),
    # Day of week rendered as text using the "E" datetime pattern.
    F.date_format(_date, "E").alias("dow"),
)


## 5. Build fact_sales with broadcast joins where appropriate

In [None]:
import pathlib
from datetime import datetime as _dt

oi = order_items.alias("oi")
p = products.alias("p")
o = orders.alias("o")
c = customers.alias("c")

# Build the fact table at order-item grain.
# - Every join is on a column NAME, so the key appears only once in the result.
#   Joining on the expression oi.product_id == p.product_id would keep both
#   product_id columns and make the later F.col("product_id") ambiguous.
# - products and customers only validate that the referenced keys exist, so
#   they are narrowed to their key column and explicitly broadcast; orders
#   supplies customer_id and order_date.
df_fact = (oi
    .join(F.broadcast(p.select("product_id")), "product_id")
    .join(o, "order_id")
    .join(F.broadcast(c.select("customer_id")), "customer_id")
    .withColumn("date", F.to_date("order_date"))
)

# Attach surrogate keys (same hash as the dimensions), the line subtotal, and
# the year/month columns used for partitioning when the table is written.
df_fact = (df_fact
    .withColumn("date_sk", sk(["date"]))
    .withColumn("customer_sk", sk(["customer_id"]))
    .withColumn("product_sk", sk(["product_id"]))
    .withColumn("quantity", F.col("quantity").cast("int"))
    .withColumn("unit_price", F.col("unit_price").cast("double"))
    .withColumn("subtotal", F.col("quantity") * F.col("unit_price"))
    .withColumn("year", F.year("date"))
    .withColumn("month", F.month("date"))
    .select("order_id", "date_sk", "customer_sk", "product_sk",
            "quantity", "unit_price", "subtotal", "year", "month")
)

df_fact.explain("formatted")

# Save a timestamp and the executed physical plan as evidence. The folder is
# created here so this cell does not depend on an earlier cell having done it.
pathlib.Path("proof").mkdir(exist_ok=True)
with open("proof/plan_fact_join.txt", "w") as f:
    f.write(str(_dt.now()) + "\n")
    f.write(df_fact._jdf.queryExecution().executedPlan().toString())
print("Saved proof/plan_fact_join.txt")


## 6. Write Parquet outputs (fact_sales partitioned by year, month)

In [None]:
base_out = "outputs/lab2"

# Dimensions: one unpartitioned Parquet dataset each, replaced on every run.
for _table, _frame in (
    ("dim_customer", dim_customer),
    ("dim_brand", dim_brand),
    ("dim_category", dim_category),
    ("dim_product", dim_product),
    ("dim_date", dim_date),
):
    _frame.write.mode("overwrite").parquet(f"{base_out}/{_table}")

# Fact table: written last, partitioned by its year and month columns.
_fact_writer = df_fact.write.mode("overwrite").partitionBy("year", "month")
_fact_writer.parquet(f"{base_out}/fact_sales")

print("Parquet written under outputs/lab2/")


## 7. Plan comparison: projection and layout

In [None]:
def _daily_gmv(joined):
    """Aggregate a joined orders/items/products frame into GMV per order day.

    GMV is sum(quantity * price), grouped by the date part of order_date.
    """
    order_day = F.to_date("order_date").alias("d")
    gmv = F.sum(F.col("quantity") * F.col("price")).alias("gmv")
    return joined.groupBy(order_day).agg(gmv)


# Case A: join the full tables first and leave the projection to the aggregation.
a = _daily_gmv(
    orders.join(order_items, "order_id").join(products, "product_id")
)
a.explain("formatted")
_ = a.count()

# Case B: keep only the needed columns of each table before joining.
b = _daily_gmv(
    orders.select("order_id", "order_date")
    .join(order_items.select("order_id", "product_id", "quantity"), "order_id")
    .join(products.select("product_id", "price"), "product_id")
)
b.explain("formatted")
_ = b.count()

print("Record Spark UI metrics for both runs in lab2_metrics_log.csv")


## 8. Cleanup

In [None]:
# Stop the Spark session created in the setup cell; run this last.
spark.stop()
print("Spark session stopped.")
