### Dim Date

In [1]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DateType, StringType
import datetime

StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 3, Finished, Available, Finished)

In [2]:
# Target database for every gold-layer table built in this notebook.
gold_db = "gold"
# Create the database up front: the first saveAsTable into it happens in the next
# cell, so it must exist before any dimension is written (idempotent if it already does).
spark.sql(f"CREATE DATABASE IF NOT EXISTS {gold_db}")
dim_date_table = f"{gold_db}.dim_date"

StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 4, Finished, Available, Finished)

In [3]:
# Import the module under a cell-local alias. A later cell runs
# `from datetime import datetime`, which rebinds the global name `datetime` to the
# class; with the alias this cell still works when it is re-run after that cell.
import datetime as _dt

# Define start/end date range (both ends are included in the dimension)
start_date = _dt.date(2020, 1, 1)
end_date = _dt.date(2030, 12, 31)

# Create continuous date sequence: one row per calendar day.
# The Python date objects are inferred as a Spark date column, so no cast is needed.
date_df = spark.createDataFrame(
    [(start_date + _dt.timedelta(days=i),)
     for i in range((end_date - start_date).days + 1)],
    ["the_day"]
)

# Derive all DimDate attributes
dim_date = (
    date_df
    # Integer key in yyyyMMdd form, e.g. 2020-01-01 -> 20200101
    .withColumn("date_key", F.date_format("the_day", "yyyyMMdd").cast(IntegerType()))
    .withColumn("the_day_name", F.date_format("the_day", "EEEE"))
    # NOTE(review): the_week and the_iso_week are produced by the same expression, so
    # the two columns always hold identical values. If the_week is meant to be a
    # non-ISO week number it needs its own formula - confirm the intended definition.
    .withColumn("the_week", F.weekofyear("the_day"))
    .withColumn("the_iso_week", F.weekofyear("the_day"))
    # Day of week: Monday=1, Sunday=7.
    # F.dayofweek numbers Sunday=1 ... Saturday=7, hence the shift by 5 modulo 7.
    .withColumn("the_day_of_week", ((F.dayofweek("the_day") + 5) % 7 + 1))
    .withColumn("the_month", F.month("the_day"))
    .withColumn("the_month_name", F.date_format("the_day", "MMMM"))
    .withColumn("the_quarter", F.quarter("the_day"))
    .withColumn("the_year", F.year("the_day"))
    .withColumn("the_first_of_month", F.trunc("the_day", "month"))
    # December 31st of the row's year, built as the string '<year>-12-31' and parsed
    .withColumn(
        "the_last_of_year",
        F.to_date(F.concat_ws("-", F.col("the_year"), F.lit("12"), F.lit("31")))
    )
    .withColumn("the_day_of_year", F.dayofyear("the_day"))
    # Fix the column order of the persisted table
    .select(
        "date_key",
        "the_day",
        "the_day_name",
        "the_week",
        "the_iso_week",
        "the_day_of_week",
        "the_month",
        "the_month_name",
        "the_quarter",
        "the_year",
        "the_first_of_month",
        "the_last_of_year",
        "the_day_of_year"
    )
)

# Write table (replaces any previous contents)
dim_date.write.mode("overwrite").saveAsTable(dim_date_table)

# Preview first 20 rows
display(spark.table(dim_date_table).limit(20))

StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, bec6b995-0b43-4d1d-a4bd-55b4a4d236f5)

### Dim Hour

In [4]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType

# Value ranges for the time-of-day dimension: 24 hours and 60 minutes
hours = [h for h in range(24)]
minutes = [m for m in range(60)]
dim_hour_table = "gold.dim_hour"


StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 6, Finished, Available, Finished)

In [5]:
# Every (hour, minute) pair of the day, with the hour varying slowest
time_data = [(hh, mm) for hh in hours for mm in minutes]

