# 🚀 Building LangGraph with MCP Tools: A Complete Tutorial

This notebook demonstrates how to build a **multi-agent RAG (Retrieval-Augmented Generation)** system that combines:

- **Local nodes** for log retrieval and processing  
- **MCP (Model Context Protocol) agent** for metrics analysis  

---

## 📋 What You'll Learn

1. How to create MCP servers with custom tools  
2. How to integrate MCP tools into LangGraph nodes  
3. How to build a hybrid pipeline with both local and MCP-powered nodes  
4. How ReAct agents work with MCP tools  

---

## 🎯 Use Case

Analyze **OAuth2 microservice logs** and correlate **errors with system metrics** to find root causes.


In [1]:
# Standard library imports
import asyncio
from contextlib import AsyncExitStack
from typing_extensions import TypedDict
from dotenv import load_dotenv
import os

# MCP (Model Context Protocol) imports
from mcp import StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.session import ClientSession
from langchain_mcp_adapters.tools import load_mcp_tools

# LangChain imports
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage

# LangGraph imports
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent


# Load environment variables
load_dotenv(override=True)

print("✅ All imports successful!")

✅ All imports successful!


## 🏗️ Part 2: Define Graph State

The `GraphState` is the data structure that flows through all nodes in your **LangGraph**.  
Think of it as a shared context that each node can read from and write to.


In [2]:
class GraphState(TypedDict):
    """
    The state that flows through our LangGraph pipeline.
    
    Each node can read from and update this state.
    """
    # Input
    path: str                    # Path to log file
    question: str                # User's question (can be enriched by nodes)
    metrics_csv_path: str        # Path to metrics CSV
    
    # Processing
    documents: list[str]         # Retrieved documents
    metrics_analysis: str
    iterations: int              # Number of query transformations
    
    # Output
    generation: str              # Final answer from LLM
    
    # Optional metadata
    target_service: str          # Which microservice to analyze

print("✅ GraphState defined!")

✅ GraphState defined!


## 🔧 Part 3: MCP Server Setup

**MCP (Model Context Protocol)** allows you to create custom tools that the LLM can use.  
We'll create a **metrics analysis server** with tools like:

- `get_go_memory_stats` — Check memory usage  
- `get_cpu_stats` — Check CPU usage  
- `diagnose_error_correlation` — Link errors to metrics  

---

### Example MCP Server (see `mcp_server_metrics.py` for the full implementation)

