compaction: Fix sstable cleanup after resharding on refresh
The problem can be reproduced easily:
1) wrote some sstables with smp 1
2) shut down scylla
3) moved sstables to upload
4) restarted scylla with smp 2
5) ran refresh (resharding happens and adds the sstable to the
cleanup set, but never removes it)
6) ran cleanup (it tries to clean up resharded sstables that were
leaked in the cleanup set)

This bumps into the assertion "Assertion `!sst->is_shared()' failed",
because cleanup picks up a shared sstable that leaked into the set
even though resharding had already processed it.

The fix is to not insert shared sstables into the cleanup set:
shared sstables are handled exclusively by resharding and cannot be
processed later by cleanup (nor should they be, since resharding
itself already cleans up its input files).
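
For context, here is a minimal, self-contained sketch of the invariant the
fix enforces. The types and the local set below are simplified stand-ins
(not Scylla's actual API): shared sstables belong to resharding only, so
they must never enter the cleanup set, and the cleanup-time assertion then
holds.

#include <cassert>
#include <initializer_list>
#include <unordered_set>

// Simplified stand-in for sstables::sstable; only is_shared() matters here.
struct fake_sstable {
    bool shared;
    bool is_shared() const { return shared; }
};

int main() {
    std::unordered_set<fake_sstable*> sstables_requiring_cleanup;

    fake_sstable resharding_input{true};   // spans more than one shard
    fake_sstable regular{false};

    // The invariant: only unshared sstables may be registered for cleanup;
    // shared ones are left to resharding, which deletes them on its own.
    for (fake_sstable* sst : {&resharding_input, &regular}) {
        if (!sst->is_shared()) {
            sstables_requiring_cleanup.insert(sst);
        }
    }

    // Cleanup can now safely assume it never sees a shared sstable,
    // which is exactly the assertion that used to fire before the fix.
    for (fake_sstable* sst : sstables_requiring_cleanup) {
        assert(!sst->is_shared());
    }
}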

Dtest: scylladb/scylla-dtest#3206

Fixes #14001.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>

Closes #14147
raphaelsc authored and xemul committed Jun 6, 2023
1 parent 1779575 commit 156d771
Showing 5 changed files with 7 additions and 17 deletions.
compaction/compaction_manager.cc: 4 additions & 0 deletions

@@ -1590,6 +1590,10 @@ bool needs_cleanup(const sstables::shared_sstable& sst,
 
 bool compaction_manager::update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges) {
     auto& cs = get_compaction_state(&t);
+    if (sst->is_shared()) {
+        throw std::runtime_error(format("Shared SSTable {} cannot be marked as requiring cleanup, as it can only be processed by resharding",
+            sst->get_filename()));
+    }
     if (needs_cleanup(sst, sorted_owned_ranges)) {
         cs.sstables_requiring_cleanup.insert(sst);
         return true;
compaction/compaction_manager.hh: 2 additions & 3 deletions

@@ -307,6 +307,8 @@ public:
 private:
     future<> try_perform_cleanup(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t);
 
+    // Add sst to or remove it from the respective compaction_state.sstables_requiring_cleanup set.
+    bool update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges);
 public:
     // Submit a table to be upgraded and wait for its termination.
     future<> perform_sstable_upgrade(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t, bool exclude_current_version);
@@ -407,9 +409,6 @@ public:
         return _tombstone_gc_state;
     };
 
-    // Add sst to or remove it from the respective compaction_state.sstables_requiring_cleanup set.
-    bool update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges);
-
     // Uncoditionally erase sst from `sstables_requiring_cleanup`
     // Returns true iff sst was found and erased.
     bool erase_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst);
replica/database.hh: 0 additions & 3 deletions

@@ -1136,9 +1136,6 @@ public:
     // Safely iterate through table states, while performing async operations on them.
     future<> parallel_foreach_table_state(std::function<future<>(compaction::table_state&)> action);
 
-    // Add sst to or remove it from the sstables_requiring_cleanup set.
-    bool update_sstable_cleanup_state(const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges);
-
     // Uncoditionally erase sst from `sstables_requiring_cleanup`
     // Returns true iff sst was found and erased.
     bool erase_sstable_cleanup_state(const sstables::shared_sstable& sst);
replica/distributed_loader.cc: 1 addition & 5 deletions

@@ -155,9 +155,8 @@ collect_all_shared_sstables(sharded<sstables::sstable_directory>& dir, sharded<r
     auto shared_sstables = d.retrieve_shared_sstables();
     sstables::sstable_directory::sstable_open_info_vector need_cleanup;
     if (sorted_owned_ranges_ptr) {
-        auto& table = db.local().find_column_family(ks_name, table_name);
         co_await d.filter_sstables([&] (sstables::shared_sstable sst) -> future<bool> {
-            if (table.update_sstable_cleanup_state(sst, *sorted_owned_ranges_ptr)) {
+            if (needs_cleanup(sst, *sorted_owned_ranges_ptr)) {
                 need_cleanup.push_back(co_await sst->get_open_info());
                 co_return false;
             }
@@ -242,9 +241,6 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::
     buckets.emplace_back();
     co_await coroutine::parallel_for_each(shared_info, [&] (sstables::foreign_sstable_open_info& info) -> future<> {
         auto sst = co_await dir.load_foreign_sstable(info);
-        if (owned_ranges_ptr) {
-            table.update_sstable_cleanup_state(sst, *owned_ranges_ptr);
-        }
         // Last bucket gets leftover SSTables
         if ((buckets.back().size() >= sstables_per_job) && (buckets.size() < num_jobs)) {
             buckets.emplace_back();
replica/table.cc: 0 additions & 6 deletions

@@ -2861,12 +2861,6 @@ table::as_data_dictionary() const {
     return _impl.wrap(*this);
 }
 
-bool table::update_sstable_cleanup_state(const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges) {
-    // FIXME: it's possible that the sstable belongs to multiple compaction_groups
-    auto& cg = compaction_group_for_sstable(sst);
-    return get_compaction_manager().update_sstable_cleanup_state(cg.as_table_state(), sst, sorted_owned_ranges);
-}
-
 bool table::erase_sstable_cleanup_state(const sstables::shared_sstable& sst) {
     // FIXME: it's possible that the sstable belongs to multiple compaction_groups
     auto& cg = compaction_group_for_sstable(sst);
