Skip to content

Commit

Permalink
VSR: Refactor into border_for_checkpoint(), op_checkpoint_next_border()
Browse files Browse the repository at this point in the history
  • Loading branch information
sentientwaffle committed Jan 30, 2024
1 parent 1671571 commit 2c5e3f4
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 22 deletions.
10 changes: 10 additions & 0 deletions src/vsr.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1397,6 +1397,16 @@ pub const Checkpoint = struct {
}
}

pub fn border_for_checkpoint(checkpoint: u64) ?u64 {
assert(valid(checkpoint));

if (trigger_for_checkpoint(checkpoint)) |trigger| {
return trigger + constants.pipeline_prepare_queue_max;
} else {
return null;
}
}

pub fn valid(op: u64) bool {
// Divide by `lsm_batch_multiple` instead of `vsr_checkpoint_interval`:
// although today in practice checkpoints are evenly spaced, the LSM layer doesn't assume
Expand Down
6 changes: 2 additions & 4 deletions src/vsr/message_header.zig
Original file line number Diff line number Diff line change
Expand Up @@ -530,10 +530,8 @@ pub const Header = extern struct {
request_checksum_padding: u128 = 0,
/// The id of the checkpoint where:
///
/// prepare.op >
/// pipeline_prepare_queue_max + trigger_for_checkpoint(checkpoint_op)
/// prepare.op ≤
/// pipeline_prepare_queue_max + trigger_for_checkpoint(checkpoint_after(checkpoint_op))
/// prepare.op > border_for_checkpoint(checkpoint_op)
/// prepare.op ≤ border_for_checkpoint(checkpoint_after(checkpoint_op))
///
/// The purpose of including the checkpoint id is to strictly bound the number of commits
/// that it may take to discover a divergent replica. If a replica diverges, then that
Expand Down
23 changes: 11 additions & 12 deletions src/vsr/replica.zig
Original file line number Diff line number Diff line change
Expand Up @@ -5015,6 +5015,11 @@ pub fn ReplicaType(
return vsr.Checkpoint.trigger_for_checkpoint(self.op_checkpoint_next()).?;
}

/// Returns the highest op that this replica can safely prepare to its WAL.
fn op_checkpoint_next_border(self: *const Self) u64 {
return vsr.Checkpoint.border_for_checkpoint(self.op_checkpoint_next()).?;
}

/// Returns checkpoint id associated with the op.
///
/// Normally, this is just the id of the checkpoint the op builds on top. However, ops
Expand All @@ -5028,10 +5033,8 @@ pub fn ReplicaType(
if (op == 0) return Header.Prepare.root(self.cluster).checkpoint_id;

if (self.op_checkpoint() > 0) {
const op_checkpoint_trigger =
vsr.Checkpoint.trigger_for_checkpoint(self.op_checkpoint()).?;
const op_checkpoint_border =
op_checkpoint_trigger + constants.pipeline_prepare_queue_max;
vsr.Checkpoint.border_for_checkpoint(self.op_checkpoint()).?;

if (op <= op_checkpoint_border) {
if (op + constants.vsr_checkpoint_interval <= op_checkpoint_border) {
Expand All @@ -5042,13 +5045,10 @@ pub fn ReplicaType(
return self.superblock.working.vsr_state.checkpoint.previous_checkpoint_id;
}

assert(op + constants.vsr_checkpoint_interval >
self.op_checkpoint_next_trigger() + constants.pipeline_prepare_queue_max);
assert(op + constants.vsr_checkpoint_interval > self.op_checkpoint_next_border());
}

if (op <= self.op_checkpoint_next_trigger() +
constants.pipeline_prepare_queue_max)
{
if (op <= self.op_checkpoint_next_border()) {
// Case 4: op uses the current checkpoint id.
return self.superblock.working.checkpoint_id();
}
Expand Down Expand Up @@ -5211,8 +5211,8 @@ pub fn ReplicaType(
const request_header: Header.Request = request.message.header.*;

const checkpoint_id = checkpoint_id: {
if (vsr.Checkpoint.trigger_for_checkpoint(self.op_checkpoint())) |trigger| {
if (self.op + 1 <= trigger + constants.pipeline_prepare_queue_max) {
if (vsr.Checkpoint.border_for_checkpoint(self.op_checkpoint())) |op_border| {
if (self.op + 1 <= op_border) {
// Border prepares use the previous checkpoint id.
break :checkpoint_id self.superblock.working.vsr_state.checkpoint.previous_checkpoint_id;
}
Expand Down Expand Up @@ -7068,8 +7068,7 @@ pub fn ReplicaType(
assert(dvcs_all.count() >= self.quorum_view_change);

for (dvcs_all.const_slice()) |message| {
assert(message.header.op <=
self.op_checkpoint_next_trigger() + constants.pipeline_prepare_queue_max);
assert(message.header.op <= self.op_checkpoint_next_border());
}

// The `prepare_timestamp` prevents a primary's own clock from running backwards.
Expand Down
13 changes: 7 additions & 6 deletions src/vsr/replica_test.zig
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ const checkpoint_3 = vsr.Checkpoint.checkpoint_after(checkpoint_2);
const checkpoint_1_trigger = vsr.Checkpoint.trigger_for_checkpoint(checkpoint_1).?;
const checkpoint_2_trigger = vsr.Checkpoint.trigger_for_checkpoint(checkpoint_2).?;
const checkpoint_3_trigger = vsr.Checkpoint.trigger_for_checkpoint(checkpoint_3).?;
const checkpoint_1_border = vsr.Checkpoint.border_for_checkpoint(checkpoint_1).?;
const checkpoint_2_border = vsr.Checkpoint.border_for_checkpoint(checkpoint_2).?;
const checkpoint_3_border = vsr.Checkpoint.border_for_checkpoint(checkpoint_3).?;
const log_level = std.log.Level.err;

// TODO Test client eviction once it no longer triggers a client panic.
Expand Down Expand Up @@ -1048,8 +1051,6 @@ test "Cluster: sync: view-change with lagging replica in recovering_head" {
}

test "Cluster: async-checkpoints: prepare beyond checkpoint trigger" {
const pipeline_prepare_queue_max = constants.pipeline_prepare_queue_max;

const t = try TestContext.init(.{ .replica_count = 3 });
defer t.deinit();

Expand All @@ -1065,7 +1066,7 @@ test "Cluster: async-checkpoints: prepare beyond checkpoint trigger" {
t.replica(.R_).drop(.__, .bidirectional, .prepare_ok);

// Prepare ops beyond the checkpoint.
try c.request(checkpoint_1_trigger - 1 + pipeline_prepare_queue_max, checkpoint_1_trigger - 1);
try c.request(checkpoint_1_border - 1, checkpoint_1_trigger - 1);
try expectEqual(t.replica(.R_).op_checkpoint(), 0);
try expectEqual(t.replica(.R_).commit(), checkpoint_1_trigger - 1);
try expectEqual(t.replica(.R_).op_head(), checkpoint_1_trigger);
Expand All @@ -1074,10 +1075,10 @@ test "Cluster: async-checkpoints: prepare beyond checkpoint trigger" {

t.replica(.R_).pass(.__, .bidirectional, .prepare_ok);
t.run();
try expectEqual(c.replies(), checkpoint_1_trigger - 1 + pipeline_prepare_queue_max);
try expectEqual(c.replies(), checkpoint_1_border - 1);
try expectEqual(t.replica(.R_).op_checkpoint(), checkpoint_1);
try expectEqual(t.replica(.R_).commit(), checkpoint_1_trigger - 1 + pipeline_prepare_queue_max);
try expectEqual(t.replica(.R_).op_head(), checkpoint_1_trigger - 1 + pipeline_prepare_queue_max);
try expectEqual(t.replica(.R_).commit(), checkpoint_1_border - 1);
try expectEqual(t.replica(.R_).op_head(), checkpoint_1_border - 1);
}

const ProcessSelector = enum {
Expand Down

0 comments on commit 2c5e3f4

Please sign in to comment.