In [571]:
import json
import uuid
from typing import List, Tuple

from autogen_core import (
    FunctionCall,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TopicId,
    TypeSubscription,
    message_handler,
)
from autogen_core.models import ( # type: ignore
    AssistantMessage,
    ChatCompletionClient,
    FunctionExecutionResult,
    FunctionExecutionResultMessage,
    LLMMessage,
    SystemMessage,
    UserMessage,
)
from autogen_core.tools import FunctionTool, Tool
from autogen_ext.models.openai import OpenAIChatCompletionClient,AzureOpenAIChatCompletionClient
from pydantic import BaseModel
from autogen_core import AgentId

In [572]:
from autogen_core import AgentId 

In [573]:
from typing import Union, Tuple


class UserLogin(BaseModel):
    """Payload-free message; ``UserAgent.handle_user_login`` reacts to it by prompting for the first user input."""


class UserTask(BaseModel):
    """A conversation handed to an agent, published on that agent's topic."""

    # Full message history so far, oldest first (handlers append to the end).
    context: List[LLMMessage]


class AgentResponse(BaseModel):
    """An agent's reply, published on the user topic."""

    # Topic type the user's next message is published to (see UserAgent.handle_task_result).
    reply_to_topic_type: str
    # Conversation history including the agent's latest reply.
    context: List[LLMMessage]
    
class SingleAgentResponse(BaseModel):
    """Message wrapping a conversation history.

    NOTE(review): not referenced anywhere in the visible notebook cells.
    """

    context: List[LLMMessage]
    
class BetweenAgentsRequest(BaseModel):
    """Direct agent-to-agent request (sent with ``send_message``) that expects a ``BetweenAgentsResponse``."""

    # Conversation history the receiving agent should continue from.
    context: List[LLMMessage]
    
class BetweenAgentsResponse(BaseModel):
    """Reply returned by an agent that handled a ``BetweenAgentsRequest``."""

    # Conversation history after the agent finished its turn.
    context: List[LLMMessage]
    # Set to True by AIAgent when the agent asked to transfer back to "MasterAgent".
    is_agent_done: bool = False

class OrchestorAgentRequest(BaseModel):
    """Request asking the orchestrator to coordinate several delegate agents at once."""

    # Needed because the tuples below carry Tool objects
    # (presumably not pydantic-validatable types -- confirm against autogen_core).
    model_config = {"arbitrary_types_allowed": True}

    # Conversation history as seen by the agent that issued the request.
    context: List[LLMMessage]
    # One (delegate tool, target topic type, task for that target) triple per delegate, or None.
    delegate_targets_agents: Union[List[Tuple[Tool, str, UserTask]], None]
    
    
class OrchestratorAgentHistory(BaseModel):
    """Conversation history bundled with a list of agent responses.

    NOTE(review): not referenced anywhere in the visible notebook cells.
    """

    context: List[LLMMessage]
    agent_responses: List[AgentResponse]
    
class OrchertratorLLMResponse(BaseModel):
    """List of ResponseMessages, each containing a message and the agent it should be sent to"""

    # NOTE: this class is passed as ``response_format`` to the model client, so the
    # docstring above presumably ends up in the schema description the LLM sees.
    # It previously described a "completed bool" that this model never had.

    class ResponseMessage(BaseModel):
        # Text to deliver to the recipient.
        message: str
        # Recipient name; OrchestratorAgent treats the literal "user" specially and
        # otherwise looks the value up among the registered delegate tool names.
        to: str

    responses: List[ResponseMessage]
    
class CombineMessageResponse(BaseModel):
    """response message to be sent to the user and a bool to check if all agents are done with all user requests from the very beginning to the end of the conversation. Only set it as True when all agents are done with all user requests by reading the history of the conversation"""

    # NOTE: this class is passed as ``response_format`` to the model client, so the
    # docstring above presumably ends up in the schema description the LLM sees.
    # A stray leading double quote (four opening quotes) was removed from it.

    # Single message relayed to the user.
    response_message: str
    # When True, OrchestratorAgent routes the user's next message to the master agent.
    all_agents_done: bool
    

In [None]:
from typing import Optional


