-
Notifications
You must be signed in to change notification settings - Fork 275
Allow InProcessRuntime subscriptions to be processed concurrently #179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Allows in-process runtime subscriptions to run in parallel instead of sequentially by collecting processing tasks and awaiting them together.
- Switches from catching and aggregating exceptions to spawning tasks and using Task.WhenAll
- Introduces a static local function
ProcessSubscriptionAsync
for each subscription - Changes the topic-match check to add matching subscriptions to the task list
Comments suppressed due to low confidence (3)
dotnet/src/Microsoft.Extensions.AI.Agents.Runtime.Abstractions/InProcessRuntime.cs:265
- [nitpick] The variable name
tasks
is generic and may be ambiguous; consider renaming it tosubscriptionTasks
to clarify that it holds subscription processing tasks.
List<Task>? tasks = null;
dotnet/src/Microsoft.Extensions.AI.Agents.Runtime.Abstractions/InProcessRuntime.cs:297
- This change introduces parallel execution of subscription handlers. Please add or update tests to verify behavior when multiple subscriptions run concurrently, including cases with successful, faulted, and cancelled tasks.
await Task.WhenAll(tasks).ConfigureAwait(false);
dotnet/src/Microsoft.Extensions.AI.Agents.Runtime.Abstractions/InProcessRuntime.cs:271
- Launching all subscription handlers as concurrent tasks can introduce race conditions if the underlying
ISubscriptionDefinition
implementations or actor interactions are not thread-safe; ensure those components can handle parallel execution or serialize access where necessary.
(tasks ??= []).Add(ProcessSubscriptionAsync(message, subscription.Value, topic, cancellationToken));
dotnet/src/Microsoft.Extensions.AI.Agents.Runtime.Abstractions/InProcessRuntime.cs
Outdated
Show resolved
Hide resolved
dotnet/src/Microsoft.Extensions.AI.Agents.Runtime.Abstractions/InProcessRuntime.cs
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Beautiful
How important is it that we throw an AggregateException with all the exceptions? I neglected to do so, such that right now tests are failing because only the first exception is propagating. I can fix it to be an aggregate if that's important...? Otherwise I'll fix the tests. |
No description provided.