# In [None]:
# Import required libraries
from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate
from langgraph.prebuilt import create_react_agent
from typing import TypedDict, List, Dict
import logging

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Define a custom state to track execution logs
class GraphState(TypedDict):
    """State dictionary handed from node to node in the workflow graph."""
    # Conversation so far; the nodes read entries as {"role": ..., "content": ...} dicts.
    messages: List[Dict[str, str]]
    # Agent name written by the supervisor node and read by route_to_agent.
    agent: str
    # Tool information recorded by the agent wrapper after the agent has run.
    tool_used: str
    # Log lines appended by the supervisor and agent nodes, in execution order.
    execution_log: List[str]

# Groq model initialization
# SECURITY: the API key is read from the environment so that no secret lives in
# source control. Any key that was previously committed here should be revoked.
import os

GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
if not GROQ_API_KEY:
    raise RuntimeError(
        "GROQ_API_KEY environment variable is not set; export it before running."
    )
# NOTE(review): confirm this model id is still served by Groq before relying on it.
model = ChatGroq(
    api_key=GROQ_API_KEY,
    model="llama-3.2-3b-preview",
)

# Step 1: Define the Tools for Each Agent with Proper Docstrings
# Tools for AD Agent
def assign_user_to_sg(user: str, sg: str) -> str:
    """Add a user to a security group in Active Directory.

    Args:
        user (str): Username of the account to add.
        sg (str): Name of the target security group.

    Returns:
        str: Message confirming the assignment.
    """
    # Only the confirmation text is produced here; no directory call is made.
    confirmation = f"Assigned user {user} to security group {sg}"
    return confirmation

def validate_user(user: str) -> str:
    """Validate a user account in Active Directory.

    Args:
        user (str): Username of the account to validate.

    Returns:
        str: Message confirming the validation.
    """
    # Only the confirmation text is produced here; no lookup is performed.
    confirmation = f"Validated user {user}"
    return confirmation

def construct_upn(user: str) -> str:
    """Build the User Principal Name (UPN) for a user.

    Args:
        user (str): Username the UPN is built from.

    Returns:
        str: Message containing the constructed UPN.
    """
    # The UPN is the username followed by the fixed "@domain.com" suffix.
    message = f"Constructed UPN for {user}: {user}@domain.com"
    return message

# Tools for M365 Agent
def create_dl(dl_name: str) -> str:
    """Create a Microsoft 365 distribution list.

    Args:
        dl_name (str): Name of the distribution list to create.

    Returns:
        str: Message confirming the creation.
    """
    # Only the confirmation text is produced here; nothing is created remotely.
    confirmation = f"Created distribution list {dl_name}"
    return confirmation

def delete_dl(dl_name: str) -> str:
    """Delete a Microsoft 365 distribution list.

    Args:
        dl_name (str): Name of the distribution list to delete.

    Returns:
        str: Message confirming the deletion.
    """
    # Only the confirmation text is produced here; nothing is deleted remotely.
    confirmation = f"Deleted distribution list {dl_name}"
    return confirmation

def assign_license(user: str, license: str) -> str:
    """Assign a Microsoft 365 license to a user.

    Args:
        user (str): Username that receives the license.
        license (str): Type of license to assign.

    Returns:
        str: Message confirming the assignment.
    """
    # The license type comes first in the message, then the user.
    confirmation = f"Assigned {license} license to {user}"
    return confirmation

def create_shared_mailbox(mailbox_name: str) -> str:
    """Create a Microsoft 365 shared mailbox.

    Args:
        mailbox_name (str): Name of the shared mailbox to create.

    Returns:
        str: Message confirming the creation.
    """
    # Only the confirmation text is produced here; nothing is created remotely.
    confirmation = f"Created shared mailbox {mailbox_name}"
    return confirmation

