diff --git a/Samples/ExampleHost/App.config b/Samples/ExampleHost/App.config index 97530a9..7c164fe 100644 --- a/Samples/ExampleHost/App.config +++ b/Samples/ExampleHost/App.config @@ -15,7 +15,7 @@ - + @@ -25,6 +25,10 @@ + + + + diff --git a/Samples/ExampleHost/CountsProjector.cs b/Samples/ExampleHost/CountsProjector.cs index 1a49cb1..7da5c41 100644 --- a/Samples/ExampleHost/CountsProjector.cs +++ b/Samples/ExampleHost/CountsProjector.cs @@ -23,7 +23,7 @@ public CountsProjector(Dispatcher dispatcher, InMemoryDatabase store) public void Start() { - dispatcher.Subscribe(null, async transactions => + dispatcher.Subscribe(null, async (transactions, info) => { await documentCountProjector.Handle(transactions); }); diff --git a/Samples/ExampleHost/ExampleHost.csproj b/Samples/ExampleHost/ExampleHost.csproj index f8d37ac..c03f633 100644 --- a/Samples/ExampleHost/ExampleHost.csproj +++ b/Samples/ExampleHost/ExampleHost.csproj @@ -39,25 +39,20 @@ true - - ..\..\packages\Microsoft.Owin.3.0.1\lib\net45\Microsoft.Owin.dll - True + + ..\..\packages\Microsoft.Owin.3.1.0\lib\net45\Microsoft.Owin.dll - - ..\..\packages\Microsoft.Owin.Diagnostics.3.0.1\lib\net45\Microsoft.Owin.Diagnostics.dll - True + + ..\..\packages\Microsoft.Owin.Diagnostics.3.1.0\lib\net45\Microsoft.Owin.Diagnostics.dll - - ..\..\packages\Microsoft.Owin.Host.HttpListener.3.0.1\lib\net45\Microsoft.Owin.Host.HttpListener.dll - True + + ..\..\packages\Microsoft.Owin.Host.HttpListener.3.1.0\lib\net45\Microsoft.Owin.Host.HttpListener.dll - - ..\..\packages\Microsoft.Owin.Hosting.3.0.1\lib\net45\Microsoft.Owin.Hosting.dll - True + + ..\..\packages\Microsoft.Owin.Hosting.3.1.0\lib\net45\Microsoft.Owin.Hosting.dll - - ..\..\packages\Newtonsoft.Json.6.0.4\lib\net45\Newtonsoft.Json.dll - True + + ..\..\packages\Newtonsoft.Json.10.0.2\lib\net45\Newtonsoft.Json.dll ..\..\packages\Owin.1.0\lib\net40\Owin.dll diff --git a/Samples/ExampleHost/JsonFileEventStore.cs b/Samples/ExampleHost/JsonFileEventStore.cs index 3e7e1fa..8e9b4b9 100644 --- a/Samples/ExampleHost/JsonFileEventStore.cs +++ b/Samples/ExampleHost/JsonFileEventStore.cs @@ -5,6 +5,7 @@ using System.Linq; using System.Runtime.Serialization.Formatters; using System.Threading.Tasks; +using LiquidProjections.Abstractions; using Newtonsoft.Json; namespace LiquidProjections.ExampleHost @@ -25,9 +26,11 @@ public JsonFileEventStore(string filePath, int pageSize) entryQueue = new Queue(zip.Entries.Where(e => e.Name.EndsWith(".json"))); } - public IDisposable Subscribe(long? lastProcessedCheckpoint, Func, Task> handler, string subscriptionId) + public IDisposable Subscribe(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId) { - var subscriber = new Subscriber(lastProcessedCheckpoint ?? 0, handler); + var subscription = new Subscription( + lastProcessedCheckpoint ?? 0, + transactions => subscriber.HandleTransactions(transactions, null)); Task.Run(async () => { @@ -39,13 +42,13 @@ public IDisposable Subscribe(long? lastProcessedCheckpoint, Func LoadNextPageAsync() @@ -72,7 +75,7 @@ private Task LoadNextPageAsync() Body = JsonConvert.DeserializeObject(json, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All, - TypeNameAssemblyFormat = FormatterAssemblyStyle.Full + TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full }) }); } @@ -105,13 +108,13 @@ public void Dispose() zip = null; } - internal class Subscriber : IDisposable + internal class Subscription : IDisposable { private readonly long lastProcessedCheckpoint; private readonly Func, Task> handler; private bool disposed; - public Subscriber(long lastProcessedCheckpoint, Func, Task> handler) + public Subscription(long lastProcessedCheckpoint, Func, Task> handler) { this.lastProcessedCheckpoint = lastProcessedCheckpoint; this.handler = handler; diff --git a/Samples/ExampleHost/packages.config b/Samples/ExampleHost/packages.config index fafe704..fa8d503 100644 --- a/Samples/ExampleHost/packages.config +++ b/Samples/ExampleHost/packages.config @@ -4,12 +4,12 @@ - - - - - - + + + + + + \ No newline at end of file diff --git a/Src/LiquidProjections.Abstractions/CreateSubscription.cs b/Src/LiquidProjections.Abstractions/CreateSubscription.cs index b1bac50..a81ad31 100644 --- a/Src/LiquidProjections.Abstractions/CreateSubscription.cs +++ b/Src/LiquidProjections.Abstractions/CreateSubscription.cs @@ -7,13 +7,40 @@ namespace LiquidProjections.Abstractions /// /// Creates a subscription on an event store, starting at the transaction following /// , identified by , and which - /// passed transactions to the provided . + /// passes any transactions to the provided . /// - /// - /// - /// - /// + /// + /// The checkpoint of the transaction the subscriber has last seen, or null to start from the beginning. + /// + /// + /// An object wrapping the various handlers that the subscription will use. + /// + /// + /// Identifies this subscription and helps distinct multiple subscriptions. + /// + /// + /// A disposable object that will cancel the subscription. + /// public delegate IDisposable CreateSubscription(long? lastProcessedCheckpoint, - Func, Task> handler, - string subscriptionId); + Subscriber subscriber, string subscriptionId); + + public class Subscriber + { + /// + /// Represents a handler that will receive the transactions that the event store pushes to the subscriber. + /// + public Func, SubscriptionInfo, Task> HandleTransactions { get; set; } + + /// + /// Represents a handler that the event store will use if the requested checkpoint does not exist. + /// + public Func NoSuchCheckpoint { get; set; } + } + + public class SubscriptionInfo + { + public string Id { get; set; } + + public IDisposable Subscription { get; set; } + } } \ No newline at end of file diff --git a/Src/LiquidProjections.Abstractions/LiquidProjections.Abstractions.v3.ncrunchproject b/Src/LiquidProjections.Abstractions/LiquidProjections.Abstractions.v3.ncrunchproject new file mode 100644 index 0000000..0e3a2cf --- /dev/null +++ b/Src/LiquidProjections.Abstractions/LiquidProjections.Abstractions.v3.ncrunchproject @@ -0,0 +1,5 @@ + + + True + + \ No newline at end of file diff --git a/Src/LiquidProjections.Testing/LiquidProjections.Testing.v3.ncrunchproject b/Src/LiquidProjections.Testing/LiquidProjections.Testing.v3.ncrunchproject new file mode 100644 index 0000000..0e3a2cf --- /dev/null +++ b/Src/LiquidProjections.Testing/LiquidProjections.Testing.v3.ncrunchproject @@ -0,0 +1,5 @@ + + + True + + \ No newline at end of file diff --git a/Src/LiquidProjections.Testing/MemoryEventSource.cs b/Src/LiquidProjections.Testing/MemoryEventSource.cs index 38d0843..288aa6f 100644 --- a/Src/LiquidProjections.Testing/MemoryEventSource.cs +++ b/Src/LiquidProjections.Testing/MemoryEventSource.cs @@ -4,16 +4,16 @@ using System.Globalization; using System.Linq; using System.Threading.Tasks; -using LiquidProjections.Testing; +using LiquidProjections.Abstractions; -namespace LiquidProjections +namespace LiquidProjections.Testing { public class MemoryEventSource { private readonly int batchSize; private long lastCheckpoint; - private readonly List subscribers = new List(); + private readonly List subscribers = new List(); private readonly List history = new List(); public MemoryEventSource(int batchSize = 10) @@ -21,24 +21,33 @@ public MemoryEventSource(int batchSize = 10) this.batchSize = batchSize; } - public IDisposable Subscribe(long? lastProcessedCheckpoint, Func, Task> handler, string subscriptionId) + public IDisposable Subscribe(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId) { lastCheckpoint = lastProcessedCheckpoint ?? 0; - var subscriber = new Subscriber(lastCheckpoint, batchSize, handler); + var subscription = new Subscription(lastCheckpoint, batchSize, subscriber, subscriptionId); - subscribers.Add(subscriber); + subscribers.Add(subscription); - Func asyncAction = async () => + async Task AsyncAction() { + if (history.LastOrDefault()?.Checkpoint < lastProcessedCheckpoint) + { + await subscriber.NoSuchCheckpoint(new SubscriptionInfo + { + Id = subscriptionId, + Subscription = subscription + }); + } + foreach (Transaction transaction in history) { - await subscriber.Send(new[] { transaction }).ConfigureAwait(false); + await subscription.Send(new[] {transaction}).ConfigureAwait(false); } - }; + } - asyncAction().ConfigureAwait(false).GetAwaiter().GetResult(); + AsyncAction().ConfigureAwait(false).GetAwaiter().GetResult(); - return subscriber; + return subscription; } @@ -102,29 +111,46 @@ public async Task WriteWithHeaders(object anEvent, IDictionary s.Id == subscriptionId); + return (subscription != null) && !subscription.IsDisposed; + } } - internal class Subscriber : IDisposable + internal class Subscription : IDisposable { private readonly long lastProcessedCheckpoint; private readonly int batchSize; - private readonly Func, Task> handler; + private readonly Subscriber subscriber; + private readonly string subscriptionId; private bool disposed = false; - - public Subscriber(long lastProcessedCheckpoint, int batchSize, Func, Task> handler) + + public Subscription(long lastProcessedCheckpoint, int batchSize, + Subscriber subscriber, string subscriptionId) { this.lastProcessedCheckpoint = lastProcessedCheckpoint; this.batchSize = batchSize; - this.handler = handler; + this.subscriber = subscriber; + this.subscriptionId = subscriptionId; } public async Task Send(IEnumerable transactions) { if (!disposed) { - foreach (var batch in transactions.Where(t => t.Checkpoint > lastProcessedCheckpoint).InBatchesOf(batchSize)) + var subscriptionInfo = new SubscriptionInfo { - await handler(new ReadOnlyCollection(batch.ToList())).ConfigureAwait(false); + Id = subscriptionId, + Subscription = this + }; + + Transaction[] requestedTransactions = transactions.Where(t => t.Checkpoint > lastProcessedCheckpoint).ToArray(); + foreach (var batch in requestedTransactions.InBatchesOf(batchSize)) + { + await subscriber.HandleTransactions(new ReadOnlyCollection(batch.ToList()), subscriptionInfo) + .ConfigureAwait(false); } } } @@ -133,5 +159,9 @@ public void Dispose() { disposed = true; } + + public bool IsDisposed => disposed; + + public string Id => subscriptionId; } } diff --git a/Src/LiquidProjections/Dispatcher.cs b/Src/LiquidProjections/Dispatcher.cs index 8c4c8a5..dd35afe 100644 --- a/Src/LiquidProjections/Dispatcher.cs +++ b/Src/LiquidProjections/Dispatcher.cs @@ -10,62 +10,87 @@ public class Dispatcher { private readonly CreateSubscription createSubscription; - [Obsolete("The IEventStore interface has been replaced by the SubscribeToEvents delegate")] - public Dispatcher(IEventStore eventStore) - { - if (eventStore == null) - { - throw new ArgumentNullException(nameof(eventStore)); - } - - this.createSubscription = eventStore.Subscribe; - } - public Dispatcher(CreateSubscription createSubscription) { this.createSubscription = createSubscription; } - public IDisposable Subscribe(long? checkpoint, Func, Task> handler, string subscriptionId) + public IDisposable Subscribe(long? lastProcessedCheckpoint, + Func, SubscriptionInfo, Task> handler, + SubscriptionOptions options = null) { + if (options == null) + { + options = new SubscriptionOptions(); + } + if (handler == null) { throw new ArgumentNullException(nameof(handler)); } - var subscriptionMonitor = new object(); - IDisposable subscription = null; - - lock (subscriptionMonitor) + return createSubscription(lastProcessedCheckpoint, new Subscriber { - subscription = createSubscription(checkpoint, - async transactions => - { - try - { - await handler(transactions); - } - catch (Exception exception) - { - LogProvider.GetLogger(typeof(Dispatcher)).FatalException( - "Projector exception was not handled. Event subscription has been cancelled.", - exception); + HandleTransactions = async (transactions, info) => await HandleTransactions(transactions, handler, info), + NoSuchCheckpoint = async info => await HandleUnknownCheckpoint(info, handler, options) + }, options.Id); + } - lock (subscriptionMonitor) - { - subscription.Dispose(); - } - } - }, - subscriptionId); + private static async Task HandleTransactions(IReadOnlyList transactions, Func, SubscriptionInfo, Task> handler, SubscriptionInfo info) + { + try + { + await handler(transactions, info); } + catch (Exception exception) + { + LogProvider.GetLogger(typeof(Dispatcher)).FatalException( + "Projector exception was not handled. Event subscription has been cancelled.", + exception); - return subscription; + info.Subscription?.Dispose(); + } } - public IDisposable Subscribe(long? checkpoint, Func, Task> handler) + private async Task HandleUnknownCheckpoint(SubscriptionInfo info, Func, SubscriptionInfo, Task> handler, SubscriptionOptions options) { - return Subscribe(checkpoint, handler, null); + if (options.RestartWhenAhead) + { + try + { + info.Subscription?.Dispose(); + + await options.BeforeRestarting(); + + Subscribe(null, handler, options); + } + catch (Exception exception) + { + LogProvider.GetLogger(typeof(Dispatcher)).FatalException( + "Failed to restart the projector.", + exception); + } + } } } + + public class SubscriptionOptions + { + /// + /// Can be used by subscribers to understand which is which. + /// + public string Id { get; set; } + + /// + /// If set to true, the dispatcher will automatically restart at the first transaction if it detects + /// that the subscriber is ahead of the event store (e.g. because it got restored to an earlier time). + /// + public bool RestartWhenAhead { get; set; } + + /// + /// If restarting is enabled through , this property can be used to run some + /// clean-up code before the dispatcher will restart at the first transaction. + /// + public Func BeforeRestarting { get; set; } = () => Task.FromResult(0); + } } \ No newline at end of file diff --git a/Src/LiquidProjections/EnumerableExtensions.cs b/Src/LiquidProjections/EnumerableExtensions.cs new file mode 100644 index 0000000..78adad2 --- /dev/null +++ b/Src/LiquidProjections/EnumerableExtensions.cs @@ -0,0 +1,16 @@ +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Diagnostics; +using System.Linq; + +namespace LiquidProjections +{ + [DebuggerNonUserCode] + internal static class EnumerableExtensions + { + public static IReadOnlyList ToReadOnly(this IEnumerable items) + { + return new ReadOnlyCollection(items.ToList()); + } + } +} \ No newline at end of file diff --git a/Src/LiquidProjections/LiquidProjections.v3.ncrunchproject b/Src/LiquidProjections/LiquidProjections.v3.ncrunchproject new file mode 100644 index 0000000..0e3a2cf --- /dev/null +++ b/Src/LiquidProjections/LiquidProjections.v3.ncrunchproject @@ -0,0 +1,5 @@ + + + True + + \ No newline at end of file diff --git a/Tests/LiquidProjections.Specs/DispatcherSpecs.cs b/Tests/LiquidProjections.Specs/DispatcherSpecs.cs index 3b5a781..c3bf4e7 100644 --- a/Tests/LiquidProjections.Specs/DispatcherSpecs.cs +++ b/Tests/LiquidProjections.Specs/DispatcherSpecs.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; @@ -6,10 +7,11 @@ using Chill; using FluentAssertions; - +using LiquidProjections.Abstractions; using LiquidProjections.Logging; - +using LiquidProjections.Testing; using Xunit; +// ReSharper disable ConvertToLambdaExpression namespace LiquidProjections.Specs { @@ -21,27 +23,34 @@ public When_a_projector_throws_an_exception() { Given(() => { - UseThe(new FakeEventStore()); - WithSubject(_ => new Dispatcher(The().Subscribe)); + UseThe(new MemoryEventSource()); + WithSubject(_ => new Dispatcher(The().Subscribe)); + LogProvider.SetCurrentLogProvider(UseThe(new FakeLogProvider())); + UseThe(new ProjectionException("Some message.")); - // Use async throw to easily get a faulted task. -#pragma warning disable 1998 - Subject.Subscribe(null, async transaction => + Subject.Subscribe(null, (transaction, info) => { + // Use async throw to easily get a faulted task. throw The(); + }, + new SubscriptionOptions + { + Id = "mySubscription" }); -#pragma warning restore 1998 }); - When(() => The().Handler(null)); + When(() => + { + return The().Write(new List()); + }); } [Fact] public void It_should_unsubscribe() { - The().IsSubscribed.Should().BeFalse(); + The().HasSubscriptionForId("mySubscription").Should().BeFalse(); } [Fact] @@ -57,14 +66,158 @@ public void It_should_include_the_exception() } } + public class When_the_requested_checkpoint_is_ahead_of_the_store_and_auto_restart_is_configured : GivenSubject + { + private readonly BlockingCollection trace = new BlockingCollection(); + + private readonly BlockingCollection receivedTransactions = new BlockingCollection(); + + private readonly TaskCompletionSource> allTransactionsReceived = + new TaskCompletionSource>(); + + public When_the_requested_checkpoint_is_ahead_of_the_store_and_auto_restart_is_configured() + { + Given(async () => + { + UseThe(new MemoryEventSource()); + + WithSubject(_ => new Dispatcher(The().Subscribe)); + + await The().Write( + new TransactionBuilder().WithCheckpoint(1).Build()); + + await The().Write( + new TransactionBuilder().WithCheckpoint(999).Build()); + }); + + When(() => + { + var options = new SubscriptionOptions + { + Id = "someId", + RestartWhenAhead = true, + BeforeRestarting = () => + { + trace.Add("BeforeRestarting"); + return Task.FromResult(0); + } + }; + + Subject.Subscribe(1000, (transactions, info) => + { + trace.Add("TransactionsReceived"); + + foreach (var transaction in transactions) + { + receivedTransactions.Add(transaction); + } + + if (receivedTransactions.Count == 2) + { + allTransactionsReceived.SetResult(transactions); + } + + return Task.FromResult(0); + + }, options); + }); + } + + [Fact] + public async Task It_should_allow_the_subscriber_to_cleanup_before_restarting() + { + await allTransactionsReceived.Task.TimeoutAfter(30.Seconds()); + + trace.Should().Equal("BeforeRestarting", "TransactionsReceived", "TransactionsReceived"); + } + + [Fact] + public async Task It_should_restart_sending_transactions_from_the_beginning() + { + var transactions = await allTransactionsReceived.Task.TimeoutAfter(30.Seconds()); + + transactions.First().Checkpoint.Should().Be(999); + } + } + public class When_there_are_no_new_transactions_available_and_auto_restart_is_configured : GivenSubject + { + private readonly BlockingCollection receivedTransactions = new BlockingCollection(); + + private readonly TaskCompletionSource allTransactionsReceived = + new TaskCompletionSource(); + + private bool restarted = false; + + public When_there_are_no_new_transactions_available_and_auto_restart_is_configured() + { + Given(async () => + { + UseThe(new MemoryEventSource()); + + WithSubject(_ => new Dispatcher(The().Subscribe)); + + await The().Write( + new TransactionBuilder().WithCheckpoint(123).Build()); + + await The().Write( + new TransactionBuilder().WithCheckpoint(456).Build()); + }); + + When(() => + { + var options = new SubscriptionOptions + { + Id = "someId", + RestartWhenAhead = true, + BeforeRestarting = () => + { + restarted = true; + return Task.FromResult(0); + } + }; + + Subject.Subscribe(123, (transactions, info) => + { + foreach (var transaction in transactions) + { + receivedTransactions.Add(transaction); + } + + if (receivedTransactions.Count == 1) + { + allTransactionsReceived.SetResult(true); + } + + return Task.FromResult(0); + + }, options); + }); + } + + [Fact] + public async Task It_should_only_provide_the_newer_transactions() + { + await allTransactionsReceived.Task.TimeoutAfter(30.Seconds()); + + receivedTransactions.Should().HaveCount(1); + } + + public async Task It_should_not_have_restartedAsync() + { + await allTransactionsReceived.Task.TimeoutAfter(30.Seconds()); + + restarted.Should().BeFalse(); + } + } + internal class FakeEventStore { private readonly object syncRoot = new object(); public bool IsSubscribed { get; private set; } - public Func, Task> Handler { get; private set; } + public Func, SubscriptionInfo, Task> Handler { get; private set; } - public IDisposable Subscribe(long? previousCheckpoint, Func, Task> handler, string subscriptionId = null) + public IDisposable Subscribe(long? previousCheckpoint, Func, SubscriptionInfo,Task> handler, string subscriptionId = null) { lock (syncRoot) { diff --git a/Tests/LiquidProjections.Specs/LiquidProjections.Specs.csproj b/Tests/LiquidProjections.Specs/LiquidProjections.Specs.csproj index 4839cf3..f9b3bcc 100644 --- a/Tests/LiquidProjections.Specs/LiquidProjections.Specs.csproj +++ b/Tests/LiquidProjections.Specs/LiquidProjections.Specs.csproj @@ -34,17 +34,14 @@ true - - ..\..\packages\Chill.2.4.1\Lib\net45\Chill.dll - True + + ..\..\packages\Chill.3.0.0\lib\net45\Chill.dll - - ..\..\packages\FluentAssertions.4.18.0\lib\net45\FluentAssertions.dll - True + + ..\..\packages\FluentAssertions.4.19.2\lib\net45\FluentAssertions.dll - - ..\..\packages\FluentAssertions.4.18.0\lib\net45\FluentAssertions.Core.dll - True + + ..\..\packages\FluentAssertions.4.19.2\lib\net45\FluentAssertions.Core.dll @@ -55,20 +52,16 @@ - ..\..\packages\xunit.abstractions.2.0.0\lib\net35\xunit.abstractions.dll - True + ..\..\packages\xunit.abstractions.2.0.1\lib\net35\xunit.abstractions.dll - - ..\..\packages\xunit.assert.2.1.0\lib\dotnet\xunit.assert.dll - True + + ..\..\packages\xunit.assert.2.2.0\lib\netstandard1.1\xunit.assert.dll - - ..\..\packages\xunit.extensibility.core.2.1.0\lib\dotnet\xunit.core.dll - True + + ..\..\packages\xunit.extensibility.core.2.2.0\lib\netstandard1.1\xunit.core.dll - - ..\..\packages\xunit.extensibility.execution.2.1.0\lib\net45\xunit.execution.desktop.dll - True + + ..\..\packages\xunit.extensibility.execution.2.2.0\lib\net452\xunit.execution.desktop.dll @@ -77,9 +70,8 @@ - - - + + @@ -98,6 +90,9 @@ + + +