In [2]:
from pprint import pprint
from langchain_ollama import ChatOllama
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langgraph_supervisor import create_supervisor
from langgraph.checkpoint.memory import InMemorySaver
from dotenv import load_dotenv
import os
import asyncio
import nest_asyncio


load_dotenv("Learning-Research-Development/Agentic-AI/.env")
API=os.getenv('OPENAI_API_KEY')


model = ChatOllama(model="llama3.2")
# model = ChatOpenAI(api_key=API)
checkpointer = InMemorySaver()


async def db_main(db_app: str, db_name: str, db_host: str, db_password: str, db_user: str, user_query: str):
    """
    Main function to interact with a database server using a MultiServerMCPClient.

    Args:
        db_app (str): The database application or type (e.g., PostgreSQL, MySQL).
        db_name (str): The name of the database to connect to.
        db_host (str): The host address of the database server.
        db_password (str): The password for authenticating with the database.
        db_user (str): The username for authenticating with the database.
        user_query (str): The user query or input to process using the database server.

    Returns:
        str: The response from the database server after processing the query.
    """
    from langchain_mcp_adapters.client import MultiServerMCPClient
    client =  MultiServerMCPClient(
            {
                "postgresql": {
                    "command": "npx",
                    "args": [
                        "-y",
                        "@executeautomation/database-server",
                        "--postgresql", 
                        "--host",
                        db_host,
                        "--database",
                        db_name,
                        "--user",
                        db_user,
                        "--password",
                        db_password
                    ],
                    "transport":"stdio"
                }
            }
        ) 
    tools = await client.get_tools()
    agent = create_react_agent(
        model,
        tools,
        prompt="You are an AI assistant that strictly responds only based on the available tools. If the input requires information or actions beyond the capabilities of the tools, ask the user for the necessary details until you have complete input to proceed.",
        checkpointer=checkpointer
    )

    response = await agent.ainvoke(
        {"messages": [{"role": "user", "content": user_query}]},
        config={"configurable": {"thread_id": "1"}}
    )
    return response['messages'][-1].content


def check_db_connectivity(db_app: str, db_name: str, db_host: str, db_password: str, db_user: str) -> str:
    """
    Check the connectivity to a specified database.

    Args:
        db_app (str): The type of database application (e.g., PostgreSQL, MySQL).
        db_name (str): The name of the database to connect to.
        db_host (str): The host address of the database server.
        db_password (str): The password for authenticating with the database.
        db_user (str): The username for authenticating with the database.

    Returns:
        str: A message indicating whether the connection to the database was successful or failed.
    """
    try:
        user_query = "Check the database connectivity"
        nest_asyncio.apply()
        resp = asyncio.run(db_main(db_app, db_name, db_host, db_password, db_user, user_query))
        # print("Database connectivity response:", resp)
        if resp and "success" in resp.lower():  # Assuming the response contains "success" for a successful connection
            return f"Connection established successfully to the database: {db_name}."
        else:
            return "Unable to establish database connectivity. Please verify your credentials and try again."
    except Exception as e:
        return f"An error occurred while checking database connectivity: {str(e)}"


def perform_db_operation(db_app: str, db_name: str, db_host: str, db_password: str, db_user: str, user_query: str) -> str:
    """
    Perform a database operation based on the user's query.

    Args:
        db_app (str): The type of database application (e.g., PostgreSQL, MySQL).
        db_name (str): The name of the database to connect to.
        db_host (str): The host address of the database server.
        db_password (str): The password for authenticating with the database.
        db_user (str): The username for authenticating with the database.
        user_query (str): The query or operation to perform on the database.

    Returns:
        str: The result of the database operation or an error message if the operation fails.
    """
    try:
        nest_asyncio.apply()
        resp = asyncio.run(db_main(db_app, db_name, db_host, db_password, db_user, user_query))
        # print("Database operation response:", resp)
        return resp if resp else "The database operation did not return any result. Please check your query or database configuration."
    except Exception as e:
        return f"An error occurred while performing the database operation: {str(e)}"