def modify_shared_mailbox(mailbox_name: str, action: str) -> str:
    """Modify a Microsoft 365 shared mailbox.

    Args:
        mailbox_name (str): Name of the shared mailbox to modify.
        action (str): Action to perform on the mailbox.

    Returns:
        str: Message confirming the modification.
    """
    # The action is echoed back verbatim; it is not interpreted here.
    confirmation = f"Modified shared mailbox {mailbox_name} with action {action}"
    return confirmation

# Tools for DB Restart Agent
def restart_db(db_name: str) -> str:
    """Restart a database.

    Args:
        db_name (str): Name of the database to restart.

    Returns:
        str: Message confirming the restart.
    """
    # Only the confirmation text is produced here; no database is touched.
    confirmation = f"Restarted database {db_name}"
    return confirmation

# Step 2: Create Specialized Agents with Logging
def create_logging_agent(model, tools, name, prompt):
    """Build a ReAct agent and wrap it in a graph node that logs each run.

    Args:
        model: Chat model passed to ``create_react_agent``.
        tools: Callables the agent may invoke as tools.
        name: Agent name; used for the agent itself and in log messages.
        prompt: System prompt passed to ``create_react_agent``.

    Returns:
        A function that takes and returns a ``GraphState``. It runs the agent
        on ``state["messages"]``, stores the name of the first tool that ran
        in ``state["tool_used"]`` ("unknown" when none ran), appends one line
        to ``state["execution_log"]`` and replaces ``state["messages"]`` with
        the agent's conversation as ``{"role", "content"}`` dicts.
    """
    agent = create_react_agent(model=model, tools=tools, name=name, prompt=prompt)

    # LangChain message ``type`` values mapped to the chat roles used in the state.
    role_by_type = {"human": "user", "ai": "assistant"}

    def _get(message, key, default=None):
        # The agent hands back message objects while the initial state holds
        # plain dicts, so a field has to be readable from either shape.
        if isinstance(message, dict):
            return message.get(key, default)
        return getattr(message, key, default)

    def _role_of(message):
        if isinstance(message, dict):
            return message.get("role", "")
        kind = getattr(message, "type", "")
        return role_by_type.get(kind, kind)

    def _as_dict(message):
        # Keeps the state in the List[Dict[str, str]] shape that the rest of
        # the script indexes with ["content"]; tool-call metadata is dropped.
        return {"role": _role_of(message), "content": _get(message, "content", "")}

    def _first_tool_name(messages):
        # Report the tool's name, not the text the tool returned.
        for message in messages:
            if _role_of(message) == "tool":
                return _get(message, "name") or "unknown"
        return "unknown"

    def logging_wrapper(state: GraphState) -> GraphState:
        # Capture the query before the message list is replaced below.
        query = _get(state["messages"][-1], "content", "")
        result = agent.invoke({"messages": state["messages"]})

        tool_used = _first_tool_name(result["messages"])
        state["tool_used"] = tool_used

        log_message = f"Agent {name} executed with query '{query}'. Tool used: {tool_used}"
        state["execution_log"].append(log_message)
        logger.info(log_message)

        state["messages"] = [_as_dict(message) for message in result["messages"]]
        return state

    return logging_wrapper

# Create agents with logging
# Each specialist pairs its own tool set with a role-specific system prompt.
ad_agent = create_logging_agent(
    model,
    [assign_user_to_sg, validate_user, construct_upn],
    "ad_expert",
    (
        "You are an Active Directory expert. "
        "Use the provided tools to handle AD-related tasks such as assigning users to security groups, validating users, or constructing UPNs. "
        "Use one tool at a time."
    ),
)

m365_agent = create_logging_agent(
    model,
    [create_dl, delete_dl, assign_license, create_shared_mailbox, modify_shared_mailbox],
    "m365_expert",
    (
        "You are a Microsoft 365 expert. "
        "Use the provided tools to handle M365-related tasks such as creating or deleting distribution lists, assigning licenses, or managing shared mailboxes. "
        "Use one tool at a time."
    ),
)

