In [0]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable

In [0]:
%run /Workspace/Users/swatilalwani342@gmail.com/Consolidated_pipeline/1_setup/utilities

In [0]:
# Echo the layer schema names that the utilities notebook put in scope.
print(*[bronze_schema, silver_schema, gold_schema])

bronze silver gold


In [0]:
# Notebook parameters, exposed as Databricks text widgets so a job can override them.
# The data_source default must match the S3 folder name; it also names the
# bronze/silver tables and the gold table sb_dim_<data_source>. The recorded run
# read s3://sportsbar-asap/customers/, so "customers" is the default.
dbutils.widgets.text("catalog", "fmcg", "Catalog")
dbutils.widgets.text("data_source", "customers", "Data Source")

catalog = dbutils.widgets.get("catalog")
data_source = dbutils.widgets.get("data_source")

# Every *.csv file under the source folder is ingested.
base_path = f's3://sportsbar-asap/{data_source}/*.csv'
print(base_path)

s3://sportsbar-asap/customers/*.csv


In [0]:
# Load the raw CSV files. Spark infers column types and reads the first line as a header.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .format("csv")
    .load(base_path)
    # Audit columns: when the row was read, plus the source file's name and size
    # from the hidden _metadata column.
    .withColumn("read_timestamp", F.current_timestamp())
    .select("*", "_metadata.file_name", "_metadata.file_size")
)

In [0]:
# Inspect the inferred column types before persisting to bronze.
df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- State: string (nullable = true)
 |-- read_timestamp: timestamp (nullable = false)
 |-- file_name: string (nullable = false)
 |-- file_size: long (nullable = false)



In [0]:
# Quick look at the first few raw rows.
display(df.limit(10))

customer_id,customer_name,State,read_timestamp,file_name,file_size
789201,FitFuel Market,California,2026-01-15T21:37:52.823Z,customers.csv,1363
789202,FitFuel Market,Dallas,2026-01-15T21:37:52.823Z,customers.csv,1363
789203,FitFuel Market,Seattle,2026-01-15T21:37:52.823Z,customers.csv,1363
789301,Athlete's Choice Store,California,2026-01-15T21:37:52.823Z,customers.csv,1363
789303,Athlete's Choice Store,Seattle,2026-01-15T21:37:52.823Z,customers.csv,1363
789101,Endurance Foods,Bengalore,2026-01-15T21:37:52.823Z,customers.csv,1363
789102,Endurance Foods,Dallas,2026-01-15T21:37:52.823Z,customers.csv,1363
789103,Endurance Foods,Seattle,2026-01-15T21:37:52.823Z,customers.csv,1363
789121,HydroBoost Nutrition,Dallas,2026-01-15T21:37:52.823Z,customers.csv,1363
789122,HydroBoost Nutrition,Seattle,2026-01-15T21:37:52.823Z,customers.csv,1363


In [0]:
# Full-refresh the bronze table with the raw rows and their audit columns.
# The delta.enableChangeDataFeed option requests Change Data Feed on the table.
(
    df.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")
    .saveAsTable(f"{catalog}.{bronze_schema}.{data_source}")
)

### Silver Processing

In [0]:
# Re-read the bronze table as the starting point for silver processing.
df_bronze = spark.table(f"{catalog}.{bronze_schema}.{data_source}")
df_bronze.show(10)

+-----------+--------------------+----------+--------------------+-------------+---------+
|customer_id|       customer_name|     State|      read_timestamp|    file_name|file_size|
+-----------+--------------------+----------+--------------------+-------------+---------+
|     789201|      FitFuel Market|California|2026-01-15 21:38:...|customers.csv|     1363|
|     789202|      FitFuel Market|    Dallas|2026-01-15 21:38:...|customers.csv|     1363|
|     789203|      FitFuel Market|   Seattle|2026-01-15 21:38:...|customers.csv|     1363|
|     789301|Athlete's Choice ...|California|2026-01-15 21:38:...|customers.csv|     1363|
|     789303|Athlete's Choice ...|   Seattle|2026-01-15 21:38:...|customers.csv|     1363|
|     789101|     Endurance Foods| Bengalore|2026-01-15 21:38:...|customers.csv|     1363|
|     789102|     Endurance Foods|    Dallas|2026-01-15 21:38:...|customers.csv|     1363|
|     789103|     Endurance Foods|   Seattle|2026-01-15 21:38:...|customers.csv|     1363|