def extraction(file_path:str):
    """
    Extract file data.

    Args:
        file_path (str): Local file path to perform extraction.

    Returns:
        str: Extracted information.
    """
    return f"extraction from location '{file_path}' successful!"

def classification(file_path:str):
    """
    classify file data.

    Args:
        file_path (str): Local file path to perform classification.

    Returns:
        str: Extracted information.
    """
    return f"classification from location '{file_path}' successful!"

async def MCP_servers(query:str):
    """
    This function interacts with multiple servers to perform tasks such as mathematical computations
    and weather information retrieval. It uses a MultiServerMCPClient to manage communication with
    the servers.

    Args:
        query (str): The user query or input to process using the MCP servers.

    Returns:
        str: The response from the MCP servers after processing the query.
    """
    from langchain_mcp_adapters.client import MultiServerMCPClient
    async with MultiServerMCPClient(
            {
                "math": {
                    "command": "python",
                    # Replace with absolute path to your math_server.py file
                    "args": ["/Users/ajeet/Data/Development/Learning-Research-Development/Agentic-AI/LangGraph/MCP_Integration/maths_server.py"],
                    "transport": "stdio",
                },
                "weather": {
                    # Ensure your start your weather server on port 8000
                    "url": "http://localhost:8000/sse",
                    "transport": "sse",
                }
            }
        ) as client:
            agent = create_react_agent(
                model,
                client.get_tools(),
                prompt="You are an AI assistant that strictly responds only based on the available tools. If the input requires information or actions beyond the capabilities of the tools, ask the user for the necessary details until you have complete input to proceed.",
                checkpointer=checkpointer
            )
            response = await agent.ainvoke(
                    {"messages": [{"role": "user", "content": query}]},
                    config={"configurable": {"thread_id": "1"}}

                )
            return response['messages'][-1].content


database_agent = create_react_agent(
    model=model,
    tools=[check_db_connectivity, perform_db_operation],
    prompt="""You are a database assistant. Your job is to assist users with database connectivity and operations. 
        - Always ask for required details (e.g., credentials, database type, etc.) before performing any operation.
        - If credentials are incorrect, inform the user and ask for updated credentials.
        - Once connected, ask for the operation details (e.g., data to store, table name, etc.).
        - Respond only based on tool outputs and do not make assumptions.
        - Ensure the user provides all necessary information before proceeding.""",
    name="database_agent",
)

service_agent = create_react_agent(
    model=model,
    tools=[extraction, classification],
    prompt="""You are a service assistant. Your job is to assist users with file-related operations such as extraction and classification. 
        - Always ask for required details (e.g., file path, type of operation, etc.) before performing any task.
        - Ensure the user provides all necessary information before proceeding with the operation.
        - Respond only based on tool outputs and do not make assumptions.
        - Provide clear and concise feedback about the success or failure of the operation.""",
    name="service_agent",
)

MCP_agent = create_react_agent(
    model=model,
    tools=[MCP_servers],
    prompt="""You are an MCP assistant. Your job is to assist users with multi-server operations such as mathematical computations and weather information retrieval. 
        - Always ask for required details (e.g., type of operation, specific inputs, etc.) before performing any task.
        - Ensure the user provides all necessary information before proceeding with the operation.
        - Respond only based on tool outputs and do not make assumptions.
        - Provide clear and concise feedback about the success or failure of the operation.
        - If the input requires information or actions beyond the capabilities of the tools, ask the user for the necessary details until you have complete input to proceed.""",
    name="MCP_agent",
)

supervisor_system_prompt = """
You are a Supervisor Agent responsible for orchestrating and coordinating tasks between the user and various specialized agents/tools. Follow these steps meticulously:

1. Understand and Clarify User Query:
Begin by carefully interpreting the user's instruction or query. If any critical information or context is missing, ask follow-up questions to ensure you have complete clarity before proceeding.

2. Check Tool Availability and Status:
Coordinate with the available agents to check the list of tools and their current statuses. Ensure that the required tools for the task are online, functional, and accessible.

3. Gather Required Parameters for Tools:
Once the target tool is identified, consult the respective agent to determine the required input parameters for the operation. Then, collect these parameters either from the user or from available data.

4. Trigger the Operation Using the Respective Agent:
After gathering all necessary inputs and confirming tool readiness, initiate the operation by directing the appropriate agent to execute the task. Monitor the result and confirm its success or handle errors if they occur.

5. Loop for Next Instruction:
Once the task is completed and results are delivered to the user, ask the user if they would like to perform another operation or have any further instructions.

Important Rules:
- Always ensure complete information is collected before initiating any task.
- Maintain a clear and logical flow of communication between the user, supervisor agent (yourself), and specialized agents.
- Handle missing tool statuses or unavailable tools gracefully by notifying the user and suggesting alternatives if applicable.
- Your goal is to provide a seamless, efficient, and accurate task orchestration experience.
"""

