In [0]:
from pyspark.sql import functions as f
from delta.tables import DeltaTable


In [0]:
%run /Workspace/Databricks_FMCG/setup/utilities

In [0]:
dbutils.widgets.text("catalog","fmcg","Catalog")
dbutils.widgets.text("data_source","products","Data Source")

catalog=dbutils.widgets.get("catalog")
data_source=dbutils.widgets.get("data_source")
base_path=f's3://fmcg-child-sports-data/{data_source}/*.csv'

In [0]:
print(base_path)

##Bronze Table

In [0]:
df=(spark.read.format("csv")
.option("header",True)
.option("inferSchema",True)
.load(base_path)
.withColumn("read_timestamp",f.current_timestamp())
.select("*","_metadata.file_name","_metadata.file_size")
)



In [0]:
df.printSchema()

In [0]:
df.write\
.format("delta")\
.option("enabledChangeFeed",True)\
.mode("overwrite")\
.saveAsTable(f"{catalog}.{bronze_schema}.{data_source}")



##Silver

In [0]:
df_bronze=spark.sql(f"select * from {catalog}.{bronze_schema}.{data_source};")
df_bronze.show(10)

In [0]:
#Drop Duplicates
print('Rows before duplicates dropped: ', df_bronze.count())
df_silver = df_bronze.dropDuplicates(['product_id'])
print('Rows after duplicates dropped: ', df_silver.count())

In [0]:
#capitalise the first letter of each word

df_silver.select('category').distinct().show()

In [0]:
df_silver=df_silver.withColumn("category",f.initcap(f.col("category")))

In [0]:
df_silver.show(10)

In [0]:
#Fix "protein" spelling mistake in both product_name and category columns
df_silver = (
    df_silver
    .withColumn(
        "product_name",
        f.regexp_replace(f.col("product_name"), "(?i)Protien", "Protein")
    )
    .withColumn(
        "category",
        f.regexp_replace(f.col("category"), "(?i)Protien", "Protein")
    )
)
display(df_silver)

## Standardizing Customer Attributes to Match Parent Company Data Model

In [0]:
# 1: Add division column
df_silver = (
    df_silver
    .withColumn(
        "division",
        f.when(f.col("category") == "Energy Bars",        "Nutrition Bars")
         .when(f.col("category") == "Protein Bars",       "Nutrition Bars")
         .when(f.col("category") == "Granola & Cereals",  "Breakfast Foods")
         .when(f.col("category") == "Recovery Dairy",     "Dairy & Recovery")
         .when(f.col("category") == "Healthy Snacks",     "Healthy Snacks")
         .when(f.col("category") == "Electrolyte Mix",    "Hydration & Electrolytes")
         .otherwise("Other")
    )
)

# 2: Variant column
df_silver = df_silver.withColumn(
    "variant",
    f.regexp_extract(f.col("product_name"), r"\((.*?)\)", 1)
)


### 3: Create new column: product_code  

# Invalid product_ids are replaced with a fallback value to avoid losing fact records and ensure downstream joins remain consistent

df_silver = (
    df_silver
    # 1. Generate deterministic product_code from product_name
    .withColumn(
        "product_code",
        f.sha2(f.col("product_name").cast("string"), 256)
    )
    # 2. Clean product_id: keep only numeric IDs, else set to 999999
    .withColumn(
        "product_id",
        f.when(
            f.col("product_id").cast("string").rlike("^[0-9]+$"),
            f.col("product_id").cast("string")
        ).otherwise(f.lit(999999).cast("string"))
    )
    # 3. Rename product_name â†’ product
    .withColumnRenamed("product_name", "product")
)

In [0]:
display(df_silver)

In [0]:
df_silver = df_silver.select("product_code", "division", "category", "product", "variant", "product_id", "read_timestamp", "file_name", "file_size")

In [0]:
display(df_silver)

In [0]:
df_silver.write\
    .format("delta")\
    .option("delta.enableChangeDataFeed","true")\
    .option("mergeSchema","true")\
    .mode("overwrite")\
    .saveAsTable(f"{catalog}.{silver_schema}.{data_source}") 

##Gold

In [0]:
df_silver=spark.sql(f"select * from {catalog}.{silver_schema}.{data_source};" )
df_gold=df_silver.select("product_code", "product_id", "division", "category", "product", "variant")
df_gold.show(5)

In [0]:
df_gold.write\
    .format("delta")\
    .option("delta.enableChangeDataFeed","true")\
    .mode("overwrite")\
    .saveAsTable(f"{catalog}.{gold_schema}.sb_dim_{data_source}")

## Merging Data source with parent

In [0]:
delta_table=DeltaTable.forName(spark, "fmcg.gold.dim_products")
df_child_products=spark.sql(f"select product_code,division, category, product, variant from  fmcg.gold.sb_dim_products")

df_child_products.show(10)

In [0]:
delta_table.alias("target").merge(
    source=df_child_products.alias("source"),
    condition="target.product_code = source.product_code"
).whenMatchedUpdate(
    set={
        "division": "source.division",
        "category": "source.category",
        "product": "source.product",
        "variant": "source.variant"
    }
).whenNotMatchedInsert(
    values={
        "product_code": "source.product_code",
        "division": "source.division",
        "category": "source.category",
        "product": "source.product",
        "variant": "source.variant"
    }
).execute()