# Log Monitor Agent with OpenTelemetry Tracing & Evaluation

An event-driven agent that monitors server logs using LangGraph, with **OpenTelemetry** tracing exported to MLflow via OTLP.

**Source:** The agent is from https://github.com/jwm4/agents/tree/001-log-monitor-agent/examples/log-monitor-agent

**What we added:** OpenTelemetry tracing (OTLP export to MLflow) and Agent-as-a-Judge evaluation to demonstrate observability and automated evaluation of agent execution.

**Prerequisites:**
- Llama Stack server running at `http://localhost:8321`
- MLflow server running: `mlflow server --backend-store-uri sqlite:///mlflow.db --port 5001`
- `OPENAI_API_KEY` set in the environment or in the parent directory's `.env` file (used by the Agent-as-a-Judge scorers)
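Before running the cells, it can help to confirm these prerequisites in one place. The sketch below is a hypothetical preflight helper, not part of the original agent. It only checks that a TCP connection to each server's host and port succeeds, not that the server is healthy, and it defaults the MLflow URL to `MLFLOW_TRACKING_URI` the same way the Configuration cell does.

```python
import os
import socket
from typing import Mapping, Optional
from urllib.parse import urlparse


def is_port_open(url: str, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to the URL's host and port succeeds."""
    parsed = urlparse(url)
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    try:
        with socket.create_connection((parsed.hostname, port), timeout=timeout):
            return True
    except OSError:  # connection refused, timed out, or name resolution failed
        return False


def missing_prerequisites(
    env: Optional[Mapping[str, str]] = None,
    services: Optional[Mapping[str, str]] = None,
) -> list[str]:
    """List prerequisites that look unmet: a missing API key or unreachable servers."""
    env = os.environ if env is None else env
    if services is None:
        # Default endpoints taken from the prerequisites list above
        services = {
            "Llama Stack": "http://localhost:8321",
            "MLflow": env.get("MLFLOW_TRACKING_URI", "http://127.0.0.1:5001"),
        }
    missing = []
    if not env.get("OPENAI_API_KEY"):
        missing.append("OPENAI_API_KEY")
    for name, url in services.items():
        if not is_port_open(url):
            missing.append(f"{name} ({url})")
    return missing
```

Call `missing_prerequisites()` after `load_dotenv(...)` so a key stored in `.env` is visible; an empty list means every check passed.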

In [1]:
import json
import os
import sys
import time
from dotenv import load_dotenv

# Load .env from parent directory (agents_tracing-eval_mlflow/.env)
env_path = os.path.join(os.path.dirname(os.getcwd()), ".env")
load_dotenv(env_path)

import mlflow
from mlflow.genai.judges import make_judge
from typing import Literal

from opentelemetry import trace as otel_trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter



## Configuration

In [2]:
JUDGE_MODEL = "openai:/gpt-4o"

MLFLOW_TRACKING_URI = os.environ.get("MLFLOW_TRACKING_URI", "http://127.0.0.1:5001")
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
experiment = mlflow.set_experiment("log-monitor-agent")
print(f"MLflow tracking: {MLFLOW_TRACKING_URI}")

MLflow tracking: http://127.0.0.1:5001


## OpenTelemetry Tracing Setup

Initialize a `TracerProvider` with an OTLP HTTP exporter that sends traces to the MLflow server.

**Important:** The `TracerProvider` must be set globally *before* importing the agent module, so that the sub-module tracers (`agent.py`, `llm.py`, `tools.py`) pick up this provider.
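The exporter in the next cell targets MLflow through two settings: the OTLP/HTTP path `/v1/traces` on the tracking server and an `x-mlflow-experiment-id` header naming the destination experiment. As a minimal sketch, that pairing can be factored into a helper; `mlflow_otlp_settings` is a hypothetical name, and the endpoint and header values are copied from the cell below.

```python
def mlflow_otlp_settings(tracking_uri: str, experiment_id: str) -> dict:
    """Keyword arguments for OTLPSpanExporter that target an MLflow server."""
    return {
        # OTLP/HTTP traces endpoint on the MLflow server (a trailing slash is tolerated)
        "endpoint": f"{tracking_uri.rstrip('/')}/v1/traces",
        # Tells MLflow which experiment the ingested traces belong to
        "headers": {"x-mlflow-experiment-id": str(experiment_id)},
    }
```

With it, the exporter could be built as `OTLPSpanExporter(**mlflow_otlp_settings(MLFLOW_TRACKING_URI, experiment.experiment_id))`, which passes the same `endpoint` and `headers` arguments as the inline version.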

In [3]:
# Init OTel tracing → OTLP export to MLflow
# Must be set BEFORE importing agent so sub-module tracers use this provider
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(
    endpoint=f"{MLFLOW_TRACKING_URI.rstrip('/')}/v1/traces",
    headers={"x-mlflow-experiment-id": experiment.experiment_id},
)))
otel_trace.set_tracer_provider(tracer_provider)
tracer = otel_trace.get_tracer("log-monitor-agent")

## Import the Agent

The log monitor agent implements a LangGraph workflow:
1. **Classify** - error/warning/normal
2. **Diagnose** - root cause analysis (uses MCP tools for documentation lookup)
3. **Assess Severity** - high/low
4. **Route** - Slack alert (high) or GitHub ticket (low)

The OTel version (`log_monitor_agent_otel/`) uses `tracer.start_as_current_span()` calls to capture execution traces.
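To make the control flow concrete, here is a plain-Python sketch of the four steps with the LLM-backed steps passed in as callables. It is not the LangGraph implementation. The `LogState` fields mirror the keys `run_log_monitor` reads from the agent's result, but the action labels `"slack_alert"` and `"github_ticket"` are hypothetical, and stopping early for `normal` logs is an assumption based on the "no action needed" info examples.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class LogState:
    """Hypothetical state passed between the workflow steps."""
    log_message: str
    classification: str = ""
    diagnosis: str = ""
    severity: str = ""
    action_taken: str = ""


def route(severity: str) -> str:
    """Routing rule from the list above: Slack for high severity, GitHub for low."""
    return "slack_alert" if severity == "high" else "github_ticket"


def run_workflow(
    log_message: str,
    classify: Callable[[str], str],
    diagnose: Callable[[str], str],
    assess_severity: Callable[[str, str], str],
) -> LogState:
    state = LogState(log_message)
    state.classification = classify(log_message)  # error / warning / normal
    if state.classification == "normal":
        # Assumption: normal logs end the workflow without diagnosis or action
        state.action_taken = "none"
        return state
    state.diagnosis = diagnose(log_message)
    state.severity = assess_severity(log_message, state.diagnosis)  # high / low
    state.action_taken = route(state.severity)
    return state
```

Injecting the step functions keeps the routing logic testable with stubs, which is also roughly why a graph framework separates nodes from edges.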

In [4]:
# Import agent AFTER TracerProvider is set up
from log_monitor_agent_otel.agent import process_log_message

In [5]:
def run_log_monitor(log_message: str) -> tuple[dict, str]:
    """Process a log message inside a root OTel span. Returns (result, trace_id)."""
    with tracer.start_as_current_span("log_monitor_agent") as root:
        root.set_attribute("input.question", log_message)
        # MLflow reads span inputs/outputs from these JSON-encoded attributes
        root.set_attribute("mlflow.spanInputs", json.dumps({"log_message": log_message}))

        result = process_log_message(log_message)

        # Build the output summary once; `or ""` guards against None values,
        # and long diagnoses are truncated to keep the attributes small
        summary = {
            "classification": result.get("classification") or "",
            "severity": result.get("severity") or "",
            "action_taken": result.get("action_taken") or "",
            "diagnosis": (result.get("diagnosis") or "")[:2000],
        }
        root.set_attribute("output.response", json.dumps(summary))
        root.set_attribute("mlflow.spanOutputs", json.dumps(summary))

        # 128-bit trace ID rendered as 32 lowercase hex characters
        trace_id = format(root.get_span_context().trace_id, "032x")

    return result, trace_id
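`run_log_monitor` returns the OTel trace ID as 32 lowercase hex characters, while the evaluation cell later also looks for a `tr-`-prefixed form in MLflow's trace table. A minimal sketch of both representations follows; the `tr-` convention is inferred from that filter code in this notebook, and the helper names are hypothetical.

```python
def otel_trace_id_hex(trace_id: int) -> str:
    """Render a 128-bit OTel trace ID as 32 zero-padded lowercase hex characters."""
    return format(trace_id, "032x")


def mlflow_trace_id(trace_id: int) -> str:
    """ID under which MLflow is assumed to store an OTLP-ingested trace ('tr-' + hex)."""
    return f"tr-{otel_trace_id_hex(trace_id)}"
```

Zero padding matters: a trace ID with leading zero bits would otherwise produce a shorter string that never matches the stored ID.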

## Agent-as-a-Judge

Two evaluation judges using MLflow's `make_judge`:

1. **log_monitor_evaluator** - General performance evaluation (classification, diagnosis, severity, routing)
2. **log_actionability** - Custom DevOps rubric that checks if alerts are actionable:
   - Identifies the failing service
   - Suggests a recovery action
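The `log_actionability` rubric reduces to counting how many of its two criteria hold. The judge itself decides each criterion with an LLM; the deterministic sketch below only encodes the label mapping from the instructions in the next cell, which can be handy for unit-testing code that aggregates these labels. The function name is hypothetical.

```python
def rate_actionability(identifies_service: bool, suggests_action: bool) -> str:
    """Map the two rubric criteria to the log_actionability label set."""
    criteria_met = int(identifies_service) + int(suggests_action)
    # Both criteria -> actionable, exactly one -> partially, neither -> not actionable
    return {2: "actionable", 1: "partially_actionable", 0: "not_actionable"}[criteria_met]
```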

In [6]:
log_monitor_judge = make_judge(
    name="log_monitor_evaluator",
    instructions=(
        "Evaluate the log monitor agent's performance in {{ trace }}.\n\n"
        "Check for:\n"
        "1. Classification Accuracy: Was the log correctly classified as error/warning/normal?\n"
        "2. Diagnosis Quality: Was the root cause analysis accurate and helpful?\n"
        "3. Severity Assessment: Was the severity (high/low) appropriate?\n"
        "4. Action Routing: Was the correct action taken (Slack for high, GitHub for low)?\n\n"
        "Rate as: 'good', 'acceptable', or 'poor'"
    ),
    feedback_value_type=Literal["good", "acceptable", "poor"],
    model=JUDGE_MODEL,
)

log_actionability = make_judge(
    name="log_actionability",
    instructions=(
        "You are a DevOps Lead evaluating log alert summaries for actionability.\n\n"
        "Evaluate the agent's response in {{ trace }} using this RUBRIC:\n\n"
        "A response is ACTIONABLE if it meets BOTH criteria:\n"
        "1. **Identifies the failing service**: The response clearly states which service, "
        "component, or system is failing (e.g., 'Redis connection', 'Kubernetes RBAC', 'PostgreSQL database')\n"
        "2. **Suggests a recovery action**: The response provides at least one concrete step "
        "to resolve or mitigate the issue (e.g., 'restart the pod', 'check RBAC permissions', "
        "'increase connection pool size')\n\n"
        "Rate as:\n"
        "- 'actionable': Meets BOTH criteria - identifies failing service AND suggests recovery action\n"
        "- 'partially_actionable': Meets only ONE criterion - either identifies service OR suggests action, but not both\n"
        "- 'not_actionable': Meets NEITHER criterion - unclear what failed or how to fix it\n\n"
        "In your rationale, explicitly state:\n"
        "1. What failing service was identified (or 'none' if unclear)\n"
        "2. What recovery action was suggested (or 'none' if missing)\n"
        "3. Why you assigned this rating based on the rubric"
    ),
    feedback_value_type=Literal["actionable", "partially_actionable", "not_actionable"],
    model=JUDGE_MODEL,
)

## Sample Log Messages

Real-world error formats from common libraries. These logs benefit from the agent's MCP tools (DeepWiki, Context7), which look up library documentation to support an accurate diagnosis.

Categories:
- **Kubernetes** - RBAC, resource not found, conflicts
- **Redis** - Connection, watch, timeout errors
- **Kafka** - Metadata, partition, producer errors
- **SQLAlchemy** - Connection, integrity, pool errors
- **LangChain** - Parser, tool call errors
- **AWS Boto3** - Access denied, throttling
- **Warnings** - Memory, certificate expiration
- **Info** - Normal operational logs (no action needed)
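Every sample line starts with an `ERROR`, `WARNING`, or `INFO` level prefix, so a trivial baseline can supply the expected classification for a sanity check against the agent's output. This regex sketch is hypothetical and is not how the agent classifies (the agent uses an LLM); it says nothing about severity or routing.

```python
import re
from typing import Optional

# Hypothetical mapping from the log-level prefix to the agent's three classes
LEVEL_TO_CLASS = {"ERROR": "error", "WARNING": "warning", "INFO": "normal"}


def expected_classification(log_message: str) -> Optional[str]:
    """Return the label implied by the log-level prefix, or None if there is none."""
    match = re.match(r"\s*(ERROR|WARNING|INFO)\b", log_message)
    return LEVEL_TO_CLASS[match.group(1)] if match else None
```

For example, after a run you could compare `expected_classification(log_message)` with `result["classification"]` and flag any disagreement.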

In [7]:
# Sample log messages - real-world examples that benefit from MCP tool research
EXAMPLES = [
    # === KUBERNETES PYTHON CLIENT ERRORS ===
    "ERROR: kubernetes.client.rest.ApiException: (403) Forbidden: pods is forbidden: User 'system:serviceaccount:default:myapp' cannot list resource 'pods' in API group '' in namespace 'production'",
    "ERROR: kubernetes.client.rest.ApiException: (404) Not Found: deployments.apps 'nginx-deployment' not found in namespace 'staging'",
    "ERROR: kubernetes.client.rest.ApiException: (409) Conflict: Operation cannot be fulfilled on configmaps 'app-config': the object has been modified; please apply your changes to the latest version",
    
    # === REDIS-PY ERRORS ===
    "ERROR: redis.exceptions.ConnectionError: Error 111 connecting to redis-master:6379. Connection refused.",
    "ERROR: redis.exceptions.WatchError: Watched variable changed during transaction - key 'inventory:item:12345' was modified by another client",
    "ERROR: redis.exceptions.TimeoutError: Timeout reading from redis-cluster:6379 after 30.0 seconds",
    
    # === KAFKA ERRORS ===
    "ERROR: kafka.errors.KafkaTimeoutError: Failed to update metadata after 60.0 secs - broker may be unreachable",
    "ERROR: kafka.errors.NotLeaderForPartitionError: This server is not the leader for topic-partition orders-events-3",
    "ERROR: org.apache.kafka.common.errors.ProducerFencedException: Producer with transactionalId 'order-processor' has been fenced by a newer producer instance",
    
    # === SQLALCHEMY / DATABASE ERRORS ===
    "ERROR: sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) connection to server at 'db.example.com' (10.0.1.50), port 5432 failed: Connection timed out",
    "ERROR: sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint 'users_email_key' - DETAIL: Key (email)=(user@example.com) already exists",
    "ERROR: sqlalchemy.pool.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30.00",
    
    # === LANGCHAIN ERRORS ===
    "ERROR: langchain_core.exceptions.OutputParserException: Failed to parse LLM output - expected JSON object but received malformed response",
    "ERROR: langchain.schema.InvalidToolCall: Tool 'search_database' received invalid arguments: missing required parameter 'query'",
    
    # === AWS BOTO3 ERRORS ===
    "ERROR: botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the GetObject operation: Access Denied for s3://my-bucket/private/data.json",
    "ERROR: botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the DescribeInstances operation: Rate exceeded",
    
    # === HIGH SEVERITY WARNINGS ===
    "WARNING: Memory usage at 94% on pod ml-inference-worker-7b9c4 - OOMKilled likely imminent",
    "WARNING: Certificate for *.api.example.com expires in 12 hours (NotAfter: 2024-01-15T23:59:59Z)",
    
    # === NORMAL/INFO LOGS (no action expected) ===
    "INFO: Successfully connected to PostgreSQL database at db.example.com:5432",
    "INFO: Kafka consumer group 'order-processors' rebalanced - now consuming from partitions [0, 1, 2]",
]

## Run Agent & Evaluate

1. Process a log message (OTel traces sent via OTLP)
2. Flush traces to MLflow and wait for ingestion
3. Load traces via `mlflow.search_traces()`
4. Evaluate with both judges via `mlflow.genai.evaluate()`
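Step 2 relies on a fixed `time.sleep(2)`, which can be too short on a busy server and is wasted time on a fast one. A common alternative is to poll until the data appears or a deadline passes. This is a minimal, generic sketch; `wait_for` is a hypothetical helper, not an MLflow API.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def wait_for(
    fetch: Callable[[], T],
    ready: Callable[[T], bool],
    timeout: float = 30.0,
    interval: float = 1.0,
) -> Optional[T]:
    """Call `fetch` until `ready(value)` is true; return None if `timeout` elapses first."""
    deadline = time.monotonic() + timeout
    while True:
        value = fetch()
        if ready(value):
            return value
        if time.monotonic() >= deadline:
            return None
        time.sleep(interval)
```

In the cell below, `fetch` could wrap `mlflow.search_traces(locations=[experiment.experiment_id])` and `ready` could check whether this run's trace ID is present, replacing both the sleep and the empty-result guess.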

In [8]:
# Process a log message (pick any index from EXAMPLES)
log_message = EXAMPLES[0]  # Kubernetes RBAC error
print(f"Processing: {log_message}\n")

result, trace_id = run_log_monitor(log_message)
print(f"\nResult: {result}")
print(f"Trace ID: {trace_id}")

# Flush OTel traces to MLflow
print("\nFlushing traces to MLflow...")
tracer_provider.force_flush()
time.sleep(2)

# Load traces from MLflow and evaluate with both judges
print("Loading traces from MLflow...")
traces_df = mlflow.search_traces(locations=[experiment.experiment_id])

# Keep only the trace produced in this run. MLflow may store the ID with a "tr-" prefix,
# so match both forms; if no ID column exists, treat it as "not found" rather than
# evaluating every trace in the experiment.
match_ids = [trace_id, f"tr-{trace_id}"]
id_cols = [c for c in ("trace_id", "request_id", "client_request_id") if c in traces_df.columns]
if id_cols:
    traces_df = traces_df[traces_df[id_cols].isin(match_ids).any(axis=1)]
else:
    traces_df = traces_df.iloc[0:0]

print(f"Found {len(traces_df)} trace(s) matching this run.\n")

if traces_df.empty:
    print(
        "ERROR: No traces found. Make sure MLflow server is running:\n"
        "  mlflow server --backend-store-uri sqlite:///mlflow.db --port 5001\n"
    )
else:
    print("=== Running agent-as-a-judge evaluation ===\n")
    results = mlflow.genai.evaluate(
        data=traces_df,
        scorers=[log_monitor_judge, log_actionability],
    )
    print("--- Evaluation results ---")
    for name, table in results.tables.items():
        print(f"\n{name}:")
        for _, row in table.iterrows():
            for col in table.columns:
                if any(j in str(col) for j in ("log_monitor_evaluator", "log_actionability")):
                    print(f"  {col}: {row[col]}")

Processing: ERROR: kubernetes.client.rest.ApiException: (403) Forbidden: pods is forbidden: User 'system:serviceaccount:default:myapp' cannot list resource 'pods' in API group '' in namespace 'production'

[LLM] Using Llama Stack at http://localhost:8321
[LLM] Model: openai/gpt-4o
[Classify] Classification: error (confidence: 0.95)
[Classify] Indicators: ['ERROR', 'ApiException', '403 Forbidden', 'cannot list resource']
[Diagnose] Analyzing root cause...
[LLM] Using Llama Stack at http://localhost:8321
[LLM] Model: openai/gpt-4o
[MCP] Connecting to research tools...
[MCP]   - DeepWiki: https://mcp.deepwiki.com/mcp
[MCP]   - Context7: https://mcp.context7.com/mcp
[MCP] Available tools: ['read_wiki_structure', 'read_wiki_contents', 'ask_question', 'resolve-library-id', 'query-docs']
[Diagnose] MCP research tools available
[Diagnose] Diagnosis: The error message indicates a permissions issue: a Kubernetes service account, identified as 'system:serviceaccount:default:myapp', lacks the nece

2026/02/09 14:01:50 INFO mlflow.models.evaluation.utils.trace: Auto tracing is temporarily enabled during the model evaluation for computing some metrics and debugging. To disable tracing, call `mlflow.autolog(disable=True)`.


Loading traces from MLflow...
Found 1 trace(s) matching this run.

=== Running agent-as-a-judge evaluation ===



Evaluating: 100%|██████████| 1/1 [Elapsed: 00:33, Remaining: 00:00] 


--- Evaluation results ---

eval_results:
  log_actionability/value: actionable
  log_monitor_evaluator/value: good
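The cells above evaluate a single example. To cover all of `EXAMPLES`, one option is to run the agent on each message, collect every trace ID, flush once, and then tally the judge labels. The helpers below are a hypothetical sketch of the collection and tallying parts; the column name `log_actionability/value` is taken from the output above.

```python
from collections import Counter
from typing import Callable, Iterable


def run_batch(
    messages: Iterable[str],
    runner: Callable[[str], tuple[dict, str]],
) -> tuple[list[dict], list[str]]:
    """Run `runner` (e.g. run_log_monitor) on each message; collect results and trace IDs."""
    results, trace_ids = [], []
    for message in messages:
        result, trace_id = runner(message)
        results.append(result)
        trace_ids.append(trace_id)
    return results, trace_ids


def tally(labels: Iterable[str]) -> dict[str, int]:
    """Count judge labels, e.g. the values of the 'log_actionability/value' column."""
    return dict(Counter(labels))
```

A possible flow: `results, ids = run_batch(EXAMPLES, run_log_monitor)`, then one `tracer_provider.force_flush()`, then filter `traces_df` on both `ids` and `[f"tr-{t}" for t in ids]` before calling `mlflow.genai.evaluate()`, and finally `tally(table["log_actionability/value"])` on the results table.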


## View Traces in MLflow UI

The MLflow server should already be running, since OTLP export requires it. If it is not, start it:

```bash
# If not already running:
mlflow server --backend-store-uri sqlite:///mlflow.db --port 5001
```

Then open http://localhost:5001 in your browser.

### How to Navigate

1. **Select the Experiment** - Click on `log-monitor-agent` in the left sidebar
2. **Go to Traces tab** - Click the "Traces" tab to see all agent executions
3. **View Trace Details** - Click on any Trace ID to open the trace detail view
   - You'll see the span hierarchy showing each step (classify → diagnose → assess severity → route)
   - Click on individual spans to see inputs/outputs for each step
4. **View Assessments** - In the trace detail view, look for the assessments side-panel on the right
   - This shows the Agent-as-a-Judge evaluation results