In [0]:
# Import required libraries
from pyspark.sql import functions as F
from delta.tables import DeltaTable

In [0]:
%run /Workspace/Users/pavansaikandi@gmail.com/unified_pipeline/1_setup/02_config

In [0]:
# Sanity check: these schema names are expected to be defined by the %run config notebook
print(" ".join(str(schema_name) for schema_name in (gold_schema, silver_schema, bronze_schema)))

In [0]:
# Declare the notebook parameters as text widgets: (name, default value, label)
for widget_name, default_value, widget_label in (
    ("catalog", "fmcg", "Catalog"),
    ("data_source", "orders", "Data Source"),
):
    dbutils.widgets.text(widget_name, default_value, widget_label)

In [0]:

# Resolve the run parameters from the notebook widgets
catalog = dbutils.widgets.get("catalog")
data_source = dbutils.widgets.get("data_source")

# S3 layout: <bucket>/<data_source>/landing/ and .../processed/ (both keep a trailing slash)
base_path = f's3://vitality-nutrition/{data_source}'
landing_path = f"{base_path}/landing/"
processed_path = f"{base_path}/processed/"

for path_label, path_value in (
    ("Base Path: ", base_path),
    ("Landing Path: ", landing_path),
    ("Processed Path: ", processed_path),
):
    print(path_label, path_value)

In [0]:
# Three-part (catalog.schema.table) names for each medallion layer.
# Bronze and Silver share the data-source name; Gold uses a vn_fact_ prefix.
bronze_table, silver_table = (
    f"{catalog}.{layer_schema}.{data_source}" for layer_schema in (bronze_schema, silver_schema)
)
gold_table = f"{catalog}.{gold_schema}.vn_fact_{data_source}"

# Bronze

In [0]:
# Read every top-level CSV currently in the landing folder.
# landing_path may already end with "/", so strip it before appending the glob;
# otherwise the pattern contains a doubled slash ("landing//*.csv").
# NOTE(review): inferSchema means column types depend on each batch's contents;
# an append into Bronze can fail if the inferred types drift between batches —
# consider an explicit schema.
landing_glob = f"{landing_path.rstrip('/')}/*.csv"

df = (
    spark.read.options(header=True, inferSchema=True)
    .csv(landing_glob)
    # Audit columns: ingestion time plus the source file name/size of each row
    .withColumn("read_timestamp", F.current_timestamp())
    .select("*", "_metadata.file_name", "_metadata.file_size")
)

df.count()

In [0]:
# Eyeball the first few ingested rows
sample_rows = df.limit(10)
display(sample_rows)

In [0]:
# Append this batch to the Bronze Delta table (saveAsTable creates it if it does
# not exist yet), with the Change Data Feed table property enabled.
bronze_writer = (
    df.write.format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("append")
)
bronze_writer.saveAsTable(bronze_table)

In [0]:
# Archive ingested files by moving them from landing/ to processed/.
# Only top-level "*.csv" entries are moved: that is exactly what the Bronze read
# glob matched, so sub-directories (whose listed names end in "/") and non-CSV
# files, which were never ingested, stay in landing/.
# processed_path may already end with "/", so strip it to avoid a "//" in the
# destination key.
# NOTE(review): files that arrive between the Bronze read and this listing are
# moved without having been ingested — consider snapshotting the file list
# before the read (or using Auto Loader) to close that window.
processed_dir = processed_path.rstrip("/")

for file_info in dbutils.fs.ls(landing_path):
    if not file_info.name.endswith(".csv"):
        continue
    dbutils.fs.mv(
        file_info.path,
        f"{processed_dir}/{file_info.name}",
        True,  # recurse
    )

# Silver

In [0]:
# Silver starts from the complete Bronze table (every batch appended so far)
df_orders = spark.table(bronze_table)
display(df_orders)

In [0]:
# Discard rows that carry no order quantity
has_quantity = F.col("order_qty").isNotNull()
df_orders = df_orders.where(has_quantity)
df_orders.count()