class OrchestratorAgent(RoutedAgent):
    """Coordinates several delegate agents on behalf of one user conversation.

    For every incoming message the agent:

    1. asks the model (structured output ``OrchertratorLLMResponse``) which
       recipient should receive which message,
    2. forwards each message to the matching delegate with a direct
       ``BetweenAgentsRequest`` and records the delegate's updated history,
    3. asks the model again (structured output ``CombineMessageResponse``) to
       merge everything into one reply, and
    4. publishes that reply to the user topic as an ``AgentResponse``.

    Delegates are registered from the first ``OrchestorAgentRequest`` received;
    later ``UserTask`` messages reuse that registration.
    """

    def __init__(
        self,
        description: str,
        system_message: SystemMessage,
        model_client: ChatCompletionClient,
        agent_topic_type: str,
        user_topic_type: str,
        master_agent: str,
    ) -> None:
        """Store configuration; delegate state stays ``None`` until a request registers it.

        Args:
            description: Agent description passed to ``RoutedAgent``.
            system_message: System prompt used for the routing model call.
            model_client: Chat completion client used for both model calls.
            agent_topic_type: Topic type the user replies to while work is pending.
            user_topic_type: Topic type on which ``AgentResponse`` is published.
            master_agent: Topic type the user replies to once all agents are done.
        """
        super().__init__(description)
        self._system_message = system_message
        self._model_client = model_client
        # Not used by this class in the visible code; kept for attribute compatibility.
        self._tools = None
        self._tool_schema = []
        # Populated by _register_delegates, keyed by delegate tool name.
        self._delegate_tools = None
        self._delegate_tool_schema = None
        self._delegate_agents = None
        self._agent_topic_type = agent_topic_type
        self._master_agent = master_agent
        self._user_topic_type = user_topic_type
        # Per-delegate conversation history, keyed by delegate tool name.
        self.all_agent_messages = None
        # System prompt for the second (combining) model call.
        self.combine_message = SystemMessage(content="""You are an Intelligent Conversational Orchestrator responsible for managing communication between the user and multiple specialized agents. Your primary role is to ensure user queries are fully addressed, sub-agent requests are properly handled, and no action is taken prematurely without explicit user confirmation.

### Key Responsibilities:
1. **Understand User Intent**:
   - Accurately interpret the user's primary intent and ensure it is fully addressed before concluding any conversation.
   - If sub-agents require confirmation or further input from the user, prioritize obtaining this information before proceeding.

2. **Aggregate Sub-Agent Responses**:
   - Carefully review responses from all sub-agents.
   - If a sub-agent explicitly asks for user confirmation (e.g., "Please confirm if everything looks good or if you need adjustments"), relay this request to the user. Do not assume the order is finalized until the user provides explicit confirmation.

3. **Relay Confirmation Requests**:
   - When sub-agents request confirmation, your response to the user should reflect this need. For example:
     - "Your Cafe order includes: [list items]. Please confirm if this looks good or let us know if you need adjustments."
     - "Your Pizza order includes: [list items]. Please confirm if this looks good or let us know if you need adjustments."
   - Combine all sub-agent confirmation requests into a single cohesive message to avoid overwhelming the user with multiple prompts.

4. **Finalize Only After Confirmation**:
   - Do not mark `all_agents_done` as `True` or assume orders are placed until the user explicitly confirms that all items in their order are correct.
   - If the user provides modifications, relay these changes back to the relevant sub-agent(s) and ensure they update the order accordingly.

### Response Guidelines:
- **User-Centric Communication**:
  - Always prioritize clarity and accuracy in your responses.
  - Ensure your messages guide the user through any pending confirmations or clarifications needed by sub-agents.
- **Avoid Premature Finalization**:
  - Never state that orders have been placed unless every sub-agent has received explicit confirmation from the user.""")

    def _register_delegates(self, delegate_targets) -> None:
        """Rebuild the per-delegate lookup tables from (tool, topic_type, task) triples."""
        self._delegate_tools = {tool.name: tool for tool, _, _ in delegate_targets}
        self._delegate_tool_schema = [tool.schema for tool, _, _ in delegate_targets]
        # Delegates are addressed directly, sharing this agent's key (the session).
        self._delegate_agents = {
            tool.name: AgentId(topic_type, self.id.key) for tool, topic_type, _ in delegate_targets
        }
        self.all_agent_messages = {tool.name: task.context for tool, _, task in delegate_targets}

    @staticmethod
    def _parse_structured(llm_result, response_model):
        """Validate a model result's string content as JSON against ``response_model``.

        Raises:
            ValueError: If the result content is not a string.
        """
        raw: Optional[str] = llm_result.content if isinstance(llm_result.content, str) else None
        if raw is None:
            raise ValueError("Response content is not a valid JSON string")
        return response_model.model_validate(json.loads(raw))

    @message_handler
    async def handle_user_message(self, message: Union[OrchestorAgentRequest,UserTask], context: MessageContext) -> None:
        """Route the latest user message to delegates and publish one combined reply.

        Args:
            message: Either a request that (re)registers the delegates, or a
                follow-up ``UserTask`` that reuses the existing registration.
            context: Message context; only its cancellation token is used.

        Raises:
            ValueError: If a model reply is not a JSON string, or if the model
                addresses a recipient that is neither "user" nor a registered delegate.
        """
        if isinstance(message, OrchestorAgentRequest):
            # The field is declared Optional, so tolerate None instead of failing on iteration.
            self._register_delegates(message.delegate_targets_agents or [])

        routing_result = await self._model_client.create(
            messages=[self._system_message] + message.context,
            cancellation_token=context.cancellation_token,
            extra_create_args={"response_format": OrchertratorLLMResponse},
        )
        routing = self._parse_structured(routing_result, OrchertratorLLMResponse)

        # Input for the combining call: the latest incoming message plus whatever
        # the routing step and the delegates produce.
        combine_inputs: List[LLMMessage] = []
        if message.context:
            combine_inputs.append(message.context[-1])

        for routed in routing.responses:
            if routed.to == "user":
                # The previous code tried send_message() with swapped arguments and an
                # invalid UserMessage, which always raised. The user is only reachable
                # through the combined AgentResponse below, so feed the text into it.
                combine_inputs.append(AssistantMessage(content=routed.message, source=self.id.type))
                continue

            if not self._delegate_agents or routed.to not in self._delegate_agents:
                # Fail before touching any history so state stays consistent.
                raise ValueError(f"Unknown delegate agent: {routed.to}")

            history = self.all_agent_messages.get(routed.to, [])
            history.append(UserMessage(content=routed.message, source=self.id.key))
            self.all_agent_messages[routed.to] = history
            delegate_reply = await self.send_message(
                BetweenAgentsRequest(context=history),
                self._delegate_agents[routed.to],
            )
            # The delegate returns its full updated history; keep it for the next turn.
            self.all_agent_messages[routed.to] = delegate_reply.context
            combine_inputs.extend(delegate_reply.context)

        combine_result = await self._model_client.create(
            messages=[self.combine_message] + combine_inputs,
            cancellation_token=context.cancellation_token,
            extra_create_args={"response_format": CombineMessageResponse},
        )
        combined_message = self._parse_structured(combine_result, CombineMessageResponse)
        message.context.append(AssistantMessage(content=combined_message.response_message, source=self.id.type))

        # Once every delegate is done the user's next message goes to the master
        # agent; otherwise it comes back to this orchestrator's topic.
        reply_topic = self._master_agent if combined_message.all_agents_done else self._agent_topic_type
        await self.publish_message(
            AgentResponse(context=message.context, reply_to_topic_type=reply_topic),
            topic_id=TopicId(self._user_topic_type, source=self.id.key),
        )

In [None]:
from dataclasses import dataclass


