In [1]:
!pip uninstall agent-diff -y

Found existing installation: agent-diff 1.0.6
Uninstalling agent-diff-1.0.6:
  Successfully uninstalled agent-diff-1.0.6


In [2]:
!pip install agent-diff langchain langchain-openai langchain-anthropic pandas matplotlib -q 

In [3]:
import os

# Credentials are read from the environment so that no secret is stored in the
# notebook (or in its saved outputs / version control history).
# Export them before starting the kernel, for example:
#   export OPENAI_API_KEY="<your OpenRouter key>"
#   export AGENT_DIFF_API_KEY="<your agent-diff key>"   # presumably read by the agent-diff client - TODO confirm
#   export AGENT_DIFF_BASE_URL="https://api.agentdiff.dev"
# NOTE(review): the keys that were previously hardcoded in this cell must be
# treated as leaked and rotated.
#
# Despite its name, this value is sent as the Bearer token to the OpenRouter
# endpoint by call_openrouter().
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
if not OPENAI_API_KEY:
    print("[WARN] OPENAI_API_KEY is not set; model requests will fail until it is provided.")

In [4]:
import json
from pathlib import Path

# Root directory under which the per-service API docs JSON files are looked up.
# The path is relative, so it resolves against the kernel's current working
# directory (normally the folder containing this notebook).
DOCS_BASE = Path("../../examples")

def load_api_docs(filepath: Path) -> dict:
    """Load an API docs JSON file.

    Args:
        filepath: Location of the JSON document.

    Returns:
        The parsed JSON content, or an empty dict (after printing a notice)
        when the file does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but does not contain valid JSON.
    """
    if not filepath.exists():
        print(f"Docs not found: {filepath}")
        return {}
    # The context manager closes the handle deterministically; the explicit
    # encoding avoids depending on the platform default when reading JSON.
    with open(filepath, encoding="utf-8") as docs_file:
        return json.load(docs_file)

def format_docs_markdown(docs: dict) -> str:
    """Render an API docs mapping as markdown text.

    Each top-level key becomes a ``## <key>`` heading followed by the entry's
    ``description``. When the entry has a truthy ``parameters`` mapping, a
    ``**Parameters:**`` section lists every location and its parameters;
    values that are not dicts are printed verbatim.

    Args:
        docs: Mapping of endpoint name to its documentation dict.

    Returns:
        The markdown string, or ``""`` when ``docs`` is empty.
    """
    if not docs:
        return ""

    chunks = []
    for route, spec in docs.items():
        chunks.append(f"## {route}\n")
        chunks.append(f"{spec.get('description', '')}\n\n")

        if not spec.get('parameters'):
            continue

        chunks.append("**Parameters:**\n")
        for where, entries in spec['parameters'].items():
            chunks.append(f"  {where}:\n")
            if isinstance(entries, dict):
                for name, details in entries.items():
                    if isinstance(details, dict):
                        flag = "**required**" if details.get('required') else "optional"
                        kind = details.get('type', 'any')
                        text = details.get('description', '')
                        chunks.append(f"    - `{name}` ({kind}, {flag}): {text}\n")
                    else:
                        # Parameter documented as a plain value rather than a dict
                        chunks.append(f"    - `{name}`: {details}\n")
            else:
                # Whole location documented as a plain value
                chunks.append(f"    {entries}\n")
        chunks.append("\n")

    return "".join(chunks)

# Read the raw docs for every service (in order: slack, box, calendar, linear)
slack_docs, box_docs, calendar_docs, linear_docs = (
    load_api_docs(DOCS_BASE / relative_path)
    for relative_path in (
        "slack/testsuites/slack_docs/slack_api_full_docs.json",
        "box/testsuites/box_docs/box_api_full_docs.json",
        "calendar/testsuites/calendar_docs/calendar_api_full_docs.json",
        "linear/testsuites/linear_docs/linear_api_full_docs.json",
    )
)

# Markdown renderings, later embedded into the system prompts
slack_docs_markdown, box_docs_markdown, calendar_docs_markdown, linear_docs_markdown = map(
    format_docs_markdown,
    (slack_docs, box_docs, calendar_docs, linear_docs),
)

# One status line per service: OK when docs were loaded, MISSING otherwise
print("\n".join(
    f"[{'OK' if loaded else 'MISSING'}] {label} docs: {len(loaded)} endpoints"
    for label, loaded in (
        ("Slack", slack_docs),
        ("Box", box_docs),
        ("Calendar", calendar_docs),
        ("Linear", linear_docs),
    )
))


[OK] Slack docs: 27 endpoints
[OK] Box docs: 26 endpoints
[OK] Calendar docs: 37 endpoints
[OK] Linear docs: 19 endpoints


In [5]:
import re
from typing import Optional, Tuple

# Service configurations with base URLs.
# Keyed by lowercase service id; build_system_prompt() looks an entry up with
# service.lower() and substitutes its fields into the system prompt template:
#   name          -> {service_name}
#   base_url      -> {base_url}
#   description   -> {service_description}
#   extra_context -> {extra_context} (additional bullet text; "" when there is none)
SERVICE_CONFIG = {
    "slack": {
        "name": "Slack",
        "base_url": "https://slack.com/api",
        "description": "Slack workspace messaging and collaboration API",
        "extra_context": "",
    },
    "box": {
        "name": "Box",
        "base_url": "https://api.box.com/2.0",
        "description": "Box cloud storage and file management API",
        "extra_context": "",
    },
    "calendar": {
        "name": "Google Calendar",
        "base_url": "https://www.googleapis.com/calendar/v3",
        "description": "Google Calendar scheduling and events API",
        # Gives the model a fixed reference date/time for relative expressions
        # ("today", "tomorrow", ...) in calendar tasks.
        "extra_context": "- **Current Date/Time**: Sunday, June 17, 2018 at 00:01 (midnight), timezone America/Los_Angeles. Use this as the reference point for all relative date/time expressions like 'today', 'tomorrow', 'this Saturday', etc.",
    },
    "linear": {
        "name": "Linear",
        "base_url": "https://api.linear.app/graphql",
        "description": "Linear project management and issue tracking API",
        "extra_context": "",
    },
}

# ReAct system prompt - variant that embeds the API documentation.
# str.format() placeholders: {service_name}, {base_url}, {service_description},
# {extra_context} and {api_docs}; build_system_prompt() fills them in when
# include_api_docs is True.
# The <thinking>/<action>/<done> tags described to the model here are the ones
# parse_react_response() extracts from its replies.
REACT_SYSTEM_PROMPT_WITH_API_DOCS = """You are an AI assistant that completes tasks by interacting with APIs via bash commands.

## Current Session
- **Service**: {service_name}
- **Base URL**: {base_url}
- **Description**: {service_description}
{extra_context}

## Environment
- You are authenticated as a user in the {service_name} workspace/account.
- Authentication is handled automatically via proxy. Use placeholder tokens like `<TOKEN>` where credentials would go.
- You execute bash commands (primarily curl) to interact with the {service_name} API.
- The environment is stateless between commands - you cannot install packages or persist files.

## Response Format
You must respond using XML tags. Think step-by-step, then execute a command OR declare completion.

**To execute a bash command:**
<thinking>
Your reasoning about what needs to be done and why this command will help.
</thinking>

<action>
Your bash command here (e.g., curl request)
</action>

**When the task is complete:**
<thinking>
Your reasoning confirming the task is done based on API responses.
</thinking>

<done>
Brief summary of what was accomplished.
</done>

## Rules
1. Execute ONE command at a time, then wait for the result.
2. Parse API responses carefully - extract IDs and data needed for subsequent calls.
3. If a command fails, analyze the error and try a different approach.
4. Only use <done> when the task is fully completed (not just when you've gathered information).

## API Documentation
{api_docs}
"""

# ReAct system prompt - variant WITHOUT embedded API documentation.
# str.format() placeholders: {service_name}, {base_url}, {service_description}
# and {extra_context}; build_system_prompt() uses this template when
# include_api_docs is False.
# Compared with the docs variant it has no "## API Documentation" section and
# instead adds an Environment bullet telling the model to explore the API itself.
REACT_SYSTEM_PROMPT = """You are an AI assistant that completes tasks by interacting with APIs via bash commands.

## Current Session
- **Service**: {service_name}
- **Base URL**: {base_url}
- **Description**: {service_description}
{extra_context}

## Environment
- You are authenticated as a user in the {service_name} workspace/account.
- Authentication is handled automatically via proxy. Use placeholder tokens like `<TOKEN>` where credentials would go.
- You execute bash commands (primarily curl) to interact with the {service_name} API.
- If you are not sure how to use {service_name} API, explore the endpoint, parameters, and learn how it works.
- The environment is stateless between commands - you cannot install packages or persist files.

## Response Format
You must respond using XML tags. Think step-by-step, then execute a command OR declare completion.

**To execute a bash command:**
<thinking>
Your reasoning about what needs to be done and why this command will help.
</thinking>

<action>
Your bash command here (e.g., curl request)
</action>

**When the task is complete:**
<thinking>
Your reasoning confirming the task is done based on API responses.
</thinking>

<done>
Brief summary of what was accomplished.
</done>

## Rules
1. Execute ONE command at a time, then wait for the result.
2. Parse API responses carefully - extract IDs and data needed for subsequent calls.
3. If a command fails, analyze the error and try a different approach.
4. Only use <done> when the task is fully completed (not just when you've gathered information).

"""

