# Real-Time Fraud Detection System - Databricks Training Exercise
## Enterprise-Grade Streaming Analytics Pipeline

### 🎯 Objective
Build a complete real-time fraud detection pipeline using only Databricks-native components. This exercise demonstrates:
- Multi-table synthetic data generation
- Continuous streaming ingestion and processing
- Complex stateful fraud detection rules
- Bronze → Silver → Gold Delta Lake architecture
- Enterprise monitoring and fault tolerance

---

## 📋 Table of Contents

1. [Architecture Overview](#architecture-overview)
2. [Setup and Initialization](#setup-and-initialization)
3. [Reference Data Setup](#reference-data-setup)
4. [Continuous Data Generators](#continuous-data-generators)
5. [Bronze Layer: Raw Data Ingestion](#bronze-layer)
6. [Silver Layer: Cleansing and Enrichment](#silver-layer)
7. [Stateful Feature Engineering](#stateful-features)
8. [Gold Layer: Advanced Fraud Rules](#gold-layer)
9. [Monitoring and Operations](#monitoring)
10. [Production Considerations](#production-considerations)

---

## 🏗️ Architecture Overview {#architecture-overview}

### High-Level Data Flow
```
Data Generators → Bronze (Raw) → Silver (Cleansed/Enriched) → Gold (Rules/Alerts)
       ↓              ↓                    ↓                        ↓
   Continuous      Streaming           Stateful              Complex Rules
   Synthetic       Ingestion          Features               & Alerts
     Data         Deduplication     Velocity/Device          Scoring
```

### Tables and Streams Architecture
```
Reference Tables (Static/Slowly Changing):
├── customers_dim (risk tiers, KYC status)
├── merchants_dim (categories, risk segments)
├── device_dim (reputation scores)
├── blacklist_dim (blocked entities)
└── geoip_dim (IP geolocation)

Bronze Layer (Raw Events):
├── tx_events_raw (transactions)
├── login_events_raw (authentication)
└── chargebacks_raw (ground truth)

Silver Layer (Cleansed/Enriched):
├── tx_events_clean (validated transactions)
├── tx_events_enriched (with reference data)
└── ops.customer_velocity (stateful features)

Gold Layer (Business Logic):
├── fraud_suspicions (rule outputs)
├── fraud_alerts (actionable alerts)
└── ops.pipeline_metrics (monitoring)
```

---

## 🚀 Setup and Initialization {#setup-and-initialization}

### Notebook 00: Initialize Objects

```sql
-- Create catalog and schema structure
CREATE CATALOG IF NOT EXISTS fraud_lab;
USE CATALOG fraud_lab;

CREATE SCHEMA IF NOT EXISTS ref;       -- Reference/dimension tables
CREATE SCHEMA IF NOT EXISTS bronze;    -- Raw event streams
CREATE SCHEMA IF NOT EXISTS silver;    -- Cleansed and enriched data
CREATE SCHEMA IF NOT EXISTS gold;      -- Business logic and alerts
CREATE SCHEMA IF NOT EXISTS ops;       -- Operational tables

-- Checkpoint locations are defined in the Python cell below
```

```python
# Define checkpoint and storage paths
base_path = "dbfs:/mnt/fraud_lab"
checkpoint_base = f"{base_path}/checkpoints"

# Configure Spark for streaming
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
spark.conf.set("spark.sql.streaming.numRecentProgressUpdates", "10")
```

---

## 🗃️ Reference Data Setup {#reference-data-setup}

### Create Reference Tables

```sql
-- Customer dimension with risk tiers
CREATE TABLE IF NOT EXISTS ref.customers_dim (
  customer_id STRING NOT NULL PRIMARY KEY,  -- primary key columns must be NOT NULL
  risk_tier STRING,                 -- low|medium|high
  kyc_status STRING,                -- verified|pending|rejected
  country STRING,
  signup_timestamp TIMESTAMP,
  credit_limit DOUBLE
) USING DELTA
TBLPROPERTIES ('delta.feature.allowColumnDefaults' = 'supported');

-- Merchant dimension with risk categories
CREATE TABLE IF NOT EXISTS ref.merchants_dim (
  merchant_id STRING NOT NULL PRIMARY KEY,  -- primary key columns must be NOT NULL
  merchant_name STRING,
  category STRING,                  -- electronics, travel, gambling, etc.
  country STRING,
  risk_segment STRING,              -- normal|watchlist|high_risk
  monthly_volume DOUBLE
) USING DELTA;

-- Device reputation tracking
CREATE TABLE IF NOT EXISTS ref.device_dim (
  device_id STRING NOT NULL PRIMARY KEY,  -- primary key columns must be NOT NULL
  device_type STRING,               -- mobile|desktop|tablet
  os STRING,
  browser STRING,
  reputation_score DOUBLE,          -- 0.0 (bad) to 1.0 (good)
  first_seen TIMESTAMP,
  last_seen TIMESTAMP
) USING DELTA;

-- Blacklist for known bad actors
CREATE TABLE IF NOT EXISTS ref.blacklist_dim (
  entity_type STRING,               -- 'customer'|'device'|'ip'|'merchant'|'card'
  entity_id STRING,
  reason STRING,
  severity STRING,                  -- low|medium|high|critical
  added_at TIMESTAMP,
  expires_at TIMESTAMP
) USING DELTA;

-- Geolocation mapping (simplified)
CREATE TABLE IF NOT EXISTS ref.geoip_dim (
  ip_prefix STRING,                 -- e.g., 52.23.0.0/16
  country STRING,
  region STRING,
  city STRING,
  latitude DOUBLE,
  longitude DOUBLE,
  avg_latency_ms INT
) USING DELTA;
```

### Seed Reference Data

```sql
-- Insert sample customers with different risk profiles
INSERT OVERWRITE ref.customers_dim 
SELECT 
  CONCAT('C', CAST(id + 1000 AS STRING)) as customer_id,
  CASE 
    WHEN id % 10 IN (0,1) THEN 'high'
    WHEN id % 10 IN (2,3,4) THEN 'medium' 
    ELSE 'low' 
  END as risk_tier,
  CASE 
    WHEN id % 20 = 0 THEN 'pending'
    WHEN id % 50 = 0 THEN 'rejected'
    ELSE 'verified'
  END as kyc_status,
  CASE 
    WHEN id % 4 = 0 THEN 'US'
    WHEN id % 4 = 1 THEN 'IN'
    WHEN id % 4 = 2 THEN 'GB'
    ELSE 'CA'
  END as country,
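  -- INTERVAL literals cannot take an expression, so build the interval with make_interval (hours is the 5th argument)
  current_timestamp() - make_interval(0, 0, 0, 0, CAST(id * 24 AS INT)) as signup_timestamp,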
  CASE 
    WHEN id % 10 IN (0,1) THEN 50000.0
    WHEN id % 10 IN (2,3,4) THEN 25000.0
    ELSE 10000.0
  END as credit_limit
FROM RANGE(0, 500);

-- Insert merchants with various risk levels
INSERT OVERWRITE ref.merchants_dim
SELECT 
  CONCAT('M', CAST(id + 10 AS STRING)) as merchant_id,
  CONCAT('Merchant_', id) as merchant_name,
  CASE 
    WHEN id % 8 = 0 THEN 'gambling'
    WHEN id % 8 = 1 THEN 'electronics'
    WHEN id % 8 = 2 THEN 'travel'
    WHEN id % 8 = 3 THEN 'grocery'
    WHEN id % 8 = 4 THEN 'luxury'
    WHEN id % 8 = 5 THEN 'pharmacy'
    WHEN id % 8 = 6 THEN 'gas_station'
    ELSE 'restaurant'
  END as category,
  CASE 
    WHEN id % 3 = 0 THEN 'US'
    WHEN id % 3 = 1 THEN 'IN'
    ELSE 'GB'
  END as country,
  CASE 
    WHEN id % 8 = 0 THEN 'high_risk'      -- gambling
    WHEN id % 15 = 0 THEN 'watchlist'
    ELSE 'normal'
  END as risk_segment,
  rand() * 1000000 as monthly_volume
FROM RANGE(0, 100);

-- Insert device profiles
INSERT OVERWRITE ref.device_dim
SELECT 
  CONCAT('D', CAST(id + 10000 AS STRING)) as device_id,
  CASE 
    WHEN id % 3 = 0 THEN 'mobile'
    WHEN id % 3 = 1 THEN 'desktop'
    ELSE 'tablet'
  END as device_type,
  CASE 
    WHEN id % 4 = 0 THEN 'Android'
    WHEN id % 4 = 1 THEN 'iOS'
    WHEN id % 4 = 2 THEN 'Windows'
    ELSE 'MacOS'
  END as os,
  CASE 
    WHEN id % 3 = 0 THEN 'Chrome'
    WHEN id % 3 = 1 THEN 'Safari'
    ELSE 'Firefox'
  END as browser,
  CASE 
    WHEN id % 20 = 0 THEN 0.1  -- Bad reputation
    WHEN id % 15 = 0 THEN 0.3  -- Poor reputation
    ELSE 0.7 + (rand() * 0.3)  -- Good reputation
  END as reputation_score,
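  -- INTERVAL literals cannot take an expression, so build the interval with make_interval (hours is the 5th argument)
  current_timestamp() - make_interval(0, 0, 0, 0, CAST(id * 12 AS INT)) as first_seen,
  current_timestamp() - make_interval(0, 0, 0, 0, CAST(id % 48 AS INT)) as last_seen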
FROM RANGE(0, 2000);

-- Insert some blacklisted entities
INSERT OVERWRITE ref.blacklist_dim VALUES
('ip', '192.168.1.100', 'Known botnet', 'critical', current_timestamp(), current_timestamp() + INTERVAL 365 DAYS),
('ip', '10.0.0.50', 'Suspicious activity', 'high', current_timestamp(), current_timestamp() + INTERVAL 90 DAYS),
('customer', 'C1005', 'Previous fraud', 'high', current_timestamp(), current_timestamp() + INTERVAL 180 DAYS),
('device', 'D10050', 'Compromised device', 'medium', current_timestamp(), current_timestamp() + INTERVAL 30 DAYS),
('merchant', 'M15', 'Money laundering', 'critical', current_timestamp(), current_timestamp() + INTERVAL 730 DAYS);

-- Insert geo IP mapping (simplified examples)
INSERT OVERWRITE ref.geoip_dim VALUES
('192.168.0.0/16', 'US', 'California', 'San Francisco', 37.7749, -122.4194, 50),
('10.0.0.0/8', 'IN', 'Maharashtra', 'Mumbai', 19.0760, 72.8777, 120),
('172.16.0.0/12', 'GB', 'England', 'London', 51.5074, -0.1278, 80),
('203.0.113.0/24', 'CA', 'Ontario', 'Toronto', 43.6532, -79.3832, 60);
```
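
The modulo rules in the customer seed query fix the mix of risk tiers and KYC statuses exactly. The following pure-Python sketch mirrors those `CASE` expressions for ids 0 to 499, so the expected counts can be checked before running the SQL. The helper names `risk_tier` and `kyc_status` are illustrative and not part of the lab.

```python
from collections import Counter

def risk_tier(i: int) -> str:
    """Mirror of the risk_tier CASE expression in the customers_dim seed query."""
    if i % 10 in (0, 1):
        return "high"
    if i % 10 in (2, 3, 4):
        return "medium"
    return "low"

def kyc_status(i: int) -> str:
    """Mirror of the kyc_status CASE expression; branch order matters."""
    if i % 20 == 0:
        return "pending"
    if i % 50 == 0:  # only reached when i is not a multiple of 20
        return "rejected"
    return "verified"

ids = range(0, 500)  # same range as FROM RANGE(0, 500)
tier_counts = Counter(risk_tier(i) for i in ids)
kyc_counts = Counter(kyc_status(i) for i in ids)

print(dict(tier_counts))
print(dict(kyc_counts))
```

Because the `pending` branch is tested first, ids that are multiples of both 20 and 50 (0, 100, 200, ...) become `pending`, which leaves only a handful of `rejected` customers.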

---

## 🔄 Continuous Data Generators {#continuous-data-generators}

### Notebook 01: Data Generators

```python
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType
from datetime import datetime, timedelta
import random
import time
import uuid
import json

# Enhanced synthetic data generation with realistic patterns

def random_ip():
    """Generate random IP addresses with some clustering"""
    # 70% from common ranges, 30% random
    if random.random() < 0.7:
        ranges = ['192.168', '10.0', '172.16', '203.0']
        prefix = random.choice(ranges)
        return f"{prefix}.{random.randint(0,255)}.{random.randint(1,254)}"
    else:
        return f"{random.randint(1,255)}.{random.randint(0,255)}.{random.randint(0,255)}.{random.randint(1,254)}"

def generate_transaction_batch(n=500):
    """Generate realistic transaction patterns"""
    now = datetime.utcnow()
    rows = []
    
    for i in range(n):
        # Base transaction
        customer_id = f"C{random.randint(1000, 1499)}"
        merchant_id = f"M{random.randint(10, 109)}"
        device_id = f"D{random.randint(10000, 11999)}"
        
        # Amount distribution (most small, some large)
        if random.random() < 0.7:
            amount = round(random.uniform(5, 500), 2)  # Normal purchases
        elif random.random() < 0.9:
            amount = round(random.uniform(500, 2000), 2)  # Medium purchases
        else:
            amount = round(random.uniform(2000, 15000), 2)  # Large purchases
        
        # Add some time variation
        time_offset = random.randint(0, 300)  # Up to 5 minutes in the past
        event_time = now - timedelta(seconds=time_offset)
        
        row = {
            "event_id": str(uuid.uuid4()),
            "event_ts": event_time,
            "customer_id": customer_id,
            "merchant_id": merchant_id,
            "amount": amount,
            "currency": random.choice(["USD", "EUR", "INR", "GBP", "CAD"]),
            "country": random.choice(["US", "IN", "GB", "DE", "CA", "FR", "AU", "JP"]),
            "device_id": device_id,
            "channel": random.choice(["web", "mobile", "pos", "atm"]),
            "payment_method": random.choice(["credit_card", "debit_card", "upi", "wallet", "bank_transfer"]),
            "ip": random_ip(),
            "card_last4": f"{random.randint(1000, 9999)}",
            "raw": json.dumps({"session_id": str(uuid.uuid4()), "user_agent": "browser"})
        }
        
        # Inject suspicious patterns (5% of transactions)
        if random.random() < 0.05:
            # High-risk scenarios
            if random.random() < 0.3:
                # International high-value
                row["amount"] = round(random.uniform(8000, 20000), 2)
                row["country"] = random.choice(["RU", "CN", "NG", "PK"])
            elif random.random() < 0.6:
                # Velocity spike (same customer, multiple rapid transactions)
                if i > 0 and random.random() < 0.5:
                    row["customer_id"] = rows[-1]["customer_id"]
                    row["device_id"] = rows[-1]["device_id"]
                    row["event_ts"] = rows[-1]["event_ts"] + timedelta(seconds=random.randint(1, 30))
            else:
                # Device misuse (same device, different customer)
                if i > 0:
                    row["device_id"] = rows[-1]["device_id"]
                    row["ip"] = rows[-1]["ip"]
        
        rows.append(row)
    
    schema = StructType([
        StructField("event_id", StringType()),
        StructField("event_ts", TimestampType()),
        StructField("customer_id", StringType()),
        StructField("merchant_id", StringType()),
        StructField("amount", DoubleType()),
        StructField("currency", StringType()),
        StructField("country", StringType()),
        StructField("device_id", StringType()),
        StructField("channel", StringType()),
        StructField("payment_method", StringType()),
        StructField("ip", StringType()),
        StructField("card_last4", StringType()),
        StructField("raw", StringType())
    ])
    
    return spark.createDataFrame(rows, schema=schema)

def generate_login_batch(n=300):
    """Generate login events with failure patterns"""
    now = datetime.utcnow()
    rows = []
    
    for i in range(n):
        customer_id = f"C{random.randint(1000, 1499)}"
        device_id = f"D{random.randint(10000, 11999)}"
        
        # 15% login failures, clustered by customer (brute force simulation)
        if random.random() < 0.15:
            auth_result = "failure"
            # Cluster failures for the same customer
            if i > 0 and random.random() < 0.4:
                customer_id = rows[-1]["customer_id"]
                device_id = rows[-1]["device_id"]
        else:
            auth_result = "success"
        
        time_offset = random.randint(0, 600)  # Up to 10 minutes in the past
        event_time = now - timedelta(seconds=time_offset)
        
        row = {
            "event_id": str(uuid.uuid4()),
            "event_ts": event_time,
            "customer_id": customer_id,
            "device_id": device_id,
            "ip": random_ip(),
            "auth_result": auth_result,
            "channel": random.choice(["web", "mobile", "api"]),
            "raw": json.dumps({"session_duration": random.randint(60, 3600)})
        }
        rows.append(row)
    
    schema = StructType([
        StructField("event_id", StringType()),
        StructField("event_ts", TimestampType()),
        StructField("customer_id", StringType()),
        StructField("device_id", StringType()),
        StructField("ip", StringType()),
        StructField("auth_result", StringType()),
        StructField("channel", StringType()),
        StructField("raw", StringType())
    ])
    
    return spark.createDataFrame(rows, schema=schema)

def generate_chargeback_batch(n=10):
    """Generate occasional chargebacks"""
    now = datetime.utcnow()
    rows = []
    
    for _ in range(n):
        row = {
            "case_id": str(uuid.uuid4()),
            "case_ts": now - timedelta(hours=random.randint(1, 72)),
            "customer_id": f"C{random.randint(1000, 1499)}",
            "merchant_id": f"M{random.randint(10, 109)}",
            "amount": round(random.uniform(50, 5000), 2),
            "reason": random.choice(["fraud", "customer_dispute", "duplicate", "unauthorized"])
        }
        rows.append(row)
    
    schema = StructType([
        StructField("case_id", StringType()),
        StructField("case_ts", TimestampType()),
        StructField("customer_id", StringType()),
        StructField("merchant_id", StringType()),
        StructField("amount", DoubleType()),
        StructField("reason", StringType())
    ])
    
    return spark.createDataFrame(rows, schema=schema)

# Create Bronze tables
spark.sql("""
CREATE TABLE IF NOT EXISTS fraud_lab.bronze.tx_events_raw (
  event_id STRING,
  event_ts TIMESTAMP,
  customer_id STRING,
  merchant_id STRING,
  amount DOUBLE,
  currency STRING,
  country STRING,
  device_id STRING,
  channel STRING,
  payment_method STRING,
  ip STRING,
  card_last4 STRING,
  raw STRING
) USING DELTA
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS fraud_lab.bronze.login_events_raw (
  event_id STRING,
  event_ts TIMESTAMP,
  customer_id STRING,
  device_id STRING,
  ip STRING,
  auth_result STRING,
  channel STRING,
  raw STRING
) USING DELTA
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS fraud_lab.bronze.chargebacks_raw (
  case_id STRING,
  case_ts TIMESTAMP,
  customer_id STRING,
  merchant_id STRING,
  amount DOUBLE,
  reason STRING
) USING DELTA
""")

print("Starting continuous data generation...")
print("Run this cell to generate data continuously. Stop the cell to halt generation.")

# Continuous generation loop
iteration = 0
while True:
    try:
        iteration += 1
        print(f"Generating batch {iteration} at {datetime.now()}")
        
        # Generate transaction data
        tx_df = generate_transaction_batch(400)
        tx_df.write.mode("append").saveAsTable("fraud_lab.bronze.tx_events_raw")
        
        # Generate login data
        login_df = generate_login_batch(250)
        login_df.write.mode("append").saveAsTable("fraud_lab.bronze.login_events_raw")
        
        # Occasionally generate chargebacks (10% of iterations)
        if random.random() < 0.1:
            cb_df = generate_chargeback_batch(5)
            cb_df.write.mode("append").saveAsTable("fraud_lab.bronze.chargebacks_raw")
            print("Generated chargebacks")
        
        print(f"Batch {iteration} completed. Generated ~400 transactions, ~250 logins")
        
        # Wait before next batch (simulate real-time flow)
        time.sleep(10)  # 10-second intervals
        
    except KeyboardInterrupt:
        print("Data generation stopped by user")
        break
    except Exception as e:
        print(f"Error in batch {iteration}: {str(e)}")
        time.sleep(5)  # Wait before retrying
```
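
In `generate_transaction_batch`, each `elif` calls `random.random()` again, so the branches are independent draws and not slices of a single draw. The short calculation below, offered as a sketch, gives the effective share of each amount tier and of each injected pattern branch. The variable names are illustrative.

```python
# Amount tiers: `if r1 < 0.7 ... elif r2 < 0.9 ... else`, with r1 and r2 drawn independently.
p_normal = 0.7
p_medium = (1 - 0.7) * 0.9
p_large = (1 - 0.7) * (1 - 0.9)

# Suspicious patterns: 5% of rows enter the block, then
# `if r3 < 0.3 ... elif r4 < 0.6 ... else`, again with fresh draws.
p_suspicious = 0.05
p_high_value_intl = p_suspicious * 0.3
p_velocity_branch = p_suspicious * (1 - 0.3) * 0.6
p_device_branch = p_suspicious * (1 - 0.3) * (1 - 0.6)

print(f"normal={p_normal:.1%} medium={p_medium:.1%} large={p_large:.1%}")
print(f"intl high-value={p_high_value_intl:.2%} "
      f"velocity branch={p_velocity_branch:.2%} "
      f"device branch={p_device_branch:.2%}")
```

The amount tiers therefore split roughly 70% / 27% / 3%. The velocity branch also rewrites the row only when `i > 0` and a further 50% draw succeeds, so the share of rows that actually carry a velocity spike is about half of the branch probability.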

---

## 🥉 Bronze Layer: Raw Data Ingestion {#bronze-layer}

### Notebook 02: Bronze Streaming Ingestion

```python
from pyspark.sql import functions as F

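# Set up checkpoint locations (checkpoint_base repeats the path from Notebook 00,
# because variables do not carry over between notebooks)
checkpoint_base = "dbfs:/mnt/fraud_lab/checkpoints"
tx_checkpoint = f"{checkpoint_base}/bronze_tx_clean"
login_checkpoint = f"{checkpoint_base}/bronze_login_clean"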

print("Starting Bronze layer streaming ingestion...")

# Stream 1: Transaction Events Cleansing
tx_raw = (spark.readStream
          .table("fraud_lab.bronze.tx_events_raw")
          .withWatermark("event_ts", "2 minutes"))

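# Basic cleansing and validation
# Note: dropDuplicates on event_id alone keeps every id in state indefinitely;
# the watermark only bounds that state if event_ts is part of the deduplication key.
tx_clean = (tx_raw
    .filter("""
        event_id IS NOT NULL 
        AND customer_id IS NOT NULL 
        AND merchant_id IS NOT NULL 
        AND amount IS NOT NULL 
        AND amount > 0 
        AND amount <= 50000
        AND event_ts IS NOT NULL
    """)
    .dropDuplicates(["event_id"])
    .drop("raw")  # silver.tx_events_clean has no raw column
    .withColumn("is_valid", F.lit(True))
    .withColumn("processed_ts", F.current_timestamp())
)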

# Create Silver clean table
spark.sql("""
CREATE TABLE IF NOT EXISTS fraud_lab.silver.tx_events_clean (
  event_id STRING,
  event_ts TIMESTAMP,
  customer_id STRING,
  merchant_id STRING,
  amount DOUBLE,
  currency STRING,
  country STRING,
  device_id STRING,
  channel STRING,
  payment_method STRING,
  ip STRING,
  card_last4 STRING,
  is_valid BOOLEAN,
  processed_ts TIMESTAMP
) USING DELTA
""")

# Start transaction cleaning stream
# queryName must be set before the query starts; toTable() starts it and returns a StreamingQuery
tx_clean_query = (tx_clean.writeStream
    .format("delta")
    .queryName("bronze_to_silver_tx_clean")
    .option("checkpointLocation", tx_checkpoint)
    .outputMode("append")
    .toTable("fraud_lab.silver.tx_events_clean"))

print("Transaction cleaning stream started")

# Stream 2: Login Events Cleansing
login_raw = (spark.readStream
             .table("fraud_lab.bronze.login_events_raw")
             .withWatermark("event_ts", "2 minutes"))

login_clean = (login_raw
    .filter("""
        event_id IS NOT NULL 
        AND customer_id IS NOT NULL 
        AND device_id IS NOT NULL
        AND auth_result IS NOT NULL
        AND event_ts IS NOT NULL
    """)
    .dropDuplicates(["event_id"])
    .drop("raw")  # silver.login_events_clean has no raw column
    .withColumn("is_valid", F.lit(True))
    .withColumn("processed_ts", F.current_timestamp())
)

# Create login clean table
spark.sql("""
CREATE TABLE IF NOT EXISTS fraud_lab.silver.login_events_clean (
  event_id STRING,
  event_ts TIMESTAMP,
  customer_id STRING,
  device_id STRING,
  ip STRING,
  auth_result STRING,
  channel STRING,
  is_valid BOOLEAN,
  processed_ts TIMESTAMP
) USING DELTA
""")

# Start login cleaning stream
# queryName must be set before the query starts; toTable() starts it and returns a StreamingQuery
login_clean_query = (login_clean.writeStream
    .format("delta")
    .queryName("bronze_to_silver_login_clean")
    .option("checkpointLocation", login_checkpoint)
    .outputMode("append")
    .toTable("fraud_lab.silver.login_events_clean"))

print("Login cleaning stream started")

# Monitor streams
import time
time.sleep(5)

print("Active streaming queries:")
for query in spark.streams.active:
    print(f"- {query.name}: {query.status}")
```
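
The cleansing streams pair a 2-minute watermark with `dropDuplicates`. The sketch below is a simplified, pure-Python model of watermark-bounded deduplication, not Spark's implementation. It assumes the watermark trails the maximum event time seen so far and advances only between micro-batches, and `dedup_with_watermark` is a hypothetical helper.

```python
from datetime import datetime, timedelta

def dedup_with_watermark(batches, delay):
    """Drop duplicate ids and forget ids whose event time fell behind the watermark.

    batches: list of micro-batches, each a list of (event_id, event_ts) tuples.
    delay:   allowed lateness (the watermark delay).
    """
    state = {}           # event_id -> event_ts currently remembered
    max_event_ts = None
    watermark = None     # advances only between micro-batches
    emitted = []

    for batch in batches:
        for event_id, event_ts in batch:
            if watermark is not None and event_ts < watermark:
                continue  # too late: dropped
            if event_id in state:
                continue  # duplicate: dropped
            state[event_id] = event_ts
            emitted.append(event_id)
            if max_event_ts is None or event_ts > max_event_ts:
                max_event_ts = event_ts
        if max_event_ts is not None:
            watermark = max_event_ts - delay
            # Evict ids whose event time is older than the new watermark.
            state = {k: ts for k, ts in state.items() if ts >= watermark}
    return emitted, state

t0 = datetime(2024, 1, 1, 12, 0, 0)
batches = [
    [("a", t0), ("b", t0 + timedelta(seconds=30)), ("a", t0)],  # second "a" is a duplicate
    [("c", t0 + timedelta(minutes=5))],                          # pushes the watermark forward
    [("d", t0 + timedelta(minutes=1))],                          # now older than the watermark
]
emitted, state = dedup_with_watermark(batches, delay=timedelta(minutes=2))
print(emitted)
print(sorted(state))
```

The trade-off the model makes visible: a short delay keeps the remembered id set small, but events that arrive later than the delay are discarded instead of being deduplicated.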

---

## 🥈 Silver Layer: Cleansing and Enrichment {#silver-layer}

### Notebook 03: Silver Enrichment

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

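# Checkpoint locations (checkpoint_base repeats the path from Notebook 00)
checkpoint_base = "dbfs:/mnt/fraud_lab/checkpoints"
enrichment_checkpoint = f"{checkpoint_base}/silver_tx_enriched"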

print("Starting Silver layer enrichment...")

# Read clean transaction stream
tx_clean = (spark.readStream
            .table("fraud_lab.silver.tx_events_clean")
            .withWatermark("event_ts", "3 minutes"))

# Load reference data for enrichment
customers = spark.read.table("fraud_lab.ref.customers_dim").alias("c")
merchants = spark.read.table("fraud_lab.ref.merchants_dim").alias("m") 
devices = spark.read.table("fraud_lab.ref.device_dim").alias("d")
blacklist = spark.read.table("fraud_lab.ref.blacklist_dim").alias("b")
geoip = spark.read.table("fraud_lab.ref.geoip_dim").alias("g")

# Enrich transactions with reference data
enriched = (tx_clean.alias("t")
    # Join with customer data
    .join(customers, F.col("t.customer_id") == F.col("c.customer_id"), "left")
    # Join with merchant data  
    .join(merchants, F.col("t.merchant_id") == F.col("m.merchant_id"), "left")
    # Join with device data
    .join(devices, F.col("t.device_id") == F.col("d.device_id"), "left")
    # Check blacklist for customer
    .join(
        blacklist.filter("entity_type = 'customer'").alias("bc"),
        F.col("t.customer_id") == F.col("bc.entity_id"),
        "left"
    )
    # Check blacklist for device
    .join(
        blacklist.filter("entity_type = 'device'").alias("bd"), 
        F.col("t.device_id") == F.col("bd.entity_id"),
        "left"
    )
    # Check blacklist for IP
    .join(
        blacklist.filter("entity_type = 'ip'").alias("bi"),
        F.col("t.ip") == F.col("bi.entity_id"),
        "left"
    )
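    # Simple geo-IP lookup on the first two octets (in production, use proper CIDR matching).
    # A fixed 7-character substring misaligns on short prefixes such as 10.0.x.x.
    .join(geoip, F.substring_index(F.col("t.ip"), ".", 2) == F.substring_index(F.col("g.ip_prefix"), ".", 2), "left")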
    
    # Select and rename columns
    .select(
        F.col("t.event_id"),
        F.col("t.event_ts"),
        F.col("t.customer_id"),
        F.col("t.merchant_id"),
        F.col("t.device_id"),
        F.col("t.amount"),
        F.col("t.currency"),
        F.col("t.country").alias("tx_country"),
        F.col("t.channel"),
        F.col("t.payment_method"),
        F.col("t.ip"),
        F.col("t.card_last4"),
        
        # Customer enrichment
        F.col("c.risk_tier").alias("customer_risk_tier"),
        F.col("c.kyc_status").alias("customer_kyc"),
        F.col("c.country").alias("customer_country"),
        F.col("c.credit_limit").alias("customer_credit_limit"),
        
        # Merchant enrichment
        F.col("m.merchant_name"),
        F.col("m.category").alias("merchant_category"),
        F.col("m.risk_segment").alias("merchant_risk_segment"),
        F.col("m.monthly_volume").alias("merchant_monthly_volume"),
        
        # Device enrichment
        F.col("d.device_type"),
        F.col("d.os").alias("device_os"),
        F.col("d.reputation_score").alias("device_reputation"),
        
        # Blacklist flags
        F.when(F.col("bc.entity_id").isNotNull(), True).otherwise(False).alias("customer_blacklisted"),
        F.when(F.col("bd.entity_id").isNotNull(), True).otherwise(False).alias("device_blacklisted"),
        F.when(F.col("bi.entity_id").isNotNull(), True).otherwise(False).alias("ip_blacklisted"),
        
        # Geo enrichment
        F.col("g.country").alias("geo_country"),
        F.col("g.city").alias("geo_city"),
        F.col("g.latitude"),
        F.col("g.longitude"),
        
        # Derived features
        F.hour("t.event_ts").alias("hour_of_day"),
        F.dayofweek("t.event_ts").alias("day_of_week"),
        F.when(F.col("t.country") != F.col("c.country"), True).otherwise(False).alias("international_tx"),
        # between(22, 6) can never be true; the night window wraps around midnight
        F.when((F.hour("t.event_ts") >= 22) | (F.hour("t.event_ts") <= 6), True).otherwise(False).alias("night_tx"),
        F.when(F.col("t.amount") > F.col("c.credit_limit") * 0.8, True).otherwise(False).alias("high_amount_vs_limit"),
        
        F.current_timestamp().alias("enriched_ts")
    )
)

# Create enriched table
spark.sql("""
CREATE TABLE IF NOT EXISTS fraud_lab.silver.tx_events_enriched (
  event_id STRING,
  event_ts TIMESTAMP,
  customer_id STRING,
  merchant_id STRING,
  device_id STRING,
  amount DOUBLE,
  currency STRING,
  tx_country STRING,
  channel STRING,
  payment_method STRING,
  ip STRING,
  card_last4 STRING,
  customer_risk_tier STRING,
  customer_kyc STRING,
  customer_country STRING,
  customer_credit_limit DOUBLE,
  merchant_name STRING,
  merchant_category STRING,
  merchant_risk_segment STRING,
  merchant_monthly_volume DOUBLE,
  device_type STRING,
  device_os STRING,
  device_reputation DOUBLE,
  customer_blacklisted BOOLEAN,
  device_blacklisted BOOLEAN,
  ip_blacklisted BOOLEAN,
  geo_country STRING,
  geo_city STRING,
  latitude DOUBLE,
  longitude DOUBLE,
  hour_of_day INT,
  day_of_week INT,
  international_tx BOOLEAN,
  night_tx BOOLEAN,
  high_amount_vs_limit BOOLEAN,
  enriched_ts TIMESTAMP
) USING DELTA
""")

# Start enrichment stream
# queryName must be set before the query starts; toTable() starts it and returns a StreamingQuery
enrichment_query = (enriched.writeStream
    .format("delta")
    .queryName("silver_enrichment")
    .option("checkpointLocation", enrichment_checkpoint)
    .outputMode("append")
    .toTable("fraud_lab.silver.tx_events_enriched"))

print("Enrichment stream started")

# Monitor the stream
import time
time.sleep(5)

print("Active enrichment stream:")
for query in spark.streams.active:
    if query.name == "silver_enrichment":
        print(f"- {query.name}: {query.status}")
```
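
The enrichment join notes that production code should use proper CIDR matching. As a minimal sketch of what that means, the following pure-Python example uses the standard-library `ipaddress` module with the same prefixes as the `ref.geoip_dim` seed rows. The names `GEOIP_ROWS` and `lookup_geo` are hypothetical, and the longest-prefix rule is an assumption about how overlapping ranges should be resolved.

```python
import ipaddress

# Same prefixes as the ref.geoip_dim seed rows: (cidr, country, city)
GEOIP_ROWS = [
    ("192.168.0.0/16", "US", "San Francisco"),
    ("10.0.0.0/8", "IN", "Mumbai"),
    ("172.16.0.0/12", "GB", "London"),
    ("203.0.113.0/24", "CA", "Toronto"),
]

NETWORKS = [
    (ipaddress.ip_network(cidr), country, city)
    for cidr, country, city in GEOIP_ROWS
]

def lookup_geo(ip: str):
    """Return (country, city) for the most specific matching prefix, or None."""
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        return None  # malformed IP string
    matches = [
        (net.prefixlen, country, city)
        for net, country, city in NETWORKS
        if addr in net
    ]
    if not matches:
        return None
    _, country, city = max(matches, key=lambda m: m[0])  # longest prefix wins
    return country, city

print(lookup_geo("10.200.3.4"))    # inside 10.0.0.0/8
print(lookup_geo("203.0.114.9"))   # outside 203.0.113.0/24
```

In Spark, the same check could run inside a UDF, or the prefixes could be expanded into integer start and end ranges and joined with a range condition. Which of those fits best is a design decision outside the scope of this lab.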

---

## ⚙️ Stateful Feature Engineering {#stateful-features}

### Notebook 04: Velocity and Behavioral Features
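
The features in this notebook are computed over sliding windows, for example 5-minute windows that advance every minute. The pure-Python sketch below shows which windows a single event falls into. It assumes window starts are aligned to multiples of the slide interval since the Unix epoch and that windows are half-open, `[start, end)`. The helper `sliding_windows` is illustrative only.

```python
from datetime import datetime, timedelta

def sliding_windows(event_ts, window, slide):
    """Return the [start, end) windows that contain event_ts, oldest first."""
    epoch = datetime(1970, 1, 1)
    # Latest aligned window start at or before the event
    last_start = event_ts - ((event_ts - epoch) % slide)
    windows = []
    start = last_start
    # A window [start, start + window) contains the event while start > event_ts - window
    while start > event_ts - window:
        windows.append((start, start + window))
        start -= slide
    return list(reversed(windows))

event = datetime(2024, 1, 1, 12, 3, 30)
velocity_windows = sliding_windows(event, timedelta(minutes=5), timedelta(minutes=1))
for start, end in velocity_windows:
    print(start.time(), "->", end.time())
```

Each transaction contributes to `window / slide` overlapping windows (five in the velocity case), which is why the feature tables hold several rows per customer for any given moment.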

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

print("Starting stateful feature engineering...")

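# Checkpoint locations (checkpoint_base repeats the path from Notebook 00)
checkpoint_base = "dbfs:/mnt/fraud_lab/checkpoints"
velocity_checkpoint = f"{checkpoint_base}/ops_velocity"
device_checkpoint = f"{checkpoint_base}/ops_device"
geo_checkpoint = f"{checkpoint_base}/ops_geo"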

# Read enriched transaction stream
enriched = (spark.readStream
            .table("fraud_lab.silver.tx_events_enriched")
            .withWatermark("event_ts", "5 minutes"))

# Create operational tables for stateful features
spark.sql("""
CREATE TABLE IF NOT EXISTS fraud_lab.ops.customer_velocity (
  customer_id STRING,
  window_start TIMESTAMP,
  window_end TIMESTAMP,
  tx_count INT,
  total_amount DOUBLE,
  unique_merchants INT,
  unique_countries INT,
  unique_devices INT,
  avg_amount DOUBLE,
  max_amount DOUBLE,
  last_tx_ts TIMESTAMP
) USING DELTA
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS fraud_lab.ops.device_behavior (
  device_id STRING,
  window_start TIMESTAMP,
  window_end TIMESTAMP,
  unique_customers INT,
  unique_countries INT,
  total_transactions INT,
  customer_list ARRAY<STRING>,
  country_list ARRAY<STRING>,
  last_seen_ts TIMESTAMP
) USING DELTA
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS fraud_lab.ops.geo_velocity (
  customer_id STRING,
  window_start TIMESTAMP,
  window_end TIMESTAMP,
  locations ARRAY<STRUCT<country: STRING, city: STRING, lat: DOUBLE, lon: DOUBLE, ts: TIMESTAMP>>,
  max_distance_km DOUBLE,
  max_speed_kmh DOUBLE,
  country_changes INT
) USING DELTA
""")

# Feature 1: Customer Transaction Velocity (5-minute sliding windows)
# Exact countDistinct is not supported in streaming aggregations, so use approx_count_distinct.
# Counts are cast to int to match the INT columns of ops.customer_velocity.
customer_velocity = (enriched
    .groupBy(
        F.window("event_ts", "5 minutes", "1 minute"),
        "customer_id"
    )
    .agg(
        F.count("*").cast("int").alias("tx_count"),
        F.sum("amount").alias("total_amount"),
        F.approx_count_distinct("merchant_id").cast("int").alias("unique_merchants"),
        F.approx_count_distinct("tx_country").cast("int").alias("unique_countries"),
        F.approx_count_distinct("device_id").cast("int").alias("unique_devices"),
        F.avg("amount").alias("avg_amount"),
        F.max("amount").alias("max_amount"),
        F.max("event_ts").alias("last_tx_ts")
    )
    .select(
        "customer_id",
        F.col("window.start").alias("window_start"),
        F.col("window.end").alias("window_end"),
        "tx_count",
        F.round("total_amount", 2).alias("total_amount"),
        "unique_merchants",
        "unique_countries", 
        "unique_devices",
        F.round("avg_amount", 2).alias("avg_amount"),
        "max_amount",
        "last_tx_ts"
    )
)

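# Start customer velocity stream
# Complete mode rewrites the full result table every micro-batch and keeps all window state
# (the watermark is not used to drop state in this mode).
# queryName must be set before the query starts; toTable() starts it and returns a StreamingQuery
velocity_query = (customer_velocity.writeStream
    .format("delta")
    .queryName("customer_velocity")
    .option("checkpointLocation", velocity_checkpoint)
    .outputMode("complete")
    .toTable("fraud_lab.ops.customer_velocity"))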

print("Customer velocity stream started")

# Feature 2: Device Sharing Behavior (10-minute sliding windows)
device_behavior = (enriched
    .groupBy(
        F.window("event_ts", "10 minutes", "2 minutes"),
        "device_id"
    )
    .agg(
        # approx_count_distinct replaces countDistinct, which streaming aggregations reject;
        # counts are cast to int to match the INT columns of ops.device_behavior
        F.approx_count_distinct("customer_id").cast("int").alias("unique_customers"),
        F.approx_count_distinct("tx_country").cast("int").alias("unique_countries"),
        F.count("*").cast("int").alias("total_transactions"),
        F.collect_set("customer_id").alias("customer_list"),
        F.collect_set("tx_country").alias("country_list"),
        F.max("event_ts").alias("last_seen_ts")
    )
    .select(
        "device_id",
        F.col("window.start").alias("window_start"),
        F.col("window.end").alias("window_end"),
        "unique_customers",
        "unique_countries",
        "total_transactions",
        "customer_list",
        "country_list",
        "last_seen_ts"
    )
)

# Start device behavior stream
# queryName must be set before the query starts; toTable() starts it and returns a StreamingQuery
device_query = (device_behavior.writeStream
    .format("delta")
    .queryName("device_behavior")
    .option("checkpointLocation", device_checkpoint)
    .outputMode("complete")
    .toTable("fraud_lab.ops.device_behavior"))

print("Device behavior stream started")

# Feature 3: Geo-velocity Analysis (15-minute windows)
# Simplified version - in production, calculate actual distances and speeds
geo_features = (enriched
    .filter("latitude IS NOT NULL AND longitude IS NOT NULL")
    .groupBy(
        F.window("event_ts", "15 minutes", "3 minutes"),
        "customer_id"
    )
    .agg(
        F.collect_list(
            # Field names must match the struct declared in ops.geo_velocity
            F.struct(
                F.col("tx_country").alias("country"),
                F.col("geo_city").alias("city"),
                F.col("latitude").alias("lat"),
                F.col("longitude").alias("lon"),
                F.col("event_ts").alias("ts")
            )
        ).alias("locations"),
        # countDistinct is not supported in streaming aggregations
        F.approx_count_distinct("tx_country").cast("int").alias("country_changes")
    )
    .select(
        "customer_id",
        F.col("window.start").alias("window_start"),
        F.col("window.end").alias("window_end"),
        "locations",
        F.lit(0.0).alias("max_distance_km"),  # Placeholder - implement haversine distance
        F.lit(0.0).alias("max_speed_kmh"),   # Placeholder - calculate based on time/distance
        "country_changes"
    )
)

# Start geo-velocity stream
# queryName must be set before the query starts; toTable() starts it and returns a StreamingQuery
geo_query = (geo_features.writeStream
    .format("delta")
    .queryName("geo_velocity")
    .option("checkpointLocation", geo_checkpoint)
    .outputMode("complete")
    .toTable("fraud_lab.ops.geo_velocity"))

print("Geo-velocity stream started")

# Monitor all stateful streams
import time
time.sleep(10)

print("\nActive stateful feature streams:")
for query in spark.streams.active:
    if query.name in ["customer_velocity", "device_behavior", "geo_velocity"]:
        print(f"- {query.name}: {query.status}")
        if query.lastProgress:
            print(f"  Batch: {query.lastProgress.get('batchId', 'N/A')}, "
                  f"Input rows/sec: {query.lastProgress.get('inputRowsPerSecond', 'N/A')}")
```
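
The geo-velocity feature leaves `max_distance_km` and `max_speed_kmh` as placeholders. One way to fill them, shown here as a pure-Python sketch and not as the lab's implementation, is the haversine great-circle distance between consecutive locations divided by the elapsed time. The mean Earth radius of 6371 km is an approximation, and `haversine_km` and `max_distance_and_speed` are hypothetical helper names.

```python
import math
from datetime import datetime

EARTH_RADIUS_KM = 6371.0  # mean Earth radius; an approximation

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (lat, lon) points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    # Clamp to guard against floating-point values slightly above 1
    return 2 * EARTH_RADIUS_KM * math.asin(min(1.0, math.sqrt(a)))

def max_distance_and_speed(locations):
    """locations: list of (lat, lon, ts); returns (max_km, max_kmh) over consecutive points."""
    ordered = sorted(locations, key=lambda p: p[2])
    max_km = 0.0
    max_kmh = 0.0
    for (lat1, lon1, t1), (lat2, lon2, t2) in zip(ordered, ordered[1:]):
        km = haversine_km(lat1, lon1, lat2, lon2)
        hours = (t2 - t1).total_seconds() / 3600
        max_km = max(max_km, km)
        if hours > 0:  # skip simultaneous events to avoid division by zero
            max_kmh = max(max_kmh, km / hours)
    return max_km, max_kmh

# San Francisco and London coordinates from the geoip seed data, 10 minutes apart
points = [
    (37.7749, -122.4194, datetime(2024, 1, 1, 12, 0)),
    (51.5074, -0.1278, datetime(2024, 1, 1, 12, 10)),
]
max_km, max_kmh = max_distance_and_speed(points)
print(f"max distance: {max_km:.0f} km, max speed: {max_kmh:.0f} km/h")
```

A rule could then flag customers whose implied speed exceeds what physical travel allows. The exact threshold is a tuning choice that this lab does not specify.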

---

## 🥇 Gold Layer: Advanced Fraud Rules {#gold-layer}

### Notebook 05: Fraud Detection Rules Engine

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

print("Starting advanced fraud detection rules engine...")

# Checkpoint locations
suspicions_checkpoint = f"{checkpoint_base}/gold_fraud_suspicions"
alerts_checkpoint = f"{checkpoint_base}/gold_fraud_alerts"

# Read enriched transactions
tx_enriched = (spark.readStream
               .table("fraud_lab.silver.tx_events_enriched")
               .withWatermark("event_ts", "10 minutes"))

# Read stateful features (as batch joins - get latest state)
def get_latest_velocity():
    return (spark.read.table("fraud_lab.ops.customer_velocity")
            .withColumn("rank", 
                       F.row_number().over(
                           Window.partitionBy("customer_id")
                           .orderBy(F.col("window_end").desc())
                       ))
            .filter("rank = 1")
            .drop("rank"))

def get_latest_device_behavior():
    return (spark.read.table("fraud_lab.ops.device_behavior")
            .withColumn("rank",
                       F.row_number().over(
                           Window.partitionBy("device_id")
                           .orderBy(F.col("window_end").desc())
                       ))
            .filter("rank = 1")
            .drop("rank"))

def get_latest_geo_velocity():
    return (spark.read.table("fraud_lab.ops.geo_velocity")
            .withColumn("rank",
                       F.row_number().over(
                           Window.partitionBy("customer_id")
                           .orderBy(F.col("window_end").desc())
                       ))
            .filter("rank = 1")
            .drop("rank"))

# Create comprehensive fraud detection using foreachBatch
def fraud_detection_rules(batch_df, batch_id):
    # Count once: each count() call re-scans the micro-batch
    batch_count = batch_df.count()
    if batch_count == 0:
        return
    
    print(f"Processing batch {batch_id} with {batch_count} transactions")
    
    # Join with latest stateful features
    velocity_df = get_latest_velocity()
    device_df = get_latest_device_behavior()
    geo_df = get_latest_geo_velocity()
    
    # Read login failures for recent context
    recent_login_failures = (spark.read.table("fraud_lab.silver.login_events_clean")
                            .filter("auth_result = 'failure'")
                            .filter("event_ts >= current_timestamp() - INTERVAL 30 MINUTES")
                            .groupBy("customer_id")
                            .agg(
                                F.count("*").alias("recent_login_failures"),
                                F.max("event_ts").alias("last_failure_ts")
                            ))
    
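    # Join all features. Each feature table is projected to the columns the rules
    # below use, so shared columns such as window_start/window_end cannot become
    # ambiguous (column names are assumed from the rule definitions).
    enriched_batch = (batch_df.alias("tx")
                      .join(velocity_df.select("customer_id", "tx_count", "total_amount", "unique_merchants"),
                            "customer_id", "left")
                      .join(device_df.select("device_id", "unique_customers"), "device_id", "left")
                      .join(geo_df.select("customer_id", "country_changes"), "customer_id", "left")
                      .join(recent_login_failures, "customer_id", "left"))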
    
    # Define fraud rules
    rules_df = enriched_batch.withColumn(
        "rule_results",
        F.array(
            # R1: High-risk customer + international + high amount
            F.struct(
                F.lit("R1_high_risk_international").alias("rule_name"),
                (
                    (F.col("customer_risk_tier") == "high") &
                    F.col("international_tx") &
                    (F.col("amount") > 5000)
                ).alias("triggered"),
                F.lit(0.35).alias("weight")
            ),
            
            # R2: Blacklisted entities
            F.struct(
                F.lit("R2_blacklisted_entity").alias("rule_name"),
                (
                    F.col("customer_blacklisted") |
                    F.col("device_blacklisted") |
                    F.col("ip_blacklisted")
                ).alias("triggered"),
                F.lit(0.45).alias("weight")
            ),
            
            # R3: High-risk merchant + large amount
            F.struct(
                F.lit("R3_high_risk_merchant").alias("rule_name"),
                (
                    F.col("merchant_risk_segment").isin(["high_risk", "watchlist"]) &
                    (F.col("amount") > 3000)
                ).alias("triggered"),
                F.lit(0.25).alias("weight")
            ),
            
            # R4: Velocity spike
            F.struct(
                F.lit("R4_velocity_spike").alias("rule_name"),
                (
                    (F.coalesce(F.col("tx_count"), F.lit(0)) > 15) |
                    (F.coalesce(F.col("total_amount"), F.lit(0)) > 25000) |
                    (F.coalesce(F.col("unique_merchants"), F.lit(0)) > 10)
                ).alias("triggered"),
                F.lit(0.30).alias("weight")
            ),
            
            # R5: Device misuse (multiple customers)
            F.struct(
                F.lit("R5_device_misuse").alias("rule_name"),
                (F.coalesce(F.col("unique_customers"), F.lit(0)) > 7).alias("triggered"),
                F.lit(0.20).alias("weight")
            ),
            
            # R6: Poor device reputation
            F.struct(
                F.lit("R6_poor_device_reputation").alias("rule_name"),
                (F.coalesce(F.col("device_reputation"), F.lit(1.0)) < 0.3).alias("triggered"),
                F.lit(0.15).alias("weight")
            ),
            
            # R7: Night transaction + high amount
            F.struct(
                F.lit("R7_night_high_amount").alias("rule_name"),
                (
                    F.col("night_tx") &
                    (F.col("amount") > 2000)
                ).alias("triggered"),
                F.lit(0.15).alias("weight")
            ),
            
            # R8: Multiple countries in short time
            F.struct(
                F.lit("R8_geo_velocity").alias("rule_name"),
                (F.coalesce(F.col("country_changes"), F.lit(0)) >= 3).alias("triggered"),
                F.lit(0.25).alias("weight")
            ),
            
            # R9: Amount vs credit limit ratio
            F.struct(
                F.lit("R9_high_credit_utilization").alias("rule_name"),
                F.col("high_amount_vs_limit").alias("triggered"),
                F.lit(0.10).alias("weight")
            ),
            
            # R10: Recent login failures followed by transaction
            F.struct(
                F.lit("R10_failed_login_then_tx").alias("rule_name"),
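                (
                    (F.coalesce(F.col("recent_login_failures"), F.lit(0)) > 3) &
                    (F.col("amount") > 1000) &
                    # Parenthesize the comparison: & binds tighter than < in Python.
                    # Cast to epoch seconds before coalesce so both branches are LONG.
                    ((
                        F.col("event_ts").cast("long") -
                        F.coalesce(F.col("last_failure_ts").cast("long"), F.lit(0))
                    ) < 1800)  # 30 minutes
                ).alias("triggered"),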
                F.lit(0.20).alias("weight")
            )
        )
    )
    
    # Calculate final risk score and extract triggered rules
    final_results = rules_df.withColumn(
        "triggered_rules",
        F.filter(F.col("rule_results"), lambda x: x.triggered)
    ).withColumn(
        "risk_score",
        F.round(
            F.aggregate(
                F.col("triggered_rules"),
                F.lit(0.0),
                lambda acc, x: acc + x.weight
            ),
            3
        )
    ).withColumn(
        "rule_names",
        F.transform(F.col("triggered_rules"), lambda x: x.rule_name)
    ).filter(
        "size(triggered_rules) > 0"  # Only keep transactions with rule violations
    )
    
    # Create fraud suspicions
    fraud_suspicions = final_results.select(
        "event_id",
        "event_ts", 
        "customer_id",
        "merchant_id",
        "device_id",
        "ip",
        "amount",
        "currency",
        "tx_country",
        F.col("rule_names").alias("reasons"),
        "risk_score",
        F.col("rule_names").alias("rule_hits"),
        F.current_timestamp().alias("created_ts")
    )
    
    # Write to fraud suspicions table.
    # Cache first: the result feeds several actions, and without a cache each one
    # would recompute the joins above.
    fraud_suspicions.cache()
    suspicion_count = fraud_suspicions.count()
    if suspicion_count > 0:
        fraud_suspicions.write.mode("append").saveAsTable("fraud_lab.gold.fraud_suspicions")
        print(f"  - Wrote {suspicion_count} fraud suspicions")
        
        # Create alerts for high-risk cases
        alerts = fraud_suspicions.filter("risk_score >= 0.3").select(
            F.expr("uuid()").alias("alert_id"),
            "event_id",
            F.current_timestamp().alias("alert_ts"),
            F.when(F.col("risk_score") >= 0.6, "critical")
             .when(F.col("risk_score") >= 0.4, "high") 
             .when(F.col("risk_score") >= 0.3, "medium")
             .otherwise("low").alias("severity"),
            # One readable line per alert; numeric columns are cast explicitly
            F.format_string(
                "Fraud detected - Customer: %s | Amount: %s | Risk Score: %s",
                F.col("customer_id"),
                F.col("amount").cast("string"),
                F.col("risk_score").cast("string")
            ).alias("summary"),
            "reasons"
        )
        
        alert_count = alerts.count()
        if alert_count > 0:
            alerts.write.mode("append").saveAsTable("fraud_lab.gold.fraud_alerts")
            print(f"  - Generated {alert_count} fraud alerts")
    
    fraud_suspicions.unpersist()

# Create Gold tables
spark.sql("""
CREATE TABLE IF NOT EXISTS fraud_lab.gold.fraud_suspicions (
  event_id STRING,
  event_ts TIMESTAMP,
  customer_id STRING,
  merchant_id STRING,
  device_id STRING,
  ip STRING,
  amount DOUBLE,
  currency STRING,
  tx_country STRING,
  reasons ARRAY<STRING>,
  risk_score DOUBLE,
  rule_hits ARRAY<STRING>,
  created_ts TIMESTAMP
) USING DELTA
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS fraud_lab.gold.fraud_alerts (
  alert_id STRING,
  event_id STRING,
  alert_ts TIMESTAMP,
  severity STRING,
  summary STRING,
  reasons ARRAY<STRING>
) USING DELTA
""")

# Start fraud detection stream using foreachBatch
fraud_query = (tx_enriched.writeStream
    .foreachBatch(fraud_detection_rules)
    .option("checkpointLocation", suspicions_checkpoint)
    .queryName("fraud_detection_engine")
    .start())

print("Fraud detection engine started")

# Monitor the fraud detection stream
import time
time.sleep(10)

print("\nFraud detection stream status:")
for query in spark.streams.active:
    if query.name == "fraud_detection_engine":
        print(f"- {query.name}: {query.status}")
        if query.lastProgress:
            print(f"  Batch: {query.lastProgress.get('batchId', 'N/A')}")
```
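
To make the scoring arithmetic concrete, the sketch below reproduces it in plain Python, outside Spark. The weights and severity thresholds are copied from the rules engine above; the function names `risk_score` and `severity` are hypothetical and exist only for this illustration. Note that the score is an unnormalized sum of weights, so it can exceed 1.0 when many rules fire at once.

```python
# Rule weights copied from the rules engine above
RULE_WEIGHTS = {
    "R1_high_risk_international": 0.35,
    "R2_blacklisted_entity": 0.45,
    "R3_high_risk_merchant": 0.25,
    "R4_velocity_spike": 0.30,
    "R5_device_misuse": 0.20,
    "R6_poor_device_reputation": 0.15,
    "R7_night_high_amount": 0.15,
    "R8_geo_velocity": 0.25,
    "R9_high_credit_utilization": 0.10,
    "R10_failed_login_then_tx": 0.20,
}


def risk_score(triggered_rules):
    """Sum the weights of the triggered rules, rounded to 3 decimals as in the pipeline."""
    return round(sum(RULE_WEIGHTS[name] for name in triggered_rules), 3)


def severity(score):
    """Map a score to an alert severity; None means no alert is raised (score < 0.3)."""
    if score >= 0.6:
        return "critical"
    if score >= 0.4:
        return "high"
    if score >= 0.3:
        return "medium"
    return None


examples = [
    ["R7_night_high_amount"],                                # 0.15: suspicion only
    ["R1_high_risk_international", "R7_night_high_amount"],  # 0.35 + 0.15
    ["R2_blacklisted_entity", "R4_velocity_spike"],          # 0.45 + 0.30
]
for rules in examples:
    score = risk_score(rules)
    print(rules, score, severity(score))
```

A single low-weight rule such as R7 or R9 produces a suspicion row but no alert, whereas R2 on its own (0.45) already yields a "high" alert.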

---

## 📊 Monitoring and Operations {#monitoring}

### Notebook 06: Real-time Monitoring Dashboard

```sql
-- Create operational monitoring tables
CREATE TABLE IF NOT EXISTS ops.pipeline_metrics (
  metric_ts TIMESTAMP,
  pipeline_name STRING,
  metric_name STRING,
  metric_value DOUBLE,
  metric_unit STRING
) USING DELTA;

CREATE TABLE IF NOT EXISTS ops.data_quality_metrics (
  check_ts TIMESTAMP,
  table_name STRING,
  total_records BIGINT,
  null_records BIGINT,
  duplicate_records BIGINT,
  quality_score DOUBLE
) USING DELTA;
```

```python
import time
from datetime import datetime, timedelta

def collect_pipeline_metrics():
    """Collect real-time pipeline metrics"""
    
    # Get current counts from each layer
    bronze_tx_count = spark.sql("SELECT COUNT(*) as cnt FROM bronze.tx_events_raw").collect()[0].cnt
    silver_clean_count = spark.sql("SELECT COUNT(*) as cnt FROM silver.tx_events_clean").collect()[0].cnt
    silver_enriched_count = spark.sql("SELECT COUNT(*) as cnt FROM silver.tx_events_enriched").collect()[0].cnt
    gold_suspicions_count = spark.sql("SELECT COUNT(*) as cnt FROM gold.fraud_suspicions").collect()[0].cnt
    gold_alerts_count = spark.sql("SELECT COUNT(*) as cnt FROM gold.fraud_alerts").collect()[0].cnt
    
    # Calculate processing rates
    processing_rate = silver_clean_count / max(bronze_tx_count, 1) * 100
    enrichment_rate = silver_enriched_count / max(silver_clean_count, 1) * 100
    fraud_detection_rate = gold_suspicions_count / max(silver_enriched_count, 1) * 100
    
    # Get recent fraud statistics
    recent_fraud_stats = spark.sql("""
        SELECT 
            COUNT(*) as recent_suspicions,
            AVG(risk_score) as avg_risk_score,
            COUNT(DISTINCT customer_id) as unique_customers_flagged
        FROM gold.fraud_suspicions 
        WHERE created_ts >= current_timestamp() - INTERVAL 15 MINUTES
    """).collect()[0]
    
    metrics = [
        ("bronze_tx_count", bronze_tx_count, "count"),
        ("silver_clean_count", silver_clean_count, "count"),
        ("silver_enriched_count", silver_enriched_count, "count"),
        ("gold_suspicions_count", gold_suspicions_count, "count"),
        ("gold_alerts_count", gold_alerts_count, "count"),
        ("processing_rate", processing_rate, "percentage"),
        ("enrichment_rate", enrichment_rate, "percentage"),
        ("fraud_detection_rate", fraud_detection_rate, "percentage"),
        ("recent_suspicions_15min", recent_fraud_stats.recent_suspicions, "count"),
        ("avg_risk_score_15min", recent_fraud_stats.avg_risk_score or 0, "score"),
        ("unique_customers_flagged_15min", recent_fraud_stats.unique_customers_flagged, "count")
    ]
    
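    # Insert metrics. Values are cast to float so every row matches the DOUBLE
    # metric_value column (mixing int and float values breaks schema inference).
    metrics_data = [(datetime.now(), "fraud_detection_pipeline", name, float(value), unit) 
                   for name, value, unit in metrics]
    
    metrics_df = spark.createDataFrame(
        metrics_data, 
        ["metric_ts", "pipeline_name", "metric_name", "metric_value", "metric_unit"]
    )
    metrics_df.write.mode("append").saveAsTable("fraud_lab.ops.pipeline_metrics")
    
    # dict() needs (key, value) pairs, so drop the unit from each 3-tuple
    return {name: value for name, value, _ in metrics}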

# Real-time monitoring queries
print("=== FRAUD DETECTION SYSTEM MONITORING DASHBOARD ===\n")

# 1. Pipeline Health Overview
print("1. PIPELINE HEALTH:")
metrics = collect_pipeline_metrics()
print(f"   Bronze Transactions: {metrics['bronze_tx_count']:,}")
print(f"   Silver Clean: {metrics['silver_clean_count']:,} ({metrics['processing_rate']:.1f}% of bronze)")
print(f"   Silver Enriched: {metrics['silver_enriched_count']:,} ({metrics['enrichment_rate']:.1f}% of clean)")
print(f"   Gold Suspicions: {metrics['gold_suspicions_count']:,} ({metrics['fraud_detection_rate']:.1f}% flagged)")
print(f"   Gold Alerts: {metrics['gold_alerts_count']:,}")

# 2. Recent Fraud Activity
print("\n2. RECENT FRAUD ACTIVITY (Last 15 minutes):")
print(f"   Suspicions: {metrics['recent_suspicions_15min']}")
print(f"   Average Risk Score: {metrics['avg_risk_score_15min']:.3f}")
print(f"   Unique Customers Flagged: {metrics['unique_customers_flagged_15min']}")

# 3. Streaming Query Health
print("\n3. STREAMING QUERY STATUS:")
for query in spark.streams.active:
    status = query.status
    name = query.name or "unnamed"
    print(f"   {name}: {status}")
    if query.lastProgress:
        progress = query.lastProgress
        print(f"     Batch: {progress.get('batchId', 'N/A')}")
        print(f"     Input Rate: {progress.get('inputRowsPerSecond', 'N/A')} rows/sec")
        print(f"     Processing Time: {progress.get('durationMs', {}).get('triggerExecution', 'N/A')} ms")

# 4. Top Fraud Rules
print("\n4. TOP FRAUD RULES (Last hour):")
# explode() runs in a subquery so the aggregate groups by a plain column
top_rules = spark.sql("""
    SELECT 
        rule_name,
        COUNT(*) as trigger_count,
        AVG(risk_score) as avg_risk_score
    FROM (
        SELECT explode(reasons) as rule_name, risk_score
        FROM gold.fraud_suspicions 
        WHERE created_ts >= current_timestamp() - INTERVAL 1 HOUR
    ) exploded
    GROUP BY rule_name
    ORDER BY trigger_count DESC
    LIMIT 5
""")
top_rules.show(truncate=False)

# 5. High-Risk Customers
print("\n5. HIGH-RISK CUSTOMERS (Last hour):")
high_risk_customers = spark.sql("""
    SELECT 
        customer_id,
        COUNT(*) as suspicion_count,
        MAX(risk_score) as max_risk_score,
        SUM(amount) as total_amount,
        array_distinct(flatten(collect_list(reasons))) as all_reasons
    FROM gold.fraud_suspicions 
    WHERE created_ts >= current_timestamp() - INTERVAL 1 HOUR
    GROUP BY customer_id
    HAVING max_risk_score > 0.4
    ORDER BY max_risk_score DESC, suspicion_count DESC
    LIMIT 10
""")
high_risk_customers.show(truncate=False)

# 6. Alert Summary by Severity
print("\n6. ALERTS BY SEVERITY (Last hour):")
alert_summary = spark.sql("""
    SELECT 
        severity,
        COUNT(*) as alert_count,
        COUNT(DISTINCT event_id) as unique_events
    FROM gold.fraud_alerts 
    WHERE alert_ts >= current_timestamp() - INTERVAL 1 HOUR
    GROUP BY severity
    ORDER BY 
        CASE severity 
            WHEN 'critical' THEN 1 
            WHEN 'high' THEN 2 
            WHEN 'medium' THEN 3 
            ELSE 4 
        END
""")
alert_summary.show()

# 7. Data Quality Checks
print("\n7. DATA QUALITY METRICS:")
quality_check = spark.sql("""
    SELECT 
        'silver.tx_events_clean' as table_name,
        COUNT(*) as total_records,
        COUNT(*) - COUNT(customer_id) as null_customers,
        COUNT(*) - COUNT(merchant_id) as null_merchants,
        COUNT(*) - COUNT(amount) as null_amounts
    FROM silver.tx_events_clean
    WHERE processed_ts >= current_timestamp() - INTERVAL 1 HOUR
""")
quality_check.show()

# 8. Velocity Analytics
print("\n8. VELOCITY PATTERNS (Current 5-minute windows):")
velocity_patterns = spark.sql("""
    SELECT 
        tx_count,
        COUNT(*) as customers_with_count,
        AVG(total_amount) as avg_total_amount,
        MAX(total_amount) as max_total_amount
    FROM ops.customer_velocity 
    WHERE window_end >= current_timestamp() - INTERVAL 10 MINUTES
    GROUP BY tx_count
    HAVING tx_count > 1
    ORDER BY tx_count DESC
""")
velocity_patterns.show()

print("\n=== END OF MONITORING DASHBOARD ===")
```

### Continuous Monitoring Loop

```python
# Run continuous monitoring (optional)
def continuous_monitoring(interval_seconds=60):
    """Run monitoring dashboard every N seconds"""
    
    print("Starting continuous monitoring... (Press Ctrl+C to stop)")
    
    try:
        while True:
            print(f"\n{'='*50}")
            print(f"MONITORING UPDATE: {datetime.now()}")
            print(f"{'='*50}")
            
            # Collect and display key metrics
            metrics = collect_pipeline_metrics()
            
            # Show streaming query health
            active_queries = len(spark.streams.active)
            print(f"Active Streams: {active_queries}")
            
            # Show recent fraud activity
            # The alias is alert_count rather than count: Row is a tuple, so
            # row.count would resolve to the built-in tuple.count method
            recent_alerts = spark.sql("""
                SELECT severity, COUNT(*) as alert_count 
                FROM gold.fraud_alerts 
                WHERE alert_ts >= current_timestamp() - INTERVAL 5 MINUTES
                GROUP BY severity
            """).collect()
            
            if recent_alerts:
                print("Recent Alerts (5 min):", {row.severity: row.alert_count for row in recent_alerts})
            else:
                print("Recent Alerts (5 min): None")
            
            # Show top suspicious customers
            # The alias is event_count rather than count, because row.count
            # would return the tuple.count method instead of the column value
            top_suspicious = spark.sql("""
                SELECT customer_id, COUNT(*) as event_count, MAX(risk_score) as max_score
                FROM gold.fraud_suspicions 
                WHERE created_ts >= current_timestamp() - INTERVAL 5 MINUTES
                GROUP BY customer_id
                ORDER BY max_score DESC
                LIMIT 3
            """).collect()
            
            if top_suspicious:
                print("Top Suspicious Customers:")
                for row in top_suspicious:
                    print(f"  {row.customer_id}: {row.event_count} events, max risk {row.max_score:.3f}")
            
            time.sleep(interval_seconds)
            
    except KeyboardInterrupt:
        print("\nMonitoring stopped by user")

# Uncomment to run continuous monitoring
# continuous_monitoring(30)  # Every 30 seconds
```

---

## 🔧 Production Considerations {#production-considerations}

### Performance Optimization

```python
# Tune streaming queries for production.
# Only session-level settings are listed here. Static settings such as
# spark.sql.streaming.ui.enabled cannot be changed with spark.conf.set and
# belong in the cluster's Spark config instead.
optimization_configs = {
    # Adaptive Query Execution
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    
    # Streaming metrics
    "spark.sql.streaming.metricsEnabled": "true",
    
    # Delta write optimizations
    "spark.databricks.delta.autoCompact.enabled": "true",
    "spark.databricks.delta.optimizeWrite.enabled": "true",
    
    # State store and checkpoint retention. Choose the provider before a query's
    # first run; it cannot be changed for an existing checkpoint.
    "spark.sql.streaming.stateStore.providerClass": "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider",
    "spark.sql.streaming.minBatchesToRetain": "10",
}

for key, value in optimization_configs.items():
    spark.conf.set(key, value)
```

### Error Handling and Recovery

```python
import time

def robust_stream_starter(stream_func, checkpoint_path, query_name, max_retries=3):
    """Start a streaming query with error handling and retries.
    
    stream_func is a zero-argument callable that builds and starts the query;
    checkpoint_path is used for logging only.
    """
    
    for attempt in range(max_retries):
        try:
            query = stream_func()
            print(f"Successfully started {query_name} on attempt {attempt + 1} "
                  f"(checkpoint: {checkpoint_path})")
            return query
            
        except Exception as e:
            print(f"Attempt {attempt + 1} failed for {query_name}: {e}")
            
            if attempt < max_retries - 1:
                print("Retrying in 10 seconds...")
                time.sleep(10)
            else:
                print(f"Failed to start {query_name} after {max_retries} attempts")
                raise  # re-raise with the original traceback

# Example usage:
# fraud_query = robust_stream_starter(
#     lambda: tx_enriched.writeStream.foreachBatch(fraud_detection_rules)
#             .option("checkpointLocation", suspicions_checkpoint)
#             .queryName("fraud_detection_engine")
#             .start(),
#     suspicions_checkpoint,
#     "fraud_detection_engine"
# )
```
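
A fixed 10-second wait is fine for a lab. A common refinement is exponential backoff, sketched below as a variation and not as the exercise's own method. `start_with_backoff` and its parameters are hypothetical names, and the `sleep` argument is injectable only so the helper can be exercised without actually waiting.

```python
import time


def start_with_backoff(start_fn, query_name, max_retries=3, base_delay_s=10, sleep=time.sleep):
    """Call start_fn() until it succeeds, doubling the wait after each failure.

    start_fn: zero-argument callable that builds and starts the query.
    sleep:    function used to wait; replaceable in tests.
    """
    for attempt in range(1, max_retries + 1):
        try:
            return start_fn()
        except Exception as exc:
            if attempt == max_retries:
                raise  # out of retries: surface the original error
            delay = base_delay_s * 2 ** (attempt - 1)  # 10s, 20s, 40s, ...
            print(f"{query_name}: attempt {attempt} failed ({exc}); retrying in {delay}s")
            sleep(delay)


# Example with a stand-in start function that fails twice before succeeding
attempts = {"n": 0}

def flaky_start():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient failure")
    return "query-handle"

waits = []
result = start_with_backoff(flaky_start, "fraud_detection_engine", sleep=waits.append)
print(result, waits)
```

Backoff mainly helps when failures stem from a dependency that needs time to recover; a misconfigured query will fail on every attempt regardless of the delay.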

### Data Archival and Cleanup

```sql
-- Automated cleanup statements
-- A SQL function can only return an expression or query result and cannot run
-- DELETE, so the cleanup is written as plain statements to schedule as a job
-- (for example weekly).
-- Deleting rows from a Delta table that feeds a running stream can interrupt
-- that stream, so check the downstream readers before scheduling this.

-- Remove bronze data older than the retention period (30 days here)
DELETE FROM bronze.tx_events_raw 
WHERE event_ts < current_timestamp() - INTERVAL 30 DAYS;

-- Clean up operational metrics older than twice the retention period
DELETE FROM ops.pipeline_metrics 
WHERE metric_ts < current_timestamp() - INTERVAL 60 DAYS;
```
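
One way to keep the retention period configurable is to build the cleanup statements in Python and submit them with `spark.sql`. The sketch below assumes the cleanup runs from a scheduled Python notebook; `build_cleanup_statements` and `metrics_multiplier` are hypothetical names introduced for this example.

```python
def build_cleanup_statements(retention_days, metrics_multiplier=2):
    """Return the DELETE statements for a given retention period.

    Operational metrics are kept metrics_multiplier times longer than bronze data.
    """
    if not isinstance(retention_days, int) or retention_days <= 0:
        raise ValueError("retention_days must be a positive integer")

    metrics_days = retention_days * metrics_multiplier
    return [
        "DELETE FROM bronze.tx_events_raw "
        f"WHERE event_ts < current_timestamp() - INTERVAL {retention_days} DAYS",
        "DELETE FROM ops.pipeline_metrics "
        f"WHERE metric_ts < current_timestamp() - INTERVAL {metrics_days} DAYS",
    ]


# Preview the statements for a 30-day retention period
for statement in build_cleanup_statements(30):
    print(statement)

# In a Databricks notebook the statements would then be executed with:
# for statement in build_cleanup_statements(30):
#     spark.sql(statement)
```

Validating `retention_days` as an integer before formatting keeps arbitrary text out of the generated SQL.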

### Health Checks and Alerting

```python
from datetime import datetime

def health_check_pipeline():
    """Comprehensive pipeline health check"""
    
    issues = []
    
    # Check if all streams are active
    expected_streams = [
        "bronze_to_silver_tx_clean",
        "silver_enrichment", 
        "customer_velocity",
        "device_behavior",
        "geo_velocity",
        "fraud_detection_engine"
    ]
    
    active_stream_names = [q.name for q in spark.streams.active if q.name]
    
    for expected in expected_streams:
        if expected not in active_stream_names:
            issues.append(f"Stream '{expected}' is not active")
    
    # Check data freshness
    latest_bronze = spark.sql("""
        SELECT MAX(event_ts) as latest_ts 
        FROM bronze.tx_events_raw
    """).collect()[0].latest_ts
    
    if latest_bronze:
        freshness_minutes = (datetime.now() - latest_bronze).total_seconds() / 60
        if freshness_minutes > 10:  # Alert if data is more than 10 minutes old
            issues.append(f"Data freshness issue: latest data is {freshness_minutes:.1f} minutes old")
    
    # Check error rates
    total_bronze = spark.sql("SELECT COUNT(*) as cnt FROM bronze.tx_events_raw").collect()[0].cnt
    total_silver = spark.sql("SELECT COUNT(*) as cnt FROM silver.tx_events_clean").collect()[0].cnt
    
    if total_bronze > 0:
        success_rate = total_silver / total_bronze * 100
        if success_rate < 95:  # Alert if success rate below 95%
            issues.append(f"Low processing success rate: {success_rate:.1f}%")
    
    # Return health status
    if issues:
        return {"status": "UNHEALTHY", "issues": issues}
    else:
        return {"status": "HEALTHY", "issues": []}

# Run health check
health_status = health_check_pipeline()
print(f"Pipeline Health: {health_status['status']}")
if health_status['issues']:
    print("Issues found:")
    for issue in health_status['issues']:
        print(f"  - {issue}")
```
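
The health check above mixes Spark queries with the decision logic. One possible refactoring, sketched here with the hypothetical function name `evaluate_health`, moves the thresholds into a pure function so they can be unit-tested without a cluster. The 10-minute freshness limit and 95% success rate are the values used above; the inputs are assumed to have been collected by Spark beforehand.

```python
def evaluate_health(active_streams, expected_streams, freshness_minutes,
                    bronze_count, silver_count,
                    max_staleness_minutes=10, min_success_rate=95.0):
    """Apply the health rules to already-collected measurements.

    freshness_minutes may be None when the bronze table is empty.
    """
    issues = [f"Stream '{name}' is not active"
              for name in expected_streams if name not in active_streams]

    if freshness_minutes is not None and freshness_minutes > max_staleness_minutes:
        issues.append(
            f"Data freshness issue: latest data is {freshness_minutes:.1f} minutes old")

    if bronze_count > 0:
        success_rate = silver_count / bronze_count * 100
        if success_rate < min_success_rate:
            issues.append(f"Low processing success rate: {success_rate:.1f}%")

    return {"status": "UNHEALTHY" if issues else "HEALTHY", "issues": issues}


# Example: one stream is down and the data is 12.5 minutes old
status = evaluate_health(
    active_streams=["customer_velocity"],
    expected_streams=["customer_velocity", "fraud_detection_engine"],
    freshness_minutes=12.5,
    bronze_count=1000,
    silver_count=990,
)
print(status)
```

With this split, `health_check_pipeline` would only gather the inputs (active stream names, latest timestamp, row counts) and delegate the verdict to the pure function.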

---

## 🎯 Exercise Summary

### What Students Will Learn

1. **Enterprise Streaming Architecture**
   - Multi-layer data architecture (Bronze → Silver → Gold)
   - Real-time data ingestion and processing
   - Stateful stream processing with watermarks

2. **Advanced Fraud Detection**
   - Complex rule-based detection logic
   - Velocity analysis and behavioral patterns
   - Multi-dimensional risk scoring

3. **Production Operations**
   - Checkpointing and fault tolerance
   - Monitoring and alerting systems
   - Performance optimization techniques

4. **Databricks Platform Mastery**
   - Delta Lake features and optimizations
   - Structured Streaming capabilities
   - Unity Catalog integration (if enabled)

### Deployment Instructions

1. **Sequential Notebook Execution:**
   - Run notebooks 00-02 first to set up infrastructure
   - Start notebook 01 (generators) and leave running
   - Execute notebooks 03-05 to start streaming pipelines
   - Use notebook 06 for ongoing monitoring

2. **Resource Requirements:**
   - Minimum: 2-node cluster (8 cores, 32GB RAM)
   - Recommended: 4-node cluster (16 cores, 64GB RAM)
   - Storage: 100GB+ for checkpoint and data files

3. **Runtime Considerations:**
   - Allow 2-3 hours for full exercise completion
   - Streams should run for at least 30 minutes to see patterns
   - Monitor resource usage and scale as needed

This comprehensive exercise provides hands-on experience with enterprise-grade streaming analytics using only Databricks-native components, preparing students for real-world fraud detection implementations.