In [0]:
from pyspark.sql.functions import col
from pyspark.sql import functions as F

# Silver

In [0]:
# Load the raw bronze customers table; later cells keep reassigning df_bronze as they clean it.
df_bronze = spark.sql("SELECT * FROM main_fmcg.bronze.customers;")
df_bronze.show(n=10)

## Deduplication step
Identify and count duplicate customer_ids, display them, and remove them so that each customer_id appears exactly once.

In [0]:
from pyspark.sql import Window

# Show which customer_ids occur more than once before removing them.
df_duplicates = df_bronze.groupBy("customer_id").count().filter(F.col("count") > 1)
display(df_duplicates)

print('Rows before duplicates dropped: ', df_bronze.count())

# dropDuplicates(['customer_id']) keeps whichever row Spark happens to encounter first for
# each id, so the surviving row (and therefore its name/city) can differ between runs.
# Choose the survivor explicitly instead: rank the rows of each customer_id by all other
# columns, nulls last, and keep rank 1. Exactly one row per customer_id remains, as before.
# NOTE(review): assumes the non-key columns are sortable (e.g. no map columns) — confirm
# against the bronze schema.
_dedup_order = [F.col(c).asc_nulls_last() for c in df_bronze.columns if c != "customer_id"]
# row_number() needs an ordered window; fall back to the key itself if it is the only column.
_dedup_window = Window.partitionBy("customer_id").orderBy(*(_dedup_order or [F.col("customer_id")]))
df_bronze = (
    df_bronze
    .withColumn("_dedup_rank", F.row_number().over(_dedup_window))
    .filter(F.col("_dedup_rank") == 1)
    .drop("_dedup_rank")
)
print('Rows after duplicates dropped: ', df_bronze.count())




## Check for any remaining duplicates


In [0]:
# Verify the deduplication step: every customer_id should now appear exactly once.
# The duplicate summary is defined once and reused for both the count and the display,
# instead of writing the same groupBy/agg/filter query twice.
duplicates = (
    df_bronze
        .groupBy("customer_id")
        .agg(F.count("*").alias("cnt"))
        .filter(F.col("cnt") > 1)
        .orderBy(F.col("cnt").desc())
)

# Count total duplicate customer_id values
duplicate_count = duplicates.count()
print(f"Number of duplicate customer_ids: {duplicate_count}")

# Show duplicate customer_ids with their counts (empty when dedup worked)
display(duplicates)

# Total number of rows
total_rows = df_bronze.count()
print(f"Number of rows in df_bronze: {total_rows}")

# Fail loudly so a skipped or broken dedup step cannot flow into the silver table.
assert duplicate_count == 0, f"{duplicate_count} customer_id values are still duplicated"

## Standardization (capitalize the first letter of each word)

In [0]:
from pyspark.sql.functions import initcap

# Capitalize the first letter of each word in the two free-text columns.
df_bronze = (
    df_bronze
    .withColumn("customer_name", initcap(col("customer_name")))
    .withColumn("city", initcap(col("city")))
)

df_bronze.show(10)
# Row count is unchanged by this step; printed as a sanity check.
row_total = df_bronze.count()
print(f"Number of rows in df: {row_total}")

## Test: find values that trimming would change

In [0]:
from pyspark.sql.functions import col, trim, length

# Flag a row when trim() would change customer_name or city.
# Comparisons involving null evaluate to null, so a null column on its own never flags a row.
name_needs_trim = col("customer_name") != trim(col("customer_name"))
city_needs_trim = col("city") != trim(col("city"))

df_with_whitespace = df_bronze.filter(name_needs_trim | city_needs_trim)

print(f"Rows with whitespace: {df_with_whitespace.count()}")
df_with_whitespace.show(20, truncate=False)

## Trimming

In [0]:
from pyspark.sql.functions import trim, col

# Strip leading/trailing whitespace from customer_name and city.
# pyspark's trim() is documented as trimming spaces only, so a trailing tab or carriage
# return (e.g. from a CRLF source file) would survive, and the allowed-city check further
# down would then null out an otherwise valid city. The Java-regex class \s also covers
# \t, \n, \r and \f. Null values stay null.
for text_col in ("customer_name", "city"):
    df_bronze = df_bronze.withColumn(
        text_col, F.regexp_replace(col(text_col), r"^\s+|\s+$", "")
    )

