Skip to content

Commit

Permalink
vsr: sync uses correct view to go into recovering_head
Browse files Browse the repository at this point in the history
It might be the case that op_checkpoint:

* is prepared  in view X
* is truncated in view X+1
* is committed in view X+2

When deciding whether to go into recovering_head state, the replica
currently considers the view in which the checkpoint was prepared
(checkpoint.header.view), and not the view in which it was committed.

Use the correct view by:

* Adding the checkpoint's view to VSR State (but _not_ to checkpoint state:
  different replicas might commit the same prepare in different views)
* Populating that view from the message that informed us about the
  checkpoint target

  * this requires some intricate logic on pings, to make sure they
    indeed propagate the correct view for the checkpoint --- a replica
    accepts a checkpoint before it transitions to the checkpoint's view,
    and it should subsequently propagate this higher view correctly.

    This works because the checkpoint's view is also durable.

Seed: 8593423301425288917
Closes: #1703
  • Loading branch information
matklad committed Mar 15, 2024
1 parent e90d670 commit 164e8a5
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 16 deletions.
37 changes: 22 additions & 15 deletions src/vsr/replica.zig
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ pub fn ReplicaType(
self.transition_to_normal_from_recovering_status();
}
} else {
if (self.log_view < self.superblock.working.vsr_state.checkpoint.header.view) {
if (self.log_view < self.superblock.working.vsr_state.sync_view) {
// During state sync, the replica installed CheckpointState from a future view.
self.transition_to_recovering_head();
} else {
Expand Down Expand Up @@ -2702,7 +2702,8 @@ pub fn ReplicaType(
.size = @sizeOf(Header) + @sizeOf(u16) * constants.vsr_releases_max,
.cluster = self.cluster,
.replica = self.replica,
.view = self.view_durable(), // Don't drop pings while the view is being updated.
// Don't drop pings while the view is being updated.
.view = @max(self.view_durable(), self.superblock.working.vsr_state.sync_view),
.release = self.release,
.checkpoint_id = self.superblock.working.checkpoint_id(),
.checkpoint_op = self.op_checkpoint(),
Expand All @@ -2716,7 +2717,13 @@ pub fn ReplicaType(
message.header.set_checksum_body(message.body());
message.header.set_checksum();

assert(message.header.view <= self.view);
// Pings advertise checkpoints, and current checkpoint's view might be greater than
// the replica view.
if (message.header.view > self.view) {
assert(self.status == .recovering_head);
assert(self.superblock.working.vsr_state.sync_view >
self.superblock.working.vsr_state.view);
}

self.send_message_to_other_replicas_and_standbys(message.base());
}
Expand Down Expand Up @@ -5079,7 +5086,7 @@ pub fn ReplicaType(
return true;
}

if (message.header.view < self.superblock.working.vsr_state.checkpoint.header.view) {
if (message.header.view < self.superblock.working.vsr_state.sync_view) {
assert(self.status == .recovering_head);
log.debug("{}: on_{s}: ignoring (recovering_head, checkpoint from newer view)", .{
self.replica,
Expand Down Expand Up @@ -7163,7 +7170,9 @@ pub fn ReplicaType(
// Critical: Do not advertise a view/log_view before it is durable.
// See view_durable()/log_view_durable().
if (message.header.view > self.view_durable() and
message.header.command != .request_start_view)
message.header.command != .request_start_view and
!(message.header.command == .ping and
message.header.view == self.superblock.working.vsr_state.sync_view))
{
// Pings are used for syncing time, so they must not be
// blocked on persisting view.
Expand Down Expand Up @@ -7680,9 +7689,9 @@ pub fn ReplicaType(
assert(self.commit_stage == .idle);
assert(self.journal.header_with_op(self.op) != null);

if (self.log_view < self.superblock.working.vsr_state.checkpoint.header.view) {
if (self.log_view < self.superblock.working.vsr_state.sync_view) {
// Transitioning to recovering head after state sync --- checkpoint is from a
// "future" view, so the replica needs to truncate our log.
// "future" view, so the replica needs to truncate its log.
assert(self.commit_min == self.op_checkpoint());
} else {
if (self.status == .recovering) {
Expand Down Expand Up @@ -8383,6 +8392,7 @@ pub fn ReplicaType(
const stage: *const SyncStage.RequestingCheckpoint =
&self.syncing.requesting_checkpoint;
assert(stage.target.checkpoint_id == vsr.checksum(std.mem.asBytes(checkpoint_state)));
assert(stage.target.view >= checkpoint_state.header.view);

log.debug("{[replica]}: sync_requesting_checkpoint_callback: " ++
"checkpoint_op={[checkpoint_op]} checkpoint_id={[checkpoint_id]x:0>32} " ++
Expand Down Expand Up @@ -8458,6 +8468,7 @@ pub fn ReplicaType(
else
sync_min_new;
};
const sync_view = stage.target.view;

self.sync_message_timeout.stop();
self.superblock.sync(
Expand All @@ -8468,6 +8479,7 @@ pub fn ReplicaType(
.commit_max = self.commit_max,
.sync_op_max = sync_op_max,
.sync_op_min = sync_op_min,
.sync_view = sync_view,
},
);
}
Expand Down Expand Up @@ -8526,9 +8538,7 @@ pub fn ReplicaType(

// The head op must be in the Journal and there should not be a break between the
// checkpoint header and the Journal.
if (self.op < self.op_checkpoint() or
self.log_view < self.superblock.working.vsr_state.checkpoint.header.view)
{
if (self.op < self.op_checkpoint() or self.log_view < stage.target.view) {
self.transition_to_recovering_head();
}

Expand Down Expand Up @@ -8946,7 +8956,7 @@ pub fn ReplicaType(
if (header.replica >= self.replica_count) return; // Ignore messages from standbys.
if (header.replica == self.replica) return; // Ignore messages from self (misdirected).

const candidate = switch (header.into_any()) {
const candidate: SyncTarget = switch (header.into_any()) {
inline .commit, .ping => |h| .{
.checkpoint_id = h.checkpoint_id,
.checkpoint_op = h.checkpoint_op,
Expand Down Expand Up @@ -9010,10 +9020,7 @@ pub fn ReplicaType(
@tagName(self.syncing),
});

self.sync_target_max = .{
.checkpoint_id = candidate.checkpoint_id,
.checkpoint_op = candidate.checkpoint_op,
};
self.sync_target_max = candidate;

if (self.syncing != .idle) {
self.sync_start_from_sync();
Expand Down
12 changes: 11 additions & 1 deletion src/vsr/superblock.zig
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ pub const SuperBlockHeader = extern struct {
/// past them via state sync.)
sync_op_max: u64,

/// The view where it is known that checkpoint.header is committed.
/// It may be greater than view --- during state sync, the replica first accepts a new
/// checkpoint, and then jumps view. Appending checkpoint first is required for appending
/// SV headers to the journal.
sync_view: u32,

/// The last view in which the replica's status was normal.
log_view: u32,

Expand All @@ -140,7 +146,7 @@ pub const SuperBlockHeader = extern struct {
/// Number of replicas (determines sizes of the quorums), part of VSR configuration.
replica_count: u8,

reserved: [783]u8 = [_]u8{0} ** 783,
reserved: [779]u8 = [_]u8{0} ** 779,

comptime {
assert(@sizeOf(VSRState) == 2048);
Expand Down Expand Up @@ -184,6 +190,7 @@ pub const SuperBlockHeader = extern struct {
.commit_max = 0,
.sync_op_min = 0,
.sync_op_max = 0,
.sync_view = 0,
.log_view = 0,
.view = 0,
};
Expand Down Expand Up @@ -751,6 +758,7 @@ pub fn SuperBlockType(comptime Storage: type) type {
.commit_max = 0,
.sync_op_min = 0,
.sync_op_max = 0,
.sync_view = 0,
.log_view = 0,
.view = 0,
.replica_count = options.replica_count,
Expand Down Expand Up @@ -930,6 +938,7 @@ pub fn SuperBlockType(comptime Storage: type) type {
commit_max: u64,
sync_op_min: u64,
sync_op_max: u64,
sync_view: u32,
};

pub fn sync(
Expand Down Expand Up @@ -958,6 +967,7 @@ pub fn SuperBlockType(comptime Storage: type) type {
vsr_state.commit_max = update.commit_max;
vsr_state.sync_op_min = update.sync_op_min;
vsr_state.sync_op_max = update.sync_op_max;
vsr_state.sync_view = update.sync_view;

assert(superblock.staging.vsr_state.would_be_updated_by(vsr_state));

Expand Down
3 changes: 3 additions & 0 deletions src/vsr/superblock_fuzz.zig
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ fn run_fuzz(allocator: std.mem.Allocator, seed: u64, transitions_count_total: us
.commit_max = 0,
.sync_op_min = 0,
.sync_op_max = 0,
.sync_view = 0,
.log_view = 0,
.view = 0,
.replica_id = members[replica],
Expand Down Expand Up @@ -334,6 +335,7 @@ const Environment = struct {
.commit_max = env.superblock.staging.vsr_state.commit_max + 3,
.sync_op_min = 0,
.sync_op_max = 0,
.sync_view = 0,
.log_view = env.superblock.staging.vsr_state.log_view + 4,
.view = env.superblock.staging.vsr_state.view + 5,
.replica_id = env.members[replica],
Expand Down Expand Up @@ -415,6 +417,7 @@ const Environment = struct {
.commit_max = vsr_state_old.commit_max + 1,
.sync_op_min = 0,
.sync_op_max = 0,
.sync_view = 0,
.log_view = vsr_state_old.log_view,
.view = vsr_state_old.view,
.replica_id = env.members[replica],
Expand Down
3 changes: 3 additions & 0 deletions src/vsr/sync.zig
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,7 @@ pub const Target = struct {
checkpoint_id: u128,
/// The op_checkpoint() that corresponds to the checkpoint id.
checkpoint_op: u64,
/// The view where the target's checkpoint is committed.
/// It might be greater than `checkpoint.header.view`.
view: u32,
};

0 comments on commit 164e8a5

Please sign in to comment.