Skip to content

Commit

Permalink
Revert "Merge 'Compaction resharding tasks' from Aleksandra Martyniuk"
Browse files Browse the repository at this point in the history
This reverts commit 2a58b4a, reversing
changes made to dd63169.

After patch 87c8d63,
table_resharding_compaction_task_impl::run() performs the forbidden
action of copying a lw_shared_ptr (_owned_ranges_ptr) on a remote shard,
which is a data race that can cause a use-after-free, typically manifesting
as allocator corruption.

Note: before the bad patch, this was avoided by copying the _contents_ of the
lw_shared_ptr into a new, local lw_shared_ptr.

Fixes #14475
Fixes #14618

Closes #14641
  • Loading branch information
michoecho authored and avikivity committed Jul 11, 2023
1 parent e1a52af commit b511d57
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 290 deletions.
199 changes: 0 additions & 199 deletions compaction/task_manager_module.cc
Expand Up @@ -6,173 +6,13 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

#include <boost/range/algorithm/min_element.hpp>

#include "compaction/task_manager_module.hh"
#include "compaction/compaction_manager.hh"
#include "replica/database.hh"
#include "sstables/sstables.hh"
#include "sstables/sstable_directory.hh"
#include "utils/pretty_printers.hh"

namespace replica {

// Per-shard work descriptor used while resharding.
//
// Holds the shared sstables (as foreign_sstable_open_info entries) that were assigned to one
// shard, together with the sum of their uncompressed data sizes. One descriptor is kept per
// shard; the accumulated size is what allows SSTables to be balanced fairly among shards.
struct reshard_shard_descriptor {
    // Open-info of every sstable assigned to this shard.
    sstables::sstable_directory::sstable_open_info_vector info_vec;
    // Total uncompressed data size accounted to this shard so far.
    uint64_t uncompressed_data_size = 0;

    // Amount of work accumulated by this shard, in bytes of uncompressed data.
    uint64_t size() const {
        return uncompressed_data_size;
    }

    // True when this shard has strictly less accumulated work than rhs.
    bool total_size_smaller(const reshard_shard_descriptor& rhs) const {
        return size() < rhs.size();
    }
};

} // namespace replica

// Gathers, on the calling shard, the open-info of every sstable that has to be resharded:
// the shared SSTables of all shards plus, when owned_ranges_ptr is set, the sstables for which
// needs_cleanup() returns true. Returns a single vector containing them all.
//
// The list of SSTables can be fairly big, so entries are appended one at a time with
// coroutine::maybe_yield() in between instead of using standard accumulators.
//
// Note: db, ks_name and table_name are not used by the body.
future<sstables::sstable_directory::sstable_open_info_vector>
collect_all_shared_sstables(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::owned_ranges_ptr owned_ranges_ptr) {
auto info_vec = sstables::sstable_directory::sstable_open_info_vector();

// We want to make sure that each distributed object reshards about the same amount of data.
// Each sharded object has its own shared SSTables. We could use a clever algorithm in which they
// all figure out, in a distributed fashion, which SSTables to exchange, but we keep it simple and
// move all their foreign_sstable_open_info to a coordinator (the shard that called this function).
// We can move in bulk and that's efficient. That shard can then distribute the work among all the
// others who will reshard.
auto coordinator = this_shard_id();
// All of the foreign open info is first moved into temporary storage (info_vec) so that it can be
// sorted later; bigger sstables are to be distributed first.
//
// The owned ranges are exposed to the other shards only through this raw pointer: remote shards
// dereference it but never copy the lw_shared_ptr itself.
const auto* sorted_owned_ranges_ptr = owned_ranges_ptr.get();
co_await dir.invoke_on_all([&] (sstables::sstable_directory& d) -> future<> {
auto shared_sstables = d.retrieve_shared_sstables();
sstables::sstable_directory::sstable_open_info_vector need_cleanup;
if (sorted_owned_ranges_ptr) {
// Sstables that need cleanup have their open-info recorded in need_cleanup and get `false`
// from the predicate; every other sstable gets `true`.
// NOTE(review): presumably filter_sstables() drops the `false` ones from the directory — confirm.
co_await d.filter_sstables([&] (sstables::shared_sstable sst) -> future<bool> {
if (needs_cleanup(sst, *sorted_owned_ranges_ptr)) {
need_cleanup.push_back(co_await sst->get_open_info());
co_return false;
}
co_return true;
});
}
// Nothing to contribute from this shard: skip the round trip to the coordinator.
if (shared_sstables.empty() && need_cleanup.empty()) {
co_return;
}
// info_vec lives on the coordinator, so it is only ever modified there.
co_await smp::submit_to(coordinator, [&] () -> future<> {
info_vec.reserve(info_vec.size() + shared_sstables.size() + need_cleanup.size());
for (auto& info : shared_sstables) {
info_vec.emplace_back(std::move(info));
co_await coroutine::maybe_yield();
}
for (auto& info : need_cleanup) {
info_vec.emplace_back(std::move(info));
co_await coroutine::maybe_yield();
}
});
});

co_return info_vec;
}