df_bronze.show(10)

## Standardization (replace typo values with a single canonical value)

In [0]:
# Known city typos -> canonical city name
city_mapping = {
    # Bengaluru variants
    'Bengaluruu': 'Bengaluru',
    'Bengalore': 'Bengaluru',
    # Hyderabad variants
    'Hyderabadd': 'Hyderabad',
    'Hyderbad': 'Hyderabad',
    # New Delhi variants
    'Newdelhi': 'New Delhi',
    'Newdheli': 'New Delhi',
    'Newdelhee': 'New Delhi'
}

# Only these canonical cities are kept; every other value becomes null.
allowed = ["Bengaluru", "Hyderabad", "New Delhi"]

# Step 1: rewrite the known typos to their canonical spelling.
df_bronze = df_bronze.replace(city_mapping, subset=["city"])

# Step 2: keep the city only if it is in the allowed list. when() without otherwise()
# yields null for non-matching rows, and isin() on a null city is null (not true),
# so null cities stay null.
df_bronze = df_bronze.withColumn(
    "city",
    F.when(F.col("city").isin(allowed), F.col("city")),
)

## **Test** for standardization (replace typo values with a single canonical value)

In [0]:
# Distinct combinations of customer_name and city
distinct_customer_name = df_bronze.select("customer_name").distinct()
distinct_customer_name.show()
distinct_cities = df_bronze.select("city").distinct()
distinct_cities.show()


## Fill some null cities based on business rules

In [0]:
# Rows whose city is missing before the business-rule fill below.
df_bronze.where(F.col("city").isNull()).show(truncate=False)

In [0]:
from pyspark.sql.functions import col, when
from functools import reduce

# Manual city assignments for specific customer_ids (used only where city is null)
customer_city_mapping = {
    "789403": "New Delhi",      # Sprintx Nutrition
    "789420": "Bengaluru",      # Zenathlete Foods
    "789521": "Hyderabad",      # Primefuel Nutrition
    "789603": "Hyderabad"       # Recovery Lane
}


def _fill_missing_city(fallback_expr, id_and_city):
    """Wrap fallback_expr: use the mapped city when city is null and customer_id matches."""
    customer_key, mapped_city = id_and_city
    is_target = col("city").isNull() & (col("customer_id") == customer_key)
    return when(is_target, mapped_city).otherwise(fallback_expr)


# Fold the mapping into one nested CASE expression, seeded with the existing city,
# so rows not covered by the mapping keep their current value.
city_expr = reduce(_fill_missing_city, customer_city_mapping.items(), col("city"))

# Apply mapping
df_bronze = df_bronze.withColumn("city", city_expr)

In [0]:
# Re-check: rows whose city is still missing after the business-rule fill.
df_bronze.where(F.col("city").isNull()).show(truncate=False)

## Add new columns

In [0]:
# Static attributes aligned with parent data model (added in this order, after "customer").
static_columns = {
    "market": "India",
    "platform": "Sports Bar",
    "channel": "Acquisition",
}

# Build final customer column: "CustomerName-City", or "CustomerName-Unknown" when city is null.
customer_label = F.concat_ws("-", "customer_name", F.coalesce(F.col("city"), F.lit("Unknown")))
df_bronze = df_bronze.withColumn("customer", customer_label)

for column_name, constant in static_columns.items():
    df_bronze = df_bronze.withColumn(column_name, F.lit(constant))

df_bronze.show(10)

In [0]:
# Get first row (returns list with 1 Row object)
# Peek at one fully transformed row (a pyspark Row object).
# DataFrame.first() returns None for an empty DataFrame, whereas head(1)[0] would
# raise IndexError and abort the notebook run.
first_row = df_bronze.first()
print(first_row)


## Save to silver

In [0]:
# Overwrite the silver customers table with the cleaned frame, written as Delta with
# change data feed and mergeSchema enabled via the options below.
silver_table = "main_fmcg.silver.customers"

(
    df_bronze.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .saveAsTable(silver_table)
)
# Report the table actually written (the message previously named fmcg.bronze.customers).
print(f"Data saved successfully to {silver_table}")