-
Notifications
You must be signed in to change notification settings - Fork 4k
/
conversable_agent.py
2796 lines (2445 loc) · 130 KB
/
conversable_agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import asyncio
import copy
import functools
import inspect
import json
import logging
import re
import warnings
from collections import defaultdict
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Type, TypeVar, Union
from openai import BadRequestError
from autogen.exception_utils import InvalidCarryOverType, SenderRequired
from .._pydantic import model_dump
from ..cache.cache import AbstractCache
from ..code_utils import (
PYTHON_VARIANTS,
UNKNOWN,
check_can_use_docker_or_throw,
content_str,
decide_use_docker,
execute_code,
extract_code,
infer_lang,
)
from ..coding.base import CodeExecutor
from ..coding.factory import CodeExecutorFactory
from ..formatting_utils import colored
from ..function_utils import get_function_schema, load_basemodels_if_needed, serialize_to_str
from ..io.base import IOStream
from ..oai.client import ModelClient, OpenAIWrapper
from ..runtime_logging import log_event, log_new_agent, logging_enabled
from .agent import Agent, LLMAgent
from .chat import ChatResult, a_initiate_chats, initiate_chats
from .utils import consolidate_chat_info, gather_usage_summary
# Names exported by `from <this module> import *`: only the agent class.
__all__ = ("ConversableAgent",)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Type variable restricted to callables of any signature.
# NOTE(review): no use of F is visible in this part of the file — presumably it types decorators further down.
F = TypeVar("F", bound=Callable[..., Any])
class ConversableAgent(LLMAgent):
"""(In preview) A class for generic conversable agents which can be configured as assistant or user proxy.
After receiving each message, the agent will send a reply to the sender unless the msg is a termination msg.
For example, AssistantAgent and UserProxyAgent are subclasses of this class,
configured with different default settings.
To modify auto reply, override `generate_reply` method.
To disable/enable human response in every turn, set `human_input_mode` to "NEVER" or "ALWAYS".
To modify the way to get human input, override `get_human_input` method.
To modify the way to execute code blocks, single code block, or function call, override `execute_code_blocks`,
`run_code`, and `execute_function` methods respectively.
"""
DEFAULT_CONFIG = False # False or dict, the default config for llm inference
MAX_CONSECUTIVE_AUTO_REPLY = 100 # maximum number of consecutive auto replies (subject to future change)
DEFAULT_SUMMARY_PROMPT = "Summarize the takeaway from the conversation. Do not add any introductory phrases."
DEFAULT_SUMMARY_METHOD = "last_msg"
llm_config: Union[Dict, Literal[False]]
def __init__(
    self,
    name: str,
    system_message: Optional[Union[str, List]] = "You are a helpful AI Assistant.",
    is_termination_msg: Optional[Callable[[Dict], bool]] = None,
    max_consecutive_auto_reply: Optional[int] = None,
    human_input_mode: Literal["ALWAYS", "NEVER", "TERMINATE"] = "TERMINATE",
    function_map: Optional[Dict[str, Callable]] = None,
    code_execution_config: Union[Dict, Literal[False]] = False,
    llm_config: Optional[Union[Dict, Literal[False]]] = None,
    default_auto_reply: Union[str, Dict] = "",
    description: Optional[str] = None,
    chat_messages: Optional[Dict[Agent, List[Dict]]] = None,
):
    """Create a conversable agent and register its default reply functions.

    Args:
        name (str): name of the agent.
        system_message (str or list): system message for the ChatCompletion inference.
        is_termination_msg (function): a function that takes a message in the form of a dictionary
            and returns a boolean value indicating if this received message is a termination message.
            The dict can contain the following keys: "content", "role", "name", "function_call".
            When omitted, a message is a termination message if its content equals "TERMINATE".
        max_consecutive_auto_reply (int): the maximum number of consecutive auto replies.
            default to None (no limit provided, class attribute MAX_CONSECUTIVE_AUTO_REPLY will be used as the limit in this case).
            When set to 0, no auto reply will be generated.
        human_input_mode (str): whether to ask for human inputs every time a message is received.
            Possible values are "ALWAYS", "TERMINATE", "NEVER".
            (1) When "ALWAYS", the agent prompts for human input every time a message is received.
                Under this mode, the conversation stops when the human input is "exit",
                or when is_termination_msg is True and there is no human input.
            (2) When "TERMINATE", the agent prompts for human input only when a termination message is received or
                the number of auto reply reaches the max_consecutive_auto_reply.
            (3) When "NEVER", the agent will never prompt for human input. Under this mode, the conversation stops
                when the number of auto reply reaches the max_consecutive_auto_reply or when is_termination_msg is True.
        function_map (dict[str, callable]): Mapping function names (passed to openai) to callable functions, also used for tool calls.
            Every name is checked with `_assert_valid_name`, which raises ValueError for an invalid name.
        code_execution_config (dict or False): config for the code execution.
            To disable code execution, set to False. Otherwise, set to a dictionary with the following keys:
            - work_dir (Optional, str): The working directory for the code execution.
                If None, a default working directory will be used.
                The default working directory is the "extensions" directory under
                "path_to_autogen".
            - use_docker (Optional, list, str or bool): The docker image to use for code execution.
                Default is True, which means the code will be executed in a docker container. A default list of images will be used.
                If a list or a str of image name(s) is provided, the code will be executed in a docker container
                with the first image successfully pulled.
                If False, the code will be executed in the current environment.
                We strongly recommend using docker for code execution.
            - timeout (Optional, int): The maximum execution time in seconds.
            - last_n_messages (Experimental, int or str): The number of messages to look back for code execution.
                If set to 'auto', it will scan backwards through all messages arriving since the agent last spoke, which is typically the last time execution was attempted. (Default: auto)
            When the dict has a non-None "executor" entry, the keys "use_docker", "work_dir" and "timeout"
            are rejected with a ValueError.
        llm_config (dict or False or None): llm inference configuration.
            Please refer to [OpenAIWrapper.create](/docs/reference/oai/client#create)
            for available options.
            When using OpenAI or Azure OpenAI endpoints, please specify a non-empty 'model' either in `llm_config` or in each config of 'config_list' in `llm_config`.
            To disable llm-based auto reply, set to False.
            When set to None, will use self.DEFAULT_CONFIG, which defaults to False.
        default_auto_reply (str or dict): default auto reply when no code execution or llm-based reply is generated.
        description (str): a short description of the agent. This description is used by other agents
            (e.g. the GroupChatManager) to decide when to call upon this agent. (Default: system_message)
        chat_messages (dict or None): the previous chat messages that this agent had in the past with other agents.
            Can be used to give the agent a memory by providing the chat history. This will allow the agent to
            resume previously held conversations. Defaults to an empty chat history.

    Raises:
        TypeError: if `llm_config` is a dict that cannot be deep-copied.
        ValueError: if `code_execution_config` is neither a dict, None nor False, or if it combines
            "executor" with "use_docker", "work_dir" or "timeout".
    """
    # code_execution_config is mutated below ("use_docker" is written back), so work on a
    # shallow copy to leave the caller's dict (or a shared default value) untouched.
    # Values without a `copy` method, such as False, pass through unchanged.
    code_execution_config = (
        code_execution_config.copy() if hasattr(code_execution_config, "copy") else code_execution_config
    )

    self._name = name
    # a dictionary of conversations, default value is list
    if chat_messages is None:
        self._oai_messages = defaultdict(list)
    else:
        # The supplied mapping is stored as-is (not copied), so appended messages land in the caller's object.
        # NOTE(review): if this is a plain dict, `_append_oai_message` indexes it by conversation id and
        # would raise KeyError for an agent with no existing entry — confirm callers pass a defaultdict.
        self._oai_messages = chat_messages

    self._oai_system_message = [{"content": system_message, "role": "system"}]
    self._description = description if description is not None else system_message
    # Default termination test: the message content, rendered via content_str, equals "TERMINATE".
    self._is_termination_msg = (
        is_termination_msg
        if is_termination_msg is not None
        else (lambda x: content_str(x.get("content")) == "TERMINATE")
    )
    # Take a copy to avoid modifying the given dict
    if isinstance(llm_config, dict):
        try:
            llm_config = copy.deepcopy(llm_config)
        except TypeError as e:
            raise TypeError(
                "Please implement __deepcopy__ method for each value class in llm_config to support deepcopy."
                " Refer to the docs for more details: https://microsoft.github.io/autogen/docs/topics/llm_configuration#adding-http-client-in-llm_config-for-proxy"
            ) from e

    # Sets self.llm_config and self.client.
    self._validate_llm_config(llm_config)

    if logging_enabled():
        # locals() hands the logger every constructor argument as currently bound.
        log_new_agent(self, locals())

    # Initialize standalone client cache object.
    self.client_cache = None

    self.human_input_mode = human_input_mode
    self._max_consecutive_auto_reply = (
        max_consecutive_auto_reply if max_consecutive_auto_reply is not None else self.MAX_CONSECUTIVE_AUTO_REPLY
    )
    self._consecutive_auto_reply_counter = defaultdict(int)
    # The bound method is the default factory: called without a sender it returns the agent-wide
    # limit, so a sender seen for the first time starts from the limit in force at that moment.
    self._max_consecutive_auto_reply_dict = defaultdict(self.max_consecutive_auto_reply)
    # _assert_valid_name either raises ValueError or returns the (non-empty) name, so the `if`
    # acts as validation and never filters an entry out.
    self._function_map = (
        {}
        if function_map is None
        else {name: callable for name, callable in function_map.items() if self._assert_valid_name(name)}
    )
    self._default_auto_reply = default_auto_reply
    self._reply_func_list = []
    self._human_input = []
    self.reply_at_receive = defaultdict(bool)
    # register_reply inserts at position 0 by default, so each function registered below ends up
    # ahead of the ones registered before it; the order of these calls therefore matters.
    self.register_reply([Agent, None], ConversableAgent.generate_oai_reply)
    self.register_reply([Agent, None], ConversableAgent.a_generate_oai_reply, ignore_async_in_sync_chat=True)

    # Setting up code execution.
    # Do not register code execution reply if code execution is disabled.
    if code_execution_config is not False:
        # If code_execution_config is None, set it to an empty dict.
        if code_execution_config is None:
            warnings.warn(
                "Using None to signal a default code_execution_config is deprecated. "
                "Use {} to use default or False to disable code execution.",
                stacklevel=2,
            )
            code_execution_config = {}
        if not isinstance(code_execution_config, dict):
            raise ValueError("code_execution_config must be a dict or False.")

        # We have got a valid code_execution_config.
        self._code_execution_config = code_execution_config

        if self._code_execution_config.get("executor") is not None:
            # These legacy keys are rejected when an executor is given; the error messages direct
            # the caller to the executor's own arguments instead.
            if "use_docker" in self._code_execution_config:
                raise ValueError(
                    "'use_docker' in code_execution_config is not valid when 'executor' is set. Use the appropriate arg in the chosen executor instead."
                )

            if "work_dir" in self._code_execution_config:
                raise ValueError(
                    "'work_dir' in code_execution_config is not valid when 'executor' is set. Use the appropriate arg in the chosen executor instead."
                )

            if "timeout" in self._code_execution_config:
                raise ValueError(
                    "'timeout' in code_execution_config is not valid when 'executor' is set. Use the appropriate arg in the chosen executor instead."
                )

            # Use the new code executor.
            self._code_executor = CodeExecutorFactory.create(self._code_execution_config)
            self.register_reply([Agent, None], ConversableAgent._generate_code_execution_reply_using_executor)
        else:
            # Legacy code execution using code_utils.
            # The resolved docker setting is written back into the (copied) config dict.
            use_docker = self._code_execution_config.get("use_docker", None)
            use_docker = decide_use_docker(use_docker)
            check_can_use_docker_or_throw(use_docker)
            self._code_execution_config["use_docker"] = use_docker
            self.register_reply([Agent, None], ConversableAgent.generate_code_execution_reply)
    else:
        # Code execution is disabled.
        self._code_execution_config = False

    self.register_reply([Agent, None], ConversableAgent.generate_tool_calls_reply)
    self.register_reply([Agent, None], ConversableAgent.a_generate_tool_calls_reply, ignore_async_in_sync_chat=True)
    self.register_reply([Agent, None], ConversableAgent.generate_function_call_reply)
    self.register_reply(
        [Agent, None], ConversableAgent.a_generate_function_call_reply, ignore_async_in_sync_chat=True
    )
    self.register_reply([Agent, None], ConversableAgent.check_termination_and_human_reply)
    self.register_reply(
        [Agent, None], ConversableAgent.a_check_termination_and_human_reply, ignore_async_in_sync_chat=True
    )

    # Registered hooks are kept in lists, indexed by hookable method, to be called in their order of registration.
    # New hookable methods should be added to this list as required to support new agent capabilities.
    self.hook_lists = {
        "process_last_received_message": [],
        "process_all_messages_before_reply": [],
        "process_message_before_send": [],
    }
