Skip to content

Commit

Permalink
add ability to configure whether to use the native timeout manager
Browse files Browse the repository at this point in the history
  • Loading branch information
mookid8000 committed Jun 4, 2018
1 parent 6385d6a commit 1862883
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 88 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@

* Add .NET Standard 2.0 as a target

## 4.2.0

* Add ability to configure whether to use the native message deferral mechanism, making it possibe to register a custom timeout manager (e.g. in SQL Server)


---

[mattwhetton]: https://github.com/mattwhetton
107 changes: 107 additions & 0 deletions Rebus.AzureStorage.Tests/AlternativeTimeoutManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
using NUnit.Framework;
using Rebus.Activation;
using Rebus.AzureStorage.Transport;
using Rebus.Config;
using Rebus.Logging;
using Rebus.Persistence.InMem;
using Rebus.Tests.Contracts;
using Rebus.Tests.Contracts.Extensions;
using Rebus.Timeouts;
// ReSharper disable ArgumentsStyleNamedExpression

namespace Rebus.AzureStorage.Tests
{
[TestFixture]
public class AlternativeTimeoutManager : FixtureBase
{
static readonly string QueueName = TestConfig.GetName("input");
static readonly string TimeoutManagerQueueName = TestConfig.GetName("timeouts");

BuiltinHandlerActivator _activator;
CloudStorageAccount _storageAccount;

protected override void SetUp()
{
_storageAccount = CloudStorageAccount.Parse(AzureStorageFactoryBase.ConnectionString);

new AzureStorageQueuesTransport(_storageAccount, QueueName, new NullLoggerFactory(), new AzureStorageQueuesTransportOptions()).PurgeInputQueue();
new AzureStorageQueuesTransport(_storageAccount, TimeoutManagerQueueName, new NullLoggerFactory(), new AzureStorageQueuesTransportOptions()).PurgeInputQueue();

_activator = new BuiltinHandlerActivator();

Using(_activator);
}

[Test]
public async Task CanUseAlternativeTimeoutManager()
{
var gotTheString = new ManualResetEvent(false);

_activator.Handle<string>(async str =>
{
Console.WriteLine($"Received string: '{str}'");
gotTheString.Set();
});

var bus = Configure.With(_activator)
.Transport(t =>
{
var options = new AzureStorageQueuesTransportOptions { UseNativeDeferredMessages = false };
t.UseAzureStorageQueues(_storageAccount, QueueName, options: options);
})
.Timeouts(t => t.Register(c => new InMemoryTimeoutManager()))
.Start();

await bus.DeferLocal(TimeSpan.FromSeconds(5), "hej med dig min ven!!!!!");

gotTheString.WaitOrDie(TimeSpan.FromSeconds(10), "Did not receive the string withing 10 s timeout");
}

[Test]
public async Task CanUseDedicatedAlternativeTimeoutManager()
{
// start the timeout manager
Configure.With(Using(new BuiltinHandlerActivator()))
.Transport(t =>
{
var options = new AzureStorageQueuesTransportOptions { UseNativeDeferredMessages = false };
t.UseAzureStorageQueues(_storageAccount, TimeoutManagerQueueName, options: options);
})
.Timeouts(t => t.Register(c => new InMemoryTimeoutManager()))
.Start();

var gotTheString = new ManualResetEvent(false);

_activator.Handle<string>(async str =>
{
Console.WriteLine($"Received string: '{str}'");
gotTheString.Set();
});

var bus = Configure.With(_activator)
.Transport(t =>
{
var options = new AzureStorageQueuesTransportOptions { UseNativeDeferredMessages = false };
t.UseAzureStorageQueues(_storageAccount, QueueName, options: options);
})
.Timeouts(t => t.UseExternalTimeoutManager(TimeoutManagerQueueName))
.Start();

await bus.DeferLocal(TimeSpan.FromSeconds(5), "hej med dig min ven!!!!!");

gotTheString.WaitOrDie(TimeSpan.FromSeconds(10), "Did not receive the string withing 10 s timeout");

//// don't dispose too quickly, or else we'll get annoying errors in the log
//await Task.Delay(TimeSpan.FromSeconds(0.5));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,45 +1,11 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using NUnit.Framework;
using Rebus.AzureStorage.Transport;
using Rebus.Messages;
using NUnit.Framework;
using Rebus.Tests.Contracts.Transports;
using Rebus.Time;
#pragma warning disable 1998

namespace Rebus.AzureStorage.Tests.Transport
{
[TestFixture]
public class AzureStorageQueuesTransportBasicSendReceive : BasicSendReceive<AzureStorageQueuesTransportFactory>
{
[Test]
public async Task GetQueueVisibilityDelayOrNull_NeverReturnsNegativeTimespans()
{
var sendInstant = DateTimeOffset.Now;
var deferDate = sendInstant.AddMilliseconds(-350);
RebusTimeMachine.FakeIt(sendInstant);
var result = AzureStorageQueuesTransport.GetQueueVisibilityDelayOrNull(new Dictionary<string, string>
{
{Headers.DeferredUntil, deferDate.ToString("O")}
});
RebusTimeMachine.Reset();
Assert.Null(result);

}
[Test]
public async Task GetQueueVisibilityDelayOrNull_StillReturnsPositiveTimespans()
{
var sendInstant = DateTimeOffset.Now;
var deferDate = sendInstant.AddMilliseconds(350);
RebusTimeMachine.FakeIt(sendInstant);
var result = AzureStorageQueuesTransport.GetQueueVisibilityDelayOrNull(new Dictionary<string, string>
{
{Headers.DeferredUntil, deferDate.ToString("O")}
});
RebusTimeMachine.Reset();
Assert.AreEqual(result, TimeSpan.FromMilliseconds(350));

}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using Rebus.AzureStorage.Transport;
using Rebus.Config;
using Rebus.Logging;
using Rebus.Tests.Contracts.Transports;
using Rebus.Transport;
Expand All @@ -20,7 +21,7 @@ public ITransport Create(string inputQueueAddress)
{
if (inputQueueAddress == null)
{
var transport = new AzureStorageQueuesTransport(AzureConfig.StorageAccount, null, new ConsoleLoggerFactory(false));
var transport = new AzureStorageQueuesTransport(AzureConfig.StorageAccount, null, new ConsoleLoggerFactory(false), new AzureStorageQueuesTransportOptions());

transport.Initialize();

Expand All @@ -29,7 +30,7 @@ public ITransport Create(string inputQueueAddress)

return _transports.GetOrAdd(inputQueueAddress, address =>
{
var transport = new AzureStorageQueuesTransport(AzureConfig.StorageAccount, inputQueueAddress, new ConsoleLoggerFactory(false));
var transport = new AzureStorageQueuesTransport(AzureConfig.StorageAccount, inputQueueAddress, new ConsoleLoggerFactory(false), new AzureStorageQueuesTransportOptions());
transport.PurgeInputQueue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
using Microsoft.WindowsAzure.Storage.RetryPolicies;
using Newtonsoft.Json;
using Rebus.Bus;
using Rebus.Config;
using Rebus.Exceptions;
using Rebus.Extensions;
using Rebus.Logging;
using Rebus.Messages;
using Rebus.Time;
using Rebus.Transport;
// ReSharper disable MethodSupportsCancellation

#pragma warning disable 1998

Expand All @@ -26,29 +28,34 @@ namespace Rebus.AzureStorage.Transport
/// </summary>
public class AzureStorageQueuesTransport : ITransport, IInitializable
{
const string QueueNameValidationRegex = "^[a-z0-9](?!.*--)[a-z0-9-]{1,61}[a-z0-9]$";
readonly AzureStorageQueuesTransportOptions _options;
readonly ConcurrentDictionary<string, CloudQueue> _queues = new ConcurrentDictionary<string, CloudQueue>();
readonly TimeSpan _initialVisibilityDelay = TimeSpan.FromMinutes(5);
readonly CloudQueueClient _queueClient;
readonly string _inputQueueName;
readonly ILog _log;
private readonly string _queueNameValidationRegex = "^[a-z0-9](?!.*--)[a-z0-9-]{1,61}[a-z0-9]$";
static readonly QueueRequestOptions DefaultQueueRequestOptions = new QueueRequestOptions();
static readonly OperationContext DefaultOperationContext = new OperationContext();

/// <summary>
/// Constructs the transport
/// </summary>
public AzureStorageQueuesTransport(CloudStorageAccount storageAccount, string inputQueueName, IRebusLoggerFactory rebusLoggerFactory)
public AzureStorageQueuesTransport(CloudStorageAccount storageAccount, string inputQueueName, IRebusLoggerFactory rebusLoggerFactory, AzureStorageQueuesTransportOptions options)
{
if (storageAccount == null) throw new ArgumentNullException(nameof(storageAccount));
if (rebusLoggerFactory == null) throw new ArgumentNullException(nameof(rebusLoggerFactory));

_options = options;
_queueClient = storageAccount.CreateCloudQueueClient();
_log = rebusLoggerFactory.GetLogger<AzureStorageQueuesTransport>();

if (inputQueueName != null)
{
if (!Regex.IsMatch(inputQueueName, _queueNameValidationRegex))
throw new ArgumentException("The inputQueueName must comprise only alphanumeric characters and hyphens, and must not have 2 consecutive hyphens.", nameof(inputQueueName));
_inputQueueName = inputQueueName.ToLowerInvariant();
if (!Regex.IsMatch(inputQueueName, QueueNameValidationRegex))
{
throw new ArgumentException($"The inputQueueName {inputQueueName} is not valid - it can contain only alphanumeric characters and hyphens, and must not have 2 consecutive hyphens.", nameof(inputQueueName));
}
Address = inputQueueName.ToLowerInvariant();
}
}

Expand Down Expand Up @@ -80,7 +87,7 @@ public async Task Send(string destinationAddress, TransportMessage message, ITra
try
{
var options = new QueueRequestOptions { RetryPolicy = new ExponentialRetry() };
var operationContext = new OperationContext();
var operationContext = DefaultOperationContext;
await queue.AddMessageAsync(cloudQueueMessage, timeToBeReceivedOrNull, queueVisibilityDelayOrNull, options, operationContext);
}
Expand All @@ -96,21 +103,25 @@ public async Task Send(string destinationAddress, TransportMessage message, ITra
/// </summary>
public async Task<TransportMessage> Receive(ITransactionContext context, CancellationToken cancellationToken)
{
if (_inputQueueName == null)
if (Address == null)
{
throw new InvalidOperationException("This Azure Storage Queues transport does not have an input queue, hence it is not possible to receive anything");
}
var inputQueue = GetQueue(_inputQueueName);

var inputQueue = GetQueue(Address);

var cloudQueueMessage = await inputQueue.GetMessageAsync(_initialVisibilityDelay, new QueueRequestOptions(), new OperationContext(), cancellationToken);
var cloudQueueMessage = await inputQueue.GetMessageAsync(_initialVisibilityDelay, DefaultQueueRequestOptions, DefaultOperationContext, cancellationToken);

if (cloudQueueMessage == null) return null;

var messageId = cloudQueueMessage.Id;
var popReceipt = cloudQueueMessage.PopReceipt;

context.OnCompleted(async () =>
{
// if we get this far, don't pass on the cancellation token
// ReSharper disable once MethodSupportsCancellation
await inputQueue.DeleteMessageAsync(cloudQueueMessage);
await inputQueue.DeleteMessageAsync(messageId, popReceipt);
});

context.OnAborted(() =>
Expand All @@ -126,22 +137,24 @@ public async Task<TransportMessage> Receive(ITransactionContext context, Cancell

static TimeSpan? GetTimeToBeReceivedOrNull(Dictionary<string, string> headers)
{
string timeToBeReceivedStr;

if (!headers.TryGetValue(Headers.TimeToBeReceived, out timeToBeReceivedStr))
if (!headers.TryGetValue(Headers.TimeToBeReceived, out var timeToBeReceivedStr))
{
return null;
}

TimeSpan? timeToBeReceived = TimeSpan.Parse(timeToBeReceivedStr);

return timeToBeReceived;
}

internal static TimeSpan? GetQueueVisibilityDelayOrNull(Dictionary<string, string> headers)
TimeSpan? GetQueueVisibilityDelayOrNull(Dictionary<string, string> headers)
{
string deferredUntilDateTimeOffsetString;
if (!_options.UseNativeDeferredMessages)
{
return null;
}

if (!headers.TryGetValue(Headers.DeferredUntil, out deferredUntilDateTimeOffsetString))
if (!headers.TryGetValue(Headers.DeferredUntil, out var deferredUntilDateTimeOffsetString))
{
return null;
}
Expand Down Expand Up @@ -182,39 +195,36 @@ class CloudStorageQueueTransportMessage
}

/// <inheritdoc />
public string Address => _inputQueueName;
public string Address { get; }

/// <summary>
/// Initializes the transport by creating the input queue if necessary
/// </summary>
public void Initialize()
{
if (_inputQueueName != null)
if (Address != null)
{
_log.Info("Initializing Azure Storage Queues transport with queue '{0}'", _inputQueueName);
CreateQueue(_inputQueueName);
_log.Info("Initializing Azure Storage Queues transport with queue '{0}'", Address);
CreateQueue(Address);
return;
}

_log.Info("Initializing one-way Azure Storage Queues transport");
}

CloudQueue GetQueue(string address)
{
return _queues.GetOrAdd(address, _ => _queueClient.GetQueueReference(address));
}
CloudQueue GetQueue(string address) => _queues.GetOrAdd(address, _ => _queueClient.GetQueueReference(address));

/// <summary>
/// Purges the input queue (WARNING: potentially very slow operation, as it will continue to batch receive messages until the queue is empty
/// </summary>
/// <exception cref="RebusApplicationException"></exception>
public void PurgeInputQueue()
{
var queue = GetQueue(_inputQueueName);
var queue = GetQueue(Address);

if (!AsyncHelpers.GetResult(() => queue.ExistsAsync())) return;

_log.Info("Purging storage queue '{0}' (purging by deleting all messages)", _inputQueueName);
_log.Info("Purging storage queue '{0}' (purging by deleting all messages)", Address);

try
{
Expand All @@ -226,7 +236,7 @@ public void PurgeInputQueue()

Task.WaitAll(messages.Select(message => queue.DeleteMessageAsync(message)).ToArray());

_log.Debug("Deleted {0} messages from '{1}'", messages.Count, _inputQueueName);
_log.Debug("Deleted {0} messages from '{1}'", messages.Count, Address);
}
}
catch (Exception exception)
Expand Down

0 comments on commit 1862883

Please sign in to comment.