Skip to content

Commit

Permalink
VSR: Change prepare.header.checkpoint_id at checkpoint boundary
Browse files Browse the repository at this point in the history
  • Loading branch information
sentientwaffle committed Jan 31, 2024
1 parent db66b81 commit d73b9ee
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 47 deletions.
13 changes: 3 additions & 10 deletions src/vsr/message_header.zig
Original file line number Diff line number Diff line change
Expand Up @@ -530,20 +530,13 @@ pub const Header = extern struct {
request_checksum_padding: u128 = 0,
/// The id of the checkpoint where:
///
/// prepare.op > border_for_checkpoint(checkpoint_op)
/// prepare.op ≤ border_for_checkpoint(checkpoint_after(checkpoint_op))
/// prepare.op > checkpoint_op
/// prepare.op ≤ checkpoint_after(checkpoint_op)
///
/// The purpose of including the checkpoint id is to strictly bound the number of commits
/// that it may take to discover a divergent replica. If a replica diverges, then that
/// divergence will be discovered *at latest* when the divergent replica attempts to commit
/// the first op after the next checkpoint trigger + pipeline_prepare_queue_max.
///
/// The first `pipeline_prepare_queue_max` ops immediately after a checkpoint trigger are
/// border prepares.
///
/// A "border prepare" is a prepare that can be prepared in the *next* checkpoint before our
/// previous checkpoint is done. (These prepares' `header.checkpoint_id` will be the id of
/// the *previous* checkpoint, since the id of the next checkpoint may not yet be known).
/// the first op after the next checkpoint.
checkpoint_id: u128,
client: u128,
/// The op number of the latest prepare that may or may not yet be committed. Uncommitted
Expand Down
63 changes: 29 additions & 34 deletions src/vsr/replica.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1410,8 +1410,7 @@ pub fn ReplicaType(
if (message.header.checkpoint_id != self.superblock.working.checkpoint_id() and
message.header.checkpoint_id != self.superblock.working.vsr_state.checkpoint.parent_checkpoint_id)
{
// Panic on encountering a prepare which does not match our own checkpoint id.
// (Or the previous checkpoint id, for border prepares.)
// Panic on encountering a prepare which does not match an expected checkpoint id.
//
// If this branch is hit, there is a storage determinism problem. At this point in
// the code it is not possible to distinguish whether the problem is with this
Expand Down Expand Up @@ -5022,34 +5021,34 @@ pub fn ReplicaType(

/// Returns checkpoint id associated with the op.
///
/// Normally, this is just the id of the checkpoint the op builds on top. However, ops
/// Normally, this is just the id of the op's previous checkpoint. However, ops
/// between a checkpoint and its trigger can't know the checkpoint's id yet, and instead use
/// the id of the previous checkpoint.
/// the id of the grandparent checkpoint.
///
/// Returns `null` for ops which are too far in the past/future to know their checkpoint
/// ids.
fn checkpoint_id_for_op(self: *const Self, op: u64) ?u128 {
// Case 1: for the root op, checkpoint id is zero.
if (op == 0) return Header.Prepare.root(self.cluster).checkpoint_id;
const checkpoint_now = self.op_checkpoint();
const checkpoint_next_1 = vsr.Checkpoint.checkpoint_after(checkpoint_now);
const checkpoint_next_2 = vsr.Checkpoint.checkpoint_after(checkpoint_next_1);

if (self.op_checkpoint() > 0) {
const op_checkpoint_border =
vsr.Checkpoint.border_for_checkpoint(self.op_checkpoint()).?;
if (op + constants.vsr_checkpoint_interval <= checkpoint_now) {
// Case 1: op is from a too distant past for us to know its checkpoint id.
return null;
}

if (op <= op_checkpoint_border) {
if (op + constants.vsr_checkpoint_interval <= op_checkpoint_border) {
// Case 2: op is from a too distant past for us to know its checkpoint id.
return null;
}
// Case 3: op is from the previous checkpoint whose id we still remember.
return self.superblock.working.vsr_state.checkpoint.parent_checkpoint_id;
}
if (op <= checkpoint_now) {
// Case 2: op is from the previous checkpoint whose id we still remember.
return self.superblock.working.vsr_state.checkpoint.grandparent_checkpoint_id;
}

assert(op + constants.vsr_checkpoint_interval > self.op_checkpoint_next_border());
if (op <= checkpoint_next_1) {
// Case 3: op is in the current checkpoint.
return self.superblock.working.vsr_state.checkpoint.parent_checkpoint_id;
}

if (op <= self.op_checkpoint_next_border()) {
// Case 4: op uses the current checkpoint id.
if (op <= checkpoint_next_2) {
// Case 4: op is in the next checkpoint (which we have not checkpointed).
return self.superblock.working.checkpoint_id();
}

Expand Down Expand Up @@ -5095,19 +5094,17 @@ pub fn ReplicaType(
self.op_checkpoint_next_trigger(),
) -| (constants.journal_slot_count - 1);

// We know checkpoint ids for the current checkpoint and the one before that.
// We know checkpoint ids for the previous checkpoint and the one before that.
// Don't try repairing ops with older checkpoint_ids which are impossible to
// verify.
//
// The "+pipeline_prepare_queue_max" accounts for the border prepares of the
// older checkpoint, which have a different (no-longer known) checkpoint id. A replica
// that is that far back can state sync, though.
const op_with_checkpoint_id_oldest =
(self.op_checkpoint_next_trigger() + 1 +
constants.pipeline_prepare_queue_max) -|
constants.vsr_checkpoint_interval * 2;
(self.op_checkpoint() + 1) -| constants.vsr_checkpoint_interval;
assert(self.checkpoint_id_for_op(op_with_checkpoint_id_oldest) != null);

if (op_with_checkpoint_id_oldest > 0) {
assert(self.checkpoint_id_for_op(op_with_checkpoint_id_oldest - 1) == null);
}

break :op @max(op_wal_oldest, op_with_checkpoint_id_oldest);
} else {
// Strictly speaking a backup only needs to repair commit_min+1… to proceed.
Expand Down Expand Up @@ -5211,13 +5208,11 @@ pub fn ReplicaType(
const request_header: Header.Request = request.message.header.*;

const checkpoint_id = checkpoint_id: {
if (vsr.Checkpoint.border_for_checkpoint(self.op_checkpoint())) |op_border| {
if (self.op + 1 <= op_border) {
// Border prepares use the previous checkpoint id.
break :checkpoint_id self.superblock.working.vsr_state.checkpoint.parent_checkpoint_id;
}
if (self.op + 1 <= self.op_checkpoint_next()) {
break :checkpoint_id self.superblock.working.vsr_state.checkpoint.parent_checkpoint_id;
} else {
break :checkpoint_id self.superblock.working.checkpoint_id();
}
break :checkpoint_id self.superblock.working.checkpoint_id();
};

const latest_entry = self.journal.header_with_op(self.op).?;
Expand Down
13 changes: 10 additions & 3 deletions src/vsr/superblock.zig
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ pub const SuperBlockHeader = extern struct {
/// Following state sync, this is set to the last checkpoint that we skipped.
parent_checkpoint_id: u128,
/// The parent_checkpoint_id of the parent checkpoint.
/// TODO We might be able to remove this when
/// https://github.com/tigerbeetle/tigerbeetle/issues/1378 is fixed.
grandparent_checkpoint_id: u128,

free_set_last_block_address: u64,
Expand Down Expand Up @@ -816,10 +818,15 @@ pub fn SuperBlockType(comptime Storage: type) type {
assert((update.storage_size == data_file_size_min) ==
update.free_set_reference.empty());

// NOTE: Within the vsr_state.checkpoint assignment below, do not read from vsr_state
// directly. A miscompilation bug (as of Zig 0.11.0) causes fields to receive the
// incorrect values.
const vsr_state_staging = superblock.staging.vsr_state;

var vsr_state = superblock.staging.vsr_state;
vsr_state.checkpoint = .{
.parent_checkpoint_id = superblock.staging.checkpoint_id(),
.grandparent_checkpoint_id = vsr_state.checkpoint.parent_checkpoint_id,
.grandparent_checkpoint_id = vsr_state_staging.checkpoint.parent_checkpoint_id,
.commit_min = update.commit_min,
.commit_min_checksum = update.commit_min_checksum,
.free_set_checksum = update.free_set_reference.checksum,
Expand All @@ -836,8 +843,8 @@ pub fn SuperBlockType(comptime Storage: type) type {
.manifest_newest_address = update.manifest_references.newest_address,
.manifest_block_count = update.manifest_references.block_count,
.storage_size = update.storage_size,
.snapshots_block_checksum = vsr_state.checkpoint.snapshots_block_checksum,
.snapshots_block_address = vsr_state.checkpoint.snapshots_block_address,
.snapshots_block_checksum = vsr_state_staging.checkpoint.snapshots_block_checksum,
.snapshots_block_address = vsr_state_staging.checkpoint.snapshots_block_address,
};
vsr_state.commit_max = update.commit_max;
vsr_state.sync_op_min = update.sync_op_min;
Expand Down

0 comments on commit d73b9ee

Please sign in to comment.