In [2]:
import redis
import time
from pprint import pprint

# Connect to Redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)


## Use Case 1: Order Book Event Streaming

In [3]:
# Producer: Simulate order events
r.xadd("order_book_stream", {"order_id": "1001", "symbol": "AAPL", "price": "198.5", "qty": "100", "action": "buy"})
r.xadd("order_book_stream", {"order_id": "1002", "symbol": "GOOG", "price": "2801.2", "qty": "50", "action": "sell"})


In [4]:
# Consumer: Read order events
entries = r.xread({"order_book_stream": "0"}, count=10)
pprint(entries)


## Use Case 2: Trade Confirmation and Notification Streaming

In [None]:
# Producer: Push confirmation messages
r.xadd("trade_confirm_stream", {"trade_id": "T1001", "user": "user1", "status": "confirmed"})
r.xadd("trade_confirm_stream", {"trade_id": "T1002", "user": "user2", "status": "confirmed"})


In [None]:
# Consumer: Read confirmation messages
entries = r.xread({"trade_confirm_stream": "0"}, count=10)
pprint(entries)


## Use Case 3: Market Data Streaming and Replay

In [None]:
# Producer: Simulate market data ticks
r.xadd("market_data_stream", {"symbol": "NIFTY", "price": "22300.1", "volume": "1000"})
r.xadd("market_data_stream", {"symbol": "BANKNIFTY", "price": "48950.6", "volume": "500"})


In [None]:
# Consumer: Replay last 2 ticks
entries = r.xrevrange("market_data_stream", count=2)
pprint(entries)


## Use Case 4: User Activity Stream for Analytics

In [None]:
# Producer: Stream user activity
r.xadd("user_activity_stream", {"user": "u123", "action": "login", "ts": str(time.time())})
r.xadd("user_activity_stream", {"user": "u123", "action": "place_order", "ts": str(time.time())})


In [None]:
# Consumer: Get all user activity
entries = r.xrange("user_activity_stream")
pprint(entries)


## Use Case 5: Separate Redis Stream for real-time market data feed.

In [1]:
#
# Jupyter Notebook: Real-time Market Data Dissemination with Redis Streams
#
# This notebook demonstrates how to use Redis Streams to build a simple
# real-time market data feed.
#
# Pre-requisites:
# 1. A running Redis instance.
# 2. Python redis-py library installed (`pip install redis`).
#

import redis
import time
import random
import threading

# --- Configuration ---
REDIS_HOST = "localhost"
REDIS_PORT = 6379
STREAM_NAME = "market-data:AAPL"
CONSUMER_GROUP = "ui-dashboard-group"
CONSUMER_NAME_1 = "dashboard-1"
CONSUMER_NAME_2 = "dashboard-2"

# --- Connect to Redis ---
# Ensure decode_responses=True to get strings back from Redis
try:
    r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
    r.ping()
    print("Successfully connected to Redis.")
except redis.exceptions.ConnectionError as e:
    print(f"Could not connect to Redis: {e}")
    # Stop execution if Redis is not available
    exit()

#==============================================================================
# CELL 1: Market Data Producer
#
# This cell simulates a market data provider that continuously publishes
# new stock ticks (price and volume) to the Redis Stream.
#==============================================================================

def market_data_producer():
    """
    Simulates a producer sending market data for AAPL stock.
    """
    print("--- Starting Market Data Producer ---")
    last_price = 170.00
    while True:
        try:
            # Simulate a price change
            price_change = random.uniform(-0.5, 0.5)
            last_price += price_change
            volume = random.randint(100, 5000)

            # Create the message payload
            tick_data = {
                'price': f"{last_price:.2f}",
                'volume': volume,
                'timestamp': time.time()
            }

            # Add the message to the stream using XADD
            # The '*' tells Redis to auto-generate a unique ID
            message_id = r.xadd(STREAM_NAME, tick_data)
            print(f"Produced tick for AAPL: Price=${tick_data['price']}, Vol={tick_data['volume']} -> ID: {message_id}")

            # Wait for a short random interval
            time.sleep(random.uniform(0.5, 2.0))

        except Exception as e:
            print(f"Producer error: {e}")
            break

# To run the producer in the background in a notebook, you can use threading
producer_thread = threading.Thread(target=market_data_producer, daemon=True)
producer_thread.start()
print("Producer thread started. It will now publish data in the background.")
print("Run the next cell to start the consumers.")


