# Observer Pattern for LLM Monitoring

## Interactive demonstration of the Observer Pattern in AI/LLM systems

This notebook demonstrates how the Observer Pattern enables real-time monitoring, cost tracking, and performance optimization in LLM applications.

### Key Learning Objectives:
- Understand how the Observer Pattern is implemented
- Build a production-ready LLM monitoring system
- Implement real-time cost and performance tracking
- Create an extensible monitoring architecture

## 1. Core Observer Pattern Implementation

In [1]:
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
import time
import json
from datetime import datetime
from enum import Enum
from dataclasses import dataclass
import random

# Event system for LLM operations
class EventType(Enum):
    """Kinds of events that an ``LLMEvent`` can carry.

    Only CALL_START, CALL_SUCCESS and CALL_ERROR are emitted by the client
    code visible in this notebook; the two alert members are declared but
    no visible code emits them.
    """
    CALL_START = "call_start"                # sent before the (simulated) API call is made
    CALL_SUCCESS = "call_success"            # sent after a call produced a response
    CALL_ERROR = "call_error"                # sent when a call raised an exception
    COST_ALERT = "cost_alert"                # declared only; not emitted by visible code
    PERFORMANCE_ALERT = "performance_alert"  # declared only; not emitted by visible code

@dataclass
class LLMEvent:
    """Notification payload handed from the LLM client to every observer."""
    event_type: EventType   # which kind of event this is
    timestamp: datetime     # creation time; an explicit None is replaced in __post_init__
    call_id: str            # identifier shared by all events belonging to one call
    data: Dict[str, Any]    # event-specific payload; the keys depend on event_type
    
    def __post_init__(self):
        # ``timestamp`` has no default, so callers must always pass it;
        # passing None explicitly is tolerated and means "now".
        if self.timestamp is None:
            self.timestamp = datetime.now()

# Observer interface
class Observer(ABC):
    """Base class for anything that wants to listen to LLM events."""

    @abstractmethod
    def update(self, event: LLMEvent) -> None:
        """React to one event; every concrete observer must override this."""
        ...

    def get_name(self) -> str:
        """Return the label used to identify this observer (its class name)."""
        return type(self).__name__

# Status banner: confirms that this cell's definitions were executed.
print("✅ Core Observer Pattern classes defined")

✅ Core Observer Pattern classes defined


## 2. LLM Subject (Observable)

