Skip to content

Commit

Permalink
Merge pull request #16 from mizrael/maintenance
Browse files Browse the repository at this point in the history
updated initial State creation
  • Loading branch information
mizrael committed Jan 18, 2021
2 parents c57f489 + 3856eec commit 0a86f66
Show file tree
Hide file tree
Showing 52 changed files with 562 additions and 798 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

All notable changes to this project will be documented in this file.

## [2021-01-14](https://github.com/mizrael/OpenSleigh/pull/16)
### Added
- Saga States can now be reconstructed from typed messages

## [2021-01-14](https://github.com/mizrael/OpenSleigh/pull/15)
### Added
- possibility to configure exchange and queue names for each message
Expand Down
14 changes: 12 additions & 2 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ Dependency injection can be used to reference services from Sagas.
At this point all you have to do is register and configure the Saga:
```
services.AddOpenSleigh(cfg =>{
cfg.AddSaga<MyAwesomeSaga, MyAwesomeSagaState>()
.UseStateFactory(msg => new MyAwesomeSagaState(msg.CorrelationId))
cfg.AddSaga<MyAwesomeSaga, MyAwesomeSagaState>()
.UseRabbitMQTransport(rabbitConfig);
});
```
Expand Down Expand Up @@ -115,6 +114,17 @@ Each message has to expose an `Id` property and a `CorrelationId`. Those are use
If a Saga is sending a message to itself (loopback), or spawning child Sagas, the `CorrelationId` has to be kept unchanged on all the messages.
Also, make sure the `Id` and the `CorrelationId` don't match!

We also have to specify the starting message for a Saga when registering it on our DI container, by calling the `UseStateFactory()` method:

```
services.AddOpenSleigh(cfg =>{
cfg.AddSaga<MyAwesomeSaga, MyAwesomeSagaState>()
.UseStateFactory<StartMyAwesomeSaga>(msg => new MyAwesomeSagaState(msg.CorrelationId))
.UseRabbitMQTransport(rabbitConfig);
});
```
This call will tell OpenSleigh how it can build the initial State for the current Saga when loading it for the first time.

#### Handling messages

In order to handle more message types, it is necessary to add and implement the [`IHandleMessage<>`](https://github.com/mizrael/OpenSleigh/blob/develop/src/OpenSleigh.Core/IHandleMessage.cs) interface:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ static async Task Main(string[] args)
.UseInMemoryPersistence();
cfg.AddSaga<MySaga, MySagaState>()
.UseStateFactory(msg => new MySagaState(msg.CorrelationId))
.UseStateFactory<StartSaga>(msg => new MySagaState(msg.CorrelationId))
.UseInMemoryTransport();
});
});
Expand Down
6 changes: 3 additions & 3 deletions samples/Sample2/OpenSleigh.Samples.Sample2.Worker/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ static async Task Main(string[] args)
.UseMongoPersistence(mongoCfg);
cfg.AddSaga<SimpleSaga, SimpleSagaState>()
.UseStateFactory(msg => new SimpleSagaState(msg.CorrelationId))
.UseStateFactory<StartSimpleSaga>(msg => new SimpleSagaState(msg.CorrelationId))
.UseRabbitMQTransport();
cfg.AddSaga<ParentSaga, ParentSagaState>()
.UseStateFactory(msg => new ParentSagaState(msg.CorrelationId))
.UseStateFactory<StartParentSaga>(msg => new ParentSagaState(msg.CorrelationId))
.UseRabbitMQTransport();
cfg.AddSaga<ChildSaga, ChildSagaState>()
.UseStateFactory(msg => new ChildSagaState(msg.CorrelationId))
.UseStateFactory<StartChildSaga>(msg => new ChildSagaState(msg.CorrelationId))
.UseRabbitMQTransport();
});
});
Expand Down
29 changes: 16 additions & 13 deletions src/OpenSleigh.Core/DependencyInjection/BusConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ public BusConfigurator(IServiceCollection services, ISagaTypeResolver typeResolv
_typeResolver = typeResolver ?? throw new ArgumentNullException(nameof(typeResolver));
}