# Function to build the full system prompt
def build_system_prompt(service: str, docs_markdown: str, include_api_docs: bool = True) -> str:
    """Assemble the system prompt for one service.

    The service is looked up case-insensitively in SERVICE_CONFIG; an unknown
    service falls back to a generic entry built from the given name.

    Args:
        service: Service name (slack, box, calendar, linear)
        docs_markdown: Formatted API documentation markdown; only inserted
            when include_api_docs is True
        include_api_docs: Selects the template with or without the API docs section

    Returns:
        str: Complete system prompt
    """
    fallback = {
        "name": service,
        "base_url": "unknown",
        "description": f"{service} API",
        "extra_context": "",
    }
    settings = SERVICE_CONFIG.get(service.lower(), fallback)

    # Placeholders shared by both templates
    fields = {
        "service_name": settings["name"],
        "base_url": settings["base_url"],
        "service_description": settings["description"],
        "extra_context": settings.get("extra_context", ""),
    }

    if not include_api_docs:
        return REACT_SYSTEM_PROMPT.format(**fields)
    return REACT_SYSTEM_PROMPT_WITH_API_DOCS.format(api_docs=docs_markdown, **fields)



def parse_react_response(response: Optional[str]) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """
    Parse a ReAct XML response.

    Args:
        response: Raw model output. ``None`` or an empty string is accepted
            (a model can return no content) and yields three ``None`` values.

    Returns: (thinking, action, done)
    - Each element is the stripped text of the first matching tag pair,
      or None when that tag is absent.
    - If action is present: execute the command
    - If done is present: task is complete
    """
    if not response:
        # re.search would raise TypeError on None; treat "no content" as "no tags".
        return None, None, None

    def _extract(tag: str) -> Optional[str]:
        # Non-greedy match across newlines: first <tag>...</tag> pair only.
        match = re.search(rf'<{tag}>(.*?)</{tag}>', response, re.DOTALL)
        return match.group(1).strip() if match else None

    return _extract("thinking"), _extract("action"), _extract("done")


In [6]:
import asyncio
import time
import httpx
from pathlib import Path
from datetime import datetime
from typing import Any, List, Dict
from IPython.display import display, HTML, clear_output
import pandas as pd
from tqdm.auto import tqdm

from agent_diff import (
    AgentDiff,
    BashExecutorProxy,
)

# ============ Benchmark Configurations ============

# Per-service benchmark settings, keyed by lowercase service id and read by
# get_benchmark_config():
#   test_suite_name - copied into the returned config; presumably the name of the
#                     test suite on the Agent Diff side - TODO confirm
#   docs_markdown   - markdown rendering of the service's API docs (built in an
#                     earlier cell), passed on to build_system_prompt()
BENCHMARK_CONFIGS = {
    "slack": {
        "test_suite_name": "Slack Bench v2",
        "docs_markdown": slack_docs_markdown,
    },
    "box": {
        "test_suite_name": "Box Bench v2",
        "docs_markdown": box_docs_markdown,
    },
    "calendar": {
        "test_suite_name": "Calendar Bench",
        "docs_markdown": calendar_docs_markdown,
    },
    "linear": {
        "test_suite_name": "Linear Bench",
        "docs_markdown": linear_docs_markdown,
    },
}

def get_benchmark_config(service: str, include_api_docs: bool = True) -> dict:
    """Return the resolved benchmark configuration for one service.

    Args:
        service: Service name (matched case-insensitively against BENCHMARK_CONFIGS)
        include_api_docs: Whether to include API docs in the system prompt

    Returns:
        dict with keys service, test_suite_name, docs_markdown,
        include_api_docs and system_prompt.

    Raises:
        ValueError: If the service has no entry in BENCHMARK_CONFIGS.
    """
    key = service.lower()
    entry = BENCHMARK_CONFIGS.get(key)
    if entry is None:
        raise ValueError(f"Unknown service: {service}. Available: {list(BENCHMARK_CONFIGS.keys())}")

    docs = entry["docs_markdown"]
    return dict(
        service=key,
        test_suite_name=entry["test_suite_name"],
        docs_markdown=docs,
        include_api_docs=include_api_docs,
        system_prompt=build_system_prompt(key, docs, include_api_docs),
    )

# ============ Output Directory ============

# Relative path, so the folder is created under the kernel's current working
# directory when this cell runs; exist_ok=True makes re-running the cell safe.
# Presumably the place where evaluation results are written - the writers are
# outside this part of the notebook.
OUTPUT_DIR = Path("evaluation_outputs")
OUTPUT_DIR.mkdir(exist_ok=True)

# ============  ReAct Agent ============

def call_openrouter(
    model: str,
    messages: List[Dict],
    api_key: str,
    max_retries: int = 3,
    base_delay: float = 2.0,
) -> Dict:
    """Make a completion request to OpenRouter API (no tool calling).

    Retries transient failures - HTTP 429/502/503/504, connection and read
    errors, remote protocol errors and request timeouts - with exponential
    backoff plus jitter. Any other HTTP error is raised immediately.

    Args:
        model: OpenRouter model identifier.
        messages: Chat messages in the OpenAI-style ``{"role", "content"}`` format.
        api_key: Bearer token sent in the Authorization header.
        max_retries: Total number of attempts; values below 1 are treated as 1.
        base_delay: Base backoff in seconds; attempt ``k`` (0-based) waits
            ``base_delay * 2**k`` plus up to one second of random jitter.

    Returns:
        dict: {
            "content": str,           # Model response text ("" if the API returned null)
            "finish_reason": str,     # "stop", "length", etc.
            "usage": {
                "prompt_tokens": int,
                "completion_tokens": int,
                "total_tokens": int,
                "cost": float,        # USD cost
            }
        }

    Raises:
        httpx.HTTPStatusError: Non-retryable status, or retries exhausted.
        httpx.TransportError: Connection/read/timeout failure after the last attempt.
        KeyError / IndexError: The response body has no ``choices[0].message``.
    """
    import random  # function-scope import: `random` is not imported at the top of this cell

    # Always make at least one attempt, even if the caller passes max_retries <= 0.
    attempts = max(1, max_retries)
    retryable_statuses = (429, 502, 503, 504)
    retryable_errors = (
        httpx.HTTPStatusError,
        httpx.ConnectError,
        httpx.ReadError,
        httpx.RemoteProtocolError,
        httpx.TimeoutException,  # slow upstream responses are transient as well
    )

    for attempt in range(attempts):
        try:
            with httpx.Client(timeout=120) as client:
                response = client.post(
                    "https://openrouter.ai/api/v1/chat/completions",
                    headers={
                        "Authorization": f"Bearer {api_key}",
                        "Content-Type": "application/json",
                    },
                    json={
                        "model": model,
                        "messages": messages,
                    },
                )
                response.raise_for_status()
                data = response.json()
        except retryable_errors as e:
            if isinstance(e, httpx.HTTPStatusError):
                # Only rate limiting and gateway-type failures are worth retrying
                should_retry = e.response.status_code in retryable_statuses
            else:
                should_retry = True  # Connection/read/timeout errors are retryable

            if should_retry and attempt < attempts - 1:
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                print(f"[RETRY] OpenRouter request failed (attempt {attempt + 1}/{attempts}): {e}. Retrying in {delay:.1f}s...")
                time.sleep(delay)
                continue
            raise

        choice = data["choices"][0]
        # "usage" and its fields may be missing or null; normalize to numbers so
        # callers can sum them without type checks.
        usage = data.get("usage") or {}

        return {
            "content": choice["message"].get("content") or "",
            "finish_reason": choice.get("finish_reason"),
            "usage": {
                "prompt_tokens": usage.get("prompt_tokens") or 0,
                "completion_tokens": usage.get("completion_tokens") or 0,
                "total_tokens": usage.get("total_tokens") or 0,
                "cost": usage.get("cost") or 0.0,
            }
        }

    # Unreachable: the final attempt always returns or re-raises above.
    raise RuntimeError("call_openrouter exited its retry loop without a result")