class AIAgent(RoutedAgent):
    """LLM-backed agent that runs its own tools and can hand a conversation to other agents.

    Two kinds of tools are distinguished:

    * regular tools, executed in place with their result fed back to the model;
    * delegate tools, whose return value is the topic type of the agent that
      should take over the conversation.
    """

    def __init__(
        self,
        description: str,
        system_message: SystemMessage,
        model_client: ChatCompletionClient,
        tools: List[Tool],
        delegate_tools: List[Tool],
        agent_topic_type: str,
        user_topic_type: str,
        orchestrator_topic_type: str,
    ) -> None:
        """Index the tools by name and remember the topic types used for routing.

        Args:
            description: Agent description passed to ``RoutedAgent``.
            system_message: System prompt prepended to every model call.
            model_client: Chat completion client.
            tools: Tools executed directly by this agent.
            delegate_tools: Tools whose result names the topic type to hand off to.
            agent_topic_type: Topic type the user should reply to for this agent.
            user_topic_type: Topic type on which ``AgentResponse`` is published.
            orchestrator_topic_type: Topic type used when several delegates are requested at once.
        """
        super().__init__(description)
        self._system_message = system_message
        self._model_client = model_client
        self._tools = {tool.name: tool for tool in tools}
        self._tool_schema = [tool.schema for tool in tools]
        self._delegate_tools = {tool.name: tool for tool in delegate_tools}
        self._delegate_tool_schema = [tool.schema for tool in delegate_tools]
        self._agent_topic_type = agent_topic_type
        self._user_topic_type = user_topic_type
        self._orchestrator_topic_type = orchestrator_topic_type

    async def _query_model(self, history: List[LLMMessage], ctx: MessageContext, tool_schema):
        """Call the model with the system prompt plus ``history`` and log the raw result."""
        llm_result = await self._model_client.create(
            messages=[self._system_message] + history,
            tools=tool_schema,
            cancellation_token=ctx.cancellation_token,
        )
        print(f"{'-'*80}\n{self.id.type}:\n{llm_result.content}", flush=True)
        return llm_result

    async def _run_function_calls(
        self, calls: List[FunctionCall], history: List[LLMMessage], ctx: MessageContext
    ) -> Tuple[List[FunctionExecutionResult], List[Tuple[Tool, str, UserTask]]]:
        """Execute every function call requested by the model.

        Returns:
            A pair ``(tool_call_results, delegations)``: results of regular tools,
            and one ``(delegate tool, topic type, task)`` triple per delegate call.
            ``history`` is copied, not modified.

        Raises:
            ValueError: If a call names a tool this agent does not know.
        """
        tool_call_results: List[FunctionExecutionResult] = []
        delegations: List[Tuple[Tool, str, UserTask]] = []
        for call in calls:
            arguments = json.loads(call.arguments)
            if call.name in self._tools:
                # Execute the tool directly.
                tool = self._tools[call.name]
                result = await tool.run_json(arguments, ctx.cancellation_token)
                tool_call_results.append(
                    FunctionExecutionResult(
                        call_id=call.id,
                        content=tool.return_value_as_string(result),
                        is_error=False,
                        name=call.name,
                    )
                )
            elif call.name in self._delegate_tools:
                # Execute the tool to get the delegate agent's topic type.
                delegate_tool = self._delegate_tools[call.name]
                result = await delegate_tool.run_json(arguments, ctx.cancellation_token)
                topic_type = delegate_tool.return_value_as_string(result)
                # The delegate's context includes the hand-off call and its result.
                delegate_messages = list(history) + [
                    AssistantMessage(content=[call], source=self.id.type),
                    FunctionExecutionResultMessage(
                        content=[
                            FunctionExecutionResult(
                                call_id=call.id,
                                content=f"Transferred to {topic_type}. Adopt persona immediately.",
                                is_error=False,
                                name=call.name,
                            )
                        ]
                    ),
                ]
                # A tuple (not a list) to match the declared Tuple[Tool, str, UserTask] type.
                delegations.append((delegate_tool, topic_type, UserTask(context=delegate_messages)))
            else:
                raise ValueError(f"Unknown tool: {call.name}")
        return tool_call_results, delegations

    @message_handler
    async def handle_task(self, message: UserTask, ctx: MessageContext) -> None:
        """Work on a user task until the model answers in text or the task is delegated.

        A single delegation is published to the delegate's topic; several
        delegations are published to the orchestrator topic as one
        ``OrchestorAgentRequest``. A text answer is published to the user topic.

        Raises:
            ValueError: If the model requests an unknown tool.
            AssertionError: If the final model content is not a string.
        """
        all_tools = self._tool_schema + self._delegate_tool_schema
        # Send the task to the LLM.
        llm_result = await self._query_model(message.context, ctx, all_tools)
        # Process the LLM result.
        while isinstance(llm_result.content, list) and all(isinstance(m, FunctionCall) for m in llm_result.content):
            tool_call_results, delegations = await self._run_function_calls(llm_result.content, message.context, ctx)
            delegate_targets: List[Tuple[str, UserTask]] = [(topic, task) for _, topic, task in delegations]
            if len(delegate_targets) == 1:
                # Delegate the task by publishing to the delegate's topic.
                topic_type, task = delegate_targets[0]
                print(f"{'-'*80}\n{self.id.type}:\nDelegating to {topic_type}", flush=True)
                await self.publish_message(task, topic_id=TopicId(topic_type, source=self.id.key))
            elif len(delegate_targets) > 1:
                # Several delegates at once: let the orchestrator coordinate them.
                print(f"{'-'*80}\n{self.id.type}:\n{delegate_targets}", flush=True)
                await self.publish_message(
                    OrchestorAgentRequest(context=message.context, delegate_targets_agents=delegations),
                    topic_id=TopicId(self._orchestrator_topic_type, source=self.id.key),
                )
            if len(tool_call_results) > 0:
                print(f"{'-'*80}\n{self.id.type}:\n{tool_call_results}", flush=True)
                # Make another LLM call with the results.
                message.context.extend(
                    [
                        AssistantMessage(content=llm_result.content, source=self.id.type),
                        FunctionExecutionResultMessage(content=tool_call_results),
                    ]
                )
                llm_result = await self._query_model(message.context, ctx, all_tools)
            else:
                # The task has been delegated, so we are done.
                return
        # The task has been completed, publish the final result.
        if not isinstance(llm_result.content, str):
            raise AssertionError("Return type not found: Expected str, got {}".format(type(llm_result.content)))
        message.context.append(AssistantMessage(content=llm_result.content, source=self.id.type))
        await self.publish_message(
            AgentResponse(context=message.context, reply_to_topic_type=self._agent_topic_type),
            topic_id=TopicId(self._user_topic_type, source=self.id.key),
        )

    @message_handler
    async def handle_orchestrator_response(self, message: BetweenAgentsRequest, ctx: MessageContext) -> BetweenAgentsResponse:
        """Handle a direct request from another agent and return the updated history.

        Returns:
            The conversation after this agent's turn. ``is_agent_done`` is True
            when the agent asked to transfer back to "MasterAgent"; a single
            delegation to any other agent is forwarded and that agent's reply
            is returned instead.

        Raises:
            ValueError: If the model requests an unknown tool.
            AssertionError: If the final model content is not a string.
        """
        # NOTE(review): the first call offers only the regular tools, while the
        # follow-up call below also offers the delegate tools. Preserved as found;
        # confirm whether withholding hand-offs on the first turn is intentional.
        llm_result = await self._query_model(message.context, ctx, self._tool_schema)
        # Process the LLM result.
        while isinstance(llm_result.content, list) and all(isinstance(m, FunctionCall) for m in llm_result.content):
            tool_call_results, delegations = await self._run_function_calls(llm_result.content, message.context, ctx)
            delegate_targets: List[Tuple[str, UserTask]] = [(topic, task) for _, topic, task in delegations]
            if len(delegate_targets) == 1:
                topic_type, task = delegate_targets[0]
                # "MasterAgent" must match the topic type the master agent is registered under.
                if topic_type == "MasterAgent":
                    return BetweenAgentsResponse(context=message.context,is_agent_done=True)
                print(f"{'-'*80}\n{self.id.type}:\n{delegate_targets}", flush=True)
                inner_agent_response = await self.send_message(
                    BetweenAgentsRequest(context=task.context), AgentId(topic_type, self.id.key)
                )
                return BetweenAgentsResponse(context=inner_agent_response.context)
            # NOTE(review): two or more delegations in one turn are not forwarded
            # anywhere from this handler (behaviour preserved from the original).

            if len(tool_call_results) > 0:
                print(f"{'-'*80}\n{self.id.type}:\n{tool_call_results}", flush=True)
                # Make another LLM call with the results.
                message.context.extend(
                    [
                        AssistantMessage(content=llm_result.content, source=self.id.type),
                        FunctionExecutionResultMessage(content=tool_call_results),
                    ]
                )
                llm_result = await self._query_model(
                    message.context, ctx, self._tool_schema + self._delegate_tool_schema
                )
            else:
                # Nothing left to execute, so return the history unchanged.
                return BetweenAgentsResponse(context=message.context)
        # The task has been completed, return the final result.
        # Explicit raise (same exception type as the former bare assert) so the
        # check matches handle_task and survives `python -O`.
        if not isinstance(llm_result.content, str):
            raise AssertionError("Return type not found: Expected str, got {}".format(type(llm_result.content)))
        message.context.append(AssistantMessage(content=llm_result.content, source=self.id.type))
        return BetweenAgentsResponse(context=message.context)