In [0]:
# Column types as stored in the bronze table.
df_bronze.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- State: string (nullable = true)
 |-- read_timestamp: timestamp (nullable = true)
 |-- file_name: string (nullable = true)
 |-- file_size: long (nullable = true)



## Transformations

1: Drop Duplicates

In [0]:
# customer_id values that occur more than once in bronze.
df_duplicates = (
    df_bronze
    .groupBy("customer_id")
    .count()
    .where(F.col("count") > 1)
)
display(df_duplicates)

customer_id,count
789321,2
789503,2
789522,2
789603,2


In [0]:
# Keep a single row per customer_id.
# NOTE(review): dropDuplicates keeps an arbitrary row from each duplicate group.
# If duplicates differ in other columns (e.g. State), the surviving version is
# not guaranteed. Confirm the duplicates are exact copies.
print('Rows before duplicates dropped: ', df_bronze.count())
df_silver = df_bronze.dropDuplicates(subset=["customer_id"])
print('Rows after duplicates dropped: ', df_silver.count())

Rows before duplicates dropped:  39
Rows after duplicates dropped:  35


2: Trim Spaces

In [0]:
# Rows whose customer_name has leading or trailing whitespace.
display(
    df_silver.where(F.col("customer_name") != F.trim(F.col("customer_name")))
)

customer_id,customer_name,State,read_timestamp,file_name,file_size
789121,HydroBoost Nutrition,Dallas,2026-01-15T21:38:44.250Z,customers.csv,1363
789401,SprintX nutrition,California,2026-01-15T21:38:44.250Z,customers.csv,1363
789420,ZenAthlete foods,,2026-01-15T21:38:44.250Z,customers.csv,1363
789421,ZenAthlete Foods,Dallas,2026-01-15T21:38:44.250Z,customers.csv,1363
789521,PrimeFuel Nutrition,,2026-01-15T21:38:44.250Z,customers.csv,1363
789702,StaminaX Store,Dallas,2026-01-15T21:38:44.250Z,customers.csv,1363


In [0]:
# Strip leading and trailing whitespace from every customer name.
df_silver = df_silver.withColumn("customer_name", F.trim("customer_name"))

In [0]:
# Sanity check: after trimming, no name should differ from its trimmed form,
# so this should display no rows.
display(
    df_silver.where(F.col("customer_name") != F.trim("customer_name"))
)


customer_id,customer_name,State,read_timestamp,file_name,file_size


3: Data Quality Fix: Correcting Typos in the State Column (values are city/state names)

In [0]:
# Distinct State values, to spot typos and non-standard entries.
df_silver.select("State").distinct().show()

+-----------+
|      State|
+-----------+
| California|
|     Dallas|
|    Seattle|
|  Bengalore|
|    Dallasd|
|       NULL|
|  NewDelhee|
|   NewDelhi|
|Californiau|
|   NewDheli|
+-----------+



In [0]:
# Map misspelled or non-standard State values to a canonical value.
# NOTE(review): 'Bengalore' -> 'California' and the NewDelhi variants -> 'Seattle'
# are remaps of non-US locations onto US values, not spelling fixes. Presumably
# a business decision; confirm.
State_mapping = {
    'Californiau': 'California',
    'Bengalore': 'California',
    'Dallasd': 'Dallas',
    'NewDelhi': 'Seattle',
    'NewDheli': 'Seattle',
    'NewDelhee': 'Seattle',
}

# Only these canonical values survive; anything else becomes NULL.
allowed = ["California", "Dallas", "Seattle"]

