Skip to content

Commit

Permalink
allow for correlation between header and saga data
Browse files Browse the repository at this point in the history
  • Loading branch information
mookid8000 committed Feb 3, 2017
1 parent b03cc6e commit 77ff0ee
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,7 @@
## 4.0.0

* Hide some internal types that were never meant to be part of the API
* Add correlation configuration API to make it easy to correlated head of incoming message with saga data

---

Expand Down
1 change: 1 addition & 0 deletions Rebus.Tests/Rebus.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
<Compile Include="Persistence\Filesystem\FilesystemBasicStoreAndRetrieveOperations.cs" />
<Compile Include="Persistence\InMem\TestSagaCorrelationInMem.cs" />
<Compile Include="Routing\TestTransportMessageForwarding.cs" />
<Compile Include="Sagas\TestHeaderCorrelation.cs" />
<Compile Include="Synchronous\TestAsyncHelpers.cs" />
<Compile Include="Synchronous\TestSyncBus.cs" />
<Compile Include="Synchronous\TestSyncBusApiParity.cs" />
Expand Down
87 changes: 87 additions & 0 deletions Rebus.Tests/Sagas/TestHeaderCorrelation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using NUnit.Framework;
using Rebus.Activation;
using Rebus.Bus;
using Rebus.Config;
using Rebus.Messages;
using Rebus.Sagas;
using Rebus.Tests.Contracts;
using Rebus.Tests.Contracts.Utilities;
using Rebus.Transport.InMem;

namespace Rebus.Tests.Sagas
{
[TestFixture]
public class TestHeaderCorrelation : FixtureBase
{
BuiltinHandlerActivator _activator;
IBus _bus;

protected override void SetUp()
{
_activator = new BuiltinHandlerActivator();

Using(_activator);

Configure.With(_activator)
.Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "header-correlation"))
.Start();

_bus = _activator.Bus;
}

[Test]
public async Task CanCorrelateWithHeadersOfIncomingMessages()
{
var sagaDataCounters = new ConcurrentDictionary<Guid, int>();
var sharedCounter = new SharedCounter(5);

_activator.Register(() => new MySaga(sharedCounter, sagaDataCounters));

var sameMessage = new MyMessage();
var headers1 = new Dictionary<string, string> { { "custom-correlation-id", "saga1" } };
var headers2 = new Dictionary<string, string> { { "custom-correlation-id", "saga2" } };

await _bus.SendLocal(sameMessage, headers1);
await _bus.SendLocal(sameMessage, headers2);
await _bus.SendLocal(sameMessage, headers2);
await _bus.SendLocal(sameMessage, headers1);
await _bus.SendLocal(sameMessage, headers1);

sharedCounter.WaitForResetEvent(timeoutSeconds:2);
}

class MyMessage { }

class MySaga : Saga<MySagaData>, IAmInitiatedBy<MyMessage>
{
readonly SharedCounter _sharedCounter;
readonly ConcurrentDictionary<Guid, int> _sagaDataCounters;

public MySaga(SharedCounter sharedCounter, ConcurrentDictionary<Guid, int> sagaDataCounters)
{
_sharedCounter = sharedCounter;
_sagaDataCounters = sagaDataCounters;
}

protected override void CorrelateMessages(ICorrelationConfig<MySagaData> config)
{
config.CorrelateHeader<MyMessage>("custom-correlation-id", d => d.CorrelationId);
}

public async Task Handle(MyMessage message)
{
_sagaDataCounters.AddOrUpdate(Data.Id, 1, (_, count) => count + 1);
_sharedCounter.Decrement();
}
}