def _validate_llm_config(self, llm_config):
assert llm_config in (None, False) or isinstance(
llm_config, dict
), "llm_config must be a dict or False or None."
if llm_config is None:
llm_config = self.DEFAULT_CONFIG
self.llm_config = self.DEFAULT_CONFIG if llm_config is None else llm_config
# TODO: more complete validity check
if self.llm_config in [{}, {"config_list": []}, {"config_list": [{"model": ""}]}]:
raise ValueError(
"When using OpenAI or Azure OpenAI endpoints, specify a non-empty 'model' either in 'llm_config' or in each config of 'config_list'."
)
self.client = None if self.llm_config is False else OpenAIWrapper(**self.llm_config)
@property
def name(self) -> str:
"""Get the name of the agent."""
return self._name
@property
def description(self) -> str:
"""Get the description of the agent."""
return self._description
@description.setter
def description(self, description: str):
"""Set the description of the agent."""
self._description = description
@property
def code_executor(self) -> Optional[CodeExecutor]:
"""The code executor used by this agent. Returns None if code execution is disabled."""
if not hasattr(self, "_code_executor"):
return None
return self._code_executor
def register_reply(
    self,
    trigger: Union[Type[Agent], str, Agent, Callable[[Agent], bool], List],
    reply_func: Callable,
    position: int = 0,
    config: Optional[Any] = None,
    reset_config: Optional[Callable] = None,
    *,
    ignore_async_in_sync_chat: bool = False,
    remove_other_reply_funcs: bool = False,
):
    """Add a reply function to this agent's list of reply functions.

    The new entry is inserted at `position` in the list; with the default of 0 it goes to the
    front, so a function registered later is checked earlier. Use a positive `position` to
    place it further back.

    Sync and async reply functions can both be registered. A sync reply function is triggered from
    sync and async chats alike, whereas an async one is only triggered from async chats (started
    with `ConversableAgent.a_initiate_chat`). When an `async` reply function is registered and a
    chat is started through a sync function, `ignore_async_in_sync_chat` decides what happens:
    `False` (the default) leads to an exception, `True` makes the reply function be ignored.

    Args:
        trigger (Agent class, str, Agent instance, callable, or list): the trigger.
            - class: the reply function is called when the sender is an instance of the class.
            - string: the reply function is called when the sender's name matches the string.
            - agent instance: the reply function is called when the sender is that instance.
            - callable: the reply function is called when the callable returns True.
            - list: the reply function is called when any trigger in the list is activated.
            - None: the reply function is called only when the sender is None.
            Note: to trigger an auto-reply function with non-empty messages and `sender=None`,
            `None` must be registered as a trigger. The type check in this method only accepts
            `None` as an element of a list (e.g. `[Agent, None]`), not on its own.
        reply_func (Callable): the reply function.
            It receives a recipient agent, a list of messages, a sender agent and a config, and returns a reply message.

            ```python
            def reply_func(
                recipient: ConversableAgent,
                messages: Optional[List[Dict]] = None,
                sender: Optional[Agent] = None,
                config: Optional[Any] = None,
            ) -> Tuple[bool, Union[str, Dict, None]]:
            ```
        position (int): index at which the reply function is inserted into the reply function list.
        config (Any): the config handed to the reply function. A shallow copy is stored as the
            working config and the object itself as the initial config.
            When an agent is reset, the config will be reset to the original value.
        reset_config (Callable): the function used to reset the config.
            It returns None. Signature: ```def reset_config(config: Any)```
        ignore_async_in_sync_chat (bool): whether to ignore the async reply function in sync chats.
            The stored flag is only truthy when `reply_func` is a coroutine function.
        remove_other_reply_funcs (bool): whether to clear all previously registered reply functions first.

    Raises:
        ValueError: if `trigger` is not a class, string, agent, callable or list.
    """
    accepted_trigger_types = (type, str, Agent, Callable, list)
    if not isinstance(trigger, accepted_trigger_types):
        raise ValueError("trigger must be a class, a string, an agent, a callable or a list.")

    if remove_other_reply_funcs:
        self._reply_func_list.clear()

    registration = {
        "trigger": trigger,
        "reply_func": reply_func,
        # Working copy of the config; the untouched original is kept under "init_config".
        "config": copy.copy(config),
        "init_config": config,
        "reset_config": reset_config,
        "ignore_async_in_sync_chat": ignore_async_in_sync_chat and inspect.iscoroutinefunction(reply_func),
    }
    self._reply_func_list.insert(position, registration)
