# 想帶起 A2A agent with mcp client + LLM
env: M2504

In [None]:
# MCP client with LLM

# main_client.py
import asyncio
import json
import os
import re
import traceback

import google.generativeai as genai
from fastmcp.client import Client # Removed ClientSession import
# from google.generativeai.types import Part

# --- Gemini configuration ---
# The API key is read from the environment so the secret never lives in the
# source file or in exported notebook output. Set GEMINI_API_KEY before
# running this cell/script.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    print("Warning: GEMINI_API_KEY is not set; Gemini requests will fail until it is configured.")
genai.configure(api_key=GEMINI_API_KEY or None)
gemini_model = genai.GenerativeModel('gemini-pro')

# --- MCP Client 實例 ---
class GeminiEnhancedMCPClient:
    """Bridge between an MCP server (reached through fastmcp) and a Gemini model.

    The client discovers the tools exposed by one MCP server, converts them
    into Gemini function declarations, lets Gemini decide whether a user query
    needs a tool, and forwards the suggested tool call to the MCP server.

    Attributes:
        mcp_command: Executable used to start the MCP server.
        mcp_args: Argument list passed to ``mcp_command``.
        mcp_transport_type: Transport label stored in the server config.
        server_name: Key under which the server is registered in the config.
        client_config: The ``{"mcpServers": {...}}`` dict handed to ``Client``.
        mcp_client_instance: The fastmcp ``Client`` built from ``client_config``.
        mcp_tools_cache: Last result of ``connect_and_fetch_tools`` or ``None``.
    """

    def __init__(self, mcp_command, mcp_args, mcp_transport_type="stdio"):
        """Build the fastmcp client configuration; no connection is opened here.

        Args:
            mcp_command: Executable used to start the MCP server.
            mcp_args: Arguments for ``mcp_command``.
            mcp_transport_type: Value stored under ``"transportType"``.
        """
        self.mcp_command = mcp_command
        self.mcp_args = mcp_args
        self.mcp_transport_type = mcp_transport_type

        # mcpServers maps a server name to that server's launch configuration.
        self.server_name = "postgres-mcp-server"
        self.client_config = {
            "mcpServers": {
                self.server_name: {
                    "command": self.mcp_command,
                    "args": self.mcp_args,
                    "transportType": self.mcp_transport_type,
                }
            }
        }
        self.mcp_client_instance = Client(self.client_config)

        # The logging level is NOT set here: the previous un-awaited call in
        # the constructor never executed. It is now awaited inside an active
        # session (see _enable_debug_logging).

        # The config may embed a connection string; mask its password before
        # it reaches stdout / saved notebook output.
        printable_config = self._redact_credentials(json.dumps(self.client_config))
        print(f"Initializing MCP client with transport config: {printable_config}")
        self.mcp_tools_cache = None

    @staticmethod
    def _redact_credentials(text):
        """Mask the password of any ``scheme://user:password@host`` URL in *text*."""
        return re.sub(r"(://[^:/@\s]+):[^@/\s\"']+@", r"\1:***@", text)

    @staticmethod
    def _extract_tools_list(raw_tools_response):
        """Normalise a tool-listing response to a plain list, or ``None``.

        Accepted shapes: a list/tuple of tools, a dict that is either a full
        JSON-RPC response (``{"result": ...}``) or the result part itself
        (``{"tools": [...]}`` or a list under ``"result"``), or any object
        exposing a ``tools`` list attribute.
        """
        if isinstance(raw_tools_response, (list, tuple)):
            return list(raw_tools_response)
        if isinstance(raw_tools_response, dict):
            payload = raw_tools_response.get("result", raw_tools_response)
            if isinstance(payload, dict):
                payload = payload.get("tools")
            return list(payload) if isinstance(payload, (list, tuple)) else None
        tools_attr = getattr(raw_tools_response, "tools", None)
        if isinstance(tools_attr, (list, tuple)):
            return list(tools_attr)
        return None

    @staticmethod
    def _tool_fields(tool_info):
        """Return ``(name, description, input_schema)`` or ``None`` if unrecognised.

        Dicts are read by key; other objects are read by attribute, which
        covers tool objects that expose ``name`` / ``description`` /
        ``inputSchema`` (the shape fastmcp's ``list_tools`` is expected to
        return — see the fastmcp client reference).
        """
        if isinstance(tool_info, dict):
            return (
                tool_info.get("name"),
                tool_info.get("description"),
                tool_info.get("inputSchema"),
            )
        if hasattr(tool_info, "name") and hasattr(tool_info, "inputSchema"):
            return (
                tool_info.name,
                getattr(tool_info, "description", None),
                tool_info.inputSchema,
            )
        return None

    def _has_cached_tools(self):
        """Return True when the cache holds at least one function declaration."""
        return bool(
            self.mcp_tools_cache
            and self.mcp_tools_cache[0].get("function_declarations")
        )

    async def _enable_debug_logging(self, session):
        """Best-effort request for debug-level server logging on *session*.

        Failures (for example a server without logging support) are reported
        and otherwise ignored so that tool discovery can continue.
        """
        try:
            await session.set_logging_level("debug")
            print("fastmcp client logging level set to DEBUG.")
        except Exception as e:
            print(f"Could not set fastmcp client logging level: {e}")

    async def connect_and_fetch_tools(self):
        """Open a session, list the server's tools and cache them for Gemini.

        Returns:
            A one-element list ``[{"function_declarations": [...]}]``; the
            same object is stored in ``self.mcp_tools_cache``.

        Raises:
            Exception: Whatever the fastmcp client raises while connecting or
                listing tools; listing errors are logged and re-raised.
        """
        async with self.mcp_client_instance as session: # session is the active Client instance
            print(f"MCP client session active. Discovering tools for server '{self.server_name}' using 'tools/list'...")
            await self._enable_debug_logging(session)

            try:
                # "tools/list" is a protocol method, not a tool: calling it via
                # call_tool() fails with "Unknown tool: tools/list". The client
                # exposes it as list_tools().
                raw_tools_response = await session.list_tools()
                print(f"Raw response from MCP server '{self.server_name}' for 'tools/list': {raw_tools_response}")

                actual_tools_list = self._extract_tools_list(raw_tools_response)
                if actual_tools_list is None:
                    print(f"Warning: Could not extract tool list. Structure not as expected or 'tools' key missing. Response: {raw_tools_response}")
                    actual_tools_list = [] # Proceed with an empty list to prevent downstream errors

                self.mcp_tools_cache = self._format_tools_for_gemini(actual_tools_list)
                return self.mcp_tools_cache
            except Exception as e:
                print(f"Error during 'tools/list' call for server '{self.server_name}': {e}")
                # Re-raise so the caller's error handling sees the failure.
                raise

    def _format_tools_for_gemini(self, raw_tools_list):
        """Convert MCP tool descriptions into Gemini function declarations.

        Args:
            raw_tools_list: List of tools, each either a dict with ``name``,
                ``description`` and ``inputSchema`` keys or an object with
                attributes of the same names. Anything else is skipped with
                a warning.

        Returns:
            ``[{"function_declarations": [...]}]`` where each declaration has
            ``name``, ``description`` and ``parameters`` (the tool's
            ``inputSchema``). The inner list is empty when nothing usable
            was supplied.
        """
        gemini_tools = []
        if not isinstance(raw_tools_list, list):
            print(f"Warning: Expected a list of tools from the server, but got {type(raw_tools_list)}. No tools will be formatted.")
            return [{"function_declarations": gemini_tools}]

        for tool_info in raw_tools_list:
            fields = self._tool_fields(tool_info)
            if fields is None:
                print(f"Warning: Expected a dictionary for tool_info, but got {type(tool_info)}. Skipping this tool.")
                continue

            name, description, input_schema = fields
            if name and description and input_schema is not None:
                gemini_tools.append({
                    "name": name,
                    "description": description,
                    # Gemini's "parameters" takes the server's "inputSchema".
                    "parameters": input_schema,
                })
            else:
                print(f"Warning: Skipping tool due to missing fields (name, description, or inputSchema): {tool_info}")
        return [{"function_declarations": gemini_tools}]

    async def process_user_request(self, user_query):
        """Answer *user_query* with Gemini, invoking an MCP tool if suggested.

        Tools are fetched first when the cache is empty.

        Args:
            user_query: Natural-language request from the user.

        Returns:
            The MCP tool result when Gemini suggests a function call, Gemini's
            text otherwise, or an explanatory string when tools could not be
            fetched or Gemini produced no text.

        Raises:
            Exception: Errors from the Gemini call or the tool invocation are
                logged and re-raised.
        """
        if not self._has_cached_tools():
            print("Tools not fetched or no tools available. Attempting to fetch now...")
            try:
                await self.connect_and_fetch_tools()
            except Exception as e:
                print(f"Error fetching tools during request processing: {e}")
                return f"Sorry, I encountered an error trying to fetch tools: {e}"
            if not self._has_cached_tools():
                return "Sorry, I couldn't retrieve any tools from the server after attempting to connect."

        print(f"\nProcessing user query: '{user_query}' with Gemini and MCP tools...")

        try:
            # generate_content is synchronous; run it in a worker thread so the
            # event loop is not blocked while waiting for Gemini.
            response = await asyncio.to_thread(
                gemini_model.generate_content,
                f"User query: {user_query}",
                tools=self.mcp_tools_cache,
            )

            # Look at every part of the first candidate, not only the first
            # one, for a function call suggestion.
            function_call = None
            if response.candidates:
                parts = response.candidates[0].content.parts
                function_call = next(
                    (part.function_call for part in parts if part.function_call),
                    None,
                )

            if function_call:
                tool_name = function_call.name
                tool_args = dict(function_call.args.items())

                print(f"Gemini suggests calling tool: {tool_name} with args: {tool_args}")
                print(f"MCP client session active. Invoking tool '{tool_name}' on server '{self.server_name}'...")

                async with self.mcp_client_instance as session_for_invoke:
                    tool_result = await session_for_invoke.call_tool(tool_name, tool_args)

                print(f"Received result from MCP server for tool '{tool_name}': {tool_result}")
                return tool_result

            print("Gemini responded directly without tool usage.")
            try:
                return response.text
            except ValueError: # response.text is unavailable, e.g. for a blocked response
                print("Warning: Gemini response did not contain a direct text part (e.g. safety block).")
                if response.prompt_feedback:
                    print(f"Prompt feedback: {response.prompt_feedback}")
                return "Gemini did not provide a text response."

        except Exception as e:
            print(f"Error during Gemini processing or MCP interaction: {e}")
            # Re-raise so the caller's error handling sees the failure.
            raise

