**Import Required Libraries**

In [0]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable

**Load Project Utilities & Initialize Notebook Widgets**

In [0]:
%run /Workspace/consolidated_pipeline/1_setup/utilities

In [0]:
# Sanity check: show the schema names this notebook will use
# (presumably defined by the utilities notebook run above — confirm there).
schema_names = (bronze_schema, silver_schema, gold_schema)
print(*schema_names)

bronze silver gold


In [0]:
# Notebook parameters: the target catalog and the data source (dataset) to process.
for _widget_name, _default_value, _widget_label in (
    ("catalog", "fmcg", "Catalog"),
    ("data_source", "orders", "Data Source"),
):
    dbutils.widgets.text(_widget_name, _default_value, _widget_label)

catalog = dbutils.widgets.get("catalog")
data_source = dbutils.widgets.get("data_source")

# S3 locations for this data source: files are read from landing/ and later
# moved to processed/ by the bronze section of this notebook.
base_path = f's3://sportsbar-final/{data_source}'
landing_path = f"{base_path}/landing/"
processed_path = f"{base_path}/processed/"
for _path_label, _path_value in (
    ("Base Path: ", base_path),
    ("Landing Path: ", landing_path),
    ("Processed Path: ", processed_path),
):
    print(_path_label, _path_value)


# Fully qualified table names, one per layer (bronze / silver / gold).
bronze_table = f"{catalog}.{bronze_schema}.{data_source}"
silver_table = f"{catalog}.{silver_schema}.{data_source}"
gold_table = f"{catalog}.{gold_schema}.sb_fact_{data_source}"

Base Path:  s3://sportsbar-final/orders
Landing Path:  s3://sportsbar-final/orders/landing/
Processed Path:  s3://sportsbar-final/orders/processed/


## Bronze

In [0]:
# Read every CSV currently in the landing folder, stamp each row with the load
# time, and keep the source file name and size from the hidden _metadata column.
# landing_path already ends with "/", so strip it before appending the glob to
# avoid a doubled slash ("landing//*.csv") in the S3 path.
landing_glob = f"{landing_path.rstrip('/')}/*.csv"

df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(landing_glob)
    .withColumn("read_timestamp", F.current_timestamp())
    .select("*", "_metadata.file_name", "_metadata.file_size")
)

print("Total Rows: ", df.count())
df.show(5)

Total Rows:  8834
+-------------+--------------------+-----------+----------+---------+--------------------+--------------------+---------+
|     order_id|order_placement_date|customer_id|product_id|order_qty|      read_timestamp|           file_name|file_size|
+-------------+--------------------+-----------+----------+---------+--------------------+--------------------+---------+
|FDEC818102602|Tuesday, December...|     789102|  25891502|    196.0|2025-11-24 14:09:...|orders_2025_12_16...|    23060|
|FDEC818102602|Tuesday, December...|     789102|  25891503|     NULL|2025-11-24 14:09:...|orders_2025_12_16...|    23060|
|FDEC818102602|Tuesday, December...|     789102|  25891602|    147.0|2025-11-24 14:09:...|orders_2025_12_16...|    23060|
|FDEC818102602|Tuesday, December...|     789102|  25891101|    337.0|2025-11-24 14:09:...|orders_2025_12_16...|    23060|
|FDEC818102602|Tuesday, December...|    INVALID|  25891202|    202.0|2025-11-24 14:09:...|orders_2025_12_16...|    23060|
+-----

In [0]:
# Append this batch to the bronze table; rows from earlier runs are kept.
bronze_writer = (
    df.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("append")
)
bronze_writer.saveAsTable(bronze_table)

### Staging table to process only the newly arrived incremental data

In [0]:
# Overwrite the bronze staging table so it holds only the batch read in this run.
bronze_staging_table = f"{catalog}.{bronze_schema}.staging_{data_source}"
staging_writer = (
    df.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")
)
staging_writer.saveAsTable(bronze_staging_table)

### Moving files from source to processed directory

In [0]:
# Move only the files that were actually ingested into the bronze staging table.
# Moving everything listed in the landing folder would also archive non-CSV
# files and any file that arrived after the read, and those would never be loaded.
# NOTE(review): matching relies on _metadata.file_name being identical to the
# name returned by dbutils.fs.ls — confirm for file names with special characters.
ingested_file_names = {
    row.file_name
    for row in (
        spark.table(f"{catalog}.{bronze_schema}.staging_{data_source}")
        .select("file_name")
        .distinct()
        .collect()
    )
}

# processed_path already ends with "/", so strip it to avoid "processed//<name>".
processed_dir = processed_path.rstrip("/")

