From 80a1ebf0f37642d3a64c8098308339d16f43730a Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 29 Nov 2021 10:31:07 -0300 Subject: [PATCH] compaction_manager: Fix race when selecting sstables for rewrite operations Rewrite operations are scrub, cleanup and upgrade. Race can happen because 'selection of sstables' and 'mark sstables as compacting' are decoupled. So any deferring point in between can lead to a parallel compaction picking the same files. After commit 2cf0c4bbf, files are marked as compacting before rewrite starts, but it didn't take into account the commit c84217ad which moved retrieval of candidates to a deferring thread, before rewrite_sstables() is even called. Scrub isn't affected by this because it uses a coarse grained approach where whole operation is run with compaction disabled, which isn't good because regular compaction cannot run until its completion. From now on, selection of files and marking them as compacting will be serialized by running them with compaction disabled. Now cleanup will also retrieve sstables with compaction disabled, meaning it will no longer leave uncleaned files behind, which is important to avoid data resurrection if node regains ownership of data in uncleaned files. Fixes #8168. Refs #8155. Signed-off-by: Raphael S. Carvalho Message-Id: <20211129133107.53011-1-raphaelsc@scylladb.com> --- compaction/compaction_manager.cc | 76 ++++++++++++++++---------------- compaction/compaction_manager.hh | 2 +- 2 files changed, 39 insertions(+), 39 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index a57bd77e0d94..9c038ed47778 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -795,7 +795,17 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa _tasks.push_back(task); cmlog.debug("{} task {} cf={}: started", options.type(), fmt::ptr(task.get()), fmt::ptr(task->compacting_cf)); - auto sstables = std::make_unique>(get_func(*cf)); + std::unique_ptr> sstables; + lw_shared_ptr compacting; + + // since we might potentially have ongoing compactions, and we + // must ensure that all sstables created before we run are included + // in the re-write, we need to barrier out any previously running + // compaction. + co_await run_with_compaction_disabled(cf, [&] () mutable -> future<> { + sstables = std::make_unique>(co_await get_func()); + compacting = make_lw_shared(this, *sstables); + }); // sort sstables by size in descending order, such that the smallest files will be rewritten first // (as sstable to be rewritten is popped off from the back of container), so rewrite will have higher // chance to succeed when the biggest files are reached. @@ -803,7 +813,6 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa return a->data_size() > b->data_size(); }); - auto compacting = make_lw_shared(this, *sstables); auto sstables_ptr = sstables.get(); _stats.pending_tasks += sstables->size(); @@ -863,7 +872,7 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa cmlog.debug("{} task {} cf={}: done", type, fmt::ptr(task.get()), fmt::ptr(task->compacting_cf)); }); - return task->compaction_done.get_future().then([task] {}); + co_return co_await task->compaction_done.get_future(); } future<> compaction_manager::perform_sstable_scrub_validate_mode(column_family* cf) { @@ -945,31 +954,30 @@ future<> compaction_manager::perform_cleanup(database& db, column_family* cf) { return make_exception_future<>(std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}", cf->schema()->ks_name(), cf->schema()->cf_name()))); } - return seastar::async([this, cf, &db] { + // FIXME: indentation + auto sorted_owned_ranges = db.get_keyspace_local_ranges(cf->schema()->ks_name()); + auto get_sstables = [this, &db, cf, sorted_owned_ranges] () -> future> { + return seastar::async([this, &db, cf, sorted_owned_ranges = std::move(sorted_owned_ranges)] { auto schema = cf->schema(); - auto sorted_owned_ranges = db.get_keyspace_local_ranges(schema->ks_name()); auto sstables = std::vector{}; const auto candidates = get_candidates(*cf); std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&sorted_owned_ranges, schema] (const sstables::shared_sstable& sst) { seastar::thread::maybe_yield(); return sorted_owned_ranges.empty() || needs_cleanup(sst, sorted_owned_ranges, schema); }); - return std::tuple>(sorted_owned_ranges, sstables); - }).then_unpack([this, cf, &db] (dht::token_range_vector owned_ranges, std::vector sstables) { - return rewrite_sstables(cf, sstables::compaction_type_options::make_cleanup(std::move(owned_ranges)), - [sstables = std::move(sstables)] (const table&) { return sstables; }); + return sstables; }); + }; + + return rewrite_sstables(cf, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)), std::move(get_sstables)); } // Submit a column family to be upgraded and wait for its termination. future<> compaction_manager::perform_sstable_upgrade(database& db, column_family* cf, bool exclude_current_version) { - using shared_sstables = std::vector; - return do_with(shared_sstables{}, [this, &db, cf, exclude_current_version](shared_sstables& tables) { - // since we might potentially have ongoing compactions, and we - // must ensure that all sstables created before we run are included - // in the re-write, we need to barrier out any previously running - // compaction. - return run_with_compaction_disabled(cf, [this, cf, &tables, exclude_current_version] { + auto get_sstables = [this, &db, cf, exclude_current_version] { + // FIXME: indentation + std::vector tables; + auto last_version = cf->get_sstables_manager().get_highest_supported_format(); for (auto& sst : get_candidates(*cf)) { @@ -980,21 +988,17 @@ future<> compaction_manager::perform_sstable_upgrade(database& db, column_family tables.emplace_back(sst); } } - return make_ready_future<>(); - }).then([&db, cf] { - return db.get_keyspace_local_ranges(cf->schema()->ks_name()); - }).then([this, &db, cf, &tables] (dht::token_range_vector owned_ranges) { - // doing a "cleanup" is about as compacting as we need - // to be, provided we get to decide the tables to process, - // and ignoring any existing operations. - // Note that we potentially could be doing multiple - // upgrades here in parallel, but that is really the users - // problem. - return rewrite_sstables(cf, sstables::compaction_type_options::make_upgrade(std::move(owned_ranges)), [&](auto&) mutable { - return std::exchange(tables, {}); - }); - }); - }); + + return make_ready_future>(tables); + }; + + // doing a "cleanup" is about as compacting as we need + // to be, provided we get to decide the tables to process, + // and ignoring any existing operations. + // Note that we potentially could be doing multiple + // upgrades here in parallel, but that is really the users + // problem. + return rewrite_sstables(cf, sstables::compaction_type_options::make_upgrade(db.get_keyspace_local_ranges(cf->schema()->ks_name())), std::move(get_sstables)); } // Submit a column family to be scrubbed and wait for its termination. @@ -1002,14 +1006,10 @@ future<> compaction_manager::perform_sstable_scrub(column_family* cf, sstables:: if (scrub_mode == sstables::compaction_type_options::scrub::mode::validate) { return perform_sstable_scrub_validate_mode(cf); } - // since we might potentially have ongoing compactions, and we - // must ensure that all sstables created before we run are scrubbed, - // we need to barrier out any previously running compaction. - return run_with_compaction_disabled(cf, [this, cf, scrub_mode] { - return rewrite_sstables(cf, sstables::compaction_type_options::make_scrub(scrub_mode), [this] (const table& cf) { - return get_candidates(cf); + // FIXME: indentation + return rewrite_sstables(cf, sstables::compaction_type_options::make_scrub(scrub_mode), [this, cf] { + return make_ready_future>(get_candidates(*cf)); }, can_purge_tombstones::no); - }); } void compaction_manager::add(column_family* cf) { diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index d90d06de962b..ec020a001b7b 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -205,7 +205,7 @@ private: maintenance_scheduling_group _maintenance_sg; size_t _available_memory; - using get_candidates_func = std::function(const column_family&)>; + using get_candidates_func = std::function>()>; class can_purge_tombstones_tag; using can_purge_tombstones = bool_class;