Skip to content

Commit

Permalink
compaction: Fix incremental compaction for sstable cleanup
Browse files Browse the repository at this point in the history
After c7826aa, sstable runs are cleaned up together.

The procedure that executes cleanup was holding references to all
input sstables, so that it could later retry the same cleanup
job on failure.

Turns out it was not taking into account that incremental compaction
will exhaust the input set incrementally.

As a result, cleanup kept every input sstable referenced until the whole job finished, and was therefore affected by the 100% space overhead.

To fix it, cleanup will now have the input set updated, by removing
the sstables that were already cleaned up. On failure, cleanup
will retry the same job with the remaining sstables that weren't
exhausted by incremental compaction.

New unit test reproduces the failure, and passes with the fix.

Fixes #14035.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>

Closes #14038

(cherry picked from commit 23443e0)
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>

Closes #14195
  • Loading branch information
raphaelsc authored and denesb committed Jun 13, 2023
1 parent c127899 commit 97985a6
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 7 deletions.
17 changes: 10 additions & 7 deletions compaction/compaction_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1431,23 +1431,26 @@ class compaction_manager::cleanup_sstables_compaction_task : public compaction_m
co_return std::nullopt;
}
private:
// Releases this task's reference to input sstables that were already cleaned up,
// such that the disk space used by those files can be freed.
//
// exhausted_sstables: the sstables to release. The list is only forwarded to
// _compacting.release_compacting(); nothing else (e.g. a compaction descriptor)
// is updated here.
void release_exhausted(std::vector<sstables::shared_sstable> exhausted_sstables) {
_compacting.release_compacting(exhausted_sstables);
}

future<> run_cleanup_job(sstables::compaction_descriptor descriptor) {
co_await coroutine::switch_to(_cm.compaction_sg().cpu);

// Releases reference to cleaned files such that respective used disk space can be freed.
auto release_exhausted = [this, &descriptor] (std::vector<sstables::shared_sstable> exhausted_sstables) mutable {
auto exhausted = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(exhausted_sstables);
std::erase_if(descriptor.sstables, [&] (const sstables::shared_sstable& sst) {
return exhausted.contains(sst);
});
_compacting.release_compacting(exhausted_sstables);
};

for (;;) {
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_cm._compaction_controller.backlog_of_shares(200), _cm.available_memory()));
_cm.register_backlog_tracker(user_initiated);

std::exception_ptr ex;
try {
setup_new_compaction(descriptor.run_identifier);
co_await compact_sstables_and_update_history(descriptor, _compaction_data,
std::bind(&cleanup_sstables_compaction_task::release_exhausted, this, std::placeholders::_1));
co_await compact_sstables_and_update_history(descriptor, _compaction_data, release_exhausted);
finish_compaction();
_cm.reevaluate_postponed_compactions();
co_return; // done with current job
Expand Down
99 changes: 99 additions & 0 deletions test/boost/sstable_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5100,3 +5100,102 @@ SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) {
run_cleanup_strategy_test(sstables::compaction_strategy_type::leveled, 64, empty_opts, 0ms, 1);
});
}