In [2]:
class LLMClient:
    """Simulated LLM client that plays the subject role in the Observer Pattern.

    Observers registered through :meth:`attach` receive an ``LLMEvent`` when
    a call starts and again when it succeeds or fails. No real API is
    contacted: latency, failures and responses are all simulated.
    """

    def __init__(self):
        self._observers: List[Observer] = []
        self.call_counter = 0

        # Model pricing (per 1K tokens); models missing from this table are
        # billed at the fallback rate used in _calculate_cost.
        self.model_pricing = {
            "gpt-4": 0.03,
            "gpt-3.5-turbo": 0.002,
            "claude-3": 0.025,
            "gemini-pro": 0.001
        }

    def attach(self, observer: Observer) -> None:
        """Register ``observer``; attaching an already-registered one is a no-op."""
        if observer not in self._observers:
            self._observers.append(observer)
            print(f"📎 Attached observer: {observer.get_name()}")

    def detach(self, observer: Observer) -> None:
        """Unregister ``observer``; detaching an unknown observer is a no-op."""
        if observer in self._observers:
            self._observers.remove(observer)
            print(f"📎 Detached observer: {observer.get_name()}")

    def notify(self, event: LLMEvent) -> None:
        """Deliver ``event`` to every registered observer, in attach order.

        An exception raised by one observer is reported on stdout and does
        not prevent the remaining observers from being notified.
        """
        # Iterate over a snapshot: an observer may attach or detach observers
        # (including itself) from inside update(). Mutating the list while it
        # is being iterated would silently skip the following observer.
        for observer in list(self._observers):
            try:
                observer.update(event)
            except Exception as e:
                print(f"⚠️ Observer {observer.get_name()} failed: {e}")

    def _emit(self, event_type: EventType, call_id: str, data: Dict[str, Any]) -> None:
        """Build an event stamped with the current time and notify observers."""
        self.notify(LLMEvent(
            event_type=event_type,
            timestamp=datetime.now(),
            call_id=call_id,
            data=data
        ))

    def call_llm(self, prompt: str, model: str = "gpt-3.5-turbo",
                 max_tokens: int = 100) -> str:
        """Simulate one LLM API call and report its lifecycle to observers.

        Args:
            prompt: Text sent to the (simulated) model.
            model: Model name; selects the simulated latency and the price.
            max_tokens: Upper bound on the tokens billed for the response.

        Returns:
            The mock response text.

        Raises:
            Exception: With roughly 5% probability, to simulate an API
                failure. A CALL_ERROR event is emitted before it propagates.
        """
        self.call_counter += 1
        call_id = f"call_{self.call_counter:04d}"

        # Observers only get a short preview of long prompts.
        prompt_preview = (prompt[:50] + "...") if len(prompt) > 50 else prompt
        self._emit(EventType.CALL_START, call_id, {
            "prompt": prompt_preview,
            "model": model,
            "max_tokens": max_tokens
        })

        start_time = time.time()

        try:
            # Simulate API call with realistic delays and occasional errors
            response_time = self._simulate_api_call(model, max_tokens)

            # Simulate random errors (5% chance)
            if random.random() < 0.05:
                raise Exception(f"API Error: Rate limit exceeded for {model}")

            # Generate mock response
            response = f"Mock {model} response to: '{prompt[:30]}...'"
            # Rough token estimate (about 1.3 tokens per word), capped at max_tokens
            actual_tokens = min(max_tokens, len(response.split()) * 1.3)
            cost = self._calculate_cost(model, actual_tokens)
        except Exception as e:
            # On failure the elapsed wall-clock time is reported.
            self._emit(EventType.CALL_ERROR, call_id, {
                "error": str(e),
                "response_time": time.time() - start_time,
                "model": model
            })
            # Bare raise re-raises the active exception unchanged.
            raise
        else:
            # On success the simulated delay is reported as the response time.
            self._emit(EventType.CALL_SUCCESS, call_id, {
                "response": response,
                "response_time": response_time,
                "cost": cost,
                "tokens_used": actual_tokens,
                "model": model
            })
            return response

    def _simulate_api_call(self, model: str, tokens: int) -> float:
        """Sleep for a model- and size-dependent delay and return it in seconds."""
        base_delay = {
            "gpt-4": 0.8,
            "gpt-3.5-turbo": 0.3,
            "claude-3": 0.5,
            "gemini-pro": 0.4
        }.get(model, 0.5)

        # Add token-based delay and some randomness
        delay = base_delay + (tokens / 1000) + random.uniform(0, 0.3)
        time.sleep(delay)
        return delay

    def _calculate_cost(self, model: str, tokens: float) -> float:
        """Return the cost of ``tokens`` tokens for ``model``.

        Models without an entry in ``model_pricing`` are billed at 0.01 per
        1K tokens.
        """
        price_per_1k = self.model_pricing.get(model, 0.01)
        return (tokens / 1000) * price_per_1k

# Status banner: confirms that this cell's definitions were executed.
print("✅ LLM Client with Observer support created")

✅ LLM Client with Observer support created


## 3. Concrete Observers for LLM Monitoring

In [3]:
class CostMonitor(Observer):
    """Accumulate the cost of successful calls and warn as the budget fills up.

    NOTE: ``daily_cost`` is only ever incremented by this class; nothing here
    resets it when the calendar day changes.
    """

    def __init__(self, daily_budget: float = 50.0, alert_threshold: float = 0.8):
        """
        Args:
            daily_budget: Spending limit that ``daily_cost`` is compared against.
            alert_threshold: Fraction of the budget above which every further
                successful call prints a budget alert.
        """
        self.daily_budget = daily_budget
        self.alert_threshold = alert_threshold
        self.daily_cost = 0.0
        self.total_cost = 0.0
        self.cost_by_model = {}
        self.call_costs = []

    def _budget_utilization_percent(self) -> float:
        """Return the share of the budget used, in percent.

        A zero (or negative) budget cannot be divided by: it is reported as
        0% while nothing has been spent and as infinity once anything has.
        """
        if self.daily_budget <= 0:
            return float("inf") if self.daily_cost > 0 else 0.0
        return (self.daily_cost / self.daily_budget) * 100

    def update(self, event: LLMEvent) -> None:
        """Record the cost carried by a CALL_SUCCESS event; ignore other events."""
        if event.event_type != EventType.CALL_SUCCESS:
            return

        cost = event.data.get('cost', 0)
        model = event.data.get('model')

        # Update cost tracking
        self.daily_cost += cost
        self.total_cost += cost
        self.call_costs.append(cost)

        # Track by model
        self.cost_by_model[model] = self.cost_by_model.get(model, 0) + cost

        print(f"💰 Cost: +${cost:.4f} | Daily: ${self.daily_cost:.3f}/{self.daily_budget}")

        # Budget alert: repeated on every successful call once the threshold is passed
        if self.daily_cost > self.daily_budget * self.alert_threshold:
            utilization = self._budget_utilization_percent()
            print(f"🚨 BUDGET ALERT: {utilization:.1f}% of daily budget used!")

    def get_cost_summary(self) -> Dict[str, Any]:
        """Return totals, per-call average, per-model costs and budget utilization."""
        avg_cost = sum(self.call_costs) / len(self.call_costs) if self.call_costs else 0

        return {
            "daily_cost": self.daily_cost,
            "daily_budget": self.daily_budget,
            "budget_utilization_percent": self._budget_utilization_percent(),
            "total_cost": self.total_cost,
            "average_cost_per_call": avg_cost,
            "cost_by_model": self.cost_by_model,
            "total_calls": len(self.call_costs)
        }

