Skip to content

Commit

Permalink
compaction_manager: Fix race when selecting sstables for rewrite oper…
Browse files Browse the repository at this point in the history
…ations

Rewrite operations are scrub, cleanup and upgrade.

Race can happen because 'selection of sstables' and 'mark sstables as
compacting' are decoupled. So any deferring point in between can lead
to a parallel compaction picking the same files. After commit 2cf0c4b,
files are marked as compacting before rewrite starts, but it didn't
take into account the commit c84217a which moved retrieval of
candidates to a deferring thread, before rewrite_sstables() is even
called.

Scrub isn't affected by this because it uses a coarse grained approach
where whole operation is run with compaction disabled, which isn't good
because regular compaction cannot run until its completion.

From now on, selection of files and marking them as compacting will
be serialized by running them with compaction disabled.

Now cleanup will also retrieve sstables with compaction disabled,
meaning it will no longer leave uncleaned files behind, which is
important to avoid data resurrection if node regains ownership of
data in uncleaned files.

Fixes #8168.
Refs #8155.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20211129133107.53011-1-raphaelsc@scylladb.com>
  • Loading branch information
raphaelsc authored and avikivity committed Nov 29, 2021
1 parent bcadd82 commit 80a1ebf
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 39 deletions.
76 changes: 38 additions & 38 deletions compaction/compaction_manager.cc
Expand Up @@ -795,15 +795,24 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
_tasks.push_back(task);
cmlog.debug("{} task {} cf={}: started", options.type(), fmt::ptr(task.get()), fmt::ptr(task->compacting_cf));

auto sstables = std::make_unique<std::vector<sstables::shared_sstable>>(get_func(*cf));
std::unique_ptr<std::vector<sstables::shared_sstable>> sstables;
lw_shared_ptr<compacting_sstable_registration> compacting;

// since we might potentially have ongoing compactions, and we
// must ensure that all sstables created before we run are included
// in the re-write, we need to barrier out any previously running
// compaction.
co_await run_with_compaction_disabled(cf, [&] () mutable -> future<> {
sstables = std::make_unique<std::vector<sstables::shared_sstable>>(co_await get_func());
compacting = make_lw_shared<compacting_sstable_registration>(this, *sstables);
});
// sort sstables by size in descending order, such that the smallest files will be rewritten first
// (as sstable to be rewritten is popped off from the back of container), so rewrite will have higher
// chance to succeed when the biggest files are reached.
std::sort(sstables->begin(), sstables->end(), [](sstables::shared_sstable& a, sstables::shared_sstable& b) {
return a->data_size() > b->data_size();
});

auto compacting = make_lw_shared<compacting_sstable_registration>(this, *sstables);
auto sstables_ptr = sstables.get();
_stats.pending_tasks += sstables->size();

Expand Down Expand Up @@ -863,7 +872,7 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
cmlog.debug("{} task {} cf={}: done", type, fmt::ptr(task.get()), fmt::ptr(task->compacting_cf));
});

return task->compaction_done.get_future().then([task] {});
co_return co_await task->compaction_done.get_future();
}

future<> compaction_manager::perform_sstable_scrub_validate_mode(column_family* cf) {
Expand Down Expand Up @@ -945,31 +954,30 @@ future<> compaction_manager::perform_cleanup(database& db, column_family* cf) {
return make_exception_future<>(std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}",
cf->schema()->ks_name(), cf->schema()->cf_name())));
}
return seastar::async([this, cf, &db] {
// FIXME: indentation
auto sorted_owned_ranges = db.get_keyspace_local_ranges(cf->schema()->ks_name());
auto get_sstables = [this, &db, cf, sorted_owned_ranges] () -> future<std::vector<sstables::shared_sstable>> {
return seastar::async([this, &db, cf, sorted_owned_ranges = std::move(sorted_owned_ranges)] {
auto schema = cf->schema();
auto sorted_owned_ranges = db.get_keyspace_local_ranges(schema->ks_name());
auto sstables = std::vector<sstables::shared_sstable>{};
const auto candidates = get_candidates(*cf);
std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&sorted_owned_ranges, schema] (const sstables::shared_sstable& sst) {
seastar::thread::maybe_yield();
return sorted_owned_ranges.empty() || needs_cleanup(sst, sorted_owned_ranges, schema);
});
return std::tuple<dht::token_range_vector, std::vector<sstables::shared_sstable>>(sorted_owned_ranges, sstables);
}).then_unpack([this, cf, &db] (dht::token_range_vector owned_ranges, std::vector<sstables::shared_sstable> sstables) {
return rewrite_sstables(cf, sstables::compaction_type_options::make_cleanup(std::move(owned_ranges)),
[sstables = std::move(sstables)] (const table&) { return sstables; });
return sstables;
});
};

