-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
assistant.py
779 lines (684 loc) · 33.3 KB
/
assistant.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
import json
from uuid import uuid4
from typing import List, Any, Optional, Dict, Iterator, Callable, Union, Type, Tuple
from pydantic import BaseModel, ConfigDict, field_validator, Field, ValidationError
from phi.assistant.run import AssistantRun
from phi.knowledge.base import AssistantKnowledge
from phi.llm.base import LLM
from phi.llm.openai import OpenAIChat
from phi.llm.message import Message
from phi.llm.references import References # noqa: F401
from phi.memory.assistant import AssistantMemory
from phi.storage.assistant import AssistantStorage
from phi.task.task import Task
from phi.task.llm import LLMTask
from phi.tools import Tool, ToolRegistry, Function
from phi.utils.log import logger, set_log_level_to_debug
from phi.utils.message import get_text_from_message
from phi.utils.merge_dict import merge_dictionaries
from phi.utils.timer import Timer
class Assistant(BaseModel):
# -*- Assistant settings
# LLM to use for this Assistant
llm: LLM = OpenAIChat()
# Assistant introduction. This is added to the chat history when a run is started.
introduction: Optional[str] = None
# Assistant name
name: Optional[str] = None
# Metadata associated with this assistant
assistant_data: Optional[Dict[str, Any]] = None
# -*- Run settings
# Run UUID (autogenerated if not set)
run_id: Optional[str] = Field(None, validate_default=True)
# Run name
run_name: Optional[str] = None
# Metadata associated with this run
run_data: Optional[Dict[str, Any]] = None
# -*- User settings
# ID of the user participating in this run
user_id: Optional[str] = None
# Metadata associated the user participating in this run
user_data: Optional[Dict[str, Any]] = None
# -*- Assistant Memory
memory: AssistantMemory = AssistantMemory()
# add_chat_history_to_messages=true_adds_the_chat_history_to_the_messages_sent_to_the_llm.
add_chat_history_to_messages: bool = False
# add_chat_history_to_prompt=True adds the formatted chat history to the user prompt.
add_chat_history_to_prompt: bool = False
# Number of previous messages to add to the prompt or messages.
num_history_messages: int = 6
# -*- Assistant Knowledge Base
knowledge_base: Optional[AssistantKnowledge] = None
# Enable RAG by adding references from the knowledge base to the prompt.
add_references_to_prompt: bool = False
# -*- Assistant Storage
storage: Optional[AssistantStorage] = None
# AssistantRun from the database: DO NOT SET MANUALLY
db_row: Optional[AssistantRun] = None
# -*- Assistant Tools
# A list of tools provided to the LLM.
# Tools are functions the model may generate JSON inputs for.
# If you provide a dict, it is not called by the model.
tools: Optional[List[Union[Tool, ToolRegistry, Callable, Dict, Function]]] = None
# Allow the assistant to use tools
use_tools: bool = False
# Show tool calls in LLM messages.
show_tool_calls: bool = False
# Maximum number of tool calls allowed.
tool_call_limit: Optional[int] = None
# Controls which (if any) tool is called by the model.
# "none" means the model will not call a tool and instead generates a message.
# "auto" means the model can pick between generating a message or calling a tool.
# Specifying a particular function via {"type: "function", "function": {"name": "my_function"}}
# forces the model to call that tool.
# "none" is the default when no tools are present. "auto" is the default if tools are present.
tool_choice: Optional[Union[str, Dict[str, Any]]] = None
# -*- Available tools
# If use_tools is True and update_knowledge_base is True,
# then a tool is added that allows the LLM to update the knowledge base.
update_knowledge_base: bool = False
# If use_tools is True and read_tool_call_history is True,
# then a tool is added that allows the LLM to get the tool call history.
read_tool_call_history: bool = False
#
# -*- Prompt Settings
#
# -*- System prompt: provide the system prompt as a string
system_prompt: Optional[str] = None
# -*- System prompt function: provide the system prompt as a function
# This function is provided the "Assistant object" as an argument
# and should return the system_prompt as a string.
# Signature:
# def system_prompt_function(assistant: Assistant) -> str:
# ...
system_prompt_function: Optional[Callable[..., Optional[str]]] = None
# If True, build a default system prompt using instructions and extra_instructions
build_default_system_prompt: bool = True
# Assistant description for the default system prompt
description: Optional[str] = None
# List of instructions for the default system prompt
instructions: Optional[List[str]] = None
# List of extra_instructions for the default system prompt
# Use these when you want to use the default prompt but also add some extra instructions
extra_instructions: Optional[List[str]] = None
# If True, add the current datetime to the prompt to give the assistant a sense of time
# This allows for relative times like "tomorrow" to be used in the prompt
add_datetime_to_instructions: bool = False
# If markdown=true, formats the output using markdown
markdown: bool = True
# -*- User prompt: provide the user prompt as a string
# Note: this will ignore the input message provided to the run function
user_prompt: Optional[Union[List[Dict], str]] = None
# -*- User prompt function: provide the user prompt as a function.
# This function is provided the "Assistant object" and the "input message" as arguments
# and should return the user_prompt as a Union[List[Dict], str].
# If add_references_to_prompt is True, then references are also provided as an argument.
# If add_chat_history_to_prompt is True, then chat_history is also provided as an argument.
# Signature:
# def custom_user_prompt_function(
# assistant: Assistant,
# message: Union[List[Dict], str],
# references: Optional[str] = None,
# chat_history: Optional[str] = None,
# ) -> Union[List[Dict], str]:
# ...
user_prompt_function: Optional[Callable[..., str]] = None
# If True, build a default user prompt using references and chat history
build_default_user_prompt: bool = True
# Function to get references for the user_prompt
# This function, if provided, is called when add_references_to_prompt is True
# Signature:
# def references(assistant: Assistant, query: str) -> Optional[str]:
# ...
references_function: Optional[Callable[..., Optional[str]]] = None
# Function to get the chat_history for the user prompt
# This function, if provided, is called when add_chat_history_to_prompt is True
# Signature:
# def chat_history(assistant: Assistant) -> str:
# ...
chat_history_function: Optional[Callable[..., Optional[str]]] = None
# -*- Assistant Output Settings
# Provide an output model for the responses
output_model: Optional[Union[str, List, Type[BaseModel]]] = None
# If True, the output is converted into the output_model (pydantic model or json dict)
parse_output: bool = True
# -*- Final LLM response i.e. the final output of this assistant
output: Optional[Any] = None
# -*- Assistant Tasks
# Tasks allow the Assistant to generate a response using a list of tasks
# If tasks is None or empty, a single default LLM task is created for this assistant
tasks: Optional[List[Task]] = None
# Metadata associated with the assistant tasks
task_data: Optional[Dict[str, Any]] = None
# debug_mode=True enables debug logs
debug_mode: bool = False
# monitoring=True logs Assistant runs on phidata.com
monitoring: bool = False
model_config = ConfigDict(arbitrary_types_allowed=True)
@field_validator("debug_mode", mode="before")
def set_log_level(cls, v: bool) -> bool:
if v:
set_log_level_to_debug()
logger.debug("Debug logs enabled")
return v
@field_validator("run_id", mode="before")
def set_run_id(cls, v: Optional[str]) -> str:
return v if v is not None else str(uuid4())
@property
def streamable(self) -> bool:
return self.output_model is None
@property
def llm_task(self) -> LLMTask:
"""Returns an LLMTask for this assistant"""
_llm_task = LLMTask(
llm=self.llm.model_copy(),
assistant_name=self.name,
assistant_memory=self.memory,
add_references_to_prompt=self.add_references_to_prompt,
add_chat_history_to_messages=self.add_chat_history_to_messages,
num_history_messages=self.num_history_messages,
knowledge_base=self.knowledge_base,
use_tools=self.use_tools,
show_tool_calls=self.show_tool_calls,
tool_call_limit=self.tool_call_limit,
tools=self.tools,
tool_choice=self.tool_choice,
update_knowledge_base=self.update_knowledge_base,
read_tool_call_history=self.read_tool_call_history,
system_prompt=self.system_prompt,
system_prompt_function=self.system_prompt_function,
build_default_system_prompt=self.build_default_system_prompt,
description=self.description,
instructions=self.instructions,
extra_instructions=self.extra_instructions,
markdown=self.markdown,
user_prompt=self.user_prompt,
user_prompt_function=self.user_prompt_function,
build_default_user_prompt=self.build_default_user_prompt,
references_function=self.references_function,
chat_history_function=self.chat_history_function,
output_model=self.output_model,
add_datetime_to_instructions=self.add_datetime_to_instructions,
)
return _llm_task
def to_database_row(self) -> AssistantRun:
"""Create a AssistantRun for the current Assistant (to save to the database)"""
return AssistantRun(
name=self.name,
run_id=self.run_id,
run_name=self.run_name,
user_id=self.user_id,
llm=self.llm.to_dict(),
memory=self.memory.to_dict(),
assistant_data=self.assistant_data,
run_data=self.run_data,
user_data=self.user_data,
task_data=self.task_data,
)
def from_database_row(self, row: AssistantRun):
"""Load the existing Assistant from an AssistantRun (from the database)"""
# Values that are overwritten from the database if they are not set in the assistant
if self.name is None and row.name is not None:
self.name = row.name
if self.run_id is None and row.run_id is not None:
self.run_id = row.run_id
if self.run_name is None and row.run_name is not None:
self.run_name = row.run_name
if self.user_id is None and row.user_id is not None:
self.user_id = row.user_id
# Update llm data from the AssistantRun
if row.llm is not None:
# Update llm metrics from the database
llm_metrics_from_db = row.llm.get("metrics")
if llm_metrics_from_db is not None and isinstance(llm_metrics_from_db, dict):
try:
self.llm.metrics = llm_metrics_from_db
except Exception as e:
logger.warning(f"Failed to load llm metrics: {e}")
# Update assistant memory from the AssistantRun
if row.memory is not None:
try:
self.memory = self.memory.__class__.model_validate(row.memory)
except Exception as e:
logger.warning(f"Failed to load assistant memory: {e}")
# Update assistant_data from the database
if row.assistant_data is not None:
# If assistant_data is set in the assistant, merge it with the database assistant_data.
# The assistant assistant_data takes precedence
if self.assistant_data is not None and row.assistant_data is not None:
# Updates db_row.assistant_data with self.assistant_data
merge_dictionaries(row.assistant_data, self.assistant_data)
self.assistant_data = row.assistant_data
# If assistant_data is not set in the assistant, use the database assistant_data
if self.assistant_data is None and row.assistant_data is not None:
self.assistant_data = row.assistant_data
# Update run_data from the database
if row.run_data is not None:
# If run_data is set in the assistant, merge it with the database run_data.
# The assistant run_data takes precedence
if self.run_data is not None and row.run_data is not None:
# Updates db_row.run_data with self.run_data
merge_dictionaries(row.run_data, self.run_data)
self.run_data = row.run_data
# If run_data is not set in the assistant, use the database run_data
if self.run_data is None and row.run_data is not None:
self.run_data = row.run_data
# Update user_data from the database
if row.user_data is not None:
# If user_data is set in the assistant, merge it with the database user_data.
# The assistant user_data takes precedence
if self.user_data is not None and row.user_data is not None:
# Updates db_row.user_data with self.user_data
merge_dictionaries(row.user_data, self.user_data)
self.user_data = row.user_data
# If user_data is not set in the assistant, use the database user_data
if self.user_data is None and row.user_data is not None:
self.user_data = row.user_data
# Update task_data from the database
if row.task_data is not None:
# If task_data is set in the assistant, merge it with the database task_data.
# The assistant task_data takes precedence
if self.task_data is not None and row.task_data is not None:
# Updates db_row.task_data with self.task_data
merge_dictionaries(row.task_data, self.task_data)
self.task_data = row.task_data
# If task_data is not set in the assistant, use the database task_data
if self.task_data is None and row.task_data is not None:
self.task_data = row.task_data
def read_from_storage(self) -> Optional[AssistantRun]:
"""Load the AssistantRun from storage"""
if self.storage is not None and self.run_id is not None:
self.db_row = self.storage.read(run_id=self.run_id)
if self.user_id is not None and self.db_row is not None and self.db_row.user_id != self.user_id:
logger.error(f"SECURITY ERROR: User id mismatch: {self.user_id} != {self.db_row.user_id}")
return None
if self.db_row is not None:
logger.debug(f"-*- Loading run: {self.db_row.run_id}")
self.from_database_row(row=self.db_row)
logger.debug(f"-*- Loaded run: {self.run_id}")
return self.db_row
def write_to_storage(self) -> Optional[AssistantRun]:
"""Save the AssistantRun to the storage"""
if self.storage is not None:
self.db_row = self.storage.upsert(row=self.to_database_row())
return self.db_row
def add_introduction(self, introduction: str) -> None:
"""Add assistant introduction to the chat history"""
if introduction is not None:
if len(self.memory.chat_history) == 0:
self.memory.add_chat_message(Message(role="assistant", content=introduction))
def create_run(self) -> Optional[str]:
"""Create a run in the database and return the run_id.
This function:
- Creates a new run in the storage if it does not exist
- Load the assistant from the storage if it exists
"""
# If a database_row exists, return the id from the database_row
if self.db_row is not None:
return self.db_row.run_id
# Create a new run or load an existing run
if self.storage is not None:
# Load existing run if it exists
logger.debug(f"Reading run: {self.run_id}")
self.read_from_storage()
# Create a new run
if self.db_row is None:
logger.debug("-*- Creating new assistant run")
if self.introduction:
self.add_introduction(self.introduction)
self.db_row = self.write_to_storage()
if self.db_row is None:
raise Exception("Failed to create new assistant run in storage")
logger.debug(f"-*- Created assistant run: {self.db_row.run_id}")
self.from_database_row(row=self.db_row)
self._api_log_assistant_run()
return self.run_id
def _run(self, message: Optional[Union[List[Dict], str]] = None, stream: bool = True) -> Iterator[str]:
logger.debug(f"*********** Run Start: {self.run_id} ***********")
# Load run from storage
self.read_from_storage()
# Add a default LLMTask if tasks are empty
_tasks = self.tasks
if _tasks is None or len(_tasks) == 0:
_tasks = [self.llm_task]
# Metadata for all tasks in this run
task_data: List[Dict[str, Any]] = []
# Final LLM response after running all tasks
run_output = ""
# -*- Generate response by running tasks
current_task: Optional[Task] = None
for idx, task in enumerate(_tasks, start=1):
logger.debug(f"*********** Task {idx} Start ***********")
# Set previous_task and current_task
previous_task = current_task
current_task = task
# -*- Prepare input message for the current_task
current_task_message: Optional[Union[List[Dict], str]] = None
if previous_task and previous_task.output is not None:
# Convert current_task_message to json if it is a BaseModel
if issubclass(previous_task.output.__class__, BaseModel):
current_task_message = previous_task.output.model_dump_json(exclude_none=True, indent=2)
else:
current_task_message = previous_task.output
else:
current_task_message = message
# -*- Update Task
# Add run state to the task
current_task.run_id = self.run_id
current_task.assistant_name = self.name
current_task.assistant_memory = self.memory
current_task.run_message = message
current_task.run_task_data = task_data
# Set output parsing off. This is handled by the run() function
current_task.parse_output = False
# -*- Update LLMTask
if isinstance(current_task, LLMTask):
# Update LLM
if current_task.llm is None:
current_task.llm = self.llm.model_copy()
# -*- Run Task
if stream and current_task.streamable:
for chunk in current_task.run(message=current_task_message, stream=True):
if current_task.show_output:
run_output += chunk if isinstance(chunk, str) else ""
yield chunk if isinstance(chunk, str) else ""
if current_task.show_output:
yield "\n\n"
run_output += "\n\n"
else:
current_task_response = current_task.run(message=current_task_message, stream=False) # type: ignore
current_task_response_str = ""
try:
if current_task_response:
if isinstance(current_task_response, str):
current_task_response_str = current_task_response
elif issubclass(current_task_response.__class__, BaseModel):
current_task_response_str = current_task_response.model_dump_json(
exclude_none=True, indent=2
)
else:
current_task_response_str = json.dumps(current_task_response)
if current_task.show_output:
if stream:
yield current_task_response_str
yield "\n\n"
else:
run_output += current_task_response_str
run_output += "\n\n"
except Exception as e:
logger.debug(f"Failed to convert task response to json: {e}")
logger.debug(f"*********** Task {idx} End ***********")
# -*- Save run to storage
self.write_to_storage()
# -*- Send run event for monitoring
llm_response_type = "text"
if self.output_model is not None:
llm_response_type = "json"
elif self.markdown:
llm_response_type = "markdown"
event_info = {
"tasks": task_data,
}
event_data = {
"user_message": message,
"llm_response": run_output,
"llm_response_type": llm_response_type,
"info": event_info,
"metrics": self.llm.metrics,
}
self._api_log_assistant_event(event_type="run", event_data=event_data)
# -*- Update run output
self.output = run_output
# -*- Yield final response if not streaming
if not stream:
yield run_output
logger.debug(f"*********** Run End: {self.run_id} ***********")
def run(
self, message: Optional[Union[List[Dict], str]] = None, stream: bool = True
) -> Union[Iterator[str], str, BaseModel]:
# Convert response to structured output if output_model is set
if self.output_model is not None and self.parse_output:
logger.debug("Setting stream=False as output_model is set")
json_resp = next(self._run(message=message, stream=False))
try:
structured_output = None
if (
isinstance(self.output_model, str)
or isinstance(self.output_model, dict)
or isinstance(self.output_model, list)
):
structured_output = json.loads(json_resp)
elif issubclass(self.output_model, BaseModel):
try:
structured_output = self.output_model.model_validate_json(json_resp)
except ValidationError:
# Check if response starts with ```json
if json_resp.startswith("```json"):
json_resp = json_resp.replace("```json\n", "").replace("\n```", "")
try:
structured_output = self.output_model.model_validate_json(json_resp)
except ValidationError as exc:
logger.warning(f"Failed to validate response: {exc}")
# -*- Update assistant output to the structured output
if structured_output is not None:
self.output = structured_output
except Exception as e:
logger.warning(f"Failed to convert response to output model: {e}")
return self.output or json_resp
else:
if stream and self.streamable:
resp = self._run(message=message, stream=True)
return resp
else:
resp = self._run(message=message, stream=False)
return next(resp)
def chat(self, message: Union[List[Dict], str], stream: bool = True) -> Union[Iterator[str], str, BaseModel]:
return self.run(message=message, stream=stream)
def _chat_raw(
self, messages: List[Message], user_message: Optional[str] = None, stream: bool = True
) -> Iterator[Dict]:
logger.debug("*********** Assistant Chat Raw Start ***********")
# Load run from storage
self.read_from_storage()
# -*- Add user message to the memory - this is added to the chat_history
if user_message:
self.memory.add_chat_message(Message(role="user", content=user_message))
# -*- Generate response
batch_llm_response_message = {}
if stream:
for response_delta in self.llm.response_delta(messages=messages):
yield response_delta
else:
batch_llm_response_message = self.llm.response_message(messages=messages)
# -*- Add prompts and response to the memory - these are added to the llm_messages
self.memory.add_llm_messages(messages=messages)
# Add llm response to the chat history
# LLM Response is the last message in the messages list
llm_response_message = messages[-1]
try:
self.memory.add_chat_message(llm_response_message)
except Exception as e:
logger.warning(f"Failed to add llm response to memory: {e}")
# -*- Save run to storage
self.write_to_storage()
# -*- Send assistant event for monitoring
event_data = {
"user_message": user_message,
"llm_response": llm_response_message,
"messages": [m.model_dump(exclude_none=True) for m in messages],
"metrics": self.llm.metrics,
}
self._api_log_assistant_event(event_type="chat_raw", event_data=event_data)
# -*- Yield final response if not streaming
if not stream:
yield batch_llm_response_message
logger.debug("*********** Assistant Chat Raw End ***********")
def chat_raw(
self, messages: List[Message], user_message: Optional[str] = None, stream: bool = True
) -> Union[Iterator[Dict], Dict]:
if self.tasks and len(self.tasks) > 0:
raise Exception("chat_raw does not support tasks")
resp = self._chat_raw(messages=messages, user_message=user_message, stream=stream)
if stream:
return resp
else:
return next(resp)
def rename(self, name: str) -> None:
"""Rename the assistant for the current run"""
# -*- Read run to storage
self.read_from_storage()
# -*- Rename assistant
self.name = name
# -*- Save run to storage
self.write_to_storage()
# -*- Log assistant run
self._api_log_assistant_run()
def rename_run(self, name: str) -> None:
"""Rename the current run"""
# -*- Read run to storage
self.read_from_storage()
# -*- Rename run
self.run_name = name
# -*- Save run to storage
self.write_to_storage()
# -*- Log assistant run
self._api_log_assistant_run()
def generate_name(self) -> str:
"""Generate a name for the run using the first 6 messages of the chat history"""
_conv = "Conversation\n"
_messages_for_generating_name = []
try:
if self.memory.chat_history[0].role == "assistant":
_messages_for_generating_name = self.memory.chat_history[1:6]
else:
_messages_for_generating_name = self.memory.chat_history[:6]
except Exception as e:
logger.warning(f"Failed to generate name: {e}")
finally:
if len(_messages_for_generating_name) == 0:
_messages_for_generating_name = self.memory.llm_messages[-4:]
for message in _messages_for_generating_name:
_conv += f"{message.role.upper()}: {message.content}\n"
_conv += "\n\nConversation Name:"
system_message = Message(
role="system",
content="Please provide a suitable name for the conversation assistant in maximum 5 words. "
"Remember, do not exceed 5 words.",
)
user_message = Message(role="user", content=_conv)
generate_name_messages = [system_message, user_message]
generated_name = self.llm.parsed_response(messages=generate_name_messages)
if len(generated_name.split()) > 15:
logger.error("Generated name is too long. Trying again.")
return self.generate_name()
return generated_name.replace('"', "").strip()
def auto_rename_run(self) -> None:
"""Automatically rename the run"""
# -*- Read run to storage
self.read_from_storage()
# -*- Generate name for run
generated_name = self.generate_name()
logger.debug(f"Generated name: {generated_name}")
self.run_name = generated_name
# -*- Save run to storage
self.write_to_storage()
# -*- Log assistant run
self._api_log_assistant_run()
###########################################################################
# Api functions
###########################################################################
def _api_log_assistant_run(self):
if not self.monitoring:
return
from phi.api.assistant import create_assistant_run, AssistantRunCreate
try:
database_row: AssistantRun = self.db_row or self.to_database_row()
create_assistant_run(
run=AssistantRunCreate(
run_id=database_row.run_id,
assistant_data=database_row.assistant_dict(),
),
)
except Exception as e:
logger.debug(f"Could not create assistant monitor: {e}")
def _api_log_assistant_event(self, event_type: str = "run", event_data: Optional[Dict[str, Any]] = None) -> None:
if not self.monitoring:
return
from phi.api.assistant import create_assistant_event, AssistantEventCreate
try:
database_row: AssistantRun = self.db_row or self.to_database_row()
create_assistant_event(
event=AssistantEventCreate(
run_id=database_row.run_id,
assistant_data=database_row.assistant_dict(),
event_type=event_type,
event_data=event_data,
),
)
except Exception as e:
logger.debug(f"Could not create assistant event: {e}")
###########################################################################
# Print Response
###########################################################################
def print_response(
self, message: Optional[Union[List[Dict], str]] = None, stream: bool = True, markdown: bool = True
) -> None:
from phi.cli.console import console
from rich.live import Live
from rich.table import Table
from rich.status import Status
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.box import ROUNDED
from rich.markdown import Markdown
if self.output_model is not None:
markdown = False
stream = False
if stream:
response = ""
with Live() as live_log:
status = Status("Working...", spinner="dots")
live_log.update(status)
response_timer = Timer()
response_timer.start()
for resp in self.run(message, stream=True):
response += resp if isinstance(resp, str) else ""
_response = response if not markdown else Markdown(response)
table = Table(box=ROUNDED, border_style="blue", show_header=False)
if message:
table.show_header = True
table.add_column("Message")
table.add_column(get_text_from_message(message))
table.add_row(f"Response\n({response_timer.elapsed:.1f}s)", _response) # type: ignore
live_log.update(table)
response_timer.stop()
else:
response_timer = Timer()
response_timer.start()
with Progress(
SpinnerColumn(spinner_name="dots"), TextColumn("{task.description}"), transient=True
) as progress:
progress.add_task("Working...")
response = self.run(message, stream=False) # type: ignore
response_timer.stop()
_response = response if not markdown else Markdown(response)
table = Table(box=ROUNDED, border_style="blue", show_header=False)
if message:
table.show_header = True
table.add_column("Message")
table.add_column(get_text_from_message(message))
table.add_row(f"Response\n({response_timer.elapsed:.1f}s)", _response) # type: ignore
console.print(table)
def cli_app(
self,
user: str = "User",
emoji: str = ":sunglasses:",
stream: bool = True,
markdown: bool = True,
exit_on: Tuple[str, ...] = ("exit", "bye"),
) -> None:
from rich.prompt import Prompt
while True:
message = Prompt.ask(f"[bold] {emoji} {user} [/bold]")
if message in exit_on:
break
self.print_response(message=message, stream=stream, markdown=markdown)