class PerformanceMonitor(Observer):
    """Collect response times of finished calls and flag the slow ones."""

    def __init__(self, slow_threshold: float = 2.0):
        # Calls taking longer than this many seconds are counted as slow.
        self.slow_threshold = slow_threshold
        self.response_times = []
        self.model_performance = {}
        self.slow_calls = 0

    def update(self, event: LLMEvent) -> None:
        """Record the response time of a finished (successful or failed) call."""
        finished = (EventType.CALL_SUCCESS, EventType.CALL_ERROR)
        if event.event_type not in finished:
            return

        payload = event.data
        elapsed = payload.get('response_time', 0)
        model_name = payload.get('model')

        # Keep the sample both globally and per model.
        self.response_times.append(elapsed)
        self.model_performance.setdefault(model_name, []).append(elapsed)

        # Immediate feedback on the console.
        if elapsed > self.slow_threshold:
            self.slow_calls += 1
            print(f"🐌 SLOW: {model_name} took {elapsed:.2f}s (>{self.slow_threshold}s)")
        else:
            print(f"⚡ Fast: {model_name} responded in {elapsed:.2f}s")

    def get_performance_summary(self) -> Dict[str, Any]:
        """Return aggregate timing statistics, or an empty dict if nothing was recorded."""
        samples = self.response_times
        if not samples:
            return {}

        sample_count = len(samples)
        per_model_mean = {
            model_name: sum(durations) / len(durations)
            for model_name, durations in self.model_performance.items()
        }

        return {
            "average_response_time": sum(samples) / sample_count,
            "total_calls": sample_count,
            "slow_calls": self.slow_calls,
            "slow_call_percentage": (self.slow_calls / sample_count) * 100,
            "fastest_call": min(samples),
            "slowest_call": max(samples),
            "model_averages": per_model_mean
        }

class ErrorTracker(Observer):
    """Count failed calls, group them by error prefix and keep the latest ones."""

    def __init__(self):
        self.total_errors = 0
        self.error_types = {}
        self.recent_errors = []
        # One boolean per observed call: True if that call ended in an error.
        self.error_rate_window = []

    def update(self, event: LLMEvent) -> None:
        """Track call starts (for the error rate) and record call errors."""
        if event.event_type == EventType.CALL_START:
            # Track calls for error rate calculation
            self.error_rate_window.append(False)

        elif event.event_type == EventType.CALL_ERROR:
            self.total_errors += 1
            if self.error_rate_window:
                self.error_rate_window[-1] = True  # Mark last call as error
            else:
                # No CALL_START was seen for this call (e.g. the tracker was
                # attached while the call was in flight). Count the call here
                # instead of failing with IndexError on an empty list.
                self.error_rate_window.append(True)

            error_msg = event.data.get('error', 'Unknown error')
            # The category is the text before the first ':' ("API Error: ..." -> "API Error").
            prefix, separator, _ = error_msg.partition(':')
            error_type = prefix if separator else 'Unknown'

            # Categorize errors
            self.error_types[error_type] = self.error_types.get(error_type, 0) + 1

            # Store recent errors (keep last 10)
            self.recent_errors.append({
                'call_id': event.call_id,
                'error': error_msg,
                'model': event.data.get('model'),
                'timestamp': event.timestamp
            })
            del self.recent_errors[:-10]

            print(f"❌ ERROR: {event.call_id} - {error_msg}")

    def get_error_summary(self) -> Dict[str, Any]:
        """Return error counts, the overall error rate and the three latest errors."""
        total_calls = len(self.error_rate_window)
        error_rate = (self.total_errors / total_calls * 100) if total_calls > 0 else 0

        return {
            "total_errors": self.total_errors,
            "error_rate_percent": error_rate,
            "error_types": self.error_types,
            "recent_errors": self.recent_errors[-3:],  # Last 3 errors
            "total_calls": total_calls
        }

# Status banner: confirms that this cell's definitions were executed.
print("✅ Observer implementations created")

✅ Observer implementations created


## 4. Usage Analytics Observer