supervisor = create_supervisor(
    agents=[database_agent, service_agent],
    model=model,
    prompt=(supervisor_system_prompt),
).compile(checkpointer=checkpointer)

In [6]:
import nest_asyncio
import asyncio

nest_asyncio.apply()

def call_db():
    result = asyncio.run(db_main(
        db_app="postgresql",
        db_name="postgres",
        db_host="localhost",
        db_password="root",
        db_user="postgres",
        user_query="Give me list of tables"
    ))
    return result
call_db()

'Here is the list of tables:\n\n1. employees'

# database agent

In [7]:
from pprint import pprint
from langchain_ollama import ChatOllama
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langgraph_supervisor import create_supervisor
from langgraph.checkpoint.memory import InMemorySaver
from dotenv import load_dotenv
import os
import asyncio
import nest_asyncio


load_dotenv("Learning-Research-Development/Agentic-AI/.env")
API=os.getenv('OPENAI_API_KEY')


model = ChatOllama(model="llama3.2")
# model = ChatOpenAI(api_key=API)
checkpointer = InMemorySaver()


async def postgres_db(db_name: str, db_host: str, db_password: str, db_user: str, user_query: str):
    """
    postgresql database function to interact with a postgresql database server.

    Args:
        db_name (str): The name of the database to connect to.
        db_host (str): The host address of the database server.
        db_password (str): The password for authenticating with the database.
        db_user (str): The username for authenticating with the database.
        user_query (str): The user query or input to process using the database server.

    Returns:
        str: The response from the database server after processing the query.
    """
    from langchain_mcp_adapters.client import MultiServerMCPClient
    client =  MultiServerMCPClient(
            {
                "postgresql": {
                    "command": "npx",
                    "args": [
                        "-y",
                        "@executeautomation/database-server",
                        "--postgresql", 
                        "--host",
                        db_host,
                        "--database",
                        db_name,
                        "--user",
                        db_user,
                        "--password",
                        db_password
                    ],
                    "transport":"stdio"
                }
            }
        ) 
    tools = await client.get_tools()
    agent = create_react_agent(
        model,
        tools,
        prompt="You are an AI assistant that strictly responds only based on the available tools. If the input requires information or actions beyond the capabilities of the tools, ask the user for the necessary details until you have complete input to proceed.",
        checkpointer=checkpointer
    )

    response = await agent.ainvoke(
        {"messages": [{"role": "user", "content": user_query}]},
        config={"configurable": {"thread_id": "1"}}
    )
    return response['messages'][-1].content

async def mysql_db(db_name: str, db_host: str, db_password: str, db_user: str, user_query: str):
    """
    mysql database function to interact with a mysql database server.

    Args:
        db_name (str): The name of the database to connect to.
        db_host (str): The host address of the database server.
        db_password (str): The password for authenticating with the database.
        db_user (str): The username for authenticating with the database.
        user_query (str): The user query or input to process using the database server.

    Returns:
        str: The response from the database server after processing the query.
    """
    from langchain_mcp_adapters.client import MultiServerMCPClient
    client =  MultiServerMCPClient(
            {
                "mysql": {
                    "command": "npx",
                    "args": [
                        "-y",
                        "@executeautomation/database-server",
                        "--mysql", 
                        "--host",
                        db_host,
                        "--database",
                        "--port", 
                        "3306",
                        db_name,
                        "--user",
                        db_user,
                        "--password",
                        db_password
                    ],
                    "transport":"stdio"
                }
            }
        ) 
    tools = await client.get_tools()
    agent = create_react_agent(
        model,
        tools,
        prompt="You are an AI assistant that strictly responds only based on the available tools. If the input requires information or actions beyond the capabilities of the tools, ask the user for the necessary details until you have complete input to proceed.",
        checkpointer=checkpointer
    )

    response = await agent.ainvoke(
        {"messages": [{"role": "user", "content": user_query}]},
        config={"configurable": {"thread_id": "1"}}
    )
    return response['messages'][-1].content



