Skip to content

Commit

Permalink
Merge pull request #73 from ssboisen/issue_72-errortracker_timeout
Browse files Browse the repository at this point in the history
Added message timeout tracking to the ErrorTracker
  • Loading branch information
mookid8000 committed Mar 29, 2012
2 parents 833f927 + bc4114a commit a9f3d48
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/Rebus.Tests/Rebus.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@
<Compile Include="TestStuff.cs" />
<Compile Include="Transports\Rabbit\TestRabbitMqMessageQueue.cs" />
<Compile Include="Unit\TestDispatcher.cs" />
<Compile Include="Unit\TestErrorTracker.cs" />
<Compile Include="Unit\TestRebusBus.cs" />
<Compile Include="Unit\TestWorker_ErrorHandling.cs" />
<Compile Include="Unit\TestWorker_PolymorphicDispatch.cs" />
Expand Down
97 changes: 97 additions & 0 deletions src/Rebus.Tests/Unit/TestErrorTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using NUnit.Framework;
using Rebus.Bus;

namespace Rebus.Tests.Unit
{
[TestFixture]
public class TestErrorTracker : FixtureBase
{
ErrorTracker errorTracker;
private TimeSpan _timeoutSpan;

protected override void DoSetUp()
{
TimeMachine.Reset();
_timeoutSpan = TimeSpan.FromDays(1);
errorTracker = new ErrorTracker(_timeoutSpan, TimeSpan.FromHours(10));
}

[Test]
public void ErrorTrackerRemovesAMessageWhichTimedOut()
{
//Arrange
const string messageId = "testId";
var fakeTime = Time.Now();
TimeMachine.FixTo(fakeTime);

//Act
errorTracker.TrackDeliveryFail(messageId, new Exception());
errorTracker.TrackDeliveryFail(messageId, new Exception());

TimeMachine.FixTo(fakeTime.Add(_timeoutSpan));

errorTracker.CheckForMessageTimeout();

var errorText = errorTracker.GetErrorText(messageId);

//Assert
Assert.That(errorText, Is.Empty);
}


[Test]
public void ErrorTrackerRemovesMultipleMessagesWhichTimedOut()
{
//Arrange
const string messageId = "testId";
const string messageId2 = "testId2";
var fakeTime = Time.Now();
TimeMachine.FixTo(fakeTime);

//Act
errorTracker.TrackDeliveryFail(messageId, new Exception());
TimeMachine.FixTo(fakeTime.Add(TimeSpan.FromMinutes(10)));
errorTracker.TrackDeliveryFail(messageId2, new Exception());
TimeMachine.FixTo(fakeTime.AddDays(1).AddMinutes(10));

errorTracker.CheckForMessageTimeout();

var errorText1 = errorTracker.GetErrorText(messageId);
var errorText2 = errorTracker.GetErrorText(messageId2);

//Assert
Assert.That(errorText1, Is.Empty);
Assert.That(errorText2, Is.Empty);
}

[Test]
public void ErrorTrackerDoesntRemoveMessageWhichHasntTimedOut()
{
//Arrange
const string messageId = "testId";
const string messageId2 = "testId2";
var fakeTime = Time.Now();
TimeMachine.FixTo(fakeTime);

//Act
errorTracker.TrackDeliveryFail(messageId, new Exception());
TimeMachine.FixTo(fakeTime.Add(TimeSpan.FromMinutes(10)));
errorTracker.TrackDeliveryFail(messageId2, new Exception());
TimeMachine.FixTo(fakeTime.AddDays(1));

errorTracker.CheckForMessageTimeout();

var errorText1 = errorTracker.GetErrorText(messageId);
var errorText2 = errorTracker.GetErrorText(messageId2);

//Assert
Assert.That(errorText1, Is.Empty);
Assert.That(errorText2, Is.Not.Empty);
}
}
}
56 changes: 56 additions & 0 deletions src/Rebus/Bus/ErrorTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Linq;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using Rebus.Logging;

namespace Rebus.Bus
Expand All @@ -13,12 +14,63 @@ public class ErrorTracker
{
static ILog log;


static ErrorTracker()
{
RebusLoggerFactory.Changed += f => log = f.GetCurrentClassLogger();

}

/// <summary>
/// Default constructor which sets the timeoutSpan to 1 day
/// </summary>
public ErrorTracker()
{
StartTimeoutTracker(TimeSpan.FromDays(1), TimeSpan.FromMinutes(5));
}

private void StartTimeoutTracker(TimeSpan timeoutSpan, TimeSpan timeoutCheckInterval)
{
this.timeoutSpan = timeoutSpan;
new Timer(TimeoutTracker, null, TimeSpan.Zero, timeoutCheckInterval);
}

/// <summary>
/// Constructor
/// </summary>
/// <param name="timeoutSpan">How long messages will be supervised by the ErrorTracker</param>
public ErrorTracker(TimeSpan timeoutSpan, TimeSpan timeoutCheckInterval)
{
StartTimeoutTracker(timeoutSpan, timeoutCheckInterval);
}

readonly ConcurrentDictionary<string, TrackedMessage> trackedMessages = new ConcurrentDictionary<string, TrackedMessage>();
readonly ConcurrentQueue<Timed<string>> timedoutMessages = new ConcurrentQueue<Timed<string>>();
TimeSpan timeoutSpan;


private void TimeoutTracker(object state)
{
CheckForMessageTimeout();
}

internal void CheckForMessageTimeout()
{
Timed<string> id;
bool couldRetrieve = timedoutMessages.TryPeek(out id);

while (couldRetrieve && id.Time <= Time.Now())
{
if (timedoutMessages.TryDequeue(out id))
{
TrackedMessage trackedMessage;
if (trackedMessages.TryRemove(id.Value, out trackedMessage))
log.Error("Handling message {0} has failed due to timeout at {1}", id.Value, Time.Now());
}

couldRetrieve = timedoutMessages.TryPeek(out id);
}
}

/// <summary>
/// Increments the fail count for this particular message, and starts tracking
Expand Down Expand Up @@ -72,6 +124,10 @@ TrackedMessage GetOrAdd(string id)
{
throw new ArgumentException(string.Format("Id of message to track is null! Cannot track message errors with a null id"));
}

if (!trackedMessages.ContainsKey(id))
timedoutMessages.Enqueue(id.At(Time.Now().Add(timeoutSpan)));

return trackedMessages.GetOrAdd(id, i => new TrackedMessage(id));
}

Expand Down

0 comments on commit a9f3d48

Please sign in to comment.