class MySagaData : SagaData
{
public string CorrelationId { get; set; }
}
}
}
2 changes: 1 addition & 1 deletion Rebus/Pipeline/MessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Rebus.Pipeline
/// </summary>
public class MessageContext : IMessageContext
{
MessageContext(ITransactionContext transactionContext)
internal MessageContext(ITransactionContext transactionContext)
{
if (transactionContext == null) throw new ArgumentNullException(nameof(transactionContext));
TransactionContext = transactionContext;
Expand Down
5 changes: 3 additions & 2 deletions Rebus/Sagas/CorrelationProperty.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Linq;
using Rebus.Pipeline;

namespace Rebus.Sagas
{
Expand Down Expand Up @@ -30,7 +31,7 @@ class CorrelationProperty : ISagaCorrelationProperty
/// <param name="sagaDataType">Specifies the type of saga data that this property can correlate to</param>
/// <param name="propertyName">Specifies that property name on the saga data that this correlation addresses</param>
/// <param name="sagaType">Specifies the saga type (i.e. the handler type) that contains the logic of the saga</param>
public CorrelationProperty(Type messageType, Func<object, object> valueFromMessage, Type sagaDataType, string propertyName, Type sagaType)
public CorrelationProperty(Type messageType, Func<IMessageContext, object, object> valueFromMessage, Type sagaDataType, string propertyName, Type sagaType)
{
if (messageType == null) throw new ArgumentNullException(nameof(messageType));
if (sagaDataType == null) throw new ArgumentNullException(nameof(sagaDataType));
Expand Down Expand Up @@ -75,7 +76,7 @@ void Validate()
/// <summary>
/// The function that will be called with the message instance in order to extract a value that should be used for correlation
/// </summary>
public Func<object, object> ValueFromMessage { get; private set; }
public Func<IMessageContext, object, object> ValueFromMessage { get; private set; }

/// <summary>
/// Gets the type of the saga's saga data
Expand Down
13 changes: 13 additions & 0 deletions Rebus/Sagas/ICorrelationConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,18 @@ public interface ICorrelationConfig<TSagaData>
/// <param name="messageValueExtractorFunction">Configures a function to extract a value from the message. Since this is just a function, it may contain logic that e.g. concatenates fields, calls other functions, etc.</param>
/// <param name="sagaDataValueExpression">Configures an expression, which will be used when querying the chosen <see cref="ISagaStorage"/> - since this is an expression, it must point to a simple property of the relevant <typeparamref name="TSagaData"/>.</param>
void Correlate<TMessage>(Func<TMessage, object> messageValueExtractorFunction, Expression<Func<TSagaData, object>> sagaDataValueExpression);

/// <summary>
/// Correlates an incoming message of type <typeparamref name="TMessage"/> using the header with the given <paramref name="headerKey"/>. The value will be used when looking up a saga data instance using the specified <paramref name="sagaDataValueExpression"/>.
/// You could for example do something like this:
/// <code>
/// config.CorrelateHeader&lt;TradeApproved&gt;("trade-corr-id", d => d.TradeId);
/// </code>
/// to look up a saga instance by the "TradeId" field, querying by the value of the "trade-corr-id" header of the incoming "TradeApproved" message.
/// </summary>
/// <typeparam name="TMessage">Specifies the message type to configure a correlation for</typeparam>
/// <param name="headerKey">Configures a header key which will be extracted from the incoming message</param>
/// <param name="sagaDataValueExpression">Configures an expression, which will be used when querying the chosen <see cref="ISagaStorage"/> - since this is an expression, it must point to a simple property of the relevant <typeparamref name="TSagaData"/>.</param>
void CorrelateHeader<TMessage>(string headerKey, Expression<Func<TSagaData, object>> sagaDataValueExpression);
}
}
14 changes: 8 additions & 6 deletions Rebus/Sagas/LoadSagaDataStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Rebus.Messages;
using Rebus.Pipeline;
using Rebus.Pipeline.Receive;
using Rebus.Transport;

namespace Rebus.Sagas
{
Expand Down Expand Up @@ -71,6 +72,7 @@ public async Task Process(IncomingStepContext context, Func<Task> next)

var message = context.Load<Message>();
var label = message.GetMessageLabel();
var transactionContext = context.Load<ITransactionContext>();

var body = message.Body;

Expand All @@ -81,7 +83,7 @@ public async Task Process(IncomingStepContext context, Func<Task> next)
// and then we process them
foreach (var sagaInvoker in handlerInvokersForSagas)
{
await TryMountSagaDataOnInvoker(sagaInvoker, body, label, loadedSagaData, newlyCreatedSagaData);
await TryMountSagaDataOnInvoker(sagaInvoker, body, label, loadedSagaData, newlyCreatedSagaData, transactionContext);
}

// invoke the rest of the pipeline (most likely also dispatching the incoming message to the now-ready saga handlers)
Expand All @@ -108,7 +110,7 @@ public async Task Process(IncomingStepContext context, Func<Task> next)
}
}

async Task TryMountSagaDataOnInvoker(HandlerInvoker sagaInvoker, object body, string label, List<RelevantSagaInfo> loadedSagaData, List<RelevantSagaInfo> newlyCreatedSagaData)
async Task TryMountSagaDataOnInvoker(HandlerInvoker sagaInvoker, object body, string label, List<RelevantSagaInfo> loadedSagaData, List<RelevantSagaInfo> newlyCreatedSagaData, ITransactionContext transactionContext)
{
var foundExistingSagaData = false;

Expand All @@ -117,7 +119,7 @@ async Task TryMountSagaDataOnInvoker(HandlerInvoker sagaInvoker, object body, st

foreach (var correlationProperty in correlationPropertiesRelevantForMessage)
{
var valueFromMessage = correlationProperty.ValueFromMessage(body);
var valueFromMessage = correlationProperty.ValueFromMessage(new MessageContext(transactionContext), body);
var sagaData = await _sagaStorage.Find(sagaInvoker.Saga.GetSagaDataType(), correlationProperty.PropertyName, valueFromMessage);

if (sagaData == null) continue;
Expand All @@ -142,7 +144,7 @@ async Task TryMountSagaDataOnInvoker(HandlerInvoker sagaInvoker, object body, st
// if there's exacly one correlation property that points to a property on the saga data, we can set it
if (correlationPropertiesRelevantForMessage.Length == 1)
{
TrySetCorrelationPropertyValue(newSagaData, correlationPropertiesRelevantForMessage[0], body);
TrySetCorrelationPropertyValue(newSagaData, correlationPropertiesRelevantForMessage[0], body, transactionContext);
}

sagaInvoker.SetSagaData(newSagaData);
Expand All @@ -159,7 +161,7 @@ async Task TryMountSagaDataOnInvoker(HandlerInvoker sagaInvoker, object body, st
}
}

static void TrySetCorrelationPropertyValue(ISagaData newSagaData, CorrelationProperty correlationProperty, object body)
static void TrySetCorrelationPropertyValue(ISagaData newSagaData, CorrelationProperty correlationProperty, object body, ITransactionContext transactionContext)
{
try
{
Expand All @@ -169,7 +171,7 @@ static void TrySetCorrelationPropertyValue(ISagaData newSagaData, CorrelationPro

if (correlationPropertyInfo == null) return;

var valueFromMessage = correlationProperty.ValueFromMessage(body);
var valueFromMessage = correlationProperty.ValueFromMessage(new MessageContext(transactionContext), body);

correlationPropertyInfo.SetValue(newSagaData, valueFromMessage);
}
Expand Down
46 changes: 35 additions & 11 deletions Rebus/Sagas/Saga.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Reflection;
using System.Threading.Tasks;
using Rebus.Exceptions;
using Rebus.Pipeline;
using Rebus.Reflection;
#pragma warning disable 1998

Expand Down Expand Up @@ -36,7 +37,7 @@ internal bool UserHasOverriddenConflictResolutionMethod()
}
return !(typeDeclaringTheConflictResolutionMethod.IsGenericType
&& typeDeclaringTheConflictResolutionMethod.GetGenericTypeDefinition() == typeof (Saga<>));
&& typeDeclaringTheConflictResolutionMethod.GetGenericTypeDefinition() == typeof(Saga<>));
});
}

Expand All @@ -48,7 +49,7 @@ internal IEnumerable<CorrelationProperty> GetCorrelationProperties()
internal abstract IEnumerable<CorrelationProperty> GenerateCorrelationProperties();

internal abstract Type GetSagaDataType();

internal abstract ISagaData CreateNewSagaData();

internal bool WasMarkedAsComplete { get; set; }
Expand Down Expand Up @@ -106,15 +107,15 @@ protected virtual void MarkAsUnchanged()
internal sealed override IEnumerable<CorrelationProperty> GenerateCorrelationProperties()
{
var configuration = new CorrelationConfiguration(GetType());

CorrelateMessages(configuration);

return configuration.GetCorrelationProperties();
}

internal sealed override async Task InvokeConflictResolution(ISagaData otherSagaData)
{
await ResolveConflict((TSagaData) otherSagaData);
await ResolveConflict((TSagaData)otherSagaData);
}

/// <summary>
Expand All @@ -135,20 +136,43 @@ public CorrelationConfiguration(Type sagaType)
}

