Skip to content

Commit

Permalink
Merge pull request #1164 from bjornsy-tl/receive-controller
Browse files Browse the repository at this point in the history
Adding a way to pause receiving messages instead of stopping the bus
  • Loading branch information
slang25 committed May 23, 2023
2 parents 1cbf75c + c82c795 commit 9a1d17b
Show file tree
Hide file tree
Showing 24 changed files with 315 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using JustSaying.AwsTools;
using JustSaying.AwsTools.QueueCreation;
using JustSaying.Fluent;
using JustSaying.Messaging.Channels.Receive;
using JustSaying.Messaging.MessageHandling;
using JustSaying.Messaging.MessageSerialization;
using JustSaying.Messaging.Middleware.Logging;
Expand Down Expand Up @@ -116,7 +117,7 @@ public static IServiceCollection AddJustSaying(this IServiceCollection services,
{
throw new ArgumentNullException(nameof(configure));
}

// Register as self so the same singleton instance implements two different interfaces
services.TryAddSingleton((p) => new ServiceProviderResolver(p));
services.TryAddSingleton<IHandlerResolver>((p) => p.GetRequiredService<ServiceProviderResolver>());
Expand Down Expand Up @@ -145,6 +146,8 @@ public static IServiceCollection AddJustSaying(this IServiceCollection services,
return new MessageSerializationRegister(config.MessageSubjectProvider, serializerFactory);
});

services.TryAddSingleton<IMessageReceivePauseSignal, MessageReceivePauseSignal>();

services.TryAddSingleton(
(serviceProvider) =>
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using JustSaying.AwsTools;
using JustSaying.AwsTools.QueueCreation;
using JustSaying.Fluent;
using JustSaying.Messaging.Channels.Receive;
using JustSaying.Messaging.MessageHandling;
using JustSaying.Messaging.MessageSerialization;
using JustSaying.Messaging.Middleware.Logging;
Expand Down Expand Up @@ -52,6 +53,8 @@ public JustSayingRegistry()
})
.Singleton();

For<IMessageReceivePauseSignal>().Use<MessageReceivePauseSignal>().Singleton();

For<DefaultNamingConventions>().Singleton();
For<ITopicNamingConvention>().Use(context => context.GetInstance<DefaultNamingConventions>());
For<IQueueNamingConvention>().Use(context => context.GetInstance<DefaultNamingConventions>());
Expand Down
14 changes: 14 additions & 0 deletions src/JustSaying/JustSayingBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using JustSaying.AwsTools.MessageHandling.Dispatch;
using JustSaying.Extensions;
using JustSaying.Messaging;
using JustSaying.Messaging.Channels.Receive;
using JustSaying.Messaging.Channels.SubscriptionGroups;
using JustSaying.Messaging.Interrogation;
using JustSaying.Messaging.MessageSerialization;
Expand All @@ -28,6 +29,8 @@ public sealed class JustSayingBus : IMessagingBus, IMessagePublisher, IDisposabl

public IMessagingConfig Config { get; }

private readonly IMessageReceivePauseSignal _messageReceivePauseSignal;

private readonly IMessageMonitor _monitor;

private ISubscriptionGroup SubscriptionGroups { get; set; }
Expand Down Expand Up @@ -59,6 +62,16 @@ public sealed class JustSayingBus : IMessagingBus, IMessagePublisher, IDisposabl
_defaultSubscriptionGroupSettings = new SubscriptionGroupSettingsBuilder();
}

public JustSayingBus(
IMessagingConfig config,
IMessageSerializationRegister serializationRegister,
IMessageReceivePauseSignal messageReceivePauseSignal,
ILoggerFactory loggerFactory,
IMessageMonitor monitor) : this(config, serializationRegister, loggerFactory, monitor)
{
_messageReceivePauseSignal = messageReceivePauseSignal;
}

