Skip to content

Commit

Permalink
Added the possibility to restart a subscription if the projector is a…
Browse files Browse the repository at this point in the history
…head of the event source.
  • Loading branch information
dennisdoomen committed May 29, 2017
1 parent e575e9f commit f06507d
Show file tree
Hide file tree
Showing 10 changed files with 395 additions and 85 deletions.
2 changes: 1 addition & 1 deletion Samples/ExampleHost/CountsProjector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public CountsProjector(Dispatcher dispatcher, InMemoryDatabase store)

public void Start()
{
dispatcher.Subscribe(null, async transactions =>
dispatcher.Subscribe(null, async (transactions, info) =>
{
await documentCountProjector.Handle(transactions);
});
Expand Down
15 changes: 9 additions & 6 deletions Samples/ExampleHost/JsonFileEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Linq;
using System.Runtime.Serialization.Formatters;
using System.Threading.Tasks;
using LiquidProjections.Abstractions;
using Newtonsoft.Json;

namespace LiquidProjections.ExampleHost
Expand All @@ -25,9 +26,11 @@ public JsonFileEventStore(string filePath, int pageSize)
entryQueue = new Queue<ZipArchiveEntry>(zip.Entries.Where(e => e.Name.EndsWith(".json")));
}

public IDisposable Subscribe(long? lastProcessedCheckpoint, Func<IReadOnlyList<Transaction>, Task> handler, string subscriptionId)
public IDisposable Subscribe(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId)
{
var subscriber = new Subscriber(lastProcessedCheckpoint ?? 0, handler);
var subscription = new Subscription(
lastProcessedCheckpoint ?? 0,
transactions => subscriber.HandleTransactions(transactions, null));

Task.Run(async () =>
{
Expand All @@ -39,13 +42,13 @@ public IDisposable Subscribe(long? lastProcessedCheckpoint, Func<IReadOnlyList<T
// Start loading the next page on a separate thread while we have the subscriber handle the previous transactions.
loader = LoadNextPageAsync();
await subscriber.Send(transactions);
await subscription.Send(transactions);
transactions = await loader;
}
});

return subscriber;
return subscription;
}

private Task<Transaction[]> LoadNextPageAsync()
Expand Down Expand Up @@ -105,13 +108,13 @@ public void Dispose()
zip = null;
}

internal class Subscriber : IDisposable
internal class Subscription : IDisposable
{
private readonly long lastProcessedCheckpoint;
private readonly Func<IReadOnlyList<Transaction>, Task> handler;
private bool disposed;

public Subscriber(long lastProcessedCheckpoint, Func<IReadOnlyList<Transaction>, Task> handler)
public Subscription(long lastProcessedCheckpoint, Func<IReadOnlyList<Transaction>, Task> handler)
{
this.lastProcessedCheckpoint = lastProcessedCheckpoint;
this.handler = handler;
Expand Down
41 changes: 34 additions & 7 deletions Src/LiquidProjections.Abstractions/CreateSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,40 @@ namespace LiquidProjections.Abstractions
/// <summary>
/// Creates a subscription on an event store, starting at the transaction following
/// <paramref name="lastProcessedCheckpoint"/>, identified by <paramref name="subscriptionId"/>, and which
/// passed transactions to the provided <paramref name="handler"/>.
/// passes any transactions to the provided <paramref name="subscriber"/>.
/// </summary>
/// <param name="lastProcessedCheckpoint"></param>
/// <param name="handler"></param>
/// <param name="subscriptionId"></param>
/// <returns></returns>
/// <param name="lastProcessedCheckpoint">
/// The checkpoint of the transaction the subscriber has last seen, or <c>null</c> to start from the beginning.
/// </param>
/// <param name="subscriber">
/// An object wrapping the various handlers that the subscription will use.
/// </param>
/// <param name="subscriptionId">
/// Identifies this subscription and helps distinct multiple subscriptions.
/// </param>
/// <returns>
/// A disposable object that will cancel the subscription.
/// </returns>
public delegate IDisposable CreateSubscription(long? lastProcessedCheckpoint,
Func<IReadOnlyList<Transaction>, Task> handler,
string subscriptionId);
Subscriber subscriber, string subscriptionId);

public class Subscriber
{
/// <summary>
/// Represents a handler that will receive the transactions that the event store pushes to the subscriber.
/// </summary>
public Func<IReadOnlyList<Transaction>, SubscriptionInfo, Task> HandleTransactions { get; set; }

/// <summary>
/// Represents a handler that the event store will use if the requested checkpoint does not exist.
/// </summary>
public Func<SubscriptionInfo, Task> NoSuchCheckpoint { get; set; }
}

public class SubscriptionInfo
{
public string Id { get; set; }

public IDisposable Subscription { get; set; }
}
}
66 changes: 48 additions & 18 deletions Src/LiquidProjections.Testing/MemoryEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,50 @@
using System.Globalization;
using System.Linq;
using System.Threading.Tasks;
using LiquidProjections.Testing;
using LiquidProjections.Abstractions;