In [576]:
class UserAgent(RoutedAgent):
    """Console stand-in for the human user: reads input with ``input()`` and publishes it as tasks."""

    def __init__(self, description: str, user_topic_type: str, agent_topic_type: str) -> None:
        """Remember the user topic type and the topic type that receives the first task."""
        super().__init__(description)
        self._agent_topic_type = agent_topic_type
        self._user_topic_type = user_topic_type

    @message_handler
    async def handle_user_login(self, message: UserLogin, ctx: MessageContext) -> None:
        """Prompt for the opening message and publish it to the configured agent topic."""
        print(f"{'-'*80}\nUser login, session ID: {self.id.key}.", flush=True)
        # First thing the user types after logging in.
        first_message = input("User: ")
        print(f"{'-'*80}\n{self.id.type}:\n{first_message}")
        opening_task = UserTask(context=[UserMessage(content=first_message, source="User")])
        await self.publish_message(
            opening_task,
            topic_id=TopicId(self._agent_topic_type, source=self.id.key),
        )

    @message_handler
    async def handle_task_result(self, message: AgentResponse, ctx: MessageContext) -> None:
        """Prompt for the next message after an agent reply; typing 'exit' ends the session."""
        reply = input("User (type 'exit' to close the session): ")
        print(f"{'-'*80}\n{self.id.type}:\n{reply}", flush=True)
        wants_exit = reply.strip().lower() == "exit"
        if not wants_exit:
            # Continue the conversation on whichever topic the agent asked us to reply to.
            message.context.append(UserMessage(content=reply, source="User"))
            follow_up = UserTask(context=message.context)
            await self.publish_message(
                follow_up,
                topic_id=TopicId(message.reply_to_topic_type, source=self.id.key),
            )
            return
        print(f"{'-'*80}\nUser session ended, session ID: {self.id.key}.")

In [577]:
def get_menu() -> str:
    """Return an instruction telling the caller to fetch the menu through the SQL Query Agent Tool.

    No menu data is returned here; the result is a fixed instruction string.
    """
    instruction = "Use SQL Query Agent Tool to get all the items from the menu table."
    return instruction
  #   return """
  #   MENU:
  #   Coffee Drinks:
  #   Espresso
  #   Americano
  #   Cold Brew

  #   Coffee Drinks with Milk:
  #   Latte
  #   Cappuccino
  #   Cortado
  #   Macchiato
  #   Mocha
  #   Flat White

  #   Tea Drinks:
  #   English Breakfast Tea
  #   Green Tea
  #   Earl Grey

  #   Tea Drinks with Milk:
  #   Chai Latte
  #   Matcha Latte
  #   London Fog

  #   Other Drinks:
  #   Steamer
  #   Hot Chocolate

  #   Modifiers:
  #   Milk options: Whole, 2%, Oat, Almond, 2% Lactose Free; Default option: whole
  #   Espresso shots: Single, Double, Triple, Quadruple; default: Double
  #   Caffeine: Decaf, Regular; default: Regular
  #   Hot-Iced: Hot, Iced; Default: Hot
  #   Sweeteners (option to add one or more): vanilla sweetener, hazelnut sweetener, caramel sauce, chocolate sauce, sugar free vanilla sweetener
  #   Special requests: any reasonable modification that does not involve items not on the menu, for example: 'extra hot', 'one pump', 'half caff', 'extra foam', etc.

  #   "dirty" means add a shot of espresso to a drink that doesn't usually have it, like "Dirty Chai Latte".
  #   "Regular milk" is the same as 'whole milk'.
  #   "Sweetened" means add some regular sugar, not a sweetener.

  #   Soy milk has run out of stock today, so soy is not available.
  # """
  
  
  