In [4]:
class UsageAnalytics(Observer):
    """Aggregate how the client is used: models, hours of day and prompt kinds."""

    def __init__(self):
        self.total_calls = 0
        self.successful_calls = 0
        self.model_usage = {}
        self.hourly_usage = {}
        self.prompt_categories = {}

    def update(self, event: LLMEvent) -> None:
        """Count started calls by model, hour and category; count successes."""
        kind = event.event_type

        if kind == EventType.CALL_SUCCESS:
            self.successful_calls += 1
            return
        if kind != EventType.CALL_START:
            return

        self.total_calls += 1

        # Calls per model
        model_name = event.data.get('model')
        self.model_usage[model_name] = self.model_usage.get(model_name, 0) + 1

        # Calls per hour of day
        hour_of_day = event.timestamp.hour
        self.hourly_usage[hour_of_day] = self.hourly_usage.get(hour_of_day, 0) + 1

        # Calls per (keyword-based) prompt category
        lowered_prompt = event.data.get('prompt', '').lower()
        label = self._categorize_prompt(lowered_prompt)
        self.prompt_categories[label] = self.prompt_categories.get(label, 0) + 1

    def _categorize_prompt(self, prompt: str) -> str:
        """Return the first category whose keywords occur in ``prompt``, else 'other'."""
        keyword_table = (
            ('explanation', ('explain', 'what is', 'define')),
            ('generation', ('write', 'generate', 'create')),
            ('analysis', ('analyze', 'review', 'examine')),
            ('translation', ('translate', 'convert')),
        )
        for label, keywords in keyword_table:
            if any(keyword in prompt for keyword in keywords):
                return label
        return 'other'

    def get_usage_summary(self) -> Dict[str, Any]:
        """Return call counts, success rate and the most used model and hour."""
        if self.total_calls > 0:
            success_rate = self.successful_calls / self.total_calls * 100
        else:
            success_rate = 0

        # Most frequent model and hour; None while nothing has been recorded
        popular_model = None
        if self.model_usage:
            popular_model = max(self.model_usage, key=self.model_usage.get)

        peak_hour = None
        if self.hourly_usage:
            peak_hour = max(self.hourly_usage, key=self.hourly_usage.get)

        return {
            "total_calls": self.total_calls,
            "successful_calls": self.successful_calls,
            "success_rate_percent": success_rate,
            "model_usage": self.model_usage,
            "most_popular_model": popular_model,
            "peak_hour": peak_hour,
            "prompt_categories": self.prompt_categories
        }

# Status banner: confirms that this cell's definitions were executed.
print("✅ Usage Analytics observer created")

✅ Usage Analytics observer created


## 5. Demonstration: Building a Monitored LLM System

In [5]:
def demonstrate_observer_pattern():
    """Run a scripted series of simulated LLM calls with four observers attached.

    Returns:
        A tuple ``(llm, cost_monitor, perf_monitor, error_tracker,
        usage_analytics)`` so the caller can inspect the collected metrics.
    """

    print("🚀 Setting up LLM Monitoring System")
    print("=" * 50)

    # Create LLM client
    llm = LLMClient()

    # Create observers
    cost_monitor = CostMonitor(daily_budget=5.0, alert_threshold=0.7)
    perf_monitor = PerformanceMonitor(slow_threshold=1.5)
    error_tracker = ErrorTracker()
    usage_analytics = UsageAnalytics()

    # Attach observers; the attach order is also the notification order
    for observer in (cost_monitor, perf_monitor, error_tracker, usage_analytics):
        llm.attach(observer)

    print("\n📞 Starting LLM calls with real-time monitoring...\n")

    # Test scenarios: (prompt, model, max_tokens)
    test_scenarios = [
        ("Explain machine learning in simple terms", "gpt-3.5-turbo", 80),
        ("Write a Python function to sort a list", "gpt-4", 120),
        ("Analyze the sentiment of this text: 'I love AI!'", "claude-3", 60),
        ("Generate a creative story about space exploration", "gemini-pro", 200),
        ("Translate 'Hello world' to Spanish", "gpt-3.5-turbo", 30),
        ("What are the benefits of renewable energy?", "gpt-4", 150),
        ("Create a marketing email for a new product", "claude-3", 180),
        ("Explain quantum computing concepts", "gemini-pro", 100)
    ]

    for i, (prompt, model, tokens) in enumerate(test_scenarios, 1):
        print(f"\n[Call {i}] Prompt: {prompt[:40]}... | Model: {model}")
        print("-" * 60)

        try:
            response = llm.call_llm(prompt, model, tokens)
            print(f"✅ Response: {response[:60]}...")
        except Exception as e:
            # The client simulates occasional failures; keep going with the next scenario.
            print(f"❌ Failed: {e}")

        # Small delay between calls
        time.sleep(0.2)

    return llm, cost_monitor, perf_monitor, error_tracker, usage_analytics

# Run demonstration. The client and observers are bound to module-level names
# because later cells (the dashboard and the advanced-observer demo) use them.
llm_client, cost_mon, perf_mon, error_track, usage_stats = demonstrate_observer_pattern()