def replace_reply_func(self, old_reply_func: Callable, new_reply_func: Callable):
"""Replace a registered reply function with a new one.
Args:
old_reply_func (Callable): the old reply function to be replaced.
new_reply_func (Callable): the new reply function to replace the old one.
"""
for f in self._reply_func_list:
if f["reply_func"] == old_reply_func:
f["reply_func"] = new_reply_func
@staticmethod
def _summary_from_nested_chats(
chat_queue: List[Dict[str, Any]], recipient: Agent, messages: Union[str, Callable], sender: Agent, config: Any
) -> Tuple[bool, str]:
"""A simple chat reply function.
This function initiate one or a sequence of chats between the "recipient" and the agents in the
chat_queue.
It extracts and returns a summary from the nested chat based on the "summary_method" in each chat in chat_queue.
Returns:
Tuple[bool, str]: A tuple where the first element indicates the completion of the chat, and the second element contains the summary of the last chat if any chats were initiated.
"""
last_msg = messages[-1].get("content")
chat_to_run = []
for i, c in enumerate(chat_queue):
current_c = c.copy()
if current_c.get("sender") is None:
current_c["sender"] = recipient
message = current_c.get("message")
# If message is not provided in chat_queue, we by default use the last message from the original chat history as the first message in this nested chat (for the first chat in the chat queue).
# NOTE: This setting is prone to change.
if message is None and i == 0:
message = last_msg
if callable(message):
message = message(recipient, messages, sender, config)
# We only run chat that has a valid message. NOTE: This is prone to change dependin on applications.
if message:
current_c["message"] = message
chat_to_run.append(current_c)
if not chat_to_run:
return True, None
res = initiate_chats(chat_to_run)
return True, res[-1].summary
def register_nested_chats(
self,
chat_queue: List[Dict[str, Any]],
trigger: Union[Type[Agent], str, Agent, Callable[[Agent], bool], List],
reply_func_from_nested_chats: Union[str, Callable] = "summary_from_nested_chats",
position: int = 2,
**kwargs,
) -> None:
"""Register a nested chat reply function.
Args:
chat_queue (list): a list of chat objects to be initiated.
trigger (Agent class, str, Agent instance, callable, or list): refer to `register_reply` for details.
reply_func_from_nested_chats (Callable, str): the reply function for the nested chat.
The function takes a chat_queue for nested chat, recipient agent, a list of messages, a sender agent and a config as input and returns a reply message.
Default to "summary_from_nested_chats", which corresponds to a built-in reply function that get summary from the nested chat_queue.
```python
def reply_func_from_nested_chats(
chat_queue: List[Dict],
recipient: ConversableAgent,
messages: Optional[List[Dict]] = None,
sender: Optional[Agent] = None,
config: Optional[Any] = None,
) -> Tuple[bool, Union[str, Dict, None]]:
```
position (int): Ref to `register_reply` for details. Default to 2. It means we first check the termination and human reply, then check the registered nested chat reply.
kwargs: Ref to `register_reply` for details.
"""
if reply_func_from_nested_chats == "summary_from_nested_chats":
reply_func_from_nested_chats = self._summary_from_nested_chats
if not callable(reply_func_from_nested_chats):
raise ValueError("reply_func_from_nested_chats must be a callable")
def wrapped_reply_func(recipient, messages=None, sender=None, config=None):
return reply_func_from_nested_chats(chat_queue, recipient, messages, sender, config)
functools.update_wrapper(wrapped_reply_func, reply_func_from_nested_chats)
self.register_reply(
trigger,
wrapped_reply_func,
position,
kwargs.get("config"),
kwargs.get("reset_config"),
ignore_async_in_sync_chat=kwargs.get("ignore_async_in_sync_chat"),
)
@property
def system_message(self) -> str:
"""Return the system message."""
return self._oai_system_message[0]["content"]
def update_system_message(self, system_message: str) -> None:
"""Update the system message.
Args:
system_message (str): system message for the ChatCompletion inference.
"""
self._oai_system_message[0]["content"] = system_message
def update_max_consecutive_auto_reply(self, value: int, sender: Optional[Agent] = None):
"""Update the maximum number of consecutive auto replies.
Args:
value (int): the maximum number of consecutive auto replies.
sender (Agent): when the sender is provided, only update the max_consecutive_auto_reply for that sender.
"""
if sender is None:
self._max_consecutive_auto_reply = value
for k in self._max_consecutive_auto_reply_dict:
self._max_consecutive_auto_reply_dict[k] = value
else:
self._max_consecutive_auto_reply_dict[sender] = value
def max_consecutive_auto_reply(self, sender: Optional[Agent] = None) -> int:
"""The maximum number of consecutive auto replies."""
return self._max_consecutive_auto_reply if sender is None else self._max_consecutive_auto_reply_dict[sender]
@property
def chat_messages(self) -> Dict[Agent, List[Dict]]:
"""A dictionary of conversations from agent to list of messages."""
return self._oai_messages
def chat_messages_for_summary(self, agent: Agent) -> List[Dict]:
"""A list of messages as a conversation to summarize."""
return self._oai_messages[agent]
def last_message(self, agent: Optional[Agent] = None) -> Optional[Dict]:
"""The last message exchanged with the agent.
Args:
agent (Agent): The agent in the conversation.
If None and more than one agent's conversations are found, an error will be raised.
If None and only one conversation is found, the last message of the only conversation will be returned.
Returns:
The last message exchanged with the agent.
"""
if agent is None:
n_conversations = len(self._oai_messages)
if n_conversations == 0:
return None
if n_conversations == 1:
for conversation in self._oai_messages.values():
return conversation[-1]
raise ValueError("More than one conversation is found. Please specify the sender to get the last message.")
if agent not in self._oai_messages.keys():
raise KeyError(
f"The agent '{agent.name}' is not present in any conversation. No history available for this agent."
)
return self._oai_messages[agent][-1]
@property
def use_docker(self) -> Union[bool, str, None]:
"""Bool value of whether to use docker to execute the code,
or str value of the docker image name to use, or None when code execution is disabled.
"""
return None if self._code_execution_config is False else self._code_execution_config.get("use_docker")
@staticmethod
def _message_to_dict(message: Union[Dict, str]) -> Dict:
"""Convert a message to a dictionary.
The message can be a string or a dictionary. The string will be put in the "content" field of the new dictionary.
"""
if isinstance(message, str):
return {"content": message}
elif isinstance(message, dict):
return message
else:
return dict(message)
@staticmethod
def _normalize_name(name):
"""
LLMs sometimes ask functions while ignoring their own format requirements, this function should be used to replace invalid characters with "_".
Prefer _assert_valid_name for validating user configuration or input
"""
return re.sub(r"[^a-zA-Z0-9_-]", "_", name)[:64]
@staticmethod
def _assert_valid_name(name):
"""
Ensure that configured names are valid, raises ValueError if not.
For munging LLM responses use _normalize_name to ensure LLM specified names don't break the API.
"""
if not re.match(r"^[a-zA-Z0-9_-]+$", name):
raise ValueError(f"Invalid name: {name}. Only letters, numbers, '_' and '-' are allowed.")
if len(name) > 64:
raise ValueError(f"Invalid name: {name}. Name must be less than 64 characters.")
return name
def _append_oai_message(self, message: Union[Dict, str], role, conversation_id: Agent) -> bool:
"""Append a message to the ChatCompletion conversation.
If the message received is a string, it will be put in the "content" field of the new dictionary.
If the message received is a dictionary but does not have any of the three fields "content", "function_call", or "tool_calls",
this message is not a valid ChatCompletion message.
If only "function_call" or "tool_calls" is provided, "content" will be set to None if not provided, and the role of the message will be forced "assistant".
Args:
message (dict or str): message to be appended to the ChatCompletion conversation.
role (str): role of the message, can be "assistant" or "function".
conversation_id (Agent): id of the conversation, should be the recipient or sender.
Returns:
bool: whether the message is appended to the ChatCompletion conversation.
"""
message = self._message_to_dict(message)
# create oai message to be appended to the oai conversation that can be passed to oai directly.
oai_message = {
k: message[k]
for k in ("content", "function_call", "tool_calls", "tool_responses", "tool_call_id", "name", "context")
if k in message and message[k] is not None
}
if "content" not in oai_message:
if "function_call" in oai_message or "tool_calls" in oai_message:
oai_message["content"] = None # if only function_call is provided, content will be set to None.
else:
return False
if message.get("role") in ["function", "tool"]:
oai_message["role"] = message.get("role")
elif "override_role" in message:
# If we have a direction to override the role then set the
# role accordingly. Used to customise the role for the
# select speaker prompt.
oai_message["role"] = message.get("override_role")
else:
oai_message["role"] = role
if oai_message.get("function_call", False) or oai_message.get("tool_calls", False):
oai_message["role"] = "assistant" # only messages with role 'assistant' can have a function call.
self._oai_messages[conversation_id].append(oai_message)
return True
def _process_message_before_send(
self, message: Union[Dict, str], recipient: Agent, silent: bool
) -> Union[Dict, str]:
"""Process the message before sending it to the recipient."""
hook_list = self.hook_lists["process_message_before_send"]
for hook in hook_list:
message = hook(sender=self, message=message, recipient=recipient, silent=silent)
return message
def send(
self,
message: Union[Dict, str],
recipient: Agent,
request_reply: Optional[bool] = None,
silent: Optional[bool] = False,
):
"""Send a message to another agent.
Args:
message (dict or str): message to be sent.
The message could contain the following fields:
- content (str or List): Required, the content of the message. (Can be None)
- function_call (str): the name of the function to be called.
- name (str): the name of the function to be called.
- role (str): the role of the message, any role that is not "function"
will be modified to "assistant".
- context (dict): the context of the message, which will be passed to
[OpenAIWrapper.create](../oai/client#create).
For example, one agent can send a message A as:
```python
{
"content": lambda context: context["use_tool_msg"],
"context": {
"use_tool_msg": "Use tool X if they are relevant."
}
}
```
Next time, one agent can send a message B with a different "use_tool_msg".
Then the content of message A will be refreshed to the new "use_tool_msg".
So effectively, this provides a way for an agent to send a "link" and modify
the content of the "link" later.
recipient (Agent): the recipient of the message.
request_reply (bool or None): whether to request a reply from the recipient.
silent (bool or None): (Experimental) whether to print the message sent.
Raises:
ValueError: if the message can't be converted into a valid ChatCompletion message.
"""
message = self._process_message_before_send(message, recipient, silent)
# When the agent composes and sends the message, the role of the message is "assistant"
# unless it's "function".
valid = self._append_oai_message(message, "assistant", recipient)
if valid:
recipient.receive(message, self, request_reply, silent)
else:
raise ValueError(
"Message can't be converted into a valid ChatCompletion message. Either content or function_call must be provided."
)
async def a_send(
    self,
    message: Union[Dict, str],
    recipient: Agent,
    request_reply: Optional[bool] = None,
    silent: Optional[bool] = False,
):
    """(async) Deliver a message to another agent.

    The message is first run through `_process_message_before_send`, then
    checked by `_append_oai_message` with the role "assistant". Only when that
    check succeeds is the message awaited into `recipient.a_receive`.

    Args:
        message (dict or str): the message to deliver.
            A dict message may carry the following fields:
            - content (str or List): Required, the content of the message. (Can be None)
            - function_call (str): the name of the function to be called.
            - name (str): the name of the function to be called.
            - role (str): the role of the message; every role other than "function"
                is rewritten to "assistant".
            - context (dict): the context of the message, forwarded to
                [OpenAIWrapper.create](../oai/client#create).
                For instance, an agent may send a message A such as:
        ```python
        {
            "content": lambda context: context["use_tool_msg"],
            "context": {
                "use_tool_msg": "Use tool X if they are relevant."
            }
        }
        ```
                Later, an agent may send a message B carrying a different "use_tool_msg".
                The content of message A is then refreshed to the new "use_tool_msg",
                so an agent can effectively send a "link" and change what the "link"
                resolves to afterwards.
        recipient (Agent): the agent that receives the message.
        request_reply (bool or None): whether a reply is requested from the recipient.
        silent (bool or None): (Experimental) whether to print the message sent.

    Raises:
        ValueError: if the message can't be converted into a valid ChatCompletion message.
    """
    outgoing = self._process_message_before_send(message, recipient, silent)
    # A message composed and sent by this agent carries the role "assistant",
    # except when its role is "function".
    if not self._append_oai_message(outgoing, "assistant", recipient):
        raise ValueError(
            "Message can't be converted into a valid ChatCompletion message. Either content or function_call must be provided."
        )
    await recipient.a_receive(outgoing, self, request_reply, silent)
