# Chapter 15: Building Stateful Agents - Solutions
**From: Zero to AI Agent**

**Try the exercises in the main notebook first before viewing solutions!**

---
## Section 15.1 Solutions

### Exercise 15.1.1: Conversation Counter

In [None]:
# File: exercise_1_15_1_solution.py

"""
Agent that counts user visits and greets accordingly.
"""

from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

class VisitState(TypedDict):
    visit_count: int
    greeting: str

def greet_user(state: VisitState) -> dict:
    """Generate appropriate greeting based on visit count."""
    count = state["visit_count"] + 1
    
    if count == 1:
        greeting = "Welcome! This is your first visit."
    elif count == 2:
        greeting = "Welcome back! Good to see you again."
    else:
        greeting = f"Hello again! This is visit #{count}."
    
    return {"visit_count": count, "greeting": greeting}

# Build graph
graph = StateGraph(VisitState)
graph.add_node("greet", greet_user)
graph.add_edge(START, "greet")
graph.add_edge("greet", END)

app = graph.compile(checkpointer=MemorySaver())

# Simulate multiple visits
config = {"configurable": {"thread_id": "user-123"}}

print("=== Conversation Counter ===\n")
for i in range(4):
    result = app.invoke({"visit_count": 0, "greeting": ""}, config)
    print(f"Visit {i+1}: {result['greeting']}")


### Exercise 15.1.2: Multi-User Tracker

In [None]:
# File: exercise_2_15_1_solution.py

"""
Multi-user agent with separate state per user.
"""

from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

class UserState(TypedDict):
    user_name: str
    visit_count: int

def track_visit(state: UserState) -> dict:
    return {"visit_count": state["visit_count"] + 1}

# Build graph
graph = StateGraph(UserState)
graph.add_node("track", track_visit)
graph.add_edge(START, "track")
graph.add_edge("track", END)

app = graph.compile(checkpointer=MemorySaver())

def visit(user_name: str) -> int:
    """Simulate a user visit, return their visit count."""
    config = {"configurable": {"thread_id": f"user-{user_name}"}}
    result = app.invoke({"user_name": user_name, "visit_count": 0}, config)
    return result["visit_count"]

# Simulate visits from different users
print("=== Multi-User Tracker ===\n")

# Alice visits 3 times
for i in range(3):
    count = visit("alice")
    print(f"Alice visit: count = {count}")

print()

# Bob visits 2 times
for i in range(2):
    count = visit("bob")
    print(f"Bob visit: count = {count}")

print()

# Alice visits again - should continue from 3
count = visit("alice")
print(f"Alice visit: count = {count} (continued from before!)")


### Exercise 15.1.3: State History Explorer

In [None]:
# File: exercise_3_15_1_solution.py

"""
Explore state history through a multi-node workflow.
"""

from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

class WorkflowState(TypedDict):
    steps_completed: Annotated[list[str], add]
    current_value: int

def step_one(state: WorkflowState) -> dict:
    return {
        "steps_completed": ["step_one"],
        "current_value": 10
    }

def step_two(state: WorkflowState) -> dict:
    return {
        "steps_completed": ["step_two"],
        "current_value": state["current_value"] * 2
    }

def step_three(state: WorkflowState) -> dict:
    return {
        "steps_completed": ["step_three"],
        "current_value": state["current_value"] + 5
    }

# Build graph
graph = StateGraph(WorkflowState)
graph.add_node("one", step_one)
graph.add_node("two", step_two)
graph.add_node("three", step_three)
graph.add_edge(START, "one")
graph.add_edge("one", "two")
graph.add_edge("two", "three")
graph.add_edge("three", END)

app = graph.compile(checkpointer=MemorySaver())

# Run the workflow
config = {"configurable": {"thread_id": "workflow-1"}}
result = app.invoke({"steps_completed": [], "current_value": 0}, config)

# Explore the history
print("=== State History Explorer ===\n")
print("Final result:", result)
print("\n--- History (newest first) ---\n")

for i, snapshot in enumerate(app.get_state_history(config)):
    print(f"Snapshot {i}:")
    print(f"  Steps: {snapshot.values.get('steps_completed', [])}")
    print(f"  Value: {snapshot.values.get('current_value', 'N/A')}")
    print(f"  Next: {snapshot.next}")
    print()


---
## Section 15.2 Solutions

### Exercise 15.2.1: Validated User Profile

In [None]:
# File: exercise_1_15_2_solution.py

"""
Validated user profile with Pydantic.
"""

from pydantic import BaseModel, Field, field_validator
from typing import Optional
from enum import Enum
import re

class MembershipLevel(str, Enum):
    FREE = "free"
    BASIC = "basic"
    PREMIUM = "premium"

class UserProfile(BaseModel):
    username: str = Field(min_length=3, max_length=20)
    email: str
    age: Optional[int] = Field(default=None, ge=13, le=120)
    membership: MembershipLevel = MembershipLevel.FREE
    
    @field_validator('username')
    @classmethod
    def username_alphanumeric(cls, v):
        if not re.match(r'^[a-zA-Z0-9]+$', v):
            raise ValueError('Username must be alphanumeric')
        return v.lower()
    
    @field_validator('email')
    @classmethod
    def email_valid(cls, v):
        if not re.match(r'^[\w\.-]+@[\w\.-]+\.\w{2,}$', v):
            raise ValueError('Invalid email format')
        return v.lower()

# Test valid profiles
print("=== Valid Profiles ===")
valid = UserProfile(username="Alice123", email="alice@test.com", age=25)
print(f"‚úì {valid.username}, {valid.email}, {valid.membership.value}")

minimal = UserProfile(username="bob", email="bob@test.com")
print(f"‚úì {minimal.username}, age={minimal.age}")

# Test invalid profiles
print("\n=== Invalid Profiles ===")
test_cases = [
    ({"username": "ab", "email": "a@b.com"}, "too short"),
    ({"username": "test_user", "email": "a@b.com"}, "underscore"),
    ({"username": "test", "email": "notanemail"}, "bad email"),
    ({"username": "test", "email": "a@b.com", "age": 10}, "too young"),
]

for data, reason in test_cases:
    try:
        UserProfile(**data)
        print(f"‚úó Should have failed: {reason}")
    except Exception as e:
        print(f"‚úì Caught ({reason})")


### Exercise 15.2.2: Chat Message Validation

In [None]:
# File: exercise_2_15_2_solution.py

