## Installing required packages

In [1]:
!pip install mcp langchain-aws langchain --upgrade --quiet --no-warn-conflicts

# MCP
Model Context Protocol (MCP) enables seamless integration between LLM applications and external data sources and tools. It is made up of two main components:
- MCP Server
- MCP Client

## Server
First, let's create an MCP server that check the weather in a given location.

In [2]:
%%writefile weather_server.py
from typing import List
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Weather")

@mcp.tool()
async def get_weather(location: str) -> str:
    """Get weather for location."""
    return f"It's sunny in {location}"

if __name__ == "__main__":
    mcp.run(transport="stdio")

Overwriting weather_server.py


## Client
Now, let's define a client class that has two methods (`list_tools` and `execute_tool`)

In [3]:
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

server_params = StdioServerParameters(
    command="python",
    # Make sure to update to the full absolute path to your math_server.py file
    args=["/home/ec2-user/SageMaker/weather_server.py"],
)


class MCP_client:
    def __init__(self, server_params):
        self.server_params = server_params
        self.operation = {
            "list_tools": self.list_tools,
            "execute_tool": self.execute_tool
        }

    async def list_tools(self):
        async with stdio_client(self.server_params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                # Get tools
                tools = await session.list_tools()
                return "["+",\n".join([tool.model_dump_json(indent=4) for tool in tools.tools]) + "]", False

    async def execute_tool(self, tool_name, tool_parameters):
        async with stdio_client(self.server_params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                # Call tools
                result = await session.call_tool(tool_name, tool_parameters)
                return result.content[0].text, result.isError

### Let's test our MCP client

In [4]:
weather_client = MCP_client(server_params)
print("Calling list_tools operation")
print((await weather_client.list_tools())[0])
print()
print("Calling execute_tool operation")
print((await weather_client.execute_tool("get_weather", {"location": "Sydney"}))[0])

Calling list_tools operation
[{
    "name": "get_weather",
    "description": "Get weather for location.",
    "inputSchema": {
        "properties": {
            "location": {
                "title": "Location",
                "type": "string"
            }
        },
        "required": [
            "location"
        ],
        "title": "get_weatherArguments",
        "type": "object"
    }
}]

Calling execute_tool operation
It's sunny in Sydney


# Using MCP with Bedrock Marketplace models

Here we are going to illustrate how to use bedrock marketplace models with MCP.
Make sure to deploy your model from Bedrock Marketplace console first. See [here](https://docs.aws.amazon.com/bedrock/latest/userguide/bedrock-marketplace-deploy-a-model.html).

## Defining some helper code

In [5]:
import uuid


def is_valid_json(text):
    # Find the text between the first and last curly brackets
    match = re.search(r'\{([\s\S]*)\}', text)
    if not match:
        return {"isJSON": False, "parsedJson": None}  # No curly brackets found
    # Extract the content between the first and last curly brackets
    # We add back the curly brackets to make it a complete JSON string
    json_candidate = '{' + match.group(1) + '}'
    try:
        # Try to parse the string as JSON
        parsed_json = json.loads(json_candidate)
        return {"isJSON": True, "parsedJson": parsed_json}  # If no error is thrown, it's valid JSON
    except json.JSONDecodeError:
        return {"isJSON": False, "parsedJson": None}  # If an error is thrown, it's not valid JSON


def is_tool_use(input_data):
    tool_use = ""
    input_message = ""
    is_json = is_valid_json(input_data)
    if is_json["isJSON"]:
        parsed_json = is_json["parsedJson"]
        if "operation" in parsed_json:
            tool_use = parsed_json
        else:
            input_message = input_data
    else:
        input_message = input_data
    return {
            "tool_use": tool_use,
            "inputMessage": input_message
        }

In [6]:
async def handle_llm_response(response):
    tool_use_response = is_tool_use(response)
    tool_call_id = uuid.uuid4()
    if tool_use_response["inputMessage"]:
        return {"AI": tool_use_response["inputMessage"]}
    elif tool_use_response["tool_use"]["operation"] in weather_client.operation:
        kwargs = {}
        if "tool_name" in tool_use_response["tool_use"]:
            kwargs["tool_name"] = tool_use_response["tool_use"]["tool_name"]
        if "tool_parameters" in tool_use_response["tool_use"]:
            kwargs["tool_parameters"] = tool_use_response["tool_use"]["tool_parameters"]
        result = await weather_client.operation[tool_use_response["tool_use"]["operation"]](**kwargs)
        if result[1]:
            return {"Tool": f"Error: {result[0]}", "call_id": tool_call_id}
        else:
            return {"Tool": result[0], "call_id": tool_call_id}
    else:
        return {"Tool": "NoSuchOperationError: Operation not supported. Did you mean to use execute_tool operation with tool_name? reformat your JSON correctly and try again.", "call_id": tool_call_id}
        

## LangChain and Bedrock
Initializing langchain Chat class for Bedrock. Make sure to provide SageMaker AI endpoint ARN as your Bedrock `model_id`

In [7]:
import boto3
import json
import re
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_aws import ChatBedrockConverse
from langchain.schema.messages import HumanMessage, AIMessage, SystemMessage

region = "us-west-2"

bedrock_runtime = boto3.client(
    service_name="bedrock-runtime",
    region_name=region,
)

model_id = "<sagemaker endpoint ARN>"
provider = "amazon" # Works for Qwen2.5 14B Instruct

bedrock_client = ChatBedrockConverse(
    client=bedrock_runtime,
    model_id=model_id,
    provider=provider
)

### System prompt
We want to make the LLM know that if they want to access a tool they should call `list_tools` first.

In [8]:
systemMessage = """You are a helpful assistance.
You have access to perform operation calls, when not performing an operation call, always respond with unstructred text.
When you need to perform an operation call, you must respond with JSON format only. Always start with 'ChainOfThoughs' attribute. You can use 'ChainOfThoughs' attribute when performing an operation to tell me your thought process. Also use 'ChainOfThoughs' attribute to talk to me when performing an operation. Do not generate any text outside of the JSON format.
ALWAYS call list_tool operation before performing any execute_tool operation
You have access to the following operations:
<Tools/>
[
{
  'operation': 'list_tools',
  'description': 'Use this operation to list all tools that you have access to.'
  }
},
{
  'operation': 'execute_tool'
  'description': 'Use this operation to execute a tool. Only tools listed in the list_tools operation can be executed. You can get tool_name and tool_parameters from the list_tools response',
  'tool_name': '<the tool name you get from list_tools>',
  'tool_parameters': '<the tool parameters to supply as per list_tools>'
  }
}
]
</Tools>
"""

question = "What's the weather in New York?"

### Invocation logic
The `handle_llm_response` function will decide if the AI is calling a tool operation or it is just responding with normal text.

In [14]:
# Invoke Example

# Initialize chat history
messages = [
    { "role": "system", "content": systemMessage },
    { "role": "user", "content": question }
]

AI_responded = False
tool_response = ""
while not AI_responded:
    # prompt = ChatPromptTemplate.from_messages(template)


    # Create simple pipeline
    chain = bedrock_client | StrOutputParser()

    # Chain Invoke
    response = chain.invoke(messages)
    messages.append({"role": "ai", "content": response})
    result = await handle_llm_response(response)
    if "AI" in result:
        AI_responded = True
    else:
        messages.append({"role": "user", "content": result["Tool"]})
        tool_response = result["Tool"]

print(response)

print()
print()
print("### CHAT HISTORY ###")
print()
print(f'User: {messages[1]["content"]}')
print()
for message in messages[2:]:
    role = "Tool" if message["role"] == "user" else "AI"
    print(f'{role}: {message["content"]}')
    print()

The weather in New York is currently sunny.


### CHAT HISTORY ###

User: What's the weather in New York?

AI: {
  "operation": "list_tools"
}

Tool: [{
    "name": "get_weather",
    "description": "Get weather for location.",
    "inputSchema": {
        "properties": {
            "location": {
                "title": "Location",
                "type": "string"
            }
        },
        "required": [
            "location"
        ],
        "title": "get_weatherArguments",
        "type": "object"
    }
}]

AI: {
  "operation": "execute_tool",
  "tool_name": "get_weather",
  "tool_parameters": {
    "location": "New York"
  },
  "ChainOfThoughts": "The user wants to know the weather in New York, so I need to use the get_weather tool with location set to New York."
}

Tool: It's sunny in New York

AI: The weather in New York is currently sunny.

