In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from datetime import datetime, timedelta
import time

CATALOG = "fraud_detection"
SCHEMA  = "raw"

ALL_TRANSACTIONS_TABLE = f"{CATALOG}.{SCHEMA}.silver_all_transactions"
FRAUD_ALERTS_TABLE     = f"{CATALOG}.{SCHEMA}.gold_fraud_alerts"
CHECKPOINT_BASE        = f"/Volumes/{CATALOG}/{SCHEMA}/raw_files/checkpoints"

MAX_SPEED_KMH    = 900.0
MIN_DISTANCE_KM  = 5000.0
MIN_GAP_MINUTES  = 5.0
MAX_GAP_MINUTES  = 40.0

EVENT_HUB_NAMESPACE = "eh-fraud-detection-kevin"
EVENT_HUB_NAME      = "credit-card-transactions"
EVENT_HUB_KEY_NAME  = "RootManageSharedAccessKey"
EVENT_HUB_KEY       = ""
EVENT_HUB_CONNECTION_STRING = (
    f"Endpoint=sb://{EVENT_HUB_NAMESPACE}.servicebus.windows.net/;"
    f"SharedAccessKeyName={EVENT_HUB_KEY_NAME};SharedAccessKey={EVENT_HUB_KEY};"
    f"EntityPath={EVENT_HUB_NAME}"
)

# State table for tracking card history
CARD_STATE_TABLE = f"{CATALOG}.{SCHEMA}.card_state"

# NEW: Metrics tables for dashboards
METRICS_REALTIME_TABLE = f"{CATALOG}.{SCHEMA}.gold_fraud_metrics_realtime"
PERFORMANCE_TABLE = f"{CATALOG}.{SCHEMA}.gold_system_performance"

# Transaction schema
TRANSACTION_SCHEMA = StructType([
    StructField("card_id", IntegerType(), False),
    StructField("transaction_id", StringType(), False),
    StructField("location", StringType(), False),
    StructField("latitude", DoubleType(), False),
    StructField("longitude", DoubleType(), False),
    StructField("timestamp", StringType(), False),
    StructField("amount", DoubleType(), False),
    StructField("merchant", StringType(), False),
    StructField("merchant_category", StringType(), False),
    StructField("currency", StringType(), False)
])

print("‚úÖ Config loaded")


‚úÖ Config loaded


In [0]:
# Stop all active streams
for stream in spark.streams.active:
    stream.stop()
    print(f"Stopped: {stream.name}")

print(f"‚úÖ Active streams: {len(spark.streams.active)}")


‚úÖ Active streams: 0


In [0]:
# Card state tracking table
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {CARD_STATE_TABLE} (
    card_id INT PRIMARY KEY,
    last_transaction_id STRING,
    last_location STRING,
    last_latitude DOUBLE,
    last_longitude DOUBLE,
    last_timestamp TIMESTAMP
) USING DELTA
""")

# Real-time metrics table (for dashboards)
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {METRICS_REALTIME_TABLE} (
    window_start TIMESTAMP,
    window_end TIMESTAMP,
    total_transactions BIGINT,
    fraud_alerts BIGINT,
    fraud_rate_pct DOUBLE,
    total_fraud_amount DOUBLE,
    avg_fraud_speed_kmh DOUBLE,
    critical_alerts BIGINT,
    high_alerts BIGINT,
    unique_cards_flagged BIGINT,
    updated_at TIMESTAMP
) USING DELTA
""")

