Skip to content

Commit

Permalink
feat: add feedprojector GRAR-1562
Browse files Browse the repository at this point in the history
  • Loading branch information
ridingwolf authored and CumpsD committed Oct 3, 2020
1 parent 0b316fa commit d182f34
Show file tree
Hide file tree
Showing 14 changed files with 383 additions and 3 deletions.
17 changes: 16 additions & 1 deletion Be.Vlaanderen.Basisregisters.ProjectionHandling.sln
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Microsoft Visual Studio Solution File, Format Version 12.00
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.28803.156
MinimumVisualStudioVersion = 10.0.40219.1
Expand Down Expand Up @@ -40,6 +40,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Be.Vlaanderen.Basisregister
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Be.Vlaanderen.Basisregisters.ProjectionHandling.Syndication.Testing", "src\Be.Vlaanderen.Basisregisters.ProjectionHandling.Syndication.Testing\Be.Vlaanderen.Basisregisters.ProjectionHandling.Syndication.Testing.csproj", "{83B43451-645C-498A-8853-BF57CEC14177}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Be.Vlaanderen.Basisregisters.ProjectionHandling.Syndication.Tests", "test\Be.Vlaanderen.Basisregisters.ProjectionHandling.Syndication.Tests\Be.Vlaanderen.Basisregisters.ProjectionHandling.Syndication.Tests.csproj", "{BAF79BC1-602C-4B1C-8B47-7B48D7F03DCC}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -194,6 +196,18 @@ Global
{83B43451-645C-498A-8853-BF57CEC14177}.Release|x64.Build.0 = Release|Any CPU
{83B43451-645C-498A-8853-BF57CEC14177}.Release|x86.ActiveCfg = Release|Any CPU
{83B43451-645C-498A-8853-BF57CEC14177}.Release|x86.Build.0 = Release|Any CPU
{BAF79BC1-602C-4B1C-8B47-7B48D7F03DCC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BAF79BC1-602C-4B1C-8B47-7B48D7F03DCC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BAF79BC1-602C-4B1C-8B47-7B48D7F03DCC}.Debug|x64.ActiveCfg = Debug|Any CPU
{BAF79BC1-602C-4B1C-8B47-7B48D7F03DCC}.Debug|x64.Build.0 = Debug|Any CPU
{BAF79BC1-602C-4B1C-8B47-7B48D7F03DCC}.Debug|x86.ActiveCfg = Debug|Any CPU
{BAF79BC1-602C-4B1C-8B47-7B48D7F03DCC}.Debug|x86.Build.0 = Debug|Any CPU
{BAF79BC1-602C-4B1C-8B47-7B48D7F03DCC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BAF79BC1-602C-4B1C-8B47-7B48D7F03DCC}.Release|Any CPU.Build.0 = Release|Any CPU
{BAF79BC1-602C-4B1C-8B47-7B48D7F03DCC}.Release|x64.ActiveCfg = Release|Any CPU
{BAF79BC1-602C-4B1C-8B47-7B48D7F03DCC}.Release|x64.Build.0 = Release|Any CPU
{BAF79BC1-602C-4B1C-8B47-7B48D7F03DCC}.Release|x86.ActiveCfg = Release|Any CPU
{BAF79BC1-602C-4B1C-8B47-7B48D7F03DCC}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -211,6 +225,7 @@ Global
{90842038-520F-4875-AA3C-C43D138A868D} = {C2F8FF63-7A48-4179-A720-86206C42F496}
{2CE13953-AAE6-4AFC-879C-2E09C3CC6397} = {C2F8FF63-7A48-4179-A720-86206C42F496}
{83B43451-645C-498A-8853-BF57CEC14177} = {C2F8FF63-7A48-4179-A720-86206C42F496}
{BAF79BC1-602C-4B1C-8B47-7B48D7F03DCC} = {6DD3C208-58D1-46CE-AEAF-16F3F90D26C5}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2EB87445-E263-4E1E-89CC-3839170028E5}
Expand Down
3 changes: 2 additions & 1 deletion build.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ Target.create "Lib_Build" (fun _ -> build "Be.Vlaanderen.Basisregisters.Projecti
Target.create "Lib_Test" (fun _ ->
[
"test" @@ "Be.Vlaanderen.Basisregisters.ProjectionHandling.Connector.Tests"
"test" @@ "Be.Vlaanderen.Basisregisters.ProjectionHandling.Runner.Tests" ]
"test" @@ "Be.Vlaanderen.Basisregisters.ProjectionHandling.Runner.Tests"
"test" @@ "Be.Vlaanderen.Basisregisters.ProjectionHandling.Syndication.Tests" ]
|> List.iter testWithDotNet
)