namespace LiquidProjections
namespace LiquidProjections.Testing
{
public class MemoryEventSource
{
private readonly int batchSize;
private long lastCheckpoint;

private readonly List<Subscriber> subscribers = new List<Subscriber>();
private readonly List<Subscription> subscribers = new List<Subscription>();
private readonly List<Transaction> history = new List<Transaction>();

public MemoryEventSource(int batchSize = 10)
{
this.batchSize = batchSize;
}

public IDisposable Subscribe(long? lastProcessedCheckpoint, Func<IReadOnlyList<Transaction>, Task> handler, string subscriptionId)
public IDisposable Subscribe(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId)
{
lastCheckpoint = lastProcessedCheckpoint ?? 0;
var subscriber = new Subscriber(lastCheckpoint, batchSize, handler);
var subscription = new Subscription(lastCheckpoint, batchSize, subscriber, subscriptionId);

subscribers.Add(subscriber);
subscribers.Add(subscription);

Func<Task> asyncAction = async () =>
async Task AsyncAction()
{
if (history.LastOrDefault()?.Checkpoint < lastProcessedCheckpoint)
{
await subscriber.NoSuchCheckpoint(new SubscriptionInfo
{
Id = subscriptionId,
Subscription = subscription
});
}

foreach (Transaction transaction in history)
{
await subscriber.Send(new[] { transaction }).ConfigureAwait(false);
await subscription.Send(new[] {transaction}).ConfigureAwait(false);
}
};
}

asyncAction().ConfigureAwait(false).GetAwaiter().GetResult();
AsyncAction().ConfigureAwait(false).GetAwaiter().GetResult();

return subscriber;
return subscription;
}


Expand Down Expand Up @@ -102,29 +111,46 @@ public async Task<Transaction> WriteWithHeaders(object anEvent, IDictionary<stri

return transaction;
}

public bool HasSubscriptionForId(string subscriptionId)
{
Subscription subscription = subscribers.SingleOrDefault(s => s.Id == subscriptionId);
return (subscription != null) && !subscription.IsDisposed;
}
}

