<a href="https://colab.research.google.com/github/mrm8488/shared_colab_notebooks/blob/master/open_llm_mcp_client_server_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Create an `MCP` client and server with an open-source LLM

In this tutorial we will create an **MCP server** that offers tools for weather forecasting and an **MCP client** that uses an open-source LLM (`Qwen2.5-7B-Instruct`) with **Hugging Face** `transformers`.

> Based on the Anthropic [example](https://modelcontextprotocol.io/introduction)

> Created by [Manu Romero](http://twitter.com/mrm8488)

## **1. Introduction to MCP**

The Model Context Protocol (MCP) is an open standard developed by Anthropic that enables AI applications to interact seamlessly with external data sources and tools. By implementing MCP, AI models can access real-time information and perform tasks beyond their initial training data.
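
To make the idea concrete, the sketch below builds the kind of message a client sends when it asks a server to run a tool. MCP messages are JSON-RPC 2.0, and tool invocations use the `tools/call` method. The helper name `build_tool_call` is hypothetical and only for illustration; the tool name `get_alerts` refers to the weather tool built later in this tutorial. In practice the `mcp` SDK constructs these messages for you.

```python
import json


def build_tool_call(request_id: int, name: str, arguments: dict) -> dict:
    """Build a JSON-RPC 2.0 request asking an MCP server to run a tool."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }


# Ask the server to run `get_alerts` for California.
request = build_tool_call(1, "get_alerts", {"state": "CA"})
print(json.dumps(request, indent=2))
```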

---

## **2. Setting Up the Environment**

In this section, we'll set up the necessary environment by installing required libraries and configuring the runtime to support asynchronous operations.


In [2]:
! pip install -qU transformers bitsandbytes "mcp[cli]" httpx


We will use an **L4** GPU. You can also try the free **T4**, in which case you may need to quantize the model to 4 bits (this example quantizes it to 8 bits).
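
As a rough guide to which precision fits on which GPU, the following back-of-the-envelope sketch estimates the memory taken by the weights alone. It assumes about 7.6 billion parameters for the model and ignores activations, the KV cache, and quantization overhead, so real usage will be higher. The helper name `weight_gib` is hypothetical.

```python
# Assumption: roughly 7.6e9 parameters; only the weights are counted.
PARAMS = 7.6e9


def weight_gib(params: float, bits_per_param: int) -> float:
    """Return the approximate size of the weights in GiB."""
    return params * bits_per_param / 8 / 2**30


for label, bits in [("16-bit", 16), ("8-bit", 8), ("4-bit", 4)]:
    print(f"{label:>6}: {weight_gib(PARAMS, bits):5.1f} GiB")
```

Under these assumptions the weights need roughly 14 GiB at 16 bits, 7 GiB at 8 bits, and 3.5 GiB at 4 bits, which is why a smaller GPU may call for 4-bit quantization once runtime overhead is added.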

In [1]:
! nvidia-smi

Sat Mar 22 20:54:17 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  NVIDIA L4                      Off |   00000000:00:03.0 Off |                    0 |
| N/A   76C    P0             35W /   72W |       0MiB /  23034MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

In [3]:
# Apply nest_asyncio to allow nested use of asyncio.run in notebooks.
import nest_asyncio
nest_asyncio.apply()

## **3. Implementing the MCP Server**

We'll create an MCP server that provides tools to fetch weather alerts and forecasts. This server will interact with external weather APIs and expose functionalities via MCP tools.

**Key Components:**
- **make_nws_request:** A helper function to make requests to the National Weather Service (NWS) API.
- **format_alert:** A function to format weather alert data.
- **get_alerts:** An MCP tool to retrieve active weather alerts for a given state.
- **get_forecast:** An MCP tool to fetch weather forecasts for a specific location based on latitude and longitude.


In [5]:
%%writefile weather_mcp_server.py

from typing import Any
import httpx
from mcp.server.fastmcp import FastMCP

# Initialize FastMCP server
mcp = FastMCP("weather")

# Constants
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"


# Helper functions for API requests and formatting
async def make_nws_request(url: str) -> dict[str, Any] | None:
    """Make a request to the NWS API with proper error handling."""
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/geo+json"
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, headers=headers, timeout=30.0)
            response.raise_for_status()
            return response.json()
        except Exception:
            return None

def format_alert(feature: dict) -> str:
    """Format an alert feature into a readable string."""
    props = feature["properties"]
    return f"""
Event: {props.get('event', 'Unknown')}
Area: {props.get('areaDesc', 'Unknown')}
Severity: {props.get('severity', 'Unknown')}
Description: {props.get('description', 'No description available')}
Instructions: {props.get('instruction', 'No specific instructions provided')}
"""

# Implementing tool execution
@mcp.tool()
async def get_alerts(state: str) -> str:
    """Get weather alerts for a US state.

    Args:
        state: Two-letter US state code (e.g. CA, NY)
    """
    url = f"{NWS_API_BASE}/alerts/active/area/{state}"
    data = await make_nws_request(url)

    if not data or "features" not in data:
        return "Unable to fetch alerts or no alerts found."

    if not data["features"]:
        return "No active alerts for this state."

    alerts = [format_alert(feature) for feature in data["features"]]
    return "\n---\n".join(alerts)

@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
    """Get weather forecast for a location.

    Args:
        latitude: Latitude of the location
        longitude: Longitude of the location
    """
    # First get the forecast grid endpoint
    points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}"
    points_data = await make_nws_request(points_url)

    if not points_data:
        return "Unable to fetch forecast data for this location."

    # Get the forecast URL from the points response
    forecast_url = points_data["properties"]["forecast"]
    forecast_data = await make_nws_request(forecast_url)

    if not forecast_data:
        return "Unable to fetch detailed forecast."

    # Format the periods into a readable forecast
    periods = forecast_data["properties"]["periods"]
    forecasts = []
    for period in periods[:5]:  # Only show next 5 periods
        forecast = f"""
{period['name']}:
Temperature: {period['temperature']}°{period['temperatureUnit']}
Wind: {period['windSpeed']} {period['windDirection']}
Forecast: {period['detailedForecast']}
"""
        forecasts.append(forecast)

    return "\n---\n".join(forecasts)


if __name__ == "__main__":
  # Initialize and run the server
  mcp.run(transport='stdio')

Overwriting weather_mcp_server.py
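
To see what the alert formatting produces without calling the NWS API, the sketch below repeats `format_alert` from the server file and applies it to a hand-written feature. The sample data is invented for illustration; real NWS responses carry many more fields.

```python
def format_alert(feature: dict) -> str:
    """Same formatting logic as in weather_mcp_server.py, repeated so this runs standalone."""
    props = feature["properties"]
    return f"""
Event: {props.get('event', 'Unknown')}
Area: {props.get('areaDesc', 'Unknown')}
Severity: {props.get('severity', 'Unknown')}
Description: {props.get('description', 'No description available')}
Instructions: {props.get('instruction', 'No specific instructions provided')}
"""


# Hypothetical feature; "instruction" is omitted on purpose to show the fallback text.
sample_feature = {
    "properties": {
        "event": "Flood Warning",
        "areaDesc": "Example County",
        "severity": "Severe",
        "description": "River levels are rising.",
    }
}

text = format_alert(sample_feature)
print(text)
```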


## **4. Implementing the MCP Client**

We'll develop a client that connects to the MCP server and uses the tools it provides. Instead of calling a hosted API, the client runs the model locally with Hugging Face `transformers` and uses it to process user queries.

**Key Components:**
- **MCPClient Class:** Manages the connection to the MCP server, lists available tools, and facilitates interaction between the user and the server.
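- **connect_to_server:** Establishes a connection to the specified MCP server and prints the tools it exposes.
- **process_query:** Retrieves the tools available on the connected server, passes them to the model together with the user's query, and executes any tool calls the model requests.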
- **chat_loop:** Handles the interactive loop where the user inputs queries, and the client processes them using the model and server tools.
- **cleanup:** Ensures proper closure of the client session.
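
One step the client has to handle is describing the server's tools to the model. A minimal sketch of that translation follows, assuming the chat template accepts tools as JSON-schema function definitions (the layout documented for `transformers` chat templates). `ToolInfo` is a hypothetical stand-in for the tool objects returned by `session.list_tools()`, and `to_function_schema` is an illustrative helper name.

```python
from dataclasses import dataclass, field


@dataclass
class ToolInfo:
    """Hypothetical stand-in for a tool descriptor returned by the MCP session."""
    name: str
    description: str
    inputSchema: dict = field(default_factory=dict)


def to_function_schema(tool: ToolInfo) -> dict:
    """Wrap an MCP tool descriptor as a JSON-schema function definition."""
    return {
        "type": "function",
        "function": {
            "name": tool.name,
            "description": tool.description,
            "parameters": tool.inputSchema,
        },
    }


alerts_tool = ToolInfo(
    name="get_alerts",
    description="Get weather alerts for a US state.",
    inputSchema={
        "type": "object",
        "properties": {"state": {"type": "string"}},
        "required": ["state"],
    },
)
schema = to_function_schema(alerts_tool)
print(schema["function"]["name"], schema["function"]["parameters"]["required"])
```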


In [18]:
%%writefile mcp_client.py

import re
import sys
import json
import asyncio
from typing import Optional
from contextlib import AsyncExitStack
import warnings

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from transformers import AutoModelForCausalLM, AutoTokenizer

warnings.filterwarnings("ignore")

# Use any model you like here (it must support tool calling)
MODEL_ID = "Qwen/Qwen2.5-7B-Instruct"


# Load the model and tokenizer (quantized to 8bit)
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    torch_dtype="auto",
    device_map="auto",
    load_in_8bit=True,
)

# Helper function: in transformers, tool calls must be a field of the assistant message.
def try_parse_tool_calls(content: str):
    """Try to parse tool calls from the raw model output."""
    tool_calls = []
    offset = 0
    for i, m in enumerate(re.finditer(r"<tool_call>\n(.+)?\n</tool_call>", content)):
        if i == 0:
            offset = m.start()
        try:
            func = json.loads(m.group(1))
            tool_calls.append({"type": "function", "function": func})
            if isinstance(func["arguments"], str):
                func["arguments"] = json.loads(func["arguments"])
        except json.JSONDecodeError as e:
            print(f"Failed to parse tool calls: the content is {m.group(1)} and {e}")
            pass
    if tool_calls:
        if offset > 0 and content[:offset].strip():
            c = content[:offset]
        else:
            c = ""
        return {"role": "assistant", "content": c, "tool_calls": tool_calls}
    return {"role": "assistant", "content": re.sub(r"<\|im_end\|>$", "", content)}


class MCPClient:
    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.llm = model
        self.tokenizer = tokenizer

    async def connect_to_server(self, server_script_path: str):
        """Connect to an MCP server

        Args:
            server_script_path: Path to the server script (.py or .js)
        """
        is_python = server_script_path.endswith('.py')
        is_js = server_script_path.endswith('.js')
        if not (is_python or is_js):
            raise ValueError("Server script must be a .py or .js file")

        command = "python" if is_python else "node"
        server_params = StdioServerParameters(
            command=command,
            args=[server_script_path],
            env=None
        )

        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

        await self.session.initialize()

        # List available tools
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])

    async def process_query(self, query: str) -> str:
      """Process a query using the local LLM and the available MCP tools"""
      messages = [
          {
              "role": "user",
              "content": query
          }
      ]

      response = await self.session.list_tools()
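      # Describe each MCP tool as a JSON-schema function definition for the chat template
      available_tools = [{
          "type": "function",
          "function": {
              "name": tool.name,
              "description": tool.description,
              "parameters": tool.inputSchema
          }
      } for tool in response.tools]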


      # Initial LLM call
      text = self.tokenizer.apply_chat_template(messages, tools=available_tools, add_generation_prompt=True, tokenize=False)
      inputs = self.tokenizer(text, return_tensors="pt").to(self.llm.device)
      outputs = self.llm.generate(**inputs, max_new_tokens=512)
      # Decode only the newly generated tokens, skipping the prompt
      output_text = self.tokenizer.decode(outputs[0][inputs["input_ids"].shape[1]:])


      # Process the response and handle any tool calls found in `output_text`
      final_text = []

      parsed_message = try_parse_tool_calls(output_text)
      messages.append(parsed_message)

      final_text.append(parsed_message["content"])

      if tool_calls := messages[-1].get("tool_calls", None):
          for tool_call in tool_calls:
              if fn_call := tool_call.get("function"):
                  fn_name: str = fn_call["name"]
                  fn_args: dict = fn_call["arguments"]

                  print(f"Calling tool: {fn_name} with args: {fn_args}")
                  final_text.append(f"Calling tool: {fn_name} with args: {fn_args}")
                  result = await self.session.call_tool(fn_name, fn_args)
                  #print(result)
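                  # result.content is a list of content items; keep only the text parts
                  fn_res = "\n".join(
                      item.text for item in result.content if hasattr(item, "text")
                  )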
                  #print(f"Tool result: {fn_res}")

                  messages.append({
                      "role": "tool",
                      "name": fn_name,
                      "content": fn_res,
                  })

              # Get the next response from the LLM, now that the tool result is in the history
              text = self.tokenizer.apply_chat_template(messages, tools=available_tools, add_generation_prompt=True, tokenize=False)
              inputs = self.tokenizer(text, return_tensors="pt").to(self.llm.device)
              outputs = self.llm.generate(**inputs, max_new_tokens=512)
              # Decode only the newly generated tokens, skipping the prompt
              output_text = self.tokenizer.decode(outputs[0][inputs["input_ids"].shape[1]:])

              final_text.append(output_text)



      return "\n".join(final_text)

    async def chat_loop(self):
      """Run an interactive chat loop"""
      print("\nMCP Client Started!")
      print("Type your queries or 'quit' to exit.")

      while True:
          try:
              query = input("\nQuery: ").strip()

              if query.lower() == 'quit':
                  break

              response = await self.process_query(query)
              print("\n" + response)

          except Exception as e:
              print(f"\nError: {str(e)}")

    async def cleanup(self):
      """Clean up resources"""
      await self.exit_stack.aclose()


async def main():
  if len(sys.argv) < 2:
      print("Usage: python mcp_client.py <path_to_server_script>")
      sys.exit(1)

  client = MCPClient()
  try:
      await client.connect_to_server(sys.argv[1])
      await client.chat_loop()
  finally:
      await client.cleanup()

if __name__ == "__main__":
    import sys
    asyncio.run(main())

Overwriting mcp_client.py
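
The parsing step in `try_parse_tool_calls` can be tried in isolation. The sketch below applies the same regular expression to a hand-written sample of what the model might emit (not captured from a real run) and extracts the function name and arguments.

```python
import json
import re

# Same pattern as in mcp_client.py: one JSON object per <tool_call> block.
TOOL_CALL_RE = re.compile(r"<tool_call>\n(.+)?\n</tool_call>")

# Hypothetical model output, written by hand for illustration.
sample_output = (
    "<tool_call>\n"
    '{"name": "get_alerts", "arguments": {"state": "CA"}}\n'
    "</tool_call><|im_end|>"
)

calls = [json.loads(m.group(1)) for m in TOOL_CALL_RE.finditer(sample_output)]
for call in calls:
    print(call["name"], call["arguments"])
```

Each parsed call carries the tool name and an arguments dictionary, which is what the client forwards to `session.call_tool`.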


## **5. Running It**

In [19]:
! python mcp_client.py weather_mcp_server.py

2025-03-22 21:26:59.376107: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-03-22 21:26:59.396282: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1742678819.416234   27153 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1742678819.421799   27153 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-03-22 21:26:59.440865: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instr