In [1]:
from dotenv import load_dotenv

# Load variables from a local .env file into the process environment.
# NOTE(review): presumably this is where the Anthropic API key used by later
# cells comes from — confirm against the project's .env template.
load_dotenv()

True

In [2]:
import asyncio
import re
from typing import Any, Dict, List, Optional, Tuple

from langchain.schema import AIMessage
from langchain_anthropic import ChatAnthropic
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent


class MCPGraphEvaluator:
    """Answer questions with a Neo4j-backed MCP agent and grade the answers with an LLM.

    Typical flow: construct the evaluator, ``await initialize()``, then
    ``await evaluate_dataset(records)``.
    """

    def __init__(
        self,
        evaluation_prompt: str,
        neo4j_config: Dict[str, str],
        namespace: str = "graph",
        agent_model: str = "anthropic:claude-3-7-sonnet-latest",
        evaluation_model: str = "claude-3-5-haiku-latest",
    ):
        """
        Initialize the MCP Graph Evaluator

        Args:
            evaluation_prompt: The prompt template for evaluating answers. It is
                         filled with ``str.format`` using the keyword arguments
                         ``question``, ``reference`` and ``generated_answer``.
            neo4j_config: Dictionary containing Neo4j connection parameters
                         Should include: NEO4J_URI, NEO4J_USERNAME, NEO4J_PASSWORD, NEO4J_DATABASE
                         It is passed to the MCP server process as its ``env``.
            namespace: The namespace for the MCP server (default: "graph")
            agent_model: Model identifier handed to ``create_react_agent`` for
                         the question-answering agent.
            evaluation_model: Model name handed to ``ChatAnthropic`` for grading.
        """
        # These three are populated by initialize(); None means "not ready yet".
        self.client = None
        self.agent = None
        self.llm = None
        self.evaluation_prompt = evaluation_prompt
        self.neo4j_config = neo4j_config
        self.namespace = namespace
        self.agent_model = agent_model
        self.evaluation_model = evaluation_model

    async def initialize(self):
        """Initialize the MCP client, the tool-using agent and the grading LLM."""
        self.client = MultiServerMCPClient({
            "neo4j-graph": {
                "command": "uvx",
                "args": ["mcp-neo4j-cypher@0.2.4", "--namespace", self.namespace],
                "transport": "stdio",
                "env": self.neo4j_config
            }
        })

        # Get tools from the client
        tools = await self.client.get_tools()

        # Create the agent
        self.agent = create_react_agent(self.agent_model, tools)

        # Initialize the evaluation LLM
        self.llm = ChatAnthropic(model=self.evaluation_model)

    @staticmethod
    def _content_to_text(content: Any) -> str:
        """Flatten message content (a string or a list of content blocks) to plain text.

        Plain strings are returned unchanged. For a list, string items and
        dict items of ``type == "text"`` are concatenated in order; every other
        block (for example tool-use blocks) is ignored. Anything else yields "".
        """
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            parts = []
            for block in content:
                if isinstance(block, str):
                    parts.append(block)
                elif isinstance(block, dict) and block.get("type") == "text":
                    parts.append(str(block.get("text", "")))
            return "".join(parts)
        return ""

    async def extract_tool_calls_and_final_answer(self, input_question: str) -> Tuple[List[Dict[str, Any]], str]:
        """Run the agent on a question and collect its tool calls and final answer.

        Args:
            input_question: The user question sent to the agent.

        Returns:
            A ``(tool_calls, final_answer)`` tuple. ``tool_calls`` gathers the
            ``tool_calls`` of every AI message in order; ``final_answer`` is the
            text of the last AI message that has non-blank text and no tool
            calls, or "" when there is none.

        Raises:
            RuntimeError: If ``initialize()`` has not been awaited yet.
        """
        if self.agent is None:
            raise RuntimeError(
                "MCPGraphEvaluator is not initialized; await initialize() before evaluating."
            )

        # Get response from agent
        data = await self.agent.ainvoke({
            "messages": [{"role": "user", "content": input_question}]
        })

        tool_calls: List[Dict[str, Any]] = []
        final_answer = ""

        # Process messages to extract tool calls and final answer
        for message in data["messages"]:
            if not isinstance(message, AIMessage):
                continue
            if getattr(message, "tool_calls", None):
                # Messages that request tools are intermediate steps, never the answer.
                tool_calls.extend(message.tool_calls)
                continue
            # Content may be a list of blocks rather than a string; normalize first.
            text = self._content_to_text(message.content)
            if text.strip():
                final_answer = text

        return tool_calls, final_answer

    def extract_score_and_reasoning(self, text: str) -> Tuple[Optional[float], Optional[str]]:
        """Extract score and reasoning from XML tags in text

        Args:
            text: Raw grader output expected to contain ``<score>...</score>``
                and ``<reasoning>...</reasoning>`` (matched case-insensitively,
                across newlines).

        Returns:
            ``(score, reasoning)``. ``score`` is None when the tag is missing or
            its content is not a valid float; ``reasoning`` is None when the tag
            is missing. Both values are stripped of surrounding whitespace.
        """
        score_pattern = r'<score>(.*?)</score>'
        reasoning_pattern = r'<reasoning>(.*?)</reasoning>'

        score_match = re.search(score_pattern, text, re.IGNORECASE | re.DOTALL)
        reasoning_match = re.search(reasoning_pattern, text, re.IGNORECASE | re.DOTALL)

        score = None
        if score_match:
            try:
                score = float(score_match.group(1).strip())
            except ValueError:
                # Unparseable score: deliberately reported as None; the raw
                # grader text is still kept by evaluate_record for inspection.
                score = None

        reasoning = reasoning_match.group(1).strip() if reasoning_match else None

        return (score, reasoning)

    async def evaluate_answer(self, record: Dict[str, str]) -> str:
        """Evaluate a generated answer against the reference answer

        Args:
            record: Must contain ``question``, ``answer`` and ``generated_answer``.

        Returns:
            The grader's raw response as plain text.

        Raises:
            RuntimeError: If ``initialize()`` has not been awaited yet.
            KeyError: If ``record`` lacks one of the required keys.
        """
        if self.llm is None:
            raise RuntimeError(
                "MCPGraphEvaluator is not initialized; await initialize() before evaluating."
            )

        messages = [
            ("human", self.evaluation_prompt.format(
                question=record["question"],
                reference=record["answer"],
                generated_answer=record["generated_answer"]
            )),
        ]
        response = await self.llm.ainvoke(messages)
        # Normalize so downstream regex parsing always receives a string.
        return self._content_to_text(response.content)

    async def evaluate_record(self, record: Dict[str, str]) -> Dict[str, Any]:
        """Process a single record: generate answer and evaluate it

        The record is updated IN PLACE (and also returned) with the keys
        ``tools``, ``generated_answer``, ``evaluation_score``,
        ``evaluation_reasoning`` and ``evaluation_raw``. Pass a copy if the
        original must stay untouched.
        """
        # Generate answer using the agent
        tools, generated_answer = await self.extract_tool_calls_and_final_answer(record["question"])

        # Update record with generated data
        record['tools'] = tools
        record['generated_answer'] = generated_answer

        # Evaluate the generated answer
        evaluation_result = await self.evaluate_answer(record)
        score, reasoning = self.extract_score_and_reasoning(evaluation_result)

        # Add evaluation results to record
        record['evaluation_score'] = score
        record['evaluation_reasoning'] = reasoning
        record['evaluation_raw'] = evaluation_result

        return record

    async def evaluate_dataset(self, dataset: List[Dict[str, str]]) -> List[Dict[str, Any]]:
        """Evaluate an entire dataset

        Records are processed sequentially. The input records are never
        modified: each one is copied first. A record that fails is still
        returned, carrying an ``error`` key with the exception message plus any
        fields that were filled in before the failure.

        Args:
            dataset: Records with at least ``question`` and ``answer`` keys.

        Returns:
            One result dict per input record, in input order.
        """
        results = []
        for record in dataset:
            # Work on a copy in both the success and the failure path so the
            # caller's dataset is left exactly as it was passed in.
            working = record.copy()
            try:
                result = await self.evaluate_record(working)
            except Exception as e:
                print(f"Error processing record: {e}")
                working['error'] = str(e)
                results.append(working)
            else:
                results.append(result)
                print(f"Processed: {record['question'][:50]}...")

        return results


