
Commit

Merge pull request #1356 from tigerbeetle/dj-vsr-no-superblock-trailers
VSR: Move client sessions from superblock to grid
sentientwaffle committed Dec 15, 2023
2 parents 61e2a61 + 0bf91e8 commit 928b36f
Showing 28 changed files with 488 additions and 1,189 deletions.
58 changes: 20 additions & 38 deletions docs/internals/sync.md
@@ -12,19 +12,16 @@ State sync is used when when a lagging replica's log no longer intersects with t
(VRR refers to state sync as "state transfer", but we already have [transfers](../reference/transfers.md) elsewhere.)

In the context of state sync, "state" refers to:
1. the superblock free set
1. the superblock client sessions
3. the superblock `vsr_state.checkpoint.manifest_{head,tail}_{address,checksum}`
4. the superblock `vsr_state.checkpoint.commit_min`
5. the superblock `vsr_state.checkpoint.commit_min_checksum`
6. the grid (manifest blocks)
7. the grid (LSM table data; acquired blocks only)
8. client replies

State sync consists of three protocols:
- [Sync Superblock](./vsr.md#protocol-sync-superblock) (syncs 1-5)
- [Sync Forest](./vsr.md#protocol-sync-forest) (syncs 6-7)
- [Sync Client Replies](./vsr.md#protocol-sync-client-replies) (syncs 8)
1. the superblock `vsr_state.checkpoint`
2. the grid (manifest, free set, and client sessions blocks)
3. the grid (LSM table data; acquired blocks only)
4. client replies

State sync consists of four protocols:
- [Sync Superblock](./vsr.md#protocol-sync-superblock) (syncs 1)
- [Repair Grid](./vsr.md#protocol-repair-grid) (syncs 2)
- [Sync Forest](./vsr.md#protocol-sync-forest) (syncs 3)
- [Sync Client Replies](./vsr.md#protocol-sync-client-replies) (syncs 4)
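
As a rough illustration of the updated mapping above (state component to protocol), here is a minimal Zig sketch. The names `SyncedState`, `SyncProtocol`, and `protocol_for` are hypothetical and are not taken from the codebase; only the pairing itself comes from the list.

```zig
const std = @import("std");

/// Hypothetical names for the four kinds of synced state (items 1-4 above).
const SyncedState = enum {
    superblock_checkpoint_state, // 1. the superblock `vsr_state.checkpoint`
    grid_trailer_and_manifest_blocks, // 2. manifest, free set, client sessions blocks
    grid_table_blocks, // 3. LSM table data (acquired blocks only)
    client_replies, // 4. client replies
};

/// Hypothetical names for the four protocols.
const SyncProtocol = enum {
    sync_superblock,
    repair_grid,
    sync_forest,
    sync_client_replies,
};

/// Returns the protocol that the list above pairs with each kind of state.
fn protocol_for(state: SyncedState) SyncProtocol {
    return switch (state) {
        .superblock_checkpoint_state => .sync_superblock,
        .grid_trailer_and_manifest_blocks => .repair_grid,
        .grid_table_blocks => .sync_forest,
        .client_replies => .sync_client_replies,
    };
}

pub fn main() void {
    const states = [_]SyncedState{
        .superblock_checkpoint_state,
        .grid_trailer_and_manifest_blocks,
        .grid_table_blocks,
        .client_replies,
    };
    for (states) |state| {
        std.debug.print("{s} -> {s}\n", .{ @tagName(state), @tagName(protocol_for(state)) });
    }
}
```

The exhaustive `switch` means the compiler rejects the mapping if a state kind is added without assigning it a protocol.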

The target of superblock-sync is the latest checkpoint of the healthy cluster.
When we catch up to the latest checkpoint (or very close to it), then we can transition back to a healthy state.
@@ -51,25 +48,22 @@ Checkpoints:
3. Wait for grid IO to finish. (See `Grid.cancel()`.)
4. Wait for a usable sync target to arrive. (Usually we already have one.)
5. Begin [sync-superblock protocol](./vsr.md#protocol-sync-superblock).
6. [Request superblock trailers](#6-request-superblock-trailers).
7. Update superblock headers:
6. [Request superblock checkpoint state](#6-request-superblock-checkpoint-state).
7. Update the superblock headers with:
- Bump `vsr_state.checkpoint.commit_min`/`vsr_state.checkpoint.commit_min_checksum` to the sync target op/op-checksum.
- Bump `vsr_state.checkpoint.previous_checkpoint_id` to the checkpoint id that is previous to our sync target (i.e. it isn't _our_ previous checkpoint).
- Bump `replica.commit_min`. (If `replica.commit_min` exceeds `replica.op`, transition to `status=recovering_head`).
- Write the target checkpoint's trailers.
8. Request and write manifest blocks. (Handled by [Grid Repair Protocol](./vsr.md#protocol-repair-grid).)
9. Update the superblock with:
- Set `vsr_state.sync_op_min` to the minimum op which has not been repaired.
- Set `vsr_state.sync_op_max` to the maximum op which has not been repaired.
10. Sync-superblock protocol is done.
11. Repair [replies](./vsr.md#protocol-sync-client-replies), [manifest blocks, and table blocks](./vsr.md#protocol-sync-forest) that were created within the `sync_op_{min,max}` range.
12. Update the superblock with:
8. Sync-superblock protocol is done.
9. Repair [replies](./vsr.md#protocol-sync-client-replies), [free set, client sessions, and manifest blocks](./vsr.md#protocol-repair-grid), and [table blocks](./vsr.md#protocol-sync-forest) that were created within the `sync_op_{min,max}` range.
10. Update the superblock with:
- Set `vsr_state.sync_op_min = 0`
- Set `vsr_state.sync_op_max = 0`

If a newer sync target is discovered during steps *5*-*8* or *11*, go to step *4*.
If a newer sync target is discovered during steps *5*-*6* or *9*, go to step *4*.

If the replica starts up with `vsr_state.sync_op_max ≠ 0`, go to step *11*.
If the replica starts up with `vsr_state.sync_op_max ≠ 0`, go to step *9*.
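
The two control-flow rules above can be sketched as follows, using the step numbers of the updated list (restart at step 4, resume at step 9). This is only an illustration: the function and constant names are hypothetical, and the real replica tracks its sync stage differently.

```zig
const std = @import("std");

// Step numbers follow the updated list; these constants are hypothetical.
const step_wait_for_target: u8 = 4;
const step_repair: u8 = 9;

/// On startup: a nonzero `sync_op_max` sends the replica straight to step 9.
/// Returns null when no resume is needed.
fn startup_step(sync_op_max: u64) ?u8 {
    return if (sync_op_max != 0) step_repair else null;
}

/// When a newer sync target is discovered: steps 5-6 and 9 restart from step 4,
/// any other step carries on unchanged.
fn step_after_newer_target(current_step: u8) u8 {
    return switch (current_step) {
        5, 6, 9 => step_wait_for_target,
        else => current_step,
    };
}

pub fn main() void {
    std.debug.print("startup with sync_op_max=0: {?d}\n", .{startup_step(0)});
    std.debug.print("startup with sync_op_max=120: {?d}\n", .{startup_step(120)});
    std.debug.print("newer target during step 6 -> step {d}\n", .{step_after_newer_target(6)});
    std.debug.print("newer target during step 7 -> step {d}\n", .{step_after_newer_target(7)});
}
```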

### 0: Scenarios

@@ -96,19 +90,9 @@ State sync is initially triggered by any of the following:
- a WAL or grid repair is in progress and,
- the replica's checkpoint is lagging behind the cluster's (far enough that the repair may never complete).

### 6: Request Superblock Trailers

The replica concurrently sends out three request messages, with the sync target identifier attached to each:

1. `command=request_sync_checkpoint`
2. `command=request_sync_client_sessions`

Replicas with a matching checkpoint identifier reply (respectively) with:

1. `command=sync_checkpoint`
2. `command=sync_client_sessions`
### 6: Request Superblock Checkpoint State

If a trailer is too large to fit in a message, the syncing replica requests it again, with a byte offset.
The syncing replica sends `command=request_sync_checkpoint` messages (with the sync target identifier attached to each) until it receives a `command=sync_checkpoint` with a matching checkpoint identifier.
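
A minimal sketch of the acceptance rule in the updated step 6, assuming a simplified reply type: `SyncCheckpointReply` and `first_matching_reply` are hypothetical names, and modelling the checkpoint id as a `u128` is an assumption of this sketch. The replica keeps requesting until a reply carries the target's checkpoint identifier.

```zig
const std = @import("std");

/// Hypothetical, simplified stand-in for a `command=sync_checkpoint` message.
const SyncCheckpointReply = struct {
    checkpoint_id: u128,
};

/// Returns the index of the first reply whose checkpoint id matches the sync
/// target, or null if none matches (in which case the syncing replica would
/// send `command=request_sync_checkpoint` again).
fn first_matching_reply(target_id: u128, replies: []const SyncCheckpointReply) ?usize {
    for (replies, 0..) |reply, index| {
        if (reply.checkpoint_id == target_id) return index;
    }
    return null;
}

pub fn main() void {
    const replies = [_]SyncCheckpointReply{
        .{ .checkpoint_id = 0xAAAA }, // a different checkpoint: ignored
        .{ .checkpoint_id = 0xBBBB }, // matches the target below
    };
    if (first_matching_reply(0xBBBB, &replies)) |index| {
        std.debug.print("accepted reply at index {d}\n", .{index});
    } else {
        std.debug.print("no matching reply; request again\n", .{});
    }
}
```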

## Concepts
### Syncing Replica
@@ -151,17 +135,15 @@ Having 2/3 replicas syncing means that a single grid-block corruption on the pri

### Checkpoint Identifier

A _checkpoint id_ is a hash of the superblock trailers.
A _checkpoint id_ is a hash of the superblock `CheckpointState`.

A checkpoint identifier is attached to the following message types:
- `command=commit`: Current checkpoint identifier of sender.
- `command=ping`: Current checkpoint identifier of sender.
- `command=prepare`: The attached checkpoint id is the checkpoint id during which the corresponding prepare was originally prepared.
- `command=prepare_ok`: The attached checkpoint id is the checkpoint id during which the corresponding prepare was originally prepared.
- `command=request_sync_checkpoint`: Requested checkpoint identifier.
- `command=request_sync_client_sessions`: Requested checkpoint identifier.
- `command=sync_checkpoint`: Current checkpoint identifier of sender.
- `command=sync_client_sessions`: Current checkpoint identifier of sender.

### Canonical Checkpoint

2 changes: 0 additions & 2 deletions docs/internals/vsr.md
@@ -53,8 +53,6 @@ Storage:
| `block` | replica | replica | [Sync Forest](#protocol-sync-forest), [Repair Grid](#protocol-repair-grid) |
| `request_sync_checkpoint` | replica | replica | [Sync Superblock](#protocol-sync-superblock) |
| `sync_checkpoint` | replica | replica | [Sync Superblock](#protocol-sync-superblock) |
| `request_sync_client_sessions` | replica | replica | [Sync Superblock](#protocol-sync-superblock) |
| `sync_client_sessions` | replica | replica | [Sync Superblock](#protocol-sync-superblock) |

### Recovery

4 changes: 0 additions & 4 deletions src/config.zig
@@ -117,8 +117,6 @@ const ConfigProcess = struct {
grid_missing_tables_max: usize = 3,
aof_record: bool = false,
aof_recovery: bool = false,
/// When null, this defaults to message_body_size_max.
sync_trailer_message_body_size_max: ?usize = null,
};

/// Configurations which are tunable per-cluster.
@@ -248,8 +246,6 @@ pub const configs = struct {
.grid_missing_blocks_max = 3,
.grid_missing_tables_max = 2,
.verify = true,
// Set to a small value to ensure the multipart trailer sync is easily tested.
.sync_trailer_message_body_size_max = 129,
},
.cluster = .{
.clients_max = 4 + 3,
17 changes: 0 additions & 17 deletions src/constants.zig
@@ -182,21 +182,6 @@ comptime {
assert(message_body_size_max >= @sizeOf(vsr.CheckpointState));
}

/// The maximum body size of:
/// - command=sync_client_sessions
///
/// In practice, this should always be as high as possible to minimize the number of round trips
/// required for sync.
/// It is configurable to make testing multipart trailers simple.
pub const sync_trailer_message_body_size_max =
config.process.sync_trailer_message_body_size_max orelse
message_body_size_max;

comptime {
assert(sync_trailer_message_body_size_max > 0);
assert(sync_trailer_message_body_size_max <= message_body_size_max);
}

/// The maximum number of Viewstamped Replication prepare messages that can be inflight at a time.
/// This is immutable once assigned per cluster, as replicas need to know how many operations might
/// possibly be uncommitted during a view change, and this must be constant for all replicas.
@@ -446,8 +431,6 @@ comptime {
///
/// The superblock contains local state for the replica and therefore cannot be replicated remotely.
/// Loss of the superblock would represent loss of the replica and so it must be protected.
/// Since each superblock copy also copies the superblock trailer (around 33 MiB), setting this
/// beyond 4 copies (or decreasing block_size < 64 KiB) can result in a superblock zone > 264 MiB.
///
/// This can mean checkpointing latencies in the rare extreme worst-case of at most 264ms, although
/// this would require EWAH compression of our block free set to have zero effective compression.
21 changes: 6 additions & 15 deletions src/lsm/forest_fuzz.zig
@@ -250,24 +250,15 @@ const Environment = struct {
env.grid.checkpoint(grid_checkpoint_callback);
try env.tick_until_state_change(.grid_checkpoint, .superblock_checkpoint);

{
// VSRState.monotonic() asserts that the previous_checkpoint id changes.
// In a normal replica this is guaranteed – even if the LSM is idle and no blocks
// are acquired or released, the client sessions are necessarily mutated.
var reply = std.mem.zeroInit(vsr.Header.Reply, .{
.cluster = cluster,
.command = .reply,
.op = env.checkpoint_op.?,
.commit = env.checkpoint_op.?,
});
reply.set_checksum_body(&.{});
reply.set_checksum();

_ = env.superblock.client_sessions.put(1, &reply);
}
env.superblock.checkpoint(superblock_checkpoint_callback, &env.superblock_context, .{
.manifest_references = env.forest.manifest_log.checkpoint_references(),
.free_set_reference = env.grid.free_set_checkpoint.checkpoint_reference(),
.client_sessions_reference = .{
.last_block_checksum = 0,
.last_block_address = 0,
.trailer_size = 0,
.checksum = vsr.checksum(&.{}),
},
.commit_min_checksum = env.superblock.working.vsr_state.checkpoint.commit_min_checksum + 1,
.commit_min = env.checkpoint_op.?,
.commit_max = env.checkpoint_op.? + 1,
22 changes: 6 additions & 16 deletions src/lsm/manifest_log_fuzz.zig
@@ -464,29 +464,19 @@ const Environment = struct {

const vsr_state = &env.manifest_log.superblock.working.vsr_state;

{
// VSRState.monotonic() asserts that the previous_checkpoint id changes.
// In a normal replica this is guaranteed – even if the LSM is idle and no blocks
// are acquired or released, the client sessions are necessarily mutated.
var reply = std.mem.zeroInit(vsr.Header.Reply, .{
.cluster = 0,
.command = .reply,
.op = vsr_state.checkpoint.commit_min + 1,
.commit = vsr_state.checkpoint.commit_min + 1,
});
reply.set_checksum_body(&.{});
reply.set_checksum();

_ = env.manifest_log.superblock.client_sessions.put(1, &reply);
}

env.pending += 1;
env.manifest_log.superblock.checkpoint(
checkpoint_superblock_callback,
&env.superblock_context,
.{
.manifest_references = env.manifest_log.checkpoint_references(),
.free_set_reference = env.grid.free_set_checkpoint.checkpoint_reference(),
.client_sessions_reference = .{
.last_block_checksum = 0,
.last_block_address = 0,
.trailer_size = 0,
.checksum = vsr.checksum(&.{}),
},
.commit_min_checksum = vsr_state.checkpoint.commit_min_checksum + 1,
.commit_min = vsr.Checkpoint.checkpoint_after(vsr_state.checkpoint.commit_min),
.commit_max = vsr.Checkpoint.checkpoint_after(vsr_state.commit_max),
50 changes: 29 additions & 21 deletions src/lsm/schema.zig
@@ -59,9 +59,10 @@ pub const BlockType = enum(u8) {
reserved = 0,

free_set = 1,
manifest = 2,
index = 3,
data = 4,
client_sessions = 2,
manifest = 3,
index = 4,
data = 5,

pub fn valid(block_type: BlockType) bool {
_ = std.meta.intToEnum(BlockType, @intFromEnum(block_type)) catch return false;
@@ -343,13 +344,12 @@ pub const TableData = struct {
}
};

pub const FreeSetNode = struct {
pub const Word = u64;

/// A TrailerNode is either a `BlockType.free_set` or `BlockType.client_sessions`.
pub const TrailerNode = struct {
pub const Metadata = extern struct {
previous_free_set_block_checksum: u128,
previous_free_set_block_checksum_padding: u128 = 0,
previous_free_set_block_address: u64,
previous_trailer_block_checksum: u128,
previous_trailer_block_checksum_padding: u128 = 0,
previous_trailer_block_address: u64,
reserved: [56]u8 = .{0} ** 56,

comptime {
@@ -361,20 +361,30 @@ pub const FreeSetNode = struct {
fn metadata(free_set_block: BlockPtrConst) *const Metadata {
const header = header_from_block(free_set_block);
assert(header.command == .block);
assert(header.block_type == .free_set);
assert(header.block_type == .free_set or header.block_type == .client_sessions);
assert(header.address > 0);
assert(header.snapshot == 0);

const header_metadata = std.mem.bytesAsValue(Metadata, &header.metadata_bytes);
assert(header_metadata.previous_free_set_block_checksum_padding == 0);
assert(header_metadata.previous_trailer_block_checksum_padding == 0);
assert(stdx.zeroed(&header_metadata.reserved));

if (header_metadata.previous_free_set_block_address == 0) {
assert(header_metadata.previous_free_set_block_checksum == 0);
if (header_metadata.previous_trailer_block_address == 0) {
assert(header_metadata.previous_trailer_block_checksum == 0);
}

assert(header.size > @sizeOf(vsr.Header));
assert((header.size - @sizeOf(vsr.Header)) % @sizeOf(Word) == 0);

switch (header.block_type) {
.free_set => {
assert((header.size - @sizeOf(vsr.Header)) % @sizeOf(u64) == 0);
},
.client_sessions => {
assert((header.size - @sizeOf(vsr.Header)) %
(@sizeOf(vsr.Header) + @sizeOf(u64)) == 0);
},
else => unreachable,
}

return header_metadata;
}
@@ -386,20 +396,18 @@ pub const FreeSetNode = struct {
pub fn previous(free_set_block: BlockPtrConst) ?BlockReference {
const header_metadata = metadata(free_set_block);

if (header_metadata.previous_free_set_block_address == 0) {
assert(header_metadata.previous_free_set_block_checksum == 0);
if (header_metadata.previous_trailer_block_address == 0) {
assert(header_metadata.previous_trailer_block_checksum == 0);
return null;
} else {
return .{
.checksum = header_metadata.previous_free_set_block_checksum,
.address = header_metadata.previous_free_set_block_address,
.checksum = header_metadata.previous_trailer_block_checksum,
.address = header_metadata.previous_trailer_block_address,
};
}
}

pub fn encoded_words(block: BlockPtrConst) []align(@alignOf(Word)) const u8 {
assert_valid_header(block);

pub fn body(block: BlockPtrConst) []align(@sizeOf(vsr.Header)) const u8 {
const header = header_from_block(block);
return block[@sizeOf(vsr.Header)..header.size];
}
1 change: 0 additions & 1 deletion src/lsm/table.zig
@@ -98,7 +98,6 @@ pub fn TableType(
assert(stdx.no_padding(Value));

// These impact our calculation of:
// * the superblock trailer size, and
// * the manifest log layout for alignment.
assert(key_size >= 8);
assert(key_size <= 32);
22 changes: 6 additions & 16 deletions src/lsm/tree_fuzz.zig
@@ -275,22 +275,6 @@ fn EnvironmentType(comptime table_usage: TableUsage) type {
pub fn checkpoint(env: *Environment, op: u64) void {
env.tree.assert_between_bars();

{
// VSRState.monotonic() asserts that the previous_checkpoint id changes.
// In a normal replica this is guaranteed – even if the LSM is idle and no blocks
// are acquired or released, the client sessions are necessarily mutated.
var reply = std.mem.zeroInit(vsr.Header.Reply, .{
.cluster = cluster,
.command = .reply,
.op = op,
.commit = op,
});
reply.set_checksum_body(&.{});
reply.set_checksum();

_ = env.superblock.client_sessions.put(1, &reply);
}

env.grid.checkpoint(grid_checkpoint_callback);
env.change_state(.fuzzing, .grid_checkpoint);
env.tick_until_state_change(.grid_checkpoint, .fuzzing);
@@ -299,6 +283,12 @@ fn EnvironmentType(comptime table_usage: TableUsage) type {
env.superblock.checkpoint(superblock_checkpoint_callback, &env.superblock_context, .{
.manifest_references = std.mem.zeroes(vsr.SuperBlockManifestReferences),
.free_set_reference = env.grid.free_set_checkpoint.checkpoint_reference(),
.client_sessions_reference = .{
.last_block_checksum = 0,
.last_block_address = 0,
.trailer_size = 0,
.checksum = vsr.checksum(&.{}),
},
.commit_min_checksum = env.superblock.working.vsr_state.checkpoint.commit_min_checksum + 1,
.commit_min = checkpoint_op,
.commit_max = checkpoint_op + 1,
2 changes: 0 additions & 2 deletions src/message_pool.zig
@@ -89,8 +89,6 @@ pub const MessagePool = struct {
pub const Block = MessageType(.block);
pub const RequestSyncCheckpoint = MessageType(.request_sync_checkpoint);
pub const SyncCheckpoint = MessageType(.sync_checkpoint);
pub const RequestSyncClientSessions = MessageType(.request_sync_client_sessions);
pub const SyncClientSessions = MessageType(.sync_client_sessions);

// TODO Avoid the extra level of indirection.
// (https://github.com/tigerbeetle/tigerbeetle/pull/1295#discussion_r1394265250)
2 changes: 1 addition & 1 deletion src/testing/cluster.zig
@@ -548,7 +548,7 @@ pub fn ClusterType(comptime StateMachineType: anytype) type {
assert(cluster.replica_diverged.isSet(event_data.replica));
},
.sync_stage_changed => switch (replica.syncing) {
.requesting_trailers => {
.requesting_checkpoint => {
cluster.log_replica(.sync_commenced, replica.replica);
cluster.sync_checker.replica_sync_start(replica);
},