Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect and recover projectors that are ahead of the event store. #96

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Samples/ExampleHost/App.config
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="Microsoft.Owin" publicKeyToken="31bf3856ad364e35" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-3.0.1.0" newVersion="3.0.1.0" />
<bindingRedirect oldVersion="0.0.0.0-3.1.0.0" newVersion="3.1.0.0" />
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="System.Web.Http.Owin" publicKeyToken="31bf3856ad364e35" culture="neutral" />
Expand All @@ -25,6 +25,10 @@
<assemblyIdentity name="Microsoft.Owin.Hosting" publicKeyToken="31bf3856ad364e35" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-3.0.1.0" newVersion="3.0.1.0" />
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-10.0.0.0" newVersion="10.0.0.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>
2 changes: 1 addition & 1 deletion Samples/ExampleHost/CountsProjector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public CountsProjector(Dispatcher dispatcher, InMemoryDatabase store)

public void Start()
{
dispatcher.Subscribe(null, async transactions =>
dispatcher.Subscribe(null, async (transactions, info) =>
{
await documentCountProjector.Handle(transactions);
});
Expand Down
25 changes: 10 additions & 15 deletions Samples/ExampleHost/ExampleHost.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,20 @@
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>
<ItemGroup>
<Reference Include="Microsoft.Owin, Version=3.0.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Owin.3.0.1\lib\net45\Microsoft.Owin.dll</HintPath>
<Private>True</Private>
<Reference Include="Microsoft.Owin, Version=3.1.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Owin.3.1.0\lib\net45\Microsoft.Owin.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Owin.Diagnostics, Version=3.0.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Owin.Diagnostics.3.0.1\lib\net45\Microsoft.Owin.Diagnostics.dll</HintPath>
<Private>True</Private>
<Reference Include="Microsoft.Owin.Diagnostics, Version=3.1.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Owin.Diagnostics.3.1.0\lib\net45\Microsoft.Owin.Diagnostics.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Owin.Host.HttpListener, Version=3.0.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Owin.Host.HttpListener.3.0.1\lib\net45\Microsoft.Owin.Host.HttpListener.dll</HintPath>
<Private>True</Private>
<Reference Include="Microsoft.Owin.Host.HttpListener, Version=3.1.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Owin.Host.HttpListener.3.1.0\lib\net45\Microsoft.Owin.Host.HttpListener.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Owin.Hosting, Version=3.0.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Owin.Hosting.3.0.1\lib\net45\Microsoft.Owin.Hosting.dll</HintPath>
<Private>True</Private>
<Reference Include="Microsoft.Owin.Hosting, Version=3.1.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Owin.Hosting.3.1.0\lib\net45\Microsoft.Owin.Hosting.dll</HintPath>
</Reference>
<Reference Include="Newtonsoft.Json, Version=6.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<HintPath>..\..\packages\Newtonsoft.Json.6.0.4\lib\net45\Newtonsoft.Json.dll</HintPath>
<Private>True</Private>
<Reference Include="Newtonsoft.Json, Version=10.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<HintPath>..\..\packages\Newtonsoft.Json.10.0.2\lib\net45\Newtonsoft.Json.dll</HintPath>
</Reference>
<Reference Include="Owin, Version=1.0.0.0, Culture=neutral, PublicKeyToken=f0ebd12fd5e55cc5, processorArchitecture=MSIL">
<HintPath>..\..\packages\Owin.1.0\lib\net40\Owin.dll</HintPath>
Expand Down
17 changes: 10 additions & 7 deletions Samples/ExampleHost/JsonFileEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Linq;
using System.Runtime.Serialization.Formatters;
using System.Threading.Tasks;
using LiquidProjections.Abstractions;
using Newtonsoft.Json;

namespace LiquidProjections.ExampleHost
Expand All @@ -25,9 +26,11 @@ public JsonFileEventStore(string filePath, int pageSize)
entryQueue = new Queue<ZipArchiveEntry>(zip.Entries.Where(e => e.Name.EndsWith(".json")));
}

public IDisposable Subscribe(long? lastProcessedCheckpoint, Func<IReadOnlyList<Transaction>, Task> handler, string subscriptionId)
public IDisposable Subscribe(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId)
{
var subscriber = new Subscriber(lastProcessedCheckpoint ?? 0, handler);
var subscription = new Subscription(
lastProcessedCheckpoint ?? 0,
transactions => subscriber.HandleTransactions(transactions, null));

Task.Run(async () =>
{
Expand All @@ -39,13 +42,13 @@ public IDisposable Subscribe(long? lastProcessedCheckpoint, Func<IReadOnlyList<T
// Start loading the next page on a separate thread while we have the subscriber handle the previous transactions.
loader = LoadNextPageAsync();

await subscriber.Send(transactions);
await subscription.Send(transactions);

transactions = await loader;
}
});

return subscriber;
return subscription;
}

private Task<Transaction[]> LoadNextPageAsync()
Expand All @@ -72,7 +75,7 @@ private Task<Transaction[]> LoadNextPageAsync()
Body = JsonConvert.DeserializeObject(json, new JsonSerializerSettings
{
TypeNameHandling = TypeNameHandling.All,
TypeNameAssemblyFormat = FormatterAssemblyStyle.Full
TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full
})
});
}
Expand Down Expand Up @@ -105,13 +108,13 @@ public void Dispose()
zip = null;
}

