Skip to content

Commit

Permalink
It was possible for a subscription to start processing a new batch of…
Browse files Browse the repository at this point in the history
… transactions after the subscription has been cancelled. (#105)

It still possible that the subscription continues processing of a transaction batch after it has been cancelled.
  • Loading branch information
IharBury authored and dennisdoomen committed Sep 18, 2017
1 parent 1427a45 commit 146c5b1
Showing 1 changed file with 29 additions and 15 deletions.
44 changes: 29 additions & 15 deletions Src/LiquidProjections.Testing/MemoryEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,9 @@ private class Subscription : IDisposable
private readonly object syncRoot = new object();
private Task task;
private TaskCompletionSource<long> progressCompletionSource = new TaskCompletionSource<long>();
private readonly TaskCompletionSource<bool> waitForCheckingWhetherItIsAheadCompletionSource = new TaskCompletionSource<bool>();

private readonly TaskCompletionSource<bool> waitForCheckingWhetherItIsAheadCompletionSource =
new TaskCompletionSource<bool>();

public Subscription(long lastProcessedCheckpoint, int batchSize,
Subscriber subscriber, string subscriptionId, MemoryEventSource memoryEventSource)
Expand Down Expand Up @@ -429,7 +431,7 @@ public void Start()
Dispose();
}
},
cancellationTokenSource.Token,
CancellationToken.None,
TaskCreationOptions.DenyChildAttach | TaskCreationOptions.LongRunning,
TaskScheduler.Default)
.Unwrap();
Expand All @@ -438,6 +440,11 @@ public void Start()

private async Task RunAsync(SubscriptionInfo info)
{
if (IsDisposed)
{
return;
}

long oldLastProcessedCheckpoint;

lock (syncRoot)
Expand All @@ -457,7 +464,7 @@ private async Task RunAsync(SubscriptionInfo info)

int nextTransactionIndex = memoryEventSource.GetNextTransactionIndex(oldLastProcessedCheckpoint);

while (!cancellationTokenSource.IsCancellationRequested)
while (!IsDisposed)
{
Task waitForNewTransactions = memoryEventSource.WaitForNewTransactions();
Transaction[] transactions = memoryEventSource.GetTransactionsFromIndex(nextTransactionIndex);
Expand Down Expand Up @@ -507,23 +514,30 @@ public void Dispose()
{
isDisposed = true;

// Run continuations and wait for the subscription task asynchronously.
Task.Run(() =>
{
progressCompletionSource.SetCanceled();
waitForCheckingWhetherItIsAheadCompletionSource.TrySetCanceled();
progressCompletionSource.SetCanceled();
waitForCheckingWhetherItIsAheadCompletionSource.TrySetCanceled();

if (cancellationTokenSource != null)
if (cancellationTokenSource != null)
{
try
{
if (!cancellationTokenSource.IsCancellationRequested)
{
cancellationTokenSource.Cancel();
}
cancellationTokenSource.Cancel();
}
catch (AggregateException)
{
// Ignore.
}

task?.Wait();
if (task == null)
{
cancellationTokenSource.Dispose();
}
});
else
{
// Run continuations and wait for the subscription task asynchronously.
task.ContinueWith(_ => cancellationTokenSource.Dispose());
}
}
}
}
}
Expand Down

0 comments on commit 146c5b1

Please sign in to comment.