"""
LangGraph node with Pydantic message validation.
"""

from typing import TypedDict, Annotated
from operator import add
from pydantic import BaseModel, Field, field_validator
from langgraph.graph import StateGraph, START, END

# Pydantic model for validation
class Message(BaseModel):
    role: str
    content: str
    
    @field_validator('role')
    @classmethod
    def valid_role(cls, v):
        if v not in ('user', 'assistant'):
            raise ValueError('Role must be "user" or "assistant"')
        return v
    
    @field_validator('content')
    @classmethod
    def content_not_empty(cls, v):
        if not v or not v.strip():
            raise ValueError('Content cannot be empty')
        return v.strip()

# TypedDict for LangGraph
class ChatState(TypedDict):
    messages: Annotated[list[dict], add]
    last_error: str

def validate_and_add_message(state: ChatState) -> dict:
    """Validate incoming message and add to state."""
    # Simulate incoming raw message
    raw_message = {"role": "user", "content": "  Hello there!  "}
    
    try:
        validated = Message(**raw_message)
        return {
            "messages": [validated.model_dump()],
            "last_error": ""
        }
    except Exception as e:
        return {
            "last_error": str(e)
        }

# Build and test
graph = StateGraph(ChatState)
graph.add_node("validate", validate_and_add_message)
graph.add_edge(START, "validate")
graph.add_edge("validate", END)
app = graph.compile()

result = app.invoke({"messages": [], "last_error": ""})
print("=== Message Validation ===")
print(f"Messages: {result['messages']}")
print(f"Notice: content was trimmed!")


### Exercise 15.2.3: Order State Schema

In [None]:
# File: exercise_3_15_2_solution.py

"""
Order processing state with comprehensive validation.
"""

from typing import TypedDict
from pydantic import BaseModel, Field, model_validator
from enum import Enum

class OrderStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"

class OrderItem(BaseModel):
    name: str = Field(min_length=1)
    quantity: int = Field(gt=0)
    price: float = Field(gt=0)
    
    @property
    def subtotal(self) -> float:
        return self.quantity * self.price

class Order(BaseModel):
    order_id: str
    items: list[OrderItem] = Field(min_length=1)
    total: float = Field(gt=0)
    status: OrderStatus = OrderStatus.PENDING
    
    @model_validator(mode='after')
    def validate_total(self):
        calculated = sum(item.subtotal for item in self.items)
        if abs(self.total - calculated) > 0.01:
            raise ValueError(f'Total {self.total} != calculated {calculated}')
        return self

# TypedDict for LangGraph
class OrderState(TypedDict):
    order: dict
    status_history: list[str]

# Test
print("=== Order Validation ===\n")

# Valid order
valid = Order(
    order_id="ORD-001",
    items=[
        {"name": "Widget", "quantity": 2, "price": 10.00},
        {"name": "Gadget", "quantity": 1, "price": 25.00}
    ],
    total=45.00  # 2*10 + 1*25 = 45 ‚úì
)
print(f"‚úì Valid order: {valid.order_id}, ${valid.total}")

# Invalid: wrong total
try:
    Order(
        order_id="ORD-002",
        items=[{"name": "Item", "quantity": 2, "price": 10.00}],
        total=100.00  # Should be 20!
    )
except Exception as e:
    print(f"‚úì Caught bad total: {e}")

# Invalid: zero quantity
try:
    Order(
        order_id="ORD-003",
        items=[{"name": "Item", "quantity": 0, "price": 10.00}],
        total=0
    )
except Exception as e:
    print(f"‚úì Caught zero quantity")


---
## Section 15.3 Solutions

### Exercise 15.3.1: Deduplicating Reducer

In [None]:
# File: exercise_1_15_3_solution.py

"""
Custom reducer that accumulates but removes duplicates.
"""

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END

def dedupe_messages(existing: list, new: list) -> list:
    """Accumulate messages, removing duplicates by content."""
    result = existing.copy()
    seen = {msg["content"] for msg in existing}
    
    for msg in new:
        if msg["content"] not in seen:
            result.append(msg)
            seen.add(msg["content"])
    
    return result

class ChatState(TypedDict):
    messages: Annotated[list[dict], dedupe_messages]

def greeting(state):
    return {"messages": [
        {"role": "system", "content": "Welcome!"},
        {"role": "assistant", "content": "How can I help?"}
    ]}

def help_prompt(state):
    return {"messages": [
        {"role": "assistant", "content": "How can I help?"},  # Duplicate!
        {"role": "assistant", "content": "I'm here to assist."}
    ]}

# Build graph
graph = StateGraph(ChatState)
graph.add_node("greet", greeting)
graph.add_node("help", help_prompt)
graph.add_edge(START, "greet")
graph.add_edge("greet", "help")
graph.add_edge("help", END)

app = graph.compile()
result = app.invoke({"messages": []})

print("=== Dedupe Reducer ===")
print(f"Total messages: {len(result['messages'])}")
for msg in result['messages']:
    print(f"  - {msg['content']}")
print("\n'How can I help?' appears only once!")


### Exercise 15.3.2: Priority Queue Reducer

In [None]:
# File: exercise_2_15_3_solution.py

"""
Reducer that maintains a priority-sorted task queue.
"""

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END

def priority_queue(existing: list, new: list) -> list:
    """Merge tasks, keeping highest priority first."""
    combined = existing + new
    return sorted(combined, key=lambda x: x["priority"], reverse=True)

class TaskState(TypedDict):
    tasks: Annotated[list[dict], priority_queue]

def add_normal_tasks(state):
    return {"tasks": [
        {"task": "Write docs", "priority": 5},
        {"task": "Code review", "priority": 6}
    ]}

def add_urgent_task(state):
    return {"tasks": [
        {"task": "Fix critical bug", "priority": 10}
    ]}

def add_low_priority(state):
    return {"tasks": [
        {"task": "Update readme", "priority": 2}
    ]}

# Build graph
graph = StateGraph(TaskState)
graph.add_node("normal", add_normal_tasks)
graph.add_node("urgent", add_urgent_task)
graph.add_node("low", add_low_priority)
graph.add_edge(START, "normal")
graph.add_edge("normal", "urgent")
graph.add_edge("urgent", "low")
graph.add_edge("low", END)

app = graph.compile()
result = app.invoke({"tasks": []})

