Skip to content

Commit ef912a9

Browse files
authored
.NET Agents - Switch all instance of "SendMessage" to "PublishMessage" (#12457)
### Motivation and Context <!-- Thank you for your contribution to the semantic-kernel repo! Please help reviewers and future users, providing the following information: 1. Why is this change required? 2. What problem does it solve? 3. What scenario does it contribute to? 4. If it fixes an open issue, please link to the issue here. --> Eliminates usage of `SendMessageAsync` ### Description <!-- Describe your changes, the overall approach, the underlying design. These notes will help understanding how your code works. Thanks! --> While `SendMessageAsync` targets a specific agent, it will block until the target result is provided. `PublishMessageAsync` allows for execution to yield back to the caller as soon as the message is sent. ### Contribution Checklist <!-- Before submitting this PR, please make sure: --> - [X] The code builds clean without any errors or warnings - [X] The PR follows the [SK Contribution Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md) and the [pre-submission formatting script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts) raises no violations - [X] All unit tests pass, and I have added new tests where possible - [X] I didn't break anyone 😄
1 parent 4ab3d57 commit ef912a9

14 files changed

+61
-41
lines changed

dotnet/src/Agents/Magentic/MagenticManagerActor.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ private async ValueTask ManageAsync(CancellationToken cancellationToken)
101101
if (status.IsTaskComplete)
102102
{
103103
ChatMessageContent finalAnswer = await this._manager.PrepareFinalAnswerAsync(context, cancellationToken).ConfigureAwait(false);
104-
await this.SendMessageAsync(finalAnswer.AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false);
104+
await this.PublishMessageAsync(finalAnswer.AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false);
105105
break;
106106
}
107107

@@ -142,14 +142,14 @@ private async ValueTask ManageAsync(CancellationToken cancellationToken)
142142

143143
if (this._invocationCount >= this._manager.MaximumInvocationCount)
144144
{
145-
await this.SendMessageAsync("Maximum number of invocations reached.".AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false);
145+
await this.PublishMessageAsync("Maximum number of invocations reached.".AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false);
146146
break;
147147
}
148148

149149
ChatMessageContent instruction = new(AuthorRole.Assistant, agentInstruction);
150150
this._chat.Add(instruction);
151151
await this.PublishMessageAsync(instruction.AsGroupMessage(), this.Context.Topic, messageId: null, cancellationToken).ConfigureAwait(false);
152-
await this.SendMessageAsync(new MagenticMessages.Speak(), agent.Type, cancellationToken).ConfigureAwait(false);
152+
await this.PublishMessageAsync(new MagenticMessages.Speak(), agent.Type, cancellationToken).ConfigureAwait(false);
153153
break;
154154
}
155155

@@ -158,7 +158,7 @@ private async ValueTask ManageAsync(CancellationToken cancellationToken)
158158
if (this._retryCount >= this._manager.MaximumResetCount)
159159
{
160160
this.Logger.LogMagenticManagerTaskFailed(this.Context.Topic);
161-
await this.SendMessageAsync("I've experienced multiple failures and am unable to continue.".AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false);
161+
await this.PublishMessageAsync("I've experienced multiple failures and am unable to continue.".AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false);
162162
break;
163163
}
164164

dotnet/src/Agents/Magentic/MagenticOrchestration.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE
4040
{
4141
throw new ArgumentException("Entry agent is not defined.", nameof(entryAgent));
4242
}
43-
return runtime.SendMessageAsync(input.AsInputTaskMessage(), entryAgent.Value);
43+
return runtime.PublishMessageAsync(input.AsInputTaskMessage(), entryAgent.Value);
4444
}
4545

4646
/// <inheritdoc />
@@ -65,7 +65,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE
6565
}
6666

