Merge 'Compact data before streaming' from Botond Dénes
Currently, streaming and repair process and send data as-is. This is wasteful: streaming might send data that is expired or covered by tombstones, taking up valuable bandwidth and processing time. Repair can additionally be exposed to artificial differences, because different nodes may be in different states of compactness.
This PR adds opt-in compaction to `make_streaming_reader()` and then opts in all of its users. The main difference between these users is how they choose the compaction time:
* Load'n'stream and streaming use the current time on the local node.
* Repair uses a centrally chosen compaction time, generated on the repair master and propagated to all repair followers. This ensures that all repair participants work with the exact same state of compactness.

Importantly, this compaction does *not* purge tombstones (tombstone GC is completely disabled).
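
To make the compaction-time choice above concrete, here is a minimal sketch; the helper name and the use of `std::chrono`/`std::optional` as stand-ins for Scylla's `gc_clock` and `rpc::optional` are illustrative assumptions, not the API added by this PR:

```cpp
#include <chrono>
#include <optional>

using time_point = std::chrono::system_clock::time_point;

// Streaming and load'n'stream compact against the local node's current time.
// Repair compacts against a single time generated on the repair master and
// propagated to every follower, so all participants use the same cut-off.
time_point choose_compaction_time(bool is_repair, std::optional<time_point> master_time) {
    if (is_repair && master_time) {
        return *master_time;                     // received from the repair master
    }
    return std::chrono::system_clock::now();    // local node time
}
```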

Fixes: #3561

Closes #14756

* github.com:scylladb/scylladb:
  replica: make_[multishard_]streaming_reader(): make compaction_time mandatory
  repair/row_level: opt in to compacting the stream
  streaming: opt-in to compacting the stream
  sstables_loader: opt-in for compacting the stream
  replica/table: add optional compacting to make_multishard_streaming_reader()
  replica/table: add optional compacting to make_streaming_reader()
  db/config: add config item for enabling compaction for streaming and repair
  repair: log the error which caused the repair to fail
  readers: compacting_reader: use compact_mutation_state::abandon_current_partition()
  mutation/mutation_compactor: allow user to abandon current partition
tgrabiec committed Jul 28, 2023
2 parents 24fdd42 + b599f15 commit 4e9d95d
Showing 18 changed files with 152 additions and 72 deletions.
1 change: 1 addition & 0 deletions db/config.cc
@@ -852,6 +852,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster (cannot be set if consistent-cluster-management is enabled")
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, true, "Set true to use enable repair based node operations instead of streaming based")
, allowed_repair_based_node_ops(this, "allowed_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, "replace,removenode,rebuild,bootstrap,decommission", "A comma separated list of node operations which are allowed to enable repair based node operations. The operations can be bootstrap, replace, removenode, decommission and rebuild")
+ , enable_compacting_data_for_streaming_and_repair(this, "enable_compacting_data_for_streaming_and_repair", liveness::LiveUpdate, value_status::Used, true, "Enable the compacting reader, which compacts the data for streaming and repair (load'n'stream included) before sending it to, or synchronizing it with peers. Can reduce the amount of data to be processed by removing dead data, but adds CPU overhead.")
, ring_delay_ms(this, "ring_delay_ms", value_status::Used, 30 * 1000, "Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra.")
, shadow_round_ms(this, "shadow_round_ms", value_status::Used, 300 * 1000, "The maximum gossip shadow round time. Can be used to reduce the gossip feature check time during node boot up.")
, fd_max_interval_ms(this, "fd_max_interval_ms", value_status::Used, 2 * 1000, "The maximum failure_detector interval time in milliseconds. Interval larger than the maximum will be ignored. Larger cluster may need to increase the default.")
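A rough sketch of the gate introduced by `enable_compacting_data_for_streaming_and_repair`; the flag and wrapper below are illustrative stand-ins (not Scylla's config or reader API), showing only that this is a live-updatable, per-reader decision:

```cpp
#include <atomic>
#include <utility>

// Stand-in for the live-updatable flag; defaults to enabled, as in the diff above.
std::atomic<bool> enable_compacting_for_streaming{true};

// Per-reader decision: when the option is on, wrap the streaming/repair reader
// in a compacting reader (removes dead data at some CPU cost); otherwise send
// the data as-is.
template <typename Reader, typename Wrap>
Reader maybe_compact(Reader reader, Wrap wrap_in_compacting_reader) {
    if (enable_compacting_for_streaming.load(std::memory_order_relaxed)) {
        return wrap_in_compacting_reader(std::move(reader));
    }
    return reader;
}
```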
1 change: 1 addition & 0 deletions db/config.hh
@@ -329,6 +329,7 @@ public:
named_value<bool> override_decommission;
named_value<bool> enable_repair_based_node_ops;
named_value<sstring> allowed_repair_based_node_ops;
+ named_value<bool> enable_compacting_data_for_streaming_and_repair;
named_value<uint32_t> ring_delay_ms;
named_value<uint32_t> shadow_round_ms;
named_value<uint32_t> fd_max_interval_ms;
6 changes: 3 additions & 3 deletions message/messaging_service.cc
@@ -1303,14 +1303,14 @@ future<> messaging_service::send_repair_put_row_diff(msg_addr id, uint32_t repai
}

// Wrapper for REPAIR_ROW_LEVEL_START
- void messaging_service::register_repair_row_level_start(std::function<future<repair_row_level_start_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason)>&& func) {
+ void messaging_service::register_repair_row_level_start(std::function<future<repair_row_level_start_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason, rpc::optional<gc_clock::time_point> compaction_time)>&& func) {
register_handler(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(func));
}
future<> messaging_service::unregister_repair_row_level_start() {
return unregister_handler(messaging_verb::REPAIR_ROW_LEVEL_START);
}
- future<rpc::optional<repair_row_level_start_response>> messaging_service::send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason) {
- return send_message<rpc::optional<repair_row_level_start_response>>(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range), algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, std::move(remote_partitioner_name), std::move(schema_version), reason);
+ future<rpc::optional<repair_row_level_start_response>> messaging_service::send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason, gc_clock::time_point compaction_time) {
+ return send_message<rpc::optional<repair_row_level_start_response>>(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range), algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, std::move(remote_partitioner_name), std::move(schema_version), reason, compaction_time);
}

// Wrapper for REPAIR_ROW_LEVEL_STOP
4 changes: 2 additions & 2 deletions message/messaging_service.hh
@@ -420,9 +420,9 @@ public:
future<> send_repair_put_row_diff(msg_addr id, uint32_t repair_meta_id, repair_rows_on_wire row_diff);

// Wrapper for REPAIR_ROW_LEVEL_START
- void register_repair_row_level_start(std::function<future<repair_row_level_start_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason)>&& func);
+ void register_repair_row_level_start(std::function<future<repair_row_level_start_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason, rpc::optional<gc_clock::time_point> compaction_time)>&& func);
future<> unregister_repair_row_level_start();
- future<rpc::optional<repair_row_level_start_response>> send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason);
+ future<rpc::optional<repair_row_level_start_response>> send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason, gc_clock::time_point compaction_time);

// Wrapper for REPAIR_ROW_LEVEL_STOP
void register_repair_row_level_stop(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range)>&& func);
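Note the asymmetry in the hunks above: the sender passes a plain `gc_clock::time_point`, while the registered handler takes `rpc::optional<gc_clock::time_point>`, so a node can still accept the verb from an older peer that does not send the field. A minimal sketch of such a fallback, with `std::optional` and `std::chrono` standing in for `rpc::optional` and `gc_clock`, and the fall-back-to-local-clock policy stated as an assumption for illustration:

```cpp
#include <chrono>
#include <optional>

using time_point = std::chrono::system_clock::time_point;

// Handler-side sketch: an older peer omits the compaction time, in which case
// the receiving node has to choose one locally (assumed policy, for illustration).
time_point resolve_compaction_time(std::optional<time_point> received) {
    return received.value_or(std::chrono::system_clock::now());
}
```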
5 changes: 5 additions & 0 deletions mutation/mutation_compactor.hh
@@ -578,6 +578,11 @@ public:
}
}

+ /// Signal to the compactor that the current partition will not be finished.
+ void abandon_current_partition() {
+ _validator.reset(mutation_fragment_v2::kind::partition_end, position_in_partition_view::for_partition_end(), {});
+ }
+
bool are_limits_reached() const {
return _row_limit == 0 || _partition_limit == 0;
}
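The hunks in readers/mutation_readers.cc below replace the old maybe_inject_partition_end() workaround with this new entry point: instead of feeding the compactor a synthetic partition_end when the consumer skips ahead, the reader now tells the compactor to abandon the partition outright. A toy model of that contract (types and members are illustrative stand-ins, not the real classes):

```cpp
#include <cassert>

// Toy stand-in for the compactor's stream validator: every opened partition
// normally has to be closed with a partition_end fragment.
struct toy_compactor {
    bool in_partition = false;
    void consume_new_partition() { in_partition = true; }
    void consume_end_of_partition() { in_partition = false; }
    // New entry point: the consumer declares that the current partition will
    // never be finished, so the validator is reset without emitting anything.
    void abandon_current_partition() { in_partition = false; }
};

int main() {
    toy_compactor c;
    c.consume_new_partition();
    // What next_partition()/fast_forward_to() now do instead of injecting a
    // fake partition_end downstream:
    c.abandon_current_partition();
    assert(!c.in_partition);
}
```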
14 changes: 2 additions & 12 deletions readers/mutation_readers.cc
@@ -1425,16 +1425,6 @@ class compacting_reader : public flat_mutation_reader_v2::impl {
_has_compacted_partition_start = false;
}
}
- void maybe_inject_partition_end() {
- // The compactor needs a valid stream, but downstream doesn't care about
- // the injected partition end, so ignore it.
- if (_last_uncompacted_kind != mutation_fragment_v2::kind::partition_end) {
- _ignore_partition_end = true;
- _compactor.consume_end_of_partition(*this, _gc_consumer);
- _last_uncompacted_kind = mutation_fragment_v2::kind::partition_end;
- _ignore_partition_end = false;
- }
- }
void consume_new_partition(const dht::decorated_key& dk) {
_has_compacted_partition_start = true;
// We need to reset the partition's tombstone here. If the tombstone is
@@ -1544,13 +1534,13 @@ class compacting_reader : public flat_mutation_reader_v2::impl {
return make_ready_future<>();
}
_end_of_stream = false;
- maybe_inject_partition_end();
+ _compactor.abandon_current_partition();
return _reader.next_partition();
}
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
clear_buffer();
_end_of_stream = false;
- maybe_inject_partition_end();
+ _compactor.abandon_current_partition();
return _reader.fast_forward_to(pr);
}
virtual future<> fast_forward_to(position_range pr) override {
6 changes: 4 additions & 2 deletions repair/reader.hh
@@ -52,7 +52,8 @@ private:
replica::column_family& cf,
read_strategy strategy,
const dht::sharder& remote_sharder,
- unsigned remote_shard);
+ unsigned remote_shard,
+ gc_clock::time_point compaction_time);

public:
repair_reader(
@@ -64,7 +65,8 @@
const dht::sharder& remote_sharder,
unsigned remote_shard,
uint64_t seed,
- read_strategy strategy);
+ read_strategy strategy,
+ gc_clock::time_point compaction_time);

future<mutation_fragment_opt>
read_mutation_fragment();
12 changes: 8 additions & 4 deletions repair/repair.cc
@@ -589,9 +589,13 @@ repair::shard_repair_task_impl::shard_repair_task_impl(tasks::task_manager::modu
void repair::shard_repair_task_impl::check_failed_ranges() {
rlogger.info("repair[{}]: stats: repair_reason={}, keyspace={}, tables={}, ranges_nr={}, {}",
global_repair_id.uuid(), _reason, _status.keyspace, table_names(), ranges.size(), _stats.get_stats());
- if (nr_failed_ranges || _aborted || _failed) {
- auto msg = format("repair[{}]: {} out of {} ranges failed, keyspace={}, tables={}, repair_reason={}, nodes_down_during_repair={}, aborted_by_user={}",
- global_repair_id.uuid(), nr_failed_ranges, ranges_size(), _status.keyspace, table_names(), _reason, nodes_down, _aborted);
+ if (nr_failed_ranges || _aborted || _failed_because) {
+ sstring failed_because = "N/A";
+ if (!_aborted) {
+ failed_because = _failed_because ? *_failed_because : "unknown";
+ }
+ auto msg = format("repair[{}]: {} out of {} ranges failed, keyspace={}, tables={}, repair_reason={}, nodes_down_during_repair={}, aborted_by_user={}, failed_because={}",
+ global_repair_id.uuid(), nr_failed_ranges, ranges_size(), _status.keyspace, table_names(), _reason, nodes_down, _aborted, failed_because);
rlogger.warn("{}", msg);
throw std::runtime_error(msg);
} else {
@@ -1000,7 +1004,7 @@ future<> repair::shard_repair_task_impl::run() {
try {
co_await do_repair_ranges();
} catch (...) {
- _failed = true;
+ _failed_because.emplace(fmt::to_string(std::current_exception()));
rlogger.debug("repair[{}]: got error in do_repair_ranges: {}",
global_repair_id.uuid(), std::current_exception());
}
