Skip to content

Commit

Permalink
fixed: in EF Core, when synchronously projecting multiple times in a …
Browse files Browse the repository at this point in the history
…single tx (e.g. because projector itself publishes events), do not project the already projected events again
  • Loading branch information
martinzima committed Nov 10, 2022
1 parent 9e5b2b6 commit 7557918
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 8 deletions.
2 changes: 1 addition & 1 deletion Common.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>

<PropertyGroup>
<VersionPrefix>1.30.2</VersionPrefix>
<VersionPrefix>1.30.3</VersionPrefix>
</PropertyGroup>

<PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public async Task OnBeforeCommitAsync()
projectedEvents.AddRange(newEvents);

await projectionSubSystem.ExecuteProjectionsAsync(
commandContext.UnitOfWork.EventBuffer.Events
newEvents
.OfType<IEventMessage<DomainAggregateEvent>>()
.ToArray(),
commandContext.UnitOfWork,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using NSubstitute;
using Revo.Core.Commands;
using Revo.Core.Events;
using Revo.Core.Transactions;
using Revo.Domain.Events;
using Revo.EFCore.Projections;
using Revo.Infrastructure.Projections;
using Revo.Testing.Infrastructure;
using Xunit;

namespace Revo.EFCore.Tests.Projections;

public class EFCoreSyncProjectionHookTests
{
private EFCoreSyncProjectionHook sut;
private ICommandContext commandContext = Substitute.For<ICommandContext>();
private IEFCoreProjectionSubSystem projectionSubSystem = Substitute.For<IEFCoreProjectionSubSystem>();
private IUnitOfWork unitOfWork = Substitute.For<IUnitOfWork>();
private IPublishEventBuffer eventBuffer = Substitute.For<IPublishEventBuffer>();
private List<IEventMessage> events = new();

public EFCoreSyncProjectionHookTests()
{
unitOfWork.EventBuffer.Returns(eventBuffer);
eventBuffer.Events.Returns(events);

sut = new EFCoreSyncProjectionHook(commandContext, projectionSubSystem);
}

[Fact]
public async Task ProjectsBeforeCommit()
{
events.Add(new TestEvent().ToMessageDraft());
commandContext.UnitOfWork.Returns(unitOfWork);

await sut.OnBeforeCommitAsync();

projectionSubSystem.Received(1).ExecuteProjectionsAsync(
Arg.Is<IReadOnlyCollection<IEventMessage<DomainAggregateEvent>>>(
evs => evs.SequenceEqual(events)),
unitOfWork,
Arg.Is<EFCoreEventProjectionOptions>(x => x.IsSynchronousProjection));
}

[Fact]
public async Task ProjectsOnlyAdditionalEvents()
{
events.Add(new TestEvent().ToMessageDraft());
commandContext.UnitOfWork.Returns(unitOfWork);

await sut.OnBeforeCommitAsync();
events.Add(new TestEvent().ToMessageDraft());
await sut.OnBeforeCommitAsync();

projectionSubSystem.ReceivedWithAnyArgs(2).ExecuteProjectionsAsync(null, null, null);

projectionSubSystem.Received(1).ExecuteProjectionsAsync(
Arg.Is<IReadOnlyCollection<IEventMessage<DomainAggregateEvent>>>(
evs => evs.SequenceEqual(new[] { events[0] })),
unitOfWork,
Arg.Is<EFCoreEventProjectionOptions>(x => x.IsSynchronousProjection));

projectionSubSystem.Received(1).ExecuteProjectionsAsync(
Arg.Is<IReadOnlyCollection<IEventMessage<DomainAggregateEvent>>>(
evs => evs.SequenceEqual(new[] { events[1] })),
unitOfWork,
Arg.Is<EFCoreEventProjectionOptions>(x => x.IsSynchronousProjection));
}

[Fact]
public async Task ProjectsAgainAfterCommitSucceeded()
{
events.Add(new TestEvent().ToMessageDraft());
commandContext.UnitOfWork.Returns(unitOfWork);

await sut.OnBeforeCommitAsync();
await sut.OnCommitSucceededAsync();
await sut.OnBeforeCommitAsync();

projectionSubSystem.Received(2).ExecuteProjectionsAsync(
Arg.Is<IReadOnlyCollection<IEventMessage<DomainAggregateEvent>>>(
evs => evs.SequenceEqual(events)),
unitOfWork,
Arg.Is<EFCoreEventProjectionOptions>(x => x.IsSynchronousProjection));
}

[Fact]
public async Task ProjectsAgainAfterCommitFailed()
{
events.Add(new TestEvent().ToMessageDraft());
commandContext.UnitOfWork.Returns(unitOfWork);

await sut.OnBeforeCommitAsync();
await sut.OnCommitFailedAsync();
await sut.OnBeforeCommitAsync();

projectionSubSystem.Received(2).ExecuteProjectionsAsync(
Arg.Is<IReadOnlyCollection<IEventMessage<DomainAggregateEvent>>>(
evs => evs.SequenceEqual(events)),
unitOfWork,
Arg.Is<EFCoreEventProjectionOptions>(x => x.IsSynchronousProjection));
}

private class TestEvent : DomainAggregateEvent
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,9 @@
<ProjectReference Include="..\..\Revo.EFCore\Revo.EFCore.csproj" />
</ItemGroup>

<ItemGroup>
<Folder Include="Projections\" />
</ItemGroup>


</Project>
5 changes: 5 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# RELEASE NOTES

## [1.30.3] - 2022-11-10

### Fixed
- in EF Core, when synchronously projecting multiple times in a single tx (e.g. because projector itself publishes events), do not project the already projected events again

## [1.30.2] - 2022-10-12

### Fixed
Expand Down
11 changes: 5 additions & 6 deletions Revo.Infrastructure/Projections/ProjectionSubSystem.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Revo.Core.Events;
using Revo.Core.Events;
using Revo.Core.Transactions;
using Revo.Domain.Entities;
using Revo.Domain.Events;
using Revo.Infrastructure.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Revo.Infrastructure.Projections
{
Expand Down

0 comments on commit 7557918

Please sign in to comment.