Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updated initial State creation #16

Merged
merged 9 commits into from
Jan 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

All notable changes to this project will be documented in this file.

## [2021-01-14](https://github.com/mizrael/OpenSleigh/pull/16)
### Added
- Saga States can now be reconstructed from typed messages

## [2021-01-14](https://github.com/mizrael/OpenSleigh/pull/15)
### Added
- possibility to configure exchange and queue names for each message
Expand Down
14 changes: 12 additions & 2 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ Dependency injection can be used to reference services from Sagas.
At this point all you have to do is register and configure the Saga:
```
services.AddOpenSleigh(cfg =>{
cfg.AddSaga<MyAwesomeSaga, MyAwesomeSagaState>()
.UseStateFactory(msg => new MyAwesomeSagaState(msg.CorrelationId))
cfg.AddSaga<MyAwesomeSaga, MyAwesomeSagaState>()
.UseRabbitMQTransport(rabbitConfig);
});
```
Expand Down Expand Up @@ -115,6 +114,17 @@ Each message has to expose an `Id` property and a `CorrelationId`. Those are use
If a Saga is sending a message to itself (loopback), or spawning child Sagas, the `CorrelationId` has to be kept unchanged on all the messages.
Also, make sure the `Id` and the `CorrelationId` don't match!

We also have to specify the starting message for a Saga when registering it on our DI container, by calling the `UseStateFactory()` method:

```
services.AddOpenSleigh(cfg =>{
cfg.AddSaga<MyAwesomeSaga, MyAwesomeSagaState>()
.UseStateFactory<StartMyAwesomeSaga>(msg => new MyAwesomeSagaState(msg.CorrelationId))
.UseRabbitMQTransport(rabbitConfig);
});
```
This call will tell OpenSleigh how it can build the initial State for the current Saga when loading it for the first time.

#### Handling messages

In order to handle more message types, it is necessary to add and implement the [`IHandleMessage<>`](https://github.com/mizrael/OpenSleigh/blob/develop/src/OpenSleigh.Core/IHandleMessage.cs) interface:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ static async Task Main(string[] args)
.UseInMemoryPersistence();

cfg.AddSaga<MySaga, MySagaState>()
.UseStateFactory(msg => new MySagaState(msg.CorrelationId))
.UseStateFactory<StartSaga>(msg => new MySagaState(msg.CorrelationId))
.UseInMemoryTransport();
});
});
Expand Down
6 changes: 3 additions & 3 deletions samples/Sample2/OpenSleigh.Samples.Sample2.Worker/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ static async Task Main(string[] args)
.UseMongoPersistence(mongoCfg);

cfg.AddSaga<SimpleSaga, SimpleSagaState>()
.UseStateFactory(msg => new SimpleSagaState(msg.CorrelationId))
.UseStateFactory<StartSimpleSaga>(msg => new SimpleSagaState(msg.CorrelationId))
.UseRabbitMQTransport();

cfg.AddSaga<ParentSaga, ParentSagaState>()
.UseStateFactory(msg => new ParentSagaState(msg.CorrelationId))
.UseStateFactory<StartParentSaga>(msg => new ParentSagaState(msg.CorrelationId))
.UseRabbitMQTransport();

cfg.AddSaga<ChildSaga, ChildSagaState>()
.UseStateFactory(msg => new ChildSagaState(msg.CorrelationId))
.UseStateFactory<StartChildSaga>(msg => new ChildSagaState(msg.CorrelationId))
.UseRabbitMQTransport();
});
});
Expand Down
29 changes: 16 additions & 13 deletions src/OpenSleigh.Core/DependencyInjection/BusConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ public BusConfigurator(IServiceCollection services, ISagaTypeResolver typeResolv
_typeResolver = typeResolver ?? throw new ArgumentNullException(nameof(typeResolver));
}

