Skip to content

Commit

Permalink
added delivery timeout parameter to saga fixture deliver method
Browse files Browse the repository at this point in the history
  • Loading branch information
mookid8000 committed Sep 19, 2016
1 parent b330558 commit 95efa82
Show file tree
Hide file tree
Showing 12 changed files with 286 additions and 27 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Expand Up @@ -1080,6 +1080,11 @@

* Test release

## 2.0.0-b03

* Add separate wait methods to `ISyncBackoffStrategy` in order to differentiate between waiting because no more parallel operations are allowed, and waiting because no message was received
* Add `deliveryTimeoutSeconds` paramater to `Deliver` method of `SagaFixture` in order to allow for not timing out when e.g. step-debugging saga message processing

---

[AndreaCuneo]: https://github.com/AndreaCuneo
Expand Down
35 changes: 35 additions & 0 deletions Rebus.Tests.Contracts/Extensions/Range.cs
@@ -0,0 +1,35 @@
using System;
using System.Collections.Generic;

namespace Rebus.Tests.Contracts.Extensions
{
public class Range<T>
{
public Range(T from, T to)
{
if (from == null) throw new ArgumentNullException(nameof(@from));
if (to == null) throw new ArgumentNullException(nameof(to));
From = from;
To = to;
}

public T From { get; }
public T To { get; }
}

public static class RangeExtensions
{
public static Range<T> To<T>(this T from, T to)
{
return new Range<T>(from, to);
}

public static IEnumerable<DateTime> GetIntervals(this Range<DateTime> timeRange, TimeSpan interval)
{
for (var time = timeRange.From; time < timeRange.To; time = time + interval)
{
yield return time;
}
}
}
}
8 changes: 8 additions & 0 deletions Rebus.Tests.Contracts/Extensions/TestEx.cs
Expand Up @@ -31,6 +31,14 @@ public static class TestEx
return list[medianIndex];
}

public static TValue GetValueOrDefault<TKey, TValue>(this IDictionary<TKey, TValue> dictionary, TKey key)
{
TValue value;
return dictionary.TryGetValue(key, out value)
? value
: default(TValue);
}