🚀 Setting up LLM Monitoring System
📎 Attached observer: CostMonitor
📎 Attached observer: PerformanceMonitor
📎 Attached observer: ErrorTracker
📎 Attached observer: UsageAnalytics

📞 Starting LLM calls with real-time monitoring...


[Call 1] Prompt: Explain machine learning in simple terms... | Model: gpt-3.5-turbo
------------------------------------------------------------
💰 Cost: +$0.0000 | Daily: $0.000/5.0
⚡ Fast: gpt-3.5-turbo responded in 0.52s
✅ Response: Mock gpt-3.5-turbo response to: 'Explain machine learning in...

[Call 2] Prompt: Write a Python function to sort a list... | Model: gpt-4
------------------------------------------------------------
💰 Cost: +$0.0004 | Daily: $0.000/5.0
⚡ Fast: gpt-4 responded in 1.03s
✅ Response: Mock gpt-4 response to: 'Write a Python function to sor...'...

[Call 3] Prompt: Analyze the sentiment of this text: 'I l... | Model: claude-3
------------------------------------------------------------
💰 Cost: +$0.0003 | Daily: $0.001/5.0
⚡ Fast: cla

## 6. Comprehensive Monitoring Dashboard

In [6]:
def display_monitoring_dashboard():
    """Print a text dashboard built from the four module-level observers.

    Reads the globals ``cost_mon``, ``perf_mon``, ``error_track`` and
    ``usage_stats``; they must exist before this function is called.
    """

    print("\n" + "=" * 70)
    print("📊 LLM MONITORING DASHBOARD")
    print("=" * 70)

    # Cost Analysis
    print("\n💰 COST ANALYSIS")
    print("-" * 30)
    cost_data = cost_mon.get_cost_summary()
    print(f"Daily Cost: ${cost_data['daily_cost']:.3f} / ${cost_data['daily_budget']:.2f}")
    print(f"Budget Utilization: {cost_data['budget_utilization_percent']:.1f}%")
    print(f"Average Cost/Call: ${cost_data['average_cost_per_call']:.4f}")
    print(f"Total Calls: {cost_data['total_calls']}")
    print("\nCost by Model:")
    for model, cost in cost_data['cost_by_model'].items():
        print(f"  {model}: ${cost:.4f}")

    # Performance Analysis
    print("\n⚡ PERFORMANCE ANALYSIS")
    print("-" * 30)
    perf_data = perf_mon.get_performance_summary()
    if perf_data:
        print(f"Average Response Time: {perf_data['average_response_time']:.2f}s")
        print(f"Slow Calls: {perf_data['slow_calls']}/{perf_data['total_calls']} ({perf_data['slow_call_percentage']:.1f}%)")
        print(f"Fastest Call: {perf_data['fastest_call']:.2f}s")
        print(f"Slowest Call: {perf_data['slowest_call']:.2f}s")
        print("\nModel Performance:")
        for model, avg_time in perf_data['model_averages'].items():
            print(f"  {model}: {avg_time:.2f}s avg")
    else:
        # The summary is an empty dict until at least one call has finished.
        print("No completed calls recorded yet")

    # Error Analysis
    print("\n❌ ERROR ANALYSIS")
    print("-" * 30)
    error_data = error_track.get_error_summary()
    print(f"Total Errors: {error_data['total_errors']}")
    print(f"Error Rate: {error_data['error_rate_percent']:.1f}%")
    if error_data['error_types']:
        print("Error Types:")
        for error_type, count in error_data['error_types'].items():
            print(f"  {error_type}: {count}")

    # Usage Analytics
    print("\n📈 USAGE ANALYTICS")
    print("-" * 30)
    usage_data = usage_stats.get_usage_summary()
    print(f"Total Calls: {usage_data['total_calls']}")
    print(f"Success Rate: {usage_data['success_rate_percent']:.1f}%")
    print(f"Most Popular Model: {usage_data['most_popular_model']}")
    # peak_hour is None while no call has been recorded; avoid printing "None:00".
    peak_hour = usage_data['peak_hour']
    if peak_hour is None:
        print("Peak Hour: N/A")
    else:
        print(f"Peak Hour: {peak_hour}:00")

    # The two loops below only run when at least one call was counted, so
    # total_calls is non-zero whenever it is used as a divisor.
    print("\nModel Usage Distribution:")
    for model, count in usage_data['model_usage'].items():
        percentage = (count / usage_data['total_calls']) * 100
        print(f"  {model}: {count} calls ({percentage:.1f}%)")

    print("\nPrompt Categories:")
    for category, count in usage_data['prompt_categories'].items():
        percentage = (count / usage_data['total_calls']) * 100
        print(f"  {category}: {count} calls ({percentage:.1f}%)")

    print("\n" + "=" * 70)

# Display the dashboard for the observers populated by the demonstration run
# (requires the earlier cell that binds cost_mon, perf_mon, error_track, usage_stats).
display_monitoring_dashboard()


