diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json
index bae09f8c70f7..76d90a337c03 100644
--- a/api/api-doc/storage_service.json
+++ b/api/api-doc/storage_service.json
@@ -1114,6 +1114,14 @@
                      "allowMultiple":false,
                      "type":"string",
                      "paramType":"query"
+                 },
+                 {
+                     "name":"ranges_parallelism",
+                     "description":"An integer specifying the number of ranges to repair in parallel by user request. If this number is bigger than the max_repair_ranges_in_parallel calculated by Scylla core, the smaller one will be used.",
+                     "required":false,
+                     "allowMultiple":false,
+                     "type":"string",
+                     "paramType":"query"
                  }
              ]
          },
diff --git a/api/storage_service.cc b/api/storage_service.cc
index e0fec0154431..d86cc52996cc 100644
--- a/api/storage_service.cc
+++ b/api/storage_service.cc
@@ -320,7 +320,7 @@ void set_repair(http_context& ctx, routes& r, sharded<repair_service>& repair) {
     ss::repair_async.set(r, [&ctx, &repair](std::unique_ptr<http::request> req) {
         static std::vector<sstring> options = {"primaryRange", "parallelism", "incremental",
                 "jobThreads", "ranges", "columnFamilies", "dataCenters", "hosts", "ignore_nodes", "trace",
-                "startToken", "endToken" };
+                "startToken", "endToken", "ranges_parallelism"};
         std::unordered_map<sstring, sstring> options_map;
         for (auto o : options) {
             auto s = req->get_query_param(o);
diff --git a/repair/repair.cc b/repair/repair.cc
index 20dfeed679b5..f28345184bef 100644
--- a/repair/repair.cc
+++ b/repair/repair.cc
@@ -564,7 +564,8 @@ repair::shard_repair_task_impl::shard_repair_task_impl(tasks::task_manager::modu
     const std::vector<sstring>& hosts_,
     const std::unordered_set<gms::inet_address>& ignore_nodes_,
     streaming::stream_reason reason_,
-    bool hints_batchlog_flushed)
+    bool hints_batchlog_flushed,
+    std::optional<int> ranges_parallelism)
     : repair_task_impl(module, id, 0, keyspace, "", "", parent_id_.uuid(), reason_)
     , rs(repair)
     , db(repair.get_db())
@@ -585,7 +586,11 @@ repair::shard_repair_task_impl::shard_repair_task_impl(tasks::task_manager::modu
     , total_rf(erm->get_replication_factor())
     , nr_ranges_total(ranges.size())
     , _hints_batchlog_flushed(std::move(hints_batchlog_flushed))
-{ }
+    , _user_ranges_parallelism(ranges_parallelism ? std::optional(semaphore(*ranges_parallelism)) : std::nullopt)
+{
+    rlogger.debug("repair[{}]: Setting user_ranges_parallelism to {}", global_repair_id.uuid(),
+            _user_ranges_parallelism ? std::to_string(_user_ranges_parallelism->available_units()) : "unlimited");
+}
 
 void repair::shard_repair_task_impl::check_failed_ranges() {
     rlogger.info("repair[{}]: stats: repair_reason={}, keyspace={}, tables={}, ranges_nr={}, {}",
@@ -795,6 +800,8 @@ struct repair_options {
     // repair to a data center other than the named one returns an error.
     std::vector<sstring> data_centers;
 
+    int ranges_parallelism = -1;
+
     repair_options(std::unordered_map<sstring, sstring> options) {
         bool_opt(primary_range, options, PRIMARY_RANGE_KEY);
         ranges_opt(ranges, options, RANGES_KEY);
@@ -830,6 +837,8 @@ struct repair_options {
         int job_threads;
         int_opt(job_threads, options, JOB_THREADS_KEY);
 
+        int_opt(ranges_parallelism, options, RANGES_PARALLELISM_KEY);
+
         // The parsing code above removed from the map options we have parsed.
         // If anything is left there in the end, it's an unsupported option.
         if (!options.empty()) {
@@ -850,6 +859,7 @@ struct repair_options {
     static constexpr const char* TRACE_KEY = "trace";
     static constexpr const char* START_TOKEN = "startToken";
    static constexpr const char* END_TOKEN = "endToken";
+    static constexpr const char* RANGES_PARALLELISM_KEY = "ranges_parallelism";
 
     // Settings of "parallelism" option. Numbers must match Cassandra's
     // RepairParallelism enum, which is used by the caller.
@@ -950,33 +960,34 @@ future<> repair::shard_repair_task_impl::do_repair_ranges() {
         // repair all the ranges in limited parallelism
         rlogger.info("repair[{}]: Started to repair {} out of {} tables in keyspace={}, table={}, table_id={}, repair_reason={}",
                 global_repair_id.uuid(), idx + 1, table_ids.size(), _status.keyspace, table_name, table_id, _reason);
-        co_await coroutine::parallel_for_each(ranges, [this, table_id] (auto&& range) {
-            return with_semaphore(rs.get_repair_module().range_parallelism_semaphore(), 1, [this, &range, table_id] {
-                return repair_range(range, table_id).then([this] {
-                    if (_reason == streaming::stream_reason::bootstrap) {
-                        rs.get_metrics().bootstrap_finished_ranges++;
-                    } else if (_reason == streaming::stream_reason::replace) {
-                        rs.get_metrics().replace_finished_ranges++;
-                    } else if (_reason == streaming::stream_reason::rebuild) {
-                        rs.get_metrics().rebuild_finished_ranges++;
-                    } else if (_reason == streaming::stream_reason::decommission) {
-                        rs.get_metrics().decommission_finished_ranges++;
-                    } else if (_reason == streaming::stream_reason::removenode) {
-                        rs.get_metrics().removenode_finished_ranges++;
-                    } else if (_reason == streaming::stream_reason::repair) {
-                        rs.get_metrics().repair_finished_ranges_sum++;
-                        nr_ranges_finished++;
-                    }
-                    rlogger.debug("repair[{}]: node ops progress bootstrap={}, replace={}, rebuild={}, decommission={}, removenode={}, repair={}",
-                        global_repair_id.uuid(),
-                        rs.get_metrics().bootstrap_finished_percentage(),
-                        rs.get_metrics().replace_finished_percentage(),
-                        rs.get_metrics().rebuild_finished_percentage(),
-                        rs.get_metrics().decommission_finished_percentage(),
-                        rs.get_metrics().removenode_finished_percentage(),
-                        rs.get_metrics().repair_finished_percentage());
-                });
-            });
+        co_await coroutine::parallel_for_each(ranges, [this, table_id] (auto&& range) -> future<> {
+            // Get the system range parallelism
+            auto permit = co_await seastar::get_units(rs.get_repair_module().range_parallelism_semaphore(), 1);
+            // Get the range parallelism specified by user
+            auto user_permit = _user_ranges_parallelism ? co_await seastar::get_units(*_user_ranges_parallelism, 1) : semaphore_units<>();
+            co_await repair_range(range, table_id);
+            if (_reason == streaming::stream_reason::bootstrap) {
+                rs.get_metrics().bootstrap_finished_ranges++;
+            } else if (_reason == streaming::stream_reason::replace) {
+                rs.get_metrics().replace_finished_ranges++;
+            } else if (_reason == streaming::stream_reason::rebuild) {
+                rs.get_metrics().rebuild_finished_ranges++;
+            } else if (_reason == streaming::stream_reason::decommission) {
+                rs.get_metrics().decommission_finished_ranges++;
+            } else if (_reason == streaming::stream_reason::removenode) {
+                rs.get_metrics().removenode_finished_ranges++;
+            } else if (_reason == streaming::stream_reason::repair) {
+                rs.get_metrics().repair_finished_ranges_sum++;
+                nr_ranges_finished++;
+            }
+            rlogger.debug("repair[{}]: node ops progress bootstrap={}, replace={}, rebuild={}, decommission={}, removenode={}, repair={}",
+                global_repair_id.uuid(),
+                rs.get_metrics().bootstrap_finished_percentage(),
+                rs.get_metrics().replace_finished_percentage(),
+                rs.get_metrics().rebuild_finished_percentage(),
+                rs.get_metrics().decommission_finished_percentage(),
+                rs.get_metrics().removenode_finished_percentage(),
+                rs.get_metrics().repair_finished_percentage());
         });
 
         if (_reason != streaming::stream_reason::repair) {
@@ -1130,7 +1141,8 @@ future<int> repair_service::do_repair_start(sstring keyspace, std::unordered_map
         co_return id.id;
     }
 
-    auto task = co_await _repair_module->make_and_start_task<repair::user_requested_repair_task_impl>({}, id, std::move(keyspace), "", germs, std::move(cfs), std::move(ranges), std::move(options.hosts), std::move(options.data_centers), std::move(ignore_nodes));
+    auto ranges_parallelism = options.ranges_parallelism == -1 ? std::nullopt : std::optional(options.ranges_parallelism);
+    auto task = co_await _repair_module->make_and_start_task<repair::user_requested_repair_task_impl>({}, id, std::move(keyspace), "", germs, std::move(cfs), std::move(ranges), std::move(options.hosts), std::move(options.data_centers), std::move(ignore_nodes), ranges_parallelism);
     co_return id.id;
 }
 
@@ -1237,13 +1249,14 @@ future<> repair::user_requested_repair_task_impl::run() {
             throw std::runtime_error("aborted by user request");
         }
 
+        auto ranges_parallelism = _ranges_parallelism;
         for (auto shard : boost::irange(unsigned(0), smp::count)) {
-            auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed,
+            auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed, ranges_parallelism,
                     data_centers, hosts, ignore_nodes, parent_data = get_repair_uniq_id().task_info, germs] (repair_service& local_repair) mutable -> future<> {
                 local_repair.get_metrics().repair_total_ranges_sum += ranges.size();
                 auto task = co_await local_repair._repair_module->make_and_start_task<repair::shard_repair_task_impl>(parent_data, tasks::task_id::create_random_id(), keyspace, local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
-                        id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, hints_batchlog_flushed);
+                        id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, hints_batchlog_flushed, ranges_parallelism);
                 co_await task->done();
             });
             repair_results.push_back(std::move(f));
         }
@@ -1347,9 +1360,10 @@ future<> repair::data_sync_repair_task_impl::run() {
             auto hosts = std::vector<sstring>();
             auto ignore_nodes = std::unordered_set<gms::inet_address>();
             bool hints_batchlog_flushed = false;
+            auto ranges_parallelism = std::nullopt;
             auto task_impl_ptr = seastar::make_shared<repair::shard_repair_task_impl>(local_repair._repair_module, tasks::task_id::create_random_id(), keyspace,
                     local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
-                    id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, hints_batchlog_flushed);
+                    id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, hints_batchlog_flushed, ranges_parallelism);
             task_impl_ptr->neighbors = std::move(neighbors);
             auto task = co_await local_repair._repair_module->make_task(std::move(task_impl_ptr), parent_data);
             task->start();
diff --git a/repair/task_manager_module.hh b/repair/task_manager_module.hh
index 2ee72f94c16e..9abf7bb7a62c 100644
--- a/repair/task_manager_module.hh
+++ b/repair/task_manager_module.hh
@@ -45,8 +45,9 @@ private:
     std::vector<sstring> _hosts;
     std::vector<sstring> _data_centers;
     std::unordered_set<gms::inet_address> _ignore_nodes;
+    std::optional<int> _ranges_parallelism;
 public:
-    user_requested_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, lw_shared_ptr<locator::global_effective_replication_map> germs, std::vector<sstring> cfs, dht::token_range_vector ranges, std::vector<sstring> hosts, std::vector<sstring> data_centers, std::unordered_set<gms::inet_address> ignore_nodes) noexcept
+    user_requested_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, lw_shared_ptr<locator::global_effective_replication_map> germs, std::vector<sstring> cfs, dht::token_range_vector ranges, std::vector<sstring> hosts, std::vector<sstring> data_centers, std::unordered_set<gms::inet_address> ignore_nodes, std::optional<int> ranges_parallelism) noexcept
         : repair_task_impl(module, id.uuid(), id.id, std::move(keyspace), "", std::move(entity), tasks::task_id::create_null_id(), streaming::stream_reason::repair)
         , _germs(germs)
         , _cfs(std::move(cfs))
@@ -54,6 +55,7 @@ public:
         , _hosts(std::move(hosts))
         , _data_centers(std::move(data_centers))
         , _ignore_nodes(std::move(ignore_nodes))
+        , _ranges_parallelism(ranges_parallelism)
     {}
 
     virtual tasks::is_abortable is_abortable() const noexcept override {
@@ -123,6 +125,7 @@ public:
 private:
     bool _aborted = false;
     std::optional<std::string> _failed_because;
+    std::optional<semaphore> _user_ranges_parallelism;
 public:
     shard_repair_task_impl(tasks::task_manager::module_ptr module,
             tasks::task_id id,
@@ -136,7 +139,8 @@ public:
             const std::vector<sstring>& hosts_,
             const std::unordered_set<gms::inet_address>& ignore_nodes_,
             streaming::stream_reason reason_,
-            bool hints_batchlog_flushed);
+            bool hints_batchlog_flushed,
+            std::optional<int> ranges_parallelism);
     virtual tasks::is_internal is_internal() const noexcept override {
         return tasks::is_internal::yes;
     }
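
Usage sketch (not part of the patch): the new ranges_parallelism option is a query parameter on the existing repair_async REST call, and Scylla uses the smaller of the requested value and its internally computed max_repair_ranges_in_parallel. The example below assumes the Scylla REST API listens on the default localhost:10000 and that a keyspace named "ks1" exists; both are illustrative.

    # Hypothetical example: start an asynchronous repair of "ks1" with at most
    # 8 ranges repaired in parallel on each shard.
    import urllib.parse
    import urllib.request

    params = urllib.parse.urlencode({"ranges_parallelism": "8"})
    req = urllib.request.Request(
        f"http://localhost:10000/storage_service/repair_async/ks1?{params}",
        method="POST")
    with urllib.request.urlopen(req) as resp:
        # The response body is the integer sequence number of the repair job
        # (the id.id returned by do_repair_start above).
        print(resp.read().decode())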