Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dotnet/agent-framework-dotnet.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@
<File Path="samples/02-agents/A2A/README.md" />
<Project Path="samples/02-agents/A2A/A2AAgent_AsFunctionTools/A2AAgent_AsFunctionTools.csproj" />
<Project Path="samples/02-agents/A2A/A2AAgent_PollingForTaskCompletion/A2AAgent_PollingForTaskCompletion.csproj" />
<Project Path="samples/02-agents/A2A/A2AAgent_StreamReconnection/A2AAgent_StreamReconnection.csproj" />
</Folder>
<Folder Name="/Samples/05-end-to-end/">
<Project Path="samples/05-end-to-end/AgentWithPurview/AgentWithPurview.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@

AgentSession session = await agent.CreateSessionAsync();

// AllowBackgroundResponses must be true so the server returns immediately with a continuation token
// instead of blocking until the task is complete.
AgentRunOptions options = new() { AllowBackgroundResponses = true };

// Start the initial run with a long-running task.
AgentResponse response = await agent.RunAsync("Conduct a comprehensive analysis of quantum computing applications in cryptography, including recent breakthroughs, implementation challenges, and future roadmap. Please include diagrams and visual representations to illustrate complex concepts.", session);
AgentResponse response = await agent.RunAsync("Conduct a comprehensive analysis of quantum computing applications in cryptography, including recent breakthroughs, implementation challenges, and future roadmap. Please include diagrams and visual representations to illustrate complex concepts.", session, options: options);

// Poll until the response is complete.
while (response.ContinuationToken is { } token)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFrameworks>net10.0</TargetFrameworks>

<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="A2A" />
<PackageReference Include="Azure.AI.OpenAI" />
<PackageReference Include="Azure.Identity" />
<PackageReference Include="Microsoft.Extensions.Hosting" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\..\src\Microsoft.Agents.AI.A2A\Microsoft.Agents.AI.A2A.csproj" />
<ProjectReference Include="..\..\..\..\src\Microsoft.Agents.AI.OpenAI\Microsoft.Agents.AI.OpenAI.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) Microsoft. All rights reserved.

// This sample demonstrates how to reconnect to an A2A agent's streaming response using continuation tokens,
// allowing recovery from stream interruptions without losing progress.

using A2A;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;

var a2aAgentHost = Environment.GetEnvironmentVariable("A2A_AGENT_HOST") ?? throw new InvalidOperationException("A2A_AGENT_HOST is not set.");

// Initialize an A2ACardResolver to get an A2A agent card.
A2ACardResolver agentCardResolver = new(new Uri(a2aAgentHost));

// Get the agent card
AgentCard agentCard = await agentCardResolver.GetAgentCardAsync();

// Create an instance of the AIAgent for an existing A2A agent specified by the agent card.
AIAgent agent = agentCard.AsAIAgent();

AgentSession session = await agent.CreateSessionAsync();

ResponseContinuationToken? continuationToken = null;

await foreach (var update in agent.RunStreamingAsync("Conduct a comprehensive analysis of quantum computing applications in cryptography, including recent breakthroughs, implementation challenges, and future roadmap. Please include diagrams and visual representations to illustrate complex concepts.", session))
{
// Saving the continuation token to be able to reconnect to the same response stream later.
// Note: Continuation tokens are only returned for long-running tasks. If the underlying A2A agent
// returns a message instead of a task, the continuation token will not be initialized.
// A2A agents do not support stream resumption from a specific point in the stream,
// but only reconnection to obtain the same response stream from the beginning.
// So, A2A agents will return an initialized continuation token in the first update
// representing the beginning of the stream, and it will be null in all subsequent updates.
Comment thread
SergeyMenshykh marked this conversation as resolved.
if (update.ContinuationToken is { } token)
{
continuationToken = token;
}

// Imitating stream interruption
break;
}

