Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vsr: synchronize op and commit_max updates on_prepare #1576

Merged
merged 3 commits into from Feb 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
106 changes: 56 additions & 50 deletions src/vsr/replica.zig
Expand Up @@ -784,7 +784,8 @@ pub fn ReplicaType(

if (self.solo()) {
if (self.commit_min < self.op) {
self.commit_journal(self.op);
self.advance_commit_max(self.op, @src().fn_name);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure whether I like @src().fn_name. On the one hand, it is cryptic and too clever. On the other hand, we did have bugs where we moved code around but didn't update call-sites.

If we double-down here, we might pass the whole @src() in even

self.commit_journal();

// Recovery will complete when commit_journal finishes.
assert(self.status == .recovering);
Expand All @@ -798,7 +799,7 @@ pub fn ReplicaType(
}
} else {
if (self.status != .recovering_head) {
self.commit_journal(self.commit_max);
self.commit_journal();
}
}
}
Expand Down Expand Up @@ -1393,13 +1394,11 @@ pub fn ReplicaType(
assert(message.header.op > self.op);
assert(message.header.op > self.commit_min);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a assert(message.header.op > message.header.commit); to emphasize that the self.advance_commit_max(message.header.commit, @src().fn_name); below is ok even though we won't have loaded message.header into our journal yet.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

advancing commit_max should be ok even if we don't have a header? We allow maybe(self.op < self.commit_max)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good point!


defer {
if (self.backup()) {
// A prepare may already be committed if requested by repair() so take the max:
self.commit_journal(message.header.commit);
assert(self.commit_max >= message.header.commit);
}
if (self.backup()) {
self.advance_commit_max(message.header.commit, @src().fn_name);
assert(self.commit_max >= message.header.commit);
}
defer if (self.backup()) self.commit_journal();

// Verify that the new request will fit in the WAL.
if (message.header.op > self.op_prepare_max()) {
Expand Down Expand Up @@ -1637,7 +1636,8 @@ pub fn ReplicaType(
}
}

self.commit_journal(message.header.commit);
self.advance_commit_max(message.header.commit, @src().fn_name);
self.commit_journal();
}

fn on_repair(self: *Self, message: *Message.Prepare) void {
Expand Down Expand Up @@ -1991,11 +1991,11 @@ pub fn ReplicaType(
.view_change => {
self.transition_to_normal_from_view_change_status(message.header.view);
self.send_prepare_oks_after_view_change();
self.commit_journal(self.commit_max);
self.commit_journal();
},
.recovering_head => {
self.transition_to_normal_from_recovering_head_status(message.header.view);
self.commit_journal(self.commit_max);
self.commit_journal();
},
.normal => {},
.recovering => unreachable,
Expand Down Expand Up @@ -3015,6 +3015,31 @@ pub fn ReplicaType(
return count;
}

/// Caller must ensure that:
/// - op=commit is indeed committed by the cluster,
/// - local WAL doesn't contain truncated prepares from finished views.
fn advance_commit_max(self: *Self, commit: u64, method: []const u8) void {
defer {
assert(self.commit_max >= commit);
assert(self.commit_max >= self.commit_min);
assert(self.commit_max >= self.op -| constants.pipeline_prepare_queue_max);
if (self.status == .normal and self.primary()) {
assert(self.commit_max == self.commit_min);
assert(self.commit_max == self.op - self.pipeline.queue.prepare_queue.count);
}
}

if (commit > self.commit_max) {
log.debug("{}: {s}: advancing commit_max={}..{}", .{
self.replica,
method,
self.commit_max,
commit,
});
self.commit_max = commit;
}
}

fn append(self: *Self, message: *Message.Prepare) void {
assert(self.status == .normal);
assert(message.header.command == .prepare);
Expand Down Expand Up @@ -3157,31 +3182,17 @@ pub fn ReplicaType(
}
}

/// Commit ops up to commit number `commit` (inclusive).
/// A function which calls `commit_journal()` to set `commit_max` must first call
/// `jump_view()`. Otherwise, we may fork the log.
fn commit_journal(self: *Self, commit: u64) void {
/// Commit ops up to commit_max (inclusive).
fn commit_journal(self: *Self) void {
assert(self.status == .normal or self.status == .view_change or
(self.status == .recovering and self.solo()));
assert(!(self.status == .normal and self.primary()));
assert(self.commit_min <= self.commit_max);
assert(self.commit_min <= self.op);
assert(self.commit_max <= self.op or self.commit_max > self.op);
assert(commit <= self.op or commit > self.op);
maybe(self.commit_max > self.op);

// We have already committed this far:
if (commit <= self.commit_min) return;

// We must update `commit_max` even if we are already committing, otherwise we will lose
// information that we should know, and `set_op_and_commit_max()` will catch us out:
if (commit > self.commit_max) {
log.debug("{}: commit_journal: advancing commit_max={}..{}", .{
self.replica,
self.commit_max,
commit,
});
self.commit_max = commit;
}
if (self.commit_max == self.commit_min) return;

if (!self.state_machine_opened) {
assert(self.commit_stage == .idle);
Expand Down Expand Up @@ -3436,17 +3447,6 @@ pub fn ReplicaType(
assert(self.commit_min <= self.commit_max);

if (self.status == .normal and self.primary()) {
{
const prepare = self.pipeline.queue.pop_prepare().?;
defer self.message_bus.unref(prepare.message);

assert(prepare.message.header.command == .prepare);
assert(prepare.message.header.checksum == self.commit_prepare.?.header.checksum);
assert(prepare.message.header.op == self.commit_min);
assert(prepare.message.header.op == self.commit_max);
assert(prepare.ok_quorum_received);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did like how "pop prepare" was immediately followed by "pop request -> push prepare to pipeline". But commit_op is already pretty gigantic 😓

if (self.pipeline.queue.pop_request()) |request| {
// Start preparing the next request in the queue (if any).
self.primary_pipeline_prepare(request);
Expand Down Expand Up @@ -3775,16 +3775,21 @@ pub fn ReplicaType(
assert(self.state_machine.commit_timestamp <= prepare.header.timestamp or constants.aof_recovery);
self.state_machine.commit_timestamp = prepare.header.timestamp;

if (self.status == .normal and self.primary()) {
const pipeline_prepare = self.pipeline.queue.pop_prepare().?;
defer self.message_bus.unref(pipeline_prepare.message);

assert(pipeline_prepare.message == prepare);
assert(pipeline_prepare.message.header.command == .prepare);
assert(pipeline_prepare.message.header.checksum == self.commit_prepare.?.header.checksum);
assert(pipeline_prepare.message.header.op == self.commit_min + 1);
assert(pipeline_prepare.message.header.op == self.commit_max + 1);
assert(pipeline_prepare.ok_quorum_received);
}

self.commit_min += 1;
assert(self.commit_min == prepare.header.op);
if (self.commit_min > self.commit_max) {
log.debug("{}: commit_op: advancing commit_max={}..{}", .{
self.replica,
self.commit_max,
self.commit_min,
});
self.commit_max = self.commit_min;
}
self.advance_commit_max(self.commit_min, @src().fn_name);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels something suspicious. I think it would be better to update commit_max before the primary starts committing, not after it is done. Though I guess for solo mode updating later is actually desirable?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 I like the "on primary, commit_min == commit_max invariant" that we have.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, this feels extra tricky --- it seems like we could be advancing commit_max here even if we are no longer primary, if view change happens while we are committing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 if we allow primary's commit_max to run ahead of commit_min, that might increase latency tolerance. If the primary is slow to commit, clients could get replies from backups which commit ahead of the primary.


if (self.event_callback) |hook| hook(self, .committed);

Expand Down Expand Up @@ -5074,6 +5079,7 @@ pub fn ReplicaType(
if (self.status == .recovering) assert(self.solo());
assert(self.op >= self.op_checkpoint());
assert(self.op <= self.op_prepare_max());
assert(self.commit_max >= self.op -| constants.pipeline_prepare_queue_max);

const op = op: {
if (self.primary_index(self.view) == self.replica) {
Expand Down Expand Up @@ -5456,7 +5462,7 @@ pub fn ReplicaType(
// that they form a valid hashchain. Committing may discover more faulty prepares
// and drive further repairs.
assert(!self.solo());
self.commit_journal(self.commit_max);
self.commit_journal();
}

if (self.status == .view_change and self.primary_index(self.view) == self.replica) {
Expand Down Expand Up @@ -7996,7 +8002,7 @@ pub fn ReplicaType(

// Bump commit_max before the superblock update so that a view_durable_update()
// during the sync_start update uses the correct (new) commit_max.
self.commit_max = @max(stage.target.checkpoint_op, self.commit_max);
self.advance_commit_max(stage.target.checkpoint_op, @src().fn_name);

const sync_op_max =
vsr.Checkpoint.trigger_for_checkpoint(stage.target.checkpoint_op).?;
Expand Down