-
Notifications
You must be signed in to change notification settings - Fork 57
Consolidating StreamingResponse definitions and slight fixes #327
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
d3652ff
e8aa9b2
a374ced
c730a89
ead6403
03ed8f7
634b608
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| import pytest | ||
| import asyncio | ||
|
|
||
| from microsoft_agents.activity import ( | ||
| Activity, | ||
| ActivityTypes, | ||
| Channels, | ||
| Entity | ||
| ) | ||
|
|
||
| from microsoft_agents.hosting.core import ( | ||
| TurnContext, | ||
| TurnState, | ||
| ) | ||
|
|
||
| from microsoft_agents.testing import ( | ||
| AgentClient, | ||
| AgentEnvironment, | ||
| AiohttpScenario, | ||
| ) | ||
|
|
||
| FULL_TEXT = "This is a streaming response." | ||
| CHUNKS = FULL_TEXT.split() | ||
|
|
||
| def get_streaminfo(activity: Activity) -> Entity: | ||
| for entity in activity.entities: | ||
| if isinstance(entity, dict) and entity.get("type") == "streaminfo": | ||
| return Entity.model_validate(entity) | ||
| elif isinstance(entity, Entity) and entity.type == "streaminfo": | ||
| return entity | ||
| raise ValueError("No streaminfo entity found") | ||
|
|
||
| async def init_agent(env: AgentEnvironment): | ||
|
|
||
| app = env.agent_application | ||
|
|
||
| @app.message("/stream") | ||
| async def stream_handler(context: TurnContext, state: TurnState): | ||
|
|
||
| assert context.streaming_response is not None | ||
|
|
||
| context.streaming_response.queue_informative_update("Starting stream...") | ||
|
|
||
| for chunk in CHUNKS: | ||
| await asyncio.sleep(1.0) # Simulate delay between chunks | ||
| context.streaming_response.queue_text_chunk(chunk) | ||
|
|
||
| await asyncio.sleep(1.0) | ||
|
|
||
| await context.streaming_response.end_stream() | ||
|
|
||
| _SCENARIO = AiohttpScenario(init_agent=init_agent, use_jwt_middleware=False) | ||
|
|
||
| @pytest.mark.asyncio | ||
| @pytest.mark.agent_test(_SCENARIO) | ||
| async def test_basic_streaming_response_non_streaming_channel(agent_client: AgentClient): | ||
|
|
||
| expected_len = len(FULL_TEXT.split()) | ||
|
|
||
| agent_client.template = agent_client.template.with_updates(channel_id=Channels.emulator) | ||
|
|
||
| # give enough time for all the activities to send | ||
| await agent_client.send("/stream", wait=expected_len * 2.0) | ||
|
|
||
| stream_activities = agent_client.select().where( | ||
| entities=lambda x: any(e["type"] == "streaminfo" for e in x) | ||
| ).get() | ||
|
|
||
| assert len(stream_activities) == 1 | ||
|
|
||
| final_streaminfo = get_streaminfo(stream_activities[0]) | ||
|
|
||
| assert final_streaminfo.stream_sequence == 1 | ||
| assert final_streaminfo.stream_type == "final" | ||
| assert stream_activities[0].text == FULL_TEXT.replace(" ", "") | ||
|
|
||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| @pytest.mark.agent_test(_SCENARIO) | ||
| async def test_basic_streaming_response_streaming_channel(agent_client: AgentClient): | ||
|
|
||
| expected_len = len(FULL_TEXT.split()) | ||
|
|
||
| agent_client.template = agent_client.template.with_updates(channel_id=Channels.webchat) | ||
|
|
||
| # give enough time for all the activities to send | ||
| await agent_client.send("/stream", wait=expected_len * 2.0) | ||
|
|
||
| stream_activities = agent_client.select().where( | ||
| entities=lambda x: any(e["type"] == "streaminfo" for e in x) | ||
| ).get() | ||
|
Comment on lines
+90
to
+92
|
||
|
|
||
| assert len(stream_activities) == len(CHUNKS) + 2 | ||
|
|
||
| informative = stream_activities[0] | ||
| informative_streaminfo = get_streaminfo(informative) | ||
|
|
||
| assert informative_streaminfo.stream_type == "informative" | ||
| assert informative_streaminfo.stream_sequence == 1 | ||
| assert informative.text == "Starting stream..." | ||
| assert informative.type == ActivityTypes.typing | ||
|
|
||
| t = "" | ||
| for i, chunk in enumerate(CHUNKS): | ||
| t += chunk | ||
|
|
||
| j = i + 1 | ||
|
|
||
| streaminfo = get_streaminfo(stream_activities[j]) | ||
|
|
||
| assert stream_activities[j].text == t | ||
| assert stream_activities[j].type == ActivityTypes.typing | ||
| assert streaminfo.stream_type == "streaming" | ||
| assert streaminfo.stream_sequence == j + 1 | ||
|
|
||
| final_streaminfo = get_streaminfo(stream_activities[-1]) | ||
|
|
||
| assert final_streaminfo.stream_sequence == len(stream_activities) | ||
| assert final_streaminfo.stream_type == "final" | ||
| assert stream_activities[-1].text == FULL_TEXT.replace(" ", "") | ||
|
|
||
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
Select.where(entities=...)predicate assumes each entity is a dict (e["type"]), but streamed activities are parsed viaActivity.model_validate_json(...), which producesEntitymodels. This will raiseTypeError: 'Entity' object is not subscriptable. Update the predicate to handle both dict andEntity(e.g., usegetattr(e, "type", None)/e.get("type")).