diff --git a/src/vsr.zig b/src/vsr.zig index d5fdb8c5db..44d0927fa3 100644 --- a/src/vsr.zig +++ b/src/vsr.zig @@ -1397,6 +1397,19 @@ pub const Checkpoint = struct { } } + /// Returns the border op of `checkpoint`, or null when the checkpoint has no trigger. + /// The border is `trigger_for_checkpoint(checkpoint) + constants.pipeline_prepare_queue_max`. + /// Asserts `valid(checkpoint)`. + pub fn border_for_checkpoint(checkpoint: u64) ?u64 { + assert(valid(checkpoint)); + + if (trigger_for_checkpoint(checkpoint)) |trigger| { + return trigger + constants.pipeline_prepare_queue_max; + } else { + return null; + } + } + pub fn valid(op: u64) bool { // Divide by `lsm_batch_multiple` instead of `vsr_checkpoint_interval`: // although today in practice checkpoints are evenly spaced, the LSM layer doesn't assume diff --git a/src/vsr/message_header.zig b/src/vsr/message_header.zig index 518de33d8e..6139251b37 100644 --- a/src/vsr/message_header.zig +++ b/src/vsr/message_header.zig @@ -530,10 +530,8 @@ pub const Header = extern struct { request_checksum_padding: u128 = 0, /// The id of the checkpoint where: /// - /// prepare.op > - /// pipeline_prepare_queue_max + trigger_for_checkpoint(checkpoint_op) - /// prepare.op ≤ - /// pipeline_prepare_queue_max + trigger_for_checkpoint(checkpoint_after(checkpoint_op)) + /// prepare.op > border_for_checkpoint(checkpoint_op) + /// prepare.op ≤ border_for_checkpoint(checkpoint_after(checkpoint_op)) /// /// The purpose of including the checkpoint id is to strictly bound the number of commits /// that it may take to discover a divergent replica. If a replica diverges, then that diff --git a/src/vsr/replica.zig b/src/vsr/replica.zig index 395f12460b..c06c9c4bee 100644 --- a/src/vsr/replica.zig +++ b/src/vsr/replica.zig @@ -5015,6 +5015,11 @@ pub fn ReplicaType( return vsr.Checkpoint.trigger_for_checkpoint(self.op_checkpoint_next()).?; } + /// Returns the highest op that this replica can safely prepare to its WAL. + fn op_checkpoint_next_border(self: *const Self) u64 { + return vsr.Checkpoint.border_for_checkpoint(self.op_checkpoint_next()).?; + } + /// Returns checkpoint id associated with the op.
/// /// Normally, this is just the id of the checkpoint the op builds on top. However, ops @@ -5028,10 +5033,8 @@ pub fn ReplicaType( if (op == 0) return Header.Prepare.root(self.cluster).checkpoint_id; if (self.op_checkpoint() > 0) { - const op_checkpoint_trigger = - vsr.Checkpoint.trigger_for_checkpoint(self.op_checkpoint()).?; const op_checkpoint_border = - op_checkpoint_trigger + constants.pipeline_prepare_queue_max; + vsr.Checkpoint.border_for_checkpoint(self.op_checkpoint()).?; if (op <= op_checkpoint_border) { if (op + constants.vsr_checkpoint_interval <= op_checkpoint_border) { @@ -5042,13 +5045,10 @@ pub fn ReplicaType( return self.superblock.working.vsr_state.checkpoint.previous_checkpoint_id; } - assert(op + constants.vsr_checkpoint_interval > - self.op_checkpoint_next_trigger() + constants.pipeline_prepare_queue_max); + assert(op + constants.vsr_checkpoint_interval > self.op_checkpoint_next_border()); } - if (op <= self.op_checkpoint_next_trigger() + - constants.pipeline_prepare_queue_max) - { + if (op <= self.op_checkpoint_next_border()) { // Case 4: op uses the current checkpoint id. return self.superblock.working.checkpoint_id(); } @@ -5211,8 +5211,8 @@ pub fn ReplicaType( const request_header: Header.Request = request.message.header.*; const checkpoint_id = checkpoint_id: { - if (vsr.Checkpoint.trigger_for_checkpoint(self.op_checkpoint())) |trigger| { - if (self.op + 1 <= trigger + constants.pipeline_prepare_queue_max) { + if (vsr.Checkpoint.border_for_checkpoint(self.op_checkpoint())) |op_border| { + if (self.op + 1 <= op_border) { // Border prepares use the previous checkpoint id. 
break :checkpoint_id self.superblock.working.vsr_state.checkpoint.previous_checkpoint_id; } @@ -7068,8 +7068,7 @@ pub fn ReplicaType( assert(dvcs_all.count() >= self.quorum_view_change); for (dvcs_all.const_slice()) |message| { - assert(message.header.op <= - self.op_checkpoint_next_trigger() + constants.pipeline_prepare_queue_max); + assert(message.header.op <= self.op_checkpoint_next_border()); } // The `prepare_timestamp` prevents a primary's own clock from running backwards. diff --git a/src/vsr/replica_test.zig b/src/vsr/replica_test.zig index 095a0cfc12..3b57185c0e 100644 --- a/src/vsr/replica_test.zig +++ b/src/vsr/replica_test.zig @@ -24,6 +24,9 @@ const checkpoint_3 = vsr.Checkpoint.checkpoint_after(checkpoint_2); const checkpoint_1_trigger = vsr.Checkpoint.trigger_for_checkpoint(checkpoint_1).?; const checkpoint_2_trigger = vsr.Checkpoint.trigger_for_checkpoint(checkpoint_2).?; const checkpoint_3_trigger = vsr.Checkpoint.trigger_for_checkpoint(checkpoint_3).?; +const checkpoint_1_border = vsr.Checkpoint.border_for_checkpoint(checkpoint_1).?; +const checkpoint_2_border = vsr.Checkpoint.border_for_checkpoint(checkpoint_2).?; +const checkpoint_3_border = vsr.Checkpoint.border_for_checkpoint(checkpoint_3).?; const log_level = std.log.Level.err; // TODO Test client eviction once it no longer triggers a client panic. @@ -1048,8 +1051,6 @@ test "Cluster: sync: view-change with lagging replica in recovering_head" { } test "Cluster: async-checkpoints: prepare beyond checkpoint trigger" { - const pipeline_prepare_queue_max = constants.pipeline_prepare_queue_max; - const t = try TestContext.init(.{ .replica_count = 3 }); defer t.deinit(); @@ -1065,7 +1066,7 @@ test "Cluster: async-checkpoints: prepare beyond checkpoint trigger" { t.replica(.R_).drop(.__, .bidirectional, .prepare_ok); // Prepare ops beyond the checkpoint. 
- try c.request(checkpoint_1_trigger - 1 + pipeline_prepare_queue_max, checkpoint_1_trigger - 1); + try c.request(checkpoint_1_border - 1, checkpoint_1_trigger - 1); try expectEqual(t.replica(.R_).op_checkpoint(), 0); try expectEqual(t.replica(.R_).commit(), checkpoint_1_trigger - 1); try expectEqual(t.replica(.R_).op_head(), checkpoint_1_trigger); @@ -1074,10 +1075,10 @@ test "Cluster: async-checkpoints: prepare beyond checkpoint trigger" { t.replica(.R_).pass(.__, .bidirectional, .prepare_ok); t.run(); - try expectEqual(c.replies(), checkpoint_1_trigger - 1 + pipeline_prepare_queue_max); + try expectEqual(c.replies(), checkpoint_1_border - 1); try expectEqual(t.replica(.R_).op_checkpoint(), checkpoint_1); - try expectEqual(t.replica(.R_).commit(), checkpoint_1_trigger - 1 + pipeline_prepare_queue_max); - try expectEqual(t.replica(.R_).op_head(), checkpoint_1_trigger - 1 + pipeline_prepare_queue_max); + try expectEqual(t.replica(.R_).commit(), checkpoint_1_border - 1); + try expectEqual(t.replica(.R_).op_head(), checkpoint_1_border - 1); } const ProcessSelector = enum {