Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions extensions/AzureQueues/AzureQueuesConfig.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Text.Json.Serialization;
using System.Text.RegularExpressions;
using Azure;
using Azure.Core;
using Azure.Storage;
Expand All @@ -17,6 +19,8 @@ public class AzureQueuesConfig
private AzureSasCredential? _azureSasCredential;
private TokenCredential? _tokenCredential;

private static readonly Regex s_validPoisonQueueSuffixRegex = new(@"^[a-z0-9-]{1}(?!.*--)[a-z0-9-]{0,28}[a-z0-9]$");

[JsonConverter(typeof(JsonStringEnumConverter))]
public enum AuthTypes
{
Expand All @@ -35,6 +39,38 @@ public enum AuthTypes
public string AccountKey { get; set; } = "";
public string EndpointSuffix { get; set; } = "core.windows.net";

/// <summary>
/// How often to check if there are new messages.
/// </summary>
public int PollDelayMsecs { get; set; } = 100;

/// <summary>
/// How many messages to fetch at a time.
/// </summary>
public int FetchBatchSize { get; set; } = 3;

/// <summary>
/// How long to lock messages once fetched. Azure Queue default is 30 secs.
/// </summary>
public int FetchLockSeconds { get; set; } = 300;

/// <summary>
/// How many times to dequeue a messages and process before moving it to a poison queue.
/// </summary>
public int MaxRetriesBeforePoisonQueue { get; set; } = 20;

/// <summary>
/// Suffix used for the poison queues.
/// </summary>
private string? _poisonQueueSuffix = "-poison";

public string PoisonQueueSuffix
{
get => this._poisonQueueSuffix!;
// Queue names must be lowercase.
set => this._poisonQueueSuffix = value?.ToLowerInvariant() ?? string.Empty;
}

Comment thread
marcominerva marked this conversation as resolved.
public void SetCredential(StorageSharedKeyCredential credential)
{
this.Auth = AuthTypes.ManualStorageSharedKeyCredential;
Expand Down Expand Up @@ -70,4 +106,27 @@ public TokenCredential GetTokenCredential()
return this._tokenCredential
?? throw new ConfigurationException("TokenCredential not defined");
}

/// <summary>
/// Verify that the current state is valid.
/// </summary>
public void Validate()
{
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(this.PollDelayMsecs);
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(this.FetchBatchSize);
ArgumentOutOfRangeException.ThrowIfLessThan(this.FetchLockSeconds, 30);
ArgumentOutOfRangeException.ThrowIfNegative(this.MaxRetriesBeforePoisonQueue);
ArgumentException.ThrowIfNullOrWhiteSpace(this.PoisonQueueSuffix);

// Queue names must follow the rules described at
// https://learn.microsoft.com/rest/api/storageservices/naming-queues-and-metadata#queue-names.
// In this case, we need to validate only the suffix part, so rules are slightly different
// (for example, as it is a suffix, it can safely start with a dash (-) character).
// Queue names can be up to 63 characters long, so for the suffix we define a maximum length
// of 30, so there is room for the other name part.
if (!s_validPoisonQueueSuffixRegex.IsMatch(this.PoisonQueueSuffix))
{
throw new ArgumentException($"Invalid {nameof(this.PoisonQueueSuffix)} format.", nameof(this.PoisonQueueSuffix));
}
}
}
37 changes: 14 additions & 23 deletions extensions/AzureQueues/AzureQueuesPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,12 @@ private sealed class MessageEventArgs : EventArgs
/// </summary>
private event AsyncMessageHandler<MessageEventArgs>? Received;

// How often to check if there are new messages
private const int PollDelayMsecs = 100;

// How many messages to fetch at a time
private const int FetchBatchSize = 3;

// How long to lock messages once fetched. Azure Queue default is 30 secs.
private const int FetchLockSeconds = 300;

// How many times to dequeue a messages and process before moving it to a poison queue
private const int MaxRetryBeforePoisonQueue = 20;

// Suffix used for the poison queues
private const string PoisonQueueSuffix = "-poison";

// Queue client builder, requiring the queue name in input
private readonly Func<string, QueueClient> _clientBuilder;

// Queue confirguration
private readonly AzureQueuesConfig _config;

// Queue client, once connected
private QueueClient? _queue;