def _print_received_message(self, message: Union[Dict, str], sender: Agent):
    """Print a received message to the default IOStream.

    A header naming the sender and this agent is printed first. Depending on
    the message, the output then contains: each entry of "tool_responses"
    (printed recursively), a function/tool response banner with its content,
    or the plain content followed by any suggested function call / tool calls.
    A line of 80 dashes closes the output, except when a "tool" message with
    tool responses returns early.

    Args:
        message (dict or str): the received message; converted to a dict
            through `_message_to_dict` before it is inspected.
        sender (Agent): the agent the message came from; its `name` is printed
            in the header.
    """
    out = IOStream.get_default()

    def show_suggested_call(banner: str, call: dict) -> None:
        # Banner, then the call arguments, then a closing rule as wide as the banner.
        out.print(colored(banner, "green"), flush=True)
        out.print(
            "Arguments: \n",
            call.get("arguments", "(No arguments found)"),
            flush=True,
            sep="",
        )
        out.print(colored("*" * len(banner), "green"), flush=True)

    # Header line: "<sender> (to <this agent>):"
    out.print(colored(sender.name, "yellow"), "(to", f"{self.name}):\n", flush=True)
    message = self._message_to_dict(message)

    # Tool multi-call responses: every individual response is printed on its own.
    nested_responses = message.get("tool_responses")
    if nested_responses:
        for nested in nested_responses:
            self._print_received_message(nested, sender)
        if message.get("role") == "tool":
            # For role "tool" the content is just a concatenation of all tool_responses.
            return

    role = message.get("role")
    if role in ("function", "tool"):
        # Function results are identified by name, tool results by their call id.
        id_key = "name" if role == "function" else "tool_call_id"
        call_id = message.get(id_key, "No id found")
        banner = f"***** Response from calling {role} ({call_id}) *****"
        out.print(colored(banner, "green"), flush=True)
        out.print(message["content"], flush=True)
        out.print(colored("*" * len(banner), "green"), flush=True)
    else:
        content = message.get("content")
        if content is not None:
            if "context" in message:
                # Resolve the content against the context carried by the message.
                content = OpenAIWrapper.instantiate(
                    content,
                    message["context"],
                    self.llm_config and self.llm_config.get("allow_format_str_template", False),
                )
            out.print(content_str(content), flush=True)

        suggested_function = message.get("function_call")
        if suggested_function:
            suggested_function = dict(suggested_function)
            show_suggested_call(
                f"***** Suggested function call: {suggested_function.get('name', '(No function name found)')} *****",
                suggested_function,
            )

        for tool_call in message.get("tool_calls") or ():
            call_id = tool_call.get("id", "No tool call id found")
            tool_function = dict(tool_call.get("function", {}))
            show_suggested_call(
                f"***** Suggested tool call ({call_id}): {tool_function.get('name', '(No function name found)')} *****",
                tool_function,
            )

    out.print("\n", "-" * 80, flush=True, sep="")