# --- 使用範例 ---
async def main(): # Wrapped in async main
    """Run the demo: discover MCP tools, then send two example queries.

    The PostgreSQL connection string is taken from the ``DATABASE_URL``
    environment variable (``postgresql://user:password@host:port/dbname``) so
    that credentials are not stored in the source. When it is missing the
    function prints a hint and returns without starting anything.

    Errors during discovery or query processing are printed together with a
    traceback; they are not propagated to the caller.
    """
    database_url = os.environ.get("DATABASE_URL")
    if not database_url:
        print("DATABASE_URL is not set. Export the PostgreSQL connection string "
              "(postgresql://user:password@host:port/dbname) before running.")
        return

    mcp_server_config = {
        "command": "npx",
        "args": [
            "-y",
            "@modelcontextprotocol/server-postgres",
            database_url,
        ],
        "transportType": "stdio",
    }

    client = GeminiEnhancedMCPClient(
        mcp_command=mcp_server_config["command"],
        mcp_args=mcp_server_config["args"],
        mcp_transport_type=mcp_server_config["transportType"],
    )

    available_tools_formatted = []
    actual_tools_list = []
    try:
        available_tools_formatted = await client.connect_and_fetch_tools()
        print(f"\nAvailable tools (formatted for Gemini by the client): {json.dumps(available_tools_formatted, indent=2, ensure_ascii=False)}")

        if available_tools_formatted and available_tools_formatted[0].get("function_declarations"):
            actual_tools_list = available_tools_formatted[0]["function_declarations"]
            print("\n--- List of Discovered Tools ---")
            for i, tool in enumerate(actual_tools_list, start=1):
                print(f"{i}. Name: {tool.get('name')}")
                print(f"   Description: {tool.get('description')}")
                print(f"   Parameters: {json.dumps(tool.get('parameters'), indent=2, ensure_ascii=False)}")
            print("-------------------------------")
        else:
            print("\nNo tools were discovered or formatted correctly (function_declarations missing or empty).")

    except ConnectionError as conn_err:
        # ConnectionError already covers the refused/reset/aborted subclasses.
        print(f"A connection error occurred during initial tool discovery: {conn_err}")
        print("This often indicates the MCP server process could not be reached or terminated unexpectedly.")
        traceback.print_exc()
    except RuntimeError as rt_err:
        print(f"A runtime error occurred during initial tool discovery: {rt_err}")
        message = str(rt_err)
        if any(marker in message for marker in _SERVER_STARTUP_ERROR_MARKERS):
            _print_server_startup_help(client.mcp_command, client.mcp_args)
        traceback.print_exc()
    except Exception as e:
        print(f"An unexpected error occurred during initial tool discovery: {e}")
        traceback.print_exc()

    if actual_tools_list:
        print("\nNote: Please inspect the 'List of Discovered Tools' above and craft queries that match available tools and their parameters.")
        # Example query intended to trigger a SQL-executing tool, if one exists.
        user_query_sql = "Execute SQL: SELECT version()"
        print(f"\nAttempting to process example query: '{user_query_sql}'")
        try:
            response_sql = await client.process_user_request(user_query_sql)
            print(f"\nFinal Response for '{user_query_sql}': {response_sql}")
        except Exception as e_query:
            print(f"\nError processing query '{user_query_sql}': {e_query}")
            traceback.print_exc()
    else:
        print("\nSkipping example tool-based queries as no tools were available or an error occurred during discovery.")

    user_query_direct = "Tell me a joke."
    try:
        response_direct = await client.process_user_request(user_query_direct)
        print(f"\nFinal Response for '{user_query_direct}': {response_direct}")
    except Exception as e_direct_query:
        print(f"\nError processing direct query '{user_query_direct}': {e_direct_query}")
        traceback.print_exc()