# Zero-pad both parts to two characters, then build the 'HH:mm' label from them.
# The final select also puts the columns in the order of the target schema.
hour_minute_df = spark.createDataFrame(time_data, ["hour", "minute"])
dim_hour_df = (
    hour_minute_df
    .select(
        F.format_string("%02d", F.col("hour")).cast(StringType()).alias("hour"),
        F.format_string("%02d", F.col("minute")).cast(StringType()).alias("minute"),
    )
    .select(
        F.concat_ws(":", F.col("hour"), F.col("minute")).alias("time"),
        "hour",
        "minute",
    )
)

# Save to gold table (replaces any previous contents)
dim_hour_df.write.saveAsTable(dim_hour_table, mode="overwrite")

display(spark.table(dim_hour_table).limit(20))

StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 7665714d-0e83-466e-bfac-82cda8a039f5)

### Function to Create Dim Tables with SCD Type 2

In [6]:
from pyspark.sql import functions as F, Window
from delta.tables import DeltaTable

def scd2_merge(spark, src_df, target_table, business_keys, compare_cols, surrogate_key):
    """
    Generic SCD Type 2 merge of ``src_df`` into the Delta table ``target_table``.

    Parameters:
      - spark: active SparkSession
      - src_df: DataFrame with business keys + compare columns (payload)
      - target_table: fully qualified target name (e.g. 'gold.dim_status')
      - business_keys: business key column name(s); a single string or a list/tuple
      - compare_cols: column name(s) compared for change detection; a single string
        or a list/tuple (may be empty, in which case only new keys are detected)
      - surrogate_key: name of surrogate column to create (e.g. 'status_key')

    Behavior:
      - Filters out rows with null/empty business keys
      - Adds is_current / effective_start_ts / effective_end_ts to every source row
      - Detects changes using null-safe comparison (eqNullSafe)
      - On initial load (target table missing): assigns surrogate using row_number()
        ordered by the business keys and creates the table
      - On incremental: expire current rows for changed keys, append new versions
        with surrogates continuing after the current maximum

    Returns:
      None. A one-line summary is printed.

    Raises:
      ValueError: if no business key column is given.
    """
    # normalize args: accept a bare string or any list/tuple, always work with lists
    business_keys = [business_keys] if isinstance(business_keys, str) else list(business_keys)
    compare_cols = [compare_cols] if isinstance(compare_cols, str) else list(compare_cols)
    if not business_keys:
        # Fail early with a clear message instead of an IndexError / Spark analysis
        # error further down.
        raise ValueError("scd2_merge requires at least one business key column")

    # 1) Filter out rows with null/empty business keys
    for k in business_keys:
        src_df = src_df.filter((F.col(k).isNotNull()) & (F.trim(F.col(k)) != ""))

    # 2) Prepare source with metadata (every incoming row is a candidate current version)
    src = (
        src_df
        .withColumn("is_current", F.lit(True))
        .withColumn("effective_start_ts", F.current_timestamp())
        .withColumn("effective_end_ts", F.lit(None).cast("timestamp"))
    )

    # 3) INITIAL LOAD (target does not exist)
    if not spark._jsparkSession.catalog().tableExists(target_table):
        # deterministic ordering for surrogate assignment: order by business keys
        w = Window.orderBy(*business_keys)
        src_with_sk = src.withColumn(surrogate_key, F.row_number().over(w))
        # place surrogate first
        cols = [surrogate_key] + business_keys + [c for c in src_with_sk.columns if c not in business_keys + [surrogate_key]]
        src_with_sk.select(*cols).write.format("delta").mode("overwrite").saveAsTable(target_table)
        print(f"Created new SCD2 table {target_table} (initial load). Assigned surrogate '{surrogate_key}'.")
        return

    # 4) INCREMENTAL LOAD
    tgt = spark.table(target_table)
    tgt_current = tgt.filter(F.col("is_current") == True)

    # 5) Join src to tgt_current on business keys (left join to detect new keys)
    join_cond = [src[f] == tgt_current[f] for f in business_keys]
    joined = src.alias("s").join(tgt_current.alias("t"), on=join_cond, how="left")

    # 6) Build change detection condition:
    #    changed if t.<first_business_key> is null (new key) OR any compare_col differs (null-safe)
    first_bk = business_keys[0]
    change_cond = F.col(f"t.{first_bk}").isNull()
    for c in compare_cols:
        # null-safe inequality: not (s.c <=> t.c)
        change_cond = change_cond | (~F.col(f"s.{c}").eqNullSafe(F.col(f"t.{c}")))

    # Keep only the source side of the new/changed rows
    changed = joined.filter(change_cond).select([F.col(f"s.{c}").alias(c) for c in src.columns]).distinct()

    changed_count = changed.count()
    if changed_count == 0:
        print(f"No changes detected for {target_table}.")
        return

    # 7) Expire existing current rows for those business keys using MERGE
    changed_keys_df = changed.select(*business_keys).distinct()
    tmp_view = "_tmp_changed_keys_for_scd2"
    changed_keys_df.createOrReplaceTempView(tmp_view)

    # Build ON condition for MERGE SQL
    on_conditions = " AND ".join([f"t.{k} = c.{k}" for k in business_keys])
    merge_sql = f"""
        MERGE INTO {target_table} t
        USING (SELECT * FROM {tmp_view}) c
        ON ({on_conditions})
        WHEN MATCHED AND t.is_current = true THEN
          UPDATE SET t.is_current = false, t.effective_end_ts = current_timestamp()
    """
    try:
        spark.sql(merge_sql)
    finally:
        # The view is only needed as the MERGE source; drop it so it does not
        # linger in the session after the call (also when the MERGE fails).
        spark.catalog.dropTempView(tmp_view)

    # NOTE(review): `changed` is lazy and is evaluated again for the append below,
    # after the MERGE has modified the target. Keys expired above should then look
    # like new keys and select the same rows - confirm, or persist `changed`
    # before the MERGE if that assumption does not hold.

    # 8) Assign surrogate keys for new rows (append), continuing after the current maximum
    max_sk_row = tgt.agg(F.max(F.col(surrogate_key)).alias("max_sk")).collect()[0]
    max_sk = max_sk_row["max_sk"] if (max_sk_row and max_sk_row["max_sk"] is not None) else 0

    # deterministically order new rows to generate new surrogate values
    w_seq = Window.orderBy(*business_keys)
    new_rows = changed.withColumn("_rn", F.row_number().over(w_seq)) \
                      .withColumn(surrogate_key, F.col("_rn") + F.lit(max_sk)) \
                      .drop("_rn")

    # Ensure surrogate is first column when appending
    append_cols = [surrogate_key] + business_keys + [c for c in new_rows.columns if c not in business_keys + [surrogate_key]]
    new_rows.select(*append_cols).write.format("delta").mode("append").saveAsTable(target_table)

    print(f"SCD2 merge for {target_table} completed: {changed_count} new/changed rows appended (surrogate starts at {max_sk+1}).")

StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 8, Finished, Available, Finished)

In [7]:
# Silver-layer source tables read by the dimension loads below
sil_order, sil_supplier = (
    spark.table("silver.order"),
    spark.table("silver.supplier"),
)


StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 9, Finished, Available, Finished)

### Dim Status

In [8]:
# One source row per distinct order status; the status value is both the business
# key and the only compared column.
src_status = sil_order.select("status").distinct()
scd2_merge(
    spark,
    src_status,
    "gold.dim_status",
    business_keys="status",
    compare_cols=["status"],
    surrogate_key="status_key",
)

StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 10, Finished, Available, Finished)

No changes detected for gold.dim_status.


In [9]:
%%sql
-- Inspect the full contents of the status dimension.
SELECT * FROM gold.dim_status 


StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 11, Finished, Available, Finished)

<Spark SQL result set with 2 rows and 5 fields>

### Dim Stop Status

In [10]:
# One source row per distinct stop status; the value is both the business key and
# the only compared column.
src_stop_status = sil_order.select("stop_status").distinct()
scd2_merge(
    spark,
    src_stop_status,
    "gold.dim_stop_status",
    business_keys="stop_status",
    compare_cols=["stop_status"],
    surrogate_key="stopstatus_key",
)

StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 12, Finished, Available, Finished)

