# Silver Layer: Clean and Standardize Claims
Azure Synapse Analytics - Medallion Architecture

**Pattern**: Schema-driven type casting + validation + SCD Type 2 tracking columns (`effective_from`, `effective_to`, `is_current`)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, upper, current_timestamp, lit, to_date
import logging
import yaml

In [None]:
# Configuration - ADLS Gen2 paths
# NOTE(review): the value below looks like a placeholder; it presumably must be
# replaced with the real storage account name before running. Every path in
# this cell is derived from it.
STORAGE_ACCOUNT = "<storage-account-name>"
# abfss:// roots of the "tables" and "files" containers in that account.
TABLES_ROOT = f"abfss://tables@{STORAGE_ACCOUNT}.dfs.core.windows.net"
FILES_ROOT = f"abfss://files@{STORAGE_ACCOUNT}.dfs.core.windows.net"

# Bronze input table, Silver output table, and the YAML schema that drives the
# Silver transformations.
BRONZE_PATH = f"{TABLES_ROOT}/bronze/bronze_claims"
SILVER_PATH = f"{TABLES_ROOT}/silver/silver_claims"
SCHEMA_PATH = f"{FILES_ROOT}/config/schemas/silver/silver_claims.yaml"

# Configure root logging at INFO and create the logger used by the cells below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
def apply_schema_transformations(df, schema_path):
    """Apply type casting and transformations based on silver schema.

    The schema is a YAML document read through Spark from ``schema_path``. Its
    ``business_columns`` entry is a list of column definitions. For each
    definition the following steps run in this order:

    1. ``transformation`` (optional): ``'upper_trim'`` or ``'trim'``.
    2. ``type``: ``'double'``, ``'integer'`` (cast to ``int``) or ``'date'``
       (``to_date``). Any other type leaves the column untouched.
    3. ``nullable``: when false, rows where the column is null are dropped.
       A missing ``nullable`` key is treated as nullable.
    4. ``validation`` (optional): a list of ``{'rule': ..., 'value': ...}``
       entries. ``'greater_than'`` and ``'less_than'`` keep only rows that
       satisfy the comparison; other rule names are ignored.

    Args:
        df: Input Spark DataFrame.
        schema_path: Location of the YAML schema file, readable by Spark.

    Returns:
        The transformed DataFrame, or ``df`` unchanged when the schema cannot
        be loaded or defines no ``business_columns``.

    Raises:
        KeyError: If a column definition lacks ``name`` or ``type``, or a
            validation rule lacks ``rule`` (or ``value`` for a handled rule).
    """
    # Resolve the session explicitly instead of relying on a notebook-injected
    # global named `spark`, which does not exist when run outside a notebook.
    session = SparkSession.builder.getOrCreate()

    try:
        schema_content = session.read.text(schema_path, wholetext=True).collect()[0][0]
        schema = yaml.safe_load(schema_content)
    except Exception:
        # Loading the schema stays best-effort, but the real cause (missing
        # file, permissions, invalid YAML, ...) is logged with its traceback
        # instead of being hidden behind a generic "not found" message.
        logger.warning(
            "Could not load schema from %s, skipping transformations",
            schema_path,
            exc_info=True,
        )
        return df

    # An empty YAML file parses to None, and a non-mapping document has no
    # `business_columns`; in both cases there is nothing to apply.
    columns = schema.get('business_columns') if isinstance(schema, dict) else None
    if not columns:
        logger.warning(
            "Schema at %s defines no business_columns, skipping transformations",
            schema_path,
        )
        return df

    # Schema type name -> Spark cast target for the plain-cast types.
    cast_targets = {'double': 'double', 'integer': 'int'}

    for col_def in columns:
        col_name = col_def['name']
        col_type = col_def['type']
        transformation = col_def.get('transformation')

        # Apply transformation first so casting sees the cleaned string.
        if transformation == 'upper_trim':
            df = df.withColumn(col_name, upper(trim(col(col_name))))
        elif transformation == 'trim':
            df = df.withColumn(col_name, trim(col(col_name)))

        # Apply type casting
        if col_type in cast_targets:
            df = df.withColumn(col_name, col(col_name).cast(cast_targets[col_type]))
        elif col_type == 'date':
            df = df.withColumn(col_name, to_date(col(col_name)))

        # Apply nullable filter; columns are nullable unless the schema says
        # otherwise, so an omitted key no longer raises KeyError.
        if not col_def.get('nullable', True):
            df = df.filter(col(col_name).isNotNull())

        # Apply validation rules.
        # NOTE: a comparison against null evaluates to null and the row is
        # filtered out, so these rules also drop null values in the column
        # even when the column is declared nullable.
        for rule in col_def.get('validation') or []:
            rule_name = rule['rule']
            if rule_name == 'greater_than':
                df = df.filter(col(col_name) > rule['value'])
            elif rule_name == 'less_than':
                df = df.filter(col(col_name) < rule['value'])

    return df