# RuntimeError messages that point at the MCP server process failing to start
# or losing its connection.
_SERVER_STARTUP_ERROR_MARKERS = (
    "Failed to initialize server session",
    "Server process exited",
    "Server process failed to start",
    "Connection to server process lost",
)


def _redact_dsn(text):
    """Mask the password of any ``scheme://user:password@host`` URL in *text*."""
    return re.sub(r"(://[^:/@\s]+):[^@/\s\"']+@", r"\1:***@", text)


def _print_server_startup_help(command, args):
    """Print troubleshooting hints for a server that failed to start or connect.

    The command line is shown with any connection-string password masked so
    that credentials do not end up in logs or saved notebook output.
    """
    safe_args = _redact_dsn(' '.join(args))
    print("This error often indicates a problem with the MCP server process itself or its ability to start/maintain connection.")
    print("The client attempted to start/connect to the following command:")
    print(f"  Command: {command}")
    print(f"  Arguments: {safe_args}")
    print("\nPossible causes include:")
    print("1. Invalid PostgreSQL Connection String: The string used to connect to your PostgreSQL server might be incorrect (e.g., wrong credentials, host/port, database name), or the PostgreSQL server might be down or inaccessible.")
    print("2. `npx` or MCP Server Package Issues: Problems with `npx` or the `@modelcontextprotocol/server-postgres` package (e.g., not installed correctly, internal script errors, missing dependencies for the script).")
    print("3. Network Configuration: Firewalls or other network issues preventing the MCP server script from reaching the PostgreSQL database or the client from reaching the server.")
    print("4. Environment Issues: The environment where the notebook kernel is running might be missing necessary PATH configurations for `npx` or Node.js, or there might be permission issues.")
    print("\nDebugging Steps:")
    print("A. Verify the PostgreSQL connection string and ensure the database is accessible with the provided credentials (e.g., using a separate SQL client).")
    print("B. Try running the MCP server command directly in your system's terminal (outside of the notebook environment) to see if it outputs any specific errors. This helps isolate if the issue is with the command itself or the notebook's execution environment:")
    print(f"   {command} {safe_args}")
    print("C. Check if `npx` and `node` are correctly installed and accessible from the environment where your Jupyter notebook kernel is running. You can try running `!npx --version` or `!node --version` in a new notebook cell.")
    print("D. Examine the full traceback printed below (if any) for more detailed clues from the `fastmcp` library or the subprocess management.")

