In session 4, we demonstrated an example of developing a chatbot to serve customers of Contoso. In that example, we used one agent to manage the conversation. 

In this session, we will show an example to demonstrate how using multiple agents can help with more complex tasks.

Let's assume that we want to build a data management system for Contoso operational database that enables the following via natural language commands (NL2SQL):

- Answering questions about the data 
- Adding or removing records 
- Adding new stored procedures
    - Enforce using stored procedures for data modifications if available 


## Let's set up the base requirements

### Initiate a connection pool for better management of postgreSQL connections

In [1]:
from src.get_conn import get_connection_uri
from psycopg2 import pool
connection_pool = None
def init_pool():
    # Initialize connection pool
    global connection_pool
    if connection_pool is None:
        conn_string = get_connection_uri()
        connection_pool = pool.SimpleConnectionPool(
            minconn=1,
            maxconn=10,
            dsn=conn_string
        )


#### Initiating a global connection pool

In [2]:
init_pool()

Connection uri was rertieved successfully.


### Core plugins (getting Schema, Read and Write)

In [3]:
import psycopg2
import json
from semantic_kernel.functions import kernel_function

################## Schema Plugin ##################
# This plugin retrieves the database schema, including tables, columns, data types...
class Contoso_SchemaPlugin:
    @kernel_function
    async def get_db_schema(self) -> str:
        global connection_pool
        """Gets the database schema."""
        res = ""
        conn = connection_pool.getconn()
        try:
            
            query = """
            SELECT
                cols.table_schema,
                cols.table_name,
                cols.column_name,
                cols.data_type,
                cols.is_nullable,
                cons.constraint_type,
                cons.constraint_name,
                fk.references_table AS referenced_table,
                fk.references_column AS referenced_column
            FROM information_schema.columns cols
            LEFT JOIN information_schema.key_column_usage kcu
                ON cols.table_schema = kcu.table_schema
                AND cols.table_name = kcu.table_name
                AND cols.column_name = kcu.column_name
            LEFT JOIN information_schema.table_constraints cons
                ON kcu.table_schema = cons.table_schema
                AND kcu.table_name = cons.table_name
                AND kcu.constraint_name = cons.constraint_name
            LEFT JOIN (
                SELECT
                    rc.constraint_name,
                    kcu.table_name AS references_table,
                    kcu.column_name AS references_column
                FROM information_schema.referential_constraints rc
                JOIN information_schema.key_column_usage kcu
                    ON rc.unique_constraint_name = kcu.constraint_name
            ) fk
                ON cons.constraint_name = fk.constraint_name
            WHERE cols.table_schema = 'public'
            ORDER BY cols.table_schema, cols.table_name, cols.ordinal_position;
            """
            curs = conn.cursor()
            curs.execute(query)
            columns = [desc[0] for desc in curs.description]
            rows = curs.fetchall()
            curs.close()
            schema_info = [dict(zip(columns, row)) for row in rows]
            res = json.dumps(schema_info, indent=2)
        except Exception as e:
            print(f"Could not fetch database schema: {e}")
            res = ""
        finally:
            connection_pool.putconn(conn)
        return res
    
################### Read Plugin ##################    
# This plugin enbales executing SELECT queries ONLY and returns the results.
# It does not allow any write operations.
class Contoso_ReadPlugin:     
    @kernel_function
    async def execute_select_query(self, query: str) -> list:
        global connection_pool
        """Executes a SELECT query and returns the results."""
        res = []
        if query.startswith("SELECT"):
            try:
                conn = connection_pool.getconn()
                curs = conn.cursor()
                curs.execute(query)
                res = curs.fetchall()
                curs.close()
            except psycopg2.Error as e:
                conn.rollback()
                res = ["Could not perform the operation due to error: " + str(e)]
            finally:
                connection_pool.putconn(conn)
        return res
    
