Skip to content

Commit

Permalink
Correlation id is now set on outgoing messages to make it possible to…
Browse files Browse the repository at this point in the history
… keep a

full audit trail
  • Loading branch information
andreasohlund committed Jan 3, 2012
1 parent bfd193a commit 5b1f3d3
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 48 deletions.
Expand Up @@ -54,6 +54,8 @@ public void SetUp()

subscriptionStorage = new FakeSubscriptionStorage();
FuncBuilder.Register<IMutateOutgoingTransportMessages>(()=>headerManager);



unicastBus = new UnicastBus
{
Expand All @@ -69,6 +71,10 @@ public void SetUp()
FailureManager = MockRepository.GenerateStub<IManageMessageFailures>()
};
bus = unicastBus;

FuncBuilder.Register<IMutateOutgoingTransportMessages>(() => new CorrelationIdMutator{Bus = bus});
FuncBuilder.Register<IBus>(() => bus);

ExtensionMethods.SetHeaderAction = headerManager.SetHeader;

}
Expand Down
14 changes: 13 additions & 1 deletion src/unicast/NServiceBus.Unicast.Tests/Helpers/FuncBuilder.cs
Expand Up @@ -25,7 +25,19 @@ public void Register(Type t,Func<object> func)

public object Build(Type typeToBuild)
{
return funcs.First(f => f.Item1 == typeToBuild).Item2();
var obj = funcs.First(f => f.Item1 == typeToBuild).Item2();

//enable property injection
obj.GetType().GetProperties()
.Select(p=>p.PropertyType)
.Intersect(funcs.Select(f=>f.Item1)).ToList()
.ForEach(propertyTypeToSet=>
{
obj.GetType().GetProperties().First(p=>p.PropertyType == propertyTypeToSet)
.SetValue(obj, Build(propertyTypeToSet), null);
});

return obj;
}

