.NET: Add streaming support to A2A agent handler#5427
.NET: Add streaming support to A2A agent handler#5427SergeyMenshykh merged 16 commits intomicrosoft:mainfrom
Conversation
Move the A2A sample projects (A2AAgent_AsFunctionTools and A2AAgent_PollingForTaskCompletion) from samples/04-hosting/A2A/ to samples/02-agents/A2A/ to better align with the sample directory structure. Update solution file and samples README accordingly. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Add SSE stream reconnection support to A2AAgent Implement automatic reconnection for SSE streams that disconnect mid-task, using the Last-Event-ID header to resume from where the stream left off. Changes: - Add InvokeStreamingWithReconnectAsync method to A2AAgent with configurable max retries and delay between attempts - Add new log messages for reconnection events - Add A2AAgent_StreamReconnection sample demonstrating the feature - Update existing polling sample to use simplified SendMessageAsync API - Add unit tests for stream reconnection logic Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * address comments * Address PR review feedback - Dispose SSE enumerator before GetTaskAsync fallback to release HTTP connection - Wrap StreamWriter in using blocks with leaveOpen:true and explicit UTF-8 encoding - Print update.Text instead of update object in stream reconnection sample Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Refactor A2A extensions to use IA2AClientFactory and add ProtocolSelection sample - Update A2AAgentCardExtensions to accept IA2AClientFactory instead of A2AClientOptions - Update A2ACardResolverExtensions to accept IA2AClientFactory - Update A2AClientExtensions to accept IA2AClientFactory - Update A2AAgent to use IA2AClientFactory for client creation - Add A2AAgent_ProtocolSelection sample demonstrating protocol selection - Add comprehensive unit tests for all changes - Update README files with new sample reference Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Reorder params: options before loggerFactory in A2A extensions Move A2AClientOptions parameter before ILoggerFactory in AsAIAgent and GetAIAgentAsync extension methods to follow the repo convention of keeping LoggerFactory and CancellationToken as the last parameters. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* .NET: Migrate A2A hosting to A2A SDK v1 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * remove unused agent card --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…osoft#5413) * .NET: Refactor A2A hosting registration into A2AServerServiceCollectionExtensions - Rename A2AHostingOptions to A2AServerRegistrationOptions - Move server registration logic from A2AEndpointRouteBuilderExtensions and AIAgentExtensions into new A2AServerServiceCollectionExtensions - Remove A2AProtocolBinding and AIAgentExtensions (consolidated) - Update samples and tests to use the new registration API Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * address copilot comments --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Add HandleNewMessageStreamingAsync to A2AAgentHandler that routes StreamingResponse requests through RunStreamingAsync, enqueuing an A2A Message for each AgentResponseUpdate. Add MessageConverter.ToParts(AgentResponseUpdate) extension to convert streaming update contents to A2A Parts with unsupported-content filtering. Add CreateMessageFromUpdate to map AgentResponseUpdate to A2A Message. Add 16 new tests covering the streaming path and converter. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Automated Code Review
Reviewers: 4 | Confidence: 90%
✓ Correctness
This PR adds streaming support to the A2A agent handler by introducing HandleNewMessageStreamingAsync alongside the existing HandleNewMessageAsync. The implementation correctly follows established patterns: session lifecycle management, ReferenceTaskIds rejection, metadata-to-options conversion, and response translation. The new ToParts extension on AgentResponseUpdate mirrors the existing ToParts on IList. The streaming path intentionally omits AllowBackgroundResponses (not applicable when responses are already streamed). Tests are comprehensive, covering message production, options forwarding, context ID generation, error cases, and session persistence. No bugs, race conditions, or incorrect API usage found.
✓ Security Reliability
This PR adds streaming support to the A2A agent handler, following existing patterns closely. The new HandleNewMessageStreamingAsync method mirrors HandleNewMessageAsync in structure, including session management, ReferenceTaskIds validation, and metadata-to-options conversion. The ToParts extension for AgentResponseUpdate follows the same null-filtering pattern as the existing ToParts for IList. The streaming path intentionally omits AllowBackgroundResponses and task lifecycle management, which is consistent with streaming semantics. No security or reliability issues were identified — no new trust boundaries are crossed, resource management via await-foreach is correct, and error handling is consistent with the non-streaming path.
✓ Test Coverage
The PR adds streaming support to A2AAgentHandler with a new HandleNewMessageStreamingAsync method, a CreateMessageFromUpdate static helper, and a ToParts extension method for AgentResponseUpdate. The test coverage is solid overall: 12 new handler tests cover the core streaming scenarios (multiple updates, metadata propagation, null handling, ReferenceTaskIds guard, context ID generation, session saving) and 4 new MessageConverter tests cover the ToParts extension. However, there are a few test coverage gaps: no test for an empty stream (zero updates from RunStreamingAsync), no cancellation token propagation test mirroring the non-streaming equivalent, and the streaming path's intentional omission of ContinuationToken handling is not verified by any test.
✗ Design Approach
The streaming support is headed in the right direction, but the current design forks away from the existing A2A run/task pipeline in two important ways: it no longer honors
AgentRunModewhen invoking the agent, and it flattens everyAgentResponseUpdateinto a plain A2A message event. Together, those choices disable task-backed streaming and continuation-token-based reconection for hosted A2A streams, so the change solves “emit chunks” but drops the broader background/resumption contract the rest of this layer already implements.
Suggestions
- Add a test for zero updates from
RunStreamingAsync(empty async enumerable). Currently all streaming tests yield at least one update; when RunStreamingAsync yields nothing, no messages are enqueued and only SaveSessionAsync runs—this edge case should be covered. - Add a cancellation token propagation test for the streaming path, mirroring
ExecuteAsync_DynamicMode_CancellationTokenIsPropagatedToCallbackAsync(line 767) from the non-streaming path. The streaming path passesCancellationTokento multiple calls but has no test verifying this. - Factor the run-decision/options construction into a shared helper used by all three execution paths so streaming preserves the same background/task policy as non-streaming execution.
- Model streaming background responses as A2A task/status/artifact events (or otherwise preserve
ContinuationToken) instead of flattening every update into an independent message chunk.
Automated review by SergeyMenshykh's agents
Add two tests covering gaps in the streaming path: - ExecuteAsync_Streaming_WhenNoUpdates_EnqueuesNoMessagesAndSavesSessionAsync: Verifies that when RunStreamingAsync yields an empty async enumerable, no messages are enqueued and only SaveSessionAsync runs. - ExecuteAsync_Streaming_CancellationTokenIsPropagatedToRunStreamingAsyncAsync: Verifies that the CancellationToken from ExecuteAsync is propagated through to the inner agent's RunCoreStreamingAsync call. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Adds A2A streaming support in the .NET host by routing streaming-endpoint requests through the agent’s streaming execution path and converting streaming updates into A2A message events.
Changes:
- Route
A2AAgentHandler.ExecuteAsyncto a new streaming handler whenRequestContext.StreamingResponseistrue. - Convert
AgentResponseUpdatestreaming chunks into A2AMessage/Partobjects and enqueue them as they arrive. - Add unit tests covering the streaming handler behavior and
MessageConverter.ToParts(AgentResponseUpdate).
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| dotnet/src/Microsoft.Agents.AI.Hosting.A2A/A2AAgentHandler.cs | Adds streaming execution path using RunStreamingAsync and maps updates to A2A message events. |
| dotnet/src/Microsoft.Agents.AI.Hosting.A2A/Converters/MessageConverter.cs | Adds ToParts(AgentResponseUpdate) conversion for streaming update contents. |
| dotnet/tests/Microsoft.Agents.AI.Hosting.A2A.UnitTests/A2AAgentHandlerTests.cs | Adds streaming-path tests for enqueuing, metadata/options propagation, IDs, and session persistence. |
| dotnet/tests/Microsoft.Agents.AI.Hosting.A2A.UnitTests/Converters/MessageConverterTests.cs | Adds tests for ToParts(AgentResponseUpdate) including unsupported-content filtering. |
There was a problem hiding this comment.
Automated Code Review
Reviewers: 4 | Confidence: 92%
✓ Correctness
This PR adds a streaming path to the A2A agent handler. The new
HandleNewMessageStreamingAsyncmethod correctly mirrors the non-streamingHandleNewMessageAsyncpattern—usingRunStreamingAsyncinstead ofRunAsync, enqueuing individual message events per update, and saving the session afterward. The newToParts(AgentResponseUpdate)extension method follows the identical iteration-and-filter-nulls pattern as the existingToParts(IList<ChatMessage>). TheExecuteAsyncdispatch order (continuation → streaming → non-streaming) is logically sound. Tests are comprehensive, covering the happy path, metadata propagation, null handling, session persistence, cancellation token forwarding, and edge cases. No bugs, race conditions, or incorrect API usage found.
✓ Security Reliability
This PR adds a streaming path to the A2A agent handler, introducing HandleNewMessageStreamingAsync and a ToParts extension for AgentResponseUpdate. The implementation is consistent with the existing non-streaming HandleNewMessageAsync: same session lifecycle (get-or-create → run → save), same ReferenceTaskIds guard, same contextId fallback pattern, and same error-propagation behavior (no try/catch, matching HandleNewMessageAsync). The new ToParts extension correctly handles null/empty contents and filters unsupported content types. The routing logic in ExecuteAsync correctly prioritizes IsContinuation over StreamingResponse. Tests are thorough. No security or reliability issues found.
✓ Test Coverage
Test coverage for the new streaming path is comprehensive. The PR adds 14 tests for
HandleNewMessageStreamingAsynccovering the key scenarios: multiple updates, metadata/options propagation, null metadata, ReferenceTaskIds rejection, context ID generation, null message, ResponseId-to-MessageId mapping, AditionalProperties, session persistence, empty updates, and cancellation token propagation. TheMessageConverter.ToParts(AgentResponseUpdate)extension gets 4 tests covering empty contents, text content, multiple contents, and unsupported content filtering. One minor gap: there is no test verifying the routing precedence when bothIsContinuationandStreamingResponseare true on theRequestContext.
✗ Design Approach
The streaming split adds useful test coverage, but the design currently makes streaming requests follow a different execution policy from the rest of the A2A host. In particular, the new path bypasses
AgentRunMode/background-response handling entirely and always emits plain message events, which drops the continuation-based reconection model the abstractions describe for streamed long-running work. There is also a narrower identity-mapping bug where streamed A2A messages are keyed offResponseIdinstead of the provider’s per-messageMessageId.
Suggestions
- Keep streaming as a transport concern rather than a separate execution mode: share the same background/continuation decision logic between streaming and non-streaming requests, then serialize either
AgentResponseorAgentResponseUpdateevents on top of that common pipeline.
Automated review by SergeyMenshykh's agents
Summary
Adds streaming message support to
A2AAgentHandlerso that requests received via the A2A streaming endpoint (StreamingResponse = true) are handled usingRunStreamingAsyncinstead ofRunAsync.Changes
Production code
A2AAgentHandler.ExecuteAsync- routes toHandleNewMessageStreamingAsyncwhencontext.StreamingResponseistrue.HandleNewMessageStreamingAsync- iteratesRunStreamingAsync, enqueues an A2AMessagefor eachAgentResponseUpdate, and saves the session after the stream completes. Unlike the non-streaming path, this does not useAgentRunModeor task lifecycle events.CreateMessageFromUpdate- mapsAgentResponseUpdateto A2AMessage(parallel to the existingCreateMessageFromResponse).MessageConverter.ToParts(AgentResponseUpdate)- converts streaming update contents to A2APartobjects, filtering out unsupported content types.Tests (16 new tests)
MessageConverter.ToParts(AgentResponseUpdate)- empty contents, text content, multiple contents, unsupported content filtering.ReferenceTaskIdsrejection, context ID generation, session persistence,ResponseIdmapping, andnulledge cases.Out Of Scope
Supporting long-running operations/tasks with streaming is out of the scope of this PR and will be added later.
Closes:
MapA2A#3310