#==============================================================================
# CELL 2: Market Data Consumers (with Consumer Group)
#
# This cell sets up a consumer group and two consumers that will read
# from the stream in a load-balanced fashion.
#==============================================================================

def create_consumer_group():
    """
    Creates a consumer group for the stream.
    If the group already exists, it does nothing.
    """
    try:
        # Create the consumer group.
        # 'mkstream=True' creates the stream if it doesn't exist.
        # '$' means the group will only read new messages arriving after its creation.
        r.xgroup_create(STREAM_NAME, CONSUMER_GROUP, id='$', mkstream=True)
        print(f"Consumer group '{CONSUMER_GROUP}' created.")
    except redis.exceptions.ResponseError as e:
        # This error is raised if the group already exists, which is fine.
        print(f"Consumer group '{CONSUMER_GROUP}' already exists.")

def market_data_consumer(consumer_name: str):
    """
    A consumer that reads data from the stream as part of a group.
    """
    print(f"--- Starting Consumer: {consumer_name} ---")
    while True:
        try:
            # XREADGROUP blocks until a message is available.
            # '>' is a special ID meaning "give me messages that have not been delivered to any other consumer yet".
            # COUNT 1 means we process one message at a time.
            # BLOCK 0 means wait forever.
            response = r.xreadgroup(
                CONSUMER_GROUP,
                consumer_name,
                {STREAM_NAME: '>'},
                count=1,
                block=0
            )

            if response:
                # The response is a list containing a list for each stream.
                # e.g., [['market-data:AAPL', [('1678886400000-0', {'price': '170.10', ...})]]]
                stream, messages = response[0]
                message_id, data = messages[0]

                print(f"Consumer '{consumer_name}' received: ID={message_id}, Data={data}")

                # Acknowledge the message so it's not redelivered.
                r.xack(STREAM_NAME, CONSUMER_GROUP, message_id)

        except Exception as e:
            print(f"Consumer '{consumer_name}' error: {e}")
            break


# --- Main execution for consumers ---
create_consumer_group()

# Start two consumers in separate threads to see the load balancing
consumer_1_thread = threading.Thread(target=market_data_consumer, args=(CONSUMER_NAME_1,), daemon=True)
consumer_2_thread = threading.Thread(target=market_data_consumer, args=(CONSUMER_NAME_2,), daemon=True)

consumer_1_thread.start()
consumer_2_thread.start()

print("Both consumer threads started. They will now process messages from the stream.")
print("Observe how the work is distributed between them.")

# You can stop the notebook cell execution to stop the threads.


## Use Case 6: Redis Stream as an immutable, append-only log for all events in an order's lifecycle..

In [2]:
#
# Jupyter Notebook: Order Management System (OMS) with Redis Streams
#
# This notebook demonstrates using a Redis Stream as an immutable,
# append-only log for all events in an order's lifecycle.
#
# Pre-requisites:
# 1. A running Redis instance.
# 2. Python redis-py library installed (`pip install redis`).
#

import redis
import time
import uuid

# --- Configuration ---
REDIS_HOST = "localhost"
REDIS_PORT = 6379

# --- Connect to Redis ---
try:
    r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
    r.ping()
    print("Successfully connected to Redis.")
except redis.exceptions.ConnectionError as e:
    print(f"Could not connect to Redis: {e}")
    exit()

#==============================================================================
# CELL 1: Order Lifecycle Management Functions
#
# These functions simulate creating an order and subsequent modifications.
# Each order gets its own dedicated stream.
#==============================================================================

def create_order(ticker: str, quantity: int, price: float, order_type: str) -> str:
    """
    Creates a new order and logs the 'CREATE' event.
    Returns the unique order ID.
    """
    order_id = str(uuid.uuid4())
    stream_name = f"order:{order_id}"
    event_data = {
        'event_type': 'CREATE',
        'ticker': ticker,
        'quantity': quantity,
        'price': price,
        'order_type': order_type,
        'timestamp': time.time()
    }
    message_id = r.xadd(stream_name, event_data)
    print(f"Order CREATED. ID: {order_id}, Stream: {stream_name}, Event ID: {message_id}")
    return order_id