def _log_background_main_result(task):
    """Report how a background ``main()`` task finished (done-callback)."""
    if task.cancelled():
        print("main() task was cancelled before completion.")
        return
    error = task.exception()
    if error is None:
        print("main() completed on the already-running event loop.")
        return
    print(f"An error occurred at the outermost execution level of main(): {error}")
    traceback.print_exception(type(error), error, error.__traceback__)


def _run_main():
    """Run ``main()`` in whichever way the current environment allows.

    Without a running event loop (plain script) ``asyncio.run`` is used and
    the call blocks until ``main()`` finishes. When a loop is already running
    (e.g. inside a Jupyter kernel) neither ``asyncio.run`` nor
    ``run_until_complete`` can be used, so ``main()`` is scheduled as a task
    on that loop instead. Checking first avoids creating coroutine objects
    that are never awaited.

    Returns:
        ``None`` after a blocking run, or the scheduled ``asyncio.Task`` when
        running on an existing loop (``await`` it to wait for completion).
    """
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None

    if running_loop is None:
        print("Attempting to run main() using asyncio.run()...")
        asyncio.run(main())
        print("main() completed via asyncio.run().")
        return None

    print("An event loop is already running; creating a task for main() on the running loop.")
    print("Note: Output from main() might appear asynchronously or be incomplete in this mode.")
    task = running_loop.create_task(main())
    # Surface completion and errors of the background task instead of
    # dropping them silently.
    task.add_done_callback(_log_background_main_result)
    return task