def run_react_agent(
    model_name: str,
    task_prompt: str,
    bash_executor: "BashExecutorProxy",
    system_prompt: str,
    max_iterations: int = 25,
    trace_accumulator: Dict = None,
    stop_event: "threading.Event" = None,
) -> Dict:
    """
    Custom ReAct agent loop using XML tags.

    Each iteration asks the model for a response, parses it with
    parse_react_response(), and then either executes the <action> through
    ``bash_executor.execute`` and feeds the observation back, finishes on
    <done>, or asks the model to retry when neither tag is present. When both
    tags are present the action wins and the premature <done> is ignored.

    Args:
        model_name: Model identifier passed to call_openrouter().
        task_prompt: Task text, sent as the first user message.
        bash_executor: Object exposing ``execute(command)``.
        system_prompt: System message for the conversation.
        max_iterations: Upper bound on model calls.
        trace_accumulator: Optional dict updated in real time (steps and usage),
            allowing partial trace recovery on timeout.
        stop_event: Optional event; once set, the loop exits at the next check
            (start of an iteration, and again right before running an action).

    Returns:
        dict with ``steps`` (list), ``final`` (completion step or None),
        ``iterations`` (number of iterations actually started), ``completed``
        (bool) and ``usage`` (summed token counts and cost).
    """
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"Task: {task_prompt}"},
    ]

    # Use provided accumulator or create new one
    if trace_accumulator is not None:
        steps = trace_accumulator.setdefault("steps", [])
        trace_accumulator["final"] = None
        trace_accumulator["completed"] = False
        trace_accumulator["usage"] = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0, "cost": 0.0}
    else:
        steps = []

    final_step = None
    completed = False
    # Counted explicitly: the loop variable is undefined when max_iterations == 0
    # and would over-count by one when the stop signal ends the loop.
    iterations_run = 0

    # Track total usage across all iterations
    total_usage = {
        "prompt_tokens": 0,
        "completion_tokens": 0,
        "total_tokens": 0,
        "cost": 0.0,
    }

    for iteration in range(max_iterations):
        # Check stop signal at start of each iteration
        if stop_event and stop_event.is_set():
            # Timeout was triggered - exit gracefully
            break
        iterations_run = iteration + 1

        # Get model response
        try:
            api_response = call_openrouter(
                model=model_name,
                messages=messages,
                api_key=OPENAI_API_KEY,
            )
            # The API may return null content; treat it as an empty reply so it
            # is handled by the malformed-response branch instead of crashing.
            response_text = api_response["content"] or ""
            finish_reason = api_response["finish_reason"]
            iter_usage = api_response["usage"]

            # Accumulate total usage
            total_usage["prompt_tokens"] += iter_usage["prompt_tokens"]
            total_usage["completion_tokens"] += iter_usage["completion_tokens"]
            total_usage["total_tokens"] += iter_usage["total_tokens"]
            total_usage["cost"] += iter_usage["cost"]

            # Update accumulator in real-time
            if trace_accumulator is not None:
                trace_accumulator["usage"] = total_usage.copy()

        except Exception as e:
            steps.append({
                "iteration": iteration + 1,
                "error": f"API error: {str(e)}",
            })
            break

        # Parse XML response
        thinking, action, done = parse_react_response(response_text)

        # If model includes both, execute the action and ignore premature done
        if action:
            # The model call can take a long time; if the stop signal arrived in
            # the meantime, do not run further commands against the environment.
            if stop_event and stop_event.is_set():
                break

            # Execute bash command
            try:
                result = bash_executor.execute(action)
                # Normalize result to dict for consistent storage
                if isinstance(result, dict):
                    observation = {
                        "stdout": result.get("stdout", ""),
                        "stderr": result.get("stderr", ""),
                        "exit_code": result.get("exit_code", 0),
                    }
                else:
                    observation = {
                        "stdout": str(result) if result else "",
                        "stderr": "",
                        "exit_code": 0,
                    }
            except Exception as e:
                observation = {
                    "stdout": "",
                    "stderr": str(e),
                    "exit_code": 1,
                    "error": str(e),
                }

            # Record this step with nested structure + usage
            steps.append({
                "iteration": iteration + 1,
                "thinking": thinking,
                "action": action,
                "observation": observation,
                "raw_response": response_text,
                "finish_reason": finish_reason,
                "usage": iter_usage,
            })

            # Format observation for model (just stdout, or error info)
            if observation.get("exit_code", 0) != 0:
                obs_text = f"{observation['stdout']}\n[stderr]: {observation['stderr']}\n[exit_code]: {observation['exit_code']}".strip()
            else:
                obs_text = observation["stdout"].strip() if observation["stdout"] else "(empty output)"

            # Add to conversation
            messages.append({"role": "assistant", "content": response_text})
            messages.append({"role": "user", "content": f"<observation>\n{obs_text}\n</observation>"})
        elif done:
            # Task completed (only when NO action present)
            final_step = {
                "iteration": iteration + 1,
                "thinking": thinking,
                "summary": done,
                "raw_response": response_text,
                "finish_reason": finish_reason,
                "usage": iter_usage,
            }
            completed = True
            break
        else:
            # No action and no done - malformed response
            steps.append({
                "iteration": iteration + 1,
                "thinking": thinking,
                "warning": "No <action> or <done> tag found",
                "raw_response": response_text,
                "finish_reason": finish_reason,
                "usage": iter_usage,
            })
            messages.append({"role": "assistant", "content": response_text})
            messages.append({"role": "user", "content": "Please respond with either an <action> to execute or <done> if the task is complete."})

    result = {
        "steps": steps,
        "final": final_step,
        "iterations": iterations_run,
        "completed": completed,
        "usage": total_usage,
    }

    # Update accumulator if provided (for timeout recovery)
    if trace_accumulator is not None:
        trace_accumulator["final"] = final_step
        trace_accumulator["iterations"] = iterations_run
        trace_accumulator["completed"] = completed
        trace_accumulator["usage"] = total_usage

    return result


async def run_single_test(
    client: "AgentDiff",
    model_name: str,
    test: Any,
    system_prompt: str,
    test_timeout_seconds: int = 300,
    max_iterations: int = 25,
) -> tuple:
    """Run a single test case using custom ReAct agent.

    Creates an environment and a run through ``client``, executes
    run_react_agent() in a worker thread under a timeout, evaluates the run and
    always attempts to delete the environment afterwards. A failure to delete
    the environment is reported with a warning and never replaces the result.

    Args:
        client: AgentDiff client instance
        model_name: Model identifier (e.g., 'openai/gpt-5-mini')
        test: Test object with id and prompt attributes
        system_prompt: Full system prompt including API docs
        test_timeout_seconds: Max seconds before timeout
        max_iterations: Max ReAct loop iterations

    Returns:
        tuple: (test_id, result_dict) where result_dict contains:
            - prompt (str): Task prompt
            - status (str): 'passed', 'failed', 'timeout', or 'error'
            - passed (bool): Whether assertions passed
            - score (float): Score 0-100
            - time (float): Execution seconds
            - failures (list[str]): Failure messages
            - runId (str|None): Run UUID (None if the run was never started)
            - error (str): Error message, present only when status='error'
            - trace (dict): Execution trace containing:
                - steps (list): Each step has iteration, thinking, action,
                  observation, raw_response, finish_reason, usage
                - final (dict|None): Completion step with usage
                - iterations (int): Total iterations
                - completed (bool): Whether agent declared done
                - usage (dict): Total {prompt_tokens, completion_tokens,
                  total_tokens, cost}
            - diff (dict|None): State changes {inserts, updates, deletes}
    """
    import threading

    test_id = test.id
    prompt = test.prompt
    response = None
    timed_out = False
    env = None
    run = None
    execution_time = 0.0
    stop_event = threading.Event()  # Signal for graceful thread cancellation

    try:
        # Initialize environment
        env = client.init_env(testId=test_id)
        run = client.start_run(envId=env.environmentId, testId=test_id)

        # Create bash executor (direct, not LangChain tool)
        bash_executor = BashExecutorProxy(
            env.environmentId,
            base_url=client.base_url,
            api_key=client.api_key,
        )

        # Execution with timeout
        # Use trace_accumulator to capture partial trace on timeout
        trace_accumulator = {
            "steps": [],
            "final": None,
            "completed": False,
            "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0, "cost": 0.0},
        }

        start = time.perf_counter()
        try:
            response = await asyncio.wait_for(
                asyncio.to_thread(
                    run_react_agent,
                    model_name=model_name,
                    task_prompt=prompt,
                    bash_executor=bash_executor,
                    system_prompt=system_prompt,
                    max_iterations=max_iterations,
                    trace_accumulator=trace_accumulator,
                    stop_event=stop_event,
                ),
                timeout=test_timeout_seconds
            )
        except asyncio.TimeoutError:
            timed_out = True
            # Signal thread to stop at next iteration
            stop_event.set()
            # Give thread a moment to finish current operation and exit
            await asyncio.sleep(2)
            # Use accumulated trace (partial) instead of losing it
            response = {
                "steps": trace_accumulator.get("steps", []),
                "final": trace_accumulator.get("final"),
                "iterations": len(trace_accumulator.get("steps", [])),
                "completed": False,
                "usage": trace_accumulator.get("usage", {}),
                "timeout_error": f"Test timed out after {test_timeout_seconds} seconds",
            }
        except Exception as e:
            response = {
                "steps": trace_accumulator.get("steps", []),
                "final": trace_accumulator.get("final"),
                "iterations": len(trace_accumulator.get("steps", [])),
                "completed": False,
                "usage": trace_accumulator.get("usage", {}),
                "error": str(e),
            }
        finally:
            execution_time = time.perf_counter() - start

        # Evaluation (the return value of evaluate_run is not needed; the
        # results are fetched separately right after)
        client.evaluate_run(runId=run.runId)
        run_result = client.get_results_for_run(runId=run.runId)

        result = {
            "prompt": prompt,
            "status": "timeout" if timed_out else run_result.status,
            "passed": False if timed_out else run_result.passed,
            "score": 0 if timed_out else run_result.score.get("percent", 0),
            "time": round(execution_time, 2),
            "failures": ["Test timed out"] if timed_out else run_result.failures,
            "runId": run.runId,
            "trace": response,
            "diff": getattr(run_result, "diff", None),
        }
        return test_id, result

    except Exception as e:
        # Same key set as a normal result, so downstream aggregation does not
        # have to special-case error rows.
        empty_trace = {
            "steps": [],
            "final": None,
            "iterations": 0,
            "completed": False,
            "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0, "cost": 0.0},
        }
        return test_id, {
            "prompt": prompt,
            "status": "error",
            "passed": False,
            "score": 0,
            "time": round(execution_time, 2),
            "failures": [str(e)],
            "runId": getattr(run, "runId", None),
            "error": str(e),
            "trace": response if response is not None else empty_trace,
            "diff": None,
        }

    finally:
        # Best-effort cleanup on every path. It runs after the result has been
        # built, so a failing delete can no longer turn a valid result into an error.
        if env is not None:
            try:
                client.delete_env(envId=env.environmentId)
            except Exception as cleanup_error:
                print(f"[WARN] Failed to delete environment for test {test_id}: {cleanup_error}")


