Skip to content

Commit

Permalink
Merge pull request #1666 from tigerbeetle/dj-vsr-superblock-release
Browse files Browse the repository at this point in the history
VSR: SuperBlock.VSRState.CheckpointState.release
  • Loading branch information
sentientwaffle committed Mar 8, 2024
2 parents 3eaa330 + d7016bc commit 4763f93
Show file tree
Hide file tree
Showing 14 changed files with 118 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/aof.zig
Expand Up @@ -349,7 +349,7 @@ pub const AOFReplayClient = struct {
if (header.operation.vsr_reserved()) continue;

const message = self.client.get_message().build(.request);
errdefer self.client.release(message.base());
errdefer self.client.release_message(message.base());

assert(self.inflight_message == null);
self.inflight_message = message;
Expand Down
1 change: 1 addition & 0 deletions src/lsm/forest_fuzz.zig
Expand Up @@ -166,6 +166,7 @@ const Environment = struct {
env.change_state(.init, .superblock_format);
env.superblock.format(superblock_format_callback, &env.superblock_context, .{
.cluster = cluster,
.release = 1,
.replica = replica,
.replica_count = replica_count,
});
Expand Down
1 change: 1 addition & 0 deletions src/lsm/manifest_log_fuzz.zig
Expand Up @@ -370,6 +370,7 @@ const Environment = struct {
env.pending += 1;
env.manifest_log.superblock.format(format_superblock_callback, &env.superblock_context, .{
.cluster = 0,
.release = 1,
.replica = 0,
.replica_count = 6,
});
Expand Down
1 change: 1 addition & 0 deletions src/lsm/tree_fuzz.zig
Expand Up @@ -206,6 +206,7 @@ fn EnvironmentType(comptime table_usage: TableUsage) type {
env.change_state(.init, .superblock_format);
env.superblock.format(superblock_format_callback, &env.superblock_context, .{
.cluster = cluster,
.release = 1,
.replica = replica,
.replica_count = replica_count,
});
Expand Down
2 changes: 1 addition & 1 deletion src/simulator.zig
Expand Up @@ -736,7 +736,7 @@ pub const Simulator = struct {
// Make sure that there is capacity in the client's request queue.
if (client.messages_available == 0) return;
const request_message = client.get_message();
errdefer client.release(request_message);
errdefer client.release_message(request_message);

const request_metadata = simulator.workload.build_request(
client_index,
Expand Down
6 changes: 5 additions & 1 deletion src/testing/cluster.zig
Expand Up @@ -267,6 +267,7 @@ pub fn ClusterType(comptime StateMachineType: anytype) type {
allocator,
.{
.cluster = options.cluster_id,
.release = 1, // TODO Use real release number.
// TODO(zig) It should be possible to remove the `@as(u8,...)`.
.replica = @as(u8, @intCast(replica_index)),
.replica_count = options.replica_count,
Expand Down Expand Up @@ -425,6 +426,9 @@ pub fn ClusterType(comptime StateMachineType: anytype) type {
.time = .{ .time = &cluster.replica_times[replica_index] },
.state_machine_options = cluster.options.state_machine,
.message_bus_options = .{ .network = cluster.network },
// TODO Use "real" release numbers.
.release = 1,
.release_client_min = 1,
},
);
assert(replica.cluster == cluster.options.cluster_id);
Expand All @@ -448,7 +452,7 @@ pub fn ClusterType(comptime StateMachineType: anytype) type {
const message = request_message.build(.request);

message.header.* = .{
.release = 1, // TODO Use the real release number.
.release = client.release,
.client = client.id,
.request = undefined, // Set by client.raw_request.
.cluster = client.cluster,
Expand Down
4 changes: 4 additions & 0 deletions src/tigerbeetle/main.zig
Expand Up @@ -58,6 +58,7 @@ pub fn main() !void {
.cluster = args.cluster,
.replica = args.replica,
.replica_count = args.replica_count,
.release = 1, // TODO Use real release number.
}, args.path),
.start => |*args| try Command.start(&arena, args),
.version => |*args| try Command.version(allocator, args.verbose),
Expand Down Expand Up @@ -179,6 +180,9 @@ const Command = struct {
var replica: Replica = undefined;
replica.open(allocator, .{
.node_count = @intCast(args.addresses.len),
// TODO Use real release numbers.
.release = 1,
.release_client_min = 1,
.storage_size_limit = args.storage_size_limit,
.storage = &command.storage,
.aof = &aof,
Expand Down
22 changes: 13 additions & 9 deletions src/vsr/client.zig
Expand Up @@ -140,6 +140,9 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {
/// The number of replicas in the cluster.
replica_count: u8,

/// Only tests should ever override the release.
release: u16 = 1, // TODO Use real release number.

/// The total number of ticks elapsed since the client was initialized.
ticks: u64 = 0,

Expand Down Expand Up @@ -245,7 +248,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {

pub fn deinit(self: *Self, allocator: std.mem.Allocator) void {
while (self.request_queue.pop()) |inflight| {
self.release(inflight.message.base());
self.release_message(inflight.message.base());
}
assert(self.messages_available == constants.client_request_queue_max);
self.demux_pool.deinit(allocator);
Expand Down Expand Up @@ -364,7 +367,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {
// Unable to batch the events to an existing Message so reserve a new one.
if (self.messages_available == 0) return error.TooManyOutstanding;
const message = self.get_message();
errdefer self.release(message);
errdefer self.release_message(message);

// We will set parent, session, view and checksums only when sending for the first time:
const message_request = message.build(.request);
Expand All @@ -373,7 +376,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {
.request = undefined,
.cluster = self.cluster,
.command = .request,
.release = 1, // TODO Use the real release number.
.release = self.release,
.operation = vsr.Operation.from(StateMachine, operation),
.size = @intCast(@sizeOf(Header) + body_size),
};
Expand Down Expand Up @@ -502,7 +505,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {
/// Acquires a message from the message bus.
/// The caller must ensure that a message is available.
///
/// Either use it in `client.raw_request()` or discard via `client.release()`,
/// Either use it in `client.raw_request()` or discard via `client.release_message()`,
/// the reference is not guaranteed to be valid after both actions.
/// Do NOT use the reference counter function `message.ref()` for storing the message.
pub fn get_message(self: *Self) *Message {
Expand All @@ -513,7 +516,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {
}

/// Releases a message back to the message bus.
pub fn release(self: *Self, message: *Message) void {
pub fn release_message(self: *Self, message: *Message) void {
assert(self.messages_available < constants.client_request_queue_max);
self.messages_available += 1;

Expand Down Expand Up @@ -574,6 +577,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {
assert(reply.header.valid_checksum());
assert(reply.header.valid_checksum_body(reply.body()));
assert(reply.header.command == .reply);
assert(reply.header.release == self.release);

if (reply.header.client != self.id) {
log.debug("{}: on_reply: ignoring (wrong client={})", .{
Expand Down Expand Up @@ -608,7 +612,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {

// Eagerly release request message, to ensure that user's callback can submit a new
// request.
self.release(inflight.message.base());
self.release_message(inflight.message.base());
assert(self.messages_available > 0);

// Even though we release our reference to the message, we might have another one
Expand Down Expand Up @@ -700,7 +704,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {
const ping = Header.PingClient{
.command = .ping_client,
.cluster = self.cluster,
.release = 1, // TODO Use the real release number.
.release = self.release,
.client = self.id,
};

Expand Down Expand Up @@ -762,7 +766,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {
};

const message = self.get_message().build(.request);
errdefer self.release(message);
errdefer self.release_message(message);

// We will set parent, session, view and checksums only when sending for the first time:
message.header.* = .{
Expand All @@ -771,7 +775,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {
.cluster = self.cluster,
.command = .request,
.operation = .register,
.release = 1, // TODO Use the real release number.
.release = self.release,
};

assert(self.request_number == 0);
Expand Down
2 changes: 2 additions & 0 deletions src/vsr/message_header.zig
Expand Up @@ -1184,6 +1184,8 @@ pub const Header = extern struct {
pub const Reason = enum(u8) {
reserved = 0,
no_session = 1,
release_too_low = 2,
release_too_high = 3,

comptime {
for (std.enums.values(Reason), 0..) |reason, index| {
Expand Down
71 changes: 64 additions & 7 deletions src/vsr/replica.zig
Expand Up @@ -191,6 +191,25 @@ pub fn ReplicaType(
/// More than half of replica_count.
quorum_majority: u8,

/// The version of code that is running right now.
///
/// Invariants:
/// - release_client_min > 0
///
/// Note that this is a property (rather than a constant) for the purpose of testing.
/// It should never be modified by a running replica.
release: u16,

/// The minimum (inclusive) client version that the replica will accept requests from.
///
/// Invariants:
/// - release_client_min > 0
/// - release_client_min ≥ release
///
/// Note that this is a property (rather than a constant) for the purpose of testing.
/// It should never be modified by a running replica.
release_client_min: u16,

/// A globally unique integer generated by a crypto rng during replica process startup.
/// Presently, it is used to detect outdated start view messages in recovering head status.
nonce: Nonce,
Expand Down Expand Up @@ -466,13 +485,17 @@ pub fn ReplicaType(
state_machine_options: StateMachine.Options,
message_bus_options: MessageBus.Options,
grid_cache_blocks_count: u32 = Grid.Cache.value_count_max_multiple,
release: u16,
release_client_min: u16,
};

/// Initializes and opens the provided replica using the options.
pub fn open(self: *Self, parent_allocator: std.mem.Allocator, options: OpenOptions) !void {
assert(options.storage_size_limit <= constants.storage_size_limit_max);
assert(options.storage_size_limit % constants.sector_size == 0);
assert(options.nonce != 0);
assert(options.release > 0);
assert(options.release >= options.release_client_min);

self.static_allocator = StaticAllocator.init(parent_allocator);
const allocator = self.static_allocator.allocator();
Expand Down Expand Up @@ -523,6 +546,8 @@ pub fn ReplicaType(
.state_machine_options = options.state_machine_options,
.message_bus_options = options.message_bus_options,
.grid_cache_blocks_count = options.grid_cache_blocks_count,
.release = options.release,
.release_client_min = options.release_client_min,
});

// Disable all dynamic allocation from this point onwards.
Expand Down Expand Up @@ -820,6 +845,8 @@ pub fn ReplicaType(
message_bus_options: MessageBus.Options,
state_machine_options: StateMachine.Options,
grid_cache_blocks_count: u32,
release: u16,
release_client_min: u16,
};

/// NOTE: self.superblock must be initialized and opened prior to this call.
Expand Down Expand Up @@ -947,6 +974,8 @@ pub fn ReplicaType(
.quorum_view_change = quorum_view_change,
.quorum_nack_prepare = quorum_nack_prepare,
.quorum_majority = quorum_majority,
.release = options.release,
.release_client_min = options.release_client_min,
.nonce = options.nonce,
// Copy the (already-initialized) time back, to avoid regressing the monotonic
// clock guard.
Expand Down Expand Up @@ -1047,11 +1076,13 @@ pub fn ReplicaType(
.aof = options.aof,
};

log.debug("{}: init: replica_count={} quorum_view_change={} quorum_replication={}", .{
log.debug("{}: init: replica_count={} quorum_view_change={} quorum_replication={} " ++
"release={}", .{
self.replica,
self.replica_count,
self.quorum_view_change,
self.quorum_replication,
self.release,
});
assert(self.status == .recovering);
}
Expand Down Expand Up @@ -1259,7 +1290,7 @@ pub fn ReplicaType(
.cluster = self.cluster,
.replica = self.replica,
.view = self.view_durable(), // Don't drop pongs while the view is being updated.
.release = 1, // TODO Use the real release number.
.release = self.release,
// Copy the ping's monotonic timestamp to our pong and add our wall clock sample:
.ping_timestamp_monotonic = message.header.ping_timestamp_monotonic,
.pong_timestamp_wall = @bitCast(self.clock.realtime()),
Expand Down Expand Up @@ -1295,7 +1326,7 @@ pub fn ReplicaType(
.cluster = self.cluster,
.replica = self.replica,
.view = self.view,
.release = 1, // TODO Use the real release number.
.release = self.release,
}));
}

Expand Down Expand Up @@ -2544,7 +2575,7 @@ pub fn ReplicaType(
.cluster = self.cluster,
.replica = self.replica,
.view = self.view_durable(), // Don't drop pings while the view is being updated.
.release = 1, // TODO Use the real release number.
.release = self.release,
.checkpoint_id = self.superblock.working.checkpoint_id(),
.checkpoint_op = self.op_checkpoint(),
.ping_timestamp_monotonic = self.clock.monotonic(),
Expand Down Expand Up @@ -3688,6 +3719,8 @@ pub fn ReplicaType(
assert(self.status == .normal or self.status == .view_change or
(self.status == .recovering and self.solo()));
assert(self.client_replies.writes.available() > 0);
assert(self.superblock.working.vsr_state.checkpoint.release ==
self.release);
assert(prepare.header.command == .prepare);
assert(prepare.header.operation != .root);
assert(prepare.header.operation != .reserved);
Expand Down Expand Up @@ -3798,7 +3831,7 @@ pub fn ReplicaType(
.cluster = prepare.header.cluster,
.replica = prepare.header.replica,
.view = prepare.header.view,
.release = 1, // TODO Use the real release number.
.release = prepare.header.release,
.op = prepare.header.op,
.timestamp = prepare.header.timestamp,
.commit = prepare.header.op,
Expand Down Expand Up @@ -4329,6 +4362,28 @@ pub fn ReplicaType(
return true;
}

if (message.header.release < self.release_client_min) {
log.warn("{}: on_request: ignoring invalid version (client={} version={}<{})", .{
self.replica,
message.header.client,
message.header.release,
self.release_client_min,
});
self.send_eviction_message_to_client(message.header.client, .release_too_low);
return true;
}

if (message.header.release > self.release) {
log.warn("{}: on_request: ignoring invalid version (client={} version={}>{})", .{
self.replica,
message.header.client,
message.header.release,
self.release,
});
self.send_eviction_message_to_client(message.header.client, .release_too_high);
return true;
}

if (!message.header.operation.valid(StateMachine)) {
// Some possible causes:
// - client bug
Expand Down Expand Up @@ -5193,7 +5248,7 @@ pub fn ReplicaType(
.cluster = self.cluster,
.size = request_header.size,
.view = self.view,
.release = 1, // TODO Use the real release number.
.release = request_header.release,
.command = .prepare,
.replica = self.replica,
.parent = latest_entry.checksum,
Expand Down Expand Up @@ -6446,7 +6501,7 @@ pub fn ReplicaType(
self.send_header_to_client(client, @bitCast(Header.Eviction{
.command = .eviction,
.cluster = self.cluster,
.release = 1, // TODO Use the real release number.
.release = self.release,
.replica = self.replica,
.view = self.view,
.client = client,
Expand Down Expand Up @@ -6576,6 +6631,7 @@ pub fn ReplicaType(
}

assert(message.header.cluster == self.cluster);
assert(message.header.release <= self.release);

if (message.header.command == .block) {
assert(message.header.protocol <= vsr.Version);
Expand Down Expand Up @@ -8485,6 +8541,7 @@ pub fn ReplicaType(
assert(message.header.view <= self.view);
assert(message.header.op <= self.op);
assert(message.header.op >= self.op_repair_min());
assert(message.header.release <= self.release);

if (!self.journal.has(message.header)) {
log.debug("{}: write_prepare: ignoring op={} checksum={} (header changed)", .{
Expand Down
1 change: 1 addition & 0 deletions src/vsr/replica_format.zig
Expand Up @@ -238,6 +238,7 @@ test "format" {

try format(Storage, allocator, .{
.cluster = cluster,
.release = 1,
.replica = replica,
.replica_count = replica_count,
}, &storage, &superblock);
Expand Down

0 comments on commit 4763f93

Please sign in to comment.