public IBuilder CreateChildBuilder()
Expand Down
@@ -1,46 +1,48 @@
namespace NServiceBus.Unicast.Tests.Helpers
namespace NServiceBus.Unicast.Tests.Helpers
{
using System;
using System.Collections.Generic;
using NServiceBus.MessageMutator;
using NServiceBus.Unicast.Transport;

public class MessageHeaderManager : IMutateOutgoingTransportMessages
{
void IMutateOutgoingTransportMessages.MutateOutgoing(object[] messages, TransportMessage transportMessage)
{
foreach (var staticHeader in staticHeaders.Keys)
{
transportMessage.Headers.Add(staticHeader,staticHeaders[staticHeader]);
}

if (messageHeaders != null)
if (messageHeaders.ContainsKey(messages[0]))
foreach (var key in messageHeaders[messages[0]].Keys)
transportMessage.Headers.Add(key, messageHeaders[messages[0]][key]);

messageHeaders.Clear();
}


public void SetHeader(object message, string key, string value)
{
if (message == ExtensionMethods.CurrentMessageBeingHandled)
throw new InvalidOperationException("Cannot change headers on the message being processed.");

if (messageHeaders == null)
messageHeaders = new Dictionary<object, IDictionary<string, string>>();

if (!messageHeaders.ContainsKey(message))
messageHeaders.Add(message, new Dictionary<string, string>());

if (!messageHeaders[message].ContainsKey(key))
messageHeaders[message].Add(key, value);
else
messageHeaders[message][key] = value;
}

public static IDictionary<string, string> staticHeaders = new Dictionary<string, string>();
static IDictionary<object, IDictionary<string, string>> messageHeaders;
}
using MessageMutator;
using Transport;

public class MessageHeaderManager : IMutateOutgoingTransportMessages
{
void IMutateOutgoingTransportMessages.MutateOutgoing(object[] messages, TransportMessage transportMessage)
{
foreach (var staticHeader in staticHeaders.Keys)
{
transportMessage.Headers.Add(staticHeader, staticHeaders[staticHeader]);
}

if (messageHeaders == null)
return;

if (messageHeaders.ContainsKey(messages[0]))
foreach (var key in messageHeaders[messages[0]].Keys)
transportMessage.Headers.Add(key, messageHeaders[messages[0]][key]);

messageHeaders.Clear();
}


public void SetHeader(object message, string key, string value)
{
if (message == ExtensionMethods.CurrentMessageBeingHandled)
throw new InvalidOperationException("Cannot change headers on the message being processed.");

if (messageHeaders == null)
messageHeaders = new Dictionary<object, IDictionary<string, string>>();

if (!messageHeaders.ContainsKey(message))
messageHeaders.Add(message, new Dictionary<string, string>());

if (!messageHeaders[message].ContainsKey(key))
messageHeaders[message].Add(key, value);
else
messageHeaders[message][key] = value;
}

public static IDictionary<string, string> staticHeaders = new Dictionary<string, string>();
static IDictionary<object, IDictionary<string, string>> messageHeaders;
}
}
33 changes: 32 additions & 1 deletion src/unicast/NServiceBus.Unicast.Tests/Receiving.cs
Expand Up @@ -2,7 +2,9 @@
{
using Contexts;
using NUnit.Framework;

using Rhino.Mocks;
using Transport;

[TestFixture]
public class When_receiving_a_regular_message : using_the_unicastbus
{
Expand All @@ -25,6 +27,35 @@ public void Should_invoke_the_registered_message_handlers()

}


[TestFixture]
public class When_sending_messages_from_a_messagehandler : using_the_unicastbus
{
[Test]
public void Should_set_the_correlation_id_to_the_current_message()
{
var receivedMessage = Helpers.Helpers.Serialize(new EventMessage());

RegisterMessageType<EventMessage>();
RegisterMessageType<CommandMessage>();
RegisterMessageHandlerType<HandlerThatSendsAMessage>();

ReceiveMessage(receivedMessage);


messageSender.AssertWasCalled(x => x.Send(Arg<TransportMessage>.Matches(m => m.CorrelationId == receivedMessage.IdForCorrelation), Arg<Address>.Is.Anything));
}
}

class HandlerThatSendsAMessage : IHandleMessages<EventMessage>
{
public IBus Bus { get; set; }

public void Handle(EventMessage message)
{
Bus.Send(new CommandMessage());
}
}
class Handler1:IHandleMessages<EventMessage>
{
public static bool Called;
Expand Down
34 changes: 34 additions & 0 deletions src/unicast/NServiceBus.Unicast/CorrelationIdMutator.cs
@@ -0,0 +1,34 @@
namespace NServiceBus.Unicast
{
using Config;
using MessageMutator;
using Transport;

/// <summary>
/// Mutator to set the correlation id
/// </summary>
public class CorrelationIdMutator : IMutateOutgoingTransportMessages, INeedInitialization
{
/// <summary>
/// The bus is needed to get access to the current message id
/// </summary>
public IBus Bus { get; set; }

/// <summary>
/// If no correlation id is set and the message is sent from a messagehandler the current message id
/// will be used as correlation id to make auditing possible
/// </summary>
/// <param name="messages"></param>
/// <param name="transportMessage"></param>
public void MutateOutgoing(object[] messages, TransportMessage transportMessage)
{
if (transportMessage.CorrelationId == null && Bus.CurrentMessageContext != null)
transportMessage.CorrelationId = Bus.CurrentMessageContext.Id;
}

public void Init()
{
Configure.Instance.Configurer.ConfigureComponent<CorrelationIdMutator>(DependencyLifecycle.InstancePerCall);
}
}
}
@@ -1,9 +1,7 @@
namespace NServiceBus.Unicast
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Config;
using MessageMutator;
using Transport;
Expand Down Expand Up @@ -43,8 +41,7 @@ static string SerializeEnclosedMessageTypes(IEnumerable<object> messages)

public void Init()
{
Configure.Instance.Configurer.ConfigureComponent<EnclosedMessageTypesMutator>(
DependencyLifecycle.InstancePerCall);
Configure.Instance.Configurer.ConfigureComponent<EnclosedMessageTypesMutator>(DependencyLifecycle.InstancePerCall);
}
}
}
1 change: 1 addition & 0 deletions src/unicast/NServiceBus.Unicast/NServiceBus.Unicast.csproj
Expand Up @@ -111,6 +111,7 @@
<ItemGroup>
<Compile Include="BusAsyncResult.cs" />
<Compile Include="Callback.cs" />
<Compile Include="CorrelationIdMutator.cs" />
<Compile Include="EnclosedMessageTypesMutator.cs" />
<Compile Include="IUnicastBus.cs" />
<Compile Include="IWantToRunWhenTheBusStarts.cs" />
Expand Down

0 comments on commit 5b1f3d3

Please sign in to comment.