# Workout: Observability

## Setup
```bash
uv add langfuse openai
```

---
## Drill 1: LLM Call Logger 🟢
**Task:** Create a structured LLM call logger

In [None]:
from dataclasses import dataclass, asdict
import json

@dataclass
class LLMLog:
    """Structured record of a single LLM call."""

    model: str  # model identifier, e.g. "gpt-4o"
    prompt: str  # text sent to the model
    response: str  # text returned by the model
    tokens: int  # token count recorded for the call
    latency_ms: float  # call duration in milliseconds


def log_call(log: LLMLog):
    """Print *log* to stdout as a single-line JSON object.

    Args:
        log: The call record to serialise. Fields are emitted in
            declaration order.
    """
    # ensure_ascii=False keeps non-ASCII prompts/responses readable in logs.
    print(json.dumps(asdict(log), ensure_ascii=False))


# Test: prints one JSON line, e.g.
# {"model": "gpt-4o", "prompt": "Hello", "response": "Hi!", "tokens": 10, "latency_ms": 250}
log_call(LLMLog(
    model="gpt-4o",
    prompt="Hello",
    response="Hi!",
    tokens=10,
    latency_ms=250
))

---
## Drill 2: Timed LLM Call 🟢
**Task:** Wrap OpenAI call with timing

In [None]:
import time
from openai import OpenAI

client = OpenAI()  # credentials come from the environment (OPENAI_API_KEY)


def timed_chat(messages: list, model: str = "gpt-4o-mini") -> tuple[str, float]:
    """Send a chat completion request and measure how long it takes.

    Args:
        messages: Chat messages in the OpenAI format, e.g.
            ``[{"role": "user", "content": "Hello"}]``.
        model: Name of the chat model to call.

    Returns:
        A ``(response, latency_ms)`` tuple: the text of the first choice and
        the wall-clock duration of the API call in milliseconds.

    Raises:
        openai.OpenAIError: Propagated unchanged if the request fails.
    """
    # perf_counter is monotonic, so the measurement is immune to clock changes.
    started = time.perf_counter()
    completion = client.chat.completions.create(model=model, messages=messages)
    latency_ms = (time.perf_counter() - started) * 1000.0

    # `content` can be None (e.g. tool-call-only replies); normalise to ""
    # so the declared return type holds.
    return completion.choices[0].message.content or "", latency_ms

---
## Drill 3: Cost Calculator 🟡
**Task:** Calculate cost of LLM calls