# Expose get_menu to the model as a callable tool.
get_menu_tool = FunctionTool(
    get_menu,
    description="Provide the latest up-to-date menu.For this you have to Query from the database.",
)

def get_menu_pizza() -> str:
    """Return the fixed pizza menu text: five pizzas, optional toppings and crust sizes."""
    pizza_menu = """1. Margherita Pizza
Description: A classic pizza with a simple yet delicious combination of fresh mozzarella, tomatoes, and basil.
Base Ingredients: Tomato sauce, fresh mozzarella, fresh basil, olive oil.
2. Pepperoni Pizza
Description: A favorite among many, this pizza is topped with spicy pepperoni slices and mozzarella cheese.
Base Ingredients: Tomato sauce, mozzarella cheese, pepperoni slices.
3. BBQ Chicken Pizza
Description: A savory pizza featuring grilled chicken, BBQ sauce, and red onions.
Base Ingredients: BBQ sauce, mozzarella cheese, grilled chicken, red onions, cilantro.
4. Veggie Supreme Pizza
Description: A delightful mix of fresh vegetables for a healthy and tasty option.
Base Ingredients: Tomato sauce, mozzarella cheese, bell peppers, onions, mushrooms, black olives, tomatoes.
5. Hawaiian Pizza
Description: A sweet and savory combination of ham and pineapple.
Base Ingredients: Tomato sauce, mozzarella cheese, ham, pineapple chunks.

Topping Options
Enhance your pizza with any of the following toppings: (Optional)

Extra Cheese
Pepperoni
Sausage
Bacon
Ham
Grilled Chicken
Mushrooms
Onions
Bell Peppers
Black Olives
Jalapeños
Pineapple
Spinach
Fresh Basil
Tomatoes

Crust Size Options
Choose the perfect size for your appetite:

Small (10 inches) - (Default)
Medium (12 inches)
Large (14 inches)
Extra Large (16 inches)"""
    return pizza_menu

# Expose get_menu_pizza to the model as a callable tool.
get_menu_tool_pizza = FunctionTool(
    get_menu_pizza,
    description="Provide the latest up-to-date menu for pizza.",
)

In [578]:
from collections.abc import Iterable


def add_to_order(drink: str, modifiers: Iterable[str]) -> str:
    """Format one drink and its modifiers as a single order line.

    Args:
        drink: Name of the drink; surrounding whitespace is stripped.
        modifiers: Modifier strings. Blank and non-string entries are dropped.
            A bare string counts as one modifier (it is not iterated character
            by character) and ``None`` means "no modifiers".

    Returns:
        ``"drink (mod1, mod2, ...)"`` when at least one modifier remains,
        otherwise just the drink name. Only this one line is returned; the
        function does not keep or update a running order.
    """
    drink = drink.strip()

    # A str is itself an Iterable[str]; without this guard "oat" would become "o, a, t".
    if modifiers is None:
        modifiers = []
    elif isinstance(modifiers, str):
        modifiers = [modifiers]

    cleaned = [m.strip() for m in modifiers if isinstance(m, str) and m.strip()]
    return f"{drink} ({', '.join(cleaned)})" if cleaned else drink

def get_all_orders(orders: List[str]) -> str:
    """Join the given order lines into one string, one order per line."""
    line_break = "\n"
    return line_break.join(orders)

def clear_orders(orders: List[str]) -> List[str]:
    """Return a new empty order list; the list passed in is left untouched."""
    emptied: List[str] = []
    return emptied

def place_order(orders: List[str]) -> str:
    """Build the "order placed" message listing every order line and the preparation time."""
    listing = get_all_orders(orders)
    return f"Order placed:\n{listing} \n It will take 5-10 minutes to prepare. Thank you for your order!"

def confirm_order(orders: List[str]) -> str:
    """Build the question that reads the order back and asks the customer to confirm it."""
    listing = get_all_orders(orders)
    return f"Your order is: {listing}. Is this correct?"

# Tool wrappers exposing the cafe order helpers to the model.
add_to_order_tool = FunctionTool(
    add_to_order,
    description="Add or modify a drink to the order.",
)
get_all_orders_tool = FunctionTool(
    get_all_orders,
    description="Get all orders in the list.",
)
clear_orders_tool = FunctionTool(
    clear_orders,
    description="Clear all orders.",
)
place_order_tool = FunctionTool(
    place_order,
    description="Place the order and return the order summary.",
)
confirm_order_tool = FunctionTool(
    confirm_order,
    description="Asks the customer if the order is correct.",
)



In [579]:
def add_to_order_pizza(pizza_type: str, toppings: Iterable[str],crust_type:str) -> str:
    """Format one pizza, its toppings and its crust as a single order line.

    Args:
        pizza_type: Name of the pizza; surrounding whitespace is stripped.
        toppings: Topping strings. Blank and non-string entries are dropped.
            A bare string counts as one topping (it is not iterated character
            by character) and ``None`` means "no toppings".
        crust_type: Crust choice appended after "with"; whitespace is stripped.

    Returns:
        ``"pizza (top1, top2, ...) with crust"`` when at least one topping
        remains, otherwise ``"pizza with crust"``. Only this one line is
        returned; the function does not keep or update a running order.
    """
    pizza = pizza_type.strip()

    # A str is itself an Iterable[str]; without this guard "Ham" would become "H, a, m".
    if toppings is None:
        toppings = []
    elif isinstance(toppings, str):
        toppings = [toppings]

    cleaned = [t.strip() for t in toppings if isinstance(t, str) and t.strip()]
    crust_type = crust_type.strip()

    order = f"{pizza} ({', '.join(cleaned)})" if cleaned else pizza
    return order + " with " + crust_type

