diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index bc2522186768..3cbbad7ad77f 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -1590,6 +1590,10 @@ bool needs_cleanup(const sstables::shared_sstable& sst, bool compaction_manager::update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges) { auto& cs = get_compaction_state(&t); + if (sst->is_shared()) { + throw std::runtime_error(format("Shared SSTable {} cannot be marked as requiring cleanup, as it can only be processed by resharding", + sst->get_filename())); + } if (needs_cleanup(sst, sorted_owned_ranges)) { cs.sstables_requiring_cleanup.insert(sst); return true; diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index e969ca1fff3a..2055a3b3a77f 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -307,6 +307,8 @@ public: private: future<> try_perform_cleanup(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t); + // Add sst to or remove it from the respective compaction_state.sstables_requiring_cleanup set. + bool update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges); public: // Submit a table to be upgraded and wait for its termination. future<> perform_sstable_upgrade(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t, bool exclude_current_version); @@ -407,9 +409,6 @@ public: return _tombstone_gc_state; }; - // Add sst to or remove it from the respective compaction_state.sstables_requiring_cleanup set. - bool update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges); - // Uncoditionally erase sst from `sstables_requiring_cleanup` // Returns true iff sst was found and erased. bool erase_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst); diff --git a/replica/database.hh b/replica/database.hh index cda4f96119b6..c24dded72b05 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1136,9 +1136,6 @@ public: // Safely iterate through table states, while performing async operations on them. future<> parallel_foreach_table_state(std::function(compaction::table_state&)> action); - // Add sst to or remove it from the sstables_requiring_cleanup set. - bool update_sstable_cleanup_state(const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges); - // Uncoditionally erase sst from `sstables_requiring_cleanup` // Returns true iff sst was found and erased. bool erase_sstable_cleanup_state(const sstables::shared_sstable& sst); diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 1860b79f1b4f..40d333fe1427 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -155,9 +155,8 @@ collect_all_shared_sstables(sharded& dir, sharded future { - if (table.update_sstable_cleanup_state(sst, *sorted_owned_ranges_ptr)) { + if (needs_cleanup(sst, *sorted_owned_ranges_ptr)) { need_cleanup.push_back(co_await sst->get_open_info()); co_return false; } @@ -242,9 +241,6 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory:: buckets.emplace_back(); co_await coroutine::parallel_for_each(shared_info, [&] (sstables::foreign_sstable_open_info& info) -> future<> { auto sst = co_await dir.load_foreign_sstable(info); - if (owned_ranges_ptr) { - table.update_sstable_cleanup_state(sst, *owned_ranges_ptr); - } // Last bucket gets leftover SSTables if ((buckets.back().size() >= sstables_per_job) && (buckets.size() < num_jobs)) { buckets.emplace_back(); diff --git a/replica/table.cc b/replica/table.cc index 158aaf349ee9..959d4ca86ebe 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -2861,12 +2861,6 @@ table::as_data_dictionary() const { return _impl.wrap(*this); } -bool table::update_sstable_cleanup_state(const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges) { - // FIXME: it's possible that the sstable belongs to multiple compaction_groups - auto& cg = compaction_group_for_sstable(sst); - return get_compaction_manager().update_sstable_cleanup_state(cg.as_table_state(), sst, sorted_owned_ranges); -} - bool table::erase_sstable_cleanup_state(const sstables::shared_sstable& sst) { // FIXME: it's possible that the sstable belongs to multiple compaction_groups auto& cg = compaction_group_for_sstable(sst);