def _process_received_message(self, message: Union[Dict, str], sender: Agent, silent: bool):
    """Record, log, validate and optionally print a message received from `sender`.

    Args:
        message (dict or str): the received message.
        sender (Agent): the agent that sent the message.
        silent (bool): when truthy, the message is not printed.

    Raises:
        ValueError: if the message can't be converted into a valid ChatCompletion message.
    """
    # A received message gets the role "user". (If 'role' exists and is 'function', it stays unchanged.)
    is_valid = self._append_oai_message(message, "user", sender)

    # The event is logged before validation fails, so invalid messages are logged too.
    if logging_enabled():
        log_event(self, "received_message", message=message, sender=sender.name, valid=is_valid)

    if not is_valid:
        raise ValueError(
            "Received message can't be converted into a valid ChatCompletion message. Either content or function_call must be provided."
        )

    if silent:
        return
    self._print_received_message(message, sender)
def receive(
    self,
    message: Union[Dict, str],
    sender: Agent,
    request_reply: Optional[bool] = None,
    silent: Optional[bool] = False,
):
    """Accept a message from another agent and, when requested, answer it.

    After the message has been processed, a reply is generated and sent back
    to the sender unless replying is disabled or no reply is produced.
    The reply can be generated automatically or entered manually by a human.

    Args:
        message (dict or str): message from the sender. A dict message may contain the following reserved fields (either content or function_call need to be provided).
            1. "content": content of the message, can be None.
            2. "function_call": a dictionary containing the function name and arguments. (deprecated in favor of "tool_calls")
            3. "tool_calls": a list of dictionaries containing the function name and arguments.
            4. "role": role of the message, can be "assistant", "user", "function", "tool".
                This field is only needed to distinguish between "function" or "assistant"/"user".
            5. "name": In most cases, this field is not needed. When the role is "function", this field is needed to indicate the function name.
            6. "context" (dict): the context of the message, which will be passed to
                [OpenAIWrapper.create](../oai/client#create).
        sender: the Agent instance that sent the message.
        request_reply (bool or None): whether a reply to the sender is requested.
            If None, the value is determined by `self.reply_at_receive[sender]`.
        silent (bool or None): (Experimental) whether to print the message received.

    Raises:
        ValueError: if the message can't be converted into a valid ChatCompletion message.
    """
    self._process_received_message(message, sender, silent)

    # An explicit False always suppresses the reply.
    if request_reply is False:
        return
    # With no explicit choice, fall back to the per-sender setting.
    if request_reply is None and self.reply_at_receive[sender] is False:
        return

    answer = self.generate_reply(messages=self.chat_messages[sender], sender=sender)
    if answer is None:
        return
    self.send(answer, sender, silent=silent)
