<a target="_blank" href="https://colab.research.google.com/github/raghavbali/mastering_llms_workshop/blob/main/docs/module_04_llm_ops/03_mcp_getting_started.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

# Model Context Protocol (MCP) 
### Why MCP
LLMs are powerful but limited to their training data and can't interact with external systems, databases, or real-time information. MCP servers bridge this gap by providing secure, standardized connections between LLMs and external resources.

### What is MCP?
An **MCP (Model Context Protocol)** server is a lightweight service that exposes specific capabilities—like database queries, API calls, file operations, or tool integrations—to LLMs through a standardized protocol. It acts as a secure intermediary that allows AI models to access and interact with external systems while maintaining proper authentication and access controls.

> Think of MCP like a USB-C port for AI applications


<img src="../assets/04_mcp.png">

> Source: [IBM](https://www.ibm.com/think/topics/model-context-protocol)

In [3]:
# !pip3 install fastmcp

In [39]:
%%writefile notebook_server.py

import os
import json
from fastmcp import FastMCP
from scraper_utils import NB_Markdown_Scraper

# Shared FastMCP application object for this script. The `name` and
# `instructions` strings are passed straight to FastMCP; tools and the
# resource are attached to this object when NotebookServer is instantiated
# later in this file.
mcp = FastMCP(
    name="Notebook Server",
    instructions="""
        This server provides a markdown scraper utility
        and a tool to write JSON file
        """,)

class NotebookServer():
    """Expose notebook-markdown utilities as tools on a FastMCP server.

    Creating an instance builds an ``NB_Markdown_Scraper`` and registers this
    object's bound methods (plus the scraper's ``scrape_markdowns``) as tools,
    and ``resource_method`` as the ``resource://data`` resource.
    """

    def __init__(self,mcp_instance):
        """Create the scraper and register tools/resources on ``mcp_instance``.

        Args:
            mcp_instance: The FastMCP object to register on.
        """
        # Every entry of the parent directory whose name starts with "module"
        # is handed to the scraper as an input path (relative to the cwd).
        self.notebook_scraper = NB_Markdown_Scraper(input_paths=[f'../{d}' for d in os.listdir("../") if d.startswith("module")])

        # Register bound methods so `self` is already supplied and does not
        # show up as a tool argument.
        mcp_instance.tool(self.greet)
        mcp_instance.tool(self.get_notebook_list)
        mcp_instance.tool(self.get_markdown_from_notebook)
        mcp_instance.tool(self.notebook_scraper.scrape_markdowns)
        mcp_instance.tool(self.write_json)
        mcp_instance.resource("resource://data")(self.resource_method)

    def greet(self,name: str = None):
        '''Greets the User

        Args:
            name: Optional name to include in the greeting.

        Returns:
            A greeting string, personalised when ``name`` is non-empty.
        '''
        if not name:
            return "Hi, I am NotebookServer"
        return f"Hi {name}, I am NotebookServer"

    def get_notebook_list(self):
        '''Returns List of Notebooks Scraped'''
        return list(self.notebook_scraper.notebook_md_dict)

    def get_markdown_from_notebook(self,notebook_name: str):
        '''Returns Markdown Cells for specified notebook

        Args:
            notebook_name: A key of the scraper's notebook dictionary.

        Returns:
            The stored entry for that notebook, or an explanatory message
            when the name is not present.
        '''
        notebooks = self.notebook_scraper.notebook_md_dict
        # Direct dict membership; no need to copy the keys into a list first.
        if notebook_name in notebooks:
            return notebooks[notebook_name]
        return f"Requested notebook ({notebook_name}) does not exist"

    def write_json(self,file_name: str):
        '''Tool to write a json file in the format notebook:markdown content

        Args:
            file_name: Target path, interpreted relative to the current
                working directory. Paths that resolve outside of it are
                rejected.

        Returns:
            A status message; failures are reported in the message rather
            than raised.
        '''
        target = f"./{file_name}"

        # file_name comes from the tool caller (an LLM), so refuse anything
        # such as "../x" that would land outside the working directory.
        base_dir = os.path.realpath(".")
        try:
            inside = os.path.commonpath([base_dir, os.path.realpath(target)]) == base_dir
        except ValueError:
            # commonpath raises when the paths cannot be compared
            # (e.g. different drives); treat that as "outside".
            inside = False
        if not inside:
            return f"Could not write {file_name} due to path escaping the working directory"

        # Tool boundary: report any failure back to the caller as text.
        try:
            with open(target, "w") as record_file:
                json.dump(self.notebook_scraper.notebook_md_dict,record_file)
            return f"File:{file_name} written successfully"
        except Exception as ex:
            return f"Could not write {file_name} due to {ex}"

    def resource_method(self):
        # Returns a fixed explanatory text; registered as "resource://data".
        return """
        Resources provide read-only access to data for the LLM or client application. When a client requests a resource URI:
            + FastMCP finds the corresponding resource definition.
            + If it’s dynamic (defined by a function), the function is executed.
            + The content (text, JSON, binary data) is returned to the client.
        This allows LLMs to access files, database content, configuration, or dynamically generated information relevant to the conversation.
        """

# Instantiating NotebookServer registers its tools and resource on `mcp`
# (the registration happens inside NotebookServer.__init__), so this runs
# at import time as well as when the file is executed as a script.
provider = NotebookServer(mcp)

if __name__ == "__main__":
    # Run the server over the stdio transport when executed directly
    mcp.run(transport='stdio')

Overwriting notebook_server.py


In [38]:
%%writefile dummy_client.py
import asyncio
from fastmcp import Client, FastMCP

# The three assignments below illustrate different ways to construct a
# Client. Each one rebinds `client`, so only the LAST assignment (the local
# Python script) is the client that main() actually uses.

# In-memory server (ideal for testing)
server = FastMCP("TestServer")
client = Client(server)

# HTTP server
client = Client("https://example.com/mcp")

# Local Python script
client = Client("notebook_server.py")

async def main():
    """Ping the server, print its tools/resources/prompts, then call two tools.

    Uses the module-level ``client``. After listing the server's capabilities
    it invokes ``scrape_markdowns`` followed by ``write_json`` and prints the
    result of the latter.
    """
    rule = "-"*30

    async with client:
        # Basic server interaction
        await client.ping()

        # Discover what the server offers
        tools = await client.list_tools()
        resources = await client.list_resources()
        prompts = await client.list_prompts()

        # Emit the report line by line, in the same order as before
        report = (
            rule, "Tools:", rule, tools,
            rule, "Resources:", rule, resources,
            rule, "Prompts:", prompts, rule,
        )
        for entry in report:
            print(entry)

        # Execute operations
        await client.call_tool("scrape_markdowns", {})
        result = await client.call_tool("write_json", {"file_name":"test_mcp_server.json"})
        print(rule)
        print(f"Result of tool call for write_json:\n{result}")

# Script entry point: there is no __main__ guard, so main() runs whenever
# this file is executed or imported.
asyncio.run(main())

Overwriting dummy_client.py


> Open a terminal and, from the module 4 directory, run: ``python3 dummy_client.py``

You should see output similar to the following on your screen:
<img src="../assets/04_mcp_client.png">

## ChatBot

In [None]:
%%writefile mcp_chatbot.py
from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client
from typing import List
import asyncio
import nest_asyncio
from ollama import Client


# NOTE(review): presumably applied so asyncio.run() also works where an event
# loop is already running (e.g. inside a notebook) - confirm it is still needed.
nest_asyncio.apply()

class MCP_ChatBot:
    """Terminal chatbot that lets an Ollama chat model call tools from one MCP server.

    ``connect_to_server_and_run`` starts ``notebook_server.py`` over stdio,
    publishes its tools to the model, and runs the interactive loop.
    """

    def __init__(self):
        # Initialize session and client objects
        self.session: ClientSession = None          # set once connected
        self.available_tools: List[dict] = []       # tool specs passed to chat()
        self.open_ai_compat_client = Client(
            host='http://localhost:11434',
            # headers={'x-some-header': 'some-value'}
        )
        self.model_name = 'llama3.1'

    @staticmethod
    def _tool_result_text(result):
        """Flatten an MCP tool result into plain text for the model.

        Uses each content block's ``text`` attribute when it has one and
        falls back to ``str(block)`` otherwise.
        """
        blocks = getattr(result, "content", None) or []
        return "\n".join(
            block.text if getattr(block, "text", None) is not None else str(block)
            for block in blocks
        )

    async def process_query(self, query):
        """Answer one user query, executing any tool calls the model requests.

        The model is queried repeatedly: every requested tool is invoked
        through the MCP session and its output appended as a ``tool`` message,
        until the model replies without tool calls.

        Args:
            query: The user's message text.
        """
        messages = [{'role':'user', 'content':query}]

        while True:
            response = self.open_ai_compat_client.chat(
                model=self.model_name, messages=messages, tools=self.available_tools)
            message = response.message

            if message.content:
                print(message.content)

            if not message.tool_calls:
                # Final answer (possibly empty): stop instead of re-checking
                # the same response forever.
                print("No tool calls detected")
                return

            print(" tool calls detected")
            # Keep the assistant turn (including its tool calls) in the history
            messages.append(message)

            # tool_calls is a sequence; handle every requested call
            for tool_call in message.tool_calls:
                tool_name = tool_call.function.name
                tool_args = dict(tool_call.function.arguments or {})

                print(f"Calling tool {tool_name} with args {tool_args}")

                # tool invocation through the client session
                result = await self.session.call_tool(tool_name, arguments=tool_args)
                messages.append({
                    'role': 'tool',
                    'content': self._tool_result_text(result),
                    'tool_name': tool_name,
                })

    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Chatbot Started!")
        print("Type your queries or 'quit' to exit.")

        while True:
            try:
                query = input("\nQuery: ").strip()
            except EOFError:
                # stdin was closed; retrying would fail forever, so stop.
                break

            if query.lower() == 'quit':
                break
            if not query:
                # Nothing to send; prompt again.
                continue

            try:
                await self.process_query(query)
                print("\n")
            except Exception as e:
                # Keep the session alive if a single query fails
                print(f"\nError: {str(e)}")

    async def connect_to_server_and_run(self):
        """Start the notebook MCP server, expose its tools, and run the chat loop."""
        # Create server parameters for stdio connection
        server_params = StdioServerParameters(
            command="python3",  # Executable
            args=["notebook_server.py"],  # Optional command line arguments
            env=None,  # Optional environment variables
        )
        async with stdio_client(server_params) as (read, write):
            async with ClientSession(read, write) as session:
                self.session = session
                # Initialize the connection
                await session.initialize()

                # List available tools
                response = await session.list_tools()

                tools = response.tools
                print("\nConnected to server with tools:", [tool.name for tool in tools])

                # Describe each tool in the function-calling format the
                # chat API expects (type/function/parameters).
                self.available_tools = [{
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description or "",
                        "parameters": tool.inputSchema,
                    },
                } for tool in tools]

                await self.chat_loop()


