### Import Dependencies

In [None]:
from fastmcp import Client

from pydantic import BaseModel, Field

from qdrant_client import QdrantClient
from qdrant_client.models import Prefetch, Filter, FieldCondition, MatchText, FusionQuery

from langsmith import traceable, get_current_run_tree

from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode

from langchain_core.messages import AIMessage, ToolMessage, convert_to_openai_messages

from jinja2 import Template
from typing import Literal, Dict, Any, Annotated, List, Optional
from IPython.display import Image, display
from operator import add
from openai import OpenAI

from utils.utils import format_ai_message

import openai

import random
import ast
import inspect
import instructor
import json

### List available tools in MCP servers running on http://localhost:8001/mcp and http://localhost:8002/mcp

In [None]:
client = Client("http://localhost:8001/mcp")

async with client:

    tools = await client.list_tools()

In [None]:
tools

In [None]:
print("======NAME=======")
print(tools[0].name)
print("======DESCRIPTION=======")
print(tools[0].description)
print("======INPUT SCHEMA=======")
print(tools[0].inputSchema)

In [None]:
client = Client("http://localhost:8002/mcp")

async with client:

    tools = await client.list_tools()

In [None]:
tools

In [None]:
print("======NAME=======")
print(tools[0].name)
print("======DESCRIPTION=======")
print(tools[0].description)
print("======INPUT SCHEMA=======")
print(tools[0].inputSchema)

### Execute a tool on one of the running MCP Servers

In [None]:
client = Client("http://localhost:8001/mcp")

async with client:
    # List available resources
    result = await client.call_tool("get_formatted_item_context", {"query": "earphones", "top_k": 5})

In [None]:
result

In [None]:
print(result.content[0].text)

### A function to extract tool definitions of all available tools in provided MCP Servers

In [None]:
def parse_docstring_params(docstring: str) -> Dict[str, str]:
    """Extract parameter descriptions from docstring (handles both Args: and Parameters: formats)."""
    params = {}
    lines = docstring.split('\n')
    in_params = False
    current_param = None
    
    for line in lines:
        stripped = line.strip()
        
        # Check for parameter section start
        if stripped in ['Args:', 'Arguments:', 'Parameters:', 'Params:']:
            in_params = True
            current_param = None
        elif stripped.startswith('Returns:') or stripped.startswith('Raises:'):
            in_params = False
        elif in_params:
            # Parse parameter line (handles "param: desc" and "- param: desc" formats)
            if ':' in stripped and (stripped[0].isalpha() or stripped.startswith(('-', '*'))):
                param_name = stripped.lstrip('- *').split(':')[0].strip()
                param_desc = ':'.join(stripped.lstrip('- *').split(':')[1:]).strip()
                params[param_name] = param_desc
                current_param = param_name
            elif current_param and stripped:
                # Continuation of previous parameter description
                params[current_param] += ' ' + stripped
    
    return params

In [None]:
async def get_tool_descriptions_from_mcp_servers(mcp_servers: list[str]) -> list[dict]:

    tool_descriptions = []

    for server in mcp_servers:

        client = Client(server)

        async with client:

            tools = await client.list_tools()

            for tool in tools:
                
                result = {
                    "name": "",
                    "description": "",
                    "parameters": {"type": "object", "properties": {}},
                    "required": [],
                    "returns": {"type": "string", "description": ""},
                    "server": server
                }

                result["name"] = tool.name
                result["required"] = tool.inputSchema.get("required", [])

                ## Get Description

                description = tool.description.split("\n\n")[0]
                result["description"] = description


                ## Get Returns

                returns = tool.description.split("Returns:")[1].strip()
                result["returns"]["description"] = returns

                ## Get parameters

                property_descriptions = parse_docstring_params(tool.description)
                properties = tool.inputSchema.get("properties", {})
                for key, value in properties.items():
                    properties[key]["description"] = property_descriptions.get(key, "")

                result["parameters"]["properties"] = properties

                tool_descriptions.append(result)

    return tool_descriptions

In [None]:
mcp_servers = ["http://localhost:8001/mcp", "http://localhost:8002/mcp"]