// Checks that cleanup releases its input sstables incrementally instead of
// holding on to all of them until the whole job has finished.
//
// The test builds `sstables_nr` sstables that share one run identifier, each
// holding two partitions of which only the last one is owned, so that every
// sstable needs cleanup. An on-closed handler is attached to every input
// sstable; it counts how many closes happened while the number of input
// sstables still present in the table's sstable set was shrinking. At least
// half of the inputs are required to be closed that way.
SEASTAR_TEST_CASE(cleanup_incremental_compaction_test) {
    return test_env::do_with_async([] (test_env& env) {
        auto builder = schema_builder("tests", "test")
                .with_column("id", utf8_type, column_kind::partition_key)
                .with_column("value", int32_type);
        builder.set_gc_grace_seconds(10000);
        builder.set_compaction_strategy(sstables::compaction_strategy_type::leveled);
        std::map<sstring, sstring> opts = {
            { "sstable_size_in_mb", "0" }, // makes sure that every mutation produces one fragment, to trigger incremental compaction
        };
        builder.set_compaction_strategy_options(std::move(opts));
        auto s = builder.build();
        auto tmp = tmpdir();
        auto sst_gen = [&env, s, &tmp, gen = make_lw_shared<unsigned>(1)] () {
            return env.make_sstable(s, tmp.path().string(), (*gen)++, sstables::get_highest_sstable_version(), big);
        };

        // Builds a single-cell insert for the given partition key.
        auto make_insert = [&] (partition_key key) {
            mutation m(s, key);
            m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), api::new_timestamp());
            return m;
        };

        std::vector<utils::observer<sstable&>> observers;
        std::vector<shared_sstable> ssts;
        size_t sstables_closed = 0;
        size_t sstables_closed_during_cleanup = 0;
        static constexpr size_t sstables_nr = 10;

        dht::token_range_vector owned_token_ranges;

        // Two partitions per sstable, kept in decorated-key order.
        std::set<mutation, mutation_decorated_key_less_comparator> merged;
        for (size_t i = 0; i < sstables_nr * 2; ++i) {
            merged.insert(make_insert(partition_key::from_exploded(*s, {to_bytes(to_sstring(i))})));
        }

        std::unordered_set<sstables::generation_type> gens; // input sstable generations
        utils::UUID run_identifier = utils::make_random_uuid();
        auto merged_it = merged.begin();
        for (size_t i = 0; i < sstables_nr; ++i) {
            // Elements of a std::set are const, so they can only be copied out.
            auto mut1 = *merged_it;
            ++merged_it;
            auto mut2 = *merged_it;
            ++merged_it;
            auto sst = make_sstable_containing(sst_gen, {
                std::move(mut1),
                std::move(mut2)
            });
            sstables::test(sst).set_run_identifier(run_identifier); // in order to produce multi-fragment run.
            sst->set_sstable_level(1);

            // every sstable will be eligible for cleanup, by having both an owned and unowned token.
            owned_token_ranges.push_back(dht::token_range::make_singular(sst->get_last_decorated_key().token()));

            gens.insert(sst->generation());
            ssts.push_back(std::move(sst));
        }

        size_t last_input_sstable_count = sstables_nr;
        {
            column_family_for_tests t(env.manager(), s, tmp.path().string());
            auto stop = deferred_stop(t);
            t->disable_auto_compaction().get();
            for (auto&& sst : ssts) {
                testlog.info("run id {}", sst->run_identifier());
                column_family_test(t).add_sstable(sst);
                column_family_test::update_sstables_known_generation(*t, sst->generation().value());
                // NOTE(review): the handler captures `t` by reference, while the wait loop
                // below runs after `t` has left scope. If a handler can still fire at that
                // point it would touch a destroyed object - confirm that all input sstables
                // are closed before this scope exits.
                observers.push_back(sst->add_on_closed_handler([&] (sstable& closed_sst) mutable {
                    auto sstables = t->get_sstables();
                    // Number of original input sstables still present in the table's set.
                    const size_t input_sstable_count = std::count_if(sstables->begin(), sstables->end(), [&] (const shared_sstable& candidate) {
                        return gens.count(candidate->generation()) > 0;
                    });

                    testlog.info("Closing sstable of generation {}, table set size: {}", closed_sst.generation(), input_sstable_count);
                    sstables_closed++;
                    // A close observed while the input set has shrunk since the last
                    // observation is counted as having happened during cleanup.
                    if (input_sstable_count < last_input_sstable_count) {
                        sstables_closed_during_cleanup++;
                        last_input_sstable_count = input_sstable_count;
                    }
                }));
            }
            ssts = {}; // releases references
            auto owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(std::move(owned_token_ranges));
            t->perform_cleanup_compaction(std::move(owned_ranges_ptr)).get();
            testlog.info("Cleanup has finished");
        }

        // Wait until the on-closed handler has run for every input sstable.
        while (sstables_closed != sstables_nr) {
            yield().get();
        }

        testlog.info("Closed sstables {}, Closed during cleanup {}", sstables_closed, sstables_closed_during_cleanup);

        BOOST_REQUIRE(sstables_closed == sstables_nr);
        BOOST_REQUIRE(sstables_closed_during_cleanup >= sstables_nr / 2);
    });
}

0 comments on commit 97985a6

Please sign in to comment.