print("=== Priority Queue ===")
for task in result['tasks']:
    print(f"  [{task['priority']:2d}] {task['task']}")


### Exercise 15.3.3: Change Tracker

In [None]:
# File: exercise_3_15_3_solution.py

"""
State that tracks all changes in a changelog.
"""

from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END
from datetime import datetime

class TrackedState(TypedDict):
    # Regular fields
    counter: int
    status: str
    # Changelog accumulates
    changelog: Annotated[list[dict], add]

def make_change_entry(node_name: str, changes: dict) -> dict:
    """Create a changelog entry."""
    return {
        "node": node_name,
        "time": datetime.now().strftime("%H:%M:%S"),
        "changes": changes
    }

def initialize(state):
    changes = {"counter": "0 ‚Üí 1", "status": "‚Üí initialized"}
    return {
        "counter": 1,
        "status": "initialized",
        "changelog": [make_change_entry("initialize", changes)]
    }

def process(state):
    new_counter = state["counter"] + 10
    changes = {"counter": f"{state['counter']} ‚Üí {new_counter}"}
    return {
        "counter": new_counter,
        "changelog": [make_change_entry("process", changes)]
    }

def finalize(state):
    changes = {"status": f"{state['status']} ‚Üí complete"}
    return {
        "status": "complete",
        "changelog": [make_change_entry("finalize", changes)]
    }

# Build graph
graph = StateGraph(TrackedState)
graph.add_node("init", initialize)
graph.add_node("proc", process)
graph.add_node("final", finalize)
graph.add_edge(START, "init")
graph.add_edge("init", "proc")
graph.add_edge("proc", "final")
graph.add_edge("final", END)

app = graph.compile()
result = app.invoke({"counter": 0, "status": "", "changelog": []})

print("=== Change Tracker ===")
print(f"\nFinal state: counter={result['counter']}, status={result['status']}")
print("\nChangelog:")
for entry in result['changelog']:
    print(f"  [{entry['time']}] {entry['node']}: {entry['changes']}")


---
## Section 15.4 Solutions

### Exercise 15.4.1: Multi-User Chat System

In [None]:
# File: exercise_1_15_4_solution.py

"""
Multi-user chat system with SQLite persistence.
"""

from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.sqlite import SqliteSaver
from datetime import datetime
import sqlite3

class ChatState(TypedDict):
    messages: Annotated[list[dict], add]
    user_id: str
    last_activity: str

def update_activity(state: ChatState) -> dict:
    return {"last_activity": datetime.now().isoformat()}

# Build minimal graph
graph = StateGraph(ChatState)
graph.add_node("update", update_activity)
graph.add_edge(START, "update")
graph.add_edge("update", END)

DB_PATH = "multiuser_chat.db"

def send_message(user_id: str, content: str, role: str = "user"):
    """Send a message in a user's chat."""
    with SqliteSaver.from_conn_string(DB_PATH) as saver:
        app = graph.compile(checkpointer=saver)
        thread_id = f"chat:{user_id}"
        config = {"configurable": {"thread_id": thread_id}}
        
        # Load or create state
        try:
            current = app.get_state(config).values or {}
        except:
            current = {}
        
        state = {
            "messages": current.get("messages", []) + [{"role": role, "content": content}],
            "user_id": user_id,
            "last_activity": ""
        }
        
        return app.invoke(state, config)

def list_user_conversations(user_id: str):
    """List all conversations for a user."""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    
    # Find threads for this user
    cursor.execute(
        "SELECT DISTINCT thread_id FROM checkpoints WHERE thread_id LIKE ?",
        (f"chat:{user_id}%",)
    )
    
    conversations = []
    with SqliteSaver.from_conn_string(DB_PATH) as saver:
        app = graph.compile(checkpointer=saver)
        
        for (thread_id,) in cursor.fetchall():
            config = {"configurable": {"thread_id": thread_id}}
            state = app.get_state(config)
            
            if state.values:
                conversations.append({
                    "thread_id": thread_id,
                    "message_count": len(state.values.get("messages", [])),
                    "last_activity": state.values.get("last_activity", "Unknown")
                })
    
    conn.close()
    return conversations

# Demo
print("=== Multi-User Chat ===\n")

# Alice sends messages
send_message("alice", "Hello!")
send_message("alice", "How are you?")
send_message("alice", "Great, thanks!", "assistant")

# Bob sends messages
send_message("bob", "Hi there")

# List Alice's conversations
print("Alice's conversations:")
for conv in list_user_conversations("alice"):
    print(f"  {conv['thread_id']}: {conv['message_count']} messages")

print("\nBob's conversations:")
for conv in list_user_conversations("bob"):
    print(f"  {conv['thread_id']}: {conv['message_count']} messages")


### Exercise 15.4.2: Checkpoint Cleanup Utility

In [None]:
# File: exercise_2_15_4_solution.py

"""
Checkpoint cleanup and maintenance utility.
"""

import sqlite3
import os
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, START, END
from typing import TypedDict

class DemoState(TypedDict):
    counter: int

def increment(state: DemoState) -> dict:
    return {"counter": state["counter"] + 1}

# Build graph for creating test data
graph = StateGraph(DemoState)
graph.add_node("inc", increment)
graph.add_edge(START, "inc")
graph.add_edge("inc", END)

DB_PATH = "cleanup_demo.db"

def get_stats(db_path: str) -> dict:
    """Get checkpoint statistics."""
    if not os.path.exists(db_path):
        return {"total": 0, "threads": 0, "size_bytes": 0}
    
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    cursor.execute("SELECT COUNT(*) FROM checkpoints")
    total = cursor.fetchone()[0]
    
    cursor.execute("SELECT COUNT(DISTINCT thread_id) FROM checkpoints")
    threads = cursor.fetchone()[0]
    
    conn.close()
    
    return {
        "total": total,
        "threads": threads,
        "size_bytes": os.path.getsize(db_path)
    }

def cleanup(db_path: str, keep_per_thread: int = 3) -> int:
    """Remove old checkpoints, keeping N most recent per thread."""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    cursor.execute("SELECT DISTINCT thread_id FROM checkpoints")
    threads = [row[0] for row in cursor.fetchall()]
    
    deleted = 0
    for thread_id in threads:
        cursor.execute("""
            SELECT checkpoint_id FROM checkpoints 
            WHERE thread_id = ? ORDER BY checkpoint_id DESC
        """, (thread_id,))
        
        checkpoints = [row[0] for row in cursor.fetchall()]
        
        for cp_id in checkpoints[keep_per_thread:]:
            cursor.execute("DELETE FROM checkpoints WHERE checkpoint_id = ?", (cp_id,))
            deleted += 1
    
    conn.commit()
    conn.execute("VACUUM")  # Reclaim space
    conn.close()
    
    return deleted