public static DateTime RoundTo(this DateTime dateTime, TimeSpan resolution)
{
var resolutionTicks = resolution.Ticks;
Expand Down
2 changes: 2 additions & 0 deletions Rebus.Tests.Contracts/Rebus.Tests.Contracts.csproj
Expand Up @@ -58,6 +58,7 @@
<Compile Include="Activation\ContainerTests.cs" />
<Compile Include="Activation\IContainerAdapterFactory.cs" />
<Compile Include="Activation\RealContainerTests.cs" />
<Compile Include="Extensions\Range.cs" />
<Compile Include="Properties\AssemblyInfo_Patch.cs" />
<Compile Include="DataBus\GeneralDataBusStorageTests.cs" />
<Compile Include="DataBus\IDataBusStorageFactory.cs" />
Expand All @@ -75,6 +76,7 @@
<Compile Include="Transports\IBusFactory.cs" />
<Compile Include="Transports\TestManyMessages.cs" />
<Compile Include="Utilities\DeleteHelper.cs" />
<Compile Include="Utilities\In.cs" />
<Compile Include="Utilities\ListLoggerFactory.cs" />
<Compile Include="Utilities\LogLine.cs" />
<None Include="Properties\AssemblyInfo.cs" />
Expand Down
48 changes: 48 additions & 0 deletions Rebus.Tests.Contracts/Utilities/In.cs
@@ -0,0 +1,48 @@
using System.Threading;
using System.Threading.Tasks;
using Rebus.Messages;
using Rebus.Transport;

namespace Rebus.Tests.Contracts.Utilities
{
public class IntroducerOfLatency : ITransport
{
readonly ITransport _innerTransport;
readonly int? _sendLatencyMs;
readonly int? _receiveLatencyMs;

public IntroducerOfLatency(ITransport innerTransport, int? sendLatencyMs = null, int? receiveLatencyMs = null)
{
_innerTransport = innerTransport;
_sendLatencyMs = sendLatencyMs;
_receiveLatencyMs = receiveLatencyMs;
}

public void CreateQueue(string address)
{
_innerTransport.CreateQueue(address);
}

public async Task Send(string destinationAddress, TransportMessage message, ITransactionContext context)
{
if (_sendLatencyMs.HasValue)
{
await Task.Delay(_sendLatencyMs.Value);
}

await _innerTransport.Send(destinationAddress, message, context);
}

public async Task<TransportMessage> Receive(ITransactionContext context, CancellationToken cancellationToken)
{
if (_receiveLatencyMs.HasValue)
{
await Task.Delay(_receiveLatencyMs.Value, cancellationToken);
}

return await _innerTransport.Receive(context, cancellationToken);
}

public string Address => _innerTransport.Address;
}
}
140 changes: 140 additions & 0 deletions Rebus.Tests/Backoff/TestBackoffBehaviorWhenBusy.cs
@@ -0,0 +1,140 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using NUnit.Framework;
using Rebus.Activation;
using Rebus.Backoff;
using Rebus.Config;
using Rebus.Logging;
using Rebus.Tests.Contracts;
using Rebus.Tests.Contracts.Extensions;
using Rebus.Tests.Contracts.Utilities;
using Rebus.Transport;
using Rebus.Transport.InMem;
using Rebus.Workers;
#pragma warning disable 1998

namespace Rebus.Tests.Backoff
{
[TestFixture]
public class TestBackoffBehaviorWhenBusy : FixtureBase
{
BuiltinHandlerActivator _activator;
BackoffSnitch _snitch;

protected override void SetUp()
{
_activator = Using(new BuiltinHandlerActivator());

_snitch = new BackoffSnitch();

Configure.With(_activator)
.Logging(l => l.Console(LogLevel.Info))
.Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "busy-test"))
.Options(o =>
{
o.SetNumberOfWorkers(1);
o.SetMaxParallelism(500);
o.SetBackoffTimes(TimeSpan.FromSeconds(0.2));
// install the snitch
o.Decorate<ISyncBackoffStrategy>(c =>
{
var syncBackoffStrategy = c.Get<ISyncBackoffStrategy>();
_snitch.SyncBackoffStrategy = syncBackoffStrategy;
return _snitch;
});
o.Decorate<ITransport>(c =>
{
var transport = c.Get<ITransport>();
return new IntroducerOfLatency(transport, receiveLatencyMs: 10);
});
})
.Start();
}

[TestCase(100000)]
public async Task DoesNotBackOffAtAllWhenBusy(int messageCount)
{
var counter = new SharedCounter(messageCount);

_activator.Handle<string>(async str =>
{
await Task.Delay(1);
counter.Decrement();
});

var startTime = DateTime.UtcNow;

await Task.Delay(TimeSpan.FromSeconds(5));

Printt("Sending 100k msgs");

await Task.WhenAll(Enumerable.Range(0, messageCount)
.Select(i => _activator.Bus.SendLocal($"THIS IS MESSAGE {i}")));

Printt("Receiving them...");

_activator.Bus.Advanced.Workers.SetNumberOfWorkers(1);

counter.WaitForResetEvent(60);

Printt("Done... waiting a little extra");
await Task.Delay(TimeSpan.FromSeconds(5));

var stopTime = DateTime.UtcNow;

var waitsPerSecond = _snitch.WaitTimes
.GroupBy(t => t.RoundTo(TimeSpan.FromSeconds(1)))
.ToDictionary(g => g.Key, g => g.Count());

var waitNoMessagesPerSecond = _snitch.WaitNoMessageTimes
.GroupBy(t => t.RoundTo(TimeSpan.FromSeconds(1)))
.ToDictionary(g => g.Key, g => g.Count());

var seconds = startTime.RoundTo(TimeSpan.FromSeconds(1)).To(stopTime.RoundTo(TimeSpan.FromSeconds(1)))
.GetIntervals(TimeSpan.FromSeconds(1));

Console.WriteLine(string.Join(Environment.NewLine,
seconds.Select(time => $"{time}: {new string('.', waitsPerSecond.GetValueOrDefault(time))}{new string('*', waitNoMessagesPerSecond.GetValueOrDefault(time))}")));
}

class BackoffSnitch : ISyncBackoffStrategy
{
readonly ConcurrentQueue<DateTime> _waitTimes = new ConcurrentQueue<DateTime>();
readonly ConcurrentQueue<DateTime> _waitNoMessageTimes = new ConcurrentQueue<DateTime>();

public ISyncBackoffStrategy SyncBackoffStrategy { get; set; }

public IEnumerable<DateTime> WaitTimes => _waitTimes;
public IEnumerable<DateTime> WaitNoMessageTimes => _waitNoMessageTimes;

public void Reset()
{
SyncBackoffStrategy.Reset();
}

public void WaitNoMessage()
{
_waitNoMessageTimes.Enqueue(DateTime.UtcNow);
SyncBackoffStrategy.WaitNoMessage();
}

public void Wait()
{
_waitTimes.Enqueue(DateTime.UtcNow);
SyncBackoffStrategy.Wait();
}

public void WaitError()
{
SyncBackoffStrategy.WaitError();
}
}
}
}
1 change: 1 addition & 0 deletions Rebus.Tests/Rebus.Tests.csproj
Expand Up @@ -72,6 +72,7 @@
<Compile Include="Assumptions\TestString.cs" />
<Compile Include="Assumptions\TestTask.cs" />
<Compile Include="Auditing\TestMessageAuditing.cs" />
<Compile Include="Backoff\TestBackoffBehaviorWhenBusy.cs" />
<Compile Include="Backoff\TestCustomizedBackoff.cs" />
<Compile Include="Bugs\CustomHeadersAreCloned.cs" />
<Compile Include="Bugs\DoesNotDispatchWrongSagaDataType.cs" />
Expand Down
6 changes: 6 additions & 0 deletions Rebus.Tests/Testing/TestSagaFixture.cs
Expand Up @@ -100,6 +100,12 @@ public void EmitsDeletedEvent()
}
}

