# Allowing Human Feedback in Agents

Idea:
1. Human input will be requested but it is silent by default (no input)
2. Human may interject (raise a signal) to participate and insert comment in the up coming chat turn around

In [1]:
import os, dotenv, autogen
from autogen import ConversableAgent


import autogen
from autogen.io.websockets import IOWebsockets, IOStream
dotenv.load_dotenv()

config_list_gemini = autogen.config_list_from_json(
    "OAI_CONFIG_LIST",
    filter_dict={
        "model": ["gemini-pro"],
    },
)
llm_config = {
    "cache_seed": 42,  # change the cache_seed for different trials
    "temperature": 0,
    "config_list": config_list_gemini,
    "timeout": 120,
}
llm_config_stream = llm_config

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
import json
from autogen import ConversableAgent
from autogen.agentchat.conversable_agent import colored, content_str, Dict, Union, Agent, OpenAIWrapper

In [3]:


class MyConversableAgent(ConversableAgent):   
    pass     
    # def _print_received_message(self, message: Union[Dict, str], sender: Agent):
    #     iostream = IOStream.get_default()
    #     # print the message received
    #     iostream.print(colored(sender.name, "yellow"), "(to", f"{self.name}):\n", flush=True)
    #     message = self._message_to_dict(message)
    #     print(message)
    #     if message.get("tool_responses"):  # Handle tool multi-call responses
    #         for tool_response in message["tool_responses"]:
    #             self._print_received_message(tool_response, sender)
    #         if message.get("role") == "tool":
    #             return  # If role is tool, then content is just a concatenation of all tool_responses

    #     if message.get("role") in ["function", "tool"]:
    #         if message["role"] == "function":
    #             id_key = "name"
    #         else:
    #             id_key = "tool_call_id"
    #         id = message.get(id_key, "No id found")
    #         func_print = f"***** Response from calling {message['role']} ({id}) *****"
    #         iostream.print(colored(func_print, "green"), flush=True)
    #         iostream.print(message["content"], flush=True)
    #         iostream.print(colored("*" * len(func_print), "green"), flush=True)
    #     else:
    #         content = message.get("content")
    #         if content is not None:
    #             if "context" in message:
    #                 content = OpenAIWrapper.instantiate(
    #                     content,
    #                     message["context"],
    #                     self.llm_config and self.llm_config.get("allow_format_str_template", False),
    #                 )
    #             iostream.print(content_str(content), flush=True)
    #         if "function_call" in message and message["function_call"]:
    #             function_call = dict(message["function_call"])
    #             func_print = (
    #                 f"***** Suggested function call: {function_call.get('name', '(No function name found)')} *****"
    #             )
    #             iostream.print(colored(func_print, "green"), flush=True)
    #             iostream.print(
    #                 "Arguments: \n",
    #                 function_call.get("arguments", "(No arguments found)"),
    #                 flush=True,
    #                 sep="",
    #             )
    #             iostream.print(colored("*" * len(func_print), "green"), flush=True)
    #         if "tool_calls" in message and message["tool_calls"]:
    #             for tool_call in message["tool_calls"]:
    #                 id = tool_call.get("id", "No tool call id found")
    #                 function_call = dict(tool_call.get("function", {}))
    #                 func_print = f"***** Suggested tool call ({id}): {function_call.get('name', '(No function name found)')} *****"
    #                 iostream.print(colored(func_print, "green"), flush=True)
    #                 iostream.print(
    #                     "Arguments: \n",
    #                     function_call.get("arguments", "(No arguments found)"),
    #                     flush=True,
    #                     sep="",
    #                 )
    #                 iostream.print(colored("*" * len(func_print), "green"), flush=True)

    #     iostream.print("\n", "-" * 80, flush=True, sep="")
    #     iostream.print(json.dumps({"message": message, "sender": sender.name}))

class HandRaiseConversableAgent(ConversableAgent):
    def __init__(self, *args, websocket_uri=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.websocket_uri = websocket_uri  # WebSocket for monitoring
        self.hand_raise = False  # Initial state
        self.default_message = "skip"  # Default message

    async def monitor_hand_raise(self):
        """Asynchronously monitor the WebSocket for hand-raise signals."""
        if not self.websocket_uri:
            return

        async with IOWebsockets.connect(self.websocket_uri) as ws:
            async for message in ws:
                if message == "raise-hand":
                    print("Hand-raise detected!")
                    self.hand_raise = True
                elif message == "lower-hand":
                    print("Hand-raise cleared.")
                    self.hand_raise = False

    def get_human_input(self, prompt: str) -> str:
        """Override to handle human input based on hand-raise signal."""
        if self.hand_raise:
            print("Hand-raise detected. Awaiting human input...")
            iostream = IOStream.get_default()
            reply = iostream.input(prompt)  # Non-blocking input from IOStream
            self._human_input.append(reply)
            return reply
        else:
            print(f"No hand-raise detected. Returning default message: '{self.default_message}'")
            return self.default_message
            

In [4]:
agent1 = ConversableAgent(
    "agent1",
    system_message="You are have an indefinite conversation for fun, you can talk about anything but you must always reply and add a question for following up.",
    llm_config=llm_config_stream,
    # is_termination_msg=lambda msg: "53" in msg["content"],  # terminate if the number is guessed by the other agent
    human_input_mode="NEVER",  # never ask for human input
)
agent2 = ConversableAgent(
    "agent2",
    system_message="You are have an indefinite conversation for fun, you can talk about anything but you must always reply and add a question for following up.",
    llm_config=llm_config_stream,
    # is_termination_msg=lambda msg: "53" in msg["content"],  # terminate if the number is guessed by the other agent
    human_input_mode="NEVER",  # never ask for human input
)

human_proxy = HandRaiseConversableAgent(
    "human_proxy",
    llm_config=False,  # no LLM used for human proxy
    is_termination_msg=lambda msg: "CORRECT!" in msg["content"],
    # terminate if the number is guessed by the other agent
    human_input_mode="ALWAYS",  # always ask for human input
)
def custom_speaker_selection_func(last_speaker: Agent, groupchat: autogen.GroupChat):
    """Define a customized speaker selection function.
    A recommended way is to define a transition for each speaker in the groupchat.

    Returns:
        Return an `Agent` class or a string from ['auto', 'manual', 'random', 'round_robin'] to select a default method to use.
    """
    messages = groupchat.messages
    if len(messages) <= 1:
        return agent1
    if len(messages) <= 2:
        return agent2
    
    if last_speaker is human_proxy:
        if messages[-2]["name"] == "agent1":
            # If it is the planning stage, let the planner to continue
            return agent2
        elif messages[-2]["name"] == "agent2":
            # If the last message is from the scientist, let the scientist to continue
            return agent1

    elif last_speaker in [agent1, agent2]:
        # Always let the user to speak after the agent
        return human_proxy
    
groupchat = autogen.GroupChat(
    agents=[human_proxy, agent1, agent2],
    messages=[],
    max_round=20,
    speaker_selection_method=custom_speaker_selection_func,
)

manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config)

