In [0]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable

In [0]:
%run  /Workspace/Users/dsouza.vi@northeastern.edu/fmcg-delta-medallion-pipeline/consolidated_pipeline/1_setup/utilities


In [0]:
# Notebook parameters: each widget is declared with its default, then read back right away.
dbutils.widgets.text("catalog", "fmcg", "Catalog")
catalog = dbutils.widgets.get("catalog")

dbutils.widgets.text("data_source", "customers", "Data Source")
data_source = dbutils.widgets.get("data_source")

In [0]:
# Echo the resolved widget values so the run output records which catalog/source was processed.
print(catalog, data_source)

In [0]:
# Glob pattern matching every CSV file under the selected data source's folder in the ingestion bucket.
base_path = f's3://sportsbar-oltp-ingestion-layer/{data_source}/*.csv'
print(base_path)

## Bronze Layer

In [0]:
# Ingest the raw CSVs (header row, inferred types), stamp each row with the load
# timestamp, and keep the originating file's name and size alongside the data.
df = (
    spark.read
    .csv(path=base_path, header=True, inferSchema=True)
    .withColumn("read_ts", F.current_timestamp())
    .select("*", "_metadata.file_name", "_metadata.file_size")
)

In [0]:
# Show the inferred schema of the raw read, including the added read_ts and file metadata columns.
df.printSchema()

In [0]:
# Persist the raw snapshot as the bronze Delta table, overwriting any previous load.
# NOTE(review): bronze_schema is not defined in this notebook — presumably supplied by the
# %run'd utilities notebook; confirm.
(
    df.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")
    .saveAsTable(f'{catalog}.{bronze_schema}.{data_source}')
)

## Silver

In [0]:
# Re-read the bronze table that was just written so the silver stage starts from persisted data.
bronze_table = f'{catalog}.{bronze_schema}.{data_source}'
bronze_df = spark.read.table(bronze_table)

In [0]:
# Preview up to 10 bronze rows.
bronze_df.show(10)

In [0]:
# Print the bronze table's column names and types before transforming.
bronze_df.printSchema()

### Transformations

1. Drop duplicates

In [0]:
# customer_id values that appear on more than one bronze row, with their occurrence count.
dup_df = (
    bronze_df
    .groupBy("customer_id")
    .count()
    .where(F.col("count") > 1)
)
dup_df.show()

In [0]:
# Keep a single row per customer_id and report the row counts on either side of the dedupe.
# NOTE(review): no ordering is applied before dropDuplicates, so which duplicate survives
# is presumably arbitrary — confirm that is acceptable.
rows_before = bronze_df.count()
silver_df = bronze_df.dropDuplicates(['customer_id'])
rows_after = silver_df.count()

print('Rows before duplicates dropped: ', rows_before)
print('Rows after duplicates dropped: ', rows_after)

2. Trim spaces in Customer Name

In [0]:
# List every distinct customer_name for a visual check of spacing issues.
display(
    silver_df
    .select("customer_name")
    .distinct()
)

In [0]:
# Rows whose customer_name differs from its trimmed form, i.e. carries leading/trailing spaces.
name_col = F.col("customer_name")
display(silver_df.where(name_col != F.trim(name_col)))

In [0]:
# Strip leading/trailing spaces from customer_name in place.
silver_df = silver_df.withColumn("customer_name", F.trim("customer_name"))

In [0]:
# Render the current silver_df to inspect the result of the trim.
display(silver_df)

3. Data Quality Issues - correcting typos

In [0]:
# List the distinct city values to spot misspellings before correcting them.
silver_df.select('city').distinct().show()

In [0]:
# Canonical city name -> misspellings observed in the source data.
typos_by_city = {
    'Bengaluru': ['Bengaluruu', 'Bengalore'],
    'Hyderabad': ['Hyderabadd', 'Hyderbad'],
    'New Delhi': ['NewDelhi', 'NewDheli', 'NewDelhee'],
}

# Flatten into the {misspelling: canonical} shape that DataFrame.replace expects.
city_mapping = {
    typo: canonical
    for canonical, typos in typos_by_city.items()
    for typo in typos
}

# The canonical names double as the whitelist of accepted cities.
allowed = list(typos_by_city)

city_col = F.col("city")
silver_df = silver_df.replace(city_mapping, subset=['city'])
# when() without otherwise() yields NULL, so any city outside the whitelist is blanked out.
silver_df = silver_df.withColumn("city", F.when(city_col.isin(allowed), city_col))

silver_df.select('city').distinct().show()

4. Title-Casing issue - making customer name case consistent with existing standards

In [0]:
# Title-case customer_name (first letter of each word upper-cased, the rest lower-cased).
silver_df = silver_df.withColumn("customer_name", F.initcap("customer_name"))

In [0]:
# Sanity check: list the distinct customer names after title-casing.
silver_df.select ("customer_name").distinct().show()

5. Handling missing cities - NULL handling

In [0]:
# Rows whose city is NULL (missing in the source or blanked by the whitelist step), shown untruncated.
silver_df.filter(F.col("city").isNull()).show(truncate=False)