// Reconnect to the same response stream using the continuation token obtained from the previous run.
// As a first update, the agent will return an update representing the current state of the response at the moment of calling
// RunStreamingAsync with the same continuation token, followed by other updates until the end of the stream is reached.
if (continuationToken is not null)
{
await foreach (var update in agent.RunStreamingAsync(session, options: new() { ContinuationToken = continuationToken }))
{
if (!string.IsNullOrEmpty(update.Text))
{
Console.WriteLine(update.Text);
}
}
}
29 changes: 29 additions & 0 deletions dotnet/samples/02-agents/A2A/A2AAgent_StreamReconnection/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# A2A Agent Stream Reconnection

This sample demonstrates how to reconnect to an A2A agent's streaming response using continuation tokens, allowing recovery from stream interruptions without losing progress.

The sample:

- Connects to an A2A agent server specified in the `A2A_AGENT_HOST` environment variable
- Sends a request to the agent and begins streaming the response
- Captures a continuation token from the stream for later reconnection
- Simulates a stream interruption by breaking out of the streaming loop
- Reconnects to the same response stream using the captured continuation token
- Displays the response received after reconnection

This pattern is useful when network interruptions or other failures may disrupt an ongoing streaming response, and you need to recover and continue processing.

> **Note:** Continuation tokens are only available when the underlying A2A agent returns a task. If the agent returns a message instead, the continuation token will not be initialized and stream reconnection is not applicable.

# Prerequisites

Before you begin, ensure you have the following prerequisites:

- .NET 10.0 SDK or later
- An A2A agent server running and accessible via HTTP

Set the following environment variable:

```powershell
$env:A2A_AGENT_HOST="http://localhost:5000" # Replace with your A2A agent server host
```
1 change: 1 addition & 0 deletions dotnet/samples/02-agents/A2A/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ See the README.md for each sample for the prerequisites for that sample.
|---|---|
|[A2A Agent As Function Tools](./A2AAgent_AsFunctionTools/)|This sample demonstrates how to represent an A2A agent as a set of function tools, where each function tool corresponds to a skill of the A2A agent, and register these function tools with another AI agent so it can leverage the A2A agent's skills.|
|[A2A Agent Polling For Task Completion](./A2AAgent_PollingForTaskCompletion/)|This sample demonstrates how to poll for long-running task completion using continuation tokens with an A2A agent.|
|[A2A Agent Stream Reconnection](./A2AAgent_StreamReconnection/)|This sample demonstrates how to reconnect to an A2A agent's streaming response using continuation tokens, allowing recovery from stream interruptions.|

## Running the samples from the console

Expand Down
86 changes: 78 additions & 8 deletions dotnet/src/Microsoft.Agents.AI.A2A/A2AAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ protected override async IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingA

if (GetContinuationToken(messages, options) is { } token)
{
streamEvents = this._a2aClient.SubscribeToTaskAsync(new SubscribeToTaskRequest { Id = token.TaskId }, cancellationToken).ConfigureAwait(false);
streamEvents = this.SubscribeToTaskWithFallbackAsync(token.TaskId, cancellationToken).ConfigureAwait(false);
}
else
{
Expand Down Expand Up @@ -248,6 +248,75 @@ private async ValueTask<A2AAgentSession> GetA2ASessionAsync(AgentSession? sessio
return typedSession;
}

/// <summary>
/// Subscribes to task updates, falling back to <see cref="A2AClient.GetTaskAsync"/>
/// when the task has already reached a terminal state and the server responds with
/// <see cref="A2AErrorCode.UnsupportedOperation"/>.
/// </summary>
/// <remarks>
/// Per A2A spec §3.1.6, subscribing to a task in a terminal state (completed, failed,
/// canceled, or rejected) results in an <c>UnsupportedOperationError</c>.
/// See: <see href="https://a2a-protocol.org/latest/specification/#332-error-handling"/>.
/// </remarks>
private async IAsyncEnumerable<StreamResponse> SubscribeToTaskWithFallbackAsync(
string taskId,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
var subscribeStream = this._a2aClient.SubscribeToTaskAsync(new SubscribeToTaskRequest { Id = taskId }, cancellationToken);

var enumerator = subscribeStream.GetAsyncEnumerator(cancellationToken);

// yield return cannot appear inside a try block that has catch clauses,
// so we manually advance the enumerator within try/catch and yield outside it.
// The outer try/finally (no catch) is allowed to contain yield return in C#.
StreamResponse? fallbackResponse = null;
bool disposed = false;

try
{
while (true)
{
bool hasNext;
try
{
hasNext = await enumerator.MoveNextAsync().ConfigureAwait(false);
}
catch (A2AException ex) when (ex.ErrorCode == A2AErrorCode.UnsupportedOperation)
{
this._logger.LogA2ASubscribeToTaskFallback(this.Id, this.Name, taskId, ex.Message);

// Dispose the enumerator before the fallback call to release the HTTP/SSE connection.
await enumerator.DisposeAsync().ConfigureAwait(false);
disposed = true;

AgentTask agentTask = await this._a2aClient.GetTaskAsync(new GetTaskRequest { Id = taskId }, cancellationToken).ConfigureAwait(false);
Comment thread
SergeyMenshykh marked this conversation as resolved.

fallbackResponse = new StreamResponse { Task = agentTask };
break;
}

if (!hasNext)
{
break;
}

yield return enumerator.Current;
}

if (fallbackResponse is not null)
{
yield return fallbackResponse;
}
}
finally
{
if (!disposed)
{
await enumerator.DisposeAsync().ConfigureAwait(false);
}
}
}

private static void UpdateSession(A2AAgentSession? session, string? contextId, string? taskId = null)
{
if (session is null)
Expand Down Expand Up @@ -321,17 +390,17 @@ private AgentResponse ConvertToAgentResponse(Message message)
};
}

private AgentResponse ConvertToAgentResponse(AgentTask agentTask)
private AgentResponse ConvertToAgentResponse(AgentTask task)
{
return new AgentResponse
{
AgentId = this.Id,
ResponseId = agentTask.Id,
FinishReason = MapTaskStateToFinishReason(agentTask.Status.State),
RawRepresentation = agentTask,
Messages = agentTask.ToChatMessages() ?? [],
ContinuationToken = CreateContinuationToken(agentTask.Id, agentTask.Status.State),
AdditionalProperties = agentTask.Metadata?.ToAdditionalProperties(),
ResponseId = task.Id,
FinishReason = MapTaskStateToFinishReason(task.Status.State),
RawRepresentation = task,
Messages = task.ToChatMessages() ?? [],
ContinuationToken = CreateContinuationToken(task.Id, task.Status.State),
AdditionalProperties = task.Metadata?.ToAdditionalProperties(),
};
}

Expand Down Expand Up @@ -360,6 +429,7 @@ private AgentResponseUpdate ConvertToAgentResponseUpdate(AgentTask task)
RawRepresentation = task,
Role = ChatRole.Assistant,
Contents = task.ToAIContents(),
ContinuationToken = CreateContinuationToken(task.Id, task.Status.State),
AdditionalProperties = task.Metadata?.ToAdditionalProperties(),
};
}
Expand Down
13 changes: 13 additions & 0 deletions dotnet/src/Microsoft.Agents.AI.A2A/A2AAgentLogMessages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,17 @@ public static partial void LogAgentChatClientInvokedAgent(
string methodName,
string agentId,
string? agentName);

/// <summary>
/// Logs <see cref="A2AAgent"/> falling back to GetTaskAsync after SubscribeToTaskAsync failed with UnsupportedOperation.
/// </summary>
[LoggerMessage(
Level = LogLevel.Warning,
Message = "A2AAgent {AgentId}/{AgentName} SubscribeToTask for task '{TaskId}' failed with UnsupportedOperation: {ErrorMessage}. Falling back to GetTaskAsync.")]
public static partial void LogA2ASubscribeToTaskFallback(
this ILogger logger,
string agentId,
string? agentName,
string taskId,
string errorMessage);
}
Loading