From 1120495c0636eb994f8a80c9a45590706355ffd7 Mon Sep 17 00:00:00 2001 From: Alex Kladov Date: Tue, 20 Feb 2024 15:14:48 +0000 Subject: [PATCH] vsr: decide commit_max separately from acting on it Before, `commit_journal` played dual role: * it advanced commit_max * it tried executing everything up to commit_max In parciular, we had funny calls `commit_journal(self.commit_max)`, which neutered the first role in a somewhat roundabout way. Now, we have two functions: * "sensitive" `advance_commit_max` to be called when you have learned something that justifies _logically_ committing more * "safe" `commit_journal`, which can be called whenever, and just executes locally available prepares up to commit_max There shouldn't be any behavior changes here, but in the next commit I'll tweak the logic in `on_prepare` to advance commit_max _before_ appending the prepare, so as not to break op/commit_max invariant even temporarily. --- src/vsr/replica.zig | 76 +++++++++++++++++++++++++-------------------- 1 file changed, 42 insertions(+), 34 deletions(-) diff --git a/src/vsr/replica.zig b/src/vsr/replica.zig index 0c7df14fa5..f3874a93c8 100644 --- a/src/vsr/replica.zig +++ b/src/vsr/replica.zig @@ -784,7 +784,8 @@ pub fn ReplicaType( if (self.solo()) { if (self.commit_min < self.op) { - self.commit_journal(self.op); + self.advance_commit_max(self.op, @src().fn_name); + self.commit_journal(); // Recovery will complete when commit_journal finishes. assert(self.status == .recovering); @@ -798,7 +799,7 @@ pub fn ReplicaType( } } else { if (self.status != .recovering_head) { - self.commit_journal(self.commit_max); + self.commit_journal(); } } } @@ -1396,7 +1397,8 @@ pub fn ReplicaType( defer { if (self.backup()) { // A prepare may already be committed if requested by repair() so take the max: - self.commit_journal(message.header.commit); + self.advance_commit_max(message.header.commit, @src().fn_name); + self.commit_journal(); assert(self.commit_max >= message.header.commit); } } @@ -1637,7 +1639,8 @@ pub fn ReplicaType( } } - self.commit_journal(message.header.commit); + self.advance_commit_max(message.header.commit, @src().fn_name); + self.commit_journal(); } fn on_repair(self: *Self, message: *Message.Prepare) void { @@ -1991,11 +1994,11 @@ pub fn ReplicaType( .view_change => { self.transition_to_normal_from_view_change_status(message.header.view); self.send_prepare_oks_after_view_change(); - self.commit_journal(self.commit_max); + self.commit_journal(); }, .recovering_head => { self.transition_to_normal_from_recovering_head_status(message.header.view); - self.commit_journal(self.commit_max); + self.commit_journal(); }, .normal => {}, .recovering => unreachable, @@ -3015,6 +3018,32 @@ pub fn ReplicaType( return count; } + /// Caller must ensure that: + /// - op=commit is indeed committed by the cluster, + /// - local WAL doesn't contain truncated prepares from finished views. + fn advance_commit_max(self: *Self, commit: u64, method: []const u8) void { + defer { + assert(self.commit_max >= commit); + assert(self.commit_max >= self.commit_min); + assert(self.commit_max >= self.op -| constants.pipeline_prepare_queue_max); + if (self.status == .normal and self.primary()) { + assert(self.commit_max == self.commit_min); + assert(self.commit_max == self.op - self.pipeline.queue.prepare_queue.count); + } + + } + + if (commit > self.commit_max) { + log.debug("{}: {s}: advancing commit_max={}..{}", .{ + self.replica, + method, + self.commit_max, + commit, + }); + self.commit_max = commit; + } + } + fn append(self: *Self, message: *Message.Prepare) void { assert(self.status == .normal); assert(message.header.command == .prepare); @@ -3157,31 +3186,17 @@ pub fn ReplicaType( } } - /// Commit ops up to commit number `commit` (inclusive). - /// A function which calls `commit_journal()` to set `commit_max` must first call - /// `jump_view()`. Otherwise, we may fork the log. - fn commit_journal(self: *Self, commit: u64) void { + /// Commit ops up to commit_max (inclusive). + fn commit_journal(self: *Self) void { assert(self.status == .normal or self.status == .view_change or (self.status == .recovering and self.solo())); assert(!(self.status == .normal and self.primary())); assert(self.commit_min <= self.commit_max); assert(self.commit_min <= self.op); - assert(self.commit_max <= self.op or self.commit_max > self.op); - assert(commit <= self.op or commit > self.op); + maybe(self.commit_max > self.op); // We have already committed this far: - if (commit <= self.commit_min) return; - - // We must update `commit_max` even if we are already committing, otherwise we will lose - // information that we should know, and `set_op_and_commit_max()` will catch us out: - if (commit > self.commit_max) { - log.debug("{}: commit_journal: advancing commit_max={}..{}", .{ - self.replica, - self.commit_max, - commit, - }); - self.commit_max = commit; - } + if (self.commit_max == self.commit_min) return; if (!self.state_machine_opened) { assert(self.commit_stage == .idle); @@ -3777,14 +3792,7 @@ pub fn ReplicaType( self.commit_min += 1; assert(self.commit_min == prepare.header.op); - if (self.commit_min > self.commit_max) { - log.debug("{}: commit_op: advancing commit_max={}..{}", .{ - self.replica, - self.commit_max, - self.commit_min, - }); - self.commit_max = self.commit_min; - } + self.advance_commit_max(self.commit_min, @src().fn_name); if (self.event_callback) |hook| hook(self, .committed); @@ -5457,7 +5465,7 @@ pub fn ReplicaType( // that they form a valid hashchain. Committing may discover more faulty prepares // and drive further repairs. assert(!self.solo()); - self.commit_journal(self.commit_max); + self.commit_journal(); } if (self.status == .view_change and self.primary_index(self.view) == self.replica) { @@ -7997,7 +8005,7 @@ pub fn ReplicaType( // Bump commit_max before the superblock update so that a view_durable_update() // during the sync_start update uses the correct (new) commit_max. - self.commit_max = @max(stage.target.checkpoint_op, self.commit_max); + self.advance_commit_max(stage.target.checkpoint_op, @src().fn_name); const sync_op_max = vsr.Checkpoint.trigger_for_checkpoint(stage.target.checkpoint_op).?;