df_silver = (
    df_silver
    .replace(State_mapping, subset=["State"])
    # isin() on a NULL State yields NULL, which when() treats as false, so NULLs
    # reach otherwise(None) without needing a separate isNull() branch.
    .withColumn(
        "State",
        F.when(F.col("State").isin(allowed), F.col("State")).otherwise(None),
    )
)

In [0]:
# Sanity check: only the allowed values (plus NULL) should remain.
df_silver.select("State").distinct().show()

+----------+
|     State|
+----------+
|California|
|    Dallas|
|   Seattle|
|      NULL|
+----------+



4: Fix Title-Casing Issue

In [0]:
# Distinct customer names, to spot casing variants of the same customer.
df_silver.select("customer_name").distinct().show()

+--------------------+
|       customer_name|
+--------------------+
|      FitFuel Market|
|Athlete's Choice ...|
|     Endurance Foods|
|HydroBoost Nutrition|
|MacroBite Superfoods|
|MacroBite superfoods|
|      PowerSnack Hub|
|      PowerSnack hub|
|   SprintX nutrition|
|   SprintX Nutrition|
|    ZenAthlete foods|
|    ZenAthlete Foods|
|Peak performance ...|
|Peak Performance ...|
| PrimeFuel Nutrition|
|       Recovery Lane|
|      StaminaX Store|
|EliteAthlete Nutr...|
|      GamePlan Foods|
|   Champion's choice|
+--------------------+
only showing top 20 rows


In [0]:
# Normalise casing so variants such as "MacroBite superfoods" / "MacroBite Superfoods"
# collapse to one spelling. initcap() returns NULL for a NULL input, so no
# separate null branch is needed.
# NOTE(review): initcap also lowercases interior capitals ("FitFuel" -> "Fitfuel"),
# which changes brand spellings. Confirm this is acceptable downstream.
df_silver = df_silver.withColumn("customer_name", F.initcap(F.col("customer_name")))

In [0]:
# Sanity check: each customer name should now appear in a single casing.
df_silver.select("customer_name").distinct().show()

+--------------------+
|       customer_name|
+--------------------+
|      Fitfuel Market|
|Athlete's Choice ...|
|     Endurance Foods|
|Hydroboost Nutrition|
|Macrobite Superfoods|
|      Powersnack Hub|
|   Sprintx Nutrition|
|    Zenathlete Foods|
|Peak Performance ...|
| Primefuel Nutrition|
|       Recovery Lane|
|      Staminax Store|
|Eliteathlete Nutr...|
|      Gameplan Foods|
|   Champion's Choice|
+--------------------+



5: Handling Missing States (filled from a business-confirmed lookup by customer_id)

In [0]:
# Rows whose State is still NULL after the typo clean-up.
df_silver.where(F.col("State").isNull()).show(truncate=False)

+-----------+-------------------+-----+--------------------------+-------------+---------+
|customer_id|customer_name      |State|read_timestamp            |file_name    |file_size|
+-----------+-------------------+-----+--------------------------+-------------+---------+
|789403     |Sprintx Nutrition  |NULL |2026-01-15 21:38:44.250064|customers.csv|1363     |
|789420     |Zenathlete Foods   |NULL |2026-01-15 21:38:44.250064|customers.csv|1363     |
|789521     |Primefuel Nutrition|NULL |2026-01-15 21:38:44.250064|customers.csv|1363     |
|789603     |Recovery Lane      |NULL |2026-01-15 21:38:44.250064|customers.csv|1363     |
+-----------+-------------------+-----+--------------------------+-------------+---------+



In [0]:
# All rows for the customers that have a NULL-State row, for comparison with
# their other locations.
null_customer_names = [
    'Sprintx Nutrition',
    'Zenathlete Foods',
    'Primefuel Nutrition',
    'Recovery Lane',
]
df_silver.where(F.col("customer_name").isin(null_customer_names)).show(truncate=False)

