In [0]:
from pyspark.sql.functions import col, round, when



In [0]:
# Load the raw sales data: table `sales`, schema `bronze`, catalog `retail_poc`.
sales_df = spark.read.table("retail_poc.bronze.sales")


In [0]:
# describe() only builds a (lazy) summary DataFrame; it has to be rendered
# explicitly, otherwise the cell output is just the DataFrame's schema repr.
display(sales_df.describe())

In [0]:
# Data-quality scan: for each column, report how many values begin with '#'
# and list the distinct offending values. Columns without such values are
# skipped silently.
for column in sales_df.columns:
    flagged_rows = sales_df.where(col(column).startswith("#"))
    count = flagged_rows.count()
    if not count:
        continue
    print(f"Column '{column}' has {count} values starting with '#'")
    flagged_rows.select(column).distinct().show(truncate=False)


In [0]:
# Remove rows that are identical across every column.
sales_df_dedup = sales_df.distinct()

print(f"Rows after dropping duplicates: {sales_df_dedup.count()}")


In [0]:
# Keep only rows whose Category, Sub_Category and Gender are not "#N/A" and
# whose GST_ is neither "#DIV/0!" nor "#N/A".
# NOTE: a comparison against NULL evaluates to NULL, which the filter treats as
# "not true", so rows with a NULL in any of these four columns are removed too.
clean_df = (
    sales_df_dedup
    .where(~col("Category").isin("#N/A"))
    .where(~col("Sub_Category").isin("#N/A"))
    .where(~col("Gender").isin("#N/A"))
    .where(~col("GST_").isin("#DIV/0!", "#N/A"))
)

print(f"Rows after cleaning invalid values: {clean_df.count()}")


In [0]:
# Render the summary statistics of the cleaned data; a bare describe() would
# only show the schema repr of the (lazy) summary DataFrame.
display(clean_df.describe())

In [0]:
# Silver-layer projection, in output order: (column name, target type).
# A target type of None means the column is passed through without a cast.
# NOTE(review): "float" is Spark's 32-bit FloatType — confirm that precision is
# sufficient for the monetary columns.
silver_schema = [
    ("Channel", None),
    ("EAN", None),
    ("Date", "date"),
    ("Category", None),
    ("Sub_Category", None),
    ("Gender", None),
    ("Store_code", None),
    ("Month", None),
    ("Year", "int"),
    ("Sale_QTY", "int"),
    ("Sale_iV", "float"),
    ("Net_Margin", "float"),
    ("COGS", "float"),
    ("MRP", "float"),
    ("MRP_Value", "float"),
    ("Net_Sales", "float"),
    ("Brand_code", None),
    ("brand_as_per_master", None),
    ("season", None),
    ("GST_", "float"),
    ("_rescued_data", None),
]

silver_df = clean_df.select(
    *(
        col(name) if dtype is None else col(name).cast(dtype)
        for name, dtype in silver_schema
    )
)


In [0]:
# Add column "Margin_%" to silver_df: Net_Margin / Net_Sales * 100, rounded to
# two decimals.
# The division is guarded so that a zero Net_Sales yields NULL instead of
# raising a DIVIDE_BY_ZERO error when ANSI mode is enabled. With ANSI mode off
# the result is NULL either way, so existing output is unchanged. A NULL
# Net_Sales also yields NULL, as before.
silver_df = silver_df.withColumn(
    "Margin_%",
    when(
        col("Net_Sales") != 0,
        round((col("Net_Margin") / col("Net_Sales")) * 100, 2),
    ),
)


In [0]:
# Enrich the silver sales rows with store attributes from the bronze
# store_master table (retail_poc catalog) using a LEFT JOIN on the store code,
# so sales rows without a matching store are kept with NULL store_name/region.

store_master_df = spark.table("retail_poc.bronze.store_master")

# The join key must be part of the right-hand projection. Previously only
# store_name and region were selected, so the unqualified `store_code` in the
# join condition could only resolve to silver_df's own Store_code, making the
# predicate trivially true instead of matching on the store.
# The key is aliased so both sides of the condition are unambiguous, and it is
# dropped after the join so the output schema is the silver columns plus
# store_name and region.
# NOTE(review): assumes store_master exposes the key as `store_code` with at
# most one row per store; duplicate store codes would multiply sales rows.
store_lookup_df = store_master_df.select(
    col("store_code").alias("_store_master_code"),
    "store_name",
    "region",
)

joined_df = silver_df.join(
    store_lookup_df,
    silver_df["Store_code"] == store_lookup_df["_store_master_code"],
    how="left",
).drop("_store_master_code")

display(joined_df)

In [0]:
#display(joined_df)


In [0]:
# Persist the enriched sales data as a Delta table in the silver schema.
# NOTE(review): mode "append" adds these rows on every run — confirm that
# re-running the notebook cannot double-load the same bronze data.
(
    joined_df.write
    .format("delta")
    .mode("append")
    .saveAsTable("retail_poc.silver.combined_sales_summary")
)