In [0]:
# Customers being checked for a missing city; show all of their rows in full.
null_customer_names = [
    'Sprintx Nutrition',
    'Zenathlete Foods',
    'Primefuel Nutrition',
    'Recovery Lane',
]
is_affected_customer = F.col("customer_name").isin(null_customer_names)
silver_df.where(is_affected_customer).show(truncate=False)

In [0]:

# Business Confirmation Note: City corrections confirmed by business team.
# Maps customer_id -> confirmed city; the trailing comment names the customer.
customer_city_fix = {
    789403: "New Delhi",   # Sprintx Nutrition
    789420: "Bengaluru",   # Zenathlete Foods
    789521: "Hyderabad",   # Primefuel Nutrition
    789603: "Hyderabad",   # Recovery Lane
}

# Two-column lookup DataFrame (customer_id, fixed_city) built from the dict's pairs.
df_fix = spark.createDataFrame(
    list(customer_city_fix.items()),
    schema=["customer_id", "fixed_city"],
)

display(df_fix)

In [0]:
# Left-join the confirmed cities onto every customer, fill city only where it is NULL,
# then discard the helper column.
silver_df = (
    silver_df
    .join(df_fix, on="customer_id", how="left")
    .withColumn("city", F.coalesce(F.col("city"), F.col("fixed_city")))
    .drop("fixed_city")
)

In [0]:
# Sanity check: the previously NULL-city customers should now show a city.
null_customer_names = [
    'Sprintx Nutrition',
    'Zenathlete Foods',
    'Primefuel Nutrition',
    'Recovery Lane',
]
fixed_rows = silver_df.where(F.col("customer_name").isin(null_customer_names))
fixed_rows.show(truncate=False)

6. Convert customer_id to string

In [0]:
# Cast customer_id to string.
silver_df = silver_df.withColumn("customer_id", F.col("customer_id").cast("string"))

# printSchema() writes the schema to stdout itself and returns None; wrapping it in
# print() only appended a stray "None" line to the output, so it is called directly.
silver_df.printSchema()

### Standardising Customer Attributes to match the Parent Company Data Model

In [0]:
# Final customer label: "CustomerName-City", falling back to "CustomerName-Unknown"
# when the city is still NULL.
city_or_unknown = F.coalesce(F.col("city"), F.lit("Unknown"))
silver_df = silver_df.withColumn(
    "customer",
    F.concat_ws("-", "customer_name", city_or_unknown),
)

# Static attributes aligned with the parent data model, added as constant columns.
static_attributes = (
    ("market", "India"),
    ("platform", "Sports Bar"),
    ("channel", "Acquisition"),
)
for attribute, constant in static_attributes:
    silver_df = silver_df.withColumn(attribute, F.lit(constant))

In [0]:
# Preview five rows of the standardised silver DataFrame before writing it.
display(silver_df.limit(5))

In [0]:
# Persist the cleaned data as the silver Delta table, overwriting the previous version.
# The change-data-feed option uses the "delta."-prefixed key, matching the bronze and gold
# writes in this notebook; the unprefixed "enableChangeDataFeed" key is not recognised as
# a Delta table property.
# mergeSchema lets columns added in this stage (customer, market, platform, channel) be
# accepted if the table already exists with an older schema.
(
    silver_df.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .saveAsTable(f"{catalog}.{silver_schema}.{data_source}")
)
    

## Gold

In [0]:

# Columns exposed by the gold customer dimension.
gold_columns = [
    "customer_id",
    "customer_name",
    "city",
    "customer",
    "market",
    "platform",
    "channel",
]

# Reload silver from the persisted table, then project down to the gold columns.
silver_df = spark.sql(f"SELECT * FROM {catalog}.{silver_schema}.{data_source};")
gold_df = silver_df.select(*gold_columns)

In [0]:
# Persist the gold projection as the child dimension table sb_dim_<data_source>,
# overwriting any previous version.
(
    gold_df.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")
    .saveAsTable(f"{catalog}.{gold_schema}.sb_dim_{data_source}")
)

## Merging Data Source with the Parent

In [0]:
# Merge target: the parent company's customer dimension. Merge source: the child dimension
# written by the gold step of this notebook.
# Both names are built from the notebook parameters instead of being hard-coded, so the merge
# always reads the table this run actually wrote, even when the widgets are not at their defaults.
# NOTE(review): assumes the parent dim_customers table lives in the same catalog/schema as the
# child table (gold_schema resolving to "gold" reproduces the previous hard-coded names) — confirm.
delta_table = DeltaTable.forName(spark, f"{catalog}.{gold_schema}.dim_customers")

# Shape the child rows for the merge: customer_id is exposed as customer_code, the column
# the merge condition joins on.
df_child_customers = spark.table(f"{catalog}.{gold_schema}.sb_dim_{data_source}").select(
    F.col("customer_id").alias("customer_code"),
    "customer",
    "market",
    "platform",
    "channel"
)

In [0]:
# Upsert the child customers into the parent dimension, matching on customer_code:
# matched rows are updated from the source, unmatched source rows are inserted.
(
    delta_table.alias("target")
    .merge(
        source=df_child_customers.alias("source"),
        condition="target.customer_code = source.customer_code",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)