#################### Write Plugin ##################
# This plugin allows executing write operations (INSERT, UPDATE, DELETE) on the database.    
# It also provides a method to retrieve information about available stored procedures.
class Contoso_WritePlugin:
    @kernel_function
    async def get_procedure_info(self) -> str:
        global connection_pool
        """Gets information about available stored procedures in the database."""
        conn = connection_pool.getconn()
        res = ""
        try:
            procedure_query = """
            SELECT
                routine_schema,
                routine_name,
                routine_type,
                data_type AS return_type,
                specific_name
            FROM information_schema.routines
            WHERE routine_schema = 'public'
            ORDER BY routine_schema, routine_name;
            """
            curs = conn.cursor()
            curs.execute(procedure_query)
            columns = [desc[0] for desc in curs.description]
            rows = curs.fetchall()
            curs.close()
            proc_info = [dict(zip(columns, row)) for row in rows]
            res =  json.dumps(proc_info, indent=2)
        except Exception as e:
            print(f"Could not execute query: {e}")
            res =  ""
        finally:
            connection_pool.putconn(conn)
        return res

    @kernel_function
    async def execute_write_query(self, query: str) -> list:
        global connection_pool
        res = []
        if not query.startswith("SELECT"):
            conn = connection_pool.getconn()
            try:
                query_cursor = conn.cursor()
                query_cursor.execute(query)
                res = ["Operation successful"]
                conn.commit()     
            except psycopg2.Error as e:
                conn.rollback()
                res = ["Could not perform the operation due to error: " + str(e)]   
            finally:
                query_cursor.close()
                connection_pool.putconn(conn)
            return res

        
    

### Function for defining agents

In [None]:
from semantic_kernel.agents import ChatCompletionAgent
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion

def get_db_agents():

    # Initialize the plugins
    read_plugin = Contoso_ReadPlugin()
    write_plugin = Contoso_WritePlugin()
    schema_plugin = Contoso_SchemaPlugin()

    # Read Agent
    read_agent = ChatCompletionAgent(
        service=AzureChatCompletion(),
        name="ReadAgent",
        description="Responsible for reading data from the database.",
        plugins=[read_plugin, schema_plugin],
        instructions="""You are responsible for answering questions about the data via running SELECT queries on the database.

                        Below are the instructions you must follow:
                        - You must always first retrieve and share the full database schema.
                        - You can only run SELECT queries to read data from the database. You cannot modify the data.
                        - You must always lowercase the condition values in the WHERE clause.

                        If task is to modify the data, say 'Passing to the WriteAgent'.
                        """
    )
    
    # Write Agent
    write_agent = ChatCompletionAgent(
        service=AzureChatCompletion(),
        name="WriteAgent",
        description="Responsible for modifying data in the database.",
        plugins=[write_plugin, schema_plugin],
        instructions="""You are responsible for modifying data in the database while maintaining referential integrity.

                        Below are the instructions you must follow:
                        - You must always first esnure you have the database schema.
                        - For adding a new record, ensure that there is value provided for all required NOT NULL columns. Ask the user for any missing values.
                        - Always prioritize using a stored procedure if available for that operation. 
                        - You must always ensure that the operation does not violate referential integrity.
                        - Before executing the query, you must always first ask user for confirmation.
                        """
    )




    return [read_agent, write_agent]


## Group Chat Orchestration with RoundRobin Manager

In this form of managed chat, agents take turn in a roundrobin fashion and take a stab at the task. The resulting message of each agent is then broadcasted to all agents in the group.

### Asking questions about data

Let's start simple and only see an example of a group chat answering simple questions.

##### Initializing agents

In [5]:
from semantic_kernel.agents import GroupChatOrchestration, RoundRobinGroupChatManager
from semantic_kernel.agents.runtime import InProcessRuntime
from semantic_kernel.contents import ChatMessageContent, AuthorRole, ChatHistory

# to print the messages from the agents
def agent_response_callback(message: ChatMessageContent) -> None:
    """Observer function to print the messages from the agents."""
    print("Role:", message.name, "Response:", message.content)

# Initialize the agents
db_agents = get_db_agents()




##### Initializing group chat and running an example

In [6]:
# 1. Initiate a GroupChatOrchestration with the agents and a RoundRobinGroupChatManager
group_chat_orchestration = GroupChatOrchestration(
    members=db_agents,
    manager=RoundRobinGroupChatManager(
    max_rounds=(4)),   # limiting to 4 rounds
    agent_response_callback=agent_response_callback
    )

# 2. Create a runtime and start it
runtime = InProcessRuntime()
runtime.start()

user_input = "How many customers have sentiment score less than 3? show me the reviews they wrote."

# 3. Invoke the orchestration with a task and the runtime
orchestration_result = await group_chat_orchestration.invoke(
    task=user_input,
    runtime=runtime
)

# 4. Wait for the chat to complete and get the result
value = await orchestration_result.get()