# Create test data
print("=== Checkpoint Cleanup Utility ===\n")
print("Creating test data...")

with SqliteSaver.from_conn_string(DB_PATH) as saver:
    app = graph.compile(checkpointer=saver)
    
    for thread_num in range(3):
        config = {"configurable": {"thread_id": f"thread-{thread_num}"}}
        state = {"counter": 0}
        for _ in range(10):
            state = app.invoke(state, config)

# Show before stats
print("\n--- Before Cleanup ---")
before = get_stats(DB_PATH)
print(f"Checkpoints: {before['total']}")
print(f"Threads: {before['threads']}")
print(f"Size: {before['size_bytes']:,} bytes")

# Run cleanup
deleted = cleanup(DB_PATH, keep_per_thread=2)
print(f"\n--- Cleanup ---")
print(f"Deleted: {deleted} checkpoints")

# Show after stats
print("\n--- After Cleanup ---")
after = get_stats(DB_PATH)
print(f"Checkpoints: {after['total']}")
print(f"Size: {after['size_bytes']:,} bytes")
print(f"Space saved: {before['size_bytes'] - after['size_bytes']:,} bytes")


### Exercise 15.4.3: Conversation Export Tool

In [None]:
# File: exercise_3_15_4_solution.py

"""
Conversation export/import and forking utility.
"""

from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.sqlite import SqliteSaver
from datetime import datetime
import json

class ConvoState(TypedDict):
    messages: Annotated[list[dict], add]
    metadata: dict

def process(state: ConvoState) -> dict:
    return {"metadata": {**state.get("metadata", {}), "updated": datetime.now().isoformat()}}

graph = StateGraph(ConvoState)
graph.add_node("process", process)
graph.add_edge(START, "process")
graph.add_edge("process", END)

DB_PATH = "export_demo.db"

def export_conversation(thread_id: str, output_file: str):
    """Export conversation to JSON file."""
    with SqliteSaver.from_conn_string(DB_PATH) as saver:
        app = graph.compile(checkpointer=saver)
        config = {"configurable": {"thread_id": thread_id}}
        
        state = app.get_state(config)
        if not state.values:
            raise ValueError(f"Thread not found: {thread_id}")
        
        export_data = {
            "thread_id": thread_id,
            "exported_at": datetime.now().isoformat(),
            "state": state.values
        }
        
        with open(output_file, 'w') as f:
            json.dump(export_data, f, indent=2, default=str)
        
        return export_data

def import_conversation(input_file: str, new_thread_id: str):
    """Import conversation from JSON file."""
    with open(input_file, 'r') as f:
        data = json.load(f)
    
    state = data["state"]
    state["metadata"] = {
        **state.get("metadata", {}),
        "imported_from": data["thread_id"],
        "imported_at": datetime.now().isoformat()
    }
    
    with SqliteSaver.from_conn_string(DB_PATH) as saver:
        app = graph.compile(checkpointer=saver)
        config = {"configurable": {"thread_id": new_thread_id}}
        return app.invoke(state, config)

def fork_conversation(source_thread: str, new_thread: str):
    """Create a copy of a conversation in a new thread."""
    with SqliteSaver.from_conn_string(DB_PATH) as saver:
        app = graph.compile(checkpointer=saver)
        
        # Load source
        source_config = {"configurable": {"thread_id": source_thread}}
        source_state = app.get_state(source_config).values
        
        # Add fork metadata
        source_state["metadata"] = {
            **source_state.get("metadata", {}),
            "forked_from": source_thread,
            "forked_at": datetime.now().isoformat()
        }
        
        # Save to new thread
        new_config = {"configurable": {"thread_id": new_thread}}
        return app.invoke(source_state, new_config)

# Demo
print("=== Export/Import Tool ===\n")

# Create a conversation
with SqliteSaver.from_conn_string(DB_PATH) as saver:
    app = graph.compile(checkpointer=saver)
    config = {"configurable": {"thread_id": "original"}}
    
    state = {
        "messages": [{"role": "user", "content": "Hello!"}],
        "metadata": {"topic": "greeting"}
    }
    app.invoke(state, config)
    print("Created original conversation")

# Export
export_conversation("original", "backup.json")
print("Exported to backup.json")

# Import to new thread
import_conversation("backup.json", "imported")
print("Imported to 'imported' thread")

# Fork
fork_conversation("original", "forked")
print("Forked to 'forked' thread")

# Verify all exist
print("\n--- All Threads ---")
with SqliteSaver.from_conn_string(DB_PATH) as saver:
    app = graph.compile(checkpointer=saver)
    
    for thread in ["original", "imported", "forked"]:
        config = {"configurable": {"thread_id": thread}}
        state = app.get_state(config)
        print(f"  {thread}: {len(state.values.get('messages', []))} messages")


---
## Section 15.5 Solutions

### Exercise 15.5.1: Smart Retry Decorator

In [None]:
# File: exercise_1_15_5_solution.py

"""
Smart retry decorator with configurable policy and metadata.
"""

from dataclasses import dataclass, field
from functools import wraps
from datetime import datetime
import time
import random

@dataclass
class RetryPolicy:
    """Configurable retry behavior."""
    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    retryable_exceptions: tuple = (ConnectionError, TimeoutError)

@dataclass 
class RetryResult:
    """Metadata about retry attempts."""
    success: bool
    value: any = None
    attempts: int = 0
    errors: list = field(default_factory=list)
    total_wait_time: float = 0.0

