VSR: Ping releases #1670

Merged
merged 6 commits on Mar 11, 2024
Changes from 5 commits
39 changes: 20 additions & 19 deletions docs/internals/testing.md
@@ -42,30 +42,31 @@ Documentation for (roughly) code in the `src/testing` directory.
8. Journal faulty/dirty: `0/1Jd` indicates that the journal has 0 faulty headers and 1 dirty header.
9. WAL prepare ops: e.g. `85:149Wo` indicates that the op of the oldest prepare in the WAL is `85` and the op of the newest prepare in the WAL is `149`.
10. Syncing ops: e.g. `<0:123>` indicates that `vsr_state.sync_op_min=0` and `vsr_state.sync_op_max=123`.
-11. Grid blocks acquired: e.g. `167Ga` indicates that the grid has `167` blocks currently in use.
-12. Grid blocks queued `grid.read_remote_queue`: e.g. `0G!` indicates that there are `0` reads awaiting remote fulfillment.
-13. Grid blocks queued `grid_blocks_missing`: e.g. `0G?` indicates that there are `0` blocks awaiting remote repair.
-14. Pipeline prepares (primary-only): e.g. `1/4Pp` indicates that the primary's pipeline has 1 prepare queued, out of a capacity of 4.
-15. Pipeline requests (primary-only): e.g. `0/3Pq` indicates that the primary's pipeline has 0 requests queued, out of a capacity of 3.
+11. Release version: e.g. `v1:2` indicates that the replica is running release version `1`, and that its maximum available release is `2`.
+12. Grid blocks acquired: e.g. `167Ga` indicates that the grid has `167` blocks currently in use.
+13. Grid blocks queued `grid.read_remote_queue`: e.g. `0G!` indicates that there are `0` reads awaiting remote fulfillment.
+14. Grid blocks queued `grid_blocks_missing`: e.g. `0G?` indicates that there are `0` blocks awaiting remote repair.
+15. Pipeline prepares (primary-only): e.g. `1/4Pp` indicates that the primary's pipeline has 1 prepare queued, out of a capacity of 4.
+16. Pipeline requests (primary-only): e.g. `0/3Pq` indicates that the primary's pipeline has 0 requests queued, out of a capacity of 3.

### Example

(The first line labels the columns, but is not part of the actual VOPR output).

```
-1 2 3 4-------- 5--- 6---------- 7------- 8----- 9------- 10----- 11----- 12- 13- 14--- 15---
+1 2 3 4-------- 5--- 6---------- 7------- 8----- 9------- 10----- 11-- 12----- 13- 14- 15--- 16---

-3 [ / . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G? 0/4Pp 0/3Rq
-4 ^ \ . 2V 23/_23/_46C 19:_50Jo 0/_0J! 19:_50Wo <__0:__0> nullGa 0G! 0G?
-2 \ . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G?
-2 [ \ . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G?
-6 | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G?
-6 [ | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G?
-3 ] / . 3V 95/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 167Ga 0G! 0G? 0/4Pp 0/3Rq
-2 ] \ . 3V 95/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 167Ga 0G! 0G?
-1 \ . 3V 71/_99/_99C 68:_99Jo 0/_1J! 67:_98Wo <__0:__0> 183Ga 0G! 0G?
-1 [ \ . 3V 71/_99/_99C 68:_99Jo 0/_1J! 67:_98Wo <__0:__0> 183Ga 0G! 0G?
-4 < ~ v 3V 23/_23/_46C 19:_50Jo 0/_0J! 19:_50Wo <__0:__0> 66Ga 0G! 0G?
-5 | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G?
-5 [ | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G?
+3 [ / . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G? 0/4Pp 0/3Rq
+4 ^ \ . 2V 23/_23/_46C 19:_50Jo 0/_0J! 19:_50Wo <__0:__0> v1:2 nullGa 0G! 0G?
+2 \ . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G?
+2 [ \ . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G?
+6 | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G?
+6 [ | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G?
+3 ] / . 3V 95/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 167Ga 0G! 0G? 0/4Pp 0/3Rq
+2 ] \ . 3V 95/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 167Ga 0G! 0G?
+1 \ . 3V 71/_99/_99C 68:_99Jo 0/_1J! 67:_98Wo <__0:__0> v1:2 183Ga 0G! 0G?
+1 [ \ . 3V 71/_99/_99C 68:_99Jo 0/_1J! 67:_98Wo <__0:__0> v1:2 183Ga 0G! 0G?
+4 < ~ v 3V 23/_23/_46C 19:_50Jo 0/_0J! 19:_50Wo <__0:__0> v1:2 66Ga 0G! 0G?
+5 | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G?
+5 [ | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G?
```
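Reading the new column in the first row above, for instance: `v1:2` says replica 3 is running release `1`, while the newest release available in its binary is `2` (item 11 in the legend).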
1 change: 1 addition & 0 deletions src/config.zig
@@ -142,6 +142,7 @@ const ConfigCluster = struct {
lsm_batch_multiple: comptime_int = 32,
lsm_snapshots_max: usize = 32,
lsm_manifest_compact_extra_blocks: comptime_int = 1,
vsr_releases_max: usize = 64,
Member Author:

vsr_releases_max is a new config/constant which defines the maximum number of releases that can be advertised by a replica in a ping message. (So this is effectively the maximum number of releases that can be compiled into a multiversion binary).

I set it to 64 right now, which I chose somewhat arbitrarily: We release on a weekly cadence, there are 52 weeks/year, rounded up to the nearest power-of-2 is 64.
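A quick comptime check of that arithmetic (a standalone sketch, not part of this diff):

```zig
const std = @import("std");

comptime {
    // 52 weekly releases per year, rounded up to the next power of two:
    std.debug.assert(std.math.ceilPowerOfTwoAssert(u16, 52) == 64);
}
```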


// Arbitrary value.
// TODO(batiati): Maybe this constant should be derived from `grid_iops_read_max`,
11 changes: 11 additions & 0 deletions src/constants.zig
@@ -83,6 +83,17 @@ comptime {
assert(clients_max >= Config.Cluster.clients_max_min);
}

/// The maximum number of release versions (upgrade candidates) that can be advertised by a replica
/// in each ping message body.
pub const vsr_releases_max = config.cluster.vsr_releases_max;

comptime {
assert(vsr_releases_max >= 2);
assert(vsr_releases_max * @sizeOf(u16) <= message_body_size_max);
// The number of releases is encoded into ping headers as a u16.
assert(vsr_releases_max <= std.math.maxInt(u16));
}

/// The maximum number of nodes required to form a quorum for replication.
/// Majority quorums are only required across view change and replication phases (not within).
/// As per Flexible Paxos, provided `quorum_replication + quorum_view_change > replicas`:
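Plugging the default of 64 into those bounds (a hedged sketch; reading `>= 2` as 'current release plus at least one upgrade target' is my gloss, and the real `message_body_size_max` comes from config):

```zig
const std = @import("std");
const assert = std.debug.assert;

comptime {
    const vsr_releases_max = 64;
    assert(vsr_releases_max >= 2); // room for the current release plus an upgrade target
    assert(vsr_releases_max * @sizeOf(u16) == 128); // the full list costs 128 bytes of ping body
    assert(vsr_releases_max <= std.math.maxInt(u16)); // release_count is a u16 header field
}
```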
4 changes: 4 additions & 0 deletions src/testing/cluster.zig
@@ -429,6 +429,7 @@ pub fn ClusterType(comptime StateMachineType: anytype) type {
// TODO Use "real" release numbers.
.release = 1,
.release_client_min = 1,
.releases_bundled = &[_]u16{1},
},
);
assert(replica.cluster == cluster.options.cluster_id);
@@ -628,6 +629,7 @@ pub fn ClusterType(comptime StateMachineType: anytype) type {
"{[journal_faulty]:>2}/{[journal_dirty]:_>2}J! " ++
"{[wal_op_min]:>3}:{[wal_op_max]:_>3}Wo " ++
"<{[sync_op_min]:_>3}:{[sync_op_max]:_>3}> " ++
"v{[release]}:{[release_max]} " ++
"{[grid_blocks_acquired]?:>5}Ga " ++
"{[grid_blocks_global]:>2}G! " ++
"{[grid_blocks_repair]:>3}G?", .{
@@ -643,6 +645,8 @@
.wal_op_max = wal_op_max,
.sync_op_min = replica.superblock.working.vsr_state.sync_op_min,
.sync_op_max = replica.superblock.working.vsr_state.sync_op_max,
.release = replica.release,
.release_max = replica.releases_bundled.get(0),
.grid_blocks_acquired = if (replica.grid.free_set.opened)
replica.grid.free_set.count_acquired()
else
1 change: 1 addition & 0 deletions src/tigerbeetle/main.zig
@@ -183,6 +183,7 @@ const Command = struct {
// TODO Use real release numbers.
.release = 1,
.release_client_min = 1,
.releases_bundled = &[_]u16{1},
.storage_size_limit = args.storage_size_limit,
.storage = &command.storage,
.aof = &aof,
15 changes: 15 additions & 0 deletions src/vsr.zig
@@ -62,6 +62,9 @@ pub const CheckpointTrailerType = @import("vsr/checkpoint_trailer.zig").Checkpoi
/// For backwards compatibility through breaking changes (e.g. upgrading checksums/ciphers).
pub const Version: u16 = 0;

/// A ReleaseList is ordered from lowest-to-highest version.
pub const ReleaseList = stdx.BoundedArray(u16, constants.vsr_releases_max);

pub const ProcessType = enum { replica, client };

pub const Zone = enum {
@@ -1044,6 +1047,18 @@ pub fn member_index(members: *const Members, replica_id: u128) ?u8 {
} else return null;
}

pub fn verify_release_list(releases: []const u16) void {
assert(releases.len >= 1);
assert(releases.len <= constants.vsr_releases_max);

for (
releases[0 .. releases.len - 1],
releases[1..],
) |release_a, release_b| {
assert(release_a < release_b);
}
}

pub const Headers = struct {
pub const Array = stdx.BoundedArray(Header.Prepare, constants.view_change_headers_max);
/// The SuperBlock's persisted VSR headers.
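A minimal usage sketch of the two additions above (release numbers hypothetical; `from_slice`, `count_as`, and `get` are the stdx.BoundedArray methods used elsewhere in this diff, with `vsr` and `assert` assumed in scope):

```zig
// A strictly ascending release list passes verification:
const bundled = [_]u16{ 1, 3, 4 };
vsr.verify_release_list(&bundled); // asserts 1 <= len <= vsr_releases_max, strictly ascending

var list = vsr.ReleaseList.from_slice(&bundled) catch unreachable;
assert(list.count_as(u16) == 3);
assert(list.get(0) == 1); // lowest/oldest release first

// Out-of-order or duplicate entries would trip the pairwise assert:
// vsr.verify_release_list(&[_]u16{ 2, 2 }); // would panic
```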
18 changes: 14 additions & 4 deletions src/vsr/message_header.zig
@@ -291,7 +291,7 @@ pub const Header = extern struct {
checksum_body_padding: u128 = 0,
nonce_reserved: u128 = 0,
cluster: u128,
-size: u32 = @sizeOf(Header),
+size: u32,
epoch: u32 = 0,
// NB: unlike every other message, pings and pongs use on disk view, rather than in-memory
// view, to avoid disrupting clock synchronization while the view is being updated.
@@ -308,13 +308,23 @@
checkpoint_op: u64,

ping_timestamp_monotonic: u64,
+release_count: u16,

-reserved: [96]u8 = [_]u8{0} ** 96,
+reserved: [94]u8 = [_]u8{0} ** 94,

fn invalid_header(self: *const @This()) ?[]const u8 {
assert(self.command == .ping);
-if (self.size != @sizeOf(Header)) return "size != @sizeOf(Header)";
-if (self.checksum_body != checksum_body_empty) return "checksum_body != expected";
+if (self.size <= @sizeOf(Header)) return "size <= @sizeOf(Header)";
Member:

I think the code to validate the body of a ping is missing? I.e., checking that the releases are strictly sorted.

Member Author:

This function doesn't have access to the message body. We do assert the order in on_ping, though.

+if (self.size > @sizeOf(Header) + @sizeOf(u16) * constants.vsr_releases_max) {
+    return "size > limit";
+}
+if (self.release_count == 0) return "release_count == 0";
+if (self.release_count > constants.vsr_releases_max) {
+    return "release_count > vsr_releases_max";
+}
+if (self.size != @sizeOf(Header) + self.release_count * @sizeOf(u16)) {
+    return "size != @sizeOf(Header) + release_count * @sizeOf(u16)";
Member:

Maybe add a separate check that release_count <= vsr_releases_max, so that we don't have to worry about overflow?

I think the code is correct, but if, e.g., the release_count were a u64 then it would be possible to get an overflow here. It seems more robust to check count limits individually before multiplying to check size limits.

+}
if (self.release == 0) return "release == 0";
if (!vsr.Checkpoint.valid(self.checkpoint_op)) return "checkpoint_op invalid";
if (self.ping_timestamp_monotonic == 0) return "ping_timestamp_monotonic != expected";
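To make the overflow discussion above concrete, a standalone sketch (the u64 count is hypothetical; the actual field is a u16):

```zig
const std = @import("std");
const assert = std.debug.assert;

test "validate the count before multiplying" {
    // With release_count: u16, release_count * @sizeOf(u16) cannot overflow.
    // But if the field were u64 and unchecked, a hostile value could wrap the
    // size computation and slip past an equality check:
    const hostile: u64 = (1 << 63) + 67;
    const wrapped = hostile *% @sizeOf(u16); // wrapping multiply: 2^64 + 134, i.e. 134
    assert(wrapped == 134); // masquerades as a plausible message size
    // Checking release_count <= vsr_releases_max first rules this class out.
}
```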
77 changes: 74 additions & 3 deletions src/vsr/replica.zig
@@ -210,6 +210,14 @@ pub fn ReplicaType(
/// It should never be modified by a running replica.
release_client_min: u16,

/// A list of all versions of code that are available in the current binary.
/// Includes the current version, newer versions, and older versions.
/// Ordered from lowest/oldest to highest/newest.
///
/// Note that this is a property (rather than a constant) for the purpose of testing.
/// It should never be modified for a running replica.
releases_bundled: vsr.ReleaseList,

/// A globally unique integer generated by a crypto rng during replica process startup.
/// Presently, it is used to detect outdated start view messages in recovering head status.
nonce: Nonce,
@@ -266,6 +274,17 @@
/// - If syncing≠idle then sync_tables=null.
sync_tables: ?ForestTableIterator = null,

/// The latest release list from every other replica. (Constructed from pings.)
///
/// Invariants:
/// - upgrade_targets[self.replica] = null
/// - upgrade_targets[*].releases > release
upgrade_targets: [constants.replicas_max]?struct {
checkpoint: u64,
view: u32,
releases: vsr.ReleaseList,
} = .{null} ** constants.replicas_max,

/// The current view.
/// Initialized from the superblock's VSRState.
///
@@ -487,6 +506,7 @@
grid_cache_blocks_count: u32 = Grid.Cache.value_count_max_multiple,
release: u16,
release_client_min: u16,
releases_bundled: []const u16,
};

/// Initializes and opens the provided replica using the options.
@@ -548,6 +568,7 @@
.grid_cache_blocks_count = options.grid_cache_blocks_count,
.release = options.release,
.release_client_min = options.release_client_min,
.releases_bundled = options.releases_bundled,
});

// Disable all dynamic allocation from this point onwards.
@@ -847,6 +868,7 @@
grid_cache_blocks_count: u32,
release: u16,
release_client_min: u16,
releases_bundled: []const u16,
};

/// NOTE: self.superblock must be initialized and opened prior to this call.
@@ -889,6 +911,9 @@
// Flexible quorums are safe if these two quorums intersect so that this relation holds:
assert(quorum_replication + quorum_view_change > replica_count);

vsr.verify_release_list(options.releases_bundled);
assert(std.mem.indexOfScalar(u16, options.releases_bundled, options.release) != null);

self.time = options.time;

// The clock is special-cased for standbys. We want to balance two concerns:
@@ -976,6 +1001,9 @@
.quorum_majority = quorum_majority,
.release = options.release,
.release_client_min = options.release_client_min,
.releases_bundled = vsr.ReleaseList.from_slice(
options.releases_bundled,
) catch unreachable,
.nonce = options.nonce,
// Copy the (already-initialized) time back, to avoid regressing the monotonic
// clock guard.
@@ -1295,6 +1323,29 @@
.ping_timestamp_monotonic = message.header.ping_timestamp_monotonic,
.pong_timestamp_wall = @bitCast(self.clock.realtime()),
}));

if (message.header.replica < self.replica_count) {
const upgrade_targets = &self.upgrade_targets[message.header.replica];
if (upgrade_targets.* == null or
(upgrade_targets.*.?.checkpoint <= message.header.checkpoint_op and
upgrade_targets.*.?.view <= message.header.view))
{
upgrade_targets.* = .{
.checkpoint = message.header.checkpoint_op,
.view = message.header.view,
.releases = .{},
};

const releases = std.mem.bytesAsSlice(u16, message.body());
Member:

assert that releases are sorted.

assert(releases.len == message.header.release_count);
vsr.verify_release_list(releases);
for (releases) |release| {
if (release > self.release) {
upgrade_targets.*.?.releases.append_assume_capacity(release);
Member:

append_assume_capacity

I think this can fire? We have a bound for a single message, but different ping messages might have originated from different binaries, so their union might actually exceed what we have here. This is of course an edge case, but there's a related realistic problem: imagine if you run a binary with a new release on one replica, but then retire it in favor of the old binary. The knowledge that that replica has a new release persists until the view-change/checkpoint boundary, although the actual binary does not support that version...

It feels like just replacing the whole set of targets might be safer? But then again, that assumes that the network drops all old messages eventually.

Hm, I think I really don't like the union behavior because it is guaranteed to be wrong: if we have two pings with different release sets, then what we support is one release set or the other release set. When we union the two, we are guaranteed to have an unsupported release in there?

Member Author:

We replace the entire list every time; there is no union:

+                    upgrade_targets.* = .{
+                        .checkpoint = message.header.checkpoint_op,
+                        .view = message.header.view,
+                        .releases = .{},
+                    };

above resets the list to empty before we append anything.

Member:

🤦 gotta grab one more pu-erh cup as I don't seem to be sufficiently woken up, sorry!

}
}
}
}
}
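As a worked example of the replace-not-union semantics settled in the thread above (release numbers hypothetical, with `self.release = 1`): if replica 4's previous ping advertised releases `{1, 2, 3}`, its stored targets are `{2, 3}`; if its next qualifying ping (say, after reverting to an older binary) advertises only `{1, 2}`, the stored targets become just `{2}`: the list is reset to empty before appending, so the stale `3` does not linger.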

fn on_pong(self: *Self, message: *const Message.Pong) void {
@@ -2570,19 +2621,30 @@
fn on_ping_timeout(self: *Self) void {
self.ping_timeout.reset();

-var ping = Header.Ping{
+const message = self.message_bus.pool.get_message(.ping);
+defer self.message_bus.unref(message);
+
+message.header.* = Header.Ping{
.command = .ping,
+.size = @sizeOf(Header) + @sizeOf(u16) * self.releases_bundled.count_as(u16),
.cluster = self.cluster,
.replica = self.replica,
.view = self.view_durable(), // Don't drop pings while the view is being updated.
.release = self.release,
.checkpoint_id = self.superblock.working.checkpoint_id(),
.checkpoint_op = self.op_checkpoint(),
.ping_timestamp_monotonic = self.clock.monotonic(),
+.release_count = self.releases_bundled.count_as(u16),
};
-assert(ping.view <= self.view);
-self.send_header_to_other_replicas_and_standbys(ping.frame_const().*);

+const ping_versions = std.mem.bytesAsSlice(u16, message.body());
+stdx.copy_disjoint(.exact, u16, ping_versions, self.releases_bundled.const_slice());
+message.header.set_checksum_body(message.body());
+message.header.set_checksum();

+assert(message.header.view <= self.view);

+self.send_message_to_other_replicas_and_standbys(message.base());
}
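For concreteness, the wire image this builds for a hypothetical binary bundling releases `{1, 2}` (assuming the usual 128-byte VSR header and little-endian `u16` encoding):

```
header: 128 bytes, with size = 128 + 2 * 2 = 132 and release_count = 2
body:   01 00 02 00   (releases_bundled, lowest to highest)
```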

fn on_prepare_timeout(self: *Self) void {
@@ -6595,6 +6657,15 @@
self.send_message_to_other_replicas_base(message.base());
}

fn send_message_to_other_replicas_and_standbys(self: *Self, message: *Message) void {
Member:

I wonder why send_header functions don't delegate to send_message ones?

Member Author:

I think it is due to Message pointers vs MessageType pointers, to keep the caller from needing to do any conversion.

var replica: u8 = 0;
while (replica < self.node_count) : (replica += 1) {
if (replica != self.replica) {
self.send_message_to_replica_base(replica, message);
}
}
}
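A hedged note on the loop bound: assuming the convention elsewhere in the codebase that `node_count = replica_count + standby_count`, this variant reaches standbys, while `send_message_to_other_replicas_base` below (bounded by `replica_count`) does not. For example, with 6 replicas and 2 standbys, this loop visits indices 0 through 7 and the base loop visits 0 through 5.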

fn send_message_to_other_replicas_base(self: *Self, message: *Message) void {
var replica: u8 = 0;
while (replica < self.replica_count) : (replica += 1) {