Skip to content

Commit

Permalink
Merge pull request #15 from lenndewolten/feature/refactor-DI-and-opti…
Browse files Browse the repository at this point in the history
…ons-setup

Feature/refactor di and options setup
  • Loading branch information
lenndewolten committed May 3, 2023
2 parents f40e730 + 8d908cf commit ccbe67d
Show file tree
Hide file tree
Showing 47 changed files with 1,284 additions and 1,206 deletions.
100 changes: 100 additions & 0 deletions lib/Dequeueable.AzureQueueStorage/Configurations/HostBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
using Dequeueable.AzureQueueStorage.Factories;
using Dequeueable.AzureQueueStorage.Services.Hosts;
using Dequeueable.AzureQueueStorage.Services.Queues;
using Dequeueable.AzureQueueStorage.Services.Singleton;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;

namespace Dequeueable.AzureQueueStorage.Configurations
{
internal class HostBuilder : IDequeueableHostBuilder
{
private readonly IServiceCollection _services;

public HostBuilder(IServiceCollection services)
{
_services = services;
}

public IDequeueableHostBuilder RunAsJob(Action<HostOptions>? options = null)
{
_services.AddOptions<HostOptions>().BindConfiguration(HostOptions.Dequeueable)
.ValidateDataAnnotations()
.ValidateOnStart();

if (options is not null)
{
_services.Configure(options);
}

_services.AddHostedService<JobHost>();
_services.AddSingleton<IHostExecutor, JobExecutor>();

_services.TryAddSingleton<IHostOptions>(provider =>
{
var opt = provider.GetRequiredService<IOptions<HostOptions>>();
return opt.Value;
});

return this;
}

public IDequeueableHostBuilder RunAsListener(Action<ListenerHostOptions>? options = null)
{
_services.AddOptions<ListenerHostOptions>().BindConfiguration(HostOptions.Dequeueable)
.Validate(ListenerHostOptions.ValidatePollingInterval, $"The '{nameof(ListenerHostOptions.MinimumPollingIntervalInMilliseconds)}' must not be greater than the '{nameof(ListenerHostOptions.MaximumPollingIntervalInMilliseconds)}'.")
.Validate(ListenerHostOptions.ValidateNewBatchThreshold, $"The '{nameof(ListenerHostOptions.NewBatchThreshold)}' must not be greater than the '{nameof(ListenerHostOptions.BatchSize)}'.")
.ValidateDataAnnotations()
.ValidateOnStart();

if (options is not null)
{
_services.Configure(options);
}

_services.AddHostedService<QueueListenerHost>();
_services.AddSingleton<IHostExecutor, QueueListenerExecutor>();

_services.TryAddSingleton<IHostOptions>(provider =>
{
var opt = provider.GetRequiredService<IOptions<ListenerHostOptions>>();
return opt.Value;
});

return this;
}

public IDequeueableHostBuilder AsSingleton(Action<SingletonHostOptions>? options = null)
{
_services.AddOptions<SingletonHostOptions>().BindConfiguration(SingletonHostOptions.Name)
.Validate(SingletonHostOptions.ValidatePollingInterval, $"The '{nameof(SingletonHostOptions.MinimumPollingIntervalInSeconds)}' must not be greater than the '{nameof(SingletonHostOptions.MaximumPollingIntervalInSeconds)}'.")
.ValidateDataAnnotations()
.ValidateOnStart();

if (options is not null)
{
_services.Configure(options);
}

_services.AddTransient<IDistributedLockManager, DistributedLockManager>();
_services.AddTransient<IDistributedLockManagerFactory, DistributedLockManagerFactory>();
_services.AddTransient<IBlobClientProvider, BlobClientProvider>();
_services.AddTransient<ISingletonLockManager, SingletonLockManager>();
_services.AddTransient<IBlobClientFactory, BlobClientFactory>();
_services.AddTransient<QueueMessageExecutor>();
_services.AddTransient<IQueueMessageExecutor>(provider =>
{
var singletonManager = provider.GetRequiredService<ISingletonLockManager>();
var executor = provider.GetRequiredService<QueueMessageExecutor>();
var attribute = provider.GetRequiredService<IOptions<SingletonHostOptions>>();
return new SingletonQueueMessageExecutor(singletonManager, executor, attribute);
});

_services.PostConfigure<ListenerHostOptions>(storageAccountOptions => storageAccountOptions.NewBatchThreshold = 0);

return this;
}
}
}
76 changes: 14 additions & 62 deletions lib/Dequeueable.AzureQueueStorage/Configurations/HostOptions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Azure.Core;
using Azure.Storage.Queues;
using System.ComponentModel.DataAnnotations;

