# Bronze to Silver Processing

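This notebook ingests raw data from Azure Blob Storage (`bronze/`) with Databricks Auto Loader and applies per-table cleaning and validation. Invalid rows are filtered out; each row gets an `ingest_timestamp`, and volunteer shifts also get a derived duration. Valid rows are written to `silver/`. Rows that fail validation are written to `silver/errors/<table>/` with an `error` column describing the failure.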

## Inputs
- `bronze/<table>/` (e.g., `donations/`, `crm_data/`)

## Outputs
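- `silver/<table>/` (validated rows). No output format is set explicitly, so the session default (`spark.sql.sources.default`) applies; this is Parquet in open-source Spark.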
- `silver/errors/<table>/` (rows that failed validation, tagged with an `error` message)

## Dependencies
- `pyspark`, `python-dotenv` (imported as `dotenv`). The Auto Loader `cloudFiles` source requires Databricks Runtime.

## Environment
- Reads `AZURE_STORAGE_CONNECTION_STRING`, `CONTAINER_NAME`, and `AZURE_STORAGE_ACCOUNT_NAME` from `.env`. All three are required.
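- Sets `spark.databricks.delta.eventGrid.enabled`. Auto Loader's Event Grid file-notification mode is controlled by the `cloudFiles.useNotifications` option, which this notebook does not set, so Auto Loader uses its default directory-listing mode.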

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, DateType
from dotenv import load_dotenv
import os

# Load environment variables (storage location and credentials) from .env.
load_dotenv()
AZURE_CONN_STR = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
CONTAINER_NAME = os.getenv("CONTAINER_NAME")
AZURE_STORAGE_ACCOUNT_NAME = os.getenv("AZURE_STORAGE_ACCOUNT_NAME")

if not all([AZURE_CONN_STR, CONTAINER_NAME, AZURE_STORAGE_ACCOUNT_NAME]):
    raise ValueError("Missing environment variables. Check .env file.")


def _wasbs_auth_config(conn_str, account, container):
    """Return the (config key, value) pair that authenticates WASBS access.

    An Azure Storage connection string is a ';'-separated list of key=value
    pairs. The WASB driver reads credentials from per-account / per-container
    keys, so only the relevant credential is extracted here rather than
    passing the whole connection string through.

    Args:
        conn_str: Azure Storage connection string.
        account: storage account name (host prefix of blob.core.windows.net).
        container: container name (used to scope a SAS token).

    Returns:
        A ``(spark_config_key, secret)`` tuple for ``SparkSession.builder.config``.

    Raises:
        ValueError: if the string holds neither ``AccountKey`` nor
            ``SharedAccessSignature``.
    """
    parts = {}
    for item in conn_str.split(";"):
        # partition on the FIRST '=': account keys and SAS tokens contain '='.
        key, sep, value = item.partition("=")
        if sep:
            parts[key.strip()] = value.strip()

    host = f"{account}.blob.core.windows.net"
    if parts.get("AccountKey"):
        return f"spark.hadoop.fs.azure.account.key.{host}", parts["AccountKey"]
    if parts.get("SharedAccessSignature"):
        return (
            f"spark.hadoop.fs.azure.sas.{container}.{host}",
            parts["SharedAccessSignature"].lstrip("?"),
        )
    raise ValueError(
        "AZURE_STORAGE_CONNECTION_STRING contains neither AccountKey nor "
        "SharedAccessSignature."
    )


_auth_key, _auth_value = _wasbs_auth_config(
    AZURE_CONN_STR, AZURE_STORAGE_ACCOUNT_NAME, CONTAINER_NAME
)

# Initialize Spark session with WASBS credentials for the target account.
spark = (
    SparkSession.builder
    .appName("BronzeToSilver")
    .config(_auth_key, _auth_value)
    # NOTE(review): this key does not appear to be what turns on Auto Loader
    # file-notification mode; that is the `cloudFiles.useNotifications`
    # reader option — confirm before relying on Event Grid.
    .config("spark.databricks.delta.eventGrid.enabled", "true")
    .getOrCreate()
)

# Container root plus the bronze/silver layer folders (and silver's error area).
blob_base_path = f"wasbs://{CONTAINER_NAME}@{AZURE_STORAGE_ACCOUNT_NAME}.blob.core.windows.net"
bronze_path, silver_path = (f"{blob_base_path}/{layer}" for layer in ("bronze", "silver"))
error_path = f"{silver_path}/errors"


def _nullable_struct(*columns):
    """Return a StructType in which every ``(name, data_type)`` column is nullable."""
    return StructType([StructField(name, data_type, True) for name, data_type in columns])


# Explicit read schema per bronze table. Keys are also the folder names under
# bronze/ and silver/; dict order is the order the streams are started in.
schemas = {
    "donations": _nullable_struct(
        ("donation_id", StringType()),
        ("donor_id", StringType()),
        ("donor_name", StringType()),
        ("project_id", StringType()),
        ("campaign_id", StringType()),
        ("amount", DoubleType()),
        ("donation_date", TimestampType()),
    ),
    "projects": _nullable_struct(
        ("project_id", StringType()),
        ("project_name", StringType()),
        ("region", StringType()),
    ),
    "campaigns": _nullable_struct(
        ("campaign_id", StringType()),
        ("title", StringType()),
        ("project_id", StringType()),
        ("start_date", DateType()),
        ("end_date", DateType()),
        ("target_amount", DoubleType()),
    ),
    "volunteers": _nullable_struct(
        ("volunteer_id", StringType()),
        ("name", StringType()),
        ("age", IntegerType()),
    ),
    "volunteer_shifts": _nullable_struct(
        ("shift_id", StringType()),
        ("volunteer_id", StringType()),
        ("project_id", StringType()),
        ("start_time", TimestampType()),
        ("end_time", TimestampType()),
    ),
    "beneficiaries": _nullable_struct(
        ("beneficiary_id", StringType()),
        ("project_id", StringType()),
        ("age", IntegerType()),
        ("aid_type", StringType()),
    ),
    "transactions": _nullable_struct(
        ("transaction_id", StringType()),
        ("donation_id", StringType()),
        ("payment_provider", StringType()),
        ("status", StringType()),
        ("transaction_date", TimestampType()),
    ),
    "crm_data": _nullable_struct(
        ("donor_id", StringType()),
        ("donor_name", StringType()),
        ("email", StringType()),
        ("last_contact_date", TimestampType()),
    ),
}

def _split_valid_and_errors(table, df):
    """Split one table's streaming DataFrame into ``(valid_df, error_df)``.

    Every input row lands in exactly one output. Rows failing the table's
    rule go to ``error_df`` with an ``error`` message column; all others go
    to ``valid_df``. Both outputs get an ``ingest_timestamp`` column.

    Rules:
        donations         -- ``amount`` must be present and > 0.
        volunteer_shifts  -- derived ``shift_duration_hours`` must be present and > 0.
        everything else   -- the first schema column (treated as the primary
                             key) must be non-null.
    """
    if table == "donations":
        # A null amount is neither "> 0" nor "<= 0" (SQL NULL semantics), so
        # it must be tested explicitly or the row would vanish from both sinks.
        is_bad = col("amount").isNull() | (col("amount") <= 0)
        message = "Invalid amount"
    elif table == "volunteer_shifts":
        # Derive the duration on the shared input so both outputs carry it.
        df = df.withColumn(
            "shift_duration_hours",
            (col("end_time").cast("long") - col("start_time").cast("long")) / 3600,
        )
        duration = col("shift_duration_hours")
        is_bad = duration.isNull() | (duration <= 0)
        message = "Invalid shift duration"
    else:
        is_bad = col(df.columns[0]).isNull()
        message = "Missing primary key"

    # is_bad is never NULL here, so ~is_bad is an exact complement.
    valid_df = df.filter(~is_bad).withColumn("ingest_timestamp", current_timestamp())
    error_df = (
        df.filter(is_bad)
        .withColumn("error", lit(message))
        .withColumn("ingest_timestamp", current_timestamp())
    )
    return valid_df, error_df


# Process each table with Auto Loader: one stream for valid rows, one for errors.
for table, schema in schemas.items():
    format_type = "json" if table == "crm_data" else "csv"
    # NOTE(review): no "header" option is set for CSV sources; if the bronze
    # CSVs start with a header row it is presumably read as a data row — confirm.
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", format_type)
        .option("cloudFiles.schemaLocation", f"{blob_base_path}/checkpoints/{table}")
        .option("cloudFiles.inferColumnTypes", "true")
        .schema(schema)
        .load(f"{bronze_path}/{table}/")
    )

    valid_df, error_df = _split_valid_and_errors(table, df)

    # Write valid data to silver
    (
        valid_df.writeStream
        .outputMode("append")
        .option("checkpointLocation", f"{blob_base_path}/checkpoints/{table}")
        .start(f"{silver_path}/{table}")
    )

    # Write errors to silver/errors
    (
        error_df.writeStream
        .outputMode("append")
        .option("checkpointLocation", f"{blob_base_path}/checkpoints/errors_{table}")
        .start(f"{error_path}/{table}")
    )

print("Bronze to Silver streaming started.")
# Block the driver until any stream terminates; keeping the job alive is
# presumably handled by the Databricks Workflow running this notebook.
spark.streams.awaitAnyTermination()