Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -262,21 +262,22 @@ async Task WaitAndRemoveAsync(long taskId, ValueTask processTask)
{
Debug.Assert(message.Topic.HasValue);

List<Exception>? exceptions = null;
List<Task>? tasks = null;
TopicId topic = message.Topic!.Value;
foreach (KeyValuePair<string, ISubscriptionDefinition> subscription in message.Runtime._subscriptions)
{
if (!subscription.Value.Matches(topic))
if (subscription.Value.Matches(topic))
{
continue;
(tasks ??= []).Add(ProcessSubscriptionAsync(message, subscription.Value, topic, cancellationToken));
}

try
static async Task ProcessSubscriptionAsync(
MessageToProcess message, ISubscriptionDefinition subscription, TopicId topic, CancellationToken cancellationToken)
{
using CancellationTokenSource combinedSource = CancellationTokenSource.CreateLinkedTokenSource(message.Cancellation, cancellationToken);
combinedSource.Token.ThrowIfCancellationRequested();

ActorId actorId = subscription.Value.MapToActor(topic);
ActorId actorId = subscription.MapToActor(topic);
ActorId? sender = message.Sender;
if (sender is null || sender != actorId)
{
Expand All @@ -289,15 +290,11 @@ async Task WaitAndRemoveAsync(long taskId, ValueTask processTask)
}, combinedSource.Token).ConfigureAwait(false);
}
}
catch (Exception ex)
{
(exceptions ??= []).Add(ex);
}
}

if (exceptions is not null)
if (tasks is not null)
{
throw new AggregateException("One or more exceptions occurred while processing the message.", exceptions);
await Task.WhenAll(tasks).ConfigureAwait(false);
}

// This method is effectively void, with the result never being used. But it's typed the same as SendMessageServicerAsync
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Threading.Tasks;

namespace Microsoft.Extensions.AI.Agents.Runtime.InProcess.Tests;
Expand Down Expand Up @@ -35,8 +34,7 @@ public async Task Test_PublishMessage_SingleFailureAsync()
await fixture.RegisterErrorAgentAsync(topicTypes: "TestTopic");

// Test that we wrap single errors appropriately
var e = await Assert.ThrowsAsync<AggregateException>(async () => await fixture.RunPublishTestAsync(new TopicId("TestTopic"), new BasicMessage { Content = "1" }));
Assert.IsType<TestException>(Assert.Single(e.InnerExceptions));
await Assert.ThrowsAsync<TestException>(async () => await fixture.RunPublishTestAsync(new TopicId("TestTopic"), new BasicMessage { Content = "1" }));

var values = fixture.GetAgentInstances<ReceiverAgent>().Values;
}
Expand All @@ -50,9 +48,7 @@ public async Task Test_PublishMessage_MultipleFailuresAsync()
await fixture.RegisterErrorAgentAsync("2", topicTypes: "TestTopic");

// What we are really testing here is that a single exception does not prevent sending to the remaining agents
var e = await Assert.ThrowsAsync<AggregateException>(async () => await fixture.RunPublishTestAsync(new TopicId("TestTopic"), new BasicMessage { Content = "1" }));
Assert.Equal(2, e.InnerExceptions.Count);
Assert.All(e.InnerExceptions, innerException => Assert.IsType<TestException>(innerException));
await Assert.ThrowsAsync<TestException>(async () => await fixture.RunPublishTestAsync(new TopicId("TestTopic"), new BasicMessage { Content = "1" }));

var values = fixture.GetAgentInstances<ErrorAgent>().Values;
Assert.Equal(2, values.Count);
Expand All @@ -70,9 +66,7 @@ public async Task Test_PublishMessage_MixedSuccessFailureAsync()
await fixture.RegisterErrorAgentAsync("2", topicTypes: "TestTopic");

// What we are really testing here is that raising exceptions does not prevent sending to the remaining agents
var e = await Assert.ThrowsAsync<AggregateException>(async () => await fixture.RunPublishTestAsync(new TopicId("TestTopic"), new BasicMessage { Content = "1" }));
Assert.Equal(2, e.InnerExceptions.Count);
Assert.All(e.InnerExceptions, innerException => Assert.IsType<TestException>(innerException));
await Assert.ThrowsAsync<TestException>(async () => await fixture.RunPublishTestAsync(new TopicId("TestTopic"), new BasicMessage { Content = "1" }));

var agents = fixture.GetAgentInstances<ReceiverAgent>().Values;
Assert.Equal(2, agents.Count);
Expand Down
Loading