db_restart_agent = create_logging_agent(
    model,
    [restart_db],
    "db_restart_expert",
    (
        "You are a database management expert. "
        "Use the provided tools to handle database-related tasks such as restarting a database. "
        "Use one tool at a time."
    ),
)

# Step 3: Create a Custom Supervisor with Logging
def create_custom_supervisor(model):
    """Build the supervisor node that picks which expert handles a query.

    Args:
        model: Chat model the routing prompt is piped into.

    Returns:
        A function that takes and returns a ``GraphState``. It sends the
        content of the latest message to the model, stores the chosen agent
        name in ``state["agent"]`` and appends one line to
        ``state["execution_log"]``.
    """
    # The only names the routing step recognises; any other reply ends the graph.
    known_agents = ("ad_expert", "m365_expert", "db_restart_expert")

    # Define the prompt for the supervisor
    supervisor_prompt = ChatPromptTemplate.from_messages([
        ("system", (
            "You are a team supervisor managing an Active Directory expert (ad_expert), a Microsoft 365 expert (m365_expert), and a database management expert (db_restart_expert). "
            "Your task is to route user queries to the appropriate agent based on the task: "
            "For Active Directory tasks (e.g., user assignment, validation, UPN construction), choose ad_expert. "
            "For Microsoft 365 tasks (e.g., distribution lists, licenses, shared mailboxes), choose m365_expert. "
            "For database tasks (e.g., restarting a database), choose db_restart_expert. "
            "Respond with only the name of the agent to route to (e.g., 'ad_expert', 'm365_expert', 'db_restart_expert')."
        )),
        ("human", "{query}")
    ])

    # Create a chain for the supervisor
    supervisor_chain = supervisor_prompt | model

    def _pick_agent(reply):
        """Reduce the model's free-text reply to a known agent name when possible."""
        # The prompt's own examples are quoted, so the model may echo quotes,
        # backticks or a trailing period around the name.
        cleaned = reply.strip().strip("'\"`.").strip()
        lowered = cleaned.lower()
        if lowered in known_agents:
            return lowered
        # Accept a longer sentence only when it mentions exactly one agent.
        mentioned = [agent for agent in known_agents if agent in lowered]
        if len(mentioned) == 1:
            return mentioned[0]
        return cleaned

    def supervisor_node(state: GraphState) -> GraphState:
        # Extract the query
        query = state["messages"][-1]["content"]
        # Run the supervisor chain to determine the agent
        reply = supervisor_chain.invoke({"query": query}).content
        agent_chosen = _pick_agent(reply)

        # Log the decision, and say so explicitly when nothing will be routed.
        if agent_chosen in known_agents:
            log_message = f"Supervisor routed query '{query}' to agent '{agent_chosen}'. Reason: Query matches {agent_chosen} tasks."
            logger.info(log_message)
        else:
            log_message = f"Supervisor reply '{agent_chosen}' for query '{query}' does not name a known agent; no agent will run."
            logger.warning(log_message)
        state["execution_log"].append(log_message)
        state["agent"] = agent_chosen
        return state

    return supervisor_node

# Create the supervisor
supervisor = create_custom_supervisor(model)

# Step 4: Build the Workflow with Logging
from langgraph.graph import StateGraph, END

workflow = StateGraph(GraphState)

# Add nodes: the supervisor first, then the three specialist agents.
for _node_name, _node_fn in (
    ("supervisor", supervisor),
    ("ad_agent", ad_agent),
    ("m365_agent", m365_agent),
    ("db_restart_agent", db_restart_agent),
):
    workflow.add_node(_node_name, _node_fn)

# Define routing logic
def route_to_agent(state: GraphState) -> str:
    """Translate the supervisor's chosen agent name into the next graph node.

    Args:
        state: Current graph state; only ``state["agent"]`` is read.

    Returns:
        The node name paired with the chosen agent, or ``END`` when the name
        is not one of the three known experts.
    """
    chosen = state["agent"]
    node_by_agent = (
        ("ad_expert", "ad_agent"),
        ("m365_expert", "m365_agent"),
        ("db_restart_expert", "db_restart_agent"),
    )
    for expert_name, node_name in node_by_agent:
        if chosen == expert_name:
            return node_name
    # Any unrecognised name terminates the graph.
    return END