6767
AgentType managerType =
68-
await runtime.RegisterAgentFactoryAsync(
68+
await runtime.RegisterOrchestrationAgentAsync(
6969
this.FormatAgentType(context.Topic, "Manager"),
7070
(agentId, runtime) =>
7171
{
@@ -83,7 +83,7 @@ await runtime.RegisterAgentFactoryAsync(
8383
return managerType;
8484

8585
ValueTask<AgentType> RegisterAgentAsync(Agent agent, int agentCount) =>
86-
runtime.RegisterAgentFactoryAsync(
86+
runtime.RegisterOrchestrationAgentAsync(
8787
this.FormatAgentType(context.Topic, $"Agent_{agentCount}"),
8888
(agentId, runtime) =>
8989
{

dotnet/src/Agents/Orchestration/AgentOrchestration.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public async ValueTask<OrchestrationResult<TOutput>> InvokeAsync(
114114

115115
logger.LogOrchestrationInvoke(this.OrchestrationLabel, topic);
116116

117-
Task task = runtime.SendMessageAsync(input, orchestrationType, cancellationToken).AsTask();
117+
Task task = runtime.PublishMessageAsync(input, orchestrationType, cancellationToken).AsTask();
118118

119119
logger.LogOrchestrationYield(this.OrchestrationLabel, topic);
120120

@@ -168,7 +168,7 @@ private async ValueTask<AgentType> RegisterAsync(IAgentRuntime runtime, Orchestr
168168

169169
// Register actor for orchestration entry-point
170170
AgentType orchestrationEntry =
171-
await runtime.RegisterAgentFactoryAsync(
171+
await runtime.RegisterOrchestrationAgentAsync(
172172
this.FormatAgentType(context.Topic, "Boot"),
173173
(agentId, runtime) =>
174174
{
@@ -210,8 +210,8 @@ public sealed class RegistrationContext(
210210
public async ValueTask<AgentType> RegisterResultTypeAsync<TResult>(OrchestrationResultTransform<TResult> resultTransform)
211211
{
212212
// Register actor for final result
213-
return
214-
await runtime.RegisterAgentFactoryAsync(
213+
AgentType registeredType =
214+
await runtime.RegisterOrchestrationAgentAsync(
215215
agentType,
216216
(agentId, runtime) =>
217217
{
@@ -229,6 +229,8 @@ await runtime.RegisterAgentFactoryAsync(
229229
return ValueTask.FromResult<IHostableAgent>(actor);
230230
#endif
231231
}).ConfigureAwait(false);
232+
233+
return registeredType;
232234
}
233235
}
234236
}

dotnet/src/Agents/Orchestration/Concurrent/ConcurrentActor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,6 @@ public async ValueTask HandleAsync(ConcurrentMessages.Request item, MessageConte
3838

3939
this.Logger.LogConcurrentAgentResult(this.Id, response.Content);
4040

41-
await this.SendMessageAsync(response.AsResultMessage(), this._handoffActor, messageContext.CancellationToken).ConfigureAwait(false);
41+
await this.PublishMessageAsync(response.AsResultMessage(), this._handoffActor, messageContext.CancellationToken).ConfigureAwait(false);
4242
}
4343
}

dotnet/src/Agents/Orchestration/Concurrent/ConcurrentOrchestration.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE
4040

4141
// Register result actor
4242
AgentType resultType = this.FormatAgentType(context.Topic, "Results");
43-
await runtime.RegisterAgentFactoryAsync(
43+
await runtime.RegisterOrchestrationAgentAsync(
4444
resultType,
4545
(agentId, runtime) =>
4646
{

dotnet/src/Agents/Orchestration/Concurrent/ConcurrentResultActor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public async ValueTask HandleAsync(ConcurrentMessages.Result item, MessageContex
5353

5454
if (Interlocked.Increment(ref this._resultCount) == this._expectedCount)
5555
{
56-
await this.SendMessageAsync(this._results.ToArray(), this._orchestrationType, messageContext.CancellationToken).ConfigureAwait(false);
56+
await this.PublishMessageAsync(this._results.ToArray(), this._orchestrationType, messageContext.CancellationToken).ConfigureAwait(false);
5757
}
5858
}
5959
}

dotnet/src/Agents/Orchestration/Extensions/RuntimeExtensions.cs

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
// Copyright (c) Microsoft. All rights reserved.
22

3+
using System;
34
using System.Threading;
45
using System.Threading.Tasks;
56
using Microsoft.SemanticKernel.Agents.Runtime;
@@ -15,13 +16,36 @@ public static class RuntimeExtensions
1516
/// <summary>
1617
/// Sends a message to the specified agent.
1718
/// </summary>
18-
public static async ValueTask SendMessageAsync(this IAgentRuntime runtime, object message, AgentType agentType, CancellationToken cancellationToken = default)
19+
public static async ValueTask PublishMessageAsync(this IAgentRuntime runtime, object message, AgentType agentType, CancellationToken cancellationToken = default)
1920
{
20-
AgentId? agentId = await runtime.GetAgentAsync(agentType, lazy: false).ConfigureAwait(false);
21-
if (agentId.HasValue)
22-
{
23-
await runtime.SendMessageAsync(message, agentId.Value, sender: null, messageId: null, cancellationToken).ConfigureAwait(false);
24-
}
21+
await runtime.PublishMessageAsync(message, new TopicId(agentType), sender: null, messageId: null, cancellationToken).ConfigureAwait(false);
22+
}
23+
24+
/// <summary>
25+
/// Registers an agent factory for the specified agent type and associates it with the runtime.
26+
/// </summary>
27+
/// <param name="runtime">The runtime targeted for registration.</param>
28+
/// <param name="agentType">The type of agent to register.</param>
29+
/// <param name="factoryFunc">The factory function for creating the agent.</param>
30+
/// <returns>The registered agent type.</returns>
31+
public static async ValueTask<AgentType> RegisterOrchestrationAgentAsync(this IAgentRuntime runtime, AgentType agentType, Func<AgentId, IAgentRuntime, ValueTask<IHostableAgent>> factoryFunc)
32+
{
33+
AgentType registeredType = await runtime.RegisterAgentFactoryAsync(agentType, factoryFunc).ConfigureAwait(false);
34+
35+
// Subscribe agent to its own unique topic
36+
await runtime.SubscribeAsync(registeredType).ConfigureAwait(false);
37+
38+
return registeredType;
39+
}
40+
41+
/// <summary>
42+
/// Subscribes the specified agent type to its own dedicated topic.
43+
/// </summary>
44+
/// <param name="runtime">The runtime for managing the subscription.</param>
45+
/// <param name="agentType">The agent type to subscribe.</param>
46+
public static async Task SubscribeAsync(this IAgentRuntime runtime, string agentType)
47+
{
48+
await runtime.AddSubscriptionAsync(new TypeSubscription(agentType, agentType)).ConfigureAwait(false);
2549
}
2650

2751
/// <summary>

dotnet/src/Agents/Orchestration/GroupChat/GroupChatManagerActor.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,13 @@ private async ValueTask ManageAsync(MessageContext messageContext)
8888
{
8989
GroupChatManagerResult<string> filterResult = await this._manager.FilterResults(this._chat, messageContext.CancellationToken).ConfigureAwait(false);
9090
this.Logger.LogChatManagerResult(this.Id, filterResult.Value, filterResult.Reason);
91-
await this.SendMessageAsync(filterResult.Value.AsResultMessage(), this._orchestrationType, messageContext.CancellationToken).ConfigureAwait(false);
91+
await this.PublishMessageAsync(filterResult.Value.AsResultMessage(), this._orchestrationType, messageContext.CancellationToken).ConfigureAwait(false);
9292
return;
9393
}
9494

9595
GroupChatManagerResult<string> selectionResult = await this._manager.SelectNextAgent(this._chat, this._team, messageContext.CancellationToken).ConfigureAwait(false);
9696
AgentType selectionType = this._team[selectionResult.Value].Type;
9797
this.Logger.LogChatManagerSelect(this.Id, selectionType);
98-
await this.SendMessageAsync(new GroupChatMessages.Speak(), selectionType, messageContext.CancellationToken).ConfigureAwait(false);
98+
await this.PublishMessageAsync(new GroupChatMessages.Speak(), selectionType, messageContext.CancellationToken).ConfigureAwait(false);
9999
}
100100
}

dotnet/src/Agents/Orchestration/GroupChat/GroupChatOrchestration.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE
3939
{
4040
throw new ArgumentException("Entry agent is not defined.", nameof(entryAgent));
4141
}
42-
return runtime.SendMessageAsync(input.AsInputTaskMessage(), entryAgent.Value);
42+
return runtime.PublishMessageAsync(input.AsInputTaskMessage(), entryAgent.Value);
4343
}
4444

4545
/// <inheritdoc />
@@ -64,7 +64,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE
6464
}
6565

6666
AgentType managerType =
67-
await runtime.RegisterAgentFactoryAsync(
67+
await runtime.RegisterOrchestrationAgentAsync(
6868
this.FormatAgentType(context.Topic, "Manager"),
6969
(agentId, runtime) =>
7070
{
@@ -82,7 +82,7 @@ await runtime.RegisterAgentFactoryAsync(
8282
return managerType;
8383

8484
ValueTask<AgentType> RegisterAgentAsync(Agent agent, int agentCount) =>
85-
runtime.RegisterAgentFactoryAsync(
85+
runtime.RegisterOrchestrationAgentAsync(
8686
this.FormatAgentType(context.Topic, $"Agent_{agentCount}"),
8787
(agentId, runtime) =>
8888
{

dotnet/src/Agents/Orchestration/Handoff/HandoffActor.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public async ValueTask HandleAsync(HandoffMessages.Request item, MessageContext
125125
if (this._handoffAgent != null)
126126
{
127127
AgentType handoffType = this._handoffs[this._handoffAgent].AgentType;
128-
await this.SendMessageAsync(new HandoffMessages.Request(), handoffType, messageContext.CancellationToken).ConfigureAwait(false);
128+
await this.PublishMessageAsync(new HandoffMessages.Request(), handoffType, messageContext.CancellationToken).ConfigureAwait(false);
129129

130130
this._handoffAgent = null;
131131
break;
@@ -183,6 +183,6 @@ private async ValueTask EndAsync(string summary, CancellationToken cancellationT
183183
{
184184
this.Logger.LogHandoffSummary(this.Id, summary);
185185
this._taskSummary = summary;
186-
await this.SendMessageAsync(new HandoffMessages.Result { Message = new ChatMessageContent(AuthorRole.Assistant, summary) }, this._resultHandoff, cancellationToken).ConfigureAwait(false);
186+
await this.PublishMessageAsync(new HandoffMessages.Result { Message = new ChatMessageContent(AuthorRole.Assistant, summary) }, this._resultHandoff, cancellationToken).ConfigureAwait(false);
187187
}
188188
}

dotnet/src/Agents/Orchestration/Handoff/HandoffOrchestration.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ protected override async ValueTask StartAsync(IAgentRuntime runtime, TopicId top
5353
throw new ArgumentException("Entry agent is not defined.", nameof(entryAgent));
5454
}
5555
await runtime.PublishMessageAsync(input.AsInputTaskMessage(), topic).ConfigureAwait(false);
56-
await runtime.SendMessageAsync(new HandoffMessages.Request(), entryAgent.Value).ConfigureAwait(false);
56+
await runtime.PublishMessageAsync(new HandoffMessages.Request(), entryAgent.Value).ConfigureAwait(false);
5757
}
5858

5959
/// <inheritdoc />
@@ -71,7 +71,7 @@ protected override async ValueTask StartAsync(IAgentRuntime runtime, TopicId top
7171
HandoffLookup map = [];
7272
handoffMap[agent.Name ?? agent.Id] = map;
7373
agentType =
74-
await runtime.RegisterAgentFactoryAsync(
74+
await runtime.RegisterOrchestrationAgentAsync(
7575
this.GetAgentType(context.Topic, index),
7676
(agentId, runtime) =>
7777
{

dotnet/src/Agents/Orchestration/OrchestrationActor.cs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,11 @@ protected OrchestrationActor(AgentId id, IAgentRuntime runtime, OrchestrationCon
3434
/// <param name="agentType">The recipient agent's type.</param>
3535
/// <param name="cancellationToken">A token used to cancel the operation if needed.</param>
3636
/// <returns>The agent identifier, if it exists.</returns>
37-
protected async ValueTask<AgentId?> SendMessageAsync(
37+
protected async ValueTask PublishMessageAsync(
3838
object message,
3939
AgentType agentType,
4040
CancellationToken cancellationToken = default)
4141
{
42-
AgentId? agentId = await this.GetAgentAsync(agentType, cancellationToken).ConfigureAwait(false);
43-
44-
if (agentId.HasValue)
45-
{
46-
await this.SendMessageAsync(message, agentId.Value, messageId: null, cancellationToken).ConfigureAwait(false);
47-
}
48-
49-
return agentId;
42+
await base.PublishMessageAsync(message, new TopicId(agentType), messageId: null, cancellationToken).ConfigureAwait(false);
5043
}
5144
}

dotnet/src/Agents/Orchestration/Sequential/SequentialActor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,6 @@ private async ValueTask InvokeAgentAsync(IList<ChatMessageContent> input, Messag
5656

5757
this.Logger.LogSequentialAgentResult(this.Id, response.Content);
5858

59-
await this.SendMessageAsync(response.AsResponseMessage(), this._nextAgent, messageContext.CancellationToken).ConfigureAwait(false);
59+
await this.PublishMessageAsync(response.AsResponseMessage(), this._nextAgent, messageContext.CancellationToken).ConfigureAwait(false);
6060
}
6161
}

dotnet/src/Agents/Orchestration/Sequential/SequentialOrchestration.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ protected override async ValueTask StartAsync(IAgentRuntime runtime, TopicId top
3131
{
3232
throw new ArgumentException("Entry agent is not defined.", nameof(entryAgent));
3333
}
34-
await runtime.SendMessageAsync(input.AsRequestMessage(), entryAgent.Value).ConfigureAwait(false);
34+
await runtime.PublishMessageAsync(input.AsRequestMessage(), entryAgent.Value).ConfigureAwait(false);
3535
}
3636

3737
/// <inheritdoc />
@@ -52,11 +52,12 @@ protected override async ValueTask StartAsync(IAgentRuntime runtime, TopicId top
5252
return nextAgent;
5353

5454
ValueTask<AgentType> RegisterAgentAsync(Agent agent, int index, AgentType nextAgent) =>
55-
runtime.RegisterAgentFactoryAsync(
55+
runtime.RegisterOrchestrationAgentAsync(
5656
this.GetAgentType(context.Topic, index),
5757
(agentId, runtime) =>
5858
{
5959
SequentialActor actor = new(agentId, runtime, context, agent, nextAgent, context.LoggerFactory.CreateLogger<SequentialActor>());
60+
6061
#if !NETCOREAPP
6162
return actor.AsValueTask<IHostableAgent>();
6263
#else

0 commit comments

Comments
 (0)