public ISagaConfigurator<TS, TD> AddSaga<TS, TD>()
where TS : Saga<TD> where TD : SagaState
public ISagaConfigurator<TS, TD> AddSaga<TS, TD>()
where TD : SagaState
where TS : Saga<TD>
{
bool hasMessages = false;

var sagaType = typeof(TS);
var sagaStateType = typeof(TD);

Services.AddScoped<TS>();


var messageHandlerType = typeof(IHandleMessage<>).GetGenericTypeDefinition();

var interfaces = sagaType.GetInterfaces();
Expand All @@ -54,16 +55,18 @@ public BusConfigurator(IServiceCollection services, ISagaTypeResolver typeResolv
_typeResolver.Register(messageType, (sagaType, sagaStateType));

Services.AddTransient(i, sagaType);
}

Services.AddSingleton(typeof(ISagaStateService<,>).MakeGenericType(sagaType, sagaStateType),
typeof(SagaStateService<,>).MakeGenericType(sagaType, sagaStateType));

Services.AddSingleton(typeof(ISagaRunner<,>).MakeGenericType(sagaType, sagaStateType),
typeof(SagaRunner<,>).MakeGenericType(sagaType, sagaStateType));
hasMessages = true;
}

Services.AddSingleton(typeof(ISagaFactory<,>).MakeGenericType(sagaType, sagaStateType),
typeof(DefaultSagaFactory<,>).MakeGenericType(sagaType, sagaStateType));
if (hasMessages)
{
Services.AddScoped<TS>();
Services.AddSingleton<ISagaStateService<TS, TD>, SagaStateService<TS, TD>>();
Services.AddSingleton<ISagaRunner<TS, TD>, SagaRunner<TS, TD>>();
Services.AddSingleton<ISagaFactory<TS, TD>, DefaultSagaFactory<TS, TD>>();
Services.AddSingleton<ISagaStateFactory<TD>, DefaultSagaStateFactory<TD>>();
}

return new SagaConfigurator<TS, TD>(Services);
}
Expand Down
13 changes: 9 additions & 4 deletions src/OpenSleigh.Core/DependencyInjection/ISagaConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ public interface ISagaConfigurator<TS, in TD>
where TD : SagaState
{
IServiceCollection Services { get; }
ISagaConfigurator<TS, TD> UseStateFactory(Func<IMessage, TD> stateFactory);
ISagaConfigurator<TS, TD> UseStateFactory<TM>(Func<TM, TD> stateFactory)
where TM : IMessage;
}

[ExcludeFromCodeCoverage]
Expand All @@ -26,12 +27,16 @@ public SagaConfigurator(IServiceCollection services)

public IServiceCollection Services { get; }

