# Silver Layer - Transformação e Limpeza da Tabela Sales

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, trim, to_date, when, isnan, count, lit, round, coalesce, isnull, last, avg
from pyspark.sql.window import Window # Import Window for partition and order by operations
from pyspark.sql import functions as F


StatementMeta(, 6d53df56-5dfd-45c1-a8b9-48acc0bbb7bc, 3, Finished, Available, Finished)

In [None]:
# -------------------------------------------
# 2. Read the data from the RAW layer
# -------------------------------------------
# Reuse the active Spark session if one exists; otherwise a new one is created.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

# Load the raw sales table by its catalog name.
raw_sales_table = "Bronze_Data.sales.raw_sales_data"
df_sales_raw = spark.table(raw_sales_table)

StatementMeta(, 6d53df56-5dfd-45c1-a8b9-48acc0bbb7bc, 4, Finished, Available, Finished)

In [None]:
# -------------------------------------------
# 3. Type conversion and normalization
# -------------------------------------------
# Parse the raw date column into a DateType column.
df_sales_clean = df_sales_raw.withColumn("date", to_date(col("date")))

# Normalize identifier / categorical columns: cast to string, lower-case, trim whitespace.
for col_name in ["store_id", "product_id", "promo_type_1", "promo_bin_1", "promo_type_2", "promo_bin_2", "promo_discount_type_2"]:
    df_sales_clean = df_sales_clean.withColumn(col_name, trim(lower(col(col_name).cast("string"))))

# After further inspection, product P0254 has no records of sales, revenue, stock or price,
# so it is removed from the dataset.
# The filter must run AFTER the normalization above: the literal is lower-case, so filtering
# on the raw column would miss values such as "P0254" or " p0254 ".
# NOTE: `!=` evaluates to NULL for a NULL product_id, so rows without a product_id are
# dropped by this filter as well.
df_sales_clean = df_sales_clean.filter(col("product_id") != "p0254")

# Cast the numeric measures to their target types (values that cannot be cast become NULL).
df_sales_clean = (
    df_sales_clean
    .withColumn("sales", col("sales").cast("float"))
    .withColumn("revenue", col("revenue").cast("float"))
    .withColumn("price", col("price").cast("float"))
    .withColumn("promo_discount_2", col("promo_discount_2").cast("float"))
    .withColumn("stock", col("stock").cast("int"))
)

StatementMeta(, 6d53df56-5dfd-45c1-a8b9-48acc0bbb7bc, 5, Finished, Available, Finished)

In [None]:
# -------------------------------------------
# Initial imputation: set sales, revenue and stock to 0 when all three are missing.
# This assumes that if they are all missing, it means no activity/stock.
# -------------------------------------------
# The "all three are NULL" condition is materialized as a helper column BEFORE any of the
# three columns is modified. Evaluating the condition again after `sales` has been set to 0
# would make it false for those rows, leaving `revenue` and `stock` un-imputed.
df_sales_clean = df_sales_clean.withColumn(
    "_all_activity_missing",
    isnull(col("sales")) & isnull(col("revenue")) & isnull(col("stock")),
)

for metric_name in ["sales", "revenue", "stock"]:
    df_sales_clean = df_sales_clean.withColumn(
        metric_name,
        when(col("_all_activity_missing"), lit(0)).otherwise(col(metric_name)),
    )

# The helper flag is only needed for the imputation above.
df_sales_clean = df_sales_clean.drop("_all_activity_missing")


StatementMeta(, 6d53df56-5dfd-45c1-a8b9-48acc0bbb7bc, 6, Finished, Available, Finished)

In [None]:
# -------------------------------------------
# 4. Price imputation
# Fill price with an inferred value where possible:
# 1. When price is null AND sales > 0, price = revenue / sales.
# 2. For remaining missing prices, use the latest non-null, non-zero price
#    for the same product_id in the same store_id.
# 3. For products still missing a price, use the average price of that product across all stores.
# 4. For any prices still null after all attempts, set price = 0.
# 5. Flag rows where price is still missing (0 after all imputation attempts).
# -------------------------------------------

# Step 1: infer the unit price from revenue / sales when sales > 0 and price is null.
# If revenue is null the division yields null and the row falls through to the next steps.
df_sales_clean = df_sales_clean.withColumn(
    "price",
    when(col("price").isNull() & (col("sales") > 0), round(col("revenue") / col("sales"), 2))
    .otherwise(col("price")),
)

