Merge 'repair: Add ranges_parallelism option' from Asias He
This patch adds the ranges_parallelism option to the repair RESTful API.

Users can use this option to specify the number of ranges to repair in parallel per repair job, choosing a value smaller than the default max_repair_ranges_in_parallel calculated by the Scylla core.

Scylla Manager can also use this option to provide more ranges (>N) in a single repair job while repairing only N (ranges_parallelism) of them in parallel, instead of providing just N ranges per repair job.

To make this safer, unlike PR #4848, this patch does not allow the user to exceed max_repair_ranges_in_parallel.
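The net effect, as a minimal sketch (not Scylla code; the helper name and the sample numbers are assumptions for illustration): the effective per-job parallelism is the core-calculated max_repair_ranges_in_parallel, optionally lowered by the user-requested ranges_parallelism, and never raised above it.

```cpp
// Minimal sketch (not Scylla code): how a user-requested ranges_parallelism
// combines with the core-calculated max_repair_ranges_in_parallel.
#include <algorithm>
#include <iostream>
#include <optional>

static int effective_ranges_parallelism(std::optional<int> user_requested,
                                        int max_repair_ranges_in_parallel) {
    if (!user_requested) {
        // No user request: fall back to the core-calculated maximum.
        return max_repair_ranges_in_parallel;
    }
    // The user may lower the parallelism but never exceed the core maximum.
    return std::min(*user_requested, max_repair_ranges_in_parallel);
}

int main() {
    std::cout << effective_ranges_parallelism(std::nullopt, 16) << '\n'; // 16
    std::cout << effective_ranges_parallelism(4, 16) << '\n';            // 4
    std::cout << effective_ranges_parallelism(64, 16) << '\n';           // 16
}
```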

Fixes #4847

Closes #14886

* github.com:scylladb/scylladb:
  repair: Add ranges_parallelism option
  repair: Change to use coroutine in do_repair_ranges
denesb committed Aug 3, 2023
2 parents d4ee84e + 9b3fd94 commit 946c648
Showing 4 changed files with 62 additions and 36 deletions.
8 changes: 8 additions & 0 deletions api/api-doc/storage_service.json
@@ -1114,6 +1114,14 @@
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"ranges_parallelism",
"description":"An integer specifying the number of ranges to repair in parallel by user request. If this number is bigger than the max_repair_ranges_in_parallel calculated by Scylla core, the smaller one will be used.",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
},
2 changes: 1 addition & 1 deletion api/storage_service.cc
@@ -320,7 +320,7 @@ void set_repair(http_context& ctx, routes& r, sharded<repair_service>& repair) {
ss::repair_async.set(r, [&ctx, &repair](std::unique_ptr<http::request> req) {
static std::vector<sstring> options = {"primaryRange", "parallelism", "incremental",
"jobThreads", "ranges", "columnFamilies", "dataCenters", "hosts", "ignore_nodes", "trace",
"startToken", "endToken" };
"startToken", "endToken", "ranges_parallelism"};
std::unordered_map<sstring, sstring> options_map;
for (auto o : options) {
auto s = req->get_query_param(o);
80 changes: 47 additions & 33 deletions repair/repair.cc
@@ -564,7 +564,8 @@ repair::shard_repair_task_impl::shard_repair_task_impl(tasks::task_manager::modu
const std::vector<sstring>& hosts_,
const std::unordered_set<gms::inet_address>& ignore_nodes_,
streaming::stream_reason reason_,
bool hints_batchlog_flushed)
bool hints_batchlog_flushed,
std::optional<int> ranges_parallelism)
: repair_task_impl(module, id, 0, keyspace, "", "", parent_id_.uuid(), reason_)
, rs(repair)
, db(repair.get_db())
@@ -585,7 +586,11 @@ repair::shard_repair_task_impl::shard_repair_task_impl(tasks::task_manager::modu
, total_rf(erm->get_replication_factor())
, nr_ranges_total(ranges.size())
, _hints_batchlog_flushed(std::move(hints_batchlog_flushed))
{ }
, _user_ranges_parallelism(ranges_parallelism ? std::optional<semaphore>(semaphore(*ranges_parallelism)) : std::nullopt)
{
rlogger.debug("repair[{}]: Setting user_ranges_parallelism to {}", global_repair_id.uuid(),
_user_ranges_parallelism ? std::to_string(_user_ranges_parallelism->available_units()) : "unlimited");
}

void repair::shard_repair_task_impl::check_failed_ranges() {
rlogger.info("repair[{}]: stats: repair_reason={}, keyspace={}, tables={}, ranges_nr={}, {}",
@@ -795,6 +800,8 @@ struct repair_options {
// repair to a data center other than the named one returns an error.
std::vector<sstring> data_centers;

int ranges_parallelism = -1;

repair_options(std::unordered_map<sstring, sstring> options) {
bool_opt(primary_range, options, PRIMARY_RANGE_KEY);
ranges_opt(ranges, options, RANGES_KEY);
@@ -830,6 +837,8 @@ struct repair_options {
int job_threads;
int_opt(job_threads, options, JOB_THREADS_KEY);

int_opt(ranges_parallelism, options, RANGES_PARALLELISM_KEY);

// The parsing code above removed from the map options we have parsed.
// If anything is left there in the end, it's an unsupported option.
if (!options.empty()) {
@@ -850,6 +859,7 @@ struct repair_options {
static constexpr const char* TRACE_KEY = "trace";
static constexpr const char* START_TOKEN = "startToken";
static constexpr const char* END_TOKEN = "endToken";
static constexpr const char* RANGES_PARALLELISM_KEY = "ranges_parallelism";

// Settings of "parallelism" option. Numbers must match Cassandra's
// RepairParallelism enum, which is used by the caller.
@@ -950,33 +960,34 @@ future<> repair::shard_repair_task_impl::do_repair_ranges() {
// repair all the ranges in limited parallelism
rlogger.info("repair[{}]: Started to repair {} out of {} tables in keyspace={}, table={}, table_id={}, repair_reason={}",
global_repair_id.uuid(), idx + 1, table_ids.size(), _status.keyspace, table_name, table_id, _reason);
co_await coroutine::parallel_for_each(ranges, [this, table_id] (auto&& range) {
return with_semaphore(rs.get_repair_module().range_parallelism_semaphore(), 1, [this, &range, table_id] {
return repair_range(range, table_id).then([this] {
if (_reason == streaming::stream_reason::bootstrap) {
rs.get_metrics().bootstrap_finished_ranges++;
} else if (_reason == streaming::stream_reason::replace) {
rs.get_metrics().replace_finished_ranges++;
} else if (_reason == streaming::stream_reason::rebuild) {
rs.get_metrics().rebuild_finished_ranges++;
} else if (_reason == streaming::stream_reason::decommission) {
rs.get_metrics().decommission_finished_ranges++;
} else if (_reason == streaming::stream_reason::removenode) {
rs.get_metrics().removenode_finished_ranges++;
} else if (_reason == streaming::stream_reason::repair) {
rs.get_metrics().repair_finished_ranges_sum++;
nr_ranges_finished++;
}
rlogger.debug("repair[{}]: node ops progress bootstrap={}, replace={}, rebuild={}, decommission={}, removenode={}, repair={}",
global_repair_id.uuid(),
rs.get_metrics().bootstrap_finished_percentage(),
rs.get_metrics().replace_finished_percentage(),
rs.get_metrics().rebuild_finished_percentage(),
rs.get_metrics().decommission_finished_percentage(),
rs.get_metrics().removenode_finished_percentage(),
rs.get_metrics().repair_finished_percentage());
});
});
co_await coroutine::parallel_for_each(ranges, [this, table_id] (auto&& range) -> future<> {
// Get the system range parallelism
auto permit = co_await seastar::get_units(rs.get_repair_module().range_parallelism_semaphore(), 1);
// Get the range parallelism specified by user
auto user_permit = _user_ranges_parallelism ? co_await seastar::get_units(*_user_ranges_parallelism, 1) : semaphore_units<>();
co_await repair_range(range, table_id);
if (_reason == streaming::stream_reason::bootstrap) {
rs.get_metrics().bootstrap_finished_ranges++;
} else if (_reason == streaming::stream_reason::replace) {
rs.get_metrics().replace_finished_ranges++;
} else if (_reason == streaming::stream_reason::rebuild) {
rs.get_metrics().rebuild_finished_ranges++;
} else if (_reason == streaming::stream_reason::decommission) {
rs.get_metrics().decommission_finished_ranges++;
} else if (_reason == streaming::stream_reason::removenode) {
rs.get_metrics().removenode_finished_ranges++;
} else if (_reason == streaming::stream_reason::repair) {
rs.get_metrics().repair_finished_ranges_sum++;
nr_ranges_finished++;
}
rlogger.debug("repair[{}]: node ops progress bootstrap={}, replace={}, rebuild={}, decommission={}, removenode={}, repair={}",
global_repair_id.uuid(),
rs.get_metrics().bootstrap_finished_percentage(),
rs.get_metrics().replace_finished_percentage(),
rs.get_metrics().rebuild_finished_percentage(),
rs.get_metrics().decommission_finished_percentage(),
rs.get_metrics().removenode_finished_percentage(),
rs.get_metrics().repair_finished_percentage());
});

if (_reason != streaming::stream_reason::repair) {
@@ -1130,7 +1141,8 @@ future<int> repair_service::do_repair_start(sstring keyspace, std::unordered_map
co_return id.id;
}

auto task = co_await _repair_module->make_and_start_task<repair::user_requested_repair_task_impl>({}, id, std::move(keyspace), "", germs, std::move(cfs), std::move(ranges), std::move(options.hosts), std::move(options.data_centers), std::move(ignore_nodes));
auto ranges_parallelism = options.ranges_parallelism == -1 ? std::nullopt : std::optional<int>(options.ranges_parallelism);
auto task = co_await _repair_module->make_and_start_task<repair::user_requested_repair_task_impl>({}, id, std::move(keyspace), "", germs, std::move(cfs), std::move(ranges), std::move(options.hosts), std::move(options.data_centers), std::move(ignore_nodes), ranges_parallelism);
co_return id.id;
}

@@ -1237,13 +1249,14 @@ future<> repair::user_requested_repair_task_impl::run() {
throw std::runtime_error("aborted by user request");
}

auto ranges_parallelism = _ranges_parallelism;
for (auto shard : boost::irange(unsigned(0), smp::count)) {
auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed,
auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed, ranges_parallelism,
data_centers, hosts, ignore_nodes, parent_data = get_repair_uniq_id().task_info, germs] (repair_service& local_repair) mutable -> future<> {
local_repair.get_metrics().repair_total_ranges_sum += ranges.size();
auto task = co_await local_repair._repair_module->make_and_start_task<repair::shard_repair_task_impl>(parent_data, tasks::task_id::create_random_id(), keyspace,
local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, hints_batchlog_flushed);
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, hints_batchlog_flushed, ranges_parallelism);
co_await task->done();
});
repair_results.push_back(std::move(f));
@@ -1347,9 +1360,10 @@ future<> repair::data_sync_repair_task_impl::run() {
auto hosts = std::vector<sstring>();
auto ignore_nodes = std::unordered_set<gms::inet_address>();
bool hints_batchlog_flushed = false;
auto ranges_parallelism = std::nullopt;
auto task_impl_ptr = seastar::make_shared<repair::shard_repair_task_impl>(local_repair._repair_module, tasks::task_id::create_random_id(), keyspace,
local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, hints_batchlog_flushed);
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, hints_batchlog_flushed, ranges_parallelism);
task_impl_ptr->neighbors = std::move(neighbors);
auto task = co_await local_repair._repair_module->make_task(std::move(task_impl_ptr), parent_data);
task->start();
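An aside on the do_repair_ranges() change above: each range now acquires a unit from the module's range_parallelism_semaphore and, when the user supplied ranges_parallelism, from the per-task semaphore as well, so the effective concurrency is the smaller of the two limits. A rough standard-C++ sketch of that gating pattern (plain threads and std::counting_semaphore instead of Seastar fibers; the limits and range count are assumptions for illustration):

```cpp
// Minimal sketch (standard C++, not Seastar): per-range work takes a unit
// from the core semaphore and, when set, from the user semaphore as well,
// so at most min(core_limit, user_limit) ranges run concurrently.
#include <cstdio>
#include <optional>
#include <semaphore>
#include <thread>
#include <vector>

int main() {
    std::counting_semaphore<> core_limit(16);             // core-calculated max
    std::optional<std::counting_semaphore<>> user_limit;  // only set on user request
    user_limit.emplace(4);                                // e.g. ranges_parallelism=4

    std::vector<std::jthread> workers;
    for (int range = 0; range < 32; ++range) {
        workers.emplace_back([&, range] {
            core_limit.acquire();                         // system-wide cap
            if (user_limit) {
                user_limit->acquire();                    // user cap, if requested
            }
            std::printf("repairing range %d\n", range);   // stand-in for repair_range()
            if (user_limit) {
                user_limit->release();
            }
            core_limit.release();
        });
    }
}   // jthreads join on destruction
```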
8 changes: 6 additions & 2 deletions repair/task_manager_module.hh
@@ -45,15 +45,17 @@ private:
std::vector<sstring> _hosts;
std::vector<sstring> _data_centers;
std::unordered_set<gms::inet_address> _ignore_nodes;
std::optional<int> _ranges_parallelism;
public:
user_requested_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, lw_shared_ptr<locator::global_vnode_effective_replication_map> germs, std::vector<sstring> cfs, dht::token_range_vector ranges, std::vector<sstring> hosts, std::vector<sstring> data_centers, std::unordered_set<gms::inet_address> ignore_nodes) noexcept
user_requested_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, lw_shared_ptr<locator::global_vnode_effective_replication_map> germs, std::vector<sstring> cfs, dht::token_range_vector ranges, std::vector<sstring> hosts, std::vector<sstring> data_centers, std::unordered_set<gms::inet_address> ignore_nodes, std::optional<int> ranges_parallelism) noexcept
: repair_task_impl(module, id.uuid(), id.id, std::move(keyspace), "", std::move(entity), tasks::task_id::create_null_id(), streaming::stream_reason::repair)
, _germs(germs)
, _cfs(std::move(cfs))
, _ranges(std::move(ranges))
, _hosts(std::move(hosts))
, _data_centers(std::move(data_centers))
, _ignore_nodes(std::move(ignore_nodes))
, _ranges_parallelism(ranges_parallelism)
{}

virtual tasks::is_abortable is_abortable() const noexcept override {
@@ -123,6 +125,7 @@ public:
private:
bool _aborted = false;
std::optional<sstring> _failed_because;
std::optional<semaphore> _user_ranges_parallelism;
public:
shard_repair_task_impl(tasks::task_manager::module_ptr module,
tasks::task_id id,
@@ -136,7 +139,8 @@ public:
const std::vector<sstring>& hosts_,
const std::unordered_set<gms::inet_address>& ignore_nodes_,
streaming::stream_reason reason_,
bool hints_batchlog_flushed);
bool hints_batchlog_flushed,
std::optional<int> ranges_parallelism);
virtual tasks::is_internal is_internal() const noexcept override {
return tasks::is_internal::yes;
}