async def run_benchmark_suite(
    service: str,
    models: list,
    runs_per_test: int = 1,
    max_tests: int = None,
    max_concurrent_models: int = 1,
    max_concurrent_tests: int = 10,
    max_calls_per_minute: int = 90,
    test_timeout_seconds: int = 300,
    max_iterations: int = 25,
    include_api_docs: bool = True,
    checkpoint: "BenchmarkCheckpoint" = None,
):
    """Run benchmark for a single service.

    Every (model, test, run index) combination becomes one concurrent worker.
    Concurrency is capped by a single semaphore of size
    ``max_concurrent_models * max_concurrent_tests`` and worker starts are
    throttled by a fixed 60-second window limiter. The progress bar is always
    closed, even if the run is cancelled or a worker raises.
    
    Args:
        service: Service to benchmark ('slack', 'box', 'calendar', 'linear')
        models: List of model identifiers to evaluate
        runs_per_test: Number of times to run each test
        max_tests: Maximum number of tests to run (None = all)
        max_concurrent_models: Max parallel model evaluations
        max_concurrent_tests: Max parallel test executions
        max_calls_per_minute: Rate limit; one slot is consumed per test start
        test_timeout_seconds: Timeout per test in seconds
        max_iterations: Max ReAct iterations per test
        include_api_docs: Whether to include API documentation in system prompt
        checkpoint: Optional BenchmarkCheckpoint for resuming interrupted runs.
                   If provided, skips already-completed tasks and saves progress incrementally.
    
    Returns:
        List[dict]: List of result dicts. An empty list is returned when the
        test suite is not found or the AgentDiff server cannot be reached.
        When a checkpoint is supplied, the list contains every result stored
        in the checkpoint for this service (resumed and new). Each dict contains:
            - prompt (str): The task prompt
            - status (str): 'passed', 'failed', 'timeout', or 'error'
            - passed (bool): Whether the test passed
            - score (float): Score percentage (0-100)
            - time (float): Execution time in seconds
            - failures (list): List of failure messages
            - runId (str): Unique run identifier
            - error (str|None): Error message if status='error'
            - model (str): Model identifier used
            - test_id (str): Test UUID (deterministic, constant across runs)
            - test_name (str): Human-readable test name from benchmark suite
            - run_index (int): Which repetition of the test this result is
            - service (str): Service name (e.g., 'slack', 'box')
            - test_suite_name (str): Full test suite name (e.g., 'Slack Bench v2')
            - include_api_docs (bool): Whether API docs were included in prompt
            - timestamp (str): ISO format timestamp taken when this suite run
              started (shared by every result produced by the same call)
            - trace (dict): Execution trace containing:
                - steps (list): List of ReAct steps, each with:
                    - iteration (int)
                    - thinking (str): Model's reasoning
                    - action (str): Bash command executed
                    - observation (dict): {stdout, stderr, exit_code}
                    - raw_response (str): Full model response
                    - finish_reason (str): "stop", "length" (context overflow), etc.
                    - usage (dict): {prompt_tokens, completion_tokens, total_tokens, cost}
                - final (dict|None): Completion step with thinking, summary, usage
                - iterations (int): Total iterations
                - completed (bool): Whether agent declared done
                - usage (dict): Total tokens/cost for entire run:
                    {prompt_tokens, completion_tokens, total_tokens, cost}
            - diff (dict|None): State diff with inserts, updates, deletes
    """
    # Get benchmark configuration for this service
    config = get_benchmark_config(service, include_api_docs=include_api_docs)
    test_suite_name = config["test_suite_name"]
    system_prompt = config["system_prompt"]
    run_timestamp = datetime.now().isoformat()
    
    client = AgentDiff(
        #api_key=AGENT_DIFF_API_KEY,
        #base_url=AGENT_DIFF_BASE_URL,
    )
    try:
        suite_list = client.list_test_suites(name=test_suite_name)
        if not suite_list.testSuites:
            print(f"[ERROR] Test suite '{test_suite_name}' not found on AgentDiff server.")
            return []

        suite = client.get_test_suite(suite_list.testSuites[0].id, expand=True)
        tests = suite.tests[:max_tests] if max_tests else suite.tests
    except Exception as e:
        print(f"[ERROR] Error connecting to AgentDiff: {e}")
        return []

    service_label = config["service"].upper()
    total_runs = len(tests) * len(models) * runs_per_test
    # Shared prefix of the banner line printed before the run starts
    header = f"\n[{service_label}] {test_suite_name} | {len(tests)} tests x {len(models)} models x {runs_per_test} runs"
    
    # Checkpointing: determine which tasks need to run
    if checkpoint:
        # Build list of all tasks, filter out completed ones
        all_tasks_spec = [
            (model, test, run_idx)
            for model in models
            for test in tests
            for run_idx in range(runs_per_test)
            if not checkpoint.is_completed(model, str(test.id), run_idx)
        ]
        
        skipped = total_runs - len(all_tasks_spec)
        if skipped > 0:
            print(header)
            print(f"[CHECKPOINT] Skipping {skipped} already completed, {len(all_tasks_spec)} remaining")
        else:
            print(f"{header} = {total_runs} total")
        
        # Local lock that serializes checkpoint mutation + save between workers
        checkpoint_lock = asyncio.Lock()
    else:
        # No checkpoint - run all tasks
        all_tasks_spec = [(model, test, run_idx) 
                         for model in models 
                         for test in tests 
                         for run_idx in range(runs_per_test)]
        checkpoint_lock = None
        print(f"{header} = {total_runs} total")

    semaphore = asyncio.Semaphore(max_concurrent_models * max_concurrent_tests)

    # rate limiting state (per minute window)
    window_seconds = 60
    window_start = time.monotonic()
    calls_in_window = 0
    rate_lock = asyncio.Lock()

    async def acquire_rate_slot():
        """Block until a slot is free in the current fixed 60-second window."""
        nonlocal window_start, calls_in_window
        while True:
            async with rate_lock:
                now = time.monotonic()
                # reset window if needed
                if now - window_start >= window_seconds:
                    window_start = now
                    calls_in_window = 0

                if calls_in_window < max_calls_per_minute:
                    calls_in_window += 1
                    return  # allowed to proceed

                # need to wait until current window ends
                sleep_for = window_seconds - (now - window_start)
            # sleep outside the lock
            if sleep_for > 0:
                await asyncio.sleep(sleep_for)

    # Progress tracking state
    completed_results = []
    results_lock = asyncio.Lock()
    
    # Create progress bar with model names
    model_names = [m.split("/")[-1][:12] for m in models]
    initial_desc = f"{service_label} | " + " | ".join(f"{m}: 0/0" for m in model_names)
    
    # Progress bar shows remaining tasks (may be less than total if resuming)
    tasks_to_run = len(all_tasks_spec)
    pbar = tqdm(
        total=tasks_to_run,
        desc=initial_desc,
        unit="test",
        leave=True,
        dynamic_ncols=True,
        mininterval=0.05,  # More frequent updates
    )
    pbar.refresh()  # Force initial display
    
    async def update_progress():
        """Update progress bar with current stats per model."""
        async with results_lock:
            n = len(completed_results)
            if n > 0:
                # Build per-model stats
                model_stats = {}
                for r in completed_results:
                    m = r.get("model", "unknown").split("/")[-1][:12]  # Short model name
                    if m not in model_stats:
                        model_stats[m] = {"passed": 0, "total": 0}
                    model_stats[m]["total"] += 1
                    if r.get("passed"):
                        model_stats[m]["passed"] += 1
                
                # Format: "model1: 5/10 | model2: 3/8"
                model_parts = [f"{m}: {s['passed']}/{s['total']}" for m, s in model_stats.items()]
                model_str = " | ".join(model_parts)
                
                pbar.set_description(f"{service_label} | {model_str}")
                pbar.refresh()

    async def worker(model_name, test, run_idx):
        """Run one (model, test, run) task and record its annotated result."""
        await acquire_rate_slot()
        async with semaphore:
            tid, res = await run_single_test(
                client, model_name, test, system_prompt,
                test_timeout_seconds=test_timeout_seconds,
                max_iterations=max_iterations,
            )
            res["model"] = model_name
            res["test_id"] = str(tid)
            res["test_name"] = test.name
            res["run_index"] = run_idx  # Track which run this is
            
            # Add metadata immediately (needed for checkpoint)
            res["service"] = config["service"]
            res["test_suite_name"] = test_suite_name
            res["include_api_docs"] = include_api_docs
            res["timestamp"] = run_timestamp
            
            # Track result and update progress
            async with results_lock:
                completed_results.append(res)
            
            # Save to checkpoint if enabled
            if checkpoint and checkpoint_lock:
                async with checkpoint_lock:
                    checkpoint.mark_completed(model_name, str(tid), run_idx, res.copy())
                    checkpoint.save()  # Incremental save after each test
            
            pbar.update(1)
            await update_progress()
            pbar.refresh()  # Force display refresh in Jupyter
            
            # Log failures to tqdm (won't mess up progress bar)
            if not res.get("passed"):
                name_short = test.name[:35] + "..." if len(test.name) > 35 else test.name
                model_short = model_name.split("/")[-1][:15]  # e.g., "anthropic/claude-3" -> "claude-3"
                if res.get("status") == "timeout":
                    tqdm.write(f"[TIMEOUT] {model_short} | {name_short} | {res.get('time', 0):.1f}s")
                elif res.get("status") == "error":
                    # str() guards the slice in case the stored error is not a string
                    tqdm.write(f"[ERROR] {model_short} | {name_short} | {str(res.get('error', 'unknown'))[:50]}")
                else:
                    tqdm.write(f"[FAIL] {model_short} | {name_short} | {res.get('score')}%")
            
            return res

    # Create tasks from the (possibly filtered) task spec
    tasks = [worker(model, test, run_idx) for model, test, run_idx in all_tasks_spec]

    try:
        results = await asyncio.gather(*tasks)
    finally:
        # Always release the progress bar, including on cancellation
        # (e.g. an interrupted notebook cell) or an unexpected worker error.
        pbar.close()
    
    # Note: Metadata is already added in the worker function for checkpoint support
    
    # Combine with checkpoint results if resuming
    if checkpoint:
        # Get all results from checkpoint (includes both old and newly added)
        all_results = checkpoint.get_results()
        # Filter to only this service's results
        service_results = [r for r in all_results if r.get("service") == config["service"]]
        
        # Final summary (includes resumed results)
        passed = sum(1 for r in service_results if r.get("passed"))
        avg_score = sum(r.get("score", 0) for r in service_results) / len(service_results) if service_results else 0
        print(f"{service_label} Complete: {passed}/{len(service_results)} passed ({avg_score:.1f}% avg)")
        if len(results) < len(service_results):
            print(f"  (includes {len(service_results) - len(results)} resumed from checkpoint)")
        
        return service_results
    else:
        # Final summary
        passed = sum(1 for r in results if r.get("passed"))
        avg_score = sum(r.get("score", 0) for r in results) / len(results) if results else 0
        print(f"{service_label} Complete: {passed}/{len(results)} passed ({avg_score:.1f}% avg)")
        
        return results