# Define edges
# The supervisor runs first; route_to_agent then selects one specialist (or END),
# and every specialist finishes the run.
_agent_nodes = ("ad_agent", "m365_agent", "db_restart_agent")

_route_targets = {node: node for node in _agent_nodes}
_route_targets[END] = END

workflow.set_entry_point("supervisor")
workflow.add_conditional_edges("supervisor", route_to_agent, _route_targets)
for _agent_node in _agent_nodes:
    workflow.add_edge(_agent_node, END)

# Step 5: Compile and Run the Workflow
app = workflow.compile()

# One query per specialist: AD first, then M365, then the DB restart agent.
demo_queries = ("Assign user to SG", "Create shared mailbox", "Restart DB")
for run_index, query_text in enumerate(demo_queries):
    # Every run starts from a fresh state so logs do not carry over.
    initial_state = {
        "messages": [{"role": "user", "content": query_text}],
        "agent": "",
        "tool_used": "",
        "execution_log": []
    }
    result = app.invoke(initial_state)

    # Only the first result is printed without a leading blank line.
    heading = "Result:" if run_index == 0 else "\nResult:"
    print(heading, result["messages"][-1]["content"])
    print("\nExecution Logs:")
    for log in result["execution_log"]:
        print(log)

# In [None]:
# Import required libraries
from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate
from langgraph.prebuilt import create_react_agent
from typing import TypedDict, List, Dict, Callable
import logging

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Define a custom state to track execution logs and agent tools
class GraphState(TypedDict):
    """State dictionary handed from node to node in the workflow graph."""
    # Conversation so far; the nodes read entries as {"role": ..., "content": ...} dicts.
    messages: List[Dict[str, str]]
    # Agent name written by the supervisor node ("none" when it handled the
    # query itself or failed) and read by route_to_agent.
    agent: str
    # Name of the tool recorded by whichever node ran one.
    tool_used: str
    # Log lines appended by the supervisor and agent nodes, in execution order.
    execution_log: List[str]
    agent_tools: Dict[str, List[Callable]]  # Map of agent names to their tools

# Groq model initialization
# SECURITY: the API key is read from the environment so that no secret lives in
# source control. Any key that was previously committed here should be revoked.
import os

GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
if not GROQ_API_KEY:
    raise RuntimeError(
        "GROQ_API_KEY environment variable is not set; export it before running."
    )
# NOTE(review): confirm this model id is still served by Groq before relying on it.
model = ChatGroq(
    api_key=GROQ_API_KEY,
    model="mixtral-8x7b-32768",
)

# Step 1: Define the Tools for Each Agent with Proper Docstrings
# Tools for AD Agent
def assign_user_to_sg(user: str, sg: str) -> str:
    """Add a user to a security group in Active Directory.

    Args:
        user (str): Username of the account to add.
        sg (str): Name of the target security group.

    Returns:
        str: Message confirming the assignment.
    """
    # Only the confirmation text is produced here; no directory call is made.
    outcome = f"Assigned user {user} to security group {sg}"
    return outcome

def validate_user(user: str) -> str:
    """Validate a user account in Active Directory.

    Args:
        user (str): Username of the account to validate.

    Returns:
        str: Message confirming the validation.
    """
    # Only the confirmation text is produced here; no lookup is performed.
    outcome = f"Validated user {user}"
    return outcome

# Tools for M365 Agent
def create_dl(dl_name: str) -> str:
    """Create a Microsoft 365 distribution list.

    Args:
        dl_name (str): Name of the distribution list to create.

    Returns:
        str: Message confirming the creation.
    """
    # Only the confirmation text is produced here; nothing is created remotely.
    outcome = f"Created distribution list {dl_name}"
    return outcome

