In [0]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable

In [0]:
# S3 locations for the orders feed.
landing = "s3://sportsbar-dp-dbx/orders/landing/"                     # folder new files arrive in
s3_path_orders_landing = landing + "*.csv"                            # glob that is read into bronze
s3_path_orders_processed = "s3://sportsbar-dp-dbx/orders/processed/"  # archive for ingested files

In [0]:
def store_into_bronze(list_s3):
    """Ingest landing-zone CSV files into the bronze layer.

    For each entry of ``list_s3`` the CSV source is read once (header row,
    inferred schema). ``order_qty`` is cast to double and three audit columns
    are added:

    * ``read_timestamp``      - load time (``current_timestamp()``),
    * ``_metadata_filename``  - source file path (``_metadata.file_path``),
    * ``_metadata_file_size`` - source file size (``_metadata.file_size``).

    The batch is first written to ``fmcg.bronze.stg_<table>``. That write uses
    overwrite mode, so the table holds only the current batch. The staging
    table is then appended to ``fmcg.bronze.<table>`` (full history).

    Appending from the staging table, instead of writing the CSV DataFrame a
    second time, guarantees both tables receive exactly the same rows and the
    same ``read_timestamp``. Writing the lazy DataFrame twice would rescan the
    landing glob and evaluate ``current_timestamp()`` again.

    Args:
        list_s3: iterable of entries ``[source_path, table_name]``.
            Element 0 is the path/glob passed to ``spark.read.csv``.
            Element 1 is the bare table name used under ``fmcg.bronze``.

    Returns:
        None. A confirmation message is printed after all entries are stored.
    """
    for entry in list_s3:
        source_path, table_name = entry[0], entry[1]
        bronze_table = f"fmcg.bronze.{table_name}"
        staging_table = f"fmcg.bronze.stg_{table_name}"

        batch_df = (
            spark.read
            .option("header", "true")
            .option("inferSchema", "true")
            # NOTE(review): "includeMetadata" does not look like a documented CSV
            # option; the _metadata column is read below regardless - confirm.
            .option("includeMetadata", "true")
            .csv(source_path)
            # Pin the type so it does not depend on what inferSchema picks per batch.
            .withColumn("order_qty", F.col("order_qty").cast("double"))
            .withColumn("read_timestamp", F.current_timestamp())
            .withColumn("_metadata_filename", F.col("_metadata.file_path"))
            .withColumn("_metadata_file_size", F.col("_metadata.file_size"))
        )

        # Current batch only; the silver cleaning steps read this table.
        (
            batch_df.write
            .format("delta")
            .option("delta.enableChangeDataFeed", "true")
            .mode("overwrite")
            .saveAsTable(staging_table)
        )

        # Full history: append exactly the rows that were just staged.
        (
            spark.table(staging_table).write
            .format("delta")
            .option("delta.enableChangeDataFeed", "true")
            .mode("append")
            .saveAsTable(bronze_table)
        )

    print("stored tables into bronze layer successfully both append and in stg as well!!")

In [0]:
# Each entry pairs a landing glob with the bronze table name it feeds.
list_s3 = [
    [s3_path_orders_landing, "orders"],
]
store_into_bronze(list_s3)

In [0]:
# Archive ingested files: move them from landing/ to processed/ so the next run
# does not load them again.
# Only *.csv entries are moved, matching the glob in s3_path_orders_landing that
# was read into bronze. Anything else in landing/ was not ingested and stays put.
# NOTE(review): a CSV that lands between the bronze read and this move is still
# archived without being ingested. Moving exactly the paths recorded in
# _metadata_filename would close that gap.
files = dbutils.fs.ls(landing)

for file_info in files:
    if not file_info.name.endswith(".csv"):
        continue
    dbutils.fs.mv(
        file_info.path,
        # The prefix already ends in "/"; strip it to avoid building "processed//name".
        f"{s3_path_orders_processed.rstrip('/')}/{file_info.name}",
        True,
    )

In [0]:
# Inspect the staging snapshot written by store_into_bronze.
df1 = spark.table("fmcg.bronze.stg_orders")
display(df1)