[Test]
public void DoesNotTimeOutWhenDebuggerIsAttached()
{

}

class MySaga : Saga<MySagaState>, IAmInitiatedBy<TestMessage>
{
protected override void CorrelateMessages(ICorrelationConfig<MySagaState> config)
Expand Down
7 changes: 4 additions & 3 deletions Rebus/Testing/SagaFixture.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using Rebus.Activation;
using Rebus.Config;
Expand Down Expand Up @@ -155,16 +156,16 @@ internal SagaFixture(BuiltinHandlerActivator activator)
/// <summary>
/// Delivers the given message to the saga handler
/// </summary>
public void Deliver(object message, Dictionary<string, string> optionalHeaders = null)
public void Deliver(object message, Dictionary<string, string> optionalHeaders = null, int deliveryTimeoutSeconds = 5)
{
var resetEvent = new ManualResetEvent(false);
_lockStepper.AddResetEvent(resetEvent);

_activator.Bus.SendLocal(message, optionalHeaders).Wait();

if (!resetEvent.WaitOne(TimeSpan.FromSeconds(5)))
if (!resetEvent.WaitOne(TimeSpan.FromSeconds(deliveryTimeoutSeconds)))
{
throw new TimeoutException($"Message {message} did not seem to have been processed withing 5 s timeout");
throw new TimeoutException($"Message {message} did not seem to have been processed withing {deliveryTimeoutSeconds} s timeout");
}
}

Expand Down
8 changes: 7 additions & 1 deletion Rebus/Workers/ISyncBackoffStrategy.cs
Expand Up @@ -8,10 +8,16 @@ public interface ISyncBackoffStrategy
{
/// <summary>
/// Executes the next wait operation by blocking the thread, possibly advancing the wait cursor to a different wait time for the next time.
/// This function is called each time no message was received.
/// This function is called each time a worker thread cannot continue because no more parallel operations are allowed to happen.
/// </summary>
void Wait();

/// <summary>
/// Executes the next wait operation by blocking the thread, possibly advancing the wait cursor to a different wait time for the next time.
/// This function is called each time no message was received.
/// </summary>
void WaitNoMessage();

/// <summary>
/// Blocks the thread for a (most likely longer) while, when an error has occurred
/// </summary>
Expand Down

0 comments on commit 95efa82

Please sign in to comment.