In [None]:
# Pricing per 1M tokens (approximate)
PRICING = {
    "gpt-4o": {"input": 5.00, "output": 15.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
}

# PRICING rates are quoted per this many tokens.
_TOKENS_PER_PRICE_UNIT = 1_000_000


def calculate_cost(
    model: str,
    input_tokens: int,
    output_tokens: int
) -> float:
    """Return the cost in USD of one LLM call.

    Args:
        model: Key into ``PRICING``.
        input_tokens: Number of prompt tokens (must be >= 0).
        output_tokens: Number of completion tokens (must be >= 0).

    Returns:
        Input and output cost combined, using the per-1M-token rates in
        ``PRICING``.

    Raises:
        ValueError: If *model* has no entry in ``PRICING`` or a token count
            is negative.
    """
    try:
        rates = PRICING[model]
    except KeyError:
        raise ValueError(
            f"Unknown model {model!r}; known models: {sorted(PRICING)}"
        ) from None
    if input_tokens < 0 or output_tokens < 0:
        raise ValueError("Token counts must be non-negative")

    # Sum first, divide once: fewer floating-point roundings.
    total = input_tokens * rates["input"] + output_tokens * rates["output"]
    return total / _TOKENS_PER_PRICE_UNIT


# Test: expected output is $0.0125
cost = calculate_cost("gpt-4o", 1000, 500)
print(f"${cost:.4f}")

---
## Drill 4: Simple Tracer 🟡
**Task:** Create a context manager for tracing

In [None]:
from contextlib import contextmanager
import time

# Current nesting level of active spans; used only to indent the output.
# A plain module global: sufficient for single-threaded notebook use.
_trace_depth = 0


@contextmanager
def trace(name: str):
    """Print when a span starts and ends, with its duration.

    Nested spans are indented by two spaces per level. The end line is
    printed even when the body raises; the exception then propagates.

    Args:
        name: Label shown in the start/end lines.

    Yields:
        None. The body of the ``with`` block runs inside the span.
    """
    global _trace_depth
    indent = "  " * _trace_depth
    print(f"{indent}[{name}] start")
    _trace_depth += 1
    started = time.perf_counter()
    try:
        yield
    finally:
        elapsed_ms = (time.perf_counter() - started) * 1000.0
        _trace_depth -= 1
        print(f"{indent}[{name}] end ({elapsed_ms:.1f} ms)")

# Usage
# with trace("outer"):
#     with trace("inner"):
#         time.sleep(0.1)
# Prints nested timing info, e.g.:
# [outer] start
#   [inner] start
#   [inner] end (100.1 ms)
# [outer] end (100.2 ms)

---
## Drill 5: Request ID Context 🟡
**Task:** Use contextvars for request tracking

In [None]:
from contextvars import ContextVar
import uuid

request_id: ContextVar[str] = ContextVar("request_id")

# Returned by get_request_id() when no ID has been set in the current context.
_NO_REQUEST_ID = "-"


def set_request_id() -> str:
    """Generate a fresh request ID, store it in the current context, and return it.

    Returns:
        The new ID: a random UUID4 as a 32-character hex string.
    """
    new_id = uuid.uuid4().hex
    request_id.set(new_id)
    return new_id


def get_request_id() -> str:
    """Return the request ID of the current context.

    Returns:
        The ID set by ``set_request_id()``, or ``"-"`` when none has been
        set, so that logging never fails with ``LookupError``.
    """
    return request_id.get(_NO_REQUEST_ID)


def log_with_context(message: str):
    """Print *message* prefixed with the current request ID in brackets.

    Args:
        message: Text to log; emitted as ``[<request id>] <message>``.
    """
    print(f"[{get_request_id()}] {message}")

---
## Drill 6: Metrics Aggregator 🟡
**Task:** Collect and summarize metrics

In [None]:
from dataclasses import dataclass
from statistics import mean, median

@dataclass
class Metrics:
    """Summary statistics over a batch of call logs."""

    count: int  # number of log entries
    latency_avg: float  # mean latency in milliseconds
    latency_p50: float  # median latency in milliseconds
    latency_p95: float  # 95th-percentile latency (nearest-rank) in milliseconds
    error_rate: float  # fraction of entries flagged as errors, 0.0-1.0


def _nearest_rank(sorted_values: list, percent: int) -> float:
    """Return the *percent*-th percentile of non-empty *sorted_values* (nearest-rank)."""
    # Integer ceil(percent / 100 * n) avoids float rounding at rank boundaries.
    rank = -(-percent * len(sorted_values) // 100)
    return float(sorted_values[max(rank, 1) - 1])


def aggregate_metrics(logs: list[dict]) -> Metrics:
    """Calculate summary statistics from call logs.

    Args:
        logs: Entries with a numeric ``"latency_ms"`` and an optional
            boolean ``"error"`` (a missing key counts as no error).

    Returns:
        A ``Metrics`` instance. For empty *logs* every field is zero.

    Raises:
        KeyError: If an entry lacks ``"latency_ms"``.
    """
    if not logs:
        # mean()/median() raise on empty data; define the empty summary instead.
        return Metrics(count=0, latency_avg=0.0, latency_p50=0.0,
                       latency_p95=0.0, error_rate=0.0)

    latencies = sorted(entry["latency_ms"] for entry in logs)
    error_count = sum(1 for entry in logs if entry.get("error"))
    return Metrics(
        count=len(logs),
        latency_avg=float(mean(latencies)),
        latency_p50=float(median(latencies)),
        latency_p95=_nearest_rank(latencies, 95),
        error_rate=error_count / len(logs),
    )


logs = [
    {"latency_ms": 100, "error": False},
    {"latency_ms": 200, "error": False},
    {"latency_ms": 150, "error": True},
    {"latency_ms": 180, "error": False},
]
# Expected: count=4, latency_avg=157.5, latency_p50=165.0, latency_p95=200.0, error_rate=0.25
print(aggregate_metrics(logs))

---
## Drill 7: Langfuse Decorator 🔴
**Task:** Create a decorator that logs to Langfuse

In [None]:
from langfuse import Langfuse
from functools import wraps

# langfuse = Langfuse()  # Uses env vars

def observe(name: str = None):
    """Decorator factory for tracing function calls.

    The wrapper is transparent: it calls the wrapped function with the
    original arguments, returns its result, and lets exceptions propagate.
    Sending the trace to Langfuse is still to be filled in at the marked
    spots (the client above is commented out, so nothing is emitted yet).

    Args:
        name: Trace name to use; intended to fall back to the wrapped
            function's ``__name__`` when omitted.

    Returns:
        A decorator that wraps a function and preserves its metadata.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # TODO(exercise): create a Langfuse trace named
            # `name or func.__name__` with args/kwargs as input.
            result = func(*args, **kwargs)
            # TODO(exercise): log `result` (and any raised error) to the trace.
            return result
        return wrapper
    return decorator

@observe("my_function")
def my_function(x: int) -> int:
    return x * 2

---
## Drill 8: Async Logger 🔴
**Task:** Create non-blocking log batching

In [None]:
import asyncio
from collections import deque

class AsyncLogger:
    """Buffer log entries and send them to a backend in fixed-size batches.

    ``log()`` never awaits. Inside a running event loop it schedules the
    flush as a background task; without one it runs the flush to completion
    via ``asyncio.run``. Entries short of a full batch stay in ``buffer``.

    The backend here is an in-memory stand-in: sent batches are appended to
    ``sent_batches``. Override ``_send`` to ship them elsewhere.
    """

    def __init__(self, batch_size: int = 10):
        """Create a logger that flushes every *batch_size* entries.

        Raises:
            ValueError: If *batch_size* is less than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self.buffer = deque()
        self.batch_size = batch_size
        self.sent_batches = []  # batches handed to the backend, in order
        self._flush_task = None  # pending background flush, if any

    def log(self, entry: dict):
        """Add to buffer, flush if full."""
        self.buffer.append(entry)
        if len(self.buffer) < self.batch_size:
            return

        # Look up the loop BEFORE creating the coroutine, so that no
        # un-awaited coroutine is left behind when there is no loop.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Plain synchronous caller: no loop to schedule on.
            asyncio.run(self._flush())
            return

        # One in-flight flush drains every full batch; don't stack tasks.
        # Keeping the reference also stops the task being garbage-collected.
        if self._flush_task is None or self._flush_task.done():
            self._flush_task = loop.create_task(self._flush())

    async def _flush(self):
        """Send batch to backend.

        Drains the buffer in ``batch_size`` chunks (oldest first) until
        fewer than ``batch_size`` entries remain.
        """
        while len(self.buffer) >= self.batch_size:
            batch = [self.buffer.popleft() for _ in range(self.batch_size)]
            await self._send(batch)

    async def _send(self, batch: list):
        """Deliver one batch; the default just records it in ``sent_batches``."""
        self.sent_batches.append(batch)

# Test
# logger = AsyncLogger(batch_size=5)
# for i in range(12):
#     logger.log({"event": i})
# -> two batches of 5 are sent; events 10 and 11 remain in logger.buffer

---
## Self-Check

- [ ] Can create structured LLM logs
- [ ] Can measure and track latency
- [ ] Can calculate LLM costs
- [ ] Can implement basic tracing
- [ ] Can propagate request context
- [ ] Can aggregate metrics