internal class Subscriber : IDisposable
internal class Subscription : IDisposable
{
private readonly long lastProcessedCheckpoint;
private readonly int batchSize;
private readonly Func<IReadOnlyList<Transaction>, Task> handler;
private readonly Subscriber subscriber;
private readonly string subscriptionId;
private bool disposed = false;

public Subscriber(long lastProcessedCheckpoint, int batchSize, Func<IReadOnlyList<Transaction>, Task> handler)

public Subscription(long lastProcessedCheckpoint, int batchSize,
Subscriber subscriber, string subscriptionId)
{
this.lastProcessedCheckpoint = lastProcessedCheckpoint;
this.batchSize = batchSize;
this.handler = handler;
this.subscriber = subscriber;
this.subscriptionId = subscriptionId;
}

public async Task Send(IEnumerable<Transaction> transactions)
{
if (!disposed)
{
foreach (var batch in transactions.Where(t => t.Checkpoint > lastProcessedCheckpoint).InBatchesOf(batchSize))
var subscriptionInfo = new SubscriptionInfo
{
await handler(new ReadOnlyCollection<Transaction>(batch.ToList())).ConfigureAwait(false);
Id = subscriptionId,
Subscription = this
};

Transaction[] requestedTransactions = transactions.Where(t => t.Checkpoint > lastProcessedCheckpoint).ToArray();
foreach (var batch in requestedTransactions.InBatchesOf(batchSize))
{
await subscriber.HandleTransactions(new ReadOnlyCollection<Transaction>(batch.ToList()), subscriptionInfo)
.ConfigureAwait(false);
}
}
}
Expand All @@ -133,5 +159,9 @@ public void Dispose()
{
disposed = true;
}

public bool IsDisposed => disposed;

public string Id => subscriptionId;
}
}
101 changes: 63 additions & 38 deletions Src/LiquidProjections/Dispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,62 +10,87 @@ public class Dispatcher
{
private readonly CreateSubscription createSubscription;

[Obsolete("The IEventStore interface has been replaced by the SubscribeToEvents delegate")]
public Dispatcher(IEventStore eventStore)
{
if (eventStore == null)
{
throw new ArgumentNullException(nameof(eventStore));
}

this.createSubscription = eventStore.Subscribe;
}

public Dispatcher(CreateSubscription createSubscription)
{
this.createSubscription = createSubscription;
}

public IDisposable Subscribe(long? checkpoint, Func<IReadOnlyList<Transaction>, Task> handler, string subscriptionId)
public IDisposable Subscribe(long? lastProcessedCheckpoint,
Func<IReadOnlyList<Transaction>, SubscriptionInfo, Task> handler,
SubscriptionOptions options = null)
{
if (options == null)
{
options = new SubscriptionOptions();
}

if (handler == null)
{
throw new ArgumentNullException(nameof(handler));
}

var subscriptionMonitor = new object();
IDisposable subscription = null;

lock (subscriptionMonitor)
return createSubscription(lastProcessedCheckpoint, new Subscriber
{
subscription = createSubscription(checkpoint,
async transactions =>
{
try
{
await handler(transactions);
}
catch (Exception exception)
{
LogProvider.GetLogger(typeof(Dispatcher)).FatalException(
"Projector exception was not handled. Event subscription has been cancelled.",
exception);
HandleTransactions = async (transactions, info) => await HandleTransactions(transactions, handler, info),
NoSuchCheckpoint = async info => await HandleUnknownCheckpoint(info, handler, options)
}, options.Id);
}

lock (subscriptionMonitor)
{
subscription.Dispose();
}
}
},
subscriptionId);
private static async Task HandleTransactions(IReadOnlyList<Transaction> transactions, Func<IReadOnlyList<Transaction>, SubscriptionInfo, Task> handler, SubscriptionInfo info)
{
try
{
await handler(transactions, info);
}
catch (Exception exception)
{
LogProvider.GetLogger(typeof(Dispatcher)).FatalException(
"Projector exception was not handled. Event subscription has been cancelled.",
exception);

return subscription;
info.Subscription?.Dispose();
}
}

public IDisposable Subscribe(long? checkpoint, Func<IReadOnlyList<Transaction>, Task> handler)
private async Task HandleUnknownCheckpoint(SubscriptionInfo info, Func<IReadOnlyList<Transaction>, SubscriptionInfo, Task> handler, SubscriptionOptions options)
{
return Subscribe(checkpoint, handler, null);
if (options.RestartWhenAhead)
{
try
{
info.Subscription?.Dispose();

await options.BeforeRestarting();

Subscribe(null, handler, options);
}
catch (Exception exception)
{
LogProvider.GetLogger(typeof(Dispatcher)).FatalException(
"Failed to restart the projector.",
exception);
}
}
}
}

public class SubscriptionOptions
{
/// <summary>
/// Can be used by subscribers to understand which is which.
/// </summary>
public string Id { get; set; }

/// <summary>
/// If set to <c>true</c>, the dispatcher will automatically restart at the first transaction if it detects
/// that the subscriber is ahead of the event store (e.g. because it got restored to an earlier time).
/// </summary>
public bool RestartWhenAhead { get; set; }

/// <summary>
/// If restarting is enabled through <see cref="RestartWhenAhead"/>, this property can be used to run some
/// clean-up code before the dispatcher will restart at the first transaction.
/// </summary>
public Func<Task> BeforeRestarting { get; set; } = () => Task.FromResult(0);
}
}
16 changes: 16 additions & 0 deletions Src/LiquidProjections/EnumerableExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Diagnostics;
using System.Linq;

namespace LiquidProjections
{
[DebuggerNonUserCode]
internal static class EnumerableExtensions
{
public static IReadOnlyList<T> ToReadOnly<T>(this IEnumerable<T> items)
{
return new ReadOnlyCollection<T>(items.ToList());
}
}
}
Loading

0 comments on commit f06507d

Please sign in to comment.