diff --git a/extensions/AzureQueues/AzureQueuesConfig.cs b/extensions/AzureQueues/AzureQueuesConfig.cs index d1f90a097..fe4e43cd0 100644 --- a/extensions/AzureQueues/AzureQueuesConfig.cs +++ b/extensions/AzureQueues/AzureQueuesConfig.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft. All rights reserved. +using System; using System.Text.Json.Serialization; +using System.Text.RegularExpressions; using Azure; using Azure.Core; using Azure.Storage; @@ -17,6 +19,8 @@ public class AzureQueuesConfig private AzureSasCredential? _azureSasCredential; private TokenCredential? _tokenCredential; + private static readonly Regex s_validPoisonQueueSuffixRegex = new(@"^[a-z0-9-]{1}(?!.*--)[a-z0-9-]{0,28}[a-z0-9]$"); + [JsonConverter(typeof(JsonStringEnumConverter))] public enum AuthTypes { @@ -35,6 +39,38 @@ public enum AuthTypes public string AccountKey { get; set; } = ""; public string EndpointSuffix { get; set; } = "core.windows.net"; + /// + /// How often to check if there are new messages. + /// + public int PollDelayMsecs { get; set; } = 100; + + /// + /// How many messages to fetch at a time. + /// + public int FetchBatchSize { get; set; } = 3; + + /// + /// How long to lock messages once fetched. Azure Queue default is 30 secs. + /// + public int FetchLockSeconds { get; set; } = 300; + + /// + /// How many times to dequeue a messages and process before moving it to a poison queue. + /// + public int MaxRetriesBeforePoisonQueue { get; set; } = 20; + + /// + /// Suffix used for the poison queues. + /// + private string? _poisonQueueSuffix = "-poison"; + + public string PoisonQueueSuffix + { + get => this._poisonQueueSuffix!; + // Queue names must be lowercase. + set => this._poisonQueueSuffix = value?.ToLowerInvariant() ?? string.Empty; + } + public void SetCredential(StorageSharedKeyCredential credential) { this.Auth = AuthTypes.ManualStorageSharedKeyCredential; @@ -70,4 +106,27 @@ public TokenCredential GetTokenCredential() return this._tokenCredential ?? throw new ConfigurationException("TokenCredential not defined"); } + + /// + /// Verify that the current state is valid. + /// + public void Validate() + { + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(this.PollDelayMsecs); + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(this.FetchBatchSize); + ArgumentOutOfRangeException.ThrowIfLessThan(this.FetchLockSeconds, 30); + ArgumentOutOfRangeException.ThrowIfNegative(this.MaxRetriesBeforePoisonQueue); + ArgumentException.ThrowIfNullOrWhiteSpace(this.PoisonQueueSuffix); + + // Queue names must follow the rules described at + // https://learn.microsoft.com/rest/api/storageservices/naming-queues-and-metadata#queue-names. + // In this case, we need to validate only the suffix part, so rules are slightly different + // (for example, as it is a suffix, it can safely start with a dash (-) character). + // Queue names can be up to 63 characters long, so for the suffix we define a maximum length + // of 30, so there is room for the other name part. + if (!s_validPoisonQueueSuffixRegex.IsMatch(this.PoisonQueueSuffix)) + { + throw new ArgumentException($"Invalid {nameof(this.PoisonQueueSuffix)} format.", nameof(this.PoisonQueueSuffix)); + } + } } diff --git a/extensions/AzureQueues/AzureQueuesPipeline.cs b/extensions/AzureQueues/AzureQueuesPipeline.cs index 743779ea5..ef4f80680 100644 --- a/extensions/AzureQueues/AzureQueuesPipeline.cs +++ b/extensions/AzureQueues/AzureQueuesPipeline.cs @@ -35,24 +35,12 @@ private sealed class MessageEventArgs : EventArgs /// private event AsyncMessageHandler? Received; - // How often to check if there are new messages - private const int PollDelayMsecs = 100; - - // How many messages to fetch at a time - private const int FetchBatchSize = 3; - - // How long to lock messages once fetched. Azure Queue default is 30 secs. - private const int FetchLockSeconds = 300; - - // How many times to dequeue a messages and process before moving it to a poison queue - private const int MaxRetryBeforePoisonQueue = 20; - - // Suffix used for the poison queues - private const string PoisonQueueSuffix = "-poison"; - // Queue client builder, requiring the queue name in input private readonly Func _clientBuilder; + // Queue confirguration + private readonly AzureQueuesConfig _config; + // Queue client, once connected private QueueClient? _queue; @@ -77,6 +65,9 @@ public AzureQueuesPipeline( AzureQueuesConfig config, ILogger? log = null) { + this._config = config; + this._config.Validate(); + this._log = log ?? DefaultLogger.Instance; switch (config.Auth) @@ -161,14 +152,14 @@ public async Task ConnectToQueueAsync(string queueName, QueueOptions opt Response? result = await this._queue.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false); this._log.LogTrace("Queue ready: status code {0}", result?.Status); - this._poisonQueue = this._clientBuilder(this._queueName + PoisonQueueSuffix); + this._poisonQueue = this._clientBuilder(this._queueName + this._config.PoisonQueueSuffix); result = await this._poisonQueue.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false); this._log.LogTrace("Poison queue ready: status code {0}", result?.Status); if (options.DequeueEnabled) { - this._log.LogTrace("Enabling dequeue on queue {0}, every {1} msecs", this._queueName, PollDelayMsecs); - this._dispatchTimer = new Timer(PollDelayMsecs); // milliseconds + this._log.LogTrace("Enabling dequeue on queue {0}, every {1} msecs", this._queueName, this._config.PollDelayMsecs); + this._dispatchTimer = new Timer(this._config.PollDelayMsecs); // milliseconds this._dispatchTimer.Elapsed += this.DispatchMessages; this._dispatchTimer.Start(); } @@ -201,7 +192,7 @@ public void OnDequeue(Func> processMessageAction) try { - if (message.DequeueCount <= MaxRetryBeforePoisonQueue) + if (message.DequeueCount <= this._config.MaxRetriesBeforePoisonQueue) { bool success = await processMessageAction.Invoke(message.MessageText).ConfigureAwait(false); if (success) @@ -271,7 +262,7 @@ private void DispatchMessages(object? sender, ElapsedEventArgs ev) try { // Fetch and Hide N messages - Response receiveMessages = this._queue.ReceiveMessages(FetchBatchSize, visibilityTimeout: TimeSpan.FromSeconds(FetchLockSeconds)); + Response receiveMessages = this._queue.ReceiveMessages(this._config.FetchBatchSize, visibilityTimeout: TimeSpan.FromSeconds(this._config.FetchLockSeconds)); if (receiveMessages.HasValue && receiveMessages.Value.Length > 0) { messages = receiveMessages.Value; @@ -336,10 +327,10 @@ private async Task MoveMessageToPoisonQueueAsync(QueueMessage message, Cancellat var poisonMsg = new { - MessageText = message.MessageText, + message.MessageText, Id = message.MessageId, - InsertedOn = message.InsertedOn, - DequeueCount = message.DequeueCount, + message.InsertedOn, + message.DequeueCount, }; var neverExpire = TimeSpan.FromSeconds(-1); diff --git a/extensions/AzureQueues/DependencyInjection.cs b/extensions/AzureQueues/DependencyInjection.cs index 4c9085a74..9dc70ec96 100644 --- a/extensions/AzureQueues/DependencyInjection.cs +++ b/extensions/AzureQueues/DependencyInjection.cs @@ -13,6 +13,8 @@ public static partial class KernelMemoryBuilderExtensions { public static IKernelMemoryBuilder WithAzureQueuesOrchestration(this IKernelMemoryBuilder builder, AzureQueuesConfig config) { + config.Validate(); + builder.Services.AddAzureQueuesOrchestration(config); return builder; } @@ -22,6 +24,8 @@ public static partial class DependencyInjection { public static IServiceCollection AddAzureQueuesOrchestration(this IServiceCollection services, AzureQueuesConfig config) { + config.Validate(); + IQueue QueueFactory(IServiceProvider serviceProvider) { return serviceProvider.GetService() diff --git a/service/Service/appsettings.json b/service/Service/appsettings.json index 151cabd5b..f812f3f21 100644 --- a/service/Service/appsettings.json +++ b/service/Service/appsettings.json @@ -276,7 +276,17 @@ // Note: you can use an env var 'KernelMemory__Services__AzureQueue__ConnectionString' to set this "ConnectionString": "", // Setting used only for country clouds - "EndpointSuffix": "core.windows.net" + "EndpointSuffix": "core.windows.net", + // How often to check if there are new messages + "PollDelayMsecs": 100, + // How many messages to fetch at a time + "FetchBatchSize": 3, + // How long to lock messages once fetched. Azure Queue default is 30 secs + "FetchLockSeconds": 300, + // How many times to dequeue a messages and process before moving it to a poison queue + "MaxRetriesBeforePoisonQueue": 20, + // Suffix used for the poison queues. + "PoisonQueueSuffix": "-poison" }, "Elasticsearch": { // SHA-256 fingerprint. When running the docker image this is printed after starting the server