# Observability Features Demo

This notebook demonstrates the production-ready observability features:
- **Rate Limiting** - Control request throughput with a token bucket algorithm
- **OpenTelemetry Tracing** - Distributed tracing for all operations
- **AlertManager** - Critical error alerting for production monitoring

## Prerequisites
- ScyllaDB running on localhost:9042

## Setup and Imports

In [1]:
import asyncio
import time
from cassandra.cluster import Cluster
from vertector_scylladbstore import (
    AsyncScyllaDBStore,
    RateLimitExceeded,
    AlertSeverity,
)

print("✓ Imports successful")

✓ Imports successful


## 1. Rate Limiting Demo

Test the token bucket rate limiter with configurable requests per second and burst capacity.
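To make the token bucket idea concrete, here is a minimal, library-independent sketch. It is not the implementation inside `AsyncScyllaDBStore`; the class name `TokenBucket`, its `try_acquire` method, and the injectable `clock` parameter are all hypothetical and exist only for illustration. The bucket starts full (which allows the burst), spends one token per request, and refills in proportion to elapsed time.

```python
import time


class TokenBucket:
    """Toy token bucket: refills at `rate` tokens/second, holds at most `capacity`."""

    def __init__(self, rate: float, capacity: int, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = float(capacity)  # start full so an initial burst is allowed
        self.updated = clock()

    def try_acquire(self) -> bool:
        """Take one token if available; return False instead of blocking."""
        now = self.clock()
        # Credit tokens for the time elapsed since the last call, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


# A fake clock keeps the example deterministic (hypothetical test helper).
fake_now = [0.0]
bucket = TokenBucket(rate=5, capacity=3, clock=lambda: fake_now[0])

burst = [bucket.try_acquire() for _ in range(4)]  # three succeed, the fourth is rejected
fake_now[0] += 1.0                                # one second later the bucket has refilled
after_wait = bucket.try_acquire()
print(burst, after_wait)  # [True, True, True, False] True
```

With `rate=5` and `capacity=3`, this mirrors the demo settings used in the next cell: three requests pass immediately, the fourth is rejected, and a request succeeds again after a one-second wait.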

In [2]:
async def test_rate_limiting():
    """Demonstrate rate limiting functionality."""
    
    # Create cluster and session
    cluster = Cluster(["127.0.0.1"])
    session = cluster.connect()
    
    print("Test 1: Rate Limiting Configuration")
    print("=" * 60)
    
    # Create store with aggressive rate limiting for testing
    store = AsyncScyllaDBStore(
        session=session,
        keyspace="observability_demo",
        enable_rate_limiting=True,
        rate_limit_config={
            "requests_per_second": 5,  # Very low for demo
            "burst_size": 3
        },
        enable_circuit_breaker=False
    )
    await store.setup()
    
    print("✓ Store created with rate limiting:")
    print("  - Requests per second: 5")
    print("  - Burst capacity: 3")
    print()
    
    # Test burst capacity
    print("Test 2: Burst Capacity (should succeed)")
    print("=" * 60)
    
    burst_success = 0
    for i in range(3):
        try:
            await store.aput(
                namespace=("test", "burst"),
                key=f"item_{i}",
                value={"burst_test": i}
            )
            burst_success += 1
            print(f"  ✓ Burst request {i+1}: Success")
        except RateLimitExceeded as e:
            print(f"  ✗ Burst request {i+1}: Rate limited - {e}")
    
    print(f"\nBurst test: {burst_success}/3 requests succeeded")
    print()
    
    # Test rate limit exceeded
    print("Test 3: Rate Limit Exceeded (should fail)")
    print("=" * 60)
    
    try:
        # This should fail - burst exhausted
        await store.aput(
            namespace=("test", "overflow"),
            key="item_overflow",
            value={"overflow": True}
        )
        print("  ✗ Request succeeded (unexpected!)")
    except RateLimitExceeded as e:
        print("  ✓ Rate limit exceeded as expected")
        print(f"  Message: {e}")
    
    print()
    
    # Test rate limiter recovery
    print("Test 4: Rate Limiter Recovery")
    print("=" * 60)
    print("  Waiting 1 second for token refill...")
    await asyncio.sleep(1.0)
    
    try:
        await store.aput(
            namespace=("test", "recovery"),
            key="item_recovery",
            value={"recovered": True}
        )
        print("  ✓ Request succeeded after recovery")
    except RateLimitExceeded as e:
        print(f"  ✗ Still rate limited: {e}")
    
    print()
    
    # Cleanup
    cluster.shutdown()
    print("✅ Rate limiting tests complete!")
    print()

await test_rate_limiting()

No embedding dimensions configured - semantic search will fail. Configure index_config with 'dims' and 'embed'.


Test 1: Rate Limiting Configuration
✓ Store created with rate limiting:
  - Requests per second: 5
  - Burst capacity: 3

Test 2: Burst Capacity (should succeed)
  ✓ Burst request 1: Success
  ✓ Burst request 2: Success
  ✓ Burst request 3: Success

Burst test: 3/3 requests succeeded

Test 3: Rate Limit Exceeded (should fail)
  ✓ Rate limit exceeded as expected
  Message: PUT operation rate limit exceeded

Test 4: Rate Limiter Recovery
  Waiting 1 second for token refill...
  ✓ Request succeeded after recovery

✅ Rate limiting tests complete!



## 2. OpenTelemetry Tracing Demo

Test distributed tracing with span creation and attributes.
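As a rough mental model of what a span records, the following stdlib-only toy captures the same kinds of fields that appear in the exported spans: a name, a 128-bit trace ID, a 64-bit span ID, attributes, a status, and start/end timestamps. It is deliberately not the OpenTelemetry API; the `span` context manager and the `finished_spans` list are hypothetical names standing in for a tracer and an exporter.

```python
import secrets
import time
from contextlib import contextmanager

finished_spans = []  # hypothetical stand-in for a span exporter


@contextmanager
def span(name, **attributes):
    """Record one operation as a dict with span-like fields."""
    record = {
        "name": name,
        "trace_id": secrets.token_hex(16),  # 128-bit ID rendered as 32 hex chars
        "span_id": secrets.token_hex(8),    # 64-bit ID rendered as 16 hex chars
        "attributes": attributes,
        "status": "OK",
        "start_ns": time.time_ns(),
    }
    try:
        yield record
    except Exception:
        record["status"] = "ERROR"  # mark the span failed, then re-raise
        raise
    finally:
        record["end_ns"] = time.time_ns()
        finished_spans.append(record)


# A successful operation: the store call would run inside the block.
with span("aput", namespace=str(("traced", "users")), key="user_001"):
    pass

# A failing operation still produces a span, with an ERROR status.
try:
    with span("aget", key="missing"):
        raise KeyError("missing")
except KeyError:
    pass

for s in finished_spans:
    print(s["name"], s["status"], s["attributes"])
```

A real tracer additionally links child spans to parents within one trace and hands finished spans to a configurable exporter; this toy gives each span its own trace ID for simplicity.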

In [None]:
async def test_tracing():
    """Demonstrate OpenTelemetry tracing functionality."""
    
    # Create cluster and session
    cluster = Cluster(["127.0.0.1"])
    session = cluster.connect()
    
    print("Test 1: Tracing Configuration")
    print("=" * 60)
    
    # Create store with tracing enabled
    store = AsyncScyllaDBStore(
        session=session,
        keyspace="observability_demo",
        enable_tracing=True,
        enable_circuit_breaker=False
    )
    await store.setup()
    
    print("✓ Store created with OpenTelemetry tracing enabled")
    print("  Service name: scylladb_store_observability_demo")
    print()
    
    # Test traced operations
    print("Test 2: Traced CRUD Operations")
    print("=" * 60)
    
    # PUT operation (will create span)
    print("Executing traced PUT operation...")
    await store.aput(
        namespace=("traced", "users"),
        key="user_001",
        value={
            "name": "Alice",
            "email": "alice@example.com",
            "traced": True
        }
    )
    print("  ✓ PUT operation traced")
    
    # GET operation (will create span)
    print("Executing traced GET operation...")
    result = await store.aget(("traced", "users"), "user_001")
    print("  ✓ GET operation traced")
    print(f"  Retrieved: {result.value['name']}")
    
    # SEARCH operation (will create span)
    print("Executing traced SEARCH operation...")
    results = await store.asearch(("traced", "users"), limit=10)
    print("  ✓ SEARCH operation traced")
    print(f"  Found: {len(results)} items")
    
    # DELETE operation (will create span)
    print("Executing traced DELETE operation...")
    await store.adelete(("traced", "users"), "user_001")
    print("  ✓ DELETE operation traced")
    
    print()
    print("📊 Trace Information:")
    print("  All operations above generated OpenTelemetry spans with:")
    print("  - Unique trace IDs and span IDs")
    print("  - Operation names (aput, aget, asearch, adelete)")
    print("  - Attributes (namespace, key, query parameters)")
    print("  - Timestamps and duration")
    print()
    print("  Note: Spans are exported to console by default.")
    print("  In production, configure OTLP exporter for Jaeger/Zipkin.")
    
    print()
    
    # Cleanup
    cluster.shutdown()
    print("✅ Tracing tests complete!")
    print()

await test_tracing()

No embedding dimensions configured - semantic search will fail. Configure index_config with 'dims' and 'embed'.


Test 1: Tracing Configuration
✓ Store created with OpenTelemetry tracing enabled
  Service name: scylladb_store_observability_demo

Test 2: Traced CRUD Operations
Executing traced PUT operation...
  ✓ PUT operation traced
Executing traced GET operation...
  ✓ GET operation traced
  Retrieved: Alice
Executing traced SEARCH operation...
  ✓ SEARCH operation traced
  Found: 1 items
Executing traced DELETE operation...
  ✓ DELETE operation traced

📊 Trace Information:
  All operations above generated OpenTelemetry spans with:
  - Unique trace IDs and span IDs
  - Operation names (aput, aget, asearch, adelete)
  - Attributes (namespace, key, query parameters)
  - Timestamps and duration

  Note: Spans are exported to console by default.
  In production, configure OTLP exporter for Jaeger/Zipkin.

✅ Tracing tests complete!



{
    "name": "aput",
    "context": {
        "trace_id": "0x42a9ee73aa15b5aaa9a6f2399bf66ead",
        "span_id": "0x1a0eaa29eb913da6",
        "trace_state": "[]"
    },
    "kind": "SpanKind.INTERNAL",
    "parent_id": null,
    "start_time": "2025-10-06T02:16:00.796984Z",
    "end_time": "2025-10-06T02:16:00.826857Z",
    "status": {
        "status_code": "OK"
    },
    "attributes": {
        "namespace": "('traced', 'users')",
        "key": "user_001"
    },
    "events": [],
    "links": [],
    "resource": {
        "attributes": {
            "service.name": "scylladb_store_observability_demo"
        },
        "schema_url": ""
    }
}
{
    "name": "aget",
    "context": {
        "trace_id": "0x1a67d860376675331666085006008f21",
        "span_id": "0xd3fda72e4f24c66d",
        "trace_state": "[]"
    },
    "kind": "SpanKind.INTERNAL",
    "parent_id": null,
    "start_time": "2025-10-06T02:16:00.827008Z",
    "end_time": "2025-10-06T02:16:00.828995Z",
    "status": {
 

## 3. AlertManager Demo

Test critical error alerting for production monitoring.
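The sketch below illustrates the general shape of an alert manager: a bounded history plus a list of handlers that each alert is fanned out to. It is an assumption-laden toy, not the library's `AlertManager`; `ToyAlertManager`, `Severity`, `add_handler`, and `send_alert` are hypothetical names. Only the alert keys (`severity`, `message`, `timestamp`) and the `get_recent_alerts(limit=...)` call are modelled on what the demo cell below reads.

```python
from collections import deque
from datetime import datetime, timezone
from enum import Enum


class Severity(Enum):  # hypothetical stand-in for the library's AlertSeverity
    WARNING = "WARNING"
    CRITICAL = "CRITICAL"


class ToyAlertManager:
    """Keeps a bounded alert history and fans each alert out to handlers."""

    def __init__(self, max_history=100):
        self.history = deque(maxlen=max_history)  # oldest alerts drop off first
        self.handlers = []

    def add_handler(self, handler):
        """Register a callable that receives each alert dict."""
        self.handlers.append(handler)

    def send_alert(self, severity, message):
        alert = {
            "severity": severity.value,
            "message": message,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        self.history.append(alert)
        for handler in self.handlers:
            handler(alert)
        return alert

    def get_recent_alerts(self, limit=5):
        """Return up to `limit` alerts, newest first."""
        if limit <= 0:
            return []
        return list(self.history)[-limit:][::-1]


paged = []
manager = ToyAlertManager()
manager.add_handler(paged.append)  # a real handler might call a paging or chat API
manager.send_alert(Severity.CRITICAL, "ScyllaDB cluster unreachable: No hosts available")

for alert in manager.get_recent_alerts(limit=5):
    print(f"[{alert['severity']}] {alert['message']}")
```

Keeping handlers as plain callables means a paging, chat, or email integration can be added without touching the code that raises the alert.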

In [4]:
async def test_alerting():
    """Demonstrate AlertManager functionality."""
    
    # Create cluster and session
    cluster = Cluster(["127.0.0.1"])
    session = cluster.connect()
    
    print("Test 1: AlertManager Configuration")
    print("=" * 60)
    
    # Create store with alerting enabled
    store = AsyncScyllaDBStore(
        session=session,
        keyspace="observability_demo",
        enable_alerting=True,
        enable_circuit_breaker=True,
        circuit_breaker_config={
            "failure_threshold": 3,  # Low threshold for demo
            "timeout_seconds": 5.0
        }
    )
    await store.setup()
    
    print("✓ Store created with AlertManager enabled")
    print("  Circuit breaker: Enabled (threshold: 3 failures)")
    print()
    
    # Test normal operations (no alerts)
    print("Test 2: Normal Operations (no alerts)")
    print("=" * 60)
    
    await store.aput(
        namespace=("alerts", "test"),
        key="normal_op",
        value={"status": "healthy"}
    )
    print("  ✓ Normal PUT operation completed (no alerts)")
    
    result = await store.aget(("alerts", "test"), "normal_op")
    print("  ✓ Normal GET operation completed (no alerts)")
    
    await store.adelete(("alerts", "test"), "normal_op")
    print("  ✓ Normal DELETE operation completed (no alerts)")
    
    print()
    
    # Check alert manager state
    print("Test 3: AlertManager State")
    print("=" * 60)
    
    if store.alert_manager:
        recent_alerts = store.alert_manager.get_recent_alerts(limit=5)
        print(f"  Recent alerts: {len(recent_alerts)}")
        
        if recent_alerts:
            print("\n  Alert History:")
            for i, alert in enumerate(recent_alerts, 1):
                print(f"    {i}. [{alert['severity']}] {alert['message']}")
                print(f"       Time: {alert['timestamp']}")
        else:
            print("  ✓ No alerts triggered (system healthy)")
    else:
        print("  Alert manager not initialized")
    
    print()
    
    # Show when alerts would be triggered
    print("Test 4: Alert Triggers (Informational)")
    print("=" * 60)
    print("  Alerts are triggered in these scenarios:")
    print()
    print("  1. Circuit Breaker OPEN:")
    print("     - Severity: CRITICAL")
    print("     - Trigger: 3+ consecutive failures")
    print("     - Message: 'Circuit breaker OPEN: failure threshold exceeded'")
    print()
    print("  2. Cluster Unreachable:")
    print("     - Severity: CRITICAL")
    print("     - Trigger: NoHostAvailable exception")
    print("     - Message: 'ScyllaDB cluster unreachable: No hosts available'")
    print()
    print("  3. Data Inconsistency:")
    print("     - Severity: CRITICAL")
    print("     - Trigger: Qdrant vector sync fails after retries")
    print("     - Message: 'Data inconsistency: Qdrant sync failed after ScyllaDB write'")
    print()
    print("  4. Batch Data Inconsistency:")
    print("     - Severity: CRITICAL")
    print("     - Trigger: Batch Qdrant sync fails")
    print("     - Message: 'Batch data inconsistency: N Qdrant syncs failed'")
    
    print()
    
    # Cleanup
    cluster.shutdown()
    print("✅ AlertManager tests complete!")
    print()

await test_alerting()

No embedding dimensions configured - semantic search will fail. Configure index_config with 'dims' and 'embed'.


Test 1: AlertManager Configuration
✓ Store created with AlertManager enabled
  Circuit breaker: Enabled (threshold: 3 failures)

Test 2: Normal Operations (no alerts)
  ✓ Normal PUT operation completed (no alerts)
  ✓ Normal GET operation completed (no alerts)
  ✓ Normal DELETE operation completed (no alerts)

Test 3: AlertManager State
  Recent alerts: 0
  ✓ No alerts triggered (system healthy)

Test 4: Alert Triggers (Informational)
  Alerts are triggered in these scenarios:

  1. Circuit Breaker OPEN:
     - Severity: CRITICAL
     - Trigger: 3+ consecutive failures
     - Message: 'Circuit breaker OPEN: failure threshold exceeded'

  2. Cluster Unreachable:
     - Severity: CRITICAL
     - Trigger: NoHostAvailable exception
     - Message: 'ScyllaDB cluster unreachable: No hosts available'

  3. Data Inconsistency:
     - Severity: CRITICAL
     - Trigger: Qdrant vector sync fails after retries
     - Message: 'Data inconsistency: Qdrant sync failed after ScyllaDB write'


## 4. Combined Features Demo

Test all three observability features working together.

In [5]:
async def test_combined_features():
    """Demonstrate all observability features working together."""
    
    # Create cluster and session
    cluster = Cluster(["127.0.0.1"])
    session = cluster.connect()
    
    print("Combined Observability Features Test")
    print("=" * 60)
    
    # Create store with ALL features enabled
    store = AsyncScyllaDBStore(
        session=session,
        keyspace="observability_demo",
        enable_rate_limiting=True,
        rate_limit_config={
            "requests_per_second": 100,
            "burst_size": 20
        },
        enable_tracing=True,
        enable_alerting=True,
        enable_circuit_breaker=True,
        circuit_breaker_config={
            "failure_threshold": 5,
            "timeout_seconds": 60.0
        }
    )
    await store.setup()
    
    print("✓ Production-ready store created with:")
    print("  - Rate Limiting: 100 req/s, burst 20")
    print("  - OpenTelemetry Tracing: Enabled")
    print("  - AlertManager: Enabled")
    print("  - Circuit Breaker: 5 failures, 60s timeout")
    print()
    
    # Perform operations
    print("Executing production-ready operations...")
    print()
    
    # Each operation has:
    # 1. Rate limit check
    # 2. Tracing span
    # 3. Alert monitoring
    # 4. Circuit breaker protection
    
    operations = [
        ("PUT", lambda: store.aput(("prod", "user_001"), "data", {"name": "Alice"})),
        ("GET", lambda: store.aget(("prod", "user_001"), "data")),
        ("SEARCH", lambda: store.asearch(("prod",), limit=10)),
        ("DELETE", lambda: store.adelete(("prod", "user_001"), "data")),
    ]
    
    for op_name, op_func in operations:
        # perf_counter() is monotonic and higher resolution than time.time(),
        # so it is the better choice for measuring short elapsed intervals.
        start = time.perf_counter()
        try:
            await op_func()
            elapsed = (time.perf_counter() - start) * 1000
            print(f"  ✓ {op_name:8} - Success ({elapsed:.2f}ms)")
            print("           Rate limited: ✓")
            print("           Traced: ✓")
            print("           Monitored: ✓")
        except RateLimitExceeded:
            print(f"  ✗ {op_name:8} - Rate limited")
        except Exception as e:
            print(f"  ✗ {op_name:8} - Error: {e}")
    
    print()
    
    # Get metrics
    print("Performance Metrics:")
    print("=" * 60)
    stats = store.metrics.get_stats()
    print(f"  Total operations: {stats['total_queries']}")
    print(f"  Total errors: {stats['total_errors']}")
    print(f"  Error rate: {stats['error_rate']:.2%}")
    print(f"  Avg latency: {stats['avg_latency_ms']:.2f}ms")
    
    print()
    
    # Cleanup
    cluster.shutdown()
    print("✅ All observability features working together!")
    print()

await test_combined_features()

No embedding dimensions configured - semantic search will fail. Configure index_config with 'dims' and 'embed'.


Combined Observability Features Test
✓ Production-ready store created with:
  - Rate Limiting: 100 req/s, burst 20
  - OpenTelemetry Tracing: Enabled
  - AlertManager: Enabled
  - Circuit Breaker: 5 failures, 60s timeout

Executing production-ready operations...

  ✓ PUT      - Success (31.55ms)
           Rate limited: ✓
           Traced: ✓
           Monitored: ✓
  ✓ GET      - Success (1.98ms)
           Rate limited: ✓
           Traced: ✓
           Monitored: ✓
  ✓ SEARCH   - Success (5.15ms)
           Rate limited: ✓
           Traced: ✓
           Monitored: ✓
  ✓ DELETE   - Success (1.45ms)
           Rate limited: ✓
           Traced: ✓
           Monitored: ✓

Performance Metrics:
  Total operations: 3
  Total errors: 0
  Error rate: 0.00%
  Avg latency: 11.33ms

✅ All observability features working together!



## 5. Production Configuration Example

Example configuration for production deployment.
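The circuit breaker settings used in this notebook (`failure_threshold`, `success_threshold`, `timeout_seconds`) map naturally onto the classic three-state breaker. The sketch below is one plausible reading of those parameters, not the library's implementation: `ToyCircuitBreaker`, its method names, the `HALF_OPEN` state, and the injectable `clock` are assumptions made for illustration.

```python
import time


class ToyCircuitBreaker:
    """CLOSED -> OPEN after repeated failures; OPEN -> HALF_OPEN after a timeout;
    HALF_OPEN -> CLOSED after enough successes (or back to OPEN on a failure)."""

    def __init__(self, failure_threshold=5, success_threshold=2,
                 timeout_seconds=60.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout_seconds = timeout_seconds
        self.clock = clock
        self.state = "CLOSED"
        self.failures = 0
        self.successes = 0
        self.opened_at = 0.0

    def allow_request(self) -> bool:
        if self.state == "OPEN":
            if self.clock() - self.opened_at < self.timeout_seconds:
                return False          # still cooling down: fail fast
            self.state = "HALF_OPEN"  # timeout elapsed: let probe requests through
            self.successes = 0
        return True

    def record_failure(self):
        self.failures += 1
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state = "OPEN"
            self.opened_at = self.clock()
            self.failures = 0

    def record_success(self):
        if self.state == "HALF_OPEN":
            self.successes += 1
            if self.successes >= self.success_threshold:
                self.state = "CLOSED"
        self.failures = 0  # the consecutive-failure count resets on any success


# A fake clock keeps the example deterministic (hypothetical test helper).
fake_now = [0.0]
breaker = ToyCircuitBreaker(clock=lambda: fake_now[0])

for _ in range(5):
    breaker.record_failure()
states = [breaker.state]               # OPEN after 5 consecutive failures
blocked = not breaker.allow_request()  # rejected during the 60 s timeout

fake_now[0] += 60.0
breaker.allow_request()                # timeout elapsed, probe allowed
states.append(breaker.state)           # HALF_OPEN
breaker.record_success()
breaker.record_success()
states.append(breaker.state)           # CLOSED after 2 successes
print(states, blocked)  # ['OPEN', 'HALF_OPEN', 'CLOSED'] True
```

A lower `failure_threshold` trips the breaker sooner, and a longer `timeout_seconds` gives a struggling cluster more time before probe traffic resumes.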

In [6]:
# Production configuration example
production_config = """
# Production-Ready AsyncScyllaDBStore Configuration

from cassandra.cluster import Cluster
from vertector_scylladbstore import AsyncScyllaDBStore

# Create cluster
cluster = Cluster(
    contact_points=["scylla1.prod.com", "scylla2.prod.com", "scylla3.prod.com"],
    port=9042
)
session = cluster.connect()

# Create production store
store = AsyncScyllaDBStore(
    session=session,
    keyspace="production",
    
    # Rate Limiting
    enable_rate_limiting=True,
    rate_limit_config={
        "requests_per_second": 1000,  # Adjust based on cluster capacity
        "burst_size": 100             # Allow traffic spikes
    },
    
    # OpenTelemetry Tracing
    enable_tracing=True,              # Export to Jaeger/Zipkin
    
    # AlertManager
    enable_alerting=True,             # Critical error alerts
    
    # Circuit Breaker
    enable_circuit_breaker=True,
    circuit_breaker_config={
        "failure_threshold": 5,       # Open after 5 failures
        "success_threshold": 2,       # Close after 2 successes
        "timeout_seconds": 60.0       # Wait 60s before retry
    }
)

await store.setup()
"""

print("Production Configuration Example:")
print("=" * 60)
print(production_config)
print()
print("Key Benefits:")
print("  ✓ Rate limiting prevents cluster overload")
print("  ✓ Tracing enables debugging and performance analysis")
print("  ✓ Alerts notify on-call engineers of critical issues")
print("  ✓ Circuit breaker prevents cascade failures")
print()
print("Monitoring Integration:")
print("  - Export traces to Jaeger: Configure OTLP_ENDPOINT")
print("  - Send alerts to PagerDuty: Implement custom AlertManager handler")
print("  - View metrics in Prometheus: Expose metrics endpoint")

Production Configuration Example:

# Production-Ready AsyncScyllaDBStore Configuration

from cassandra.cluster import Cluster
from vertector_scylladbstore import AsyncScyllaDBStore

# Create cluster
cluster = Cluster(
    contact_points=["scylla1.prod.com", "scylla2.prod.com", "scylla3.prod.com"],
    port=9042
)
session = cluster.connect()

# Create production store
store = AsyncScyllaDBStore(
    session=session,
    keyspace="production",

    # Rate Limiting
    enable_rate_limiting=True,
    rate_limit_config={
        "requests_per_second": 1000,  # Adjust based on cluster capacity
        "burst_size": 100             # Allow traffic spikes
    },

    # OpenTelemetry Tracing
    enable_tracing=True,              # Export to Jaeger/Zipkin

    # AlertManager
    enable_alerting=True,             # Critical error alerts

    # Circuit Breaker
    enable_circuit_breaker=True,
    circuit_breaker_config={
        "failure_threshold": 5,       # Open after 5 failures
        "succes

## Summary

This notebook demonstrated all three production observability features:

### ✅ Rate Limiting
- Token bucket algorithm with configurable rate and burst
- Protects cluster from overload
- Raises `RateLimitExceeded` when the limit is hit

### ✅ OpenTelemetry Tracing
- Distributed tracing for all CRUD operations
- Captures trace IDs, span IDs, and operation metadata
- Exports spans to the console by default; configure an OTLP exporter to send them to Jaeger, Zipkin, or other APM tools

### ✅ AlertManager
- Critical error alerting for production monitoring
- Triggers on circuit breaker open, cluster unreachable, and data inconsistency
- Can be routed to PagerDuty, Slack, email, etc. through a custom AlertManager handler

### Production Ready
All three features can be enabled together on a single store, alongside the circuit breaker, as the combined demo in section 4 shows.