def smart_retry(policy: RetryPolicy = None):
    """
    Decorator with configurable policy and metadata return.
    
    Returns RetryResult with both the value and retry metadata.
    """
    if policy is None:
        policy = RetryPolicy()
    
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs) -> RetryResult:
            result = RetryResult(success=False)
            
            for attempt in range(policy.max_attempts):
                result.attempts = attempt + 1
                timestamp = datetime.now().strftime("%H:%M:%S")
                
                try:
                    value = func(*args, **kwargs)
                    result.success = True
                    result.value = value
                    print(f"  [{timestamp}] Attempt {attempt + 1}: Success ‚úì")
                    return result
                    
                except policy.retryable_exceptions as e:
                    result.errors.append({"attempt": attempt + 1, "error": str(e)})
                    print(f"  [{timestamp}] Attempt {attempt + 1}: {e}")
                    
                    if attempt < policy.max_attempts - 1:
                        delay = min(policy.base_delay * (2 ** attempt), policy.max_delay)
                        delay += random.uniform(0, delay * 0.1)
                        result.total_wait_time += delay
                        print(f"  [{timestamp}] Waiting {delay:.1f}s before retry...")
                        time.sleep(delay)
                        
                except Exception as e:
                    # Non-retryable exception - fail immediately
                    result.errors.append({"attempt": attempt + 1, "error": str(e), "retryable": False})
                    print(f"  [{timestamp}] Non-retryable error: {e}")
                    return result
            
            return result
        return wrapper
    return decorator

# Demo
def flaky_operation():
    """Fails 70% of the time."""
    if random.random() < 0.7:
        raise ConnectionError("Service unavailable")
    return "Success!"

# Apply smart retry
aggressive_policy = RetryPolicy(max_attempts=5, base_delay=0.5)

@smart_retry(policy=aggressive_policy)
def reliable_operation():
    return flaky_operation()

print("=== Smart Retry Demo ===\n")

for i in range(3):
    print(f"Run {i + 1}:")
    result = reliable_operation()
    print(f"  Result: {'‚úì' if result.success else '‚úó'}")
    print(f"  Attempts: {result.attempts}, Wait time: {result.total_wait_time:.1f}s\n")


### Exercise 15.5.2: Circuit Breaker Pattern

In [None]:
# File: exercise_2_15_5_solution.py

"""
Circuit breaker pattern to prevent cascading failures.
"""

from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END
from datetime import datetime, timedelta
import time
import random

class CircuitBreaker:
    """
    Circuit breaker with three states:
    - CLOSED: Normal operation, calls go through
    - OPEN: Failing, calls blocked immediately  
    - HALF_OPEN: Testing if service recovered
    """
    
    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 10.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.state = "CLOSED"
        self.opened_at = None
    
    def can_execute(self) -> bool:
        """Check if we should attempt the call."""
        if self.state == "CLOSED":
            return True
            
        if self.state == "OPEN":
            # Check if cooldown period has passed
            if datetime.now() - self.opened_at > timedelta(seconds=self.reset_timeout):
                self.state = "HALF_OPEN"
                print(f"  üîÑ Circuit HALF_OPEN: Testing service...")
                return True
            return False
            
        # HALF_OPEN: allow one test call
        return True
    
    def record_success(self):
        """Record a successful call."""
        self.failures = 0
        if self.state == "HALF_OPEN":
            print(f"  ‚úÖ Circuit CLOSED: Service recovered!")
        self.state = "CLOSED"
    
    def record_failure(self):
        """Record a failed call."""
        self.failures += 1
        
        if self.state == "HALF_OPEN":
            # Failed during test - reopen
            self.state = "OPEN"
            self.opened_at = datetime.now()
            print(f"  üî¥ Circuit OPEN: Test failed, blocking calls")
            
        elif self.failures >= self.failure_threshold:
            self.state = "OPEN"
            self.opened_at = datetime.now()
            print(f"  üî¥ Circuit OPEN: {self.failures} failures, blocking calls")

# Global circuit breaker (in real app, would be per-service)
breaker = CircuitBreaker(failure_threshold=3, reset_timeout=5.0)

class BreakerState(TypedDict):
    requests: int
    successes: int
    blocked: int
    log: Annotated[list[str], add]

def call_with_breaker(state: BreakerState) -> dict:
    """Node that uses circuit breaker."""
    if not breaker.can_execute():
        return {
            "blocked": state["blocked"] + 1,
            "log": [f"Request {state['requests'] + 1}: BLOCKED (circuit open)"]
        }
    
    try:
        # Simulate flaky service (80% failure rate)
        if random.random() < 0.8:
            raise ConnectionError("Service failed")
        
        breaker.record_success()
        return {
            "requests": state["requests"] + 1,
            "successes": state["successes"] + 1,
            "log": [f"Request {state['requests'] + 1}: SUCCESS"]
        }
        
    except Exception as e:
        breaker.record_failure()
        return {
            "requests": state["requests"] + 1,
            "log": [f"Request {state['requests'] + 1}: FAILED - {e}"]
        }

# Build graph
graph = StateGraph(BreakerState)
graph.add_node("call", call_with_breaker)
graph.add_edge(START, "call")
graph.add_edge("call", END)
app = graph.compile()

# Demo: Make many requests to see circuit breaker in action
print("=== Circuit Breaker Demo ===\n")

state = {"requests": 0, "successes": 0, "blocked": 0, "log": []}

for i in range(15):
    state = app.invoke(state)
    print(state["log"][-1])
    
    if i == 9:  # Pause to let circuit reset
        print("\n  ‚è≥ Waiting for circuit reset...\n")
        time.sleep(6)

print(f"\nüìä Summary:")
print(f"  Total requests: {state['requests']}")
print(f"  Successful: {state['successes']}")
print(f"  Blocked: {state['blocked']}")


### Exercise 15.5.3: Retry Dashboard

In [None]:
# File: exercise_3_15_5_solution.py

"""
Simple retry monitoring dashboard.
"""

from dataclasses import dataclass, field
from datetime import datetime
from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END
import random

@dataclass
class RetryStats:
    """Statistics for a single node."""
    total_calls: int = 0
    successful_calls: int = 0
    total_retries: int = 0
    
    @property
    def failure_rate(self) -> float:
        if self.total_calls == 0:
            return 0.0
        return 1 - (self.successful_calls / self.total_calls)
    
    @property
    def avg_retries_per_success(self) -> float:
        if self.successful_calls == 0:
            return 0.0
        return self.total_retries / self.successful_calls