In [5]:
def on_connect(iostream: IOWebsockets) -> None:
    print(f" - on_connect(): Connected to client using IOWebsockets {iostream}", flush=True)

    try:
        # 1. Receive Initial Message
        initial_msg = "talk about anything you want"
        # initial_msg = iostream.input()  # Blocking until a message is received
        # if initial_msg:
        #     print(f"Received message from client: {initial_msg}", flush=True)

        with IOStream.set_default(iostream):
            # 2. Initiate the chat with the agent
            print(f"Initiating chat with agent using message '{initial_msg}'", flush=True)
            # This is where your chat initiation logic happens
            agent1.initiate_chat(
                manager, 
                message=initial_msg,
                clear_history=False  # Set clear_history based on your business logic
            )
        
        # 3. After the chat initiation, close the connection
        # print("Closing WebSocket connection after chat initiation.", flush=True)
        # IOStream.get_default().close()  # Close the IOStream connection

    except Exception as e:
        # Handle any exceptions and ensure the connection is closed in case of failure
        print(f"Error during WebSocket communication: {str(e)}", flush=True)
        IOStream.get_default().close()  # Close the IOStream connection


In [8]:
from contextlib import asynccontextmanager  # noqa: E402
from pathlib import Path  # noqa: E402

from fastapi import FastAPI  # noqa: E402
from fastapi.responses import HTMLResponse  # noqa: E402

html_path = "agentchat_websocket_server/chat.html"

@asynccontextmanager
async def run_websocket_server(app):
    try:
        with IOWebsockets.run_server_in_thread(on_connect=on_connect, port=8080) as uri:
            print(f"WebSocket server started at {uri}.", flush=True)
            yield
    except Exception as e:
        print(f"WebSocket server failed: {str(e)}", flush=True)


app = FastAPI(lifespan=run_websocket_server)


@app.get("/")
async def get():
    html_file = Path(html_path)
    html_content = html_file.read_text()
    return HTMLResponse(content=html_content, media_type="text/html")

Error during WebSocket communication: received 1005 (no status received [internal]); then sent 1005 (no status received [internal])


 - IOWebsockets._handler(): Error in on_connect: 'IOWebsockets' object has no attribute 'close'


In [9]:
import uvicorn  # noqa: E402

config = uvicorn.Config(app)
server = uvicorn.Server(config)
await server.serve()  # noqa: F704

INFO:     Started server process [328227]
INFO:     Waiting for application startup.


WebSocket server started at ws://127.0.0.1:8080.


INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


INFO:     127.0.0.1:35922 - "GET / HTTP/1.1" 500 Internal Server Error


ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/home/mymm_psu_gmail_com/miniconda3/envs/dev/lib/python3.12/site-packages/uvicorn/protocols/http/httptools_impl.py", line 411, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/mymm_psu_gmail_com/miniconda3/envs/dev/lib/python3.12/site-packages/uvicorn/middleware/proxy_headers.py", line 69, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/mymm_psu_gmail_com/miniconda3/envs/dev/lib/python3.12/site-packages/fastapi/applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File "/home/mymm_psu_gmail_com/miniconda3/envs/dev/lib/python3.12/site-packages/starlette/applications.py", line 113, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/mymm_psu_gmail_com/miniconda3/envs/dev/li

If you run the code above, you will be prompt to enter a response
each time it is your turn to speak. You can see the human in the conversation
was not very good at guessing the number... but hey the agent was nice enough
to give out the number in the end.

## Human Input Mode = `TERMINATE`

In this mode, human input is only requested when a termination condition is
met. **If the human chooses to intercept and reply, the counter will be reset**; if 
the human chooses to skip, the automatic reply mechanism will be used; if the human
chooses to terminate, the conversation will be terminated.

Let us see this mode in action by playing the same game again, but this time
the guessing agent will only have two chances to guess the number, and if it 
fails, the human will be asked to provide feedback,
and the guessing agent gets two more chances.
If the correct number is guessed eventually, the conversation will be terminated.