VSR: Move client sessions from superblock to grid #1356

Merged: 3 commits, Dec 15, 2023
Changes from 2 commits
58 changes: 20 additions & 38 deletions docs/internals/sync.md
@@ -12,19 +12,16 @@ State sync is used when a lagging replica's log no longer intersects with t
(VRR refers to state sync as "state transfer", but we already have [transfers](../reference/transfers.md) elsewhere.)

In the context of state sync, "state" refers to:
1. the superblock free set
1. the superblock client sessions
3. the superblock `vsr_state.checkpoint.manifest_{head,tail}_{address,checksum}`
4. the superblock `vsr_state.checkpoint.commit_min`
5. the superblock `vsr_state.checkpoint.commit_min_checksum`
6. the grid (manifest blocks)
7. the grid (LSM table data; acquired blocks only)
8. client replies

State sync consists of three protocols:
- [Sync Superblock](./vsr.md#protocol-sync-superblock) (syncs 1-5)
- [Sync Forest](./vsr.md#protocol-sync-forest) (syncs 6-7)
- [Sync Client Replies](./vsr.md#protocol-sync-client-replies) (syncs 8)
1. the superblock `vsr_state.checkpoint`
2. the grid (manifest, free set, and client sessions blocks)
3. the grid (LSM table data; acquired blocks only)
4. client replies

State sync consists of four protocols:
- [Sync Superblock](./vsr.md#protocol-sync-superblock) (syncs 1)
- [Repair Grid](./vsr.md#protocol-repair-grid) (syncs 2)
- [Sync Forest](./vsr.md#protocol-sync-forest) (syncs 3)
- [Sync Client Replies](./vsr.md#protocol-sync-client-replies) (syncs 4)

The target of superblock-sync is the latest checkpoint of the healthy cluster.
When we catch up to the latest checkpoint (or very close to it), then we can transition back to a healthy state.
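
As an aside on item 1 of the new list above: each former superblock trailer (the free set and the client sessions) is now referenced from the checkpoint state by a small reference to its last block in the grid. Below is a minimal sketch of such a reference, using the field names that appear in the fuzzer diffs further down this page; the type name and doc comments are assumptions, not the repo's actual definition.

```zig
// Hypothetical sketch only -- the actual type lives in the superblock/VSR code.
const TrailerReference = extern struct {
    /// Checksum over the entire (unencoded) trailer.
    checksum: u128,
    /// Grid address of the last trailer block written at this checkpoint.
    last_block_address: u64,
    /// Checksum of that last block.
    last_block_checksum: u128,
    /// Size of the unencoded trailer, in bytes.
    trailer_size: u64,
};
```

An empty trailer, as the fuzzers below construct for client sessions, uses a zero block address and block checksum, `trailer_size = 0`, and `vsr.checksum(&.{})` (the checksum of the empty byte string) as the overall checksum.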
@@ -51,25 +48,22 @@ Checkpoints:
3. Wait for grid IO to finish. (See `Grid.cancel()`.)
4. Wait for a usable sync target to arrive. (Usually we already have one.)
5. Begin [sync-superblock protocol](./vsr.md#protocol-sync-superblock).
6. [Request superblock trailers](#6-request-superblock-trailers).
7. Update superblock headers:
6. [Request superblock checkpoint state](#6-request-superblock-checkpoint-state).
7. Update the superblock headers with:
- Bump `vsr_state.checkpoint.commit_min`/`vsr_state.checkpoint.commit_min_checksum` to the sync target op/op-checksum.
- Bump `vsr_state.checkpoint.previous_checkpoint_id` to the checkpoint id that is previous to our sync target (i.e. it isn't _our_ previous checkpoint).
- Bump `replica.commit_min`. (If `replica.commit_min` exceeds `replica.op`, transition to `status=recovering_head`).
- Write the target checkpoint's trailers.
8. Request and write manifest blocks. (Handled by [Grid Repair Protocol](./vsr.md#protocol-repair-grid).)
9. Update the superblock with:
- Set `vsr_state.sync_op_min` to the minimum op which has not been repaired.
- Set `vsr_state.sync_op_max` to the maximum op which has not been repaired.
10. Sync-superblock protocol is done.
11. Repair [replies](./vsr.md#protocol-sync-client-replies), [manifest blocks, and table blocks](./vsr.md#protocol-sync-forest) that were created within the `sync_op_{min,max}` range.
12. Update the superblock with:
8. Sync-superblock protocol is done.
9. Repair [replies](./vsr.md#protocol-sync-client-replies), [free set, client sessions, and manifest blocks](./vsr.md#protocol-repair-grid), and [table blocks](./vsr.md#protocol-sync-forest) that were created within the `sync_op_{min,max}` range.
10. Update the superblock with:
- Set `vsr_state.sync_op_min = 0`
- Set `vsr_state.sync_op_max = 0`

If a newer sync target is discovered during steps *5*-*8* or *11*, go to step *4*.
If a newer sync target is discovered during steps *5*-*6* or *9*, go to step *4*.

If the replica starts up with `vsr_state.sync_op_max ≠ 0`, go to step *11*.
If the replica starts up with `vsr_state.sync_op_max ≠ 0`, go to step *9*.
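
As a side note, the `sync_op_{min,max}` range recorded above is what scopes the post-sync repair: ops inside the range still need their replies and tables repaired, and a range of `0/0` means no repair is pending. A minimal self-contained sketch of that check (hypothetical helper, not repo code):

```zig
const std = @import("std");
const assert = std.debug.assert;

// Hypothetical sketch: true if `op` falls inside the recorded sync range and
// therefore still needs its reply/table blocks repaired after state sync.
fn syncing_op(op: u64, sync_op_min: u64, sync_op_max: u64) bool {
    assert(sync_op_min <= sync_op_max);
    if (sync_op_max == 0) return false; // sync_op_{min,max}=0: repair finished.
    return op >= sync_op_min and op <= sync_op_max;
}

test "syncing_op" {
    try std.testing.expect(syncing_op(5, 3, 9));
    try std.testing.expect(!syncing_op(5, 0, 0));
}
```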

### 0: Scenarios

@@ -96,19 +90,9 @@ State sync is initially triggered by any of the following:
- a WAL or grid repair is in progress and,
- the replica's checkpoint is lagging behind the cluster's (far enough that the repair may never complete).

### 6: Request Superblock Trailers

The replica concurrently sends out three request messages, with the sync target identifier attached to each:

1. `command=request_sync_checkpoint`
2. `command=request_sync_client_sessions`

Replicas with a matching checkpoint identifier reply (respectively) with:

1. `command=sync_checkpoint`
2. `command=sync_client_sessions`
### 6: Request Superblock Checkpoint State

If a trailer is too large to fit in a message, the syncing replica requests it again, with a byte offset.
The syncing replica sends `command=request_sync_checkpoint` messages (with the sync target identifier attached to each) until it receives a `command=sync_checkpoint` with a matching checkpoint identifier.
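
A minimal, self-contained sketch of that matching rule follows; the command and field names mirror the protocol, while the types and helper are stand-ins rather than the real replica code.

```zig
const std = @import("std");

const Command = enum { request_sync_checkpoint, sync_checkpoint };

const Header = struct {
    command: Command,
    checkpoint_id: u128,
};

/// The syncing replica keeps re-sending `request_sync_checkpoint` (on a timeout)
/// and ignores any reply whose checkpoint id does not match the sync target.
fn sync_checkpoint_matches(target_checkpoint_id: u128, reply: Header) bool {
    if (reply.command != .sync_checkpoint) return false;
    return reply.checkpoint_id == target_checkpoint_id;
}

test "mismatched checkpoint ids are ignored" {
    try std.testing.expect(!sync_checkpoint_matches(1, .{
        .command = .sync_checkpoint,
        .checkpoint_id = 2,
    }));
    try std.testing.expect(sync_checkpoint_matches(1, .{
        .command = .sync_checkpoint,
        .checkpoint_id = 1,
    }));
}
```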

## Concepts
### Syncing Replica
@@ -151,17 +135,15 @@ Having 2/3 replicas syncing means that a single grid-block corruption on the pri

### Checkpoint Identifier

A _checkpoint id_ is a hash of the superblock trailers.
A _checkpoint id_ is a hash of the superblock `CheckpointState`.

A checkpoint identifier is attached to the following message types:
- `command=commit`: Current checkpoint identifier of sender.
- `command=ping`: Current checkpoint identifier of sender.
- `command=prepare`: The attached checkpoint id is the checkpoint id during which the corresponding prepare was originally prepared.
- `command=prepare_ok`: The attached checkpoint id is the checkpoint id during which the corresponding prepare was originally prepared.
- `command=request_sync_checkpoint`: Requested checkpoint identifier.
- `command=request_sync_client_sessions`: Requested checkpoint identifier.
- `command=sync_checkpoint`: Current checkpoint identifier of sender.
- `command=sync_client_sessions`: Current checkpoint identifier of sender.
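
For illustration, the identifier can be thought of as a checksum over the serialized `CheckpointState`. The following is a minimal sketch with stand-ins for `vsr.CheckpointState` and `vsr.checksum` (the real checksum is an AEAD-derived tag, as noted in the review comment below, not FNV, and the real struct has more fields):

```zig
const std = @import("std");

// Stand-in for vsr.CheckpointState; the real struct has more fields.
const CheckpointState = extern struct {
    commit_min_checksum: u128,
    commit_min: u64,
    // ... trailer and manifest block references, previous_checkpoint_id, ...
};

// Stand-in for vsr.checksum(); the real one is not FNV-1a.
fn checksum(bytes: []const u8) u128 {
    return std.hash.Fnv1a_128.hash(bytes);
}

/// The checkpoint id changes whenever any part of the checkpoint state changes,
/// so matching ids imply (with overwhelming probability) matching checkpoints.
fn checkpoint_id(state: *const CheckpointState) u128 {
    return checksum(std.mem.asBytes(state));
}
```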

### Canonical Checkpoint

2 changes: 0 additions & 2 deletions docs/internals/vsr.md
@@ -53,8 +53,6 @@ Storage:
| `block` | replica | replica | [Sync Forest](#protocol-sync-forest), [Repair Grid](#protocol-repair-grid) |
| `request_sync_checkpoint` | replica | replica | [Sync Superblock](#protocol-sync-superblock) |
| `sync_checkpoint` | replica | replica | [Sync Superblock](#protocol-sync-superblock) |
| `request_sync_client_sessions` | replica | replica | [Sync Superblock](#protocol-sync-superblock) |
| `sync_client_sessions` | replica | replica | [Sync Superblock](#protocol-sync-superblock) |

### Recovery

4 changes: 0 additions & 4 deletions src/config.zig
@@ -117,8 +117,6 @@ const ConfigProcess = struct {
grid_missing_tables_max: usize = 3,
aof_record: bool = false,
aof_recovery: bool = false,
/// When null, this defaults to message_body_size_max.
sync_trailer_message_body_size_max: ?usize = null,
};

/// Configurations which are tunable per-cluster.
@@ -248,8 +246,6 @@ pub const configs = struct {
.grid_missing_blocks_max = 3,
.grid_missing_tables_max = 2,
.verify = true,
// Set to a small value to ensure the multipart trailer sync is easily tested.
.sync_trailer_message_body_size_max = 129,
},
.cluster = .{
.clients_max = 4 + 3,
17 changes: 0 additions & 17 deletions src/constants.zig
@@ -182,21 +182,6 @@ comptime {
assert(message_body_size_max >= @sizeOf(vsr.CheckpointState));
}

/// The maximum body size of:
/// - command=sync_client_sessions
///
/// In practice, this should always be as high as possible to minimize the number of round trips
/// required for sync.
/// It is configurable to make testing multipart trailers simple.
pub const sync_trailer_message_body_size_max =
config.process.sync_trailer_message_body_size_max orelse
message_body_size_max;

comptime {
assert(sync_trailer_message_body_size_max > 0);
assert(sync_trailer_message_body_size_max <= message_body_size_max);
}

/// The maximum number of Viewstamped Replication prepare messages that can be inflight at a time.
/// This is immutable once assigned per cluster, as replicas need to know how many operations might
/// possibly be uncommitted during a view change, and this must be constant for all replicas.
@@ -446,8 +431,6 @@ comptime {
///
/// The superblock contains local state for the replica and therefore cannot be replicated remotely.
/// Loss of the superblock would represent loss of the replica and so it must be protected.
/// Since each superblock copy also copies the superblock trailer (around 33 MiB), setting this
/// beyond 4 copies (or decreasing block_size < 64 KiB) can result in a superblock zone > 264 MiB.
///
/// This can mean checkpointing latencies in the rare extreme worst-case of at most 264ms, although
/// this would require EWAH compression of our block free set to have zero effective compression.
21 changes: 6 additions & 15 deletions src/lsm/forest_fuzz.zig
@@ -250,24 +250,15 @@ const Environment = struct {
env.grid.checkpoint(grid_checkpoint_callback);
try env.tick_until_state_change(.grid_checkpoint, .superblock_checkpoint);

{
// VSRState.monotonic() asserts that the previous_checkpoint id changes.
// In a normal replica this is guaranteed – even if the LSM is idle and no blocks
// are acquired or released, the client sessions are necessarily mutated.
var reply = std.mem.zeroInit(vsr.Header.Reply, .{
.cluster = cluster,
.command = .reply,
.op = env.checkpoint_op.?,
.commit = env.checkpoint_op.?,
});
reply.set_checksum_body(&.{});
reply.set_checksum();

_ = env.superblock.client_sessions.put(1, &reply);
}
env.superblock.checkpoint(superblock_checkpoint_callback, &env.superblock_context, .{
.manifest_references = env.forest.manifest_log.checkpoint_references(),
.free_set_reference = env.grid.free_set_checkpoint.checkpoint_reference(),
.client_sessions_reference = .{
.last_block_checksum = 0,
.last_block_address = 0,
.trailer_size = 0,
.checksum = vsr.checksum(&.{}),
},
.commit_min_checksum = env.superblock.working.vsr_state.checkpoint.commit_min_checksum + 1,
.commit_min = env.checkpoint_op.?,
.commit_max = env.checkpoint_op.? + 1,
22 changes: 6 additions & 16 deletions src/lsm/manifest_log_fuzz.zig
@@ -464,29 +464,19 @@ const Environment = struct {

const vsr_state = &env.manifest_log.superblock.working.vsr_state;

{
// VSRState.monotonic() asserts that the previous_checkpoint id changes.
// In a normal replica this is guaranteed – even if the LSM is idle and no blocks
// are acquired or released, the client sessions are necessarily mutated.
var reply = std.mem.zeroInit(vsr.Header.Reply, .{
.cluster = 0,
.command = .reply,
.op = vsr_state.checkpoint.commit_min + 1,
.commit = vsr_state.checkpoint.commit_min + 1,
});
reply.set_checksum_body(&.{});
reply.set_checksum();

_ = env.manifest_log.superblock.client_sessions.put(1, &reply);
}

env.pending += 1;
env.manifest_log.superblock.checkpoint(
checkpoint_superblock_callback,
&env.superblock_context,
.{
.manifest_references = env.manifest_log.checkpoint_references(),
.free_set_reference = env.grid.free_set_checkpoint.checkpoint_reference(),
.client_sessions_reference = .{
.last_block_checksum = 0,
.last_block_address = 0,
.trailer_size = 0,
.checksum = vsr.checksum(&.{}),
},
.commit_min_checksum = vsr_state.checkpoint.commit_min_checksum + 1,
.commit_min = vsr.Checkpoint.checkpoint_after(vsr_state.checkpoint.commit_min),
.commit_max = vsr.Checkpoint.checkpoint_after(vsr_state.commit_max),
50 changes: 29 additions & 21 deletions src/lsm/schema.zig
@@ -59,9 +59,10 @@ pub const BlockType = enum(u8) {
reserved = 0,

free_set = 1,
manifest = 2,
index = 3,
data = 4,
client_sessions = 2,
manifest = 3,
index = 4,
data = 5,

pub fn valid(block_type: BlockType) bool {
_ = std.meta.intToEnum(BlockType, @intFromEnum(block_type)) catch return false;
@@ -343,13 +344,12 @@ pub const TableData = struct {
}
};

pub const FreeSetNode = struct {
pub const Word = u64;

/// A TrailerNode is either a `BlockType.free_set` or `BlockType.client_sessions`.
pub const TrailerNode = struct {
pub const Metadata = extern struct {
previous_free_set_block_checksum: u128,
previous_free_set_block_checksum_padding: u128 = 0,
previous_free_set_block_address: u64,
previous_trailer_block_checksum: u128,
previous_trailer_block_checksum_padding: u128 = 0,
[Inline review comment]
Member: Wait, do we need this padding? I think we switched all pure checksums to be unpadded?

Member Author: I don't think this actually is a pure checksum -- it is referring to a block, which would be encrypted, so the "checksum" would be the AEAD tag.

previous_trailer_block_address: u64,
reserved: [56]u8 = .{0} ** 56,

comptime {
@@ -361,20 +361,30 @@
fn metadata(free_set_block: BlockPtrConst) *const Metadata {
const header = header_from_block(free_set_block);
assert(header.command == .block);
assert(header.block_type == .free_set);
assert(header.block_type == .free_set or header.block_type == .client_sessions);
assert(header.address > 0);
assert(header.snapshot == 0);

const header_metadata = std.mem.bytesAsValue(Metadata, &header.metadata_bytes);
assert(header_metadata.previous_free_set_block_checksum_padding == 0);
assert(header_metadata.previous_trailer_block_checksum_padding == 0);
assert(stdx.zeroed(&header_metadata.reserved));

if (header_metadata.previous_free_set_block_address == 0) {
assert(header_metadata.previous_free_set_block_checksum == 0);
if (header_metadata.previous_trailer_block_address == 0) {
assert(header_metadata.previous_trailer_block_checksum == 0);
}

assert(header.size > @sizeOf(vsr.Header));
assert((header.size - @sizeOf(vsr.Header)) % @sizeOf(Word) == 0);

switch (header.block_type) {
.free_set => {
assert((header.size - @sizeOf(vsr.Header)) % @sizeOf(u64) == 0);
},
.client_sessions => {
assert((header.size - @sizeOf(vsr.Header)) %
(@sizeOf(vsr.Header) + @sizeOf(u64)) == 0);
},
else => unreachable,
}

return header_metadata;
}
@@ -386,20 +396,18 @@
pub fn previous(free_set_block: BlockPtrConst) ?BlockReference {
const header_metadata = metadata(free_set_block);

if (header_metadata.previous_free_set_block_address == 0) {
assert(header_metadata.previous_free_set_block_checksum == 0);
if (header_metadata.previous_trailer_block_address == 0) {
assert(header_metadata.previous_trailer_block_checksum == 0);
return null;
} else {
return .{
.checksum = header_metadata.previous_free_set_block_checksum,
.address = header_metadata.previous_free_set_block_address,
.checksum = header_metadata.previous_trailer_block_checksum,
.address = header_metadata.previous_trailer_block_address,
};
}
}

pub fn encoded_words(block: BlockPtrConst) []align(@alignOf(Word)) const u8 {
assert_valid_header(block);

pub fn body(block: BlockPtrConst) []align(@sizeOf(vsr.Header)) const u8 {
const header = header_from_block(block);
return block[@sizeOf(vsr.Header)..header.size];
}
1 change: 0 additions & 1 deletion src/lsm/table.zig
@@ -93,7 +93,6 @@ pub fn TableType(
assert(stdx.no_padding(Value));

// These impact our calculation of:
// * the superblock trailer size, and
// * the manifest log layout for alignment.
assert(key_size >= 8);
assert(key_size <= 32);
22 changes: 6 additions & 16 deletions src/lsm/tree_fuzz.zig
@@ -275,22 +275,6 @@ fn EnvironmentType(comptime table_usage: TableUsage) type {
pub fn checkpoint(env: *Environment, op: u64) void {
env.tree.assert_between_bars();

{
// VSRState.monotonic() asserts that the previous_checkpoint id changes.
// In a normal replica this is guaranteed – even if the LSM is idle and no blocks
// are acquired or released, the client sessions are necessarily mutated.
var reply = std.mem.zeroInit(vsr.Header.Reply, .{
.cluster = cluster,
.command = .reply,
.op = op,
.commit = op,
});
reply.set_checksum_body(&.{});
reply.set_checksum();

_ = env.superblock.client_sessions.put(1, &reply);
}

env.grid.checkpoint(grid_checkpoint_callback);
env.change_state(.fuzzing, .grid_checkpoint);
env.tick_until_state_change(.grid_checkpoint, .fuzzing);
@@ -299,6 +283,12 @@
env.superblock.checkpoint(superblock_checkpoint_callback, &env.superblock_context, .{
.manifest_references = std.mem.zeroes(vsr.SuperBlockManifestReferences),
.free_set_reference = env.grid.free_set_checkpoint.checkpoint_reference(),
.client_sessions_reference = .{
.last_block_checksum = 0,
.last_block_address = 0,
.trailer_size = 0,
.checksum = vsr.checksum(&.{}),
},
.commit_min_checksum = env.superblock.working.vsr_state.checkpoint.commit_min_checksum + 1,
.commit_min = checkpoint_op,
.commit_max = checkpoint_op + 1,
2 changes: 0 additions & 2 deletions src/message_pool.zig
@@ -89,8 +89,6 @@ pub const MessagePool = struct {
pub const Block = MessageType(.block);
pub const RequestSyncCheckpoint = MessageType(.request_sync_checkpoint);
pub const SyncCheckpoint = MessageType(.sync_checkpoint);
pub const RequestSyncClientSessions = MessageType(.request_sync_client_sessions);
pub const SyncClientSessions = MessageType(.sync_client_sessions);

// TODO Avoid the extra level of indirection.
// (https://github.com/tigerbeetle/tigerbeetle/pull/1295#discussion_r1394265250)
2 changes: 1 addition & 1 deletion src/testing/cluster.zig
@@ -548,7 +548,7 @@ pub fn ClusterType(comptime StateMachineType: anytype) type {
assert(cluster.replica_diverged.isSet(event_data.replica));
},
.sync_stage_changed => switch (replica.syncing) {
.requesting_trailers => {
.requesting_checkpoint => {
cluster.log_replica(.sync_commenced, replica.replica);
cluster.sync_checker.replica_sync_start(replica);
},