In [0]:
# Keep customer_id only when it consists purely of digits; anything else
# (letters, symbols, whitespace, empty string, or NULL) becomes the sentinel 99999.
# NOTE: the column is not cast — its type is whatever Spark resolves for the
# when/otherwise result.
is_numeric_id = F.col("customer_id").rlike("^[0-9]+$")
df_orders = df_orders.withColumn(
    "customer_id",
    F.when(is_numeric_id, F.col("customer_id")).otherwise(99999),
)

display(df_orders)

In [0]:
# Normalise order_placement_date into a date column.

# 1) Strip a leading weekday name such as "Tuesday, " so the remainder can match
#    one of the patterns below.
df_orders = df_orders.withColumn(
    "order_placement_date",
    F.regexp_replace(F.col("order_placement_date"), r"^[A-Za-z]+,\s*", ""),
)

# 2) Try each known input format in turn; the first one that parses wins and rows
#    that match none become NULL.
#    Single-letter "d"/"M" accept both one- and two-digit values ("5" and "05"),
#    whereas "dd"/"MM" reject single-digit days/months under Spark 3+'s datetime
#    parser (e.g. "5/1/2024" or "January 5, 2024" would silently become NULL).
#    ISO "yyyy-MM-dd" is tried last, so it only rescues rows that would otherwise
#    be NULL.
order_date_formats = ("yyyy/M/d", "d-M-yyyy", "d/M/yyyy", "MMMM d, yyyy", "yyyy-MM-dd")
df_orders = df_orders.withColumn(
    "order_placement_date",
    F.coalesce(*[F.try_to_date("order_placement_date", fmt) for fmt in order_date_formats]),
)

display(df_orders)

In [0]:
# De-duplicate on the business columns only: audit columns such as read_timestamp
# and file_name differ between ingestions of the same order.
dedup_columns = ["order_id", "order_placement_date", "customer_id", "product_id", "order_qty"]
df_orders = df_orders.drop_duplicates(subset=dedup_columns)


In [0]:
# Row count after de-duplication
df_orders.count()

In [0]:

# Cast product_id to string — presumably to match the type of products.product_id
# used as the join key later; confirm against the products table schema.
df_orders = df_orders.withColumn("product_id", F.col("product_id").astype("string"))

In [0]:
# Sanity-check the parsed date range (a single-row global aggregate)
order_date_bounds = df_orders.select(
    F.min("order_placement_date").alias("min_order_date"),
    F.max("order_placement_date").alias("max_order_date"),
)
order_date_bounds.show()

In [0]:
# Enrich orders with product_code from the Silver products table.
# The table name is built from the same catalog/schema parameters as the rest of
# the pipeline (instead of a hard-coded "fmcg.silver"), so changing the `catalog`
# widget or the configured silver schema changes this lookup too.
# NOTE: inner join — orders whose product_id has no match in products are dropped.
products_table = f"{catalog}.{silver_schema}.products"
df_products = spark.table(products_table)
df_joined = (
    df_orders.join(df_products, on="product_id", how="inner")
    .select(df_orders["*"], df_products["product_code"])
)

display(df_joined)

In [0]:
# Row count after the inner join with products (unmatched product_ids are dropped)
df_joined.count()

In [0]:
# Upsert into the Silver table: create it on the first run, MERGE on later runs.
# The merge key uses null-safe equality (<=>). A row whose order_placement_date
# failed to parse is NULL, and with plain "=" it never matches its existing copy.
# Because Silver is rebuilt from the whole Bronze table each run, such rows would
# otherwise be inserted again on every run.
# NOTE(review): the source was de-duplicated on (..., order_qty), so two rows with
# the same merge key but different quantities can still reach the MERGE, and Delta
# rejects a MERGE in which several source rows match the same target row.
silver_merge_keys = ["order_placement_date", "order_id", "product_code", "customer_id"]