In [0]:
# Remove duplicate orders from the staged batch.
# Duplicates are identified on the business columns only. When the same order
# arrives in two different files, the audit columns (_metadata_filename,
# _metadata_file_size) differ. A DISTINCT over every column would then keep both
# copies and pass them on to the silver MERGE.
# dropDuplicates keeps one arbitrary row per group, so the audit values of the
# surviving copy are not deterministic.
df1 = spark.sql(
    """
    select order_id,order_placement_date,customer_id,product_id,order_qty,read_timestamp,_metadata_filename,_metadata_file_size
    from fmcg.bronze.stg_orders
    """
).dropDuplicates(
    ["order_id", "order_placement_date", "customer_id", "product_id", "order_qty"]
)
display(df1)
df1.createOrReplaceTempView("remove_duplicates")

In [0]:
# Keep only rows that carry a quantity (order_qty IS NOT NULL).
df2 = df1.where(F.col("order_qty").isNotNull())
display(df2)

In [0]:
# Replace malformed customer ids with the sentinel "99999999".
# A value is kept when it is an optionally signed number with at least one digit
# (e.g. "789", "-12", "3.5"). \d+ (not \d*) rejects "" and a lone "-".
# A null customer_id makes rlike() null, which when() treats as false, so nulls
# also get the sentinel.
df3 = df2.withColumn(
    "customer_id",
    F.when(F.col("customer_id").rlike(r"^-?\d+(\.\d+)?$"), F.col("customer_id"))
    .otherwise("99999999"),
)
display(df3)

In [0]:
# Strip a leading weekday name so the rest matches one of the formats parsed in
# the next cell. Examples:
#   "Tuesday, January 14, 2025" -> "January 14, 2025"
#   "Tue, 2025/01/14"           -> "2025/01/14"
# Only full or abbreviated weekday names (any case) followed by a comma are removed.
# A "Month dd, yyyy" value has its comma after the day and no month name starts
# with mon/tue/wed/thu/fri/sat/sun, so such values are left intact.
df4 = df3.withColumn(
    "order_placement_date",
    F.regexp_replace(
        F.col("order_placement_date"),
        r"(?i)^(mon|tue|wed|thu|fri|sat|sun)[a-z]*,\s*",
        "",
    ),
)
display(df4)

In [0]:
# Normalise order_placement_date to a "yyyy-MM-dd" string.
# Each candidate pattern is tried in order with try_to_date, which yields null on
# a mismatch instead of failing. coalesce keeps the first successful parse.
# Values that match none of the patterns end up null.
df5 = df4.withColumn(
    "order_placement_date",
    F.coalesce(*[
        F.date_format(F.try_to_date(F.col("order_placement_date"), pattern), "yyyy-MM-dd")
        for pattern in (
            "yyyy/MM/dd",
            "MM/dd/yyyy",
            "dd/yyyy/MM",
            "yyyy-MM-dd",
            "MM-dd-yyyy",
            "dd-yyyy-MM",
            "MMMM dd, yyyy",
            "MMM dd, yyyy",
        )
    ]),
)
display(df5)

In [0]:
# Make product_id a string before it is used as the join key below.
df6 = df5.withColumn("product_id", df5["product_id"].cast("string"))

In [0]:
df_product_code_fetch = spark.table("fmcg.silver.s_products")  # product_id -> product_code lookup

In [0]:
# Attach product_code from the silver products table.
# This is an inner join: orders whose product_id has no match in s_products are dropped.
df_joined = (
    df6.join(
        df_product_code_fetch,
        on=df6.product_id == df_product_code_fetch.product_id,
        how="inner",
    )
    .select(
        "order_id",
        "order_placement_date",
        "customer_id",
        df6.product_id,
        df_product_code_fetch.product_code,
        "order_qty",
        df6.read_timestamp,
        df6._metadata_filename,
        df6._metadata_file_size,
    )
)
display(df_joined)
df_joined.createOrReplaceTempView("joined_table")