```python
from mcp.server.fastmcp import FastMCP
import pandas as pd

mcp = FastMCP("Metrics Analyzer")

@mcp.tool()
async def get_go_memory_stats(csv_path: str) -> dict:
    """Get Go memory statistics from metrics CSV"""
    df = pd.read_csv(csv_path)
    # ... analyze memory metrics
    return {"memory_usage": "85%", "status": "high"}

@mcp.tool()
async def diagnose_error_correlation(error_message: str, csv_path: str) -> str:
    """Correlate errors with metrics anomalies"""
    # ... analyze correlations
    return "High memory usage detected during error period"

if __name__ == "__main__":
    mcp.run(transport="stdio")

## 🤖 Part 4: Create MCP Manager

The **MCP Manager** handles the lifecycle of the MCP connection:

- Starting the MCP server process  
- Loading tools from the server  
- Cleaning up when done


In [3]:
class MCPManager:
    """
    Manages the lifecycle of an MCP (Model Context Protocol) server connection.
    
    The MCP server runs as a separate process and provides tools
    that the LLM can use through the Model Context Protocol.
    """
    
    def __init__(self):
        self.session = None
        self.exit_stack = None
        self.tools = None
    
    async def setup(self, mcp_config):
        """
        Initialize MCP connection and load tools.
        
        Args:
            mcp_config: Dict with 'command' and 'args' for starting MCP server
        
        Returns:
            List of LangChain tools loaded from the MCP server
        """
        # Configure how to start the MCP server process
        server_params = StdioServerParameters(
            command=mcp_config.get("command", "./.venv/bin/python"),
            args=mcp_config.get("args", ["mcp_server_metrics.py"]),
        )
        
        # Use AsyncExitStack to manage async context managers
        self.exit_stack = AsyncExitStack()
        
        # Start the MCP server and get read/write streams
        read, write = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        
        # Create a session to communicate with the MCP server
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(read, write)
        )
        
        # Initialize the session
        await self.session.initialize()
        
        # Load tools from the MCP server
        # These become LangChain tools that can be used by agents
        self.tools = await load_mcp_tools(self.session)
        
        print(f"✅ Loaded {len(self.tools)} tools from MCP server:")
        for tool in self.tools:
            print(f"   • {tool.name}")
        
        return self.tools
    
    async def cleanup(self):
        """Close MCP connection and cleanup resources."""
        if self.exit_stack:
            await self.exit_stack.aclose()
            print("✅ MCP connection closed")

print("✅ MCPManager class defined!")

✅ MCPManager class defined!


## 🧠 Part 5: Create MCP Agent Node

This is where the magic happens!  
We create a **ReAct agent** that:

1. Receives a question about logs/errors  
2. Uses MCP tools to analyze metrics  
3. Provides a detailed metrics-based analysis to enrich the response 


In [4]:
# System prompt for the MCP agent
SYSTEM_MESSAGE_TEMPLATE = """You are an expert log and metrics analyst for OAuth2 microservices.

You have access to:
1. **Log Analysis**: Document retrieval from vector database (logs are pre-loaded)
2. **Metrics Analysis**: Real-time metrics tools from MCP server

WORKFLOW:
1. First, understand WHAT errors occurred from the logs
2. Then, check metrics to understand WHY (root cause)
3. Finally, synthesize findings into actionable insights

METRICS CONTEXT:
- Current metrics CSV: {metrics_csv_path}
- You can check: memory, GC, goroutines, CPU, system resources
- Use diagnose_error_correlation to link errors to performance issues

EXAMPLE WORKFLOW:
User: "Why did update_password fail with 404?"
1. Check metrics: get_go_memory_stats, get_go_gc_stats
2. Correlate: diagnose_error_correlation("404 user not found")
3. Conclude: "Memory pressure → slow queries → timeouts → 404s"

BE PROACTIVE: When you see errors, always check relevant metrics without being asked!

Output a 2-3 sentence summary of your findings.
"""


class MCPNode:
    """
    A LangGraph node that uses a ReAct agent with MCP tools.
    
    This node analyzes metrics and enriches the question with context
    that will be used by downstream nodes.
    """
    
    def __init__(self, mcp_tools, model):
        """
        Initialize the MCP node with tools and model.
        
        Args:
            mcp_tools: List of MCP tools loaded from the server
            model: LangChain LLM (e.g., ChatAnthropic)
        """
        self.mcp_tools = mcp_tools
        self.model = model
        
        # Create a ReAct agent that can use MCP tools
        # ReAct = Reasoning + Acting pattern
        # The agent will: Think → Act (use tool) → Observe → Repeat
        self.mcp_agent = create_react_agent(
            model=self.model,
            tools=self.mcp_tools,
            checkpointer=MemorySaver()  # Remember conversation history
        )
        
    async def mcp_agent_node(self, state: GraphState) -> dict:
        """
        The actual node function that processes the state.
        
        Args:
            state: Current GraphState
            
        Returns:
            Dict with updated state fields
        """
        print("🔍 --- MCP AGENT NODE ---")
        
        # Extract information from state
        question = state["question"]
        metrics_csv_path = state["metrics_csv_path"]
        documents = state["documents"]
        
        # Format the system message with dynamic context
        system_message_content = SYSTEM_MESSAGE_TEMPLATE.format(
            metrics_csv_path=metrics_csv_path
        )
        
        # Build context from retrieved log documents
        log_context = ""
        if documents:
            # Extract content from Document objects properly
            log_entries = []
            for doc in documents:
                if hasattr(doc, 'page_content'):
                    content = doc.page_content
                else:
                    content = str(doc)
                log_entries.append(content)
            
            log_context = "\n\nRelevant logs:\n" + "\n---\n".join(log_entries)
        
        # Construct the full prompt for the agent
        agent_prompt = f"""{system_message_content}
        User question: {question}{log_context}
        Analyze the metrics and provide a brief summary of key findings."""
        
        # Run the ReAct agent
        # The agent will autonomously decide which tools to call
        result = await self.mcp_agent.ainvoke(
            {"messages": [HumanMessage(content=agent_prompt)]},
            config={"configurable": {"thread_id": "mcp_agent"}}
        )
        
        # Log which tools the agent used
        print("\n🔧 Tools used by agent:")
        for msg in result["messages"]:
            if hasattr(msg, "tool_calls") and msg.tool_calls:
                for tool_call in msg.tool_calls:
                    print(f"   → {tool_call['name']}")
        
        # Extract the agent's analysis
        metrics_analysis = result["messages"][-1].content
        
        # Return updated state
        # This allows downstream nodes to use the metrics analysis insights
        return {
            "metrics_analysis": metrics_analysis
        }

print("✅ MCPNode class defined!")

✅ MCPNode class defined!


## 🏗️ Part 6: Build the Complete LangGraph

Now we combine everything into a graph with multiple nodes. In the NVIDIA Nemotron Log Analysis tutorial, we already built the LangGraph nodes and edges, so in this tutorial, we’ll simply import them without re-explaining the details.


In [5]:
from langchain_nvidia_ai_endpoints import ChatNVIDIA
from graphnodes import Nodes
from graphedges import Edge


For example, replace imports like: `from langchain_core.pydantic_v1 import BaseModel`
with: `from pydantic import BaseModel`
or the v1 compatibility namespace if you are working in a code base that has not been fully upgraded to pydantic 2 yet. 	from pydantic.v1 import BaseModel

  from binary_score_models import GradeAnswer,GradeDocuments,GradeHallucinations


In [6]:
async def build_graph(mcp_manager: MCPManager, mcp_config: dict):
    """
    Build the complete LangGraph with MCP integration.
    
    Args:
        mcp_manager: MCPManager instance
        mcp_config: Configuration for MCP server
        
    Returns:
        Compiled LangGraph application
    """
    print("🏗️ Building LangGraph...")
    
    # Step 1: Setup MCP and load tools
    mcp_tools = await mcp_manager.setup(mcp_config)
    
    # Step 2: Create the LLM
    model = ChatNVIDIA(
        model="nvidia/llama-3.3-nemotron-super-49b-v1.5",
        api_key=os.getenv("API_KEY"),  # Same key you're using
        temperature=0,
        max_tokens=20000
    )
    
    # Step 3: Create the MCP agent node
    metrics_node = MCPNode(mcp_tools=mcp_tools, model=model)
    
    # Step 4: Create the graph
    graph = StateGraph(GraphState)
    
    # Step 5: Add all nodes
    graph.add_node("retrieve", Nodes.retrieve)
    graph.add_node("rerank", Nodes.rerank)
    graph.add_node("grade_documents", Nodes.grade_documents)
    graph.add_node("generate", Nodes.generate)
    graph.add_node("transform_query", Nodes.transform_query)
    graph.add_node("mcp_agent", metrics_node.mcp_agent_node)  # MCP-powered node!
    
    # Step 6: Define the flow
    graph.add_edge(START, "retrieve")
    graph.add_edge("retrieve", "rerank")
    graph.add_edge("rerank", "grade_documents")
    
    # Conditional: Generate or transform query?
    graph.add_conditional_edges(
        "grade_documents",
        Edge.decide_to_generate,
        {
            "transform_query": "transform_query",
            "generate": "mcp_agent",  # ← Go to MCP agent before generation!
        },
    )
    
    graph.add_edge("transform_query", "retrieve")
    graph.add_edge("mcp_agent", "generate")  # ← MCP enriches question for generation
    
    # Final quality check
    graph.add_conditional_edges(
        "generate",
        Edge.grade_generation_vs_documents_and_question,
        {
            "not supported": "transform_query",
            "useful": END,
            "not useful": "transform_query",
        },
    )
    
    # Step 7: Compile the graph
    print("✅ Graph built successfully!")
    return graph.compile()

print("✅ Graph building function defined!")

✅ Graph building function defined!


## 🚀 Part 7: Run the Complete Pipeline

Now let's put everything together and run the pipeline!


In [7]:
async def main():
    """
    Main execution function that runs the complete pipeline.
    """
    print("=" * 60)
    print("🚀 LANGGRAPH WITH MCP TUTORIAL - EXECUTION")
    print("=" * 60)
    
    # Step 1: Configure MCP Server
    mcp_config = {
        "command": "./.venv/bin/python",  # Python command
        "args": ["mcp_server_metrics.py"],  # Your MCP server script
    }
    
    # Step 2: Create MCP Manager
    mcp_manager = MCPManager()
    
    try:
        # Step 3: Build the graph
        graph = await build_graph(
            mcp_manager=mcp_manager,
            mcp_config=mcp_config
        )
        
        # Step 4: Prepare initial state
        initial_state = {
            "path": "data/logs/update_password_404_user_not_found/light-oauth2-oauth2-client-1.log",
            "question": "What are the critical errors in the log file? Summarize the metrics analysis, and explain how the metrics correlate with potential issues",
            "documents": [],
            "generation": "",
            "metrics_analysis": "",
            "iterations": 0,
            "target_service": "oauth2-client",
            "metrics_csv_path": "data/metrics/light-oauth2-data-1719771248.csv"
        }
        
        print("\n📝 Initial Question:")
        print(f"   {initial_state['question']}")
        
        # Step 5: Run the graph
        print("\n🏃 Running graph pipeline...\n")
        result = await graph.ainvoke(initial_state)
        
        # Step 6: Display results
        print("\n" + "=" * 60)
        print("✅ FINAL RESULT")
        print("=" * 60)
        print(f"\n📄 Generation:\n{result['generation']}\n")
        
        print("=" * 60)
        print("🎉 Pipeline completed successfully!")
        print("=" * 60)
        
    finally:
        # Step 7: Always cleanup
        await mcp_manager.cleanup()

# Run the pipeline
await main()

🚀 LANGGRAPH WITH MCP TUTORIAL - EXECUTION
🏗️ Building LangGraph...
✅ Loaded 8 tools from MCP server:
   • get_go_memory_stats
   • get_go_gc_stats
   • get_goroutine_stats
   • get_system_memory_stats
   • get_cpu_stats
   • detect_anomalies
   • diagnose_error_correlation
   • get_metrics_summary
✅ Graph built successfully!

📝 Initial Question:
   What are the critical errors in the log file? Summarize the metrics analysis, and explain how the metrics correlate with potential issues

🏃 Running graph pipeline...





---RETRIEVE---
NVIDIA--RERANKER
CHECKING DOCUMENT RELEVANCE TO QUESTION
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
ASSESS GRADED DOCUMENTS
---DECISION: GENERATE---
🔍 --- MCP AGENT NODE ---

🔧 Tools used by agent:
   → diagnose_error_correlation
GENERATE USING LLM
GRADE GENERATED vs QUESTION
DECISION: GENERATION ADDRESSES QUESTION

✅ FINAL RESULT

📄 Generation:
Based on my analysis of the provided log files, here is a comprehensive breakdown of the critical errors and their correlation with system metrics:

### Summary
The log files reveal a critical pattern of **500 Internal Server Errors** occurring during `PUT /oauth2/client` requests. These errors are caused by `NullPointerException: Null key is not allowed!` originating from Hazelcast map operations. The errors consistently occur during client registration/update operations and result in runtime exceptions that halt request processing.

### Key Iss