📊 LLM MONITORING DASHBOARD

💰 COST ANALYSIS
------------------------------
Daily Cost: $0.001 / $5.00
Budget Utilization: 0.0%
Average Cost/Call: $0.0002
Total Calls: 8

Cost by Model:
  gpt-3.5-turbo: $0.0000
  gpt-4: $0.0008
  claude-3: $0.0006
  gemini-pro: $0.0000

⚡ PERFORMANCE ANALYSIS
------------------------------
Average Response Time: 0.78s
Slow Calls: 0/8 (0.0%)
Fastest Call: 0.52s
Slowest Call: 1.15s

Model Performance:
  gpt-3.5-turbo: 0.54s avg
  gpt-4: 1.09s avg
  claude-3: 0.84s avg
  gemini-pro: 0.65s avg

❌ ERROR ANALYSIS
------------------------------
Total Errors: 0
Error Rate: 0.0%

📈 USAGE ANALYTICS
------------------------------
Total Calls: 8
Success Rate: 100.0%
Most Popular Model: gpt-3.5-turbo
Peak Hour: 22:00

Model Usage Distribution:
  gpt-3.5-turbo: 2 calls (25.0%)
  gpt-4: 2 calls (25.0%)
  claude-3: 2 calls (25.0%)
  gemini-pro: 2 calls (25.0%)

Prompt Categories:
  explanation: 2 calls (25.0%)
  generation: 3 calls (37.5%)
  analysis: 1 calls (12.5%)


## 7. Advanced Observer Features

In [7]:
class AlertManager(Observer):
    """Raise alerts when accumulated cost or the recent error rate is too high.

    Alerts are printed and stored in ``alerts_sent``; no external channel is
    contacted by this class.
    """

    # Number of most recent calls over which the error rate is computed.
    _ERROR_WINDOW_SIZE = 10

    def __init__(self, cost_threshold: float = 10.0, error_rate_threshold: float = 20.0):
        """
        Args:
            cost_threshold: Accumulated cost above which a cost alert is sent.
            error_rate_threshold: Error rate, in percent of the most recent
                calls, above which an error-rate alert is sent.
        """
        self.cost_threshold = cost_threshold
        self.error_rate_threshold = error_rate_threshold
        self.alerts_sent = []
        self.recent_errors = []  # timestamps of the latest errors (at most 10)
        self.total_calls = 0
        self.daily_cost = 0.0
        # Outcome of each of the latest calls: True means the call failed.
        self._recent_outcomes = []

    def update(self, event: LLMEvent) -> None:
        """Update counters for ``event`` and send an alert if a threshold is exceeded."""
        if event.event_type == EventType.CALL_START:
            self.total_calls += 1
            # Every started call opens a slot that is assumed successful until
            # a CALL_ERROR arrives for it.
            self._recent_outcomes.append(False)
            del self._recent_outcomes[:-self._ERROR_WINDOW_SIZE]

        elif event.event_type == EventType.CALL_SUCCESS:
            cost = event.data.get('cost', 0)
            self.daily_cost += cost

            # Cost threshold alert (repeated on every success once exceeded)
            if self.daily_cost > self.cost_threshold:
                self._send_alert(f"🚨 COST ALERT: Daily spend ${self.daily_cost:.2f} exceeded threshold ${self.cost_threshold}")

        elif event.event_type == EventType.CALL_ERROR:
            self.recent_errors.append(event.timestamp)

            # Keep only the timestamps of the last 10 errors
            if len(self.recent_errors) > 10:
                self.recent_errors.pop(0)

            # Mark the latest call as failed. If no CALL_START was seen for it
            # (observer attached mid-call), record the failure as its own slot.
            if self._recent_outcomes:
                self._recent_outcomes[-1] = True
            else:
                self._recent_outcomes.append(True)

            # Error rate alert (if we have enough data). The rate is taken over
            # the recent-call window, so old errors stop counting once enough
            # newer calls have been made.
            if self.total_calls >= 5:
                window = self._recent_outcomes
                error_rate = (sum(window) / len(window)) * 100
                if error_rate > self.error_rate_threshold:
                    self._send_alert(f"🚨 ERROR RATE ALERT: {error_rate:.1f}% error rate detected")

    def _send_alert(self, message: str) -> None:
        """Send alert (in real system: email, Slack, SMS, etc.)"""
        alert = {
            'timestamp': datetime.now(),
            'message': message
        }
        self.alerts_sent.append(alert)
        print(f"🔔 ALERT: {message}")

    def get_alerts_summary(self) -> Dict[str, Any]:
        """Return the number of alerts sent and the three most recent ones."""
        return {
            'total_alerts': len(self.alerts_sent),
            'recent_alerts': self.alerts_sent[-3:] if self.alerts_sent else []
        }

