# OpenEnv Benchmark Executor

## Eval script for OpenEnv & MCP Environments

**Last Updated:** January 2026  
**Prepared by:** Turing

---

### Overview

This notebook provides a **self-contained, production-ready eval system** for evaluating AI agents interacting with OpenEnv or MCP gym environments.

### Key Features

- **Dual Protocol Support** - OpenEnv REST API + MCP JSON-RPC
- **Multi-LLM Support** - Anthropic, OpenAI, Google via LangChain
- **Flexible Database Seeding** - Use custom .sql files or gym's api/sample-data
- **Multiple Verifier Types** - database_state, response_check
- **Multi-Run Statistics** - Pass@n, success rates, performance metrics

### Execution Modes

- **OpenEnv Mode** (`execution_mode: "openenv"`) - REST API with /reset, /step, /state endpoints
- **MCP Mode** (`execution_mode: "mcp"`) - JSON-RPC 2.0 protocol with session management

### Database Seeding

- **Custom SQL File**: Set `seed_database_file: "path/to/file.sql"` to seed from your own SQL file
- **Gym Sample Data**: Set `seed_database_file: ""` (empty) to use the gym's built-in api/sample-data

### Installation

Run the next cell to install dependencies.

## Prerequisites for Running on Google Colab

### Step 1: Set Up the MCP Gym Server Locally

1. **Download the OpenEnv gym**
   - Download the OpenEnv gym environment folder, `calendar_env`

2. **Run the gym environment locally**
   ### Docker (Recommended)
   ```bash
   cd calendar_env
   docker build -t calendar-env:latest .
   docker run --rm -p 8004:8004 calendar-env:latest
   curl http://localhost:8004/health
   ```
   A healthy server responds with:
   `{"status":"healthy","service":"calendar-env"}`

   ### Without Docker

   ```bash
   cd calendar_env
   python3 -m venv venv
   source venv/bin/activate
   pip install -r requirements.txt
   uvicorn server.app:app --host 0.0.0.0 --port 8004
   ```

3. **Install Cloudflare Tunnel** (only needed if you run this notebook on Colab)
   ```bash
   # Install cloudflared
   # macOS
   brew install cloudflare/cloudflare/cloudflared
   
   # Linux
   wget https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64.deb
   sudo dpkg -i cloudflared-linux-amd64.deb
   
   # Windows
   # Download from: https://developers.cloudflare.com/cloudflare-one/connections/connect-apps/install-and-setup/installation/
   ```

4. **Expose Server with Cloudflare Tunnel**
   ```bash
   cloudflared tunnel --url http://localhost:8004
   ```
   
   This will output something like:
   ```
   +--------------------------------------------------------------------------------------------+
   |  Your quick Tunnel has been created! Visit it at (it may take some time to be reachable):  |
   |  https://randomly-generated-subdomain.trycloudflare.com                                      |
   +--------------------------------------------------------------------------------------------+
   ```

5. **Copy the HTTPS URL**
   - Copy the `https://` URL from the terminal output
   - Set it as the value of `gym_enviornment_url` in the configuration cell below (cell number 17)
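
Before pasting the tunnel URL into the configuration, it can help to confirm that the gym answers through the tunnel. The following is a minimal sketch, assuming the `/health` endpoint returns the JSON shown in the Docker step; `is_healthy` and `check_gym` are hypothetical helper names, not part of the notebook.

```python
import json
import urllib.request


def is_healthy(payload) -> bool:
    """Return True when the payload looks like the health response shown above."""
    return isinstance(payload, dict) and payload.get("status") == "healthy"


def check_gym(base_url: str, timeout: float = 10.0) -> bool:
    """Fetch <base_url>/health and report whether the gym looks healthy."""
    url = base_url.rstrip("/") + "/health"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return is_healthy(json.loads(response.read().decode("utf-8")))
    except (OSError, ValueError):
        # Network errors, HTTP errors, and malformed JSON all count as unhealthy
        return False


# Parsing check against the documented response (no network needed)
print(is_healthy({"status": "healthy", "service": "calendar-env"}))
```

Calling `check_gym("https://your-tunnel-subdomain.trycloudflare.com")` with your own tunnel URL should return `True` once the tunnel is reachable, which may take a short while after it is created.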

### Step 2: Database Seeding Configuration

The notebook supports two ways to seed the database:


#### Option A: Use the Gym's Sample Data in cell number 17 below (Recommended)
```python
CONFIG = {
    ...
    "seed_database_file": "",  # Leave empty for default database
    ...
}
```

#### Option B: Use a Custom SQL File in cell number 17 below (to create custom data based on the gym's tool schema)
```python
CONFIG = {
    ...
    "seed_database_file": "path/to/your/custom_seed.sql",  # Provide SQL file path
    ...
}
```
Download the sample SQL file from GitHub and upload it to the notebook's files section. Then copy its path and assign it to `seed_database_file`.
- If you provide a `seed_database_file` path, the notebook will:
  - Read the SQL content from your custom file
  - Create the database with your custom SQL schema and data
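
A custom seed file can be sanity-checked locally before uploading it. The sketch below loads the SQL into an in-memory SQLite database and counts rows per table. It assumes the gym accepts SQLite-compatible SQL, which this notebook does not state explicitly; `check_seed_sql` and the two-table example schema are hypothetical and only loosely modeled on the `calendars` and `events` tables queried by the verifiers.

```python
import sqlite3
from typing import Dict


def check_seed_sql(sql_content: str) -> Dict[str, int]:
    """Load the SQL into an in-memory SQLite database and count rows per table."""
    connection = sqlite3.connect(":memory:")
    try:
        # Raises sqlite3.Error if the script has syntax or constraint problems
        connection.executescript(sql_content)
        tables = [
            row[0]
            for row in connection.execute(
                "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
            )
        ]
        return {
            table: connection.execute(f'SELECT COUNT(*) FROM "{table}"').fetchone()[0]
            for table in tables
        }
    finally:
        connection.close()


# Hypothetical seed; the real schema comes from the gym's sample SQL
example_sql = """
CREATE TABLE calendars (calendar_id TEXT PRIMARY KEY, hidden INTEGER, selected INTEGER);
INSERT INTO calendars VALUES ('alice-projects', 1, 0);
CREATE TABLE events (event_id TEXT PRIMARY KEY, calendar_id TEXT);
"""
print(check_seed_sql(example_sql))
```

To check a real file, read it with `open(path, encoding="utf-8").read()` and pass the content to `check_seed_sql`.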

### Step 3: Configure LLM provider and API key

1. **Set API Keys in Colab Secrets**
   - Click the 'key' icon in the left sidebar
   - Add your LLM API key:
     - `LLM_API_KEY`

2. **Update Configuration**
   - Configure your LLM provider and model in cell number 17 below
   - Add any context headers needed (e.g., `x-access-token`)

### Example Configuration for Colab

```python
from google.colab import userdata

CONFIG = {
    "execution_mode": "openenv",
    "gym_enviornment_url": "https://your-tunnel-subdomain.trycloudflare.com",
    "mcp_endpoint": "/mcp",
    "seed_database_file": "",  # Empty = use gym's sample data, or provide path
    
    # LLM Configuration
    "llm_provider": "anthropic",
    "llm_model": "claude-sonnet-4-5",
    "llm_api_key": userdata.get('LLM_API_KEY'),  # Secret name set in Step 3
    
    # Context headers (if needed)
    "context": {
        "x-access-token": "your-access-token-here",  # Already has x- prefix
        "actingUserId": "alice_manager"  # Will become x-actinguserid
    },
    ...
}
```
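
The comments in the `context` block describe how keys are turned into HTTP headers. The sketch below mirrors the header-building loop used by the notebook's clients: keys are lowercased, underscores become hyphens, and an `x-` prefix is added unless the key already starts with one. The function name `context_to_headers` is hypothetical.

```python
from typing import Any, Dict


def context_to_headers(context: Dict[str, Any]) -> Dict[str, str]:
    """Convert context keys into lowercase x-prefixed header names."""
    headers: Dict[str, str] = {}
    for key, value in context.items():
        lowered = key.lower()
        name = lowered.replace("_", "-")
        # Add the prefix only when the original key does not already carry it
        if not lowered.startswith("x-"):
            name = f"x-{name}"
        headers[name] = str(value)
    return headers


print(context_to_headers({
    "x-access-token": "your-access-token-here",
    "actingUserId": "alice_manager",
}))
# → {'x-access-token': 'your-access-token-here', 'x-actinguserid': 'alice_manager'}
```

Note that camelCase keys are not split into words: `actingUserId` becomes `x-actinguserid`, not `x-acting-user-id`.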


### Run the Colab code cells by clicking the Run all option in the menu.

### The final run results are written to a .json file in the files section of the Colab notebook.

### Important Notes

- **Tunnel Stability**: Cloudflare tunnels are temporary. If disconnected, you'll need to restart the tunnel and get a new URL
- **Custom SQL Files**: When using custom SQL files on Colab, upload them first or mount Google Drive
- **Database Cleanup**: The notebook creates and deletes databases automatically, so no manual cleanup is needed unless the gym server does not support the deletion API (a warning is logged in that case)

---

In [None]:
# Install required packages (comment out if they are already installed)
!pip install httpx langchain langchain-core langchain-anthropic langchain-openai langchain-google-genai

print("Dependencies ready")

Dependencies ready


## Configuration Settings

Configure your benchmark run by setting the parameters below:

- `execution_mode`: "openenv" (REST API) or "mcp" (JSON-RPC)
- `gym_enviornment_url`: URL of your Gym server
- `seed_database_file`: Path to .sql file for database seeding (empty = use gym's api/sample-data)
- `system_prompt` & `user_prompt`: Instructions for the AI agent
- `llm_model`, `llm_provider`, `llm_api_key`: LLM configuration
- `expected_tools`: List of tools the agent should use (optional, for verification)
- `restricted_tools`: Tools the agent should NOT use (optional)
- `verifiers`: List of verifier configurations to validate agent behavior
- `number_of_runs`: How many times to run the benchmark (for statistical analysis)
- `reset_database_between_runs`: Whether to reset the database between runs (default: `True`)
- `context`: Additional context information passed to Gym server via headers
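
A quick pre-flight check can catch missing or mistyped settings before any LLM calls are made. This is a minimal sketch, not part of the notebook: the required keys mirror the fields without defaults in the `BenchmarkConfig` dataclass defined later, the allowed modes and providers come from the descriptions above, and `find_config_problems` is a hypothetical helper name.

```python
from typing import Any, Dict, List

# Keys that have no default in the BenchmarkConfig dataclass
REQUIRED_KEYS = [
    "gym_enviornment_url", "mcp_endpoint", "seed_database_file",
    "system_prompt", "user_prompt", "llm_model", "llm_provider",
    "llm_api_key", "verifiers", "number_of_runs", "context",
]
VALID_MODES = {"openenv", "mcp"}
VALID_PROVIDERS = {"anthropic", "openai", "google"}


def find_config_problems(config: Dict[str, Any]) -> List[str]:
    """Return human-readable problems; an empty list means the basics look fine."""
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in config]
    mode = config.get("execution_mode", "openenv")
    if mode not in VALID_MODES:
        problems.append(f"unknown execution_mode: {mode!r}")
    provider = config.get("llm_provider")
    if provider is not None and provider not in VALID_PROVIDERS:
        problems.append(f"unknown llm_provider: {provider!r}")
    runs = config.get("number_of_runs")
    if runs is not None and (not isinstance(runs, int) or runs < 1):
        problems.append("number_of_runs must be a positive integer")
    return problems
```

After the configuration cell has run, `find_config_problems(CONFIG)` returning an empty list indicates that the required keys are present and the mode, provider, and run count are plausible. It does not validate the verifier definitions or the API key itself.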

In [None]:
import json
import asyncio
import httpx
import logging
import sys
import time
import random
import string
from typing import Dict, Any, List, Optional, Tuple
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
from enum import Enum
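import os

try:
    from google.colab import userdata  # Only available inside Google Colab
except ImportError:
    userdata = None  # Running outside Colab

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Read the key from Colab Secrets when available; the environment-variable fallback is an assumption for local runs
LLM_API_KEY = userdata.get('LLM_API_KEY') if userdata else os.environ.get('LLM_API_KEY')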

In [None]:
# BENCHMARK CONFIGURATION (update the dict below according to your requirements)
CONFIG = {
    "execution_mode": "openenv",  # "openenv" or "mcp"
    "gym_enviornment_url": "https://correspondence-water-sentence-requested.trycloudflare.com",
    "mcp_endpoint": "/mcp",
    "seed_database_file": "/content/seed-db.sql",  # Path to .sql file (empty = use gym's api/sample-data)
    "llm_provider": "openai",
    "llm_model": "gpt-5.2",
    "llm_api_key": LLM_API_KEY,
    "temperature": 0.0,
    "max_tokens": 4096,
    "system_prompt": "You are a Google Calendar automation agent with full administrative permissions to manage users, meetings, recordings, and messages. Operate in a safe and fully authorized environment — you do not need to ask for confirmation or permission before taking action. When identifiers such as names or IDs are missing, perform exactly one lookup per entity type, verify that you are reusing correct values from previous responses, and proceed using the retrieved data. Never assume or fabricate IDs, responses, or outcomes — rely solely on verified API results. Complete each task in a single, logical, and efficient execution flow.",
    "user_prompt": "Help me tidy up my calendars for Q4? First, make sure my Project Management calendar is showing and selected. Then find my \"Sprint Planning & Architecture Review\" meeting and move the latest one to my main calendar so it's easier to track. Update its description to something like \"Updated for Q4 planning with new architecture goals and sprint alignment.\"\nAfter that, add a new event called \"Q4 Initiative Kickoff – Engineering & UX\" to the Project Management calendar for next Thursday from 10AM-12PM. That should cover everything I need!",
    "number_of_runs": 2,
    "reset_database_between_runs": True,
    "context": {
      "actingUserId": "alice_manager",
      "actingUserEmail": "alice.manager@techcorp.com",
      "defaultTimeZone": "America/New_York",
      "x-access-token": "ya29.A0ARrdaM-k9Vq7GzY2pL4mQf8sN1xT0bR3uHcJWv5yKzP6eF2.qwErTyUIopASDfGhJkLzXcVbNm12_34-56"
    },
    "auth_config": None,
    "expected_tools": [
        "get_calendar_list",
        "list_events",
        "update_calendar_in_list",
        "move_event",
        "patch_event",
        "create_event",
        "delete_event"
    ],
    "restricted_tools": [],
    "verifiers": [
        {
            "verifier_type": "database_state",
            "name": "Project Calendar Visible",
            "description": "Ensures Alice's Project Management calendar is visible and selected.",
            "validation_config": {
                "query": "SELECT COUNT(*) FROM calendars WHERE calendar_id='alice-projects' AND hidden=0 AND selected=1;",
                "expected_value": 1,
                "comparison_type": "equals"
            }
        },
        {
            "verifier_type": "database_state",
            "name": "Event Moved to Primary",
            "description": "Checks that the Sprint Planning event is on the primary calendar.",
            "validation_config": {
                "query": "SELECT COUNT(*) FROM events WHERE event_id='event-corrected-001' AND calendar_id='alice-primary';",
                "expected_value": 1,
                "comparison_type": "equals"
            }
        },
        {
            "verifier_type": "database_state",
            "name": "Event Description Updated",
            "description": "Ensures the event description was updated to the new Q4 wording.",
            "validation_config": {
                "query": "SELECT COUNT(*) FROM events WHERE event_id='event-corrected-001' AND description LIKE '%Q4%' AND description LIKE '%planning%' AND description LIKE '%architecture%' ",
                "expected_value": 1,
                "comparison_type": "equals"
            }
        },
        {
            "verifier_type": "database_state",
            "name": "Kickoff Event Created",
            "description": "Ensures that the new Q4 kickoff event was created on Project Management.",
            "validation_config": {
                "query": "SELECT COUNT(*) FROM events WHERE summary='Q4 Initiative Kickoff – Engineering & UX' AND calendar_id='alice-projects' AND start_datetime LIKE '%2026-01-15 10:00%'\nAND end_datetime LIKE '%2026-01-15 12:00:%' AND status='confirmed'",
                "expected_value": 1,
                "comparison_type": "equals"
            }
        },
        {
            "verifier_type": "database_state",
            "name": "Verify Old Mapping Removed",
            "description": "Checks that the Sprint Planning event is no longer on the Project Management calendar.",
            "validation_config": {
                "query": "SELECT COUNT(*) FROM events WHERE event_id='event-corrected-001' AND calendar_id='alice-projects';",
                "expected_value": 0,
                "comparison_type": "equals"
            }
        }
    ]
}

print("Configuration loaded")

Configuration loaded
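
The "Kickoff Event Created" verifier hard-codes `2026-01-15`, while the user prompt says "next Thursday", so the query only matches when the benchmark runs in the week before that date. The sketch below shows one way to derive the date from the run date instead. It assumes "next Thursday" means the first Thursday strictly after the run date, which is an interpretation rather than something the notebook defines; `next_weekday` and `kickoff_query` are hypothetical helpers.

```python
from datetime import date, timedelta

THURSDAY = 3  # date.weekday() counts Monday as 0


def next_weekday(today: date, weekday: int) -> date:
    """Return the first date strictly after `today` that falls on `weekday`."""
    days_ahead = (weekday - today.weekday()) % 7
    if days_ahead == 0:
        days_ahead = 7  # Today is that weekday, so jump a full week
    return today + timedelta(days=days_ahead)


def kickoff_query(run_date: date) -> str:
    """Build the kickoff verifier query for the Thursday after `run_date`."""
    day = next_weekday(run_date, THURSDAY).isoformat()
    return (
        "SELECT COUNT(*) FROM events "
        "WHERE summary='Q4 Initiative Kickoff – Engineering & UX' "
        "AND calendar_id='alice-projects' "
        f"AND start_datetime LIKE '%{day} 10:00%' "
        f"AND end_datetime LIKE '%{day} 12:00:%' "
        "AND status='confirmed'"
    )


print(next_weekday(date(2026, 1, 12), THURSDAY))  # → 2026-01-15
```

The result could replace the `query` string of that verifier, for example with `kickoff_query(date.today())`. The agent may still resolve "next Thursday" differently, or in a different time zone, so treat a mismatch here as a prompt ambiguity rather than an agent failure.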


## Complete Implementation

The following cells contain the full implementation of all classes and functions.

## 3. Data Models

Define all data structures and enums used throughout the benchmark system.

In [None]:
# ============================================================================
# DATA MODELS
# ============================================================================

class VerifierType(str, Enum):
    DATABASE_STATE = "database_state"
    RESPONSE_CHECKER = "response_check"
    TOOL_EXECUTION = "tool_execution"


@dataclass
class MCPToolCall:
    """Represents an MCP tool call"""
    tool_name: str
    arguments: Dict[str, Any]


@dataclass
class MCPToolResponse:
    """Represents an MCP tool response"""
    success: bool
    result: Any = None
    error: Optional[str] = None


@dataclass
class VerifierConfig:
    """Configuration for a verifier"""
    verifier_type: str
    validation_config: Dict[str, Any]
    name: Optional[str] = None
    description: Optional[str] = None


@dataclass
class BenchmarkConfig:
    """Complete benchmark configuration"""
    gym_enviornment_url: str
    mcp_endpoint: str
    seed_database_file: str  # Path to .sql file (empty = use gym's api/sample-data)
    system_prompt: str
    user_prompt: str
    llm_model: str
    llm_provider: str  # "anthropic", "openai", "google"
    llm_api_key: str
    verifiers: List[Dict[str, Any]]
    number_of_runs: int
    context: Dict[str, Any]
    execution_mode: str = "openenv"  # "openenv" or "mcp"
    expected_tools: Optional[List[str]] = None
    restricted_tools: Optional[List[str]] = None
    temperature: float = 0.0
    max_tokens: int = 4096
    reset_database_between_runs: bool = True
    auth_config: Optional[Dict[str, Any]] = None

print("Data models defined")

Data models defined
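
To make the `validation_config` fields of a `database_state` verifier concrete, the sketch below evaluates one against a local in-memory SQLite database. In the notebook the queries run against the gym server's database, so the local connection, the simplified `calendars` table, and the helper name `run_database_state_verifier` are assumptions for illustration only. Only the `equals` comparison appears in the configuration above, so that is the only one handled.

```python
import sqlite3
from typing import Any, Dict


def run_database_state_verifier(
    connection: sqlite3.Connection, validation_config: Dict[str, Any]
) -> bool:
    """Run the verifier query and compare its first column to expected_value."""
    row = connection.execute(validation_config["query"]).fetchone()
    actual = row[0] if row else None
    comparison = validation_config.get("comparison_type", "equals")
    if comparison != "equals":
        raise ValueError(f"Unsupported comparison_type: {comparison}")
    return actual == validation_config["expected_value"]


# Hypothetical, simplified table; the real schema lives on the gym server
connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE calendars (calendar_id TEXT, hidden INTEGER, selected INTEGER)"
)
connection.execute("INSERT INTO calendars VALUES ('alice-projects', 0, 1)")

visible_check = {
    "query": "SELECT COUNT(*) FROM calendars WHERE calendar_id='alice-projects' "
             "AND hidden=0 AND selected=1;",
    "expected_value": 1,
    "comparison_type": "equals",
}
print(run_database_state_verifier(connection, visible_check))  # → True
```

Because each query is a `COUNT(*)`, a verifier passes only when exactly the expected number of rows matches. An `expected_value` of 0, as in the last verifier of the configuration, asserts that a row is absent.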


## 4. Database Management Functions

Functions for automatically creating and deleting test databases from the Gym server.

In [None]:
# ============================================================================
# DATABASE MANAGEMENT
# ============================================================================

def create_database_from_file(gym_url: str, sql_file_path: str, config_dir: str = "") -> Optional[str]:
    """Create a new database from a SQL file and return database_id."""
    try:
        # Generate unique database_id
        timestamp = int(time.time() * 1000)
        suffix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=9))
        database_id = f"db_{timestamp}_{suffix}"
        headers = {"Content-Type": "application/json"}

        # Read SQL file
        if not os.path.isabs(sql_file_path):
            sql_file_path = os.path.join(config_dir, sql_file_path) if config_dir else sql_file_path

        logger.info(f"Reading SQL file: {sql_file_path}")
        if not os.path.exists(sql_file_path):
            logger.error(f"SQL file not found: {sql_file_path}")
            return None

        with open(sql_file_path, 'r', encoding='utf-8') as f:
            sql_content = f.read()

        logger.info(f"SQL size: {len(sql_content) / 1024:.2f} KB")

        # Create database
        db_name = f"Auto DB {datetime.now().strftime('%Y%m%d_%H%M%S')}"
        logger.info(f"Creating database '{db_name}'...")
        payload = {
            "database_id": database_id,
            "name": db_name,
            "description": f"Created from {os.path.basename(sql_file_path)}",
            "sql_content": sql_content
        }

        timeout = max(120, int(120 + len(sql_content) / 102400))
        with httpx.Client(timeout=timeout) as client:
            response = client.post(f"{gym_url}/api/seed-database", headers=headers, json=payload)
            response.raise_for_status()

        logger.info(f"Database created from file: {database_id}")
        return database_id

    except Exception as e:
        logger.error(f"Error creating database from file: {e}")
        return None


def create_database(gym_url: str) -> Optional[str]:
    """Create a new database from Gym server sample SQL and return database_id."""
    try:
        # Generate unique database_id
        timestamp = int(time.time() * 1000)
        suffix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=9))
        database_id = f"db_{timestamp}_{suffix}"
        headers = {"Content-Type": "application/json"}

        # Fetch sample SQL from Gym server
        logger.info(f"Fetching sample SQL from {gym_url}...")
        with httpx.Client(timeout=60) as client:
            response = client.get(f"{gym_url}/api/sample-data", headers=headers)
            response.raise_for_status()
            data = response.json()

            if isinstance(data, dict) and 'sql_content' in data:
                sql_content = data['sql_content']
            elif isinstance(data, dict) and 'text' in data:
                sql_content = data['text']
            elif isinstance(data, str):
                sql_content = data
            else:
                raise ValueError(f"Could not extract SQL content from response")

        logger.info(f"SQL size: {len(sql_content) / 1024:.2f} KB")

        # Create database
        db_name = f"Auto DB {datetime.now().strftime('%Y%m%d_%H%M%S')}"
        logger.info(f"Creating database '{db_name}'...")
        payload = {
            "database_id": database_id,
            "name": db_name,
            "description": "Auto-created from Gym server",
            "sql_content": sql_content
        }

        timeout = max(120, int(120 + len(sql_content) / 102400))
        with httpx.Client(timeout=timeout) as client:
            response = client.post(f"{gym_url}/api/seed-database", headers=headers, json=payload)
            response.raise_for_status()

        logger.info(f"Database created: {database_id}")
        return database_id

    except Exception as e:
        logger.error(f"Error creating database: {e}")
        return None


def delete_database(gym_url: str, database_id: str) -> bool:
    """Delete a database from the Gym server."""
    try:
        headers = {"Content-Type": "application/json"}
        payload = {"database_id": database_id}

        logger.info(f"Deleting database: {database_id}...")

        with httpx.Client(timeout=30) as client:
            response = client.request("DELETE", f"{gym_url}/api/delete-database", headers=headers, json=payload)

            # Handle servers that don't have this API
            if response.status_code == 404:
                logger.warning(f"Server does not support database deletion API")
                return False
            elif response.status_code == 405:
                logger.warning(f"Database deletion not allowed on this server")
                return False

            response.raise_for_status()

        logger.info(f"Database deleted successfully")
        return True

    except httpx.HTTPStatusError as e:
        if e.response.status_code in [404, 405]:
            logger.warning(f"Server does not support database deletion (HTTP {e.response.status_code})")
        else:
            logger.error(f"Error deleting database: {e}")
        return False
    except Exception as e:
        logger.error(f"Error deleting database: {e}")
        return False
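
Both create functions above repeat the same database ID generation and the same timeout formula. The sketch below pulls the two pieces out so they are easier to read in isolation; the helper names `new_database_id` and `seed_timeout_seconds` are hypothetical. The timeout adds one second per 102,400 characters of SQL on top of a 120-second base.

```python
import random
import string
import time


def new_database_id() -> str:
    """Build an ID of the form db_<millisecond timestamp>_<9 random characters>."""
    timestamp = int(time.time() * 1000)
    suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=9))
    return f"db_{timestamp}_{suffix}"


def seed_timeout_seconds(sql_length: int) -> int:
    """Return the request timeout for a seed script of `sql_length` characters."""
    return max(120, int(120 + sql_length / 102400))


print(seed_timeout_seconds(0))          # → 120
print(seed_timeout_seconds(1_024_000))  # → 130
```

Since the second argument to `max` is never below 120 for a non-negative length, the `max` call only guards against unexpected inputs; the base timeout is always at least two minutes.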

## 5. Protocol Clients (OpenEnv + MCP)

HTTP clients for interacting with both OpenEnv REST API (RFC 002) and MCP JSON-RPC protocol.
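
The two protocols wrap the same tool call in different request bodies. The sketch below builds both: the OpenEnv body matches what `OpenEnvClient.call_tool` posts to `/step`, and the JSON-RPC envelope matches `MCPClient._send_request`. The `params` shape for `tools/call` (`name` plus `arguments`) follows the MCP specification and is an assumption about what this gym expects; both function names are hypothetical.

```python
from typing import Any, Dict, Optional


def openenv_step_payload(
    tool_name: str, arguments: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Body for POST /step in OpenEnv mode."""
    return {
        "action_type": "ToolCallAction",
        "tool_name": tool_name,
        "arguments": arguments or {},
    }


def mcp_tools_call_payload(
    request_id: int, tool_name: str, arguments: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """JSON-RPC 2.0 body for POST <mcp_endpoint> in MCP mode."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": tool_name, "arguments": arguments or {}},
    }


print(openenv_step_payload("list_events", {"calendar_id": "alice-primary"}))
print(mcp_tools_call_payload(1, "list_events", {"calendar_id": "alice-primary"}))
```

The `calendar_id` argument is illustrative; the actual argument names come from the tool schemas returned by `list_tools`.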

In [None]:
# ============================================================================
# PROTOCOL CLIENTS (OpenEnv REST API + MCP JSON-RPC)
# ============================================================================

class OpenEnvClient:
    """
    HTTP-based OpenEnv Client for REST API communication with OpenEnv servers.
    Uses /reset, /step, and /state endpoints (OpenEnv RFC 002).
    """

    def __init__(self, base_url: str, auth_config: Optional[Dict[str, Any]] = None,
                 mcp_endpoint: str = "/mcp", database_id: Optional[str] = None,
                 context: Optional[Dict[str, Any]] = None, seed_database_file: str = "",
                 sql_content: Optional[str] = None):
        self.base_url = base_url.rstrip("/")
        self.database_id = database_id
        self.auth_config = auth_config
        self.context = context or {}
        self.seed_database_file = seed_database_file
        self.sql_content = sql_content  # SQL content from seed file
        self.timeout = 60.0
        self.total_reward = 0.0
        self.connected = False

    def _headers(self) -> Dict[str, str]:
        """Get headers for HTTP requests"""
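        headers = {"Content-Type": "application/json"}
        # Only send the database header when an ID is set; None is not a valid header value
        if self.database_id:
            headers["x-database-id"] = self.database_id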
        # Add authentication token if provided
        if self.auth_config:
            auth_type = self.auth_config.get("type")
            token = self.auth_config.get("token")
            header_name = self.auth_config.get("header_name", "Authorization")

            if auth_type == "bearer":
                headers[header_name] = f"Bearer {token}"
            elif auth_type == "api_key":
                headers[header_name] = token

        # Add context headers
        if self.context and isinstance(self.context, dict):
            for key, value in self.context.items():
                # Don't add extra "x-" if key already starts with "x-"
                if key.lower().startswith("x-"):
                    header_key = key.lower().replace('_', '-')
                else:
                    header_key = f"x-{key.lower().replace('_', '-')}"
                headers[header_key] = str(value)

        return headers

    async def connect(self) -> bool:
        """Mark the client as connected (the REST API needs no handshake)"""
        try:
            # No request is sent here; reachability is only confirmed by the first /reset or /step call
            self.connected = True
            logger.info(f"Connected to OpenEnv server: {self.base_url}")
            return True
        except Exception as e:
            logger.error(f"Failed to connect to OpenEnv server: {e}")
            return False

    async def reset(self) -> Dict[str, Any]:
        """Reset environment using OpenEnv /reset endpoint with custom SQL content"""
        logger.info(f"Resetting {self.base_url} (db: {self.database_id})...")
        try:
            # Build request payload
            payload = {}

            # Add sql_content if available (from user-provided seed file)
            if self.sql_content:
                payload["sql_content"] = self.sql_content
                logger.info(f"Using custom SQL ({len(self.sql_content) / 1024:.2f} KB)")

            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.base_url}/reset",
                    json=payload if payload else None,
                    headers=self._headers()
                )

                if response.status_code == 200:
                    data = response.json()
                    episode_id = data.get('metadata', {}).get('episode_id', 'N/A')
                    logger.info(f"Reset successful - Episode: {episode_id[:8] if episode_id != 'N/A' else 'N/A'}")
                    self.total_reward = 0.0
                    return {"success": True, "data": data}

                # Handle servers that don't support /reset endpoint
                if response.status_code == 404:
                    logger.warning("Server does not support /reset endpoint - continuing without reset")
                    self.total_reward = 0.0
                    return {"success": True, "data": {}, "fallback": True}

                logger.error(f"Reset failed: HTTP {response.status_code}")
                return {"success": False, "error": f"HTTP {response.status_code}"}

        except Exception as e:
            logger.warning(f"Reset failed: {e} - continuing without reset")
            self.total_reward = 0.0
            return {"success": True, "data": {}, "fallback": True}

    async def list_tools(self) -> List[Dict[str, Any]]:
        """List available tools using OpenEnv /step with ListToolsAction"""
        logger.info("Listing tools...")
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.base_url}/step",
                    json={"action_type": "ListToolsAction"},
                    headers=self._headers()
                )

                if response.status_code == 200:
                    tools = response.json().get("observation", {}).get("tools_list", [])
                    logger.info(f"Found {len(tools)} tools")
                    return tools

                logger.error(f"List tools failed: HTTP {response.status_code}")
                return []
        except Exception as e:
            logger.error(f"List tools failed: {e}")
            return []

    async def call_tool(
        self,
        tool_name: str,
        arguments: Optional[Dict[str, Any]] = None,
        database_id: Optional[str] = None,
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Execute a tool using OpenEnv /step with ToolCallAction"""
        logger.info(f"Calling {tool_name}...")
        try:
            # Build headers (override instance values if provided)
            headers = self._headers()
            if database_id:
                headers["x-database-id"] = database_id

            # Add any additional context headers (these override instance context)
            if context and isinstance(context, dict):
                for key, value in context.items():
                    # Don't add extra "x-" if key already starts with "x-"
                    if key.lower().startswith("x-"):
                        header_key = key.lower().replace('_', '-')
                    else:
                        header_key = f"x-{key.lower().replace('_', '-')}"
                    headers[header_key] = str(value)

            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.base_url}/step",
                    json={
                        "action_type": "ToolCallAction",
                        "tool_name": tool_name,
                        "arguments": arguments or {}
                    },
                    headers=headers
                )

                if response.status_code == 200:
                    data = response.json()
                    reward = data.get("reward", 0.0)
                    self.total_reward += reward
                    observation = data.get("observation", {})
                    success = observation.get("success", False)
                    logger.info(f"{tool_name} (success={success}, reward={reward})")
                    return {
                        "success": True,
                        "result": observation,
                        "reward": reward
                    }

                logger.error(f"Tool call failed: HTTP {response.status_code}")
                return {"success": False, "error": f"HTTP {response.status_code}"}
        except Exception as e:
            logger.error(f"Tool call failed: {e}")
            return {"success": False, "error": str(e)}

    async def reset_database(self, database_id: Optional[str] = None, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Reset database using OpenEnv /reset endpoint (both arguments are currently ignored)"""
        return await self.reset()

    async def get_state(self, verify_queries: Optional[List[str]] = None) -> Dict[str, Any]:
        """Get current environment state using OpenEnv /state endpoint"""
        logger.info("GET /state")
        try:
            params = [("verify_queries", q) for q in verify_queries] if verify_queries else []

            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.get(
                    f"{self.base_url}/state",
                    params=params if params else None,
                    headers=self._headers()
                )

                if response.status_code == 200:
                    return {"success": True, "data": response.json()}

                logger.error(f"Get state failed: HTTP {response.status_code}")
                return {"success": False, "error": f"HTTP {response.status_code}"}
        except Exception as e:
            logger.error(f"Get state failed: {e}")
            return {"success": False, "error": str(e)}


class MCPClient:
    """
    MCP (Model Context Protocol) Client for JSON-RPC communication.
    Uses /mcp endpoint with JSON-RPC 2.0 protocol.
    """

    def __init__(self, base_url: str, auth_config: Optional[Dict[str, Any]] = None,
                 mcp_endpoint: str = "/mcp", database_id: Optional[str] = None,
                 context: Optional[Dict[str, Any]] = None, seed_database_file: str = "",
                 sql_content: Optional[str] = None):
        self.base_url = base_url.rstrip("/")
        self.mcp_endpoint = mcp_endpoint
        self.database_id = database_id
        self.auth_config = auth_config
        self.context = context or {}
        self.seed_database_file = seed_database_file
        self.sql_content = sql_content  # SQL content from seed file
        self.timeout = 60.0
        self.session_id = None
        self.request_id = 0
        self.connected = False

    def _next_request_id(self) -> int:
        """Get next request ID for JSON-RPC"""
        self.request_id += 1
        return self.request_id

    def _headers(self) -> Dict[str, str]:
        """Get headers for HTTP requests"""
        headers = {"Content-Type": "application/json"}

        if self.session_id:
            headers["mcp-session-id"] = self.session_id

        if self.database_id:
            headers["x-database-id"] = self.database_id

        # Add authentication
        if self.auth_config:
            auth_type = self.auth_config.get("type")
            token = self.auth_config.get("token")
            header_name = self.auth_config.get("header_name", "Authorization")

            if auth_type == "bearer":
                headers[header_name] = f"Bearer {token}"
            elif auth_type == "api_key":
                headers[header_name] = token

        # Add context headers
        if self.context and isinstance(self.context, dict):
            for key, value in self.context.items():
                # Don't add extra "x-" if key already starts with "x-"
                if key.lower().startswith("x-"):
                    header_key = key.lower().replace('_', '-')
                else:
                    header_key = f"x-{key.lower().replace('_', '-')}"
                headers[header_key] = str(value)

        return headers

    async def _send_request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Send JSON-RPC request"""
        payload = {
            "jsonrpc": "2.0",
            "id": self._next_request_id(),
            "method": method,
            "params": params or {}
        }

        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.base_url}{self.mcp_endpoint}",
                    json=payload,
                    headers=self._headers()
                )

                if response.status_code == 200:
                    data = response.json()
                    if "error" in data:
                        return {"success": False, "error": data["error"]}
                    return {"success": True, "result": data.get("result")}

                return {"success": False, "error": f"HTTP {response.status_code}"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def connect(self) -> bool:
        """Initialize MCP session"""
        logger.info(f"Initializing MCP session with {self.base_url}...")
        result = await self._send_request("initialize", {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "benchmark-executor", "version": "1.0.0"}
        })

        if result.get("success"):
            # Extract session ID from response if provided
            session_info = result.get("result", {})
            if isinstance(session_info, dict) and "sessionId" in session_info:
                self.session_id = session_info["sessionId"]

            self.connected = True
            logger.info(f"MCP session initialized")
            return True

        logger.error(f"MCP initialization failed: {result.get('error')}")
        return False

    async def reset(self) -> Dict[str, Any]:
        """Reset environment (no-op for MCP, handled by reset_database)"""
        return {"success": True, "data": {}}

    async def list_tools(self) -> List[Dict[str, Any]]:
        """List available tools via MCP tools/list"""
        logger.info("Listing tools via MCP...")
        result = await self._send_request("tools/list")

        if result.get("success"):
            tools = result.get("result", {}).get("tools", [])
            logger.info(f"Found {len(tools)} tools")
            return tools

        logger.error(f"List tools failed: {result.get('error')}")
        return []

    async def call_tool(
        self,
        tool_name: str,
        arguments: Dict[str, Any] = None,
        database_id: Optional[str] = None,
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Execute a tool via MCP tools/call"""
        logger.info(f"Calling {tool_name} via MCP...")

        # Temporarily override database_id and context if provided
        original_db = self.database_id
        original_ctx = self.context

        if database_id:
            self.database_id = database_id
        if context:
            # self.context may be None, so fall back to an empty dict before merging
            self.context = {**(self.context or {}), **context}

        try:
            result = await self._send_request("tools/call", {
                "name": tool_name,
                "arguments": arguments or {}
            })

            if result.get("success"):
                tool_result = result.get("result", {})
                logger.info(f"{tool_name} completed")
                return {"success": True, "result": tool_result}

            logger.error(f"Tool call failed: {result.get('error')}")
            return {"success": False, "error": result.get("error")}
        finally:
            # Restore original values
            self.database_id = original_db
            self.context = original_ctx

    async def reset_database(self, database_id: str = None, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Reset database via the /reset endpoint, falling back to /api/seed-database"""
        target_db = database_id or self.database_id
        logger.info(f"Resetting database: {target_db}")

        # Try OpenEnv /reset endpoint first (with sql_content if available)
        try:
            headers = self._headers()
            if database_id:
                headers["x-database-id"] = database_id

            if context and isinstance(context, dict):
                for key, value in context.items():
                    # Don't add extra "x-" if key already starts with "x-"
                    if key.lower().startswith("x-"):
                        header_key = key.lower().replace('_', '-')
                    else:
                        header_key = f"x-{key.lower().replace('_', '-')}"
                    headers[header_key] = str(value)

            # Build payload with sql_content if available
            payload = {}
            if self.sql_content:
                payload["sql_content"] = self.sql_content
                logger.info(f"Using custom SQL ({len(self.sql_content) / 1024:.2f} KB)")

            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.base_url}/reset",
                    json=payload if payload else None,
                    headers=headers
                )

                if response.status_code == 200:
                    logger.info(f"Database reset successful via /reset")
                    return {"success": True}
        except Exception as e:
            logger.debug(f"OpenEnv /reset not available: {e}")

        # Fallback: Use /api/seed-database endpoint
        logger.info(f"Attempting database reset via /api/seed-database...")
        try:
            # Use sql_content if available, otherwise get sample SQL
            if self.sql_content:
                sql_content = self.sql_content
                logger.info(f"Using custom SQL from seed file")
            else:
                async with httpx.AsyncClient(timeout=60) as client:
                    response = await client.get(f"{self.base_url}/api/sample-data")
                    if response.status_code != 200:
                        return {"success": False, "error": "Failed to fetch sample SQL"}

                    data = response.json()
                    sql_content = data.get('sql_content') or data.get('text') or data

            # Seed database
            async with httpx.AsyncClient(timeout=max(120, int(120 + len(sql_content) / 102400))) as client:
                payload = {
                    "database_id": target_db,
                    "name": f"Reset DB {datetime.now().strftime('%Y%m%d_%H%M%S')}",
                    "sql_content": sql_content
                }

                response = await client.post(
                    f"{self.base_url}/api/seed-database",
                    json=payload,
                    headers={"Content-Type": "application/json"}
                )

                if response.status_code == 200:
                    logger.info(f"Database reset successful via /api/seed-database")
                    return {"success": True}

                return {"success": False, "error": f"Seed failed: HTTP {response.status_code}"}
        except Exception as e:
            logger.error(f"Database reset failed: {e}")
            return {"success": False, "error": str(e)}

    async def get_state(self, verify_queries: Optional[List[str]] = None) -> Dict[str, Any]:
        """Get state (MCP doesn't have native state endpoint, return empty)"""
        return {"success": True, "data": {}}

print("Protocol clients (OpenEnv + MCP) defined")

Protocol clients (OpenEnv + MCP) defined
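
The context-to-header mapping and the JSON-RPC envelope used by the MCP client above can be sketched in isolation. This is a minimal standalone sketch, not the notebook's class: the function names `context_to_headers` and `build_jsonrpc_payload` and the sample context values are hypothetical and introduced only for illustration.

```python
from typing import Any, Dict, Optional


def context_to_headers(context: Optional[Dict[str, Any]]) -> Dict[str, str]:
    """Map context keys to lowercase, dash-separated `x-` headers (hypothetical helper)."""
    headers: Dict[str, str] = {}
    for key, value in (context or {}).items():
        name = key.lower().replace("_", "-")
        # Keys that already start with "x-" are not prefixed a second time
        if not key.lower().startswith("x-"):
            name = f"x-{name}"
        headers[name] = str(value)
    return headers


def build_jsonrpc_payload(request_id: int, method: str,
                          params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 request envelope (hypothetical helper)."""
    return {"jsonrpc": "2.0", "id": request_id, "method": method, "params": params or {}}


# Sample values are made up for illustration
headers = context_to_headers({"user_id": 42, "X-Tenant": "acme"})
payload = build_jsonrpc_payload(1, "tools/list")
print(headers)
print(payload)
```

Keeping the mapping in one helper means every request path builds the same header names from the same context.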


## 6. LLM Client

Unified LLM interface supporting multiple providers (Anthropic Claude, OpenAI GPT, Google Gemini) with LangChain integration.
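
Before tools are bound to the model, each MCP tool definition is reshaped into a function-calling dictionary. The following is a minimal sketch of that conversion, assuming an MCP tool with `name`, `description`, and `inputSchema` fields. The helper name `mcp_tool_to_function` and the sample `create_event` tool are hypothetical and not taken from the gym.

```python
from typing import Any, Dict

EMPTY_SCHEMA: Dict[str, Any] = {"type": "object", "properties": {}, "required": []}


def mcp_tool_to_function(tool: Dict[str, Any]) -> Dict[str, Any]:
    """Reshape one MCP tool definition into the function-calling dict format."""
    # Fall back to an empty object schema when the tool declares no inputSchema
    schema = tool.get("inputSchema") or dict(EMPTY_SCHEMA)
    return {
        "type": "function",
        "function": {
            "name": tool["name"],
            "description": tool.get("description", ""),
            "parameters": schema,
        },
    }


# Hypothetical tool definition, for illustration only
sample_tool = {
    "name": "create_event",
    "description": "Create a calendar event",
    "inputSchema": {
        "type": "object",
        "properties": {"title": {"type": "string"}},
        "required": ["title"],
    },
}

converted = mcp_tool_to_function(sample_tool)
print(converted["function"]["name"])
```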

In [None]:
# ============================================================================
# LLM CLIENT (Unified interface for Anthropic, OpenAI, Google)
# ============================================================================

class LLMClient:
    """
    Unified LLM client supporting multiple providers.
    Uses LangChain for consistent tool calling interface.
    """

    def __init__(self, provider: str, model: str, api_key: str,
                 temperature: float = 0.0, max_tokens: int = 4096):
        self.provider = provider.lower()
        self.model = model
        self.api_key = api_key
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.llm = None

        self._initialize_llm()

    def _initialize_llm(self):
        """Initialize LLM based on provider"""
        try:
            if self.provider == "anthropic":
                from langchain_anthropic import ChatAnthropic
                self.llm = ChatAnthropic(
                    model=self.model,
                    anthropic_api_key=self.api_key,
                    temperature=self.temperature,
                    max_tokens=self.max_tokens
                )
            elif self.provider == "openai":
                from langchain_openai import ChatOpenAI
                self.llm = ChatOpenAI(
                    model=self.model,
                    openai_api_key=self.api_key,
                    temperature=self.temperature,
                    max_tokens=self.max_tokens
                )
            elif self.provider == "google":
                from langchain_google_genai import ChatGoogleGenerativeAI
                self.llm = ChatGoogleGenerativeAI(
                    model=self.model,
                    google_api_key=self.api_key,
                    temperature=self.temperature,
                    max_tokens=self.max_tokens
                )
            else:
                raise ValueError(f"Unsupported LLM provider: {self.provider}")

            logger.info(f"Initialized {self.provider} LLM: {self.model}")

        except ImportError as e:
            logger.error(f"Failed to import LangChain provider for {self.provider}: {e}")
            raise

    def _convert_mcp_tools_to_langchain(self, mcp_tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Convert MCP tool definitions to LangChain format, filtering incompatible schemas"""
        langchain_tools = []

        for tool in mcp_tools:
            input_schema = tool.get("inputSchema", {
                "type": "object",
                "properties": {},
                "required": []
            })

            # Clean schema: remove oneOf, allOf, anyOf at top level (Anthropic doesn't support them)
            cleaned_schema = self._clean_json_schema(input_schema)

            tool_def = {
                "type": "function",
                "function": {
                    "name": tool["name"],
                    "description": tool.get("description", ""),
                    "parameters": cleaned_schema
                }
            }
            langchain_tools.append(tool_def)

        return langchain_tools

    def _clean_json_schema(self, schema: Dict[str, Any]) -> Dict[str, Any]:
        """Clean JSON schema to be compatible with Anthropic API

        Removes oneOf, allOf, anyOf at top level and converts to simple object schema.
        """
        if not isinstance(schema, dict):
            return {
                "type": "object",
                "properties": {},
                "required": []
            }

        # If schema has oneOf/allOf/anyOf at top level, extract the first valid object schema
        if "oneOf" in schema:
            logger.debug(f"Schema has oneOf at top level, extracting first object schema")
            for option in schema["oneOf"]:
                if isinstance(option, dict) and option.get("type") == "object":
                    schema = option
                    break
            else:
                # No object schema found, return empty
                return {
                    "type": "object",
                    "properties": {},
                    "required": []
                }

        if "allOf" in schema:
            logger.debug(f"Schema has allOf at top level, merging schemas")
            merged_schema = {"type": "object", "properties": {}, "required": []}
            for sub_schema in schema["allOf"]:
                if isinstance(sub_schema, dict):
                    if "properties" in sub_schema:
                        merged_schema["properties"].update(sub_schema["properties"])
                    if "required" in sub_schema:
                        merged_schema["required"].extend(sub_schema["required"])
            schema = merged_schema

        if "anyOf" in schema:
            logger.debug(f"Schema has anyOf at top level, extracting first object schema")
            for option in schema["anyOf"]:
                if isinstance(option, dict) and option.get("type") == "object":
                    schema = option
                    break
            else:
                return {
                    "type": "object",
                    "properties": {},
                    "required": []
                }

        # Ensure schema has required fields (work on a shallow copy so the caller's dict is not mutated)
        schema = dict(schema)
        if "type" not in schema:
            schema["type"] = "object"

        if schema["type"] == "object" and "properties" not in schema:
            schema["properties"] = {}

        return schema

    async def invoke_with_tools(
        self,
        messages: List[Any],
        tools: List[Dict[str, Any]]
    ) -> Any:
        """Invoke LLM with tools"""
        from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, ToolMessage

        # Convert MCP tools to LangChain format
        langchain_tools = self._convert_mcp_tools_to_langchain(tools)

        # Bind tools to LLM
        llm_with_tools = self.llm.bind_tools(langchain_tools)

        # Invoke
        logger.info(f"Invoking {self.provider} LLM with {len(tools)} tools")
        response = await llm_with_tools.ainvoke(messages)

        return response

## 7. Verifier Engine

Verification engine supporting three verifier types: `database_state` (SQL validation), `response_check` (LLM-as-judge), and `tool_execution` (tool usage validation).
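
Each verifier type reads its own keys from `validation_config`. The sketch below shows one illustrative config per type, using the keys the engine looks up, and reproduces the `tool_execution` pass rule as a standalone function. The SQL, tool names, and expected values are hypothetical. The surrounding dict layout (`verifier_type` next to `validation_config`) is an assumption about how configs are written, and `check_tool_execution` is a hypothetical helper name.

```python
from typing import Any, Dict, List

# Illustrative configs; SQL, tool names, and values are hypothetical
verifiers: List[Dict[str, Any]] = [
    {"verifier_type": "database_state",
     "validation_config": {"query": "SELECT COUNT(*) AS count FROM events",
                           "expected_value": 1,
                           "comparison_type": "equals"}},
    {"verifier_type": "response_check",
     "validation_config": {"sql_query": "SELECT title FROM events",
                           "comparison_prompt": "Does the response list every event title?",
                           "minimum_comparison_value": 7}},
    {"verifier_type": "tool_execution",
     "validation_config": {"expected_tools": ["create_event"],
                           "minimum_tool_calls": 1}},
]


def check_tool_execution(config: Dict[str, Any], tools_called: List[str]) -> Dict[str, Any]:
    """Pass when every expected tool was called and the call count meets the minimum."""
    expected = config.get("expected_tools", [])
    missing = [name for name in expected if name not in tools_called]
    passed = not missing and len(tools_called) >= config.get("minimum_tool_calls", 1)
    return {"passed": passed, "missing_tools": missing}


result = check_tool_execution(verifiers[2]["validation_config"], ["list_events", "create_event"])
print(result)
```

Note that `database_state` reads its SQL from the `query` key, while `response_check` reads it from `sql_query`.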

In [None]:
# ============================================================================
# VERIFIER ENGINE
# ============================================================================

class VerifierEngine:
    """
    Verifier engine for validating benchmark results.
    Supports: database_state, response_check, tool_execution
    """

    def __init__(self, protocol_client, llm_client: LLMClient, execution_mode: str = "openenv"):
        self.protocol_client = protocol_client
        self.llm_client = llm_client
        self.execution_mode = execution_mode

    async def execute_verifier(
        self,
        verifier: VerifierConfig,
        model_response: Dict[str, Any],
        database_id: str,
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Execute a single verifier"""
        logger.info(f"Executing verifier: {verifier.verifier_type}")

        if verifier.verifier_type == "database_state":
            return await self._execute_database_state_verifier(
                verifier.validation_config, database_id, context
            )
        elif verifier.verifier_type == "response_check":
            return await self._execute_response_check_verifier(
                verifier.validation_config, model_response, database_id, context
            )
        elif verifier.verifier_type == "tool_execution":
            return await self._execute_tool_execution_verifier(
                verifier.validation_config, model_response
            )
        else:
            return {
                "passed": False,
                "error": f"Unsupported verifier type: {verifier.verifier_type}"
            }

    async def _execute_database_state_verifier(
        self,
        validation_config: Dict[str, Any],
        database_id: str,
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Execute database state verifier"""
        sql_query = validation_config.get("query")
        expected_value = validation_config.get("expected_value")
        comparison_type = validation_config.get("comparison_type", "equals")

        if not sql_query:
            return {"passed": False, "error": "No SQL query provided"}

        logger.info(f"Executing SQL query: {sql_query}")

        # Execute SQL query via the active protocol (OpenEnv /state or MCP /api/sql-runner)
        result = await self._execute_sql_query(sql_query, database_id, context)

        if not result["success"]:
            return {
                "passed": False,
                "error": f"SQL query failed: {result.get('error')}",
                "query": sql_query
            }

        # Extract value from result
        actual_value = self._extract_value_from_sql_result(result)

        logger.info(f"SQL result - Expected: {expected_value}, Actual: {actual_value}")

        # Compare values
        comparison_result = self._compare_values(actual_value, expected_value, comparison_type)

        return {
            "passed": comparison_result["passed"],
            "expected": expected_value,
            "actual": actual_value,
            "comparison_type": comparison_type,
            "query": sql_query,
            "details": comparison_result.get("details")
        }

    async def _execute_response_check_verifier(
        self,
        validation_config: Dict[str, Any],
        model_response: Dict[str, Any],
        database_id: str,
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Execute response check verifier using LLM-as-judge"""
        sql_query = validation_config.get("sql_query")
        comparison_prompt = validation_config.get("comparison_prompt")
        minimum_comparison_value = validation_config.get("minimum_comparison_value", 7)

        if not sql_query or not comparison_prompt:
            return {
                "passed": False,
                "error": "Missing sql_query or comparison_prompt"
            }

        # Execute SQL query
        sql_result = await self._execute_sql_query(sql_query, database_id, context)

        if not sql_result["success"]:
            return {
                "passed": False,
                "error": f"SQL query failed: {sql_result.get('error')}"
            }

        # Extract LLM response text
        llm_response_text = self._extract_llm_content(model_response)

        # Use LLM as judge
        judge_result = await self._compare_with_llm(
            sql_result,
            llm_response_text,
            comparison_prompt,
            minimum_comparison_value
        )

        return judge_result

    async def _execute_tool_execution_verifier(
        self,
        validation_config: Dict[str, Any],
        model_response: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Execute tool execution verifier"""
        expected_tools = validation_config.get("expected_tools", [])
        minimum_tool_calls = validation_config.get("minimum_tool_calls", 1)

        # Extract tools called from model response
        tools_called = []
        if "tool_calls" in model_response and model_response["tool_calls"]:
            tools_called = [tc["name"] for tc in model_response["tool_calls"]]

        logger.info(f"Expected tools: {expected_tools}, Called: {tools_called}")

        # Check if expected tools were called
        missing_tools = [tool for tool in expected_tools if tool not in tools_called]

        # Check minimum tool calls
        passed = (
            len(missing_tools) == 0 and
            len(tools_called) >= minimum_tool_calls
        )

        return {
            "passed": passed,
            "expected_tools": expected_tools,
            "tools_called": tools_called,
            "missing_tools": missing_tools,
            "minimum_tool_calls": minimum_tool_calls,
            "actual_tool_calls": len(tools_called)
        }

    async def _execute_sql_query(
        self,
        query: str,
        database_id: str,
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Execute SQL query via OpenEnv /state or MCP /api/sql-runner"""
        try:
            logger.info(f"Executing SQL query: {query}")

            if self.execution_mode == "openenv":
                # Use OpenEnv /state endpoint with verify_queries parameter
                state_result = await self.protocol_client.get_state(verify_queries=[query])

                if not state_result.get("success"):
                    return {
                        "success": False,
                        "error": state_result.get("error", "State query failed")
                    }

                # Extract verification result
                state_data = state_result.get("data", {})
                verification_results = state_data.get("verification_results", [])

                if not verification_results:
                    return {
                        "success": False,
                        "error": "No verification results returned from /state"
                    }

                # Get the first (and only) verification result
                query_result = verification_results[0]

                # Check if query itself failed
                if "error" in query_result:
                    return {
                        "success": False,
                        "error": query_result.get("error", "Query execution failed")
                    }

                # Return the query result
                return {
                    "success": True,
                    "result": query_result.get("result", [])
                }

            else:  # MCP mode
                # Use direct HTTP call to /api/sql-runner endpoint
                headers = {
                    "Content-Type": "application/json",
                    "x-database-id": database_id
                }

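                # Add context headers
                if context and isinstance(context, dict):
                    for key, value in context.items():
                        # Don't add extra "x-" if key already starts with "x-"
                        if key.lower().startswith("x-"):
                            header_key = key.lower().replace('_', '-')
                        else:
                            header_key = f"x-{key.lower().replace('_', '-')}"
                        headers[header_key] = str(value)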

                # Add session ID if available
                if hasattr(self.protocol_client, 'session_id') and self.protocol_client.session_id:
                    headers["mcp-session-id"] = self.protocol_client.session_id

                payload = {"query": query}

                async with httpx.AsyncClient(timeout=30.0) as client:
                    response = await client.post(
                        f"{self.protocol_client.base_url}/api/sql-runner",
                        json=payload,
                        headers=headers
                    )

                    if response.status_code == 200:
                        result_data = response.json()
                        return {"success": True, "result": result_data}

                    return {
                        "success": False,
                        "error": f"SQL runner failed: HTTP {response.status_code}"
                    }

        except Exception as e:
            logger.error(f"SQL query execution failed: {e}")
            return {"success": False, "error": str(e)}

    def _extract_value_from_sql_result(self, result: dict) -> Any:
        """Extract the actual value from SQL query result (matches production implementation)"""
        if not result:
            return None

        # If the result itself is not successful, check for error content
        if not result.get("success"):
            # Try to extract error message from MCP response format
            result_data = result.get("result", {})
            if isinstance(result_data, dict) and "content" in result_data:
                content = result_data["content"]
                if isinstance(content, list) and len(content) > 0:
                    for item in content:
                        if isinstance(item, dict) and item.get("type") == "text":
                            return item.get("text", "Error")
            return None

        result_data = result.get("result", {})

        # Handle OpenEnv format: result is directly a list like [{'count': 1}]
        if isinstance(result_data, list):
            if len(result_data) > 0:
                # If single row with single column, return the value directly
                if len(result_data) == 1 and isinstance(result_data[0], dict) and len(result_data[0]) == 1:
                    return list(result_data[0].values())[0]
                # If single row with multiple columns, return the row dict
                elif len(result_data) == 1:
                    return result_data[0]
                # Multiple rows, return the full result
                else:
                    return result_data
            return result_data

        # Handle different result formats from MCP sql-runner
        if isinstance(result_data, dict):
            # If result has 'data' field (common format)
            if "data" in result_data:
                data = result_data["data"]
                if isinstance(data, list) and len(data) > 0:
                    # If single row with single column, return the value directly
                    if len(data) == 1 and isinstance(data[0], dict) and len(data[0]) == 1:
                        return list(data[0].values())[0]
                    # If single row with multiple columns, return the row dict
                    elif len(data) == 1:
                        return data[0]
                    # Multiple rows, return the full result
                    else:
                        return data
                return data

            # If result has 'rows' field
            elif "rows" in result_data:
                rows = result_data["rows"]
                if isinstance(rows, list) and len(rows) > 0:
                    # Single value from single row
                    if len(rows) == 1 and isinstance(rows[0], dict) and len(rows[0]) == 1:
                        return list(rows[0].values())[0]
                    # Single row as list
                    elif len(rows) == 1 and isinstance(rows[0], list) and len(rows[0]) == 1:
                        return rows[0][0]
                    # Single row (dict or list)
                    elif len(rows) == 1:
                        return rows[0]
                    # Multiple rows
                    else:
                        return rows
                return rows

            # If result has 'content' field (MCP error format)
            elif "content" in result_data:
                content = result_data["content"]
                if isinstance(content, list) and len(content) > 0:
                    # Extract text from content array
                    for item in content:
                        if isinstance(item, dict) and item.get("type") == "text":
                            return item.get("text", result_data)
                return content

            # Direct result format (nested 'result' field)
            elif "result" in result_data:
                return result_data["result"]

        # Return as-is if we can't extract anything specific
        return result_data

    def _compare_values(
        self,
        actual: Any,
        expected: Any,
        comparison_type: str
    ) -> Dict[str, Any]:
        """Compare actual vs expected values"""
        try:
            if comparison_type == "equals":
                passed = actual == expected
            elif comparison_type == "greater_than":
                passed = actual > expected
            elif comparison_type == "less_than":
                passed = actual < expected
            elif comparison_type == "contains":
                # Coerce both sides to str so a non-string expected value does not raise TypeError
                passed = str(expected) in str(actual)
            else:
                return {
                    "passed": False,
                    "details": f"Unknown comparison type: {comparison_type}"
                }

            return {
                "passed": passed,
                "details": f"Comparison {comparison_type}: {actual} vs {expected}"
            }

        except Exception as e:
            return {
                "passed": False,
                "details": f"Comparison error: {e}"
            }

    def _extract_llm_content(self, model_response: Dict[str, Any]) -> str:
        """Extract text content from LLM response"""
        if "content" in model_response:
            return str(model_response["content"])
        elif "text" in model_response:
            return str(model_response["text"])
        elif "response" in model_response:
            return str(model_response["response"])

        return str(model_response)

    async def _compare_with_llm(
        self,
        sql_result: Dict[str, Any],
        llm_response: str,
        comparison_prompt: str,
        minimum_score: int
    ) -> Dict[str, Any]:
        """Use LLM as judge to compare SQL result with LLM response"""
        from langchain_core.messages import SystemMessage, HumanMessage

        # Build judge prompt
        system_prompt = """You are an AI judge evaluating the quality and accuracy of an AI assistant's response.
Compare the database query result with the AI's response and rate how well they match.
Provide a score from 1-10 where:
- 1-3: Poor match, incorrect or missing information
- 4-6: Partial match, some correct information
- 7-8: Good match, mostly correct
- 9-10: Excellent match, fully accurate

Respond with ONLY a JSON object in this format:
{
  "score": <number 1-10>,
  "reasoning": "<brief explanation>"
}"""

        sql_result_str = json.dumps(sql_result.get("result", {}), indent=2)

        user_prompt = f"""Database Query Result:
{sql_result_str}

AI Assistant Response:
{llm_response}

Comparison Task:
{comparison_prompt}

Please provide your judgment as JSON."""

        messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=user_prompt)
        ]

        try:
            response = await self.llm_client.llm.ainvoke(messages)
            response_text = response.content

            # Parse JSON response
            # Try to extract JSON from markdown code blocks
            if "```json" in response_text:
                response_text = response_text.split("```json")[1].split("```")[0].strip()
            elif "```" in response_text:
                response_text = response_text.split("```")[1].split("```")[0].strip()

            judge_result = json.loads(response_text)
            score = judge_result.get("score", 0)
            reasoning = judge_result.get("reasoning", "")

            passed = score >= minimum_score

            return {
                "passed": passed,
                "score": score,
                "minimum_score": minimum_score,
                "reasoning": reasoning,
                "sql_result": sql_result_str,
                "llm_response": llm_response
            }

        except Exception as e:
            logger.error(f"LLM judge comparison failed: {e}")
            return {
                "passed": False,
                "error": f"Judge comparison failed: {e}"
            }

## 8. Benchmark Executor

Main orchestration engine that coordinates the entire benchmark execution: environment setup, LLM invocation, tool execution, and verification.
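
The tool-calling loop at the heart of this flow can be sketched with stand-ins for the LLM and the environment. This is a simplified illustration under stated assumptions, not the executor's actual implementation: `fake_llm`, `fake_call_tool`, and `run_agent_loop` are hypothetical names, and the message and tool-call shapes are simplified dictionaries rather than LangChain message objects.

```python
import asyncio
from typing import Any, Dict, List


async def fake_llm(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Stand-in LLM: requests one tool call, then answers once a tool result is present."""
    if any(m["role"] == "tool" for m in messages):
        return {"content": "Event created.", "tool_calls": []}
    return {"content": "", "tool_calls": [{"name": "create_event", "args": {"title": "Standup"}}]}


async def fake_call_tool(name: str, args: Dict[str, Any]) -> Dict[str, Any]:
    """Stand-in environment: pretends every tool call succeeds."""
    return {"success": True, "result": {"tool": name, "args": args}}


async def run_agent_loop(prompt: str, max_steps: int = 5) -> Dict[str, Any]:
    """Alternate LLM turns and tool executions until no tool calls remain."""
    messages: List[Dict[str, Any]] = [{"role": "user", "content": prompt}]
    tools_used: List[str] = []
    for _ in range(max_steps):
        reply = await fake_llm(messages)
        messages.append({"role": "assistant", **reply})
        if not reply["tool_calls"]:
            # No further tool calls: the model has produced its final answer
            return {"content": reply["content"], "tool_calls": tools_used}
        for call in reply["tool_calls"]:
            outcome = await fake_call_tool(call["name"], call["args"])
            tools_used.append(call["name"])
            messages.append({"role": "tool", "content": str(outcome)})
    return {"content": "", "tool_calls": tools_used}  # step budget exhausted


final = asyncio.run(run_agent_loop("Schedule a standup"))
print(final)
```

Inside a notebook, where an event loop is already running, use `await run_agent_loop(...)` instead of `asyncio.run(...)`. The `max_steps` cap is a design choice of this sketch that keeps a misbehaving model from looping indefinitely.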

In [None]:
# ============================================================================
# BENCHMARK EXECUTOR
# ============================================================================

class BenchmarkExecutor:
    """
    Main benchmark executor that orchestrates the complete flow:
    1. Load tools from the OpenEnv or MCP server
    2. Send prompts to LLM
    3. Execute tool calls (OpenEnv /step or MCP tools/call)
    4. Loop until completion
    5. Run verifiers
    """

    def __init__(self, config: BenchmarkConfig):
        self.config = config
        self.protocol_client = None
        self.llm_client = None
        self.verifier_engine = None
        self.available_tools = []
        self.auto_created_database = False  # Track if we created the database
        self.execution_mode = getattr(config, 'execution_mode', "openenv")

    async def initialize(self):
        """Initialize all clients"""
        logger.info(f"Initializing benchmark executor in {self.execution_mode.upper()} mode...")

        # Initialize protocol client based on execution mode
        if self.execution_mode == "openenv":
            self.protocol_client = OpenEnvClient(
                self.config.gym_enviornment_url,
                auth_config=self.config.auth_config,
                mcp_endpoint=self.config.mcp_endpoint,
                database_id=self.config.database_id,
                context=self.config.context,
                seed_database_file=self.config.seed_database_file
            )
        else:  # mcp mode
            self.protocol_client = MCPClient(
                self.config.gym_enviornment_url,
                auth_config=self.config.auth_config,
                mcp_endpoint=self.config.mcp_endpoint,
                database_id=self.config.database_id,
                context=self.config.context,
                seed_database_file=self.config.seed_database_file
            )

        connected = await self.protocol_client.connect()
        if not connected:
            raise Exception(f"Failed to connect to {self.execution_mode.upper()} server")

        # Reset the environment
        reset_result = await self.protocol_client.reset()
        if not reset_result.get("success") and not reset_result.get("fallback"):
            logger.warning("Failed to reset environment, continuing anyway")

        # Load available tools
        self.available_tools = await self.protocol_client.list_tools()
        logger.info(f"Loaded {len(self.available_tools)} tools from {self.execution_mode.upper()} server")

        # Apply tool restrictions if configured
        if self.config.restricted_tools:
            self.available_tools = [
                tool for tool in self.available_tools
                if tool["name"] not in self.config.restricted_tools
            ]
            logger.info(f"Applied tool restrictions. {len(self.available_tools)} tools available")

        # Initialize LLM client
        self.llm_client = LLMClient(
            self.config.llm_provider,
            self.config.llm_model,
            self.config.llm_api_key,
            temperature=self.config.temperature,
            max_tokens=self.config.max_tokens
        )

        # Initialize verifier engine
        self.verifier_engine = VerifierEngine(
            self.protocol_client,
            self.llm_client,
            self.execution_mode
        )

        logger.info("Initialization complete")

    async def execute_single_run(self, run_number: int) -> Dict[str, Any]:
        """Execute a single benchmark run"""
        logger.info(f"\n{'='*80}")
        logger.info(f"STARTING RUN {run_number}/{self.config.number_of_runs}")
        logger.info(f"{'='*80}\n")

        start_time = datetime.now(timezone.utc)

        # Reset database if configured
        if run_number > 1 and self.config.reset_database_between_runs:
            logger.info("Resetting environment between runs...")
            # database_id and context are already set in protocol_client
            await self.protocol_client.reset()

        # Execute the main task (LLM + tool calling loop)
        task_result = await self._execute_task()

        # Run verifiers
        verification_results = await self._run_verifiers(task_result)

        # Calculate execution time
        execution_time_ms = int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)

        # Determine overall success (a result without a "passed" key counts as failed)
        overall_success = all(
            v.get("passed", False) for v in verification_results.values()
        )

        # Calculate verification summary
        total_verifiers = len(verification_results)
        passed_verifiers = sum(1 for v in verification_results.values() if v.get("passed", False))
        failed_verifiers = total_verifiers - passed_verifiers

        result = {
            "run_number": run_number,
            "started_at": start_time.isoformat(),
            "execution_time_ms": execution_time_ms,
            "model_response": task_result.get("final_response"),
            "conversation_flow": task_result.get("conversation_flow", []),
            "tools_used": task_result.get("tools_used", []),
            "tool_results": task_result.get("tool_results", []),
            "verification_results": verification_results,
            "verification_summary": {
                "total": total_verifiers,
                "passed": passed_verifiers,
                "failed": failed_verifiers,
                "pass_rate": passed_verifiers / total_verifiers if total_verifiers > 0 else 0.0
            },
            "overall_success": overall_success
        }

        logger.info(f"\nRUN {run_number} COMPLETED")
        # Reuse the guarded pass_rate so a run with zero verifiers does not divide by zero
        logger.info(f"Verification: {passed_verifiers}/{total_verifiers} passed ({result['verification_summary']['pass_rate']:.1%})")
        logger.info(f"Overall Success: {overall_success}")
        logger.info(f"Execution time: {execution_time_ms}ms")
        logger.info(f"Tools used: {', '.join(task_result.get('tools_used', []))}")

        return result

    async def _execute_task(self) -> Dict[str, Any]:
        """Execute the main task with LLM + tool calling loop"""
        from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, ToolMessage

        # Build initial messages
        messages = [
            SystemMessage(content=self.config.system_prompt),
            HumanMessage(content=self.config.user_prompt)
        ]

        conversation_flow = []
        tools_used = []
        tool_results = []
        max_iterations = 20  # Prevent infinite loops

        for iteration in range(max_iterations):
            logger.info(f"\n--- Iteration {iteration + 1} ---")

            # Invoke LLM with tools
            response = await self.llm_client.invoke_with_tools(
                messages,
                self.available_tools
            )

            # Add AI response to conversation
            messages.append(response)
            conversation_flow.append({
                "type": "ai_message",
                "content": response.content,
                "tool_calls": [
                    {
                        "name": tc["name"],
                        "args": tc["args"]
                    }
                    for tc in (response.tool_calls or [])
                ]
            })

            logger.info(f"LLM Response: {response.content}")

            # Check if LLM wants to call tools
            if not response.tool_calls:
                logger.info("No tool calls requested. Task complete.")
                break

            # Execute tool calls
            for tool_call in response.tool_calls:
                tool_name = tool_call["name"]
                tool_args = tool_call["args"]

                logger.info(f"Executing tool: {tool_name}")
                logger.debug(f"Tool arguments: {tool_args}")

                # Execute tool via protocol client (database_id and context already set in client)
                tool_result = await self.protocol_client.call_tool(
                    tool_name,
                    tool_args
                )

                logger.info(f"Tool result success: {tool_result.get('success')}")

                # Track tools used
                if tool_name not in tools_used:
                    tools_used.append(tool_name)

                tool_results.append({
                    "tool_name": tool_name,
                    "arguments": tool_args,
                    "result": tool_result
                })

                # Add tool result to conversation
                tool_message = ToolMessage(
                    # default=str keeps serialization from failing on values such as datetimes
                    content=json.dumps(tool_result.get("result", {}), default=str),
                    tool_call_id=tool_call.get("id", "")
                )
                messages.append(tool_message)

                conversation_flow.append({
                    "type": "tool_result",
                    "tool_name": tool_name,
                    "result": tool_result
                })

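        # `response` is the last LLM reply. Using it (rather than messages[-1]) keeps the
        # final response an AI message even when the loop stops at max_iterations,
        # where the last entry in `messages` is a ToolMessage.
        return {
            "final_response": response.content,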
            "conversation_flow": conversation_flow,
            "tools_used": tools_used,
            "tool_results": tool_results,
            "messages": messages
        }

    async def _run_verifiers(self, task_result: Dict[str, Any]) -> Dict[str, Any]:
        """Run all configured verifiers"""
        logger.info("\n--- Running Verifiers ---")

        verification_results = {}

        for i, verifier_config in enumerate(self.config.verifiers):
            verifier = VerifierConfig(**verifier_config)
            verifier_name = verifier.name or f"verifier_{i+1}"

            logger.info(f"Running verifier: {verifier_name} ({verifier.verifier_type})")

            model_response = {
                "content": task_result.get("final_response", ""),
                "tool_calls": [
                    {"name": tr["tool_name"], "args": tr["arguments"]}
                    for tr in task_result.get("tool_results", [])
                ]
            }

            result = await self.verifier_engine.execute_verifier(
                verifier,
                model_response,
                self.config.database_id,
                self.config.context
            )

            verification_results[verifier_name] = result

            logger.info(f"Verifier result: {'PASSED' if result.get('passed') else 'FAILED'}")
            if not result.get("passed"):
                logger.warning(f"Failure reason: {result.get('error') or result.get('details')}")

        return verification_results

    async def execute_benchmark(self) -> Dict[str, Any]:
        """Execute complete benchmark with multiple runs"""
        logger.info(f"\n{'='*80}")
        logger.info("STARTING BENCHMARK EXECUTION")
        logger.info(f"Mode: {self.execution_mode.upper()}")
        logger.info(f"Model: {self.config.llm_provider}/{self.config.llm_model}")
        logger.info(f"Number of runs: {self.config.number_of_runs}")
        logger.info(f"{'='*80}\n")

        # Create database: use seed_database_file if provided, otherwise use gym's api/sample-data
        seed_file = self.config.seed_database_file
        sql_content = None  # Will store SQL content for resets

        if seed_file and seed_file.strip():
            # Use custom SQL file
            logger.info(f"Using seed database file: {seed_file}")
            config_dir = os.path.dirname(os.path.abspath("config.json")) if os.path.exists("config.json") else ""
            created_db_id = create_database_from_file(
                self.config.gym_enviornment_url,
                seed_file,
                config_dir
            )

            # Read and store SQL content for reset operations
            sql_file_path = seed_file if os.path.isabs(seed_file) else os.path.join(config_dir, seed_file)
            if os.path.exists(sql_file_path):
                with open(sql_file_path, 'r', encoding='utf-8') as f:
                    sql_content = f.read()
        else:
            # Use gym's api/sample-data
            logger.info("Using gym's api/sample-data for database creation...")
            created_db_id = create_database(self.config.gym_enviornment_url)

        if created_db_id:
            self.config.database_id = created_db_id
            self.auto_created_database = True
            logger.info(f"Using database: {created_db_id}")
        else:
            logger.error("Failed to create database. Cannot proceed.")
            raise RuntimeError("Database creation failed")

        try:
            # Connect the protocol client, reset the environment, and load tools
            await self.initialize()

            # Store sql_content in protocol_client for resets
            if sql_content and self.protocol_client:
                self.protocol_client.sql_content = sql_content

            # Reset already done in initialize(), but log it
            logger.info("Environment initialized and reset")

            all_runs = []

            for run_number in range(1, self.config.number_of_runs + 1):
                try:
                    run_result = await self.execute_single_run(run_number)
                    all_runs.append(run_result)
                except Exception as e:
                    logger.error(f"Run {run_number} failed with error: {e}")
                    all_runs.append({
                        "run_number": run_number,
                        "error": str(e),
                        "overall_success": False
                    })

            # Calculate statistics
            statistics = self._calculate_statistics(all_runs)

            result = {
                "benchmark_config": {
                    "execution_mode": self.execution_mode,
                    "model": f"{self.config.llm_provider}/{self.config.llm_model}",
                    "number_of_runs": self.config.number_of_runs,
                    "user_prompt": self.config.user_prompt,
                    "database_id": self.config.database_id,
                    "seed_database_file": seed_file or "(gym's api/sample-data)",
                    "auto_created_database": self.auto_created_database
                },
                "runs": all_runs,
                "statistics": statistics
            }

            logger.info(f"\n{'='*80}")
            logger.info("BENCHMARK COMPLETED")
            logger.info(f"Overall Success Rate: {statistics['overall_success_rate']:.1%} ({statistics['successful_runs']}/{statistics['total_runs']} runs)")
            logger.info(f"Verifier Pass Rate: {statistics['verifier_level_pass_rate']:.1%} ({statistics['total_verifiers_passed']}/{statistics['total_verifiers_checked']} verifiers)")
            logger.info(f"Pass@1: {statistics['pass_at_1']:.1%}")
            logger.info(f"Mean execution time: {statistics['mean_execution_time_ms']:.0f}ms")
            logger.info("\nIndividual Verifier Statistics:")
            for verifier_name, stats in statistics.get('individual_verifier_stats', {}).items():
                logger.info(f"  - {verifier_name}: {stats['pass_rate']:.1%} ({stats['passed']}/{stats['total']})")
            logger.info(f"{'='*80}\n")

            return result

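        finally:
            # Deletion is commented out below, so the auto-created database is left in place;
            # its ID is printed for reference. Uncomment delete_database(...) to remove it.
            if self.auto_created_database and self.config.database_id:
                logger.info("\nAuto-created database was not deleted (cleanup is disabled)")
                print("DATABASE_ID: ", self.config.database_id)
                # delete_database(self.config.gym_enviornment_url, self.config.database_id)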

    def _calculate_statistics(self, runs: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Calculate benchmark statistics"""
        successful_runs = [r for r in runs if r.get("overall_success")]
        total_runs = len(runs)

        # Overall success rate (all verifiers must pass)
        overall_success_rate = len(successful_runs) / total_runs if total_runs > 0 else 0

        # Pass@1: 1.0 if the first run succeeded, else 0.0 (not averaged over runs)
        pass_at_1 = 1.0 if runs and runs[0].get("overall_success") else 0.0

        # Verifier-level statistics
        total_verifiers_count = 0
        passed_verifiers_count = 0
        verifier_pass_rates = {}

        for run in runs:
            if "verification_summary" in run:
                total_verifiers_count += run["verification_summary"]["total"]
                passed_verifiers_count += run["verification_summary"]["passed"]

            # Track individual verifier pass rates
            for verifier_name, result in run.get("verification_results", {}).items():
                if verifier_name not in verifier_pass_rates:
                    verifier_pass_rates[verifier_name] = {"passed": 0, "total": 0}
                verifier_pass_rates[verifier_name]["total"] += 1
                if result.get("passed", False):
                    verifier_pass_rates[verifier_name]["passed"] += 1

        # Calculate pass rate for each verifier
        verifier_stats = {}
        for verifier_name, counts in verifier_pass_rates.items():
            verifier_stats[verifier_name] = {
                "passed": counts["passed"],
                "total": counts["total"],
                "pass_rate": counts["passed"] / counts["total"] if counts["total"] > 0 else 0.0
            }

        # Overall verifier pass rate
        verifier_level_pass_rate = passed_verifiers_count / total_verifiers_count if total_verifiers_count > 0 else 0

        # Mean execution time
        execution_times = [
            r.get("execution_time_ms", 0)
            for r in runs
            if "execution_time_ms" in r
        ]
        mean_time = sum(execution_times) / len(execution_times) if execution_times else 0

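        # Tool usage: tools_used is de-duplicated within each run, so each count is the
        # number of runs in which the tool was used at least once, not the number of calls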
        all_tools = []
        for run in runs:
            all_tools.extend(run.get("tools_used", []))

        tool_counts = {}
        for tool in all_tools:
            tool_counts[tool] = tool_counts.get(tool, 0) + 1

        return {
            "total_runs": total_runs,
            "successful_runs": len(successful_runs),
            "overall_success_rate": overall_success_rate,
            "pass_at_1": pass_at_1,
            "verifier_level_pass_rate": verifier_level_pass_rate,
            "total_verifiers_checked": total_verifiers_count,
            "total_verifiers_passed": passed_verifiers_count,
            "individual_verifier_stats": verifier_stats,
            "mean_execution_time_ms": mean_time,
            "tool_usage": tool_counts
        }
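
`_calculate_statistics` reports `pass_at_1` as the outcome of the first run only. When several runs are available, a common alternative is the unbiased pass@k estimator, 1 − C(n − c, k) / C(n, k), for n runs of which c succeeded. The sketch below is not part of the executor: `pass_at_k` is a hypothetical helper, and it assumes the runs are independent attempts at the same task.

```python
from math import comb


def pass_at_k(n: int, c: int, k: int) -> float:
    """Estimate the probability that at least one of k sampled runs succeeds.

    n: total runs, c: successful runs, k: sample size (1 <= k <= n).
    Hypothetical helper, not part of BenchmarkExecutor.
    """
    if not 1 <= k <= n:
        raise ValueError("k must satisfy 1 <= k <= n")
    if not 0 <= c <= n:
        raise ValueError("c must satisfy 0 <= c <= n")
    # comb(n - c, k) is 0 when fewer than k runs failed, so the estimate becomes 1.0
    return 1.0 - comb(n - c, k) / comb(n, k)
```

With two runs and no successes, `pass_at_k(2, 0, 1)` evaluates to 0.0; with one success out of two, `pass_at_k(2, 1, 1)` evaluates to 0.5, whereas the first-run definition would give either 0.0 or 1.0 depending on run order.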

## 9. Configuration Loader & Main Execution

Configuration loading utility and main execution function to run the complete benchmark.

In [None]:
# ============================================================================
# CONFIGURATION LOADER
# ============================================================================

def load_config_from_dict(config_dict: Dict[str, Any]) -> BenchmarkConfig:
    """Load configuration from CONFIG dictionary (no external file needed)"""
    try:
        config_data = config_dict.copy()

        # Remove comment fields (any key starting with underscore)
        config_data = {k: v for k, v in config_data.items() if not k.startswith("_")}

        # Clean verifiers - remove _description fields
        if "verifiers" in config_data and config_data["verifiers"]:
            cleaned_verifiers = []
            for verifier in config_data["verifiers"]:
                cleaned_verifier = {k: v for k, v in verifier.items() if not k.startswith("_")}
                cleaned_verifiers.append(cleaned_verifier)
            config_data["verifiers"] = cleaned_verifiers

        # Validate required fields
        required_fields = [
            "gym_enviornment_url",
            "seed_database_file",
            "system_prompt",
            "user_prompt",
            "llm_model",
            "llm_provider",
            "llm_api_key"
        ]

        for field in required_fields:
            if field not in config_data:
                raise ValueError(f"Missing required field in CONFIG: {field}")

        # Set defaults
        config_data.setdefault("mcp_endpoint", "/mcp")
        config_data.setdefault("verifiers", [])
        config_data.setdefault("number_of_runs", 1)
        config_data.setdefault("context", {})
        config_data.setdefault("temperature", 0.0)
        config_data.setdefault("max_tokens", 4096)
        config_data.setdefault("reset_database_between_runs", True)
        config_data.setdefault("execution_mode", "openenv")

        # Validate execution_mode
        if config_data["execution_mode"] not in ["openenv", "mcp"]:
            raise ValueError(f"Invalid execution_mode: {config_data['execution_mode']}. Must be 'openenv' or 'mcp'")

        return BenchmarkConfig(**config_data)

    except Exception as e:
        logger.error(f"Failed to load configuration: {e}")
        raise


# ============================================================================
# MAIN ENTRY POINT (Uses CONFIG from cell 5)
# ============================================================================

async def main(config_dict: Dict[str, Any] = None):
    """Main entry point - uses CONFIG dictionary from notebook"""
    try:
        # Use provided config_dict or expect CONFIG to be defined globally
        if config_dict is None:
            if 'CONFIG' not in globals():
                raise ValueError("CONFIG dictionary not found. Please define it in cell 5.")
            config_dict = CONFIG

        # Load configuration from dictionary
        config = load_config_from_dict(config_dict)

        # Create executor
        executor = BenchmarkExecutor(config)

        # Execute benchmark
        result = await executor.execute_benchmark()

        # Save results to file
        output_file = f"benchmark_results_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
        with open(output_file, 'w') as f:
            json.dump(result, f, indent=2, default=str)

        logger.info(f"\nResults saved to: {output_file}")

        # Print summary
        print("\n" + "="*80)
        print("BENCHMARK SUMMARY")
        print("="*80)
        print(f"Execution Mode: {result['benchmark_config']['execution_mode'].upper()}")
        print(f"Model: {result['benchmark_config']['model']}")
        print(f"Runs: {result['statistics']['total_runs']}")
        print(f"Overall Success Rate: {result['statistics']['overall_success_rate']:.1%} ({result['statistics']['successful_runs']}/{result['statistics']['total_runs']} runs)")
        print(f"Verifier Pass Rate: {result['statistics']['verifier_level_pass_rate']:.1%} ({result['statistics']['total_verifiers_passed']}/{result['statistics']['total_verifiers_checked']} verifiers)")
        print(f"Pass@1: {result['statistics']['pass_at_1']:.1%}")
        print(f"Mean Execution Time: {result['statistics']['mean_execution_time_ms']:.0f}ms")
        print("\nIndividual Verifier Statistics:")
        for verifier_name, stats in result['statistics'].get('individual_verifier_stats', {}).items():
            print(f"  - {verifier_name}: {stats['pass_rate']:.1%} ({stats['passed']}/{stats['total']})")
        print("\nTool Usage:")
        for tool, count in result['statistics']['tool_usage'].items():
            print(f"  - {tool}: {count} calls")
        print("="*80)

        return result

    except Exception as e:
        logger.error(f"Benchmark execution failed: {e}", exc_info=True)
        raise

print("Configuration loader and main function ready")

Configuration loader and main function ready
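
`load_config_from_dict` drops every key that starts with an underscore, both at the top level and inside each verifier entry, so a CONFIG dictionary can carry inline notes. The self-contained sketch below reproduces only that cleaning step. `strip_comment_keys` and the sample values are illustrative assumptions, not part of the notebook.

```python
from typing import Any, Dict


def strip_comment_keys(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of config without underscore-prefixed (comment) keys."""
    cleaned = {k: v for k, v in config.items() if not k.startswith("_")}
    # Verifier entries may carry their own comment keys such as "_description"
    if cleaned.get("verifiers"):
        cleaned["verifiers"] = [
            {k: v for k, v in verifier.items() if not k.startswith("_")}
            for verifier in cleaned["verifiers"]
        ]
    return cleaned


# Placeholder values for illustration only
sample_config = {
    "_comment": "notes for humans, ignored by the loader",
    "gym_enviornment_url": "http://localhost:8004",
    "seed_database_file": "",
    "verifiers": [
        {
            "_description": "checks the final answer",
            "name": "example",
            "verifier_type": "response_check",
        },
    ],
}

cleaned = strip_comment_keys(sample_config)
print(sorted(cleaned))
print(cleaned["verifiers"])
```

The input dictionary is left unmodified, which mirrors the loader's use of `config_dict.copy()` before cleaning.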


## Execute Benchmark

Run the cell below to execute the benchmark with your configuration.
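
The execution cell relies on top-level `await`, which Jupyter and Colab support but a plain Python script does not. A minimal sketch of the script equivalent follows; the `main` coroutine here is a stand-in so the snippet runs on its own, and `run_from_script` is a hypothetical wrapper name.

```python
import asyncio
from typing import Any, Dict, Optional


async def main(config_dict: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    # Stand-in for the notebook's main(); the real one runs the benchmark
    await asyncio.sleep(0)
    return {"statistics": {"total_runs": 0}}


def run_from_script(config_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Drive the async entry point from synchronous code."""
    # asyncio.run creates an event loop, runs the coroutine, and closes the loop.
    # It raises RuntimeError when called while another loop is already running
    # in the same thread, which is why notebooks use `await` directly instead.
    return asyncio.run(main(config_dict))


result = run_from_script({})
print(result["statistics"]["total_runs"])
```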

In [None]:
# Execute benchmark using CONFIG from cell 5
# Simply run: await run_benchmark()

async def run_benchmark():
    """Execute benchmark using CONFIG dictionary from cell 5"""
    # main() already logs failures with a traceback, so errors simply propagate
    # from here instead of being logged a second time
    return await main(CONFIG)

await run_benchmark()



DATABASE_ID:  db_1769535944605_7a0xldyiq

BENCHMARK SUMMARY
Execution Mode: OPENENV
Model: openai/gpt-5.2
Runs: 2
Overall Success Rate: 0.0% (0/2 runs)
Verifier Pass Rate: 60.0% (6/10 verifiers)
Pass@1: 0.0%
Mean Execution Time: 20168ms

Individual Verifier Statistics:
  - Project Calendar Visible: 100.0% (2/2)
  - Event Moved to Primary: 50.0% (1/2)
  - Event Description Updated: 50.0% (1/2)
  - Kickoff Event Created: 0.0% (0/2)
  - Verify Old mapping removed: 100.0% (2/2)

Tool Usage:
  - get_calendar_list: 2 calls
  - update_calendar_in_list: 1 calls
  - list_events: 1 calls
  - move_event: 1 calls
  - patch_event: 1 calls
  - create_event: 1 calls


{'benchmark_config': {'execution_mode': 'openenv',
  'model': 'openai/gpt-5.2',
  'number_of_runs': 2,
  'user_prompt': 'Help me tidy up my calendars for Q4? First, make sure my Project Management calendar is showing and selected. Then find my "Sprint Planning & Architecture Review" meeting and move the latest one to my main calendar so it\'s easier to track. Update its description to something like "Updated for Q4 planning with new architecture goals and sprint alignment."\nAfter that, add a new event called "Q4 Initiative Kickoff – Engineering & UX" to the Project Management calendar for next Thursday from 10AM-12PM. That should cover everything I need!',
  'database_id': 'db_1769535944605_7a0xldyiq',
  'seed_database_file': '/content/seed-db.sql',
  'auto_created_database': True},
 'runs': [{'run_number': 1,
   'started_at': '2026-01-27T17:45:46.615589+00:00',
   'execution_time_ms': 23452,
   'model_response': '- **Project Management calendar**: confirmed it’s **visible and selecte

## Results Analysis

After running the benchmark, you can analyze the results using the cells below.
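
Beyond the aggregate statistics, it is often useful to see which verifiers failed in each run. The sketch below assumes the results structure produced by `execute_benchmark`, where each entry in `runs` has a `run_number` and, unless the run raised an error, a `verification_results` mapping whose values contain `passed`. `failed_verifiers_by_run` is a hypothetical helper and the sample data is made up for illustration.

```python
from typing import Any, Dict, List


def failed_verifiers_by_run(results: Dict[str, Any]) -> Dict[int, List[str]]:
    """Map each run number to the names of verifiers that did not pass."""
    failures: Dict[int, List[str]] = {}
    for run in results.get("runs", []):
        # Runs that raised an exception have no "verification_results" key
        verification = run.get("verification_results", {})
        failures[run["run_number"]] = [
            name
            for name, outcome in verification.items()
            if not outcome.get("passed", False)
        ]
    return failures


# Made-up sample in the same shape as the saved JSON
sample_results = {
    "runs": [
        {
            "run_number": 1,
            "verification_results": {"A": {"passed": True}, "B": {"passed": False}},
        },
        {"run_number": 2, "error": "timeout", "overall_success": False},
    ]
}
print(failed_verifiers_by_run(sample_results))
```

An empty list for a run means either that every verifier passed or that the run errored before verification; checking the run's `error` key distinguishes the two cases.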

In [None]:
# Load and display results
import glob
import json

# Find most recent results file
result_files = glob.glob("benchmark_results_*.json")
if result_files:
    latest_file = max(result_files)
    print("File Name: ", latest_file)
    with open(latest_file, 'r') as f:
        results = json.load(f)

    print(results['statistics'])
    print(json.dumps(results['statistics'], indent=2, sort_keys=True))
else:
    print("No results files found. Run the benchmark first.")

File Name:  benchmark_results_20260127_174626.json
{'total_runs': 2, 'successful_runs': 0, 'overall_success_rate': 0.0, 'pass_at_1': 0.0, 'verifier_level_pass_rate': 0.6, 'total_verifiers_checked': 10, 'total_verifiers_passed': 6, 'individual_verifier_stats': {'Project Calendar Visible': {'passed': 2, 'total': 2, 'pass_rate': 1.0}, 'Event Moved to Primary': {'passed': 1, 'total': 2, 'pass_rate': 0.5}, 'Event Description Updated': {'passed': 1, 'total': 2, 'pass_rate': 0.5}, 'Kickoff Event Created': {'passed': 0, 'total': 2, 'pass_rate': 0.0}, 'Verify Old mapping removed': {'passed': 2, 'total': 2, 'pass_rate': 1.0}}, 'mean_execution_time_ms': 20168.0, 'tool_usage': {'get_calendar_list': 2, 'update_calendar_in_list': 1, 'list_events': 1, 'move_event': 1, 'patch_event': 1, 'create_event': 1}}
{
  "individual_verifier_stats": {
    "Event Description Updated": {
      "pass_rate": 0.5,
      "passed": 1,
      "total": 2
    },
    "Event Moved to Primary": {
      "pass_rate": 0.5,
     