async def a_receive(
    self,
    message: Union[Dict, str],
    sender: Agent,
    request_reply: Optional[bool] = None,
    silent: Optional[bool] = False,
):
    """(async) Accept a message from another agent and, when requested, answer it.

    After the message has been processed, a reply is generated and sent back
    to the sender unless replying is disabled or no reply is produced.
    The reply can be generated automatically or entered manually by a human.

    Args:
        message (dict or str): message from the sender. A dict message may contain the following reserved fields (either content or function_call need to be provided).
            1. "content": content of the message, can be None.
            2. "function_call": a dictionary containing the function name and arguments. (deprecated in favor of "tool_calls")
            3. "tool_calls": a list of dictionaries containing the function name and arguments.
            4. "role": role of the message, can be "assistant", "user", "function", "tool".
                This field is only needed to distinguish between "function" or "assistant"/"user".
            5. "name": In most cases, this field is not needed. When the role is "function", this field is needed to indicate the function name.
            6. "context" (dict): the context of the message, which will be passed to
                [OpenAIWrapper.create](../oai/client#create).
        sender: the Agent instance that sent the message.
        request_reply (bool or None): whether a reply to the sender is requested.
            If None, the value is determined by `self.reply_at_receive[sender]`.
        silent (bool or None): (Experimental) whether to print the message received.

    Raises:
        ValueError: if the message can't be converted into a valid ChatCompletion message.
    """
    self._process_received_message(message, sender, silent)

    # An explicit False always suppresses the reply.
    if request_reply is False:
        return
    # With no explicit choice, fall back to the per-sender setting.
    if request_reply is None and self.reply_at_receive[sender] is False:
        return

    answer = await self.a_generate_reply(sender=sender)
    if answer is None:
        return
    await self.a_send(answer, sender, silent=silent)