class RetryDashboard:
    """Central monitoring for retry behavior."""
    
    def __init__(self, alert_threshold: float = 0.5):
        self.stats: dict[str, RetryStats] = {}
        self.alert_threshold = alert_threshold
    
    def record(self, node_name: str, success: bool, retries: int):
        """Record an operation result."""
        if node_name not in self.stats:
            self.stats[node_name] = RetryStats()
        
        stats = self.stats[node_name]
        stats.total_calls += 1
        stats.total_retries += retries
        if success:
            stats.successful_calls += 1
        
        # Check for alert condition
        if stats.failure_rate > self.alert_threshold and stats.total_calls >= 5:
            print(f"  ‚ö†Ô∏è ALERT: {node_name} failure rate is {stats.failure_rate:.0%}!")
    
    def report(self):
        """Print dashboard report."""
        print("\n" + "=" * 50)
        print("üìä RETRY DASHBOARD")
        print("=" * 50)
        
        for name, stats in self.stats.items():
            print(f"\nüìå {name}:")
            print(f"   Calls: {stats.total_calls}")
            print(f"   Success rate: {(1 - stats.failure_rate):.0%}")
            print(f"   Total retries: {stats.total_retries}")
            print(f"   Avg retries/success: {stats.avg_retries_per_success:.1f}")
        
        print("\n" + "=" * 50)

# Global dashboard
dashboard = RetryDashboard(alert_threshold=0.4)

# Nodes that report to dashboard
def node_a(state: dict) -> dict:
    """Mostly reliable node."""
    retries = 0
    success = random.random() > 0.2  # 80% success
    
    if not success:
        retries = random.randint(1, 3)
        
    dashboard.record("node_a", success, retries)
    return {"a_done": True}

def node_b(state: dict) -> dict:
    """Less reliable node."""
    retries = 0
    success = random.random() > 0.5  # 50% success
    
    if not success:
        retries = random.randint(2, 5)
        
    dashboard.record("node_b", success, retries)
    return {"b_done": True}

def node_c(state: dict) -> dict:
    """Unreliable node - will trigger alerts."""
    retries = 0
    success = random.random() > 0.7  # Only 30% success
    
    if not success:
        retries = random.randint(3, 5)
        
    dashboard.record("node_c", success, retries)
    return {"c_done": True}

# Build graph
class DashState(TypedDict):
    a_done: bool
    b_done: bool
    c_done: bool

graph = StateGraph(DashState)
graph.add_node("a", node_a)
graph.add_node("b", node_b)
graph.add_node("c", node_c)
graph.add_edge(START, "a")
graph.add_edge("a", "b")
graph.add_edge("b", "c")
graph.add_edge("c", END)
app = graph.compile()

# Run multiple times
print("=== Retry Dashboard Demo ===\n")

for i in range(10):
    print(f"Run {i + 1}...")
    app.invoke({"a_done": False, "b_done": False, "c_done": False})

# Show report
dashboard.report()


---
## Section 15.6 Solutions

### Exercise 15.6.1: Multi-Source Aggregator

In [None]:
# File: exercise_1_15_6_solution.py

"""
Multi-source aggregator with failure handling.
"""

from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END
import random

class AggregatorState(TypedDict):
    query: str
    results: Annotated[list[dict], add]
    errors: Annotated[list[str], add]

def make_source(name: str, fail_rate: float = 0.5):
    """Create a source node with configurable failure rate."""
    def source_node(state: AggregatorState) -> dict:
        if random.random() < fail_rate:
            return {"errors": [f"{name}: Connection failed"]}
        return {"results": [{"source": name, "data": f"Data from {name}"}]}
    return source_node

def aggregate(state: AggregatorState) -> dict:
    """Aggregate results and compute confidence."""
    successes = len(state["results"])
    failures = len(state["errors"])
    total = successes + failures
    
    confidence = successes / total if total > 0 else 0
    
    print(f"\n=== Aggregation Results ===")
    print(f"Succeeded: {successes}/{total}")
    print(f"Confidence: {confidence:.0%}")
    
    if state["results"]:
        print("\nData received:")
        for r in state["results"]:
            print(f"  ‚úì {r['source']}: {r['data']}")
    
    if state["errors"]:
        print("\nFailures:")
        for e in state["errors"]:
            print(f"  ‚úó {e}")
    
    return {}

# Build graph with 4 sources
graph = StateGraph(AggregatorState)

graph.add_node("source_a", make_source("Source A", 0.3))
graph.add_node("source_b", make_source("Source B", 0.7))
graph.add_node("source_c", make_source("Source C", 0.5))
graph.add_node("source_d", make_source("Source D", 0.4))
graph.add_node("aggregate", aggregate)

# All sources run, then aggregate
graph.add_edge(START, "source_a")
graph.add_edge(START, "source_b")
graph.add_edge(START, "source_c")
graph.add_edge(START, "source_d")
graph.add_edge("source_a", "aggregate")
graph.add_edge("source_b", "aggregate")
graph.add_edge("source_c", "aggregate")
graph.add_edge("source_d", "aggregate")
graph.add_edge("aggregate", END)

app = graph.compile()

# Run it
print("=== Multi-Source Aggregator Demo ===")
result = app.invoke({"query": "test", "results": [], "errors": []})


### Exercise 15.6.2: Fallback Chain

In [None]:
# File: exercise_2_15_6_solution.py

"""
Fallback chain with source tracking.
"""

from typing import TypedDict
from langgraph.graph import StateGraph, START, END
import random

class FallbackState(TypedDict):
    query: str
    data: str
    source_used: str
    attempts: list[str]

def try_primary(state: FallbackState) -> dict:
    """Primary API - fails 70% of the time."""
    attempts = state.get("attempts", []) + ["primary"]
    
    if random.random() > 0.7:  # 30% success
        return {
            "data": "Fresh data from primary API",
            "source_used": "primary",
            "attempts": attempts
        }
    return {"attempts": attempts}

def try_secondary(state: FallbackState) -> dict:
    """Secondary API - fails 40% of the time."""
    if state.get("data"):  # Already have data
        return {}
    
    attempts = state.get("attempts", []) + ["secondary"]
    
    if random.random() > 0.4:  # 60% success
        return {
            "data": "Data from secondary API",
            "source_used": "secondary",
            "attempts": attempts
        }
    return {"attempts": attempts}

def try_cache(state: FallbackState) -> dict:
    """Cache - always succeeds but stale."""
    if state.get("data"):  # Already have data
        return {}
    
    attempts = state.get("attempts", []) + ["cache"]
    return {
        "data": "Stale data from cache (24h old)",
        "source_used": "cache",
        "attempts": attempts
    }

# Build graph
graph = StateGraph(FallbackState)
graph.add_node("primary", try_primary)
graph.add_node("secondary", try_secondary)
graph.add_node("cache", try_cache)