internal class Subscriber : IDisposable
internal class Subscription : IDisposable
{
private readonly long lastProcessedCheckpoint;
private readonly Func<IReadOnlyList<Transaction>, Task> handler;
private bool disposed;

public Subscriber(long lastProcessedCheckpoint, Func<IReadOnlyList<Transaction>, Task> handler)
public Subscription(long lastProcessedCheckpoint, Func<IReadOnlyList<Transaction>, Task> handler)
{
this.lastProcessedCheckpoint = lastProcessedCheckpoint;
this.handler = handler;
Expand Down
12 changes: 6 additions & 6 deletions Samples/ExampleHost/packages.config
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
<package id="Microsoft.AspNet.WebApi.Client" version="5.2.3" targetFramework="net45" />
<package id="Microsoft.AspNet.WebApi.Core" version="5.2.3" targetFramework="net45" />
<package id="Microsoft.AspNet.WebApi.Owin" version="5.2.3" targetFramework="net452" />
<package id="Microsoft.Owin" version="3.0.1" targetFramework="net452" />
<package id="Microsoft.Owin.Diagnostics" version="3.0.1" targetFramework="net452" />
<package id="Microsoft.Owin.Host.HttpListener" version="3.0.1" targetFramework="net452" />
<package id="Microsoft.Owin.Hosting" version="3.0.1" targetFramework="net452" />
<package id="Microsoft.Owin.SelfHost" version="3.0.1" targetFramework="net452" />
<package id="Newtonsoft.Json" version="6.0.4" targetFramework="net45" />
<package id="Microsoft.Owin" version="3.1.0" targetFramework="net45" />
<package id="Microsoft.Owin.Diagnostics" version="3.1.0" targetFramework="net45" />
<package id="Microsoft.Owin.Host.HttpListener" version="3.1.0" targetFramework="net45" />
<package id="Microsoft.Owin.Hosting" version="3.1.0" targetFramework="net45" />
<package id="Microsoft.Owin.SelfHost" version="3.1.0" targetFramework="net45" />
<package id="Newtonsoft.Json" version="10.0.2" targetFramework="net45" />
<package id="Owin" version="1.0" targetFramework="net452" />
<package id="TinyIoC" version="1.3" targetFramework="net452" developmentDependency="true" />
</packages>
41 changes: 34 additions & 7 deletions Src/LiquidProjections.Abstractions/CreateSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,40 @@ namespace LiquidProjections.Abstractions
/// <summary>
/// Creates a subscription on an event store, starting at the transaction following
/// <paramref name="lastProcessedCheckpoint"/>, identified by <paramref name="subscriptionId"/>, and which
/// passed transactions to the provided <paramref name="handler"/>.
/// passes any transactions to the provided <paramref name="subscriber"/>.
/// </summary>
/// <param name="lastProcessedCheckpoint"></param>
/// <param name="handler"></param>
/// <param name="subscriptionId"></param>
/// <returns></returns>
/// <param name="lastProcessedCheckpoint">
/// The checkpoint of the transaction the subscriber has last seen, or <c>null</c> to start from the beginning.
/// </param>
/// <param name="subscriber">
/// An object wrapping the various handlers that the subscription will use.
/// </param>
/// <param name="subscriptionId">
/// Identifies this subscription and helps distinct multiple subscriptions.
/// </param>
/// <returns>
/// A disposable object that will cancel the subscription.
/// </returns>
public delegate IDisposable CreateSubscription(long? lastProcessedCheckpoint,
Func<IReadOnlyList<Transaction>, Task> handler,
string subscriptionId);
Subscriber subscriber, string subscriptionId);

public class Subscriber
{
/// <summary>
/// Represents a handler that will receive the transactions that the event store pushes to the subscriber.
/// </summary>
public Func<IReadOnlyList<Transaction>, SubscriptionInfo, Task> HandleTransactions { get; set; }

/// <summary>
/// Represents a handler that the event store will use if the requested checkpoint does not exist.
/// </summary>
public Func<SubscriptionInfo, Task> NoSuchCheckpoint { get; set; }
}

public class SubscriptionInfo
{
public string Id { get; set; }

public IDisposable Subscription { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<ProjectConfiguration>
<Settings>
<PreviouslyBuiltSuccessfully>True</PreviouslyBuiltSuccessfully>
</Settings>
</ProjectConfiguration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<ProjectConfiguration>
<Settings>
<PreviouslyBuiltSuccessfully>True</PreviouslyBuiltSuccessfully>
</Settings>
</ProjectConfiguration>
66 changes: 48 additions & 18 deletions Src/LiquidProjections.Testing/MemoryEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,50 @@
using System.Globalization;
using System.Linq;
using System.Threading.Tasks;
using LiquidProjections.Testing;
using LiquidProjections.Abstractions;

namespace LiquidProjections
namespace LiquidProjections.Testing
{
public class MemoryEventSource
{
private readonly int batchSize;
private long lastCheckpoint;

private readonly List<Subscriber> subscribers = new List<Subscriber>();
private readonly List<Subscription> subscribers = new List<Subscription>();
private readonly List<Transaction> history = new List<Transaction>();

public MemoryEventSource(int batchSize = 10)
{
this.batchSize = batchSize;
}

public IDisposable Subscribe(long? lastProcessedCheckpoint, Func<IReadOnlyList<Transaction>, Task> handler, string subscriptionId)
public IDisposable Subscribe(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId)
{
lastCheckpoint = lastProcessedCheckpoint ?? 0;
var subscriber = new Subscriber(lastCheckpoint, batchSize, handler);
var subscription = new Subscription(lastCheckpoint, batchSize, subscriber, subscriptionId);

subscribers.Add(subscriber);
subscribers.Add(subscription);

Func<Task> asyncAction = async () =>
async Task AsyncAction()
{
if (history.LastOrDefault()?.Checkpoint < lastProcessedCheckpoint)
{
await subscriber.NoSuchCheckpoint(new SubscriptionInfo
{
Id = subscriptionId,
Subscription = subscription
});
}

foreach (Transaction transaction in history)
{
await subscriber.Send(new[] { transaction }).ConfigureAwait(false);
await subscription.Send(new[] {transaction}).ConfigureAwait(false);
}
};
}

asyncAction().ConfigureAwait(false).GetAwaiter().GetResult();
AsyncAction().ConfigureAwait(false).GetAwaiter().GetResult();

return subscriber;
return subscription;
}


Expand Down Expand Up @@ -102,29 +111,46 @@ public async Task<Transaction> WriteWithHeaders(object anEvent, IDictionary<stri

return transaction;
}

public bool HasSubscriptionForId(string subscriptionId)
{
Subscription subscription = subscribers.SingleOrDefault(s => s.Id == subscriptionId);
return (subscription != null) && !subscription.IsDisposed;
}
}

internal class Subscriber : IDisposable
internal class Subscription : IDisposable
{
private readonly long lastProcessedCheckpoint;
private readonly int batchSize;
private readonly Func<IReadOnlyList<Transaction>, Task> handler;
private readonly Subscriber subscriber;
private readonly string subscriptionId;
private bool disposed = false;

public Subscriber(long lastProcessedCheckpoint, int batchSize, Func<IReadOnlyList<Transaction>, Task> handler)

public Subscription(long lastProcessedCheckpoint, int batchSize,
Subscriber subscriber, string subscriptionId)
{
this.lastProcessedCheckpoint = lastProcessedCheckpoint;
this.batchSize = batchSize;
this.handler = handler;
this.subscriber = subscriber;
this.subscriptionId = subscriptionId;
}

public async Task Send(IEnumerable<Transaction> transactions)
{
if (!disposed)
{
foreach (var batch in transactions.Where(t => t.Checkpoint > lastProcessedCheckpoint).InBatchesOf(batchSize))
var subscriptionInfo = new SubscriptionInfo
{
await handler(new ReadOnlyCollection<Transaction>(batch.ToList())).ConfigureAwait(false);
Id = subscriptionId,
Subscription = this
};

Transaction[] requestedTransactions = transactions.Where(t => t.Checkpoint > lastProcessedCheckpoint).ToArray();
foreach (var batch in requestedTransactions.InBatchesOf(batchSize))
{
await subscriber.HandleTransactions(new ReadOnlyCollection<Transaction>(batch.ToList()), subscriptionInfo)
.ConfigureAwait(false);
}
}
}
Expand All @@ -133,5 +159,9 @@ public void Dispose()
{
disposed = true;
}

public bool IsDisposed => disposed;

public string Id => subscriptionId;
}
}
Loading