In [None]:
tool_descriptions = await get_tool_descriptions_from_mcp_servers(mcp_servers)

In [None]:
tool_descriptions

## Agent integration with tools exposed via MCP Servers

In [None]:
class ToolCall(BaseModel):
    name: str
    arguments: dict
    server: str

class RAGUsedContext(BaseModel):
    id: str = Field(description="ID of the item used to answer the question.")
    description: str = Field(description="Short description of the item used to answer the question.")

class AgentResponse(BaseModel):
    answer: str = Field(description="Answer to the question.")
    references: list[RAGUsedContext] = Field(description="List of items used to answer the question.")
    final_answer: bool = False
    tool_calls: List[ToolCall] = []

class State(BaseModel):
    messages: Annotated[List[Any], add] = []
    question_relevant: bool = False
    iteration: int = 0
    answer: str = ""
    available_tools: List[Dict[str, Any]] = []
    tool_calls: List[ToolCall] = []
    final_answer: bool = False
    references: Annotated[List[RAGUsedContext], add] = []

In [None]:
@traceable(
    name="agent_node",
    run_type="llm",
    metadata={"ls_provider": "openai", "ls_model_name": "gpt-4.1-mini"}
)
def agent_node(state: State) -> dict:

   prompt_template =  """You are a shopping assistant that can answer questions about the products in stock.

You will be given a conversation history and a list of tools you can use to answer the latest query.

<Available tools>
{{ available_tools | tojson }}
</Available tools>

When making tool calls, use this exact format:
{
    "name": "tool_name",
    "arguments": {
        "parameter1": "value1",
        "parameter2": "value2",
    },
    "server": "server_name"
}

CRITICAL: All parameters must go inside the "arguments" object, not at the top level of the tool call.

Examples:
- Get formatted item context:
{
    "name": "get_formatted_item_context",
    "arguments": {
        "query": "Kool kids toys.",
        "top_k": 5
    },
    "server": "http://localhost:8001/mcp"
}

- Get formatted user reviews:
    {
        "name": "get_formatted_reviews_context",
        "arguments": {
            "query": "Durable.",
            "item_list": ["123", "456"],
            "top_k": 5
        },
        "server": "http://localhost:8002/mcp"
    }

CRITICAL RULES:
- If tool_calls has values, final_answer MUST be false
(You cannot call tools and exit the graph in the same response)
- If final_answer is true, tool_calls MUST be []
(You must wait for tool results before exiting the graph)
- If you need tool results before answering, set:
tool_calls=[...], final_answer=false
- After receiving tool results, you can then set:
tool_calls=[], final_answer=true
- When suggesting tool calls, use names specificly provided in the available tools. Don't add any additional text to the names. You can only use get_formatted_item_context or get_formatted_reviews_context for tool names.

Instructions:
- You need to answer the question based on the outputs from the tools using the available tools only.
- Do not suggest the same tool call more than once.
- If the question can be decomposed into multiple sub-questions, suggest all of them.
- If multipple tool calls can be used at once to answer the question, suggest all of them.
- Do not explain your next steps in the answer, instead use tools to answer the question.
- Never use word context and refer to it as the available products.
- If using reviews data, make sure to match reviews to item IDs and summarise them instead of returning them as is, clearly split reviews from general item specification.
- You should only answer questions about the products in stock. If the question is not about the products in stock, you should ask for clarification.
- As an output you need to return the following:

* answer: The answer to the question based on your current knowledge and the tool results.
* references: The list of the indexes from the chunks returned from all tool calls that were used to answer the question. If more than one chunk was used to compile the answer from a single tool call, be sure to return all of them.
* Each reference should have an id and a short description of the item based on the retrieved context.
* final_answer: True if you have all the information needed to provide a complete answer, False otherwise.

- The answer to the question should contain detailed information about the product and should be returned with detailed specification in bullet points.
- The short description should have the name of the item.
- If the user's request requires using a tool, set tool_calls with the appropriate function names and arguments.
"""

   template = Template(prompt_template)
   
   prompt = template.render(
      available_tools=state.available_tools
   )

   messages = state.messages

   conversation = []

   for message in messages:
        conversation.append(convert_to_openai_messages(message))

   client = instructor.from_openai(OpenAI())

   response, raw_response = client.chat.completions.create_with_completion(
        model="gpt-4.1-mini",
        response_model=AgentResponse,
        messages=[{"role": "system", "content": prompt}, *conversation],
        temperature=0.5,
   )

   ai_message = format_ai_message(response)

   return {
      "messages": [ai_message],
      "tool_calls": response.tool_calls,
      "iteration": state.iteration + 1,
      "answer": response.answer,
      "final_answer": response.final_answer,
      "references": response.references
   }