def modify_order(order_id: str, new_quantity: int):
    """
    Modifies an existing order and logs the 'MODIFY' event.
    """
    stream_name = f"order:{order_id}"
    if not r.exists(stream_name):
        print(f"Error: Order {order_id} not found.")
        return

    event_data = {
        'event_type': 'MODIFY',
        'new_quantity': new_quantity,
        'timestamp': time.time()
    }
    message_id = r.xadd(stream_name, event_data)
    print(f"Order MODIFIED. ID: {order_id}, New Quantity: {new_quantity}, Event ID: {message_id}")

def cancel_order(order_id: str):
    """
    Cancels an existing order and logs the 'CANCEL' event.
    """
    stream_name = f"order:{order_id}"
    if not r.exists(stream_name):
        print(f"Error: Order {order_id} not found.")
        return

    event_data = {
        'event_type': 'CANCEL',
        'reason': 'User requested cancellation',
        'timestamp': time.time()
    }
    message_id = r.xadd(stream_name, event_data)
    print(f"Order CANCELLED. ID: {order_id}, Event ID: {message_id}")

#==============================================================================
# CELL 2: Simulate an Order's Lifecycle
#
# We will create, modify, and then cancel an order.
#==============================================================================

print("--- Simulating Order Lifecycle ---")
# 1. Create a new order for GOOG
my_order_id = create_order(ticker='GOOG', quantity=50, price=140.50, order_type='LIMIT')
time.sleep(1)

# 2. Modify the order
modify_order(my_order_id, new_quantity=40)
time.sleep(1)

# 3. Cancel the order
cancel_order(my_order_id)
print("\n--- Simulation Complete ---")


#==============================================================================
# CELL 3: Audit Trail Consumer
#
# This function reads the entire history of an order from its stream,
# effectively acting as an audit log viewer.
#==============================================================================

def get_order_history(order_id: str):
    """
    Retrieves and prints all events for a given order ID.
    """
    stream_name = f"order:{order_id}"
    print(f"\n--- Retrieving Full Audit Trail for Order: {order_id} ---")

    if not r.exists(stream_name):
        print(f"No history found for order {order_id}.")
        return

    # XRANGE reads a range of messages. '-' (min) and '+' (max) means read all.
    history = r.xrange(stream_name, min='-', max='+')

    if not history:
        print("Stream exists but is empty.")
        return

    print(f"Found {len(history)} event(s):")
    for message_id, data in history:
        print(f"  -> Event ID: {message_id}")
        for key, value in data.items():
            print(f"     {key}: {value}")
        print("-" * 20)

# Let's audit the order we just created
get_order_history(my_order_id)

# Try auditing a non-existent order
get_order_history("non-existent-order-123")


## Use Case 7: Simulates a real-time risk management system.

In [3]:
#
# Jupyter Notebook: Real-time Risk Management with Redis Streams
#
# This notebook simulates a real-time risk management system. Trades are
# added to a stream, and a risk engine consumes them to update account
# exposure, triggering alerts if a threshold is breached.
#
# Pre-requisites:
# 1. A running Redis instance.
# 2. Python redis-py library installed (`pip install redis`).
#

import redis
import time
import random
import threading

# --- Configuration ---
REDIS_HOST = "localhost"
REDIS_PORT = 6379
STREAM_NAME = "risk-events"
CONSUMER_GROUP = "risk-engine-group"
CONSUMER_NAME = "risk-calculator-1"

# --- Connect to Redis ---
try:
    r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
    r.ping()
    print("Successfully connected to Redis.")
except redis.exceptions.ConnectionError as e:
    print(f"Could not connect to Redis: {e}")
    exit()

#==============================================================================
# CELL 1: Event Producer (Simulates Trades)
#
# This function produces events representing trades and adds them to the
# 'risk-events' stream.
#==============================================================================

def event_producer():
    """Simulates a producer sending trade events."""
    print("--- Starting Risk Event Producer ---")
    while True:
        try:
            # Simulate a trade event
            event_data = {
                'account': f"account-{random.randint(101, 103)}", # 3 different accounts
                'ticker': random.choice(['NVDA', 'AMD', 'INTC']),
                'quantity': random.randint(10, 100),
                'price': f"{random.uniform(100, 900):.2f}",
                'side': random.choice(['BUY', 'SELL'])
            }
            message_id = r.xadd(STREAM_NAME, event_data)
            print(f"Produced Event: {event_data['side']} {event_data['quantity']} {event_data['ticker']} for {event_data['account']}")
            time.sleep(1.5)
        except Exception as e:
            print(f"Producer error: {e}")
            break