if not spark.catalog.tableExists(silver_table):
    (
        df_joined.write.format("delta")
        .option("delta.enableChangeDataFeed", "true")
        .option("mergeSchema", "true")
        .mode("overwrite")
        .saveAsTable(silver_table)
    )
else:
    silver_merge_condition = " AND ".join(
        f"silver.{key} <=> bronze.{key}" for key in silver_merge_keys
    )
    (
        DeltaTable.forName(spark, silver_table).alias("silver")
        .merge(df_joined.alias("bronze"), silver_merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

# Gold

In [0]:
# Project Silver onto the Gold fact layout, renaming columns to their Gold names
df_gold = spark.table(silver_table).select(
    "order_id",
    F.col("order_placement_date").alias("date"),
    F.col("customer_id").alias("customer_code"),
    "product_code",
    "product_id",
    F.col("order_qty").alias("sold_quantity"),
)

df_gold.show(5)

In [0]:
# Upsert into the Gold fact table: create it on the first run, MERGE on later runs.
# Aliases follow the data flow: the target is "gold", the incoming frame "silver".
# Keys use null-safe equality (<=>) so that a row with a NULL key (e.g. an unparsed
# date) updates its existing copy instead of being re-inserted on every run. Gold
# is rebuilt from the full Silver table each time.
gold_merge_keys = ["date", "order_id", "product_code", "customer_code"]

if not spark.catalog.tableExists(gold_table):
    print("creating New Table")
    (
        df_gold.write.format("delta")
        .option("delta.enableChangeDataFeed", "true")
        .option("mergeSchema", "true")
        .mode("overwrite")
        .saveAsTable(gold_table)
    )
else:
    gold_merge_condition = " AND ".join(
        f"gold.{key} <=> silver.{key}" for key in gold_merge_keys
    )
    (
        DeltaTable.forName(spark, gold_table).alias("gold")
        .merge(df_gold.alias("silver"), gold_merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
# Merging child data with parent.
# The child company's orders are at a daily grain, whereas the parent's fact data
# is monthly, so these rows must be rolled up to month level before merging.
child_columns = ["date", "product_code", "customer_code", "sold_quantity"]
df_child = spark.table(gold_table).select(*child_columns)
df_child.show(10)

In [0]:
# Number of daily child rows before the monthly roll-up
df_child.count()

In [0]:
# Tag each daily row with the first day of its month
df_monthly = df_child.withColumn("month_start", F.trunc(F.col("date"), "month"))

display(df_monthly)

In [0]:
# Sum quantities at the monthly grain (month_start, product_code, customer_code),
# then rename month_start back to `date` to match the target schema.
monthly_totals = df_monthly.groupBy("month_start", "product_code", "customer_code").agg(
    F.sum("sold_quantity").alias("sold_quantity")
)
df_monthly = monthly_totals.withColumnRenamed("month_start", "date")

df_monthly.show(10)

In [0]:
# Number of (month, product, customer) rows after the roll-up
df_monthly.count()

In [0]:
# Write the monthly roll-up into the parent company's Gold fact table.
# Keys are compared with null-safe equality (<=>). A row whose key contains NULL
# (e.g. a month derived from an unparsed date) then updates its existing copy
# instead of being inserted again on every run. df_monthly is recomputed from the
# entire child Gold table each time.
# NOTE(review): assumes fact_orders already exists with columns compatible with
# (date, product_code, customer_code, sold_quantity) — confirm against its DDL.
parent_merge_keys = ["date", "product_code", "customer_code"]
parent_merge_condition = " AND ".join(
    f"parent_gold.{key} <=> child_gold.{key}" for key in parent_merge_keys
)

gold_parent_delta = DeltaTable.forName(spark, f"{catalog}.{gold_schema}.fact_orders")

(
    gold_parent_delta.alias("parent_gold")
    .merge(df_monthly.alias("child_gold"), parent_merge_condition)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)