In [0]:
# Upsert the cleaned batch into fmcg.silver.s_orders, creating the table on the first run.
# Rows match on (order_placement_date, order_id, product_code, customer_id).
if spark.catalog.tableExists("fmcg.silver.s_orders"):
    silver_delta = DeltaTable.forName(spark, "fmcg.silver.s_orders")
    (
        silver_delta.alias("silver")
        .merge(
            df_joined.alias("bronze"),
            " AND ".join(
                f"silver.{key} = bronze.{key}"
                for key in ("order_placement_date", "order_id", "product_code", "customer_id")
            ),
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    (
        df_joined.write.format("delta")
        .option("delta.enableChangeDataFeed", "true")
        .option("mergeSchema", "true")
        .mode("overwrite")
        .saveAsTable("fmcg.silver.s_orders")
    )

In [0]:
# Snapshot the current silver batch into a staging table; the gold cells below read it.
(
    df_joined.write.format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")
    .saveAsTable("fmcg.silver.stg_s_orders")
)

In [0]:
# Project the staged silver batch onto the gold column names.
df7 = spark.table("fmcg.silver.stg_s_orders").select(
    "order_id",
    F.col("order_placement_date").alias("date"),
    F.col("customer_id").alias("customer_code"),
    "product_code",
    "product_id",
    F.col("order_qty").alias("sold_quantity"),
)

df7.createOrReplaceTempView("ready_to_gold")

In [0]:
# Publish the batch to fmcg.gold.sb_orders.
# On the first run the table is created; afterwards the batch is upserted on
# (date, order_id, product_code, customer_code).
if not spark.catalog.tableExists("fmcg.gold.sb_orders"):
    print("creating New Table")
    (
        df7.write.format("delta")
        .option("delta.enableChangeDataFeed", "true")
        .option("mergeSchema", "true")
        .mode("overwrite")
        .saveAsTable("fmcg.gold.sb_orders")
    )
else:
    gold_delta = DeltaTable.forName(spark, "fmcg.gold.sb_orders")
    # The target table is aliased "gold" and the incoming batch "source".
    (
        gold_delta.alias("gold")
        .merge(
            df7.alias("source"),
            "gold.date = source.date AND gold.order_id = source.order_id "
            "AND gold.product_code = source.product_code "
            "AND gold.customer_code = source.customer_code",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
# Daily rows of this batch (from the silver staging table), reduced to the set of
# months they fall in.
df_child = spark.table("fmcg.silver.stg_s_orders").select(
    F.col("order_placement_date").alias("date")
)

incremental_month_df = (
    df_child
    .select(F.trunc("date", "MM").alias("start_month"))
    .distinct()
)

incremental_month_df.show()

incremental_month_df.createOrReplaceTempView("incremental_months")

In [0]:
# All gold child rows belonging to any month touched by this batch, so the
# monthly totals are recomputed from complete months.
monthly_table = (
    spark.table("fmcg.gold.sb_orders").alias("sbo")
    .join(
        spark.table("incremental_months").alias("m"),
        F.trunc(F.col("sbo.date"), "MM") == F.col("m.start_month"),
        "inner",
    )
    .select("date", "product_code", "customer_code", "sold_quantity")
)

print("Total Rows: ", monthly_table.count())
monthly_table.show(10)

In [0]:
monthly_table.select(F.col("date")).dropDuplicates().sort(F.asc("date")).show()  # distinct dates, ascending

In [0]:
# Re-aggregate the affected months at (month, product, customer) grain.
# trunc(date, "MM") maps each day to the first day of its month, and that value
# becomes the new `date` column (month_start -> date = first of month).
df_monthly_recalc = (
    monthly_table
    .groupBy(
        F.trunc("date", "MM").alias("date"),
        "product_code",
        "customer_code",
    )
    .agg(F.sum("sold_quantity").alias("sold_quantity"))
)

df_monthly_recalc.show(10, truncate=False)

In [0]:
df_monthly_recalc.agg(F.count(F.lit(1))).first()[0]  # rows about to be merged into fact_orders

In [0]:
# Upsert the recomputed monthly totals into fmcg.gold.fact_orders.
# Rows match on (date, product_code, customer_code); the table must already exist.
gold_parent_delta = DeltaTable.forName(spark, "fmcg.gold.fact_orders")
(
    gold_parent_delta.alias("parent_gold")
    .merge(
        df_monthly_recalc.alias("child_gold"),
        " AND ".join(
            f"parent_gold.{key} = child_gold.{key}"
            for key in ("date", "product_code", "customer_code")
        ),
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

In [0]:
%sql
-- Drop the bronze staging table once the batch has been propagated.
-- IF EXISTS keeps this cell safe to re-run.
DROP TABLE IF EXISTS fmcg.bronze.stg_orders;

In [0]:
%sql
-- Drop the silver staging table once the gold tables have been updated.
-- IF EXISTS keeps this cell safe to re-run.
DROP TABLE IF EXISTS fmcg.silver.stg_s_orders;

In [0]:
%sql
-- Sanity check: total rows now in the gold fact table.
SELECT COUNT(*) FROM fmcg.gold.fact_orders;