vsr: prevent state sync from breaking hash chain #1598

Merged 2 commits on Feb 27, 2024
20 changes: 20 additions & 0 deletions src/vsr/replica.zig
@@ -8368,6 +8368,7 @@ pub fn ReplicaType(
        inline .commit, .ping => |h| .{
            .checkpoint_id = h.checkpoint_id,
            .checkpoint_op = h.checkpoint_op,
            .view = h.view,
        },
        else => return,
    };
@@ -8388,6 +8389,25 @@ pub fn ReplicaType(
@panic("checkpoint diverged");
}

    if (candidate.view > self.view) {
        log.debug("{}: on_{s}: jump_sync_target: ignoring, newer view" ++
            " (view={} candidate.view={})", .{
            self.replica,
            @tagName(header.command),
            self.view,
            candidate.view,
        });
        return;
    }
Member commented on lines +8392 to +8401:
Checking if I understand how this works:

In on_start_view(), we will reject the SV and transition to state sync if all of the SV's headers are beyond our prepare_max:

            for (view_headers.slice) |*header| {
                assert(header.commit <= message.header.commit);

                if (header.op <= self.op_prepare_max()) {
                    if (self.log_view < self.view or
                        (self.log_view == self.view and header.op >= self.op))
                    {
                        self.set_op_and_commit_max(header.op, message.header.commit, @src());
                        assert(self.op == header.op);
                        assert(self.commit_max >= message.header.commit);
                        break;
                    }
                }
            } else {
                // This replica is too far behind, i.e. the new `self.op` is too far ahead of
                // the last checkpoint. If we wrap now, we overwrite un-checkpointed transfers
                // in the WAL, precluding recovery.
                if (self.syncing == .idle) {
                    log.warn("{}: on_start_view: start sync; lagging behind cluster " ++
                        "(op_prepare_max={} quorum_head={})", .{
                        self.replica,
                        self.op_prepare_max(),
                        view_headers.slice[0].op,
                    });
                    self.sync_start_from_committing();
                }
                return;
            }
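
As a toy illustration of that fallthrough, here is a hypothetical Python model (not the real Zig — it ignores the log_view/view conditions and models only the op bound):

```python
# Hypothetical model of on_start_view's header-selection loop: install the
# first SV header that still fits within op_prepare_max; if every header is
# beyond it, fall through to state sync, because wrapping the WAL would
# overwrite un-checkpointed prepares.
def on_start_view(header_ops, op_prepare_max):
    for op in header_ops:
        if op <= op_prepare_max:
            # set_op_and_commit_max(...) in the real code.
            return ("install", op)
    # sync_start_from_committing() in the real code.
    return ("sync", None)

assert on_start_view([90, 80], op_prepare_max=100) == ("install", 90)
assert on_start_view([500, 400], op_prepare_max=100) == ("sync", None)
```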

So to advance our view (such that this new jump_sync_target() will not drop the message) we rely on pings or other messages that jump_view() uses to transition to view-change status (not normal status, since we will ignore the SV that we would request).

And jump_view() does indeed precede jump_sync_target(), so that can happen with a single ping:

            self.jump_view(message.header);
            self.jump_sync_target(message.header);

Member Author (@matklad) commented on Feb 27, 2024:

Not exactly! This is related to me saying "the code already has the fix". My original thinking was that, indeed, accepting the SV requires doing state sync first. But to do state sync we need to change view. And when we receive a commit we don't jump view immediately, but wait for the SV, which seems to create a deadlock.

So the fix I was planning to implement here was to jump to view_change when we receive a checkpoint target from the next log wrap and the next view.

But, when I started coding, I realised that there's already code to do that! Right here:

if (self.view < message.header.view) {
    self.transition_to_view_change_status(message.header.view);
}

That is, even though we don't append any headers from an SV, we still go into view_change! And that resolves the deadlock.

So the full sequence of events is this:

  • we receive a .commit with the next view & checkpoint
  • when we jump_view on this commit, we send out a .request_start_view
  • when we jump_sync_target on this commit, we ignore the target, as it's in a future view
  • sometime later, we receive the requested SV
  • if we install anything from that SV, we transition to .normal in the new view and can proceed with state sync
  • if we don't install anything, we transition to .view_change in the new view, and can still proceed with state sync (and will ask for another SV after sync is done)
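
That sequence of events can be sketched as a toy Python state machine (hypothetical names, not TigerBeetle's API; it models only the view/status transitions and the sync-target check):

```python
# Toy model of the deadlock resolution: a commit from a future view is
# ignored by jump_sync_target, but jump_view requests an SV; on_start_view
# then enters the new view even when no headers are installed, so the next
# sync-target candidate is accepted.
class Replica:
    def __init__(self):
        self.view = 1
        self.status = "normal"
        self.sync_target = None

    def jump_sync_target(self, msg):
        if msg["view"] > self.view:
            return "ignored: newer view"  # the check added by this PR
        self.sync_target = msg["checkpoint_op"]
        return "accepted"

    def on_commit(self, msg):
        sent_rsv = msg["view"] > self.view  # jump_view: request an SV
        return sent_rsv, self.jump_sync_target(msg)

    def on_start_view(self, msg, headers_installable):
        # Even installing nothing, we still enter the new view (.view_change).
        self.view = msg["view"]
        self.status = "normal" if headers_installable else "view_change"

r = Replica()
commit = {"view": 2, "checkpoint_op": 40}
sent_rsv, result = r.on_commit(commit)
assert sent_rsv and result == "ignored: newer view"
r.on_start_view({"view": 2}, headers_installable=False)
assert r.status == "view_change" and r.view == 2
assert r.jump_sync_target(commit) == "accepted"  # no deadlock
```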

Member replied:

Ahh that makes sense, I forgot that on_start_view did that!

Member Author replied:

This made me realize that there's a subtlety here! I forgot about view_durable and checked only for view, but this is still correct:

when view_durable is not updated, we can crash and restart into an earlier view, but that also means we'll restart without the newer sync target, so it ends up OK in the end.

Added a maybe to that effect.


if (candidate.view > self.view_durable()) {
    // For ignoring, it is correct to check only view and not view_durable. This can't
    // lead to a situation where we crash and restart with an older view and a newer
    // sync target, because superblock updates are serialized.
    assert(self.view > self.view_durable());
    assert(self.view_durable_updating());
}

// Don't sync backwards, or to our current checkpoint.
if (candidate.checkpoint_op <= self.op_checkpoint()) return;
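
A toy Python model (hypothetical, not the superblock implementation) of why serialized superblock updates make this safe: the view_durable update is in flight before the sync target is accepted, so a crash can roll back the target without rolling back the view — never the reverse:

```python
# Serialized superblock updates: queued writes persist strictly in order,
# so "sync target durable but view_durable stale" is unreachable.
class Superblock:
    def __init__(self):
        self.view_durable = 1
        self.sync_target_durable = None
        self._queue = []  # pending updates, applied strictly in order

    def enqueue(self, field, value):
        self._queue.append((field, value))

    def flush(self, n):
        """Persist only the first n queued updates, then 'crash'."""
        for field, value in self._queue[:n]:
            setattr(self, field, value)
        self._queue.clear()

sb = Superblock()
# In memory: the view jumps to 2, then a sync target from view 2 is accepted.
sb.enqueue("view_durable", 2)
sb.enqueue("sync_target_durable", 40)
# Crash after persisting only the first update: view advanced, target lost.
sb.flush(1)
assert sb.view_durable == 2 and sb.sync_target_durable is None
# The bad state (target durable, view_durable stale) cannot occur, because
# the view update is always queued before the target update.
```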

82 changes: 76 additions & 6 deletions src/vsr/replica_test.zig
@@ -3,6 +3,7 @@ const assert = std.debug.assert;
const maybe = stdx.maybe;
const log = std.log.scoped(.test_replica);
const expectEqual = std.testing.expectEqual;
const expect = std.testing.expect;
const allocator = std.testing.allocator;

const stdx = @import("../stdx.zig");
@@ -883,12 +884,8 @@ test "Cluster: view-change: nack older view" {
    b2.pass(.R_, .incoming, .prepare);
    b2.filter(.R_, .incoming, struct {
        fn drop_message(message: *Message) bool {
-           switch (message.into_any()) {
-               .prepare => |prepare| {
-                   return (prepare.header.op < checkpoint_1_trigger + 3);
-               },
-               else => return false,
-           }
+           const prepare = message.into(.prepare) orelse return false;
+           return prepare.header.op < checkpoint_1_trigger + 3;
        }
    }.drop_message);

@@ -1138,6 +1135,79 @@
    try expectEqual(t.replica(.R_).commit(), checkpoint_1_trigger + 3);
}

test "Cluster: sync: checkpoint from a newer view" {
    // B1 appends (but does not commit) prepares across a checkpoint boundary.
    // Then the cluster truncates those prepares and commits past the checkpoint trigger.
    // When B1 subsequently joins, it should state sync and truncate the log. Immediately
    // after state sync, the log doesn't connect to B1's new checkpoint.
    const t = try TestContext.init(.{ .replica_count = 6 });
    defer t.deinit();

    var c = t.clients(0, t.cluster.clients.len);
    try c.request(checkpoint_1 - 1, checkpoint_1 - 1);
    try expectEqual(t.replica(.R_).commit(), checkpoint_1 - 1);

    var a0 = t.replica(.A0);
    var b1 = t.replica(.B1);

    {
        // Prevent A0 from committing, prevent any other replica from becoming a primary, and
        // only allow B1 to learn about A0 prepares.
        t.replica(.R_).drop(.R_, .incoming, .prepare);
        t.replica(.R_).drop(.R_, .incoming, .prepare_ok);
        t.replica(.R_).drop(.R_, .incoming, .start_view_change);
        b1.pass(.A0, .incoming, .prepare);
        b1.filter(.A0, .incoming, struct {
            // Force B1 to sync, rather than repair.
            fn drop_message(message: *Message) bool {
                const prepare = message.into(.prepare) orelse return false;
                return prepare.header.op == checkpoint_1;
            }
        }.drop_message);
        try c.request(checkpoint_1 + 1, checkpoint_1 - 1);
        try expectEqual(a0.op_head(), checkpoint_1 + 1);
        try expectEqual(b1.op_head(), checkpoint_1 + 1);
        try expectEqual(a0.commit(), checkpoint_1 - 1);
        try expectEqual(b1.commit(), checkpoint_1 - 1);
    }

    {
        // Make the rest of the cluster prepare and commit a different sequence of prepares.
        t.replica(.R_).pass(.R_, .incoming, .prepare);
        t.replica(.R_).pass(.R_, .incoming, .prepare_ok);
        t.replica(.R_).pass(.R_, .incoming, .start_view_change);

        a0.drop_all(.R_, .bidirectional);
        b1.drop_all(.R_, .bidirectional);
        try c.request(checkpoint_2, checkpoint_2);
    }

    {
        // Let B1 rejoin, but prevent it from jumping into view change.
        b1.pass_all(.R_, .bidirectional);
        b1.drop(.R_, .bidirectional, .start_view);
        b1.drop(.R_, .incoming, .ping);
        b1.drop(.R_, .incoming, .pong);

        // TODO: Explicit coverage marks: This should hit the
        // "jump_sync_target: ignoring, newer view" log line.
        const b1_view_before = b1.view();
        try c.request(checkpoint_2_trigger - 1, checkpoint_2_trigger - 1);
        try expectEqual(b1_view_before, b1.view());
        try expectEqual(b1.op_checkpoint(), 0); // B1 ignores the new checkpoint.

        b1.stop(); // The target should be ignored even after restart.
        try b1.open();
        t.run();
        try expectEqual(b1_view_before, b1.view());
        try expectEqual(b1.op_checkpoint(), 0);
    }

    t.replica(.R_).pass_all(.R_, .bidirectional);
    t.run();
    try expectEqual(t.replica(.R_).commit(), checkpoint_2_trigger - 1);
}

test "Cluster: prepare beyond checkpoint trigger" {
    const t = try TestContext.init(.{ .replica_count = 3 });
    defer t.deinit();