# Real-Time Fraud Detection Demo (No Kafka)

This notebook simulates real-time transaction ingestion and dashboarding using Delta Lake and Structured Streaming in Databricks.

**Demo Highlights:**
- Simulated real-time data feed
- Live-updating fraud dashboards
- Delta Lake for reliable streaming
- (Optional) Feature engineering & inference

In [None]:
# Step 1: Initialize Delta table for streaming
from pyspark.sql.types import StructType, IntegerType, StringType, TimestampType

schema = StructType() \
    .add("amount", IntegerType()) \
    .add("merchant_code", IntegerType()) \
    .add("channel", StringType()) \
    .add("fraud", IntegerType()) \
    .add("timestamp", TimestampType())

# Create empty base table
spark.sql("DROP TABLE IF EXISTS transaction_stream")
spark.createDataFrame([], schema).write.format("delta").mode("overwrite").saveAsTable("transaction_stream")

## Step 2: Run this code in a separate notebook to simulate a real-time data feed
_Keep this running in the background during the demo._

In [None]:
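import random
import time
from datetime import datetime
from pyspark.sql import Row

channels = ["online", "instore"]

# Reuse the table's schema so Python ints are written as IntegerType.
# Without it, createDataFrame infers LongType for ints, which Delta's schema enforcement can reject on append.
schema = spark.table("transaction_stream").schema

def generate_transaction():
    # Field order matches the table schema: amount, merchant_code, channel, fraud, timestamp
    return Row(
        amount=random.randint(1, 1000),
        merchant_code=random.randint(0, 4),
        channel=random.choice(channels),
        fraud=int(random.random() < 0.04),  # roughly 4% of transactions labelled as fraud
        timestamp=datetime.now()
    )

# Append a batch of 10 transactions every 5 seconds; interrupt the cell to stop the feed
while True:
    rows = [generate_transaction() for _ in range(10)]
    df = spark.createDataFrame(rows, schema)
    df.write.format("delta").mode("append").saveAsTable("transaction_stream")
    time.sleep(5)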
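For rehearsing the demo or testing downstream logic, the endless loop can be replaced by a bounded, reproducible generator. The sketch below is plain Python under stated assumptions: `generate_batches`, `fraud_prob`, and `seed` are hypothetical names, and records are tuples in the table's column order so that each batch could be written with `spark.createDataFrame(batch, schema)`, where `schema` is the table schema from Step 1.

```python
import random
from datetime import datetime
from typing import Iterator, List, Tuple

CHANNELS = ["online", "instore"]

# One record in the table's column order: (amount, merchant_code, channel, fraud, timestamp)
Record = Tuple[int, int, str, int, datetime]

def generate_transaction(rng: random.Random, fraud_prob: float = 0.04) -> Record:
    """Draw one synthetic transaction using the same ranges as the feed above."""
    return (
        rng.randint(1, 1000),            # amount
        rng.randint(0, 4),               # merchant_code
        rng.choice(CHANNELS),            # channel
        int(rng.random() < fraud_prob),  # fraud: 1 with probability fraud_prob
        datetime.now(),                  # timestamp
    )

def generate_batches(n_batches: int, batch_size: int = 10,
                     fraud_prob: float = 0.04, seed: int = 42) -> Iterator[List[Record]]:
    """Yield a fixed number of batches; the seed makes every field except timestamp reproducible."""
    rng = random.Random(seed)
    for _ in range(n_batches):
        yield [generate_transaction(rng, fraud_prob) for _ in range(batch_size)]

# Usage: check the simulated fraud rate over 10,000 records
records = [r for batch in generate_batches(n_batches=1000) for r in batch]
print(f"{len(records)} records, fraud rate {sum(r[3] for r in records) / len(records):.3f}")
```

A seeded `random.Random` instance keeps the simulation independent of any other code that uses the global `random` module.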

## Step 3: Visualize live data in this notebook
_Open multiple displays to simulate dashboards._

In [None]:
# Real-time fraud counts
df_stream = spark.readStream.format("delta").table("transaction_stream")
df_stream.groupBy("fraud").count().display()
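On a stream, `groupBy("fraud").count()` is a stateful aggregation: each micro-batch folds its new rows into counts kept from earlier batches rather than rescanning the table, and the display shows the full updated result. The plain-Python model below illustrates that behaviour only; it is not Spark's implementation. `running_fraud_counts` is a hypothetical name, and micro-batches are modelled as lists of 0/1 fraud labels.

```python
from collections import Counter
from typing import Dict, Iterable, Iterator, List

def running_fraud_counts(micro_batches: Iterable[List[int]]) -> Iterator[Dict[int, int]]:
    """Yield the complete result table after each micro-batch.

    The Counter plays the role of the state that the streaming engine keeps
    between batches; only the new rows are processed each time.
    """
    state = Counter()
    for batch in micro_batches:
        state.update(batch)  # add this batch's label counts to the running totals
        yield dict(sorted(state.items()))

# Usage: three micro-batches arriving one after another
for snapshot in running_fraud_counts([[0, 0, 1], [0], [1, 1]]):
    print(snapshot)
```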

In [None]:
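from pyspark.sql import functions as F

# Fraud rate by channel: fraud is 0/1, so its average is the share of fraudulent transactions
(df_stream.groupBy("channel")
    .agg(F.count("*").alias("transactions"), F.avg("fraud").alias("fraud_rate"))
    .orderBy("channel")
    .display())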
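Because `fraud` is a 0/1 label, a per-channel fraud rate is simply the mean label per channel, i.e. fraudulent transactions divided by all transactions in that channel. A small plain-Python sketch of that calculation follows; `fraud_rate_by_channel` is a hypothetical helper and records are assumed to be `(channel, fraud)` pairs.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

def fraud_rate_by_channel(records: Iterable[Tuple[str, int]]) -> Dict[str, float]:
    """Return {channel: fraction of transactions labelled fraud}."""
    totals = defaultdict(int)  # transactions seen per channel
    frauds = defaultdict(int)  # fraudulent transactions per channel
    for channel, fraud in records:
        totals[channel] += 1
        frauds[channel] += fraud  # fraud is 0 or 1, so summing counts the frauds
    return {ch: frauds[ch] / totals[ch] for ch in sorted(totals)}

# Usage
print(fraud_rate_by_channel([("online", 1), ("online", 0), ("instore", 0)]))
```

Reporting a rate rather than raw counts makes channels with very different volumes comparable.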

In [None]:
# Amounts as they arrive; choose the Histogram chart type in the display output to plot their distribution
df_stream.select("amount").display()
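If fixed buckets should be computed in the query rather than by the chart, the bucketing rule is integer division by a bin width. In Spark the same rule could be expressed as `F.floor(F.col("amount") / 100) * 100` and then grouped and counted. The plain-Python sketch below shows the rule itself; `amount_histogram` and the 100-unit default width are assumptions for illustration.

```python
from collections import Counter
from typing import Dict, Iterable

def amount_histogram(amounts: Iterable[int], bin_width: int = 100) -> Dict[int, int]:
    """Count amounts per fixed-width bin, keyed by the bin's lower edge.

    With bin_width=100, amounts 1-99 fall in bin 0, 100-199 in bin 100, and so on;
    the generator's maximum amount of 1000 lands alone in bin 1000.
    """
    counts = Counter((a // bin_width) * bin_width for a in amounts)
    return dict(sorted(counts.items()))

# Usage
print(amount_histogram([1, 99, 100, 150, 999, 1000]))
```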

## Step 4: Feature Engineering and Model Scoring (Optional)
_Enrich the stream with features that a model can use to score transactions in real time._

In [None]:
from pyspark.sql.functions import when

# Add a 'high_amount' feature for scoring
df_features = df_stream.withColumn("high_amount", when(df_stream.amount > 500, 1).otherwise(0))
df_features.display()
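The same feature has to be computed identically wherever the model sees data, including when building request payloads for the endpoint in Step 6. Keeping the rule as a plain function makes its boundary easy to unit test. This is a sketch: `high_amount` and `add_features` are hypothetical helpers that mirror `when(amount > 500, 1).otherwise(0)`, and keeping them in sync with the Spark expression is a manual step.

```python
from typing import Any, Dict

def high_amount(amount: int, threshold: int = 500) -> int:
    """Return 1 when amount is strictly greater than threshold, else 0 (500 itself is not high)."""
    return 1 if amount > threshold else 0

def add_features(record: Dict[str, Any], threshold: int = 500) -> Dict[str, Any]:
    """Return a copy of a transaction dict with the 'high_amount' feature added."""
    enriched = dict(record)  # avoid mutating the caller's record
    enriched["high_amount"] = high_amount(record["amount"], threshold)
    return enriched

# Usage
print(add_features({"amount": 950, "merchant_code": 2}))
```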

## Step 5: Model Registration with MLflow
_This part assumes you have already trained and logged a model._

In [None]:
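# Log evaluation metrics and register the model with MLflow (after training)
import mlflow

# Placeholder values for illustration; replace with metrics computed on your evaluation data
with mlflow.start_run():
    mlflow.log_metric("stream_precision", 0.82)
    mlflow.log_metric("stream_recall", 0.67)
    print("Metrics logged to MLflow.")

# Register the previously logged model as "FraudDetectionModel".
# Replace <run-id> with the training run's ID; the artifact path "model" is an assumption.
# mlflow.register_model("runs:/<run-id>/model", "FraudDetectionModel")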
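The precision and recall logged above are standard binary-classification metrics with fraud (label 1) as the positive class: precision is TP / (TP + FP), the share of flagged transactions that were fraud, and recall is TP / (TP + FN), the share of fraudulent transactions that were flagged. A plain-Python sketch of computing them from labels and predictions follows; `precision_recall` is a hypothetical helper, and returning 0.0 for an empty denominator is a convention chosen here.

```python
from typing import Sequence, Tuple

def precision_recall(labels: Sequence[int], predictions: Sequence[int]) -> Tuple[float, float]:
    """Compute (precision, recall) for binary fraud labels where 1 = fraud."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == 1 and p == 1)  # fraud correctly flagged
    fp = sum(1 for y, p in pairs if y == 0 and p == 1)  # legitimate transaction flagged
    fn = sum(1 for y, p in pairs if y == 1 and p == 0)  # fraud missed
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall

# Usage
print(precision_recall([1, 1, 1, 0, 0, 0], [1, 1, 0, 1, 0, 0]))
```

The computed values could replace the hard-coded numbers passed to `mlflow.log_metric`. With only about 4% fraud in the simulated feed, accuracy alone would look high even for a model that never flags anything, which is why precision and recall are the more informative pair here.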

## Step 6: Real-Time Model Serving (Optional)
_Invoke a deployed model endpoint to score new transactions._

In [None]:
import requests

# Replace <your-databricks-workspace>, <your-endpoint-name>, and <your-token> with your own values
endpoint = "https://<your-databricks-workspace>/serving-endpoints/<your-endpoint-name>/invocations"
headers = {
    "Authorization": "Bearer <your-token>",
    "Content-Type": "application/json"
}

# Example payload: one transaction with the features the model expects
payload = {
    "dataframe_split": {
        "columns": ["amount", "merchant_code", "high_amount"],
        "data": [[950, 2, 1]]
    }
}

response = requests.post(endpoint, headers=headers, json=payload, timeout=30)
response.raise_for_status()  # surface HTTP errors (e.g. bad token or endpoint) instead of printing an error body
print("Prediction:", response.json())
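To score several transactions in one request, the `dataframe_split` body can be built from feature dicts instead of being typed by hand. This is a sketch under assumptions: `build_payload` is a hypothetical helper, and the column order is assumed to match the example payload above; it must match whatever the deployed model actually expects.

```python
from typing import Any, Dict, Mapping, Sequence

FEATURE_COLUMNS = ("amount", "merchant_code", "high_amount")

def build_payload(records: Sequence[Mapping[str, Any]],
                  columns: Sequence[str] = FEATURE_COLUMNS) -> Dict[str, Any]:
    """Build a 'dataframe_split' request body with one row per record.

    Values are taken in `columns` order; extra keys in a record are ignored,
    and a missing column raises KeyError.
    """
    return {
        "dataframe_split": {
            "columns": list(columns),
            "data": [[record[col] for col in columns] for record in records],
        }
    }

# Usage: two transactions in one request
payload = build_payload([
    {"amount": 950, "merchant_code": 2, "high_amount": 1},
    {"amount": 120, "merchant_code": 0, "high_amount": 0, "channel": "online"},
])
print(payload)
```

The resulting dict can be passed as `json=` to `requests.post`. The response shape depends on the model; MLflow's scoring server typically returns a JSON object with a `predictions` list holding one entry per input row.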