def _prepare_chat(
    self,
    recipient: "ConversableAgent",
    clear_history: bool,
    prepare_recipient: bool = True,
    reply_at_receive: bool = True,
) -> None:
    """Reset this agent's per-conversation state before chatting with `recipient`.

    Args:
        recipient: the agent on the other side of the chat.
        clear_history: when true, the history kept for `recipient` is cleared
            and the recorded human input is emptied.
        prepare_recipient: when true, `recipient` is prepared in the same way
            with this agent as its counterpart.
        reply_at_receive: value stored in `self.reply_at_receive[recipient]`.
    """
    self.reset_consecutive_auto_reply_counter(recipient)
    self.reply_at_receive[recipient] = reply_at_receive

    if clear_history:
        self.clear_history(recipient)
        self._human_input = []

    if not prepare_recipient:
        return
    # Mirror the setup on the other side; passing False for prepare_recipient
    # keeps the two agents from preparing each other endlessly.
    recipient._prepare_chat(self, clear_history, False, reply_at_receive)
def _raise_exception_on_async_reply_functions(self) -> None:
    """Raise an exception if any async reply functions are registered.

    Entries of `self._reply_func_list` whose "ignore_async_in_sync_chat" flag
    is truthy are skipped. Every remaining "reply_func" that
    `inspect.iscoroutinefunction` reports as a coroutine function is listed
    once in the error message, in registration order.

    Raises:
        RuntimeError: if any async reply functions are registered.
    """
    # dict.fromkeys de-duplicates like a set but keeps registration order, so
    # the error message is deterministic from run to run.
    candidates = dict.fromkeys(
        entry["reply_func"]
        for entry in self._reply_func_list
        if not entry.get("ignore_async_in_sync_chat", False)
    )
    # Some callables detected as coroutine functions (e.g. a functools.partial
    # wrapping an async function) have no __name__; fall back to their repr
    # instead of failing with AttributeError while building the message.
    async_names = [
        getattr(func, "__name__", repr(func))
        for func in candidates
        if inspect.iscoroutinefunction(func)
    ]
    if async_names:
        msg = (
            "Async reply functions can only be used with ConversableAgent.a_initiate_chat(). The following async reply functions are found: "
            + ", ".join(async_names)
        )
        raise RuntimeError(msg)
