# Day 3 — Observability for RAG (Traces-first, Cloud-Engineer-friendly)

**Audience:** senior cloud engineers (strong distributed-systems intuition, new to AI).

**Goal:** Learn to diagnose **latency**, **errors**, and **quality regressions** in an AI/RAG system the same way you debug microservices: with **traces + metrics + structured logs**.

## What you’ll do
- Start a local **Jaeger** UI and **OpenTelemetry Collector**.
- Run a tiny end-to-end **RAG** request and see spans for:
  - `rag.embed_query` → `rag.retrieve` → `rag.rerank` → `rag.pack_context` → `rag.generate`
- Run a few “consulting-style” drills:
  - **Tail latency** (p95 explosion)
  - **Quality regression** (Top_K / Rerank_K tradeoffs)
  - **Failure analysis** (timeouts, 502, retries)

## Prereqs
- Docker works (`docker ps`)
- Ollama server running at `NIM_BASE_URL` (default `http://localhost:11434`)

## URLs
- Jaeger UI: `http://localhost:16686`


In [None]:
# Setup + configuration

import os
import sys
import time
import json
import math
import subprocess
from pathlib import Path
from dataclasses import dataclass
from typing import Any

import numpy as np
import pandas as pd
import plotly.express as px

from nim_clients import NIMClient, NIMConfig
from rag_pipeline import (
    RAGConfig,
    Timer,
    VectorIndex,
    build_chunks_from_df,
    chunk_text,
    normalize_rows,
    make_prompt,
    clean_answer,
)

px.defaults.template = "plotly_white"

NIM_BASE_URL = os.environ.get("NIM_BASE_URL", "http://localhost:11434").rstrip("/")
JAEGER_UI = os.environ.get("JAEGER_UI", "http://localhost:16686")

# OpenTelemetry export target (OTLP/HTTP). This is what start_observability.sh exposes.
OTEL_EXPORTER_OTLP_ENDPOINT = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318/v1/traces")
OTEL_SERVICE_NAME = os.environ.get("OTEL_SERVICE_NAME", "fico-rag-observability")

print("sys.executable:", sys.executable)
print("NIM_BASE_URL:", NIM_BASE_URL)
print("OTEL_EXPORTER_OTLP_ENDPOINT:", OTEL_EXPORTER_OTLP_ENDPOINT)
print("OTEL_SERVICE_NAME:", OTEL_SERVICE_NAME)
print("JAEGER_UI:", JAEGER_UI)


## Observability mental model (AI edition)

Cloud intuition still applies:

- **Logs** answer: *what happened?* (debugging details, errors, inputs/outputs)
- **Metrics** answer: *how often and how bad?* (SLOs, p95/p99, error rate)
- **Traces** answer: *where did the time go?* (end-to-end path across stages/services)
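The three signals are most useful when they share an id, so a log line can be joined to the trace that produced it. Below is a minimal sketch, assuming one JSON object per log line with the trace id carried as a field. `log_event` and its field names are hypothetical, not part of the workshop helpers.

```python
import json
import time
from typing import Any


def log_event(event: str, *, trace_id: str, **fields: Any) -> str:
    """Build one structured (JSON) log line that can be joined to a trace by trace_id.

    Hypothetical helper: field names are illustrative. Keep values to sizes, counts
    and ids rather than raw customer text (same hygiene as span attributes).
    """
    record = {
        "ts": round(time.time(), 3),  # wall-clock seconds since the epoch
        "event": event,
        "trace_id": trace_id,
        **fields,
    }
    return json.dumps(record, sort_keys=True)


line = log_event(
    "rag.generate.done",
    trace_id="4bf92f3577b34da6a3ce929d0e0e4736",  # made-up 32-hex-char W3C-style id
    answer_chars=812,
    context_chunks=5,
)
print(line)
```

Inside an active OpenTelemetry span, the real id is `format(trace.get_current_span().get_span_context().trace_id, "032x")`, which is the same hex form Jaeger displays.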

### What changes in AI/RAG
In RAG systems, **latency** and **quality** are coupled:

- Bigger `top_k` can improve recall but also:
  - increases rerank cost
  - increases context size (slower generation)
  - can reduce answer quality via noisy context
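The coupling is easy to see with back-of-envelope arithmetic. This sketch mirrors the greedy packing used later in `rag.pack_context`, but it assumes every chunk is exactly `chunk_chars` long (an upper bound, since tail chunks are shorter) and about 4 characters per token (a rough English heuristic, not a tokenizer). `estimate_context` is a hypothetical helper.

```python
def estimate_context(
    top_k: int,
    rerank_k: int,
    chunk_chars: int,
    max_context_chars: int,
    chars_per_token: float = 4.0,  # rough English heuristic, not a real tokenizer
) -> dict[str, float]:
    """Upper-bound context size, assuming every chunk is exactly chunk_chars long."""
    # Chunks that survive rerank; rerank_k <= 0 means "no rerank" (keep all top_k).
    kept = top_k if rerank_k <= 0 else min(top_k, rerank_k)
    # Greedy packing of whole chunks: stop before the budget would be exceeded.
    packed = min(kept, max_context_chars // chunk_chars)
    chars = packed * chunk_chars
    return {"chunks": packed, "context_chars": chars, "approx_tokens": chars / chars_per_token}


# The three profiles used later in this notebook, with 900-char chunks.
for name, top_k, rerank_k, budget in [("fast", 6, 0, 3000), ("balanced", 10, 5, 6000), ("accurate-ish", 20, 10, 9000)]:
    print(name, estimate_context(top_k, rerank_k, chunk_chars=900, max_context_chars=budget))
```

Note that in the "fast" profile the character budget, not `top_k`, is the binding constraint.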

### Golden signals (practical)
Track these for any customer:
- **Latency**: p50/p95/p99 (and stage breakdown)
- **Traffic**: req/s, concurrency
- **Errors**: rate + dominant class (timeouts, 5xx, invalid_request)
- **Saturation**: GPU memory, queueing, throttling
- **Quality proxy**: retrieval hit-rate, answer length, refusal rate, simple judge score
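As a rough illustration, latency, traffic and error signals can be computed from a list of per-request records. This is a sketch under stated assumptions: `RequestRecord` and `golden_signals` are hypothetical names, percentiles use the nearest-rank method (the notebook's `pct` helper uses NumPy's interpolating percentile, so values can differ slightly), and latency percentiles are over successful requests only. Saturation comes from the hardware snapshots and quality from the proxies below, so neither appears here.

```python
import math
from collections import Counter
from dataclasses import dataclass
from typing import Optional


@dataclass
class RequestRecord:
    latency_s: float
    error_class: Optional[str]  # None means the request succeeded


def nearest_rank(xs: list[float], p: float) -> float:
    """Nearest-rank percentile for p in (0, 100]; NaN for an empty list."""
    if not xs:
        return float("nan")
    s = sorted(xs)
    rank = max(1, math.ceil(p / 100.0 * len(s)))
    return s[rank - 1]


def golden_signals(records: list[RequestRecord], window_s: float) -> dict[str, object]:
    ok = [r.latency_s for r in records if r.error_class is None]
    errors = Counter(r.error_class for r in records if r.error_class is not None)
    n = len(records)
    return {
        "rps": n / window_s if window_s > 0 else float("nan"),
        "p50_s": nearest_rank(ok, 50),
        "p95_s": nearest_rank(ok, 95),
        "p99_s": nearest_rank(ok, 99),
        "error_rate": (sum(errors.values()) / n) if n else float("nan"),
        "dominant_error": errors.most_common(1)[0][0] if errors else None,
    }


# Synthetic records, for illustration only.
recs = [RequestRecord(0.1 * i, None) for i in range(1, 21)]
recs += [RequestRecord(5.0, "timeout"), RequestRecord(0.2, "timeout"), RequestRecord(0.3, "5xx")]
sig = golden_signals(recs, window_s=10.0)
print(sig)
```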

We’ll build a tiny version of that here.


In [None]:
# Preflight: Docker + start tracing stack (Jaeger + OTEL Collector)

import requests


def _run(cmd: list[str], *, check: bool = False) -> subprocess.CompletedProcess:
    return subprocess.run(cmd, check=check, text=True, capture_output=True)


def docker_ok() -> bool:
    try:
        r = _run(["docker", "ps"], check=False)
        if r.returncode != 0:
            print("docker ps failed:")
            print((r.stderr or r.stdout or "").strip()[:400])
            return False
        return True
    except FileNotFoundError:
        print("docker not found")
        return False


def find_fico_root() -> Path:
    """Find the `fico/` directory regardless of notebook working directory."""
    here = Path.cwd().resolve()
    candidates = [here, *here.parents]
    for c in candidates:
        if (c / "scripts" / "setup_workshop.sh").exists() and (c / "nim_clients.py").exists():
            return c
        if (c / "fico" / "scripts" / "setup_workshop.sh").exists() and (c / "fico" / "nim_clients.py").exists():
            return c / "fico"
    raise RuntimeError("Could not find repo root (expected to find fico/scripts/setup_workshop.sh)")


FICO_DIR = find_fico_root()
print("FICO_DIR:", FICO_DIR)

if docker_ok():
    start = FICO_DIR / "scripts" / "start_observability.sh"
    if start.exists():
        print("Starting observability stack...")
        r = _run(["bash", str(start)], check=False)
        if r.returncode != 0:
            print("start_observability.sh failed:")
            print((r.stderr or r.stdout or "").strip()[:800])
        else:
            print("✅ start_observability.sh ran")

    # Basic UI reachability (doesn't guarantee traces are flowing, but catches common issues)
    try:
        ui = requests.get(JAEGER_UI, timeout=2)
        print("Jaeger UI HTTP:", ui.status_code)
    except Exception as e:
        print("Jaeger UI not reachable yet:", type(e).__name__, str(e)[:120])
else:
    print("ℹ️ Docker unavailable: this notebook will still run, but traces will be printed to the console instead of Jaeger.")


## Tracing setup (OpenTelemetry)

We’ll emit spans to the OTEL Collector using **OTLP/HTTP**.

- If Docker + the collector are reachable, traces show up in Jaeger.
- If not, we fall back to printing spans (still useful for learning).

### Trace hygiene (important for AI consulting)
- Don’t put raw customer data into span attributes.
- Prefer **sizes**, **counts**, **ids**, and **hashes** over raw text.
- Use **events** for concise summaries.
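Here is one way to follow the "sizes and hashes, not raw text" rule: a helper that turns a string into span-safe attributes. `text_fingerprint` is hypothetical. A plain SHA-256 of short, guessable text (for example, common queries) can be reversed by hashing candidate strings, so the sketch also supports a keyed HMAC. Where that key is stored is left as an assumption.

```python
import hashlib
import hmac
from typing import Optional


def text_fingerprint(text: str, *, prefix: str = "query", key: Optional[bytes] = None) -> dict[str, object]:
    """Span-safe summary of a string: its length plus a truncated hash, never the text itself."""
    data = text.encode("utf-8")
    if key is None:
        digest = hashlib.sha256(data).hexdigest()
    else:
        # Keyed hash: without the key, short inputs can't be brute-forced back from the digest.
        digest = hmac.new(key, data, hashlib.sha256).hexdigest()
    return {f"{prefix}_chars": len(text), f"{prefix}_hash16": digest[:16]}


print(text_fingerprint("Kubernetes incident runbook"))
```

In `run_rag_traced`, this could replace or complement `query_chars`: `for k, v in text_fingerprint(query).items(): root.set_attribute(k, v)`. Equal hashes then show repeated queries (cache candidates) without exposing their content.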


## Initialize tracing

Run the next cell once. It:
- configures an OpenTelemetry tracer provider
- instruments `requests` so outbound HTTP calls show up as spans
- exports to Jaeger (via OTEL Collector) if reachable; otherwise prints spans


In [None]:
# OpenTelemetry setup

from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

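# Prefer the OTLP/HTTP exporter if it is installed AND the collector port accepts connections.
# (Constructing the exporter never contacts the collector, so we probe the port explicitly.)
import socket
from urllib.parse import urlparse


def _collector_reachable(endpoint: str, timeout_s: float = 1.0) -> bool:
    """TCP-level probe: True if host:port from the OTLP URL accepts a connection."""
    u = urlparse(endpoint)
    port = u.port or (443 if u.scheme == "https" else 80)
    try:
        with socket.create_connection((u.hostname or "localhost", port), timeout=timeout_s):
            return True
    except OSError:
        return False


_export_to_jaeger = False
_otlp_exporter = None

try:
    from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

    if _collector_reachable(OTEL_EXPORTER_OTLP_ENDPOINT):
        _otlp_exporter = OTLPSpanExporter(endpoint=OTEL_EXPORTER_OTLP_ENDPOINT)
        _export_to_jaeger = True
    else:
        print("OTLP collector not reachable at", OTEL_EXPORTER_OTLP_ENDPOINT, "- falling back to console spans")
except Exception as e:
    print("OTLP exporter not available; falling back to console spans:", type(e).__name__, str(e)[:120])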

resource = Resource.create({"service.name": OTEL_SERVICE_NAME})
provider = TracerProvider(resource=resource)

if _export_to_jaeger and _otlp_exporter is not None:
    provider.add_span_processor(BatchSpanProcessor(_otlp_exporter))
    print("✅ Exporting spans via OTLP/HTTP to:", OTEL_EXPORTER_OTLP_ENDPOINT)
else:
    provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
    print("✅ Exporting spans to console")

trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)

# Optional: auto-instrument requests so HTTP calls show in Jaeger
try:
    from opentelemetry.instrumentation.requests import RequestsInstrumentor

    RequestsInstrumentor().instrument()
    print("✅ requests instrumentation enabled")
except Exception as e:
    print("requests auto-instrumentation not available (ok):", type(e).__name__, str(e)[:120])

print("If Jaeger UI is up, open:", JAEGER_UI)



## Instrumented RAG (what we will trace)

We’ll trace a **single request** through these stages:

- `rag.embed_query` (NIM embeddings)
- `rag.retrieve` (vector similarity search)
- `rag.rerank` (NIM reranker)
- `rag.pack_context` (context budget enforcement)
- `rag.generate` (NIM chat completion)

For consulting work, your job is to:
1. **Locate the bottleneck stage** (most of the latency)
2. **Decide if it’s expected** (token budget, queueing) or a bug
3. **Propose a safe knob** (Top_K, Rerank_K, max_tokens, timeouts)
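Step 1 is mechanical once you have per-stage timings. This sketch takes a dict shaped like `run.timings_s` from `run_rag_traced` (stage names plus a `"total"` key). `bottleneck` is a hypothetical helper, and the example numbers are made up, not measured.

```python
def bottleneck(timings_s: dict[str, float], total_key: str = "total") -> tuple[str, float]:
    """Return (slowest stage, its share of end-to-end time)."""
    stages = {k: v for k, v in timings_s.items() if k != total_key}
    if not stages:
        raise ValueError("no stage timings")
    stage = max(stages, key=stages.__getitem__)
    # Prefer the measured end-to-end total; fall back to the sum of stages.
    denom = timings_s.get(total_key) or sum(stages.values())
    share = stages[stage] / denom if denom > 0 else float("nan")
    return stage, share


example = {"embed_query": 0.05, "retrieve": 0.01, "rerank": 0.30,
           "pack_context": 0.001, "generate": 2.40, "total": 2.80}
stage, share = bottleneck(example)
print(f"{stage} dominates: {share:.0%} of end-to-end latency")
```

Using the measured total rather than the sum of stages keeps unattributed time (Python overhead, client setup) in the denominator, so the share is not overstated.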

### Hardware observability (the missing half)
Traces tell you *where* the time went. Hardware signals tell you *why*:

- **GPU VRAM used**: KV cache pressure, model residency
- **GPU utilization**: compute-bound vs waiting/queueing
- **Power + temperature**: power limits and thermal throttling ("my p95 got slow because physics")

Next we’ll collect NVML snapshots during runs and attach a few key numbers to spans.
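Turning a snapshot into hypotheses can be sketched as a few threshold checks. Every threshold below is a hypothetical starting point, not an NVIDIA recommendation, so tune it per GPU model. Also note that NVML's `gpu` utilization is the fraction of the sample period during which *any* kernel was running, so a high value does not mean every SM was busy. `triage_hw` is a hypothetical helper that reads keys in the shape `system_snapshot()` produces.

```python
import math
from typing import Optional


def _num(snap: dict[str, float], key: str) -> Optional[float]:
    """Numeric value for key, or None if missing/NaN."""
    v = snap.get(key)
    if v is None:
        return None
    v = float(v)
    return None if math.isnan(v) else v


def triage_hw(
    snap: dict[str, float],
    *,
    util_busy_pct: float = 90.0,  # hypothetical threshold
    util_idle_pct: float = 30.0,  # hypothetical threshold
    vram_frac_hi: float = 0.95,   # hypothetical threshold
    temp_hot_c: float = 85.0,     # hypothetical threshold; check your GPU's spec sheet
) -> list[str]:
    """Turn one snapshot dict into human-readable hypotheses (not verdicts)."""
    util = _num(snap, "gpu_util_pct")
    if util is None:
        return ["no GPU metrics: NVML unavailable or CPU-only host"]
    notes: list[str] = []
    if util >= util_busy_pct:
        notes.append("GPU busy almost continuously: likely compute-bound; extra concurrency mostly adds queueing")
    elif util <= util_idle_pct:
        notes.append("GPU mostly idle: time is likely spent elsewhere (network, CPU, or queueing in front of the server)")
    used, total = _num(snap, "gpu_vram_used_gib"), _num(snap, "gpu_vram_total_gib")
    if used is not None and total:
        if used / total >= vram_frac_hi:
            notes.append("VRAM nearly full: KV-cache or batch-size pressure is plausible")
    temp = _num(snap, "gpu_temp_c")
    if temp is not None and temp >= temp_hot_c:
        notes.append("GPU running hot: check for thermal or power throttling")
    return notes or ["no obvious hardware pressure in this snapshot"]


print(triage_hw({"gpu_util_pct": 97.0, "gpu_vram_used_gib": 23.5, "gpu_vram_total_gib": 24.0, "gpu_temp_c": 88.0}))
```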



In [None]:
# Hardware cockpit: GPU/CPU snapshots + a lightweight background watcher

from dataclasses import dataclass
import threading


def _read_meminfo() -> dict[str, float]:
    out: dict[str, float] = {}
    try:
        with open("/proc/meminfo", "r", encoding="utf-8") as f:
            for line in f:
                parts = line.strip().split()
                if len(parts) >= 2:
                    key = parts[0].rstrip(":")
                    val_kib = float(parts[1])
                    out[key] = val_kib
    except Exception:
        return {}
    return out


def cpu_snapshot() -> dict[str, float]:
    mem = _read_meminfo()
    # meminfo is KiB
    mem_total_gib = float(mem.get("MemTotal", float("nan")) / (1024.0**2))
    mem_avail_gib = float(mem.get("MemAvailable", float("nan")) / (1024.0**2))

    try:
        l1, l5, l15 = os.getloadavg()
    except Exception:
        l1, l5, l15 = float("nan"), float("nan"), float("nan")

    return {
        "cpu_load1": float(l1),
        "cpu_load5": float(l5),
        "cpu_load15": float(l15),
        "mem_total_gib": float(mem_total_gib),
        "mem_avail_gib": float(mem_avail_gib),
    }


def _bytes_gib(x: float) -> float:
    return float(x) / (1024.0**3)


def gpu_snapshot() -> dict[str, float]:
    """Best-effort NVML snapshot. Returns empty dict if NVML isn't available."""
    try:
        from pynvml import (
            nvmlInit,
            nvmlShutdown,
            nvmlDeviceGetHandleByIndex,
            nvmlDeviceGetMemoryInfo,
            nvmlDeviceGetUtilizationRates,
            nvmlDeviceGetPowerUsage,
            nvmlDeviceGetTemperature,
            NVML_TEMPERATURE_GPU,
        )

        nvmlInit()
        try:
            h = nvmlDeviceGetHandleByIndex(0)
            mem = nvmlDeviceGetMemoryInfo(h)
            util = nvmlDeviceGetUtilizationRates(h)
            pwr_w = float(nvmlDeviceGetPowerUsage(h)) / 1000.0
            temp_c = float(nvmlDeviceGetTemperature(h, NVML_TEMPERATURE_GPU))
            return {
                "gpu_vram_used_gib": _bytes_gib(mem.used),
                "gpu_vram_total_gib": _bytes_gib(mem.total),
                "gpu_util_pct": float(util.gpu),
                "gpu_mem_util_pct": float(util.memory),
                "gpu_power_w": float(pwr_w),
                "gpu_temp_c": float(temp_c),
            }
        finally:
            try:
                nvmlShutdown()
            except Exception:
                pass
    except Exception:
        return {}


def system_snapshot() -> dict[str, float]:
    out = {}
    out.update(cpu_snapshot())
    out.update(gpu_snapshot())
    return out


@dataclass
class GPUWatcher:
    interval_s: float = 0.25

    def __post_init__(self):
        self._stop = threading.Event()
        self._rows: list[dict[str, float]] = []
        self._t0 = 0.0
        self._th: threading.Thread | None = None

    def start(self):
        self._t0 = time.perf_counter()

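        def _loop():
            while not self._stop.is_set():
                t = time.perf_counter() - self._t0
                row = {"t_s": float(t), **system_snapshot()}
                self._rows.append(row)
                # Event.wait() returns as soon as stop() sets the event, so stop() doesn't lag a full interval.
                self._stop.wait(float(self.interval_s))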

        self._th = threading.Thread(target=_loop, daemon=True)
        self._th.start()

    def stop(self) -> pd.DataFrame:
        self._stop.set()
        if self._th is not None:
            self._th.join(timeout=2.0)
        return pd.DataFrame(self._rows)


print("Hardware cockpit ready.")
display(pd.DataFrame([system_snapshot()]))



In [None]:
# Load corpus and build a tiny chunk set (fast)

RUN_DIR = Path("corpus_runs/llm_richer_n20_20251211_193028")
CSV_PATH = RUN_DIR / "fico_corpus_embedded.csv"

if not CSV_PATH.exists():
    raise FileNotFoundError(f"Missing corpus CSV: {CSV_PATH}. Run the corpus generation notebook or check corpus_runs/.")

df_docs = pd.read_csv(CSV_PATH)
print("docs:", len(df_docs))

# Chunk each document body directly with `chunk_text`; chunk_size/overlap are lab knobs.


def _cell_str(row: pd.Series, col: str) -> str | None:
    """Cell as str, or None if missing/NaN/blank (pandas NaN is truthy, so `a or b` won't skip it)."""
    v = row.get(col)
    if v is None or pd.isna(v):
        return None
    s = str(v)
    return s if s.strip() else None


def build_chunk_rows(*, chunk_size: int = 900, overlap: int = 150) -> list[dict[str, Any]]:
    out: list[dict[str, Any]] = []
    for _, row in df_docs.iterrows():
        doc_id = str(row.get("doc_id"))
        title = _cell_str(row, "title")
        body = _cell_str(row, "body_redacted") or _cell_str(row, "body") or ""
        parts = chunk_text(body, chunk_size=int(chunk_size), overlap=int(overlap))
        for j, part in enumerate(parts):
            out.append(
                {
                    "doc_id": doc_id,
                    "chunk_id": f"{doc_id}::c{j:03d}",
                    "title": title,
                    "text": part,
                }
            )
    return out

chunk_rows = build_chunk_rows(chunk_size=900, overlap=150)
print("chunks:", len(chunk_rows))

# NIM client
nim = NIMClient(NIMConfig(base_url=NIM_BASE_URL, timeout_s=float(os.environ.get("NIM_TIMEOUT_S", "60"))))



In [None]:
# Build (or reuse) a vector index for retrieval

_INDEX_CACHE: dict[tuple[int, int], tuple[list[dict[str, Any]], VectorIndex]] = {}


def get_index(chunk_size: int, overlap: int) -> tuple[list[dict[str, Any]], VectorIndex]:
    key = (int(chunk_size), int(overlap))
    if key in _INDEX_CACHE:
        return _INDEX_CACHE[key]

    rows = build_chunk_rows(chunk_size=int(chunk_size), overlap=int(overlap))
    texts = [r["text"] for r in rows]

    # Embed passages (this is NOT part of the per-request trace; it's offline/index build)
    with tracer.start_as_current_span("index.embed_passages") as sp:
        sp.set_attribute("chunk_size", int(chunk_size))
        sp.set_attribute("overlap", int(overlap))
        sp.set_attribute("num_chunks", int(len(texts)))
        embs, dt = nim.embed_many(texts, batch_size=64, input_type="passage")
        sp.set_attribute("embed_seconds", float(dt))

    E = np.array(embs, dtype=np.float32)
    E_norm = normalize_rows(E)

    with tracer.start_as_current_span("index.build") as sp:
        sp.set_attribute("num_vectors", int(E_norm.shape[0]))
        index = VectorIndex(E_norm, use_faiss=True)

    _INDEX_CACHE[key] = (rows, index)
    return _INDEX_CACHE[key]


rows, index = get_index(chunk_size=900, overlap=150)
print("index ready. chunks:", len(rows))



In [None]:
# Instrumented single RAG request

@dataclass
class RAGRun:
    answer: str
    answer_clean: str
    timings_s: dict[str, float]
    used_chunks: list[dict[str, Any]]
    hw_samples: list[dict[str, Any]]


def run_rag_traced(
    query: str,
    *,
    chunk_size: int = 900,
    overlap: int = 150,
    top_k: int = 10,
    rerank_k: int = 5,
    max_context_chars: int = 6000,
    max_new_tokens: int = 160,
    temperature: float = 0.2,
) -> RAGRun:
    rows, idx = get_index(chunk_size=int(chunk_size), overlap=int(overlap))

    timings: dict[str, float] = {}
    t_total0 = time.perf_counter()

    hw_samples: list[dict[str, Any]] = []

    def record_hw(stage: str, sp=None) -> dict[str, Any]:
        snap = system_snapshot()
        snap["stage"] = str(stage)
        snap["t_s"] = float(time.perf_counter() - t_total0)
        hw_samples.append(snap)

        if sp is not None:
            # Keep span attributes small and numeric.
            for k in ["gpu_vram_used_gib", "gpu_util_pct", "gpu_power_w", "gpu_temp_c", "mem_avail_gib"]:
                v = snap.get(k)
                if isinstance(v, (int, float)) and not math.isnan(float(v)):
                    sp.set_attribute(k, float(v))
        return snap

    with tracer.start_as_current_span("rag.request") as root:
        record_hw("start", root)
        root.set_attribute("query_chars", int(len(query)))
        root.set_attribute("top_k", int(top_k))
        root.set_attribute("rerank_k", int(rerank_k))
        root.set_attribute("max_context_chars", int(max_context_chars))
        root.set_attribute("max_new_tokens", int(max_new_tokens))

        # 1) Embed query
        t0 = time.perf_counter()
        with tracer.start_as_current_span("rag.embed_query") as sp:
            sp.set_attribute("model", nim.cfg.embed_model)
            embs, _dt = nim.embed([query], input_type="query")
            q_emb = np.array(embs[0], dtype=np.float32)
            sp.set_attribute("seconds", float(time.perf_counter() - t0))
            record_hw("embed_query.end", sp)
        timings["embed_query"] = float(time.perf_counter() - t0)

        # 2) Retrieve
        t0 = time.perf_counter()
        with tracer.start_as_current_span("rag.retrieve") as sp:
            q_norm = q_emb / (np.linalg.norm(q_emb) + 1e-12)
            idxs, sims = idx.search(q_norm, int(top_k))
            idxs = [int(i) for i in idxs]
            sp.set_attribute("candidates", int(len(idxs)))
            if len(sims) > 0:
                sp.add_event(
                    "sims_summary",
                    {
                        "sim_min": float(np.min(sims)),
                        "sim_p50": float(np.percentile(sims, 50)),
                        "sim_p95": float(np.percentile(sims, 95)),
                        "sim_max": float(np.max(sims)),
                    },
                )
            record_hw("retrieve.end", sp)
        timings["retrieve"] = float(time.perf_counter() - t0)

        candidates = [rows[i] for i in idxs]

        # 3) Rerank (skipped when rerank_k <= 0, e.g. the "fast" profile; the span is kept so the stage still shows)
        t0 = time.perf_counter()
        with tracer.start_as_current_span("rag.rerank") as sp:
            sp.set_attribute("model", nim.cfg.rerank_model)
            if int(rerank_k) > 0 and candidates:
                docs = [c["text"] for c in candidates]
                order, _ = nim.rerank(query, docs, top_n=min(int(rerank_k), len(docs)))
                reranked = [candidates[i] for i in order[: int(rerank_k)]]
                sp.set_attribute("reranked", int(len(reranked)))
            else:
                reranked = candidates
                sp.set_attribute("skipped", True)
            record_hw("rerank.end", sp)
        timings["rerank"] = float(time.perf_counter() - t0)

        # 4) Pack context
        t0 = time.perf_counter()
        with tracer.start_as_current_span("rag.pack_context") as sp:
            used: list[dict[str, Any]] = []
            total_chars = 0
            for r in reranked:
                t = r["text"]
                if total_chars + len(t) > int(max_context_chars):
                    break
                used.append(r)
                total_chars += len(t)
            sp.set_attribute("context_chars", int(total_chars))
            sp.set_attribute("context_chunks", int(len(used)))
            sp.add_event("context_ids", {"chunk_ids": [u["chunk_id"] for u in used[:8]]})
            record_hw("pack_context.end", sp)
        timings["pack_context"] = float(time.perf_counter() - t0)

        # 5) Generate
        t0 = time.perf_counter()
        with tracer.start_as_current_span("rag.generate") as sp:
            sp.set_attribute("model", nim.cfg.gen_model)
            # Convert to rag_pipeline Chunk objects for prompt construction
            from rag_pipeline import Chunk

            context_chunks = [
                Chunk(doc_id=u["doc_id"], chunk_id=u["chunk_id"], text=u["text"], title=u.get("title"))
                for u in used
            ]
            prompt = make_prompt(query, context_chunks)
            ans, _ = nim.chat(prompt, max_tokens=int(max_new_tokens), temperature=float(temperature))
            sp.set_attribute("answer_chars", int(len(ans or "")))
            record_hw("generate.end", sp)
        timings["generate"] = float(time.perf_counter() - t0)

        timings["total"] = float(time.perf_counter() - t_total0)
        root.set_attribute("total_seconds", float(timings["total"]))

        record_hw("end", root)

        ans_clean = clean_answer(ans)
        return RAGRun(answer=ans, answer_clean=ans_clean, timings_s=timings, used_chunks=used, hw_samples=hw_samples)


# Try a run
q = os.environ.get("OBS_QUERY", "Kubernetes incident runbook")
run = run_rag_traced(q, top_k=10, rerank_k=5, max_new_tokens=160)
print("TOTAL:", f"{run.timings_s['total']:.3f}s")
print("\n--- Answer (clean) ---\n")
print((run.answer_clean or "").strip()[:1200])

# Hardware summary (stage boundary snapshots)
if getattr(run, "hw_samples", None):
    df_hw = pd.DataFrame(run.hw_samples)
    display(df_hw[[c for c in ["stage", "t_s", "gpu_vram_used_gib", "gpu_util_pct", "gpu_power_w", "gpu_temp_c", "mem_avail_gib"] if c in df_hw.columns]])
    # A simple time series, if GPU metrics exist
    cols = [c for c in ["gpu_vram_used_gib", "gpu_util_pct", "gpu_power_w", "gpu_temp_c"] if c in df_hw.columns]
    if cols:
        px.line(df_hw, x="t_s", y=cols, markers=True, title="Hardware snapshots across the RAG request").show()



In [None]:
# Notebook-side metrics + pretty figures

# Record a few runs with different knobs, then visualize.

cases = [
    {"name": "fast", "top_k": 6, "rerank_k": 0, "max_new_tokens": 96, "max_context_chars": 3000},
    {"name": "balanced", "top_k": 10, "rerank_k": 5, "max_new_tokens": 160, "max_context_chars": 6000},
    {"name": "accurate-ish", "top_k": 20, "rerank_k": 10, "max_new_tokens": 220, "max_context_chars": 9000},
]

rows_out = []
for c in cases:
    r = run_rag_traced(q, top_k=c["top_k"], rerank_k=c["rerank_k"], max_new_tokens=c["max_new_tokens"], max_context_chars=c["max_context_chars"])
    rows_out.append(
        {
            "case": c["name"],
            **{k: float(v) for k, v in r.timings_s.items()},
            "answer_chars": int(len(r.answer_clean or "")),
            "context_chunks": int(len(r.used_chunks)),
        }
    )

df = pd.DataFrame(rows_out)
display(df[["case", "total", "embed_query", "retrieve", "rerank", "pack_context", "generate", "answer_chars", "context_chunks"]])

fig = px.bar(
    df,
    x="case",
    y=["embed_query", "retrieve", "rerank", "pack_context", "generate"],
    title="Stage latency breakdown (seconds)",
    labels={"value": "seconds", "variable": "stage"},
)
fig.update_layout(barmode="stack")
fig.show()

fig2 = px.scatter(df, x="answer_chars", y="total", color="case", title="Total latency vs answer size")
fig2.show()



## Consulting drills

These are the kinds of “in the room with the customer” exercises that turn cloud engineers into strong AI consultants.

### Drill A: Tail latency (p95 blows up)
We’ll increase concurrency and watch:
- error rate
- p95
- trace spans showing queueing/backpressure

### Drill B: Quality regression (Top_K too small / too big)
We’ll sweep `top_k` and track:
- total latency
- a simple proxy: did the *expected doc_id* appear in the chosen context? (hit-rate)

### Drill C: Failure analysis (timeouts, 502)
We’ll intentionally set aggressive timeouts and show how to:
- classify errors
- add safe retries
- surface failures in traces
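For "classify errors", here is a heuristic sketch that maps an exception to one of the coarse classes used in the golden signals. It assumes HTTP errors expose `.response.status_code`, as `requests.HTTPError` does, and otherwise falls back to type names (for example, `ReadTimeout` or `ConnectionError`). `classify_error` and `RETRYABLE` are hypothetical names, so adjust them to whatever `NIMClient` actually raises.

```python
def classify_error(exc: BaseException) -> str:
    """Map an exception to a coarse error class for metrics, logs and span attributes."""
    status = getattr(getattr(exc, "response", None), "status_code", None)
    if isinstance(status, int):
        if status == 429:
            return "throttled"
        if 500 <= status <= 599:
            return "5xx"
        if 400 <= status <= 499:
            return "invalid_request"
    name = type(exc).__name__.lower()
    if isinstance(exc, TimeoutError) or "timeout" in name:
        return "timeout"
    if isinstance(exc, ConnectionError) or "connection" in name:
        return "connection"
    return "other"


# Only transient classes are worth retrying; a 4xx will fail the same way every time.
RETRYABLE = {"timeout", "connection", "5xx", "throttled"}
```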



In [None]:
# Drill A: concurrency sweep (portable threadpool loadgen)

from concurrent.futures import ThreadPoolExecutor, as_completed


def pct(xs: list[float], p: float) -> float:
    if not xs:
        return float("nan")
    return float(np.percentile(np.array(xs, dtype=np.float64), p))


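def one_request(timeout_s: float) -> tuple[bool, float, str | None]:
    """One end-to-end RAG call.

    The HTTP timeout comes from the shared `nim` client (NIM_TIMEOUT_S). `timeout_s` is a
    client-side deadline: calls that succeed but take longer are counted as errors.
    """
    t0 = time.perf_counter()
    try:
        # Keep it smaller so the sweep finishes.
        _ = run_rag_traced(q, top_k=10, rerank_k=5, max_new_tokens=96)
    except Exception as e:
        return False, float(time.perf_counter() - t0), f"{type(e).__name__}: {str(e)[:120]}"
    dt = float(time.perf_counter() - t0)
    if dt > float(timeout_s):
        return False, dt, f"DeadlineExceeded: {dt:.2f}s > {float(timeout_s):.2f}s"
    return True, dt, None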


def run_load(total_requests: int, concurrency: int, timeout_s: float) -> dict[str, Any]:
    # Start a lightweight hardware watcher so we can correlate p95 cliffs with saturation.
    watcher = GPUWatcher(interval_s=0.25)
    watcher.start()

    t0 = time.perf_counter()
    ok_lat: list[float] = []
    errs: list[str] = []

    with ThreadPoolExecutor(max_workers=int(concurrency)) as ex:
        futs = [ex.submit(one_request, float(timeout_s)) for _ in range(int(total_requests))]
        for f in as_completed(futs):
            ok, dt, err = f.result()
            if ok:
                ok_lat.append(float(dt))
            else:
                errs.append(str(err))

    wall = float(time.perf_counter() - t0)

    df_hw = watcher.stop()
    # Aggregate a few signals
    def _mean(col: str) -> float:
        if col not in df_hw.columns or df_hw.empty:
            return float("nan")
        return float(np.nanmean(df_hw[col].astype(float)))

    def _max(col: str) -> float:
        if col not in df_hw.columns or df_hw.empty:
            return float("nan")
        return float(np.nanmax(df_hw[col].astype(float)))

    return {
        "concurrency": int(concurrency),
        "total_requests": int(total_requests),
        "throughput_rps": float(total_requests / wall) if wall > 0 else float("nan"),
        "error_rate": float(len(errs) / total_requests) if total_requests else float("nan"),
        "p50_s": pct(ok_lat, 50),
        "p95_s": pct(ok_lat, 95),
        "p99_s": pct(ok_lat, 99),
        "sample_error": (errs[0] if errs else None),
        "gpu_util_mean": _mean("gpu_util_pct"),
        "gpu_util_max": _max("gpu_util_pct"),
        "gpu_vram_used_max_gib": _max("gpu_vram_used_gib"),
        "gpu_power_max_w": _max("gpu_power_w"),
        "gpu_temp_max_c": _max("gpu_temp_c"),
    }


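levels = [1, 2, 4, 8, 16]
# `load_rows` (not `rows`) so we don't shadow the chunk rows returned by get_index earlier.
load_rows = [run_load(total_requests=40, concurrency=c, timeout_s=60.0) for c in levels]
df_load = pd.DataFrame(load_rows)
display(df_load)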

fig = px.line(df_load, x="concurrency", y=["p50_s", "p95_s", "p99_s"], markers=True, title="RAG latency vs concurrency")
fig.show()

fig2 = px.line(df_load, x="concurrency", y="throughput_rps", markers=True, title="Throughput vs concurrency")
fig2.show()

fig3 = px.bar(df_load, x="concurrency", y="error_rate", title="Error rate vs concurrency")
fig3.show()

# Hardware overlay: did we saturate? (The GPU columns always exist but are all NaN without NVML.)
cols = [
    c
    for c in ["gpu_util_mean", "gpu_util_max", "gpu_vram_used_max_gib", "gpu_power_max_w", "gpu_temp_max_c"]
    if c in df_load.columns and df_load[c].notna().any()
]
if cols:
    fig4 = px.line(df_load, x="concurrency", y=cols, markers=True, title="Hardware signals vs concurrency")
    fig4.show()



In [None]:
# Drill B: sweep Top_K (latency vs a simple retrieval-quality proxy)

# Proxy: because this workshop corpus is synthetic, we use a simple signal:
# if any chunk from the same doc_id appears in the final context.

TOPK = [3, 5, 8, 10, 15, 20, 30]

def proxy_hit_rate(runs: list[RAGRun], gold_doc_id: str) -> float:
    hits = 0
    for r in runs:
        if any(str(u.get("doc_id")) == str(gold_doc_id) for u in r.used_chunks):
            hits += 1
    return float(hits / max(1, len(runs)))

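# Pick a doc as the "gold" target (first row) and build a query from its title.
# With a single gold doc the hit is 0/1 per top_k; loop over more docs for a real rate.
row0 = df_docs.iloc[0]
gold_doc_id = str(row0.get("doc_id"))
_title = row0.get("title")
gold_title = str(_title) if pd.notna(_title) else ""  # NaN titles would otherwise become "nan"
query2 = gold_title[:90] or "Kubernetes incident runbook"

records = []
for k in TOPK:
    r = run_rag_traced(query2, top_k=int(k), rerank_k=5, max_new_tokens=120)
    records.append(
        {
            "top_k": int(k),
            "total_s": float(r.timings_s["total"]),
            "context_chunks": len(r.used_chunks),
            # 1.0 if any chunk of the gold doc made it into the packed context, else 0.0
            "gold_hit": proxy_hit_rate([r], gold_doc_id),
        }
    )

_df = pd.DataFrame(records)
display(_df)

fig = px.line(_df, x="top_k", y="total_s", markers=True, title="Latency vs Top_K")
fig.show()

# rerank_k is fixed at 5, so context_chunks plateaus once top_k >= 5.
fig2 = px.line(_df, x="top_k", y="context_chunks", markers=True, title="Context chunks used vs Top_K")
fig2.show()

fig3 = px.line(_df, x="top_k", y="gold_hit", markers=True, title="Gold-doc hit (proxy) vs Top_K")
fig3.show()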



In [None]:
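# Drill C: failure analysis (timeouts + retries)

# This demonstrates how to surface failures in traces and still produce usable consulting artifacts.

from opentelemetry.trace import Status, StatusCode


def embed_with_timeout(text: str, timeout_s: float) -> dict[str, Any]:
    cfg = NIMConfig(base_url=NIM_BASE_URL, timeout_s=float(timeout_s))
    c = NIMClient(cfg)
    t0 = time.perf_counter()
    try:
        _emb, _ = c.embed([text], input_type="query")
        return {"ok": True, "latency_s": float(time.perf_counter() - t0), "error": None}
    except Exception as e:
        return {"ok": False, "latency_s": float(time.perf_counter() - t0), "error": f"{type(e).__name__}: {str(e)[:160]}"}


def embed_with_retry(text: str, timeout_s: float, retries: int = 3, base_sleep_s: float = 0.25) -> dict[str, Any]:
    """Make up to `retries` total attempts with exponential backoff; failures are recorded on one span."""
    attempts = max(1, int(retries))  # at least one attempt, so `out` is always bound after the loop
    with tracer.start_as_current_span("drill_c.embed_with_retry") as sp:
        sp.set_attribute("timeout_s", float(timeout_s))
        sp.set_attribute("max_attempts", attempts)
        for attempt in range(1, attempts + 1):
            out = embed_with_timeout(text, timeout_s=float(timeout_s))
            out["attempt"] = int(attempt)
            if out["ok"]:
                break
            # One event per failed attempt; Jaeger shows span events as logs on the span.
            sp.add_event("attempt_failed", {"attempt": int(attempt), "error": str(out["error"])})
            if attempt < attempts:
                time.sleep(float(base_sleep_s) * (2 ** (attempt - 1)))
        sp.set_attribute("attempts", int(out["attempt"]))
        if not out["ok"]:
            sp.set_status(Status(StatusCode.ERROR, str(out["error"])[:200]))
        return out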


# Intentionally aggressive timeout
res = embed_with_retry("ping", timeout_s=0.25, retries=3)
print(res)



## Consulting checklist (what to say in front of a customer)

When you have traces, your story becomes crisp.

### 1) Frame the problem in SLO language
- “Our p95 is Xs. We need it under Ys.”
- “Error rate is Z% and rising with concurrency.”

### 2) Point at the bottleneck stage
- “Generation dominates because the token budget is large.”
- “Rerank dominates because rerank_k is high.”
- “Embed dominates because we’re embedding too often / not caching.”
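You can sanity-check "generation dominates because the token budget is large" with a two-term model: prefill processes the whole prompt, then decode emits one token at a time. This is a sketch with hypothetical rates (3000 prefill tokens/s, 40 decode tokens/s). To measure your own decode rate, time two runs with the same prompt and different output lengths. Remember that `max_new_tokens` is only a cap, so use the tokens actually generated.

```python
def estimate_generate_s(prompt_tokens: int, new_tokens: int,
                        prefill_tok_per_s: float, decode_tok_per_s: float) -> float:
    """Two-term model: prompt prefill time + sequential decode time."""
    return prompt_tokens / prefill_tok_per_s + new_tokens / decode_tok_per_s


def fit_decode_rate(tokens_a: int, seconds_a: float, tokens_b: int, seconds_b: float) -> float:
    """Decode tokens/s from two runs that differ only in output length (same prompt)."""
    return (tokens_b - tokens_a) / (seconds_b - seconds_a)


# Hypothetical rates, for illustration only.
for n in (96, 160, 220):
    print(n, "new tokens ->", round(estimate_generate_s(1500, n, 3000.0, 40.0), 2), "s")
```

Under these assumed rates decode dominates, which is why cutting `max_new_tokens` is usually the bigger lever than trimming context.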

### 3) Offer safe, reversible knobs
- Reduce `max_new_tokens` (often the biggest lever)
- Reduce `top_k` (but explain recall risk)
- Reduce `rerank_k` (often good tradeoff)
- Cap concurrency; add backpressure
- Add timeouts + retries with backoff (avoid retry storms)
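A deterministic backoff, like the one in Drill C, still lets many clients retry in lockstep after a shared outage. A common mitigation is "full jitter": sleep a uniform random time in `[0, min(cap, base * 2**attempt)]`. The sketch below swaps that technique in. `backoff_delays` and `call_with_retries` are hypothetical helpers, and the caller decides which errors are retryable (for example, with a classifier like the Drill C one).

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def backoff_delays(n: int, base_s: float = 0.25, cap_s: float = 8.0,
                   rng: Optional[random.Random] = None) -> list[float]:
    """Full-jitter exponential backoff: delay i ~ U(0, min(cap, base * 2**i))."""
    rng = rng or random.Random()
    return [rng.uniform(0.0, min(cap_s, base_s * (2 ** i))) for i in range(n)]


def call_with_retries(fn: Callable[[], T], *, max_attempts: int = 3,
                      is_retryable: Callable[[Exception], bool] = lambda e: True,
                      base_s: float = 0.25, cap_s: float = 8.0,
                      sleep: Callable[[float], None] = time.sleep,
                      rng: Optional[random.Random] = None) -> T:
    """Call fn up to max_attempts times; re-raise on the last attempt or a non-retryable error."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    delays = backoff_delays(max_attempts - 1, base_s, cap_s, rng)
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception as e:
            if attempt == max_attempts - 1 or not is_retryable(e):
                raise
            sleep(delays[attempt])
    raise AssertionError("unreachable")
```

Capping both the attempt count and the per-retry delay bounds the extra load a failing backend sees, which is the "avoid retry storms" half of the advice.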

### 4) Call out capacity constraints explicitly
- “These services are GPU-memory heavy; you may need to split embed/rerank vs chat onto different GPUs.”

### 5) Leave them with artifacts
- A trace screenshot showing stage breakdown
- A small table of configs and p50/p95/error rate
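The config table can be produced directly from the load-sweep results. Below is a dependency-free sketch that renders a Markdown table for a slide or a ticket. `markdown_table` is a hypothetical helper. In the notebook you would pass `df_load.to_dict("records")`. `DataFrame.to_markdown()` also works, but it requires the optional `tabulate` package.

```python
def markdown_table(rows: list[dict[str, object]], columns: list[str], floatfmt: str = ".3f") -> str:
    """Render rows (dicts) as a Markdown table with the given column order."""
    def fmt(v: object) -> str:
        if isinstance(v, float):  # also covers numpy.float64, a float subclass
            return format(v, floatfmt)
        return "" if v is None else str(v)

    header = "| " + " | ".join(columns) + " |"
    sep = "| " + " | ".join("---" for _ in columns) + " |"
    body = ["| " + " | ".join(fmt(r.get(c)) for c in columns) + " |" for r in rows]
    return "\n".join([header, sep, *body])


# Made-up values, for illustration only.
print(markdown_table(
    [{"concurrency": 1, "p50_s": 2.1, "p95_s": 2.5, "error_rate": 0.0},
     {"concurrency": 8, "p50_s": 6.4, "p95_s": 14.2, "error_rate": 0.05}],
    ["concurrency", "p50_s", "p95_s", "error_rate"],
))
```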