def assign_license(user: str, license: str) -> str:
    """Assign a Microsoft 365 license to a user.

    Args:
        user (str): Username that receives the license.
        license (str): Type of license to assign.

    Returns:
        str: Message confirming the assignment.
    """
    # The license type comes first in the message, then the user.
    outcome = f"Assigned {license} license to {user}"
    return outcome

# Tools for DB Restart Agent
def restart_db(db_name: str) -> str:
    """Restart a database.

    Args:
        db_name (str): Name of the database to restart.

    Returns:
        str: Message confirming the restart.
    """
    # Only the confirmation text is produced here; no database is touched.
    outcome = f"Restarted database {db_name}"
    return outcome

# Step 2: Create Specialized Agents with Logging
def create_logging_agent(model, tools, name, prompt):
    """Build a ReAct agent and wrap it in a graph node that logs each run.

    Args:
        model: Chat model passed to ``create_react_agent``.
        tools: Callables the agent may invoke as tools.
        name: Agent name; used for the agent itself and in log messages.
        prompt: Base system prompt; tool-call formatting instructions are
            appended before it is passed to ``create_react_agent``.

    Returns:
        A function that takes and returns a ``GraphState``. On success it
        stores the name of the first tool that ran in ``state["tool_used"]``
        ("unknown" when none ran), appends one line to
        ``state["execution_log"]`` and replaces ``state["messages"]`` with
        the agent's conversation as ``{"role", "content"}`` dicts. On failure
        it logs the error and appends an assistant message describing it
        instead of raising.
    """
    enhanced_prompt = (
        f"{prompt} When calling a tool, format the call as a JSON object with the tool name and arguments, "
        # Single braces: a str prompt is used verbatim as the system message
        # (it is not a template), so doubled braces would reach the model as-is.
        "e.g., {'tool': 'tool_name', 'arguments': {'arg1': 'value1', 'arg2': 'value2'}}. "
        "Do not use any other format for tool calls."
    )
    agent = create_react_agent(model=model, tools=tools, name=name, prompt=enhanced_prompt)

    # LangChain message ``type`` values mapped to the chat roles used in the state.
    role_by_type = {"human": "user", "ai": "assistant"}

    def _get(message, key, default=None):
        # The agent hands back message objects while the initial state holds
        # plain dicts, so a field has to be readable from either shape.
        if isinstance(message, dict):
            return message.get(key, default)
        return getattr(message, key, default)

    def _role_of(message):
        if isinstance(message, dict):
            return message.get("role", "")
        kind = getattr(message, "type", "")
        return role_by_type.get(kind, kind)

    def _as_dict(message):
        # Keeps the state in the List[Dict[str, str]] shape that the rest of
        # the script indexes with ["content"]; tool-call metadata is dropped.
        return {"role": _role_of(message), "content": _get(message, "content", "")}

    def _first_tool_name(messages):
        for message in messages:
            if _role_of(message) == "tool":
                return _get(message, "name") or "unknown"
        return "unknown"

    def logging_wrapper(state: GraphState) -> GraphState:
        # Capture the query up front so both the success and failure logs can use it.
        query = _get(state["messages"][-1], "content", "")
        try:
            result = agent.invoke({"messages": state["messages"]})
            tool_used = _first_tool_name(result["messages"])
            state["tool_used"] = tool_used
            log_message = f"Agent {name} executed with query '{query}'. Tool used: {tool_used}"
            state["execution_log"].append(log_message)
            logger.info(log_message)
            state["messages"] = [_as_dict(message) for message in result["messages"]]
        except Exception as e:
            # Node boundary: report the failure in the state rather than
            # aborting the whole graph run.
            error_message = f"Agent {name} failed to execute query '{query}'. Error: {str(e)}"
            state["execution_log"].append(error_message)
            logger.error(error_message)
            state["messages"].append({"role": "assistant", "content": f"Error: {str(e)}"})
        return state

    return logging_wrapper