# Example usage
async def evaluate_dataset(prompt: str, dataset: List[Dict[str, str]], neo4j_config: Dict[str, str], namespace: str = "graph") -> List[Dict[str, Any]]:
    """Build an MCPGraphEvaluator, run it over ``dataset`` and print a report.

    Args:
        prompt: Evaluation prompt template passed to the evaluator.
        dataset: Records to evaluate; each is expected to carry ``question``
            and ``answer``.
        neo4j_config: Neo4j connection settings passed to the evaluator.
        namespace: MCP server namespace (default: "graph").

    Returns:
        The list of result records produced by the evaluator, one per input
        record. Records that failed carry an ``error`` key.
    """
    # Initialize the evaluator with the provided prompt and Neo4j config
    evaluator = MCPGraphEvaluator(prompt, neo4j_config, namespace)
    await evaluator.initialize()

    # Evaluate the dataset
    results = await evaluator.evaluate_dataset(dataset)

    # Print results. Every field is read with .get so that a malformed or
    # failed record cannot raise here and throw away the finished results.
    for result in results:
        print(f"\nQuestion: {result.get('question', 'N/A')}")
        print(f"Reference Answer: {result.get('answer', 'N/A')}")
        print(f"Generated Answer: {result.get('generated_answer', 'N/A')}")
        print(f"Evaluation Score: {result.get('evaluation_score', 'N/A')}")
        print(f"Evaluation Reasoning: {result.get('evaluation_reasoning', 'N/A')}")
        if "error" in result:
            # Surface the failure reason instead of only showing N/A fields.
            print(f"Error: {result['error']}")
        print("-" * 80)

    return results