No changes detected for gold.dim_stop_status.


In [11]:
%%sql
-- Inspect the full contents of the stop-status dimension.
SELECT * FROM gold.dim_stop_status

StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 13, Finished, Available, Finished)

<Spark SQL result set with 2 rows and 5 fields>

### Dim Service

In [12]:
# One source row per distinct service id; the id is both the business key and the
# only compared column.
src_service = sil_order.select("service_id").distinct()
scd2_merge(
    spark,
    src_service,
    "gold.dim_service",
    business_keys="service_id",
    compare_cols=["service_id"],
    surrogate_key="service_key",
)

StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 14, Finished, Available, Finished)

No changes detected for gold.dim_service.


In [13]:
%%sql
-- Inspect the full contents of the service dimension.
SELECT * FROM gold.dim_service

StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 15, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 5 fields>

### Dim City

In [14]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType

# --- 1) City rows and their schema (both columns are mandatory) ---
schema = StructType([
    StructField("city_key", IntegerType(), nullable=False),
    StructField("city_name", StringType(), nullable=False),
])
data = [(1, "Thành phố Hà Nội")]

city_df = spark.createDataFrame(data, schema)

# --- 2) Append the SCD Type 2 columns: every row starts as the open, current version ---
city_scd_df = city_df.select(
    "*",
    F.lit(True).alias("is_current"),
    F.current_timestamp().alias("effective_start_ts"),
    F.lit(None).cast(TimestampType()).alias("effective_end_ts"),
)

# --- 3) Persist as a Delta table, replacing any previous contents ---
city_scd_df.write.saveAsTable("gold.dim_city", format="delta", mode="overwrite")

# --- 4) Verification: row count of the written frame, then the stored table ---
print("Created gold.dim_city with rows:", city_scd_df.count())
display(spark.table("gold.dim_city"))


StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 16, Finished, Available, Finished)

Created gold.dim_city with rows: 1


SynapseWidget(Synapse.DataFrame, 1e26ab24-dbc7-43dc-a834-e72f5507143e)

In [15]:
%%sql
-- Inspect the full contents of the city dimension.
SELECT * FROM gold.dim_city

StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 17, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 5 fields>

### Dim District

In [16]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType
from datetime import datetime

# --- 1) Source rows: (city_key, district, district_key, latitude, longitude) ---
data = [
    (1, "Huyện Ba Vì", 33, 21.15, 105.4),
    (1, "Huyện Chương Mỹ", 17, 20.85, 105.68),
    (1, "Huyện Đan Phượng", 18, 21.09, 105.68),
    (1, "Huyện Đông Anh", 20, 21.13, 105.82),
    (1, "Huyện Gia Lâm", 16, 21.02, 105.95),
    (1, "Huyện Hoài Đức", 14, 21.02, 105.68),
    (1, "Huyện Mê Linh", 32, 21.18, 105.7),
    (1, "Huyện Mỹ Đức", 26, 20.73, 105.7),
    (1, "Huyện Phú Xuyên", 29, 20.73, 105.9),
    (1, "Huyện Phúc Thọ", 21, 21.09, 105.58),
    (1, "Huyện Quốc Oai", 28, 20.96, 105.61),
    (1, "Huyện Sóc Sơn", 24, 21.28, 105.82),
    (1, "Huyện Thạch Thất", 27, 21.05, 105.55),
    (1, "Huyện Thanh Oai", 15, 20.88, 105.77),
    (1, "Huyện Thanh Trì", 12, 20.95, 105.85),
    (1, "Huyện Thường Tín", 31, 20.83, 105.88),
    (1, "Huyện Ứng Hòa", 25, 20.73, 105.8),
    (1, "Quận Ba Đình", 9, 21.0366, 105.8347),
    (1, "Quận Bắc Từ Liêm", 6, 21.07, 105.75),
    (1, "Quận Cầu Giấy", 1, 21.031, 105.792),
    (1, "Quận Đống Đa", 2, 21.0166, 105.825),
    (1, "Quận Hà Đông", 5, 20.96, 105.77),
    (1, "Quận Hai Bà Trưng", 7, 21.0069, 105.86),
    (1, "Quận Hoàn Kiếm", 13, 21.0285, 105.8542),
    (1, "Quận Hoàng Mai", 8, 20.97, 105.86),
    (1, "Quận Long Biên", 4, 21.04, 105.9),
    (1, "Quận Nam Từ Liêm", 10, 21.02, 105.75),
    (1, "Quận Tây Hồ", 3, 21.07, 105.823),
    (1, "Quận Thanh Xuân", 11, 21.0, 105.81),
    (1, "Thị xã Sơn Tây", 19, 21.1333, 105.5)
]