async def run_all_benchmarks(
    models: list,
    services: list = None,
    runs_per_test: int = 1,
    max_tests: int = None,
    max_concurrent_models: int = 1,
    max_concurrent_tests: int = 10,
    max_calls_per_minute: int = 90,
    test_timeout_seconds: int = 300,
    max_iterations: int = 25,
    include_api_docs: bool = True,
    checkpoint: "BenchmarkCheckpoint" = None,
):
    """Benchmark several services one after another and print a combined summary.

    Services are processed sequentially; each one is delegated to
    ``run_benchmark_suite`` with the same settings. A service whose run raises
    an exception is reported and recorded with an empty result list, so the
    remaining services still run.

    Args:
        models: Model identifiers to evaluate.
        services: Services to benchmark. When None, every key of
                  BENCHMARK_CONFIGS is used.
                  Options: ['slack', 'box', 'calendar', 'linear']
        runs_per_test: How many times each test is executed.
        max_tests: Upper bound on tests per service (None = all).
        max_concurrent_models: Max parallel model evaluations.
        max_concurrent_tests: Max parallel test executions.
        max_calls_per_minute: Rate limit for API calls.
        test_timeout_seconds: Per-test timeout in seconds.
        max_iterations: Max ReAct iterations per test.
        include_api_docs: Whether API documentation goes into the system prompt.
        checkpoint: Optional BenchmarkCheckpoint used to resume interrupted runs.

    Returns:
        Dict[str, List[dict]]: service name -> list of result dicts (empty list
        for a service that failed). See run_benchmark_suite for the per-result
        schema (prompt, status, passed, score, time, failures, error, runId,
        model, test_id, test_name, service, test_suite_name, include_api_docs,
        timestamp, trace, diff).
    """
    selected_services = list(BENCHMARK_CONFIGS.keys()) if services is None else services

    if include_api_docs:
        docs_status = "with API docs"
    else:
        docs_status = "NO API docs"
    service_labels = ', '.join(s.upper() for s in selected_services)
    print(f"Benchmarks: {service_labels} | {len(models)} models | {docs_status} | {test_timeout_seconds}s timeout")

    # Settings forwarded unchanged to every per-service run
    suite_kwargs = dict(
        models=models,
        runs_per_test=runs_per_test,
        max_tests=max_tests,
        max_concurrent_models=max_concurrent_models,
        max_concurrent_tests=max_concurrent_tests,
        max_calls_per_minute=max_calls_per_minute,
        test_timeout_seconds=test_timeout_seconds,
        max_iterations=max_iterations,
        include_api_docs=include_api_docs,
        checkpoint=checkpoint,
    )

    results_by_service = {}
    for service_name in selected_services:
        try:
            suite_results = await run_benchmark_suite(service=service_name, **suite_kwargs)
        except Exception as exc:
            print(f"[ERROR] Error running {service_name} benchmark: {exc}")
            suite_results = []
        results_by_service[service_name] = suite_results

    # Overall summary
    banner = '=' * 60
    print(f"\n{banner}")
    print("OVERALL SUMMARY")
    print(f"{banner}")

    grand_passed = 0
    grand_total = 0
    for service_name, suite_results in results_by_service.items():
        if not suite_results:
            # Failed or empty services contribute nothing to the totals
            continue
        n_passed = len([r for r in suite_results if r.get("passed")])
        n_total = len(suite_results)
        grand_passed += n_passed
        grand_total += n_total
        print(f"  {service_name.upper()}: {n_passed}/{n_total} passed")

    if grand_total > 0:
        print(f"\n  TOTAL: {grand_passed}/{grand_total} passed ({100*grand_passed/grand_total:.1f}%)")

    return results_by_service

In [7]:
# ============ Checkpointing System ============
# Tracks progress and allows resuming interrupted benchmark runs

import json
import hashlib
from pathlib import Path
from datetime import datetime
from typing import Optional, Set, Tuple

# Default directory for checkpoint JSON files. The path is relative, so it
# resolves against the current working directory. It is created (with any
# missing parents) as soon as this cell runs.
CHECKPOINT_DIR = Path("evaluation_outputs/checkpoints")
CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)