In [3]:
# Connection settings; they are forwarded to the MCP server process as its
# environment variables.
# NOTE(review): these look like the shared credentials of a public demo
# instance — confirm, and read real credentials from the environment instead
# of committing them to the notebook.
neo4j_config = dict(
    NEO4J_URI="neo4j+s://demo.neo4jlabs.com",
    NEO4J_USERNAME="recommendations",
    NEO4J_PASSWORD="recommendations",
    NEO4J_DATABASE="recommendations",
)

# Grading prompt template. The {question}, {reference} and {generated_answer}
# placeholders are filled per record; the grader's reply is expected to contain
# <reasoning>...</reasoning> and <score>...</score> tags.
prompt = """You are an answer evaluation system. Compare the generated answer against the real answer and output only a single decimal score between 0 and 1.

Scoring criteria:
- 1.0: Generated answer is completely accurate and comprehensive
- 0.8-0.9: Mostly accurate with minor omissions or slight inaccuracies
- 0.6-0.7: Generally accurate but missing important details or contains some errors
- 0.4-0.5: Partially accurate with significant gaps or notable errors
- 0.2-0.3: Largely inaccurate with only some correct elements
- 0.0-0.1: Completely inaccurate or irrelevant

Consider both factual accuracy and completeness. Penalize hallucinations, contradictions, and missing key information.

Input format:
Question: {question}
Real answer: {reference}
Generated answer: {generated_answer}

Output format:
<reasoning>...</reasoning>
<score>0.4</score>
"""

# Minimal example dataset: one question paired with its reference answer.
dataset = [
    dict(
        question="How many movies in the graph?",
        answer="There are 9,125 movies in the graph database.",
    ),
]

In [4]:
# Run the whole pipeline (agent answer + LLM grading) over the example dataset.
# The top-level `await` relies on the notebook's support for awaiting in cells.
results = await evaluate_dataset(prompt, dataset, neo4j_config)

Processed: How many movies in the graph?...

Question: How many movies in the graph?
Reference Answer: There are 9,125 movies in the graph database.
Generated Answer: There are 9,125 movies in the graph database.
Evaluation Score: 1.0
Evaluation Reasoning: In this case, the generated answer is exactly the same as the real answer, matching perfectly in both content and phrasing. The factual information is 100% accurate and complete.
--------------------------------------------------------------------------------


In [5]:
# Show the raw result records, including the recorded tool calls and the
# grader's unparsed reply.
results

[{'question': 'How many movies in the graph?',
  'answer': 'There are 9,125 movies in the graph database.',
  'tools': [{'name': 'graph-read_neo4j_cypher',
    'args': {'query': 'MATCH (m:Movie) RETURN COUNT(m) AS movieCount'},
    'id': 'toolu_01LKXrKk9Ti6MsNJrHZf5vVY',
    'type': 'tool_call'}],
  'generated_answer': 'There are 9,125 movies in the graph database.',
  'evaluation_score': 1.0,
  'evaluation_reasoning': 'In this case, the generated answer is exactly the same as the real answer, matching perfectly in both content and phrasing. The factual information is 100% accurate and complete.',
  'evaluation_raw': '<reasoning>In this case, the generated answer is exactly the same as the real answer, matching perfectly in both content and phrasing. The factual information is 100% accurate and complete.</reasoning>\n<score>1.0</score>'}]