# --- 2) Schema (keys and name mandatory, coordinates optional) and DataFrame ---
schema = StructType([
    StructField("city_key", IntegerType(), nullable=False),
    StructField("district", StringType(), nullable=False),
    StructField("district_key", IntegerType(), nullable=False),
    StructField("latitude", DoubleType(), nullable=True),
    StructField("longitude", DoubleType(), nullable=True),
])

src_df = spark.createDataFrame(data, schema)

# --- 3) SCD2 metadata for an initial load: every row is the open, current version ---
scd_df = src_df.select(
    "*",
    F.lit(True).alias("is_current"),
    F.current_timestamp().alias("effective_start_ts"),
    F.lit(None).cast(TimestampType()).alias("effective_end_ts"),
)

# Column layout of the stored table: key first, attributes next, SCD2 metadata last
final_df = scd_df.select(
    "district_key", "district", "city_key",
    "latitude", "longitude",
    "is_current", "effective_start_ts", "effective_end_ts",
)

# --- 4) Persist to Delta (Gold), replacing any previous contents ---
spark.sql("CREATE DATABASE IF NOT EXISTS gold")
final_df.write.saveAsTable("gold.dim_district", format="delta", mode="overwrite")

# --- 5) Quick verification: row count of the written frame, then the stored table ---
print("Created gold.dim_district with rows:", final_df.count())
display(spark.table("gold.dim_district").orderBy("district_key").limit(200))


StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 18, Finished, Available, Finished)

Created gold.dim_district with rows: 30


SynapseWidget(Synapse.DataFrame, d2e2ceeb-4690-4c27-ba99-5dd49b339a78)

In [17]:
%%sql
-- Inspect the full contents of the district dimension.
SELECT * FROM gold.dim_district

StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 19, Finished, Available, Finished)

<Spark SQL result set with 30 rows and 8 fields>

### Dim Supplier

In [18]:
# Supplier attributes compared by the merge: a difference in any of them is
# treated as a changed supplier.
supplier_attrs = [
    "age",
    "activate_time",
    "create_time",
    "first_activate_time",
    "first_complete_time",
]
src_supplier = sil_supplier.select("supplier_id", *supplier_attrs).distinct()
scd2_merge(
    spark,
    src_supplier,
    "gold.dim_supplier",
    business_keys="supplier_id",
    compare_cols=supplier_attrs,
    surrogate_key="supplier_key",
)

StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 20, Finished, Available, Finished)

No changes detected for gold.dim_supplier.


In [19]:
%%sql
-- Inspect the full contents of the supplier dimension.
SELECT * FROM gold.dim_supplier

StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 21, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 11 fields>

### Fact Order

In [20]:
from pyspark.sql import functions as F
# Fully qualified gold-layer table names: the fact to build ...
fact_table = "gold.fact_order"
# ... and the dimensions whose surrogate keys it references
(
    dim_status_tbl,
    dim_stop_status_tbl,
    dim_service_tbl,
    dim_district_tbl,
    dim_supplier_tbl,
) = (
    "gold.dim_status",
    "gold.dim_stop_status",
    "gold.dim_service",
    "gold.dim_district",
    "gold.dim_supplier",
)

StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 22, Finished, Available, Finished)

In [21]:
order = spark.table("silver.order")
# NOTE(review): silver_supplier is not used by any active code visible in this notebook.
silver_supplier = spark.table("silver.supplier").select("supplier_id")


