Skip to content

Navigator adds assistant to current conversation #580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 106 additions & 89 deletions assistants/navigator-assistant/assistant/chat.py
Original file line number Diff line number Diff line change
@@ -5,21 +5,18 @@
# This assistant helps you mine ideas from artifacts.
#

import datetime
import inspect
import io
import logging
import pathlib
from contextlib import asynccontextmanager
from time import perf_counter
from typing import Any, AsyncGenerator
from typing import Any

import deepmerge
from assistant_extensions import attachments, dashboard_card, mcp, navigator
from content_safety.evaluators import CombinedContentSafetyEvaluator
from semantic_workbench_api_model.workbench_model import (
ConversationEvent,
ConversationMessage,
ConversationParticipant,
MessageType,
NewConversationMessage,
ParticipantRole,
@@ -215,113 +212,133 @@ async def should_respond_to_message(context: ConversationContext, message: Conve
return True


@asynccontextmanager
async def timing(message: str) -> AsyncGenerator[None, None]:
async def handoff_to_assistant(context: ConversationContext, participant: ConversationParticipant) -> bool:
"""
Context manager to time the execution of a block of code.
Handoff the conversation to the assistant, if there is handoff metadata in the participant.
"""
caller_frame = inspect.stack()[2]
src = pathlib.Path(caller_frame.filename).name + ":" + str(caller_frame.lineno)
caller = caller_frame.function
start_time = perf_counter()
yield
end_time = perf_counter()
elapsed_time = datetime.timedelta(seconds=end_time - start_time)
logger.info(
"timing; message: %s, elapsed time: %s, caller: %s, src: %s",
message,
elapsed_time,
caller,
src,
)

navigator_handoff = participant.metadata.get("_navigator_handoff")

@assistant.events.conversation.on_created
async def on_conversation_created(context: ConversationContext) -> None:
"""
Handle the event triggered when the assistant is added to a conversation.
"""
if not navigator_handoff:
return False

# hand the conversation off to the assistant if this conversation was spawned from another conversation
async with timing("get-conversation"):
conversation = await context.get_conversation()
assistant_note_messages = await context.get_messages(
participant_ids=[context.assistant.id], message_types=[MessageType.note]
)

navigator_handoff = conversation.metadata.get("_navigator_handoff")
for note_message in assistant_note_messages.messages:
handoff_to_participant_id = note_message.metadata.get("_handoff")

async with timing("get-assistant-messages"):
assistant_sent_messages = await context.get_messages(
participant_ids=[context.assistant.id], limit=1, message_types=[MessageType.chat]
)
assistant_has_sent_a_message = len(assistant_sent_messages.messages) > 0
if not handoff_to_participant_id:
continue

if navigator_handoff:
if assistant_has_sent_a_message:
return
if handoff_to_participant_id == participant.id:
# we've already handed off to this participant
return False

async with timing("hand-off"), context.set_status("handing off..."):
spawned_from_conversation_id = navigator_handoff.get("spawned_from_conversation_id")
spawned_from_conversation_id = navigator_handoff.get("spawned_from_conversation_id")
files_to_copy = navigator_handoff.get("files_to_copy")
introduction_message = navigator_handoff.get("introduction_message")

async with context.set_status("handing off..."):
# copy files if the conversation was spawned from another conversation
is_different_conversation = spawned_from_conversation_id and spawned_from_conversation_id != context.id
if is_different_conversation and files_to_copy:
source_context = context.for_conversation(spawned_from_conversation_id)
files_to_copy = navigator_handoff.get("files_to_copy")
if files_to_copy:
for filename in files_to_copy:
buffer = io.BytesIO()
file = await source_context.get_file(filename)
if not file:
continue

async with source_context.read_file(filename) as reader:
async for chunk in reader:
buffer.write(chunk)

await context.write_file(filename, buffer, file.content_type)

introduction_message = navigator_handoff.get("introduction_message")
await context.send_messages([
NewConversationMessage(
content=introduction_message,
message_type=MessageType.chat,
),
NewConversationMessage(
content=f"{context.assistant.name} left the conversation.",
message_type=MessageType.note,
),
])
for filename in files_to_copy:
buffer = io.BytesIO()
file = await source_context.get_file(filename)
if not file:
continue

await context.update_participant_me(
UpdateParticipant(
active_participant=False,
)
)
async with source_context.read_file(filename) as reader:
async for chunk in reader:
buffer.write(chunk)

return
await context.write_file(filename, buffer, file.content_type)

async with timing("get-participants"):
participants_response = await context.get_participants()
other_assistants_in_conversation = any(
participant
for participant in participants_response.participants
if participant.role == ParticipantRole.assistant and participant.id != context.assistant.id
# send the introduction message to the conversation
await context.send_messages([
NewConversationMessage(
content=introduction_message,
message_type=MessageType.chat,
),
# the "leaving" message doubles as a note to the assistant that they have handed off to
# this participant and won't do it again, even if navigator is added to the conversation again
NewConversationMessage(
content=f"{context.assistant.name} left the conversation.",
message_type=MessageType.note,
metadata={"_handoff": {"participant_id": participant.id}},
),
])

# leave the conversation
await context.update_participant_me(
UpdateParticipant(
active_participant=False,
)
if other_assistants_in_conversation:
)

return True


@assistant.events.conversation.on_created
async def on_conversation_created(context: ConversationContext) -> None:
"""
Handle the event triggered when the assistant is added to a conversation.
"""

participants_response = await context.get_participants()
other_assistant_participants = [
participant
for participant in participants_response.participants
if participant.role == ParticipantRole.assistant and participant.id != context.assistant.id
]
for participant in other_assistant_participants:
# check if the participant has handoff metadata
if await handoff_to_assistant(context, participant):
# if we handed off to this participant, don't send the welcome message
return

if len(other_assistant_participants) > 0:
return

assistant_sent_messages = await context.get_messages(
participant_ids=[context.assistant.id], limit=1, message_types=[MessageType.chat]
)
assistant_has_sent_a_message = len(assistant_sent_messages.messages) > 0
if assistant_has_sent_a_message:
# don't send the welcome message if the assistant has already sent a message
return

# send a welcome message to the conversation
async with timing("get-assistant-config"):
config = await assistant_config.get(context.assistant)
config = await assistant_config.get(context.assistant)

async with timing("send-welcome-message"):
welcome_message = config.response_behavior.welcome_message
await context.send_messages(
NewConversationMessage(
content=welcome_message,
message_type=MessageType.chat,
metadata={"generated_content": False},
)
welcome_message = config.response_behavior.welcome_message
await context.send_messages(
NewConversationMessage(
content=welcome_message,
message_type=MessageType.chat,
metadata={"generated_content": False},
)
)


@assistant.events.conversation.participant.on_created
@assistant.events.conversation.participant.on_updated
async def on_participant_created(
context: ConversationContext, event: ConversationEvent, participant: ConversationParticipant
) -> None:
"""
Handle the event triggered when a participant is added to the conversation.
"""

# check if the participant is an assistant
if participant.role != ParticipantRole.assistant:
return

# check if the assistant should handoff to this participant
await handoff_to_assistant(context, participant)


# endregion
Original file line number Diff line number Diff line change
@@ -176,7 +176,7 @@ async def handle_completion(
local_tool = next((local_tool for local_tool in local_tools if tool_call.name == local_tool.name), None)
if local_tool:
# If the tool call is a local tool, handle it locally
logger.info(f"Handling local tool call: {tool_call.name}")
logger.info("executing local tool call; tool name: %s", tool_call.name)
typed_argument = local_tool.argument_model.model_validate(tool_call.arguments)
content = await local_tool.func(typed_argument, context)

Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import logging
from textwrap import dedent
from typing import Annotated

from pydantic import BaseModel, Field
@@ -7,37 +9,53 @@
)
from semantic_workbench_assistant.assistant_app import ConversationContext

from .list_assistant_services import get_navigator_visible_assistant_service_templates
from .model import LocalTool

logger = logging.getLogger(__name__)


class ArgumentModel(BaseModel):
assistant_service_id: str
template_id: str

attachments_to_copy_to_new_conversation: Annotated[
list[str],
Field(
description="A list of attachment filenames to copy to the new conversation. If empty, no attachments will be copied.",
),
] = []

introduction_message: Annotated[
str,
Field(
description="The message to share with the assistant after the conversation is created. This message sets context around what the user is trying to achieve. Use your own voice, not the user's voice. Speak about the user in the third person.",
description=dedent("""
The message to share with the assistant after it is added to the conversation.
This message sets context around what the user is trying to achieve.
Use your own voice, as the navigator assistant. Speak about the user in the third person.
For example: "{{the user's name}} is trying to get help with their project. They are looking for a way to..."
""").strip(),
),
]


async def assistant_card(args: ArgumentModel, context: ConversationContext) -> str:
"""
Tool to render a control that allows the user to create a new conversation with an assistant.
Results in the app rendering an assistant card with a create buttton.
The button will create a new conversation with the assistant.
You can call this tool again for a difference assistant, or if the introduction message or
attachments to copy to the new conversation should be updated.
Tool to render a control that allows the user to add an assistant to this conversation.
Results in the app rendering an assistant card with a "+" buttton.
This tool does not add the assistant to the conversation. The assistant will be added to
the conversation if the user clicks the "+" button.
You can call this tool again for a different assistant, or if the introduction message
should be updated.
"""

# check if the assistant service id is valid
service_templates = await get_navigator_visible_assistant_service_templates(context)
if not any(
template
for (service_id, template, _) in service_templates
if service_id == args.assistant_service_id and template.id == args.template_id
):
logger.warning(
"assistant_card tool called with invalid assistant_service_id or template_id; assistant_service_id: %s, template_id: %s",
args.assistant_service_id,
args.template_id,
)
return "Error: The selected assistant_service_id and template_id are not available."

await context.send_messages(
NewConversationMessage(
message_type=MessageType.note,
@@ -48,12 +66,11 @@ async def assistant_card(args: ArgumentModel, context: ConversationContext) -> s
"props": {
"assistantServiceId": args.assistant_service_id,
"templateId": args.template_id,
"includeAssistantIds": [context.assistant.id],
"newConversationMetadata": {
"existingConversationId": context.id,
"participantMetadata": {
"_navigator_handoff": {
"introduction_message": args.introduction_message,
"spawned_from_conversation_id": context.id,
"files_to_copy": args.attachments_to_copy_to_new_conversation,
},
},
},
@@ -62,7 +79,7 @@ async def assistant_card(args: ArgumentModel, context: ConversationContext) -> s
)
)

return "Success: The user will be presented with an assistant card to create a new conversation with the assistant."
return "Success: The user will be presented with an assistant card to add the assistant to the conversation."


tool = LocalTool(name="assistant_card", argument_model=ArgumentModel, func=assistant_card)
Loading
Oops, something went wrong.