Skip to content

Commit

Permalink
compaction: add shard_reshard_sstables_compaction_task_impl
Browse files Browse the repository at this point in the history
Add task manager's task covering resharding compaction on one shard.
  • Loading branch information
Deexie committed Jun 28, 2023
1 parent db6e4a3 commit 87c8d63
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 9 deletions.
29 changes: 20 additions & 9 deletions compaction/task_manager_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -532,19 +532,30 @@ future<> table_resharding_compaction_task_impl::run() {
dblog.info("Resharding {} for {}.{}", utils::pretty_printed_data_size(total_size), _status.keyspace, _status.table);

co_await _db.invoke_on_all(coroutine::lambda([&] (replica::database& db) -> future<> {
auto& table = db.find_column_family(_status.keyspace, _status.table);
auto info_vec = std::move(destinations[this_shard_id()].info_vec);
// make shard-local copy of owned_ranges
compaction::owned_ranges_ptr local_owned_ranges_ptr;
if (_owned_ranges_ptr) {
local_owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(*_owned_ranges_ptr);
}
co_await reshard(_dir.local(), std::move(info_vec), table, _creator, std::move(local_owned_ranges_ptr));
co_await _dir.local().move_foreign_sstables(_dir);
tasks::task_info parent_info{_status.id, _status.shard};
auto& compaction_module = _db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<shard_resharding_compaction_task_impl>(parent_info, _status.keyspace, _status.table, _status.id, _dir, db, _creator, _owned_ranges_ptr, destinations);
co_await task->done();
}));

auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start);
dblog.info("Resharded {} for {}.{} in {:.2f} seconds, {}", utils::pretty_printed_data_size(total_size), _status.keyspace, _status.table, duration.count(), utils::pretty_printed_throughput(total_size, duration));
}

tasks::is_internal shard_resharding_compaction_task_impl::is_internal() const noexcept {
return tasks::is_internal::yes;
}

future<> shard_resharding_compaction_task_impl::run() {
auto& table = _db.find_column_family(_status.keyspace, _status.table);
auto info_vec = std::move(_destinations[this_shard_id()].info_vec);
// make shard-local copy of owned_ranges
compaction::owned_ranges_ptr local_owned_ranges_ptr;
if (_owned_ranges_ptr) {
local_owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(*_owned_ranges_ptr);
}
co_await reshard(_dir.local(), std::move(info_vec), table, _creator, std::move(local_owned_ranges_ptr));
co_await _dir.local().move_foreign_sstables(_dir);
}

}
34 changes: 34 additions & 0 deletions compaction/task_manager_module.hh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ namespace sstables {
class sstable_directory;
}

namespace replica {
class reshard_shard_descriptor;
}

namespace compaction {

class compaction_task_impl : public tasks::task_manager::task::impl {
Expand Down Expand Up @@ -597,6 +601,36 @@ protected:
virtual future<> run() override;
};

class shard_resharding_compaction_task_impl : public resharding_compaction_task_impl {
private:
sharded<sstables::sstable_directory>& _dir;
replica::database& _db;
sstables::compaction_sstable_creator_fn _creator;
compaction::owned_ranges_ptr _owned_ranges_ptr;
std::vector<replica::reshard_shard_descriptor>& _destinations;
public:
shard_resharding_compaction_task_impl(tasks::task_manager::module_ptr module,
std::string keyspace,
std::string table,
tasks::task_id parent_id,
sharded<sstables::sstable_directory>& dir,
replica::database& db,
sstables::compaction_sstable_creator_fn creator,
compaction::owned_ranges_ptr owned_ranges_ptr,
std::vector<replica::reshard_shard_descriptor>& destinations) noexcept
: resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, std::move(keyspace), std::move(table), "", parent_id)
, _dir(dir)
, _db(db)
, _creator(std::move(creator))
, _owned_ranges_ptr(std::move(owned_ranges_ptr))
, _destinations(destinations)
{}

virtual tasks::is_internal is_internal() const noexcept override;
protected:
virtual future<> run() override;
};

class task_manager_module : public tasks::task_manager::module {
public:
task_manager_module(tasks::task_manager& tm) noexcept : tasks::task_manager::module(tm, "compaction") {}
Expand Down

0 comments on commit 87c8d63

Please sign in to comment.