# Step 2: latest non-null, non-zero price for the same store_id / product_id.
# Rows are ordered by date descending and the frame spans the WHOLE partition, so the first
# non-null value is the most recent known price. (With the default frame, which stops at the
# current row, a row dated after the last priced record would see no candidate at all.)
window_spec = (
    Window.partitionBy("store_id", "product_id")
    .orderBy(col("date").desc())
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

# Helper column: the price when it is usable (non-null and non-zero), otherwise null.
df_sales_clean = df_sales_clean.withColumn(
    "price_for_lookup",
    when(col("price").isNotNull() & (col("price") != 0), col("price"))
    .otherwise(lit(None).cast("float")),
)

# Most recent usable price within the store/product partition.
df_sales_clean = df_sales_clean.withColumn(
    "latest_store_product_price",
    F.first(col("price_for_lookup"), ignorenulls=True).over(window_spec),
)

# Only rows whose price is still null after Step 1 receive the looked-up price.
df_sales_clean = df_sales_clean.withColumn(
    "price",
    coalesce(col("price"), col("latest_store_product_price")),
)

# Drop the helper columns as they are no longer needed.
df_sales_clean = df_sales_clean.drop("price_for_lookup", "latest_store_product_price")


# Step 3: average price of the product across the entire dataset
# (considering only non-null and non-zero prices).
df_product_global_avg_price = (
    df_sales_clean
    .filter(col("price").isNotNull() & (col("price") != 0))
    .groupBy("product_id")
    .agg(avg("price").alias("global_avg_product_price"))
)

# Left join so that every row of df_sales_clean is kept, including products without any
# usable price (their global_avg_product_price is null).
df_sales_clean = df_sales_clean.join(
    df_product_global_avg_price,
    on="product_id",
    how="left",
)

# Use the product-level average only where the price is still null.
df_sales_clean = df_sales_clean.withColumn(
    "price",
    coalesce(col("price"), col("global_avg_product_price")),
)

# Drop the temporary global average price column.
df_sales_clean = df_sales_clean.drop("global_avg_product_price")


# Step 4: any price still null after all attempts becomes 0; cast back to float because the
# intermediate expressions above may have widened the column to double.
df_sales_clean = df_sales_clean.withColumn("price", coalesce(col("price"), lit(0.0)).cast("float"))


# Step 5: flag rows whose price is 0 after all imputation steps (nulls were filled above,
# so 0 is the only remaining "missing" marker).
df_sales_clean = df_sales_clean.withColumn(
    "needs_manual_price_review",
    when(col("price") == 0, lit(True))
    .otherwise(lit(False)),
)

StatementMeta(, 6d53df56-5dfd-45c1-a8b9-48acc0bbb7bc, 7, Finished, Available, Finished)

In [None]:
# -------------------------------------------
# Revenue derived from sales and price
# -------------------------------------------
units_sold = F.col("sales")
unit_price = F.col("price")

# Branches are evaluated in order:
#   sales == 0                      -> revenue is 0.0
#   sales > 0 and price is not null -> revenue is sales * price
#   anything else (e.g. null sales) -> the existing revenue value is kept
no_units_sold = units_sold == 0
priced_sale = (units_sold > 0) & unit_price.isNotNull()

revenue_expr = (
    F.when(no_units_sold, F.lit(0.0))
    .when(priced_sale, units_sold * unit_price)
    .otherwise(F.col("revenue"))
)
df_sales_clean = df_sales_clean.withColumn("revenue", revenue_expr)

StatementMeta(, 6d53df56-5dfd-45c1-a8b9-48acc0bbb7bc, 8, Finished, Available, Finished)

In [None]:
# -------------------------------------------
# Stock imputation based on sales and the last known (non-null) stock
# -------------------------------------------
# Window per product/store ordered by date ASCENDING. With the default frame (partition start
# up to the current row), `last(..., ignorenulls=True)` returns the most recent non-null stock
# observed on or before the current date. Ordering descending would instead pick a stock
# value from a later date, and subtracting the current sales from it would be meaningless.
window_stock_spec = Window.partitionBy("product_id", "store_id").orderBy(col("date").asc())

# Last known stock for the product-store combination as of each row's date.
# Null when no stock has been recorded up to that date.
df_sales_clean = df_sales_clean.withColumn(
    "last_known_stock",
    last(col("stock"), ignorenulls=True).over(window_stock_spec),
)

# Imputation rules, applied only where stock is null:
#   sales == 0 -> carry the last known stock forward
#   sales != 0 -> last known stock minus the units sold (only if a last known stock exists)
# Rows with a known stock, or with null sales, keep their current value.
df_sales_clean = df_sales_clean.withColumn(
    "stock",
    when((col("sales") == 0) & col("stock").isNull(), col("last_known_stock"))
    .when(
        (col("sales") != 0) & col("stock").isNull() & col("last_known_stock").isNotNull(),
        (col("last_known_stock") - col("sales")).cast("int"),
    )
    .otherwise(col("stock")),
)

# Final fallback: if stock is still null (e.g. no last known stock exists), set it to 0.
df_sales_clean = df_sales_clean.withColumn(
    "stock",
    when(col("stock").isNull(), lit(0).cast("int"))
    .otherwise(col("stock")),
)

# Drop the helper column.
df_sales_clean = df_sales_clean.drop("last_known_stock")

StatementMeta(, 6d53df56-5dfd-45c1-a8b9-48acc0bbb7bc, 9, Finished, Available, Finished)

In [None]:
# -------------------------------------------
# 5. Duplicate removal
# -------------------------------------------
# Keep one row per (store, product, date). No ordering is specified here, so this code does
# not control which of the duplicate rows survives.
record_key = ["store_id", "product_id", "date"]
df_sales_clean = df_sales_clean.dropDuplicates(record_key)

# -------------------------------------------
# Defaults for missing promotion attributes
# Missing promo bins become the string "none".
for promo_bin in ("promo_bin_1", "promo_bin_2"):
    df_sales_clean = df_sales_clean.withColumn(
        promo_bin,
        F.coalesce(F.col(promo_bin), F.lit("none")),
    )

# A missing discount value becomes 0.0 (float); a missing discount type becomes "none".
df_sales_clean = df_sales_clean.withColumn(
    "promo_discount_2",
    F.coalesce(F.col("promo_discount_2"), F.lit(0.0)).cast("float"),
)
df_sales_clean = df_sales_clean.withColumn(
    "promo_discount_type_2",
    F.coalesce(F.col("promo_discount_type_2"), F.lit("none")),
)


# -------------------------------------------
# 6. Derived columns
# -------------------------------------------
# True when promo_type_1 is anything other than "pr14"; a null promo_type_1 yields False.
# NOTE(review): presumably "pr14" is the code for "no promotion" - confirm against the source data.
promo_1_differs_from_pr14 = F.col("promo_type_1") != "pr14"
df_sales_clean = df_sales_clean.withColumn(
    "is_promo_type_1_active_this_week",
    F.when(promo_1_differs_from_pr14, F.lit(True)).otherwise(F.lit(False)),
)

StatementMeta(, 6d53df56-5dfd-45c1-a8b9-48acc0bbb7bc, 10, Finished, Available, Finished)

In [None]:
# Remove the columns that are not part of the output ("_c0" and the price-review flag),
# then print the resulting schema.
columns_to_remove = ["_c0", "needs_manual_price_review"]
df_sales_clean = df_sales_clean.drop(*columns_to_remove)
df_sales_clean.printSchema()

StatementMeta(, 6d53df56-5dfd-45c1-a8b9-48acc0bbb7bc, 11, Finished, Available, Finished)

root
 |-- product_id: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- sales: float (nullable = true)
 |-- revenue: double (nullable = true)
 |-- stock: integer (nullable = true)
 |-- price: float (nullable = false)
 |-- promo_type_1: string (nullable = true)
 |-- promo_bin_1: string (nullable = false)
 |-- promo_type_2: string (nullable = true)
 |-- promo_bin_2: string (nullable = false)
 |-- promo_discount_2: float (nullable = false)
 |-- promo_discount_type_2: string (nullable = false)
 |-- is_promo_type_1_active_this_week: boolean (nullable = false)



In [None]:
# # -------------------------------------------
# # Count missing values
# # -------------------------------------------
# print("\nMissing values count after cleaning:")
# missing_values = []
# for column in df_sales_clean.columns:
#     missing_count = df_sales_clean.filter(col(column).isNull()).count()
#     if missing_count > 0:
#         missing_values.append((column, missing_count))

# # Sort missing values in descending order
# missing_values.sort(key=lambda x: x[1], reverse=True)

# for column, count in missing_values:
#     print(f"'{column}': {count}")

# if not missing_values:
#     print("No missing values found.")

StatementMeta(, 6d53df56-5dfd-45c1-a8b9-48acc0bbb7bc, 12, Finished, Available, Finished)


Missing values count after cleaning:


No missing values found.


In [None]:
# -------------------------------------------
# 8. Write the cleaned data to the SILVER layer (Delta)
# -------------------------------------------
# Overwrite mode: the existing contents of the target table are replaced on each run.
silver_writer = df_sales_clean.write.format("delta").mode("overwrite")
silver_writer.saveAsTable("Silver_Data.sales.silver_sales_data")

print("\n✅ Silver Layer - Sales concluída com sucesso.")

StatementMeta(, 6d53df56-5dfd-45c1-a8b9-48acc0bbb7bc, 13, Finished, Available, Finished)


✅ Silver Layer - Sales concluída com sucesso.
