Ensure log consistency #4717
Conversation
Force-pushed from 10a6940 to dd1334b
Thanks @MiguelPires, I added some comments which we should address before merging, but I think the solution itself looks quite simple. Thanks again 👍
atomix/cluster/src/main/java/io/atomix/raft/roles/LeaderRole.java
atomix/cluster/src/test/java/io/atomix/raft/roles/LeaderRoleTest.java
logstreams/src/main/java/io/zeebe/logstreams/impl/log/Listener.java
logstreams/src/main/java/io/zeebe/logstreams/impl/log/LogStorageAppender.java
  try {
-   // return position of last event
-   result = writeEventsToBuffer(claimedBatch.getBuffer());
+   writeEventsToBuffer(claimedBatch.getBuffer(), position);
+   position += eventCount - 1;
Didn't you write a test in the Dispatcher where the claim returned the position based on the fragment count?
So I think the claim already returns the last position? And then you add the event count again?
Not quite. The Dispatcher tests verify that the next position increases based on the current fragment count. Claim returns the position of the first record. I think that makes more sense than returning the updated publisher position and then calculating the initial position in the writers. The single writer had to subtract a frame length from the claimedPosition to get the actual position, and the batch writer was replacing the position anyway using the position returned from ClaimedFragmentBatch#nextFragment.
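For illustration, a minimal self-contained sketch of the position arithmetic being described; the types and helper names here are hypothetical, not the actual LogStreamBatchWriter code:

import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical sketch, not the real batch writer: claim() hands back the position of
// the first record, and the writer derives the last position from the event count.
public final class BatchWriterSketch {

  // Writes all events into the claimed buffer and returns the last event's position.
  static long writeBatch(final ByteBuffer claimedBuffer, final List<byte[]> events, final long firstPosition) {
    long position = firstPosition;
    for (final byte[] event : events) {
      claimedBuffer.putLong(position); // each fragment carries its own, consecutive position
      claimedBuffer.put(event);
      position++;
    }
    return position - 1; // i.e. firstPosition + eventCount - 1
  }

  public static void main(final String[] args) {
    final ByteBuffer buffer = ByteBuffer.allocate(1024);
    final long last = writeBatch(buffer, List.of(new byte[8], new byte[8], new byte[8]), 100L);
    System.out.println("last position: " + last); // prints 102
  }
}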
I have the feeling the API can still be improved. We don't need to do it now, just an idea.
We could return a list of claimed fragments. The ClaimedFragment holds a buffer and a position, which the writer can use. You can then iterate over the list, write the corresponding data and the related position into each fragment, and don't need to recalculate it yourself. wdyt?
I'm not sure I understand the goal. We used to do the same things in the writers, except in a more complicated way. Previously, the position returned by the Dispatcher was not useful for the batch writer (it was not being used except as a success/fail flag) and was overwritten by the last calculated position. It was also not directly used by the single writer, since we had to subtract a frame length to get the actual position of the fragment. Now the single writer uses the position returned by the Dispatcher as is and, since we want to return the last event's position in the batch writer, we do that by taking the number of events in the batch into account.
I might be misunderstanding the idea, but returning a list of claimed fragments is different from a batch fragment: if we wrote the first fragment and failed on the second, the first fragment wouldn't be aborted. That gives us different write semantics. Maybe we could add some logic to get around this somehow, but I'm not sure I understand why we would. To avoid doing position += eventCount - 1? I'm not sure I see the benefit, but I might not be seeing the full picture.
I think you misunderstood me. What I would like is an API where I don't care where the position came from or how it is calculated.
What we currently do is claim a batch, which is just a buffer, go over the events we want to write, and write them into the buffer we got. We then calculate the position based on the index and the returned position. On claiming, we already tell the claimed batch how many fragments/events we will write.
What I would like to have is the following:
for (ClaimedFragment fragment : claimedBatch.getFragments()) {
  writeDataInto(fragment.buffer);
  setPositionForEvent(fragment.position);
}
or
var iter = claimedBatch.iterator();
while (iter.hasNext()) {
  var fragment = iter.next();
  writeDataInto(fragment.buffer);
  setPositionForEvent(fragment.position);
}
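For illustration, a rough self-contained sketch of what such a fragment-based API could look like; every type and name below is hypothetical and only mirrors the pseudocode above:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Rough sketch of the suggested API: claiming a batch hands out one pre-positioned
// fragment per event, so the writer never recalculates positions itself.
public final class ClaimedFragmentApiSketch {

  // A claimed fragment that already knows its buffer and its log position.
  static final class ClaimedFragment {
    final ByteBuffer buffer;
    final long position;

    ClaimedFragment(final ByteBuffer buffer, final long position) {
      this.buffer = buffer;
      this.position = position;
    }
  }

  static List<ClaimedFragment> claimBatch(final long firstPosition, final int eventCount, final int fragmentLength) {
    final List<ClaimedFragment> fragments = new ArrayList<>();
    for (int i = 0; i < eventCount; i++) {
      fragments.add(new ClaimedFragment(ByteBuffer.allocate(fragmentLength), firstPosition + i));
    }
    return fragments;
  }

  public static void main(final String[] args) {
    for (final ClaimedFragment fragment : claimBatch(100L, 3, 64)) {
      fragment.buffer.putLong(fragment.position); // writer just uses the fragment's own position
      System.out.println("wrote event at position " + fragment.position);
    }
  }
}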
-     new ZeebeEntry(
-         0, System.currentTimeMillis(), lowestPosition, highestPosition, data));
+     final ZeebeEntry zbEntry =
+         new ZeebeEntry(0, System.currentTimeMillis(), lowestPosition, highestPosition, data);
Hm, this makes me think that this setup here is not ideal, since we now actually test this code and not the code in the Raft appender. It is easy for them to drift apart when we change something here or there. Any idea how we could use the actual code which we use in production?
I agree, I tried to do that but eventually gave up 😛 It's not ideal, but I don't know of a good way to improve this without a lot of work for something so small.
Maybe we can use the LeaderAppender somehow
Follow up issue?
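For illustration, one direction such a follow-up could take, as a purely hypothetical sketch: pull the entry creation into a single shared factory used by both the production appender and the test, so the construction logic cannot drift apart. "Entry" below stands in for ZeebeEntry, whose constructor arguments (term, timestamp, lowestPosition, highestPosition, data) are taken from the diff above; everything else is assumed.

// Hypothetical sketch only; not the real ZeebeEntry or appender code.
public final class SharedEntryFactorySketch {

  static final class Entry {
    final long term;
    final long timestamp;
    final long lowestPosition;
    final long highestPosition;
    final byte[] data;

    Entry(final long term, final long timestamp, final long lowestPosition,
        final long highestPosition, final byte[] data) {
      this.term = term;
      this.timestamp = timestamp;
      this.lowestPosition = lowestPosition;
      this.highestPosition = highestPosition;
      this.data = data;
    }
  }

  // Single place that knows how an entry is assembled; production code and tests call this.
  static Entry createEntry(final long term, final long lowestPosition,
      final long highestPosition, final byte[] data) {
    return new Entry(term, System.currentTimeMillis(), lowestPosition, highestPosition, data);
  }

  public static void main(final String[] args) {
    final Entry entry = createEntry(0, 100, 103, new byte[0]);
    System.out.println("entry positions: " + entry.lowestPosition + ".." + entry.highestPosition);
  }
}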
Thanks for the feedback @Zelldon. I addressed your comments in the latest commit and also opened an issue to investigate the performance impact of the locking in the Dispatcher.
@MiguelPires I will take a look at it today, sorry for the delay.
Thanks @MiguelPires, some more things I found, but I think it already looks quite good. I would just like to move the validation.
atomix/cluster/src/main/java/io/atomix/raft/roles/LeaderRole.java
-     if (!isEntryConsistent(lowestPosition)) {
-       appendListener.onWriteError(
-           new IllegalStateException("New entry has lower Zeebe log position than last entry."));
+     final ValidationResult result = validateEntryConsistency(entry, appendListener);
I like that
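For illustration, a hedged sketch of the refactor's shape only: instead of an inline check that calls onWriteError directly, validation moves into one method that returns a result the caller can act on. ValidationResult and every name below are assumptions, not the code actually merged in this PR.

import java.util.function.Consumer;

// Hypothetical sketch of an extracted validateEntryConsistency-style method.
public final class ValidationRefactorSketch {

  static final class ValidationResult {
    final boolean failed;
    final String message;

    ValidationResult(final boolean failed, final String message) {
      this.failed = failed;
      this.message = message;
    }
  }

  static ValidationResult validateEntryConsistency(
      final long lastHighestPosition, final long newLowestPosition, final Consumer<Throwable> onWriteError) {
    if (lastHighestPosition >= 0 && newLowestPosition <= lastHighestPosition) {
      final IllegalStateException error =
          new IllegalStateException("New entry has lower Zeebe log position than last entry.");
      onWriteError.accept(error); // in LeaderRole this would be appendListener.onWriteError(error)
      return new ValidationResult(true, error.getMessage());
    }
    return new ValidationResult(false, null);
  }

  public static void main(final String[] args) {
    final ValidationResult result = validateEntryConsistency(103, 100, Throwable::printStackTrace);
    System.out.println("failed: " + result.failed + ", message: " + result.message);
  }
}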
atomix/cluster/src/main/java/io/atomix/raft/zeebe/ZeebeLogAppender.java
@@ -38,4 +39,9 @@ public void onCommit(final Indexed<ZeebeEntry> indexed) {
    public void onCommitError(final Indexed<ZeebeEntry> indexed, final Throwable error) {
      delegate.onCommitError(indexed.index(), error);
    }
+
+   @Override
Why do we actually need this class?
Force-pushed from 402150e to a272c22
Thanks @MiguelPires, love it 🥇
I had just smaller comments, and let's run a small benchmark with it. I can set one up.
logstreams/src/main/java/io/zeebe/logstreams/impl/log/ZeebeEntryValidator.java
atomix/core/src/test/java/io/atomix/core/AbstractAtomixTest.java
atomix/cluster/src/main/java/io/atomix/raft/zeebe/EntryValidator.java
atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java
Hey, thanks for the review, I'll address the comments tomorrow. There's already a benchmark running for this; the namespace is called mp-spike.
Good work, thanks 👍
* positions are now consecutive (they increase by 1)
* before appending, we check that there are no gaps between records
Force-pushed from 51028ba to c73738a
bors r+
Build succeeded
Description
Positions are now consecutive and, before appending, we check that records are well-ordered and that there are no gaps in appended blocks.
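For illustration, a self-contained sketch of this rule; the real ZeebeEntryValidator added in this PR may be structured differently.

// Illustrative sketch only of the consistency rule in the description.
public final class ConsistencyCheckSketch {

  // Returns null if the new entry's position range is consistent with the last one,
  // otherwise an error message.
  static String validate(final long lastHighestPosition, final long newLowest, final long newHighest) {
    if (newLowest > newHighest) {
      return "Entry positions are not well-ordered: " + newLowest + " > " + newHighest;
    }
    if (lastHighestPosition >= 0 && newLowest != lastHighestPosition + 1) {
      return "Gap or overlap detected: expected " + (lastHighestPosition + 1) + " but got " + newLowest;
    }
    return null; // consistent
  }

  public static void main(final String[] args) {
    System.out.println(validate(99, 100, 103)); // null: positions are consecutive, no gap
    System.out.println(validate(99, 101, 103)); // gap detected: expected 100 but got 101
  }
}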
Related issues
closes #3987
Pull Request Checklist
mvn clean install -DskipTests run locally before committing