# Run the producer in a background thread
producer_thread = threading.Thread(target=event_producer, daemon=True)
producer_thread.start()
print("Producer thread started.")

#==============================================================================
# CELL 2: Risk Engine (Consumer)
#
# This consumer reads trade events, updates an in-memory representation
# of account exposure, and prints alerts.
#==============================================================================

def setup_risk_group():
    """Creates the consumer group for the risk engine."""
    try:
        r.xgroup_create(STREAM_NAME, CONSUMER_GROUP, id='$', mkstream=True)
        print(f"Consumer group '{CONSUMER_GROUP}' created.")
    except redis.exceptions.ResponseError:
        print(f"Consumer group '{CONSUMER_GROUP}' already exists.")

def risk_engine_consumer():
    """
    The main logic for the risk engine consumer.
    """
    print(f"\n--- Starting Risk Engine Consumer: {CONSUMER_NAME} ---")
    
    # In-memory store for account exposures. In a real app, this might
    # also be stored and updated in Redis Hashes.
    account_exposures = {}
    RISK_LIMIT = 100000  # $100,000 exposure limit

    while True:
        try:
            response = r.xreadgroup(
                CONSUMER_GROUP,
                CONSUMER_NAME,
                {STREAM_NAME: '>'},
                count=1,
                block=0
            )

            if response:
                stream, messages = response[0]
                message_id, data = messages[0]
                
                print(f"\nRisk Engine processing message {message_id}...")
                
                # Calculate the value of the trade
                trade_value = int(data['quantity']) * float(data['price'])
                account = data['account']
                
                # Initialize account if not seen before
                if account not in account_exposures:
                    account_exposures[account] = 0.0
                
                # Update exposure
                if data['side'] == 'BUY':
                    account_exposures[account] += trade_value
                else: # SELL
                    account_exposures[account] -= trade_value
                
                print(f"  -> Updated exposure for {account}: ${account_exposures[account]:,.2f}")
                
                # Check against risk limit
                if abs(account_exposures[account]) > RISK_LIMIT:
                    print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
                    print(f"  ALERT: RISK LIMIT BREACHED for {account}!")
                    print(f"  Exposure: ${account_exposures[account]:,.2f} | Limit: ${RISK_LIMIT:,.2f}")
                    print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
                
                # Acknowledge the message
                r.xack(STREAM_NAME, CONSUMER_GROUP, message_id)

        except Exception as e:
            print(f"Risk Engine error: {e}")
            break

# --- Run the risk engine ---
setup_risk_group()

risk_engine_thread = threading.Thread(target=risk_engine_consumer, daemon=True)
risk_engine_thread.start()

print("Risk Engine consumer started. It will now process trade events.")


## Use Case 8: Simulates trade events to detect potentially manipulative patterns.

In [1]:
#
# Jupyter Notebook: Compliance and Surveillance with Redis Streams
#
# This notebook simulates a compliance system that surveils a stream of
# trade events to detect potentially manipulative patterns, like wash trading.
#
# Pre-requisites:
# 1. A running Redis instance.
# 2. Python redis-py library installed (`pip install redis`).
#

import redis
import time
import random
import threading
from collections import deque

# --- Configuration ---
REDIS_HOST = "localhost"
REDIS_PORT = 6379
SURVEILLANCE_STREAM = "trade-surveillance"
ALERTS_STREAM = "compliance-alerts"
CONSUMER_GROUP = "surveillance-engine-group"
CONSUMER_NAME = "pattern-detector-1"

# --- Connect to Redis ---
try:
    r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
    r.ping()
    print("Successfully connected to Redis.")
except redis.exceptions.ConnectionError as e:
    print(f"Could not connect to Redis: {e}")
    exit()

#==============================================================================
# CELL 1: Trade Event Producer
#
# This producer generates trade events, including some that will form a
# pattern the compliance engine should detect.
#==============================================================================