// Spreads the sstables that need resharding across all shards.
//
// The input is first ordered by descending uncompressed size, so the biggest SSTables are
// placed before the small ones. Each sstable then goes to whichever of its owner shards has
// the least work accumulated so far.
//
// Returns one reshard_shard_descriptor per shard (indexed by shard id) describing the work
// that shard has to do.
future<std::vector<replica::reshard_shard_descriptor>>
distribute_reshard_jobs(sstables::sstable_directory::sstable_open_info_vector source) {
    using open_info = sstables::foreign_sstable_open_info;

    std::vector<replica::reshard_shard_descriptor> per_shard(smp::count);

    // Descending SSTable size.
    const auto bigger_first = [] (const open_info& a, const open_info& b) {
        return a.uncompressed_data_size > b.uncompressed_data_size;
    };
    std::sort(source.begin(), source.end(), bigger_first);

    // Orders shard ids by the amount of work already assigned to them.
    const auto less_loaded = [&per_shard] (const shard_id& lhs, const shard_id& rhs) {
        return per_shard[lhs].total_size_smaller(per_shard[rhs]);
    };

    for (auto& sst_info : source) {
        // Only the sstable's own owners are candidates. For sstables that are resharded
        // just for cleanup, the owners may contain a single shard.
        const auto target = *boost::min_element(sst_info.owners, less_loaded);
        auto& chosen = per_shard[target];
        chosen.uncompressed_data_size += sst_info.uncompressed_data_size;
        chosen.info_vec.push_back(std::move(sst_info));
        co_await coroutine::maybe_yield();
    }

    co_return per_shard;
}

// Reshards a collection of SSTables on the current shard.
//
// dir              - sstable directory used to load the foreign sstables, to collect the output
//                    sstables and to remove the inputs.
// shared_info      - open-info of the sstables this shard has to reshard.
// table            - the table being processed; the compaction manager needs to know it, and it
//                    also supplies the schema and the effective replication map.
// creator          - stored in the compaction descriptor as the sstable creator function.
// owned_ranges_ptr - stored in the compaction descriptor as owned_ranges; may be null.
//
// Sstables are grouped into jobs of about max_compaction_threshold() sstables each, and one
// custom "Reshard compaction" job is submitted to the compaction manager per group.
future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::sstable_open_info_vector shared_info, replica::table& table,
sstables::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr)
{
// Resharding doesn't like empty sstable sets, so bail early. There is nothing
// to reshard in this shard.
if (shared_info.empty()) {
co_return;
}

// We want to reshard many SSTables at a time for efficiency. However if we have too many we may
// be risking OOM.
// NOTE(review): a max_compaction_threshold() of 0 would make the divisions below divide by
// zero — presumably the schema guarantees a positive value; confirm.
auto max_sstables_per_job = table.schema()->max_compaction_threshold();
// Ceiling division: number of jobs needed so that no job exceeds max_sstables_per_job.
auto num_jobs = (shared_info.size() + max_sstables_per_job - 1) / max_sstables_per_job;
auto sstables_per_job = shared_info.size() / num_jobs;

std::vector<std::vector<sstables::shared_sstable>> buckets;
buckets.reserve(num_jobs);
buckets.emplace_back();
// The bucket an sstable lands in is decided only after its load has completed.
co_await coroutine::parallel_for_each(shared_info, [&] (sstables::foreign_sstable_open_info& info) -> future<> {
auto sst = co_await dir.load_foreign_sstable(info);
// Last bucket gets leftover SSTables
if ((buckets.back().size() >= sstables_per_job) && (buckets.size() < num_jobs)) {
buckets.emplace_back();
}
buckets.back().push_back(std::move(sst));
});
// All jobs are submitted with parallel_for_each so that the statistics about pending jobs
// reflect all of them.
// NOTE(review): the original note says a semaphore inside the compaction manager lets only one
// of them run at a time — not visible from here; confirm.
auto& t = table.as_table_state();
co_await coroutine::parallel_for_each(buckets, [&] (std::vector<sstables::shared_sstable>& sstlist) mutable {
return table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshard, "Reshard compaction", [&] (sstables::compaction_data& info) -> future<> {
auto erm = table.get_effective_replication_map(); // keep alive around compaction.

sstables::compaction_descriptor desc(sstlist);
desc.options = sstables::compaction_type_options::make_reshard();
desc.creator = creator;
// desc.sharder points into erm, which is why erm is held for the whole compaction.
desc.sharder = &erm->get_sharder(*table.schema());
desc.owned_ranges = owned_ranges_ptr;

auto result = co_await sstables::compact_sstables(std::move(desc), info, t);
// input sstables are moved, to guarantee their resources are released once we're done
// resharding them.
co_await when_all_succeed(dir.collect_output_unshared_sstables(std::move(result.new_sstables), sstables::sstable_directory::can_be_remote::yes), dir.remove_sstables(std::move(sstlist))).discard_result();
});
});
}