public ISagaConfigurator<TS, TD> UseStateFactory(Func<IMessage, TD> stateFactory)
public ISagaConfigurator<TS, TD> UseStateFactory<TM>(Func<TM, TD> stateFactory)
where TM : IMessage
{
var messageType = typeof(TM);
var stateType = typeof(TD);
var factory = new LambdaSagaStateFactory<TD>(stateFactory);

var descriptor = ServiceDescriptor.Singleton(typeof(ISagaStateFactory<>).MakeGenericType(stateType), factory);
var factoryInterfaceType = typeof(ISagaStateFactory<,>).MakeGenericType(messageType, stateType);
var factory = new LambdaSagaStateFactory<TM, TD>(stateFactory);

var descriptor = ServiceDescriptor.Singleton(factoryInterfaceType, factory);
this.Services.Replace(descriptor);

return this;
Expand Down
10 changes: 0 additions & 10 deletions src/OpenSleigh.Core/DependencyInjection/ISagaStateFactory.cs

This file was deleted.

18 changes: 0 additions & 18 deletions src/OpenSleigh.Core/DependencyInjection/LambdaSagaStateFactory.cs

This file was deleted.

2 changes: 2 additions & 0 deletions src/OpenSleigh.Core/Exceptions/StateCreationException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ namespace OpenSleigh.Core.Exceptions
[ExcludeFromCodeCoverage]
public class StateCreationException : Exception
{
public StateCreationException(string message) : base(message) { }

public StateCreationException(Type stateType, Guid correlationId)
: base($"unable to create State instance with type '{stateType.FullName}' for Saga '{correlationId}'")
{
Expand Down
47 changes: 47 additions & 0 deletions src/OpenSleigh.Core/ISagaStateFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System;
using OpenSleigh.Core.Exceptions;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Core
{
public interface ISagaStateFactory<in TM, out TD> : ISagaStateFactory<TD>
where TM : IMessage
where TD : SagaState
{
TD Create(TM message);
}

public interface ISagaStateFactory<out TD> where TD : SagaState
{
TD Create(IMessage message);
}

internal class DefaultSagaStateFactory<TD> : ISagaStateFactory<TD>
where TD : SagaState
{
private readonly IServiceProvider _serviceProvider;

public DefaultSagaStateFactory(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}

public TD Create(IMessage message)
{
if (message == null)
throw new ArgumentNullException(nameof(message));

var messageType = message.GetType();
var stateType = typeof(TD);

var factoryInterfaceType = typeof(ISagaStateFactory<,>).MakeGenericType(messageType, stateType);
var factory = _serviceProvider.GetService(factoryInterfaceType) as ISagaStateFactory<TD>;
if (null == factory)
throw new StateCreationException(
$"no state factory registered for message type '{message.GetType().FullName}'");

var state = factory.Create((dynamic) message);
return state;
}
}
}
26 changes: 26 additions & 0 deletions src/OpenSleigh.Core/LambdaSagaStateFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Core
{
internal class LambdaSagaStateFactory<TM, TD> : ISagaStateFactory<TM, TD>
where TM : IMessage
where TD : SagaState
{
private readonly Func<TM, TD> _factory;

public LambdaSagaStateFactory(Func<TM, TD> factory)
{
_factory = factory ?? throw new ArgumentNullException(nameof(factory));
}

public TD Create(TM message) => _factory(message);

public TD Create(IMessage message)
{
if (message is TM m)
return _factory(m);
return null;
}
}
}
14 changes: 8 additions & 6 deletions src/OpenSleigh.Core/SagaStateService.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using OpenSleigh.Core.DependencyInjection;
using OpenSleigh.Core.Exceptions;
using OpenSleigh.Core.Persistence;
using System;
Expand All @@ -25,18 +24,21 @@ public SagaStateService(ISagaStateFactory<TD> sagaStateFactory, ISagaStateReposi
CancellationToken cancellationToken = default) where TM : IMessage
{
var correlationId = messageContext.Message.CorrelationId;

var defaultState = _sagaStateFactory.Create(messageContext.Message);

var result = await _sagaStateRepository.LockAsync(correlationId, defaultState, cancellationToken);

var isStartMessage = (typeof(IStartedBy<TM>).IsAssignableFrom(typeof(TS)));
TD initialState = null;
if (isStartMessage)
initialState = _sagaStateFactory.Create(messageContext.Message);

var result = await _sagaStateRepository.LockAsync(correlationId, initialState, cancellationToken);

if (null != result.state)
return result;

// if state is null, we're probably starting a new saga.
// We have to check if the current message can
// actually start the specified saga or not
if (!typeof(IStartedBy<TM>).IsAssignableFrom(typeof(TS)))
if (!isStartMessage)
throw new MessageException($"Saga '{correlationId}' cannot be started by message '{typeof(TM).FullName}'");

throw new StateCreationException(typeof(TD), correlationId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using OpenSleigh.Core;
using OpenSleigh.Core.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Reflection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using OpenSleigh.Core.Persistence;
using System;
using System.Collections.Concurrent;
using System.Diagnostics.Contracts;
using System.Threading;
using System.Threading.Tasks;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,40 @@
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using OpenSleigh.Core;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Persistence.InMemory.Messaging
{
internal class InMemorySubscriber<TM> : ISubscriber
public class InMemorySubscriber<TM> : ISubscriber
where TM : IMessage
{
private readonly IMessageProcessor _messageProcessor;
private readonly ChannelReader<TM> _reader;

public InMemorySubscriber(IMessageProcessor messageProcessor, ChannelReader<TM> reader)
private readonly ILogger<InMemorySubscriber<TM>> _logger;

public InMemorySubscriber(IMessageProcessor messageProcessor,
ChannelReader<TM> reader,
ILogger<InMemorySubscriber<TM>> logger)
{
_messageProcessor = messageProcessor ?? throw new ArgumentNullException(nameof(messageProcessor));
_reader = reader ?? throw new ArgumentNullException(nameof(reader));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public async Task StartAsync(CancellationToken cancellationToken = default)
{
await foreach (var message in _reader.ReadAllAsync(cancellationToken))
{
await _messageProcessor.ProcessAsync(message, cancellationToken);
try
{
await _messageProcessor.ProcessAsync(message, cancellationToken);
}
catch (Exception e)
{
_logger.LogError(e, $"an exception has occurred while processing message '{message.Id}': {e.Message}");
}
}
}

Expand Down
1 change: 0 additions & 1 deletion src/OpenSleigh.Transport.RabbitMQ/QueueReferenceFactory.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Concurrent;
using Microsoft.Extensions.DependencyInjection;
using OpenSleigh.Core;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Transport.RabbitMQ
Expand Down
Loading

0 comments on commit 0a86f66

Please sign in to comment.