In [2]:
import boto3
import json
import re
from typing import List, Dict, Any

class ConverseAgent:
    """Conversational agent built on the Amazon Bedrock Converse API.

    The running conversation is kept in ``self.messages``. When a tool
    manager is attached through ``self.tools``, tools requested by the model
    are executed and their results are fed back into the conversation until
    the model produces a final answer.
    """

    def __init__(self, model_id, region='us-west-2', system_prompt='You are a helpful assistant.'):
        """Create the Bedrock runtime client and start an empty conversation.

        Args:
            model_id: Bedrock model identifier sent as ``modelId``.
            region: AWS region used for the ``bedrock-runtime`` client.
            system_prompt: Text sent as the system prompt on every request.
        """
        self.model_id = model_id
        self.region = region
        self.client = boto3.client('bedrock-runtime', region_name=self.region)
        self.system_prompt = system_prompt
        self.messages = []
        # Optional tool manager; must provide get_tools() and execute_tool().
        self.tools = None
        # When this holds exactly two items, e.g. ['<response>', '</response>'],
        # only the text found between the two tags is returned to the caller.
        self.response_output_tags = []

    async def invoke_with_prompt(self, prompt):
        """Send ``prompt`` as a plain-text user message and return the reply text."""
        content = [
            {
                'text': prompt
            }
        ]
        return await self.invoke(content)

    async def invoke(self, content):
        """Append a user message, call the model and process its response.

        Args:
            content: List of Converse content blocks for the user turn
                (text blocks or ``toolResult`` blocks).

        Returns:
            The final response text, as produced by ``_handle_response``.
        """
        # Local import: the cell defining this class does not import asyncio.
        import asyncio

        print(f"User: {json.dumps(content, indent=2)}")

        self.messages.append(
            {
                "role": "user",
                "content": content
            }
        )
        # The boto3 client is synchronous. Running it in a worker thread keeps
        # the event loop free while the model call is in flight, so other
        # tasks sharing the loop are not starved.
        response = await asyncio.to_thread(self._get_converse_response)

        print(f"Agent: {json.dumps(response, indent=2)}")

        return await self._handle_response(response)

    def _get_converse_response(self):
        """
        Invokes the Bedrock runtime converse API with proper configuration.
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-runtime/client/converse.html

        Returns:
            The raw response dictionary returned by ``client.converse``.
        """
        # Prepare the request body
        request_params = {
            "modelId": self.model_id,
            "messages": self.messages,
            "system": [
                {
                    "text": self.system_prompt
                }
            ],
            "inferenceConfig": {
                "maxTokens": 8192,
                "temperature": 0.7,
            }
        }

        # Only send a toolConfig when at least one tool is registered.
        if self.tools:
            tools_config = self.tools.get_tools()
            if tools_config and tools_config.get('tools'):
                request_params["toolConfig"] = tools_config

        # Make the API call
        return self.client.converse(**request_params)

    def _extract_text(self, response):
        """Return the reply text of ``response``, honouring the output tags.

        The first content block that carries a ``text`` field is used, so a
        reply whose leading block is not text still yields its text. An empty
        string is returned when no text block is present.
        """
        message = response.get('output', {}).get('message', {})
        content = message.get('content', [])
        text = next((block['text'] for block in content if 'text' in block), '')

        # Apply output tags if defined
        tags = getattr(self, 'response_output_tags', None) or []
        if len(tags) == 2:
            pattern = f"(?s).*{re.escape(tags[0])}(.*?){re.escape(tags[1])}"
            match = re.search(pattern, text)
            if match:
                return match.group(1)
        return text

    async def _handle_response(self, response):
        """Record the assistant message and act on the response's stop reason.

        Args:
            response: Raw dictionary returned by the Converse API.

        Returns:
            The final response text. For ``tool_use`` and ``max_tokens`` the
            conversation is continued and the text of the eventual final
            response is returned.

        Raises:
            ValueError: If a tool request is malformed, if continuing the
                conversation after a tool call fails, or if the stop reason
                is not one of the handled values.
        """
        # Add the response to the conversation history
        self.messages.append(response['output']['message'])

        # Process based on the stop reason
        stop_reason = response['stopReason']

        if stop_reason in ['end_turn', 'stop_sequence']:
            return self._extract_text(response)

        elif stop_reason == 'tool_use':
            try:
                # Run every requested tool and collect one result per request.
                tool_response = []
                for content_item in response['output']['message']['content']:
                    if 'toolUse' in content_item:
                        tool_use = content_item['toolUse']
                        tool_request = {
                            "toolUseId": tool_use['toolUseId'],
                            "name": tool_use['name'],
                            "input": tool_use['input']
                        }

                        # Execute the tool and add result to response
                        tool_result = await self.tools.execute_tool(tool_request)
                        tool_response.append({'toolResult': tool_result})

                # Continue the conversation with the tool results
                return await self.invoke(tool_response)

            except KeyError as e:
                raise ValueError(f"Missing required tool use field: {e}") from e
            except Exception as e:
                # Also covers failures of the follow-up model call above; the
                # original cause stays reachable through exception chaining.
                raise ValueError(f"Failed to execute tool: {e}") from e

        elif stop_reason == 'max_tokens':
            # Hit token limit - ask model to continue.
            # NOTE(review): there is no cap on how often this can repeat.
            return await self.invoke_with_prompt('Please continue.')

        else:
            raise ValueError(f"Unknown stop reason: {stop_reason}")