+-----------+-------------------+----------+--------------------------+-------------+---------+
|customer_id|customer_name      |State     |read_timestamp            |file_name    |file_size|
+-----------+-------------------+----------+--------------------------+-------------+---------+
|789401     |Sprintx Nutrition  |California|2026-01-15 21:38:44.250064|customers.csv|1363     |
|789402     |Sprintx Nutrition  |Dallas    |2026-01-15 21:38:44.250064|customers.csv|1363     |
|789403     |Sprintx Nutrition  |NULL      |2026-01-15 21:38:44.250064|customers.csv|1363     |
|789420     |Zenathlete Foods   |NULL      |2026-01-15 21:38:44.250064|customers.csv|1363     |
|789421     |Zenathlete Foods   |Dallas    |2026-01-15 21:38:44.250064|customers.csv|1363     |
|789422     |Zenathlete Foods   |Seattle   |2026-01-15 21:38:44.250064|customers.csv|1363     |
|789520     |Primefuel Nutrition|California|2026-01-15 21:38:44.250064|customers.csv|1363     |
|789521     |Primefuel Nutrition|NULL   

In [0]:
# Replacement State values for the NULL-State rows, keyed by customer_id.
# Business confirmation note: these State corrections were confirmed by the business team.
customer_state_fix = {
    789403: "Seattle",     # Sprintx Nutrition
    789420: "California",  # Zenathlete Foods
    789521: "Dallas",      # Primefuel Nutrition
    789603: "Dallas",      # Recovery Lane
}

# Two-column lookup frame to join onto silver.
df_fix = spark.createDataFrame(
    list(customer_state_fix.items()),
    ["customer_id", "fixed_state"],
)

display(df_fix)

customer_id,fixed_state
789403,Seattle
789420,California
789521,Dallas
789603,Dallas


In [0]:
# Fill NULL States from the business-confirmed lookup. An existing non-NULL State
# always takes precedence over the lookup value.
df_silver = (
    df_silver
    .join(df_fix, on="customer_id", how="left")
    .withColumn("State", F.coalesce(F.col("State"), F.col("fixed_state")))
    .drop("fixed_state")
)

In [0]:
# Sanity check: these customers' previously NULL States should now be filled in.
null_customer_names = [
    'Sprintx Nutrition',
    'Zenathlete Foods',
    'Primefuel Nutrition',
    'Recovery Lane',
]
df_silver.where(F.col("customer_name").isin(null_customer_names)).show(truncate=False)

+-----------+-------------------+----------+--------------------------+-------------+---------+
|customer_id|customer_name      |State     |read_timestamp            |file_name    |file_size|
+-----------+-------------------+----------+--------------------------+-------------+---------+
|789420     |Zenathlete Foods   |California|2026-01-15 21:38:44.250064|customers.csv|1363     |
|789601     |Recovery Lane      |California|2026-01-15 21:38:44.250064|customers.csv|1363     |
|789402     |Sprintx Nutrition  |Dallas    |2026-01-15 21:38:44.250064|customers.csv|1363     |
|789603     |Recovery Lane      |Dallas    |2026-01-15 21:38:44.250064|customers.csv|1363     |
|789401     |Sprintx Nutrition  |California|2026-01-15 21:38:44.250064|customers.csv|1363     |
|789421     |Zenathlete Foods   |Dallas    |2026-01-15 21:38:44.250064|customers.csv|1363     |
|789520     |Primefuel Nutrition|California|2026-01-15 21:38:44.250064|customers.csv|1363     |
|789522     |Primefuel Nutrition|Seattle

6: Convert customer_id to string

In [0]:
# Store customer_id as a string.
# NOTE(review): presumably this matches the parent model's customer_code type; confirm.
df_silver = df_silver.withColumn("customer_id", F.col("customer_id").cast("string"))
# printSchema() writes to stdout itself and returns None, so call it bare.
# Wrapping it in print() emits a stray "None".
df_silver.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- State: string (nullable = true)
 |-- read_timestamp: timestamp (nullable = true)
 |-- file_name: string (nullable = true)
 |-- file_size: long (nullable = true)

None