def get_all_orders_pizza(orders: List[str]) -> str:
    """Join the given pizza order lines into one string, one order per line."""
    line_break = "\n"
    return line_break.join(orders)

def clear_orders_pizza(orders: List[str]) -> List[str]:
    """Return a new empty order list; the list passed in is left untouched."""
    emptied: List[str] = []
    return emptied

def place_order_pizza(orders: List[str]) -> str:
    """Build the "order placed" message listing every pizza order line and the preparation time."""
    # Use the pizza cell's own listing helper rather than the cafe cell's
    # get_all_orders, so the pizza helpers do not depend on the cafe ones.
    listing = get_all_orders_pizza(orders)
    return f"Order placed:\n{listing} \n It will take 5-10 minutes to prepare. Thank you for your order!"

def confirm_order_pizza(orders: List[str]) -> str:
    """Build the question that reads the pizza order back and asks the customer to confirm it."""
    # Use the pizza cell's own listing helper rather than the cafe cell's
    # get_all_orders, so the pizza helpers do not depend on the cafe ones.
    listing = get_all_orders_pizza(orders)
    return f"Your order is: {listing}. Is this correct?"

# Tool wrappers exposing the pizza order helpers to the model.
# The add tool's description previously said "drink" (copied from the cafe
# tools); the description is what the model reads when choosing a tool.
add_to_order_pizza_tool = FunctionTool(
    add_to_order_pizza,
    description="Add or modify a pizza in the order.",
)
get_all_orders_pizza_tool = FunctionTool(
    get_all_orders_pizza,
    description="Get all orders in the list.",
)
clear_orders_pizza_tool = FunctionTool(
    clear_orders_pizza,
    description="Clear all orders.",
)
place_order_pizza_tool = FunctionTool(
    place_order_pizza,
    description="Place the order and return the order summary.",
)
confirm_order_pizza_tool = FunctionTool(
    confirm_order_pizza,
    description="Asks the customer if the order is correct.",
)

In [580]:
# Topic types; in this notebook each agent's topic type doubles as its agent type.
# Specialist and triage agents:
cafe_agent_topic_type = "CafeAgent"
pizza_agent_topic_type = "PizzaAgent"
master_agent_topic_type = "MasterAgent"
sql_agent_topic_type = "SQLAgent"
# User-facing and coordination topics:
user_topic_type = "User"
orchestrator_topic_type = "Orchestrator"

In [None]:
def transfer_to_pizza_agent() -> str:
    """Return the pizza agent's topic type, i.e. where the conversation should be handed off."""
    return pizza_agent_topic_type

def transfer_to_cafe_agent() -> str:
    """Return the cafe agent's topic type, i.e. where the conversation should be handed off."""
    return cafe_agent_topic_type

def transfer_back_to_master_agent() -> str:
    """Return the master agent's topic type, handing the conversation back to it."""
    return master_agent_topic_type

def transfer_to_sql_agent() -> str:
    """Return the SQL agent's topic type, i.e. where the conversation should be handed off."""
    return sql_agent_topic_type

# Delegate tools: calling one yields the topic type of the agent to hand off to.
transfer_to_pizza_agent_tool = FunctionTool(
    transfer_to_pizza_agent,
    description="Use this for anything related to pizza and its orders.",
    strict=True,
)
transfer_to_cafe_agent_tool = FunctionTool(
    transfer_to_cafe_agent,
    description="Use this for anything related to coffee,drinks and its orders.",
    strict=True,
)
transfer_back_to_master_agent_tool = FunctionTool(
    transfer_back_to_master_agent,
    description="Call this if the user brings up a topic outside of your purview.",
    strict=True,
)
# NOTE(review): unlike the three tools above, this one does not pass strict=True -- confirm intent.
transfer_to_sql_tool = FunctionTool(
    transfer_to_sql_agent,
    description="Use this for anything related to SQL queries for Live Data needs.",
)

In [None]:
# Runtime that hosts every agent registered in the cells below.
runtime = SingleThreadedAgentRuntime()

# NOTE(review): both clients are constructed without arguments; the deployment,
# endpoint and credential settings appear to have been stripped from this cell
# (or are expected from the environment) -- confirm before running.
model_client = AzureOpenAIChatCompletionClient()
from langchain_openai import AzureChatOpenAI
# `llm` is not used by any cell visible in this part of the notebook.
llm = AzureChatOpenAI()



In [583]:
master_agent_type = await AIAgent.register(
    runtime,
    type=master_agent_topic_type,  # Using the topic type as the agent type.
    factory=lambda: AIAgent(
        description="A Master agent.",
        system_message=SystemMessage(
            content="""You are an advanced customer service AI agent for Fasto Food and Bev Inc., a company that operates two distinct entities: Fasto Coffee and Fasto Pizza. Your primary responsibility is to efficiently triage customer inquiries to the appropriate entity while maintaining a seamless and engaging interaction.

Introduction: Begin each interaction with a concise and friendly introduction. Clearly state your role and express your readiness to assist, ensuring the customer feels welcomed and valued.

Tone and Style: Maintain a professional yet approachable tone. Your language should be clear, concise, and free of jargon. Strive to make your questions and responses feel natural and conversational, avoiding overly formal or robotic phrasing.

Inquiry Handling:

Identify Needs: Subtly guide the conversation to identify whether the inquiry pertains to Fasto Coffee or Fasto Pizza. Use context clues and gentle probing questions to ascertain the customer's needs without making them feel interrogated.
Efficient Triage: Once the relevant entity is identified, seamlessly transition the conversation to focus on that entity."""
        ),
        model_client=model_client,
        tools=[],
        delegate_tools=[
            transfer_to_pizza_agent_tool,
            transfer_to_cafe_agent_tool,
        ],
        agent_topic_type=master_agent_topic_type,
        user_topic_type=user_topic_type,
        orchestrator_topic_type=orchestrator_topic_type,
    ),
)
# Add subscriptions for the triage agent: it will receive messages published to its own topic only.
await runtime.add_subscription(TypeSubscription(topic_type=master_agent_topic_type, agent_type=master_agent_type.type))