public ISagaConfigurator<TS, TD> AddSaga<TS, TD>()
where TS : Saga<TD> where TD : SagaState
public ISagaConfigurator<TS, TD> AddSaga<TS, TD>()
where TD : SagaState
where TS : Saga<TD>
{
bool hasMessages = false;

var sagaType = typeof(TS);
var sagaStateType = typeof(TD);

Services.AddScoped<TS>();


var messageHandlerType = typeof(IHandleMessage<>).GetGenericTypeDefinition();

var interfaces = sagaType.GetInterfaces();
Expand All @@ -54,16 +55,18 @@ public BusConfigurator(IServiceCollection services, ISagaTypeResolver typeResolv
_typeResolver.Register(messageType, (sagaType, sagaStateType));

Services.AddTransient(i, sagaType);
}

Services.AddSingleton(typeof(ISagaStateService<,>).MakeGenericType(sagaType, sagaStateType),
typeof(SagaStateService<,>).MakeGenericType(sagaType, sagaStateType));

Services.AddSingleton(typeof(ISagaRunner<,>).MakeGenericType(sagaType, sagaStateType),
typeof(SagaRunner<,>).MakeGenericType(sagaType, sagaStateType));
hasMessages = true;
}

Services.AddSingleton(typeof(ISagaFactory<,>).MakeGenericType(sagaType, sagaStateType),
typeof(DefaultSagaFactory<,>).MakeGenericType(sagaType, sagaStateType));
if (hasMessages)
{
Services.AddScoped<TS>();
Services.AddSingleton<ISagaStateService<TS, TD>, SagaStateService<TS, TD>>();
Services.AddSingleton<ISagaRunner<TS, TD>, SagaRunner<TS, TD>>();
Services.AddSingleton<ISagaFactory<TS, TD>, DefaultSagaFactory<TS, TD>>();
Services.AddSingleton<ISagaStateFactory<TD>, DefaultSagaStateFactory<TD>>();
}

return new SagaConfigurator<TS, TD>(Services);
}
Expand Down
13 changes: 9 additions & 4 deletions src/OpenSleigh.Core/DependencyInjection/ISagaConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ public interface ISagaConfigurator<TS, in TD>
where TD : SagaState
{
IServiceCollection Services { get; }
ISagaConfigurator<TS, TD> UseStateFactory(Func<IMessage, TD> stateFactory);
ISagaConfigurator<TS, TD> UseStateFactory<TM>(Func<TM, TD> stateFactory)
where TM : IMessage;
}

[ExcludeFromCodeCoverage]
Expand All @@ -26,12 +27,16 @@ public SagaConfigurator(IServiceCollection services)

public IServiceCollection Services { get; }