files = dbutils.fs.ls(landing_path)
for file_info in files:
    if file_info.name not in ingested_file_names:
        # Not part of this batch: leave it in landing for a later run.
        continue
    dbutils.fs.mv(
        file_info.path,
        f"{processed_dir}/{file_info.name}",
        True
    )

## Silver

In [0]:
# Silver starts from the bronze staging table, i.e. only the batch loaded in this run.
bronze_staging_query = f"SELECT * FROM {catalog}.{bronze_schema}.staging_{data_source};"
df_orders = spark.sql(bronze_staging_query)
df_orders.show(2)

+------------+--------------------+-----------+----------+---------+--------------------+--------------------+---------+
|    order_id|order_placement_date|customer_id|product_id|order_qty|      read_timestamp|           file_name|file_size|
+------------+--------------------+-----------+----------+---------+--------------------+--------------------+---------+
|FDEC85903601|Thursday, Decembe...|     789903|  77777777|    342.0|2025-11-24 14:09:...|orders_2025_12_04...|    22385|
|FDEC85903601|Thursday, Decembe...|     789903|  25891302|     53.0|2025-11-24 14:09:...|orders_2025_12_04...|    22385|
+------------+--------------------+-----------+----------+---------+--------------------+--------------------+---------+
only showing top 2 rows


**Transformations**

In [0]:
# Cleans the staged orders in one chain.
# NOTE: df_orders is reassigned here, so re-run the load cell above before
# re-running this cell.

# Date patterns tried in order; the first one that parses is used.
date_patterns = ("yyyy/MM/dd", "dd-MM-yyyy", "dd/MM/yyyy", "MMMM dd, yyyy")

# Columns that together identify a duplicate order line.
dedup_columns = ["order_id", "order_placement_date", "customer_id", "product_id", "order_qty"]

df_orders = (
    df_orders
    # 1. Discard rows that have no order quantity.
    .filter(F.col("order_qty").isNotNull())
    # 2. customer_id: keep purely numeric values, replace anything else with "999999".
    .withColumn(
        "customer_id",
        F.when(F.col("customer_id").rlike("^[0-9]+$"), F.col("customer_id"))
        .otherwise("999999")
        .cast("string"),
    )
    # 3. Strip the leading weekday name from the date text:
    #    "Tuesday, July 01, 2025" → "July 01, 2025"
    .withColumn(
        "order_placement_date",
        F.regexp_replace(F.col("order_placement_date"), r"^[A-Za-z]+,\s*", ""),
    )
    # 4. Parse the date text; coalesce keeps the first pattern that yields a date.
    .withColumn(
        "order_placement_date",
        F.coalesce(
            *[F.try_to_date("order_placement_date", pattern) for pattern in date_patterns]
        ),
    )
    # 5. Remove duplicate order lines.
    .dropDuplicates(dedup_columns)
    # 6. Store product_id as a string.
    .withColumn("product_id", F.col("product_id").cast("string"))
)

In [0]:
# check what's the maximum and minimum date
df_orders.agg(
    F.min("order_placement_date").alias("min_date"),
    F.max("order_placement_date").alias("max_date")
).show()

+----------+----------+
|  min_date|  max_date|
+----------+----------+
|2025-12-04|2025-12-30|
+----------+----------+



**Join with products**

In [0]:
# Look up product_code for each order line from the silver products table.
# The table name is built from the catalog widget and silver schema (instead of
# a hard-coded "fmcg.silver") so it follows the same parameters as the other tables.
df_products = spark.table(f"{catalog}.{silver_schema}.products")

# Inner join: order lines whose product_id has no match in products are dropped.
df_joined = (
    df_orders
    .join(df_products, on="product_id", how="inner")
    .select(df_orders["*"], df_products["product_code"])
)

df_joined.show(5)

+-------------+--------------------+-----------+----------+---------+--------------------+--------------------+---------+--------------------+
|     order_id|order_placement_date|customer_id|product_id|order_qty|      read_timestamp|           file_name|file_size|        product_code|
+-------------+--------------------+-----------+----------+---------+--------------------+--------------------+---------+--------------------+
| FDEC87522503|          2025-12-04|     789522|  25891403|    342.0|2025-11-24 14:09:...|orders_2025_12_04...|    22385|77b6f538a9d0e0cf8...|
| FDEC89522601|          2025-12-08|     789522|  25891403|    476.0|2025-11-24 14:09:...|orders_2025_12_08...|    21711|77b6f538a9d0e0cf8...|
|FDEC817203502|          2025-12-15|     789203|  25891203|    300.0|2025-11-24 14:09:...|orders_2025_12_15...|    20287|889c67757ece9c973...|
|FDEC818101501|          2025-12-16|     789101|  25891501|    221.0|2025-11-24 14:09:...|orders_2025_12_16...|    23060|ee1f7df9cf660ef02...|