def trade_event_producer():
    """Produces trade events, including a wash trade pattern."""
    print("--- Starting Trade Event Producer ---")
    
    # Normal trades
    for i in range(5):
        trade = {
            'ticker': random.choice(['IBM', 'CSCO', 'ORCL']),
            'price': random.uniform(50, 200),
            'quantity': random.randint(100, 500),
            'buyer': f'account-{random.randint(300, 399)}',
            'seller': f'account-{random.randint(400, 499)}'
        }
        r.xadd(SURVEILLANCE_STREAM, trade)
        print(f"Produced normal trade: Buyer {trade['buyer']}, Seller {trade['seller']}")
        time.sleep(1)

    # Suspicious wash trade pattern
    print("\n--- Producing suspicious wash trade pattern... ---")
    for i in range(3):
        wash_trade = {
            'ticker': 'WASHCO',
            'price': 10.00,
            'quantity': 1000,
            'buyer': 'account-777',
            'seller': 'account-888'
        }
        r.xadd(SURVEILLANCE_STREAM, wash_trade)
        print(f"Produced wash trade leg 1: Buyer {wash_trade['buyer']}, Seller {wash_trade['seller']}")
        time.sleep(0.2)
        
        wash_trade_return = {
            'ticker': 'WASHCO',
            'price': 10.01,
            'quantity': 1000,
            'buyer': 'account-888',
            'seller': 'account-777'
        }
        r.xadd(SURVEILLANCE_STREAM, wash_trade_return)
        print(f"Produced wash trade leg 2: Buyer {wash_trade_return['buyer']}, Seller {wash_trade_return['seller']}")
        time.sleep(0.2)

    print("\n--- Producer Finished ---")

# Run the producer
trade_event_producer()


#==============================================================================
# CELL 2: Compliance Engine (Pattern Detection Consumer)
#
# This consumer reads the surveillance stream and looks for a simple
# wash trade pattern: two accounts trading the same stock back and forth.
#==============================================================================

def setup_surveillance_group():
    """Creates the consumer group for the surveillance engine."""
    try:
        r.xgroup_create(SURVEILLANCE_STREAM, CONSUMER_GROUP, id='0-0', mkstream=True)
        print(f"Consumer group '{CONSUMER_GROUP}' created.")
    except redis.exceptions.ResponseError:
        print(f"Consumer group '{CONSUMER_GROUP}' already exists.")

def compliance_engine():
    """
    The main logic for the compliance surveillance engine.
    """
    print(f"\n--- Starting Compliance Engine: {CONSUMER_NAME} ---")
    
    # Keep a short history of recent trades to detect patterns
    # In a real system, this state would need to be managed more robustly.
    trade_history = deque(maxlen=10)

    while True:
        try:
            # Read a batch of messages
            response = r.xreadgroup(
                CONSUMER_GROUP,
                CONSUMER_NAME,
                {SURVEILLANCE_STREAM: '>'},
                count=1,
                block=2000 # Block for 2 seconds, then timeout to check for stop
            )

            if not response:
                print("No new messages. Engine shutting down.")
                break

            stream, messages = response[0]
            message_id, data = messages[0]
            
            print(f"\nEngine processing trade: {data['ticker']} from {data['seller']} to {data['buyer']}")
            
            # Simple Wash Trade Detection Logic:
            # Look for a trade that is the exact reverse of a recent trade.
            for prev_trade in trade_history:
                is_reverse_trade = (
                    prev_trade['ticker'] == data['ticker'] and
                    prev_trade['buyer'] == data['seller'] and
                    prev_trade['seller'] == data['buyer']
                )
                if is_reverse_trade:
                    alert = {
                        'type': 'WASH_TRADE_DETECTED',
                        'ticker': data['ticker'],
                        'account_1': data['buyer'],
                        'account_2': data['seller'],
                        'details': f"Suspicious buy/sell-back pattern detected between {data['buyer']} and {data['seller']}"
                    }
                    # Publish the alert to the alerts stream
                    r.xadd(ALERTS_STREAM, alert)
                    print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
                    print(f"  ALERT: {alert['type']} on {alert['ticker']}")
                    print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

            trade_history.append(data)
            r.xack(SURVEILLANCE_STREAM, CONSUMER_GROUP, message_id)

        except Exception as e:
            print(f"Compliance Engine error: {e}")
            break

# --- Run the compliance engine ---
setup_surveillance_group()
compliance_engine()

# --- Verify alerts ---
print("\n--- Checking for generated alerts ---")
alerts = r.xrange(ALERTS_STREAM)
if alerts:
    print(f"Found {len(alerts)} alert(s) in '{ALERTS_STREAM}':")
    for msg_id, data in alerts:
        print(f"  -> {data}")
else:
    print(f"No alerts found in '{ALERTS_STREAM}'.")

