# Silver Layer - Data Cleansing and Enrichment
## FinchMart Sales ETL Pipeline

This notebook transforms Bronze layer data by:
- Cleansing data (handling nulls, duplicates, incorrect timestamps)
- Enriching with product reference data
- Implementing incremental processing
- Storing cleaned data in Delta Lake Silver layer

**Architecture Decision:** The initial load overwrites the Silver table; later incremental loads are designed to use Delta Lake MERGE keyed on `transaction_id`, with deduplication applied before the merge.

In [None]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, to_timestamp, when, coalesce, lit, 
    row_number, current_timestamp, round as spark_round
)
from pyspark.sql.window import Window
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable
import os

In [None]:
# Initialize Spark Session with Delta Lake support
builder = SparkSession.builder \
    .appName("FinchMart-Silver-Layer") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

print(f"Spark Version: {spark.version}")

In [None]:
# Define paths
BASE_PATH = "/home/ubuntu/dataengineer-transformations-python/finchmart_sales_etl"
BRONZE_PATH = f"{BASE_PATH}/data/bronze/sales_transactions"
SILVER_PATH = f"{BASE_PATH}/data/silver/sales_transactions_clean"
PRODUCT_REF_PATH = f"{BASE_PATH}/data/raw/Product_Table.csv"

print(f"Bronze Layer Path: {BRONZE_PATH}")
print(f"Silver Layer Path: {SILVER_PATH}")

In [None]:
# Read Bronze layer data
bronze_df = spark.read.format("delta").load(BRONZE_PATH)

print(f"Bronze layer records: {bronze_df.count()}")
bronze_df.printSchema()

In [None]:
# Read Product reference data
product_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(PRODUCT_REF_PATH)

# Rename columns to avoid conflicts
product_df = product_df \
    .withColumnRenamed("product_id", "prod_id") \
    .withColumnRenamed("product_category", "ref_category") \
    .withColumnRenamed("price", "list_price")

print(f"Product reference records: {product_df.count()}")
product_df.show(5)

In [None]:
# Data Cleansing Step 1: Handle timestamp conversion and validation
# Convert string timestamps to proper timestamp type
# Filter out records with invalid timestamps
cleansed_df = bronze_df \
    .withColumn("transaction_timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")) \
    .filter(col("transaction_timestamp").isNotNull())

print(f"Records after timestamp validation: {cleansed_df.count()}")
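
The cell above relies on `to_timestamp` yielding null for values that do not match the pattern (assuming ANSI mode is off), so that the `isNotNull` filter drops them. As a rough illustration of that parse-or-null behaviour without a Spark session, the plain-Python sketch below uses `datetime.strptime`. The helper name `parse_or_none` and the sample values are made up for illustration, and `%f` is slightly more lenient than Spark's `SSSSSS` because it accepts one to six fractional digits.

```python
from datetime import datetime
from typing import Optional

# Approximate Python counterpart of the Spark pattern yyyy-MM-dd'T'HH:mm:ss.SSSSSS
ISO_MICROS = "%Y-%m-%dT%H:%M:%S.%f"


def parse_or_none(raw: Optional[str]) -> Optional[datetime]:
    """Return the parsed timestamp, or None when the value is missing or malformed."""
    if raw is None:
        return None
    try:
        return datetime.strptime(raw, ISO_MICROS)
    except ValueError:
        return None


# Made-up sample values (assumption: not taken from the real Bronze data)
samples = [
    "2024-03-01T10:15:30.123456",  # matches the pattern
    "2024-03-01 10:15:30",         # space instead of 'T', no fractional part
    "not-a-timestamp",             # garbage
    None,                          # missing value
]

parsed = [parse_or_none(value) for value in samples]
kept = [value for value in parsed if value is not None]  # mirrors the isNotNull filter
print(f"Kept {len(kept)} of {len(samples)} sample values")
```

Comparing the Bronze count with the count after this step shows how many records were rejected for timestamp reasons.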

In [None]:
# Data Cleansing Step 2: Remove duplicates
# Deduplication based on transaction_id, keeping the latest ingestion
window_spec = Window.partitionBy("transaction_id").orderBy(col("ingestion_timestamp").desc())

deduplicated_df = cleansed_df \
    .withColumn("row_num", row_number().over(window_spec)) \
    .filter(col("row_num") == 1) \
    .drop("row_num")

print(f"Records after deduplication: {deduplicated_df.count()}")
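
To make the effect of the `row_number()` window concrete, here is a minimal plain-Python sketch of the same rule: one row per `transaction_id`, keeping the row with the latest `ingestion_timestamp`. The function name `keep_latest` and the sample rows are hypothetical; this illustrates the semantics and is not a replacement for the Spark code.

```python
from datetime import datetime


def keep_latest(rows):
    """Keep one row per transaction_id: the one with the greatest ingestion_timestamp."""
    latest = {}
    for row in rows:
        key = row["transaction_id"]
        current = latest.get(key)
        if current is None or row["ingestion_timestamp"] > current["ingestion_timestamp"]:
            latest[key] = row
    return list(latest.values())


# Made-up sample rows: T1 was ingested twice with different prices
rows = [
    {"transaction_id": "T1", "ingestion_timestamp": datetime(2024, 3, 1, 9, 0), "price": 10.0},
    {"transaction_id": "T1", "ingestion_timestamp": datetime(2024, 3, 1, 11, 0), "price": 12.0},
    {"transaction_id": "T2", "ingestion_timestamp": datetime(2024, 3, 1, 10, 0), "price": 5.0},
]

deduped = keep_latest(rows)
for row in deduped:
    print(row["transaction_id"], row["price"])
```

If two copies of a transaction share the same `ingestion_timestamp`, `row_number()` in Spark picks one of them arbitrarily; adding a second ordering column to the window would make the choice deterministic.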

In [None]:
# Data Cleansing Step 3: Handle null values and data quality issues
# - Fill missing payment methods with 'Unknown'
# - Fill missing store locations with 'Unknown'
# - Replace null or non-positive quantities with 1
# - Replace null or non-positive prices with 0.0
# - Drop records missing transaction_id, customer_id, or product_id
quality_df = deduplicated_df \
    .withColumn("payment_method", coalesce(col("payment_method"), lit("Unknown"))) \
    .withColumn("store_location", coalesce(col("store_location"), lit("Unknown"))) \
    .withColumn("quantity", when(col("quantity") > 0, col("quantity")).otherwise(1)) \
    .withColumn("price", when(col("price") > 0, col("price")).otherwise(0.0)) \
    .filter(col("transaction_id").isNotNull()) \
    .filter(col("customer_id").isNotNull()) \
    .filter(col("product_id").isNotNull())

print(f"Records after quality checks: {quality_df.count()}")
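
The quality rules in this step can be read as a per-record function. The sketch below restates them in plain Python so the edge cases are easy to see; `apply_quality_rules` and the sample record are illustrative assumptions, not part of the pipeline. Note that a null `quantity` or `price` takes the default branch, just as `when(...).otherwise(...)` does in Spark when the condition evaluates to null.

```python
from typing import Optional

REQUIRED_KEYS = ("transaction_id", "customer_id", "product_id")


def apply_quality_rules(record: dict) -> Optional[dict]:
    """Return a cleaned copy of the record, or None if the record must be dropped."""
    # Records missing any key identifier are filtered out
    if any(record.get(key) is None for key in REQUIRED_KEYS):
        return None

    cleaned = dict(record)

    # coalesce(col, lit("Unknown")): only null is replaced, not empty strings
    for field in ("payment_method", "store_location"):
        if cleaned.get(field) is None:
            cleaned[field] = "Unknown"

    # Null or non-positive quantity falls back to 1, price to 0.0
    quantity = cleaned.get("quantity")
    cleaned["quantity"] = quantity if quantity is not None and quantity > 0 else 1
    price = cleaned.get("price")
    cleaned["price"] = price if price is not None and price > 0 else 0.0
    return cleaned


# Made-up record with several quality problems
record = {
    "transaction_id": "T1", "customer_id": "C1", "product_id": "P1",
    "quantity": -3, "price": None,
    "payment_method": None, "store_location": "Austin",
}

cleaned = apply_quality_rules(record)
dropped = apply_quality_rules({**record, "customer_id": None})
print(cleaned)
print(dropped)
```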

In [None]:
# Data Enrichment: Join with product reference data
enriched_df = quality_df \
    .join(product_df, quality_df.product_id == product_df.prod_id, "left") \
    .select(
        col("transaction_id"),
        col("transaction_timestamp"),
        col("customer_id"),
        col("product_id"),
        col("product_name"),
        coalesce(col("ref_category"), col("product_category")).alias("product_category"),
        col("price").alias("transaction_price"),
        col("list_price"),
        col("quantity"),
        (col("price") * col("quantity")).alias("total_amount"),
        col("payment_method"),
        col("store_location"),
        col("source_file"),
        col("ingestion_timestamp")
    ) \
    .withColumn("total_amount", spark_round(col("total_amount"), 2)) \
    .withColumn("processed_timestamp", current_timestamp())

print(f"Records after enrichment: {enriched_df.count()}")
enriched_df.show(5, truncate=False)
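
The enrichment step combines a left join, a category fallback, and a rounded total. The following plain-Python sketch walks through those three pieces for a single transaction. The lookup table `product_ref`, the function `enrich`, and all values are hypothetical stand-ins for the real reference file. Because Spark's `round` rounds half up, the sketch uses `Decimal` with `ROUND_HALF_UP` rather than Python's built-in `round`.

```python
from decimal import Decimal, ROUND_HALF_UP

# Hypothetical reference data keyed by product id (stand-in for Product_Table.csv)
product_ref = {
    "P1": {"product_name": "Widget", "ref_category": "Hardware", "list_price": 9.99},
}


def enrich(txn: dict) -> dict:
    """Attach reference columns to one transaction and compute its total amount."""
    # Left join: an unknown product_id keeps the transaction, reference columns stay None
    ref = product_ref.get(txn["product_id"], {})

    # coalesce(ref_category, product_category): prefer the reference value when present
    category = ref.get("ref_category")
    if category is None:
        category = txn.get("product_category")

    # price * quantity, rounded half up to two decimals
    total = Decimal(str(txn["price"])) * txn["quantity"]
    total = total.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

    return {
        "transaction_id": txn["transaction_id"],
        "product_id": txn["product_id"],
        "product_name": ref.get("product_name"),
        "product_category": category,
        "transaction_price": txn["price"],
        "list_price": ref.get("list_price"),
        "quantity": txn["quantity"],
        "total_amount": float(total),
    }


known = enrich({"transaction_id": "T1", "product_id": "P1",
                "product_category": "Misc", "price": 2.675, "quantity": 1})
unknown = enrich({"transaction_id": "T2", "product_id": "P9",
                  "product_category": "Misc", "price": 3.0, "quantity": 2})
print(known)
print(unknown)
```

Transactions whose `product_id` has no match in the reference data keep a null `product_name` and `list_price`, so counting those nulls after the join gives a quick measure of reference coverage.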

In [None]:
# Write to Silver layer using Delta Lake
# Overwrite mode is used for the initial load; later runs are meant to MERGE new records instead
enriched_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save(SILVER_PATH)

print("Silver layer data written successfully")

In [None]:
# Verify Silver layer data
silver_df = spark.read.format("delta").load(SILVER_PATH)

print(f"Total records in Silver layer: {silver_df.count()}")
print("\nSchema:")
silver_df.printSchema()
print("\nSample data:")
silver_df.show(5, truncate=False)

In [None]:
# Data Quality Report for Silver Layer
from pyspark.sql.functions import sum as _sum, avg, min as _min, max as _max

print("=== Silver Layer Data Quality Report ===")
print(f"Total transactions: {silver_df.count()}")
print(f"Unique customers: {silver_df.select('customer_id').distinct().count()}")
print(f"Unique products: {silver_df.select('product_id').distinct().count()}")
print(f"Store locations: {silver_df.select('store_location').distinct().count()}")

# Statistical summary
stats_df = silver_df.agg(
    _sum("total_amount").alias("total_revenue"),
    avg("total_amount").alias("avg_transaction_value"),
    _min("transaction_timestamp").alias("earliest_transaction"),
    _max("transaction_timestamp").alias("latest_transaction")
)

print("\nStatistical Summary:")
stats_df.show(truncate=False)

# Category distribution
print("\nSales by Category:")
silver_df.groupBy("product_category") \
    .agg(
        _sum("total_amount").alias("total_sales"),
        _sum("quantity").alias("total_quantity")
    ) \
    .orderBy(col("total_sales").desc()) \
    .show()

# Store performance
print("\nSales by Store:")
silver_df.groupBy("store_location") \
    .agg(_sum("total_amount").alias("total_sales")) \
    .orderBy(col("total_sales").desc()) \
    .show()

## Incremental Processing Strategy

For future incremental loads, use the following pattern:

```python
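from delta.tables import DeltaTable
from pyspark.sql.functions import col, max as _max

# Watermark: latest ingestion timestamp already present in the Silver layer
last_processed_timestamp = spark.read.format("delta").load(SILVER_PATH) \
    .agg(_max("ingestion_timestamp")).first()[0]

# Read only Bronze records ingested after the watermark
new_bronze_df = spark.read.format("delta").load(BRONZE_PATH)
if last_processed_timestamp is not None:  # None when the Silver table is empty
    new_bronze_df = new_bronze_df \
        .filter(col("ingestion_timestamp") > last_processed_timestamp)

# Apply the same cleansing, deduplication, and enrichment steps to new_bronze_df
# to produce new_enriched_df. It must hold one row per transaction_id, because
# MERGE fails when several source rows match the same target row.

# Upsert into the Silver layer with Delta Lake MERGE
delta_table = DeltaTable.forPath(spark, SILVER_PATH)
delta_table.alias("target").merge(
    new_enriched_df.alias("source"),
    "target.transaction_id = source.transaction_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()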
```
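
The watermark filter and the MERGE can be pictured on a handful of in-memory rows. The sketch below is plain Python with made-up data; `select_new_rows` and `merge_upsert` are hypothetical helpers that mimic, respectively, the `ingestion_timestamp` filter and the `whenMatchedUpdateAll` / `whenNotMatchedInsertAll` behaviour. It assumes the new rows are already unique per `transaction_id`.

```python
from datetime import datetime


def select_new_rows(bronze_rows, silver_rows):
    """Return Bronze rows ingested after the latest ingestion_timestamp in Silver."""
    watermark = max((r["ingestion_timestamp"] for r in silver_rows), default=None)
    if watermark is None:  # empty Silver table: everything is new
        return list(bronze_rows)
    return [r for r in bronze_rows if r["ingestion_timestamp"] > watermark]


def merge_upsert(silver_rows, new_rows):
    """Update rows whose transaction_id already exists, insert the rest."""
    merged = {r["transaction_id"]: r for r in silver_rows}
    for row in new_rows:
        merged[row["transaction_id"]] = row
    return list(merged.values())


# Made-up data: T1 is already in Silver, then arrives again with a corrected amount
silver = [
    {"transaction_id": "T1", "ingestion_timestamp": datetime(2024, 3, 1, 9, 0), "total_amount": 10.0},
]
bronze = [
    {"transaction_id": "T1", "ingestion_timestamp": datetime(2024, 3, 1, 9, 0), "total_amount": 10.0},
    {"transaction_id": "T1", "ingestion_timestamp": datetime(2024, 3, 1, 12, 0), "total_amount": 12.0},
    {"transaction_id": "T2", "ingestion_timestamp": datetime(2024, 3, 1, 12, 30), "total_amount": 5.0},
]

new_rows = select_new_rows(bronze, silver)
merged = merge_upsert(silver, new_rows)
for row in merged:
    print(row["transaction_id"], row["total_amount"])
```

Because the watermark uses a strict `>` comparison, Bronze rows that share the exact watermark timestamp but were written after the previous run would be skipped; whether that can happen depends on how the Bronze layer assigns `ingestion_timestamp`.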

## Summary

**Silver Layer Transformation Complete:**
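- Timestamps converted to timestamp type; records with unparseable timestamps dropped
- Duplicates removed by transaction_id, keeping the most recently ingested record
- Missing payment methods and store locations set to 'Unknown'; null or non-positive quantities and prices replaced with defaults; records missing key IDs dropped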
- Data enriched with product reference information
- Total amount calculated for each transaction
- Ready for Gold layer aggregations