# Tutorial 9 - Tracing LLM Workflows with Arize Phoenix

## Where You Are in the Learning Journey

```
 Tutorials 1-5      Tutorials 6-8      Tutorial 9
 RAG Fundamentals   Agent Extension    Observability
 (retrieval         (ReAct, Reflect,   and Tracing
  pipeline)          State Mgmt)        (you are here)
```

**What this tutorial adds:** visibility into what is happening *inside* the
LLM workflow at runtime. Every retrieval call, every LLM generation, and every
agent step produces a structured record called a **span**. A group of related
spans is called a **trace**. You will capture, inspect, and display those traces
both in-notebook and inside a running Arize Phoenix server.

**What you will learn in this tutorial:**
- What tracing is and why it matters for LLM workflows
- What a span is: the atomic unit of a trace
- How OpenTelemetry provides a standard API for capturing spans
- How to trace retrieval, generation, and agent steps step by step
- How Arize Phoenix visualises traces in a browser UI
- How to use auto-instrumentation so OpenAI calls are traced automatically

**Prerequisites:** Tutorials 1-8 (understand RAG and the agent loop).
Python basics. No prior knowledge of OpenTelemetry or Phoenix required.

```mermaid
flowchart LR
    Q[User Question] --> R[Retrieve]
    R --> G[Generate Answer]
    R -. span .-> T[(Trace Store)]
    G -. span .-> T
    T --> P[Phoenix UI]
```


## Why Does an LLM Workflow Need Tracing?

### The Problem with Black-Box Pipelines

After Tutorials 1-8 you have a working RAG pipeline and an agent loop.
Both can answer policy questions. But when something goes wrong - a wrong
answer, unexpected latency, or a retrieval miss - answering "why?" is hard
without detailed records of each step.

| Problem | Without tracing | With tracing |
|---------|----------------|--------------|
| Wrong answer | Re-read code; guess | Inspect the exact chunks the LLM received |
| High latency | Add print statements | See per-step timing in a timeline |
| Retrieval miss | Check embedding manually | See the query, top-k, and scores per call |
| Agent loop bug | Add logging everywhere | Read Thought/Action/Observation for every step |

### What Tracing Adds

Tracing instruments your code so that each meaningful operation emits a
structured record called a **span**. A span captures:

- a name (for example 'retrieval' or 'generation')
- a start timestamp and an end timestamp
- a status (UNSET by default, or explicitly OK or ERROR)
- arbitrary key-value **attributes** (for example query='leave policy', top_k=5)

Spans from the same user request are linked into a **trace**, which looks like
a nested timeline. A trace viewer such as Arize Phoenix shows you every step
of every request in one place.

```
Trace for question: 'What is the international work limit?'
  |-- retrieval  (12 ms)   query='international work limit'  result_count=5
  |-- generation (340 ms)  model='gpt-4.1-mini'  chunks=5  answer_words=28
```
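
To make the span fields concrete, here is a toy model of the example trace above. `ToySpan` is a hypothetical stand-in written for this illustration only; it is not the OpenTelemetry span class, and the timestamps are chosen to match the 12 ms and 340 ms shown in the example.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToySpan:
    """Illustrative stand-in for a span; NOT the OpenTelemetry class."""
    name: str
    start_ns: int
    end_ns: int
    status: str = "UNSET"
    attributes: dict[str, Any] = field(default_factory=dict)

    @property
    def duration_ms(self) -> float:
        # Timestamps are in nanoseconds; convert the difference to milliseconds.
        return (self.end_ns - self.start_ns) / 1_000_000


# A trace is a group of related spans (here: a plain list).
toy_trace = [
    ToySpan("retrieval", 0, 12_000_000,
            attributes={"query": "international work limit", "result_count": 5}),
    ToySpan("generation", 12_000_000, 352_000_000,
            attributes={"model": "gpt-4.1-mini", "chunks": 5}),
]

for toy_span in toy_trace:
    print(f"{toy_span.name:<11} {toy_span.duration_ms:6.1f} ms  {toy_span.attributes}")

toy_total_ms = sum(s.duration_ms for s in toy_trace)
print(f"total: {toy_total_ms:.1f} ms")
```

A real tracer fills in the timestamps and status for you; the point here is only that a span is a small, structured record.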


## What Is OpenTelemetry?

**OpenTelemetry** (OTel) is an open standard for collecting observability
signals (traces, metrics, logs) from any software system. It is vendor-neutral:
the same instrumentation code can send data to Arize Phoenix, Jaeger, Grafana
Tempo, or any OTLP-compatible backend.

The three key concepts you need for this tutorial:

| Concept | What it is | Example |
|---------|-----------|--------|
| TracerProvider | The factory that creates tracers; holds the span processors that feed the exporter | One per application |
| Tracer | Creates spans; you call tracer.start_as_current_span() | One per module |
| Span | A single timed operation with attributes | 'retrieval' span |

**How data flows:**

```
Your code
  -> tracer.start_as_current_span('retrieval')
  -> span.set_attribute('retrieval.query', 'leave policy')
  -> span ends
  -> SpanProcessor sends the finished span to an Exporter
  -> Exporter writes to: in-memory buffer  OR  Phoenix server  OR  cloud backend
```

**This tutorial uses two exporters:**

1. `InMemorySpanExporter` - stores spans in a Python list; no server needed;
   used for the offline demos in this notebook.
2. Phoenix OTLP exporter - sends spans over HTTP to a running Phoenix server;
   used in the Phoenix integration section at the end.

**What is Arize Phoenix?**

Arize Phoenix is an open-source LLM observability platform. It provides:
- a local server that receives OpenTelemetry traces
- a browser UI that shows traces, timelines, and span attributes
- built-in evaluation tools for LLM output quality

You will use it as a trace viewer after learning to capture spans manually.

```mermaid
flowchart LR
    Code[Your Python code] -->|OTel spans| SP[SpanProcessor]
    SP -->|offline demo| Mem[InMemorySpanExporter]
    SP -->|Phoenix mode| OTLP[OTLP HTTP Exporter]
    OTLP --> PX[Phoenix Server :6006]
    PX --> UI[Browser UI]
```


## Setup: Install Dependencies and Load Environment

The cell below installs dependencies and loads the same RAG pipeline used in
Tutorials 1-8. It also imports the `tracing` module from `rag_tutorials` which
provides the span-recording helpers you will use throughout this notebook.


In [None]:
import importlib.util  # find_spec() lives in the importlib.util submodule
import os
from pathlib import Path
import shutil
import subprocess
import sys

import pandas as pd
from dotenv import load_dotenv

if shutil.which("uv") is None:
    print("uv not found. Installing with pip...")
    subprocess.run([sys.executable, "-m", "pip", "install", "uv"], check=True)

cwd = Path.cwd().resolve()
repo_root = next(
    (path for path in [cwd, *cwd.parents]
     if (path / "pyproject.toml").exists() and (path / "src").exists()),
    cwd,
)
os.chdir(repo_root)
src_path = repo_root / "src"
if str(src_path) not in sys.path:
    sys.path.insert(0, str(src_path))

# Import names to check. Note that arize-phoenix installs the `phoenix` module.
REQUIRED_PACKAGES = [
    "openai", "chromadb", "numpy", "pandas", "rank_bm25",
    "sentence_transformers", "dotenv", "opentelemetry",
    "phoenix", "openinference",
]
# Maps import names to pip distribution names where the two differ.
PIP_NAME_MAP = {
    "rank_bm25": "rank-bm25",
    "sentence_transformers": "sentence-transformers",
    "dotenv": "python-dotenv",
    "opentelemetry": "opentelemetry-sdk",
    "phoenix": "arize-phoenix",
    "openinference": "openinference-instrumentation-openai",
}

def find_missing(packages):
    importlib.invalidate_caches()
    return [
        pkg for pkg in packages
        if importlib.util.find_spec(pkg.split(".")[0]) is None
    ]

missing = find_missing(REQUIRED_PACKAGES)
if missing:
    print("Missing packages:", missing)
    subprocess.run(["uv", "sync"], check=True)

missing_after_sync = find_missing(REQUIRED_PACKAGES)
if missing_after_sync:
    pip_targets = [PIP_NAME_MAP.get(pkg, pkg) for pkg in missing_after_sync]
    subprocess.run([sys.executable, "-m", "pip", "install", *pip_targets], check=True)

final_missing = find_missing(REQUIRED_PACKAGES)
if final_missing:
    raise ImportError(f"Dependencies still missing: {final_missing}")

from rag_tutorials.io_utils import load_handbook_documents, load_queries
from rag_tutorials.chunking import semantic_chunk_documents
from rag_tutorials.pipeline import build_dense_retriever
from rag_tutorials.qa import answer_with_context
from rag_tutorials.tracing import (
    build_in_memory_tracer,
    record_retrieval_span,
    record_generation_span,
    record_agent_step_span,
    spans_to_dicts,
)

load_dotenv()
if not os.getenv("OPENAI_API_KEY"):
    raise EnvironmentError("OPENAI_API_KEY is required")

embedding_model = os.getenv("OPENAI_EMBEDDING_MODEL", "text-embedding-3-small")
chat_model = os.getenv("OPENAI_CHAT_MODEL", "gpt-4.1-mini")

handbook_path = Path("data/handbook_manual.txt")
queries_path = Path("data/queries.jsonl")
if not handbook_path.exists() or not queries_path.exists():
    raise FileNotFoundError("Run: uv run python scripts/generate_data.py")

documents = load_handbook_documents(handbook_path)
queries = load_queries(queries_path)
chunks = semantic_chunk_documents(documents)
dense_retriever, _ = build_dense_retriever(
    chunks=chunks,
    collection_name="tracing_tutorial_dense",
    embedding_model=embedding_model,
)
print("Setup complete.")
print(f"Documents loaded : {len(documents)}")
print(f"Chunks created   : {len(chunks)}")
print(f"Queries loaded   : {len(queries)}")


## Part 1: Capturing Spans In Memory

### What Does a Span Look Like?

A span is a Python object with the following fields:

| Field | Type | Meaning |
|-------|------|---------|
| `name` | str | Operation name, e.g. 'retrieval' |
| `start_time` | int (nanoseconds) | When the operation started |
| `end_time` | int (nanoseconds) | When the operation finished |
| `status` | Status | Holds a `status_code` of UNSET, OK, or ERROR |
| `attributes` | mapping | Key-value metadata, e.g. {'retrieval.query': 'leave policy'} |

The `build_in_memory_tracer()` helper creates a fresh `TracerProvider` and
`InMemorySpanExporter`. All spans started with the returned tracer are stored
inside the exporter until you call `exporter.get_finished_spans()`.

In the next cell you will create your first span manually to see the raw data.


In [None]:
# Create a tracer and exporter backed by an in-memory buffer.
# No server, no network calls - spans are stored as Python objects.

tracer, exporter = build_in_memory_tracer()

# Start a span, set some attributes, then end it.
# The 'with' block defines the span's lifetime:
# the span starts when the block is entered and ends when it exits.
with tracer.start_as_current_span("my_first_span") as span:
    span.set_attribute("greeting", "hello world")
    span.set_attribute("step", 1)
    # Any code here runs 'inside' the span.
    result = 2 + 2

# The span is now finished. Read it back from the exporter.
finished = exporter.get_finished_spans()
print(f"Number of finished spans : {len(finished)}")

span_obj = finished[0]
print(f"Name       : {span_obj.name}")
print(f"Status     : {span_obj.status.status_code.name}")
print(f"Attributes : {dict(span_obj.attributes)}")

duration_ns = span_obj.end_time - span_obj.start_time
print(f"Duration   : {duration_ns / 1_000_000:.3f} ms")


### Converting Spans to a Display Table

Raw span objects have nanosecond timestamps and OpenTelemetry status objects.
The `spans_to_dicts()` helper converts a list of finished spans into plain
Python dicts so you can display them with pandas.

Each dict has these keys:

- `name` - span name
- `status` - 'UNSET' (the default when no status is set explicitly), 'OK', or 'ERROR'
- `duration_ms` - wall-clock duration in milliseconds
- `attributes` - dict of all set attributes


In [None]:
# Use spans_to_dicts to convert spans to a readable format.

# First create a few spans so the table has multiple rows.
tracer2, exporter2 = build_in_memory_tracer()

import time

with tracer2.start_as_current_span("step_one") as s:
    s.set_attribute("info", "preprocessing")
    time.sleep(0.005)   # simulate 5 ms of work

with tracer2.start_as_current_span("step_two") as s:
    s.set_attribute("info", "retrieval")
    time.sleep(0.012)   # simulate 12 ms of work

with tracer2.start_as_current_span("step_three") as s:
    s.set_attribute("info", "generation")
    time.sleep(0.025)   # simulate 25 ms of work

span_dicts = spans_to_dicts(exporter2.get_finished_spans())

# Show as a summary table (without the nested attributes column)
df_spans = pd.DataFrame([
    {"name": d["name"], "status": d["status"], "duration_ms": d["duration_ms"]}
    for d in span_dicts
])
print(df_spans.to_string(index=False))

# Show the attributes for each span
print("\nAttributes per span:")
for d in span_dicts:
    print(f"  {d['name']}: {d['attributes']}")


## Part 2: Tracing a Retrieval Step

The `record_retrieval_span()` helper wraps a retriever call in a span so that
every call records:

| Attribute | What it captures |
|-----------|------------------|
| `retrieval.query` | The query string passed to the retriever |
| `retrieval.top_k` | Maximum results requested |
| `retrieval.result_count` | Actual results returned |
| `retrieval.top_score` | Cosine similarity of the best match (if any) |

**Why is this useful?**

If the agent is getting a wrong answer, you can check `retrieval.top_score` to
see whether the retrieval was confident. A low top score means the vector index
did not find a good match and the answer is likely to be wrong.

**How does it work?**

`record_retrieval_span()` starts a span, sets the attributes, and ends the span
inside a single `with` block. Where you call the retriever determines what the
span duration means:

1. Call the retriever first, then record the span from the finished results
   (simpler). The duration covers only the recording overhead.
2. Run the retriever call inside the span's `with` block. The duration then
   covers the full retrieval latency.

The cell below uses the first pattern.


In [None]:
# Pattern 1: record attributes from results you already have

query = "What is the maximum number of days an employee can work internationally?"
TOP_K = 5

# Run the retriever
results = dense_retriever(query, top_k=TOP_K)

# Create a fresh tracer for this demo
tracer_r, exporter_r = build_in_memory_tracer()

# Record a span that describes this retrieval call
record_retrieval_span(tracer_r, query=query, results=results, top_k=TOP_K)

span_data = spans_to_dicts(exporter_r.get_finished_spans())[0]
print("Retrieval span attributes:")
for key, value in span_data["attributes"].items():
    print(f"  {key}: {value}")
print(f"\nDuration: {span_data['duration_ms']} ms  (recording overhead only)")
print()

# Show the top retrieved chunks for inspection
print("Top chunks retrieved:")
df_results = pd.DataFrame([
    {"rank": i+1, "chunk_id": r.chunk_id, "score": round(r.score, 4),
     "preview": r.text[:80] + "..."}
    for i, r in enumerate(results)
])
print(df_results.to_string(index=False))


## Part 3: Tracing an Answer-Generation Step

The `record_generation_span()` helper records the LLM call that turns
retrieved chunks into a final answer. It captures:

| Attribute | What it captures |
|-----------|------------------|
| `generation.question` | The user question |
| `generation.model` | Chat model name |
| `generation.context_chunk_count` | Number of chunks in the prompt |
| `generation.answer_word_count` | Number of words in the answer |

**Note:** Recording the full question and answer text inside a span attribute
is useful for debugging but should be done selectively in production because
long strings increase storage and may contain sensitive content.

In the cell below we run a full retrieval-then-generation pipeline and trace
both steps so you can see a two-span trace.


In [None]:
# Trace a full retrieval -> generation pipeline.

tracer_pipe, exporter_pipe = build_in_memory_tracer()

question = "What is the policy for working remotely from another country?"

# Step 1: Retrieve with tracing
retrieved = dense_retriever(question, top_k=5)
record_retrieval_span(tracer_pipe, query=question, results=retrieved, top_k=5)

# Step 2: Generate answer with tracing
context_chunks = [r.text for r in retrieved]
answer = answer_with_context(question, context_chunks, model=chat_model)
record_generation_span(
    tracer_pipe,
    question=question,
    answer=answer,
    model=chat_model,
    context_chunk_count=len(context_chunks),
)

# Display the two-span trace as a table
trace_dicts = spans_to_dicts(exporter_pipe.get_finished_spans())
df_trace = pd.DataFrame([
    {
        "span": d["name"],
        "status": d["status"],
        "duration_ms": d["duration_ms"],
        "key_attribute": next(iter(d["attributes"].items()), ("", ""))[1],
    }
    for d in trace_dicts
])
print("Two-span trace summary:")
print(df_trace.to_string(index=False))

print("\nFull attribute details:")
for d in trace_dicts:
    print(f"\n  [{d['name']}]")
    for k, v in d["attributes"].items():
        v_display = (str(v)[:80] + "...") if len(str(v)) > 80 else v
        print(f"    {k}: {v_display}")

print("\n" + "="*60)
print("Answer:", answer)


## Part 4: Tracing Agent Steps

In Tutorial 6 the ReAct agent produces a series of Thought-Action-Observation
cycles. Without tracing, you can only see these by printing them to the
terminal. With tracing, each step is a span you can query and filter.

The `record_agent_step_span()` helper records each cycle as a span named
`agent_step` with these attributes:

| Attribute | What it captures |
|-----------|------------------|
| `agent.step_number` | Step index within the loop (1-based) |
| `agent.thought` | The agent's reasoning text |
| `agent.action` | Tool name called ('retrieve' or 'finish') |
| `agent.action_input` | Input string passed to the tool |
| `agent.observation_length` | Length of the tool's output in characters |

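The cell below builds a **traced agent loop**: a wrapper around
`run_react_loop` that records a span for each step after the loop completes.
Because the spans are recorded after the fact, their durations reflect
recording overhead rather than the time each step took.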

**Observation:** After running, you can query the trace to answer questions
like 'how many steps did the agent take?' or 'what did the agent search for?'
without re-reading the agent's terminal output.


In [None]:
from rag_tutorials.agent_loop import run_react_loop


def traced_agent_run(question: str, tracer, top_k: int = 3) -> dict:
    """Run the ReAct agent and record each step as an agent_step span.

    Args:
        question: User question to answer.
        tracer: Active OpenTelemetry tracer.
        top_k: Number of chunks per retrieval call.

    Returns:
        Dict with 'answer', 'step_count', and 'steps'.
    """
    def retrieve_tool(query: str) -> str:
        results = dense_retriever(query, top_k=top_k)
        if not results:
            return "No relevant chunks found."
        parts = [f"Chunk {i+1} [{r.chunk_id}]: {r.text}" for i, r in enumerate(results)]
        return "\n\n".join(parts)

    tools = {"retrieve": retrieve_tool}
    result = run_react_loop(question=question, tools=tools, model=chat_model, max_steps=5)

    for i, step in enumerate(result.steps, start=1):
        record_agent_step_span(
            tracer,
            step_number=i,
            thought=step.thought,
            action=step.action,
            action_input=step.action_input,
            observation=step.observation,
        )

    return {"answer": result.answer, "step_count": len(result.steps), "steps": result.steps}


tracer_agent, exporter_agent = build_in_memory_tracer()

agent_question = "What happens if an employee works internationally beyond the allowed limit?"
agent_run = traced_agent_run(agent_question, tracer_agent)

print(f"Question   : {agent_question}")
print(f"Steps taken: {agent_run['step_count']}")
print(f"Answer     : {agent_run['answer']}")

print("\nAgent step spans:")
agent_span_dicts = spans_to_dicts(exporter_agent.get_finished_spans())
for d in agent_span_dicts:
    attrs = d["attributes"]
    print(
        f"  step {attrs.get('agent.step_number')}"
        f"  action={attrs.get('agent.action')!r}"
        f"  obs_len={attrs.get('agent.observation_length')}"
        f"  duration={d['duration_ms']} ms"
    )
print()
print("Thoughts recorded:")
for d in agent_span_dicts:
    print(f"  Step {d['attributes']['agent.step_number']}: {d['attributes']['agent.thought'][:100]}")


## Part 5: Connecting to Arize Phoenix

The in-memory approach you used above captures spans in Python objects.
This is useful for quick inspection but there is no persistent storage and
no visual timeline. Arize Phoenix solves both problems.

### What Phoenix Provides

When you send traces to Phoenix:

1. Every span is stored in Phoenix's local database.
2. Phoenix groups spans from the same request into a trace timeline.
3. You can filter traces by project, by span name, by attribute value.
4. You can view the nested span tree for each request in a timeline diagram.

### How to Start a Local Phoenix Server

Phoenix can run as a local server that receives spans via OTLP/HTTP on port
6006. There are two ways to start it:

**Option A: In-notebook launch (simplest)**

```python
import phoenix as px
session = px.launch_app()      # starts the Phoenix server in the background
print(session.url)             # open this URL in your browser
```

**Option B: Command-line launch**

```bash
python -m phoenix.server.main serve
# then open http://localhost:6006 in your browser
```

### How to Register the Phoenix Tracer

After starting the server, register Phoenix as the global OTLP destination:

```python
from phoenix.otel import register

tracer_provider = register(
    project_name="rag_tutorial",
    endpoint="http://localhost:6006/v1/traces",  # Phoenix OTLP endpoint
)
```

After calling `register()`, the global OpenTelemetry tracer provider exports to
Phoenix. Any span you create through it from this point on is sent to the
Phoenix server.

### How to Auto-Instrument OpenAI

OpenInference provides auto-instrumentation for the OpenAI Python SDK:

```python
from openinference.instrumentation.openai import OpenAIInstrumentor

OpenAIInstrumentor().instrument(tracer_provider=tracer_provider)
```

After this call, every `client.chat.completions.create()` and
`client.responses.create()` call automatically produces a span with the
model name, prompt tokens, completion tokens, and latency - no manual
span recording required.

The next cells set up Phoenix and run the traced pipeline with auto-instrumentation.


In [None]:
# This cell starts a local Phoenix server and registers it as the
# global OpenTelemetry tracer provider.
#
# If Phoenix is not installed or the server is already running,
# the cell prints a clear error and continues safely.

phoenix_available = False
phoenix_session = None
phoenix_tracer_provider = None

try:
    import phoenix as px
    from phoenix.otel import register

    phoenix_session = px.launch_app()
    phoenix_url = phoenix_session.url

    phoenix_tracer_provider = register(
        project_name="rag_tutorial",
        # Normalise the slash so the URL is valid with or without a trailing '/'.
        endpoint=f"{phoenix_url.rstrip('/')}/v1/traces",
    )

    phoenix_available = True
    print(f"Phoenix server started: {phoenix_url}")
    print("Open the URL above in your browser to view traces.")
    print("Project: rag_tutorial")

except Exception as exc:
    print(f"Phoenix not available: {exc}")
    print("The in-memory tracing cells above still work without Phoenix.")
    print("To enable Phoenix: uv sync (installs arize-phoenix from pyproject.toml)")


In [None]:
# Auto-instrument OpenAI if Phoenix is running.
# After this call, every OpenAI API call is automatically traced.

auto_instrumented = False

if phoenix_available and phoenix_tracer_provider is not None:
    try:
        from openinference.instrumentation.openai import OpenAIInstrumentor

        OpenAIInstrumentor().instrument(tracer_provider=phoenix_tracer_provider)
        auto_instrumented = True
        print("OpenAI auto-instrumentation active.")
        print("All embedding and chat calls will be traced automatically.")
    except Exception as exc:
        print(f"Auto-instrumentation unavailable: {exc}")
else:
    print("Skipping auto-instrumentation (Phoenix not running).")
    print("Using manual record_*_span() helpers instead.")


### Running the Traced Pipeline Against Phoenix

The cell below runs the same retrieval and generation steps as Part 3 but
with the Phoenix tracer provider active. After running:

1. Open the Phoenix URL printed above.
2. Select the 'rag_tutorial' project.
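3. Each row in the traces list represents one trace. Spans that share a parent
   are grouped into one row; spans recorded without a common parent each
   appear as their own row.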
4. Click a row to see the span tree and every attribute.

**What to look for in Phoenix:**

- The 'retrieval' span shows the query and top score.
- The 'generation' span shows the model and word count.
- If auto-instrumentation is active, you will also see spans for the embedding
  API call and the chat completion call, with token counts. They nest under
  your manual spans only when the API call runs inside the manual span's
  `with` block.


In [None]:
# Run pipeline with Phoenix tracing if available,
# otherwise fall back to in-memory tracing.

from opentelemetry import trace as otel_trace
from rag_tutorials.tracing import TRACER_NAME

if phoenix_available and phoenix_tracer_provider is not None:
    # Use the Phoenix provider that was registered by phoenix.otel.register()
    phoenix_tracer = phoenix_tracer_provider.get_tracer(TRACER_NAME)
    exporter_px = None
    print("Using Phoenix tracer (spans sent to Phoenix server)")
else:
    # Fall back to in-memory
    phoenix_tracer, exporter_px = build_in_memory_tracer()
    print("Using in-memory tracer (Phoenix not running)")

phoenix_question = "What are the requirements for a Global Mobility case?"

# Retrieve
px_results = dense_retriever(phoenix_question, top_k=5)
record_retrieval_span(phoenix_tracer, query=phoenix_question, results=px_results, top_k=5)

# Generate
px_context = [r.text for r in px_results]
px_answer = answer_with_context(phoenix_question, px_context, model=chat_model)
record_generation_span(
    phoenix_tracer,
    question=phoenix_question,
    answer=px_answer,
    model=chat_model,
    context_chunk_count=len(px_context),
)

if exporter_px is not None:
    # In-memory fallback: print the trace locally
    px_span_dicts = spans_to_dicts(exporter_px.get_finished_spans())
    print("\nIn-memory trace (Phoenix not running):")
    for d in px_span_dicts:
        print(f"  [{d['name']}] {d['duration_ms']} ms   {d['attributes']}")

print(f"\nAnswer: {px_answer}")


## Part 6: Tracing Multiple Questions (Benchmark Mode)

The cells above traced individual queries one at a time. In a real system you
want to trace all evaluation queries so you can spot patterns:

- Which queries produce low top scores (likely retrieval misses)?
- Which queries generate the longest answers?
- Which queries take the most time?

The cell below runs five queries from the shared query set, records a span
for each step, and summarises the trace table.


In [None]:
# Trace five queries and summarise the results.

tracer_bench, exporter_bench = build_in_memory_tracer()

eval_queries = queries[:5]

for q in eval_queries:
    bench_results = dense_retriever(q.question, top_k=5)
    record_retrieval_span(
        tracer_bench, query=q.question, results=bench_results, top_k=5
    )

    bench_context = [r.text for r in bench_results]
    bench_answer = answer_with_context(q.question, bench_context, model=chat_model)
    record_generation_span(
        tracer_bench,
        question=q.question,
        answer=bench_answer,
        model=chat_model,
        context_chunk_count=len(bench_context),
    )

all_span_dicts = spans_to_dicts(exporter_bench.get_finished_spans())

retrieval_spans = [d for d in all_span_dicts if d["name"] == "retrieval"]
generation_spans = [d for d in all_span_dicts if d["name"] == "generation"]

print(f"Total spans captured : {len(all_span_dicts)}")
print(f"  retrieval spans    : {len(retrieval_spans)}")
print(f"  generation spans   : {len(generation_spans)}")

print("\nRetrieval span summary:")
df_ret = pd.DataFrame([
    {
        "query": d["attributes"].get("retrieval.query", "")[:50] + "...",
        "result_count": d["attributes"].get("retrieval.result_count"),
        "top_score": round(d["attributes"].get("retrieval.top_score", 0), 4),
        "duration_ms": d["duration_ms"],
    }
    for d in retrieval_spans
])
print(df_ret.to_string(index=False))

print("\nGeneration span summary:")
df_gen = pd.DataFrame([
    {
        "model": d["attributes"].get("generation.model"),
        "chunks": d["attributes"].get("generation.context_chunk_count"),
        "answer_words": d["attributes"].get("generation.answer_word_count"),
        "duration_ms": d["duration_ms"],
    }
    for d in generation_spans
])
print(df_gen.to_string(index=False))
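
Once spans are plain dicts, the pattern-spotting questions above reduce to ordinary filtering and sorting. The sketch below uses hand-written rows in the shape described for `spans_to_dicts()`; the queries, scores, and the 0.5 threshold are made up for illustration and are not benchmark results.

```python
# Hand-written rows; values are illustrative only, not real measurements.
sample_spans = [
    {"name": "retrieval", "duration_ms": 0.2,
     "attributes": {"retrieval.query": "q-a", "retrieval.top_score": 0.82}},
    {"name": "generation", "duration_ms": 0.1,
     "attributes": {"generation.answer_word_count": 28}},
    {"name": "retrieval", "duration_ms": 0.2,
     "attributes": {"retrieval.query": "q-b", "retrieval.top_score": 0.31}},
    {"name": "retrieval", "duration_ms": 0.3,
     "attributes": {"retrieval.query": "q-c", "retrieval.top_score": 0.57}},
]

SCORE_THRESHOLD = 0.5  # hypothetical cut-off; tune it for your own retriever


def top_score(span_dict: dict) -> float:
    # Spans without a recorded score sort first, as the most suspicious.
    return span_dict["attributes"].get("retrieval.top_score", 0.0)


sample_retrievals = [s for s in sample_spans if s["name"] == "retrieval"]
ranked = sorted(sample_retrievals, key=top_score)
suspect_queries = [
    s["attributes"]["retrieval.query"] for s in ranked if top_score(s) < SCORE_THRESHOLD
]

for s in ranked:
    print(f"{s['attributes']['retrieval.query']}: top_score={top_score(s)}")
print("Likely retrieval misses:", suspect_queries)
```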


## Part 7: Visualising Trace Data in the Notebook

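When Phoenix is not available, you can still visualise trace data using
matplotlib. The chart below plots the recorded span durations for the five
benchmark queries. A span's duration covers only what ran inside it: the
benchmark cell records each span after the call has returned, so the bars show
recording overhead unless the retrieval and generation calls are moved inside
their spans.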


In [None]:
import matplotlib.pyplot as plt

# Build a side-by-side bar chart of retrieval vs generation latency
retrieval_times = [d["duration_ms"] for d in retrieval_spans]
generation_times = [d["duration_ms"] for d in generation_spans]
query_labels = [f"Q{i+1}" for i in range(len(retrieval_spans))]

x = range(len(query_labels))
width = 0.35

fig, ax = plt.subplots(figsize=(9, 4))
bars_ret = ax.bar([xi - width/2 for xi in x], retrieval_times, width, label="retrieval", color="steelblue")
bars_gen = ax.bar([xi + width/2 for xi in x], generation_times, width, label="generation", color="coral")

ax.set_xlabel("Query")
ax.set_ylabel("Duration (ms)")
ax.set_title("Per-query latency: retrieval vs generation (from trace spans)")
ax.set_xticks(list(x))
ax.set_xticklabels(query_labels)
ax.legend()
plt.tight_layout()
plt.show()

total_ret = sum(retrieval_times)
total_gen = sum(generation_times)
pct_ret = 100 * total_ret / (total_ret + total_gen) if (total_ret + total_gen) > 0 else 0
pct_gen = 100 * total_gen / (total_ret + total_gen) if (total_ret + total_gen) > 0 else 0
print(f"Average retrieval latency : {sum(retrieval_times)/len(retrieval_times):.1f} ms ({pct_ret:.0f}% of total)")
print(f"Average generation latency: {sum(generation_times)/len(generation_times):.1f} ms ({pct_gen:.0f}% of total)")


## Part 8: Tracing Error and Edge-Case Paths

A span's status is `UNSET` by default and can be set explicitly to `OK` or
`ERROR`. Recording errors in spans lets you filter for failed requests in
Phoenix and inspect exactly which attributes were set at the time of the
failure.

The cell below demonstrates wrapping a failing operation in a span that
records the error status.

**How to set an error status on a span:**

```python
from opentelemetry.trace import StatusCode

with tracer.start_as_current_span("my_operation") as span:
    try:
        result = risky_operation()
        span.set_status(StatusCode.OK)
    except Exception as exc:
        span.set_status(StatusCode.ERROR, description=str(exc))
        span.record_exception(exc)
        raise
```

The `record_exception()` call stores the exception type, message, and
stack trace as a span event so you can read them in Phoenix.

The cell below shows a retrieval span on an empty query (a common edge case)
and confirms the span still has UNSET status (no error was raised - the
retriever returns an empty list gracefully).


In [None]:
from opentelemetry.trace import StatusCode

tracer_err, exporter_err = build_in_memory_tracer()

# Edge case 1: empty query -> retriever returns empty list; span status stays UNSET
empty_results = dense_retriever("", top_k=3)
record_retrieval_span(tracer_err, query="", results=empty_results, top_k=3)

# Edge case 2: manually record a simulated error span
with tracer_err.start_as_current_span("retrieval_error_example") as err_span:
    try:
        raise ValueError("Simulated retriever failure: index unavailable")
    except ValueError as exc:
        err_span.set_status(StatusCode.ERROR, description=str(exc))
        err_span.record_exception(exc)

error_spans = spans_to_dicts(exporter_err.get_finished_spans())
print("Error-path span statuses:")
for d in error_spans:
    print(f"  name={d['name']!r}  status={d['status']}  attrs={d['attributes']}")


## Learning Checkpoint: Tracing with Arize Phoenix

### What Works

- Spans capture structured metadata about each pipeline step without changing
  the step's logic. The span is an observer, not a participant.
- `build_in_memory_tracer()` lets you capture and inspect spans entirely in
  Python without a server, which makes tracing easy to add to any tutorial.
- `spans_to_dicts()` converts spans to a table-friendly format so you can
  query trace data with pandas.
- `record_retrieval_span()`, `record_generation_span()`, and
  `record_agent_step_span()` are thin wrappers that cover the most important
  steps in the RAG and agent workflow without duplicating business logic.
- When Phoenix is running, the same spans appear in a browser timeline with
  no code changes beyond calling `register()` at startup.

### What Does Not Work Well

- Manual span recording requires you to remember to call the helper after
  every significant step. Auto-instrumentation (via OpenInference) removes
  this burden for OpenAI API calls but not for custom retrieval code.
- The in-memory exporter discards spans when the Python process ends. For
  persistent trace storage, a running Phoenix server (or a cloud backend)
  is required.
- Span attributes are limited to primitive types (strings, numbers, booleans)
  and homogeneous sequences of them. Complex objects such as chunk lists must
  be summarised (e.g. by recording only the count and the top score, not the
  full text).

### How to Extend This in Production

- Replace `build_in_memory_tracer()` with `phoenix.otel.register()` at
  application startup to send all traces to a persistent Phoenix server.
- Use `openinference.instrumentation.openai.OpenAIInstrumentor` to automatically
  capture token usage and prompt/completion text for every LLM call.
- Add span nesting: start a parent 'pipeline' span that wraps both the
  retrieval and generation child spans so they appear as one tree in Phoenix.
- Use Phoenix's evaluation tools to run automated LLM-as-judge quality checks
  on traced answers and flag low-quality responses.