# Define agent tools, one list per specialist
ad_tools, m365_tools, db_tools = (
    [assign_user_to_sg, validate_user],
    [create_dl, assign_license],
    [restart_db],
)

# Create agents: each pairs its tool list with a short role prompt.
ad_agent = create_logging_agent(
    model=model,
    tools=ad_tools,
    name="ad_expert",
    prompt="You are an Active Directory expert.",
)
m365_agent = create_logging_agent(
    model=model,
    tools=m365_tools,
    name="m365_expert",
    prompt="You are a Microsoft 365 expert.",
)
db_restart_agent = create_logging_agent(
    model=model,
    tools=db_tools,
    name="db_restart_expert",
    prompt="You are a database management expert.",
)

# Step 3: Create a Custom Supervisor with Dynamic Tool Calling
def create_custom_supervisor(model, agent_tools_map):
    """Build the supervisor node that routes a query or calls a tool directly.

    Args:
        model: Chat model the supervisor prompt is piped into.
        agent_tools_map: Mapping of agent name to that agent's tool callables.
            Used to resolve direct tool calls when the state does not carry
            its own ``agent_tools`` mapping.

    Returns:
        A function that takes and returns a ``GraphState``. When the model
        answers with an agent name, that name is stored in ``state["agent"]``.
        When it answers with a JSON tool call, the tool is run here, its
        result is appended as an assistant message and ``state["agent"]`` is
        set to "none". Any failure is logged, reported as an assistant
        message and also leaves ``state["agent"]`` as "none".
    """
    import json

    supervisor_prompt = ChatPromptTemplate.from_messages([
        ("system", (
            "You are a team supervisor managing an Active Directory expert (ad_expert), a Microsoft 365 expert (m365_expert), and a database management expert (db_restart_expert). "
            "Available tools by agent: "
            "ad_expert: assign_user_to_sg, validate_user; "
            "m365_expert: create_dl, assign_license; "
            "db_restart_expert: restart_db. "
            "For each query, decide if you should: "
            "1. Route to an agent (respond with the agent name, e.g., 'ad_expert'). "
            # Literal braces are doubled so the template does not read the JSON
            # example as input variables; the example uses double quotes so a
            # reply that copies it is valid JSON for json.loads.
            '2. Directly call a tool (respond with a JSON object, e.g., {{"tool": "tool_name", "agent": "agent_name", "arguments": {{"arg1": "value1"}}}}). '
            "Choose the option that best fits the query."
        )),
        ("human", "{query}")
    ])

    supervisor_chain = supervisor_prompt | model

    def _end_with_error(state, log_text, reply_text=None):
        """Log a failure, report it as the assistant reply and skip agent routing."""
        state["execution_log"].append(log_text)
        logger.error(log_text)
        state["messages"].append(
            {"role": "assistant", "content": log_text if reply_text is None else reply_text}
        )
        state["agent"] = "none"
        return state

    def supervisor_node(state: GraphState) -> GraphState:
        # Bound before the try block so the except handler can always format it.
        query = ""
        try:
            query = state["messages"][-1]["content"]
            response = supervisor_chain.invoke({"query": query}).content.strip()

            # Anything that is not a JSON object is treated as an agent name.
            if not (response.startswith("{") and response.endswith("}")):
                # The prompt's example is quoted, so tolerate a quoted reply.
                agent_chosen = response.strip("'\"").strip()
                log_message = f"Supervisor routed query '{query}' to agent '{agent_chosen}'. Reason: Query matches {agent_chosen} tasks."
                state["execution_log"].append(log_message)
                logger.info(log_message)
                state["agent"] = agent_chosen
                return state

            tool_call = json.loads(response)
            required_keys = ("tool", "agent", "arguments")
            if not (isinstance(tool_call, dict) and all(key in tool_call for key in required_keys)):
                return _end_with_error(state, f"Invalid tool call format: {response}")

            agent_name = tool_call["agent"]
            tool_name = tool_call["tool"]
            arguments = tool_call["arguments"]

            # Prefer the mapping carried in the state; fall back to the one
            # this supervisor was built with.
            tools_by_agent = state.get("agent_tools") or agent_tools_map
            candidates = tools_by_agent.get(agent_name, [])
            tool = next((t for t in candidates if t.__name__ == tool_name), None)
            if tool is None:
                return _end_with_error(
                    state, f"Tool '{tool_name}' not found for agent '{agent_name}'."
                )

            result = tool(**arguments)
            state["tool_used"] = tool_name
            log_message = f"Supervisor directly called tool '{tool_name}' from agent '{agent_name}' with arguments {arguments}. Result: {result}"
            state["execution_log"].append(log_message)
            logger.info(log_message)
            state["messages"].append({"role": "assistant", "content": result})
            state["agent"] = "none"  # No agent routing needed
        except Exception as e:
            # Node boundary: report the failure in the state rather than
            # aborting the whole graph run.
            return _end_with_error(
                state,
                f"Supervisor failed to process query '{query}'. Error: {str(e)}",
                f"Error: {str(e)}",
            )
        return state

    return supervisor_node

