In [0]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable
from pyspark.sql.window import Window

In [0]:
%run /Workspace/Users/himabindut9715@gmail.com/databricks_project/1_setup/utilities

In [0]:
# Echo the medallion schema names. They are not defined in this notebook
# (presumably provided by the %run utilities notebook above — confirm there).
print(bronze_schema, silver_schema, gold_schema)

bronze silver gold


In [0]:
# Job parameters: target catalog and the dataset name (also the S3 prefix and table name).
dbutils.widgets.text("catalog", "fmcg", "Catalog")
dbutils.widgets.text("data_source", "orders", "Data Source")

catalog = dbutils.widgets.get("catalog")
data_source = dbutils.widgets.get("data_source")

# S3 layout for this dataset: raw files are read from landing/ and, once ingested,
# moved to processed/ further down in this notebook.
base_path = f"s3://sports-bar-hb/{data_source}"
landing_path = f"{base_path}/landing/"
processed_path = f"{base_path}/processed/"
print("Base Path: ", base_path)
print("Landing Path: ", landing_path)
print("Processed Path: ", processed_path)


# define the tables (bronze/silver share the dataset name; gold uses the sb_fact_ prefix)
bronze_table = f"{catalog}.{bronze_schema}.{data_source}"
silver_table = f"{catalog}.{silver_schema}.{data_source}"
gold_table = f"{catalog}.{gold_schema}.sb_fact_{data_source}"

Base Path:  s3://sports-bar-hb/orders
Landing Path:  s3://sports-bar-hb/orders/landing/
Processed Path:  s3://sports-bar-hb/orders/processed/


In [0]:
# Read every CSV currently in the landing folder: the header row supplies column
# names and Spark infers the column types from the data.
raw_reader = spark.read.options(header=True, inferSchema=True)

df = (
    raw_reader.csv(landing_path)
    # Ingestion time for this batch.
    .withColumn("read_timestamp", F.current_timestamp())
    # Lineage: the source file name and size for each row.
    .select("*", "_metadata.file_name", "_metadata.file_size")
)

row_count = df.count()
print("Total Rows: ", row_count)
df.show(5)


Total Rows:  350
+------------+--------------------+-----------+----------+---------+--------------------+--------------------+---------+
|    order_id|order_placement_date|customer_id|product_id|order_qty|      read_timestamp|           file_name|file_size|
+------------+--------------------+-----------+----------+---------+--------------------+--------------------+---------+
|FDEC83622503|Monday, December ...|     789622|  25891302|     39.0|2025-12-17 18:32:...|orders_2025_12_01...|    21062|
|FDEC83622503|Monday, December ...|     789622|  25891301|     26.0|2025-12-17 18:32:...|orders_2025_12_01...|    21062|
|FDEC83622503|Monday, December ...|     789622|  25891503|    205.0|2025-12-17 18:32:...|orders_2025_12_01...|    21062|
|FDEC83622503|Monday, December ...|     789622|  25891501|    184.0|2025-12-17 18:32:...|orders_2025_12_01...|    21062|
|FDEC83622503|Monday, December ...|     789622|  25891101|    327.0|2025-12-17 18:32:...|orders_2025_12_01...|    21062|
+------------+-

In [0]:
# Preview the first 20 raw rows, including the added read_timestamp/file metadata columns.
display(df.limit(20))

order_id,order_placement_date,customer_id,product_id,order_qty,read_timestamp,file_name,file_size
FDEC83622503,"Monday, December 01, 2025",789622,25891302,39.0,2025-12-17T18:32:27.554Z,orders_2025_12_01.csv,21062
FDEC83622503,"Monday, December 01, 2025",789622,25891301,26.0,2025-12-17T18:32:27.554Z,orders_2025_12_01.csv,21062
FDEC83622503,"Monday, December 01, 2025",789622,25891503,205.0,2025-12-17T18:32:27.554Z,orders_2025_12_01.csv,21062
FDEC83622503,"Monday, December 01, 2025",789622,25891501,184.0,2025-12-17T18:32:27.554Z,orders_2025_12_01.csv,21062
FDEC83622503,"Monday, December 01, 2025",789622,25891101,327.0,2025-12-17T18:32:27.554Z,orders_2025_12_01.csv,21062
FDEC83622503,"Monday, December 01, 2025",INVALID,25891401,480.0,2025-12-17T18:32:27.554Z,orders_2025_12_01.csv,21062
FDEC83622503,"Monday, December 01, 2025",INVALID,25891401,480.0,2025-12-17T18:32:27.554Z,orders_2025_12_01.csv,21062
FDEC83622503,"Monday, December 01, 2025",789622,25891301,26.0,2025-12-17T18:32:27.554Z,orders_2025_12_01.csv,21062
FDEC83622503,01-12-2025,789622,25891503,205.0,2025-12-17T18:32:27.554Z,orders_2025_12_01.csv,21062
FDEC82301603,"Monday, December 01, 2025",789301,25891302,54.0,2025-12-17T18:32:27.554Z,orders_2025_12_01.csv,21062


In [0]:
# Append this batch, unchanged, to the bronze Delta table. Change Data Feed is
# enabled and new columns are allowed to evolve the table schema.
bronze_writer = (
    df.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .option("mergeSchema", "true")
    .mode("append")
)
bronze_writer.saveAsTable(bronze_table)

### Move ingested files from the landing directory to the processed directory

In [0]:
# Move everything in landing/ to processed/ so the next run does not re-ingest it.
# processed_path already ends with "/", so strip it before joining; otherwise the
# destination would be ".../processed//<name>".
# NOTE(review): this lists landing/ again rather than moving only the files read above,
# so a file that lands between the read and this cell is moved without being ingested.
processed_dir = processed_path.rstrip("/")
files = dbutils.fs.ls(landing_path)
for file_info in files:
    dbutils.fs.mv(
        file_info.path,
        f"{processed_dir}/{file_info.name}",
        True,  # recurse
    )

### Silver

In [0]:
%sql
-- Exploratory check: list the catalogs visible from this workspace.
SHOW CATALOGS;


catalog
fmcg
samples
system
workspace


In [0]:
%sql
-- Exploratory check: list the tables in the bronze schema.
-- NOTE(review): catalog/schema are hardcoded here and do not follow the "catalog" widget.
SHOW TABLES IN fmcg.bronze;


database,tableName,isTemporary
bronze,customers,False
bronze,gross_price,False
bronze,orders,False
bronze,products,False


In [0]:
# Derive the bronze table from the job parameters instead of hardcoding
# "fmcg.bronze.orders", so non-default catalog / data_source widget values are honored.
bronze_table = f"{catalog}.{bronze_schema}.{data_source}"


In [0]:
# Load the full bronze history; the silver MERGE below upserts it by order-line key.
df_orders = spark.table(bronze_table)
df_orders.show(2)


+------------+--------------------+-----------+----------+---------+--------------------+--------------------+---------+
|    order_id|order_placement_date|customer_id|product_id|order_qty|      read_timestamp|           file_name|file_size|
+------------+--------------------+-----------+----------+---------+--------------------+--------------------+---------+
|FJUL33320501|          2025/07/01|     789320|  25891203|    150.0|2025-12-17 18:26:...|orders_2025_07_01...|    20744|
|FJUL33320501|          2025/07/01|     789320|  25891301|     46.0|2025-12-17 18:26:...|orders_2025_07_01...|    20744|
+------------+--------------------+-----------+----------+---------+--------------------+--------------------+---------+
only showing top 2 rows


#### Transformations


In [0]:
# Cleanse the raw bronze orders into silver shape.

# 1. Keep only rows where order_qty is present
df_orders = df_orders.filter(F.col("order_qty").isNotNull())


# 2. Clean customer_id → keep purely numeric values, else set to the "999999" placeholder
df_orders = df_orders.withColumn(
    "customer_id",
    F.when(F.col("customer_id").rlike("^[0-9]+$"), F.col("customer_id"))
     .otherwise("999999")
     .cast("string")
)

# 3. Remove weekday name from the date text
#    "Tuesday, July 01, 2025" → "July 01, 2025"
df_orders = df_orders.withColumn(
    "order_placement_date",
    F.regexp_replace(F.col("order_placement_date"), r"^[A-Za-z]+,\s*", "")
)

# 4. Parse order_placement_date using multiple possible formats.
#    try_to_date yields NULL for a non-matching format, so coalesce picks the first
#    format that parses; values matching none of them end up NULL.
df_orders = df_orders.withColumn(
    "order_placement_date",
    F.coalesce(
        F.try_to_date("order_placement_date", "yyyy/MM/dd"),
        F.try_to_date("order_placement_date", "dd-MM-yyyy"),
        F.try_to_date("order_placement_date", "dd/MM/yyyy"),
        F.try_to_date("order_placement_date", "MMMM dd, yyyy"),
    )
)

# 5. Convert product_id to string so the join key type does not depend on CSV type inference
df_orders = df_orders.withColumn('product_id', F.col('product_id').cast('string'))

# 6. Deduplicate on the order-line key, keeping the most recently read row.
#    The silver MERGE matches on (date, order_id, product_code, customer_id); if two source
#    rows share that key (e.g. a re-landed file with a corrected order_qty), Delta MERGE fails
#    or inserts duplicates. order_qty is therefore NOT part of the key. product_id stands in
#    for product_code here (presumably 1:1 via the products table — confirm).
order_line_key = ["order_id", "order_placement_date", "customer_id", "product_id"]
latest_read_first = Window.partitionBy(*order_line_key).orderBy(
    F.col("read_timestamp").desc(),
    F.col("order_qty").desc(),  # deterministic tie-break within one load
)
df_orders = (
    df_orders
    .withColumn("_row_rank", F.row_number().over(latest_read_first))
    .filter(F.col("_row_rank") == 1)
    .drop("_row_rank")
)

In [0]:
# Sanity check: earliest and latest parsed order dates (min/max ignore NULLs).
date_range = df_orders.select(
    F.min("order_placement_date").alias("min_date"),
    F.max("order_placement_date").alias("max_date"),
)
date_range.show()

+----------+----------+
|  min_date|  max_date|
+----------+----------+
|2025-07-01|2025-11-30|
+----------+----------+



#### Join with products

In [0]:
# Preview the cleaned orders before attaching product codes.
display(df_orders)

order_id,order_placement_date,customer_id,product_id,order_qty,read_timestamp,file_name,file_size
FJUL33320501,2025-07-01,789320,25891203,150.0,2025-12-17T18:26:20.830Z,orders_2025_07_01.csv,20744
FJUL33320501,2025-07-01,789320,25891301,46.0,2025-12-17T18:26:20.830Z,orders_2025_07_01.csv,20744
FJUL33320501,2025-07-01,789320,25891201,354.0,2025-12-17T18:26:20.830Z,orders_2025_07_01.csv,20744
FJUL33320501,2025-07-01,789320,25891501,249.0,2025-12-17T18:26:20.830Z,orders_2025_07_01.csv,20744
FJUL33401603,2025-07-01,789401,25891302,40.0,2025-12-17T18:26:20.830Z,orders_2025_07_01.csv,20744
FJUL33401603,2025-07-01,789401,25891502,133.0,2025-12-17T18:26:20.830Z,orders_2025_07_01.csv,20744
FJUL33401603,2025-07-01,789401,25891503,145.0,2025-12-17T18:26:20.830Z,orders_2025_07_01.csv,20744
FJUL33401603,2025-07-01,789401,25891203,429.0,2025-12-17T18:26:20.830Z,orders_2025_07_01.csv,20744
FJUL33401603,2025-07-01,789401,25891201,461.0,2025-12-17T18:26:20.830Z,orders_2025_07_01.csv,20744
FJUL32101601,2025-07-01,789101,25891503,183.0,2025-12-17T18:26:20.830Z,orders_2025_07_01.csv,20744


In [0]:
# Preview the silver products table that supplies product_code for each product_id.
# The table name follows the catalog widget instead of a hardcoded "fmcg".
df_products = spark.table(f"{catalog}.{silver_schema}.products")
display(df_products.limit(5))

product_code,division,category,product,variant,product_id,read_timestamp,file_name,file_size
2e387cef1424d6e7b162b45622d4b1a788d11776e33d05cc8552f4ecd2ea1896,Nutrition Bars,Protein Bars,SportsBar Protein Bar Peanut Crunch (45g),45g,25891201,2025-12-16T22:10:25.551Z,products.csv,1388
fe5a8036be4b9a787b7c0ae013fc752a8cfb6c55a2f7b2fd152a6380925e9c49,Dairy & Recovery,Recovery Dairy,SportsBar Greek Yogurt Pro Vanilla (120g),120g,25891402,2025-12-16T22:10:25.551Z,products.csv,1388
da6bfc596c1360ca07bda4e0ae6bfe3b8456517fc6e8ddc265630ff940f9ab05,Dairy & Recovery,Recovery Dairy,SportsBar Greek Yogurt Pro Vanilla (200g),200g,25891401,2025-12-16T22:10:25.551Z,products.csv,1388
e91ba9d665f90254da5809bfdebe3db2be01a52f50b6fd96b57eed238392b843,Nutrition Bars,Energy Bars,SportsBar Energy Bar Choco Fudge (60g),60g,25891101,2025-12-16T22:10:25.551Z,products.csv,1388
0cb7b2f42657b625f754e833aa1cf6a967be26f17415f5342302ebb0e90c8a28,Nutrition Bars,Protein Bars,SportsBar Protein Bar Peanut Crunch (55g),55g,25891202,2025-12-16T22:10:25.551Z,products.csv,1388


In [0]:
# Attach product_code to each order line from the silver products table
# (name derived from the catalog widget instead of a hardcoded "fmcg").
# NOTE: inner join — orders whose product_id has no match in products are dropped,
# and duplicate product_id rows in products would fan out order lines.
df_products = spark.table(f"{catalog}.{silver_schema}.products")
df_joined = (
    df_orders
    .join(df_products, on="product_id", how="inner")
    .select(df_orders["*"], df_products["product_code"])
)
df_joined.display()

order_id,order_placement_date,customer_id,product_id,order_qty,read_timestamp,file_name,file_size,product_code
FJUL312422401,2025-07-10,789422,25891401,406.0,2025-12-17T18:26:20.830Z,orders_2025_07_10.csv,20202,da6bfc596c1360ca07bda4e0ae6bfe3b8456517fc6e8ddc265630ff940f9ab05
FJUL316103602,2025-07-14,789103,25891403,235.0,2025-12-17T18:26:20.830Z,orders_2025_07_14.csv,20354,77b6f538a9d0e0cf845db5c2cbecec46fdd30303b501e06f64baf1d4dc0e66f9
FJUL316402601,2025-07-14,789402,25891601,167.0,2025-12-17T18:26:20.830Z,orders_2025_07_14.csv,20354,716fa4e54b7894c910180276e0535d49afb25cdcfac09533fb74ae00689e5742
FJUL320720201,2025-07-19,789720,25891103,358.0,2025-12-17T18:26:20.830Z,orders_2025_07_19.csv,21044,102628255d24304d6bbe0438b1ac992054f262e0814d306d0a34d7356cef3268
FJUL321403603,2025-07-19,789403,25891603,169.0,2025-12-17T18:26:20.830Z,orders_2025_07_19.csv,21044,451f7167b28a25bde73995910e31c07dfa26411f1db47847f19e16747effbdaa
FJUL328403503,2025-07-27,789403,25891101,367.0,2025-12-17T18:26:20.830Z,orders_2025_07_27.csv,20056,e91ba9d665f90254da5809bfdebe3db2be01a52f50b6fd96b57eed238392b843
FAUG46903403,2025-08-03,789903,25891302,59.0,2025-12-17T18:26:20.830Z,orders_2025_08_03.csv,20414,d9ebd1ca64d23951a6310af93b1c5ac27d831ac842e89aea59a9e8b38621faa5
FAUG46303602,2025-08-03,789303,25891303,85.0,2025-12-17T18:26:20.830Z,orders_2025_08_03.csv,20414,c68834ceaff15846bc1892c2185dc4e4f471d64fe3796b1a8ecc39a5a48c614f
FAUG411902303,2025-08-10,789902,25891103,436.0,2025-12-17T18:26:20.830Z,orders_2025_08_10.csv,19957,102628255d24304d6bbe0438b1ac992054f262e0814d306d0a34d7356cef3268
FAUG417402403,2025-08-14,789402,25891403,382.0,2025-12-17T18:26:20.830Z,orders_2025_08_14.csv,20782,77b6f538a9d0e0cf845db5c2cbecec46fdd30303b501e06f64baf1d4dc0e66f9


In [0]:
# Upsert the joined orders into silver: create the table on the first run,
# otherwise MERGE on the order-line key.
if not spark.catalog.tableExists(silver_table):
    (
        df_joined.write
        .format("delta")
        .option("delta.enableChangeDataFeed", "true")
        .option("mergeSchema", "true")
        .mode("overwrite")
        .saveAsTable(silver_table)
    )
else:
    silver_delta = DeltaTable.forName(spark, silver_table)

    (
        silver_delta.alias("silver")
        .merge(
            source=df_joined.alias("bronze"),
            # Null-safe equality (<=>): with plain "=", rows whose key is NULL (e.g. a date
            # that failed to parse) never match and would be re-inserted on every run.
            condition="""
                silver.order_placement_date <=> bronze.order_placement_date
                AND silver.order_id <=> bronze.order_id
                AND silver.product_code <=> bronze.product_code
                AND silver.customer_id <=> bronze.customer_id
            """
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )


### Gold

In [0]:
# Project silver orders onto the gold fact schema, renaming to the gold column names.
df_gold = spark.table(silver_table).select(
    "order_id",
    F.col("order_placement_date").alias("date"),
    F.col("customer_id").alias("customer_code"),
    "product_code",
    "product_id",
    F.col("order_qty").alias("sold_quantity"),
)

df_gold.show(2)

+-------------+----------+-------------+--------------------+----------+-------------+
|     order_id|      date|customer_code|        product_code|product_id|sold_quantity|
+-------------+----------+-------------+--------------------+----------+-------------+
| FDEC82703601|2025-11-30|       789703|e92c739a8d78cd6cb...|  25891102|        447.0|
|FNOV730202501|2025-11-29|       789202|c68834ceaff15846b...|  25891303|         55.0|
+-------------+----------+-------------+--------------------+----------+-------------+
only showing top 2 rows


In [0]:
# Upsert the gold fact: create the table on the first run, otherwise MERGE on the
# order-line key.
if not spark.catalog.tableExists(gold_table):
    print("Creating new Gold table")

    (
        df_gold.write
        .format("delta")
        .option("delta.enableChangeDataFeed", "true")
        .option("mergeSchema", "true")
        .mode("overwrite")
        .saveAsTable(gold_table)
    )
else:
    gold_delta = DeltaTable.forName(spark, gold_table)

    (
        # "target" is the existing gold table; "updates" is the freshly built gold DataFrame.
        gold_delta.alias("target")
        .merge(
            source=df_gold.alias("updates"),
            # Null-safe equality (<=>) so NULL-keyed rows match on reruns instead of
            # being inserted again each time.
            condition="""
                target.date <=> updates.date
                AND target.order_id <=> updates.order_id
                AND target.product_code <=> updates.product_code
                AND target.customer_code <=> updates.customer_code
            """
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )


### Merging with the parent company

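-- Note: The parent company's data is needed at a monthly grain, but the child company's data is at a daily grain, so the child data is aggregated to the first day of each month before merging.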

### Full Load

In [0]:
# Child-company daily gold facts: only the key and measure columns needed for the monthly roll-up.
child_columns = ["date", "product_code", "customer_code", "sold_quantity"]
df_child = spark.table(gold_table).select(*child_columns)
df_child.show(10)

+----------+--------------------+-------------+-------------+
|      date|        product_code|customer_code|sold_quantity|
+----------+--------------------+-------------+-------------+
|2025-07-10|da6bfc596c1360ca0...|       789422|        406.0|
|2025-07-14|77b6f538a9d0e0cf8...|       789103|        235.0|
|2025-07-14|716fa4e54b7894c91...|       789402|        167.0|
|2025-07-19|102628255d24304d6...|       789720|        358.0|
|2025-07-19|451f7167b28a25bde...|       789403|        169.0|
|2025-07-27|e91ba9d665f90254d...|       789403|        367.0|
|2025-08-03|d9ebd1ca64d23951a...|       789903|         59.0|
|2025-08-03|c68834ceaff15846b...|       789303|         85.0|
|2025-08-10|102628255d24304d6...|       789902|        436.0|
|2025-08-14|77b6f538a9d0e0cf8...|       789402|        382.0|
+----------+--------------------+-------------+-------------+
only showing top 10 rows


In [0]:
# Row count of the daily child facts (compare with the monthly count below).
df_child.count()


40811

In [0]:
# Roll the daily child facts up to a monthly grain: each date is truncated to the first
# day of its month (e.g. 2025-11-30 → 2025-11-01) and sold_quantity is summed per
# month / product / customer. The month column keeps the name `date` to match the target schema.
df_monthly = (
    df_child
    .groupBy(
        F.trunc("date", "MM").alias("date"),
        "product_code",
        "customer_code",
    )
    .agg(F.sum("sold_quantity").alias("sold_quantity"))
)

df_monthly.show(5, truncate=False)

+----------+----------------------------------------------------------------+-------------+-------------+
|date      |product_code                                                    |customer_code|sold_quantity|
+----------+----------------------------------------------------------------+-------------+-------------+
|2025-07-01|da6bfc596c1360ca07bda4e0ae6bfe3b8456517fc6e8ddc265630ff940f9ab05|789422       |5011.0       |
|2025-07-01|77b6f538a9d0e0cf845db5c2cbecec46fdd30303b501e06f64baf1d4dc0e66f9|789103       |5203.0       |
|2025-07-01|716fa4e54b7894c910180276e0535d49afb25cdcfac09533fb74ae00689e5742|789402       |1726.0       |
|2025-07-01|102628255d24304d6bbe0438b1ac992054f262e0814d306d0a34d7356cef3268|789720       |3712.0       |
|2025-07-01|451f7167b28a25bde73995910e31c07dfa26411f1db47847f19e16747effbdaa|789403       |1816.0       |
+----------+----------------------------------------------------------------+-------------+-------------+
only showing top 5 rows


In [0]:
# Row count after the monthly roll-up.
df_monthly.count()

3060

In [0]:
# Upsert the monthly child totals into the parent company's fact table, matching on
# month + product + customer. Matched rows take the child's values; new keys are inserted.
parent_fact_table = f"{catalog}.{gold_schema}.fact_orders"
gold_parent_delta = DeltaTable.forName(spark, parent_fact_table)

parent_merge_keys = ["date", "product_code", "customer_code"]
parent_merge_condition = " AND ".join(
    f"parent_gold.{key} = child_gold.{key}" for key in parent_merge_keys
)

(
    gold_parent_delta.alias("parent_gold")
    .merge(source=df_monthly.alias("child_gold"), condition=parent_merge_condition)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]