# OpenTelemetry LLM Observability

**Duration:** ~35 min | **Platform:** Kaggle dual Tesla T4

This notebook covers the full **observability stack** in llamatelemetry —
decorator-based tracing, context managers, semantic conventions, sampling
strategies, prompt redaction, and custom metrics.

### What you'll learn
1. Decorator-based tracing (`@trace`, `@workflow`, `@task`, `@tool`)
2. Context managers (`span()`, `session()`, `suppress_tracing()`)
3. LLM semantic conventions and prefill/decode request tracing
4. Sampling strategies
5. Prompt redaction
6. Custom metrics

In [None]:
!pip install -q git+https://github.com/llamatelemetry/llamatelemetry.git@v1.0.0

import llamatelemetry

# Initialize tracing; uncomment the OTLP endpoint and headers below to export traces
llamatelemetry.init(
    service_name="otel-observability",
    environment="kaggle",
    # otlp_endpoint="https://otlp-gateway-prod-us-central-0.grafana.net/otlp",
    # otlp_headers={"Authorization": "Basic <token>"},
)
print(f"llamatelemetry {llamatelemetry.version()} — observability enabled")
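
The commented-out `otlp_headers` entry expects a Basic credential. As a minimal sketch, assuming the backend accepts HTTP Basic auth over an `instance_id:api_token` pair, the header could be built as follows. The helper `basic_auth_header` and the environment variable names `OTLP_INSTANCE_ID` and `OTLP_API_TOKEN` are hypothetical and not part of llamatelemetry.

```python
import base64
import os


def basic_auth_header(instance_id: str, api_token: str) -> dict:
    """Build an HTTP Basic Authorization header from an id/token pair."""
    raw = f"{instance_id}:{api_token}".encode("utf-8")
    return {"Authorization": "Basic " + base64.b64encode(raw).decode("ascii")}


# Hypothetical variable names; read secrets from the environment rather than
# hard-coding them in the notebook. The fallbacks are placeholders only.
instance_id = os.environ.get("OTLP_INSTANCE_ID", "123456")
api_token = os.environ.get("OTLP_API_TOKEN", "example-token")
otlp_headers = basic_auth_header(instance_id, api_token)
print(sorted(otlp_headers))
```

The resulting dictionary has the same shape as the `otlp_headers` argument shown in the commented-out lines.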

## Decorator-Based Tracing

llamatelemetry provides four decorators that create spans with different semantics:

| Decorator | Purpose | Span Kind |
|-----------|---------|----------|
| `@trace()` | General tracing | Internal |
| `@workflow()` | Multi-step pipeline | Internal |
| `@task()` | Single unit of work | Internal |
| `@tool()` | External tool call | Internal |

In [None]:
@llamatelemetry.trace(name="generic-operation")
def traced_function(x):
    return x * 2

@llamatelemetry.workflow(name="data-pipeline")
def pipeline(data):
    cleaned = clean_data(data)
    analyzed = analyze_data(cleaned)
    return analyzed

@llamatelemetry.task(name="clean-data")
def clean_data(data):
    return [d.strip().lower() for d in data]

@llamatelemetry.task(name="analyze-data")
def analyze_data(data):
    return {"count": len(data), "unique": len(set(data))}

@llamatelemetry.tool(name="external-lookup")
def lookup_tool(key):
    return {"key": key, "value": f"result-for-{key}"}

# Execute the traced pipeline
result = pipeline([" Hello ", "World", " hello ", "test"])
print(f"Pipeline result: {result}")

tool_result = lookup_tool("my-key")
print(f"Tool result: {tool_result}")
print("\nTrace hierarchy: workflow → [task(clean), task(analyze)] (the two tasks are siblings)")

## Context Managers

Use `span()` for inline tracing, `session()` for user/session grouping,
and `suppress_tracing()` to skip noisy operations.

In [None]:
import time

# span() — inline tracing with custom attributes
with llamatelemetry.span("data-processing", batch_size=100, priority="high") as span:
    time.sleep(0.1)  # simulate work
    print("Span: data-processing (with custom attrs)")

# session() — group spans by user session
with llamatelemetry.session("user-abc-123", user_id="user-42") as session_span:
    with llamatelemetry.span("user-request-1"):
        time.sleep(0.05)
    with llamatelemetry.span("user-request-2"):
        time.sleep(0.05)
    print("Session: user-abc-123 (2 requests grouped)")

# suppress_tracing() — skip tracing for noisy/internal operations
with llamatelemetry.suppress_tracing():
    # This operation will NOT be traced
    for _ in range(100):
        pass
    print("Suppressed: 100 iterations not traced")

## Semantic Conventions

llamatelemetry defines standardized attribute keys for LLM, GPU, and NCCL
telemetry, following OpenTelemetry semantic convention patterns.

In [None]:
from llamatelemetry.semconv import keys
from llamatelemetry.semconv.attrs import model_attrs, gpu_attrs, set_llm_attrs

# View available semantic convention keys
llm_keys = [k for k in dir(keys) if k.startswith("LLM_")]
gpu_keys = [k for k in dir(keys) if k.startswith("GPU_")]
nccl_keys = [k for k in dir(keys) if k.startswith("NCCL_")]

print("LLM Semantic Keys:")
for k in llm_keys:
    print(f"  {k} = \"{getattr(keys, k)}\"")

print(f"\nGPU Keys: {len(gpu_keys)} keys")
print(f"NCCL Keys: {len(nccl_keys)} keys")

# Using attribute builders
attrs = model_attrs("gemma-3-1b-it", quant="Q4_K_M", sha256="abc123")
print(f"\nModel attributes: {attrs}")

gpu_a = gpu_attrs(gpu_id=0, utilization_pct=85.0, mem_used_mb=5120)
print(f"GPU attributes: {gpu_a}")
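
Attribute builders of this kind usually map keyword arguments to namespaced keys. The sketch below shows the idea with a hypothetical `build_attrs` helper. The key strings it produces are illustrative and are not claimed to match the values in `llamatelemetry.semconv.keys`.

```python
def build_attrs(prefix, **fields):
    """Return {"<prefix>.<field>": value} for every field that is not None."""
    return {f"{prefix}.{name}": value
            for name, value in fields.items()
            if value is not None}


# Unknown values are passed as None and dropped instead of being exported.
model = build_attrs("llm.model", name="gemma-3-1b-it", quant="Q4_K_M", sha256=None)
gpu = build_attrs("gpu", id=0, utilization_pct=85.0, mem_used_mb=5120)

span_attrs = {**model, **gpu}
print(span_attrs)
```

Dropping `None` values keeps the attribute set limited to non-null primitives, which is what OpenTelemetry attribute values are expected to be.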

## Request Tracing with Phases

LLM inference has two distinct phases: **prefill** (prompt processing) and
**decode** (token generation). The SDK can create child spans for each phase.

In [None]:
from llamatelemetry.llama import ServerManager, LlamaCppClient
from huggingface_hub import hf_hub_download

model_path = hf_hub_download(
    repo_id="bartowski/google_gemma-3-1b-it-GGUF",
    filename="google_gemma-3-1b-it-Q4_K_M.gguf",
    cache_dir="/root/.cache/huggingface",
)

mgr = ServerManager()
mgr.start_server(model_path=model_path, gpu_layers=99, ctx_size=2048)
mgr.wait_until_ready(timeout=60)
client = LlamaCppClient(base_url="http://127.0.0.1:8090")

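# Traced request with a timing breakdown.
# A non-streaming call returns only after both phases finish, so client-side
# code cannot time prefill and decode separately. The "prefill" and "decode"
# spans below are short marker spans that carry phase attributes; the
# "completion" span covers the whole request.
@llamatelemetry.workflow(name="traced-inference")
def traced_inference(prompt):
    with llamatelemetry.span("prefill", **{keys.LLM_PHASE: "prefill"}):
        t0 = time.perf_counter()  # start of the request clock

    with llamatelemetry.span("completion", **{keys.LLM_MODEL: "gemma-3-1b-it"}):
        resp = client.chat.completions.create(
            messages=[{"role": "user", "content": prompt}],
            max_tokens=64, temperature=0.7,
        )
        elapsed_ms = (time.perf_counter() - t0) * 1000

    with llamatelemetry.span("decode", **{
        keys.LLM_PHASE: "decode",
        # Completion tokens only; prompt tokens are not included in this value.
        keys.LLM_TOKENS_TOTAL: resp.usage.completion_tokens,
        keys.LLM_REQUEST_DURATION_MS: elapsed_ms,
    }):
        pass  # marker span: attributes only, no timed work

    return resp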

resp = traced_inference("What is OpenTelemetry?")
print(f"Response: {resp.choices[0].message.content}")
print(f"Tokens: {resp.usage.completion_tokens}")
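
When the response is streamed, the two phases can be approximated from the client side: time to first token stands in for prefill, and the remainder for decode. The sketch below demonstrates this with a simulated stream. `fake_token_stream` and `time_phases` are hypothetical helpers, and no real server or llamatelemetry API is involved.

```python
import time


def fake_token_stream(n_tokens, prefill_s=0.05, per_token_s=0.01):
    """Stand-in for a streaming completion: waits, then yields tokens."""
    time.sleep(prefill_s)  # simulated prompt processing
    for i in range(n_tokens):
        if i:
            time.sleep(per_token_s)  # simulated per-token decode
        yield f"tok{i}"


def time_phases(stream):
    """Split wall-clock time at the arrival of the first token."""
    start = time.perf_counter()
    first = None
    count = 0
    for _ in stream:
        if first is None:
            first = time.perf_counter()
        count += 1
    end = time.perf_counter()
    if first is None:  # empty stream: count everything as prefill
        first = end
    return {
        "prefill_ms": (first - start) * 1000,
        "decode_ms": (end - first) * 1000,
        "tokens": count,
    }


phases = time_phases(fake_token_stream(5))
print(phases)
```

The two durations could then be attached to separate phase spans; the measurement also includes network and scheduling overhead, so it is an approximation of server-side timings.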

## Sampling Strategies

In production, tracing every request is rarely worth the export and storage
cost. Sampling controls the volume of telemetry data.

In [None]:
from llamatelemetry.otel import build_sampler

# Available sampling strategies
strategies = [
    ("always_on",   {}, "Trace everything — development/debugging"),
    ("ratio",       {"ratio": 0.1}, "10% of requests — production"),
    ("ratio",       {"ratio": 0.01}, "1% of requests — high-traffic production"),
]

for strategy, kwargs, description in strategies:
    sampler = build_sampler(strategy, **kwargs)
    print(f"  {strategy:15s} {str(kwargs):20s} — {description}")

print("\nUsage: llamatelemetry.init(sampling='ratio', sampling_ratio=0.1)")
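
Ratio sampling is typically made deterministic by deriving the decision from the trace ID, so every service that sees the same trace reaches the same verdict. The sketch below follows the approach used by OpenTelemetry's trace-ID ratio sampler (compare the low 64 bits of the ID against a bound). It is a standalone illustration, and it is an assumption that `build_sampler("ratio", ...)` behaves this way.

```python
import random

TRACE_ID_LIMIT = (1 << 64) - 1  # only the low 64 bits of the trace ID are used


def should_sample(trace_id: int, ratio: float) -> bool:
    """Deterministic decision: the same trace ID always gets the same answer."""
    if not 0.0 <= ratio <= 1.0:
        raise ValueError("ratio must be between 0 and 1")
    bound = round(ratio * (TRACE_ID_LIMIT + 1))
    return (trace_id & TRACE_ID_LIMIT) < bound


rng = random.Random(0)  # seeded so the run is repeatable
trace_ids = [rng.getrandbits(128) for _ in range(10_000)]
kept = sum(should_sample(t, 0.1) for t in trace_ids)
print(f"kept {kept} of {len(trace_ids)} traces")
```

With uniformly random trace IDs, the kept fraction converges on the configured ratio as traffic grows.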

## Prompt Redaction

Automatically redact sensitive information from traced prompts and responses.

In [None]:
from llamatelemetry.otel import RedactionSpanProcessor

# RedactionSpanProcessor replaces prompt/response content with [REDACTED]
# when configured via init()
print("Prompt redaction modes:")
print("  1. llamatelemetry.init(redact=True)")
print("     → Redacts all prompt and response content in spans")
print("")
print("  2. llamatelemetry.init(redact=True, redact_keys=['password', 'api_key'])")
print("     → Redacts specific attribute keys containing sensitive data")
print("")
print("  3. suppress_tracing() context manager")
print("     → Completely skip tracing for specific operations")

# Example: sensitive operation with suppression
with llamatelemetry.suppress_tracing():
    # API key handling — not traced
    api_key = "sk-secret-key-12345"
    print("\nSensitive operation completed (not traced)")
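
Key-based and pattern-based redaction can be sketched as a pure function over a span's attribute dictionary. This is an illustration of the idea only, not how `RedactionSpanProcessor` is implemented. `SENSITIVE_KEYS`, `SECRET_PATTERN`, and `redact_attributes` are hypothetical names, and the token pattern is a made-up example.

```python
import re

SENSITIVE_KEYS = {"password", "api_key"}          # illustrative key names
SECRET_PATTERN = re.compile(r"sk-[A-Za-z0-9-]+")  # illustrative token shape


def redact_attributes(attrs: dict) -> dict:
    """Return a copy with sensitive keys masked and token-like text scrubbed."""
    clean = {}
    for key, value in attrs.items():
        if key.lower() in SENSITIVE_KEYS:
            clean[key] = "[REDACTED]"
        elif isinstance(value, str):
            clean[key] = SECRET_PATTERN.sub("[REDACTED]", value)
        else:
            clean[key] = value  # numbers, booleans, etc. pass through
    return clean


raw = {
    "api_key": "sk-secret-key-12345",
    "prompt": "Use sk-secret-key-12345 to call the service",
    "max_tokens": 64,
}
safe = redact_attributes(raw)
print(safe)
```

Returning a copy leaves the original attributes untouched, so application code that still needs the real values is unaffected.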

## Custom Metrics

Create custom OpenTelemetry metrics for your application.

In [None]:
from llamatelemetry.otel import get_meter

# Create a custom meter
meter = get_meter("llm-app")

# Define custom metrics
request_counter = meter.create_counter(
    name="llm.requests.total",
    description="Total LLM requests",
    unit="{request}",  # OpenTelemetry convention: curly-brace annotation for counts
)

latency_histogram = meter.create_histogram(
    name="llm.request.latency",
    description="LLM request latency",
    unit="ms",
)

token_counter = meter.create_counter(
    name="llm.tokens.generated",
    description="Total tokens generated",
    unit="{token}",  # OpenTelemetry convention: curly-brace annotation for counts
)

# Use metrics in inference
for i in range(3):
    t0 = time.perf_counter()
    resp = client.chat.completions.create(
        messages=[{"role": "user", "content": f"Question {i+1}: What is ML?"}],
        max_tokens=32, temperature=0.7,
    )
    latency_ms = (time.perf_counter() - t0) * 1000

    # Record metrics
    request_counter.add(1, {"model": "gemma-3-1b-it", "status": "success"})
    latency_histogram.record(latency_ms, {"model": "gemma-3-1b-it"})
    token_counter.add(resp.usage.completion_tokens, {"model": "gemma-3-1b-it"})

    print(f"  Request {i+1}: {latency_ms:.0f} ms, {resp.usage.completion_tokens} tokens")

print("\nCustom metrics recorded; they will be exported on the next flush.")
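
A histogram instrument does not keep every recorded value; it counts how many values fall into each bucket. The sketch below shows that aggregation step in isolation. The bucket boundaries in `BOUNDS_MS` are illustrative and not the SDK's defaults, and `bucketize` is a hypothetical helper.

```python
import bisect

BOUNDS_MS = [50, 100, 250, 500, 1000]  # illustrative upper bounds, in ms


def bucketize(latencies_ms, bounds=BOUNDS_MS):
    """Count values per bucket; the last bucket catches everything above."""
    counts = [0] * (len(bounds) + 1)
    for value in latencies_ms:
        # bisect_left makes each bound inclusive: 100 lands in (50, 100].
        counts[bisect.bisect_left(bounds, value)] += 1
    return counts


samples = [42.0, 100.0, 180.5, 320.0, 2400.0]
counts = bucketize(samples)
labels = [f"<={b}" for b in BOUNDS_MS] + [f">{BOUNDS_MS[-1]}"]
print(dict(zip(labels, counts)))
```

Boundaries should bracket the latencies actually expected; if most requests land in the overflow bucket, percentile estimates derived from the histogram become coarse.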

## Summary — Observability Best Practices

1. **Use `@workflow` for pipelines**, `@task` for steps, `@tool` for external calls
2. **Group by session** with `session()` for per-user tracking
3. **Set semantic attributes** for standardized dashboards
4. **Sample in production** — 1-10% is typical
5. **Redact prompts** when tracing production user input
6. **Custom metrics** for business-specific KPIs
7. **Always `flush()` before shutdown** to export pending data

In [None]:
mgr.stop_server()
llamatelemetry.flush()
llamatelemetry.shutdown()
print("Done.")
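
If a cell fails before the cleanup above runs, pending telemetry can be lost. One way to make the order of stop, flush, and shutdown hold even on errors is `contextlib.ExitStack`, sketched below with stand-in functions. `stop_server`, `flush`, and `shutdown` here are local stubs that only record their call order, not the real SDK calls.

```python
import contextlib

calls = []  # records the order in which cleanup steps run


def stop_server():
    calls.append("stop_server")


def flush():
    calls.append("flush")


def shutdown():
    calls.append("shutdown")


def run_with_cleanup(work):
    """Run work(), then always stop the server, flush, and shut down."""
    with contextlib.ExitStack() as stack:
        # ExitStack runs callbacks in reverse registration order.
        stack.callback(shutdown)
        stack.callback(flush)
        stack.callback(stop_server)
        return work()


def failing_work():
    raise RuntimeError("inference failed")


try:
    run_with_cleanup(failing_work)
except RuntimeError as exc:
    print(f"work failed ({exc}); cleanup order: {calls}")
```

Compared with a single `try`/`finally`, `ExitStack` still runs the remaining callbacks if one cleanup step itself raises.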