# Bronze Layer: Raw Data Ingestion from Event Hubs

## Purpose
Stream raw transaction data from Azure Event Hubs and write it to the Delta Lake Bronze table.

## Data Flow
```
Event Hubs → Parse JSON → Bronze Delta Table (Raw)
```

## Outputs
- **Table:** `fraud_lakehouse_workspace.default.bronze_transactions`
- **Format:** Delta Lake
- **Schema:** 33 parsed transaction columns (Time, V1-V28, Amount, Class, IngestionTime, OriginalClass) + Event Hubs metadata + Bronze ingestion timestamp

## 1. Import Libraries & Setup

In [0]:
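# Explicit imports avoid shadowing Python builtins such as sum, max, and count
from pyspark.sql.functions import col, current_timestamp, from_json
from pyspark.sql.types import DoubleType, IntegerType, StructField, StructType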

# Event Hubs connection string; it must include EntityPath=<event-hub-name>
# Replace the placeholder. In production, load it from a secret scope with dbutils.secrets.get(...)
CONNECTION_STRING = "YOUR_KEY_HERE"

# Encrypt the connection string
ehConf = {
  'eventhubs.connectionString' : sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(CONNECTION_STRING)
}

print("Imports and configuration loaded")
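According to the azure-event-hubs-spark documentation, when no starting position is configured the connector begins reading at the end of each partition, so events sent before the query first starts are skipped. The connector's PySpark guide sets `eventhubs.startingPosition` to a JSON-encoded event position, where an offset of `"-1"` means start of stream and `"@latest"` means end of stream. This only applies when a query starts without a checkpoint. The sketch below builds that setting; `with_starting_position` is a hypothetical helper, and the JSON shape follows the connector docs, so verify it against your connector version.

```python
import json


def with_starting_position(conf: dict, from_start: bool = True) -> dict:
    """Return a copy of an Event Hubs conf dict with a starting position set.

    Hypothetical helper. Position format follows the azure-event-hubs-spark
    PySpark docs: offset "-1" = start of stream, "@latest" = end of stream.
    """
    position = {
        "offset": "-1" if from_start else "@latest",
        "seqNo": -1,            # not in use (per the connector docs)
        "enqueuedTime": None,   # not in use (per the connector docs)
        "isInclusive": True,
    }
    updated = dict(conf)  # leave the caller's dict untouched
    updated["eventhubs.startingPosition"] = json.dumps(position)
    return updated


# Notebook usage (ehConf comes from the cell above):
# ehConf = with_starting_position(ehConf, from_start=True)
```

Returning a copy rather than mutating `ehConf` keeps re-running the cell idempotent.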

## 2. Connect to Event Hubs Stream

In [0]:
print("📡 Connecting to Event Hubs stream...")

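# Define the streaming source. load() is lazy: no events are read until a
# streaming query starts, so some connection errors only surface at that point.
try:
    df_stream = spark \
      .readStream \
      .format("eventhubs") \
      .options(**ehConf) \
      .load()
    print("Event Hubs stream source defined")
    print("\nRaw Event Hubs Schema:")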
    df_stream.printSchema()
except Exception as e:
    print(f"Connection failed: {e}")
    raise

## 3. Define Fraud Transaction Schema

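The Event Hubs `body` column is binary; it carries a UTF-8 JSON document with these fields:
- **Time:** Seconds elapsed since the first transaction in the dataset
- **V1-V28:** PCA-transformed features (anonymized)
- **Amount:** Transaction amount in euros
- **Class:** Label, 0 = Legitimate, 1 = Fraud
- **IngestionTime, OriginalClass:** Two additional fields that the schema below also parses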
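Because Event Hubs stores the body as bytes, whatever sends the transactions (the simulator this notebook relies on) has to serialize each row to UTF-8 JSON whose keys match the schema below. A minimal sketch of such a payload, assuming a hypothetical `make_event_body` helper and placeholder values rather than real data. `IngestionTime` and `OriginalClass` are left out here; fields missing from the JSON simply parse as null.

```python
import json

# Field names expected by the Bronze parser: Time, V1..V28, Amount, Class.
FEATURE_NAMES = ["Time"] + [f"V{i}" for i in range(1, 29)] + ["Amount", "Class"]


def make_event_body(time_s: float, pca_features: list, amount: float, label: int) -> bytes:
    """Serialize one transaction to UTF-8 JSON bytes (hypothetical helper)."""
    if len(pca_features) != 28:
        raise ValueError("expected 28 PCA features (V1..V28)")
    record = {"Time": float(time_s)}
    record.update({f"V{i}": float(v) for i, v in enumerate(pca_features, start=1)})
    record["Amount"] = float(amount)
    record["Class"] = int(label)
    return json.dumps(record).encode("utf-8")


# Placeholder values for illustration only.
body = make_event_body(0.0, [0.0] * 28, 10.5, 0)
```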

In [0]:
# Define schema for parsing JSON body
fraud_schema = StructType([
    StructField("Time", DoubleType(), True),
    StructField("V1", DoubleType(), True),
    StructField("V2", DoubleType(), True),
    StructField("V3", DoubleType(), True),
    StructField("V4", DoubleType(), True),
    StructField("V5", DoubleType(), True),
    StructField("V6", DoubleType(), True),
    StructField("V7", DoubleType(), True),
    StructField("V8", DoubleType(), True),
    StructField("V9", DoubleType(), True),
    StructField("V10", DoubleType(), True),
    StructField("V11", DoubleType(), True),
    StructField("V12", DoubleType(), True),
    StructField("V13", DoubleType(), True),
    StructField("V14", DoubleType(), True),
    StructField("V15", DoubleType(), True),
    StructField("V16", DoubleType(), True),
    StructField("V17", DoubleType(), True),
    StructField("V18", DoubleType(), True),
    StructField("V19", DoubleType(), True),
    StructField("V20", DoubleType(), True),
    StructField("V21", DoubleType(), True),
    StructField("V22", DoubleType(), True),
    StructField("V23", DoubleType(), True),
    StructField("V24", DoubleType(), True),
    StructField("V25", DoubleType(), True),
    StructField("V26", DoubleType(), True),
    StructField("V27", DoubleType(), True),
    StructField("V28", DoubleType(), True),
    StructField("Amount", DoubleType(), True),
    StructField("Class", IntegerType(), True),
    StructField("IngestionTime", DoubleType(), True),
    StructField("OriginalClass", IntegerType(), True)
])

print(f"Schema defined with {len(fraud_schema.fields)} columns")
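`from_json` also accepts a DDL-formatted schema string instead of a `StructType`. Since V1-V28 follow a pattern, the same 33-column schema can be generated rather than typed out. A plain-Python sketch; `build_fraud_ddl` is a hypothetical name, and the resulting string would be passed as `from_json(col("body_string"), build_fraud_ddl())`.

```python
def build_fraud_ddl() -> str:
    """Build a Spark DDL schema string matching fraud_schema (hypothetical helper)."""
    columns = [("Time", "DOUBLE")]
    columns += [(f"V{i}", "DOUBLE") for i in range(1, 29)]
    columns += [
        ("Amount", "DOUBLE"),
        ("Class", "INT"),
        ("IngestionTime", "DOUBLE"),
        ("OriginalClass", "INT"),
    ]
    # Backticks quote each name so words like Time are never parsed as keywords.
    return ", ".join(f"`{name}` {dtype}" for name, dtype in columns)


ddl = build_fraud_ddl()
```

The explicit `StructType` stays easier to read field by field; the generated string is shorter to maintain if more `V` columns are ever added.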

## 4. Parse JSON and Extract Fields

In [0]:
# Parse JSON from Event Hubs body
df_bronze = df_stream \
    .withColumn("body_string", col("body").cast("string")) \
    .withColumn("transaction", from_json(col("body_string"), fraud_schema)) \
    .select(
        # Transaction data (from JSON)
        col("transaction.*"),
        # Event Hubs metadata
        col("enqueuedTime").alias("eventhub_enqueued_time"),
        col("offset").alias("eventhub_offset"),
        col("sequenceNumber").alias("eventhub_sequence"),
        col("partitionKey").alias("eventhub_partition_key"),
        # Bronze ingestion timestamp
        current_timestamp().alias("bronze_ingestion_time")
    )

print("JSON parsing defined (applied to each micro-batch once the stream starts)")
print("\nBronze DataFrame Schema:")
df_bronze.printSchema()
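For intuition about what `cast("string")` followed by `from_json` does to each event, here is a rough single-record analogue in plain Python; it is not how Spark executes the step. The point it illustrates is that fields absent from the JSON come back as null (`None`) instead of failing the batch, so partial or unparseable events still land in Bronze with null columns. `parse_body` and the shortened field list are hypothetical.

```python
import json


def parse_body(body: bytes, field_names: list) -> dict:
    """Decode one Event Hubs body and pick out the schema fields.

    Rough analogue of col("body").cast("string") + from_json(...):
    missing fields become None, and unparseable JSON yields all None.
    """
    text = body.decode("utf-8", errors="replace")  # avoid raising on invalid UTF-8
    try:
        payload = json.loads(text)
    except json.JSONDecodeError:
        payload = {}
    if not isinstance(payload, dict):  # e.g. a JSON array or number
        payload = {}
    return {name: payload.get(name) for name in field_names}


fields = ["Time", "Amount", "Class", "IngestionTime"]
ok = parse_body(b'{"Time": 1.0, "Amount": 9.99, "Class": 0}', fields)
bad = parse_body(b"not json", fields)
```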

## 5. Write to Bronze Delta Table

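**Features:**
- **Format:** Delta Lake (ACID transactions)
- **Mode:** Append (each micro-batch adds new rows)
- **Checkpoint:** Stores stream progress so a restarted query resumes where it stopped
- **Schema Evolution:** Enabled via `mergeSchema` (new DataFrame columns are added to the table; JSON fields outside `fraud_schema` are still dropped at parse time)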

In [0]:
# Define paths
checkpoint_path = "YOUR_CHECKPOINT_PATH_HERE" # Replace with your actual checkpoint path, e.g., "dbfs:/fraud_lakehouse_workspace/checkpoints/bronze_transactions"
bronze_table_name = "fraud_lakehouse_workspace.default.bronze_transactions"

print("Writing to Bronze Delta Lake...")
print(f"Table: {bronze_table_name}")
print(f"Checkpoint: {checkpoint_path}")

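# Optional: delete the checkpoint for a deliberate fresh start. This discards the
# stream's progress, so the next run re-reads from the configured starting position
# and may skip or duplicate events. Leave False for normal, fault-tolerant restarts.
RESET_CHECKPOINT = False

if RESET_CHECKPOINT:
    try:
        dbutils.fs.rm(checkpoint_path, recurse=True)
        print("Old checkpoint cleared")
    except Exception as e:
        print(f"Could not clear checkpoint: {e}")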

# Start streaming write to Delta table
bronze_query = df_bronze \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", checkpoint_path) \
    .option("mergeSchema", "true") \
    .toTable(bronze_table_name)

print("\nBronze streaming ACTIVE!")
print(f"Stream ID: {bronze_query.id}")

## 6. Monitor Stream Status

In [0]:
import time

# Wait for initialization
time.sleep(5)

# Check stream status
print("Stream Status:")
print(f"  Active: {bronze_query.isActive}")
print(f"  Status: {bronze_query.status}")

# Check progress
if bronze_query.lastProgress:
    print("\nLast Progress:")
    print(f"  Input Rows: {bronze_query.lastProgress.get('numInputRows', 0)}")
    print(f"  Processing Rate: {bronze_query.lastProgress.get('processedRowsPerSecond', 0)} rows/sec")
else:
    print("\nWaiting for first batch...")
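A fixed `time.sleep(5)` can return before the first micro-batch completes, in which case the cell only prints "Waiting for first batch...". One alternative is to poll `lastProgress` (a dict, or `None` before any progress) until it is populated or a timeout expires. The sketch keeps the polling logic independent of Spark by accepting any zero-argument callable; `wait_for_progress` is a hypothetical helper, and in the notebook you would pass `lambda: bronze_query.lastProgress`.

```python
import time
from typing import Callable, Optional


def wait_for_progress(
    get_progress: Callable[[], Optional[dict]],
    timeout_s: float = 60.0,
    poll_s: float = 2.0,
) -> Optional[dict]:
    """Poll until get_progress() returns a non-empty dict or the timeout expires.

    Returns the progress dict, or None if nothing arrived in time.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        progress = get_progress()
        if progress:
            return progress
        if time.monotonic() >= deadline:
            return None
        time.sleep(poll_s)


# Notebook usage (bronze_query comes from the write cell):
# progress = wait_for_progress(lambda: bronze_query.lastProgress, timeout_s=120)
# if progress:
#     print(f"Input rows: {progress.get('numInputRows', 0)}")
```

Using `time.monotonic()` keeps the timeout correct even if the wall clock changes while polling.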

## 7. Verify Data in Bronze Table

In [0]:
# Read from Bronze Delta table
try:
    bronze_df = spark.read.table("fraud_lakehouse_workspace.default.bronze_transactions")

    record_count = bronze_df.count()
    print(f"Bronze Table Records: {record_count}")

    if record_count > 0:
        print("\nSUCCESS! Data is flowing!")
        print("\nSample transactions:")
        bronze_df.select("Time", "Amount", "Class", "bronze_ingestion_time").show(10)

        # Count fraud vs. legitimate rows in a single aggregation
        class_counts = {row["Class"]: row["count"] for row in bronze_df.groupBy("Class").count().collect()}
        fraud_count = class_counts.get(1, 0)
        legit_count = class_counts.get(0, 0)
        print(f"\nFraud: {fraud_count} | Legit: {legit_count}")

        # Show the most recent enqueued time (max ignores nulls)
        latest_event = bronze_df.selectExpr("max(eventhub_enqueued_time)").first()[0]
        print(f"  Latest Event Enqueued: {latest_event}")
    else:
        print("\n⏳ No data yet: run the simulator and wait about 30 seconds")
except Exception as e:
    print(f"Verification failed: {e}")
    raise

## Summary

**Bronze Layer Status:**
- ✅ Streaming from Event Hubs
- ✅ Writing to Delta Lake
- ✅ Checkpoint enabled (fault-tolerant)
- ✅ Schema evolution enabled

**Next Step:** Run `04_silver_layer` to clean and transform data