Skip to content

Commit

Permalink
VSR: Use previous checkpoint_id for border prepares
Browse files Browse the repository at this point in the history
# Background

Right now, a replica will not prepare (or even queue) requests whose op would extend beyond their next checkpoint's trigger op.

This is problematic for performance (the "checkpoint latency spike"), since the replica enters the new checkpoint with nothing to commit.
And the "latency spike" is farther exacerbated by the fact that clients will back off retrying their requests.

Originally we didn't start preparing past the checkpoint to guard against overwriting WAL entries before they will definitely not be needed again.
But thanks to `vsr_checkpoint_interval`, that reason does not apply.

However, prepare headers include a `checkpoint_id` ([motivation here](https://github.com/tigerbeetle/tigerbeetle/blob/30cfbfa2eca94b8cd0b1d2a8ce41c7e7720128f0/src/vsr/message_header.zig#L531-L539)). That checkpoint id isn't available until the checkpoint trigger prepare commits.

# Fix

The first `constants.pipeline_prepare_queue_max` prepares that follow a checkpoint trigger op will now contain the _previous_ checkpoint's id, since that is available both before and after that checkpoint trigger commits.

These are called the "border" prepares – they may be prepared during either checkpoint. (This part if not implemented in this commit, though!)
  • Loading branch information
sentientwaffle committed Jan 30, 2024
1 parent 942ab03 commit 1671571
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 11 deletions.
15 changes: 12 additions & 3 deletions src/vsr/message_header.zig
Original file line number Diff line number Diff line change
Expand Up @@ -530,13 +530,22 @@ pub const Header = extern struct {
request_checksum_padding: u128 = 0,
/// The id of the checkpoint where:
///
/// prepare.op > trigger_for_checkpoint(checkpoint_op)
/// prepare.op ≤ trigger_for_checkpoint(checkpoint_after(checkpoint_op))
/// prepare.op >
/// pipeline_prepare_queue_max + trigger_for_checkpoint(checkpoint_op)
/// prepare.op ≤
/// pipeline_prepare_queue_max + trigger_for_checkpoint(checkpoint_after(checkpoint_op))
///
/// The purpose of including the checkpoint id is to strictly bound the number of commits
/// that it may take to discover a divergent replica. If a replica diverges, then that
/// divergence will be discovered *at latest* when the divergent replica attempts to commit
/// the first op after the next checkpoint trigger.
/// the first op after the next checkpoint trigger + pipeline_prepare_queue_max.
///
/// The first `pipeline_prepare_queue_max` ops immediately after a checkpoint trigger are
/// border prepares.
///
/// A "border prepare" is a prepare that can be prepared in the *next* checkpoint before our
/// previous checkpoint is done. (These prepares' `header.checkpoint_id` will be the id of
/// the *previous* checkpoint, since the id of the next checkpoint may not yet be known).
checkpoint_id: u128,
client: u128,
/// The op number of the latest prepare that may or may not yet be committed. Uncommitted
Expand Down
41 changes: 33 additions & 8 deletions src/vsr/replica.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1407,8 +1407,11 @@ pub fn ReplicaType(
return;
}

if (message.header.checkpoint_id != self.superblock.working.checkpoint_id()) {
if (message.header.checkpoint_id != self.superblock.working.checkpoint_id() and
message.header.checkpoint_id != self.superblock.working.vsr_state.checkpoint.previous_checkpoint_id)
{
// Panic on encountering a prepare which does not match our own checkpoint id.
// (Or the previous checkpoint id, for border prepares.)
//
// If this branch is hit, there is a storage determinism problem. At this point in
// the code it is not possible to distinguish whether the problem is with this
Expand Down Expand Up @@ -1483,7 +1486,6 @@ pub fn ReplicaType(
assert(prepare.message.header.op <= self.op);

assert(prepare.message.header.checkpoint_id == message.header.checkpoint_id);
assert(prepare.message.header.checkpoint_id == self.superblock.working.checkpoint_id());
assert(prepare.message.header.checkpoint_id ==
self.checkpoint_id_for_op(prepare.message.header.op).?);

Expand Down Expand Up @@ -5028,19 +5030,25 @@ pub fn ReplicaType(
if (self.op_checkpoint() > 0) {
const op_checkpoint_trigger =
vsr.Checkpoint.trigger_for_checkpoint(self.op_checkpoint()).?;
if (op <= op_checkpoint_trigger) {
if (op + constants.vsr_checkpoint_interval <= op_checkpoint_trigger) {
const op_checkpoint_border =
op_checkpoint_trigger + constants.pipeline_prepare_queue_max;

if (op <= op_checkpoint_border) {
if (op + constants.vsr_checkpoint_interval <= op_checkpoint_border) {
// Case 2: op is from a too distant past for us to know its checkpoint id.
return null;
}
// Case 3: op is from the previous checkpoint whose id we still remember.
return self.superblock.working.vsr_state.checkpoint.previous_checkpoint_id;
}

assert(op + constants.vsr_checkpoint_interval > self.op_checkpoint_next_trigger());
assert(op + constants.vsr_checkpoint_interval >
self.op_checkpoint_next_trigger() + constants.pipeline_prepare_queue_max);
}

if (op <= self.op_checkpoint_next_trigger()) {
if (op <= self.op_checkpoint_next_trigger() +
constants.pipeline_prepare_queue_max)
{
// Case 4: op uses the current checkpoint id.
return self.superblock.working.checkpoint_id();
}
Expand Down Expand Up @@ -5090,8 +5098,14 @@ pub fn ReplicaType(
// We know checkpoint ids for the current checkpoint and the one before that.
// Don't try repairing ops with older checkpoint_ids which are impossible to
// verify.
//
// The "+pipeline_prepare_queue_max" accounts for the border prepares of the
// older checksum have a different (no-longer known) checkpoint id. A replica
// that is that far back can state sync, though.
const op_with_checkpoint_id_oldest =
(self.op_checkpoint_next_trigger() + 1) -| constants.vsr_checkpoint_interval * 2;
(self.op_checkpoint_next_trigger() + 1 +
constants.pipeline_prepare_queue_max) -|
constants.vsr_checkpoint_interval * 2;
assert(self.checkpoint_id_for_op(op_with_checkpoint_id_oldest) != null);

break :op @max(op_wal_oldest, op_with_checkpoint_id_oldest);
Expand Down Expand Up @@ -5196,6 +5210,16 @@ pub fn ReplicaType(
// Copy the header to the stack before overwriting it to avoid UB.
const request_header: Header.Request = request.message.header.*;

const checkpoint_id = checkpoint_id: {
if (vsr.Checkpoint.trigger_for_checkpoint(self.op_checkpoint())) |trigger| {
if (self.op + 1 <= trigger + constants.pipeline_prepare_queue_max) {
// Border prepares use the previous checkpoint id.
break :checkpoint_id self.superblock.working.vsr_state.checkpoint.previous_checkpoint_id;
}
}
break :checkpoint_id self.superblock.working.checkpoint_id();
};

const latest_entry = self.journal.header_with_op(self.op).?;
message.header.* = Header.Prepare{
.cluster = self.cluster,
Expand All @@ -5206,7 +5230,7 @@ pub fn ReplicaType(
.parent = latest_entry.checksum,
.client = request_header.client,
.request_checksum = request_header.checksum,
.checkpoint_id = self.superblock.working.checkpoint_id(),
.checkpoint_id = checkpoint_id,
.op = self.op + 1,
.commit = self.commit_max,
.timestamp = timestamp: {
Expand Down Expand Up @@ -6236,6 +6260,7 @@ pub fn ReplicaType(
log.debug("{}: send_prepare_ok: not sending (old)", .{self.replica});
return;
};
assert(checkpoint_id == header.checkpoint_id);

// It is crucial that replicas stop accepting prepare messages from earlier views
// once they start the view change protocol. Without this constraint, the system
Expand Down

0 comments on commit 1671571

Please sign in to comment.