graph.add_edge(START, "primary")
graph.add_edge("primary", "secondary")
graph.add_edge("secondary", "cache")
graph.add_edge("cache", END)

app = graph.compile()

# Test multiple times
print("=== Fallback Chain Demo ===\n")

for i in range(5):
    result = app.invoke({
        "query": "test",
        "data": "",
        "source_used": "",
        "attempts": []
    })
    
    print(f"Run {i+1}:")
    print(f"  Source: {result['source_used']}")
    print(f"  Attempts: {' ‚Üí '.join(result['attempts'])}")
    print(f"  Data: {result['data'][:30]}...")
    print()


### Exercise 15.6.3: Graceful Feature Degradation

In [None]:
# File: exercise_3_15_6_solution.py

"""
Document analyzer with graceful feature degradation.
"""

from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END
import random

class AnalysisState(TypedDict):
    document: str
    features: dict
    warnings: Annotated[list[str], add]

def count_words(state: AnalysisState) -> dict:
    """Core feature - always works."""
    word_count = len(state["document"].split())
    features = state.get("features", {})
    features["word_count"] = {"value": word_count, "status": "ok"}
    return {"features": features}

def analyze_sentiment(state: AnalysisState) -> dict:
    """Optional - fails 40% of the time."""
    features = state.get("features", {})
    
    if random.random() < 0.4:
        features["sentiment"] = {"value": None, "status": "failed"}
        return {
            "features": features,
            "warnings": ["Sentiment analysis unavailable"]
        }
    
    features["sentiment"] = {"value": "positive", "status": "ok"}
    return {"features": features}

def extract_keywords(state: AnalysisState) -> dict:
    """Optional - fails 30% of the time."""
    features = state.get("features", {})
    
    if random.random() < 0.3:
        features["keywords"] = {"value": None, "status": "failed"}
        return {
            "features": features,
            "warnings": ["Keyword extraction unavailable"]
        }
    
    words = state["document"].split()[:3]
    features["keywords"] = {"value": words, "status": "ok"}
    return {"features": features}

def summarize(state: AnalysisState) -> dict:
    """Optional - fails 50% of the time."""
    features = state.get("features", {})
    
    if random.random() < 0.5:
        features["summary"] = {"value": None, "status": "failed"}
        return {
            "features": features,
            "warnings": ["Summarization unavailable"]
        }
    
    summary = state["document"][:50] + "..."
    features["summary"] = {"value": summary, "status": "ok"}
    return {"features": features}

# Build graph
graph = StateGraph(AnalysisState)
graph.add_node("words", count_words)
graph.add_node("sentiment", analyze_sentiment)
graph.add_node("keywords", extract_keywords)
graph.add_node("summary", summarize)

graph.add_edge(START, "words")
graph.add_edge("words", "sentiment")
graph.add_edge("sentiment", "keywords")
graph.add_edge("keywords", "summary")
graph.add_edge("summary", END)

app = graph.compile()

# Test
doc = "This is a sample document for testing the analyzer features."
result = app.invoke({"document": doc, "features": {}, "warnings": []})

print("=== Document Analysis Results ===\n")

for feature, data in result["features"].items():
    status = "‚úì" if data["status"] == "ok" else "‚úó"
    value = data["value"] if data["value"] else "N/A"
    print(f"{status} {feature}: {value}")

if result["warnings"]:
    print(f"\n‚ö†Ô∏è Warnings: {len(result['warnings'])}")
    for w in result["warnings"]:
        print(f"  - {w}")


---
## Section 15.7 Solutions

### Exercise 15.7.1: State Diff Viewer

In [None]:
# File: exercise_1_15_7_solution.py

"""
State diff viewer - compare snapshots and highlight changes.
"""

def diff_states(before: dict, after: dict) -> dict:
    """Compare two state snapshots."""
    diff = {
        "added": {},
        "removed": {},
        "modified": {},
        "unchanged": []
    }
    
    all_keys = set(before.keys()) | set(after.keys())
    
    for key in all_keys:
        if key not in before:
            diff["added"][key] = after[key]
        elif key not in after:
            diff["removed"][key] = before[key]
        elif before[key] != after[key]:
            diff["modified"][key] = {
                "from": before[key],
                "to": after[key]
            }
        else:
            diff["unchanged"].append(key)
    
    return diff

def print_diff(diff: dict, title: str = "State Diff"):
    """Format and print state diff."""
    print(f"\n{'‚ïê' * 50}")
    print(f"üìä {title}")
    print(f"{'‚ïê' * 50}")
    
    if diff["added"]:
        print("\n‚úÖ Added:")
        for key, value in diff["added"].items():
            print(f"  + {key}: {value}")
    
    if diff["removed"]:
        print("\n‚ùå Removed:")
        for key, value in diff["removed"].items():
            print(f"  - {key}: {value}")
    
    if diff["modified"]:
        print("\nüìù Modified:")
        for key, change in diff["modified"].items():
            print(f"  ~ {key}:")
            print(f"      from: {change['from']}")
            print(f"      to:   {change['to']}")
    
    if diff["unchanged"]:
        print(f"\n‚è∏Ô∏è Unchanged: {', '.join(diff['unchanged'])}")
    
    # Summary
    total_changes = len(diff["added"]) + len(diff["removed"]) + len(diff["modified"])
    print(f"\n{'‚îÄ' * 50}")
    print(f"Summary: {total_changes} change(s)")
    print(f"{'‚ïê' * 50}\n")

# Demo
if __name__ == "__main__":
    before = {
        "messages": ["Hello"],
        "count": 1,
        "status": "active",
        "user": "alice"
    }
    
    after = {
        "messages": ["Hello", "World"],
        "count": 2,
        "status": "active",
        "priority": "high"  # Added
        # "user" removed
    }
    
    diff = diff_states(before, after)
    print_diff(diff, "Step 1 ‚Üí Step 2")


### Exercise 15.7.2: Performance Dashboard

In [None]:
# File: exercise_2_15_7_solution.py

"""
Performance dashboard for agent monitoring.
"""

from collections import defaultdict
from datetime import datetime
import time
import random

