Skip to content

Commit

Permalink
repair: Add ranges_parallelism option support for tablet
Browse files Browse the repository at this point in the history
The ranges_parallelism option is introduced in commit 9b3fd94.
Currently, this option works for vnode table repair only.

This patch enables it for tablet repair, since it is useful for
tablet repair too.

Fixes #18383

Closes #18385
  • Loading branch information
asias authored and denesb committed May 8, 2024
1 parent 66e1f30 commit 6e63a9b
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 7 deletions.
10 changes: 5 additions & 5 deletions repair/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1208,7 +1208,8 @@ future<int> repair_service::do_repair_start(sstring keyspace, std::unordered_map
}

bool primary_replica_only = options.primary_range;
co_await repair_tablets(id, keyspace, cfs, host2ip, primary_replica_only, options.ranges, options.data_centers, hosts, ignore_nodes);
auto ranges_parallelism = options.ranges_parallelism == -1 ? std::nullopt : std::optional<int>(options.ranges_parallelism);
co_await repair_tablets(id, keyspace, cfs, host2ip, primary_replica_only, options.ranges, options.data_centers, hosts, ignore_nodes, ranges_parallelism);
co_return id.id;
}
}
Expand Down Expand Up @@ -2082,7 +2083,7 @@ static std::unordered_set<gms::inet_address> get_nodes_in_dcs(std::vector<sstrin
}

// Repair all tablets belong to this node for the given table
future<> repair_service::repair_tablets(repair_uniq_id rid, sstring keyspace_name, std::vector<sstring> table_names, host2ip_t host2ip, bool primary_replica_only, dht::token_range_vector ranges_specified, std::vector<sstring> data_centers, std::unordered_set<gms::inet_address> hosts, std::unordered_set<gms::inet_address> ignore_nodes) {
future<> repair_service::repair_tablets(repair_uniq_id rid, sstring keyspace_name, std::vector<sstring> table_names, host2ip_t host2ip, bool primary_replica_only, dht::token_range_vector ranges_specified, std::vector<sstring> data_centers, std::unordered_set<gms::inet_address> hosts, std::unordered_set<gms::inet_address> ignore_nodes, std::optional<int> ranges_parallelism) {
std::vector<tablet_repair_task_meta> task_metas;
for (auto& table_name : table_names) {
lw_shared_ptr<replica::table> t;
Expand Down Expand Up @@ -2239,7 +2240,7 @@ future<> repair_service::repair_tablets(repair_uniq_id rid, sstring keyspace_nam
}
}
}
auto task = co_await _repair_module->make_and_start_task<repair::tablet_repair_task_impl>({}, rid, keyspace_name, table_names, streaming::stream_reason::repair, std::move(task_metas));
auto task = co_await _repair_module->make_and_start_task<repair::tablet_repair_task_impl>({}, rid, keyspace_name, table_names, streaming::stream_reason::repair, std::move(task_metas), ranges_parallelism);
}

future<> repair::tablet_repair_task_impl::run() {
Expand Down Expand Up @@ -2303,7 +2304,7 @@ future<> repair::tablet_repair_task_impl::run() {
});


rs.container().invoke_on_all([&idx, id, metas = _metas, parent_data, reason = _reason, tables = _tables] (repair_service& rs) -> future<> {
rs.container().invoke_on_all([&idx, id, metas = _metas, parent_data, reason = _reason, tables = _tables, ranges_parallelism = _ranges_parallelism] (repair_service& rs) -> future<> {
for (auto& m : metas) {
if (m.master_shard_id != this_shard_id()) {
continue;
Expand Down Expand Up @@ -2336,7 +2337,6 @@ future<> repair::tablet_repair_task_impl::run() {
participants.push_front(my_address);
bool hints_batchlog_flushed = co_await flush_hints(rs, id, rs._db.local(), m.keyspace_name, tables, ignore_nodes, participants);
bool small_table_optimization = false;
auto ranges_parallelism = std::nullopt;

auto task_impl_ptr = seastar::make_shared<repair::shard_repair_task_impl>(rs._repair_module, tasks::task_id::create_random_id(),
m.keyspace_name, rs, erm, std::move(ranges), std::move(table_ids), id, std::move(data_centers), std::move(hosts),
Expand Down
2 changes: 1 addition & 1 deletion repair/row_level.hh
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private:
shared_ptr<node_ops_info> ops_info);

public:
future<> repair_tablets(repair_uniq_id id, sstring keyspace_name, std::vector<sstring> table_names, host2ip_t host2ip, bool primary_replica_only = true, dht::token_range_vector ranges_specified = {}, std::vector<sstring> dcs = {}, std::unordered_set<gms::inet_address> hosts = {}, std::unordered_set<gms::inet_address> ignore_nodes = {});
future<> repair_tablets(repair_uniq_id id, sstring keyspace_name, std::vector<sstring> table_names, host2ip_t host2ip, bool primary_replica_only = true, dht::token_range_vector ranges_specified = {}, std::vector<sstring> dcs = {}, std::unordered_set<gms::inet_address> hosts = {}, std::unordered_set<gms::inet_address> ignore_nodes = {}, std::optional<int> ranges_parallelism = std::nullopt);

private:

Expand Down
4 changes: 3 additions & 1 deletion repair/task_manager_module.hh
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,14 @@ private:
std::vector<sstring> _tables;
std::vector<tablet_repair_task_meta> _metas;
optimized_optional<abort_source::subscription> _abort_subscription;
std::optional<int> _ranges_parallelism;
public:
tablet_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, sstring keyspace, std::vector<sstring> tables, streaming::stream_reason reason, std::vector<tablet_repair_task_meta> metas)
tablet_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, sstring keyspace, std::vector<sstring> tables, streaming::stream_reason reason, std::vector<tablet_repair_task_meta> metas, std::optional<int> ranges_parallelism)
: repair_task_impl(module, id.uuid(), id.id, "keyspace", keyspace, "", "", tasks::task_id::create_null_id(), reason)
, _keyspace(std::move(keyspace))
, _tables(std::move(tables))
, _metas(std::move(metas))
, _ranges_parallelism(ranges_parallelism)
{
}

Expand Down

0 comments on commit 6e63a9b

Please sign in to comment.