Expand All @@ -77,6 +65,9 @@ public AzureQueuesPipeline(
AzureQueuesConfig config,
ILogger<AzureQueuesPipeline>? log = null)
{
this._config = config;
this._config.Validate();

this._log = log ?? DefaultLogger<AzureQueuesPipeline>.Instance;

switch (config.Auth)
Expand Down Expand Up @@ -161,14 +152,14 @@ public async Task<IQueue> ConnectToQueueAsync(string queueName, QueueOptions opt
Response? result = await this._queue.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
this._log.LogTrace("Queue ready: status code {0}", result?.Status);

this._poisonQueue = this._clientBuilder(this._queueName + PoisonQueueSuffix);
this._poisonQueue = this._clientBuilder(this._queueName + this._config.PoisonQueueSuffix);
result = await this._poisonQueue.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
this._log.LogTrace("Poison queue ready: status code {0}", result?.Status);

if (options.DequeueEnabled)
{
this._log.LogTrace("Enabling dequeue on queue {0}, every {1} msecs", this._queueName, PollDelayMsecs);
this._dispatchTimer = new Timer(PollDelayMsecs); // milliseconds
this._log.LogTrace("Enabling dequeue on queue {0}, every {1} msecs", this._queueName, this._config.PollDelayMsecs);
this._dispatchTimer = new Timer(this._config.PollDelayMsecs); // milliseconds
this._dispatchTimer.Elapsed += this.DispatchMessages;
this._dispatchTimer.Start();
}
Expand Down Expand Up @@ -201,7 +192,7 @@ public void OnDequeue(Func<string, Task<bool>> processMessageAction)

try
{
if (message.DequeueCount <= MaxRetryBeforePoisonQueue)
if (message.DequeueCount <= this._config.MaxRetriesBeforePoisonQueue)
{
bool success = await processMessageAction.Invoke(message.MessageText).ConfigureAwait(false);
if (success)
Expand Down Expand Up @@ -271,7 +262,7 @@ private void DispatchMessages(object? sender, ElapsedEventArgs ev)
try
{
// Fetch and Hide N messages
Response<QueueMessage[]> receiveMessages = this._queue.ReceiveMessages(FetchBatchSize, visibilityTimeout: TimeSpan.FromSeconds(FetchLockSeconds));
Response<QueueMessage[]> receiveMessages = this._queue.ReceiveMessages(this._config.FetchBatchSize, visibilityTimeout: TimeSpan.FromSeconds(this._config.FetchLockSeconds));
if (receiveMessages.HasValue && receiveMessages.Value.Length > 0)
{
messages = receiveMessages.Value;
Expand Down Expand Up @@ -336,10 +327,10 @@ private async Task MoveMessageToPoisonQueueAsync(QueueMessage message, Cancellat

var poisonMsg = new
{
MessageText = message.MessageText,
message.MessageText,
Id = message.MessageId,
InsertedOn = message.InsertedOn,
DequeueCount = message.DequeueCount,
message.InsertedOn,
message.DequeueCount,
};

var neverExpire = TimeSpan.FromSeconds(-1);
Expand Down
4 changes: 4 additions & 0 deletions extensions/AzureQueues/DependencyInjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public static partial class KernelMemoryBuilderExtensions
{
public static IKernelMemoryBuilder WithAzureQueuesOrchestration(this IKernelMemoryBuilder builder, AzureQueuesConfig config)
{
config.Validate();

builder.Services.AddAzureQueuesOrchestration(config);
return builder;
}
Expand All @@ -22,6 +24,8 @@ public static partial class DependencyInjection
{
public static IServiceCollection AddAzureQueuesOrchestration(this IServiceCollection services, AzureQueuesConfig config)
{
config.Validate();

IQueue QueueFactory(IServiceProvider serviceProvider)
{
return serviceProvider.GetService<AzureQueuesPipeline>()
Expand Down
12 changes: 11 additions & 1 deletion service/Service/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,17 @@
// Note: you can use an env var 'KernelMemory__Services__AzureQueue__ConnectionString' to set this
"ConnectionString": "",
// Setting used only for country clouds
"EndpointSuffix": "core.windows.net"
"EndpointSuffix": "core.windows.net",
// How often to check if there are new messages
"PollDelayMsecs": 100,
// How many messages to fetch at a time
"FetchBatchSize": 3,
// How long to lock messages once fetched. Azure Queue default is 30 secs
"FetchLockSeconds": 300,
// How many times to dequeue a messages and process before moving it to a poison queue
"MaxRetriesBeforePoisonQueue": 20,
// Suffix used for the poison queues.
"PoisonQueueSuffix": "-poison"
},
"Elasticsearch": {
// SHA-256 fingerprint. When running the docker image this is printed after starting the server
Expand Down