In [None]:
def tool_router(state: State) -> str:
    """Decide whether to continue or end"""
    
    if state.final_answer:
        return "end"
    elif state.iteration > 2:
        return "end"
    elif len(state.tool_calls) > 0:
        return "tools"
    else:
        return "end"

In [None]:
class IntentRouterResponse(BaseModel):
    question_relevant: bool
    answer: str

In [None]:
@traceable(
    name="agent_node",
    run_type="llm",
    metadata={"ls_provider": "openai", "ls_model_name": "gpt-4.1-mini"}
)
def intent_router_node(state: State):

   prompt_template =  """You are part of a shopping assistant that can answer questions about products in stock.

Instructions:
- You will be given a conversation history with the user.
- If the latest user messgae is not relevant, return False in field "question_relevant" and set "answer" to explanation why it is not relevant.
- If the latest user message is relevant, return True in field "question_relevant" and set "answer" to "".
- You should only answer questions about the products in stock. If the question is not about the products in stock, you should ask for clarification.
"""

   template = Template(prompt_template)
   
   prompt = template.render()

   messages = state.messages

   conversation = []

   for message in messages:
        conversation.append(convert_to_openai_messages(message))

   client = instructor.from_openai(OpenAI())

   response, raw_response = client.chat.completions.create_with_completion(
        model="gpt-4.1-mini",
        response_model=IntentRouterResponse,
        messages=[{"role": "system", "content": prompt}, *conversation],
        temperature=0.5,
   )

   return {
      "question_relevant": response.question_relevant,
      "answer": response.answer
      }

In [None]:
def intent_router_conditional_edges(state: State):

    if state.question_relevant:
        return "agent_node"
    else:
        return "end"

### Custom implementation of MCP Tool Node

In [None]:
async def mcp_tool_node(state: State) -> str:

    tool_messages = []

    for i, tc in enumerate(state.tool_calls):

        client = Client(tc.server)

        async with client:

            result = await client.call_tool(tc.name, tc.arguments)

            tool_message = ToolMessage(
                content=result,
                tool_call_id=f"call_{i}"
            )

            tool_messages.append(tool_message)

    return {
        "messages": tool_messages
    }

## Graph

In [None]:
workflow = StateGraph(State)

mcp_servers = ["http://localhost:8001/mcp", "http://localhost:8002/mcp"]
tool_descriptions = await get_tool_descriptions_from_mcp_servers(mcp_servers)

workflow.add_node("agent_node", agent_node)
workflow.add_node("mcp_tool_node", mcp_tool_node)
workflow.add_node("intent_router_node", intent_router_node)

workflow.add_edge(START, "intent_router_node")

workflow.add_conditional_edges(
    "intent_router_node",
    intent_router_conditional_edges,
    {
        "agent_node": "agent_node",
        "end": END
    }
)

workflow.add_conditional_edges(
    "agent_node",
    tool_router,
    {
        "tools": "mcp_tool_node",
        "end": END
    }
)

workflow.add_edge("mcp_tool_node", "agent_node")

graph = workflow.compile()

In [None]:
display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

In [None]:
state = {
    "messages": [{"role": "user", "content": "Can I get earphones for myself, a laptop bag for my wife and something cool for my kids?"}],
    "available_tools": tool_descriptions
}
config = {"configurable": {"thread_id": "test000050"}}

async with AsyncPostgresSaver.from_conn_string("postgresql://langgraph_user:langgraph_password@localhost:5433/langgraph_db") as checkpointer:

    graph = workflow.compile(checkpointer=checkpointer)

    async for chunk in graph.astream(
        state,
        config=config,
        stream_mode="updates"
    ):
        print(chunk)