Skip to content

Commit

Permalink
Merge pull request #27 from mizrael/compensation
Browse files Browse the repository at this point in the history
added support for compensating transactions
  • Loading branch information
mizrael committed Feb 25, 2021
2 parents bb2d323 + ca49a3e commit d21e86d
Show file tree
Hide file tree
Showing 13 changed files with 410 additions and 4 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

All notable changes to this project will be documented in this file.

## [2021-02-24](https://github.com/mizrael/OpenSleigh/pull/27)
### Added
- added support for compensating transactions
- added new sample

## [2021-02-11](https://github.com/mizrael/OpenSleigh/pull/26)
### Added
- added automatic infrastructure provisioning
Expand Down
25 changes: 25 additions & 0 deletions samples/Sample7/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
**/.classpath
**/.dockerignore
**/.env
**/.git
**/.gitignore
**/.project
**/.settings
**/.toolstarget
**/.vs
**/.vscode
**/*.*proj.user
**/*.dbmdl
**/*.jfm
**/azds.yaml
**/bin
**/charts
**/docker-compose*
**/Dockerfile*
**/node_modules
**/npm-debug.log
**/obj
**/secrets.dev.yaml
**/values.dev.yaml
LICENSE
README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="5.0.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.10.9" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\src\OpenSleigh.Core\OpenSleigh.Core.csproj" />
<ProjectReference Include="..\..\..\src\OpenSleigh.Persistence.InMemory\OpenSleigh.Persistence.InMemory.csproj" />
</ItemGroup>

<ItemGroup>
<None Update="appSettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>

</Project>
56 changes: 56 additions & 0 deletions samples/Sample7/OpenSleigh.Samples.Sample7.Console/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using OpenSleigh.Core.DependencyInjection;
using OpenSleigh.Core.Messaging;
using OpenSleigh.Persistence.InMemory;
using OpenSleigh.Samples.Sample7.Console.Sagas;

