-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Detect and recover projectors that are ahead of the event store. #96
Detect and recover projectors that are ahead of the event store. #96
Conversation
d082db7
to
f12c640
Compare
f12c640
to
6e37f01
Compare
{ | ||
yield return batch; | ||
} | ||
yield return batch; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we handle an empty batch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because an implementation of an event store must provide at least one empty set of transactions, so that the dispatcher can detect subscribers that are ahead of the event store.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see why you want to make that a responsibility of this method. Its name tells us that it should just split the sequence into batches of the given size.
Also the current implementation could lead to 3 different scenarios:
- It can return 1 empty batch
- It can return 1 or more non-empty batches
- It can return 1 or more non-empty batches followed by an empty batch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, but the MemoryEventSource
needs this to be able to meet my new requirement that an event source should always send an empty set of transactions right after subscribing, if there are no transactions beyond the provided checkpoint
Src/LiquidProjections/Dispatcher.cs
Outdated
{ | ||
try | ||
{ | ||
await handler(transactions); | ||
if (options.RestartWhenAhead && !transactions.Any()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see why absence of transactions here means being ahead of event store. Why is it called without transactions at all? This delegate is invoked multiple times. What if the second call has no transactions?
If we want the event store to notify us of being ahead, I would create a separate delegate parameter for that with a clear responsibility
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, it's the other way around. By changing the subscription so that it requests the last transaction the subscriber has handled, and then verifying if it is still there, we can conclude that the subscriber is ahead. Otherwise we would have every event store signal us when we request a checkpoint that is beyond its last checkpoint. By going this route, we keep the complexity in the dispatcher rather than in each implementation of an event store.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have to modify all the event stores anyway. Otherwise they will not call this delegate with empty batch. So they better call another delegate in that case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I tend to agree with you. Back to the drawing board....
Adding another delegate would mean the subscription delegate will start to look like this;
Feels a bit awkward. What do you think? |
e6e12b7
to
9adaed7
Compare
@IharBury please review again. I've applied your proposal. |
Src/LiquidProjections/Dispatcher.cs
Outdated
{ | ||
return Subscribe(checkpoint, handler, null); | ||
BeforeRestarting = () => Task.FromResult(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Syntax suggestion: you can assign the initial value in the property declaration:
public Func<Task> BeforeRestarting { get; set; } = () => Task.FromResult(0);
…head of the event source.
9adaed7
to
f06507d
Compare
Adds a new options object that allows a subscriber to tell the dispatcher to restart the subscription at the first transaction available in the underlying event source, and allow the subscriber to clean-up just before that.