public void AddQueue(string subscriptionGroup, ISqsQueue queue)
{
if (string.IsNullOrWhiteSpace(subscriptionGroup))
Expand Down Expand Up @@ -155,6 +168,7 @@ private async Task RunImplAsync(CancellationToken stoppingToken)

var subscriptionGroupFactory = new SubscriptionGroupFactory(
dispatcher,
_messageReceivePauseSignal,
_monitor,
_loggerFactory);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace JustSaying.Messaging.Channels.Receive;

/// <summary>
/// Allows pausing and resuming the receipt of messages in all instances of the <see cref="MessageReceiveBuffer"/>
/// </summary>
public interface IMessageReceivePauseSignal
{
/// <summary>
/// Sets status to pause receiving
/// </summary>
void Pause();

/// <summary>
/// Sets status to resume receiving
/// </summary>
void Resume();

/// <summary>
/// Indicates receiving of messages is paused
/// </summary>
bool IsPaused { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ namespace JustSaying.Messaging.Channels.Receive;

internal class MessageReceiveBuffer : IMessageReceiveBuffer
{
private static readonly TimeSpan PauseReceivingBusyWaitDelay = TimeSpan.FromMilliseconds(100);

private readonly Channel<IQueueMessageContext> _channel;
private readonly int _prefetch;
private readonly int _bufferSize;
private readonly TimeSpan _readTimeout;
private readonly TimeSpan _sqsWaitTime;
private readonly SqsQueueReader _sqsQueueReader;
private readonly MiddlewareBase<ReceiveMessagesContext, IList<Message>> _sqsMiddleware;
private readonly IMessageReceivePauseSignal _messageReceivePauseSignal;
private readonly IMessageMonitor _monitor;
private readonly ILogger _logger;

Expand All @@ -36,6 +39,7 @@ internal class MessageReceiveBuffer : IMessageReceiveBuffer
TimeSpan sqsWaitTime,
ISqsQueue sqsQueue,
MiddlewareBase<ReceiveMessagesContext, IList<Message>> sqsMiddleware,
IMessageReceivePauseSignal messageReceivePauseSignal,
IMessageMonitor monitor,
ILogger<IMessageReceiveBuffer> logger)
{
Expand All @@ -46,6 +50,7 @@ internal class MessageReceiveBuffer : IMessageReceiveBuffer
if (sqsQueue == null) throw new ArgumentNullException(nameof(sqsQueue));
_sqsQueueReader = new SqsQueueReader(sqsQueue);
_sqsMiddleware = sqsMiddleware ?? throw new ArgumentNullException(nameof(sqsMiddleware));
_messageReceivePauseSignal = messageReceivePauseSignal;
_monitor = monitor ?? throw new ArgumentNullException(nameof(monitor));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));

Expand All @@ -70,6 +75,13 @@ public async Task RunAsync(CancellationToken stoppingToken)
{
stoppingToken.ThrowIfCancellationRequested();

if (_messageReceivePauseSignal?.IsPaused == true)
{
await Task.Delay(PauseReceivingBusyWaitDelay, stoppingToken);

continue;
}

using (_monitor.MeasureThrottle())
{
bool canWrite = await writer.WaitToWriteAsync(stoppingToken).ConfigureAwait(false);
Expand Down Expand Up @@ -164,4 +176,4 @@ public InterrogationResult Interrogate()
Prefetch = _prefetch,
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace JustSaying.Messaging.Channels.Receive;

public sealed class MessageReceivePauseSignal : IMessageReceivePauseSignal
{
public void Pause()
{
IsPaused = true;
}

public void Resume()
{
IsPaused = false;
}

public bool IsPaused { get; private set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@ public interface ISubscriptionGroupSettings
/// Gets the maxiumum number of messages to fetch from SQS in each request.
/// </summary>
public int Prefetch { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,4 @@ public SubscriptionGroupSettings Build(SubscriptionGroupSettingsBuilder defaults

return settings;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace JustSaying.Messaging.Channels.SubscriptionGroups;
public class SubscriptionGroupFactory : ISubscriptionGroupFactory
{
private readonly IMessageDispatcher _messageDispatcher;
private readonly IMessageReceivePauseSignal _messageReceivePauseSignal;
private readonly IMessageMonitor _monitor;
private readonly ILoggerFactory _loggerFactory;
private readonly ReceiveMiddleware _defaultSqsMiddleware;
Expand All @@ -40,6 +41,22 @@ public class SubscriptionGroupFactory : ISubscriptionGroupFactory
new DefaultReceiveMessagesMiddleware(_loggerFactory.CreateLogger<DefaultReceiveMessagesMiddleware>());
}

/// <summary>
/// Creates an instance of <see cref="SubscriptionGroupFactory"/>.
/// </summary>
/// <param name="messageDispatcher">The <see cref="IMessageDispatcher"/> to use to dispatch messages.</param>
/// <param name="messageReceivePauseSignal">The <see cref="IMessageReceivePauseSignal"/> used by the <see cref="IMessageReceiveBuffer"/>.</param>
/// <param name="monitor">The <see cref="IMessageMonitor"/> used by the <see cref="IMessageReceiveBuffer"/>.</param>
/// <param name="loggerFactory">The <see cref="ILoggerFactory"/> to use.</param>
public SubscriptionGroupFactory(
IMessageDispatcher messageDispatcher,
IMessageReceivePauseSignal messageReceivePauseSignal,
IMessageMonitor monitor,
ILoggerFactory loggerFactory) : this(messageDispatcher, monitor, loggerFactory)
{
_messageReceivePauseSignal = messageReceivePauseSignal;
}

/// <summary>
/// Creates a <see cref="ISubscriptionGroup"/> for the given configuration.
/// </summary>
Expand Down Expand Up @@ -103,6 +120,7 @@ private ISubscriptionGroup Create(ReceiveMiddleware receiveMiddleware, Subscript
subscriptionGroupSettings.ReceiveMessagesWaitTime,
queue,
receiveMiddleware,
_messageReceivePauseSignal,
_monitor,
logger);

Expand Down Expand Up @@ -130,4 +148,4 @@ private ICollection<IMultiplexerSubscriber> CreateSubscribers(SubscriptionGroupS
logger))
.ToList();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,4 @@ public sealed class SubscriptionGroupSettings : ISubscriptionGroupSettings
/// A collection of <see cref="ISqsQueue"/> to read messages from.
/// </summary>
public IReadOnlyCollection<ISqsQueue> Queues { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,4 @@ public SubscriptionGroupSettingsBuilder WithCustomMiddleware(ReceiveMiddleware m
SqsMiddleware = middleware;
return this;
}
}
}
6 changes: 4 additions & 2 deletions src/JustSaying/MessagingBusBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using JustSaying.Fluent;
using JustSaying.Fluent.ServiceResolver;
using JustSaying.Messaging;
using JustSaying.Messaging.Channels.Receive;
using JustSaying.Messaging.MessageSerialization;
using JustSaying.Messaging.Monitoring;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -266,9 +267,10 @@ public IMessagingBus BuildSubscribers()
private JustSayingBus CreateBus(IMessagingConfig config, ILoggerFactory loggerFactory)
{
IMessageSerializationRegister register = ServiceResolver.ResolveService<IMessageSerializationRegister>();
IMessageReceivePauseSignal messageReceivePauseSignal = ServiceResolver.ResolveService<IMessageReceivePauseSignal>();
IMessageMonitor monitor = ServiceResolver.ResolveOptionalService<IMessageMonitor>() ?? new NullOpMessageMonitor();

var bus = new JustSayingBus(config, register, loggerFactory, monitor);
var bus = new JustSayingBus(config, register, messageReceivePauseSignal, loggerFactory, monitor);

return bus;
}
Expand All @@ -279,4 +281,4 @@ private IAwsClientFactoryProxy CreateFactoryProxy()
new AwsClientFactoryProxy(new Lazy<IAwsClientFactory>(ClientFactoryBuilder.Build)) :
ServiceResolver.ResolveService<IAwsClientFactoryProxy>();
}
}
}
5 changes: 4 additions & 1 deletion tests/JustSaying.UnitTests/JustSayingBus/GivenAServiceBus.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using JustSaying.Messaging.Channels.Receive;
using JustSaying.Messaging.Channels.SubscriptionGroups;
using JustSaying.TestingFramework;
using JustSaying.UnitTests.Messaging.Channels.TestHelpers;
Expand Down Expand Up @@ -58,8 +59,10 @@ protected virtual void Given()
private JustSaying.JustSayingBus CreateSystemUnderTest()
{
var serializerRegister = new FakeSerializationRegister();
var messageReceivePauseSignal = new MessageReceivePauseSignal();
var bus = new JustSaying.JustSayingBus(Config,
serializerRegister,
messageReceivePauseSignal,
LoggerFactory,
Monitor);

Expand All @@ -74,4 +77,4 @@ public void RecordAnyExceptionsThrown()
{
_recordThrownExceptions = true;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using JustSaying.Messaging.Channels.Receive;
using Microsoft.Extensions.Logging;
using NSubstitute;

Expand Down Expand Up @@ -34,6 +35,6 @@ protected virtual void Given()

private JustSaying.JustSayingBus CreateSystemUnderTest()
{
return new JustSaying.JustSayingBus(Config, null, LoggerFactory, null);
return new JustSaying.JustSayingBus(Config, null, new MessageReceivePauseSignal(), LoggerFactory, null);
}
}
}
Loading

0 comments on commit 9a1d17b

Please sign in to comment.