diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index a57bd77e0d94..9c038ed47778 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -795,7 +795,17 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa _tasks.push_back(task); cmlog.debug("{} task {} cf={}: started", options.type(), fmt::ptr(task.get()), fmt::ptr(task->compacting_cf)); - auto sstables = std::make_unique>(get_func(*cf)); + std::unique_ptr> sstables; + lw_shared_ptr compacting; + + // since we might potentially have ongoing compactions, and we + // must ensure that all sstables created before we run are included + // in the re-write, we need to barrier out any previously running + // compaction. + co_await run_with_compaction_disabled(cf, [&] () mutable -> future<> { + sstables = std::make_unique>(co_await get_func()); + compacting = make_lw_shared(this, *sstables); + }); // sort sstables by size in descending order, such that the smallest files will be rewritten first // (as sstable to be rewritten is popped off from the back of container), so rewrite will have higher // chance to succeed when the biggest files are reached. @@ -803,7 +813,6 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa return a->data_size() > b->data_size(); }); - auto compacting = make_lw_shared(this, *sstables); auto sstables_ptr = sstables.get(); _stats.pending_tasks += sstables->size(); @@ -863,7 +872,7 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa cmlog.debug("{} task {} cf={}: done", type, fmt::ptr(task.get()), fmt::ptr(task->compacting_cf)); }); - return task->compaction_done.get_future().then([task] {}); + co_return co_await task->compaction_done.get_future(); } future<> compaction_manager::perform_sstable_scrub_validate_mode(column_family* cf) { @@ -945,31 +954,30 @@ future<> compaction_manager::perform_cleanup(database& db, column_family* cf) { return make_exception_future<>(std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}", cf->schema()->ks_name(), cf->schema()->cf_name()))); } - return seastar::async([this, cf, &db] { + // FIXME: indentation + auto sorted_owned_ranges = db.get_keyspace_local_ranges(cf->schema()->ks_name()); + auto get_sstables = [this, &db, cf, sorted_owned_ranges] () -> future> { + return seastar::async([this, &db, cf, sorted_owned_ranges = std::move(sorted_owned_ranges)] { auto schema = cf->schema(); - auto sorted_owned_ranges = db.get_keyspace_local_ranges(schema->ks_name()); auto sstables = std::vector{}; const auto candidates = get_candidates(*cf); std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&sorted_owned_ranges, schema] (const sstables::shared_sstable& sst) { seastar::thread::maybe_yield(); return sorted_owned_ranges.empty() || needs_cleanup(sst, sorted_owned_ranges, schema); }); - return std::tuple>(sorted_owned_ranges, sstables); - }).then_unpack([this, cf, &db] (dht::token_range_vector owned_ranges, std::vector sstables) { - return rewrite_sstables(cf, sstables::compaction_type_options::make_cleanup(std::move(owned_ranges)), - [sstables = std::move(sstables)] (const table&) { return sstables; }); + return sstables; }); + }; + + return rewrite_sstables(cf, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)), std::move(get_sstables)); } // Submit a column family to be upgraded and wait for its termination. future<> compaction_manager::perform_sstable_upgrade(database& db, column_family* cf, bool exclude_current_version) { - using shared_sstables = std::vector; - return do_with(shared_sstables{}, [this, &db, cf, exclude_current_version](shared_sstables& tables) { - // since we might potentially have ongoing compactions, and we - // must ensure that all sstables created before we run are included - // in the re-write, we need to barrier out any previously running - // compaction. - return run_with_compaction_disabled(cf, [this, cf, &tables, exclude_current_version] { + auto get_sstables = [this, &db, cf, exclude_current_version] { + // FIXME: indentation + std::vector tables; + auto last_version = cf->get_sstables_manager().get_highest_supported_format(); for (auto& sst : get_candidates(*cf)) { @@ -980,21 +988,17 @@ future<> compaction_manager::perform_sstable_upgrade(database& db, column_family tables.emplace_back(sst); } } - return make_ready_future<>(); - }).then([&db, cf] { - return db.get_keyspace_local_ranges(cf->schema()->ks_name()); - }).then([this, &db, cf, &tables] (dht::token_range_vector owned_ranges) { - // doing a "cleanup" is about as compacting as we need - // to be, provided we get to decide the tables to process, - // and ignoring any existing operations. - // Note that we potentially could be doing multiple - // upgrades here in parallel, but that is really the users - // problem. - return rewrite_sstables(cf, sstables::compaction_type_options::make_upgrade(std::move(owned_ranges)), [&](auto&) mutable { - return std::exchange(tables, {}); - }); - }); - }); + + return make_ready_future>(tables); + }; + + // doing a "cleanup" is about as compacting as we need + // to be, provided we get to decide the tables to process, + // and ignoring any existing operations. + // Note that we potentially could be doing multiple + // upgrades here in parallel, but that is really the users + // problem. + return rewrite_sstables(cf, sstables::compaction_type_options::make_upgrade(db.get_keyspace_local_ranges(cf->schema()->ks_name())), std::move(get_sstables)); } // Submit a column family to be scrubbed and wait for its termination. @@ -1002,14 +1006,10 @@ future<> compaction_manager::perform_sstable_scrub(column_family* cf, sstables:: if (scrub_mode == sstables::compaction_type_options::scrub::mode::validate) { return perform_sstable_scrub_validate_mode(cf); } - // since we might potentially have ongoing compactions, and we - // must ensure that all sstables created before we run are scrubbed, - // we need to barrier out any previously running compaction. - return run_with_compaction_disabled(cf, [this, cf, scrub_mode] { - return rewrite_sstables(cf, sstables::compaction_type_options::make_scrub(scrub_mode), [this] (const table& cf) { - return get_candidates(cf); + // FIXME: indentation + return rewrite_sstables(cf, sstables::compaction_type_options::make_scrub(scrub_mode), [this, cf] { + return make_ready_future>(get_candidates(*cf)); }, can_purge_tombstones::no); - }); } void compaction_manager::add(column_family* cf) { diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index d90d06de962b..ec020a001b7b 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -205,7 +205,7 @@ private: maintenance_scheduling_group _maintenance_sg; size_t _available_memory; - using get_candidates_func = std::function(const column_family&)>; + using get_candidates_func = std::function>()>; class can_purge_tombstones_tag; using can_purge_tombstones = bool_class;