In [24]:
%pip install -Uq boto3 anthropic

Note: you may need to restart the kernel to use updated packages.


In [None]:
%pip install -r requirements.txt

In [1]:
from dotenv import load_dotenv
from utils.visualize import visualize
from typing import List, Dict,Optional
load_dotenv()

# MODEL= 'global.anthropic.claude-opus-4-5-20251101-v1:0'
MODEL = 'global.anthropic.claude-sonnet-4-5-20250929-v1:0'

viz = visualize(auto_show=True)

In [2]:
import json

from utils.team_expense_api import get_custom_budget, get_expenses, get_team_members

from anthropic import AnthropicBedrock

client = AnthropicBedrock()

message = client.messages.create(
    model=MODEL,
    max_tokens=256,
    messages=[{"role": "user", "content": "Hello, world"}]
)
print(message.content)

[TextBlock(citations=None, text='Hello! How can I help you today?', type='text')]


In [3]:

# Tool definitions for the team expense API.
# Each entry carries the three keys passed to the Messages API via `tools=`:
# "name", a natural-language "description" (including the return format the
# model should expect), and a JSON Schema "input_schema" for the arguments.
# The commented-out "input_examples" blocks are kept for reference;
# presumably they need the tool-examples beta flag to be enabled — TODO confirm.
tools = [
    {
        "name": "get_team_members",
        "description": 'Returns a list of team members for a given department. Each team member includes their ID, name, role, level (junior, mid, senior, staff, principal), and contact information. Use this to get a list of people whose expenses you want to analyze. Available departments are: engineering, sales, and marketing.\n\nRETURN FORMAT: Returns a JSON string containing an ARRAY of team member objects (not wrapped in an outer object). Parse with json.loads() to get a list. Example: [{"id": "ENG001", "name": "Alice", ...}, {"id": "ENG002", ...}]',
        "input_schema": {
            "type": "object",
            "properties": {
                "department": {
                    "type": "string",
                    "description": "The department name. Case-insensitive.",
                }
            },
            "required": ["department"],
        },
        # "input_examples": [
        #     {"department": "engineering"},
        #     {"department": "sales"},
        #     {"department": "marketing"},
        # ],
    },
    {
        "name": "get_expenses",
        "description": "Returns all expense line items for a given employee in a specific quarter. Each expense includes extensive metadata: date, category, description, amount (in USD), currency, status (approved, pending, rejected), receipt URL, approval chain, merchant name and location, payment method, and project codes. An employee may have 20-50+ expense line items per quarter, and each line item contains substantial metadata for audit and compliance purposes. Categories include: 'travel' (flights, trains, rental cars, taxis, parking), 'lodging' (hotels, airbnb), 'meals', 'software', 'equipment', 'conference', 'office', and 'internet'. IMPORTANT: Only expenses with status='approved' should be counted toward budget limits.\n\nRETURN FORMAT: Returns a JSON string containing an ARRAY of expense objects (not wrapped in an outer object with an 'expenses' key). Parse with json.loads() to get a list directly. Example: [{\"expense_id\": \"ENG001_Q3_001\", \"amount\": 1250.50, \"category\": \"travel\", ...}, {...}]",
        "input_schema": {
            "type": "object",
            "properties": {
                "employee_id": {
                    "type": "string",
                    "description": "The unique employee identifier",
                },
                "quarter": {
                    "type": "string",
                    "description": "Quarter identifier: 'Q1', 'Q2', 'Q3', or 'Q4'",
                },
            },
            "required": ["employee_id", "quarter"],
        },
        # "input_examples": [
        #     {"employee_id": "ENG001", "quarter": "Q3"},
        #     {"employee_id": "SAL002", "quarter": "Q1"},
        #     {"employee_id": "MKT001", "quarter": "Q4"},
        # ],
    },
    {
        "name": "get_custom_budget",
        "description": 'Get the custom quarterly travel budget for a specific employee. Most employees have a standard $5,000 quarterly travel budget. However, some employees have custom budget exceptions based on their role requirements. This function checks if a specific employee has a custom budget assigned.\n\nRETURN FORMAT: Returns a JSON string containing a SINGLE OBJECT (not an array). Parse with json.loads() to get a dict. Example: {"user_id": "ENG001", "has_custom_budget": false, "travel_budget": 5000, "reason": "Standard", "currency": "USD"}',
        "input_schema": {
            "type": "object",
            "properties": {
                "user_id": {
                    "type": "string",
                    "description": "The unique employee identifier",
                }
            },
            "required": ["user_id"],
        },
        # "input_examples": [
        #     {"user_id": "ENG001"},
        #     {"user_id": "SAL002"},
        #     {"user_id": "MKT001"},
        # ],
    },
]

# Dispatch table: tool name (as declared in `tools` above) -> local Python
# callable. The agent loop looks tools up here by the name the model requested
# and calls them with the model-supplied arguments as keyword arguments.
tool_functions = {
    "get_team_members": get_team_members,
    "get_expenses": get_expenses,
    "get_custom_budget": get_custom_budget,
}

### Traditional Tool Calling (Baseline)
In this first example, we'll use traditional tool calling to establish our baseline.

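We'll call the `messages.create` API with our initial query. Whenever the model stops with a `tool_use` stop reason, we execute each requested tool, append the tool output to the messages as a `tool_result`, and call the model again. This loop repeats until the model stops with `end_turn`, so every tool result passes through the model's context.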

In [4]:
import time

from anthropic.types import TextBlock, ToolUseBlock
from anthropic.types.beta import (
    BetaMessageParam as MessageParam,
)
from anthropic.types.beta import (
    BetaTextBlock,
    BetaToolUseBlock,
)


In [9]:

messages: list[MessageParam] = []


def run_agent_without_ptc(user_message):
    """Run one query through the traditional (non-PTC) tool-calling loop.

    The model is called repeatedly. Every ``tool_use`` block it emits is executed
    locally through ``tool_functions`` and the outputs are sent back as
    ``tool_result`` blocks, until the model stops for any reason other than
    ``tool_use``.

    Each call starts from a fresh transcript. Re-running the cell therefore does
    not replay earlier runs, which would otherwise inflate the token count of
    this baseline measurement.

    Args:
        user_message: The user's question, sent as the first message.

    Returns:
        Tuple ``(final_text, conversation, total_tokens, elapsed_seconds, api_calls)``.
        ``final_text`` is the first text block of the last response; it is
        ``None`` when an ``end_turn`` response has no text, and a
        ``"Stopped with reason: ..."`` string when any other stop reason has no
        text. ``conversation`` is the full transcript including the final
        assistant turn.
    """

    def _first_text(content, default=None):
        # Match on the block's `type` tag so beta and non-beta SDK classes
        # are handled identically.
        return next(
            (block.text for block in content if getattr(block, "type", None) == "text"),
            default,
        )

    # Local transcript: nothing leaks between calls through module state.
    conversation = [{"role": "user", "content": user_message}]
    total_tokens = 0
    api_counter = 0
    # perf_counter is monotonic, so the measurement cannot go backwards.
    start_time = time.perf_counter()

    while True:
        response = client.beta.messages.create(
            model=MODEL,
            max_tokens=8000,
            tools=tools,
            messages=conversation,
            # betas=["tool-examples-2025-10-29"],
        )
        api_counter += 1

        # Track token usage
        total_tokens += response.usage.input_tokens + response.usage.output_tokens
        viz.capture(response)

        if response.stop_reason != "tool_use":
            # Record the last assistant turn so the returned transcript is complete.
            if response.content:
                conversation.append({"role": "assistant", "content": response.content})
            elapsed_time = time.perf_counter() - start_time

            if response.stop_reason == "end_turn":
                final_response = _first_text(response.content)
            else:
                print(f"\nUnexpected stop reason: {response.stop_reason}")
                final_response = _first_text(
                    response.content,
                    f"Stopped with reason: {response.stop_reason}",
                )
            return final_response, conversation, total_tokens, elapsed_time, api_counter

        # The assistant turn must precede the tool results that answer it.
        conversation.append({"role": "assistant", "content": response.content})

        tool_results = []
        for block in response.content:
            if getattr(block, "type", None) != "tool_use":
                continue

            tool_result = {"type": "tool_result", "tool_use_id": block.id}
            tool_fn = tool_functions.get(block.name)
            if tool_fn is None:
                tool_result["content"] = f"Unknown tool: {block.name}"
                tool_result["is_error"] = True
            else:
                try:
                    tool_result["content"] = str(tool_fn(**block.input))
                except Exception as exc:
                    # Report the failure to the model instead of aborting the
                    # whole run; it can retry with corrected arguments.
                    tool_result["content"] = f"Error calling {block.name}: {exc}"
                    tool_result["is_error"] = True
            tool_results.append(tool_result)

        # All results for this turn go back in a single user message.
        conversation.append({"role": "user", "content": tool_results})

In [10]:
query = "Which engineering team members exceeded their Q3 travel budget? Standard quarterly travel budget is $5,000. However, some employees have custom budget limits. For anyone who exceeded the $5,000 standard budget, check if they have a custom budget exception. If they do, use that custom limit instead to determine if they truly exceeded their budget."


In [15]:
# Run the agent
result, conversation, total_tokens, elapsed_time, api_count_without_ptc = run_agent_without_ptc(
    query
)

print(f"Result: {result}")
print(f"API calls made: {api_count_without_ptc}")
print(f"Total tokens used: {total_tokens:,}")
print(f"Total time taken: {elapsed_time:.2f}s")

Result: Perfect! Now I can determine who actually exceeded their budgets:

## **Engineering Team Members Who Exceeded Their Q3 Travel Budget:**

### **1. Carol White (ENG003) - Software Engineer**
- **Q3 Travel Expenses:** $5,380.91 (approved only)
- **Budget Limit:** $5,000 (standard)
- **Amount Over Budget:** $380.91

### **2. Emma Johnson (ENG005) - Junior Software Engineer**
- **Q3 Travel Expenses:** $8,405.63 (approved only)
- **Budget Limit:** $5,000 (standard)
- **Amount Over Budget:** $3,405.63

### **3. Frank Liu (ENG006) - Senior Software Engineer**
- **Q3 Travel Expenses:** $5,147.57 (approved only)
- **Budget Limit:** $5,000 (standard)
- **Amount Over Budget:** $147.57

---

### **Team Members Who Did NOT Exceed Their Budget (Had Custom Exceptions):**

- **Bob Martinez (ENG002)** - Staff Engineer: Spent $6,491.75 but has a custom budget of **$8,000** ✓ Under budget
- **David Kim (ENG004)** - Principal Engineer: Spent $5,579.28 but has a custom budget of **$12,000** ✓ Under 

# Customized PTC (Programmatic Tool Calling)

In [2]:
import asyncio
import json
import logging
from dataclasses import dataclass

# Add project root to path for imports
import sys
import os
# project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# sys.path.insert(0, project_root)

from sandboxed_ptc import ToolRegistry, ToolCallerType, SandboxSession, ExecutionResult
from sandboxed_ptc.sandbox import SandboxExecutor, SandboxConfig

# Import team expense API tools
from utils.team_expense_api import get_team_members, get_expenses, get_custom_budget

# Import visualizer
from utils.visualize import visualize

# Root logging: INFO level, each record tagged with its source file and line
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(filename)s:%(lineno)d - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Third-party libraries are chatty at INFO; only surface their warnings and errors
for _noisy_logger_name in (
    'urllib3',
    'requests',
    'httpcore',
    'botocore',
    'anthropic',
    'docker',
    'httpx',
):
    logging.getLogger(_noisy_logger_name).setLevel(logging.WARNING)
del _noisy_logger_name

logger = logging.getLogger(__name__)


# ============================================================
# 1. Mock Weather API (Direct Call Tool)
# ============================================================

def get_weather(city: str, units: str = "celsius") -> str:
    """
    Mock weather API - returns simulated weather data for a city.
    This is a DIRECT CALL tool - Claude calls it directly, not via code execution.

    Args:
        city: City name; matched case-insensitively, ignoring surrounding whitespace.
        units: "celsius" or "fahrenheit" (case-insensitive). Any other value,
            including None, falls back to Celsius.

    Returns:
        JSON string with keys "city", "temperature", "condition", "humidity",
        "wind" and "units". "units" always names the unit actually used for
        "temperature", even when the requested unit was not recognised.
    """
    import random

    # Mock weather data for different cities
    weather_data = {
        "beijing": {"temp_c": 15, "condition": "Partly Cloudy", "humidity": 45, "wind_kph": 12},
        "shanghai": {"temp_c": 22, "condition": "Sunny", "humidity": 60, "wind_kph": 8},
        "new york": {"temp_c": 18, "condition": "Cloudy", "humidity": 55, "wind_kph": 15},
        "london": {"temp_c": 12, "condition": "Rainy", "humidity": 80, "wind_kph": 20},
        "tokyo": {"temp_c": 20, "condition": "Clear", "humidity": 50, "wind_kph": 10},
        "paris": {"temp_c": 14, "condition": "Overcast", "humidity": 65, "wind_kph": 18},
        "sydney": {"temp_c": 25, "condition": "Sunny", "humidity": 40, "wind_kph": 5},
        "san francisco": {"temp_c": 16, "condition": "Foggy", "humidity": 75, "wind_kph": 22},
    }

    city_clean = city.strip()
    city_key = city_clean.lower()

    data = weather_data.get(city_key)
    if data is None:
        # Seed a private generator from the city name: an unknown city gets made-up
        # weather, but the same city yields the same answer on every call, so the
        # model never sees contradictory readings within one conversation.
        rng = random.Random(city_key)
        data = {
            "temp_c": rng.randint(5, 35),
            "condition": rng.choice(["Sunny", "Cloudy", "Rainy", "Clear", "Windy"]),
            "humidity": rng.randint(30, 90),
            "wind_kph": rng.randint(5, 30),
        }

    # Normalise the requested unit; anything unrecognised is served as Celsius
    # and reported as such, so "units" can never contradict "temperature".
    units_used = (units or "celsius").strip().lower()
    if units_used == "fahrenheit":
        temp = data["temp_c"] * 9 / 5 + 32
        temp_unit = "°F"
    else:
        units_used = "celsius"
        temp = data["temp_c"]
        temp_unit = "°C"

    result = {
        "city": city_clean.title(),
        "temperature": f"{temp:.1f}{temp_unit}",
        "condition": data["condition"],
        "humidity": f"{data['humidity']}%",
        "wind": f"{data['wind_kph']} km/h",
        "units": units_used,
    }

    # ensure_ascii=False keeps the degree sign readable in the tool output
    return json.dumps(result, ensure_ascii=False)


# ============================================================
# 2. Tool Configurations
# ============================================================

# Code execution tool configurations (called via execute_code).
# These are registered with allowed_callers=[ToolCallerType.CODE_EXECUTION] in
# BedrockDockerSandboxAgent._register_tools_from_config. They are not added to
# the `tools=` list of the API request, which only carries execute_code plus
# the direct tools below. The descriptions state each tool's return format
# (a JSON string) so generated code knows to call json.loads() on the result.
TOOL_CONFIGS = [
    {
        "name": "get_team_members",
        "description": 'Returns a list of team members for a given department. Each team member includes their ID, name, role, level (junior, mid, senior, staff, principal), and contact information. Use this to get a list of people whose expenses you want to analyze. Available departments are: engineering, sales, and marketing.\n\nRETURN FORMAT: Returns a JSON string containing an ARRAY of team member objects (not wrapped in an outer object). Parse with json.loads() to get a list. Example: [{"id": "ENG001", "name": "Alice", ...}, {"id": "ENG002", ...}]',
        "input_schema": {
            "type": "object",
            "properties": {
                "department": {
                    "type": "string",
                    "description": "The department name. Case-insensitive.",
                }
            },
            "required": ["department"],
        },
    },
    {
        "name": "get_expenses",
        "description": "Returns all expense line items for a given employee in a specific quarter. Each expense includes extensive metadata: date, category, description, amount (in USD), currency, status (approved, pending, rejected), receipt URL, approval chain, merchant name and location, payment method, and project codes. An employee may have 20-50+ expense line items per quarter, and each line item contains substantial metadata for audit and compliance purposes. Categories include: 'travel' (flights, trains, rental cars, taxis, parking), 'lodging' (hotels, airbnb), 'meals', 'software', 'equipment', 'conference', 'office', and 'internet'. IMPORTANT: Only expenses with status='approved' should be counted toward budget limits.\n\nRETURN FORMAT: Returns a JSON string containing an ARRAY of expense objects (not wrapped in an outer object with an 'expenses' key). Parse with json.loads() to get a list directly. Example: [{\"expense_id\": \"ENG001_Q3_001\", \"amount\": 1250.50, \"category\": \"travel\", ...}, {...}]",
        "input_schema": {
            "type": "object",
            "properties": {
                "employee_id": {
                    "type": "string",
                    "description": "The unique employee identifier",
                },
                "quarter": {
                    "type": "string",
                    "description": "Quarter identifier: 'Q1', 'Q2', 'Q3', or 'Q4'",
                },
            },
            "required": ["employee_id", "quarter"],
        },
    },
    {
        "name": "get_custom_budget",
        "description": 'Get the custom quarterly travel budget for a specific employee. Most employees have a standard $5,000 quarterly travel budget. However, some employees have custom budget exceptions based on their role requirements. This function checks if a specific employee has a custom budget assigned.\n\nRETURN FORMAT: Returns a JSON string containing a SINGLE OBJECT (not an array). Parse with json.loads() to get a dict. Example: {"user_id": "ENG001", "has_custom_budget": false, "travel_budget": 5000, "reason": "Standard", "currency": "USD"}',
        "input_schema": {
            "type": "object",
            "properties": {
                "user_id": {
                    "type": "string",
                    "description": "The unique employee identifier",
                }
            },
            "required": ["user_id"],
        },
    },
]

# Direct call tool configurations (Claude calls directly, not via code execution).
# chat() copies name/description/input_schema from each entry into the `tools=`
# list of every API request, next to execute_code.
DIRECT_TOOL_CONFIGS = [
    {
        "name": "get_weather",
        "description": "Get current weather information for a city. This tool provides real-time weather data including temperature, conditions, humidity, and wind speed. Use this when you need to check weather conditions for travel planning or general information.",
        "input_schema": {
            "type": "object",
            "properties": {
                "city": {
                    "type": "string",
                    "description": "The city name to get weather for (e.g., 'Beijing', 'New York', 'London')",
                },
                "units": {
                    "type": "string",
                    "enum": ["celsius", "fahrenheit"],
                    "description": "Temperature units. Default is 'celsius'.",
                    "default": "celsius"
                }
            },
            "required": ["city"],
        },
    },
]

# Map tool names to actual functions.
# Keys must match the "name" fields in TOOL_CONFIGS; a config entry with no
# matching key here is skipped (with a warning) during registration.
TOOL_FUNCTIONS = {
    "get_team_members": get_team_members,
    "get_expenses": get_expenses,
    "get_custom_budget": get_custom_budget,
}

# Direct call tool functions.
# Keys must match the "name" fields in DIRECT_TOOL_CONFIGS; chat() looks the
# requested tool up here and calls it with the model-supplied arguments.
DIRECT_TOOL_FUNCTIONS = {
    "get_weather": get_weather,
}


# ============================================================
# 3. Agent Configuration
# ============================================================

@dataclass
class AgentConfig:
    """Tunable settings for BedrockDockerSandboxAgent.

    All fields have defaults, so ``AgentConfig()`` yields a usable configuration.
    """
    # Model identifier sent as `model` on every Messages API request.
    model: str = "global.anthropic.claude-opus-4-5-20251101-v1:0"
    # Per-response output token cap, sent as `max_tokens` on every request.
    max_tokens: int = 8192
    # Upper bound on model round-trips within one chat() call; chat() raises
    # RuntimeError once it is exhausted.
    max_iterations: int = 15
    # Sampling temperature for model calls.
    temperature: float = 0.7
    # When True, the agent creates a visualize(auto_show=True) instance and
    # passes every API response to its capture() method.
    enable_visualization: bool = True


# ============================================================
# 4. Bedrock Docker Sandbox Agent
# ============================================================

class BedrockDockerSandboxAgent:
    """
    AI Agent using AnthropicBedrock with Docker Sandbox for Code Execution

    This version uses Docker sandbox for secure, isolated code execution.
    Suitable for production environments where security is critical.

    Features:
    - Docker-based sandboxed code execution
    - Network isolation and resource limits
    - Multi-turn conversation with history
    - Tool registration with external configs
    - Direct tool calls + Programmatic tool calling

    Usage:
        agent = BedrockDockerSandboxAgent()
        response = await agent.chat("Analyze expense data")

        # Or with context manager
        async with BedrockDockerSandboxAgent() as agent:
            response = await agent.chat("Analyze expense data")
    """

    def __init__(
        self,
        config: AgentConfig | None = None,
        sandbox_config: SandboxConfig | None = None,
        enable_session_reuse: bool | None = None
    ):
        """Create the agent and register all configured tools.

        No client or sandbox executor is created here; both are built lazily
        on first access of the ``client`` / ``sandbox`` properties.

        Args:
            config: Model and loop settings. Defaults to ``AgentConfig()``.
            sandbox_config: Sandbox settings. When omitted, a default is built
                (256m memory, 60s timeout, network disabled, 270s session timeout).
            enable_session_reuse: Explicit override for sandbox session reuse.
                ``None`` means "use ``sandbox_config.enable_session_reuse``".
        """
        self.config = config or AgentConfig()
        self.sandbox_config = sandbox_config or SandboxConfig(
            memory_limit="256m",
            timeout_seconds=60.0,
            network_disabled=True,
            enable_session_reuse=enable_session_reuse or False,
            session_timeout_seconds=270.0,  # 4.5 minutes like official PTC
        )

        # Use explicit parameter if provided, otherwise read from sandbox_config
        if enable_session_reuse is not None:
            self.enable_session_reuse = enable_session_reuse
            if self.sandbox_config.enable_session_reuse != enable_session_reuse:
                # The executor is built from sandbox_config, so a disagreement
                # here may mean the executor is not set up the way the flag asks.
                logger.warning(
                    "enable_session_reuse=%s overrides sandbox_config.enable_session_reuse=%s",
                    enable_session_reuse,
                    self.sandbox_config.enable_session_reuse,
                )
        else:
            self.enable_session_reuse = self.sandbox_config.enable_session_reuse

        self.tool_registry = ToolRegistry()
        self._client = None
        self._conversation_history: list[dict] = []
        self._sandbox: SandboxExecutor | None = None
        self._visualizer = None
        self._current_session_id: str | None = None

        # Register tools from config
        self._register_tools_from_config()

        # Initialize visualizer if enabled
        if self.config.enable_visualization:
            self._visualizer = visualize(auto_show=True)

        logger.info(f"Initialized BedrockDockerSandboxAgent with model: {self.config.model}")
        # Report the effective flag: reuse may be enabled through sandbox_config
        # alone, in which case the raw argument is None.
        if self.enable_session_reuse:
            logger.info("Session reuse mode enabled")

    def _register_tools_from_config(self) -> None:
        """Populate the registry from the module-level tool tables.

        Code-execution tools (TOOL_CONFIGS / TOOL_FUNCTIONS) are registered first,
        then direct-call tools (DIRECT_TOOL_CONFIGS / DIRECT_TOOL_FUNCTIONS).
        A config entry with no matching function is logged and skipped.
        """
        # (configs, name -> function map, caller type, label used in log lines)
        registration_plan = (
            (TOOL_CONFIGS, TOOL_FUNCTIONS, ToolCallerType.CODE_EXECUTION, "tool"),
            (DIRECT_TOOL_CONFIGS, DIRECT_TOOL_FUNCTIONS, ToolCallerType.DIRECT, "direct tool"),
        )

        for configs, implementations, caller, label in registration_plan:
            for spec in configs:
                tool_name = spec["name"]
                implementation = implementations.get(tool_name)

                if implementation is None:
                    logger.warning(f"Function not found for {label}: {tool_name}")
                    continue

                # register() returns a decorator; apply it to the implementation
                bind = self.tool_registry.register(
                    name=tool_name,
                    description=spec["description"],
                    input_schema=spec["input_schema"],
                    allowed_callers=[caller],
                )
                bind(implementation)

                logger.info(f"Registered {label}: {tool_name}")

    async def __aenter__(self):
        """Enter the async context: log readiness and return the agent itself.

        Nothing is started here; the sandbox executor is only created on first
        access of the ``sandbox`` property.
        """
        logger.info("Docker sandbox agent ready")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit the async context, closing the reusable sandbox session if any.

        The stored session id is always cleared, so a later ``chat()`` on the
        same agent never tries to reuse a session that was already closed.
        If closing fails while an exception from the ``async with`` body is
        propagating, the cleanup error is logged rather than raised, so it
        cannot mask the original exception. Never suppresses body exceptions.
        """
        session_id = self._current_session_id
        # Close session if using session reuse
        if session_id and self._sandbox:
            try:
                await self._sandbox.close_session(session_id)
                logger.info(f"Closed session: {session_id}")
            except Exception:
                logger.exception(f"Failed to close session: {session_id}")
                if exc_type is None:
                    # No body exception to protect: surface the cleanup failure.
                    raise
            finally:
                self._current_session_id = None
        logger.info("Docker sandbox agent closed")

    @property
    def current_session_id(self) -> str | None:
        """Session id stored by the last session-reuse execution, or None.

        It is None until ``_execute_code`` has stored an id returned by the
        sandbox, which only happens in session-reuse mode.
        """
        return self._current_session_id

    @property
    def active_sessions(self) -> dict:
        """Get all active sessions info"""
        if self._sandbox:
            return self._sandbox.active_sessions
        return {}

    @property
    def sandbox(self) -> SandboxExecutor:
        """Lazy load sandbox executor"""
        if self._sandbox is None:
            self._sandbox = SandboxExecutor(self.tool_registry, self.sandbox_config)
        return self._sandbox

    @property
    def client(self):
        """Lazy load AnthropicBedrock client"""
        if self._client is None:
            from anthropic import AnthropicBedrock
            self._client = AnthropicBedrock()
            logger.info("AnthropicBedrock client initialized")
        return self._client

    def _build_system_prompt(self) -> str:
        """Build the system prompt with tool documentation"""
        tools_doc = self.tool_registry.generate_tools_documentation()

        return f"""You are a powerful AI assistant capable of completing complex tasks through a code execution environment.

## Code Execution Environment

You have a Python code execution environment with the following predefined asynchronous tool functions:

{tools_doc}

## Usage

When you need to execute multi-step tasks, use the `execute_code` tool to write Python code.

### Key Rules:
1. All tool calls must use `await`, for example: `result = await query_sales(region="East")`
2. Use `print()` to output results - this is the only way for you to get execution results
3. You can perform data processing, filtering, aggregation, and conditional logic in your code
4. After code execution completes, you will see the content output by print

## Best Practices for coding

1. **Batch Processing**: Write multiple related operations in a single code block
```python
results = {{}}
for region in ["East", "West", "Central"]:
    data = await query_sales(region=region)
    results[region] = sum(item["revenue"] for item in data)
print(f"Regional revenue: {{results}}")
```

2. **Data Filtering**: Fetch data first, then filter in code
```python
servers = await list_servers()
for server in servers:
    status = await check_server_health(server_id=server)
    if status["status"] != "healthy":
        print(f"Problem: {{server}} - {{status}}")
```

3. **Conditional Logic**: Decide next steps based on intermediate results
```python
file_info = await get_file_info(path="/data/large.csv")
if file_info["size"] > 1000000:
    summary = await get_file_summary(path="/data/large.csv")
else:
    content = await read_file(path="/data/large.csv")
    summary = content
print(summary)
```

4. **Early Termination**: Stop immediately once the desired result is found
```python
servers = ["us-east", "eu-west", "ap-south"]
for server in servers:
    status = await check_health(server_id=server)
    if status["healthy"]:
        print(f"Found healthy server: {{server}}")
        break
        

## Docker Sandbox Features

- Secure, isolated execution environment
- Network disabled for security
- Resource limits enforced (memory, CPU)
- Timeout protection

## Best Practices

1. **Batch Processing**: Combine multiple operations in one code block to minimize round-trips
2. **Always parse JSON**: Tool functions return JSON strings
3. **Handle errors gracefully**: Use try/except for robust code
"""

    def _create_execute_code_tool(self) -> dict:
        """Create the execute_code tool definition"""
        return {
            "name": "execute_code",
            "description": """Execute Python code in a sandbox environment.

The code can call predefined asynchronous tool functions to complete tasks.
Use `print()` to output the results you need to see.

Applicable Scenarios:
- Need to call tools multiple times (e.g., loop iterations)
- Need to process, filter, or aggregate data returned by tools
- Need to make conditional decisions based on intermediate results
- Need to batch process multiple similar tasks

Note: All tool calls must use `await`, for example:
result = await query_database(sql="SELECT * FROM users")
""",
            "input_schema": {
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "Python code to execute in Docker sandbox. Use await for tool calls, print() for output."
                    }
                },
                "required": ["code"]
            }
        }

    async def _execute_code(self, code: str) -> dict:
        """Execute code using the Docker sandbox.

        In session-reuse mode the stored session id is passed to the executor,
        and whatever id comes back is stored for the next call. Otherwise each
        call is a one-off execution.

        Args:
            code: Python source to run in the sandbox.

        Returns:
            ``{"success": bool, "output": str}``. ``output`` is the captured
            stdout on success (or a placeholder when nothing was printed), and
            an ``"Execution Error: ..."`` message built from stderr on failure.
        """
        logger.info(f"Executing code in Docker sandbox:\n{code}")

        # Use session reuse if enabled
        if self.enable_session_reuse:
            execution_result = await self.sandbox.execute(
                code,
                session_id=self._current_session_id,
                reuse_session=True
            )
            # Unpack result and session_id (returns tuple in session mode)
            if isinstance(execution_result, tuple):
                result, session_id = execution_result
                if session_id == self._current_session_id:
                    logger.debug(f"Reused session: {session_id}")
                else:
                    # Always follow the id the executor handed back. If it
                    # replaced our session (e.g. after a timeout), holding on
                    # to the old id would keep requesting a dead session.
                    if self._current_session_id is None:
                        logger.info(f"Created new session: {session_id}")
                    else:
                        logger.info(
                            f"Session changed: {self._current_session_id} -> {session_id}"
                        )
                    self._current_session_id = session_id
            else:
                result = execution_result
        else:
            # Single execution mode (no session reuse)
            result = await self.sandbox.execute(code)

        if result.success:
            content = result.stdout or "(Code executed successfully, but no print output)"
        else:
            # Avoid the unhelpful "Execution Error: None" when stderr is empty
            content = f"Execution Error: {result.stderr or '(no error output captured)'}"

        # Only mark the log preview as truncated when it really is
        preview = content if len(content) <= 500 else f"{content[:500]}..."
        logger.info(f"Sandbox execution result: {preview}")
        return {"success": result.success, "output": content}

    async def chat(
        self,
        message: str,
        reset_history: bool = False
    ) -> tuple[str, int, int]:
        """
        Send a message and drive the tool-use loop until the model finishes.

        Each iteration calls the model once. ``execute_code`` requests run in the
        Docker sandbox; direct tools are called in-process. Tool outputs are fed
        back to the model until it stops with ``end_turn``.

        Args:
            message: User message
            reset_history: If True, clear conversation history before processing

        Returns:
            Tuple of (response_text, input_tokens, output_tokens); token counts
            are summed over every model call made for this message.

        Raises:
            RuntimeError: If the model stops for a reason other than ``end_turn``
                or ``tool_use``, or if ``config.max_iterations`` model calls are
                made without reaching ``end_turn``.
        """
        if reset_history:
            self._conversation_history.clear()
            logger.info("Conversation history cleared")

        # Add user message to history
        self._conversation_history.append({
            "role": "user",
            "content": message
        })
        total_input_tokens = 0
        total_output_tokens = 0

        # Build tools list: execute_code + direct call tools
        tools = [self._create_execute_code_tool()]
        tools.extend(
            {
                "name": tool_config["name"],
                "description": tool_config["description"],
                "input_schema": tool_config["input_schema"],
            }
            for tool_config in DIRECT_TOOL_CONFIGS
        )

        # The prompt depends only on the tool registry, so build it once per call
        # rather than once per iteration.
        system_prompt = self._build_system_prompt()

        for iteration in range(1, self.config.max_iterations + 1):
            logger.info(f"--- Iteration {iteration} ---")

            # Call Claude API
            response = self.client.beta.messages.create(
                model=self.config.model,
                max_tokens=self.config.max_tokens,
                temperature=self.config.temperature,  # honour the configured value
                system=system_prompt,
                betas=["tool-examples-2025-10-29"],
                messages=self._conversation_history,
                tools=tools
            )
            total_input_tokens += response.usage.input_tokens
            total_output_tokens += response.usage.output_tokens

            # Visualize the response if enabled
            if self._visualizer:
                self._visualizer.capture(response)

            logger.info(f"Stop reason: {response.stop_reason}")

            if response.stop_reason == "end_turn":
                # Concatenate every text block into the final answer
                text_content = "".join(
                    block.text for block in response.content if hasattr(block, "text")
                )

                # An assistant message with empty content would be rejected on the
                # next request, so only record turns that actually carry text.
                if text_content:
                    self._conversation_history.append({
                        "role": "assistant",
                        "content": text_content
                    })

                return text_content, total_input_tokens, total_output_tokens

            if response.stop_reason != "tool_use":
                # e.g. "max_tokens": report the real cause instead of falling
                # through to the "exceeded maximum iterations" error.
                logger.warning(f"Unknown stop reason: {response.stop_reason}")
                raise RuntimeError(
                    f"Conversation stopped unexpectedly (stop_reason={response.stop_reason!r})"
                )

            # Process tool calls
            assistant_content = []
            tool_results = []

            for block in response.content:
                if hasattr(block, "text"):
                    assistant_content.append({
                        "type": "text",
                        "text": block.text
                    })

                elif block.type == "tool_use":
                    assistant_content.append({
                        "type": "tool_use",
                        "id": block.id,
                        "name": block.name,
                        "input": block.input
                    })

                    logger.info(f"Tool call: {block.name}")

                    # Execute the tool
                    if block.name == "execute_code":
                        code = block.input.get("code", "")
                        result = await self._execute_code(code)
                        content = result["output"]
                    elif block.name in DIRECT_TOOL_FUNCTIONS:
                        # Direct tool call
                        try:
                            func = DIRECT_TOOL_FUNCTIONS[block.name]
                            result = func(**block.input)
                            # tool_result content must be text; serialise anything
                            # else so a successful call is not reported as an
                            # error just because it returned a dict or list.
                            if isinstance(result, str):
                                content = result
                            else:
                                content = json.dumps(result, ensure_ascii=False, default=str)
                            preview = content if len(content) <= 200 else f"{content[:200]}..."
                            logger.info(f"Direct tool result: {preview}")
                        except Exception as e:
                            content = f"Error calling {block.name}: {str(e)}"
                            logger.error(content)
                    else:
                        content = f"Unknown tool: {block.name}"

                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": content
                    })

            # Add to conversation history: the assistant turn first, then the
            # user turn carrying the matching tool results
            self._conversation_history.append({
                "role": "assistant",
                "content": assistant_content
            })
            self._conversation_history.append({
                "role": "user",
                "content": tool_results
            })

        raise RuntimeError(f"Exceeded maximum iterations ({self.config.max_iterations})")

    def reset(self) -> None:
        """Empty the conversation history in place.

        The same list object is kept, so only its contents are discarded.
        """
        del self._conversation_history[:]
        logger.info("Agent conversation reset")

    @property
    def history(self) -> list[dict]:
        """Get conversation history"""
        return self._conversation_history.copy()


# ============================================================
# 5. Demo Functions
# ============================================================

async def run_demo(enable_visualization: bool = True):
    """Run a single end-to-end demonstration of the Docker sandbox agent.

    The demo builds the agent and sandbox configurations, opens the agent as
    an async context manager, prints the registered tools and the sandbox
    settings, submits one team-expense query, and prints the reply together
    with the token counts returned by ``agent.chat``.

    Args:
        enable_visualization: Forwarded to ``AgentConfig(enable_visualization=...)``.
    """
    divider = "=" * 70

    print(divider)
    print("Bedrock Docker Sandbox Agent Demo")
    print(divider)

    # Alternative model ID: "global.anthropic.claude-opus-4-5-20251101-v1:0"
    agent_cfg = AgentConfig(
        model="global.anthropic.claude-sonnet-4-5-20250929-v1:0",
        max_tokens=8192,
        max_iterations=15,
        enable_visualization=enable_visualization,
    )
    sandbox_cfg = SandboxConfig(
        memory_limit="256m",
        timeout_seconds=60.0,
        network_disabled=True,
        enable_session_reuse=True,
    )

    # Demo question: team expense analysis.
    query = (
        "Which engineering team members exceeded their Q3 travel budget? "
        "Standard quarterly travel budget is $5,000. "
        "However, some employees have custom budget limits. "
        "For anyone who exceeded the $5,000 standard budget, check if they have a custom budget exception. "
        "If they do, use that custom limit instead to determine if they truly exceeded their budget."
    )

    # The async context manager manages the Docker sandbox lifecycle.
    async with BedrockDockerSandboxAgent(config=agent_cfg, sandbox_config=sandbox_cfg) as agent:
        registered_tools = agent.tool_registry.get_all()
        print(f"\nDocker Sandbox Agent created with {len(registered_tools)} tools:")
        for registered in registered_tools:
            # Only the first allowed caller is shown for each tool.
            print(f"  - {registered.name} ({registered.allowed_callers[0].value})")

        print("\nDocker Sandbox Configuration:")
        print(f"  - Memory Limit: {sandbox_cfg.memory_limit}")
        print(f"  - Timeout: {sandbox_cfg.timeout_seconds}s")
        print(f"  - Network Disabled: {sandbox_cfg.network_disabled}")
        print(f"  - Session Reuse: {sandbox_cfg.enable_session_reuse}")

        print("\n" + divider)
        print(f"Query: {query}")
        print(divider)

        # agent.chat returns the reply text plus input and output token totals.
        response, total_input_tokens, total_output_tokens = await agent.chat(query)

        print("\n--- Agent Response ---")
        print(response)
        print("\nToken Usage:")
        print(f"  - Input tokens: {total_input_tokens}")
        print(f"  - Output tokens: {total_output_tokens}")
        print(f"  - Total tokens: {total_input_tokens + total_output_tokens}")




In [4]:
# Run the demo with visualization enabled. Top-level `await` works here because
# the notebook kernel already runs an event loop (IPython autoawait).
await run_demo(enable_visualization=True)

2025-12-28 11:25:15,878 - __main__ - 1162309431.py:279 - INFO - Registered tool: get_team_members
2025-12-28 11:25:15,879 - __main__ - 1162309431.py:279 - INFO - Registered tool: get_expenses
2025-12-28 11:25:15,879 - __main__ - 1162309431.py:279 - INFO - Registered tool: get_custom_budget
2025-12-28 11:25:15,879 - __main__ - 1162309431.py:297 - INFO - Registered direct tool: get_weather
2025-12-28 11:25:15,880 - __main__ - 1162309431.py:257 - INFO - Initialized BedrockDockerSandboxAgent with model: global.anthropic.claude-sonnet-4-5-20250929-v1:0
2025-12-28 11:25:15,881 - __main__ - 1162309431.py:301 - INFO - Docker sandbox agent ready
2025-12-28 11:25:15,881 - __main__ - 1162309431.py:520 - INFO - --- Iteration 1 ---


Bedrock Docker Sandbox Agent Demo

Docker Sandbox Agent created with 4 tools:
  - get_team_members (code_execution)
  - get_expenses (code_execution)
  - get_custom_budget (code_execution)
  - get_weather (direct)

Docker Sandbox Configuration:
  - Memory Limit: 256m
  - Timeout: 60.0s
  - Network Disabled: True
  - Session Reuse: True

Query: Which engineering team members exceeded their Q3 travel budget? Standard quarterly travel budget is $5,000. However, some employees have custom budget limits. For anyone who exceeded the $5,000 standard budget, check if they have a custom budget exception. If they do, use that custom limit instead to determine if they truly exceeded their budget.


2025-12-28 11:25:16,271 - __main__ - 1162309431.py:337 - INFO - AnthropicBedrock client initialized


2025-12-28 11:25:27,736 - __main__ - 1162309431.py:538 - INFO - Stop reason: tool_use
2025-12-28 11:25:27,736 - __main__ - 1162309431.py:576 - INFO - Tool call: execute_code
2025-12-28 11:25:27,737 - __main__ - 1162309431.py:449 - INFO - Executing code in Docker sandbox:

import json

# Step 1: Get all engineering team members
team_members_json = await get_team_members(department="engineering")
team_members = json.loads(team_members_json)

print(f"Found {len(team_members)} engineering team members")
print()

# Step 2: For each team member, get their Q3 expenses and calculate travel spending
results = []

for member in team_members:
    employee_id = member['id']
    name = member['name']
    
    # Get Q3 expenses
    expenses_json = await get_expenses(employee_id=employee_id, quarter="Q3")
    expenses = json.loads(expenses_json)
    
    # Calculate total approved travel expenses
    travel_spending = 0
    for expense in expenses:
        if expense['category'] == 'travel' and expen

2025-12-28 11:25:31,718 - __main__ - 1162309431.py:538 - INFO - Stop reason: end_turn
2025-12-28 11:25:31,719 - sandboxed_ptc.sandbox - sandbox.py:560 - INFO - Closing session: sess_5b8ce8ce717a
2025-12-28 11:25:31,786 - __main__ - 1162309431.py:309 - INFO - Closed session: sess_5b8ce8ce717a
2025-12-28 11:25:31,787 - __main__ - 1162309431.py:310 - INFO - Docker sandbox agent closed



--- Agent Response ---
## Summary

**One engineering team member exceeded their Q3 travel budget:**

**Emma Johnson (ENG005)**
- Budget Limit: $5,000.00 (Standard)
- Travel Spending: $6,545.89
- Overage: $1,545.89

Emma Johnson exceeded the standard $5,000 quarterly travel budget by $1,545.89. She does not have a custom budget exception, so the standard limit applies.

All other engineering team members stayed within their $5,000 quarterly travel budget for Q3, with Emma being the only one to exceed this limit.

Token Usage:
  - Input tokens: 5596
  - Output tokens: 1210
  - Total tokens: 6806


## Test for Anthropic API Proxy

In [5]:
import copy

# Work on a deep copy so the original `tools` definitions are left untouched.
ptc_tools = copy.deepcopy(tools)

# Allow every copied tool to be called from the code-execution environment.
for tool in ptc_tools:
    tool.update({"allowed_callers": ["code_execution_20250825"]})  # type: ignore

# Register the code execution tool itself as the last entry.
ptc_tools += [{"type": "code_execution_20250825", "name": "code_execution"}]  # type: ignore


In [15]:
import os

import anthropic

messages = []

# The proxy credentials are read from the environment (for example from the
# .env file that load_dotenv() reads) so that no secret is stored in the notebook.
# NOTE(review): the key that used to be hard-coded in this cell should be
# treated as exposed and rotated.
proxy_api_key = os.environ.get("ANTHROPIC_PROXY_API_KEY")
if not proxy_api_key:
    raise RuntimeError(
        "ANTHROPIC_PROXY_API_KEY is not set. Export it or add it to .env before running this cell."
    )

# The endpoint defaults to the local proxy address and can be overridden
# through ANTHROPIC_PROXY_BASE_URL.
anthropic_client = anthropic.Anthropic(
    api_key=proxy_api_key,
    base_url=os.environ.get("ANTHROPIC_PROXY_BASE_URL", "http://127.0.0.1:8000"),
)

# Smoke test: one short request through the proxy to confirm it is reachable.
message = anthropic_client.beta.messages.create(
    # model="qwen.qwen3-coder-480b-a35b-v1:0",
    model=MODEL,
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "你好，Claude！"}
    ],
)

print(message.content[0])

BetaTextBlock(citations=None, text='你好！很高兴见到你！有什么我可以帮助你的吗？😊', type='text', cache_control=None)


In [16]:
# Module-level transcript that run_agent_with_ptc appends to and returns.
# Re-running this cell resets it to an empty conversation.
messages = []


def _format_tool_result(result):
    """Render a tool's return value as the text content of a tool_result block."""
    if isinstance(result, list) and result and isinstance(result[0], str):
        return "\n".join(result)
    if isinstance(result, (dict, list)):
        return json.dumps(result)
    return str(result)


def _run_tool_call(block):
    """Execute one tool_use block and build the tool_result entry that answers it.

    Unknown tool names and exceptions raised by the tool are reported back to
    the model as an error result instead of propagating. By the time this runs
    the assistant turn is already in `messages`, so raising here would leave
    the shared transcript with a tool_use that has no matching tool_result.
    """
    tool_name = block.name

    # The caller type shows how the tool was invoked. It is only used for
    # logging, so a missing `caller` must not abort the run.
    caller_type = getattr(getattr(block, "caller", None), "type", None)
    if caller_type == "code_execution_20250825":
        print(f"[PTC] Tool called from code execution environment: {tool_name}")
    elif caller_type == "direct":
        print(f"[Direct] Tool called by model: {tool_name}")

    tool_result = {"type": "tool_result", "tool_use_id": block.id}

    if tool_name not in tool_functions:
        tool_result["content"] = f"Unknown tool: {tool_name}"
        tool_result["is_error"] = True
        return tool_result

    try:
        result = tool_functions[tool_name](**block.input)
        tool_result["content"] = _format_tool_result(result)
    except Exception as e:
        # Deliberately broad: any tool failure is handed to the model as an error result.
        tool_result["content"] = f"Error calling {tool_name}: {str(e)}"
        tool_result["is_error"] = True
        print(f"[Tool error] {tool_result['content']}")

    return tool_result


def run_agent_with_ptc(user_message):
    """Run the agent loop using Programmatic Tool Calling (PTC).

    Appends `user_message` to the module-level `messages` list, then keeps
    calling the beta Messages API, executing every requested tool and sending
    the results back, until the model stops for a reason other than "tool_use".

    Args:
        user_message: Content of the user turn to send.

    Returns:
        A tuple `(final_response, messages, total_tokens, elapsed_time, api_counter)`:
        the first text block of the last response (None on "end_turn" without
        text, or a "Stopped with reason: ..." string on an unexpected stop),
        the shared transcript, the summed input and output tokens, the
        wall-clock seconds spent, and the number of API calls made.

    Raises:
        Errors raised by the API client propagate to the caller.
    """
    messages.append({"role": "user", "content": user_message})
    total_tokens = 0
    start_time = time.time()
    container_id = None
    api_counter = 0

    while True:
        # Build request with PTC beta headers
        request_params = {
            "model": MODEL,
            "max_tokens": 8000,
            "tools": ptc_tools,
            "messages": messages,
        }

        response = anthropic_client.beta.messages.create(
            **request_params,
            betas=[
                "advanced-tool-use-2025-11-20",
            ],
            # Reuse the container from the previous response once one is known.
            extra_body={"container": container_id} if container_id else None,
        )
        viz.capture(response)
        api_counter += 1

        # Track container for stateful execution
        if hasattr(response, "container") and response.container:
            container_id = response.container.id
            print(f"\n[Container] ID: {container_id}")
            if hasattr(response.container, "expires_at"):
                # If the container has expired, we would need to restart our workflow.
                # In our case, it completes before expiration.
                print(f"[Container] Expires at: {response.container.expires_at}")

        # Track token usage
        total_tokens += response.usage.input_tokens + response.usage.output_tokens

        if response.stop_reason == "end_turn":
            # Extract the first text block from the response.
            # NOTE(review): the final assistant turn is not appended to `messages`;
            # confirm whether callers that reuse the transcript expect it there.
            final_response = next(
                (block.text for block in response.content if isinstance(block, BetaTextBlock)),
                None,
            )
            elapsed_time = time.time() - start_time
            return final_response, messages, total_tokens, elapsed_time, api_counter

        if response.stop_reason == "tool_use":
            # The assistant turn goes first, then one user turn carrying every tool result.
            messages.append({"role": "assistant", "content": response.content})
            tool_results = [
                _run_tool_call(block)
                for block in response.content
                if isinstance(block, BetaToolUseBlock)
            ]
            messages.append({"role": "user", "content": tool_results})

        else:
            print(f"\nUnexpected stop reason: {response.stop_reason}")
            elapsed_time = time.time() - start_time

            final_response = next(
                (block.text for block in response.content if isinstance(block, BetaTextBlock)),
                f"Stopped with reason: {response.stop_reason}",
            )
            return final_response, messages, total_tokens, elapsed_time, api_counter

In [17]:
# Send the team-expense question through the PTC agent loop.
query = (
    "Which engineering team members exceeded their Q3 travel budget? "
    "Standard quarterly travel budget is $5,000. "
    "However, some employees have custom budget limits. "
    "For anyone who exceeded the $5,000 standard budget, check if they have a custom budget exception. "
    "If they do, use that custom limit instead to determine if they truly exceeded their budget."
)

# The run yields, in order: reply text, transcript, token total, elapsed seconds, API call count.
ptc_run = run_agent_with_ptc(query)
result_ptc, conversation_ptc, total_tokens_ptc, elapsed_time_ptc, api_count_with_ptc = ptc_run

BadRequestError: Error code: 400 - {'type': 'error', 'error': {'type': 'invalid_request_error', 'message': 'Programmatic Tool Calling requires Docker which is not available. Please ensure Docker is running.'}}