In [584]:
cafe_agent_type = await AIAgent.register(
    runtime,
    type=cafe_agent_topic_type,  # Using the topic type as the agent type.
    factory=lambda: AIAgent(
        description="An agent working for a Cafe as interactive ordering system.",
        system_message=SystemMessage(
            content="""You are BaristaBot, a specialized interactive ordering assistant for Coffee Buzz, the cafe division of Fasto Food and Bev Inc. Coffee Buzz offers a variety of coffee drinks and other beverages.
Your Role:
Menu Expert: Answer customer questions strictly related to menu items, their ingredients, modifiers, and history. Do not engage in off-topic discussions.

Order Management: Clearly structure customer orders, ensuring accuracy and completeness.

Strict Confirmation Protocol: Always explicitly confirm the customer's order before finalizing it. Never assume or place an order without clear confirmation from the customer.

Order Handling Instructions:
Adding Items: Use add_to_order only after explicitly verifying that the drink names and modifiers exactly match those listed on the MENU. If unsure, always clarify with the customer first.

Viewing Current Order: Use get_all_order to privately review the current order status (this information is for your reference only; do not show directly to the user).

Clearing Orders: Use clear_order if the customer explicitly requests to reset or cancel their current order.

Confirming Orders (Mandatory):

Always call confirm_order explicitly before placing any orders. This will display the complete order details to the customer for verification.

Carefully listen to the customer's response after confirm_order. If they request modifications or corrections, clearly acknowledge and implement these changes before proceeding.

Placing Orders:

Only call place_order after obtaining explicit confirmation from the customer that their order is correct and final.

Once the order has been successfully placed, politely thank the customer, say goodbye, and immediately use the delegate tool to transfer control back to your master agent.

Important Restrictions:
Never place an order without explicit user confirmation.

Never guess or assume drink names or modifiers that are not explicitly listed on your MENU.

If a customer's request is unclear or ambiguous, always ask clarifying questions before proceeding.

Your primary goal is to ensure accuracy, clarity, and customer satisfaction by strictly adhering to these guidelines."""
        ),
        model_client=model_client,
        tools=[get_menu_tool,add_to_order_tool,get_all_orders_tool,clear_orders_tool,place_order_tool,confirm_order_tool],
        delegate_tools=[
            transfer_back_to_master_agent_tool,
            transfer_to_sql_tool
        ],
        agent_topic_type=cafe_agent_topic_type,
        user_topic_type=user_topic_type,
        orchestrator_topic_type=orchestrator_topic_type
    ),
)
# Add subscriptions for the triage agent: it will receive messages published to its own topic only.
await runtime.add_subscription(TypeSubscription(topic_type=cafe_agent_topic_type, agent_type=cafe_agent_type.type))



In [585]:
pizza_agent_type = await AIAgent.register(
    runtime,
    type=pizza_agent_topic_type,  # Using the topic type as the agent type.
    factory=lambda: AIAgent(
        description="An agent working for a Pizza Serving Company.",
        system_message=SystemMessage(
            content="You are a customer service bot of Fasto Food and Bev Inc for their Cafe Division Named 'OverCrust'. "
            "OverCrust serves a variety of Pizza "
            "You are a PizzaBot, an interactive Pizza ordering system. A human will talk to you about the "
    "available products you have and you will answer any questions about menu items (and only about "
    "menu items - no off-topic discussion, but you can chat about the products and their history) - from get_menu_tool_pizza. "
    "The customer will place an order for 1 or more items from the menu, which you will structure "
    "and send to the ordering system after confirming the order with the human. "
    "\n\n"
    "Add items to the customer's order with add_to_order, and reset the order with clear_order. "
    "To see the contents of the order so far, call get_all_order_pizza (this is shown to you, not the user) "
    "Always confirm_order_pizza with the user (double-check) before calling place_order_pizza. Calling confirm_order will "
    "display the order items to the user and returns their response to seeing the list. Their response may contain modifications. "
    "Always verify and respond with drink and modifier names from the MENU before adding them to the order. "
    "If you are unsure a drink or modifier matches those on the MENU, ask a question to clarify or redirect. "
    "You only have the modifiers listed on the menu. "
    "Once the customer has finished ordering items, Call confirm_order to ensure it is correct then make "
    "any necessary updates and then call place_order. Once place_order_pizza has returned, thank the user and "
    "say goodbye! and use delegate tool to transfer back to master agent",
        ),
        model_client=model_client,
        tools=[get_menu_tool_pizza,add_to_order_pizza_tool,get_all_orders_pizza_tool,clear_orders_pizza_tool,place_order_pizza_tool,confirm_order_pizza_tool],
        delegate_tools=[
            transfer_back_to_master_agent_tool
        ],
        agent_topic_type=pizza_agent_topic_type,
        user_topic_type=user_topic_type,
        orchestrator_topic_type=orchestrator_topic_type
    ),
)
# Add subscriptions for the triage agent: it will receive messages published to its own topic only.
await runtime.add_subscription(TypeSubscription(topic_type=pizza_agent_topic_type, agent_type=pizza_agent_type.type))

