Skip to content

Commit

Permalink
vsr: decide commit_max separately from acting on it
Browse files Browse the repository at this point in the history
Before, `commit_journal` played dual role:

* it advanced commit_max
* it tried executing everything up to commit_max

In parciular, we had funny calls `commit_journal(self.commit_max)`,
which neutered the first role in a somewhat roundabout way.

Now, we have two functions:

* "sensitive" `advance_commit_max` to be called when you have learned
  something that justifies _logically_ committing more
* "safe" `commit_journal`, which can be called whenever, and just
  executes locally available prepares up to commit_max

There shouldn't be any behavior changes here, but in the next commit
I'll tweak the logic in `on_prepare` to advance commit_max _before_
appending the prepare, so as not to break op/commit_max invariant even
temporarily.
  • Loading branch information
matklad committed Feb 20, 2024
1 parent ed77c8e commit 1120495
Showing 1 changed file with 42 additions and 34 deletions.
76 changes: 42 additions & 34 deletions src/vsr/replica.zig
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,8 @@ pub fn ReplicaType(

if (self.solo()) {
if (self.commit_min < self.op) {
self.commit_journal(self.op);
self.advance_commit_max(self.op, @src().fn_name);
self.commit_journal();

// Recovery will complete when commit_journal finishes.
assert(self.status == .recovering);
Expand All @@ -798,7 +799,7 @@ pub fn ReplicaType(
}
} else {
if (self.status != .recovering_head) {
self.commit_journal(self.commit_max);
self.commit_journal();
}
}
}
Expand Down Expand Up @@ -1396,7 +1397,8 @@ pub fn ReplicaType(
defer {
if (self.backup()) {
// A prepare may already be committed if requested by repair() so take the max:
self.commit_journal(message.header.commit);
self.advance_commit_max(message.header.commit, @src().fn_name);
self.commit_journal();
assert(self.commit_max >= message.header.commit);
}
}
Expand Down Expand Up @@ -1637,7 +1639,8 @@ pub fn ReplicaType(
}
}

self.commit_journal(message.header.commit);
self.advance_commit_max(message.header.commit, @src().fn_name);
self.commit_journal();
}

fn on_repair(self: *Self, message: *Message.Prepare) void {
Expand Down Expand Up @@ -1991,11 +1994,11 @@ pub fn ReplicaType(
.view_change => {
self.transition_to_normal_from_view_change_status(message.header.view);
self.send_prepare_oks_after_view_change();
self.commit_journal(self.commit_max);
self.commit_journal();
},
.recovering_head => {
self.transition_to_normal_from_recovering_head_status(message.header.view);
self.commit_journal(self.commit_max);
self.commit_journal();
},
.normal => {},
.recovering => unreachable,
Expand Down Expand Up @@ -3015,6 +3018,32 @@ pub fn ReplicaType(
return count;
}

/// Caller must ensure that:
/// - op=commit is indeed committed by the cluster,
/// - local WAL doesn't contain truncated prepares from finished views.
fn advance_commit_max(self: *Self, commit: u64, method: []const u8) void {
defer {
assert(self.commit_max >= commit);
assert(self.commit_max >= self.commit_min);
assert(self.commit_max >= self.op -| constants.pipeline_prepare_queue_max);
if (self.status == .normal and self.primary()) {
assert(self.commit_max == self.commit_min);
assert(self.commit_max == self.op - self.pipeline.queue.prepare_queue.count);
}

}

if (commit > self.commit_max) {
log.debug("{}: {s}: advancing commit_max={}..{}", .{
self.replica,
method,
self.commit_max,
commit,
});
self.commit_max = commit;
}
}

fn append(self: *Self, message: *Message.Prepare) void {
assert(self.status == .normal);
assert(message.header.command == .prepare);
Expand Down Expand Up @@ -3157,31 +3186,17 @@ pub fn ReplicaType(
}
}

/// Commit ops up to commit number `commit` (inclusive).
/// A function which calls `commit_journal()` to set `commit_max` must first call
/// `jump_view()`. Otherwise, we may fork the log.
fn commit_journal(self: *Self, commit: u64) void {
/// Commit ops up to commit_max (inclusive).
fn commit_journal(self: *Self) void {
assert(self.status == .normal or self.status == .view_change or
(self.status == .recovering and self.solo()));
assert(!(self.status == .normal and self.primary()));
assert(self.commit_min <= self.commit_max);
assert(self.commit_min <= self.op);
assert(self.commit_max <= self.op or self.commit_max > self.op);
assert(commit <= self.op or commit > self.op);
maybe(self.commit_max > self.op);

// We have already committed this far:
if (commit <= self.commit_min) return;

// We must update `commit_max` even if we are already committing, otherwise we will lose
// information that we should know, and `set_op_and_commit_max()` will catch us out:
if (commit > self.commit_max) {
log.debug("{}: commit_journal: advancing commit_max={}..{}", .{
self.replica,
self.commit_max,
commit,
});
self.commit_max = commit;
}
if (self.commit_max == self.commit_min) return;

if (!self.state_machine_opened) {
assert(self.commit_stage == .idle);
Expand Down Expand Up @@ -3777,14 +3792,7 @@ pub fn ReplicaType(

self.commit_min += 1;
assert(self.commit_min == prepare.header.op);
if (self.commit_min > self.commit_max) {
log.debug("{}: commit_op: advancing commit_max={}..{}", .{
self.replica,
self.commit_max,
self.commit_min,
});
self.commit_max = self.commit_min;
}
self.advance_commit_max(self.commit_min, @src().fn_name);

if (self.event_callback) |hook| hook(self, .committed);

Expand Down Expand Up @@ -5457,7 +5465,7 @@ pub fn ReplicaType(
// that they form a valid hashchain. Committing may discover more faulty prepares
// and drive further repairs.
assert(!self.solo());
self.commit_journal(self.commit_max);
self.commit_journal();
}

if (self.status == .view_change and self.primary_index(self.view) == self.replica) {
Expand Down Expand Up @@ -7997,7 +8005,7 @@ pub fn ReplicaType(

// Bump commit_max before the superblock update so that a view_durable_update()
// during the sync_start update uses the correct (new) commit_max.
self.commit_max = @max(stage.target.checkpoint_op, self.commit_max);
self.advance_commit_max(stage.target.checkpoint_op, @src().fn_name);

const sync_op_max =
vsr.Checkpoint.trigger_for_checkpoint(stage.target.checkpoint_op).?;
Expand Down

0 comments on commit 1120495

Please sign in to comment.