readonly List<CorrelationProperty> _correlationProperties = new List<CorrelationProperty>();

public void Correlate<TMessage>(Func<TMessage, object> messageValueExtractorFunction, Expression<Func<TSagaData, object>> sagaDataValueExpression)
{
var propertyName = Reflect.Path(sagaDataValueExpression);
Func<object, object> neutralMessageValueExtractor = message =>

Func<IMessageContext, object, object> neutralMessageValueExtractor = (context, message) =>
{
try
{
return messageValueExtractorFunction((TMessage) message);
return messageValueExtractorFunction((TMessage)message);
}
catch (Exception exception)
{
throw new RebusApplicationException(exception, $"Could not extract correlation value from message {typeof(TMessage)}");
}
};

_correlationProperties.Add(new CorrelationProperty(typeof(TMessage), neutralMessageValueExtractor, typeof(TSagaData), propertyName, _sagaType));
}

public void CorrelateHeader<TMessage>(string headerKey, Expression<Func<TSagaData, object>> sagaDataValueExpression)
{
var propertyName = Reflect.Path(sagaDataValueExpression);

Func<IMessageContext, object, object> neutralMessageValueExtractor = (context, message) =>
{
try
{
string headerValue;
return context.Headers.TryGetValue(headerKey, out headerValue)
? headerValue
: null;
}
catch (Exception exception)
{
throw new RebusApplicationException(exception, $"Could not extract correlation value from message {typeof (TMessage)}");
throw new RebusApplicationException(exception, $"Could not extract correlation value from message {typeof(TMessage)}");
}
};

Expand All @@ -163,7 +187,7 @@ public IEnumerable<CorrelationProperty> GetCorrelationProperties()

internal override Type GetSagaDataType()
{
return typeof (TSagaData);
return typeof(TSagaData);
}

internal override ISagaData CreateNewSagaData()
Expand Down

0 comments on commit 77ff0ee

Please sign in to comment.