In [3]:
from typing import Any, Dict, List, Callable
import json

class ConverseToolManager:
    """Registry of async tool callables exposed through Bedrock ``toolSpec`` entries."""

    def __init__(self):
        # Registered tools keyed by their sanitized name.
        self._tools = {}
        # Sanitized name -> name the tool was originally registered under.
        self._name_mapping = {}

    def _sanitize_name(self, name: str) -> str:
        """Return ``name`` with every hyphen turned into an underscore."""
        return '_'.join(name.split('-'))

    def register_tool(self, name: str, func: Callable, description: str, input_schema: Dict):
        """
        Store a tool under its sanitized name, remembering the original name.

        Registering a name that sanitizes to an existing key replaces that entry.
        """
        safe_name = self._sanitize_name(name)
        entry = {
            'function': func,
            'description': description,
            'input_schema': input_schema,
            'original_name': name
        }
        self._name_mapping[safe_name] = name
        self._tools[safe_name] = entry
        print(f"Registered tool: {name} (sanitized as {safe_name})")

    def get_tools(self) -> Dict[str, List[Dict]]:
        """Build the ``{'tools': [...]}`` structure from the registered tools."""
        # Each schema is nested under the "json" key of 'inputSchema'.
        specs = [
            {
                'toolSpec': {
                    'name': safe_name,
                    'description': entry['description'],
                    'inputSchema': {'json': entry['input_schema']}
                }
            }
            for safe_name, entry in self._tools.items()
        ]
        return {'tools': specs}

    async def execute_tool(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """
        Run the tool named in ``payload`` and wrap the outcome as a tool result.

        ``payload`` must carry 'toolUseId', 'name' (the sanitized name) and
        'input'. The registered callable is awaited with the tool's original
        name and the input. Any exception raised while running the tool or
        serializing its result is reported as an 'error' result instead of
        being raised; an unregistered name raises ``ValueError``.
        """
        call_id = payload['toolUseId']
        requested = payload['name']
        arguments = payload['input']

        if requested not in self._tools:
            raise ValueError(f"Unknown tool: {requested}")

        try:
            entry = self._tools[requested]
            runner = entry['function']
            # The callable receives the name the tool was registered with.
            outcome = await runner(entry['original_name'], arguments)
            body = json.dumps(outcome, indent=2)
            status = 'success'
        except Exception as exc:
            body = f"Error executing tool: {str(exc)}"
            status = 'error'

        return {
            'toolUseId': call_id,
            'content': [{'text': body}],
            'status': status
        }

    def clear_tools(self):
        """Forget every registered tool and its name mapping."""
        for registry in (self._tools, self._name_mapping):
            registry.clear()

In [4]:
import json
import smithery
import mcp.client.session

from typing import Any, Dict

class SmitheryToolManager(ConverseToolManager):
    """
    Tool manager that knows how to execute tools via a Smithery (E2B) session,
    rather than local Python functions.
    """
    def __init__(self, session: mcp.client.session.ClientSession):
        """Keep the MCP client session used to call the remote tools."""
        super().__init__()
        self.session = session

    async def execute_tool(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """
        Overridden to call Smithery (E2B) tools using our session,
        rather than local Python function calls.

        ``payload`` must carry 'toolUseId', 'name' (the sanitized name) and
        'input'. The method never raises for tool problems: an unknown tool,
        a failed call, or a result the server flags with ``isError`` all come
        back as a tool result whose 'status' is 'error'.
        """
        tool_use_id = payload['toolUseId']
        sanitized_name = payload['name']
        tool_input = payload['input']

        if sanitized_name not in self._tools:
            return {
                'toolUseId': tool_use_id,
                'content': [{
                    'text': f"Error: Unknown tool: {sanitized_name}"
                }],
                'status': 'error'
            }

        # The remote server knows the tool by the name it was registered with.
        original_name = self._tools[sanitized_name]['original_name']

        try:
            # Call the tool in E2B
            tool_result = await self.session.call_tool(original_name, tool_input)

            # Convert to dict so we can JSON-serialize
            tool_result_dict = tool_result.model_dump()
            result_str = json.dumps(tool_result_dict, indent=2)

            # MCP reports tool-level failures through the result's isError
            # flag rather than by raising, so pass that on to the model
            # instead of always claiming success.
            failed = bool(tool_result_dict.get('isError', False))

            return {
                'toolUseId': tool_use_id,
                'content': [{
                    'text': result_str
                }],
                'status': 'error' if failed else 'success'
            }
        except Exception as e:
            # In case E2B call failed
            return {
                'toolUseId': tool_use_id,
                'content': [{
                    'text': f"Error executing tool: {str(e)}"
                }],
                'status': 'error'
            }


In [5]:
import os
import asyncio
import json

import smithery
import mcp.client.session

# Assuming you've already defined:
# - ConverseAgent
# - SmitheryToolManager

class MCPBedrockAgent(ConverseAgent):
    """
    Bedrock-based agent that also integrates with Smithery (E2B)
    for tool usage.
    """
    def __init__(
        self,
        model_id: str,
        region: str = 'us-west-2',
        system_prompt: str = 'You are a helpful assistant.',
        e2b_api_key=None
    ):
        """Set up the Bedrock agent and build the Smithery connection URL.

        Args:
            model_id: Bedrock model identifier.
            region: AWS region for the Bedrock runtime client.
            system_prompt: System prompt sent with every request.
            e2b_api_key: E2B API key (a string). When omitted, the
                ``E2B_API_KEY`` environment variable is used.

        Raises:
            ValueError: If no API key is supplied by either route.
        """
        super().__init__(model_id=model_id, region=region, system_prompt=system_prompt)

        # Credentials must not live in source code: take the key from the
        # caller or from the environment.
        api_key = e2b_api_key or os.environ.get("E2B_API_KEY")
        if not api_key:
            raise ValueError(
                "E2B API key is required: pass e2b_api_key or set the "
                "E2B_API_KEY environment variable."
            )

        # Create a smithery URL
        self.url = smithery.create_smithery_url(
            "wss://server.smithery.ai/e2b/ws",
            {"e2bApiKey": api_key}
        )

    async def run_with_smithery(self, user_input: str) -> str:
        """
        Answer ``user_input`` with the E2B tools available to the model.

        Steps:
          1. Open a Smithery session
          2. Query the list of E2B tools
          3. Register them with a SmitheryToolManager
          4. Set that tool manager on the agent
          5. Call `invoke_with_prompt` with the user's input

        Returns the final response text. The tool manager that was attached
        before the call is put back afterwards, because the one created here
        is bound to a session that is closed when this method returns.
        """
        async def tool_proxy(original_name, tool_input):
            # Placeholder required by register_tool's signature.
            # SmitheryToolManager.execute_tool overrides the base
            # implementation and calls the tool through the session, so
            # this function is not invoked there.
            return None

        async with smithery.websocket_client(self.url) as streams:
            async with mcp.client.session.ClientSession(*streams) as session:
                # 1. List all Smithery Tools
                tools_result = await session.list_tools()
                # 'tools_result.model_dump()' is an MCP structure
                e2b_tools = tools_result.model_dump()["tools"]

                print("Available E2B tools:", e2b_tools)

                # 2. Create a specialized tool manager for E2B
                tool_manager = SmitheryToolManager(session=session)

                # 3. Register each tool from E2B with the manager
                #    so that Bedrock can see them via 'get_tools()'.
                for t in e2b_tools:
                    tool_manager.register_tool(
                        t["name"],
                        tool_proxy,
                        t.get("description", ""),
                        t.get("inputSchema", {}),
                    )

                # 4. Attach this tool manager to our agent
                previous_tools = self.tools
                self.tools = tool_manager
                try:
                    # 5. Now we can prompt the LLM; if it decides to use a tool,
                    #    the 'stop_reason=tool_use' flow in ConverseAgent will
                    #    route that call to SmitheryToolManager.execute_tool()
                    return await self.invoke_with_prompt(user_input)
                finally:
                    # Do not leave a manager tied to a closed session behind.
                    self.tools = previous_tools

In [5]:
agent = MCPBedrockAgent(
        model_id="anthropic.claude-3-5-sonnet-20241022-v2:0",  
        region="us-west-2",
        system_prompt="You are a helpful assisatant with access to various tools."
    )

# Sample user input
user_input = "Hello! Can you please do a sentiment analysis on this text: 'I love sunny days.'"

# Let the agent open the E2B session, fetch tools, and handle conversation
response = await agent.run_with_smithery(user_input)
print("Final response:\n", response)

Available E2B tools: [{'name': 'run_code', 'description': 'Run python code in a secure sandbox by E2B. Using the Jupyter Notebook syntax.', 'inputSchema': {'type': 'object', 'properties': {'code': {'type': 'string'}}, 'required': ['code'], 'additionalProperties': False, '$schema': 'http://json-schema.org/draft-07/schema#'}}]
Registered tool: run_code (sanitized as run_code)
User: [
  {
    "text": "Hello! Can you please do a sentiment analysis on this text: 'I love sunny days.'"
  }
]
Agent: {
  "ResponseMetadata": {
    "RequestId": "a52f4b46-85e7-4151-a249-0214d4edb3a6",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "date": "Sat, 08 Mar 2025 08:48:56 GMT",
      "content-type": "application/json",
      "content-length": "1347",
      "connection": "keep-alive",
      "x-amzn-requestid": "a52f4b46-85e7-4151-a249-0214d4edb3a6"
    },
    "RetryAttempts": 0
  },
  "output": {
    "message": {
      "role": "assistant",
      "content": [
        {
          "text": "I'll help y

CancelledError: 

In [6]:
import asyncio
import textarena as ta

# Instantiate two distinct Bedrock agents
agents = {
    0: MCPBedrockAgent(
        model_id="anthropic.claude-3-5-sonnet-20241022-v2:0",
        region="us-west-2",
        system_prompt="You are Agent 0 (sonnet). You are negotiating to get the best possible deal. Run the coding tools if it helps with your decision"
    ),
    1: MCPBedrockAgent(
        model_id="anthropic.claude-3-5-sonnet-20241022-v2:0",
        region="us-west-2",
        system_prompt="You are Agent 1 (haiku). You are negotiating to get the best possible deal."
    )
}

# Create and wrap the environment
env = ta.make_online(env_id="SimpleNegotiation-v0")
env = ta.wrappers.LLMObservationWrapper(env=env)
env = ta.wrappers.SimpleRenderWrapper(
    env=env,
    player_names={0: "sonnet", 1: "haiku"},
)

# Reset for two players
env.reset(num_players=len(agents))

done = False
while not done:
    # 1) The environment tells us which player's turn it is
    #    and what that agent observes.
    player_id, observation = env.get_observation()
    
    # 2) Pass that observation to the player's agent
    #    and get back an "action" (a message, move, etc.).
    #
    #    NOTE: .run_with_smithery() is the method you showed
    #    that opens a Smithery session, queries for tools,
    #    and then calls invoke_with_prompt().
    #    If you prefer a direct "invoke" call, adapt accordingly.
    action = await agents[player_id].run_with_smithery(observation)
    
    # 3) Send that action to the environment
    done, info = env.step(action=action)
    print("step complete")

# Once done, retrieve final rewards or outcome
rewards = env.close()
print("Final Rewards:", rewards)


AttributeError: module 'textarena' has no attribute 'make_online'

In [1]:
import asyncio
from textarena.core import Agent
import textarena as ta
from typing import Optional

# Default system prompt; AsyncAnthropicAgent uses it when no system_prompt is passed.
STANDARD_GAME_PROMPT = "You are a competitive and reflective game player. Reflect deeply after each round, and use the best strategies to win. Make sure you read the game instructions carefully, and always follow the required format."

class AsyncAnthropicAgent(Agent):
    """Agent class using the Anthropic Claude API to generate responses asynchronously."""
    def __init__(self, model_name: str, system_prompt: Optional[str] = STANDARD_GAME_PROMPT, max_tokens: int = 1000, temperature: float = 0.9, verbose: bool = False):
        """Store the generation settings and create the async Anthropic client.

        Args:
            model_name: Anthropic model identifier.
            system_prompt: System prompt for every request; ``None`` sends
                no system prompt.
            max_tokens: Maximum number of tokens to generate per request.
            temperature: Sampling temperature.
            verbose: When true, print each observation/response pair.

        Raises:
            ImportError: If the ``anthropic`` package is not installed.
        """
        super().__init__()
        self.model_name = model_name
        self.system_prompt = system_prompt
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.verbose = verbose

        try:
            import anthropic
        except ImportError as exc:
            raise ImportError(
                "Anthropic package is required for AsyncAnthropicAgent. "
                "Install it with: pip install anthropic"
            ) from exc
        self.client = anthropic.AsyncAnthropic()

    async def _make_request(self, observation: str) -> str:
        """Send one request for ``observation`` and return the stripped reply text."""
        print("Making request")
        request = {
            "model": self.model_name,
            "max_tokens": self.max_tokens,
            "temperature": self.temperature,
            "messages": [
                {"role": "user", "content": [{"type": "text", "text": observation}]}
            ],
        }
        # The signature allows system_prompt=None; leave the field out in
        # that case rather than sending an explicit null.
        if self.system_prompt is not None:
            request["system"] = self.system_prompt

        response = await self.client.messages.create(**request)
        print(response)
        return response.content[0].text.strip()

    async def _retry_request(self, observation: str, retries: int = 3, delay: int = 5) -> str:
        """Call ``_make_request`` up to ``retries`` times, sleeping ``delay`` seconds between tries.

        Raises:
            ValueError: If ``retries`` is smaller than 1.
            Exception: The error of the last attempt when every attempt fails.
        """
        if retries < 1:
            # Without this guard no attempt would run and the final
            # ``raise`` would be handed ``None``.
            raise ValueError(f"retries must be at least 1. Received: {retries}")

        last_exception = None
        for attempt in range(1, retries + 1):
            try:
                response = await self._make_request(observation)
                if self.verbose:
                    print(f"\nObservation: {observation}\nResponse: {response}")
                return response
            except Exception as e:
                last_exception = e
                print(f"Attempt {attempt} failed with error: {e}")
                if attempt < retries:
                    await asyncio.sleep(delay)
        raise last_exception

    async def __call__(self, observation: str) -> str:
        """Return the model's action for ``observation``.

        Raises:
            ValueError: If ``observation`` is not a string.
        """
        if not isinstance(observation, str):
            raise ValueError(f"Observation must be a string. Received type: {type(observation)}")
        return await self._retry_request(observation)


# Play up to five online games. A failing game is reported and skipped so
# the remaining games still run.
for i in range(5):
    try:
        model_name = "Test LLM v7"
        model_description = "Standard Anthropic model."
        email = "imaginecheckingemails@gmail.com"

        agent = AsyncAnthropicAgent(model_name="claude-3-7-sonnet-20250219")

        env = ta.make_online(
            env_id=["SimpleNegotiation-v0"],
            model_name=model_name,
            model_description=model_description,
            email=email
        )
        env = ta.wrappers.LLMObservationWrapper(env=env)
        env.reset(num_players=2)
        done = False

        while not done:
            player_id, observation = env.get_observation()
            # NOTE(review): run_until_complete() raises RuntimeError when the
            # event loop is already running (as in a notebook); there,
            # `await agent(observation)` would be needed instead — confirm
            # where this cell is meant to run.
            action = asyncio.get_event_loop().run_until_complete(agent(observation))
            done, info = env.step(action=action)
            print("Step complete")

        rewards = env.close()
        print("Game finished. Rewards:", rewards)
    except Exception as exc:
        # Still best-effort, but no longer silent: a bare ``except: pass``
        # hid every failure and also swallowed KeyboardInterrupt.
        print(f"Game {i + 1} failed: {exc!r}")

ImportError: cannot import name 'Agent' from 'textarena.core' (/home/kang5647/.local/lib/python3.10/site-packages/textarena/core.py)