raft topology: send cdc generation data in parts #13962
Conversation
@kbr-scylla please review |
CI state |
Please also rebase the PR on |
Force-pushed from 5ac7640 to e3d341d
v2:
Force-pushed from 3c91bc8 to d2fd8ca
v3:
CI state |
Force-pushed from d2fd8ca to 4c65a94
Force-pushed from 4c65a94 to 498e090
v4:
v5:
CI state |
@gleb-cloudius @piodul please review |
What happens if one command succeeds and the next fails? Do we end up in an inconsistent state? |
Force-pushed from 498e090 to 8620d19
Force-pushed from 8620d19 to 5697dc1
Force-pushed from 4bfe367 to 744599f
@scylladb/scylla-maint please review and trigger CI for this pull request |
v11:
CI state |
Looks like the second node booted by the test hung due to topology coordinator errors:
Please fix the order of commits for the reasons explained by Piotr here: #13962 (comment), and please investigate the failure.
Provide useful descriptions for `write_mutations` and `broadcast_tables_query` that are stored in `system.group0_history`. Reduces the scope of issue scylladb#13370.
For now, `raft_sys_table_storage::_max_mutation_size` equals `max_mutation_size` (half of the commitlog segment size), so with some additional information it can exceed this threshold, resulting in an exception being thrown when the mutation is written to the commitlog. A batch of raft commands has a size of at most `group0_state_machine::merger::max_command_size` (half of the commitlog segment size). It doesn't carry additional metadata, but it may have a size of exactly `max_mutation_size`. This shouldn't cause any trouble, but it is preferable to be careful. Make `raft_sys_table_storage::_max_mutation_size` and `group0_state_machine::merger::max_command_size` stricter to leave space for metadata. Fixed typo "1204" => "1024".
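As a rough illustration of the sizing described above (the names and the concrete reserve value below are hypothetical, not the actual Scylla constants), the idea is simply to subtract a metadata margin from the half-segment limit:

```cpp
#include <cstddef>
#include <cstdio>

int main() {
    // Hypothetical numbers for illustration only; Scylla derives these from config.
    constexpr std::size_t commitlog_segment_size = 32 * 1024 * 1024; // e.g. 32 MB (1024, not 1204)
    constexpr std::size_t half_segment = commitlog_segment_size / 2;
    constexpr std::size_t metadata_reserve = 10 * 1024;              // assumed margin for metadata

    // Stricter limits so a mutation/command plus its metadata stays below the threshold.
    constexpr std::size_t max_mutation_size = half_segment - metadata_reserve;
    constexpr std::size_t max_command_size  = half_segment - metadata_reserve;

    std::printf("max_mutation_size=%zu max_command_size=%zu\n", max_mutation_size, max_command_size);
}
```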
…_size` `get_cdc_generation_mutations` splits data into mutations of maximal size `mutation_size_threshold`. Before this commit it was hardcoded to 2 MB. Calculate `mutation_size_threshold` so that it leaves space for CDC generation data and does not exceed `max_command_size`.
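A minimal sketch of the splitting idea (the types and function name here are invented; the real `get_cdc_generation_mutations` operates on CDC generation rows and schema mutations):

```cpp
#include <cstddef>
#include <vector>

struct row      { std::size_t size_bytes; };
struct mutation { std::vector<row> rows; std::size_t size_bytes = 0; };

// Greedily pack rows into mutations so that no mutation exceeds the threshold
// (a single oversized row still gets its own mutation).
std::vector<mutation> split_into_mutations(const std::vector<row>& rows,
                                           std::size_t mutation_size_threshold) {
    std::vector<mutation> out(1);
    for (const auto& r : rows) {
        if (!out.back().rows.empty() &&
            out.back().size_bytes + r.size_bytes > mutation_size_threshold) {
            out.emplace_back();
        }
        out.back().rows.push_back(r);
        out.back().size_bytes += r.size_bytes;
    }
    return out;
}
```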
…ion_mutations snapshot Topology snapshots contain only the mutation of the current CDC generation data, but don't contain any previous or future generations. If a new generation of data is being broadcast but hasn't been entirely applied yet, the applied part won't be sent in a snapshot. In this scenario, new or delayed nodes can never get the applied part. Send the entire cdc_generations_v3 table in the snapshot to resolve this problem. As a follow-up, a mechanism to remove old CDC generations will be introduced.
Force-pushed from 744599f to d3eafbf
v12:
… from all merged commands If `group0_state_machine` applies all commands individually (without batching), the resulting current `state_id` -- which will be compared with the `prev_state_id` of the next command if it is a guarded command -- equals the maximum of the `next_state_id` of all commands applied up to this point. That's because the current `state_id` is obtained from the history table by taking the row with the largest clustering key.

When `group0_state_machine::apply` is called with a batch of commands, the current `state_id` is loaded from `system.group0_history` into `merger::last_group0_state_id` only once. When a command is merged, its `next_state_id` overwrites `last_group0_state_id`, regardless of their order.

Consider the following situation. The leader sends two unguarded `write_mutations` commands concurrently, with timeuuids T1 and T2, where T1 < T2. The leader waits for them to be applied and then sends a guarded `topology_change` with `prev_state_id` equal to T2. Suppose that the command with timeuuid T2 is committed first, and that these commands are small enough that both `write_mutations` could be merged into one command. A follower can get all three of these commands before its `fsm` polls them. In this situation, `group0_state_machine::apply` is called with all three of them and `merger` will merge both `write_mutations` into one command. After that, `merger::last_group0_state_id` will be equal to T1 (this command was committed as the second one). When it processes the `topology_change` command, it will compare its `prev_state_id` with `merger::last_group0_state_id`, turning this command into a no-op (which wouldn't happen if the commands were applied individually).

Such a scenario leads to inconsistent results: one replica applies `topology_change`, but another makes it a no-op.
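The fix implied above could look roughly like this (names are approximate and a plain integer stands in for a timeuuid): when merging, keep the maximum observed `next_state_id` instead of unconditionally overwriting.

```cpp
#include <algorithm>
#include <cstdint>

// Stand-in for a timeuuid: assume it orders by time, so T1 < T2.
using state_id_type = std::uint64_t;

struct merger {
    // Loaded once from system.group0_history when the batch starts.
    state_id_type last_group0_state_id;

    void merge_command(state_id_type next_state_id) {
        // Taking the max reproduces what applying the commands one by one would
        // yield; plain assignment is what caused the T1/T2 divergence above.
        last_group0_state_id = std::max(last_group0_state_id, next_state_id);
    }
};
```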
This function takes the guard and calls its destructor. It's used to avoid calling the destructor explicitly.
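A minimal sketch of what such a helper might look like (hypothetical definition): by taking the guard by value, its destructor runs when the function returns, so callers never invoke `~group0_guard()` directly.

```cpp
struct group0_guard {
    ~group0_guard() { /* releases whatever the guard holds */ }
};

// Destroys `guard` at the end of this scope; nothing else to do.
void release_guard(group0_guard guard) {
    (void)guard;
}
```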
Force-pushed from a61cd5d to 6f3a00d
Broadcasts all mutations returned from `prepare_new_cdc_generation_data` except the last one. Each mutation is sent in a separate raft command. The function takes a `group0_guard`; if the number of mutations is greater than one, the guard is dropped and a new one is created and returned, otherwise the old one is returned. Commands are sent in parallel and unguarded (the guard used for sending the last mutation guarantees that the term hasn't changed). Returns the generation's UUID, the guard, and the last mutation, which will be sent with additional topology data by the caller.

If we sent the last mutation in a `write_mutations` command, we would use a total of `n + 1` commands instead of `n-1 + 1` (where `n` is the number of mutations), so it's better to send it in `topology_change` (we need to send it after all `write_mutations`) together with some small metadata.

With the default commitlog segment size, `mutation_size_threshold` will be 4 MB. In large clusters (e.g. 100 nodes, 64 shards per node, 256 vnodes), CDC generation data can reach a size of 30 MB, so there will be no more than 8 commands. In a multi-DC cluster with 100 ms latencies between DCs, this operation should take about 200 ms since we send the commands concurrently. Even if the commands were replicated sequentially by Raft, it should take no more than 1.6 s, which is negligible compared to the bootstrap operation (bootstrapping is quick if there is no data in the cluster, but a cluster of 100 nodes usually holds tons of data, so streaming/repair will take much longer: hours or days). Fixes FIXME in PR scylladb#13683.
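A simplified, synchronous sketch of the flow described above (the real code is asynchronous and uses different types; every name here is illustrative, and the mutation list is assumed non-empty):

```cpp
#include <cstddef>
#include <utility>
#include <vector>

struct mutation {};
struct group0_guard {};

// Stubs standing in for the real machinery.
void send_unguarded_write_mutations(const mutation&) { /* one raft command per mutation */ }
group0_guard take_new_guard() { return {}; }

// Broadcast all mutations except the last; return the (possibly re-taken) guard
// and the last mutation, which the caller sends inside the guarded topology_change.
std::pair<group0_guard, mutation>
broadcast_all_but_last(std::vector<mutation> mutations, group0_guard guard) {
    for (std::size_t i = 0; i + 1 < mutations.size(); ++i) {
        send_unguarded_write_mutations(mutations[i]); // concurrent in the real code
    }
    if (mutations.size() > 1) {
        // The old guard may no longer be valid after sending; take a fresh one.
        guard = take_new_guard();
    }
    return {std::move(guard), std::move(mutations.back())};
}
```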
…_unknown` exception in topology coordinator loop When the topology_coordinator fiber gets `raft::commit_status_unknown`, it prints an error. This exception is not an error in this case; it can be thrown when the leader has changed. It can happen in `add_entry_unguarded` while sending a part of the CDC generation data in the `write_mutations` command. Catch this exception in `topology_coordinator::run` and print a warning.
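Roughly, the change amounts to catching the exception in the coordinator loop and downgrading it to a warning. A self-contained sketch with stand-in types (the real code lives in `topology_coordinator::run` and uses Scylla's logger, not `std::cerr`):

```cpp
#include <exception>
#include <iostream>

namespace raft { struct commit_status_unknown : std::exception {}; } // stand-in

// One iteration of the coordinator loop; may throw while sending commands.
void run_one_iteration() { throw raft::commit_status_unknown{}; }

int main() {
    try {
        run_one_iteration();
    } catch (const raft::commit_status_unknown&) {
        // Expected when the leader changes mid-flight: warn and continue, don't fail.
        std::cerr << "raft topology: commit status unknown, retrying\n";
    }
}
```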
This test limits `commitlog_segment_size_in_mb` to 2, thus `max_command_size` is limited to less than 1 MB. It adds an injection which copies the mutations generated by `get_cdc_generation_mutations` n times, where n is picked so that the memory size of all mutations exceeds `max_command_size`. This test passes if the CDC generation data is committed by raft in multiple commands. If all the data is committed in a single command, the leader node will loop trying to send the raft command and getting the error:
```
storage_service - raft topology: topology change coordinator fiber got error raft::command_is_too_big_error (Command size {} is greater than the configured limit {})
```
Force-pushed from 6f3a00d to 4e3c97d
CI state |
The CDC generation data can be large and not fit in a single command. This PR splits it into multiple mutations by smartly picking a `mutation_size_threshold` and sending each mutation as a separate group 0 command. Commands are sent sequentially to avoid concurrency problems.
Topology snapshots contain only the mutation of the current CDC generation data, but don't contain any previous or future generations. If a new generation of data is being broadcast but hasn't been entirely applied yet, the applied part won't be sent in a snapshot. New or delayed nodes can never get the applied part in this scenario.
Send the entire cdc_generations_v3 table in the snapshot to resolve this problem.
A mechanism to remove old CDC generations will be introduced as a follow-up.