In [None]:
orchestrator_agent_type = await OrchestratorAgent.register(
    runtime,
    type=orchestrator_topic_type,  # Using the topic type as the agent type.
    factory=lambda: OrchestratorAgent(
        description="An Orchestrator agent working for a Master Agent if Multiple Agents are to be managed and results have to be combined.",
        system_message=SystemMessage(
            content="""You are an advanced Orchestrator agent for Fasto Food and Bev Inc., a company that operates two distinct entities: Fasto Coffee and Fasto Pizza. Your primary responsibility is to orchestrate the conversation between the two entities and maintain a seamless and engaging interaction.
            You do have access to all the tools and can delegate the tasks to the respective agents tools by generating proper induvidual query as argument for the agent if the user asked combined query. Your Response will be used by the two agents not the end user so make sure you generate the proper query for the agents to handle the user query.
            and if the query is not needed to send to any agent but to the user then set the to as 'user' and give the response message
            Tools:
            Agent Tool Name - transfer_to_cafe_agent 
            description:Use this for anything related to coffee,drinks and its orders.
            Agent Tool Name - transfer_to_pizza_agent
            description:Use this for anything related to pizza and its orders.
            """,
        ),
        model_client=model_client,
        agent_topic_type=orchestrator_topic_type,
        user_topic_type=user_topic_type,
        master_agent=master_agent_topic_type
    ),
)
# Add subscriptions for the triage agent: it will receive messages published to its own topic only.
await runtime.add_subscription(TypeSubscription(topic_type=orchestrator_topic_type, agent_type=orchestrator_agent_type.type))

In [587]:
import sqlite3

from autogen_ext.tools.langchain import LangChainToolAdapter
from langchain_community.agent_toolkits.sql.toolkit import SQLDatabaseToolkit
from langchain_community.utilities.sql_database import SQLDatabase
from sqlalchemy import Engine, create_engine
from sqlalchemy.pool import StaticPool

# SQLite database file and the SQLAlchemy URL derived from it.
DATABASE_NAME = "cafe_menu.db"
DATABASE_URL = f"sqlite:///{DATABASE_NAME}"

# echo=True makes SQLAlchemy log the statements it issues, which is the source
# of the engine log lines in this cell's output.
engine = create_engine(DATABASE_URL, echo=True)
db = SQLDatabase(engine)
toolkit = SQLDatabaseToolkit(db=db, llm=llm)

# Wrap every LangChain tool from the toolkit in a LangChainToolAdapter.
tools_sql = list(map(LangChainToolAdapter, toolkit.get_tools()))


2025-03-09 12:19:47,059 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 12:19:47,060 INFO sqlalchemy.engine.Engine SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite~_%' ESCAPE '~' ORDER BY name
2025-03-09 12:19:47,061 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-03-09 12:19:47,063 INFO sqlalchemy.engine.Engine ROLLBACK
2025-03-09 12:19:47,063 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 12:19:47,064 INFO sqlalchemy.engine.Engine SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite~_%' ESCAPE '~' ORDER BY name
2025-03-09 12:19:47,064 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-03-09 12:19:47,066 INFO sqlalchemy.engine.Engine SELECT name FROM sqlite_temp_master WHERE type='table' AND name NOT LIKE 'sqlite~_%' ESCAPE '~' ORDER BY name
2025-03-09 12:19:47,066 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-03-09 12:19:47,068 INFO sqlalchemy.engine.Engine PRAGMA main.table_xinfo("menu")
2025-03-09 12:19:47,068 IN

In [588]:
sql_db_agent = await AIAgent.register(
    runtime,
    type=sql_agent_topic_type,  # Using the topic type as the agent type.
    factory=lambda: AIAgent(
        description="An agent which when required can perform various database releated task like querying",
        system_message=SystemMessage(
            content="""You are an agent designed to interact with a SQL database.
Given an input question, create a syntactically correct SQLLite query to run, then look at the results of the query and return the answer.
You can order the results by a relevant column to return the most interesting examples in the database.
Query for all the columns in the table, not just the ones you need to answer the question.
You have access to tools for interacting with the database.
Only use the below tools. Only use the information returned by the below tools to construct your final answer.
You MUST double check your query before executing it. If you get an error while executing a query, rewrite the query and try again.

DO NOT make any DML statements (INSERT, UPDATE, DELETE, DROP etc.) to the database.

To start you should ALWAYS look at the tables in the database to see what you can query.
Do NOT skip this step.
Then you should query the schema of the most relevant tables.
Once you respond to the user, use the delegate tool to transfer back to the agent from where it came.""",
        ),
        model_client=model_client,
        tools=tools_sql,
        delegate_tools=[transfer_to_cafe_agent_tool],
        agent_topic_type=sql_agent_topic_type,
        user_topic_type=user_topic_type,
        orchestrator_topic_type=orchestrator_topic_type
    ),
)
# Add subscriptions for the triage agent: it will receive messages published to its own topic only.
await runtime.add_subscription(TypeSubscription(topic_type=sql_agent_topic_type, agent_type=sql_db_agent.type))

In [589]:
# Register the user agent.
user_agent_type = await UserAgent.register(
    runtime,
    type=user_topic_type,
    factory=lambda: UserAgent(
        description="A user agent.",
        user_topic_type=user_topic_type,
        agent_topic_type=master_agent_topic_type,  # Start with the triage agent.
    ),
)
# Add subscriptions for the user agent: it will receive messages published to its own topic only.
await runtime.add_subscription(TypeSubscription(topic_type=user_topic_type, agent_type=user_agent_type.type))

In [590]:
# Start the runtime.
runtime.start()


# Create a new session for the user.
session_id = str(uuid.uuid4())
await runtime.publish_message(UserLogin(), topic_id=TopicId(user_topic_type, source=session_id))

# Run until completion.
await runtime.stop_when_idle()

--------------------------------------------------------------------------------
User login, session ID: 05c4721a-ce4c-4fa8-9708-78a8b4e87233.
--------------------------------------------------------------------------------
User:
hi
--------------------------------------------------------------------------------
MasterAgent:
Hello there! I'm here to assist you with any questions or concerns you might have related to Fasto Food and Bev Inc. Are you reaching out about Fasto Coffee or Fasto Pizza today?
--------------------------------------------------------------------------------
User:
i want to order drinks and pizza
--------------------------------------------------------------------------------
MasterAgent:
[FunctionCall(id='call_sN1J11WJVfqMg4lwEohkCfyH', arguments='{}', name='transfer_to_cafe_agent'), FunctionCall(id='call_rgqoY3qiksaKZRYPsp7ud6Ik', arguments='{}', name='transfer_to_pizza_agent')]
Delegates are:
2
-------------------------------------------------------------------