# Silver Layer – Data Cleaning and Validation

This notebook transforms raw ingested data from the Bronze layer into a cleaned and validated form in the Silver layer.  
The following cleaning operations are performed:

- Removal of cancelled transactions (i.e., Invoice numbers starting with 'C')
- Elimination of records with missing or null customer identifiers
- Computation of a new field, `SalesAmount`, as `Quantity × Price`
- Deduplication of transactions where applicable

The cleaned data is written to a Delta table named `silver_sales`, which serves as the refined dataset for analytical aggregation in the Gold layer.


## Step 1: Load Raw Data from Bronze Table

The raw dataset is read from the previously created Delta table `bronze_sales`, which resides in the Bronze layer.  
This table contains the unprocessed input along with an ingestion timestamp and standardized column names.  
The data will be further filtered and transformed as part of the Silver layer processing.


In [0]:
# Load the raw data from the Bronze Delta table
df_bronze = spark.read.format("delta").table("bronze_sales")

# Preview the loaded DataFrame
df_bronze.display()


## Step 2: Filter Out Invalid and Cancelled Records

To ensure data quality, records that represent cancellations or lack essential information are excluded.  
Specifically, transactions whose `Invoice` value starts with the letter 'C' (indicating a cancellation) are removed.  
Additionally, records with null values in the `Customer_ID` field are filtered out, as these entries are considered incomplete.


In [0]:
from pyspark.sql.functions import col

# Filter out cancelled invoices (start with 'C') and null customer IDs
df_silver = df_bronze.filter(
    (~col("Invoice").startswith("C")) &
    (col("Customer_ID").isNotNull())
)

df_silver.display()
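
One detail of the filter above is how nulls behave: in Spark SQL, a prefix check on a NULL `Invoice` evaluates to NULL, and `filter` keeps only rows where the predicate is true, so those rows are dropped too. The plain-Python sketch below mirrors the predicate on a few made-up rows, with `None` standing in for NULL. The helper name `keep_row` and the sample values are illustrative assumptions, not part of the notebook.

```python
from typing import Optional


def keep_row(invoice: Optional[str], customer_id: Optional[str]) -> bool:
    """Plain-Python mirror of the Step 2 predicate; None stands in for SQL NULL."""
    # A NULL on either side makes the Spark predicate NULL or false,
    # so the row is dropped in both cases.
    if invoice is None or customer_id is None:
        return False
    # The prefix check is case-sensitive, like Column.startswith.
    return not invoice.startswith("C")


# Hypothetical sample rows: (Invoice, Customer_ID)
sample_rows = [
    ("536365", "17850"),   # regular sale -> kept
    ("C536379", "14527"),  # cancellation -> dropped
    ("536414", None),      # missing customer -> dropped
    (None, "12345"),       # missing invoice -> dropped
]

kept = [row for row in sample_rows if keep_row(*row)]
print(kept)
```

Because the check is case-sensitive, an invoice beginning with a lowercase `c` would not be treated as a cancellation.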


## Step 3: Compute SalesAmount Field

To enhance the dataset for downstream analytical use, a new column named `SalesAmount` is calculated.  
It is derived by multiplying the quantity of each item sold with its unit price.  
This computed field will be used for revenue-based aggregations in the Gold layer.

### Restore Numeric Types for Calculation

Since all columns were initially cast to strings for ingestion into the Bronze layer,  
the `Quantity` and `Price` fields are explicitly converted back to their appropriate numeric types.  
Regular expressions retain only rows whose `Quantity` is an optionally negative integer and whose `Price` is a non-negative decimal number; all other rows are dropped.  
This step enables accurate arithmetic operations required for computing `SalesAmount`.


In [0]:
from pyspark.sql.functions import col

# Step 3a: Keep only rows with valid numeric strings (quantities may be negative).
# Start from df_silver (not df_bronze) so the Step 2 filters are preserved.
df_cleaned = df_silver.filter(
    col("Quantity").rlike("^-?[0-9]+$") &     # e.g., -20, 100
    col("Price").rlike("^[0-9]+(\\.[0-9]+)?$") # e.g., 2.99, 10.0
)

# Step 3b: Cast string columns to proper numeric types
df_cleaned = df_cleaned.withColumn("Quantity", col("Quantity").cast("int"))
df_cleaned = df_cleaned.withColumn("Price", col("Price").cast("double"))

# Step 3c: Compute SalesAmount
df_silver = df_cleaned.withColumn("SalesAmount", col("Quantity") * col("Price"))

# Step 3d: Preview results
df_silver.display()
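
To make the validation rules concrete, the following plain-Python sketch applies the same two patterns to a handful of made-up string values and computes the product only when both pass. It is an illustration under assumptions: Spark's `rlike` uses Java regular expressions, which are assumed to behave the same as Python's `re` for these simple patterns, and the helper name `sales_amount` is hypothetical.

```python
import re

# Same patterns as in Step 3a (assumed equivalent between Java and Python regex).
QUANTITY_PATTERN = re.compile(r"^-?[0-9]+$")        # optionally negative integer
PRICE_PATTERN = re.compile(r"^[0-9]+(\.[0-9]+)?$")  # non-negative decimal


def sales_amount(quantity: str, price: str):
    """Return Quantity * Price, or None if either string fails validation."""
    if not QUANTITY_PATTERN.search(quantity) or not PRICE_PATTERN.search(price):
        return None
    # Mirrors the casts in Step 3b: int for Quantity, double for Price.
    return int(quantity) * float(price)


# Hypothetical inputs: (Quantity, Price)
examples = [
    ("-20", "2.5"),   # valid, negative quantity
    ("100", "10.0"),  # valid
    ("12.0", "2.5"),  # rejected: Quantity must be an integer string
    ("3", "-1.5"),    # rejected: Price pattern does not allow a sign
    ("3", ".5"),      # rejected: Price needs a digit before the decimal point
]

results = [sales_amount(q, p) for q, p in examples]
print(results)
```

Rows that fail either pattern are removed in the notebook rather than set to null, so the Silver table contains no partially parsed values.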


## Step 4: Deduplicate Transactions (If Applicable)

To ensure data integrity, duplicate transaction records, if any, are removed based on relevant fields.  
The deduplication logic uses a combination of identifying columns such as `Invoice`, `StockCode`, `Customer_ID`, and `InvoiceDate` to retain unique entries.  
This prevents double-counting during aggregation in the Gold layer.


In [0]:
# Deduplicate based on common transaction identifiers
df_silver = df_silver.dropDuplicates(["Invoice", "StockCode", "Customer_ID", "InvoiceDate"])
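
Deduplicating on a subset of columns means that rows sharing the same key are collapsed even if their other columns differ. The plain-Python sketch below shows this on made-up rows; the helper `drop_duplicates_by_key` and the sample data are illustrative assumptions. Note one difference: the sketch keeps the first row it sees, whereas Spark does not guarantee which of the matching rows survives.

```python
def drop_duplicates_by_key(rows, key_columns):
    """Keep the first row seen for each combination of key column values."""
    seen = set()
    unique_rows = []
    for row in rows:
        key = tuple(row[column] for column in key_columns)
        if key not in seen:
            seen.add(key)
            unique_rows.append(row)
    return unique_rows


KEY_COLUMNS = ["Invoice", "StockCode", "Customer_ID", "InvoiceDate"]

# Hypothetical rows: the first two share a key but differ in Quantity.
rows = [
    {"Invoice": "536365", "StockCode": "85123A", "Customer_ID": "17850",
     "InvoiceDate": "2010-12-01 08:26:00", "Quantity": 6},
    {"Invoice": "536365", "StockCode": "85123A", "Customer_ID": "17850",
     "InvoiceDate": "2010-12-01 08:26:00", "Quantity": 2},
    {"Invoice": "536365", "StockCode": "71053", "Customer_ID": "17850",
     "InvoiceDate": "2010-12-01 08:26:00", "Quantity": 6},
]

deduplicated = drop_duplicates_by_key(rows, KEY_COLUMNS)
print(len(deduplicated))
```

If the same product can legitimately appear on several lines of one invoice, it may be safer to include more columns in the key, or to deduplicate on all columns.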


## Step 5: Save Cleaned Data to Silver Delta Table

The validated and enriched dataset is now saved as a Delta Table named `silver_sales`.  
This table represents the refined version of the raw input and serves as the source for business-level aggregations in the Gold layer.  
The Delta format keeps a version history of the table and supports efficient reads; because the write uses `overwrite` mode, each run replaces the table's current contents.


In [0]:
# Save cleaned and enriched data to Silver Delta table
df_silver.write.format("delta").mode("overwrite").saveAsTable("silver_sales")


In [0]:
# df_bronze.select("Quantity").distinct().show(100, truncate=False)