if __name__ == "__main__":
    try:
        # Keep a reference so the scheduled task cannot be garbage-collected
        # before it finishes; in a notebook it can be awaited in a later cell.
        _main_task = _run_main()
    except Exception as ex:
        print(f"An error occurred at the outermost execution level of main(): {ex}")
        traceback.print_exc()

Attempting to run main() using asyncio.run()...
asyncio.run() failed: An event loop is already running.
Attempting to run main() on the existing event loop using loop.run_until_complete()...
loop.run_until_complete() also failed: This event loop is already running
As a final fallback, creating a task for main() on the running loop.
Note: Output from main() might appear asynchronously or be incomplete in this mode.
Creating task for main() as loop is running.




fastmcp client logging level set to DEBUG.
Initializing MCP client with transport config: {"mcpServers": {"postgres-mcp-server": {"command": "npx", "args": ["-y", "@modelcontextprotocol/server-postgres", "postgresql://root:***@211.73.81.235:30198/zeabur"], "transportType": "stdio"}}}


  self.mcp_client_instance.set_logging_level("DEBUG") # Attempt to set logging level


A runtime error occurred during initial tool discovery: Failed to initialize server session
This error often indicates a problem with the MCP server process itself or its ability to start/maintain connection.
The client attempted to start/connect to the following command:
  Command: npx
  Arguments: -y @modelcontextprotocol/server-postgres postgresql://root:***@211.73.81.235:30198/zeabur
\nPossible causes include:
1. Invalid PostgreSQL Connection String: The string used to connect to your PostgreSQL server might be incorrect (e.g., wrong credentials, host/port, database name), or the PostgreSQL server might be down or inaccessible.
2. `npx` or MCP Server Package Issues: Problems with `npx` or the `@modelcontextprotocol/server-postgres` package (e.g., not installed correctly, internal script errors, missing dependencies for the script).
3. Network Configuration: Firewalls or other network issues preventing the MCP server script from reaching the PostgreSQL d

Traceback (most recent call last):
  File "/var/folders/4y/6z_8fjy91fxbyxf66g1mnlxr0000gn/T/ipykernel_32717/3162466684.py", line 193, in main
    available_tools_formatted = await client.connect_and_fetch_tools()
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/folders/4y/6z_8fjy91fxbyxf66g1mnlxr0000gn/T/ipykernel_32717/3162466684.py", line 50, in connect_and_fetch_tools
    async with self.mcp_client_instance as session: # session is the active Client instance
               ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wuulong/opt/anaconda3/envs/m2504/lib/python3.12/site-packages/fastmcp/client/client.py", line 187, in __aenter__
    await stack.enter_async_context(self._context_manager())
  File "/Users/wuulong/opt/anaconda3/envs/m2504/lib/python3.12/contextlib.py", line 659, in enter_async_context
    result = await _enter(cm)
             ^^^^^^^^^^^^^^^^
  File "/Users/wuulong/opt/anaconda3/envs/m2504/lib/python3.12/contextlib.py", line 210, in __aent

MCP client session active. Discovering tools for server 'postgres-mcp-server' using 'tools/list'...
Error during 'tools/list' call for server 'postgres-mcp-server': Unknown tool: tools/list
Error fetching tools during request processing: Unknown tool: tools/list
\nFinal Response for 'Tell me a joke.': Sorry, I encountered an error trying to fetch tools: Unknown tool: tools/list