namespace OpenSleigh.Samples.Sample7.Console
{
class Program
{
static async Task Main(string[] args)
{
var hostBuilder = CreateHostBuilder(args);
var host = hostBuilder.Build();

using var scope = host.Services.CreateScope();
var bus = scope.ServiceProvider.GetRequiredService<IMessageBus>();
var messages = new[]
{
new StartSaga(Guid.NewGuid(), Guid.NewGuid()),
new StartSaga(Guid.NewGuid(), Guid.NewGuid(), true)
};

var tasks = messages
.Select(m => bus.PublishAsync(m))
.Union(new[] {host.RunAsync()})
.ToArray();

await Task.WhenAll(tasks);
}

static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddLogging(cfg =>
{
cfg.AddConsole();
})
.AddOpenSleigh(cfg =>
{
cfg.UseInMemoryTransport()
.UseInMemoryPersistence();
cfg.AddSaga<MySaga, MySagaState>()
.UseStateFactory<StartSaga>(msg => new MySagaState(msg.CorrelationId))
.UseInMemoryTransport();
});
});
}
}
75 changes: 75 additions & 0 deletions samples/Sample7/OpenSleigh.Samples.Sample7.Console/Sagas/MySaga.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using OpenSleigh.Core;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Samples.Sample7.Console.Sagas
{
public class MySagaState : SagaState{
public MySagaState(Guid id) : base(id){}

public enum Steps
{
Processing,
Successful,
Failed
};
public Steps CurrentStep { get; set; } = Steps.Processing;
}

public record StartSaga(Guid Id, Guid CorrelationId, bool WillFail = false) : ICommand { }

public record MySagaCompleted(Guid Id, Guid CorrelationId) : IEvent { }

public class MySaga :
Saga<MySagaState>,
IStartedBy<StartSaga>,
ICompensateMessage<StartSaga>,
IHandleMessage<MySagaCompleted>
{
private readonly ILogger<MySaga> _logger;

public MySaga(ILogger<MySaga> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public async Task HandleAsync(IMessageContext<StartSaga> context, CancellationToken cancellationToken = default)
{
_logger.LogInformation($"starting saga '{context.Message.CorrelationId}'...");

if (context.Message.WillFail)
throw new ApplicationException("something, somewhere, went terribly, terribly wrong.");

this.State.CurrentStep = MySagaState.Steps.Successful;

var message = new MySagaCompleted(Guid.NewGuid(), context.Message.CorrelationId);
await this.Bus.PublishAsync(message, cancellationToken);
}

public async Task CompensateAsync(ICompensationContext<StartSaga> context, CancellationToken cancellationToken = default)
{
_logger.LogWarning($"saga '{context.MessageContext.Message.CorrelationId}' failed! Reason: {context.Exception.Message}");

this.State.CurrentStep = MySagaState.Steps.Failed;

var message = new MySagaCompleted(Guid.NewGuid(), context.MessageContext.Message.CorrelationId);
await this.Bus.PublishAsync(message, cancellationToken);
}

public Task HandleAsync(IMessageContext<MySagaCompleted> context, CancellationToken cancellationToken = default)
{
this.State.MarkAsCompleted();

var isFailed = this.State.CurrentStep == MySagaState.Steps.Failed;
if(isFailed)
_logger.LogWarning($"saga '{context.Message.CorrelationId}' failed!");
else
_logger.LogInformation($"saga '{context.Message.CorrelationId}' completed!");

return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Debug",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
}
}
43 changes: 43 additions & 0 deletions samples/Sample7/Sample7.sln
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.30804.86
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Core", "..\..\src\OpenSleigh.Core\OpenSleigh.Core.csproj", "{82774D46-07E9-4A39-A987-D49CD1412DCE}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Persistence.InMemory", "..\..\src\OpenSleigh.Persistence.InMemory\OpenSleigh.Persistence.InMemory.csproj", "{1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Samples.Sample1.Console", "OpenSleigh.Samples.Sample7.Console\OpenSleigh.Samples.Sample7.Console.csproj", "{F6023CA2-2CFA-496B-A4B6-4DF4A767A166}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "library", "library", "{775A8A17-E723-42D9-BEA8-4A36A8FEEC84}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{82774D46-07E9-4A39-A987-D49CD1412DCE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{82774D46-07E9-4A39-A987-D49CD1412DCE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{82774D46-07E9-4A39-A987-D49CD1412DCE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{82774D46-07E9-4A39-A987-D49CD1412DCE}.Release|Any CPU.Build.0 = Release|Any CPU
{1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9}.Release|Any CPU.Build.0 = Release|Any CPU
{F6023CA2-2CFA-496B-A4B6-4DF4A767A166}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F6023CA2-2CFA-496B-A4B6-4DF4A767A166}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F6023CA2-2CFA-496B-A4B6-4DF4A767A166}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F6023CA2-2CFA-496B-A4B6-4DF4A767A166}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{82774D46-07E9-4A39-A987-D49CD1412DCE} = {775A8A17-E723-42D9-BEA8-4A36A8FEEC84}
{1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9} = {775A8A17-E723-42D9-BEA8-4A36A8FEEC84}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {D5297242-16B4-43D7-B329-362EBCE2A5A5}
EndGlobalSection
EndGlobal
11 changes: 11 additions & 0 deletions src/OpenSleigh.Core/ICompensateMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System.Threading;
using System.Threading.Tasks;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Core
{
public interface ICompensateMessage<TM> where TM : IMessage
{
Task CompensateAsync(ICompensationContext<TM> context, CancellationToken cancellationToken = default);
}
}
21 changes: 21 additions & 0 deletions src/OpenSleigh.Core/ICompensationContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;
using System.Diagnostics.CodeAnalysis;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Core
{
public interface ICompensationContext<TM>
where TM : IMessage
{
IMessageContext<TM> MessageContext { get; }
Exception Exception { get; }
}

[ExcludeFromCodeCoverage] // only if doesn't get more complex than this
internal record DefaultCompensationContext<TM>(IMessageContext<TM> MessageContext, Exception Exception) : ICompensationContext<TM>
where TM : IMessage
{
public static ICompensationContext<TM> Build(IMessageContext<TM> messageContext, Exception exception)
=> new DefaultCompensationContext<TM>(messageContext, exception);
}
}
35 changes: 34 additions & 1 deletion src/OpenSleigh.Core/SagaRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,42 @@ public async Task RunAsync<TM>(IMessageContext<TM> messageContext, CancellationT

await transaction.CommitAsync(cancellationToken);
}
catch
catch(Exception ex)
{
await transaction.RollbackAsync(cancellationToken);

if (saga is not ICompensateMessage<TM> compensatingHandler)
throw;

await ExecuteCompensationAsync(compensatingHandler, messageContext, ex, state, lockId, cancellationToken);
}
}

private async Task ExecuteCompensationAsync<TM>(ICompensateMessage<TM> compensatingHandler,
IMessageContext<TM> messageContext,
Exception exception,
TD state,
Guid lockId,
CancellationToken cancellationToken) where TM : IMessage
{
_logger.LogWarning(exception, $"something went wrong when processing saga '{messageContext.Message.CorrelationId}' : {exception.Message}. executing compensation ...");

var compensatingTransaction = await _transactionManager.StartTransactionAsync(cancellationToken);

var compensationContext = DefaultCompensationContext<TM>.Build(messageContext, exception);

try
{
await compensatingHandler.CompensateAsync(compensationContext, cancellationToken);

state.SetAsProcessed(messageContext.Message);
await _sagaStateService.SaveAsync(state, lockId, cancellationToken);

await compensatingTransaction.CommitAsync(cancellationToken);
}
catch
{
await compensatingTransaction.RollbackAsync(cancellationToken);
throw;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/OpenSleigh.Core/SagaState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace OpenSleigh.Core
//TODO: get rid of Newtonsoft.JSON dependency
public abstract class SagaState
{
[JsonProperty]
[JsonProperty] //TODO: can we use an HashSet here ?
private readonly Dictionary<Guid, IMessage> _processedMessages = new();

[JsonProperty] private bool _isComplete;
Expand All @@ -28,7 +28,7 @@ protected SagaState(Guid id)
if (this.Id != message.CorrelationId)
throw new ArgumentException($"invalid message correlation id", nameof(message));

_processedMessages.Add(message.Id, message);
_processedMessages[message.Id] = message;
}

public bool CheckWasProcessed<TM>(TM message) where TM : IMessage
Expand Down

0 comments on commit d21e86d

Please sign in to comment.