public ISagaConfigurator<TS, TD> UseStateFactory(Func<IMessage, TD> stateFactory)
public ISagaConfigurator<TS, TD> UseStateFactory<TM>(Func<TM, TD> stateFactory)
where TM : IMessage
{
var messageType = typeof(TM);
var stateType = typeof(TD);
var factory = new LambdaSagaStateFactory<TD>(stateFactory);

var descriptor = ServiceDescriptor.Singleton(typeof(ISagaStateFactory<>).MakeGenericType(stateType), factory);
var factoryInterfaceType = typeof(ISagaStateFactory<,>).MakeGenericType(messageType, stateType);
var factory = new LambdaSagaStateFactory<TM, TD>(stateFactory);

var descriptor = ServiceDescriptor.Singleton(factoryInterfaceType, factory);
this.Services.Replace(descriptor);

return this;
Expand Down
10 changes: 0 additions & 10 deletions src/OpenSleigh.Core/DependencyInjection/ISagaStateFactory.cs

This file was deleted.

18 changes: 0 additions & 18 deletions src/OpenSleigh.Core/DependencyInjection/LambdaSagaStateFactory.cs

This file was deleted.

2 changes: 2 additions & 0 deletions src/OpenSleigh.Core/Exceptions/StateCreationException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ namespace OpenSleigh.Core.Exceptions
[ExcludeFromCodeCoverage]
public class StateCreationException : Exception
{
public StateCreationException(string message) : base(message) { }

public StateCreationException(Type stateType, Guid correlationId)
: base($"unable to create State instance with type '{stateType.FullName}' for Saga '{correlationId}'")
{
Expand Down
47 changes: 47 additions & 0 deletions src/OpenSleigh.Core/ISagaStateFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System;
using OpenSleigh.Core.Exceptions;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Core
{
public interface ISagaStateFactory<in TM, out TD> : ISagaStateFactory<TD>
where TM : IMessage
where TD : SagaState
{
TD Create(TM message);
}

public interface ISagaStateFactory<out TD> where TD : SagaState
{
TD Create(IMessage message);
}

internal class DefaultSagaStateFactory<TD> : ISagaStateFactory<TD>
where TD : SagaState
{
private readonly IServiceProvider _serviceProvider;

public DefaultSagaStateFactory(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}

public TD Create(IMessage message)
{
if (message == null)
throw new ArgumentNullException(nameof(message));

var messageType = message.GetType();
var stateType = typeof(TD);

var factoryInterfaceType = typeof(ISagaStateFactory<,>).MakeGenericType(messageType, stateType);
var factory = _serviceProvider.GetService(factoryInterfaceType) as ISagaStateFactory<TD>;
if (null == factory)
throw new StateCreationException(
$"no state factory registered for message type '{message.GetType().FullName}'");

var state = factory.Create((dynamic) message);
return state;
}
}
}
26 changes: 26 additions & 0 deletions src/OpenSleigh.Core/LambdaSagaStateFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Core
{
internal class LambdaSagaStateFactory<TM, TD> : ISagaStateFactory<TM, TD>
where TM : IMessage
where TD : SagaState
{
private readonly Func<TM, TD> _factory;

public LambdaSagaStateFactory(Func<TM, TD> factory)
{
_factory = factory ?? throw new ArgumentNullException(nameof(factory));
}

public TD Create(TM message) => _factory(message);

public TD Create(IMessage message)
{
if (message is TM m)
return _factory(m);
return null;
}
}
}
14 changes: 8 additions & 6 deletions src/OpenSleigh.Core/SagaStateService.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using OpenSleigh.Core.DependencyInjection;
using OpenSleigh.Core.Exceptions;
using OpenSleigh.Core.Persistence;
using System;
Expand All @@ -25,18 +24,21 @@ public SagaStateService(ISagaStateFactory<TD> sagaStateFactory, ISagaStateReposi
CancellationToken cancellationToken = default) where TM : IMessage
{
var correlationId = messageContext.Message.CorrelationId;

var defaultState = _sagaStateFactory.Create(messageContext.Message);

var result = await _sagaStateRepository.LockAsync(correlationId, defaultState, cancellationToken);

var isStartMessage = (typeof(IStartedBy<TM>).IsAssignableFrom(typeof(TS)));
TD initialState = null;
if (isStartMessage)
initialState = _sagaStateFactory.Create(messageContext.Message);

var result = await _sagaStateRepository.LockAsync(correlationId, initialState, cancellationToken);

if (null != result.state)
return result;

// if state is null, we're probably starting a new saga.
// We have to check if the current message can
// actually start the specified saga or not
if (!typeof(IStartedBy<TM>).IsAssignableFrom(typeof(TS)))
if (!isStartMessage)
throw new MessageException($"Saga '{correlationId}' cannot be started by message '{typeof(TM).FullName}'");

throw new StateCreationException(typeof(TD), correlationId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using OpenSleigh.Core;
using OpenSleigh.Core.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Reflection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using OpenSleigh.Core.Persistence;
using System;
using System.Collections.Concurrent;
using System.Diagnostics.Contracts;
using System.Threading;
using System.Threading.Tasks;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,40 @@
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using OpenSleigh.Core;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Persistence.InMemory.Messaging
{
internal class InMemorySubscriber<TM> : ISubscriber
public class InMemorySubscriber<TM> : ISubscriber
where TM : IMessage
{
private readonly IMessageProcessor _messageProcessor;
private readonly ChannelReader<TM> _reader;

public InMemorySubscriber(IMessageProcessor messageProcessor, ChannelReader<TM> reader)
private readonly ILogger<InMemorySubscriber<TM>> _logger;

public InMemorySubscriber(IMessageProcessor messageProcessor,
ChannelReader<TM> reader,
ILogger<InMemorySubscriber<TM>> logger)
{
_messageProcessor = messageProcessor ?? throw new ArgumentNullException(nameof(messageProcessor));
_reader = reader ?? throw new ArgumentNullException(nameof(reader));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public async Task StartAsync(CancellationToken cancellationToken = default)
{
await foreach (var message in _reader.ReadAllAsync(cancellationToken))
{
await _messageProcessor.ProcessAsync(message, cancellationToken);
try
{
await _messageProcessor.ProcessAsync(message, cancellationToken);
}
catch (Exception e)
{
_logger.LogError(e, $"an exception has occurred while processing message '{message.Id}': {e.Message}");
}
}
}

Expand Down
1 change: 0 additions & 1 deletion src/OpenSleigh.Transport.RabbitMQ/QueueReferenceFactory.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Concurrent;
using Microsoft.Extensions.DependencyInjection;
using OpenSleigh.Core;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Transport.RabbitMQ
Expand Down
Loading