Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added support for compensating transactions #27

Merged
merged 7 commits into from
Feb 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

All notable changes to this project will be documented in this file.

## [2021-02-24](https://github.com/mizrael/OpenSleigh/pull/27)
### Added
- added support for compensating transactions
- added new sample

## [2021-02-11](https://github.com/mizrael/OpenSleigh/pull/26)
### Added
- added automatic infrastructure provisioning
Expand Down
25 changes: 25 additions & 0 deletions samples/Sample7/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
**/.classpath
**/.dockerignore
**/.env
**/.git
**/.gitignore
**/.project
**/.settings
**/.toolstarget
**/.vs
**/.vscode
**/*.*proj.user
**/*.dbmdl
**/*.jfm
**/azds.yaml
**/bin
**/charts
**/docker-compose*
**/Dockerfile*
**/node_modules
**/npm-debug.log
**/obj
**/secrets.dev.yaml
**/values.dev.yaml
LICENSE
README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="5.0.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.10.9" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\src\OpenSleigh.Core\OpenSleigh.Core.csproj" />
<ProjectReference Include="..\..\..\src\OpenSleigh.Persistence.InMemory\OpenSleigh.Persistence.InMemory.csproj" />
</ItemGroup>

<ItemGroup>
<None Update="appSettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>

</Project>
56 changes: 56 additions & 0 deletions samples/Sample7/OpenSleigh.Samples.Sample7.Console/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using OpenSleigh.Core.DependencyInjection;
using OpenSleigh.Core.Messaging;
using OpenSleigh.Persistence.InMemory;
using OpenSleigh.Samples.Sample7.Console.Sagas;

namespace OpenSleigh.Samples.Sample7.Console
{
class Program
{
static async Task Main(string[] args)
{
var hostBuilder = CreateHostBuilder(args);
var host = hostBuilder.Build();

using var scope = host.Services.CreateScope();
var bus = scope.ServiceProvider.GetRequiredService<IMessageBus>();
var messages = new[]
{
new StartSaga(Guid.NewGuid(), Guid.NewGuid()),
new StartSaga(Guid.NewGuid(), Guid.NewGuid(), true)
};

var tasks = messages
.Select(m => bus.PublishAsync(m))
.Union(new[] {host.RunAsync()})
.ToArray();

await Task.WhenAll(tasks);
}

static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddLogging(cfg =>
{
cfg.AddConsole();
})
.AddOpenSleigh(cfg =>
{
cfg.UseInMemoryTransport()
.UseInMemoryPersistence();
cfg.AddSaga<MySaga, MySagaState>()
.UseStateFactory<StartSaga>(msg => new MySagaState(msg.CorrelationId))
.UseInMemoryTransport();
});
});
}
}
75 changes: 75 additions & 0 deletions samples/Sample7/OpenSleigh.Samples.Sample7.Console/Sagas/MySaga.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using OpenSleigh.Core;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Samples.Sample7.Console.Sagas
{
public class MySagaState : SagaState{
public MySagaState(Guid id) : base(id){}

public enum Steps
{
Processing,
Successful,
Failed
};
public Steps CurrentStep { get; set; } = Steps.Processing;
}

public record StartSaga(Guid Id, Guid CorrelationId, bool WillFail = false) : ICommand { }

public record MySagaCompleted(Guid Id, Guid CorrelationId) : IEvent { }

public class MySaga :
Saga<MySagaState>,
IStartedBy<StartSaga>,
ICompensateMessage<StartSaga>,
IHandleMessage<MySagaCompleted>
{
private readonly ILogger<MySaga> _logger;

public MySaga(ILogger<MySaga> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public async Task HandleAsync(IMessageContext<StartSaga> context, CancellationToken cancellationToken = default)
{
_logger.LogInformation($"starting saga '{context.Message.CorrelationId}'...");

if (context.Message.WillFail)
throw new ApplicationException("something, somewhere, went terribly, terribly wrong.");

this.State.CurrentStep = MySagaState.Steps.Successful;

var message = new MySagaCompleted(Guid.NewGuid(), context.Message.CorrelationId);
await this.Bus.PublishAsync(message, cancellationToken);
}

public async Task CompensateAsync(ICompensationContext<StartSaga> context, CancellationToken cancellationToken = default)
{
_logger.LogWarning($"saga '{context.MessageContext.Message.CorrelationId}' failed! Reason: {context.Exception.Message}");

this.State.CurrentStep = MySagaState.Steps.Failed;

var message = new MySagaCompleted(Guid.NewGuid(), context.MessageContext.Message.CorrelationId);
await this.Bus.PublishAsync(message, cancellationToken);
}

public Task HandleAsync(IMessageContext<MySagaCompleted> context, CancellationToken cancellationToken = default)
{
this.State.MarkAsCompleted();

var isFailed = this.State.CurrentStep == MySagaState.Steps.Failed;
if(isFailed)
_logger.LogWarning($"saga '{context.Message.CorrelationId}' failed!");
else
_logger.LogInformation($"saga '{context.Message.CorrelationId}' completed!");

return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Debug",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
}
}
43 changes: 43 additions & 0 deletions samples/Sample7/Sample7.sln
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.30804.86
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Core", "..\..\src\OpenSleigh.Core\OpenSleigh.Core.csproj", "{82774D46-07E9-4A39-A987-D49CD1412DCE}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Persistence.InMemory", "..\..\src\OpenSleigh.Persistence.InMemory\OpenSleigh.Persistence.InMemory.csproj", "{1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Samples.Sample1.Console", "OpenSleigh.Samples.Sample7.Console\OpenSleigh.Samples.Sample7.Console.csproj", "{F6023CA2-2CFA-496B-A4B6-4DF4A767A166}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "library", "library", "{775A8A17-E723-42D9-BEA8-4A36A8FEEC84}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{82774D46-07E9-4A39-A987-D49CD1412DCE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{82774D46-07E9-4A39-A987-D49CD1412DCE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{82774D46-07E9-4A39-A987-D49CD1412DCE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{82774D46-07E9-4A39-A987-D49CD1412DCE}.Release|Any CPU.Build.0 = Release|Any CPU
{1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9}.Release|Any CPU.Build.0 = Release|Any CPU
{F6023CA2-2CFA-496B-A4B6-4DF4A767A166}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F6023CA2-2CFA-496B-A4B6-4DF4A767A166}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F6023CA2-2CFA-496B-A4B6-4DF4A767A166}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F6023CA2-2CFA-496B-A4B6-4DF4A767A166}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{82774D46-07E9-4A39-A987-D49CD1412DCE} = {775A8A17-E723-42D9-BEA8-4A36A8FEEC84}
{1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9} = {775A8A17-E723-42D9-BEA8-4A36A8FEEC84}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {D5297242-16B4-43D7-B329-362EBCE2A5A5}
EndGlobalSection
EndGlobal
11 changes: 11 additions & 0 deletions src/OpenSleigh.Core/ICompensateMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System.Threading;
using System.Threading.Tasks;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Core
{
public interface ICompensateMessage<TM> where TM : IMessage
{
Task CompensateAsync(ICompensationContext<TM> context, CancellationToken cancellationToken = default);
}
}
21 changes: 21 additions & 0 deletions src/OpenSleigh.Core/ICompensationContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;
using System.Diagnostics.CodeAnalysis;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Core
{
public interface ICompensationContext<TM>
where TM : IMessage
{
IMessageContext<TM> MessageContext { get; }
Exception Exception { get; }
}

[ExcludeFromCodeCoverage] // only if doesn't get more complex than this
internal record DefaultCompensationContext<TM>(IMessageContext<TM> MessageContext, Exception Exception) : ICompensationContext<TM>
where TM : IMessage
{
public static ICompensationContext<TM> Build(IMessageContext<TM> messageContext, Exception exception)
=> new DefaultCompensationContext<TM>(messageContext, exception);
}
}
35 changes: 34 additions & 1 deletion src/OpenSleigh.Core/SagaRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,42 @@ public async Task RunAsync<TM>(IMessageContext<TM> messageContext, CancellationT

await transaction.CommitAsync(cancellationToken);
}
catch
catch(Exception ex)
{
await transaction.RollbackAsync(cancellationToken);

if (saga is not ICompensateMessage<TM> compensatingHandler)
throw;

await ExecuteCompensationAsync(compensatingHandler, messageContext, ex, state, lockId, cancellationToken);
}
}