# Step 4: Build the Workflow
from langgraph.graph import StateGraph, END

workflow = StateGraph(GraphState)

# Agent name -> tool list; shared with the supervisor and the initial state.
agent_tools_map = dict(
    ad_expert=ad_tools,
    m365_expert=m365_tools,
    db_restart_expert=db_tools,
)
supervisor = create_custom_supervisor(model, agent_tools_map)

# Add nodes: the supervisor first, then the three specialist agents.
for _node_name, _node_fn in (
    ("supervisor", supervisor),
    ("ad_agent", ad_agent),
    ("m365_agent", m365_agent),
    ("db_restart_agent", db_restart_agent),
):
    workflow.add_node(_node_name, _node_fn)

# Define routing logic
def route_to_agent(state: GraphState) -> str:
    """Translate the supervisor's chosen agent name into the next graph node.

    Args:
        state: Current graph state; only ``state["agent"]`` is read.

    Returns:
        The node name paired with the chosen agent, or ``END`` when the name
        is not one of the three known experts (for example "none").
    """
    chosen = state["agent"]
    node_by_agent = (
        ("ad_expert", "ad_agent"),
        ("m365_expert", "m365_agent"),
        ("db_restart_expert", "db_restart_agent"),
    )
    for expert_name, node_name in node_by_agent:
        if chosen == expert_name:
            return node_name
    # Any unrecognised name terminates the graph.
    return END

# Define edges
# The supervisor runs first; route_to_agent then selects one specialist (or END),
# and every specialist finishes the run.
_agent_nodes = ("ad_agent", "m365_agent", "db_restart_agent")

_route_targets = {node: node for node in _agent_nodes}
_route_targets[END] = END

workflow.set_entry_point("supervisor")
workflow.add_conditional_edges("supervisor", route_to_agent, _route_targets)
for _agent_node in _agent_nodes:
    workflow.add_edge(_agent_node, END)

# Step 5: Compile and Run the Workflow
app = workflow.compile()

# Two sample queries, each naming the arguments a specific tool needs.
demo_queries = (
    "Assign user John to SG Admins",
    "Create a distribution list named TeamDL",
)
for run_index, query_text in enumerate(demo_queries):
    # Every run starts from a fresh state so logs do not carry over.
    initial_state = {
        "messages": [{"role": "user", "content": query_text}],
        "agent": "",
        "tool_used": "",
        "execution_log": [],
        "agent_tools": agent_tools_map
    }
    result = app.invoke(initial_state)

    # Only the first result is printed without a leading blank line.
    heading = "Result:" if run_index == 0 else "\nResult:"
    print(heading, result["messages"][-1]["content"])
    print("\nExecution Logs:")
    for log in result["execution_log"]:
        print(log)