In [None]:
def main():
    """Clean Bronze claims and write the result to the Silver Delta table.

    Steps:
        1. Read the Bronze Delta table at ``BRONZE_PATH``.
        2. Drop rows with a duplicate ``claim_id``.
        3. Apply the schema-driven transformations from ``SCHEMA_PATH``.
        4. Add ``processed_timestamp`` and the SCD Type 2 tracking columns
           (``effective_from``, ``effective_to``, ``is_current``).
        5. Log data-quality metrics (total, cleaned, dropped, pass rate).
        6. Overwrite the Silver Delta table at ``SILVER_PATH``.

    Raises:
        Exception: Any failure is logged with its traceback and re-raised.
    """
    spark = SparkSession.builder.getOrCreate()
    cached_df = None

    try:
        logger.info("Reading from %s", BRONZE_PATH)
        df_bronze = spark.read.format("delta").load(BRONZE_PATH)

        record_count = df_bronze.count()
        logger.info("Read %s records from Bronze", record_count)

        # Deduplication. Which row survives among duplicates of the same
        # claim_id is not defined by dropDuplicates.
        df_cleaned = df_bronze.dropDuplicates(["claim_id"])

        # Apply schema-driven transformations
        logger.info("Applying silver schema transformations from %s", SCHEMA_PATH)
        df_cleaned = apply_schema_transformations(df_cleaned, SCHEMA_PATH)

        # Add processing timestamp
        df_cleaned = df_cleaned.withColumn("processed_timestamp", current_timestamp())

        # Add SCD Type 2 columns
        df_cleaned = (
            df_cleaned
            .withColumn("effective_from", col("ingestion_timestamp"))
            .withColumn("effective_to", lit(None).cast("timestamp"))
            .withColumn("is_current", lit(True))
        )

        # The cleaned frame feeds two actions (count + write). Caching avoids
        # recomputing the whole pipeline and keeps the logged metrics
        # consistent with the rows that are actually written.
        cached_df = df_cleaned.cache()

        cleaned_count = cached_df.count()
        # Includes rows removed by deduplication as well as by validation.
        dropped_count = record_count - cleaned_count
        pass_rate = (cleaned_count / record_count * 100) if record_count > 0 else 0

        logger.info("Data Quality Metrics:")
        logger.info("  - Total records: %s", record_count)
        logger.info("  - Cleaned records: %s", cleaned_count)
        logger.info("  - Dropped records: %s", dropped_count)
        logger.info("  - Pass rate: %.2f%%", pass_rate)

        # Write to Silver
        # NOTE(review): mode("overwrite") replaces the whole table on every
        # run, so all rows are written as current and no earlier versions are
        # kept - confirm this is intended for a table with SCD Type 2 columns.
        logger.info("Writing to %s", SILVER_PATH)
        cached_df.write \
            .format("delta") \
            .mode("overwrite") \
            .option("description", "Silver layer: Cleaned claims with SCD Type 2 tracking") \
            .save(SILVER_PATH)

        logger.info("✓ Silver claims cleaning completed")

    except Exception as e:
        # logger.exception keeps the original message and adds the traceback.
        logger.exception("✗ Failed to clean claims: %s", e)
        raise
    finally:
        # Release the cached data whether the run succeeded or failed.
        if cached_df is not None:
            cached_df.unpersist()

In [None]:
# Run the Bronze -> Silver claims cleaning pipeline when this cell executes.
main()