diff --git a/CHANGELOG.md b/CHANGELOG.md
index 738d6646..5b5315d9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,11 @@
All notable changes to this project will be documented in this file.
+## [2021-02-24](https://github.com/mizrael/OpenSleigh/pull/27)
+### Added
+- added support for compensating transactions
+- added new sample
+
## [2021-02-11](https://github.com/mizrael/OpenSleigh/pull/26)
### Added
- added automatic infrastructure provisioning
diff --git a/samples/Sample7/.dockerignore b/samples/Sample7/.dockerignore
new file mode 100644
index 00000000..3729ff0c
--- /dev/null
+++ b/samples/Sample7/.dockerignore
@@ -0,0 +1,25 @@
+**/.classpath
+**/.dockerignore
+**/.env
+**/.git
+**/.gitignore
+**/.project
+**/.settings
+**/.toolstarget
+**/.vs
+**/.vscode
+**/*.*proj.user
+**/*.dbmdl
+**/*.jfm
+**/azds.yaml
+**/bin
+**/charts
+**/docker-compose*
+**/Dockerfile*
+**/node_modules
+**/npm-debug.log
+**/obj
+**/secrets.dev.yaml
+**/values.dev.yaml
+LICENSE
+README.md
\ No newline at end of file
diff --git a/samples/Sample7/OpenSleigh.Samples.Sample7.Console/OpenSleigh.Samples.Sample7.Console.csproj b/samples/Sample7/OpenSleigh.Samples.Sample7.Console/OpenSleigh.Samples.Sample7.Console.csproj
new file mode 100644
index 00000000..ce1e1268
--- /dev/null
+++ b/samples/Sample7/OpenSleigh.Samples.Sample7.Console/OpenSleigh.Samples.Sample7.Console.csproj
@@ -0,0 +1,27 @@
+
+
+
+ Exe
+ net5.0
+ Linux
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ PreserveNewest
+
+
+
+
diff --git a/samples/Sample7/OpenSleigh.Samples.Sample7.Console/Program.cs b/samples/Sample7/OpenSleigh.Samples.Sample7.Console/Program.cs
new file mode 100644
index 00000000..4292bd14
--- /dev/null
+++ b/samples/Sample7/OpenSleigh.Samples.Sample7.Console/Program.cs
@@ -0,0 +1,56 @@
+using System;
+using System.Linq;
+using System.Threading.Tasks;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using OpenSleigh.Core.DependencyInjection;
+using OpenSleigh.Core.Messaging;
+using OpenSleigh.Persistence.InMemory;
+using OpenSleigh.Samples.Sample7.Console.Sagas;
+
+namespace OpenSleigh.Samples.Sample7.Console
+{
+ class Program
+ {
+ static async Task Main(string[] args)
+ {
+ var hostBuilder = CreateHostBuilder(args);
+ var host = hostBuilder.Build();
+
+ using var scope = host.Services.CreateScope();
+ var bus = scope.ServiceProvider.GetRequiredService();
+ var messages = new[]
+ {
+ new StartSaga(Guid.NewGuid(), Guid.NewGuid()),
+ new StartSaga(Guid.NewGuid(), Guid.NewGuid(), true)
+ };
+
+ var tasks = messages
+ .Select(m => bus.PublishAsync(m))
+ .Union(new[] {host.RunAsync()})
+ .ToArray();
+
+ await Task.WhenAll(tasks);
+ }
+
+ static IHostBuilder CreateHostBuilder(string[] args) =>
+ Host.CreateDefaultBuilder(args)
+ .ConfigureServices((hostContext, services) =>
+ {
+ services.AddLogging(cfg =>
+ {
+ cfg.AddConsole();
+ })
+ .AddOpenSleigh(cfg =>
+ {
+ cfg.UseInMemoryTransport()
+ .UseInMemoryPersistence();
+
+ cfg.AddSaga()
+ .UseStateFactory(msg => new MySagaState(msg.CorrelationId))
+ .UseInMemoryTransport();
+ });
+ });
+ }
+}
diff --git a/samples/Sample7/OpenSleigh.Samples.Sample7.Console/Sagas/MySaga.cs b/samples/Sample7/OpenSleigh.Samples.Sample7.Console/Sagas/MySaga.cs
new file mode 100644
index 00000000..5d0bebca
--- /dev/null
+++ b/samples/Sample7/OpenSleigh.Samples.Sample7.Console/Sagas/MySaga.cs
@@ -0,0 +1,75 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using OpenSleigh.Core;
+using OpenSleigh.Core.Messaging;
+
+namespace OpenSleigh.Samples.Sample7.Console.Sagas
+{
+ public class MySagaState : SagaState{
+ public MySagaState(Guid id) : base(id){}
+
+ public enum Steps
+ {
+ Processing,
+ Successful,
+ Failed
+ };
+ public Steps CurrentStep { get; set; } = Steps.Processing;
+ }
+
+ public record StartSaga(Guid Id, Guid CorrelationId, bool WillFail = false) : ICommand { }
+
+ public record MySagaCompleted(Guid Id, Guid CorrelationId) : IEvent { }
+
+ public class MySaga :
+ Saga,
+ IStartedBy,
+ ICompensateMessage,
+ IHandleMessage
+ {
+ private readonly ILogger _logger;
+
+ public MySaga(ILogger logger)
+ {
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ }
+
+ public async Task HandleAsync(IMessageContext context, CancellationToken cancellationToken = default)
+ {
+ _logger.LogInformation($"starting saga '{context.Message.CorrelationId}'...");
+
+ if (context.Message.WillFail)
+ throw new ApplicationException("something, somewhere, went terribly, terribly wrong.");
+
+ this.State.CurrentStep = MySagaState.Steps.Successful;
+
+ var message = new MySagaCompleted(Guid.NewGuid(), context.Message.CorrelationId);
+ await this.Bus.PublishAsync(message, cancellationToken);
+ }
+
+ public async Task CompensateAsync(ICompensationContext context, CancellationToken cancellationToken = default)
+ {
+ _logger.LogWarning($"saga '{context.MessageContext.Message.CorrelationId}' failed! Reason: {context.Exception.Message}");
+
+ this.State.CurrentStep = MySagaState.Steps.Failed;
+
+ var message = new MySagaCompleted(Guid.NewGuid(), context.MessageContext.Message.CorrelationId);
+ await this.Bus.PublishAsync(message, cancellationToken);
+ }
+
+ public Task HandleAsync(IMessageContext context, CancellationToken cancellationToken = default)
+ {
+ this.State.MarkAsCompleted();
+
+ var isFailed = this.State.CurrentStep == MySagaState.Steps.Failed;
+ if(isFailed)
+ _logger.LogWarning($"saga '{context.Message.CorrelationId}' failed!");
+ else
+ _logger.LogInformation($"saga '{context.Message.CorrelationId}' completed!");
+
+ return Task.CompletedTask;
+ }
+ }
+}
diff --git a/samples/Sample7/OpenSleigh.Samples.Sample7.Console/appSettings.json b/samples/Sample7/OpenSleigh.Samples.Sample7.Console/appSettings.json
new file mode 100644
index 00000000..4f30a00f
--- /dev/null
+++ b/samples/Sample7/OpenSleigh.Samples.Sample7.Console/appSettings.json
@@ -0,0 +1,9 @@
+{
+ "Logging": {
+ "LogLevel": {
+ "Default": "Debug",
+ "Microsoft": "Warning",
+ "Microsoft.Hosting.Lifetime": "Information"
+ }
+ }
+}
diff --git a/samples/Sample7/Sample7.sln b/samples/Sample7/Sample7.sln
new file mode 100644
index 00000000..58c35ddf
--- /dev/null
+++ b/samples/Sample7/Sample7.sln
@@ -0,0 +1,43 @@
+
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 16
+VisualStudioVersion = 16.0.30804.86
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Core", "..\..\src\OpenSleigh.Core\OpenSleigh.Core.csproj", "{82774D46-07E9-4A39-A987-D49CD1412DCE}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Persistence.InMemory", "..\..\src\OpenSleigh.Persistence.InMemory\OpenSleigh.Persistence.InMemory.csproj", "{1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Samples.Sample1.Console", "OpenSleigh.Samples.Sample7.Console\OpenSleigh.Samples.Sample7.Console.csproj", "{F6023CA2-2CFA-496B-A4B6-4DF4A767A166}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "library", "library", "{775A8A17-E723-42D9-BEA8-4A36A8FEEC84}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {82774D46-07E9-4A39-A987-D49CD1412DCE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {82774D46-07E9-4A39-A987-D49CD1412DCE}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {82774D46-07E9-4A39-A987-D49CD1412DCE}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {82774D46-07E9-4A39-A987-D49CD1412DCE}.Release|Any CPU.Build.0 = Release|Any CPU
+ {1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9}.Release|Any CPU.Build.0 = Release|Any CPU
+ {F6023CA2-2CFA-496B-A4B6-4DF4A767A166}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {F6023CA2-2CFA-496B-A4B6-4DF4A767A166}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {F6023CA2-2CFA-496B-A4B6-4DF4A767A166}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {F6023CA2-2CFA-496B-A4B6-4DF4A767A166}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+ GlobalSection(NestedProjects) = preSolution
+ {82774D46-07E9-4A39-A987-D49CD1412DCE} = {775A8A17-E723-42D9-BEA8-4A36A8FEEC84}
+ {1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9} = {775A8A17-E723-42D9-BEA8-4A36A8FEEC84}
+ EndGlobalSection
+ GlobalSection(ExtensibilityGlobals) = postSolution
+ SolutionGuid = {D5297242-16B4-43D7-B329-362EBCE2A5A5}
+ EndGlobalSection
+EndGlobal
diff --git a/src/OpenSleigh.Core/ICompensateMessage.cs b/src/OpenSleigh.Core/ICompensateMessage.cs
new file mode 100644
index 00000000..2bbebb5d
--- /dev/null
+++ b/src/OpenSleigh.Core/ICompensateMessage.cs
@@ -0,0 +1,11 @@
+using System.Threading;
+using System.Threading.Tasks;
+using OpenSleigh.Core.Messaging;
+
+namespace OpenSleigh.Core
+{
+ public interface ICompensateMessage where TM : IMessage
+ {
+ Task CompensateAsync(ICompensationContext context, CancellationToken cancellationToken = default);
+ }
+}
\ No newline at end of file
diff --git a/src/OpenSleigh.Core/ICompensationContext.cs b/src/OpenSleigh.Core/ICompensationContext.cs
new file mode 100644
index 00000000..b6b5dfd9
--- /dev/null
+++ b/src/OpenSleigh.Core/ICompensationContext.cs
@@ -0,0 +1,21 @@
+using System;
+using System.Diagnostics.CodeAnalysis;
+using OpenSleigh.Core.Messaging;
+
+namespace OpenSleigh.Core
+{
+ public interface ICompensationContext
+ where TM : IMessage
+ {
+ IMessageContext MessageContext { get; }
+ Exception Exception { get; }
+ }
+
+ [ExcludeFromCodeCoverage] // only if doesn't get more complex than this
+ internal record DefaultCompensationContext(IMessageContext MessageContext, Exception Exception) : ICompensationContext
+ where TM : IMessage
+ {
+ public static ICompensationContext Build(IMessageContext messageContext, Exception exception)
+ => new DefaultCompensationContext(messageContext, exception);
+ }
+}
\ No newline at end of file
diff --git a/src/OpenSleigh.Core/SagaRunner.cs b/src/OpenSleigh.Core/SagaRunner.cs
index d2eb815f..70ca574f 100644
--- a/src/OpenSleigh.Core/SagaRunner.cs
+++ b/src/OpenSleigh.Core/SagaRunner.cs
@@ -71,9 +71,42 @@ public async Task RunAsync(IMessageContext messageContext, CancellationT
await transaction.CommitAsync(cancellationToken);
}
- catch
+ catch(Exception ex)
{
await transaction.RollbackAsync(cancellationToken);
+
+ if (saga is not ICompensateMessage compensatingHandler)
+ throw;
+
+ await ExecuteCompensationAsync(compensatingHandler, messageContext, ex, state, lockId, cancellationToken);
+ }
+ }
+
+ private async Task ExecuteCompensationAsync(ICompensateMessage compensatingHandler,
+ IMessageContext messageContext,
+ Exception exception,
+ TD state,
+ Guid lockId,
+ CancellationToken cancellationToken) where TM : IMessage
+ {
+ _logger.LogWarning(exception, $"something went wrong when processing saga '{messageContext.Message.CorrelationId}' : {exception.Message}. executing compensation ...");
+
+ var compensatingTransaction = await _transactionManager.StartTransactionAsync(cancellationToken);
+
+ var compensationContext = DefaultCompensationContext.Build(messageContext, exception);
+
+ try
+ {
+ await compensatingHandler.CompensateAsync(compensationContext, cancellationToken);
+
+ state.SetAsProcessed(messageContext.Message);
+ await _sagaStateService.SaveAsync(state, lockId, cancellationToken);
+
+ await compensatingTransaction.CommitAsync(cancellationToken);
+ }
+ catch
+ {
+ await compensatingTransaction.RollbackAsync(cancellationToken);
throw;
}
}
diff --git a/src/OpenSleigh.Core/SagaState.cs b/src/OpenSleigh.Core/SagaState.cs
index 15b278c3..d7889f21 100644
--- a/src/OpenSleigh.Core/SagaState.cs
+++ b/src/OpenSleigh.Core/SagaState.cs
@@ -8,7 +8,7 @@ namespace OpenSleigh.Core
//TODO: get rid of Newtonsoft.JSON dependency
public abstract class SagaState
{
- [JsonProperty]
+ [JsonProperty] //TODO: can we use an HashSet here ?
private readonly Dictionary _processedMessages = new();
[JsonProperty] private bool _isComplete;
@@ -28,7 +28,7 @@ public void SetAsProcessed(TM message) where TM : IMessage
if (this.Id != message.CorrelationId)
throw new ArgumentException($"invalid message correlation id", nameof(message));
- _processedMessages.Add(message.Id, message);
+ _processedMessages[message.Id] = message;
}
public bool CheckWasProcessed(TM message) where TM : IMessage
diff --git a/tests/OpenSleigh.Core.Tests/Sagas/CompensatingSaga.cs b/tests/OpenSleigh.Core.Tests/Sagas/CompensatingSaga.cs
new file mode 100644
index 00000000..3b1db1c8
--- /dev/null
+++ b/tests/OpenSleigh.Core.Tests/Sagas/CompensatingSaga.cs
@@ -0,0 +1,49 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using OpenSleigh.Core.Messaging;
+
+namespace OpenSleigh.Core.Tests.Sagas
+{
+ public class CompensatingSagaState : SagaState
+ {
+ public CompensatingSagaState(Guid id) : base(id)
+ {
+ }
+ }
+
+ public record StartCompensatingSaga(Guid Id, Guid CorrelationId) : ICommand
+ {
+ public int Foo { get; init; }
+ public string Bar { get; init; }
+ public Guid Baz => this.CorrelationId;
+ }
+
+ public class CompensatingSaga :
+ Saga
+ , IStartedBy
+ , ICompensateMessage
+ {
+ private readonly Action> _onStart;
+ private readonly Action> _onCompensate;
+
+ public CompensatingSaga(Action> onStart,
+ Action> onCompensate)
+ {
+ _onStart = onStart;
+ _onCompensate = onCompensate;
+ }
+
+ public Task HandleAsync(IMessageContext context, CancellationToken cancellationToken = default)
+ {
+ _onStart?.Invoke(context);
+ return Task.CompletedTask;
+ }
+
+ public Task CompensateAsync(ICompensationContext context, CancellationToken cancellationToken = default)
+ {
+ _onCompensate?.Invoke(context);
+ return Task.CompletedTask;
+ }
+ }
+}
\ No newline at end of file
diff --git a/tests/OpenSleigh.Core.Tests/Unit/SagaRunnerTests.cs b/tests/OpenSleigh.Core.Tests/Unit/SagaRunnerTests.cs
index a34448b1..87e2b060 100644
--- a/tests/OpenSleigh.Core.Tests/Unit/SagaRunnerTests.cs
+++ b/tests/OpenSleigh.Core.Tests/Unit/SagaRunnerTests.cs
@@ -327,7 +327,7 @@ await transaction.Received(1)
}
[Fact]
- public async Task RunAsync_should_rollback_transaction_if_exception_occurs()
+ public async Task RunAsync_should_rollback_transaction_if_exception_occurs_and_rethrow()
{
var message = new StartDummySaga(Guid.NewGuid(), Guid.NewGuid());
@@ -369,6 +369,58 @@ public async Task RunAsync_should_rollback_transaction_if_exception_occurs()
await transaction.Received(1)
.RollbackAsync(default);
}
+
+ [Fact]
+ public async Task RunAsync_should_rollback_transaction_when_exception_occurs_and_execute_compensation_if_available()
+ {
+ var message = new StartCompensatingSaga(Guid.NewGuid(), Guid.NewGuid());
+
+ var messageContext = NSubstitute.Substitute.For>();
+ messageContext.Message.Returns(message);
+
+ var sagaStateService = NSubstitute.Substitute.For>();
+
+ var state = new CompensatingSagaState(message.CorrelationId);
+ sagaStateService.GetAsync(messageContext, Arg.Any())
+ .Returns((state, Guid.NewGuid()));
+
+ var expectedException = new ApplicationException("whoops");
+ Action> onStart = (ctx) => throw expectedException;
+
+ var called = false;
+ Action> onCompensate = (ctx) =>
+ {
+ ctx.Should().NotBeNull();
+ ctx.Exception.Should().Be(expectedException);
+ called = true;
+ };
+
+ var saga = new CompensatingSaga(onStart, onCompensate);
+ saga.SetBus(NSubstitute.Substitute.For());
+
+ var sagaFactory = NSubstitute.Substitute.For>();
+ sagaFactory.Create(state)
+ .Returns(saga);
+
+ var logger = NSubstitute.Substitute.For>>();
+
+ var transaction = NSubstitute.Substitute.For();
+ var transactionManager = NSubstitute.Substitute.For();
+ transactionManager.StartTransactionAsync(default)
+ .ReturnsForAnyArgs(transaction);
+
+ var policyFactory = NSubstitute.Substitute.For>();
+ policyFactory.Create().ReturnsNullForAnyArgs();
+
+ var sut = new SagaRunner(sagaFactory, sagaStateService, transactionManager, policyFactory, logger);
+
+ await sut.RunAsync(messageContext);
+
+ await transaction.Received(1)
+ .RollbackAsync(default);
+
+ called.Should().BeTrue();
+ }
}
internal class FakePolicy : IPolicy