class BenchmarkCheckpoint:
    """Manages checkpoint state for benchmark runs.
    
    Tracks which (model, test_id, run_index) combinations have been completed,
    allowing runs to be resumed after interruption. State is persisted as a
    single JSON file that is rewritten atomically on every save, so an
    interruption during a save cannot leave a truncated checkpoint behind.
    """
    
    def __init__(self, checkpoint_path: Optional[Path] = None, run_name: Optional[str] = None):
        """Initialize checkpoint manager.
        
        Args:
            checkpoint_path: Path to checkpoint file. If None, generates based on run_name.
            run_name: Name for this run (used to generate checkpoint filename if path not given).
                      Defaults to the current time formatted as YYYYmmdd_HHMMSS.
        """
        if checkpoint_path:
            self.checkpoint_path = Path(checkpoint_path)
        else:
            name = run_name or datetime.now().strftime('%Y%m%d_%H%M%S')
            self.checkpoint_path = CHECKPOINT_DIR / f"checkpoint_{name}.json"
        
        self.completed: Set[str] = set()  # Set of "model|test_id|run_idx" keys
        self.results: list = []  # Accumulated results
        self.metadata: dict = {}  # Run metadata
        self._lock = None  # Reserved for an asyncio.Lock; not used by this class itself
        
    def _make_key(self, model: str, test_id: str, run_idx: int) -> str:
        """Create unique key for a (model, test, run) combination."""
        return f"{model}|{test_id}|{run_idx}"
    
    def is_completed(self, model: str, test_id: str, run_idx: int) -> bool:
        """Check if a specific (model, test, run) has been completed."""
        return self._make_key(model, test_id, run_idx) in self.completed
    
    def mark_completed(self, model: str, test_id: str, run_idx: int, result: dict):
        """Mark a (model, test, run) as completed and store result.

        A shallow copy of ``result`` is stored, tagged with the internal
        ``_checkpoint_key`` field; the caller's dict is left untouched.
        Marking the same combination again replaces the stored result instead
        of appending a duplicate.
        """
        key = self._make_key(model, test_id, run_idx)
        stored = dict(result)
        stored["_checkpoint_key"] = key  # Store key in result for deduplication

        if key in self.completed:
            # Already recorded: overwrite the earlier entry if it can be found
            for idx, existing in enumerate(self.results):
                if isinstance(existing, dict) and existing.get("_checkpoint_key") == key:
                    self.results[idx] = stored
                    return

        self.completed.add(key)
        self.results.append(stored)
    
    def save(self):
        """Save checkpoint to disk.

        The data is written to a sibling ``*.tmp`` file and then renamed over
        the real checkpoint, so readers never observe a half-written file.
        The parent directory is created if it does not exist yet.

        Raises:
            OSError: If the directory or file cannot be written.
        """
        data = {
            "completed": list(self.completed),
            "results": self.results,
            "metadata": self.metadata,
            "saved_at": datetime.now().isoformat(),
        }
        
        def safe_serialize(obj):
            # Fallback for values json cannot encode natively
            if isinstance(obj, bytes):
                return obj.decode('utf-8', errors='replace')
            return str(obj)
        
        target = self.checkpoint_path
        target.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = target.with_name(target.name + ".tmp")
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, default=safe_serialize, ensure_ascii=False)
            tmp_path.replace(target)
        finally:
            # After a successful rename the temp file is gone; this only
            # cleans up the leftover from a failed or interrupted write.
            if tmp_path.exists():
                tmp_path.unlink()
    
    def load(self) -> bool:
        """Load checkpoint from disk. Returns True if loaded successfully.

        On any failure a warning is printed, False is returned, and the
        in-memory state is left exactly as it was before the call.
        """
        if not self.checkpoint_path.exists():
            return False
        
        try:
            with open(self.checkpoint_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            
            # Parse everything first so a malformed file cannot leave this
            # object with partially replaced state.
            completed = set(data.get("completed", []))
            results = data.get("results", [])
            metadata = data.get("metadata", {})
        except Exception as e:
            print(f"[CHECKPOINT] Warning: Failed to load checkpoint: {e}")
            return False

        self.completed = completed
        self.results = results
        self.metadata = metadata
        return True
    
    def get_remaining_tasks(
        self, 
        models: list, 
        tests: list, 
        runs_per_test: int
    ) -> list:
        """Get list of (model, test, run_idx) tuples that haven't been completed.

        Each test object must expose an ``id`` attribute; its string form is
        used as the test id part of the key.
        """
        return [
            (model, test, run_idx)
            for model in models
            for test in tests
            for run_idx in range(runs_per_test)
            if not self.is_completed(model, str(test.id), run_idx)
        ]
    
    def get_completed_count(self) -> int:
        """Get number of completed tasks."""
        return len(self.completed)
    
    def get_results(self) -> list:
        """Get all accumulated results (removing internal checkpoint keys)."""
        return [
            {k: v for k, v in r.items() if not k.startswith("_checkpoint")}
            for r in self.results
        ]
    
    def summary(self) -> str:
        """Get summary string of checkpoint state."""
        return f"Checkpoint: {self.get_completed_count()} completed, {len(self.results)} results"


def get_or_create_checkpoint(
    checkpoint_path: Optional[str] = None,
    resume: bool = True
) -> BenchmarkCheckpoint:
    """Build a BenchmarkCheckpoint and optionally restore its saved state.

    Args:
        checkpoint_path: Location of the checkpoint file. When omitted (or
            empty), the checkpoint picks its own timestamped file name.
        resume: When True, try to load existing state from the file. When
            False, nothing is loaded and the run starts empty.

    Returns:
        The BenchmarkCheckpoint instance, loaded if resuming succeeded.
    """
    path_arg = None
    if checkpoint_path:
        path_arg = Path(checkpoint_path)
    checkpoint = BenchmarkCheckpoint(checkpoint_path=path_arg)

    # load() is only attempted when resuming was requested
    resumed = resume and checkpoint.load()
    if not resumed:
        print(f"[CHECKPOINT] Starting fresh run, saving to {checkpoint.checkpoint_path}")
        return checkpoint

    print(f"[CHECKPOINT] Resumed from {checkpoint.checkpoint_path}")
    print(f"[CHECKPOINT] {checkpoint.summary()}")
    return checkpoint


def list_checkpoints(checkpoint_dir: Path = CHECKPOINT_DIR) -> list:
    """List all available checkpoints.

    Prints a one-line summary (completed count, save time, file size) for up
    to ten checkpoint files and returns the full list of matching paths.

    Args:
        checkpoint_dir: Directory searched for ``checkpoint_*.json`` files.

    Returns:
        All matching paths, sorted by file name in descending order. With the
        default timestamped names this puts the most recent checkpoint first.
    """
    checkpoints = sorted(checkpoint_dir.glob("checkpoint_*.json"), reverse=True)
    print(f"Found {len(checkpoints)} checkpoints in {checkpoint_dir}:")
    for i, cp in enumerate(checkpoints[:10]):  # Show only the first 10 of the sorted list
        size_kb = cp.stat().st_size / 1024
        # Load to get summary
        try:
            # Checkpoints are written as UTF-8 with non-ASCII characters kept
            # verbatim, so the platform default encoding must not be used here.
            with open(cp, encoding='utf-8') as f:
                data = json.load(f)
            n_completed = len(data.get("completed", []))
            saved_at = data.get("saved_at", "unknown")
            print(f"  [{i}] {cp.name} | {n_completed} completed | {saved_at} | {size_kb:.1f}KB")
        except Exception:
            # Best-effort listing: unreadable or malformed files are still
            # shown, but KeyboardInterrupt/SystemExit are no longer swallowed.
            print(f"  [{i}] {cp.name} | {size_kb:.1f}KB (error reading)")
    return checkpoints

# Printed once the definitions above have executed, confirming the cell ran to completion.
print("[OK] Checkpoint system loaded")

[OK] Checkpoint system loaded


In [8]:
# Models to evaluate (uncomment to include).
# Each entry is a "provider/model" identifier; the benchmark code shortens it
# to the part after the last "/" when printing progress and failure lines.

MODELS = [
    # "openai/gpt-5-mini",
    # "anthropic/claude-haiku-4.5",
    # "anthropic/claude-sonnet-4.5",
    # "anthropic/claude-opus-4.5",
    # "x-ai/grok-4.1-fast",
    # "deepseek/deepseek-v3.2",
    # "moonshotai/kimi-k2-0905",
    # "qwen/qwen3-vl-235b-a22b-instruct",
    "moonshotai/kimi-k2-thinking",
    "openai/gpt-oss-120b"
]

In [9]:
# Runtime Settings (all passed to benchmark functions as keyword arguments)
# NOTE: this cell uses top-level `await`, so it only runs in an environment
# that supports it (e.g. a Jupyter/IPython kernel), not as a plain script.

BENCHMARK_SETTINGS = {
    "models": MODELS,                    # Models to evaluate (from cell above)
    "runs_per_test": 1,                  # Number of runs per test
    "max_tests": None,                   # None = all tests, or set a limit (per service)
    "max_concurrent_models": 5,          # Multiplied with max_concurrent_tests into one concurrency cap
    "max_concurrent_tests": 15,          # With the value above: 5 x 15 = 75 tests in flight at most
    "max_calls_per_minute": 480,         # Rate limit: test starts allowed per 60-second window
    "test_timeout_seconds": 480,         # 8 minutes per test
    "max_iterations": 40,                # Max ReAct iterations
    "include_api_docs": True,           # Include API docs in prompt (False = agent explores)
}

# ============ Checkpointing (optional) ============
# Enable to resume interrupted runs. Checkpoint saves after each test.
# Exactly one of the three options below should be active.

# Option 1: Fresh run with new checkpoint (file name is timestamped automatically)
checkpoint = get_or_create_checkpoint(resume=False)

# Option 2: Resume from latest checkpoint (comment out Option 1)
# checkpoints = list_checkpoints()  # List available checkpoints (sorted by name, descending)
# checkpoint = get_or_create_checkpoint(checkpoint_path=str(checkpoints[0]), resume=True)

# Option 3: No checkpointing (comment out both options above)
# checkpoint = None

# Add checkpoint to settings so it is forwarded together with the other keyword arguments
BENCHMARK_SETTINGS["checkpoint"] = checkpoint

# ============ Run Benchmark ============
# Pick one of the invocations below.

# Single service:
#results = await run_benchmark_suite(service="calendar", **BENCHMARK_SETTINGS)

# Multiple services:
#all_results = await run_all_benchmarks(services=["slack", "box"], **BENCHMARK_SETTINGS)

# All services (services=None makes run_all_benchmarks use every key of BENCHMARK_CONFIGS):
all_results = await run_all_benchmarks(**BENCHMARK_SETTINGS)

[CHECKPOINT] Starting fresh run, saving to evaluation_outputs/checkpoints/checkpoint_20260130_204940.json
Benchmarks: SLACK, BOX, CALENDAR, LINEAR | 2 models | with API docs | 480s timeout

[SLACK] Slack Bench v2 | 59 tests x 2 models x 1 runs = 118 total


SLACK | kimi-k2-thin: 0/0 | gpt-oss-120b: 0/0:   0%|          | 0/118 [00:00<?, ?test/s]

[FAIL] kimi-k2-thinkin | Send direct message Artem, and Hube... | 50.0%
[FAIL] kimi-k2-thinkin | Add emoji reaction | 0.0%
[FAIL] kimi-k2-thinkin | Mention user in message | 0.0%
[FAIL] kimi-k2-thinkin | Multi-channel send | 50.0%
[FAIL] kimi-k2-thinkin | Edit existing message | 0.0%
[FAIL] kimi-k2-thinkin | Reply in a thread | 0.0%
[FAIL] kimi-k2-thinkin | Search messages in a channel and co... | 0.0%
[FAIL] kimi-k2-thinkin | Rich Text: Block Quote | 0.0%
[FAIL] kimi-k2-thinkin | Search messages in multiple channel... | 0.0%
[FAIL] kimi-k2-thinkin | Rich Text: Code Block and Numbered ... | 50.0%
[FAIL] kimi-k2-thinkin | Forward questions to another channe... | 0.0%
[FAIL] kimi-k2-thinkin | Ambiguity Resolution (Contextual Ch... | 0.0%
[FAIL] kimi-k2-thinkin | Search messages in multiple channel... | 25.0%
[FAIL] kimi-k2-thinkin | Table Block Generation | 50.0%
[FAIL] kimi-k2-thinkin | Cross-channel Summarization | 0.0%
[TIMEOUT] kimi-k2-thinkin | Add emoji reactions | 482.0s
[TIMEOUT]

BOX | kimi-k2-thin: 0/0 | gpt-oss-120b: 0/0:   0%|          | 0/96 [00:00<?, ?test/s]

[FAIL] kimi-k2-thinkin | Level 2: Hub Setup | 0.0%
[FAIL] kimi-k2-thinkin | Level 4: Rename Folder from CSV (CP... | 0.0%
[FAIL] kimi-k2-thinkin | Level 3: Search and Rename Folder | 0.0%
[FAIL] kimi-k2-thinkin | Level 4: Find Duplicates, Delete Mi... | 50.0%
[FAIL] kimi-k2-thinkin | Level 1: Upload History Note | 0.0%
[FAIL] kimi-k2-thinkin | Level 3: Ambiguous Sorting | 0.0%
[FAIL] kimi-k2-thinkin | Level 3: Sort Misfiled Reading | 0.0%
[FAIL] kimi-k2-thinkin | Level 4: Upload TXT named from CSV ... | 0.0%
[FAIL] kimi-k2-thinkin | Level 3: Dyslexic User Typo Fix | 80.0%
[FAIL] kimi-k2-thinkin | Level 3: Organize Research Hub | 0.0%
[FAIL] kimi-k2-thinkin | Level 2: Description From Name | 0.0%
[FAIL] kimi-k2-thinkin | Level 1: Upload and Delete File (Tr... | 0.0%
[TIMEOUT] kimi-k2-thinkin | Level 3: Cross-Folder Dedup | 482.0s
[TIMEOUT] kimi-k2-thinkin | Level 3: Organize By Extension (Fla... | 482.3s
[TIMEOUT] kimi-k2-thinkin | Level 3: Remove Duplicate (Buenos A... | 482.0s
[TIMEOU

CALENDAR | kimi-k2-thin: 0/0 | gpt-oss-120b: 0/0:   0%|          | 0/120 [00:00<?, ?test/s]

[FAIL] kimi-k2-thinkin | Symposium of Infinite Curiosity - A... | 16.666666666666664%
[FAIL] kimi-k2-thinkin | Crypto-Zoology Summit - Mythical cr... | 25.0%
[FAIL] kimi-k2-thinkin | Time-Traveler's Convention - Tempor... | 75.0%
[FAIL] kimi-k2-thinkin | Thunderwave Music Festival - Multi-... | 0.0%
[FAIL] kimi-k2-thinkin | Celluloid Dreams Film Festival - La... | 83.33333333333334%
[FAIL] kimi-k2-thinkin | Museum of Whispered Relics - Calend... | 77.77777777777779%
[TIMEOUT] kimi-k2-thinkin | Cosmic Voyagers Astronomy Club - Mu... | 482.0s
[TIMEOUT] kimi-k2-thinkin | Green Thumbs Urban Garden Collectiv... | 482.6s
[TIMEOUT] kimi-k2-thinkin | Dice & Dragons Tabletop Gaming Guil... | 482.8s
[TIMEOUT] kimi-k2-thinkin | Mirage Menagerie Caravan Festival -... | 482.9s
[TIMEOUT] kimi-k2-thinkin | Emberline Embassy Network - Calenda... | 483.0s
[TIMEOUT] kimi-k2-thinkin | Firefly Conservatory - Recurring pa... | 483.1s
[TIMEOUT] kimi-k2-thinkin | Clockwork Tinkerers Guild - Recurri... | 483.

CancelledError: 

In [None]:
# ============ Analysis Functions ============
# Reusable functions for analyzing benchmark results

def styled_display(df, format_str="{:.1f}%", gradient_cols=None):
    """Show a dataframe through its Styler, or unstyled if styling is unavailable.

    Args:
        df: DataFrame to display.
        format_str: Format string applied to the cells via ``Styler.format``.
        gradient_cols: Optional column names to colour with a red-yellow-green
            gradient scaled from 0 to 100. Names absent from ``df`` are ignored.

    If building or displaying the styled view raises ImportError or
    AttributeError, the plain dataframe is displayed instead.
    """
    try:
        styler = df.style.format(format_str)
        # Only keep requested columns that actually exist in the frame
        present = [name for name in (gradient_cols or []) if name in df.columns]
        if present:
            styler = styler.background_gradient(
                cmap="RdYlGn", vmin=0, vmax=100, subset=present
            )
        display(styler)
    except (ImportError, AttributeError):
        display(df)


def analyze_results(results: list):
    """Display comprehensive analysis tables for benchmark results.
    
    Args:
        results: List of result dicts from benchmark runs
    """
    if not results:
        print("No results to analyze.")
        return
    
    df = pd.DataFrame(results)
    
    # Helper to format model names (shorter)
    df["model_short"] = df["model"].apply(lambda x: x.split("/")[-1] if "/" in str(x) else x)

    # ============ 1. Overall Leaderboard by Model ============
    display(HTML("<h3>1. Overall Results by Model</h3>"))
    leaderboard = df.groupby("model_short").agg(
        passed=("passed", "sum"),
        total=("passed", "count"),
        avg_score=("score", "mean"),
        avg_time=("time", "mean")
    ).reset_index()
    leaderboard["pass_rate"] = (leaderboard["passed"] / leaderboard["total"] * 100).round(1)
    leaderboard = leaderboard.sort_values("pass_rate", ascending=False)
    display(leaderboard)

    # ============ 2. Results by Model and Service ============
    display(HTML("<h3>2. Results by Model and Service</h3>"))
    by_model_service = df.groupby(["model_short", "service"]).agg(
        passed=("passed", "sum"),
        total=("passed", "count"),
        avg_score=("score", "mean"),
    ).reset_index()
    by_model_service["pass_rate"] = (by_model_service["passed"] / by_model_service["total"] * 100).round(1)
    
    # Pivot for better readability
    pivot_pass_rate = by_model_service.pivot(index="model_short", columns="service", values="pass_rate")
    pivot_pass_rate["OVERALL"] = leaderboard.set_index("model_short")["pass_rate"]
    pivot_pass_rate = pivot_pass_rate.sort_values("OVERALL", ascending=False)
    styled_display(pivot_pass_rate, gradient_cols=list(pivot_pass_rate.columns))

    # ============ 3. Results by Service (all models) ============
    display(HTML("<h3>3. Results by Service (All Models)</h3>"))
    by_service = df.groupby("service").agg(
        passed=("passed", "sum"),
        total=("passed", "count"),
        avg_score=("score", "mean"),
        avg_time=("time", "mean")
    ).reset_index()
    by_service["pass_rate"] = (by_service["passed"] / by_service["total"] * 100).round(1)
    by_service = by_service.sort_values("pass_rate", ascending=False)
    display(by_service)

    # ============ 4. With Docs vs Without Docs ============
    if "include_api_docs" in df.columns and df["include_api_docs"].nunique() > 1:
        display(HTML("<h3>4. With API Docs vs Without API Docs</h3>"))
        
        # Overall by docs
        by_docs = df.groupby("include_api_docs").agg(
            passed=("passed", "sum"),
            total=("passed", "count"),
            avg_score=("score", "mean"),
            avg_time=("time", "mean")
        ).reset_index()
        by_docs["pass_rate"] = (by_docs["passed"] / by_docs["total"] * 100).round(1)
        by_docs["include_api_docs"] = by_docs["include_api_docs"].map({True: "With Docs", False: "Without Docs"})
        display(by_docs)
        
        # By model and docs
        display(HTML("<h4>4a. By Model: With vs Without Docs</h4>"))
        by_model_docs = df.groupby(["model_short", "include_api_docs"]).agg(
            passed=("passed", "sum"),
            total=("passed", "count"),
            avg_score=("score", "mean"),
        ).reset_index()
        by_model_docs["pass_rate"] = (by_model_docs["passed"] / by_model_docs["total"] * 100).round(1)
        by_model_docs["docs"] = by_model_docs["include_api_docs"].map({True: "With Docs", False: "Without Docs"})
        
        pivot_docs = by_model_docs.pivot(index="model_short", columns="docs", values="pass_rate")
        if "With Docs" in pivot_docs.columns and "Without Docs" in pivot_docs.columns:
            pivot_docs["Delta"] = pivot_docs["With Docs"] - pivot_docs["Without Docs"]
            pivot_docs = pivot_docs.sort_values("Delta", ascending=False)
        styled_display(pivot_docs, gradient_cols=["With Docs", "Without Docs"])
        
        # By service and docs
        display(HTML("<h4>4b. By Service: With vs Without Docs</h4>"))
        by_service_docs = df.groupby(["service", "include_api_docs"]).agg(
            passed=("passed", "sum"),
            total=("passed", "count"),
            avg_score=("score", "mean"),
        ).reset_index()
        by_service_docs["pass_rate"] = (by_service_docs["passed"] / by_service_docs["total"] * 100).round(1)
        by_service_docs["docs"] = by_service_docs["include_api_docs"].map({True: "With Docs", False: "Without Docs"})
        
        pivot_service_docs = by_service_docs.pivot(index="service", columns="docs", values="pass_rate")
        if "With Docs" in pivot_service_docs.columns and "Without Docs" in pivot_service_docs.columns:
            pivot_service_docs["Delta"] = pivot_service_docs["With Docs"] - pivot_service_docs["Without Docs"]
        styled_display(pivot_service_docs, gradient_cols=["With Docs", "Without Docs"])
    else:
        docs_status = df["include_api_docs"].iloc[0] if "include_api_docs" in df.columns else "unknown"
        print(f"\n[Note] All results have include_api_docs={docs_status}, no comparison available.")

    # ============ 5. Usage Summary ============
    display(HTML("<h3>5. Usage Summary</h3>"))
    
    # Extract usage data into dataframe columns
    def extract_tokens(row):
        trace = row["trace"] if "trace" in row and isinstance(row["trace"], dict) else {}
        usage = trace.get("usage", {}) if isinstance(trace, dict) else {}
        return usage.get("total_tokens", 0) if isinstance(usage, dict) else 0
    
    def extract_cost(row):
        trace = row["trace"] if "trace" in row and isinstance(row["trace"], dict) else {}
        usage = trace.get("usage", {}) if isinstance(trace, dict) else {}
        return usage.get("cost", 0) if isinstance(usage, dict) else 0
    
    df["total_tokens"] = df.apply(extract_tokens, axis=1)
    df["cost"] = df.apply(extract_cost, axis=1)
    
    print(f"Total: {df['total_tokens'].sum():,.0f} tokens | ${df['cost'].sum():.4f} USD")
    
    # Usage by model
    display(HTML("<h4>5a. Usage by Model</h4>"))
    usage_by_model = df.groupby("model_short").agg(
        tests=("model_short", "count"),
        tokens=("total_tokens", "sum"),
        cost=("cost", "sum"),
    ).reset_index()
    usage_by_model["tokens_per_test"] = (usage_by_model["tokens"] / usage_by_model["tests"]).round(0).astype(int)
    usage_by_model["cost_per_test"] = (usage_by_model["cost"] / usage_by_model["tests"]).round(4)
    usage_by_model = usage_by_model.sort_values("cost", ascending=False)
    display(usage_by_model)
    
    # Usage by model and include_api_docs
    if "include_api_docs" in df.columns and df["include_api_docs"].nunique() > 1:
        display(HTML("<h4>5b. Usage by Model: With vs Without Docs</h4>"))
        usage_by_model_docs = df.groupby(["model_short", "include_api_docs"]).agg(
            tests=("model_short", "count"),
            tokens=("total_tokens", "sum"),
            cost=("cost", "sum"),
        ).reset_index()
        usage_by_model_docs["tokens_per_test"] = (usage_by_model_docs["tokens"] / usage_by_model_docs["tests"]).round(0).astype(int)
        usage_by_model_docs["cost_per_test"] = (usage_by_model_docs["cost"] / usage_by_model_docs["tests"]).round(4)
        usage_by_model_docs["docs"] = usage_by_model_docs["include_api_docs"].map({True: "With Docs", False: "Without Docs"})
        
        # Pivot for comparison
        pivot_tokens = usage_by_model_docs.pivot(index="model_short", columns="docs", values="tokens_per_test")
        pivot_cost = usage_by_model_docs.pivot(index="model_short", columns="docs", values="cost_per_test")
        
        print("Tokens per test:")
        display(pivot_tokens)
        print("\nCost per test:")
        display(pivot_cost)


def load_and_analyze(file_path: str = None):
    """Read a saved results JSON file and run ``analyze_results`` on it.

    When called without a path nothing is loaded: the available result
    files are printed (through ``list_result_files``) together with a
    short usage hint.

    Args:
        file_path: Location of the JSON file to read. ``None`` selects the
            listing mode described above.

    Returns:
        The parsed JSON content after it has been analyzed, or ``None``
        when only the file listing was printed.
    """
    if file_path is not None:
        with open(file_path, 'r', encoding='utf-8') as handle:
            loaded = json.load(handle)

        print(f"Loaded {len(loaded)} results from {file_path}\n")
        analyze_results(loaded)
        return loaded

    # Listing mode: list_result_files() prints the candidates itself, so its
    # return value is not needed here.
    list_result_files()
    print("\nUsage: load_and_analyze('path/to/file.json')")
    print("   or: load_and_analyze(str(files[0]))")
    return None

# Confirmation message printed once this cell's definitions have been executed
print("[OK] Analysis functions loaded")

In [None]:
# Handle both single service (results = list) and multi-service (all_results = dict)
if 'all_results' in dir() and all_results:
    # Flatten the per-service lists of all_results into a single list
    results_to_save = [
        result
        for service_results in all_results.values()
        for result in service_results
    ]
elif 'results' in dir() and results:
    results_to_save = results
else:
    results_to_save = []

if results_to_save:
    # Save results to JSON *before* analyzing them: if the analysis raises,
    # the results of the run are already on disk and can be reloaded later.
    ts = datetime.now().strftime('%Y%m%d_%H%M%S')
    output_path = OUTPUT_DIR / f"full_results_{ts}.json"

    def safe_serialize(obj):
        """Fallback for json.dump: decode bytes, stringify anything else."""
        if isinstance(obj, bytes):
            return obj.decode('utf-8', errors='replace')
        return str(obj)

    try:
        # Make sure the target directory exists so the write cannot fail on a
        # fresh checkout.
        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(results_to_save, f, indent=2, default=safe_serialize, ensure_ascii=False)
        print(f"\nResults saved to {output_path}")
    except Exception as e:
        # Best effort: report the failure but still show the analysis below.
        print(f"Error saving JSON: {e}")

    # Analyze results (uses the analyze_results function)
    analyze_results(results_to_save)
else:
    print("No results generated.")




In [None]:
# Utility: Merge Multiple Result Files

def list_result_files(output_dir: Path = OUTPUT_DIR) -> list:
    """Print and return the ``full_results_*.json`` files found in a directory.

    Args:
        output_dir: Directory to scan (non-recursive glob).

    Returns:
        list: The matching paths in sorted order. Each one is also printed
        with its list index and its size in KB.
    """
    found = sorted(output_dir.glob("full_results_*.json"))
    print(f"Found {len(found)} result files in {output_dir}:")
    for position, path in enumerate(found):
        kilobytes = path.stat().st_size / 1024
        print(f"  [{position}] {path.name} ({kilobytes:.1f} KB)")
    return found


def merge_result_files(
    files: list = None,
    output_dir: Path = OUTPUT_DIR,
    output_name: str = None,
    deduplicate: bool = False,
) -> list:
    """Merge multiple result JSON files into one.

    Each input file must contain a JSON list; files with any other top-level
    type, or files that fail to load, are reported and skipped. The merged
    list is written to ``output_dir`` and a per-model / per-service summary
    is displayed.

    Args:
        files: File paths (``str`` or ``Path``) to merge. If None, merges every
            ``full_results_*.json`` file in output_dir.
        output_dir: Directory scanned when files is None, and the directory
            the merged file is written to. Created if it does not exist.
        output_name: Output filename. If None, generates a timestamped
            ``merged_results_<timestamp>.json`` name.
        deduplicate: If True, keeps only the first entry for each
            (test_id, model, timestamp, run_index) combination.

    Returns:
        list: Merged results (an empty list when there are no files to merge).

    Raises:
        OSError: If the merged file cannot be written.
    """
    output_dir = Path(output_dir)

    if files is None:
        files = sorted(output_dir.glob("full_results_*.json"))
    else:
        # Callers may pass plain strings; the messages below use `.name`, which
        # only exists on Path objects, so normalize up front.
        files = [Path(f) for f in files]

    if not files:
        print("No files to merge.")
        return []

    print(f"Merging {len(files)} files...")

    all_results = []
    for f in files:
        try:
            with open(f, 'r', encoding='utf-8') as fp:
                data = json.load(fp)
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole merge.
            print(f"  ! {f.name}: error loading - {e}")
            continue

        if isinstance(data, list):
            all_results.extend(data)
            print(f"  + {f.name}: {len(data)} results")
        else:
            print(f"  ! {f.name}: unexpected format (not a list)")

    print(f"\nTotal: {len(all_results)} results")

    # Deduplicate if requested
    if deduplicate and all_results:
        seen = set()
        unique_results = []
        for r in all_results:
            # Create unique key from test_id, model, timestamp, and run_index
            # run_index distinguishes multiple runs of the same test
            key = (r.get("test_id"), r.get("model"), r.get("timestamp"), r.get("run_index", 0))
            if key not in seen:
                seen.add(key)
                unique_results.append(r)

        removed = len(all_results) - len(unique_results)
        if removed > 0:
            print(f"Deduplicated: removed {removed} duplicates, {len(unique_results)} unique results")
        all_results = unique_results

    # Save merged results
    if output_name is None:
        ts = datetime.now().strftime('%Y%m%d_%H%M%S')
        output_name = f"merged_results_{ts}.json"

    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / output_name

    def safe_serialize(obj):
        """Fallback for json.dump: decode bytes, stringify anything else."""
        if isinstance(obj, bytes):
            return obj.decode('utf-8', errors='replace')
        return str(obj)

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(all_results, f, indent=2, default=safe_serialize, ensure_ascii=False)

    print(f"\nSaved to: {output_path}")

    # Summary by model and service
    if all_results:
        df = pd.DataFrame(all_results)
        metric_cols = {"passed", "score"}
        if metric_cols.issubset(df.columns):
            for group_col, label in (("model", "Model"), ("service", "Service")):
                # The merged file is already saved; skip a grouping whose
                # column is absent instead of raising a KeyError.
                if group_col not in df.columns:
                    continue
                print(f"\n--- Summary by {label} ---")
                summary = df.groupby(group_col).agg(
                    tests=("passed", "count"),
                    passed=("passed", "sum"),
                    avg_score=("score", "mean"),
                ).reset_index()
                summary["pass_rate"] = (100 * summary["passed"] / summary["tests"]).round(1)
                display(summary)
        else:
            missing = sorted(metric_cols - set(df.columns))
            print(f"\nSkipping summary: missing column(s) {missing}")

    return all_results


# Example usage:
# files = list_result_files()
# merged = merge_result_files()  # Merge all files
# merged = merge_result_files(files=[files[0], files[2]])  # Merge specific files
# merged = merge_result_files(deduplicate=True)  # Merge and remove duplicates

In [None]:
# List available result files
files = list_result_files()

# Merge every result file; deduplication is switched off here (deduplicate=False)
merged = merge_result_files(deduplicate=False)

# Alternative options:
# merged = merge_result_files(deduplicate=True)  # Merge all and drop duplicate entries
# merged = merge_result_files(files=[files[0], files[-1]])  # Merge specific files only

In [None]:
# Run the analysis on the merged result list assigned to `merged` in the previous cell
analyze_results(merged)