Skip to content

Commit

Permalink
.Net - Agents Fix: Chat Concurrency (Step #ANY) (#5831)
Browse files Browse the repository at this point in the history
### Motivation and Context

<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
  1. Why is this change required?
  2. What problem does it solve?
  3. What scenario does it contribute to?
  4. If it fixes an open issue, please link to the issue here.
-->

Minimize locking / state management for `AgentChat` and
`BroadcastQueue`.

### Description

<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->

On re-evaluation w/ Sergey, we wanted to consolidate `BroadcastQueue`
state and determine if this had an impact on locking semantics. While
this did show some improvement, I traced the concurrency contract back
into `AgentChat` and was able to establish a flow that eliminated the
need for `BroadcastQueue._stateLock`. There exists zero contention for
`BroadcastQueue._queues.`

I've ran ran all tests, including my local concurrency performance
harness that creates 3 agents with 3 separate channels and proceeds with
over 100 turns while validating turn order. For this test, I ran the
harness using Open AI Assistant Agents for maximum latency / queueing.

Test execution quantitatively demonstrates lighter locking patterns.

### Contribution Checklist

<!-- Before submitting this PR, please make sure: -->

- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone 😄
  • Loading branch information
crickman committed Apr 22, 2024
1 parent ac9ed03 commit e637568
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 112 deletions.
195 changes: 144 additions & 51 deletions dotnet/src/Agents/Abstractions/AgentChat.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
Expand All @@ -14,14 +13,24 @@ namespace Microsoft.SemanticKernel.Agents;
/// <summary>
/// Point of interaction for one or more agents.
/// </summary>
/// <remarks>
/// Any <see cref="AgentChat" /> instance does not support concurrent invocation and
/// will throw exception if concurrent activity is attempted for any public method.
/// </remarks>
public abstract class AgentChat
{
private readonly BroadcastQueue _broadcastQueue;
private readonly Dictionary<string, AgentChannel> _agentChannels;
private readonly Dictionary<Agent, string> _channelMap;
private readonly Dictionary<string, AgentChannel> _agentChannels; // Map channel hash to channel: one entry per channel.
private readonly Dictionary<Agent, string> _channelMap; // Map agent to its channel-hash: one entry per agent.

private int _isActive;

/// <summary>
/// Indicates if a chat operation is active. Activity is defined as
/// any the execution of any public method.
/// </summary>
public bool IsActive => Interlocked.CompareExchange(ref this._isActive, 1, 1) > 0;

/// <summary>
/// Exposes the internal history to subclasses.
/// </summary>
Expand All @@ -34,47 +43,88 @@ public abstract class AgentChat
/// <param name="agent">An optional agent, if requesting an agent history.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>The message history</returns>
public IAsyncEnumerable<ChatMessageContent> GetChatMessagesAsync(Agent? agent = null, CancellationToken cancellationToken = default)
/// <remarks>
/// Any <see cref="AgentChat" /> instance does not support concurrent invocation and
/// will throw exception if concurrent activity is attempted.
/// </remarks>
public async IAsyncEnumerable<ChatMessageContent> GetChatMessagesAsync(
Agent? agent = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (agent == null)
this.SetActivityOrThrow(); // Disallow concurrent access to chat history

try
{
return this.History.ToDescendingAsync();
}
IAsyncEnumerable<ChatMessageContent>? messages = null;

var channelKey = this.GetAgentHash(agent);
if (!this._agentChannels.TryGetValue(channelKey, out var channel))
if (agent == null)
{
// Provide primary history
messages = this.History.ToDescendingAsync();
}
else // else provide channel specific history
{
// Retrieve the requested channel, if exists, and block until channel is synchronized.
string channelKey = this.GetAgentHash(agent);
AgentChannel? channel = await this.SynchronizeChannelAsync(channelKey, cancellationToken).ConfigureAwait(false);
if (channel != null)
{
messages = channel.GetHistoryAsync(cancellationToken);
}
}

if (messages != null)
{
await foreach (ChatMessageContent message in messages.ConfigureAwait(false))
{
yield return message;
}
}
}
finally
{
return Array.Empty<ChatMessageContent>().ToAsyncEnumerable();
this.ClearActivitySignal(); // Signal activity hash completed
}

return channel.GetHistoryAsync(cancellationToken);
}

/// <summary>
/// Append messages to the conversation.
/// Append a message to the conversation. Adding a message while an agent
/// is active is not allowed.
/// </summary>
/// <param name="message">Set of non-system messages with which to seed the conversation.</param>
/// <param name="message">A non-system message with which to append to the conversation.</param>
/// <remarks>
/// Adding a message to the conversation requires any active <see cref="AgentChannel"/> remains
/// synchronized, so the message is broadcast to all channels.
/// </remarks>
/// <throws>KernelException if a system message is present, without taking any other action</throws>
/// <remarks>
/// Any <see cref="AgentChat" /> instance does not support concurrent invocation and
/// will throw exception if concurrent activity is attempted.
/// </remarks>
public void AddChatMessage(ChatMessageContent message)
{
this.AddChatMessages([message]);
}

/// <summary>
/// Append messages to the conversation.
/// Append messages to the conversation. Adding messages while an agent
/// is active is not allowed.
/// </summary>
/// <param name="messages">Set of non-system messages with which to seed the conversation.</param>
/// <param name="messages">Set of non-system messages with which to append to the conversation.</param>
/// <remarks>
/// Adding messages to the conversation requires any active <see cref="AgentChannel"/> remains
/// synchronized, so the messages are broadcast to all channels.
/// </remarks>
/// <throws>KernelException if a system message is present, without taking any other action</throws>
/// <throws>KernelException chat has current activity.</throws>
/// <remarks>
/// Any <see cref="AgentChat" /> instance does not support concurrent invocation and
/// will throw exception if concurrent activity is attempted.
/// </remarks>
public void AddChatMessages(IReadOnlyList<ChatMessageContent> messages)
{
this.SetActivityOrThrow(); // Disallow concurrent access to chat history

for (int index = 0; index < messages.Count; ++index)
{
if (messages[index].Role == AuthorRole.System)
Expand All @@ -83,12 +133,20 @@ public void AddChatMessages(IReadOnlyList<ChatMessageContent> messages)
}
}

// Append to chat history
this.History.AddRange(messages);
try
{
// Append to chat history
this.History.AddRange(messages);

// Broadcast message to other channels (in parallel)
var channelRefs = this._agentChannels.Select(kvp => new ChannelReference(kvp.Value, kvp.Key));
this._broadcastQueue.Enqueue(channelRefs, messages);
// Broadcast message to other channels (in parallel)
// Note: Able to queue messages without synchronizing channels.
var channelRefs = this._agentChannels.Select(kvp => new ChannelReference(kvp.Value, kvp.Key));
this._broadcastQueue.Enqueue(channelRefs, messages);
}
finally
{
this.ClearActivitySignal(); // Signal activity hash completed
}
}