### Standardizing Customer Attributes to Match Parent Company Data Model

In [0]:
df_silver = (
    df_silver
    # "<customer_name>-<State>", with "Unknown" standing in for a missing State.
    .withColumn(
        "customer",
        F.concat_ws(
            "-",
            F.col("customer_name"),
            F.coalesce(F.col("State"), F.lit("Unknown")),
        ),
    )
    # Constant attributes that tag every row with this source in the parent data model.
    .withColumn("market", F.lit("USA"))
    .withColumn("platform", F.lit("Sports Bar"))
    .withColumn("channel", F.lit("Acquisition"))
)

In [0]:
# Spot-check a few fully transformed silver rows.
display(df_silver.limit(5))

customer_id,customer_name,State,read_timestamp,file_name,file_size,customer,market,platform,channel
789503,Peak Performance Store,Seattle,2026-01-15T21:38:44.250Z,customers.csv,1363,Peak Performance Store-Seattle,USA,Sports Bar,Acquisition
789420,Zenathlete Foods,California,2026-01-15T21:38:44.250Z,customers.csv,1363,Zenathlete Foods-California,USA,Sports Bar,Acquisition
789703,Staminax Store,Seattle,2026-01-15T21:38:44.250Z,customers.csv,1363,Staminax Store-Seattle,USA,Sports Bar,Acquisition
789621,Eliteathlete Nutrition,Dallas,2026-01-15T21:38:44.250Z,customers.csv,1363,Eliteathlete Nutrition-Dallas,USA,Sports Bar,Acquisition
789101,Endurance Foods,California,2026-01-15T21:38:44.250Z,customers.csv,1363,Endurance Foods-California,USA,Sports Bar,Acquisition


In [0]:
# Full-refresh the silver table.
# NOTE(review): with mode("overwrite"), Delta's mergeSchema only allows additive
# schema changes. If the table must take the new schema wholesale (e.g. a column
# type change), overwriteSchema is the matching option. Confirm which is intended.
(
    df_silver.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .saveAsTable(f"{catalog}.{silver_schema}.{data_source}")
)

### Gold

In [0]:
# Re-read silver from its table, then keep only the columns the gold dimension needs.
# The audit columns (read_timestamp, file_name, file_size) are dropped here.
df_silver = spark.table(f"{catalog}.{silver_schema}.{data_source}")

df_gold = df_silver.select(
    "customer_id",
    "customer_name",
    "State",
    "customer",
    "market",
    "platform",
    "channel",
)

In [0]:
# Full-refresh the Sports Bar gold dimension, sb_dim_<data_source>.
(
    df_gold.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")
    .saveAsTable(f"{catalog}.{gold_schema}.sb_dim_{data_source}")
)

## Merging the Data Source into the Parent Customer Dimension

In [0]:
# Target: the parent company's customer dimension. Source: the Sports Bar gold
# dimension written above. Both names come from the notebook parameters instead
# of hardcoded strings, so the merge reads the same table this run wrote
# (sb_dim_<data_source>) rather than a possibly stale or missing one.
# NOTE(review): assumes the parent dim_customers lives in the same catalog and
# gold schema. With the default widgets this resolves to fmcg.gold.dim_customers.
delta_table = DeltaTable.forName(spark, f"{catalog}.{gold_schema}.dim_customers")
df_child_customers = spark.table(f"{catalog}.{gold_schema}.sb_dim_{data_source}").select(
    # Renamed to customer_code, the key the merge matches on.
    F.col("customer_id").alias("customer_code"),
    "customer",
    "market",
    "platform",
    "channel",
)

In [0]:
# Upsert the Sports Bar customers into the parent dimension, matching on customer_code.
# Matched rows are overwritten from the source; unmatched source rows are inserted.
# NOTE(review): updateAll/insertAll assume the target's columns line up with the
# source's (customer_code, customer, market, platform, channel); confirm.
(
    delta_table.alias("target")
    .merge(
        source=df_child_customers.alias("source"),
        condition="target.customer_code = source.customer_code",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]