namespace compaction {

struct table_tasks_info {
Expand Down Expand Up @@ -519,43 +359,4 @@ future<> shard_reshaping_compaction_task_impl::run() {
_total_shard_size = reshaped_size;
}

// Top-level resharding task for one table.
//
// The shard executing this function acts as the coordinator:
//  1. it gathers the open-info of all sstables that need resharding from every shard,
//  2. splits them into one job list per shard,
//  3. starts a child shard_resharding_compaction_task_impl on every shard and waits for
//     all of them to finish.
// Returns early, without logging, when the total size to reshard is zero; otherwise logs the
// amount of data before and the duration/throughput after the resharding.
future<> table_resharding_compaction_task_impl::run() {
    auto all_jobs = co_await collect_all_shared_sstables(_dir, _db, _status.keyspace, _status.table, _owned_ranges_ptr);
    auto destinations = co_await distribute_reshard_jobs(std::move(all_jobs));

    uint64_t total_size = boost::accumulate(destinations | boost::adaptors::transformed(std::mem_fn(&replica::reshard_shard_descriptor::size)), uint64_t(0));
    if (total_size == 0) {
        co_return;
    }

    auto start = std::chrono::steady_clock::now();
    dblog.info("Resharding {} for {}.{}", utils::pretty_printed_data_size(total_size), _status.keyspace, _status.table);

    co_await _db.invoke_on_all(coroutine::lambda([&] (replica::database& db) -> future<> {
        tasks::task_info parent_info{_status.id, _status.shard};
        auto& compaction_module = db.get_compaction_manager().get_task_manager_module();
        // This lambda runs on every shard. _owned_ranges_ptr is an lw_shared_ptr that belongs
        // to the coordinator shard, and copying it from another shard is a data race on its
        // reference count (use-after-free / allocator corruption). So never hand the pointer
        // itself to the child task: copy the *contents* into a new pointer owned by this shard.
        compaction::owned_ranges_ptr local_owned_ranges_ptr;
        if (_owned_ranges_ptr) {
            local_owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(*_owned_ranges_ptr);
        }
        auto task = co_await compaction_module.make_and_start_task<shard_resharding_compaction_task_impl>(parent_info, _status.keyspace, _status.table, _status.id, _dir, db, _creator, std::move(local_owned_ranges_ptr), destinations);
        co_await task->done();
    }));

    auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start);
    dblog.info("Resharded {} for {}.{} in {:.2f} seconds, {}", utils::pretty_printed_data_size(total_size), _status.keyspace, _status.table, duration.count(), utils::pretty_printed_throughput(total_size, duration));
}

// Always returns tasks::is_internal::yes, i.e. the per-shard resharding task is flagged as internal.
tasks::is_internal shard_resharding_compaction_task_impl::is_internal() const noexcept {
return tasks::is_internal::yes;
}

// Per-shard part of resharding: takes the job list assigned to the current shard out of
// _destinations, reshards those sstables through the shard-local sstable directory and then
// lets the directory move the foreign sstables.
future<> shard_resharding_compaction_task_impl::run() {
    auto& cf = _db.find_column_family(_status.keyspace, _status.table);
    // Steal this shard's slice of the work from the shared destinations vector.
    auto shard_jobs = std::move(_destinations[this_shard_id()].info_vec);

    // The owned ranges are deep-copied so that the pointer handed to reshard() belongs to
    // this shard; a null _owned_ranges_ptr stays null.
    compaction::owned_ranges_ptr shard_owned_ranges;
    if (_owned_ranges_ptr) {
        shard_owned_ranges = make_lw_shared<const dht::token_range_vector>(*_owned_ranges_ptr);
    }

    auto& local_dir = _dir.local();
    co_await reshard(local_dir, std::move(shard_jobs), cf, _creator, std::move(shard_owned_ranges));
    co_await local_dir.move_foreign_sstables(_dir);
}

}
81 changes: 0 additions & 81 deletions compaction/task_manager_module.hh
Expand Up @@ -16,11 +16,6 @@
namespace sstables {
class sstable_directory;
}