In [0]:
# Upsert the cleaned batch into the silver table, creating the table on first run.
# In the merge, "silver" is the existing table and "bronze" is the incoming batch.
silver_merge_condition = (
    "silver.order_placement_date = bronze.order_placement_date"
    " AND silver.order_id = bronze.order_id"
    " AND silver.product_code = bronze.product_code"
    " AND silver.customer_id = bronze.customer_id"
)

if spark.catalog.tableExists(silver_table):
    silver_delta = DeltaTable.forName(spark, silver_table)
    (
        silver_delta.alias("silver")
        .merge(df_joined.alias("bronze"), silver_merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    (
        df_joined.write
        .format("delta")
        .option("delta.enableChangeDataFeed", "true")
        .option("mergeSchema", "true")
        .mode("overwrite")
        .saveAsTable(silver_table)
    )

### Staging table to process only the newly arrived incremental data

In [0]:
# Staging table for the incremental batch: overwritten on every run so the
# gold section reads only the rows processed in this run.
silver_staging_table = f"{catalog}.{silver_schema}.staging_{data_source}"
(
    df_joined.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")
    .saveAsTable(silver_staging_table)
)

## Gold

In [0]:
# Project the silver staging rows into the gold fact layout, renaming columns
# (order_placement_date → date, customer_id → customer_code, order_qty → sold_quantity).
gold_select_sql = (
    "SELECT order_id, order_placement_date as date, customer_id as customer_code, "
    "product_code, product_id, order_qty as sold_quantity "
    f"FROM {catalog}.{silver_schema}.staging_{data_source};"
)
df_gold = spark.sql(gold_select_sql)

df_gold.show(2)

+------------+----------+-------------+--------------------+----------+-------------+
|    order_id|      date|customer_code|        product_code|product_id|sold_quantity|
+------------+----------+-------------+--------------------+----------+-------------+
|FDEC87522503|2025-12-04|       789522|77b6f538a9d0e0cf8...|  25891403|        342.0|
|FDEC89522601|2025-12-08|       789522|77b6f538a9d0e0cf8...|  25891403|        476.0|
+------------+----------+-------------+--------------------+----------+-------------+
only showing top 2 rows


In [0]:
# Number of rows in this run's gold batch.
gold_batch_rows = df_gold.count()
gold_batch_rows

6947

In [0]:
# Upsert the batch into the gold fact table, creating the table on first run.
# Alias naming in the merge: "source" is the existing gold table (the merge
# target) and "gold" is the incoming batch.
gold_merge_condition = (
    "source.date = gold.date"
    " AND source.order_id = gold.order_id"
    " AND source.product_code = gold.product_code"
    " AND source.customer_code = gold.customer_code"
)

if spark.catalog.tableExists(gold_table):
    gold_delta = DeltaTable.forName(spark, gold_table)
    (
        gold_delta.alias("source")
        .merge(df_gold.alias("gold"), gold_merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    print("creating New Table")
    (
        df_gold.write
        .format("delta")
        .option("delta.enableChangeDataFeed", "true")
        .option("mergeSchema", "true")
        .mode("overwrite")
        .saveAsTable(gold_table)
    )

## Merging with Parent company

- Note: The parent company's fact table is kept at monthly granularity, while the child data is daily, so the child rows are aggregated to month level before merging.

**Incremental Load**

In [0]:
# Order dates of the rows processed in this run (the incremental daily batch).
child_dates_sql = f"SELECT order_placement_date as date FROM {catalog}.{silver_schema}.staging_{data_source}"
df_child = spark.sql(child_dates_sql)

# Distinct months touched by the batch, each represented by its first day.
month_start_col = F.trunc("date", "MM").alias("start_month")
incremental_month_df = df_child.select(month_start_col).distinct()

incremental_month_df.show()

# Registered as a temp view so it can be joined in SQL.
incremental_month_df.createOrReplaceTempView("incremental_months")

+-----------+
|start_month|
+-----------+
| 2025-12-01|
+-----------+



In [0]:
# Pull every gold fact row that falls in a month touched by this batch, so the
# monthly totals can be recomputed from the full month rather than just the batch.
# Uses gold_table (built from the data_source widget) instead of a hard-coded
# "sb_fact_orders", so it reads the same table the gold upsert wrote to.
monthly_table = spark.sql(f"""
    SELECT date, product_code, customer_code, sold_quantity
    FROM {gold_table} sbf
    INNER JOIN incremental_months m
        ON trunc(sbf.date, 'MM') = m.start_month
""")

print("Total Rows: ", monthly_table.count())
monthly_table.show(10)

Total Rows:  7736
+----------+--------------------+-------------+-------------+
|      date|        product_code|customer_code|sold_quantity|
+----------+--------------------+-------------+-------------+
|2025-12-04|77b6f538a9d0e0cf8...|       789522|        342.0|
|2025-12-08|77b6f538a9d0e0cf8...|       789522|        476.0|
|2025-12-15|889c67757ece9c973...|       789203|        300.0|
|2025-12-16|ee1f7df9cf660ef02...|       789101|        221.0|
|2025-12-16|889c67757ece9c973...|       789101|        393.0|
|2025-12-21|ee1f7df9cf660ef02...|       789622|        190.0|
|2025-12-05|e91ba9d665f90254d...|       789303|        462.0|
|2025-12-14|889c67757ece9c973...|       789702|        322.0|
|2025-12-23|e91ba9d665f90254d...|       789201|        343.0|
|2025-12-26|3cab59f0592428527...|       789702|         37.0|
+----------+--------------------+-------------+-------------+
only showing top 10 rows


In [0]:
# List the distinct daily dates present in the affected month(s), in order.
distinct_dates = monthly_table.select('date').distinct()
distinct_dates.orderBy('date').show()

+----------+
|      date|
+----------+
|2025-12-01|
|2025-12-02|
|2025-12-03|
|2025-12-04|
|2025-12-05|
|2025-12-06|
|2025-12-07|
|2025-12-08|
|2025-12-09|
|2025-12-10|
|2025-12-11|
|2025-12-12|
|2025-12-13|
|2025-12-14|
|2025-12-15|
|2025-12-16|
|2025-12-17|
|2025-12-18|
|2025-12-19|
|2025-12-20|
+----------+
only showing top 20 rows


In [0]:
# Roll the daily rows up to one row per month / product / customer.
with_month_start = monthly_table.withColumn("month_start", F.trunc("date", "MM"))

monthly_totals = with_month_start.groupBy(
    "month_start", "product_code", "customer_code"
).agg(F.sum("sold_quantity").alias("sold_quantity"))

# The month's first day becomes the row's date.
df_monthly_recalc = monthly_totals.withColumnRenamed("month_start", "date")

df_monthly_recalc.show(10, truncate=False)

+----------+----------------------------------------------------------------+-------------+-------------+
|date      |product_code                                                    |customer_code|sold_quantity|
+----------+----------------------------------------------------------------+-------------+-------------+
|2025-12-01|778c2a7aa27bfdb211fd5ece048de80d00fbf3d6924bd908d91054796ba16ab6|789402       |1096.0       |
|2025-12-01|778c2a7aa27bfdb211fd5ece048de80d00fbf3d6924bd908d91054796ba16ab6|789503       |1839.0       |
|2025-12-01|ee1f7df9cf660ef02c33037d8d6eb94cbefe8e7b84c306e9387f09b0cae0abae|789703       |1759.0       |
|2025-12-01|3cab59f05924285270313afcfe40a08983bb03dd88f432e34fc6336914c14345|789103       |686.0        |
|2025-12-01|889c67757ece9c973791dfbc2d47b026a3342cc7255e47a3170329d158e897c2|789402       |3340.0       |
|2025-12-01|0cb7b2f42657b625f754e833aa1cf6a967be26f17415f5342302ebb0e90c8a28|789321       |3765.0       |
|2025-12-01|2e387cef1424d6e7b162b45622d4b1a788

In [0]:
# Number of month / product / customer rows that will be merged into the parent table.
monthly_rows = df_monthly_recalc.count()
monthly_rows

612

In [0]:
# Upsert the recomputed monthly totals into the parent company's fact table:
# matching (date, product_code, customer_code) rows are updated, others inserted.
parent_merge_condition = (
    "parent_gold.date = child_gold.date"
    " AND parent_gold.product_code = child_gold.product_code"
    " AND parent_gold.customer_code = child_gold.customer_code"
)

gold_parent_delta = DeltaTable.forName(spark, f"{catalog}.{gold_schema}.fact_orders")
(
    gold_parent_delta.alias("parent_gold")
    .merge(df_monthly_recalc.alias("child_gold"), parent_merge_condition)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

## Cleanup

In [0]:
# Drop the bronze staging table created earlier in this notebook.
# The name is built from the same widgets/schema used to create it (instead of
# a hard-coded "fmcg.bronze.staging_orders"), and IF EXISTS keeps a re-run of
# this cell from failing when the table is already gone.
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{bronze_schema}.staging_{data_source}")

In [0]:
# Drop the silver staging table created earlier in this notebook.
# The name is built from the same widgets/schema used to create it (instead of
# a hard-coded "fmcg.silver.staging_orders"), and IF EXISTS keeps a re-run of
# this cell from failing when the table is already gone.
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{silver_schema}.staging_{data_source}")