Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VSR: SuperBlock.VSRState.CheckpointState.release #1666

Merged
merged 7 commits into from Mar 8, 2024
Merged
2 changes: 1 addition & 1 deletion src/aof.zig
Expand Up @@ -349,7 +349,7 @@ pub const AOFReplayClient = struct {
if (header.operation.vsr_reserved()) continue;

const message = self.client.get_message().build(.request);
errdefer self.client.release(message.base());
errdefer self.client.release_message(message.base());

assert(self.inflight_message == null);
self.inflight_message = message;
Expand Down
1 change: 1 addition & 0 deletions src/lsm/forest_fuzz.zig
Expand Up @@ -166,6 +166,7 @@ const Environment = struct {
env.change_state(.init, .superblock_format);
env.superblock.format(superblock_format_callback, &env.superblock_context, .{
.cluster = cluster,
.release = 1,
.replica = replica,
.replica_count = replica_count,
});
Expand Down
1 change: 1 addition & 0 deletions src/lsm/manifest_log_fuzz.zig
Expand Up @@ -370,6 +370,7 @@ const Environment = struct {
env.pending += 1;
env.manifest_log.superblock.format(format_superblock_callback, &env.superblock_context, .{
.cluster = 0,
.release = 1,
.replica = 0,
.replica_count = 6,
});
Expand Down
1 change: 1 addition & 0 deletions src/lsm/tree_fuzz.zig
Expand Up @@ -206,6 +206,7 @@ fn EnvironmentType(comptime table_usage: TableUsage) type {
env.change_state(.init, .superblock_format);
env.superblock.format(superblock_format_callback, &env.superblock_context, .{
.cluster = cluster,
.release = 1,
.replica = replica,
.replica_count = replica_count,
});
Expand Down
2 changes: 1 addition & 1 deletion src/simulator.zig
Expand Up @@ -736,7 +736,7 @@ pub const Simulator = struct {
// Make sure that there is capacity in the client's request queue.
if (client.messages_available == 0) return;
const request_message = client.get_message();
errdefer client.release(request_message);
errdefer client.release_message(request_message);

const request_metadata = simulator.workload.build_request(
client_index,
Expand Down
6 changes: 5 additions & 1 deletion src/testing/cluster.zig
Expand Up @@ -267,6 +267,7 @@ pub fn ClusterType(comptime StateMachineType: anytype) type {
allocator,
.{
.cluster = options.cluster_id,
.release = 1, // TODO Use real release number.
// TODO(zig) It should be possible to remove the `@as(u8,...)`.
.replica = @as(u8, @intCast(replica_index)),
.replica_count = options.replica_count,
Expand Down Expand Up @@ -425,6 +426,9 @@ pub fn ClusterType(comptime StateMachineType: anytype) type {
.time = .{ .time = &cluster.replica_times[replica_index] },
.state_machine_options = cluster.options.state_machine,
.message_bus_options = .{ .network = cluster.network },
// TODO Use "real" release numbers.
.release = 1,
.release_client_min = 1,
},
);
assert(replica.cluster == cluster.options.cluster_id);
Expand All @@ -448,7 +452,7 @@ pub fn ClusterType(comptime StateMachineType: anytype) type {
const message = request_message.build(.request);

message.header.* = .{
.release = 1, // TODO Use the real release number.
.release = client.release,
.client = client.id,
.request = undefined, // Set by client.raw_request.
.cluster = client.cluster,
Expand Down
4 changes: 4 additions & 0 deletions src/tigerbeetle/main.zig
Expand Up @@ -58,6 +58,7 @@ pub fn main() !void {
.cluster = args.cluster,
.replica = args.replica,
.replica_count = args.replica_count,
.release = 1, // TODO Use real release number.
}, args.path),
.start => |*args| try Command.start(&arena, args),
.version => |*args| try Command.version(allocator, args.verbose),
Expand Down Expand Up @@ -179,6 +180,9 @@ const Command = struct {
var replica: Replica = undefined;
replica.open(allocator, .{
.node_count = @intCast(args.addresses.len),
// TODO Use real release numbers.
.release = 1,
.release_client_min = 1,
.storage_size_limit = args.storage_size_limit,
.storage = &command.storage,
.aof = &aof,
Expand Down
22 changes: 13 additions & 9 deletions src/vsr/client.zig
Expand Up @@ -140,6 +140,9 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {
/// The number of replicas in the cluster.
replica_count: u8,

/// Only tests should ever override the release.
release: u16 = 1, // TODO Use real release number.

/// The total number of ticks elapsed since the client was initialized.
ticks: u64 = 0,

Expand Down Expand Up @@ -245,7 +248,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {

pub fn deinit(self: *Self, allocator: std.mem.Allocator) void {
while (self.request_queue.pop()) |inflight| {
self.release(inflight.message.base());
self.release_message(inflight.message.base());
}
assert(self.messages_available == constants.client_request_queue_max);
self.demux_pool.deinit(allocator);
Expand Down Expand Up @@ -364,7 +367,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {
// Unable to batch the events to an existing Message so reserve a new one.
if (self.messages_available == 0) return error.TooManyOutstanding;
const message = self.get_message();
errdefer self.release(message);
errdefer self.release_message(message);

// We will set parent, session, view and checksums only when sending for the first time:
const message_request = message.build(.request);
Expand All @@ -373,7 +376,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {
.request = undefined,
.cluster = self.cluster,
.command = .request,
.release = 1, // TODO Use the real release number.
.release = self.release,
.operation = vsr.Operation.from(StateMachine, operation),
.size = @intCast(@sizeOf(Header) + body_size),
};
Expand Down Expand Up @@ -502,7 +505,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {
/// Acquires a message from the message bus.
/// The caller must ensure that a message is available.
///
/// Either use it in `client.raw_request()` or discard via `client.release()`,
/// Either use it in `client.raw_request()` or discard via `client.release_message()`,
/// the reference is not guaranteed to be valid after both actions.
/// Do NOT use the reference counter function `message.ref()` for storing the message.
pub fn get_message(self: *Self) *Message {
Expand All @@ -513,7 +516,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {
}

/// Releases a message back to the message bus.
pub fn release(self: *Self, message: *Message) void {
pub fn release_message(self: *Self, message: *Message) void {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer release_message to release even ignoring the name collision -- it mirrors get_message.

assert(self.messages_available < constants.client_request_queue_max);
self.messages_available += 1;

Expand Down Expand Up @@ -574,6 +577,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {
assert(reply.header.valid_checksum());
assert(reply.header.valid_checksum_body(reply.body()));
assert(reply.header.command == .reply);
assert(reply.header.release == self.release);

if (reply.header.client != self.id) {
log.debug("{}: on_reply: ignoring (wrong client={})", .{
Expand Down Expand Up @@ -608,7 +612,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {

// Eagerly release request message, to ensure that user's callback can submit a new
// request.
self.release(inflight.message.base());
self.release_message(inflight.message.base());
assert(self.messages_available > 0);

// Even though we release our reference to the message, we might have another one
Expand Down Expand Up @@ -700,7 +704,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {
const ping = Header.PingClient{
.command = .ping_client,
.cluster = self.cluster,
.release = 1, // TODO Use the real release number.
.release = self.release,
.client = self.id,
};

Expand Down Expand Up @@ -762,7 +766,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {
};

const message = self.get_message().build(.request);
errdefer self.release(message);
errdefer self.release_message(message);

// We will set parent, session, view and checksums only when sending for the first time:
message.header.* = .{
Expand All @@ -771,7 +775,7 @@ pub fn Client(comptime StateMachine_: type, comptime MessageBus: type) type {
.cluster = self.cluster,
.command = .request,
.operation = .register,
.release = 1, // TODO Use the real release number.
.release = self.release,
};

assert(self.request_number == 0);
Expand Down
2 changes: 2 additions & 0 deletions src/vsr/message_header.zig
Expand Up @@ -1184,6 +1184,8 @@ pub const Header = extern struct {
pub const Reason = enum(u8) {
reserved = 0,
no_session = 1,
release_too_low = 2,
release_too_high = 3,

comptime {
for (std.enums.values(Reason), 0..) |reason, index| {
Expand Down
71 changes: 64 additions & 7 deletions src/vsr/replica.zig
Expand Up @@ -191,6 +191,25 @@ pub fn ReplicaType(
/// More than half of replica_count.
quorum_majority: u8,

/// The version of code that is running right now.
///
/// Invariants:
/// - release_client_min > 0
///
/// Note that this is a property (rather than a constant) for the purpose of testing.
/// It should never be modified by a running replica.
release: u16,

/// The minimum (inclusive) client version that the replica will accept requests from.
///
/// Invariants:
/// - release_client_min > 0
/// - release_client_min ≥ release
///
/// Note that this is a property (rather than a constant) for the purpose of testing.
/// It should never be modified by a running replica.
release_client_min: u16,

/// A globally unique integer generated by a crypto rng during replica process startup.
/// Presently, it is used to detect outdated start view messages in recovering head status.
nonce: Nonce,
Expand Down Expand Up @@ -466,13 +485,17 @@ pub fn ReplicaType(
state_machine_options: StateMachine.Options,
message_bus_options: MessageBus.Options,
grid_cache_blocks_count: u32 = Grid.Cache.value_count_max_multiple,
release: u16,
release_client_min: u16,
};

/// Initializes and opens the provided replica using the options.
pub fn open(self: *Self, parent_allocator: std.mem.Allocator, options: OpenOptions) !void {
assert(options.storage_size_limit <= constants.storage_size_limit_max);
assert(options.storage_size_limit % constants.sector_size == 0);
assert(options.nonce != 0);
assert(options.release > 0);
assert(options.release >= options.release_client_min);

self.static_allocator = StaticAllocator.init(parent_allocator);
const allocator = self.static_allocator.allocator();
Expand Down Expand Up @@ -523,6 +546,8 @@ pub fn ReplicaType(
.state_machine_options = options.state_machine_options,
.message_bus_options = options.message_bus_options,
.grid_cache_blocks_count = options.grid_cache_blocks_count,
.release = options.release,
.release_client_min = options.release_client_min,
});

// Disable all dynamic allocation from this point onwards.
Expand Down Expand Up @@ -820,6 +845,8 @@ pub fn ReplicaType(
message_bus_options: MessageBus.Options,
state_machine_options: StateMachine.Options,
grid_cache_blocks_count: u32,
release: u16,
release_client_min: u16,
};

/// NOTE: self.superblock must be initialized and opened prior to this call.
Expand Down Expand Up @@ -947,6 +974,8 @@ pub fn ReplicaType(
.quorum_view_change = quorum_view_change,
.quorum_nack_prepare = quorum_nack_prepare,
.quorum_majority = quorum_majority,
.release = options.release,
.release_client_min = options.release_client_min,
.nonce = options.nonce,
// Copy the (already-initialized) time back, to avoid regressing the monotonic
// clock guard.
Expand Down Expand Up @@ -1047,11 +1076,13 @@ pub fn ReplicaType(
.aof = options.aof,
};

log.debug("{}: init: replica_count={} quorum_view_change={} quorum_replication={}", .{
log.debug("{}: init: replica_count={} quorum_view_change={} quorum_replication={} " ++
"release={}", .{
self.replica,
self.replica_count,
self.quorum_view_change,
self.quorum_replication,
self.release,
});
assert(self.status == .recovering);
}
Expand Down Expand Up @@ -1259,7 +1290,7 @@ pub fn ReplicaType(
.cluster = self.cluster,
.replica = self.replica,
.view = self.view_durable(), // Don't drop pongs while the view is being updated.
.release = 1, // TODO Use the real release number.
.release = self.release,
// Copy the ping's monotonic timestamp to our pong and add our wall clock sample:
.ping_timestamp_monotonic = message.header.ping_timestamp_monotonic,
.pong_timestamp_wall = @bitCast(self.clock.realtime()),
Expand Down Expand Up @@ -1295,7 +1326,7 @@ pub fn ReplicaType(
.cluster = self.cluster,
.replica = self.replica,
.view = self.view,
.release = 1, // TODO Use the real release number.
.release = self.release,
}));
}

Expand Down Expand Up @@ -2544,7 +2575,7 @@ pub fn ReplicaType(
.cluster = self.cluster,
.replica = self.replica,
.view = self.view_durable(), // Don't drop pings while the view is being updated.
.release = 1, // TODO Use the real release number.
.release = self.release,
.checkpoint_id = self.superblock.working.checkpoint_id(),
.checkpoint_op = self.op_checkpoint(),
.ping_timestamp_monotonic = self.clock.monotonic(),
Expand Down Expand Up @@ -3688,6 +3719,8 @@ pub fn ReplicaType(
assert(self.status == .normal or self.status == .view_change or
(self.status == .recovering and self.solo()));
assert(self.client_replies.writes.available() > 0);
assert(self.superblock.working.vsr_state.checkpoint.release ==
self.release);
assert(prepare.header.command == .prepare);
assert(prepare.header.operation != .root);
assert(prepare.header.operation != .reserved);
Expand Down Expand Up @@ -3798,7 +3831,7 @@ pub fn ReplicaType(
.cluster = prepare.header.cluster,
.replica = prepare.header.replica,
.view = prepare.header.view,
.release = 1, // TODO Use the real release number.
.release = prepare.header.release,
.op = prepare.header.op,
.timestamp = prepare.header.timestamp,
.commit = prepare.header.op,
Expand Down Expand Up @@ -4329,6 +4362,28 @@ pub fn ReplicaType(
return true;
}

if (message.header.release < self.release_client_min) {
log.warn("{}: on_request: ignoring invalid version (client={} version={}<{})", .{
self.replica,
message.header.client,
message.header.release,
self.release_client_min,
});
self.send_eviction_message_to_client(message.header.client, .release_too_low);
return true;
}

if (message.header.release > self.release) {
log.warn("{}: on_request: ignoring invalid version (client={} version={}>{})", .{
self.replica,
message.header.client,
message.header.release,
self.release,
});
self.send_eviction_message_to_client(message.header.client, .release_too_high);
return true;
}

if (!message.header.operation.valid(StateMachine)) {
// Some possible causes:
// - client bug
Expand Down Expand Up @@ -5193,7 +5248,7 @@ pub fn ReplicaType(
.cluster = self.cluster,
.size = request_header.size,
.view = self.view,
.release = 1, // TODO Use the real release number.
.release = request_header.release,
.command = .prepare,
.replica = self.replica,
.parent = latest_entry.checksum,
Expand Down Expand Up @@ -6446,7 +6501,7 @@ pub fn ReplicaType(
self.send_header_to_client(client, @bitCast(Header.Eviction{
.command = .eviction,
.cluster = self.cluster,
.release = 1, // TODO Use the real release number.
.release = self.release,
.replica = self.replica,
.view = self.view,
.client = client,
Expand Down Expand Up @@ -6576,6 +6631,7 @@ pub fn ReplicaType(
}

assert(message.header.cluster == self.cluster);
assert(message.header.release <= self.release);

if (message.header.command == .block) {
assert(message.header.protocol <= vsr.Version);
Expand Down Expand Up @@ -8485,6 +8541,7 @@ pub fn ReplicaType(
assert(message.header.view <= self.view);
assert(message.header.op <= self.op);
assert(message.header.op >= self.op_repair_min());
assert(message.header.release <= self.release);

if (!self.journal.has(message.header)) {
log.debug("{}: write_prepare: ignoring op={} checksum={} (header changed)", .{
Expand Down
1 change: 1 addition & 0 deletions src/vsr/replica_format.zig
Expand Up @@ -238,6 +238,7 @@ test "format" {

try format(Storage, allocator, .{
.cluster = cluster,
.release = 1,
.replica = replica,
.replica_count = replica_count,
}, &storage, &superblock);
Expand Down