# Real-Time Cryptocurrency Streaming Analytics - LIVE DEMO VERSION

## Serverless-Compatible Streaming for Live Demonstrations

This notebook is optimized for **live demos** on Databricks Serverless clusters.
Key features:

- Uses `availableNow` trigger (Serverless-compatible)
- Processes all available data in one run, then completes
- Provides real-time console output during processing
- Can be re-run to process newly arrived data
- Works on both Serverless and Classic clusters

### Demo Instructions:
1. Start `crypto_data_producer` notebook first (set `RUN_DURATION_MINUTES = 10` or more)
2. Run this notebook - it will process all available data
3. Re-run the streaming cell to process new data as it arrives
4. Open a Databricks SQL Dashboard to visualize results

## Setup & Configuration

In [0]:
# Configuration - must match producer settings
CATALOG = "takamol_demo"
SCHEMA = "crypto_streaming"
VOLUME_NAME = "crypto_landing"

# Paths
VOLUME_PATH = f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME_NAME}"
LANDING_PATH = f"{VOLUME_PATH}/trades"

# Session ID for checkpoints - date-based, so every run on the same day shares one checkpoint location
from datetime import datetime
SESSION_ID = "live_demo_" + datetime.now().strftime("%Y%m%d")
CHECKPOINT_BASE = f"{VOLUME_PATH}/checkpoints/{SESSION_ID}"

# No trigger interval is configured here: the streaming cell uses the availableNow trigger for Serverless compatibility

print("=" * 70)
print("LIVE DEMO: CRYPTOCURRENCY STREAMING ANALYTICS")
print("=" * 70)
print(f"\nConfiguration:")
print(f"  Catalog: {CATALOG}")
print(f"  Schema: {SCHEMA}")
print(f"  Landing Path: {LANDING_PATH}")
print(f"  Checkpoint Base: {CHECKPOINT_BASE}")

LIVE DEMO: CRYPTOCURRENCY STREAMING ANALYTICS

Configuration:
  Catalog: takamol_demo
  Schema: crypto_streaming
  Landing Path: /Volumes/takamol_demo/crypto_streaming/crypto_landing/trades
  Checkpoint Base: /Volumes/takamol_demo/crypto_streaming/crypto_landing/checkpoints/live_demo_20260112
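
The checkpoint base above ends in a date-stamped session ID, so runs on the same calendar day resolve to the same checkpoint directory. A minimal sketch of that behaviour, using a hypothetical helper `make_session_id` that is not part of the notebook; the `per_run` option is an assumption added only to contrast the two choices:

```python
import uuid
from datetime import date


def make_session_id(day: date, per_run: bool = False) -> str:
    """Build a checkpoint session ID (hypothetical helper, not in the notebook)."""
    base = "live_demo_" + day.strftime("%Y%m%d")
    # A random per-run suffix would make every run start from empty checkpoints.
    return f"{base}_{uuid.uuid4().hex[:8]}" if per_run else base


same_day_a = make_session_id(date(2026, 1, 12))
same_day_b = make_session_id(date(2026, 1, 12))
next_day = make_session_id(date(2026, 1, 13))

print(same_day_a)                # live_demo_20260112
print(same_day_a == same_day_b)  # True: same day, same checkpoint directory
print(same_day_a == next_day)    # False: a new day starts a new directory
```

A date-based ID lets a re-run resume from where the previous run stopped, which is what the demo flow relies on.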


## Verify Data Producer is Running

In [0]:
# Check if data is available and monitor file count
try:
    files = dbutils.fs.ls(LANDING_PATH)
    file_count = len(files)
    print(f"\n{'='*60}")
    print(f"DATA AVAILABILITY CHECK")
    print(f"{'='*60}")
    print(f"  Files in landing zone: {file_count}")

    if file_count > 0:
        files_sorted = sorted(files, key=lambda x: x.name, reverse=True)
        print(f"\n  Most recent files:")
        for f in files_sorted[:3]:
            print(f"    {f.name} ({f.size / 1024:.1f} KB)")
        print(f"\n  Status: READY TO STREAM")
    else:
        print(f"\n  Status: WAITING FOR DATA")
        print(f"  Please start the crypto_data_producer notebook first!")

except Exception as e:
    print(f"\n  Error: {e}")
    print(f"\n  Please run the crypto_data_producer notebook first to generate data.")


DATA AVAILABILITY CHECK
  Files in landing zone: 83

  Most recent files:
    trades_20260112_062422_0e2df7f6.json (0.3 KB)
    trades_20260112_062352_11a1137f.json (0.3 KB)
    trades_20260112_062345_0e9781bf.json (0.3 KB)

  Status: READY TO STREAM
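
The check above sorts files by name in reverse to find the most recent ones. That works because the names in the listing appear to embed a zero-padded timestamp. The sketch below assumes the layout `trades_YYYYMMDD_HHMMSS_<suffix>.json`, inferred from the sample output rather than from the producer code; `parse_landing_name` is a hypothetical helper.

```python
from datetime import datetime


def parse_landing_name(name: str) -> datetime:
    """Extract the timestamp from a name like trades_YYYYMMDD_HHMMSS_<suffix>.json.

    The layout is an assumption based on the sample listing.
    """
    stem = name.rsplit(".", 1)[0]
    _prefix, day, clock, _suffix = stem.split("_", 3)
    return datetime.strptime(day + clock, "%Y%m%d%H%M%S")


names = [
    "trades_20260112_062345_0e9781bf.json",
    "trades_20260112_062422_0e2df7f6.json",
    "trades_20260112_062352_11a1137f.json",
]

# Lexicographic order matches chronological order for zero-padded timestamps.
newest_first = sorted(names, reverse=True)
print(newest_first[0], parse_landing_name(newest_first[0]))
```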


## Create/Reset Delta Tables

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Save Python's built-in abs before it gets shadowed by PySpark's abs
import builtins
py_abs = builtins.abs

# Ensure schema exists
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")

# Table names
trades_raw_table = f"{CATALOG}.{SCHEMA}.trades_raw_live"
trades_analytics_table = f"{CATALOG}.{SCHEMA}.trades_analytics_live"
price_alerts_table = f"{CATALOG}.{SCHEMA}.price_alerts_live"

# Reset tables for fresh demo (comment out if you want to preserve data)
print("Creating fresh tables for live demo...")

# Clean up old checkpoints to avoid Delta table ID conflicts
try:
    dbutils.fs.rm(CHECKPOINT_BASE, recurse=True)
    print(f"Cleared old checkpoints from {CHECKPOINT_BASE}")
except Exception:
    pass  # Checkpoints may not exist yet

spark.sql(f"DROP TABLE IF EXISTS {trades_raw_table}")
spark.sql(f"""
    CREATE TABLE {trades_raw_table} (
        event_type STRING,
        event_time BIGINT,
        symbol STRING,
        trade_id BIGINT,
        price DOUBLE,
        quantity DOUBLE,
        buyer_order_id BIGINT,
        seller_order_id BIGINT,
        trade_time BIGINT,
        is_buyer_maker BOOLEAN,
        trade_value_usdt DOUBLE,
        ingestion_time BIGINT,
        producer_id STRING,
        processing_time TIMESTAMP,
        trade_timestamp TIMESTAMP,
        batch_id BIGINT
    )
    COMMENT 'Live demo: Raw cryptocurrency trades'
""")
print(f"  Created: {trades_raw_table}")

spark.sql(f"DROP TABLE IF EXISTS {trades_analytics_table}")
spark.sql(f"""
    CREATE TABLE {trades_analytics_table} (
        window_start TIMESTAMP,
        window_end TIMESTAMP,
        symbol STRING,
        trade_count BIGINT,
        total_volume DOUBLE,
        total_value_usdt DOUBLE,
        vwap DOUBLE,
        avg_price DOUBLE,
        min_price DOUBLE,
        max_price DOUBLE,
        price_range_pct DOUBLE,
        buy_volume DOUBLE,
        sell_volume DOUBLE,
        buy_sell_ratio DOUBLE,
        batch_id BIGINT
    )
    COMMENT 'Live demo: Trade analytics with VWAP'
""")
print(f"  Created: {trades_analytics_table}")

spark.sql(f"DROP TABLE IF EXISTS {price_alerts_table}")
spark.sql(f"""
    CREATE TABLE {price_alerts_table} (
        alert_time TIMESTAMP,
        symbol STRING,
        alert_type STRING,
        current_price DOUBLE,
        previous_price DOUBLE,
        price_change_pct DOUBLE,
        volume_in_window DOUBLE,
        trade_count BIGINT,
        severity STRING,
        message STRING,
        batch_id BIGINT
    )
    COMMENT 'Live demo: Price movement alerts'
""")
print(f"  Created: {price_alerts_table}")

Creating fresh tables for live demo...
Cleared old checkpoints from /Volumes/takamol_demo/crypto_streaming/crypto_landing/checkpoints/live_demo_20260112
  Created: takamol_demo.crypto_streaming.trades_raw_live
  Created: takamol_demo.crypto_streaming.trades_analytics_live
  Created: takamol_demo.crypto_streaming.price_alerts_live
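
The analytics table stores derived columns such as `vwap`, `price_range_pct`, and `buy_sell_ratio`. The sketch below works through those formulas in plain Python on three made-up trades, mirroring the expressions used in `process_analytics_batch`. The function name `summarize_trades` and the sample values are illustrative assumptions. It calls the built-ins through `builtins` because the notebook's wildcard import of `pyspark.sql.functions` shadows `sum`, `round`, `min`, and `max`.

```python
import builtins


def summarize_trades(trades):
    """trades: list of (price, quantity, is_buyer_maker) tuples for one symbol."""
    prices = [p for p, _, _ in trades]
    total_volume = builtins.sum(q for _, q, _ in trades)
    # VWAP weights each price by the quantity traded at that price.
    vwap = builtins.sum(p * q for p, q, _ in trades) / total_volume
    avg_price = builtins.sum(prices) / len(prices)
    # A trade counts as a buy when the buyer is the taker (is_buyer_maker is False).
    buy_volume = builtins.sum(q for _, q, maker in trades if not maker)
    sell_volume = builtins.sum(q for _, q, maker in trades if maker)
    price_range = builtins.max(prices) - builtins.min(prices)
    return {
        "vwap": builtins.round(vwap, 4),
        "avg_price": builtins.round(avg_price, 4),
        "price_range_pct": builtins.round(price_range / avg_price * 100, 4),
        # The 0.0001 offset avoids division by zero when there are no sells.
        "buy_sell_ratio": builtins.round(buy_volume / (sell_volume + 0.0001), 4),
    }


sample = [(100.0, 2.0, False), (102.0, 1.0, True), (101.0, 1.0, False)]
summary = summarize_trades(sample)
print(summary)
# {'vwap': 100.75, 'avg_price': 101.0, 'price_range_pct': 1.9802, 'buy_sell_ratio': 2.9997}
```

The VWAP (100.75) sits below the simple average (101.0) because the largest trade happened at the lowest price.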


## Define Stream Processing Functions

In [0]:
# Define schema for trade data
trade_schema = StructType([
    StructField("event_type", StringType(), True),
    StructField("event_time", LongType(), True),
    StructField("symbol", StringType(), True),
    StructField("trade_id", LongType(), True),
    StructField("price", DoubleType(), True),
    StructField("quantity", DoubleType(), True),
    StructField("buyer_order_id", LongType(), True),
    StructField("seller_order_id", LongType(), True),
    StructField("trade_time", LongType(), True),
    StructField("is_buyer_maker", BooleanType(), True),
    StructField("trade_value_usdt", DoubleType(), True),
    StructField("ingestion_time", LongType(), True),
    StructField("producer_id", StringType(), True)
])

# Track statistics globally
batch_stats = {"total_trades": 0, "total_batches": 0, "alerts_generated": 0}

def process_raw_trades_batch(batch_df, batch_id):
    """Process and persist raw trade data with live output."""
    global batch_stats

    num_trades = batch_df.count()
    if num_trades > 0:
        batch_stats["total_trades"] += num_trades
        batch_stats["total_batches"] += 1

        # Enrich with processing metadata
        enriched_df = batch_df \
            .withColumn("processing_time", current_timestamp()) \
            .withColumn("trade_timestamp",
                (col("trade_time") / 1000).cast("timestamp")) \
            .withColumn("batch_id", lit(batch_id).cast("bigint"))

        # Write to Delta table
        enriched_df.write.mode("append").saveAsTable(trades_raw_table)

        # Get symbols and prices for display
        stats = enriched_df.groupBy("symbol").agg(
            count("*").alias("trades"),
            round(avg("price"), 2).alias("avg_price"),
            round(sum("trade_value_usdt"), 2).alias("volume")
        ).collect()

        # Format output for live display
        timestamp = datetime.now().strftime('%H:%M:%S')
        print(f"\n[{timestamp}] Batch {batch_id}: {num_trades} trades ingested")
        for s in stats:
            print(f"  {s.symbol:8} | {s.trades:4} trades | ${s.avg_price:>10,.2f} | Vol: ${s.volume:>12,.2f}")
        print(f"  Running Total: {batch_stats['total_trades']:,} trades in {batch_stats['total_batches']} batches")


def process_analytics_batch(batch_df, batch_id):
    """Calculate analytics for each batch."""
    if batch_df.count() > 0:
        analytics_df = batch_df \
            .groupBy("symbol") \
            .agg(
                count("*").alias("trade_count"),
                sum("quantity").alias("total_volume"),
                sum("trade_value_usdt").alias("total_value_usdt"),
                (sum(col("price") * col("quantity")) / sum("quantity")).alias("vwap"),
                avg("price").alias("avg_price"),
                min("price").alias("min_price"),
                max("price").alias("max_price"),
                sum(when(~col("is_buyer_maker"), col("quantity")).otherwise(0)).alias("buy_volume"),
                sum(when(col("is_buyer_maker"), col("quantity")).otherwise(0)).alias("sell_volume")
            ) \
            .withColumn("price_range_pct",
                round((col("max_price") - col("min_price")) / col("avg_price") * 100, 4)) \
            .withColumn("buy_sell_ratio",
                round(col("buy_volume") / (col("sell_volume") + 0.0001), 4)) \
            .withColumn("window_start", current_timestamp()) \
            .withColumn("window_end", current_timestamp()) \
            .withColumn("batch_id", lit(batch_id).cast("bigint")) \
            .select(
                "window_start", "window_end", "symbol", "trade_count",
                round("total_volume", 6).alias("total_volume"),
                round("total_value_usdt", 2).alias("total_value_usdt"),
                round("vwap", 4).alias("vwap"),
                round("avg_price", 4).alias("avg_price"),
                round("min_price", 4).alias("min_price"),
                round("max_price", 4).alias("max_price"),
                "price_range_pct",
                round("buy_volume", 6).alias("buy_volume"),
                round("sell_volume", 6).alias("sell_volume"),
                "buy_sell_ratio",
                "batch_id"
            )

        analytics_df.write.mode("append").saveAsTable(trades_analytics_table)


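# Alert thresholds - deliberately sensitive so the demo produces alerts
PRICE_CHANGE_THRESHOLD_PCT = 0.1  # percent move between first and last price in a batch
VOLUME_SPIKE_THRESHOLD = 2        # trades per symbol per batch (a trade count, not a volume)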

def process_alerts_batch(batch_df, batch_id):
    """Detect price movements and volume spikes with live output."""
    global batch_stats

    if batch_df.count() == 0:
        return

    metrics_df = batch_df \
        .groupBy("symbol") \
        .agg(
            count("*").alias("trade_count"),
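            # first()/last() depend on row order, so pick the prices by trade_time instead
            min_by("price", "trade_time").alias("first_price"),
            max_by("price", "trade_time").alias("last_price"),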
            min("price").alias("min_price"),
            max("price").alias("max_price"),
            sum("quantity").alias("volume")
        ) \
        .withColumn("price_change_pct",
            round((col("last_price") - col("first_price")) / col("first_price") * 100, 4))

    alerts = []

    for row in metrics_df.collect():
        alert = None

        if py_abs(row.price_change_pct) > PRICE_CHANGE_THRESHOLD_PCT:
            direction = "UP" if row.price_change_pct > 0 else "DOWN"
            severity = "HIGH" if py_abs(row.price_change_pct) > 0.5 else "MEDIUM"
            alert = {
                "alert_time": datetime.now(),
                "symbol": row.symbol,
                "alert_type": f"PRICE_{direction}",
                "current_price": row.last_price,
                "previous_price": row.first_price,
                "price_change_pct": row.price_change_pct,
                "volume_in_window": row.volume,
                "trade_count": row.trade_count,
                "severity": severity,
                "message": f"{row.symbol} price moved {direction} {py_abs(row.price_change_pct):.2f}% "
                          f"(${row.first_price:,.2f} -> ${row.last_price:,.2f})",
                "batch_id": int(batch_id)
            }
        elif row.trade_count > VOLUME_SPIKE_THRESHOLD:
            alert = {
                "alert_time": datetime.now(),
                "symbol": row.symbol,
                "alert_type": "VOLUME_SPIKE",
                "current_price": row.last_price,
                "previous_price": row.first_price,
                "price_change_pct": row.price_change_pct,
                "volume_in_window": row.volume,
                "trade_count": row.trade_count,
                "severity": "MEDIUM",
                "message": f"{row.symbol} volume spike: {row.trade_count} trades",
                "batch_id": int(batch_id)
            }

        if alert:
            alerts.append(alert)
            batch_stats["alerts_generated"] += 1
            severity_icon = "!!" if alert["severity"] == "HIGH" else "!"
            print(f"\n  [{severity_icon} ALERT] {alert['message']}")

    if alerts:
        alerts_df = spark.createDataFrame(alerts)
        alerts_df.write.mode("append").saveAsTable(price_alerts_table)
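
The alert rule in `process_alerts_batch` can be read as a small decision function: a price move beyond the threshold takes precedence, and the trade-count check applies only otherwise. The sketch below restates it in plain Python. `classify_move` and `HIGH_SEVERITY_PCT` are hypothetical names (the notebook hard-codes 0.5), and the trade counts passed in are illustrative.

```python
import builtins

PRICE_CHANGE_THRESHOLD_PCT = 0.1  # same value as the notebook
HIGH_SEVERITY_PCT = 0.5           # hard-coded as 0.5 in the notebook; named here for clarity
VOLUME_SPIKE_THRESHOLD = 2        # compared against the trade count


def classify_move(first_price, last_price, trade_count):
    """Return (alert_type, severity, change_pct), or None when no alert fires."""
    change = builtins.round((last_price - first_price) / first_price * 100, 4)
    if builtins.abs(change) > PRICE_CHANGE_THRESHOLD_PCT:
        direction = "UP" if change > 0 else "DOWN"
        severity = "HIGH" if builtins.abs(change) > HIGH_SEVERITY_PCT else "MEDIUM"
        return (f"PRICE_{direction}", severity, change)
    if trade_count > VOLUME_SPIKE_THRESHOLD:
        return ("VOLUME_SPIKE", "MEDIUM", change)
    return None


print(classify_move(136.16, 142.98, 5))  # ('PRICE_UP', 'HIGH', 5.0088)
print(classify_move(100.0, 100.05, 10))  # small move, many trades: VOLUME_SPIKE
print(classify_move(100.0, 100.05, 2))   # None: 2 trades is not above the threshold
```

Because the price branch comes first, a symbol with both a large move and many trades yields a single price alert rather than two alerts.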

---
# START STREAMING PROCESSING

The cell below starts all three streaming pipelines using the `availableNow` trigger.
This will process all currently available data and then complete.
**Re-run this cell to process new data as it arrives from the producer.**
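
An `availableNow` run covers the data that exists when the query starts and resumes from its checkpoint on the next run. The toy model below, in plain Python with no Spark, illustrates both effects. Lists stand in for the landing zone and the Delta tables, and a dict stands in for a checkpoint. All names are hypothetical, and real Structured Streaming tracks offsets in the checkpoint directory and may split the work into several micro-batches.

```python
def run_available_now(source, sink, checkpoint):
    """Toy model of one availableNow run; returns the number of rows processed."""
    end = len(source)                    # snapshot of what exists at start
    start = checkpoint.get("offset", 0)  # where the previous run stopped
    sink.extend(source[start:end])
    checkpoint["offset"] = end
    return end - start


landing = ["t1", "t2", "t3"]
raw, analytics = [], []
raw_cp, analytics_cp = {}, {}

# A downstream run that starts before the upstream has written sees nothing.
early = run_available_now(raw, analytics, analytics_cp)
first = run_available_now(landing, raw, raw_cp)

# More data arrives; re-running picks up only the new row upstream,
# and the downstream run now catches up on everything in `raw`.
landing.append("t4")
second = run_available_now(landing, raw, raw_cp)
caught_up = run_available_now(raw, analytics, analytics_cp)

print(early, first, second, caught_up)  # 0 3 1 4
```

In this model the downstream stage only produces output once the upstream stage has committed its rows, which is why re-running the cell brings the analytics and alerts tables up to date.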

In [0]:
# Stop any existing streams first
for stream in spark.streams.active:
    print(f"Stopping existing stream: {stream.name}")
    stream.stop()

print("\n" + "=" * 70)
print("STARTING STREAMING PIPELINES (SERVERLESS MODE)")
print("=" * 70)
print(f"\nReading from: {LANDING_PATH}")
print(f"Processing mode: availableNow (processes all available data, then completes)")
print(f"Tip: Re-run this cell to process newly arrived data")
print("-" * 70)

# Create Auto Loader stream
auto_loader_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", f"{CHECKPOINT_BASE}/live_schema")
    # An explicit schema is supplied, so column type inference is not needed
    .schema(trade_schema)
    .load(LANDING_PATH)
)

# Start Stream 1: Raw Trades Ingestion
print("\nStarting Stream 1: Raw Trades Ingestion...")
raw_query = (
    auto_loader_stream
    .writeStream
    .foreachBatch(process_raw_trades_batch)
    .option("checkpointLocation", f"{CHECKPOINT_BASE}/live_raw")
    .trigger(availableNow=True)
    .queryName("live_raw_trades")
    .start()
)

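# Streams 2 and 3 read the Delta table that Stream 1 writes, and an availableNow
# run only covers data present when it starts, so let Stream 1 finish first
raw_query.awaitTermination()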

# Start Stream 2: Analytics (reads from Delta)
print("Starting Stream 2: Analytics Aggregations...")
analytics_stream = spark.readStream.format("delta").table(trades_raw_table)
analytics_query = (
    analytics_stream
    .writeStream
    .foreachBatch(process_analytics_batch)
    .option("checkpointLocation", f"{CHECKPOINT_BASE}/live_analytics")
    .trigger(availableNow=True)
    .queryName("live_analytics")
    .start()
)

# Start Stream 3: Alerts (reads from Delta)
print("Starting Stream 3: Price Alert Monitoring...")
alerts_stream = spark.readStream.format("delta").table(trades_raw_table)
alerts_query = (
    alerts_stream
    .writeStream
    .foreachBatch(process_alerts_batch)
    .option("checkpointLocation", f"{CHECKPOINT_BASE}/live_alerts")
    .trigger(availableNow=True)
    .queryName("live_alerts")
    .start()
)

print("\n" + "=" * 70)
print("ALL STREAMS STARTED - PROCESSING AVAILABLE DATA")
print("=" * 70)
print(f"\nActive streaming queries:")
for stream in spark.streams.active:
    print(f"  - {stream.name}: {stream.status['message']}")

print("\n" + "-" * 70)
print("PROCESSING OUTPUT (batches will appear below):")
print("-" * 70)

# Wait for all streams to complete processing available data
try:
    raw_query.awaitTermination()
    analytics_query.awaitTermination()
    alerts_query.awaitTermination()
except Exception as e:
    print(f"Stream terminated: {e}")

print("\n" + "=" * 70)
print("ALL STREAMS COMPLETED")
print("=" * 70)
print("Re-run this cell to process new data that has arrived.")


STARTING STREAMING PIPELINES (SERVERLESS MODE)

Reading from: /Volumes/takamol_demo/crypto_streaming/crypto_landing/trades
Processing mode: availableNow (processes all available data, then completes)
Tip: Re-run this cell to process newly arrived data
----------------------------------------------------------------------

Starting Stream 1: Raw Trades Ingestion...
Starting Stream 2: Analytics Aggregations...
Starting Stream 3: Price Alert Monitoring...

ALL STREAMS STARTED - PROCESSING AVAILABLE DATA

Active streaming queries:
  - live_alerts: Initializing sources

----------------------------------------------------------------------
PROCESSING OUTPUT (batches will appear below):
----------------------------------------------------------------------

ALL STREAMS COMPLETED
Re-run this cell to process new data that has arrived.


## Query Live Results

Run these cells after the streaming cell completes, and again after each re-run, to see the current data.

In [0]:
# Current statistics
print("=" * 60)
print("LIVE DEMO STATISTICS")
print("=" * 60)

raw_count = spark.sql(f"SELECT COUNT(*) as cnt FROM {trades_raw_table}").collect()[0]['cnt']
analytics_count = spark.sql(f"SELECT COUNT(*) as cnt FROM {trades_analytics_table}").collect()[0]['cnt']
alerts_count = spark.sql(f"SELECT COUNT(*) as cnt FROM {price_alerts_table}").collect()[0]['cnt']

print(f"\nData in Delta Tables:")
print(f"  Raw Trades: {raw_count:,}")
print(f"  Analytics Records: {analytics_count:,}")
print(f"  Price Alerts: {alerts_count:,}")

LIVE DEMO STATISTICS

Data in Delta Tables:
  Raw Trades: 97
  Analytics Records: 5
  Price Alerts: 5


In [0]:
# Recent trades
display(spark.sql(f"""
    SELECT
        trade_timestamp,
        symbol,
        price,
        quantity,
        trade_value_usdt,
        CASE WHEN is_buyer_maker THEN 'SELL' ELSE 'BUY' END as side
    FROM {trades_raw_table}
    ORDER BY trade_timestamp DESC
    LIMIT 20
"""))

trade_timestamp,symbol,price,quantity,trade_value_usdt,side
2026-01-12T06:24:20.993Z,ETHUSDT,3157.21,3.0,9471.63,SELL
2026-01-12T06:23:50.447Z,BTCUSDT,92192.29,5e-05,4.6096145,BUY
2026-01-12T06:23:43.076Z,ETHUSDT,3158.89,0.0316,99.820924,BUY
2026-01-12T06:06:48.579Z,SOLUSDT,142.98,0.04,5.7192,BUY
2026-01-12T06:06:20.419Z,SOLUSDT,142.98,0.07,10.0086,BUY
2026-01-12T06:06:11.270Z,SOLUSDT,142.95,0.229,32.73555,BUY
2026-01-12T05:54:26.878Z,BTCUSDT,91933.94,0.00022,20.2254668,SELL
2026-01-12T05:53:50.676Z,ETHUSDT,3156.87,1.5093,4764.663891,SELL
2026-01-12T05:53:50.676Z,ETHUSDT,3156.87,0.0999,315.371313,SELL
2026-01-12T05:47:58.690Z,XRPUSDT,2.0828,225.0,468.63000000000005,SELL


In [0]:
# Recent alerts
display(spark.sql(f"""
    SELECT
        alert_time,
        symbol,
        alert_type,
        severity,
        message,
        price_change_pct
    FROM {price_alerts_table}
    ORDER BY alert_time DESC
    LIMIT 20
"""))

alert_time,symbol,alert_type,severity,message,price_change_pct
2026-01-12T06:24:58.692Z,SOLUSDT,PRICE_UP,HIGH,SOLUSDT price moved UP 5.01% ($136.16 -> $142.98),5.0088
2026-01-12T06:24:58.692Z,BNBUSDT,VOLUME_SPIKE,MEDIUM,BNBUSDT volume spike: 10 trades,0.0673
2026-01-12T06:24:58.692Z,ETHUSDT,PRICE_UP,HIGH,"ETHUSDT price moved UP 1.89% ($3,098.66 -> $3,157.21)",1.8895
2026-01-12T06:24:58.692Z,XRPUSDT,PRICE_DOWN,MEDIUM,XRPUSDT price moved DOWN 0.41% ($2.09 -> $2.08),-0.4064
2026-01-12T06:24:58.692Z,BTCUSDT,PRICE_UP,HIGH,"BTCUSDT price moved UP 1.71% ($90,637.99 -> $92,192.29)",1.7148


In [0]:
# Market summary
display(spark.sql(f"""
    SELECT
        symbol,
        COUNT(*) as total_trades,
        ROUND(SUM(trade_value_usdt), 2) as total_volume_usdt,
        ROUND(MIN(price), 2) as low,
        ROUND(MAX(price), 2) as high,
        ROUND(AVG(price), 2) as avg_price
    FROM {trades_raw_table}
    GROUP BY symbol
    ORDER BY total_volume_usdt DESC
"""))

symbol,total_trades,total_volume_usdt,low,high,avg_price
SOLUSDT,23,17564.02,136.16,142.98,140.05
ETHUSDT,13,14779.49,3097.7,3168.0,3137.63
BTCUSDT,33,5805.42,90637.99,92254.12,91261.26
XRPUSDT,18,1079.34,2.08,2.09,2.09
BNBUSDT,10,190.26,904.84,908.21,906.29


## Cleanup

In [0]:
# Stop all streams
print("Stopping all streaming queries...")
for stream in spark.streams.active:
    print(f"  Stopping: {stream.name}")
    stream.stop()

print("\nAll streams stopped.")

# Final statistics
raw_count = spark.sql(f"SELECT COUNT(*) as cnt FROM {trades_raw_table}").collect()[0]['cnt']
analytics_count = spark.sql(f"SELECT COUNT(*) as cnt FROM {trades_analytics_table}").collect()[0]['cnt']
alerts_count = spark.sql(f"SELECT COUNT(*) as cnt FROM {price_alerts_table}").collect()[0]['cnt']

print("\n" + "=" * 60)
print("FINAL DEMO STATISTICS")
print("=" * 60)
print(f"  Total Trades Processed: {raw_count:,}")
print(f"  Analytics Records: {analytics_count:,}")
print(f"  Alerts Generated: {alerts_count:,}")
print("=" * 60)

Stopping all streaming queries...

All streams stopped.

FINAL DEMO STATISTICS
  Total Trades Processed: 97
  Analytics Records: 5
  Alerts Generated: 5


---

**Takamol Demo - Live Streaming Analytics**

*For business presentations and live demonstrations*