database_agent = create_react_agent(
    model=model,
    tools=[postgres_db, mysql_db],
    prompt="""You are a database assistant. Your job is to assist users with database connectivity and operations. 
        - Always ask for required details (e.g., credentials, database type, etc.) before performing any operation.
        - If credentials are incorrect, inform the user and ask for updated credentials.
        - Once connected, ask for the operation details (e.g., data to store, table name, etc.).
        - Respond only based on tool outputs and do not make assumptions.
        - Ensure the user provides all necessary information before proceeding.""",
    name="database_agent",
)

service_agent = create_react_agent(
    model=model,
    tools=[extraction, classification],
    prompt="""You are a service assistant. Your job is to assist users with file-related operations such as extraction and classification. 
        - Always ask for required details (e.g., file path, type of operation, etc.) before performing any task.
        - Ensure the user provides all necessary information before proceeding with the operation.
        - Respond only based on tool outputs and do not make assumptions.
        - Provide clear and concise feedback about the success or failure of the operation.""",
    name="service_agent",
)

MCP_agent = create_react_agent(
    model=model,
    tools=[MCP_servers],
    prompt="""You are an MCP assistant. Your job is to assist users with multi-server operations such as mathematical computations and weather information retrieval. 
        - Always ask for required details (e.g., type of operation, specific inputs, etc.) before performing any task.
        - Ensure the user provides all necessary information before proceeding with the operation.
        - Respond only based on tool outputs and do not make assumptions.
        - Provide clear and concise feedback about the success or failure of the operation.
        - If the input requires information or actions beyond the capabilities of the tools, ask the user for the necessary details until you have complete input to proceed.""",
    name="MCP_agent",
)

supervisor_system_prompt = """
You are a Supervisor Agent responsible for orchestrating and coordinating tasks between the user and various specialized agents/tools. Follow these steps meticulously:

1. Understand and Clarify User Query:
Begin by carefully interpreting the user's instruction or query. If any critical information or context is missing, ask follow-up questions to ensure you have complete clarity before proceeding.

2. Check Tool Availability and Status:
Coordinate with the available agents to check the list of tools and their current statuses. Ensure that the required tools for the task are online, functional, and accessible.

3. Gather Required Parameters for Tools:
Once the target tool is identified, consult the respective agent to determine the required input parameters for the operation. Then, collect these parameters either from the user or from available data.

4. Trigger the Operation Using the Respective Agent:
After gathering all necessary inputs and confirming tool readiness, initiate the operation by directing the appropriate agent to execute the task. Monitor the result and confirm its success or handle errors if they occur.

5. Loop for Next Instruction:
Once the task is completed and results are delivered to the user, ask the user if they would like to perform another operation or have any further instructions.

Important Rules:
- Always ensure complete information is collected before initiating any task.
- Maintain a clear and logical flow of communication between the user, supervisor agent (yourself), and specialized agents.
- Handle missing tool statuses or unavailable tools gracefully by notifying the user and suggesting alternatives if applicable.
- Your goal is to provide a seamless, efficient, and accurate task orchestration experience.
"""

supervisor = create_supervisor(
    agents=[database_agent],
    model=model,
    prompt=(supervisor_system_prompt),
).compile(checkpointer=checkpointer)

In [8]:
import nest_asyncio
import asyncio

nest_asyncio.apply()

def call_db():
    result = asyncio.run(postgres_db(
        db_name="postgres",
        db_host="localhost",
        db_password="root",
        db_user="postgres",
        user_query="Give me list of tables"
    ))
    return result
call_db()

'The list of available tables is:\n\n* employees'