return rewrite_sstables(cf, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)), std::move(get_sstables));
}

// Submit a column family to be upgraded and wait for its termination.
future<> compaction_manager::perform_sstable_upgrade(database& db, column_family* cf, bool exclude_current_version) {
using shared_sstables = std::vector<sstables::shared_sstable>;
return do_with(shared_sstables{}, [this, &db, cf, exclude_current_version](shared_sstables& tables) {
// since we might potentially have ongoing compactions, and we
// must ensure that all sstables created before we run are included
// in the re-write, we need to barrier out any previously running
// compaction.
return run_with_compaction_disabled(cf, [this, cf, &tables, exclude_current_version] {
auto get_sstables = [this, &db, cf, exclude_current_version] {
// FIXME: indentation
std::vector<sstables::shared_sstable> tables;

auto last_version = cf->get_sstables_manager().get_highest_supported_format();

for (auto& sst : get_candidates(*cf)) {
Expand All @@ -980,36 +988,28 @@ future<> compaction_manager::perform_sstable_upgrade(database& db, column_family
tables.emplace_back(sst);
}
}
return make_ready_future<>();
}).then([&db, cf] {
return db.get_keyspace_local_ranges(cf->schema()->ks_name());
}).then([this, &db, cf, &tables] (dht::token_range_vector owned_ranges) {
// doing a "cleanup" is about as compacting as we need
// to be, provided we get to decide the tables to process,
// and ignoring any existing operations.
// Note that we potentially could be doing multiple
// upgrades here in parallel, but that is really the users
// problem.
return rewrite_sstables(cf, sstables::compaction_type_options::make_upgrade(std::move(owned_ranges)), [&](auto&) mutable {
return std::exchange(tables, {});
});
});
});

return make_ready_future<std::vector<sstables::shared_sstable>>(tables);
};

// doing a "cleanup" is about as compacting as we need
// to be, provided we get to decide the tables to process,
// and ignoring any existing operations.
// Note that we potentially could be doing multiple
// upgrades here in parallel, but that is really the users
// problem.
return rewrite_sstables(cf, sstables::compaction_type_options::make_upgrade(db.get_keyspace_local_ranges(cf->schema()->ks_name())), std::move(get_sstables));
}

// Submit a column family to be scrubbed and wait for its termination.
future<> compaction_manager::perform_sstable_scrub(column_family* cf, sstables::compaction_type_options::scrub::mode scrub_mode) {
if (scrub_mode == sstables::compaction_type_options::scrub::mode::validate) {
return perform_sstable_scrub_validate_mode(cf);
}
// since we might potentially have ongoing compactions, and we
// must ensure that all sstables created before we run are scrubbed,
// we need to barrier out any previously running compaction.
return run_with_compaction_disabled(cf, [this, cf, scrub_mode] {
return rewrite_sstables(cf, sstables::compaction_type_options::make_scrub(scrub_mode), [this] (const table& cf) {
return get_candidates(cf);
// FIXME: indentation
return rewrite_sstables(cf, sstables::compaction_type_options::make_scrub(scrub_mode), [this, cf] {
return make_ready_future<std::vector<sstables::shared_sstable>>(get_candidates(*cf));
}, can_purge_tombstones::no);
});
}

void compaction_manager::add(column_family* cf) {
Expand Down
2 changes: 1 addition & 1 deletion compaction/compaction_manager.hh
Expand Up @@ -205,7 +205,7 @@ private:
maintenance_scheduling_group _maintenance_sg;
size_t _available_memory;

using get_candidates_func = std::function<std::vector<sstables::shared_sstable>(const column_family&)>;
using get_candidates_func = std::function<future<std::vector<sstables::shared_sstable>>()>;
class can_purge_tombstones_tag;
using can_purge_tombstones = bool_class<can_purge_tombstones_tag>;

Expand Down

0 comments on commit 80a1ebf

Please sign in to comment.