private async Task ExecuteCompensationAsync<TM>(ICompensateMessage<TM> compensatingHandler,
IMessageContext<TM> messageContext,
Exception exception,
TD state,
Guid lockId,
CancellationToken cancellationToken) where TM : IMessage
{
_logger.LogWarning(exception, $"something went wrong when processing saga '{messageContext.Message.CorrelationId}' : {exception.Message}. executing compensation ...");

var compensatingTransaction = await _transactionManager.StartTransactionAsync(cancellationToken);

var compensationContext = DefaultCompensationContext<TM>.Build(messageContext, exception);

try
{
await compensatingHandler.CompensateAsync(compensationContext, cancellationToken);

state.SetAsProcessed(messageContext.Message);
await _sagaStateService.SaveAsync(state, lockId, cancellationToken);

await compensatingTransaction.CommitAsync(cancellationToken);
}
catch
{
await compensatingTransaction.RollbackAsync(cancellationToken);
throw;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/OpenSleigh.Core/SagaState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace OpenSleigh.Core
//TODO: get rid of Newtonsoft.JSON dependency
public abstract class SagaState
{
[JsonProperty]
[JsonProperty] //TODO: can we use an HashSet here ?
private readonly Dictionary<Guid, IMessage> _processedMessages = new();

[JsonProperty] private bool _isComplete;
Expand All @@ -28,7 +28,7 @@ protected SagaState(Guid id)
if (this.Id != message.CorrelationId)
throw new ArgumentException($"invalid message correlation id", nameof(message));

_processedMessages.Add(message.Id, message);
_processedMessages[message.Id] = message;
}

public bool CheckWasProcessed<TM>(TM message) where TM : IMessage
Expand Down