namespace Dequeueable.AzureQueueStorage.Configurations
{
Expand All @@ -8,49 +9,34 @@ namespace Dequeueable.AzureQueueStorage.Configurations
/// </summary>
public class HostOptions : IHostOptions
{
/// <summary>
/// Constant string used to bind the appsettings.*.json
/// </summary>
public static string Dequeueable => nameof(Dequeueable);
private long _visibilityTimeoutInSeconds = 300;
private long _maxDequeueCount = 5;
private int _batchSize = 16;
private string _poisonQueueSuffix = "poison";
internal static string Dequeueable => nameof(Dequeueable);

/// <summary>
/// The connection string used to authenticate to the queue.
/// </summary>
public string? ConnectionString { get; set; }

/// <summary>
/// The storage account name, used for identity flow.
/// </summary>
public string? AccountName { get; set; }

/// <summary>
/// The queue used to retrieve the messages.
/// </summary>
[Required(AllowEmptyStrings = true, ErrorMessage = "Value for {0} cannot be null.")]
public string QueueName { get; set; } = string.Empty;

/// <summary>
/// The poisen queue used to post queue message that reach the <see cref="MaxDequeueCount">MaxDequeueCount</see>.
/// </summary>
public string PoisonQueueName => $"{QueueName}-{_poisonQueueSuffix}";
public string PoisonQueueName => $"{QueueName}-{PoisonQueueSuffix}";

/// <summary>
/// Suffix that will be used after the <see cref="QueueName">QueueName</see>, eg queuename-suffix.
/// </summary>
public string PoisonQueueSuffix
{
get { return _poisonQueueSuffix; }
set
{
if (string.IsNullOrWhiteSpace(value))
{
throw new ArgumentException($"'{nameof(PoisonQueueSuffix)}' cannot be null or whitespace.", nameof(PoisonQueueSuffix));
}

_poisonQueueSuffix = value;
}
}
[Required(AllowEmptyStrings = false, ErrorMessage = "{0} cannot be null or whitespace.")]
public string PoisonQueueSuffix { get; set; } = "poison";

/// <summary>
/// The uri format to the queue storage. Used for identity flow. Use ` {accountName}` and `{queueName}` for variable substitution.
Expand All @@ -70,53 +56,19 @@ public string PoisonQueueSuffix
/// <summary>
/// The maximum number of messages processed in parallel.
/// </summary>
public int BatchSize
{
get => _batchSize;
set
{
if (value < 1)
{
throw new ArgumentOutOfRangeException(nameof(BatchSize), $"'{nameof(BatchSize)}' must not be negative or zero.");
}

_batchSize = value;
}
}
[Range(1, 100, ErrorMessage = "Value for {0} must be between {1} and {2}.")]
public int BatchSize { get; set; } = 16;

/// <summary>
/// Max dequeue count before moving to the poison queue.
/// </summary>
public long MaxDequeueCount
{
get => _maxDequeueCount;
set
{
if (value < 0 || value > 20)
{
throw new ArgumentOutOfRangeException(nameof(MaxDequeueCount),
$"{nameof(MaxDequeueCount)} must be between 0 and 20.");
}

_maxDequeueCount = value;
}
}
[Range(0, 20, ErrorMessage = "Value for {0} must be between {1} and {2}.")]
public long MaxDequeueCount { get; set; } = 5;

/// <summary>
/// The timeout after the queue message is visible again for other services.
/// </summary>
public long VisibilityTimeoutInSeconds
{
get => _visibilityTimeoutInSeconds;
set
{
if (value < 1)
{
throw new ArgumentOutOfRangeException(nameof(VisibilityTimeoutInSeconds), $"'{nameof(VisibilityTimeoutInSeconds)}' must not be negative or zero.");
}

_visibilityTimeoutInSeconds = value;
}
}
[Range(1, long.MaxValue, ErrorMessage = "Value for {0} must not be negative or zero.")]
public long VisibilityTimeoutInSeconds { get; set; } = 300;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
namespace Dequeueable.AzureQueueStorage.Configurations
{
/// <summary>
/// Interface to builds and setup the dequeueable host
/// </summary>
public interface IDequeueableHostBuilder
{
/// <summary>
/// This makes sure only a single instance of the function is executed at any given time (even across host instances).
/// A blob lease is used behind the scenes to implement the lock./>
/// </summary>
/// <param name="options">Action to configure the <see cref="SingletonHostOptions"/></param>
/// <returns><see cref="IDequeueableHostBuilder"/></returns>
IDequeueableHostBuilder AsSingleton(Action<SingletonHostOptions>? options = null);
/// <summary>
/// The application will run as a job, from start to finish, and will automatically shutdown when the messages are executed.
/// </summary>
/// <param name="options">Action to configure the <see cref="HostOptions"/></param>
/// <returns><see cref="IDequeueableHostBuilder"/></returns>
IDequeueableHostBuilder RunAsJob(Action<HostOptions>? options = null);
/// <summary>
/// The application will run as a listener, the queue will periodically be polled for new message.
/// </summary>
/// <param name="options">Action to configure the <see cref="ListenerHostOptions"/></param>
/// <returns><see cref="IDequeueableHostBuilder"/></returns>
IDequeueableHostBuilder RunAsListener(Action<ListenerHostOptions>? options = null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using System.ComponentModel.DataAnnotations;

namespace Dequeueable.AzureQueueStorage.Configurations
{
/// <summary>
/// Host options to configure the settings of the host and it's queue listeners
/// </summary>
public class ListenerHostOptions : HostOptions
{
private int? _newBatchThreshold;

/// <summary>
/// The threshold at which a new batch of messages will be fetched.
/// </summary>
public int NewBatchThreshold
{
get => _newBatchThreshold ?? Convert.ToInt32(Math.Ceiling(BatchSize / (double)2));
set
{
_newBatchThreshold = value;
}
}

/// <summary>
/// The minimum polling interval to check the queue for new messages.
/// </summary>
[Range(1, long.MaxValue, ErrorMessage = "Value for {0} must not be negative.")]
public long MinimumPollingIntervalInMilliseconds { get; set; } = 5;

/// <summary>
/// The maximum polling interval to check the queue for new messages.
/// </summary>
[Range(1, long.MaxValue, ErrorMessage = "Value for {0} must not be negative or zero.")]
public long MaximumPollingIntervalInMilliseconds { get; set; } = 10000;

/// <summary>
/// The delta used to randomize the polling interval.
/// </summary>
public TimeSpan? DeltaBackOff { get; set; }

internal static bool ValidatePollingInterval(ListenerHostOptions options)
{
return options.MinimumPollingIntervalInMilliseconds < options.MaximumPollingIntervalInMilliseconds;
}

internal static bool ValidateNewBatchThreshold(ListenerHostOptions options)
{
return options._newBatchThreshold is null || options.NewBatchThreshold <= options.BatchSize;
}
}
}

This file was deleted.

Loading

0 comments on commit ccbe67d

Please sign in to comment.