namespace replica {
// Forward declaration only. The type is defined with the `struct` class-key in
// compaction/task_manager_module.cc, so it is declared with the same key here to avoid
// a mismatched-tags declaration.
struct reshard_shard_descriptor;
}

namespace compaction {

class compaction_task_impl : public tasks::task_manager::task::impl {
Expand Down Expand Up @@ -555,82 +550,6 @@ protected:
virtual future<> run() override;
};


// Common base of the resharding compaction tasks.
//
// It only forwards its constructor arguments to compaction_task_impl and fixes the task type
// string; run() stays pure virtual and has to be supplied by the concrete task.
class resharding_compaction_task_impl : public compaction_task_impl {
public:
// All arguments are passed straight through to compaction_task_impl.
resharding_compaction_task_impl(tasks::task_manager::module_ptr module,
tasks::task_id id,
unsigned sequence_number,
std::string keyspace,
std::string table,
std::string entity,
tasks::task_id parent_id) noexcept
: compaction_task_impl(module, id, sequence_number, std::move(keyspace), std::move(table), std::move(entity), parent_id)
{
// FIXME: add progress units
}

// Type name of this family of tasks: "resharding compaction".
virtual std::string type() const override {
return "resharding compaction";
}
protected:
// The actual work; implemented by the table-level and shard-level tasks.
virtual future<> run() override = 0;
};

// Table-level resharding task.
//
// Created with a random task id, a fresh sequence number taken from the module, an empty entity
// and a null parent id.
class table_resharding_compaction_task_impl : public resharding_compaction_task_impl {
private:
// Held by reference; not owned by the task.
sharded<sstables::sstable_directory>& _dir;
// Held by reference; not owned by the task.
sharded<replica::database>& _db;
sstables::compaction_sstable_creator_fn _creator;
// lw_shared_ptr: must not be copied on a shard other than the one that owns it
// (copy the pointed-to ranges instead).
compaction::owned_ranges_ptr _owned_ranges_ptr;
public:
table_resharding_compaction_task_impl(tasks::task_manager::module_ptr module,
std::string keyspace,
std::string table,
sharded<sstables::sstable_directory>& dir,
sharded<replica::database>& db,
sstables::compaction_sstable_creator_fn creator,
compaction::owned_ranges_ptr owned_ranges_ptr) noexcept
: resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), std::move(keyspace), std::move(table), "", tasks::task_id::create_null_id())
, _dir(dir)
, _db(db)
, _creator(std::move(creator))
, _owned_ranges_ptr(std::move(owned_ranges_ptr))
{}
protected:
virtual future<> run() override;
};

// Shard-level resharding task, a child of the task identified by parent_id.
//
// Created with a random task id, sequence number 0 and an empty entity.
class shard_resharding_compaction_task_impl : public resharding_compaction_task_impl {
private:
// Held by reference; not owned by the task.
sharded<sstables::sstable_directory>& _dir;
// Held by reference; not owned by the task.
replica::database& _db;
sstables::compaction_sstable_creator_fn _creator;
compaction::owned_ranges_ptr _owned_ranges_ptr;
// Reference to a vector owned by the creator of this task; it has to stay alive for as long
// as the task may use it.
std::vector<replica::reshard_shard_descriptor>& _destinations;
public:
shard_resharding_compaction_task_impl(tasks::task_manager::module_ptr module,
std::string keyspace,
std::string table,
tasks::task_id parent_id,
sharded<sstables::sstable_directory>& dir,
replica::database& db,
sstables::compaction_sstable_creator_fn creator,
compaction::owned_ranges_ptr owned_ranges_ptr,
std::vector<replica::reshard_shard_descriptor>& destinations) noexcept
: resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, std::move(keyspace), std::move(table), "", parent_id)
, _dir(dir)
, _db(db)
, _creator(std::move(creator))
, _owned_ranges_ptr(std::move(owned_ranges_ptr))
, _destinations(destinations)
{}

// Overrides the base implementation; defined in the .cc file.
virtual tasks::is_internal is_internal() const noexcept override;
protected:
virtual future<> run() override;
};

class task_manager_module : public tasks::task_manager::module {
public:
task_manager_module(tasks::task_manager& tm) noexcept : tasks::task_manager::module(tm, "compaction") {}
Expand Down

0 comments on commit b511d57

Please sign in to comment.