# 5. Stop the runtime after the invocation is complete

await runtime.stop_when_idle()


Role: ReadAgent Response: Here is the full database schema:

Tables and Columns:

1. customers
   - customer_id (PK, integer)
   - city (character varying)
   - state (character varying)
   - country (character varying)
   - sentiment_score (numeric)
   - name (character varying)
   - email (character varying, UNIQUE)

2. product_desc
   - vector_id (PK, integer)
   - embedding (USER-DEFINED)
   - product_id (integer, FK -> products.product_id)

3. product_desc_sk
   - key (PK, text)
   - embedding (USER-DEFINED)
   - metadata (jsonb)
   - timestamp (timestamp without time zone)

4. products
   - product_id (PK, integer)
   - name (character varying)
   - description (text)
   - price (numeric)
   - inventory (numeric)
   - refurbished (boolean)
   - category (character varying)

5. return_items
   - return_id (PK, integer)
   - sales_id (integer, FK -> sales.sales_id)
   - return_status (character varying)
   - reason (text)
   - status_date (date)

6. reviews
   - review_id (PK, inte

### Adding human in the loop 

Basic chat managers do not ask for human input when solving a task. So we would need to add this capability to the manager. For this usecase, we want to ensure the WriteAgent asks for user confirmation before modifying the database.

In [7]:
from semantic_kernel.agents import GroupChatOrchestration, RoundRobinGroupChatManager, BooleanResult
from semantic_kernel.agents.runtime import InProcessRuntime
from semantic_kernel.contents import ChatMessageContent, ChatHistory, AuthorRole
import sys
from typing_extensions import override  

### Customizing Round Robin Group Chat Manager ###

class CustomRoundRobinGroupChatManager(RoundRobinGroupChatManager):
    """Custom round robin group chat manager to enable user input."""

    @override
    async def should_request_user_input(self, chat_history: ChatHistory) -> BooleanResult:
        """Override the default behavior to request user input after the WriteAgent's message.

        The manager will check if input from human is needed after each agent message.
        """
        if len(chat_history.messages) == 0:
            return BooleanResult(
                result=False,
                reason="No agents have spoken yet.",
            )
        last_message = chat_history.messages[-1]
        if last_message.name == "WriteAgent":
            return BooleanResult(
                result=True,
                reason="User input is needed.",
            )

        return BooleanResult(
            result=False,
            reason="User input is not needed if the last message is not from the WriteAgent.",
        )
    # Override the should_terminate method to check for exit words in user messages.
    @override
    async def should_terminate(self, chat_history: ChatHistory) -> BooleanResult:
        # Check if the last user message is an exit word
        if len(chat_history.messages) == 0:
            return BooleanResult(
                result=False,
                reason="No agents have spoken yet.",
            )
        last_message = chat_history.messages[-1]
        if last_message.role == AuthorRole.USER:
            content = last_message.content.strip().lower()
            if("exit" in content):
                print("User requested to exit the chat.")
                return BooleanResult(
                result=True,
                reason="User Ended the Chat.",
            )
                
        return BooleanResult(
                result=False,
                reason="Chat continues.",
               )
    

async def human_response_function(chat_histoy: ChatHistory) -> ChatMessageContent:
    user_input = input("User (type exit to end the chat): ")
    return ChatMessageContent(role=AuthorRole.USER, content=user_input)

def agent_response_callback(message: ChatMessageContent) -> None:
    """Observer function to print the messages from the agents."""
    print(f"**{message.name}**\n{message.content}")

In [8]:
# Initialize the agents
db_agents = get_db_agents()

#### Example 1: modify data 

In [None]:
# Create a GroupChatOrchestration with the custom RoundRobinGroupChatManager and add the human response function.
group_chat_orchestration = GroupChatOrchestration(
    members=db_agents,
    manager= CustomRoundRobinGroupChatManager(
        max_rounds=20,
        human_response_function=human_response_function,
    ),
        agent_response_callback=agent_response_callback,
)
user_input = "remove a product with id 25 and any associated data"
runtime = InProcessRuntime()
runtime.start()
# Invoke the orchestration 
orchestration_result = await group_chat_orchestration.invoke(
    task=user_input,
    runtime=runtime,
)

# Wait for the results
value = await orchestration_result.get()
print(f"***** Result *****\n{value}")
await runtime.stop_when_idle()

**ReadAgent**
Here is the full database schema:

Tables and columns:

1. customers
   - customer_id (PK)
   - city
   - state
   - country
   - sentiment_score
   - name
   - email (UNIQUE)

2. product_desc
   - vector_id (PK)
   - embedding
   - product_id (FK -> products.product_id)

3. product_desc_sk
   - key (PK)
   - embedding
   - metadata
   - timestamp

4. products
   - product_id (PK)
   - name
   - description
   - price
   - inventory
   - refurbished
   - category

5. return_items
   - return_id (PK)
   - sales_id (FK -> sales.sales_id)
   - return_status
   - reason
   - status_date

6. reviews
   - review_id (PK)
   - customer_id (FK -> customers.customer_id)
   - product_id
   - sales_id (FK -> sales.sales_id)
   - rating
   - review_text
   - review_date

7. sales
   - sales_id (PK)
   - customer_id (FK -> customers.customer_id)
   - quantity
   - product_id (FK -> products.product_id)
   - sale_date

8. shipments
   - shipment_id (PK)
   - sales_id (FK -> sales.sales_

#### Example 2: Create a new stored procedure


In [12]:
# Create a GroupChatOrchestration with the custom RoundRobinGroupChatManager and add the human response function.
group_chat_orchestration = GroupChatOrchestration(
    members=db_agents,
    manager= CustomRoundRobinGroupChatManager(
        max_rounds=20,
        human_response_function=human_response_function,
    ),
        agent_response_callback=agent_response_callback,
)
# user_input = "create a stored procedure to add a new product to products table"
user_input = "add a new product with name 'Test Product', description 'new test product', price 100, category 'Electronics', refurbished false, and inventory 5"
runtime = InProcessRuntime()
runtime.start()
# Invoke the orchestration 
orchestration_result = await group_chat_orchestration.invoke(
    task=user_input,
    runtime=runtime,
)

# Wait for the results
value = await orchestration_result.get()
print(f"***** Result *****\n{value}")
await runtime.stop_when_idle()

**ReadAgent**
Here is the full database schema:

Tables and Columns:
- customers
  - customer_id (integer, PRIMARY KEY)
  - name (character varying)
  - email (character varying, UNIQUE)
  - city (character varying)
  - state (character varying)
  - country (character varying)
  - sentiment_score (numeric)
- product_desc
  - vector_id (integer, PRIMARY KEY)
  - embedding (USER-DEFINED)
  - product_id (integer, FOREIGN KEY → products.product_id)
- product_desc_sk
  - key (text, PRIMARY KEY)
  - embedding (USER-DEFINED)
  - metadata (jsonb)
  - timestamp (timestamp without time zone)
- products
  - product_id (integer, PRIMARY KEY)
  - name (character varying)
  - description (text)
  - price (numeric)
  - inventory (numeric)
  - refurbished (boolean)
  - category (character varying)
- return_items
  - return_id (integer, PRIMARY KEY)
  - sales_id (integer, FOREIGN KEY → sales.sales_id)
  - return_status (character varying)
  - reason (text)
  - status_date (date)
- reviews
  - review_id

## Handoff orchestration

In previous section, we implemented a simple roundrobin group chat in which agents took turn in order to address the task. If the task is complex it can take more rounds to solve with a lot of potentially unnecessary agent participations. Another method is to define handoff logics so the appropriate agent addresses the task and result is achieved more efficiently. 

### Initializing agents alongside their hand-off relationships



In [None]:
from semantic_kernel.agents import ChatCompletionAgent
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.agents import Agent, ChatCompletionAgent, HandoffOrchestration, OrchestrationHandoffs 


def get_agents_with_handoffs() -> tuple[list[Agent], OrchestrationHandoffs]:

    read_plugin = Contoso_ReadPlugin()
    write_plugin = Contoso_WritePlugin()
    schema_plugin = Contoso_SchemaPlugin()

    planner_agent = ChatCompletionAgent(
    name="PlannerAgent",
    description="Agent to provide schema and start the conversation.",
    instructions="""You are responsible to provide the database schema and start the conversation.
                Below are the instructions you must follow:
                - First, retrieve and share the full database schema.
                - After sharing the schema, ask the user what they would like to do.
                """,
    plugins=[schema_plugin],
    service=AzureChatCompletion(),
    )

    read_agent = ChatCompletionAgent(
        service=AzureChatCompletion(),
        name="ReadAgent",
        description="Responsible for reading data from the database.",
        plugins=[read_plugin],
        instructions="""You are responsible for answering questions about the data via running SELECT queries on the database.

                        Below are the instructions you must follow:
                        - You can only run SELECT queries to read data from the database. You cannot modify the data.
                        - You must ensure the query is valid based on the schema.
                        - You must always lowercase the condition values in the WHERE clause.
                        """
    )

    write_agent = ChatCompletionAgent(
        service=AzureChatCompletion(),
        name="WriteAgent",
        description="Responsible for modifying data in the database.",
        plugins=[write_plugin],
        instructions="""You are responsible for modifying data in the database while maintaining referential integrity.

                        Below are the instructions you must follow:
                        - For adding a new record, ensure that there is value provided for all required NOT NULL columns. Ask the user for any missing values.
                        - Always prioritize using a stored procedure if available for that operation. 
                        - You must always ensure that the operation does not violate referential integrity based on the schema.
                        - You must always ask user for confirmation before executing the query.
                        """
    )


    ################ handoff relationships ####################

    handoffs = (
        OrchestrationHandoffs()
        .add_many(
            source_agent=planner_agent.name,
            target_agents={
                read_agent.name: "Transfer to this agent if the ask is to answer a question about the data via SELECT queries.",
                write_agent.name: "Transfer to this agent if the ask is to add, remove, or modify data in the database.",
            },
        )
        .add_many(
            source_agent=write_agent.name,
            target_agents={
                planner_agent.name: "Transfer to this agent to get database schema.",
                read_agent.name: "Transfer to this agent if the ask is to answer a question about the data via SELECT queries.",
            }
        )
        .add_many(
            source_agent=read_agent.name,
            target_agents={
                planner_agent.name: "Transfer to this agent to get database schema.",
                write_agent.name: "Transfer to this agent if the ask is to add, remove, or modify data in the database.",
            }
        )
        
    )

    return [planner_agent, read_agent, write_agent], handoffs




In [14]:
from semantic_kernel.contents import AuthorRole, ChatMessageContent, FunctionCallContent, FunctionResultContent, ChatHistory
from semantic_kernel.agents.runtime import InProcessRuntime

async def human_response_function() -> ChatMessageContent:
    user_input = input("User (type exit to end the chat): ")
    return ChatMessageContent(role=AuthorRole.USER, content=user_input)

def agent_response_callback(message: ChatMessageContent) -> None:
    print(f"{message.name}: {message.content}")

In [15]:
agents, handoffs = get_agents_with_handoffs()

### Starting the group chat:

In [18]:
handoff_orchestration = HandoffOrchestration(
    members=agents,
    handoffs=handoffs,
    agent_response_callback=agent_response_callback,
    human_response_function=human_response_function,
)

user_input = "Hi"
runtime = InProcessRuntime()
runtime.start()
# 3. Invoke the orchestration with a task and the runtime
orchestration_result = await handoff_orchestration.invoke(
    task=user_input,
    runtime=runtime,
)

# 4. Wait for the results
value = await orchestration_result.get()
print(f"***** Result *****\n{value}")

await runtime.stop_when_idle()


PlannerAgent: Here is the database schema:

### Table: customers
- customer_id (integer, PRIMARY KEY, NOT NULL)
- name (character varying, NOT NULL)
- email (character varying, UNIQUE, NOT NULL)
- city (character varying, nullable)
- state (character varying, nullable)
- country (character varying, nullable)
- sentiment_score (numeric, nullable)

### Table: products
- product_id (integer, PRIMARY KEY, NOT NULL)
- name (character varying, NOT NULL)
- description (text, nullable)
- price (numeric, nullable)
- inventory (numeric, nullable)
- refurbished (boolean, nullable)
- category (character varying, nullable)

### Table: product_desc
- vector_id (integer, PRIMARY KEY, NOT NULL)
- embedding (USER-DEFINED, NOT NULL)
- product_id (integer, FOREIGN KEY to products.product_id, nullable)

### Table: product_desc_sk
- key (text, PRIMARY KEY, NOT NULL)
- embedding (USER-DEFINED, nullable)
- metadata (jsonb, nullable)
- timestamp (timestamp, nullable)

### Table: return_items
- return_id (inte

## Release connection pool after you are done.

In [19]:
##### release connnection pool #####
connection_pool.closeall()