Skip to content

Commit 2bfa31b

Browse files
authored
Python: pseudo-stream copilot invoke_stream (#12548)
### Motivation and Context The current copilot studio client doesn't support streaming invocation. The underlying orchestration patterns rely on agents to have an invoke_stream. Since it doesn't exist in the copilot agent, that agent type cannot be used. This PR implements a pseudo-invoke_stream so that the agent can be used with patterns - although it doesn't stream the response back in chunks, it unblocks usage for anyone who desires to use our new orchestration patterns. <!-- Thank you for your contribution to the semantic-kernel repo! Please help reviewers and future users, providing the following information: 1. Why is this change required? 2. What problem does it solve? 3. What scenario does it contribute to? 4. If it fixes an open issue, please link to the issue here. --> ### Description - Closes #12449 <!-- Describe your changes, the overall approach, the underlying design. These notes will help understanding how your code works. Thanks! --> ### Contribution Checklist <!-- Before submitting this PR, please make sure: --> - [X] The code builds clean without any errors or warnings - [X] The PR follows the [SK Contribution Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md) and the [pre-submission formatting script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts) raises no violations - [X] All unit tests pass, and I have added new tests where possible - [X] I didn't break anyone 😄
1 parent 9e3cdff commit 2bfa31b

File tree

1 file changed

+64
-4
lines changed

1 file changed

+64
-4
lines changed

python/semantic_kernel/agents/copilot_studio/copilot_studio_agent.py

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
CopilotStudioAgentSettings,
3333
)
3434
from semantic_kernel.contents.chat_message_content import ChatMessageContent
35+
from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent
3536
from semantic_kernel.contents.utils.author_role import AuthorRole
3637
from semantic_kernel.exceptions.agent_exceptions import (
3738
AgentInitializationException,
@@ -54,7 +55,6 @@
5455
from typing_extensions import override
5556

5657
if TYPE_CHECKING: # pragma: no cover
57-
from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent
5858
from semantic_kernel.kernel import Kernel
5959

6060
logger: logging.Logger = logging.getLogger(__name__)
@@ -510,7 +510,7 @@ async def invoke(
510510
yield AgentResponseItem(message=response, thread=thread)
511511

512512
@override
513-
def invoke_stream(
513+
async def invoke_stream(
514514
self,
515515
messages: str | ChatMessageContent | list[str | ChatMessageContent] | None = None,
516516
*,
@@ -519,8 +519,53 @@ def invoke_stream(
519519
arguments: KernelArguments | None = None,
520520
kernel: "Kernel | None" = None,
521521
**kwargs: Any,
522-
) -> AsyncIterable[AgentResponseItem["StreamingChatMessageContent"]]:
523-
raise NotImplementedError("Streaming is not supported for Copilot Studio agents.")
522+
) -> AsyncIterable[AgentResponseItem[StreamingChatMessageContent]]:
523+
"""Invoke the agent and stream the response.
524+
525+
Note: this is a “pseudo-streaming” implementation.
526+
527+
We're internally delegating to the real async generator `_inner_invoke`.
528+
Each complete ChatMessageContent is wrapped in exactly one
529+
StreamingChatMessageContent chunk, so downstream consumers can iterate
530+
without change. The stream yields at least once; callers still receive
531+
on_intermediate callbacks in real time.
532+
533+
Args:
534+
messages: The messages to send to the agent.
535+
thread: The thread to use for the agent.
536+
on_intermediate_message: A callback function to call with each intermediate message.
537+
arguments: The arguments to pass to the agent.
538+
kernel: The kernel to use for the agent.
539+
**kwargs: Additional keyword arguments.
540+
541+
Yields:
542+
A chat message content and thread with the response.
543+
"""
544+
thread = await self._ensure_thread_exists_with_messages(
545+
messages=messages,
546+
thread=thread,
547+
construct_thread=lambda: CopilotStudioAgentThread(self.client),
548+
expected_type=CopilotStudioAgentThread,
549+
)
550+
if not isinstance(thread, CopilotStudioAgentThread):
551+
raise AgentThreadOperationException("The thread is not a Copilot Studio Agent thread.")
552+
553+
normalized_messages = self._normalize_messages(messages)
554+
555+
responses: list[ChatMessageContent] = []
556+
async for resp in self._inner_invoke(
557+
thread=thread,
558+
messages=normalized_messages,
559+
on_intermediate_message=on_intermediate_message,
560+
arguments=arguments,
561+
kernel=kernel,
562+
**kwargs,
563+
):
564+
responses.append(resp)
565+
566+
for i, resp in enumerate(responses):
567+
stream_msg = self._to_streaming(resp, index=i)
568+
yield AgentResponseItem(message=stream_msg, thread=thread)
524569

525570
# endregion
526571

@@ -598,6 +643,21 @@ def _normalize_messages(messages: str | ChatMessageContent | list[str | ChatMess
598643
normalized.append(m.content if isinstance(m, ChatMessageContent) else str(m))
599644
return normalized
600645

646+
@staticmethod
647+
def _to_streaming(
648+
msg: ChatMessageContent,
649+
*,
650+
index: int,
651+
) -> StreamingChatMessageContent:
652+
"""Wrap a complete ChatMessageContent in a StreamingChatMessageContent."""
653+
return StreamingChatMessageContent(
654+
role=msg.role,
655+
name=msg.name,
656+
content=msg.content,
657+
choice_index=index,
658+
metadata=msg.metadata,
659+
)
660+
601661
@override
602662
async def _notify_thread_of_new_message(self, thread, new_message):
603663
"""Copilot Studio Agent doesn't need to notify the thread of new messages.

0 commit comments

Comments
 (0)