Merge pull request #1670 from tigerbeetle/dj-vsr-ping-releases

VSR: Ping releases

sentientwaffle committed Mar 11, 2024
2 parents 54993cf + 5beb02c commit c6fe5ef
Showing 8 changed files with 138 additions and 26 deletions.
39 changes: 20 additions & 19 deletions docs/internals/testing.md
@@ -42,30 +42,31 @@ Documentation for (roughly) code in the `src/testing` directory.
8. Journal faulty/dirty: `0/1Jd` indicates that the journal has 0 faulty headers and 1 dirty header.
9. WAL prepare ops: e.g. `85:149Wo` indicates that the op of the oldest prepare in the WAL is `85` and the op of the newest prepare in the WAL is `149`.
10. Syncing ops: e.g. `<0:123>` indicates that `vsr_state.sync_op_min=0` and `vsr_state.sync_op_max=123`.
11. Grid blocks acquired: e.g. `167Ga` indicates that the grid has `167` blocks currently in use.
12. Grid blocks queued `grid.read_remote_queue`: e.g. `0G!` indicates that there are `0` reads awaiting remote fulfillment.
13. Grid blocks queued `grid_blocks_missing`: e.g. `0G?` indicates that there are `0` blocks awaiting remote repair.
14. Pipeline prepares (primary-only): e.g. `1/4Pp` indicates that the primary's pipeline has 1 prepare queued, out of a capacity of 4.
15. Pipeline requests (primary-only): e.g. `0/3Pq` indicates that the primary's pipeline has 0 requests queued, out of a capacity of 3.
11. Release version: e.g. `v1:2` indicates that the replica is running release version `1`, and that its maximum available release is `2`.
12. Grid blocks acquired: e.g. `167Ga` indicates that the grid has `167` blocks currently in use.
13. Grid blocks queued `grid.read_remote_queue`: e.g. `0G!` indicates that there are `0` reads awaiting remote fulfillment.
14. Grid blocks queued `grid_blocks_missing`: e.g. `0G?` indicates that there are `0` blocks awaiting remote repair.
15. Pipeline prepares (primary-only): e.g. `1/4Pp` indicates that the primary's pipeline has 1 prepare queued, out of a capacity of 4.
16. Pipeline requests (primary-only): e.g. `0/3Pq` indicates that the primary's pipeline has 0 requests queued, out of a capacity of 3.

### Example

(The first line labels the columns, but is not part of the actual VOPR output).

```
1 2 3 4-------- 5--- 6---------- 7------- 8----- 9------- 10----- 11----- 12- 13- 14--- 15---
1 2 3 4-------- 5--- 6---------- 7------- 8----- 9------- 10----- 11-- 12----- 13- 14- 15--- 16---
3 [ / . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G? 0/4Pp 0/3Rq
4 ^ \ . 2V 23/_23/_46C 19:_50Jo 0/_0J! 19:_50Wo <__0:__0> nullGa 0G! 0G?
2 \ . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G?
2 [ \ . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G?
6 | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G?
6 [ | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G?
3 ] / . 3V 95/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 167Ga 0G! 0G? 0/4Pp 0/3Rq
2 ] \ . 3V 95/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 167Ga 0G! 0G?
1 \ . 3V 71/_99/_99C 68:_99Jo 0/_1J! 67:_98Wo <__0:__0> 183Ga 0G! 0G?
1 [ \ . 3V 71/_99/_99C 68:_99Jo 0/_1J! 67:_98Wo <__0:__0> 183Ga 0G! 0G?
4 < ~ v 3V 23/_23/_46C 19:_50Jo 0/_0J! 19:_50Wo <__0:__0> 66Ga 0G! 0G?
5 | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G?
5 [ | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G?
3 [ / . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G? 0/4Pp 0/3Rq
4 ^ \ . 2V 23/_23/_46C 19:_50Jo 0/_0J! 19:_50Wo <__0:__0> v1:2 nullGa 0G! 0G?
2 \ . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G?
2 [ \ . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G?
6 | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G?
6 [ | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G?
3 ] / . 3V 95/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 167Ga 0G! 0G? 0/4Pp 0/3Rq
2 ] \ . 3V 95/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 167Ga 0G! 0G?
1 \ . 3V 71/_99/_99C 68:_99Jo 0/_1J! 67:_98Wo <__0:__0> v1:2 183Ga 0G! 0G?
1 [ \ . 3V 71/_99/_99C 68:_99Jo 0/_1J! 67:_98Wo <__0:__0> v1:2 183Ga 0G! 0G?
4 < ~ v 3V 23/_23/_46C 19:_50Jo 0/_0J! 19:_50Wo <__0:__0> v1:2 66Ga 0G! 0G?
5 | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G?
5 [ | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G?
```
1 change: 1 addition & 0 deletions src/config.zig
@@ -142,6 +142,7 @@ const ConfigCluster = struct {
    lsm_batch_multiple: comptime_int = 32,
    lsm_snapshots_max: usize = 32,
    lsm_manifest_compact_extra_blocks: comptime_int = 1,
    vsr_releases_max: usize = 64,

    // Arbitrary value.
    // TODO(batiati): Maybe this constant should be derived from `grid_iops_read_max`,
11 changes: 11 additions & 0 deletions src/constants.zig
@@ -83,6 +83,17 @@ comptime {
    assert(clients_max >= Config.Cluster.clients_max_min);
}

/// The maximum number of release versions (upgrade candidates) that can be advertised by a replica
/// in each ping message body.
pub const vsr_releases_max = config.cluster.vsr_releases_max;

comptime {
    assert(vsr_releases_max >= 2);
    assert(vsr_releases_max * @sizeOf(u16) <= message_body_size_max);
    // The number of releases is encoded into ping headers as a u16.
    assert(vsr_releases_max <= std.math.maxInt(u16));
}

/// The maximum number of nodes required to form a quorum for replication.
/// Majority quorums are only required across view change and replication phases (not within).
/// As per Flexible Paxos, provided `quorum_replication + quorum_view_change > replicas`:
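As a back-of-the-envelope check of the new `comptime` asserts above, here is a standalone sketch; the `message_body_size_max` value is a hypothetical stand-in for the real constant defined elsewhere in `src/constants.zig`:

```zig
const std = @import("std");

test "a full release list fits in one ping body" {
    const vsr_releases_max = 64; // Default from config.cluster.vsr_releases_max.
    const message_body_size_max = 1024 * 1024 - 128; // Hypothetical: 1 MiB message minus a 128-byte header.

    // Each advertised release is a u16, so even the maximum list is only 128 bytes:
    try std.testing.expect(vsr_releases_max * @sizeOf(u16) <= message_body_size_max);
    // release_count is a u16 header field, so the count itself always fits:
    try std.testing.expect(vsr_releases_max <= std.math.maxInt(u16));
}
```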
4 changes: 4 additions & 0 deletions src/testing/cluster.zig
@@ -429,6 +429,7 @@ pub fn ClusterType(comptime StateMachineType: anytype) type {
// TODO Use "real" release numbers.
.release = 1,
.release_client_min = 1,
.releases_bundled = &[_]u16{1},
},
);
assert(replica.cluster == cluster.options.cluster_id);
@@ -628,6 +629,7 @@ pub fn ClusterType(comptime StateMachineType: anytype) type {
"{[journal_faulty]:>2}/{[journal_dirty]:_>2}J! " ++
"{[wal_op_min]:>3}:{[wal_op_max]:_>3}Wo " ++
"<{[sync_op_min]:_>3}:{[sync_op_max]:_>3}> " ++
"v{[release]}:{[release_max]} " ++
"{[grid_blocks_acquired]?:>5}Ga " ++
"{[grid_blocks_global]:>2}G! " ++
"{[grid_blocks_repair]:>3}G?", .{
@@ -643,6 +645,8 @@
.wal_op_max = wal_op_max,
.sync_op_min = replica.superblock.working.vsr_state.sync_op_min,
.sync_op_max = replica.superblock.working.vsr_state.sync_op_max,
.release = replica.release,
.release_max = replica.releases_bundled.get(0),
.grid_blocks_acquired = if (replica.grid.free_set.opened)
replica.grid.free_set.count_acquired()
else
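The status line above uses Zig's named format placeholders. A minimal standalone sketch of how the new `v{[release]}:{[release_max]}` column renders, with hypothetical values:

```zig
const std = @import("std");

test "named placeholders render the release column" {
    var buf: [16]u8 = undefined;
    const out = try std.fmt.bufPrint(&buf, "v{[release]}:{[release_max]}", .{
        .release = @as(u16, 1),
        .release_max = @as(u16, 2),
    });
    // Matches the `v1:2` column in the VOPR example output above.
    try std.testing.expectEqualStrings("v1:2", out);
}
```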
1 change: 1 addition & 0 deletions src/tigerbeetle/main.zig
@@ -183,6 +183,7 @@ const Command = struct {
// TODO Use real release numbers.
.release = 1,
.release_client_min = 1,
.releases_bundled = &[_]u16{1},
.storage_size_limit = args.storage_size_limit,
.storage = &command.storage,
.aof = &aof,
15 changes: 15 additions & 0 deletions src/vsr.zig
@@ -62,6 +62,9 @@ pub const CheckpointTrailerType = @import("vsr/checkpoint_trailer.zig").Checkpoi
/// For backwards compatibility through breaking changes (e.g. upgrading checksums/ciphers).
pub const Version: u16 = 0;

/// A ReleaseList is ordered from lowest-to-highest version.
pub const ReleaseList = stdx.BoundedArray(u16, constants.vsr_releases_max);

pub const ProcessType = enum { replica, client };

pub const Zone = enum {
@@ -1044,6 +1047,18 @@ pub fn member_index(members: *const Members, replica_id: u128) ?u8 {
} else return null;
}

pub fn verify_release_list(releases: []const u16) void {
    assert(releases.len >= 1);
    assert(releases.len <= constants.vsr_releases_max);

    for (
        releases[0 .. releases.len - 1],
        releases[1..],
    ) |release_a, release_b| {
        assert(release_a < release_b);
    }
}

pub const Headers = struct {
pub const Array = stdx.BoundedArray(Header.Prepare, constants.view_change_headers_max);
/// The SuperBlock's persisted VSR headers.
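To make the `ReleaseList` ordering contract concrete, here is a standalone sketch using `std.BoundedArray` as a stand-in for TigerBeetle's `stdx.BoundedArray` wrapper (an assumption; the wrapper's API may differ):

```zig
const std = @import("std");
const assert = std.debug.assert;

test "a bundled release list is strictly ascending" {
    const vsr_releases_max = 64;
    const releases =
        try std.BoundedArray(u16, vsr_releases_max).fromSlice(&[_]u16{ 1, 2, 5 });

    // The same invariant verify_release_list asserts: lowest-to-highest, no duplicates.
    const slice = releases.constSlice();
    for (slice[0 .. slice.len - 1], slice[1..]) |release_a, release_b| {
        assert(release_a < release_b);
    }
}
```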
14 changes: 10 additions & 4 deletions src/vsr/message_header.zig
@@ -291,7 +291,7 @@ pub const Header = extern struct {
checksum_body_padding: u128 = 0,
nonce_reserved: u128 = 0,
cluster: u128,
size: u32 = @sizeOf(Header),
size: u32,
epoch: u32 = 0,
// NB: unlike every other message, pings and pongs use on disk view, rather than in-memory
// view, to avoid disrupting clock synchronization while the view is being updated.
@@ -308,16 +308,22 @@
checkpoint_op: u64,

ping_timestamp_monotonic: u64,
release_count: u16,

reserved: [96]u8 = [_]u8{0} ** 96,
reserved: [94]u8 = [_]u8{0} ** 94,

fn invalid_header(self: *const @This()) ?[]const u8 {
    assert(self.command == .ping);
    if (self.size != @sizeOf(Header)) return "size != @sizeOf(Header)";
    if (self.checksum_body != checksum_body_empty) return "checksum_body != expected";
    if (self.size != @sizeOf(Header) + @sizeOf(u16) * constants.vsr_releases_max) {
        return "size != @sizeOf(Header) + @sizeOf(u16) * constants.vsr_releases_max";
    }
    if (self.release == 0) return "release == 0";
    if (!vsr.Checkpoint.valid(self.checkpoint_op)) return "checkpoint_op invalid";
    if (self.ping_timestamp_monotonic == 0) return "ping_timestamp_monotonic == 0";
    if (self.release_count == 0) return "release_count == 0";
    if (self.release_count > constants.vsr_releases_max) {
        return "release_count > vsr_releases_max";
    }
    if (!stdx.zeroed(&self.reserved)) return "reserved != 0";
    return null;
}
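My reading of the new header layout, as a standalone sketch with hypothetical values: the ping body is a fixed array of `vsr_releases_max` u16s, of which only the first `release_count` entries are meaningful and the rest are zero padding:

```zig
const std = @import("std");

test "ping body round-trips a release list" {
    const vsr_releases_max = 64;
    const bundled = [_]u16{ 1, 2 }; // A replica bundling releases 1 and 2.

    // Sender: copy the list into the body and zero the tail.
    var body = [_]u16{0} ** vsr_releases_max;
    @memcpy(body[0..bundled.len], &bundled);
    const release_count: u16 = bundled.len;

    // Receiver: take exactly the advertised prefix (cf. on_ping in replica.zig below).
    const releases = body[0..release_count];
    try std.testing.expectEqualSlices(u16, &bundled, releases);
}
```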
79 changes: 76 additions & 3 deletions src/vsr/replica.zig
@@ -210,6 +210,14 @@ pub fn ReplicaType(
/// It should never be modified by a running replica.
release_client_min: u16,

/// A list of all versions of code that are available in the current binary.
/// Includes the current version, newer versions, and older versions.
/// Ordered from lowest/oldest to highest/newest.
///
/// Note that this is a property (rather than a constant) for the purpose of testing.
/// It should never be modified for a running replica.
releases_bundled: vsr.ReleaseList,

/// A globally unique integer generated by a crypto rng during replica process startup.
/// Presently, it is used to detect outdated start view messages in recovering head status.
nonce: Nonce,
@@ -266,6 +274,17 @@
/// - If syncing≠idle then sync_tables=null.
sync_tables: ?ForestTableIterator = null,

/// The latest release list from every other replica. (Constructed from pings.)
///
/// Invariants:
/// - upgrade_targets[self.replica] = null
/// - upgrade_targets[*].releases > release
upgrade_targets: [constants.replicas_max]?struct {
    checkpoint: u64,
    view: u32,
    releases: vsr.ReleaseList,
} = .{null} ** constants.replicas_max,

/// The current view.
/// Initialized from the superblock's VSRState.
///
@@ -487,6 +506,7 @@
grid_cache_blocks_count: u32 = Grid.Cache.value_count_max_multiple,
release: u16,
release_client_min: u16,
releases_bundled: []const u16,
};

/// Initializes and opens the provided replica using the options.
@@ -548,6 +568,7 @@
.grid_cache_blocks_count = options.grid_cache_blocks_count,
.release = options.release,
.release_client_min = options.release_client_min,
.releases_bundled = options.releases_bundled,
});

// Disable all dynamic allocation from this point onwards.
@@ -847,6 +868,7 @@
grid_cache_blocks_count: u32,
release: u16,
release_client_min: u16,
releases_bundled: []const u16,
};

/// NOTE: self.superblock must be initialized and opened prior to this call.
@@ -889,6 +911,9 @@
// Flexible quorums are safe if these two quorums intersect so that this relation holds:
assert(quorum_replication + quorum_view_change > replica_count);

vsr.verify_release_list(options.releases_bundled);
assert(std.mem.indexOfScalar(u16, options.releases_bundled, options.release) != null);

self.time = options.time;

// The clock is special-cased for standbys. We want to balance two concerns:
@@ -976,6 +1001,9 @@
.quorum_majority = quorum_majority,
.release = options.release,
.release_client_min = options.release_client_min,
.releases_bundled = vsr.ReleaseList.from_slice(
options.releases_bundled,
) catch unreachable,
.nonce = options.nonce,
// Copy the (already-initialized) time back, to avoid regressing the monotonic
// clock guard.
@@ -1295,6 +1323,30 @@
.ping_timestamp_monotonic = message.header.ping_timestamp_monotonic,
.pong_timestamp_wall = @bitCast(self.clock.realtime()),
}));

if (message.header.replica < self.replica_count) {
    const upgrade_targets = &self.upgrade_targets[message.header.replica];
    if (upgrade_targets.* == null or
        (upgrade_targets.*.?.checkpoint <= message.header.checkpoint_op and
        upgrade_targets.*.?.view <= message.header.view))
    {
        upgrade_targets.* = .{
            .checkpoint = message.header.checkpoint_op,
            .view = message.header.view,
            .releases = .{},
        };

        const releases_all = std.mem.bytesAsSlice(u16, message.body());
        const releases = releases_all[0..message.header.release_count];
        assert(releases.len == message.header.release_count);
        vsr.verify_release_list(releases);
        for (releases) |release| {
            if (release > self.release) {
                upgrade_targets.*.?.releases.append_assume_capacity(release);
            }
        }
    }
}
}

fn on_pong(self: *Self, message: *const Message.Pong) void {
@@ -2570,19 +2622,31 @@
fn on_ping_timeout(self: *Self) void {
    self.ping_timeout.reset();

    var ping = Header.Ping{
    const message = self.message_bus.pool.get_message(.ping);
    defer self.message_bus.unref(message);

    message.header.* = Header.Ping{
        .command = .ping,
        .size = @sizeOf(Header) + @sizeOf(u16) * constants.vsr_releases_max,
        .cluster = self.cluster,
        .replica = self.replica,
        .view = self.view_durable(), // Don't drop pings while the view is being updated.
        .release = self.release,
        .checkpoint_id = self.superblock.working.checkpoint_id(),
        .checkpoint_op = self.op_checkpoint(),
        .ping_timestamp_monotonic = self.clock.monotonic(),
        .release_count = self.releases_bundled.count_as(u16),
    };
    assert(ping.view <= self.view);

    self.send_header_to_other_replicas_and_standbys(ping.frame_const().*);
    const ping_versions = std.mem.bytesAsSlice(u16, message.body());
    stdx.copy_disjoint(.inexact, u16, ping_versions, self.releases_bundled.const_slice());
    @memset(ping_versions[self.releases_bundled.count()..], 0);
    message.header.set_checksum_body(message.body());
    message.header.set_checksum();

    assert(message.header.view <= self.view);

    self.send_message_to_other_replicas_and_standbys(message.base());
}

fn on_prepare_timeout(self: *Self) void {
@@ -6595,6 +6659,15 @@
self.send_message_to_other_replicas_base(message.base());
}

fn send_message_to_other_replicas_and_standbys(self: *Self, message: *Message) void {
    var replica: u8 = 0;
    while (replica < self.node_count) : (replica += 1) {
        if (replica != self.replica) {
            self.send_message_to_replica_base(replica, message);
        }
    }
}

fn send_message_to_other_replicas_base(self: *Self, message: *Message) void {
    var replica: u8 = 0;
    while (replica < self.replica_count) : (replica += 1) {
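Finally, a standalone sketch of the upgrade-target filtering that `on_ping` performs above; plain arrays and a fixed capacity stand in for the replica's message plumbing, and all values are hypothetical:

```zig
const std = @import("std");

test "only releases newer than the running one become upgrade targets" {
    const self_release: u16 = 1;
    const advertised = [_]u16{ 1, 2, 3 }; // A peer's releases_bundled, ascending.

    var targets = try std.BoundedArray(u16, 64).init(0);
    for (advertised) |release| {
        // Mirrors on_ping: a release is an upgrade target only if it is newer.
        if (release > self_release) try targets.append(release);
    }
    try std.testing.expectEqualSlices(u16, &[_]u16{ 2, 3 }, targets.constSlice());
}
```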
