diff --git a/docs/internals/testing.md b/docs/internals/testing.md
index 7df95b235a..35434f5a5c 100644
--- a/docs/internals/testing.md
+++ b/docs/internals/testing.md
@@ -42,30 +42,31 @@ Documentation for (roughly) code in the `src/testing` directory.
 8. Journal faulty/dirty: `0/1Jd` indicates that the journal has 0 faulty headers and 1 dirty headers.
 9. WAL prepare ops: e.g. `85:149Wo` indicates that the op of the oldest prepare in the WAL is `85` and the op of the newest prepare in the WAL is `149`.
 10. Syncing ops: e.g. `<0:123>` indicates that `vsr_state.sync_op_min=0` and `vsr_state.sync_op_max=123`.
-11. Grid blocks acquired: e.g. `167Ga` indicates that the grid has `167` blocks currently in use.
-12. Grid blocks queued `grid.read_remote_queue`: e.g. `0G!` indicates that there are `0` reads awaiting remote fulfillment.
-13. Grid blocks queued `grid_blocks_missing`: e.g. `0G?` indicates that there are `0` blocks awaiting remote repair.
-14. Pipeline prepares (primary-only): e.g. `1/4Pp` indicates that the primary's pipeline has 2 prepares queued, out of a capacity of 4.
-15. Pipeline requests (primary-only): e.g. `0/3Pq` indicates that the primary's pipeline has 0 requests queued, out of a capacity of 3.
+11. Release version: e.g. `v1:2` indicates that the replica is running release version `1`, and that its maximum available release is `2`.
+12. Grid blocks acquired: e.g. `167Ga` indicates that the grid has `167` blocks currently in use.
+13. Grid blocks queued `grid.read_remote_queue`: e.g. `0G!` indicates that there are `0` reads awaiting remote fulfillment.
+14. Grid blocks queued `grid_blocks_missing`: e.g. `0G?` indicates that there are `0` blocks awaiting remote repair.
+15. Pipeline prepares (primary-only): e.g. `1/4Pp` indicates that the primary's pipeline has 2 prepares queued, out of a capacity of 4.
+16. Pipeline requests (primary-only): e.g. `0/3Pq` indicates that the primary's pipeline has 0 requests queued, out of a capacity of 3.
 
 ### Example
 
 (The first line labels the columns, but is not part of the actual VOPR output).
 
 ```
- 1 2 3 4-------- 5--- 6---------- 7------- 8----- 9------- 10----- 11----- 12- 13- 14--- 15---
+ 1 2 3 4-------- 5--- 6---------- 7------- 8----- 9------- 10----- 11-- 12----- 13- 14- 15--- 16---
- 3 [ / . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G? 0/4Pp 0/3Rq
- 4 ^ \ . 2V 23/_23/_46C 19:_50Jo 0/_0J! 19:_50Wo <__0:__0> nullGa 0G! 0G?
- 2 \ . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G?
- 2 [ \ . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G?
- 6 | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G?
- 6 [ | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G?
- 3 ] / . 3V 95/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 167Ga 0G! 0G? 0/4Pp 0/3Rq
- 2 ] \ . 3V 95/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 167Ga 0G! 0G?
- 1 \ . 3V 71/_99/_99C 68:_99Jo 0/_1J! 67:_98Wo <__0:__0> 183Ga 0G! 0G?
- 1 [ \ . 3V 71/_99/_99C 68:_99Jo 0/_1J! 67:_98Wo <__0:__0> 183Ga 0G! 0G?
- 4 < ~ v 3V 23/_23/_46C 19:_50Jo 0/_0J! 19:_50Wo <__0:__0> 66Ga 0G! 0G?
- 5 | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G?
- 5 [ | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> 183Ga 0G! 0G?
+ 3 [ / . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G? 0/4Pp 0/3Rq
+ 4 ^ \ . 2V 23/_23/_46C 19:_50Jo 0/_0J! 19:_50Wo <__0:__0> v1:2 nullGa 0G! 0G?
+ 2 \ . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G?
+ 2 [ \ . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G?
+ 6 | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G?
+ 6 [ | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G?
+ 3 ] / . 3V 95/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 167Ga 0G! 0G? 0/4Pp 0/3Rq
+ 2 ] \ . 3V 95/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 167Ga 0G! 0G?
+ 1 \ . 3V 71/_99/_99C 68:_99Jo 0/_1J! 67:_98Wo <__0:__0> v1:2 183Ga 0G! 0G?
+ 1 [ \ . 3V 71/_99/_99C 68:_99Jo 0/_1J! 67:_98Wo <__0:__0> v1:2 183Ga 0G! 0G?
+ 4 < ~ v 3V 23/_23/_46C 19:_50Jo 0/_0J! 19:_50Wo <__0:__0> v1:2 66Ga 0G! 0G?
+ 5 | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G?
+ 5 [ | . 3V 71/_99/_99C 68:_99Jo 0/_0J! 68:_99Wo <__0:__0> v1:2 183Ga 0G! 0G?
 ```
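The new column 11 (`v1:2`) documented above is rendered by the `"v{[release]}:{[release_max]} "` segment that this patch adds to the VOPR status format string in `src/testing/cluster.zig` further down. A minimal, standalone sketch of how that named-argument format renders; the values below are stand-ins, not simulator output:

```zig
const std = @import("std");

// Sketch only: shows how Zig's named-argument format specifiers produce the `v1:2`
// column. `release` and `release_max` are illustrative values, not VOPR state.
pub fn main() !void {
    const release: u16 = 1;
    const release_max: u16 = 2;

    var buffer: [32]u8 = undefined;
    const column = try std.fmt.bufPrint(&buffer, "v{[release]}:{[release_max]}", .{
        .release = release,
        .release_max = release_max,
    });
    std.debug.print("{s}\n", .{column}); // Prints "v1:2".
}
```

With the single-release list that the simulator passes below (`&[_]u16{1}`), both values are `1` and the column reads `v1:1`.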
diff --git a/src/config.zig b/src/config.zig
index eac2f1b8ee..b8d9463ad6 100644
--- a/src/config.zig
+++ b/src/config.zig
@@ -142,6 +142,7 @@ const ConfigCluster = struct {
     lsm_batch_multiple: comptime_int = 32,
     lsm_snapshots_max: usize = 32,
     lsm_manifest_compact_extra_blocks: comptime_int = 1,
+    vsr_releases_max: usize = 64, // Arbitrary value.
 
     // TODO(batiati): Maybe this constant should be derived from `grid_iops_read_max`,
diff --git a/src/constants.zig b/src/constants.zig
index c1a9ac0244..d01b8ab954 100644
--- a/src/constants.zig
+++ b/src/constants.zig
@@ -83,6 +83,17 @@ comptime {
     assert(clients_max >= Config.Cluster.clients_max_min);
 }
 
+/// The maximum number of release versions (upgrade candidates) that can be advertised by a replica
+/// in each ping message body.
+pub const vsr_releases_max = config.cluster.vsr_releases_max;
+
+comptime {
+    assert(vsr_releases_max >= 2);
+    assert(vsr_releases_max * @sizeOf(u16) <= message_body_size_max);
+    // The number of releases is encoded into ping headers as a u16.
+    assert(vsr_releases_max <= std.math.maxInt(u16));
+}
+
 /// The maximum number of nodes required to form a quorum for replication.
 /// Majority quorums are only required across view change and replication phases (not within).
 /// As per Flexible Paxos, provided `quorum_replication + quorum_view_change > replicas`:
diff --git a/src/testing/cluster.zig b/src/testing/cluster.zig
index 813d8d031e..a7a234b6c5 100644
--- a/src/testing/cluster.zig
+++ b/src/testing/cluster.zig
@@ -429,6 +429,7 @@ pub fn ClusterType(comptime StateMachineType: anytype) type {
                 // TODO Use "real" release numbers.
                 .release = 1,
                 .release_client_min = 1,
+                .releases_bundled = &[_]u16{1},
             },
         );
         assert(replica.cluster == cluster.options.cluster_id);
@@ -628,6 +629,7 @@ pub fn ClusterType(comptime StateMachineType: anytype) type {
                 "{[journal_faulty]:>2}/{[journal_dirty]:_>2}J! " ++
                 "{[wal_op_min]:>3}:{[wal_op_max]:_>3}Wo " ++
                 "<{[sync_op_min]:_>3}:{[sync_op_max]:_>3}> " ++
+                "v{[release]}:{[release_max]} " ++
                 "{[grid_blocks_acquired]?:>5}Ga " ++
                 "{[grid_blocks_global]:>2}G! " ++
                 "{[grid_blocks_repair]:>3}G?", .{
@@ -643,6 +645,8 @@ pub fn ClusterType(comptime StateMachineType: anytype) type {
                 .wal_op_max = wal_op_max,
                 .sync_op_min = replica.superblock.working.vsr_state.sync_op_min,
                 .sync_op_max = replica.superblock.working.vsr_state.sync_op_max,
+                .release = replica.release,
+                .release_max = replica.releases_bundled.get(0),
                 .grid_blocks_acquired = if (replica.grid.free_set.opened)
                     replica.grid.free_set.count_acquired()
                 else
diff --git a/src/tigerbeetle/main.zig b/src/tigerbeetle/main.zig
index 3b6bb19a86..77458929dd 100644
--- a/src/tigerbeetle/main.zig
+++ b/src/tigerbeetle/main.zig
@@ -183,6 +183,7 @@ const Command = struct {
             // TODO Use real release numbers.
             .release = 1,
             .release_client_min = 1,
+            .releases_bundled = &[_]u16{1},
             .storage_size_limit = args.storage_size_limit,
             .storage = &command.storage,
             .aof = &aof,
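The `comptime` block added to `src/constants.zig` above is what keeps the release list safe to advertise in one ping: the whole list must fit in a message body, and its length must fit the `u16` `release_count` field added to the ping header later in this patch. The sketch below restates that arithmetic with an assumed `message_body_size_max` (the real bound comes from `config`), and checks the single-release list that `cluster.zig` and `main.zig` pass:

```zig
const std = @import("std");
const assert = std.debug.assert;

// Assumed values for this sketch only; the real constants live in src/config.zig and
// src/constants.zig.
const vsr_releases_max: usize = 64;
const message_body_size_max: usize = 1024 * 1024;

comptime {
    // 64 releases * 2 bytes = 128 bytes of ping body -- far below the body limit.
    assert(vsr_releases_max >= 2);
    assert(vsr_releases_max * @sizeOf(u16) <= message_body_size_max);
    // The number of releases is encoded into ping headers as a u16.
    assert(vsr_releases_max <= std.math.maxInt(u16));
}

test "a single-release binary satisfies the release-list bounds" {
    // This is what the simulator and tigerbeetle/main.zig pass: exactly one bundled release.
    const releases_bundled = [_]u16{1};
    try std.testing.expect(releases_bundled.len >= 1);
    try std.testing.expect(releases_bundled.len <= vsr_releases_max);
}
```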
diff --git a/src/vsr.zig b/src/vsr.zig
index d1b3aa45cb..f189418e64 100644
--- a/src/vsr.zig
+++ b/src/vsr.zig
@@ -62,6 +62,9 @@ pub const CheckpointTrailerType = @import("vsr/checkpoint_trailer.zig").Checkpoi
 /// For backwards compatibility through breaking changes (e.g. upgrading checksums/ciphers).
 pub const Version: u16 = 0;
 
+/// A ReleaseList is ordered from lowest-to-highest version.
+pub const ReleaseList = stdx.BoundedArray(u16, constants.vsr_releases_max);
+
 pub const ProcessType = enum { replica, client };
 
 pub const Zone = enum {
@@ -1044,6 +1047,18 @@ pub fn member_index(members: *const Members, replica_id: u128) ?u8 {
     } else return null;
 }
 
+pub fn verify_release_list(releases: []const u16) void {
+    assert(releases.len >= 1);
+    assert(releases.len <= constants.vsr_releases_max);
+
+    for (
+        releases[0 .. releases.len - 1],
+        releases[1..],
+    ) |release_a, release_b| {
+        assert(release_a < release_b);
+    }
+}
+
 pub const Headers = struct {
     pub const Array = stdx.BoundedArray(Header.Prepare, constants.view_change_headers_max);
     /// The SuperBlock's persisted VSR headers.
diff --git a/src/vsr/message_header.zig b/src/vsr/message_header.zig
index 1892b42d9f..88ad787c89 100644
--- a/src/vsr/message_header.zig
+++ b/src/vsr/message_header.zig
@@ -291,7 +291,7 @@ pub const Header = extern struct {
         checksum_body_padding: u128 = 0,
         nonce_reserved: u128 = 0,
         cluster: u128,
-        size: u32 = @sizeOf(Header),
+        size: u32,
         epoch: u32 = 0,
         // NB: unlike every other message, pings and pongs use on disk view, rather than in-memory
         // view, to avoid disrupting clock synchronization while the view is being updated.
@@ -308,16 +308,22 @@
         checkpoint_op: u64,
         ping_timestamp_monotonic: u64,
+        release_count: u16,
 
-        reserved: [96]u8 = [_]u8{0} ** 96,
+        reserved: [94]u8 = [_]u8{0} ** 94,
 
         fn invalid_header(self: *const @This()) ?[]const u8 {
             assert(self.command == .ping);
-            if (self.size != @sizeOf(Header)) return "size != @sizeOf(Header)";
-            if (self.checksum_body != checksum_body_empty) return "checksum_body != expected";
+            if (self.size != @sizeOf(Header) + @sizeOf(u16) * constants.vsr_releases_max) {
+                return "size != @sizeOf(Header) + @sizeOf(u16) * constants.vsr_releases_max";
+            }
             if (self.release == 0) return "release == 0";
             if (!vsr.Checkpoint.valid(self.checkpoint_op)) return "checkpoint_op invalid";
             if (self.ping_timestamp_monotonic == 0) return "ping_timestamp_monotonic != expected";
+            if (self.release_count == 0) return "release_count == 0";
+            if (self.release_count > constants.vsr_releases_max) {
+                return "release_count > vsr_releases_max";
+            }
             if (!stdx.zeroed(&self.reserved)) return "reserved != 0";
             return null;
         }
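Taken together, the ping header changes above fix a ping's size at `@sizeOf(Header) + @sizeOf(u16) * vsr_releases_max`: the body is a zero-padded `u16` array, and `release_count` tells the receiver how many leading entries are meaningful. The test below sketches that layout with stand-in names; it mirrors, but is not, the real encoding in `on_ping_timeout` and decoding in `on_ping` further down:

```zig
const std = @import("std");

// Assumed capacity for this sketch; the real constant is constants.vsr_releases_max.
const vsr_releases_max = 64;

test "ping body release list round-trip" {
    const releases_bundled = [_]u16{ 1, 2, 3 };

    // Sender: copy the bundled releases into the fixed-size body and zero the rest
    // (the patch uses stdx.copy_disjoint + @memset for this).
    var body = [_]u16{0} ** vsr_releases_max;
    @memcpy(body[0..releases_bundled.len], &releases_bundled);
    const release_count: u16 = releases_bundled.len;

    // Receiver: reinterpret the body bytes and trust only the first `release_count`
    // entries, as on_ping does with std.mem.bytesAsSlice.
    const body_bytes: []const u8 = std.mem.sliceAsBytes(&body);
    const releases_all = std.mem.bytesAsSlice(u16, body_bytes);
    const releases = releases_all[0..release_count];

    try std.testing.expectEqual(@as(usize, 3), releases.len);
    for (&releases_bundled, releases) |expected, actual| {
        try std.testing.expectEqual(expected, actual);
    }
}
```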
diff --git a/src/vsr/replica.zig b/src/vsr/replica.zig
index 2ac5ab95a4..58655d7fa3 100644
--- a/src/vsr/replica.zig
+++ b/src/vsr/replica.zig
@@ -210,6 +210,14 @@ pub fn ReplicaType(
         /// It should never be modified by a running replica.
         release_client_min: u16,
 
+        /// A list of all versions of code that are available in the current binary.
+        /// Includes the current version, newer versions, and older versions.
+        /// Ordered from lowest/oldest to highest/newest.
+        ///
+        /// Note that this is a property (rather than a constant) for the purpose of testing.
+        /// It should never be modified for a running replica.
+        releases_bundled: vsr.ReleaseList,
+
         /// A globally unique integer generated by a crypto rng during replica process startup.
         /// Presently, it is used to detect outdated start view messages in recovering head status.
         nonce: Nonce,
@@ -266,6 +274,17 @@
         /// - If syncing≠idle then sync_tables=null.
         sync_tables: ?ForestTableIterator = null,
 
+        /// The latest release list from every other replica. (Constructed from pings.)
+        ///
+        /// Invariants:
+        /// - upgrade_targets[self.replica] = null
+        /// - upgrade_targets[*].releases > release
+        upgrade_targets: [constants.replicas_max]?struct {
+            checkpoint: u64,
+            view: u32,
+            releases: vsr.ReleaseList,
+        } = .{null} ** constants.replicas_max,
+
         /// The current view.
         /// Initialized from the superblock's VSRState.
         ///
@@ -487,6 +506,7 @@
             grid_cache_blocks_count: u32 = Grid.Cache.value_count_max_multiple,
             release: u16,
             release_client_min: u16,
+            releases_bundled: []const u16,
         };
 
         /// Initializes and opens the provided replica using the options.
@@ -548,6 +568,7 @@
                 .grid_cache_blocks_count = options.grid_cache_blocks_count,
                 .release = options.release,
                 .release_client_min = options.release_client_min,
+                .releases_bundled = options.releases_bundled,
             });
 
             // Disable all dynamic allocation from this point onwards.
@@ -847,6 +868,7 @@
             grid_cache_blocks_count: u32,
             release: u16,
             release_client_min: u16,
+            releases_bundled: []const u16,
         };
 
         /// NOTE: self.superblock must be initialized and opened prior to this call.
@@ -889,6 +911,9 @@
             // Flexible quorums are safe if these two quorums intersect so that this relation holds:
             assert(quorum_replication + quorum_view_change > replica_count);
 
+            vsr.verify_release_list(options.releases_bundled);
+            assert(std.mem.indexOfScalar(u16, options.releases_bundled, options.release) != null);
+
             self.time = options.time;
 
             // The clock is special-cased for standbys. We want to balance two concerns:
@@ -976,6 +1001,9 @@
                 .quorum_majority = quorum_majority,
                 .release = options.release,
                 .release_client_min = options.release_client_min,
+                .releases_bundled = vsr.ReleaseList.from_slice(
+                    options.releases_bundled,
+                ) catch unreachable,
                 .nonce = options.nonce,
                 // Copy the (already-initialized) time back, to avoid regressing the monotonic
                 // clock guard.
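After these hunks, a replica stores its own `releases_bundled` as a by-value `vsr.ReleaseList` and asserts two invariants at init: the configured list is strictly increasing, and it contains the release the replica is currently running. A self-contained sketch of those checks, using `std.BoundedArray` and its camelCase `fromSlice`/`get` as stand-ins for `stdx.BoundedArray`/`from_slice`:

```zig
const std = @import("std");
const assert = std.debug.assert;

// Stand-ins for constants.vsr_releases_max and vsr.ReleaseList.
const vsr_releases_max = 64;
const ReleaseList = std.BoundedArray(u16, vsr_releases_max);

// Same shape as vsr.verify_release_list in this patch: at least one release,
// within capacity, strictly increasing (so lowest-to-highest, no duplicates).
fn verify_release_list(releases: []const u16) void {
    assert(releases.len >= 1);
    assert(releases.len <= vsr_releases_max);
    for (releases[0 .. releases.len - 1], releases[1..]) |release_a, release_b| {
        assert(release_a < release_b);
    }
}

test "releases_bundled is sorted and includes the running release" {
    const releases_bundled = [_]u16{ 1, 2, 3 };
    const release: u16 = 2; // The release this replica is running (illustrative).

    verify_release_list(&releases_bundled);
    assert(std.mem.indexOfScalar(u16, &releases_bundled, release) != null);

    // The options slice is then copied into a bounded, by-value list on the replica.
    const list = try ReleaseList.fromSlice(&releases_bundled);
    try std.testing.expectEqual(@as(usize, 3), list.len);
    try std.testing.expectEqual(@as(u16, 3), list.get(2));
}
```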
@@ -1295,6 +1323,30 @@
                 .ping_timestamp_monotonic = message.header.ping_timestamp_monotonic,
                 .pong_timestamp_wall = @bitCast(self.clock.realtime()),
             }));
+
+            if (message.header.replica < self.replica_count) {
+                const upgrade_targets = &self.upgrade_targets[message.header.replica];
+                if (upgrade_targets.* == null or
+                    (upgrade_targets.*.?.checkpoint <= message.header.checkpoint_op and
+                    upgrade_targets.*.?.view <= message.header.view))
+                {
+                    upgrade_targets.* = .{
+                        .checkpoint = message.header.checkpoint_op,
+                        .view = message.header.view,
+                        .releases = .{},
+                    };
+
+                    const releases_all = std.mem.bytesAsSlice(u16, message.body());
+                    const releases = releases_all[0..message.header.release_count];
+                    assert(releases.len == message.header.release_count);
+                    vsr.verify_release_list(releases);
+                    for (releases) |release| {
+                        if (release > self.release) {
+                            upgrade_targets.*.?.releases.append_assume_capacity(release);
+                        }
+                    }
+                }
+            }
         }
 
         fn on_pong(self: *Self, message: *const Message.Pong) void {
@@ -2570,8 +2622,12 @@
         fn on_ping_timeout(self: *Self) void {
             self.ping_timeout.reset();
 
-            var ping = Header.Ping{
+            const message = self.message_bus.pool.get_message(.ping);
+            defer self.message_bus.unref(message);
+
+            message.header.* = Header.Ping{
                 .command = .ping,
+                .size = @sizeOf(Header) + @sizeOf(u16) * constants.vsr_releases_max,
                 .cluster = self.cluster,
                 .replica = self.replica,
                 .view = self.view_durable(), // Don't drop pings while the view is being updated.
                 .checkpoint_id = self.superblock.working.checkpoint_id(),
                 .checkpoint_op = self.op_checkpoint(),
                 .ping_timestamp_monotonic = self.clock.monotonic(),
+                .release_count = self.releases_bundled.count_as(u16),
             };
-            assert(ping.view <= self.view);
-            self.send_header_to_other_replicas_and_standbys(ping.frame_const().*);
+            const ping_versions = std.mem.bytesAsSlice(u16, message.body());
+            stdx.copy_disjoint(.inexact, u16, ping_versions, self.releases_bundled.const_slice());
+            @memset(ping_versions[self.releases_bundled.count()..], 0);
+            message.header.set_checksum_body(message.body());
+            message.header.set_checksum();
+
+            assert(message.header.view <= self.view);
+
+            self.send_message_to_other_replicas_and_standbys(message.base());
         }
 
         fn on_prepare_timeout(self: *Self) void {
@@ -6595,6 +6659,15 @@
             self.send_message_to_other_replicas_base(message.base());
         }
 
+        fn send_message_to_other_replicas_and_standbys(self: *Self, message: *Message) void {
+            var replica: u8 = 0;
+            while (replica < self.node_count) : (replica += 1) {
+                if (replica != self.replica) {
+                    self.send_message_to_replica_base(replica, message);
+                }
+            }
+        }
+
         fn send_message_to_other_replicas_base(self: *Self, message: *Message) void {
             var replica: u8 = 0;
             while (replica < self.replica_count) : (replica += 1) {
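In `on_ping` above, a replica remembers the newest `(checkpoint, view)` it has seen from each peer and keeps only those advertised releases that are strictly newer than its own `release`; everything else is discarded. The filtering step in isolation might look like the sketch below, where `upgrade_candidates` is a hypothetical helper and `std.BoundedArray` stands in for `vsr.ReleaseList`:

```zig
const std = @import("std");

// Stand-ins for constants.vsr_releases_max and vsr.ReleaseList.
const vsr_releases_max = 64;
const ReleaseList = std.BoundedArray(u16, vsr_releases_max);

// Keep only the peer's releases that are strictly newer than the release we run,
// mirroring the loop in on_ping (which uses append_assume_capacity on stdx's type).
fn upgrade_candidates(release_own: u16, releases_peer: []const u16) ReleaseList {
    var candidates = ReleaseList{};
    for (releases_peer) |release| {
        if (release > release_own) candidates.appendAssumeCapacity(release);
    }
    return candidates;
}

test "only newer releases are retained as upgrade targets" {
    // A peer advertises releases 1..3; this replica is still running release 1.
    const candidates = upgrade_candidates(1, &[_]u16{ 1, 2, 3 });
    try std.testing.expectEqualSlices(u16, &[_]u16{ 2, 3 }, candidates.constSlice());
}
```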