Skip to content

Commit

Permalink
Allow to specify ASB specific consumer settings on handlers and reque…
Browse files Browse the repository at this point in the history
…st-response configuration.

Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed May 26, 2023
1 parent 5bbf82c commit 606a4a7
Show file tree
Hide file tree
Showing 26 changed files with 381 additions and 445 deletions.
2 changes: 1 addition & 1 deletion src/Host.Transport.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<Version>2.1.7</Version>
<Version>2.1.8</Version>
</PropertyGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public AsyncApiDocument GenerateDocument(TypeInfo[] asyncApiTypes, AsyncApiOptio
/// <returns></returns>
private static string TryGetSubscriptionName(ConsumerSettings consumer) =>
consumer.GetOrDefault<string>("Group")
?? consumer.GetOrDefault<string>("SubscriptionName")
?? consumer.GetOrDefault<string>("Asb_SubscriptionName")
?? consumer.GetOrDefault<string>("Eh_Group");

private static void GenerateChannelsFromConsumers(IDictionary<string, ChannelItem> channels, AsyncApiSchemaResolver schemaResolver, AsyncApiOptions options, JsonSchemaGenerator jsonSchemaGenerator, IServiceProvider serviceProvider, MessageBusSettings messageBusSettings, NamedServer namedServer)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
namespace SlimMessageBus.Host.AzureServiceBus;

public static class AsbAbstractConsumerSettingsExtensions
{
internal static void SetSubscriptionName(this AbstractConsumerSettings consumerSettings, string subscriptionName)
{
if (subscriptionName is null) throw new ArgumentNullException(nameof(subscriptionName));

consumerSettings.Properties[AsbProperties.SubscriptionNameKey] = subscriptionName;
}

internal static string GetSubscriptionName(this AbstractConsumerSettings consumerSettings, bool required = true)
{
if (!consumerSettings.Properties.ContainsKey(AsbProperties.SubscriptionNameKey) && !required)
{
return null;
}
return consumerSettings.Properties[AsbProperties.SubscriptionNameKey] as string;
}

internal static void SetMaxAutoLockRenewalDuration(this AbstractConsumerSettings consumerSettings, TimeSpan duration)
=> consumerSettings.Properties[AsbProperties.MaxAutoLockRenewalDurationKey] = duration;

internal static TimeSpan? GetMaxAutoLockRenewalDuration(this AbstractConsumerSettings consumerSettings)
=> consumerSettings.GetOrDefault<TimeSpan?>(AsbProperties.MaxAutoLockRenewalDurationKey);

internal static void SetSubQueue(this AbstractConsumerSettings consumerSettings, SubQueue subQueue)
=> consumerSettings.Properties[AsbProperties.SubQueueKey] = subQueue;

internal static SubQueue? GetSubQueue(this AbstractConsumerSettings consumerSettings)
=> consumerSettings.GetOrDefault<SubQueue?>(AsbProperties.SubQueueKey);

internal static void SetPrefetchCount(this AbstractConsumerSettings consumerSettings, int prefetchCount)
=> consumerSettings.Properties[AsbProperties.PrefetchCountKey] = prefetchCount;

internal static int? GetPrefetchCount(this AbstractConsumerSettings consumerSettings)
=> consumerSettings.GetOrDefault<int?>(AsbProperties.PrefetchCountKey);

internal static void SetEnableSession(this AbstractConsumerSettings consumerSettings, bool enableSession)
=> consumerSettings.Properties[AsbProperties.EnableSessionKey] = enableSession;

internal static bool GetEnableSession(this AbstractConsumerSettings consumerSettings)
=> consumerSettings.GetOrDefault(AsbProperties.EnableSessionKey, false);

internal static void SetSessionIdleTimeout(this AbstractConsumerSettings consumerSettings, TimeSpan sessionIdleTimeout)
=> consumerSettings.Properties[AsbProperties.SessionIdleTimeoutKey] = sessionIdleTimeout;

internal static TimeSpan? GetSessionIdleTimeout(this AbstractConsumerSettings consumerSettings)
=> consumerSettings.GetOrDefault<TimeSpan?>(AsbProperties.SessionIdleTimeoutKey);

internal static void SetMaxConcurrentSessions(this AbstractConsumerSettings consumerSettings, int maxConcurrentSessions)
=> consumerSettings.Properties[AsbProperties.MaxConcurrentSessionsKey] = maxConcurrentSessions;

internal static int? GetMaxConcurrentSessions(this AbstractConsumerSettings consumerSettings)
=> consumerSettings.GetOrDefault<int?>(AsbProperties.MaxConcurrentSessionsKey);

internal static IDictionary<string, SubscriptionSqlRule> GetRules(this AbstractConsumerSettings consumerSettings, bool createIfNotExists = false)
{
var filterByName = consumerSettings.GetOrDefault<IDictionary<string, SubscriptionSqlRule>>(AsbProperties.RulesKey);
if (filterByName == null && createIfNotExists)
{
filterByName = new Dictionary<string, SubscriptionSqlRule>();
consumerSettings.Properties[AsbProperties.RulesKey] = filterByName;
}
return filterByName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
namespace SlimMessageBus.Host.AzureServiceBus;

using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

public static class AsbConsumerBuilderExtensions
{
/// <summary>
/// Sets the queue name for this consumer to use.
/// </summary>
/// <typeparam name="TConsumerBuilder"></typeparam>
/// <param name="builder"></param>
/// <param name="queue"></param>
/// <param name="queueConfig"></param>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public static TConsumerBuilder Queue<TConsumerBuilder>(this TConsumerBuilder builder, string queue, Action<TConsumerBuilder> queueConfig = null)
where TConsumerBuilder : AbstractConsumerBuilder
{
if (builder is null) throw new ArgumentNullException(nameof(builder));
if (queue is null) throw new ArgumentNullException(nameof(queue));

builder.ConsumerSettings.Path = queue;
builder.ConsumerSettings.PathKind = PathKind.Queue;

queueConfig?.Invoke(builder);

return builder;
}

private static void AssertIsTopicForSubscriptionName(AbstractConsumerSettings settings)
{
if (settings is null) throw new ArgumentNullException(nameof(settings));

if (settings.PathKind == PathKind.Queue)
{
var methodName = $".{nameof(SubscriptionName)}(...)";

var messageType = settings is ConsumerSettings consumerSettings
? consumerSettings.MessageType.FullName
: string.Empty;

throw new ConfigurationMessageBusException($"The subscription name configuration ({methodName}) does not apply to Azure ServiceBus queues (it only applies to topic consumers). Remove the {methodName} configuration for type {messageType} and queue {settings.Path} or change the consumer configuration to consume from topic {settings.Path} instead.");
}
}

/// <summary>
/// Configures the subscription name when consuming form Azure ServiceBus topic.
/// Not applicable when consuming from Azure ServiceBus queue.
/// </summary>
/// <param name="builder"></param>
/// <param name="subscriptionName"></param>
/// <returns></returns>
public static T SubscriptionName<T>(this T builder, string subscriptionName)
where T : IAbstractConsumerBuilder
{
if (builder is null) throw new ArgumentNullException(nameof(builder));
if (subscriptionName is null) throw new ArgumentNullException(nameof(subscriptionName));

AssertIsTopicForSubscriptionName(builder.ConsumerSettings);

builder.ConsumerSettings.SetSubscriptionName(subscriptionName);
return builder;
}

/// <summary>
/// Azure Service Bus consumer setting. See underlying client for more details: https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusprocessoroptions.maxautolockrenewalduration
/// </summary>
/// <typeparam name="TConsumerBuilder"></typeparam>
/// <param name="builder"></param>
/// <param name="duration"></param>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public static TConsumerBuilder MaxAutoLockRenewalDuration<TConsumerBuilder>(this TConsumerBuilder builder, TimeSpan duration)
where TConsumerBuilder : IAbstractConsumerBuilder
{
if (builder is null) throw new ArgumentNullException(nameof(builder));

builder.ConsumerSettings.SetMaxAutoLockRenewalDuration(duration);

return builder;
}

/// <summary>
/// Azure Service Bus consumer setting. See underlying client for more details: https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusprocessoroptions.subqueue
/// </summary>
/// <typeparam name="TConsumerBuilder"></typeparam>
/// <param name="builder"></param>
/// <param name="subQueue"></param>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public static TConsumerBuilder SubQueue<TConsumerBuilder>(this TConsumerBuilder builder, SubQueue subQueue)
where TConsumerBuilder : IAbstractConsumerBuilder
{
if (builder is null) throw new ArgumentNullException(nameof(builder));

builder.ConsumerSettings.SetSubQueue(subQueue);

return builder;
}

/// <summary>
/// Azure Service Bus consumer setting. See underlying client for more details: https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusprocessoroptions.prefetchcount
/// </summary>
/// <typeparam name="TConsumerBuilder"></typeparam>
/// <param name="builder"></param>
/// <param name="prefetchCount "></param>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public static TConsumerBuilder PrefetchCount<TConsumerBuilder>(this TConsumerBuilder builder, int prefetchCount)
where TConsumerBuilder : IAbstractConsumerBuilder
{
if (builder is null) throw new ArgumentNullException(nameof(builder));

builder.ConsumerSettings.SetPrefetchCount(prefetchCount);

return builder;
}

/// <summary>
/// Enables Azue Service Bus session support for this consumer
/// </summary>
/// <typeparam name="TConsumerBuilder"></typeparam>
/// <param name="builder"></param>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public static TConsumerBuilder EnableSession<TConsumerBuilder>(this TConsumerBuilder builder, Action<AsbConsumerSessionBuilder> sessionConfiguration = null)
where TConsumerBuilder : IAbstractConsumerBuilder
{
if (builder is null) throw new ArgumentNullException(nameof(builder));

builder.ConsumerSettings.SetEnableSession(true);

if (sessionConfiguration != null)
{
sessionConfiguration(new AsbConsumerSessionBuilder(builder.ConsumerSettings));
}

return builder;
}

/// <summary>
/// Adds a named SQL filter to the subscription (Azure Service Bus). Setting relevant only if topology provisioning enabled.
/// </summary>
/// <typeparam name="TConsumerBuilder"></typeparam>
/// <param name="builder"></param>
/// <param name="ruleName">The name of the filter</param>
/// <param name="filterSql">The SQL expression of the filter</param>
/// <param name="actionSql">The action to be performed on the filter</param>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public static TConsumerBuilder SubscriptionSqlFilter<TConsumerBuilder>(this TConsumerBuilder builder, string filterSql, string ruleName = "default", string actionSql = null)
where TConsumerBuilder : IAbstractConsumerBuilder
{
if (builder is null) throw new ArgumentNullException(nameof(builder));

var filterByName = builder.ConsumerSettings.GetRules(createIfNotExists: true);
filterByName[ruleName] = new SubscriptionSqlRule { Name = ruleName, SqlFilter = filterSql, SqlAction = actionSql };

return builder;
}

/// <summary>
/// <see cref="CreateQueueOptions"/> when the ASB queue does not exist and needs to be created
/// </summary>
/// <typeparam name="TConsumerBuilder"></typeparam>
/// <param name="builder"></param>
/// <returns></returns>
public static TConsumerBuilder CreateQueueOptions<TConsumerBuilder>(this TConsumerBuilder builder, Action<CreateQueueOptions> action)
where TConsumerBuilder : IAbstractConsumerBuilder
{
if (builder is null) throw new ArgumentNullException(nameof(builder));
if (action is null) throw new ArgumentNullException(nameof(action));

builder.ConsumerSettings.SetQueueOptions(action);
return builder;
}

/// <summary>
/// <see cref="CreateTopicOptions"/> when the ASB topic does not exist and needs to be created
/// </summary>
/// <typeparam name="TConsumerBuilder"></typeparam>
/// <param name="builder"></param>
/// <param name="action"></param>
/// <returns></returns>
public static TConsumerBuilder CreateTopicOptions<TConsumerBuilder>(this TConsumerBuilder builder, Action<CreateTopicOptions> action)
where TConsumerBuilder : IAbstractConsumerBuilder
{
if (builder is null) throw new ArgumentNullException(nameof(builder));
if (action is null) throw new ArgumentNullException(nameof(action));

builder.ConsumerSettings.SetTopicOptions(action);
return builder;
}

/// <summary>
/// <see cref="CreateSubscriptionOptions"/> when the ASB subscription does not exist and needs to be created
/// </summary>
/// <typeparam name="TConsumerBuilder"></typeparam>
/// <param name="builder"></param>
/// <param name="action"></param>
/// <returns></returns>
public static TConsumerBuilder CreateSubscriptionOptions<TConsumerBuilder>(this TConsumerBuilder builder, Action<CreateSubscriptionOptions> action)
where TConsumerBuilder : IAbstractConsumerBuilder
{
if (builder is null) throw new ArgumentNullException(nameof(builder));
if (action is null) throw new ArgumentNullException(nameof(action));

builder.ConsumerSettings.SetSubscriptionOptions(action);
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace SlimMessageBus.Host.AzureServiceBus;

public class AsbConsumerSessionBuilder
{
internal AbstractConsumerSettings ConsumerSettings { get; }

public AsbConsumerSessionBuilder(AbstractConsumerSettings consumerSettings) => ConsumerSettings = consumerSettings;

/// <summary>
/// Sets the Azue Service Bus session idle timeout.
/// </summary>
/// <param name="sessionIdleTimeout"></param>
/// <returns></returns>
public AsbConsumerSessionBuilder SessionIdleTimeout(TimeSpan sessionIdleTimeout)
{
ConsumerSettings.SetSessionIdleTimeout(sessionIdleTimeout);

return this;
}

/// <summary>
/// Sets the Azue Service Bus maximmum concurrent sessions that can be handled by this consumer.
/// </summary>
/// <param name="maxConcurrentSessions"></param>
/// <returns></returns>
public AsbConsumerSessionBuilder MaxConcurrentSessions(int maxConcurrentSessions)
{
ConsumerSettings.SetMaxConcurrentSessions(maxConcurrentSessions);

return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

internal static class HasProviderExtensionsExtensions
internal static class AsbHasProviderExtensionsExtensions
{
internal static HasProviderExtensions SetMessageModifier(this HasProviderExtensions producerSettings, Action<object, ServiceBusMessage> messageModifierAction)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

public static class ProducerBuilderExtensions
public static class AsbProducerBuilderExtensions
{
public static ProducerBuilder<T> DefaultQueue<T>(this ProducerBuilder<T> producerBuilder, string queue)
{
if (producerBuilder is null) throw new ArgumentNullException(nameof(producerBuilder));
if (queue is null) throw new ArgumentNullException(nameof(queue));

producerBuilder.DefaultTopic(queue);
producerBuilder.DefaultPath(queue);
producerBuilder.ToQueue();
return producerBuilder;
}
Expand Down
13 changes: 13 additions & 0 deletions src/SlimMessageBus.Host.AzureServiceBus/Config/AsbProperties.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace SlimMessageBus.Host.AzureServiceBus;

public static class AsbProperties
{
public static readonly string SubscriptionNameKey = "Asb_SubscriptionName";
public static readonly string MaxAutoLockRenewalDurationKey = "Asb_MaxAutoLockRenewalDuration";
public static readonly string SubQueueKey = "Asb_SubQueue";
public static readonly string PrefetchCountKey = "Asb_PrefetchCount";
public static readonly string EnableSessionKey = "Asb_SessionEnabled";
public static readonly string SessionIdleTimeoutKey = "Asb_SessionIdleTimeout";
public static readonly string MaxConcurrentSessionsKey = "Asb_MaxConcurrentSessions";
public static readonly string RulesKey = "Asb_Rules";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace SlimMessageBus.Host.AzureServiceBus;

public static class AsbRequestResponseBuilderExtensions
{
public static RequestResponseBuilder ReplyToQueue(this RequestResponseBuilder builder, string queue, Action<RequestResponseBuilder> builderConfig = null)
{
if (builder is null) throw new ArgumentNullException(nameof(builder));
if (queue is null) throw new ArgumentNullException(nameof(queue));

builder.Settings.Path = queue;
builder.Settings.PathKind = PathKind.Queue;

builderConfig?.Invoke(builder);

return builder;
}
}
Loading

0 comments on commit 606a4a7

Please sign in to comment.