def _current_dim(table_name, *columns):
    """Return the given columns of the rows flagged is_current in a dimension table."""
    return spark.table(table_name).where(F.col("is_current") == True).select(*columns)


# Lookup frames: surrogate key plus the natural key it is resolved from
dim_status = _current_dim(dim_status_tbl, "status_key", "status")
dim_stop_status = _current_dim(dim_stop_status_tbl, "stopstatus_key", "stop_status")
dim_service = _current_dim(dim_service_tbl, "service_key", "service_id")
# city_key is carried along under the name district_city_key; frame is aliased "d"
dim_district = _current_dim(
    dim_district_tbl,
    "district_key",
    "district",
    F.col("city_key").alias("district_city_key"),
).alias("d")
dim_supplier = _current_dim(dim_supplier_tbl, "supplier_key", "supplier_id")

StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 23, Finished, Available, Finished)

In [22]:
# Match conditions: each natural key on the order row against its dimension lookup
status_match = order.status == dim_status.status
stop_status_match = order.stop_status == dim_stop_status.stop_status
service_match = order.service_id == dim_service.service_id
# District is matched on the district name only.
# NOTE(review): district_city_key is available on the lookup but is not part of
# the condition - confirm that matching on name alone is intended.
district_match = order.district == dim_district.district
supplier_match = order.supplier_id == dim_supplier.supplier_id

# Left joins keep every order row; a key with no current dimension row yields a
# null surrogate key.
fact = (
    order
    .join(dim_status, status_match, "left")            # -> status_key
    .join(dim_stop_status, stop_status_match, "left")  # -> stopstatus_key
    .join(dim_service, service_match, "left")          # -> service_key
    .join(dim_district, district_match, "left")        # -> district_key
    .join(dim_supplier, supplier_match, "left")        # -> supplier_key
)


StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 24, Finished, Available, Finished)

In [23]:
# Core business fields carried onto the fact unchanged
business_cols = [
    "order_id",
    "stop_id",
    "distance",
    "cancel_by_user",
    "cancel_code",
    "cancel_comment",
    "partner",
]

# Dimension surrogate keys resolved by the joins
surrogate_key_cols = [
    "status_key",
    "stopstatus_key",
    "district_key",
    "service_key",
    "supplier_key",
]

# Date and time-of-day column of each order event: (source column, fact column).
# NOTE(review): the *_time_time columns are presumably HH:mm text prepared upstream - confirm.
event_col_renames = [
    ("create_time_date", "create_date"),
    ("create_time_time", "create_hour"),
    ("order_time_date", "order_date"),
    ("order_time_time", "order_hour"),
    ("accept_time_date", "accept_date"),
    ("accept_time_time", "accept_hour"),
    ("board_time_date", "board_date"),
    ("board_time_time", "board_hour"),
    ("pickup_time_date", "pickup_date"),
    ("pickup_time_time", "pickup_hour"),
    ("complete_time_date", "complete_date"),
    ("complete_time_time", "complete_hour"),
    ("cancel_time_date", "cancel_date"),
    ("cancel_time_time", "cancel_hour"),
]
event_cols = [F.col(source).alias(target) for source, target in event_col_renames]

fact_selected = fact.select(*business_cols, *surrogate_key_cols, *event_cols)

StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 25, Finished, Available, Finished)

In [24]:
# Replace the fact table with the freshly built rows
fact_writer = fact_selected.write.format("delta").mode("overwrite")
fact_writer.saveAsTable(fact_table)

print(f"Created {fact_table} successfully with HH:mm formatted hour columns.")

StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 26, Finished, Available, Finished)

Created gold.fact_order successfully with HH:mm formatted hour columns.


In [26]:
%%sql
-- Preview up to 100 rows of the order fact table.
SELECT * FROM gold.fact_order LIMIT 100

StatementMeta(, 4dfe6b41-b421-4ca0-b132-2fbdad108fa3, 28, Finished, Available, Finished)

<Spark SQL result set with 100 rows and 30 fields>