async def main():
    """Build an MCP_ChatBot and run its connect-and-chat routine to completion."""
    await MCP_ChatBot().connect_to_server_and_run()
  

# Only start the chatbot when this file is executed as a script
if __name__ == "__main__":
    asyncio.run(main())


In [46]:
%%writefile mcp_chatbot_v2.py

from dotenv import load_dotenv
import os
from openai import OpenAI
from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client
from typing import List, Dict, TypedDict
from contextlib import AsyncExitStack
import json
import asyncio

# NOTE(review): presumably loads a local .env file so the os.getenv() lookups
# in MCP_ChatBot.__init__ can pick up OPENAI_BASE_URL / OPENAI_API_KEY /
# MODEL_NAME - confirm against python-dotenv's defaults.
load_dotenv()

class ToolDefinition(TypedDict):
    """Shape of one tool entry kept in ``MCP_ChatBot.available_tools``."""
    name: str            # tool name as reported by the MCP server
    description: str     # tool description as reported by the MCP server
    input_schema: dict   # the tool's inputSchema from the MCP server

class MCP_ChatBot:
    """Terminal chatbot driving MCP tools through an OpenAI-compatible chat API.

    Configuration is read from the environment:
    ``OPENAI_BASE_URL`` (default: local Ollama's OpenAI-compatible endpoint),
    ``OPENAI_API_KEY`` (default: a placeholder) and ``MODEL_NAME``
    (default: ``llama3.1``).
    """

    def __init__(self):
        # Initialize session and client objects
        self.sessions: List[ClientSession] = []
        self.exit_stack = AsyncExitStack()

        # Ollama serves its OpenAI-compatible API under the /v1 prefix, so the
        # default must include it (e.g. use "https://api.openai.com/v1" for OpenAI).
        base_url = os.getenv("OPENAI_BASE_URL") or "http://localhost:11434/v1"
        # Ollama ignores the key, but an empty value produces an invalid
        # "Bearer " header, so fall back to a non-empty placeholder.
        api_key = os.getenv("OPENAI_API_KEY") or "ollama"

        self.client = OpenAI(
            base_url=base_url,
            api_key=api_key
        )

        # Model configuration: defaults to an Ollama model, override via MODEL_NAME
        self.model = os.getenv("MODEL_NAME", "llama3.1")

        self.available_tools: List[ToolDefinition] = []
        self.tool_to_session: Dict[str, ClientSession] = {}

    def convert_mcp_tools_to_openai_format(self) -> List[Dict]:
        """Convert MCP tool definitions to OpenAI function calling format."""
        return [
            {
                "type": "function",
                "function": {
                    "name": tool["name"],
                    "description": tool["description"],
                    "parameters": tool["input_schema"],
                },
            }
            for tool in self.available_tools
        ]

    async def connect_to_server(self, server_name: str, server_config: dict) -> None:
        """Connect to a single MCP server.

        Args:
            server_name: Label used in log messages.
            server_config: Keyword arguments for ``StdioServerParameters``.
                When empty or ``None`` the local ``notebook_server.py`` is
                launched with ``python3``.

        Connection failures are reported on stdout and not raised, so the
        chatbot can still start without tools.
        """
        try:
            if server_config:
                server_params = StdioServerParameters(**server_config)
            else:
                server_params = StdioServerParameters(
                    command="python3",  # Executable
                    args=["notebook_server.py"],  # Optional command line arguments
                    env=None,  # Optional environment variables
                )
            # Both context managers are owned by the exit stack so that
            # cleanup() can close them later.
            stdio_transport = await self.exit_stack.enter_async_context(
                stdio_client(server_params)
            )
            read, write = stdio_transport
            session = await self.exit_stack.enter_async_context(
                ClientSession(read, write)
            )
            await session.initialize()
            self.sessions.append(session)

            # List available tools for this session
            response = await session.list_tools()
            tools = response.tools
            print(f"\nConnected to {server_name} with tools:", [t.name for t in tools])

            for tool in tools:
                # Remember which session serves each tool name
                self.tool_to_session[tool.name] = session
                self.available_tools.append({
                    "name": tool.name,
                    "description": tool.description,
                    "input_schema": tool.inputSchema
                })
        except Exception as e:
            print(f"Failed to connect to {server_name}: {e}")

    # async def connect_to_servers(self):
    #     """Connect to all configured MCP servers."""
    #     try:
    #         with open("server_config.json", "r") as file:
    #             data = json.load(file)

    #         servers = data.get("mcpServers", {})

    #         for server_name, server_config in servers.items():
    #             await self.connect_to_server(server_name, server_config)
    #     except Exception as e:
    #         print(f"Error loading server configuration: {e}")
    #         raise

    @staticmethod
    def _tool_result_text(result) -> str:
        """Flatten an MCP tool result into plain text for the model.

        Uses each content block's ``text`` attribute when it has one and
        falls back to ``str(block)`` otherwise.
        """
        blocks = getattr(result, "content", None) or []
        return "\n".join(
            block.text if getattr(block, "text", None) is not None else str(block)
            for block in blocks
        )

    def _create_completion(self, messages, openai_tools):
        """Issue one chat-completion request, attaching tools only if any exist."""
        kwargs = {
            'model': self.model,
            'messages': messages,
            'max_tokens': 2024
        }
        if openai_tools:
            kwargs['tools'] = openai_tools
            kwargs['tool_choice'] = 'auto'
        return self.client.chat.completions.create(**kwargs)

    async def _run_tool_call(self, tool_call) -> str:
        """Execute one model-requested tool call and return its output as text.

        Malformed arguments and unknown tool names are returned as an error
        string so the model still receives a reply for this call id.
        """
        tool_name = tool_call.function.name
        raw_args = tool_call.function.arguments
        try:
            # An empty argument string means "no arguments"
            tool_args = json.loads(raw_args) if raw_args else {}
        except json.JSONDecodeError as e:
            return f"Error: could not parse arguments for tool {tool_name}: {e}"

        print(f"Calling tool {tool_name} with args {tool_args}")

        session = self.tool_to_session.get(tool_name)
        if session is None:
            return f"Error: unknown tool {tool_name}"

        result = await session.call_tool(tool_name, arguments=tool_args)
        return self._tool_result_text(result)

    async def process_query(self, query):
        """Answer one user query, executing any tool calls the model requests.

        Args:
            query: The user's message text.
        """
        messages = [{'role': 'user', 'content': query}]

        # Convert MCP tools to OpenAI format
        openai_tools = self.convert_mcp_tools_to_openai_format()

        while True:
            response = self._create_completion(messages, openai_tools)
            message = response.choices[0].message

            # Handle text response
            if message.content:
                print(message.content)

            if not message.tool_calls:
                # No tool calls, we're done
                return

            # Add assistant message with tool calls to conversation
            messages.append({
                'role': 'assistant',
                'content': message.content,
                'tool_calls': message.tool_calls
            })

            # Every tool_call id needs a matching "tool" message
            for tool_call in message.tool_calls:
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": await self._run_tool_call(tool_call)
                })

    async def chat_loop(self):
        """Run an interactive chat loop"""
        print(f"\nMCP Chatbot Started! Using model: {self.model}")
        print("Type your queries or 'quit' to exit.")

        while True:
            try:
                query = input("\nQuery: ").strip()
            except EOFError:
                # stdin was closed; retrying would fail forever, so stop.
                break

            if query.lower() == 'quit':
                break
            if not query:
                # Nothing to send; prompt again.
                continue

            try:
                await self.process_query(query)
                print("\n")
            except Exception as e:
                # Keep the session alive if a single query fails
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Cleanly close all resources using AsyncExitStack."""
        await self.exit_stack.aclose()


async def main():
    """Connect to the notebook server, chat, and always release resources."""
    bot = MCP_ChatBot()
    server_name, server_config = "Notebook Server", None
    try:
        await bot.connect_to_server(server_name, server_config)
        await bot.chat_loop()
    finally:
        # Runs on normal exit and on errors alike
        await bot.cleanup()


# Only start the chatbot when this file is executed as a script
if __name__ == "__main__":
    asyncio.run(main())

Overwriting mcp_chatbot_v2.py