class PerformanceDashboard:
    """Track and report node performance."""
    
    def __init__(self):
        self.node_stats = defaultdict(lambda: {
            "calls": 0,
            "successes": 0,
            "failures": 0,
            "total_time": 0.0,
            "times": []
        })
    
    def record(self, node: str, duration: float, success: bool):
        """Record a node execution."""
        stats = self.node_stats[node]
        stats["calls"] += 1
        stats["total_time"] += duration
        stats["times"].append(duration)
        
        if success:
            stats["successes"] += 1
        else:
            stats["failures"] += 1
    
    def wrap_node(self, node_name: str, func):
        """Create a wrapped node that auto-records metrics."""
        def wrapper(state):
            start = time.time()
            success = True
            try:
                result = func(state)
                return result
            except Exception as e:
                success = False
                raise
            finally:
                self.record(node_name, time.time() - start, success)
        return wrapper
    
    def print_report(self):
        """Print formatted performance report."""
        print("\n" + "‚ïê" * 60)
        print("üìä PERFORMANCE DASHBOARD")
        print("‚ïê" * 60)
        
        # Calculate rankings
        by_time = sorted(
            self.node_stats.items(),
            key=lambda x: x[1]["total_time"],
            reverse=True
        )
        
        print("\nüìà Node Statistics:")
        print("‚îÄ" * 60)
        print(f"{'Node':<20} {'Calls':>6} {'Avg':>8} {'Total':>8} {'Success':>8}")
        print("‚îÄ" * 60)
        
        for node, stats in by_time:
            avg = stats["total_time"] / stats["calls"] if stats["calls"] else 0
            rate = stats["successes"] / stats["calls"] * 100 if stats["calls"] else 0
            
            print(f"{node:<20} {stats['calls']:>6} {avg:>7.3f}s {stats['total_time']:>7.3f}s {rate:>7.0f}%")
        
        print("‚îÄ" * 60)
        
        # Slowest nodes
        print("\nüê¢ Slowest Nodes (by avg time):")
        by_avg = sorted(
            self.node_stats.items(),
            key=lambda x: x[1]["total_time"] / max(x[1]["calls"], 1),
            reverse=True
        )[:3]
        
        for i, (node, stats) in enumerate(by_avg, 1):
            avg = stats["total_time"] / stats["calls"]
            print(f"  {i}. {node}: {avg:.3f}s avg")
        
        print("\n" + "‚ïê" * 60)

# Demo
if __name__ == "__main__":
    dashboard = PerformanceDashboard()
    
    # Simulate some runs
    print("=== Performance Dashboard Demo ===")
    print("Simulating 10 agent runs...\n")
    
    for _ in range(10):
        dashboard.record("fetch_data", random.uniform(0.1, 0.5), random.random() > 0.1)
        dashboard.record("process", random.uniform(0.2, 0.8), random.random() > 0.2)
        dashboard.record("save", random.uniform(0.05, 0.15), random.random() > 0.05)
    
    dashboard.print_report()


### Exercise 15.7.3: Alert System

In [None]:
# File: exercise_3_15_7_solution.py

"""
Simple alerting system with thresholds and severity.
"""

from datetime import datetime
from enum import Enum
from dataclasses import dataclass

class Severity(str, Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class Alert:
    severity: Severity
    metric: str
    message: str
    value: float
    threshold: float
    timestamp: str

class AlertSystem:
    """Monitor metrics and trigger alerts."""
    
    def __init__(self):
        self.thresholds = {}
        self.alerts = []
        self.metrics = {}
    
    def set_threshold(self, metric: str, warning: float, critical: float):
        """Set alert thresholds for a metric."""
        self.thresholds[metric] = {
            "warning": warning,
            "critical": critical
        }
    
    def update_metric(self, metric: str, value: float):
        """Update a metric and check for alerts."""
        self.metrics[metric] = value
        
        if metric in self.thresholds:
            t = self.thresholds[metric]
            
            if value >= t["critical"]:
                self._trigger(Severity.CRITICAL, metric, value, t["critical"])
            elif value >= t["warning"]:
                self._trigger(Severity.WARNING, metric, value, t["warning"])
    
    def _trigger(self, severity: Severity, metric: str, value: float, threshold: float):
        """Trigger an alert."""
        alert = Alert(
            severity=severity,
            metric=metric,
            message=f"{metric} is {value:.1f} (threshold: {threshold:.1f})",
            value=value,
            threshold=threshold,
            timestamp=datetime.now().isoformat()
        )
        self.alerts.append(alert)
        
        # Print immediately
        icon = "üî¥" if severity == Severity.CRITICAL else "üü°"
        print(f"{icon} [{severity.value.upper()}] {alert.message}")
    
    def get_active_alerts(self) -> list[Alert]:
        """Get alerts from last hour."""
        # In real system, filter by time
        return self.alerts[-10:]  # Last 10 for demo
    
    def print_status(self):
        """Print current status."""
        print("\n" + "‚ïê" * 50)
        print("üö® ALERT SYSTEM STATUS")
        print("‚ïê" * 50)
        
        print("\nüìä Current Metrics:")
        for metric, value in self.metrics.items():
            status = "‚úì"
            if metric in self.thresholds:
                t = self.thresholds[metric]
                if value >= t["critical"]:
                    status = "üî¥"
                elif value >= t["warning"]:
                    status = "üü°"
            print(f"  {status} {metric}: {value:.1f}")
        
        print(f"\nüìã Alert History ({len(self.alerts)} total):")
        for alert in self.alerts[-5:]:
            icon = "üî¥" if alert.severity == Severity.CRITICAL else "üü°"
            print(f"  {icon} {alert.message}")
        
        print("‚ïê" * 50)

# Demo
if __name__ == "__main__":
    alerts = AlertSystem()
    
    # Set thresholds
    alerts.set_threshold("error_rate", warning=5.0, critical=10.0)
    alerts.set_threshold("latency_ms", warning=500, critical=1000)
    alerts.set_threshold("queue_size", warning=100, critical=200)
    
    print("=== Alert System Demo ===\n")
    
    # Simulate metrics - all OK
    print("Initial metrics (all OK):")
    alerts.update_metric("error_rate", 3.0)   # OK
    alerts.update_metric("latency_ms", 250)   # OK
    alerts.update_metric("queue_size", 50)    # OK
    
    print("\n--- Situation worsens ---\n")
    
    alerts.update_metric("error_rate", 7.0)   # Warning!
    alerts.update_metric("latency_ms", 1200)  # Critical!
    alerts.update_metric("queue_size", 150)   # Warning!
    
    alerts.print_status()


---
## Next Steps

Return to **Chapter 16: Next Topic**