Expand Down
1 change: 1 addition & 0 deletions paket.dependencies
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ nuget xunit 2.4.1
nuget xunit.runner.visualstudio 2.4.3
nuget xunit.categories 2.0.4
nuget FluentAssertions 5.10.3
nuget Moq 4.14.5

// BUILD STUFF
nuget SourceLink.Embed.AllSourceFiles 2.8.3 copy_local: true
Expand Down
24 changes: 24 additions & 0 deletions paket.lock
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ NUGET
BlackFox.VsWhere (1.1)
FSharp.Core (>= 4.2.3)
Microsoft.Win32.Registry (>= 4.7)
Castle.Core (4.4.1)
NETStandard.Library (>= 1.6.1)
System.Collections.Specialized (>= 4.3)
System.ComponentModel (>= 4.3)
System.ComponentModel.TypeConverter (>= 4.3)
System.Diagnostics.TraceSource (>= 4.3)
System.Dynamic.Runtime (>= 4.3)
System.Reflection (>= 4.3)
System.Reflection.Emit (>= 4.3)
System.Reflection.TypeExtensions (>= 4.3)
System.Xml.XmlDocument (>= 4.3)
CompareNETObjects (4.67)
Microsoft.CSharp (>= 4.5)
Fake.Core.CommandLineParsing (5.20.3)
Expand Down Expand Up @@ -346,6 +357,9 @@ NUGET
Microsoft.Win32.SystemEvents (4.7)
Microsoft.NETCore.Platforms (>= 3.1)
Mono.Posix.NETStandard (1.0)
Moq (4.14.5)
Castle.Core (>= 4.4)
System.Threading.Tasks.Extensions (>= 4.5.1)
MSBuild.StructuredLogger (2.1.176)
Microsoft.Build (>= 16.4)
Microsoft.Build.Framework (>= 16.4)
Expand Down Expand Up @@ -507,6 +521,16 @@ NUGET
Microsoft.NETCore.Platforms (>= 1.1)
Microsoft.NETCore.Targets (>= 1.1)
System.Runtime (>= 4.3)
System.Diagnostics.TraceSource (4.3)
Microsoft.NETCore.Platforms (>= 1.1)
runtime.native.System (>= 4.3)
System.Collections (>= 4.3)
System.Diagnostics.Debug (>= 4.3)
System.Globalization (>= 4.3)
System.Resources.ResourceManager (>= 4.3)
System.Runtime (>= 4.3)
System.Runtime.Extensions (>= 4.3)
System.Threading (>= 4.3)
System.Diagnostics.Tracing (4.3)
Microsoft.NETCore.Platforms (>= 1.1)
Microsoft.NETCore.Targets (>= 1.1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ namespace Be.Vlaanderen.Basisregisters.ProjectionHandling.Syndication
using Microsoft.SyndicationFeed;
using Runner;

public class FeedProjectionRunner<TMessage, TContent, TContext>
public interface IFeedProjectionRunner<TContext>
where TContext : RunnerDbContext<TContext>
{
Task CatchUpAsync(Func<Owned<TContext>> context, CancellationToken cancellationToken);
}

public class FeedProjectionRunner<TMessage, TContent, TContext> : IFeedProjectionRunner<TContext>
where TMessage : struct
where TContext : RunnerDbContext<TContext>
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
namespace Be.Vlaanderen.Basisregisters.ProjectionHandling.Syndication
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Autofac.Features.OwnedInstances;
using Microsoft.Extensions.Logging;
using Runner;

public class FeedProjector<TContext>
where TContext: RunnerDbContext<TContext>
{
private readonly Func<Owned<TContext>> _contextFactory;
private readonly List<IFeedProjectionRunner<TContext>> _feedProjectionRunners;
private readonly ILogger _logger;

public FeedProjector(
Func<Owned<TContext>> contextFactory,
ILoggerFactory loggerFactory,
IEnumerable<IFeedProjectionRunner<TContext>> configuredFeedProjectionRunners)
{
_contextFactory = contextFactory;
_feedProjectionRunners = configuredFeedProjectionRunners?.ToList() ?? new List<IFeedProjectionRunner<TContext>>();
_logger = loggerFactory?.CreateLogger<FeedProjector<TContext>>() ?? throw new ArgumentNullException(nameof(loggerFactory));
}

public FeedProjector<TContext> Register(params IFeedProjectionRunner<TContext>[] feedProjectionRunners)
=> Register((IEnumerable<IFeedProjectionRunner<TContext>>)feedProjectionRunners);

public FeedProjector<TContext> Register(IEnumerable<IFeedProjectionRunner<TContext>> feedProjectionRunners)
{
if (feedProjectionRunners != null)
_feedProjectionRunners.AddRange(feedProjectionRunners);

return this;
}

public async Task Start(CancellationToken cancellationToken)
{
var catchupRunners = _feedProjectionRunners.Select(runner => CreateCatchUpTask(runner, cancellationToken));
await Task.WhenAll(catchupRunners);
}

private async Task CreateCatchUpTask(IFeedProjectionRunner<TContext> feed, CancellationToken cancellationToken)
{
try
{
await feed.CatchUpAsync(_contextFactory, cancellationToken);
}
catch (Exception exception)
{
_logger.LogError(exception, "FeedProjectionRunner failed");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\packages\Be.Vlaanderen.Basisregisters.Build.Pipeline\Content\Be.Vlaanderen.Basisregisters.Build.Pipeline.Settings.Test.props" />

<ItemGroup>
<ProjectReference Include="..\..\src\Be.Vlaanderen.Basisregisters.ProjectionHandling.Syndication\Be.Vlaanderen.Basisregisters.ProjectionHandling.Syndication.csproj" />
</ItemGroup>

<Import Project="..\..\.paket\Paket.Restore.targets" />
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace Be.Vlaanderen.Basisregisters.ProjectionHandling.Syndication.Tests.FeedProjector
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Autofac.Features.OwnedInstances;

public class FailingRunner : IFeedProjectionRunner<FeedProjectorTestContext>
{
private readonly Exception _exception;
private readonly TimeSpan _delay;

public FailingRunner(Exception exception)
: this(exception, TimeSpan.Zero)
{ }

public FailingRunner(Exception exception, TimeSpan delay)
{
_exception = exception;
_delay = delay;
}

public async Task CatchUpAsync(Func<Owned<FeedProjectorTestContext>> context, CancellationToken cancellationToken)
{
await Task.Delay(_delay, cancellationToken);
throw _exception;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace Be.Vlaanderen.Basisregisters.ProjectionHandling.Syndication.Tests.FeedProjector
{
using System;

public class FeedProjectionException : Exception
{
public FeedProjectionException(string message) : base(message) { }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace Be.Vlaanderen.Basisregisters.ProjectionHandling.Syndication.Tests.FeedProjector
{
using Runner;

public class FeedProjectorTestContext : RunnerDbContext<FeedProjectorTestContext>
{
public override string ProjectionStateSchema => "FakeSchema";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
namespace Be.Vlaanderen.Basisregisters.ProjectionHandling.Syndication.Tests.FeedProjector
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Autofac.Features.OwnedInstances;
using FluentAssertions;
using Microsoft.Extensions.Logging;
using Moq;
using Xunit;

public class When_starting_the_projector_with_a_projection_that_throws_an_exception
{
private readonly FeedProjector<FeedProjectorTestContext> _sut;
private readonly LoggerFactoryMock _loggerFactoryMock;
private readonly FeedProjectionException _expectedException;

public When_starting_the_projector_with_a_projection_that_throws_an_exception()
{
_loggerFactoryMock = new LoggerFactoryMock();
_expectedException = new FeedProjectionException("error1");

_sut = new FeedProjector<FeedProjectorTestContext>(
() => new Mock<Owned<FeedProjectorTestContext>>().Object,
_loggerFactoryMock,
new []{ new FailingRunner(_expectedException) });
}

private Func<Task> StartProjections =>
() => _sut.Start(CancellationToken.None);

[Fact]
public void Then_the_projector_does_not_throw_an_exception()
{
StartProjections.Should().NotThrow();
}

[Fact]
public async Task Then_the_projector_logs_the_thrown_exception()
{
await StartProjections();

_loggerFactoryMock
.ResolveLogger<FeedProjector<FeedProjectorTestContext>>()
.Verify(
logger => logger.Log(
LogLevel.Error,
It.IsAny<EventId>(),
It.Is<It.IsAnyType>((v, t) => v.ToString() == "FeedProjectionRunner failed"),
_expectedException,
(Func<It.IsAnyType, Exception, string>)It.IsAny<object>()),
Times.Once);
}
}

public class When_starting_the_projector_with_a_multiple_failing_projections_and_an_infinte_projection
{
private readonly FeedProjector<FeedProjectorTestContext> _sut;
private readonly LoggerFactoryMock _loggerFactoryMock;
private readonly FeedProjectionException _firstExpectedException, _secondExpectedException;
private readonly CancellationTokenSource _cancellationTokenSource;

public When_starting_the_projector_with_a_multiple_failing_projections_and_an_infinte_projection()
{
_cancellationTokenSource = new CancellationTokenSource();
_loggerFactoryMock = new LoggerFactoryMock();
_firstExpectedException = new FeedProjectionException("error1");
_secondExpectedException = new FeedProjectionException("error2");

_sut = new FeedProjector<FeedProjectorTestContext>(
() => new Mock<Owned<FeedProjectorTestContext>>().Object,
_loggerFactoryMock,
new IFeedProjectionRunner<FeedProjectorTestContext>[]
{
new FailingRunner(_secondExpectedException, TimeSpan.FromMilliseconds(300)),
new InfiniteRunner(),
new FailingRunner(_firstExpectedException, TimeSpan.FromMilliseconds(50)),
});
}

private Func<Task> StartProjections =>
() => _sut.Start(_cancellationTokenSource.Token);

private void StopProjections()
=> _cancellationTokenSource.Cancel();

[Fact]
public async Task Then_the_projector_logs_the_first_thrown_exception_is_logged_while_the_projector_is_still_running()
{
//don't await starting the projections
StartProjections();

// wait for exception to be thrown
await Task.Delay(55);

_loggerFactoryMock
.ResolveLogger<FeedProjector<FeedProjectorTestContext>>()
.Verify(
logger => logger.Log(
LogLevel.Error,
It.IsAny<EventId>(),
It.Is<It.IsAnyType>((v, t) => v.ToString() == "FeedProjectionRunner failed"),
_firstExpectedException,
(Func<It.IsAnyType, Exception, string>)It.IsAny<object>()),
Times.Once);

StopProjections();
}

[Fact]
public async Task Then_the_projector_logs_the_second_thrown_exception_is_logged_while_the_projector_is_still_running()
{
//don't await starting the projections
StartProjections();

// wait for exception to be thrown
await Task.Delay(305);

_loggerFactoryMock
.ResolveLogger<FeedProjector<FeedProjectorTestContext>>()
.Verify(
logger => logger.Log(
LogLevel.Error,
It.IsAny<EventId>(),
It.Is<It.IsAnyType>((v, t) => v.ToString() == "FeedProjectionRunner failed"),
_secondExpectedException,
(Func<It.IsAnyType, Exception, string>)It.IsAny<object>()),
Times.Once);

StopProjections();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace Be.Vlaanderen.Basisregisters.ProjectionHandling.Syndication.Tests.FeedProjector
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Autofac.Features.OwnedInstances;

public class InfiniteRunner : IFeedProjectionRunner<FeedProjectorTestContext>
{
public async Task CatchUpAsync(Func<Owned<FeedProjectorTestContext>> context, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromMilliseconds(100), cancellationToken);
}
}
}
}
Loading

0 comments on commit d182f34

Please sign in to comment.