def initiate_chat(
self,
recipient: "ConversableAgent",
clear_history: bool = True,
silent: Optional[bool] = False,
cache: Optional[AbstractCache] = None,
max_turns: Optional[int] = None,
summary_method: Optional[Union[str, Callable]] = DEFAULT_SUMMARY_METHOD,
summary_args: Optional[dict] = {},
message: Optional[Union[Dict, str, Callable]] = None,
**kwargs,
) -> ChatResult:
"""Initiate a chat with the recipient agent.
Reset the consecutive auto reply counter.
If `clear_history` is True, the chat history with the recipient agent will be cleared.
Args:
recipient: the recipient agent.
clear_history (bool): whether to clear the chat history with the agent. Default is True.
silent (bool or None): (Experimental) whether to print the messages for this conversation. Default is False.
cache (AbstractCache or None): the cache client to be used for this conversation. Default is None.
max_turns (int or None): the maximum number of turns for the chat between the two agents. One turn means one conversation round trip. Note that this is different from
[max_consecutive_auto_reply](#max_consecutive_auto_reply) which is the maximum number of consecutive auto replies; and it is also different from [max_rounds in GroupChat](./groupchat#groupchat-objects) which is the maximum number of rounds in a group chat session.
If max_turns is set to None, the chat will continue until a termination condition is met. Default is None.
summary_method (str or callable): a method to get a summary from the chat. Default is DEFAULT_SUMMARY_METHOD, i.e., "last_msg".
Supported strings are "last_msg" and "reflection_with_llm":
- when set to "last_msg", it returns the last message of the dialog as the summary.
- when set to "reflection_with_llm", it returns a summary extracted using an llm client.
`llm_config` must be set in either the recipient or sender.
A callable summary_method should take the recipient and sender agent in a chat as input and return a string of summary. E.g.,
```python
def my_summary_method(
sender: ConversableAgent,
recipient: ConversableAgent,
summary_args: dict,
):
return recipient.last_message(sender)["content"]
```
summary_args (dict): a dictionary of arguments to be passed to the summary_method.
One example key is "summary_prompt", and value is a string of text used to prompt a LLM-based agent (the sender or receiver agent) to reflect
on the conversation and extract a summary when summary_method is "reflection_with_llm".
The default summary_prompt is DEFAULT_SUMMARY_PROMPT, i.e., "Summarize takeaway from the conversation. Do not add any introductory phrases. If the intended request is NOT properly addressed, please point it out."
message (str, dict or Callable): the initial message to be sent to the recipient. Needs to be provided. Otherwise, input() will be called to get the initial message.
- If a string or a dict is provided, it will be used as the initial message. `generate_init_message` is called to generate the initial message for the agent based on this string and the context.
If dict, it may contain the following reserved fields (either content or tool_calls need to be provided).
1. "content": content of the message, can be None.
2. "function_call": a dictionary containing the function name and arguments. (deprecated in favor of "tool_calls")
3. "tool_calls": a list of dictionaries containing the function name and arguments.
4. "role": role of the message, can be "assistant", "user", "function".
This field is only needed to distinguish between "function" or "assistant"/"user".
5. "name": In most cases, this field is not needed. When the role is "function", this field is needed to indicate the function name.
6. "context" (dict): the context of the message, which will be passed to
[OpenAIWrapper.create](../oai/client#create).
- If a callable is provided, it will be called to get the initial message in the form of a string or a dict.
If the returned type is dict, it may contain the reserved fields mentioned above.
Example of a callable message (returning a string):
```python
def my_message(sender: ConversableAgent, recipient: ConversableAgent, context: dict) -> Union[str, Dict]:
carryover = context.get("carryover", "")
if isinstance(message, list):
carryover = carryover[-1]
final_msg = "Write a blogpost." + "\\nContext: \\n" + carryover
return final_msg
```
Example of a callable message (returning a dict):
```python
def my_message(sender: ConversableAgent, recipient: ConversableAgent, context: dict) -> Union[str, Dict]:
final_msg = {}
carryover = context.get("carryover", "")
if isinstance(message, list):
carryover = carryover[-1]
final_msg["content"] = "Write a blogpost." + "\\nContext: \\n" + carryover
final_msg["context"] = {"prefix": "Today I feel"}
return final_msg
```
**kwargs: any additional information. It has the following reserved fields:
- "carryover": a string or a list of string to specify the carryover information to be passed to this chat.
If provided, we will combine this carryover (by attaching a "context: " string and the carryover content after the message content) with the "message" content when generating the initial chat
message in `generate_init_message`.
- "verbose": a boolean to specify whether to print the message and carryover in a chat. Default is False.
Raises:
RuntimeError: if any async reply functions are registered and not ignored in sync chat.
Returns:
ChatResult: an ChatResult object.
"""
_chat_info = locals().copy()
_chat_info["sender"] = self
consolidate_chat_info(_chat_info, uniform_sender=self)
for agent in [self, recipient]:
agent._raise_exception_on_async_reply_functions()
agent.previous_cache = agent.client_cache
agent.client_cache = cache
if isinstance(max_turns, int):
self._prepare_chat(recipient, clear_history, reply_at_receive=False)
for _ in range(max_turns):