/// <summary>
Expand All @@ -97,21 +155,21 @@ public void AddChatMessages(IReadOnlyList<ChatMessageContent> messages)
/// <param name="agent">The agent actively interacting with the chat.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>Asynchronous enumeration of messages.</returns>
/// <remarks>
/// Any <see cref="AgentChat" /> instance does not support concurrent invocation and
/// will throw exception if concurrent activity is attempted.
/// </remarks>
protected async IAsyncEnumerable<ChatMessageContent> InvokeAgentAsync(
Agent agent,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
// Verify only a single operation is active
int wasActive = Interlocked.CompareExchange(ref this._isActive, 1, 0);
if (wasActive > 0)
{
throw new KernelException("Unable to proceed while another agent is active.");
}
this.SetActivityOrThrow(); // Disallow concurrent access to chat history

try
{
// Manifest the required channel. Will throw if channel not in sync.
var channel = await this.GetChannelAsync(agent, cancellationToken).ConfigureAwait(false);
// Get or create the required channel and block until channel is synchronized.
// Will throw exception when propagating a processing failure.
AgentChannel channel = await GetOrCreateChannelAsync().ConfigureAwait(false);

// Invoke agent & process response
List<ChatMessageContent> messages = [];
Expand All @@ -126,6 +184,7 @@ protected async IAsyncEnumerable<ChatMessageContent> InvokeAgentAsync(
}

// Broadcast message to other channels (in parallel)
// Note: Able to queue messages without synchronizing channels.
var channelRefs =
this._agentChannels
.Where(kvp => kvp.Value != channel)
Expand All @@ -134,47 +193,81 @@ protected async IAsyncEnumerable<ChatMessageContent> InvokeAgentAsync(
}
finally
{
Interlocked.Exchange(ref this._isActive, 0);
this.ClearActivitySignal(); // Signal activity hash completed
}
}

private async Task<AgentChannel> GetChannelAsync(Agent agent, CancellationToken cancellationToken)
{
var channelKey = this.GetAgentHash(agent);

if (this._agentChannels.TryGetValue(channelKey, out var channel))
{
await this._broadcastQueue.EnsureSynchronizedAsync(new ChannelReference(channel, channelKey)).ConfigureAwait(false);
}
else
async Task<AgentChannel> GetOrCreateChannelAsync()
{
channel = await agent.CreateChannelAsync(cancellationToken).ConfigureAwait(false);

if (this.History.Count > 0)
string channelKey = this.GetAgentHash(agent);
AgentChannel channel = await this.SynchronizeChannelAsync(channelKey, cancellationToken).ConfigureAwait(false);
if (channel == null)
{
await channel.ReceiveAsync(this.History, cancellationToken).ConfigureAwait(false);
channel = await agent.CreateChannelAsync(cancellationToken).ConfigureAwait(false);
this._agentChannels.Add(channelKey, channel);

if (this.History.Count > 0)
{
await channel.ReceiveAsync(this.History, cancellationToken).ConfigureAwait(false);
}
}

this._agentChannels.Add(channelKey, channel);
return channel;
}
}

return channel;
/// <summary>
/// Clear activity signal to indicate that activity has ceased.
/// </summary>
private void ClearActivitySignal()
{
// Note: Interlocked is the absolute lightest synchronization mechanism available in dotnet.
Interlocked.Exchange(ref this._isActive, 0);
}

private string GetAgentHash(Agent agent)
/// <summary>
/// Test to ensure chat is not concurrently active and throw exception if it is.
/// If not, activity is signaled.
/// </summary>
/// <remarks>
/// Rather than allowing concurrent invocation to result in undefined behavior / failure,
/// it is preferred to fail-fast in order to avoid side-effects / state mutation.
/// The activity signal is used to manage ability and visibility for taking actions based
/// on conversation history.
/// </remarks>
private void SetActivityOrThrow()
{
if (this._channelMap.TryGetValue(agent, out var hash))
// Note: Interlocked is the absolute lightest synchronization mechanism available in dotnet.
int wasActive = Interlocked.CompareExchange(ref this._isActive, 1, 0);
if (wasActive > 0)
{
return hash;
throw new KernelException("Unable to proceed while another agent is active.");
}
}

hash = KeyEncoder.GenerateHash(agent.GetChannelKeys());
private string GetAgentHash(Agent agent)
{
if (!this._channelMap.TryGetValue(agent, out var hash))
{
hash = KeyEncoder.GenerateHash(agent.GetChannelKeys());

this._channelMap.Add(agent, hash);
// Ok if already present: same agent always produces the same hash
this._channelMap.Add(agent, hash);
}

return hash;
}

private async Task<AgentChannel> SynchronizeChannelAsync(string channelKey, CancellationToken cancellationToken)
{
if (this._agentChannels.TryGetValue(channelKey, out AgentChannel channel))
{
await this._broadcastQueue.EnsureSynchronizedAsync(
new ChannelReference(channel, channelKey), cancellationToken).ConfigureAwait(false);
}

return channel;
}

/// <summary>
/// Initializes a new instance of the <see cref="AgentChat"/> class.
/// </summary>
Expand Down
Loading

0 comments on commit e637568

Please sign in to comment.