class ConfigurableObserver(Observer):
    """Observer whose filtering and handling are supplied as callables."""

    def __init__(self, name: str, event_filter=None, processor=None):
        self.name = name
        self.processed_events = 0

        # Without a filter every event is accepted.
        if not event_filter:
            event_filter = lambda event: True
        self.event_filter = event_filter

        # Without a processor the event type is printed, tagged with the name.
        if not processor:
            processor = lambda event: print(f"[{self.name}] {event.event_type.value}")
        self.processor = processor

    def update(self, event: LLMEvent) -> None:
        """Process ``event`` and count it, but only if the filter accepts it."""
        if not self.event_filter(event):
            return
        self.processor(event)
        self.processed_events += 1

    def get_name(self) -> str:
        """Return the class name together with the configured instance name."""
        return f"ConfigurableObserver({self.name})"

# Demonstrate advanced observers
# This cell reuses the module-level `llm_client` created by the demonstration
# cell, so the four observers attached there are still active and are
# notified before the ones attached below.
print("🔧 Testing Advanced Observer Features")
print("=" * 40)

# Create alert manager
alert_manager = AlertManager(cost_threshold=0.1, error_rate_threshold=15.0)
llm_client.attach(alert_manager)

# Create configurable observers
# Observer that only tracks GPT-4 calls
# (the filter matches every event type whose payload names the gpt-4 model)
gpt4_only = ConfigurableObserver(
    "GPT-4 Tracker",
    event_filter=lambda e: e.data.get('model') == 'gpt-4',
    processor=lambda e: print(f"🎯 GPT-4 Event: {e.event_type.value}")
)
llm_client.attach(gpt4_only)

# Observer that only tracks expensive calls
# (successful calls whose reported cost is above 0.005)
expensive_calls = ConfigurableObserver(
    "Expensive Call Tracker", 
    event_filter=lambda e: e.event_type == EventType.CALL_SUCCESS and e.data.get('cost', 0) > 0.005,
    processor=lambda e: print(f"💸 Expensive call: ${e.data.get('cost', 0):.4f}")
)
llm_client.attach(expensive_calls)

# Test with a few more calls
print("\n🧪 Testing with additional calls...")
# Each entry is (prompt, model, max_tokens)
test_calls = [
    ("Complex analysis task", "gpt-4", 300),
    ("Simple question", "gpt-3.5-turbo", 20),
    ("Another complex task", "gpt-4", 250)
]

for prompt, model, tokens in test_calls:
    try:
        llm_client.call_llm(prompt, model, tokens)
    except Exception as e:
        # Deliberately ignored here: the client emits a CALL_ERROR event
        # before re-raising, so attached observers have already seen it.
        pass  # Continue even if there are errors
    time.sleep(0.1)

# Show alert summary
alert_summary = alert_manager.get_alerts_summary()
print(f"\n📊 Alert Summary: {alert_summary['total_alerts']} alerts sent")
print(f"🎯 GPT-4 Events Processed: {gpt4_only.processed_events}")
print(f"💸 Expensive Calls Tracked: {expensive_calls.processed_events}")

print("\n✅ Advanced Observer features demonstrated!")

🔧 Testing Advanced Observer Features
📎 Attached observer: AlertManager
📎 Attached observer: ConfigurableObserver(GPT-4 Tracker)
📎 Attached observer: ConfigurableObserver(Expensive Call Tracker)

🧪 Testing with additional calls...
🎯 GPT-4 Event: call_start
💰 Cost: +$0.0003 | Daily: $0.002/5.0
⚡ Fast: gpt-4 responded in 1.30s
🎯 GPT-4 Event: call_success
💰 Cost: +$0.0000 | Daily: $0.002/5.0
⚡ Fast: gpt-3.5-turbo responded in 0.44s
🎯 GPT-4 Event: call_start
💰 Cost: +$0.0003 | Daily: $0.002/5.0
⚡ Fast: gpt-4 responded in 1.07s
🎯 GPT-4 Event: call_success

📊 Alert Summary: 0 alerts sent
🎯 GPT-4 Events Processed: 4
💸 Expensive Calls Tracked: 0

✅ Advanced Observer features demonstrated!


## 8. Observer Pattern Benefits & Best Practices

### Key Benefits Demonstrated:

1. **Separation of Concerns**: Monitoring logic is completely separate from LLM logic
2. **Extensibility**: Easy to add new types of monitoring without changing existing code
3. **Real-time Feedback**: Immediate insights into cost, performance, and errors
4. **Configurability**: Different observers for different needs and environments
5. **Production Ready**: Handles errors gracefully, doesn't affect core functionality

### Best Practices Applied:

- **Error Isolation**: Observer failures don't crash the system
- **Loose Coupling**: Observers don't depend on each other
- **Event-Driven**: Clean event structure for extensibility
- **Performance Conscious**: Minimal overhead in observer notifications (observers are called synchronously, so each `update` should return quickly)
- **Configurable**: Easy to enable/disable different monitoring aspects