# Performance tracking table
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {PERFORMANCE_TABLE} (
    batch_id BIGINT,
    batch_timestamp TIMESTAMP,
    transactions_processed BIGINT,
    fraud_detected BIGINT,
    processing_time_sec DOUBLE,
    avg_latency_ms DOUBLE
) USING DELTA
""")

print(f"‚úÖ All tables created:")
print(f"   - Silver: {ALL_TRANSACTIONS_TABLE}")
print(f"   - Gold Alerts: {FRAUD_ALERTS_TABLE}")
print(f"   - State: {CARD_STATE_TABLE}")
print(f"   - Metrics: {METRICS_REALTIME_TABLE}")
print(f"   - Performance: {PERFORMANCE_TABLE}")


‚úÖ All tables created:
   - Silver: fraud_detection.raw.silver_all_transactions
   - Gold Alerts: fraud_detection.raw.gold_fraud_alerts
   - State: fraud_detection.raw.card_state
   - Metrics: fraud_detection.raw.gold_fraud_metrics_realtime
   - Performance: fraud_detection.raw.gold_system_performance


In [0]:
# Event Hubs configuration
eh_conf = {
    "eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(EVENT_HUB_CONNECTION_STRING),
    "eventhubs.consumerGroup": "$Default",
    "maxEventsPerTrigger": 1000,
    "startingOffsets": "earliest"
}

# Read and parse stream
parsed_stream = (
    spark.readStream
    .format("eventhubs")
    .options(**eh_conf)
    .load()
    .selectExpr("CAST(body AS STRING) AS json_str")
    .select(from_json(col("json_str"), TRANSACTION_SCHEMA).alias("data"))
    .select("data.*")
    .withColumn("event_timestamp", 
        coalesce(
            to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"),
            to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss"),
            to_timestamp(col("timestamp"))
        )
    )
    .withColumn("ingest_ts", current_timestamp())
)

print("‚úÖ Stream created from Event Hubs")


‚úÖ Stream created from Event Hubs


In [0]:
def process_batch(batch_df, batch_id):
    """
    Process each micro-batch with fraud detection + metrics tracking
    """
    
    if batch_df.rdd.isEmpty():
        return
    
    batch_start = time.time()
    batch_count = batch_df.count()
    
    print(f"\n{'='*70}")
    print(f"üì¶ Batch {batch_id} | {batch_count} transactions")
    print(f"{'='*70}")
    
    # === SILVER: Write raw transactions ===
    batch_df.write.format("delta").mode("append").saveAsTable(ALL_TRANSACTIONS_TABLE)
    
    # === Get card state ===
    state_df = spark.table(CARD_STATE_TABLE)
    with_prev = batch_df.join(state_df, "card_id", "left")
    
    # === Calculate fraud features ===
    fraud_candidates = with_prev.select(
        col("card_id"),
        col("transaction_id"),
        col("last_transaction_id").alias("prev_transaction_id"),
        col("location"),
        col("last_location").alias("prev_location"),
        col("latitude"),
        col("longitude"),
        col("last_latitude").alias("prev_latitude"),
        col("last_longitude").alias("prev_longitude"),
        col("event_timestamp"),
        col("last_timestamp").alias("prev_timestamp"),
        col("amount"),
        col("merchant"),
        
        # Distance calculation (Haversine formula)
        when(col("last_latitude").isNotNull(),
            expr("""
                6371 * acos(
                    least(1.0, greatest(-1.0,
                        cos(radians(last_latitude)) * cos(radians(latitude)) *
                        cos(radians(longitude) - radians(last_longitude)) +
                        sin(radians(last_latitude)) * sin(radians(latitude))
                    ))
                )
            """)
        ).otherwise(0.0).alias("distance_km"),
        
        # Time difference in minutes
        when(col("last_timestamp").isNotNull(),
            (col("event_timestamp").cast("long") - col("last_timestamp").cast("long")) / 60.0
        ).otherwise(None).alias("time_diff_min")
    )
    
    # === GOLD: Detect fraud ===
    fraud_alerts = (
        fraud_candidates
        .filter(col("distance_km") >= MIN_DISTANCE_KM)
        .filter(col("time_diff_min").between(MIN_GAP_MINUTES, MAX_GAP_MINUTES))
        .withColumn("speed_kmh", col("distance_km") / (col("time_diff_min") / 60.0))
        .filter(col("speed_kmh") > MAX_SPEED_KMH)
        .withColumn("alert_id", expr("uuid()"))
        .withColumn("alert_type", lit("IMPOSSIBLE_TRAVEL"))
        .withColumn("severity",
            when(col("speed_kmh") > 10000, "CRITICAL")
            .when(col("speed_kmh") > 5000, "HIGH")
            .otherwise("MEDIUM")
        )
        .withColumn("alert_timestamp", current_timestamp())
        .withColumn("detection_latency_ms",
            (unix_timestamp(current_timestamp()) - unix_timestamp(col("event_timestamp"))) * 1000
        )
        .select(
            "alert_id", "card_id", "alert_type",
            "transaction_id", "prev_transaction_id",
            "location", "prev_location",
            round(col("distance_km"), 2).alias("distance_km"),
            round(col("time_diff_min"), 2).alias("time_diff_minutes"),
            round(col("speed_kmh"), 2).alias("speed_kmh"),
            "amount", "merchant", "alert_timestamp", "severity",
            col("detection_latency_ms")
        )
    )
    
    # Write fraud alerts
    fraud_count = fraud_alerts.count()
    if fraud_count > 0:
        fraud_alerts.write.format("delta").mode("append").saveAsTable(FRAUD_ALERTS_TABLE)
        print(f"üö® FRAUD DETECTED: {fraud_count} alerts!")
        fraud_alerts.show(truncate=False)
    else:
        print(f"‚úÖ No fraud detected")
    
    # === Update card state (DELETE + INSERT) ===
    cards = [row.card_id for row in batch_df.select("card_id").distinct().collect()]
    if cards:
        spark.sql(f"DELETE FROM {CARD_STATE_TABLE} WHERE card_id IN ({','.join(map(str, cards))})")
    
    # Get latest transaction per card
    w = Window.partitionBy("card_id").orderBy(col("event_timestamp").desc())
    latest = batch_df.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop("rn")
    
    new_state = latest.select(
        col("card_id"),
        col("transaction_id").alias("last_transaction_id"),
        col("location").alias("last_location"),
        col("latitude").alias("last_latitude"),
        col("longitude").alias("last_longitude"),
        col("event_timestamp").alias("last_timestamp")
    )
    new_state.write.format("delta").mode("append").saveAsTable(CARD_STATE_TABLE)
    
    # === NEW: Performance metrics ===
    batch_end = time.time()
    processing_time = batch_end - batch_start
    
    perf_data = spark.createDataFrame([{
        "batch_id": batch_id,
        "batch_timestamp": datetime.now(),
        "transactions_processed": batch_count,
        "fraud_detected": fraud_count,
        "processing_time_sec": round(processing_time, 2),
        "avg_latency_ms": fraud_alerts.select(avg("detection_latency_ms")).first()[0] if fraud_count > 0 else 0.0
    }])
    perf_data.write.format("delta").mode("append").saveAsTable(PERFORMANCE_TABLE)
    
    # === NEW: Real-time metrics (last 1 minute window) ===
    current_time = datetime.now()
    window_start = current_time - timedelta(minutes=1)
    
    recent_txns = spark.table(ALL_TRANSACTIONS_TABLE).filter(col("ingest_ts") >= window_start)
    recent_fraud = spark.table(FRAUD_ALERTS_TABLE).filter(col("alert_timestamp") >= window_start)
    
    total_txns = recent_txns.count()
    total_fraud = recent_fraud.count()
    
    metrics_data = spark.createDataFrame([{
        "window_start": window_start,
        "window_end": current_time,
        "total_transactions": total_txns,
        "fraud_alerts": total_fraud,
        "fraud_rate_pct": round((total_fraud / total_txns * 100) if total_txns > 0 else 0.0, 2),
        "total_fraud_amount": float(recent_fraud.select(sum("amount")).first()[0] or 0.0),
        "avg_fraud_speed_kmh": float(recent_fraud.select(avg("speed_kmh")).first()[0] or 0.0),
        "critical_alerts": recent_fraud.filter(col("severity") == "CRITICAL").count(),
        "high_alerts": recent_fraud.filter(col("severity") == "HIGH").count(),
        "unique_cards_flagged": recent_fraud.select(countDistinct("card_id")).first()[0] or 0,
        "updated_at": current_time
    }])
    metrics_data.write.format("delta").mode("append").saveAsTable(METRICS_REALTIME_TABLE)
    
    fraud_rate = (fraud_count / batch_count * 100) if batch_count > 0 else 0
    print(f"‚úÖ Batch complete in {processing_time:.2f}s | Fraud rate: {fraud_rate:.1f}%")
    print(f"   State updated: {new_state.count()} cards")

print("‚úÖ Enhanced processing function defined")


‚úÖ Enhanced processing function defined


In [0]:
stream = (
    parsed_stream.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", f"{CHECKPOINT_BASE}/main_stream")
    .trigger(processingTime="10 seconds")
    .start()
)

print("=" * 70)
print("üöÄ PRODUCTION FRAUD DETECTION PIPELINE STARTED")
print(f"   Active streams: {len(spark.streams.active)}")
print("=" * 70)
print()
print("üìä Tables:")
print(f"   ü•à Silver: {ALL_TRANSACTIONS_TABLE}")
print(f"   ü•á Gold Alerts: {FRAUD_ALERTS_TABLE}")
print(f"   üìç State: {CARD_STATE_TABLE}")
print(f"   üìà Metrics: {METRICS_REALTIME_TABLE}")
print(f"   ‚ö° Performance: {PERFORMANCE_TABLE}")
print()
print("üöÄ START YOUR GENERATOR NOW!")
print("   python transaction_generator.py")


üöÄ PRODUCTION FRAUD DETECTION PIPELINE STARTED
   Active streams: 1

üìä Tables:
   ü•à Silver: fraud_detection.raw.silver_all_transactions
   ü•á Gold Alerts: fraud_detection.raw.gold_fraud_alerts
   üìç State: fraud_detection.raw.card_state
   üìà Metrics: fraud_detection.raw.gold_fraud_metrics_realtime
   ‚ö° Performance: fraud_detection.raw.gold_system_performance

üöÄ START YOUR GENERATOR NOW!
   python transaction_generator.py


In [0]:
print("üìä PIPELINE STATUS")
print("=" * 70)

# Check stream health
for stream in spark.streams.active:
    print(f"Stream: {stream.id}")
    print(f"  Status: {'‚úÖ Running' if stream.isActive else '‚ùå Stopped'}")
    if stream.lastProgress:
        print(f"  Last Batch ID: {stream.lastProgress.get('batchId', 'N/A')}")
        print(f"  Input Rows: {stream.lastProgress.get('numInputRows', 0)}")

print("\nüìà DATA VOLUMES")
print("=" * 70)
print(f"Silver transactions: {spark.table(ALL_TRANSACTIONS_TABLE).count()}")
print(f"Gold fraud alerts: {spark.table(FRAUD_ALERTS_TABLE).count()}")
print(f"Card states: {spark.table(CARD_STATE_TABLE).count()}")
print(f"Metrics records: {spark.table(METRICS_REALTIME_TABLE).count()}")
print(f"Performance records: {spark.table(PERFORMANCE_TABLE).count()}")


üìä PIPELINE STATUS
Stream: 73bfe3b5-1157-46a9-8493-96d6b4b6196f
  Status: ‚úÖ Running

üìà DATA VOLUMES
Silver transactions: 1090

üì¶ Batch 1 | 20 transactions
Gold fraud alerts: 29
Card states: 51
Metrics records: 0
Performance records: 0


In [0]:
print("üö® LATEST FRAUD ALERTS")
print("=" * 70)

display(
    spark.table(FRAUD_ALERTS_TABLE)
    .orderBy(col("alert_timestamp").desc())
    .limit(20)
)


üö® LATEST FRAUD ALERTS


alert_id,card_id,alert_type,transaction_id,previous_transaction_id,current_location,previous_location,distance_km,time_diff_minutes,speed_kmh,amount,merchant,alert_timestamp,severity
0dad3742-cb38-4025-a1c4-da2573b813b2,1048,IMPOSSIBLE_TRAVEL,TXN-20260211014206-4167,TXN-20260211012706-5407,Tokyo,New York,10851.73,15.0,43406.93,275.58,Marriott,2026-02-11T01:42:06.18365Z,CRITICAL
6103d5f6-24fa-42d9-92f0-00073cba6702,1002,IMPOSSIBLE_TRAVEL,TXN-20260211013737-9207,TXN-20260211012237-1330,London,Dubai,5473.43,15.0,21893.72,325.63,Starbucks,2026-02-11T01:37:37.058502Z,CRITICAL
1cf69f98-0d5e-4be9-b1eb-5d0392d270c9,1025,IMPOSSIBLE_TRAVEL,TXN-20260211012749-8266,TXN-20260211010041-5338,Mumbai,Los Angeles,13991.18,27.13,30938.73,5.89,Starbucks,2026-02-11T01:27:49.942428Z,CRITICAL
1eea409f-2ef5-4777-b358-2f55c5e467e7,1048,IMPOSSIBLE_TRAVEL,TXN-20260211012706-5407,TXN-20260211010106-2749,New York,Dubai,11008.07,26.0,25403.25,116.14,Uber,2026-02-11T01:27:06.18365Z,CRITICAL
beb1425f-f9d5-4470-88a6-1109eb875b13,1020,IMPOSSIBLE_TRAVEL,TXN-20260211012628-8003,TXN-20260211005858-7378,Paris,Tokyo,9711.72,27.5,21189.22,442.96,Microsoft Store,2026-02-11T01:26:28.96636Z,CRITICAL
4520f6c3-3eab-43da-af55-ea9ca603bd4e,1028,IMPOSSIBLE_TRAVEL,TXN-20260211012441-6865,TXN-20260211005933-3481,London,Dubai,5473.43,25.13,13066.54,106.5,IKEA,2026-02-11T01:24:41.102076Z,CRITICAL
33825759-814e-4a16-95cb-0b64cb2e8ea0,1002,IMPOSSIBLE_TRAVEL,TXN-20260211012237-1330,TXN-20260211005937-5726,Dubai,Sydney,12049.52,23.0,31433.54,250.58,Marriott,2026-02-11T01:22:37.058502Z,CRITICAL
be11ce55-b34c-4bfa-8fbc-5ce2691d9a0c,1016,IMPOSSIBLE_TRAVEL,TXN-20260211011925-1506,TXN-20260211010025-8249,New York,S√£o Paulo,7685.63,19.0,24270.4,304.98,Target,2026-02-11T01:19:25.666161Z,CRITICAL
008c85d7-76f4-4c29-9c36-521b665bb591,1006,IMPOSSIBLE_TRAVEL,TXN-20260211011812-2108,TXN-20260211005858-5254,Sydney,Singapore,6306.25,19.23,19672.88,322.96,Best Buy,2026-02-11T01:18:12.787782Z,CRITICAL
bf078f92-c058-4383-91cf-346d047513bc,1049,IMPOSSIBLE_TRAVEL,TXN-20260211011742-6845,TXN-20260211010124-9325,Sydney,London,16993.93,16.3,62554.36,216.02,Starbucks,2026-02-11T01:17:42.630607Z,CRITICAL


In [0]:
print("üìä REAL-TIME METRICS (Last Hour)")
print("=" * 70)

display(
    spark.table(METRICS_REALTIME_TABLE)
    .filter(col("window_start") >= current_timestamp() - expr("INTERVAL 1 HOUR"))
    .orderBy(col("window_start").desc())
    .limit(60)
)


üìä REAL-TIME METRICS (Last Hour)


window_start,window_end,total_transactions,fraud_alerts,fraud_rate_pct,total_fraud_amount,avg_fraud_speed_kmh,critical_alerts,high_alerts,unique_cards_flagged,updated_at
