# Module 11: Advanced Features - Redis Stack

## 🎯 Interactive Lab: RedisJSON, RediSearch & Streams

**Duration:** 60 minutes  
**Level:** Advanced  

Explore Redis Stack capabilities:
- 📄 **RedisJSON**: Native JSON document storage
- 🔍 **RediSearch**: Full-text and vector search
- 📊 **Streams**: Event streaming and messaging
- 🎲 **Probabilistic**: Bloom filters, HyperLogLog

---


## Part 1: Setup


In [None]:
# Install the redis-py client (JSON commands are built in since redis-py 4.0;
# the package defines no "json" extra)
!pip install -q redis pandas matplotlib

import redis
from redis.commands.json.path import Path
import json as pyjson
import time

print('✅ Packages installed!')


### Connect to Redis


In [None]:
# Connect to local Redis (with Stack support)
r = redis.Redis(
    host='localhost',
    port=6379,
    decode_responses=True
)

# Verify connection
r.ping()
print('✅ Connected to Redis')

# Check if the RedisJSON module is loaded
try:
    modules = r.module_list()
    # decode_responses=True means module fields come back as str, not bytes
    has_json = any(m.get('name') == 'ReJSON' for m in modules)
    print(f'   RedisJSON: {"✅" if has_json else "❌"}')
except redis.RedisError:
    print('   Note: Module list check not available')


---

## Part 2: RedisJSON - Native JSON Storage

### Why RedisJSON?

**Traditional Redis:**
```python
# Requires manual serialization (pyjson is the standard json module imported in Part 1)
r.set('user:1', pyjson.dumps({"name": "Alice"}))
data = pyjson.loads(r.get('user:1'))
```

**RedisJSON:**
```python
# Native JSON operations
r.json().set('user:1', '$', {"name": "Alice"})
name = r.json().get('user:1', '$.name')  # a `$` path returns a list of matches: ['Alice']
```


In [None]:
# Create complex JSON document
product = {
    "id": "P001",
    "name": "Redis in Action Book",
    "price": 39.99,
    "category": "books",
    "tags": ["database", "redis", "programming"],
    "inventory": {
        "warehouse_a": 50,
        "warehouse_b": 30
    },
    "reviews": [
        {"user": "alice", "rating": 5, "comment": "Excellent!"},
        {"user": "bob", "rating": 4, "comment": "Very helpful"}
    ]
}

# Store with JSON.SET
try:
    r.json().set('product:P001', '$', product)
    print('✅ JSON document stored')
    
    # Get entire document
    doc = r.json().get('product:P001')
    print(f'   Product: {doc["name"]}')
    print(f'   Price: ${doc["price"]}')
    print(f'   Tags: {doc["tags"]}')
except redis.ResponseError as e:
    print(f'⚠️  RedisJSON not available: {e}')
    print('   Using standard Redis with JSON serialization...')
    r.set('product:P001', pyjson.dumps(product))
    doc = pyjson.loads(r.get('product:P001'))
    print('✅ Stored with standard Redis')


In [None]:
# Update specific fields
try:
    # Update price
    r.json().set('product:P001', '$.price', 34.99)
    
    # Increment warehouse inventory
    r.json().numincrby('product:P001', '$.inventory.warehouse_a', 10)
    
    # Append new review
    r.json().arrappend('product:P001', '$.reviews', {
        "user": "charlie",
        "rating": 5,
        "comment": "Must read!"
    })
    
    # Get updated values
    price = r.json().get('product:P001', '$.price')[0]
    inventory_a = r.json().get('product:P001', '$.inventory.warehouse_a')[0]
    review_count = r.json().arrlen('product:P001', '$.reviews')[0]
    
    print('✅ Updates applied:')
    print(f'   New price: ${price}')
    print(f'   Warehouse A: {inventory_a} units')
    print(f'   Total reviews: {review_count}')
    
except redis.ResponseError:
    print('⚠️  Using standard Redis (no RedisJSON operations)')


---

## Part 3: Redis Streams - Event Streaming

### What are Redis Streams?

An append-only log data structure for:
- 📨 Message queuing
- 📊 Event sourcing
- 🔄 Real-time processing
- 📝 Activity feeds

**Key Features:**
- Consumer groups (like Kafka)
- Message acknowledgment
- Automatic ID generation
- Range queries


In [None]:
# Create event stream
stream_key = 'events:orders'

# Add events to stream
events = [
    {'order_id': 'O001', 'customer': 'alice', 'amount': 99.99, 'status': 'created'},
    {'order_id': 'O002', 'customer': 'bob', 'amount': 149.50, 'status': 'created'},
    {'order_id': 'O001', 'customer': 'alice', 'amount': 99.99, 'status': 'paid'},
    {'order_id': 'O003', 'customer': 'charlie', 'amount': 75.00, 'status': 'created'},
    {'order_id': 'O002', 'customer': 'bob', 'amount': 149.50, 'status': 'paid'},
]

# Add events with XADD
event_ids = []
for event in events:
    event_id = r.xadd(stream_key, event)
    event_ids.append(event_id)

print(f'✅ Added {len(events)} events to stream')
print(f'   Stream: {stream_key}')
print(f'   First event ID: {event_ids[0]}')
print(f'   Last event ID: {event_ids[-1]}')
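
The IDs that `XADD` generates automatically have the form `<milliseconds>-<sequence>`: a Unix timestamp in milliseconds, followed by a counter that distinguishes entries added within the same millisecond. As a small sketch (the helper name `parse_stream_id` is hypothetical, not part of redis-py, and the sample ID is made up), an ID can be split back into its parts:

```python
from datetime import datetime, timezone

def parse_stream_id(event_id):
    """Split a stream entry ID into (UTC datetime, sequence number)."""
    ms_part, seq_part = event_id.split('-')
    created_at = datetime.fromtimestamp(int(ms_part) / 1000, tz=timezone.utc)
    return created_at, int(seq_part)

# Sample ID for illustration only; real IDs come from r.xadd(...)
created_at, seq = parse_stream_id('1700000000000-0')
print(created_at.isoformat(), seq)
```

Because the timestamp leads the ID, entries sort chronologically, which is what makes time-based range queries on a stream possible.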


In [None]:
# Read from stream
print('📖 Reading events from stream...')
print()

# Read all entries ('-' and '+' mean the smallest and largest possible IDs)
# A separate name avoids overwriting the `events` list from the previous cell
entries = r.xrange(stream_key, '-', '+')

for event_id, event_data in entries:
    print(f'Event ID: {event_id}')
    print(f'  Order: {event_data["order_id"]}')
    print(f'  Customer: {event_data["customer"]}')
    print(f'  Amount: ${event_data["amount"]}')
    print(f'  Status: {event_data["status"]}')
    print()

print(f'✅ Total events: {len(entries)}')
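
Event sourcing means deriving current state by replaying the log. A minimal sketch of that idea, assuming entries shaped like the `(id, fields)` pairs that `xrange` returns; the function name `latest_status_by_order` and the sample IDs are hypothetical:

```python
def latest_status_by_order(entries):
    """Fold stream entries (oldest first) into the latest status per order."""
    state = {}
    for _event_id, fields in entries:
        # Later entries overwrite earlier ones, so the last status wins
        state[fields['order_id']] = fields['status']
    return state

# Hand-written sample entries; IDs are made up for illustration
sample_entries = [
    ('1-0', {'order_id': 'O001', 'status': 'created'}),
    ('2-0', {'order_id': 'O002', 'status': 'created'}),
    ('3-0', {'order_id': 'O001', 'status': 'paid'}),
]
print(latest_status_by_order(sample_entries))
# → {'O001': 'paid', 'O002': 'created'}
```

In the lab, passing the result of `r.xrange(stream_key, '-', '+')` to the same function should give the latest status of each order in the stream.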


In [None]:
# Stream statistics
info = r.xinfo_stream(stream_key)

print('📊 Stream Statistics:')
print(f'   Length: {info["length"]} events')
print(f'   Consumer groups: {info["groups"]}')
print(f'   First entry: {info["first-entry"][0]}')
print(f'   Last entry: {info["last-entry"][0]}')
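
The statistics report no consumer groups because none has been created yet. The sketch below shows one way to create a group, read new entries, and acknowledge them. It assumes the default RESP2 reply shape, where `xreadgroup` returns a list of `[stream_name, entries]` pairs. The helper names `ensure_group` and `consume_batch` are hypothetical, as are the group and consumer names in the usage note.

```python
def ensure_group(client, stream, group):
    """Create the consumer group, tolerating the case where it already exists."""
    try:
        # id='0' starts the group at the beginning of the stream;
        # mkstream=True creates the stream if it does not exist yet
        client.xgroup_create(stream, group, id='0', mkstream=True)
    except Exception as exc:
        # Redis replies with a BUSYGROUP error when the group already exists
        if 'BUSYGROUP' not in str(exc):
            raise

def consume_batch(client, stream, group, consumer, handler, count=10):
    """Read up to `count` new entries, pass each to `handler`, then acknowledge it."""
    # '>' requests entries that were never delivered to any consumer in the group
    response = client.xreadgroup(group, consumer, {stream: '>'}, count=count)
    handled = 0
    for _stream_name, entries in response:
        for entry_id, fields in entries:
            handler(entry_id, fields)
            # Acknowledge only after the handler succeeded
            client.xack(stream, group, entry_id)
            handled += 1
    return handled
```

With the lab's client, this could be used as `ensure_group(r, 'events:orders', 'billing')` followed by `consume_batch(r, 'events:orders', 'billing', 'worker-1', print)`. Acknowledging after the handler runs means an entry stays in the group's pending list if processing fails, so it can be retried.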


---

## Part 4: Probabilistic Data Structures

### HyperLogLog - Cardinality Estimation

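Count unique items with **minimal memory**: each HyperLogLog key uses at most about 12 KB no matter how many items are added, in exchange for an approximate count (standard error of about 0.81%).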

**Use Cases:**
- Unique visitors count
- Distinct products viewed
- Unique IP addresses
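
The 12 KB figure follows from the dense HyperLogLog layout in Redis: 16,384 registers of 6 bits each. The expected accuracy comes from the HyperLogLog standard-error estimate of 1.04/√m for m registers. A quick arithmetic check (register payload only, ignoring the small per-key header):

```python
import math

REGISTERS = 16_384       # 2**14 registers in the dense encoding
BITS_PER_REGISTER = 6

dense_bytes = REGISTERS * BITS_PER_REGISTER // 8
standard_error = 1.04 / math.sqrt(REGISTERS)

print(f'Dense size: {dense_bytes} bytes ({dense_bytes / 1024:.0f} KB)')
print(f'Standard error: {standard_error * 100:.2f}%')
```

The standard error describes typical deviation, not a hard bound, so an individual estimate can be off by more than that.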


In [None]:
# Simulate unique visitors
import random

# Add 10,000 page views (with duplicates)
user_ids = [f'user_{random.randint(1, 1000)}' for _ in range(10000)]

for user_id in user_ids:
    r.pfadd('page:home:visitors', user_id)

# Count unique visitors
unique_count = r.pfcount('page:home:visitors')

print('📊 HyperLogLog Results:')
print(f'   Total page views: {len(user_ids):,}')
print(f'   Unique visitors: {unique_count:,}')
print(f'   Actual unique: {len(set(user_ids)):,}')
print(f'   Error rate: {abs(unique_count - len(set(user_ids))) / len(set(user_ids)) * 100:.2f}%')
print('   Memory used: at most ~12 KB per key')


---

## Part 5: Performance Benchmarks

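The micro-benchmarks below time three operations from this lab: a serialized JSON round trip, a stream append, and a HyperLogLog add. Each measurement includes the network round trip to the server.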


In [None]:
import statistics

def benchmark(name, operations):
    """Run benchmark and return stats"""
    times = []
    for _ in range(100):
        start = time.perf_counter()
        operations()
        times.append((time.perf_counter() - start) * 1000)
    
    return {
        'name': name,
        'avg': statistics.mean(times),
        'median': statistics.median(times),
        'p95': sorted(times)[94]
    }

# Benchmark 1: Traditional JSON
def traditional_json():
    data = {'name': 'Alice', 'age': 30}
    r.set('bench:trad', pyjson.dumps(data))
    result = pyjson.loads(r.get('bench:trad'))

# Benchmark 2: Stream append
def stream_append():
    r.xadd('bench:stream', {'msg': 'test'})

# Benchmark 3: HyperLogLog
def hyperloglog():
    r.pfadd('bench:hll', f'user_{random.randint(1, 1000)}')

results = [
    benchmark('Traditional JSON', traditional_json),
    benchmark('Stream Append', stream_append),
    benchmark('HyperLogLog Add', hyperloglog)
]

print('⚡ Performance Benchmarks (100 iterations):')
print()
# Use a name other than `r` so the Redis client is not overwritten
for res in results:
    print(f'{res["name"]:20} | Avg: {res["avg"]:.2f}ms | Median: {res["median"]:.2f}ms | P95: {res["p95"]:.2f}ms')
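
The `sorted(times)[94]` expression in `benchmark` gives the 95th percentile only when there are exactly 100 samples. If the iteration count changes, a nearest-rank helper keeps the index correct. A minimal sketch (the name `percentile` is a hypothetical helper, and the timings are synthetic):

```python
import math

def percentile(samples, pct):
    """Nearest-rank percentile: the smallest value with at least pct% of samples at or below it."""
    if not samples:
        raise ValueError('samples must not be empty')
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]

timings_ms = [float(i) for i in range(1, 101)]  # synthetic data: 1.0 .. 100.0
print(percentile(timings_ms, 95))
# → 95.0
```

For 100 samples this selects index 94, the same element the benchmark cell reads.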


---

## Part 6: Cleanup


In [None]:
# Delete all test keys
keys_to_delete = [
    'product:P001',
    'events:orders',
    'page:home:visitors',
    'bench:*'
]

deleted = 0
for pattern in keys_to_delete:
    keys = r.keys(pattern)
    if keys:
        deleted += r.delete(*keys)

print(f'✅ Cleanup complete: {deleted} keys deleted')
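
`KEYS` scans the whole keyspace in one blocking call, which is fine for a small lab database but risky on a busy server. A sketch of the same cleanup using the incremental `SCAN` command through `scan_iter`; the function name `delete_matching` and the `batch_size` parameter are assumptions made for illustration:

```python
def delete_matching(client, patterns, batch_size=500):
    """Delete keys matching each glob pattern, iterating with SCAN instead of KEYS."""
    deleted = 0
    for pattern in patterns:
        batch = []
        for key in client.scan_iter(match=pattern, count=batch_size):
            batch.append(key)
            if len(batch) >= batch_size:
                # Delete in batches to keep each command small
                deleted += client.delete(*batch)
                batch = []
        if batch:
            deleted += client.delete(*batch)
    return deleted
```

With the lab's client, `delete_matching(r, keys_to_delete)` should remove the same keys as the cell above.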


---

## 🎯 Key Takeaways

### ✅ What You Learned

1. **RedisJSON**
   - Native JSON storage without serialization
   - Path-based queries and updates
   - Array manipulation operations

2. **Redis Streams**
   - Append-only event log
   - Message queuing with consumer groups
   - Real-time event processing

3. **Probabilistic Structures**
   - HyperLogLog for cardinality estimation
   - Memory-efficient unique counting
   - About 0.81% standard error using at most 12 KB per key

### 🚀 Use Cases

- **RedisJSON**: API responses, user profiles, product catalogs
- **Streams**: Event sourcing, activity feeds, real-time analytics
- **HyperLogLog**: Unique visitor counts, A/B testing metrics

### 📚 Next Steps

- Explore RediSearch for full-text search
- Try RedisTimeSeries for time-series data
- Implement Bloom filters for membership testing
- Build a real-time analytics pipeline

---

## 🎉 Congratulations!

You've mastered Redis Stack features and are ready for production!