### Real-World Applications:

This pattern is essential for:
- **Production LLM APIs**: Cost control and performance monitoring
- **Multi-Agent Systems**: Coordination and state synchronization
- **Training Pipelines**: Progress tracking and metric collection
- **AI Product Analytics**: Usage patterns and optimization insights

## 9. Integration with Real LLM APIs

Here's how you would integrate this with actual LLM services:

In [8]:
# Example integration with OpenAI API
class ProductionLLMClient(LLMClient):
    """Sketch of a production client that reuses the observer infrastructure.

    The real API call is left commented out; the demo body sleeps briefly and
    returns a canned response while emitting the same events as the base
    class.
    """

    def __init__(self, api_key: Optional[str] = None):
        """
        Args:
            api_key: Credential for the real API. It is not used while the
                client construction below stays commented out.
        """
        super().__init__()
        # self.openai_client = OpenAI(api_key=api_key)  # Uncomment for real usage
        print("🔌 Production LLM Client initialized (using mock for demo)")

    def call_llm(self, prompt: str, model: str = "gpt-3.5-turbo",
                 max_tokens: int = 100) -> str:
        """Make one (mocked) production call and report it to observers.

        Args:
            prompt: Text sent to the model.
            model: Model name; also selects the price used for the cost.
            max_tokens: Token limit; the demo bills exactly this many tokens.

        Returns:
            The response text.

        Raises:
            Exception: Whatever the call raises, after a CALL_ERROR event has
                been emitted.
        """
        # Increment first so that IDs start at 0001, matching LLMClient.call_llm.
        self.call_counter += 1
        call_id = f"prod_call_{self.call_counter:04d}"

        # Notify call start (observers only get a preview of long prompts)
        prompt_preview = (prompt[:100] + "...") if len(prompt) > 100 else prompt
        start_event = LLMEvent(
            event_type=EventType.CALL_START,
            timestamp=datetime.now(),
            call_id=call_id,
            data={
                "prompt": prompt_preview,
                "model": model,
                "max_tokens": max_tokens
            }
        )
        self.notify(start_event)

        start_time = time.time()

        try:
            # In production, this would be:
            # response = self.openai_client.chat.completions.create(
            #     model=model,
            #     messages=[{"role": "user", "content": prompt}],
            #     max_tokens=max_tokens
            # )

            # For demo, simulate the call
            time.sleep(0.5)  # Simulate API delay
            response_text = f"Production {model} response to: {prompt[:50]}..."
            response_time = time.time() - start_time

            # Calculate actual costs based on real API pricing
            actual_tokens = max_tokens  # In production: response.usage.total_tokens
            cost = self._calculate_cost(model, actual_tokens)

            # Notify success
            success_event = LLMEvent(
                event_type=EventType.CALL_SUCCESS,
                timestamp=datetime.now(),
                call_id=call_id,
                data={
                    "response": response_text,
                    "response_time": response_time,
                    "cost": cost,
                    "tokens_used": actual_tokens,
                    "model": model
                }
            )
            self.notify(success_event)

            return response_text

        except Exception as e:
            response_time = time.time() - start_time

            error_event = LLMEvent(
                event_type=EventType.CALL_ERROR,
                timestamp=datetime.now(),
                call_id=call_id,
                data={
                    "error": str(e),
                    "response_time": response_time,
                    "model": model
                }
            )
            self.notify(error_event)
            # Bare raise re-raises the active exception unchanged.
            raise

# Example usage
print("🏭 Production Integration Example")
# No api_key is passed: the demo client never contacts a real service.
prod_client = ProductionLLMClient()

# Attach the same observers
# (fresh instances, so their totals are independent of the earlier demo run)
prod_client.attach(CostMonitor(daily_budget=100.0))
prod_client.attach(PerformanceMonitor())

# Test production client
try:
    response = prod_client.call_llm(
        "Explain the Observer Pattern in the context of distributed systems",
        "gpt-4",
        150
    )
    print(f"✅ Production response: {response[:60]}...")
except Exception as e:
    print(f"❌ Production error: {e}")

print("\n🎯 Observer Pattern demonstration complete!")
print("The same monitoring system works seamlessly with production APIs.")

🏭 Production Integration Example
🔌 Production LLM Client initialized (using mock for demo)
📎 Attached observer: CostMonitor
📎 Attached observer: PerformanceMonitor
💰 Cost: +$0.0045 | Daily: $0.004/100.0
⚡ Fast: gpt-4 responded in 0.50s
✅ Production response: Production gpt-4 response to: Explain the Observer Pattern i...

🎯 Observer Pattern demonstration complete!
The same monitoring system works seamlessly with production APIs.
