Commit 78aa0eb

Introducing IOStream protocol and adding support for websockets (#1551)
* Introducing IOStream

* bug fixing

* polishing

* refactoring

* refactoring

* refactoring

* wip: async tests

* websockets added

* wip

* merge with main

* notebook added

* FastAPI example added

* wip

* merge

* getter/setter to iostream added

* website/blog/2024-03-03-AutoGen-Update/img/dalle_gpt4v.png: convert to Git LFS

* website/blog/2024-03-03-AutoGen-Update/img/gaia.png: convert to Git LFS

* website/blog/2024-03-03-AutoGen-Update/img/teach.png: convert to Git LFS

* add SSL support

* wip

* wip

* exception handling added to on_connect()

* refactoring: default iostream is being set in a context manager

* test fix

* polishing

* polishing

* polishing

* fixed bug with new thread

* polishing

* a bit of refactoring and docs added

* notebook added to docs

* type checking added to CI

* CI fix

* CI fix

* CI fix

* polishing

* obsolete todo comment removed

* fixed precommit error

---------

Co-authored-by: Eric Zhu <ekzhu@users.noreply.github.com>
davorrunje and ekzhu committed Mar 26, 2024
1 parent 72994ea commit 78aa0eb
Showing 17 changed files with 1,252 additions and 66 deletions.
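
The diff below never shows the definition of `IOStream` itself (per the new imports it lives in `autogen/io/base.py`, which is not among the three files expanded here), but its shape can be inferred from the call sites: `IOStream.get_default()`, `IOStream.set_default(...)` used as a context manager, and instance-level `print`/`input`. A minimal sketch consistent with those call sites — the contextvars-based default management and the `IOConsole` fallback are assumptions for illustration:

```python
# Minimal sketch of the IOStream idea, reconstructed from the call sites in
# this diff; the real definition lives in autogen/io/.  The contextvars-based
# default management and the IOConsole fallback are illustrative assumptions.
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Any, Iterator, Optional


class IOStream:
    """Pluggable replacement for bare print()/input() calls."""

    def print(self, *objects: Any, sep: str = " ", end: str = "\n", flush: bool = False) -> None:
        raise NotImplementedError

    def input(self, prompt: str = "") -> str:
        raise NotImplementedError

    @staticmethod
    def get_default() -> "IOStream":
        # Fall back to plain console I/O when nothing has been set.
        return _default_io.get() or IOConsole()

    @staticmethod
    @contextmanager
    def set_default(stream: "IOStream") -> Iterator["IOStream"]:
        # Scope the override so the previous stream is restored on exit.
        token = _default_io.set(stream)
        try:
            yield stream
        finally:
            _default_io.reset(token)


class IOConsole(IOStream):
    """Default backend that behaves exactly like the builtins."""

    def print(self, *objects: Any, sep: str = " ", end: str = "\n", flush: bool = False) -> None:
        print(*objects, sep=sep, end=end, flush=flush)

    def input(self, prompt: str = "") -> str:
        return input(prompt)


_default_io: ContextVar[Optional[IOStream]] = ContextVar("_default_io", default=None)
```

Because `print` and `input` mirror the builtins' signatures, the mechanical `print(...)` to `iostream.print(...)` rewrites below change no output when the console backend is active.
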
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
@@ -65,7 +65,7 @@ jobs:
       - name: Coverage
         if: matrix.python-version == '3.10'
         run: |
-          pip install -e .[test,redis]
+          pip install -e .[test,redis,websockets]
           coverage run -a -m pytest test --ignore=test/agentchat/contrib --skip-openai --durations=10 --durations-min=1.0
           coverage xml
       - name: Upload coverage to Codecov
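
CI now installs the new `websockets` extra, which backs the websocket I/O stream this PR adds. A rough sketch of what such a backend amounts to — the class name is hypothetical, and `conn` is assumed to be any object with blocking `send()`/`recv()`, as the sync connections in the `websockets` package provide:

```python
# Hypothetical websocket-backed stream built on the IOStream sketch above:
# print() pushes a text frame to the client; input() sends the prompt and
# then blocks until the client replies.
from typing import Any


class IOWebsocketSketch(IOStream):
    def __init__(self, conn: Any) -> None:
        self._conn = conn  # assumed: blocking send(str) / recv() -> str

    def print(self, *objects: Any, sep: str = " ", end: str = "\n", flush: bool = False) -> None:
        self._conn.send(sep.join(map(str, objects)) + end)

    def input(self, prompt: str = "") -> str:
        if prompt:
            self._conn.send(prompt)
        return self._conn.recv()
```
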
13 changes: 8 additions & 5 deletions autogen/agentchat/chat.py
@@ -7,6 +7,7 @@
 from .utils import consolidate_chat_info
 import datetime
 import warnings
+from ..io.base import IOStream
 from ..formatting_utils import colored
 
 
@@ -103,6 +104,8 @@ def __find_async_chat_order(chat_ids: Set[int], prerequisites: List[Prerequisite
 
 
 def __post_carryover_processing(chat_info: Dict[str, Any]) -> None:
+    iostream = IOStream.get_default()
+
     if "message" not in chat_info:
         warnings.warn(
             "message is not provided in a chat_queue entry. input() will be called to get the initial message.",
@@ -122,18 +125,18 @@ def __post_carryover_processing(chat_info: Dict[str, Any]) -> None:
         print_message = "Dict: " + str(message)
     elif message is None:
         print_message = "None"
-    print(colored("\n" + "*" * 80, "blue"), flush=True, sep="")
-    print(
+    iostream.print(colored("\n" + "*" * 80, "blue"), flush=True, sep="")
+    iostream.print(
         colored(
             "Starting a new chat....",
             "blue",
         ),
         flush=True,
     )
     if chat_info.get("verbose", False):
-        print(colored("Message:\n" + print_message, "blue"), flush=True)
-        print(colored("Carryover:\n" + print_carryover, "blue"), flush=True)
-        print(colored("\n" + "*" * 80, "blue"), flush=True, sep="")
+        iostream.print(colored("Message:\n" + print_message, "blue"), flush=True)
+        iostream.print(colored("Carryover:\n" + print_carryover, "blue"), flush=True)
+        iostream.print(colored("\n" + "*" * 80, "blue"), flush=True, sep="")
 
 
 def initiate_chats(chat_queue: List[Dict[str, Any]]) -> List[ChatResult]:
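
With the banner output in `__post_carryover_processing` routed through the default stream, callers can redirect it instead of letting it hit stdout. A small usage sketch with a capturing backend (hypothetical class, built on the sketch above):

```python
# Hypothetical capturing backend: collects everything an agent run prints,
# e.g. for tests or for relaying chat banners to a UI.
from typing import Any, List


class IOCapture(IOStream):
    def __init__(self) -> None:
        self.lines: List[str] = []

    def print(self, *objects: Any, sep: str = " ", end: str = "\n", flush: bool = False) -> None:
        self.lines.append(sep.join(map(str, objects)) + end)

    def input(self, prompt: str = "") -> str:
        self.print(prompt)
        return ""  # non-interactive: hand back an empty reply


capture = IOCapture()
with IOStream.set_default(capture):
    # initiate_chats(chat_queue) would now write its banners into capture.lines
    IOStream.get_default().print("Starting a new chat....")
assert capture.lines == ["Starting a new chat....\n"]
```
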
90 changes: 60 additions & 30 deletions autogen/agentchat/conversable_agent.py
@@ -32,6 +32,7 @@
 from ..oai.client import ModelClient, OpenAIWrapper
 from ..runtime_logging import log_new_agent, logging_enabled
 from .agent import Agent, LLMAgent
+from ..io.base import IOStream
 from .chat import ChatResult, a_initiate_chats, initiate_chats
 from .utils import consolidate_chat_info, gather_usage_summary
 
@@ -681,8 +682,9 @@ async def a_send(
         )
 
     def _print_received_message(self, message: Union[Dict, str], sender: Agent):
+        iostream = IOStream.get_default()
         # print the message received
-        print(colored(sender.name, "yellow"), "(to", f"{self.name}):\n", flush=True)
+        iostream.print(colored(sender.name, "yellow"), "(to", f"{self.name}):\n", flush=True)
         message = self._message_to_dict(message)
 
         if message.get("tool_responses"):  # Handle tool multi-call responses
@@ -698,9 +700,9 @@ def _print_received_message(self, message: Union[Dict, str], sender: Agent):
                 id_key = "tool_call_id"
             id = message.get(id_key, "No id found")
             func_print = f"***** Response from calling {message['role']} ({id}) *****"
-            print(colored(func_print, "green"), flush=True)
-            print(message["content"], flush=True)
-            print(colored("*" * len(func_print), "green"), flush=True)
+            iostream.print(colored(func_print, "green"), flush=True)
+            iostream.print(message["content"], flush=True)
+            iostream.print(colored("*" * len(func_print), "green"), flush=True)
         else:
             content = message.get("content")
             if content is not None:
@@ -710,35 +712,35 @@ def _print_received_message(self, message: Union[Dict, str], sender: Agent)
                         message["context"],
                         self.llm_config and self.llm_config.get("allow_format_str_template", False),
                     )
-                print(content_str(content), flush=True)
+                iostream.print(content_str(content), flush=True)
             if "function_call" in message and message["function_call"]:
                 function_call = dict(message["function_call"])
                 func_print = (
                     f"***** Suggested function call: {function_call.get('name', '(No function name found)')} *****"
                 )
-                print(colored(func_print, "green"), flush=True)
-                print(
+                iostream.print(colored(func_print, "green"), flush=True)
+                iostream.print(
                     "Arguments: \n",
                     function_call.get("arguments", "(No arguments found)"),
                     flush=True,
                     sep="",
                 )
-                print(colored("*" * len(func_print), "green"), flush=True)
+                iostream.print(colored("*" * len(func_print), "green"), flush=True)
             if "tool_calls" in message and message["tool_calls"]:
                 for tool_call in message["tool_calls"]:
                     id = tool_call.get("id", "No tool call id found")
                     function_call = dict(tool_call.get("function", {}))
                     func_print = f"***** Suggested tool call ({id}): {function_call.get('name', '(No function name found)')} *****"
-                    print(colored(func_print, "green"), flush=True)
-                    print(
+                    iostream.print(colored(func_print, "green"), flush=True)
+                    iostream.print(
                         "Arguments: \n",
                         function_call.get("arguments", "(No arguments found)"),
                         flush=True,
                         sep="",
                    )
-                    print(colored("*" * len(func_print), "green"), flush=True)
+                    iostream.print(colored("*" * len(func_print), "green"), flush=True)
 
-        print("\n", "-" * 80, flush=True, sep="")
+        iostream.print("\n", "-" * 80, flush=True, sep="")
 
     def _process_received_message(self, message: Union[Dict, str], sender: Agent, silent: bool):
         # When the agent receives a message, the role of the message is "user". (If 'role' exists and is 'function', it will remain unchanged.)
@@ -1229,6 +1231,7 @@ def clear_history(self, recipient: Optional[Agent] = None, nr_messages_to_preser
             recipient: the agent with whom the chat history to clear. If None, clear the chat history with all agents.
             nr_messages_to_preserve: the number of newest messages to preserve in the chat history.
         """
+        iostream = IOStream.get_default()
         if recipient is None:
             if nr_messages_to_preserve:
                 for key in self._oai_messages:
@@ -1238,7 +1241,7 @@ def clear_history(self, recipient: Optional[Agent] = None, nr_messages_to_preser
                     first_msg_to_save = self._oai_messages[key][-nr_messages_to_preserve_internal]
                     if "tool_responses" in first_msg_to_save:
                         nr_messages_to_preserve_internal += 1
-                        print(
+                        iostream.print(
                             f"Preserving one more message for {self.name} to not divide history between tool call and "
                             f"tool response."
                         )
@@ -1249,7 +1252,7 @@ def clear_history(self, recipient: Optional[Agent] = None, nr_messages_to_preser
         else:
             self._oai_messages[recipient].clear()
             if nr_messages_to_preserve:
-                print(
+                iostream.print(
                     colored(
                         "WARNING: `nr_preserved_messages` is ignored when clearing chat history with a specific agent.",
                         "yellow",
@@ -1323,8 +1326,19 @@ async def a_generate_oai_reply(
         config: Optional[Any] = None,
     ) -> Tuple[bool, Union[str, Dict, None]]:
         """Generate a reply using autogen.oai asynchronously."""
+        iostream = IOStream.get_default()
+
+        def _generate_oai_reply(
+            self, iostream: IOStream, *args: Any, **kwargs: Any
+        ) -> Tuple[bool, Union[str, Dict, None]]:
+            with IOStream.set_default(iostream):
+                return self.generate_oai_reply(*args, **kwargs)
+
         return await asyncio.get_event_loop().run_in_executor(
-            None, functools.partial(self.generate_oai_reply, messages=messages, sender=sender, config=config)
+            None,
+            functools.partial(
+                _generate_oai_reply, self=self, iostream=iostream, messages=messages, sender=sender, config=config
+            ),
         )
 
     def _generate_code_execution_reply_using_executor(
@@ -1334,6 +1348,8 @@ def _generate_code_execution_reply_using_executor(
         config: Optional[Union[Dict, Literal[False]]] = None,
     ):
         """Generate a reply using code executor."""
+        iostream = IOStream.get_default()
+
         if config is not None:
             raise ValueError("config is not supported for _generate_code_execution_reply_using_executor.")
         if self._code_execution_config is False:
@@ -1371,15 +1387,15 @@ def _generate_code_execution_reply_using_executor(
 
             num_code_blocks = len(code_blocks)
             if num_code_blocks == 1:
-                print(
+                iostream.print(
                     colored(
                         f"\n>>>>>>>> EXECUTING CODE BLOCK (inferred language is {code_blocks[0].language})...",
                         "red",
                     ),
                     flush=True,
                 )
             else:
-                print(
+                iostream.print(
                     colored(
                         f"\n>>>>>>>> EXECUTING {num_code_blocks} CODE BLOCKS (inferred languages are [{', '.join([x.language for x in code_blocks])}])...",
                         "red",
@@ -1631,6 +1647,8 @@ def check_termination_and_human_reply(
             - Tuple[bool, Union[str, Dict, None]]: A tuple containing a boolean indicating if the conversation
                 should be terminated, and a human reply which can be a string, a dictionary, or None.
         """
+        iostream = IOStream.get_default()
+
         if config is None:
             config = self
         if messages is None:
@@ -1675,7 +1693,7 @@
 
         # print the no_human_input_msg
         if no_human_input_msg:
-            print(colored(f"\n>>>>>>>> {no_human_input_msg}", "red"), flush=True)
+            iostream.print(colored(f"\n>>>>>>>> {no_human_input_msg}", "red"), flush=True)
 
         # stop the conversation
         if reply == "exit":
@@ -1715,7 +1733,7 @@
         # increment the consecutive_auto_reply_counter
         self._consecutive_auto_reply_counter[sender] += 1
         if self.human_input_mode != "NEVER":
-            print(colored("\n>>>>>>>> USING AUTO REPLY...", "red"), flush=True)
+            iostream.print(colored("\n>>>>>>>> USING AUTO REPLY...", "red"), flush=True)
 
         return False, None
 
@@ -1742,6 +1760,8 @@ async def a_check_termination_and_human_reply(
             - Tuple[bool, Union[str, Dict, None]]: A tuple containing a boolean indicating if the conversation
                 should be terminated, and a human reply which can be a string, a dictionary, or None.
         """
+        iostream = IOStream.get_default()
+
         if config is None:
             config = self
         if messages is None:
@@ -1786,7 +1806,7 @@
 
         # print the no_human_input_msg
         if no_human_input_msg:
-            print(colored(f"\n>>>>>>>> {no_human_input_msg}", "red"), flush=True)
+            iostream.print(colored(f"\n>>>>>>>> {no_human_input_msg}", "red"), flush=True)
 
         # stop the conversation
         if reply == "exit":
@@ -1826,7 +1846,7 @@
         # increment the consecutive_auto_reply_counter
         self._consecutive_auto_reply_counter[sender] += 1
         if self.human_input_mode != "NEVER":
-            print(colored("\n>>>>>>>> USING AUTO REPLY...", "red"), flush=True)
+            iostream.print(colored("\n>>>>>>>> USING AUTO REPLY...", "red"), flush=True)
 
         return False, None
 
@@ -2001,7 +2021,9 @@ def get_human_input(self, prompt: str) -> str:
         Returns:
             str: human input.
         """
-        reply = input(prompt)
+        iostream = IOStream.get_default()
+
+        reply = iostream.input(prompt)
         self._human_input.append(reply)
         return reply
 
@@ -2016,8 +2038,8 @@ async def a_get_human_input(self, prompt: str) -> str:
         Returns:
             str: human input.
         """
-        reply = input(prompt)
-        self._human_input.append(reply)
+        loop = asyncio.get_running_loop()
+        reply = await loop.run_in_executor(None, functools.partial(self.get_human_input, prompt))
         return reply
 
     def run_code(self, code, **kwargs):
@@ -2038,12 +2060,14 @@ def run_code(self, code, **kwargs):
 
     def execute_code_blocks(self, code_blocks):
         """Execute the code blocks and return the result."""
+        iostream = IOStream.get_default()
+
         logs_all = ""
         for i, code_block in enumerate(code_blocks):
             lang, code = code_block
             if not lang:
                 lang = infer_lang(code)
-            print(
+            iostream.print(
                 colored(
                     f"\n>>>>>>>> EXECUTING CODE BLOCK {i} (inferred language is {lang})...",
                     "red",
@@ -2124,6 +2148,8 @@ def execute_function(self, func_call, verbose: bool = False) -> Tuple[bool, Dict
         "function_call" deprecated as of [OpenAI API v1.1.0](https://github.com/openai/openai-python/releases/tag/v1.1.0)
         See https://platform.openai.com/docs/api-reference/chat/create#chat-create-function_call
         """
+        iostream = IOStream.get_default()
+
         func_name = func_call.get("name", "")
         func = self._function_map.get(func_name, None)
 
@@ -2139,7 +2165,7 @@
 
         # Try to execute the function
         if arguments is not None:
-            print(
+            iostream.print(
                 colored(f"\n>>>>>>>> EXECUTING FUNCTION {func_name}...", "magenta"),
                 flush=True,
             )
@@ -2152,7 +2178,7 @@
             content = f"Error: Function {func_name} not found."
 
         if verbose:
-            print(
+            iostream.print(
                 colored(f"\nInput arguments: {arguments}\nOutput:\n{content}", "magenta"),
                 flush=True,
             )
@@ -2179,6 +2205,8 @@ async def a_execute_function(self, func_call):
         "function_call" deprecated as of [OpenAI API v1.1.0](https://github.com/openai/openai-python/releases/tag/v1.1.0)
         See https://platform.openai.com/docs/api-reference/chat/create#chat-create-function_call
         """
+        iostream = IOStream.get_default()
+
         func_name = func_call.get("name", "")
         func = self._function_map.get(func_name, None)
 
@@ -2194,7 +2222,7 @@
 
         # Try to execute the function
         if arguments is not None:
-            print(
+            iostream.print(
                 colored(f"\n>>>>>>>> EXECUTING ASYNC FUNCTION {func_name}...", "magenta"),
                 flush=True,
             )
@@ -2639,10 +2667,12 @@ def process_last_received_message(self, messages):
 
     def print_usage_summary(self, mode: Union[str, List[str]] = ["actual", "total"]) -> None:
         """Print the usage summary."""
+        iostream = IOStream.get_default()
+
         if self.client is None:
-            print(f"No cost incurred from agent '{self.name}'.")
+            iostream.print(f"No cost incurred from agent '{self.name}'.")
         else:
-            print(f"Agent '{self.name}':")
+            iostream.print(f"Agent '{self.name}':")
         self.client.print_usage_summary(mode)
 
     def get_actual_usage(self) -> Union[None, Dict[str, int]]:
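
The subtlest changes above are in the async paths. `loop.run_in_executor` moves `generate_oai_reply` and `get_human_input` onto worker threads, where a contextvar- or thread-local-backed default stream would silently fall back to the console; both `a_generate_oai_reply` and the rewritten `a_get_human_input` therefore capture the default on the event-loop side and re-install it inside the worker via `IOStream.set_default`. (The rewrite also stops `a_get_human_input` from calling the blocking builtin `input()` directly on the event loop.) The pattern in isolation, as a sketch reusing the classes above:

```python
# Capture-and-reinstall sketch mirroring a_generate_oai_reply: grab the
# default stream before leaving the event-loop thread, then re-install it
# inside the executor thread, where the ambient default would otherwise be lost.
import asyncio
import functools


def _blocking_work(iostream: IOStream, question: str) -> str:
    with IOStream.set_default(iostream):
        IOStream.get_default().print("working in a thread...")
        return question.upper()


async def a_work(question: str) -> str:
    iostream = IOStream.get_default()  # captured on the event-loop side
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, functools.partial(_blocking_work, iostream, question)
    )


print(asyncio.run(a_work("hello")))  # prints: working in a thread... then HELLO
```
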
(Diff for the remaining changed files not shown.)
