# 10 - Streaming and Real-time: Building Responsive AI Applications

## Overview
In this notebook, we'll learn how to build responsive AI applications with streaming capabilities. You'll implement real-time token streaming, async operations, and interactive applications that provide immediate feedback to users.

## Learning Objectives
By the end of this notebook, you will be able to:
- Implement token-by-token streaming from LLMs
- Build async chains and agents
- Create real-time chat applications
- Handle streaming with callbacks
- Implement progress indicators for long operations
- Build WebSocket-based real-time systems

## Prerequisites
- Completion of notebooks 01-09
- Understanding of async programming basics
- Knowledge of chains and agents

## Back-and-Forth Teaching Pattern
This notebook follows our pattern:
1. **Instructor Activity**: Demonstrates a concept with complete examples
2. **Learner Activity**: You apply the concept with guidance and hidden solutions

## Setup

Let's install and import the necessary libraries:

In [None]:
# Install required packages
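# asyncio ships with Python (the PyPI "asyncio" package is an obsolete backport), so it is not installed here.
# langchain is pinned below 1.0 because LLMChain and the other legacy chain APIs used in this notebook belong to the 0.x line.
!pip install "langchain<1.0" langchain-openai nest-asyncio websockets tqdm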

In [None]:
import os
import asyncio
import time
import warnings
from datetime import datetime
from getpass import getpass
from typing import Any, Dict, List, Optional, AsyncIterator

import nest_asyncio
from tqdm import tqdm
from langchain_openai import ChatOpenAI
from langchain_core.callbacks import (
    AsyncCallbackHandler,
    BaseCallbackHandler,
    CallbackManager,
    StreamingStdOutCallbackHandler,
)
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langchain_core.outputs import LLMResult
from langchain_core.prompts import ChatPromptTemplate
from langchain.chains import LLMChain  # legacy 0.x chain API

warnings.filterwarnings('ignore')

# Allow nested event loops so asyncio.run() works inside Jupyter's already-running loop
nest_asyncio.apply()

# Use the OpenAI API key from the environment, prompting only if it is not set
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("OpenAI API key: ")

---

## Instructor Activity 1: Basic Streaming and Callbacks

Let's start with basic streaming implementations:
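Under the hood, a streaming callback is just a hook that the client calls once per chunk, as each chunk arrives, while it also assembles the full response. The library-free sketch below mimics that contract; `fake_stream` and `generate` are hypothetical stand-ins for the model and client, not LangChain's implementation.

```python
import time
from typing import Callable, Iterator, List


def fake_stream(text: str, delay: float = 0.0) -> Iterator[str]:
    """Hypothetical stand-in for a model: yields whitespace-separated 'tokens'."""
    for word in text.split(" "):
        time.sleep(delay)  # simulate latency between chunks
        yield word + " "


def generate(text: str, on_new_token: Callable[[str], None]) -> str:
    """Call the hook once per chunk as it arrives, then return the full text."""
    chunks: List[str] = []
    for token in fake_stream(text):
        on_new_token(token)  # plays the role of a callback's on_llm_new_token
        chunks.append(token)
    return "".join(chunks)


received: List[str] = []
full_text = generate("a robot learns to paint", received.append)
```

Passing `lambda t: print(t, end="", flush=True)` as the hook instead of `received.append` prints each chunk the moment it is produced, which is the effect `StreamingStdOutCallbackHandler` gives in the cells below.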

In [None]:
# Basic token streaming

# Create LLM with streaming enabled
streaming_llm = ChatOpenAI(
    temperature=0.7,
    streaming=True,  # Enable streaming
    callbacks=[StreamingStdOutCallbackHandler()]  # Print tokens as they arrive
)

# Test streaming
print("Streaming Response:")
print("=" * 50)

# invoke() returns an AIMessage; .content holds the generated text
response = streaming_llm.invoke("Tell me a short story about a robot learning to paint.").content

print("\n\n" + "=" * 50)
print("\nFinal response stored in variable:")
print(f"Length: {len(response)} characters")

In [None]:
# Custom streaming callback with more control

class CustomStreamingCallback(BaseCallbackHandler):
    """Custom callback to handle streaming tokens."""
    
    def __init__(self):
        self.tokens = []
        self.token_count = 0
        self.start_time = None
    
    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs):
        """Called when LLM starts."""
        self.start_time = time.time()
        self.tokens = []
        self.token_count = 0
        print("\n🚀 Starting generation...\n")
    
    def on_llm_new_token(self, token: str, **kwargs):
        """Called when a new token is generated."""
        self.tokens.append(token)
        self.token_count += 1
        
        # Print token with formatting
        print(token, end="", flush=True)
        
        # Show progress every 10 tokens
        if self.token_count % 10 == 0:
            elapsed = time.time() - self.start_time
            tokens_per_sec = self.token_count / elapsed if elapsed > 0 else 0
            # Print stats on same line
            print(f" [{self.token_count} tokens, {tokens_per_sec:.1f} tok/s]", end="", flush=True)
    
    def on_llm_end(self, response: LLMResult, **kwargs):
        """Called when LLM finishes."""
        elapsed = time.time() - self.start_time
        speed = self.token_count / elapsed if elapsed > 0 else 0
        print("\n\n✅ Generation complete!")
        print(f"   Total tokens: {self.token_count}")
        print(f"   Time: {elapsed:.2f}s")
        print(f"   Speed: {speed:.1f} tokens/second")
    
    def on_llm_error(self, error: Exception, **kwargs):
        """Called on error."""
        print(f"\n❌ Error: {error}")

# Use custom callback
custom_callback = CustomStreamingCallback()

custom_llm = ChatOpenAI(
    temperature=0.7,
    streaming=True,
    callbacks=[custom_callback]
)

# Generate with custom streaming
response = custom_llm.invoke("Explain quantum computing in simple terms.").content

In [None]:
# Streaming with chains

class ChainStreamingCallback(BaseCallbackHandler):
    """Callback for streaming chain outputs."""
    
    def __init__(self):
        self.current_step = ""
        self.steps_completed = 0
    
    def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs):
        """Called when chain starts."""
        print("\n🔗 Chain started...")
        print(f"   Input: {list(inputs.keys())}")
    
    def on_chain_end(self, outputs: Dict[str, Any], **kwargs):
        """Called when chain ends."""
        print("\n✅ Chain completed!")
    
    def on_llm_new_token(self, token: str, **kwargs):
        """Stream tokens from LLM in chain."""
        print(token, end="", flush=True)
    
    def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs):
        """Called when tool starts."""
        tool_name = serialized.get("name", "Unknown")
        print(f"\n🔧 Using tool: {tool_name}")
    
    def on_tool_end(self, output: str, **kwargs):
        """Called when tool ends."""
        print(f"\n   Tool result: {output[:100]}...")

# Create chain with streaming
chain_callback = ChainStreamingCallback()

prompt = ChatPromptTemplate.from_template(
    "Write a detailed explanation about {topic}. Include examples and applications."
)

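# Callbacks passed to a chain's constructor are scoped to that chain only, so the
# LLM's tokens would never reach them. Passing them at call time through `config`
# makes them inheritable, so the LLM inside the chain reports its tokens too.
streaming_chain = LLMChain(
    llm=ChatOpenAI(temperature=0.7, streaming=True),
    prompt=prompt,
)

# Run chain with streaming (invoke() returns a dict; the generated text is under "text")
result = streaming_chain.invoke(
    {"topic": "neural networks"},
    config={"callbacks": [chain_callback]},
)["text"]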

In [None]:
# Token usage and cost tracking callback

class TokenUsageCallback(BaseCallbackHandler):
    """Track token usage and estimate the cost of the most recent LLM call."""
    
    def __init__(self):
        self.total_tokens = 0
        self.prompt_tokens = 0
        self.completion_tokens = 0
        self.total_cost = 0.0
        
        # Pricing in USD per 1K tokens (illustrative rates; check OpenAI's pricing page)
        self.pricing = {
            "gpt-3.5-turbo": {"prompt": 0.0015, "completion": 0.002},
            "gpt-4": {"prompt": 0.03, "completion": 0.06}
        }
    
    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs):
        """Reset the counters and estimate prompt tokens."""
        # Rough estimation: 1 token ≈ 4 characters of English text
        self.prompt_tokens = sum(len(prompt) // 4 for prompt in prompts)
        self.completion_tokens = 0
    
    def on_llm_new_token(self, token: str, **kwargs):
        """Count streamed chunks (OpenAI typically sends about one token per chunk)."""
        self.completion_tokens += 1
    
    def on_llm_end(self, response: LLMResult, **kwargs):
        """Use exact usage when the API reports it, then calculate the cost."""
        usage = (response.llm_output or {}).get("token_usage")
        if usage:
            self.prompt_tokens = usage.get("prompt_tokens", self.prompt_tokens)
            self.completion_tokens = usage.get("completion_tokens", self.completion_tokens)
        # Streaming responses may not include usage, in which case the estimates
        # above are kept. Either way, the total is prompt + completion.
        self.total_tokens = self.prompt_tokens + self.completion_tokens
        
        # Calculate cost (assumes gpt-3.5-turbo rates, whichever model actually ran)
        model_pricing = self.pricing["gpt-3.5-turbo"]
        prompt_cost = (self.prompt_tokens / 1000) * model_pricing["prompt"]
        completion_cost = (self.completion_tokens / 1000) * model_pricing["completion"]
        self.total_cost = prompt_cost + completion_cost
    
    def get_usage_report(self) -> str:
        """Get formatted usage report."""
        return (
            "Token Usage Report:\n"
            f"  - Prompt tokens: {self.prompt_tokens}\n"
            f"  - Completion tokens: {self.completion_tokens}\n"
            f"  - Total tokens: {self.total_tokens}\n"
            f"  - Estimated cost: ${self.total_cost:.4f}"
        )

# Test token tracking
usage_callback = TokenUsageCallback()

tracking_llm = ChatOpenAI(
    temperature=0.7,
    streaming=True,
    callbacks=[usage_callback, StreamingStdOutCallbackHandler()]
)

response = tracking_llm.invoke("Write a haiku about programming.").content

print("\n\n" + "=" * 50)
print(usage_callback.get_usage_report())
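The cost estimate in `on_llm_end` is plain arithmetic: tokens divided by 1,000, times the per-1K rate, summed over prompt and completion. A standalone version is below; `estimate_cost` is a hypothetical helper, the rates are the same illustrative gpt-3.5-turbo figures as in `TokenUsageCallback.pricing` (not current OpenAI pricing), and the token counts are made up.

```python
def estimate_cost(prompt_tokens: int, completion_tokens: int,
                  prompt_rate_per_1k: float, completion_rate_per_1k: float) -> float:
    """Return the estimated cost in dollars for one call."""
    prompt_cost = prompt_tokens / 1000 * prompt_rate_per_1k
    completion_cost = completion_tokens / 1000 * completion_rate_per_1k
    return prompt_cost + completion_cost


# Made-up counts for a short haiku request, priced at the illustrative rates
example_cost = estimate_cost(prompt_tokens=20, completion_tokens=17,
                             prompt_rate_per_1k=0.0015, completion_rate_per_1k=0.002)
print(f"${example_cost:.6f}")  # prints $0.000064
```

A call this small rounds to `$0.0001` in the four-decimal usage report, so the report is more useful for spotting trends over many calls than for pricing a single one.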

---

## Learner Activity 1: Build a Real-time Progress Tracker

Create a system that provides real-time feedback for long-running operations.

**Task**: Build a progress tracking system that:
1. Shows progress for document processing
2. Provides ETA estimates
3. Displays partial results as they become available
4. Handles multiple concurrent operations
5. Includes a progress bar visualization

Requirements:
- Implement custom callbacks for progress tracking
- Show live updates during processing
- Calculate and display ETAs
- Handle errors gracefully with progress state
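One common way to meet the ETA requirement is a linear estimate: assume the remaining items take as long on average as the finished ones, so ETA = elapsed / completed × remaining. The helper below is a hypothetical sketch of that rule, not part of tqdm or LangChain.

```python
from typing import Optional


def linear_eta(elapsed_seconds: float, completed: float, total: float) -> Optional[float]:
    """Estimate seconds remaining, assuming a constant average time per item.

    Returns None until at least one unit of work is done (no rate to measure yet).
    """
    if completed <= 0:
        return None
    seconds_per_item = elapsed_seconds / completed
    remaining = max(total - completed, 0)
    return seconds_per_item * remaining


print(linear_eta(elapsed_seconds=12.0, completed=3, total=5))  # prints 8.0
```

The estimate is only as good as the assumption that items are similar in size; for documents of very different lengths, weighting progress by characters rather than by document count gives a steadier ETA.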

In [None]:
# Build your real-time progress tracker

class ProgressTracker(BaseCallbackHandler):
    def __init__(self, total_steps: int):
        # TODO: Initialize progress tracking
        # - Track current step
        # - Calculate ETA
        # - Store partial results
        pass
    
    def update_progress(self, step_name: str, progress: float):
        """Update progress for current operation."""
        # TODO: Update progress and calculate ETA
        pass
    
    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs):
        # TODO: Track LLM operation start
        pass
    
    def on_llm_new_token(self, token: str, **kwargs):
        # TODO: Update progress as tokens arrive
        pass
    
    def get_status(self) -> Dict:
        """Get current status and ETA."""
        # TODO: Return current progress status
        pass

# TODO: Create document processor with progress tracking
class DocumentProcessor:
    def __init__(self):
        # TODO: Initialize with progress tracker
        pass
    
    def process_documents(self, documents: List[str]):
        """Process documents with real-time progress."""
        # TODO: Process each document
        # - Show progress bar
        # - Update ETA
        # - Display partial results
        pass

# TODO: Test your progress tracker
# Process multiple documents
# Show live progress updates

# Your test code here

In [None]:
# Solution (hidden by default)

"""
from tqdm import tqdm
import time

class ProgressTracker(BaseCallbackHandler):
    def __init__(self, total_steps: int = 100):
        self.total_steps = total_steps
        self.current_step = 0
        self.start_time = None
        self.step_times = []
        self.partial_results = []
        self.current_operation = None
        self.progress_bar = None
        self.token_count = 0
    
    def start_operation(self, operation_name: str, total_items: int = None):
        '''Start tracking a new operation.'''
        self.current_operation = operation_name
        self.start_time = time.time()
        self.current_step = 0
        self.token_count = 0
        
        if total_items:
            self.total_steps = total_items
        
        # Create progress bar
        self.progress_bar = tqdm(
            total=self.total_steps,
            desc=operation_name,
            unit="steps",
            bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]"
        )
    
    def update_progress(self, step_name: str, progress: float = 1.0):
        '''Update progress for current operation.'''
        self.current_step += progress
        
        # Update progress bar
        if self.progress_bar:
            self.progress_bar.update(progress)
            self.progress_bar.set_postfix_str(step_name[:30])
        
        # Track step timing
        current_time = time.time()
        if self.start_time:
            elapsed = current_time - self.start_time
            self.step_times.append(elapsed)
    
    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs):
        '''Track LLM operation start.'''
        if not self.current_operation:
            self.start_operation("LLM Generation")
        
        # Estimate tokens from prompt (rough rule: 1 token ≈ 4 characters)
        prompt_length = sum(len(p) for p in prompts)
        estimated_tokens = prompt_length // 4
        
        # Status text only (progress 0): the bar advances once per finished
        # document, so LLM events must not add to current_step
        self.update_progress(f"Processing prompt (~{estimated_tokens} tokens)", 0)
    
    def on_llm_new_token(self, token: str, **kwargs):
        '''Refresh the status text as tokens arrive.'''
        self.token_count += 1
        
        # Update every 10 tokens to avoid too frequent updates
        if self.token_count % 10 == 0:
            self.update_progress(f"Generating... ({self.token_count} tokens)", 0)
    
    def on_llm_end(self, response: LLMResult, **kwargs):
        '''Track LLM completion.'''
        self.update_progress("Generation complete", 0)
        
        # Store partial result
        if response.generations:
            text = response.generations[0][0].text
            self.partial_results.append({
                "operation": self.current_operation,
                "result": text[:100] + "..." if len(text) > 100 else text,
                "tokens": self.token_count
            })
    
    def get_eta(self) -> float:
        '''Estimate seconds remaining: average time per completed step * steps left.'''
        if not self.start_time or self.current_step <= 0:
            return 0
        
        elapsed = time.time() - self.start_time
        remaining_steps = max(self.total_steps - self.current_step, 0)
        return elapsed / self.current_step * remaining_steps
    
    def get_status(self) -> Dict:
        '''Get current status and ETA.'''
        progress_pct = (self.current_step / self.total_steps * 100) if self.total_steps > 0 else 0
        
        return {
            "operation": self.current_operation,
            "progress": f"{progress_pct:.1f}%",
            "steps_completed": self.current_step,
            "total_steps": self.total_steps,
            "eta_seconds": self.get_eta(),
            "partial_results": len(self.partial_results),
            "tokens_generated": self.token_count
        }
    
    def close(self):
        '''Close progress bar.'''
        if self.progress_bar:
            self.progress_bar.close()

# Document processor with progress tracking
class DocumentProcessor:
    def __init__(self):
        self.llm = ChatOpenAI(temperature=0.7, streaming=True)
        self.progress_tracker = None
        self.results = []
    
    def process_documents(self, documents: List[str], operation: str = "summarize"):
        '''Process documents with real-time progress.'''
        # Create progress tracker
        self.progress_tracker = ProgressTracker(total_steps=len(documents))
        
        # Configure LLM with progress callback
        self.llm.callbacks = [self.progress_tracker]
        
        # Start operation
        self.progress_tracker.start_operation(
            f"Processing {len(documents)} documents",
            total_items=len(documents)
        )
        
        print(f"\n📄 Starting document processing: {operation}")
        print("=" * 50)
        
        # Process each document
        for i, doc in enumerate(documents):
            try:
                # Show current document
                self.progress_tracker.update_progress(
                    f"Document {i+1}/{len(documents)}: {doc[:50]}...",
                    0
                )
                
                # Process based on operation
                if operation == "summarize":
                    prompt = f"Summarize this text in 2 sentences: {doc}"
                elif operation == "analyze":
                    prompt = f"Analyze the key points in: {doc}"
                elif operation == "translate":
                    prompt = f"Translate to Spanish: {doc}"
                else:
                    prompt = f"Process this text: {doc}"
                
                # Generate with streaming (progress tracked via callback)
                result = self.llm.invoke(prompt).content
                
                # Store result
                self.results.append({
                    "document": doc[:100],
                    "result": result,
                    "status": "completed"
                })
                
                # Update main progress
                self.progress_tracker.update_progress(
                    f"Completed document {i+1}",
                    1
                )
                
                # Show partial result
                print(f"\n✅ Document {i+1} processed")
                print(f"   Result preview: {result[:100]}...")
                
            except Exception as e:
                print(f"\n❌ Error processing document {i+1}: {e}")
                self.results.append({
                    "document": doc[:100],
                    "result": None,
                    "status": "error",
                    "error": str(e)
                })
                self.progress_tracker.update_progress("Error", 1)
        
        # Close progress bar
        self.progress_tracker.close()
        
        # Final status
        print("\n" + "=" * 50)
        status = self.progress_tracker.get_status()
        print("\n📊 Final Status:")
        for key, value in status.items():
            print(f"   {key}: {value}")
        
        # Summary
        successful = sum(1 for r in self.results if r["status"] == "completed")
        print(f"\n✅ Successfully processed: {successful}/{len(documents)} documents")
        
        return self.results

# Test the progress tracker
print("Real-time Progress Tracking Demo")
print("=" * 50)

# Create sample documents
test_documents = [
    "Artificial intelligence is transforming how we work and live.",
    "Machine learning algorithms can identify patterns in vast amounts of data.",
    "Natural language processing enables computers to understand human language.",
    "Deep learning has revolutionized computer vision applications.",
    "Robotics combines AI with mechanical systems for automation."
]

# Process documents with progress tracking
processor = DocumentProcessor()
results = processor.process_documents(test_documents, operation="summarize")

print("\n" + "=" * 50)
print("\n📋 Processing Complete! Results stored.")
"""

print("Build a real-time progress tracking system for document processing!")
print("The solution includes progress bars, ETA calculation, and partial results.")

---

## Instructor Activity 2: Async Operations and Concurrent Processing

Let's explore async operations for better performance:
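The gain from `asyncio.gather` comes from overlapping waits: while one request waits on the network, the event loop starts the others. This stdlib-only sketch swaps the API call for `asyncio.sleep` inside a hypothetical `fake_llm_call`, so the effect shows up without an API key.

```python
import asyncio
import time
from typing import List, Tuple


async def fake_llm_call(prompt: str, latency: float) -> str:
    """Hypothetical stand-in for an API call that spends its time waiting."""
    await asyncio.sleep(latency)
    return f"answer to: {prompt}"


async def run_concurrently(prompts: List[str], latency: float) -> Tuple[List[str], float]:
    start = time.perf_counter()
    # gather returns results in the same order as the awaitables passed in
    results = await asyncio.gather(*(fake_llm_call(p, latency) for p in prompts))
    return list(results), time.perf_counter() - start


demo_results, demo_elapsed = asyncio.run(run_concurrently(["a", "b", "c", "d"], latency=0.2))
print(demo_results)
# Expect close to 0.2s (one latency), not 0.8s (four latencies back to back)
print(f"{demo_elapsed:.2f}s for 4 calls of 0.2s each")
```

The same overlap is what makes the real `process_multiple_prompts` cell faster than a loop of sequential calls, as long as the API's rate limits allow the requests to run side by side.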

In [None]:
# Async LLM operations

async def async_generate(llm: ChatOpenAI, prompt: str) -> str:
    """Generate text asynchronously."""
    # ainvoke is the async counterpart of invoke; it returns an AIMessage
    result = await llm.ainvoke(prompt)
    return result.content

async def process_multiple_prompts(prompts: List[str]):
    """Process multiple prompts concurrently."""
    llm = ChatOpenAI(temperature=0.7)
    
    # Create tasks for concurrent execution
    tasks = [async_generate(llm, prompt) for prompt in prompts]
    
    # Run all tasks concurrently
    start_time = time.time()
    results = await asyncio.gather(*tasks)
    elapsed = time.time() - start_time
    
    return results, elapsed

# Test async processing
test_prompts = [
    "Write a haiku about coding.",
    "Explain recursion in one sentence.",
    "What is the meaning of life?",
    "Describe the color blue to a blind person.",
    "Write a joke about databases."
]

print("Async Concurrent Processing:")
print("=" * 50)

# Run async function
results, time_async = asyncio.run(process_multiple_prompts(test_prompts))

print(f"\n⚡ Processed {len(test_prompts)} prompts concurrently in {time_async:.2f} seconds")
print(f"Effective time per prompt (wall-clock / count): {time_async/len(test_prompts):.2f} seconds\n")

for i, (prompt, result) in enumerate(zip(test_prompts, results)):
    print(f"\n{i+1}. Prompt: {prompt}")
    print(f"   Result: {result}")

In [None]:
# Async streaming with callbacks

class AsyncStreamingCallback(AsyncCallbackHandler):
    """Async callback that forwards streamed tokens to a consumer through a queue."""
    
    def __init__(self):
        self.queue: asyncio.Queue = asyncio.Queue()
    
    async def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs):
        """Called when LLM starts."""
        print("\n🚀 Starting async generation...")
    
    async def on_llm_new_token(self, token: str, **kwargs):
        """Hand each token to the consumer (printing here would duplicate the output)."""
        await self.queue.put(token)

# Async streaming generator
async def stream_response(prompt: str) -> AsyncIterator[str]:
    """Stream response tokens asynchronously."""
    callback = AsyncStreamingCallback()
    
    llm = ChatOpenAI(
        temperature=0.7,
        streaming=True,
        callbacks=[callback]
    )
    
    # Start generation in the background
    task = asyncio.create_task(llm.ainvoke(prompt))
    
    # When the task finishes (successfully or not), enqueue a None sentinel.
    # Every token is queued before that point, so no token is lost and the
    # loop below cannot wait forever.
    task.add_done_callback(lambda _: callback.queue.put_nowait(None))
    
    # Yield tokens as they arrive, stopping at the sentinel
    while (token := await callback.queue.get()) is not None:
        yield token
    
    # Re-raise any error from the generation task
    await task

# Test async streaming
async def test_streaming():
    prompt = "Tell me a story about a brave little robot."
    
    print("Async Streaming Demo:")
    print("=" * 50)
    print(f"Prompt: {prompt}\n")
    print("Response: ", end="")
    
    async for token in stream_response(prompt):
        # Process each token as it arrives
        print(token, end="", flush=True)
    
    print("\n\nStreaming complete!")

# Run async streaming test
asyncio.run(test_streaming())
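A robust way to turn callback pushes into an async iterator is a producer/consumer pair around an `asyncio.Queue`, with a sentinel value marking the end of the stream. The sketch below shows that pattern with a hypothetical `fake_producer` in place of an LLM, so it runs without an API key; the sentinel is pushed from the producer task's done-callback so the consumer stops even if the producer fails.

```python
import asyncio
from typing import AsyncIterator, List, Optional


async def fake_producer(queue: "asyncio.Queue[Optional[str]]", tokens: List[str]) -> None:
    """Hypothetical token source: pushes each token, as a streaming callback would."""
    for token in tokens:
        await asyncio.sleep(0.01)  # simulate time between chunks
        await queue.put(token)


async def stream_tokens(tokens: List[str]) -> AsyncIterator[str]:
    queue: "asyncio.Queue[Optional[str]]" = asyncio.Queue()
    task = asyncio.create_task(fake_producer(queue, tokens))
    # Push the None sentinel once the producer finishes, whether it succeeded or failed
    task.add_done_callback(lambda _: queue.put_nowait(None))
    while (token := await queue.get()) is not None:
        yield token
    await task  # re-raise any exception from the producer


async def collect() -> List[str]:
    return [t async for t in stream_tokens(["Once ", "upon ", "a ", "time"])]


received_tokens = asyncio.run(collect())
print("".join(received_tokens))
```

Compared with polling a shared list in a `while` loop, the queue wakes the consumer exactly when a token arrives and preserves order without extra bookkeeping.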

In [None]:
# Batch processing with async and progress

class AsyncBatchProcessor:
    """Process batches of items asynchronously with progress tracking."""
    
    def __init__(self, batch_size: int = 5):
        self.batch_size = batch_size
        self.llm = ChatOpenAI(temperature=0.7)
        self.results = []
        self.errors = []
    
    async def process_item(self, item: Dict, index: int) -> Dict:
        """Process a single item."""
        try:
            # Build a prompt that matches the item type
            if item["type"] == "summarize":
                prompt = f"Summarize: {item['content']}"
            elif item["type"] == "translate":
                prompt = f"Translate to {item.get('language', 'Spanish')}: {item['content']}"
            else:
                prompt = f"Process: {item['content']}"
            
            # Async generation (ainvoke returns an AIMessage; .content is the text)
            result = (await self.llm.ainvoke(prompt)).content
            
            return {
                "index": index,
                "input": item,
                "output": result,
                "status": "success"
            }
            
        except Exception as e:
            return {
                "index": index,
                "input": item,
                "error": str(e),
                "status": "error"
            }
    
    async def process_batch(self, items: List[Dict], offset: int = 0) -> List[Dict]:
        """Process a batch of items concurrently.
        
        `offset` is the position of the batch's first item in the full list,
        so each result's "index" refers to the original input order.
        """
        tasks = [
            self.process_item(item, offset + i)
            for i, item in enumerate(items)
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Handle exceptions
        processed_results = []
        for i, r in enumerate(results):
            if isinstance(r, Exception):
                processed_results.append({
                    "index": offset + i,
                    "status": "error",
                    "error": str(r)
                })
            else:
                processed_results.append(r)
        
        return processed_results
    
    async def process_all(self, items: List[Dict]):
        """Process all items in batches with progress."""
        total_items = len(items)
        total_batches = (total_items + self.batch_size - 1) // self.batch_size
        
        print(f"\n📦 Processing {total_items} items in {total_batches} batches")
        print(f"   Batch size: {self.batch_size}")
        print("=" * 50)
        
        # Progress bar
        pbar = tqdm(total=total_items, desc="Processing items")
        
        all_results = []
        start_time = time.time()
        
        for batch_num in range(total_batches):
            # Get batch
            start_idx = batch_num * self.batch_size
            end_idx = min(start_idx + self.batch_size, total_items)
            batch = items[start_idx:end_idx]
            
            # Process batch
            print(f"\n🔄 Processing batch {batch_num + 1}/{total_batches}...")
            batch_results = await self.process_batch(batch, offset=start_idx)
            
            # Update progress
            pbar.update(len(batch))
            
            # Store results
            all_results.extend(batch_results)
            
            # Show batch summary
            successful = sum(1 for r in batch_results if r["status"] == "success")
            print(f"   Batch complete: {successful}/{len(batch)} successful")
        
        pbar.close()
        elapsed = time.time() - start_time
        
        # Summary
        successful_total = sum(1 for r in all_results if r["status"] == "success")
        
        print("\n" + "=" * 50)
        print("\n📊 Batch Processing Complete:")
        print(f"   Total items: {total_items}")
        print(f"   Successful: {successful_total}")
        print(f"   Failed: {total_items - successful_total}")
        print(f"   Time: {elapsed:.2f} seconds")
        print(f"   Throughput: {total_items/elapsed:.2f} items/second")
        
        return all_results

# Test batch processing
async def test_batch_processing():
    # Create test items
    test_items = [
        {"type": "summarize", "content": "AI is transforming industries."},
        {"type": "translate", "content": "Hello world", "language": "French"},
        {"type": "summarize", "content": "Machine learning uses data patterns."},
        {"type": "translate", "content": "Good morning", "language": "Japanese"},
        {"type": "process", "content": "Natural language processing."},
        {"type": "summarize", "content": "Deep learning mimics brain neurons."},
        {"type": "translate", "content": "Thank you", "language": "German"},
        {"type": "process", "content": "Computer vision applications."},
    ]
    
    processor = AsyncBatchProcessor(batch_size=3)
    results = await processor.process_all(test_items)
    
    # Show sample results
    print("\n📝 Sample Results:")
    for r in results[:3]:
        if r["status"] == "success":
            print(f"\n   Input: {r['input']['content'][:50]}")
            print(f"   Output: {r['output'][:100]}...")

# Run batch processing test
asyncio.run(test_batch_processing())
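Fixed-size batches wait for the slowest item in each batch before the next batch starts. A common alternative, swapped in here and not used by `AsyncBatchProcessor`, is to cap the number of in-flight requests with `asyncio.Semaphore`, so a new item starts as soon as any slot frees up. A stdlib sketch with a hypothetical `fake_process` call:

```python
import asyncio
from typing import Dict, List, Tuple


async def fake_process(item: str) -> str:
    """Hypothetical stand-in for an LLM call."""
    await asyncio.sleep(0.05)
    return item.upper()


async def process_with_limit(items: List[str], max_concurrent: int = 3) -> Tuple[List[Dict], int]:
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def worker(index: int, item: str) -> Dict:
        nonlocal in_flight, peak
        async with semaphore:  # at most max_concurrent workers run this block at once
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                output = await fake_process(item)
                return {"index": index, "output": output, "status": "success"}
            except Exception as exc:  # one failure should not sink the whole run
                return {"index": index, "error": str(exc), "status": "error"}
            finally:
                in_flight -= 1

    results = await asyncio.gather(*(worker(i, item) for i, item in enumerate(items)))
    return list(results), peak


limited_results, peak_in_flight = asyncio.run(
    process_with_limit(["a", "b", "c", "d", "e"], max_concurrent=2)
)
print(f"peak concurrency: {peak_in_flight}")
```

Because every item is scheduled up front and indexed by its position in the full list, results come back in input order with global indices, and the semaphore limit plays the role that `batch_size` plays above.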

---

## Learner Activity 2: Build an Async Chat Server

Create a real-time chat server with streaming responses.

**Task**: Build a chat server that:
1. Handles multiple concurrent users
2. Streams responses in real-time
3. Manages conversation history per user
4. Implements typing indicators
5. Provides response time analytics

Requirements:
- Support multiple simultaneous conversations
- Stream tokens as they're generated
- Track metrics per user
- Handle connection/disconnection gracefully
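One way to approach the per-user and streaming requirements is a dict of sessions keyed by user ID, where each session's `send_message` is an async generator that sets a typing flag while it streams. This is a minimal sketch under stated assumptions: `MiniSession`, `fake_reply`, and `MAX_HISTORY` are hypothetical, and the "model" simply echoes the user's message word by word so the example runs without an API key.

```python
import asyncio
from typing import AsyncIterator, Dict, List

MAX_HISTORY = 4  # hypothetical context window: how many past messages a model would see


class MiniSession:
    """Hypothetical per-user session: history, typing flag, streamed replies."""

    def __init__(self, user_id: str) -> None:
        self.user_id = user_id
        self.history: List[Dict[str, str]] = []
        self.is_typing = False

    async def fake_reply(self, context: List[Dict[str, str]]) -> AsyncIterator[str]:
        """Hypothetical model: echoes the latest user message one word at a time."""
        for word in context[-1]["content"].split():
            await asyncio.sleep(0.01)  # simulate time between chunks
            yield word + " "

    async def send_message(self, message: str) -> AsyncIterator[str]:
        self.history.append({"role": "user", "content": message})
        context = self.history[-MAX_HISTORY:]  # trim history before "calling the model"
        self.is_typing = True
        reply = ""
        try:
            async for token in self.fake_reply(context):
                reply += token
                yield token
            self.history.append({"role": "assistant", "content": reply})
        finally:
            self.is_typing = False  # runs when the stream finishes or the generator is closed


mini_sessions: Dict[str, MiniSession] = {}


async def chat(user_id: str, message: str) -> str:
    if user_id not in mini_sessions:
        mini_sessions[user_id] = MiniSession(user_id)
    session = mini_sessions[user_id]
    return "".join([token async for token in session.send_message(message)])


async def run_two_users() -> List[str]:
    # Both conversations stream at the same time; their histories stay separate
    return list(await asyncio.gather(chat("alice", "hi there"), chat("bob", "hello")))


replies = asyncio.run(run_two_users())
print(replies)
```

Swapping `fake_reply` for a call such as `ChatOpenAI(...).astream(...)` over the trimmed context is the step left for the exercise.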

In [None]:
# Build your async chat server

class ChatSession:
    def __init__(self, user_id: str):
        self.user_id = user_id
        self.history = []
        self.is_typing = False
        # TODO: Initialize session components
        pass
    
    async def send_message(self, message: str) -> AsyncIterator[str]:
        """Send message and stream response."""
        # TODO: Implement streaming response
        pass

class AsyncChatServer:
    def __init__(self):
        self.sessions = {}  # user_id -> ChatSession
        # TODO: Initialize server components
        pass
    
    async def connect_user(self, user_id: str) -> ChatSession:
        """Connect a new user or retrieve existing session."""
        # TODO: Create or retrieve session
        pass
    
    async def handle_message(self, user_id: str, message: str):
        """Handle incoming message from user."""
        # TODO: Process message and stream response
        pass
    
    async def broadcast_typing_indicator(self, user_id: str, is_typing: bool):
        """Broadcast typing status."""
        # TODO: Implement typing indicator
        pass
    
    def get_analytics(self) -> Dict:
        """Get server analytics."""
        # TODO: Return usage statistics
        pass

# TODO: Implement WebSocket handler (simplified)
async def simulate_chat_interaction():
    """Simulate chat server interactions."""
    # TODO: Create server
    # TODO: Connect multiple users
    # TODO: Send messages concurrently
    # TODO: Show streaming responses
    pass

# TODO: Test your chat server
# Your test code here

In [None]:
# Solution (hidden by default)

"""
import uuid
from collections import defaultdict

class ChatSession:
    def __init__(self, user_id: str):
        self.user_id = user_id
        self.session_id = str(uuid.uuid4())
        self.history = []
        self.is_typing = False
        self.metrics = {
            "messages_sent": 0,
            "messages_received": 0,
            "total_tokens": 0,
            "avg_response_time": 0,
            "response_times": []
        }
        self.llm = ChatOpenAI(temperature=0.7, streaming=True)
        self.created_at = datetime.now()
    
    async def send_message(self, message: str) -> AsyncIterator[str]:
        '''Send message and stream response.'''
        # Update metrics
        self.metrics["messages_sent"] += 1
        start_time = time.time()
        
        # Add to history
        self.history.append({"role": "user", "content": message})
        
        # Build prompt with history
        messages = []
        for h in self.history[-5:]:  # Keep last 5 messages for context
            if h["role"] == "user":
                messages.append(HumanMessage(content=h["content"]))
            else:
                messages.append(AIMessage(content=h["content"]))
        
        # Set typing indicator
        self.is_typing = True
        
        try:
            # Stream response
            full_response = ""
            token_count = 0
            
            async for chunk in self.llm.astream(messages):
                token = chunk.content  # each chunk is an AIMessageChunk
                if token:  # skip empty chunks (OpenAI streams can include them)
                    full_response += token
                    token_count += 1
                    yield token
            
            # Update history
            self.history.append({"role": "assistant", "content": full_response})
            
            # Update metrics
            response_time = time.time() - start_time
            self.metrics["messages_received"] += 1
            self.metrics["total_tokens"] += token_count
            self.metrics["response_times"].append(response_time)
            self.metrics["avg_response_time"] = sum(self.metrics["response_times"]) / len(self.metrics["response_times"])
            
        finally:
            self.is_typing = False
    
    def get_summary(self) -> Dict:
        '''Get session summary.'''
        return {
            "user_id": self.user_id,
            "session_id": self.session_id,
            "message_count": len(self.history),
            "metrics": self.metrics,
            "duration": (datetime.now() - self.created_at).total_seconds()
        }

class AsyncChatServer:
    def __init__(self):
        self.sessions = {}  # user_id -> ChatSession
        self.active_users = set()
        self.server_metrics = {
            "total_connections": 0,
            "total_messages": 0,
            "total_tokens": 0,
            "peak_concurrent_users": 0
        }
        self.typing_indicators = {}  # user_id -> is_typing
        self.start_time = datetime.now()
    
    async def connect_user(self, user_id: str) -> ChatSession:
        '''Connect a new user or retrieve existing session.'''
        if user_id not in self.sessions:
            self.sessions[user_id] = ChatSession(user_id)
            self.server_metrics["total_connections"] += 1
            print(f"👤 User {user_id} connected")
        else:
            print(f"👤 User {user_id} reconnected")
        
        self.active_users.add(user_id)
        
        # Update peak concurrent users
        current_users = len(self.active_users)
        if current_users > self.server_metrics["peak_concurrent_users"]:
            self.server_metrics["peak_concurrent_users"] = current_users
        
        return self.sessions[user_id]
    
    async def disconnect_user(self, user_id: str):
        '''Disconnect a user.'''
        if user_id in self.active_users:
            self.active_users.remove(user_id)
            print(f"👤 User {user_id} disconnected")
    
    async def handle_message(self, user_id: str, message: str) -> str:
        '''Handle incoming message from user.'''
        if user_id not in self.sessions:
            await self.connect_user(user_id)
        
        session = self.sessions[user_id]
        
        # Update server metrics
        self.server_metrics["total_messages"] += 1
        
        # Broadcast typing indicator
        await self.broadcast_typing_indicator(user_id, True)
        
        # Stream response
        full_response = ""
        print(f"\n💬 User {user_id}: {message}")
        print(f"🤖 Assistant: ", end="")
        
        async for token in session.send_message(message):
            full_response += token
            print(token, end="", flush=True)
            self.server_metrics["total_tokens"] += 1
        
        print()  # New line after response
        
        # Stop typing indicator
        await self.broadcast_typing_indicator(user_id, False)
        
        return full_response
    
    async def broadcast_typing_indicator(self, user_id: str, is_typing: bool):
        '''Broadcast typing status.'''
        self.typing_indicators[user_id] = is_typing
        
        if is_typing:
            # In real implementation, broadcast to other users
            # print(f"   [{user_id} is typing...]")
            pass
    
    async def handle_concurrent_users(self, user_messages: List[tuple]):
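        '''Handle messages from multiple users concurrently.

        handle_message prints tokens as they arrive, so console output from
        concurrent streams interleaves; the returned responses are complete
        and in the same order as user_messages.
        '''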
        tasks = []
        
        for user_id, message in user_messages:
            task = self.handle_message(user_id, message)
            tasks.append(task)
        
        # Process all messages concurrently
        responses = await asyncio.gather(*tasks)
        return responses
    
    def get_analytics(self) -> Dict:
        '''Get server analytics.'''
        uptime = (datetime.now() - self.start_time).total_seconds()
        
        # Calculate per-user statistics
        user_stats = []
        for user_id, session in self.sessions.items():
            stats = session.get_summary()
            user_stats.append(stats)
        
        return {
            "server_uptime_seconds": uptime,
            "active_users": len(self.active_users),
            "total_users": len(self.sessions),
            "server_metrics": self.server_metrics,
            "user_statistics": user_stats,
            "currently_typing": [uid for uid, typing in self.typing_indicators.items() if typing]
        }

# Simulate chat server interactions
async def simulate_chat_interaction():
    '''Simulate multiple users chatting.'''
    print("\n🚀 Async Chat Server Demo")
    print("=" * 50)
    
    # Create server
    server = AsyncChatServer()
    
    # Connect users
    users = ["Alice", "Bob", "Charlie"]
    for user in users:
        await server.connect_user(user)
    
    print(f"\n📊 Connected {len(users)} users")
    print("=" * 50)
    
    # Simulate conversations
    print("\n💬 Starting conversations...\n")
    
    # Round 1: Individual messages
    await server.handle_message("Alice", "What's the weather like today?")
    await server.handle_message("Bob", "Tell me a joke about programming.")
    
    # Round 2: Concurrent messages
    print("\n🔄 Processing concurrent messages...")
    concurrent_messages = [
        ("Alice", "How does machine learning work?"),
        ("Bob", "What's recursion?"),
        ("Charlie", "Explain quantum computing.")
    ]
    
    start_time = time.time()
    responses = await server.handle_concurrent_users(concurrent_messages)
    concurrent_time = time.time() - start_time
    
    print(f"\n⚡ Processed {len(concurrent_messages)} messages concurrently in {concurrent_time:.2f}s")
    
    # Disconnect a user
    await server.disconnect_user("Charlie")
    
    # Get analytics
    print("\n" + "=" * 50)
    print("\n📊 Server Analytics:")
    
    analytics = server.get_analytics()
    
    print(f"\n🖥️  Server Status:")
    print(f"   Uptime: {analytics['server_uptime_seconds']:.1f} seconds")
    print(f"   Active users: {analytics['active_users']}")
    print(f"   Total users: {analytics['total_users']}")
    
    print(f"\n📈 Server Metrics:")
    for key, value in analytics['server_metrics'].items():
        print(f"   {key}: {value}")
    
    print(f"\n👥 User Statistics:")
    for user_stat in analytics['user_statistics']:
        print(f"\n   User: {user_stat['user_id']}")
        print(f"   Messages: {user_stat['message_count']}")
        print(f"   Avg response time: {user_stat['metrics']['avg_response_time']:.2f}s")
        print(f"   Total tokens: {user_stat['metrics']['total_tokens']}")

# Run the simulation
asyncio.run(simulate_chat_interaction())
"""

print("Build an async chat server with streaming and multi-user support!")
print("The solution includes concurrent handling, metrics, and typing indicators.")
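
In the chat-server solution, `handle_message` prints tokens as they arrive. When `handle_concurrent_users` runs several of these calls at once, output from different users interleaves on screen. The sketch below is a minimal stdlib-only alternative that buffers each stream and prints only complete replies. It makes a few assumptions:
- `fake_token_stream`, `collect_stream`, `handle_concurrently`, and `TOKEN_DELAY` are hypothetical names.
- The canned replies stand in for `llm.astream()` output.

```python
import asyncio
import time
from typing import AsyncIterator, Dict, List, Tuple

TOKEN_DELAY = 0.02  # hypothetical per-token latency in seconds


async def fake_token_stream(reply_text: str) -> AsyncIterator[str]:
    """Hypothetical stand-in for llm.astream(): yields a canned reply word by word."""
    for word in reply_text.split():
        await asyncio.sleep(TOKEN_DELAY)  # simulate waiting for the next token
        yield word + " "


async def collect_stream(user_id: str, reply_text: str) -> Tuple[str, str]:
    """Buffer one user's tokens instead of printing them as they arrive."""
    parts: List[str] = []
    async for token in fake_token_stream(reply_text):
        parts.append(token)
    return user_id, "".join(parts).strip()


async def handle_concurrently(jobs: List[Tuple[str, str]]) -> Dict[str, str]:
    """Run all streams at once; gather() returns results in input order."""
    results = await asyncio.gather(*(collect_stream(u, r) for u, r in jobs))
    return dict(results)


# Hypothetical canned replies standing in for model output
jobs = [
    ("Alice", "machine learning finds patterns in data"),
    ("Bob", "recursion is a function calling itself"),
    ("Charlie", "qubits can be in superposition"),
]

start = time.perf_counter()
responses = asyncio.run(handle_concurrently(jobs))
elapsed = time.perf_counter() - start

# Each reply is printed whole, so lines from different users never interleave
for user, reply in responses.items():
    print(f"{user}: {reply}")

total_tokens = sum(len(r.split()) for _, r in jobs)
print(f"{elapsed:.2f}s concurrently vs. at least {total_tokens * TOKEN_DELAY:.2f}s one after another")
```

The trade-off is that users no longer see partial output. In a real UI, each stream would usually go to its own destination (for example, its own WebSocket) rather than to a shared console, so streaming and readability are not in conflict.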

---

## Instructor Activity 3: WebSocket Real-time Communication
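
A WebSocket connection carries discrete messages (frames), so streaming protocols commonly wrap each event in a small JSON envelope. The sketch below is a minimal example of that idea. It makes two assumptions:
- Frames are JSON text frames covering the five event types used in this section's simulation (`welcome`, `ack`, `stream_start`, `stream_token`, `stream_end`).
- `encode_frame`, `decode_frame`, and `FRAME_TYPES` are hypothetical helpers, not part of LangChain or the `websockets` package.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict

# Hypothetical set of event types, mirroring the simulated protocol
FRAME_TYPES = {"welcome", "ack", "stream_start", "stream_token", "stream_end"}


def encode_frame(frame_type: str, data: Dict[str, Any]) -> str:
    """Serialize one event as a JSON text frame (what a real socket would send)."""
    if frame_type not in FRAME_TYPES:
        raise ValueError(f"unknown frame type: {frame_type}")
    return json.dumps({
        "type": frame_type,
        "data": data,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    })


def decode_frame(raw: str) -> Dict[str, Any]:
    """Parse a JSON text frame back into a dict, rejecting unknown types."""
    frame = json.loads(raw)
    if frame.get("type") not in FRAME_TYPES:
        raise ValueError(f"unknown frame type: {frame.get('type')}")
    return frame


raw = encode_frame("stream_token", {"token": "Web"})
print(raw)
frame = decode_frame(raw)
print(frame["type"], frame["data"])
```

A client built this way would call `decode_frame` on every incoming text message and dispatch on `frame["type"]`.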

Let's build real-time WebSocket-based communication:

In [None]:
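# Simulated WebSocket communication: frames are printed instead of sent.
# A real deployment needs a running WebSocket server (e.g. built with the `websockets` package installed above).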

class WebSocketMessage:
    """Simulated WebSocket message."""
    def __init__(self, type: str, data: Any):
        self.type = type
        self.data = data
        self.timestamp = datetime.now()

class RealtimeStreamHandler:
    """Handle real-time streaming over WebSocket."""
    
    def __init__(self):
        self.connections = {}  # connection_id -> connection_info
        self.message_queue = asyncio.Queue()  # not used in this simulation
        self.llm = ChatOpenAI(temperature=0.7, streaming=True)
    
    async def handle_connection(self, connection_id: str):
        """Handle new WebSocket connection."""
        self.connections[connection_id] = {
            "id": connection_id,
            "connected_at": datetime.now(),
            "messages_sent": 0,
            "messages_received": 0
        }
        
        # Send welcome message
        await self.send_message(
            connection_id,
            WebSocketMessage("welcome", {"message": "Connected to real-time chat"})
        )
    
    async def send_message(self, connection_id: str, message: WebSocketMessage):
        """Send message to specific connection."""
        if connection_id in self.connections:
            self.connections[connection_id]["messages_sent"] += 1
            # Simulate sending over WebSocket
            print(f"→ [{connection_id}] {message.type}: {message.data}")
    
    async def broadcast_message(self, message: WebSocketMessage, exclude: str = None):
        """Broadcast message to all connections."""
        for conn_id in self.connections:
            if conn_id != exclude:
                await self.send_message(conn_id, message)
    
    async def handle_chat_message(self, connection_id: str, message_text: str):
        """Handle incoming chat message with streaming response."""
        self.connections[connection_id]["messages_received"] += 1
        
        # Send acknowledgment
        await self.send_message(
            connection_id,
            WebSocketMessage("ack", {"received": message_text})
        )
        
        # Start streaming response
        await self.send_message(
            connection_id,
            WebSocketMessage("stream_start", {})
        )
        
        # Stream tokens
        token_buffer = ""
        token_count = 0
        
        async for chunk in self.llm.astream(message_text):
            if hasattr(chunk, 'content'):
                token = chunk.content
                token_buffer += token
                token_count += 1  # counts chunks; one chunk may hold several tokens
                
                # Send token
                await self.send_message(
                    connection_id,
                    WebSocketMessage("stream_token", {"token": token})
                )
                
                # Simulate network delay
                await asyncio.sleep(0.01)
        
        # End streaming
        await self.send_message(
            connection_id,
            WebSocketMessage("stream_end", {
                "total_tokens": token_count,
                "complete_response": token_buffer
            })
        )

# Test real-time streaming
async def test_realtime_streaming():
    print("Real-time WebSocket Streaming Demo")
    print("=" * 50)
    
    handler = RealtimeStreamHandler()
    
    # Simulate client connection
    client_id = "client_001"
    await handler.handle_connection(client_id)
    
    # Send chat message
    print(f"\n← [{client_id}] Sending: 'Explain WebSockets in one paragraph'\n")
    await handler.handle_chat_message(
        client_id,
        "Explain WebSockets in one paragraph"
    )
    
    # Show connection stats
    print("\n" + "=" * 50)
    print("\nConnection Statistics:")
    for conn_id, info in handler.connections.items():
        print(f"  {conn_id}:")
        print(f"    Messages sent: {info['messages_sent']}")
        print(f"    Messages received: {info['messages_received']}")

asyncio.run(test_realtime_streaming())
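
The simulation above sends one `stream_token` frame per chunk, so a single reply can cost hundreds of tiny frames. A common mitigation is to coalesce chunks and flush them when a size or time limit is reached. The sketch below is a minimal example, with these assumptions and limits:
- The token source is async and shaped like `llm.astream()`.
- `fake_tokens` and `batch_tokens` are hypothetical helpers.
- The time limit is only checked when a new token arrives, so a stalled stream keeps its partial buffer until the next token or the end of the stream.

```python
import asyncio
import time
from typing import AsyncIterator, List


async def fake_tokens(n: int, delay: float = 0.005) -> AsyncIterator[str]:
    """Hypothetical token source standing in for llm.astream()."""
    for i in range(n):
        await asyncio.sleep(delay)
        yield f"t{i} "


async def batch_tokens(
    tokens: AsyncIterator[str], max_tokens: int = 8, max_wait: float = 0.05
) -> AsyncIterator[str]:
    """Coalesce tokens into chunks: flush after max_tokens tokens or max_wait seconds."""
    buffer: List[str] = []
    first_at = 0.0
    async for token in tokens:
        if not buffer:
            first_at = time.monotonic()  # start the wait clock at the first buffered token
        buffer.append(token)
        # The time check only runs here, i.e. when a new token arrives
        if len(buffer) >= max_tokens or time.monotonic() - first_at >= max_wait:
            yield "".join(buffer)
            buffer.clear()
    if buffer:  # flush whatever is left when the stream ends
        yield "".join(buffer)


async def main() -> List[str]:
    # A large max_wait means only the size limit triggers flushes here
    return [chunk async for chunk in batch_tokens(fake_tokens(20), max_tokens=8, max_wait=10.0)]


chunks = asyncio.run(main())
print(len(chunks), "frames instead of 20")
```

With `max_wait=10.0`, only the size limit fires, so 20 tokens become 3 frames of 8, 8, and 4 tokens. In a handler like `handle_chat_message`, each yielded chunk would become one `stream_token` message.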

---

## Learner Activity 3: Build a Real-time Dashboard

Create a real-time dashboard that monitors AI operations.

**Task**: Build a dashboard that:
1. Shows live token generation stats
2. Displays concurrent user activity
3. Tracks error rates and response times
4. Provides cost estimates in real-time
5. Includes alerts for anomalies

Requirements:
- Real-time metric updates
- Historical data visualization
- Alert system for thresholds
- Export functionality for reports

In [None]:
# Build your real-time dashboard

class MetricsCollector:
    def __init__(self):
        # TODO: Initialize metrics storage
        pass
    
    def record_metric(self, metric_type: str, value: float):
        """Record a metric value."""
        # TODO: Store metric with timestamp
        pass
    
    def get_current_stats(self) -> Dict:
        """Get current statistics."""
        # TODO: Calculate current stats
        pass

class RealtimeDashboard:
    def __init__(self):
        self.metrics = MetricsCollector()
        # TODO: Initialize dashboard components
        pass
    
    async def update_dashboard(self):
        """Update dashboard with latest metrics."""
        # TODO: Fetch and display metrics
        pass
    
    async def monitor_operations(self):
        """Monitor AI operations in real-time."""
        # TODO: Track operations and update metrics
        pass
    
    def check_alerts(self) -> List[str]:
        """Check for alert conditions."""
        # TODO: Check thresholds and return alerts
        pass
    
    def export_report(self, format: str = "json") -> str:
        """Export dashboard report."""
        # TODO: Generate and export report
        pass

# TODO: Test your dashboard
# Simulate various operations
# Show real-time updates
# Trigger alerts

# Your test code here

In [None]:
# Solution (hidden by default)

"""
import json  # used by export_report
import statistics
from collections import deque, defaultdict

class MetricsCollector:
    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.metrics = defaultdict(lambda: deque(maxlen=window_size))
        self.totals = defaultdict(float)
        self.counts = defaultdict(int)
        self.alerts = []
    
    def record_metric(self, metric_type: str, value: float, timestamp: datetime = None):
        '''Record a metric value.'''
        timestamp = timestamp or datetime.now()
        
        # Store in sliding window
        self.metrics[metric_type].append({
            "value": value,
            "timestamp": timestamp
        })
        
        # Update totals
        self.totals[metric_type] += value
        self.counts[metric_type] += 1
    
    def get_current_stats(self) -> Dict:
        '''Get current statistics.'''
        stats = {}
        
        for metric_type, values in self.metrics.items():
            if values:
                recent_values = [v["value"] for v in values]
                stats[metric_type] = {
                    "current": recent_values[-1] if recent_values else 0,
                    "mean": statistics.mean(recent_values),
                    "min": min(recent_values),
                    "max": max(recent_values),
                    "total": self.totals[metric_type],
                    "count": self.counts[metric_type]
                }
                
                # Add percentiles for numeric metrics
                if len(recent_values) > 1:
                    stats[metric_type]["p50"] = statistics.median(recent_values)
                    if len(recent_values) > 10:
                        # quantiles(n=20) returns 19 cut points; index 18 is the 95th percentile
                        stats[metric_type]["p95"] = statistics.quantiles(recent_values, n=20)[18]
        
        return stats
    
    def get_time_series(self, metric_type: str, last_n: int = 50) -> List[Dict]:
        '''Get time series data for a metric.'''
        if metric_type not in self.metrics:
            return []
        
        values = list(self.metrics[metric_type])
        return values[-last_n:]

class RealtimeDashboard:
    def __init__(self):
        self.metrics = MetricsCollector()
        self.llm_monitor = None
        self.start_time = datetime.now()
        self.alert_thresholds = {
            "response_time": 5.0,  # seconds
            "error_rate": 0.1,     # 10%
            "cost_per_hour": 10.0, # dollars
            "tokens_per_second": 1000  # max throughput
        }
        self.active_sessions = 0
        self.total_requests = 0
        self.total_errors = 0
    
    def create_monitoring_callback(self):
        '''Create callback for monitoring LLM operations.'''
        parent = self
        
        class MonitoringCallback(BaseCallbackHandler):
            def __init__(self):
                self.start_time = None
                self.token_count = 0
            
            def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs):
                self.start_time = time.time()
                self.token_count = 0
                parent.active_sessions += 1
                parent.total_requests += 1
            
            def on_llm_new_token(self, token: str, **kwargs):
                self.token_count += 1
                parent.metrics.record_metric("tokens_generated", 1)
            
            def on_llm_end(self, response: LLMResult, **kwargs):
                if self.start_time:
                    response_time = time.time() - self.start_time
                    parent.metrics.record_metric("response_time", response_time)
                    
                    # Calculate tokens per second
                    if response_time > 0:
                        tps = self.token_count / response_time
                        parent.metrics.record_metric("tokens_per_second", tps)
                    
                    # Estimate cost from streamed output chunks only, at an illustrative
                    # rate of $0.002 per 1K tokens (prompt tokens are not counted)
                    cost = (self.token_count / 1000) * 0.002
                    parent.metrics.record_metric("cost", cost)
                
                parent.active_sessions -= 1
            
            def on_llm_error(self, error: Exception, **kwargs):
                parent.total_errors += 1
                parent.metrics.record_metric("errors", 1)
                parent.active_sessions -= 1
        
        return MonitoringCallback()
    
    async def update_dashboard(self):
        '''Update dashboard with latest metrics.'''
        stats = self.metrics.get_current_stats()
        
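        # Clear previous output: clear_output() in Jupyter, ANSI escape codes in a terminal
        try:
            from IPython.display import clear_output
            clear_output(wait=True)
        except ImportError:
            print("\033[2J\033[H")  # Clear screen and move cursor to top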
        
        # Dashboard header
        print("=" * 60)
        print("           🎯 REAL-TIME AI OPERATIONS DASHBOARD")
        print("=" * 60)
        
        # Current time and uptime
        uptime = (datetime.now() - self.start_time).total_seconds()
        print(f"\n⏰ Current Time: {datetime.now().strftime('%H:%M:%S')}")
        print(f"⏱️  Uptime: {uptime:.0f} seconds")
        
        # Active sessions
        print(f"\n👥 Active Sessions: {self.active_sessions}")
        print(f"📊 Total Requests: {self.total_requests}")
        print(f"❌ Total Errors: {self.total_errors}")
        
        # Performance metrics
        print("\n📈 Performance Metrics:")
        print("-" * 40)
        
        if "response_time" in stats:
            rt = stats["response_time"]
            print(f"Response Time:")
            print(f"  Current: {rt['current']:.2f}s")
            print(f"  Average: {rt['mean']:.2f}s")
            print(f"  P95: {rt.get('p95', rt['max']):.2f}s")
        
        if "tokens_per_second" in stats:
            tps = stats["tokens_per_second"]
            print(f"\nThroughput:")
            print(f"  Current: {tps['current']:.1f} tok/s")
            print(f"  Average: {tps['mean']:.1f} tok/s")
            print(f"  Peak: {tps['max']:.1f} tok/s")
        
        # Cost tracking
        print("\n💰 Cost Analysis:")
        print("-" * 40)
        
        if "cost" in stats:
            cost = stats["cost"]
            total_cost = cost['total']
            cost_per_hour = (total_cost / uptime) * 3600 if uptime > 0 else 0
            
            print(f"Total Cost: ${total_cost:.4f}")
            print(f"Rate: ${cost_per_hour:.2f}/hour")
            print(f"Avg per request: ${cost['mean']:.4f}")
        
        # Error metrics
        if self.total_requests > 0:
            error_rate = self.total_errors / self.total_requests
            print(f"\n⚠️  Error Rate: {error_rate:.1%}")
        
        # Alerts
        alerts = self.check_alerts()
        if alerts:
            print("\n🚨 ACTIVE ALERTS:")
            print("-" * 40)
            for alert in alerts:
                print(f"  ⚠️  {alert}")
        else:
            print("\n✅ All systems operating normally")
        
        print("\n" + "=" * 60)
    
    async def monitor_operations(self, duration: int = 30):
        '''Monitor AI operations in real-time.'''
        print(f"Starting {duration}-second monitoring session...\n")
        
        # Create monitoring LLM
        callback = self.create_monitoring_callback()
        llm = ChatOpenAI(temperature=0.7, streaming=True, callbacks=[callback])
        
        # Simulate various operations
        operations = [
            "Generate a haiku.",
            "Explain quantum computing.",
            "Write a short story.",
            "Solve a math problem: 15 * 23",
            "Translate 'Hello' to 5 languages."
        ]
        
        start = time.time()
        operation_count = 0
        
        while time.time() - start < duration:
            # Pick random operation
            import random
            operation = random.choice(operations)
            
            # Run operation async
            try:
                await llm.ainvoke(operation)
                operation_count += 1
            except Exception:
                # The error is already counted by MonitoringCallback.on_llm_error
                pass
            
            # Update dashboard
            await self.update_dashboard()
            
            # Wait before next operation
            await asyncio.sleep(random.uniform(0.5, 2.0))
        
        print(f"\n\nMonitoring complete. Processed {operation_count} operations.")
    
    def check_alerts(self) -> List[str]:
        '''Check for alert conditions.'''
        alerts = []
        stats = self.metrics.get_current_stats()
        
        # Check response time
        if "response_time" in stats:
            if stats["response_time"]["current"] > self.alert_thresholds["response_time"]:
                alerts.append(
                    f"High response time: {stats['response_time']['current']:.2f}s > {self.alert_thresholds['response_time']}s"
                )
        
        # Check error rate
        if self.total_requests > 0:
            error_rate = self.total_errors / self.total_requests
            if error_rate > self.alert_thresholds["error_rate"]:
                alerts.append(
                    f"High error rate: {error_rate:.1%} > {self.alert_thresholds['error_rate']:.1%}"
                )
        
        # Check cost
        if "cost" in stats:
            uptime = (datetime.now() - self.start_time).total_seconds()
            if uptime > 0:
                cost_per_hour = (stats["cost"]["total"] / uptime) * 3600
                if cost_per_hour > self.alert_thresholds["cost_per_hour"]:
                    alerts.append(
                        f"High cost rate: ${cost_per_hour:.2f}/hour > ${self.alert_thresholds['cost_per_hour']}/hour"
                    )
        
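        # Check throughput against the configured maximum
        if "tokens_per_second" in stats:
            tps = stats["tokens_per_second"]["current"]
            if tps > self.alert_thresholds["tokens_per_second"]:
                alerts.append(
                    f"High throughput: {tps:.0f} tok/s > {self.alert_thresholds['tokens_per_second']} tok/s"
                )
        
        return alerts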
    
    def export_report(self, format: str = "json") -> str:
        '''Export dashboard report.'''
        report_data = {
            "timestamp": datetime.now().isoformat(),
            "uptime_seconds": (datetime.now() - self.start_time).total_seconds(),
            "total_requests": self.total_requests,
            "total_errors": self.total_errors,
            "active_sessions": self.active_sessions,
            "metrics": self.metrics.get_current_stats(),
            "alerts": self.check_alerts()
        }
        
        if format == "json":
            return json.dumps(report_data, indent=2, default=str)
        
        elif format == "text":
            report = "AI Operations Report\n"
            report += "=" * 40 + "\n"
            report += f"Generated: {report_data['timestamp']}\n"
            report += f"Uptime: {report_data['uptime_seconds']:.0f} seconds\n"
            report += f"Total Requests: {report_data['total_requests']}\n"
            report += f"Total Errors: {report_data['total_errors']}\n"
            report += f"Active Sessions: {report_data['active_sessions']}\n"
            
            if report_data['alerts']:
                report += "\nAlerts:\n"
                for alert in report_data['alerts']:
                    report += f"  - {alert}\n"
            
            return report
        
        return "Unsupported format"

# Test the dashboard
async def test_dashboard():
    dashboard = RealtimeDashboard()
    
    # Run monitoring for a short duration
    await dashboard.monitor_operations(duration=15)
    
    # Export report
    report = dashboard.export_report("text")
    print("\n" + "=" * 60)
    print("\nFinal Report:")
    print(report)

# Run dashboard test
asyncio.run(test_dashboard())
"""

print("Build a real-time AI operations dashboard with monitoring and alerts!")
print("The solution includes metrics collection, live updates, and reporting.")
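
Two of the dashboard's figures are small calculations worth checking by hand: the p95 latency and the hourly cost rate. The sketch below works through both with synthetic data:
- 120 made-up response times are pushed through a `deque(maxlen=100)` window.
- Spend and uptime are also made up.

Percentiles have several definitions, so the sketch compares the nearest-rank method with the interpolation used by `statistics.quantiles`.

```python
import math
import statistics
from collections import deque

# Sliding window keeps only the 100 most recent samples; older ones are dropped
window = deque(maxlen=100)
for i in range(1, 121):  # 120 hypothetical response times: 0.01s, 0.02s, ..., 1.20s
    window.append(i / 100)

values = sorted(window)  # the window now holds 0.21 .. 1.20
n = len(values)

# Nearest-rank p95: the smallest sample with at least 95% of samples at or below it
p95_nearest_rank = values[math.ceil(95 * n / 100) - 1]

# statistics.quantiles(n=20) returns 19 cut points; index 18 is the 95th percentile
# (the default "exclusive" method interpolates between neighboring samples)
p95_interpolated = statistics.quantiles(values, n=20)[18]

# Cost-rate extrapolation: total spend divided by uptime, scaled to one hour
total_cost = 0.03       # hypothetical dollars spent so far
uptime_seconds = 90.0   # hypothetical uptime
cost_per_hour = total_cost / uptime_seconds * 3600

print(f"p95 (nearest rank): {p95_nearest_rank:.2f}s")
print(f"p95 (interpolated): {p95_interpolated:.4f}s")
print(f"cost rate: ${cost_per_hour:.2f}/hour")
```

Both percentile definitions are defensible; what matters for alerting is applying one consistently. With the made-up numbers here, $0.03 over 90 seconds extrapolates to $1.20/hour, well under the $10/hour threshold configured in the solution.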

---

## Summary and Next Steps

Congratulations! You've mastered streaming and real-time operations in LangChain. You can now:

- ✅ Implement token-by-token streaming from LLMs
- ✅ Build custom callbacks for monitoring
- ✅ Create async chains for concurrent processing
- ✅ Simulate real-time WebSocket message flows
- ✅ Build progress tracking systems
- ✅ Create real-time monitoring dashboards

### Key Takeaways:
- **Streaming Improves UX**: Users see progress as it happens
- **Async Enables Scale**: Process multiple requests concurrently
- **Callbacks Provide Control**: Monitor and react to events
- **Real-time Matters**: Immediate feedback enhances applications
- **Monitoring is Essential**: Track performance and costs

### Next Steps:
- **Notebook 11**: Learn about Advanced Patterns
- **Practice**: Build real-time applications
- **Experiment**: Try different streaming strategies
- **Scale**: Implement WebSocket servers

### Additional Challenges:
1. Build a real-time collaborative text editor with AI assistance
2. Create a live translation system with streaming
3. Implement a real-time code completion service
4. Build a streaming data analysis pipeline
5. Create a multi-modal streaming system (text + audio)