Skip to content

Commit

Permalink
compaction: Fix incremental compaction for sstable cleanup
Browse files Browse the repository at this point in the history
After c7826aa, sstable runs are cleaned up together.

The procedure which executes cleanup was holding reference to all
input sstables, such that it could later retry the same cleanup
job on failure.

Turns out it was not taking into account that incremental compaction
will exhaust the input set incrementally.

Therefore cleanup is affected by the 100% space overhead.

To fix it, cleanup will now have the input set updated, by removing
the sstables that were already cleaned up. On failure, cleanup
will retry the same job with the remaining sstables that weren't
exhausted by incremental compaction.

New unit test reproduces the failure, and passes with the fix.

Fixes scylladb#14035.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>

Closes scylladb#14038
  • Loading branch information
raphaelsc authored and denesb committed May 31, 2023
1 parent 66ccc14 commit 23443e0
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 7 deletions.
17 changes: 10 additions & 7 deletions compaction/compaction_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1524,23 +1524,26 @@ class cleanup_sstables_compaction_task_executor : public compaction_task_executo
co_return std::nullopt;
}
private:
// Releases reference to cleaned files such that respective used disk space can be freed.
void release_exhausted(std::vector<sstables::shared_sstable> exhausted_sstables) {
_compacting.release_compacting(exhausted_sstables);
}

future<> run_cleanup_job(sstables::compaction_descriptor descriptor) {
co_await coroutine::switch_to(_cm.compaction_sg().cpu);

// Releases reference to cleaned files such that respective used disk space can be freed.
auto release_exhausted = [this, &descriptor] (std::vector<sstables::shared_sstable> exhausted_sstables) mutable {
auto exhausted = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(exhausted_sstables);
std::erase_if(descriptor.sstables, [&] (const sstables::shared_sstable& sst) {
return exhausted.contains(sst);
});
_compacting.release_compacting(exhausted_sstables);
};

for (;;) {
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_cm._compaction_controller.backlog_of_shares(200), _cm.available_memory()));
_cm.register_backlog_tracker(user_initiated);

std::exception_ptr ex;
try {
setup_new_compaction(descriptor.run_identifier);
co_await compact_sstables_and_update_history(descriptor, _compaction_data,
std::bind(&cleanup_sstables_compaction_task_executor::release_exhausted, this, std::placeholders::_1));
co_await compact_sstables_and_update_history(descriptor, _compaction_data, release_exhausted);
finish_compaction();
_cm.reevaluate_postponed_compactions();
co_return; // done with current job
Expand Down
96 changes: 96 additions & 0 deletions test/boost/sstable_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5058,3 +5058,99 @@ SEASTAR_TEST_CASE(compaction_optimization_to_avoid_bloom_filter_checks) {
BOOST_REQUIRE_EQUAL(1, result.stats.bloom_filter_checks);
});
}

SEASTAR_TEST_CASE(cleanup_incremental_compaction_test) {
return test_env::do_with_async([] (test_env& env) {
auto builder = schema_builder("tests", "test")
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("value", int32_type);
builder.set_gc_grace_seconds(10000);
builder.set_compaction_strategy(sstables::compaction_strategy_type::leveled);
std::map<sstring, sstring> opts = {
{ "sstable_size_in_mb", "0" }, // makes sure that every mutation produces one fragment, to trigger incremental compaction
};
builder.set_compaction_strategy_options(std::move(opts));
auto s = builder.build();
auto sst_gen = env.make_sst_factory(s);

auto make_insert = [&] (partition_key key) {
mutation m(s, key);
m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), api::new_timestamp());
return m;
};

std::vector<utils::observer<sstable&>> observers;
std::vector<shared_sstable> ssts;
size_t sstables_closed = 0;
size_t sstables_closed_during_cleanup = 0;
static constexpr size_t sstables_nr = 10;

dht::token_range_vector owned_token_ranges;

std::set<mutation, mutation_decorated_key_less_comparator> merged;
for (auto i = 0; i < sstables_nr * 2; i++) {
merged.insert(make_insert(partition_key::from_exploded(*s, {to_bytes(to_sstring(i))})));
}

std::unordered_set<sstables::generation_type> gens; // input sstable generations
run_id run_identifier = run_id::create_random_id();
auto merged_it = merged.begin();
for (auto i = 0; i < sstables_nr; i++) {
auto mut1 = std::move(*merged_it);
merged_it++;
auto mut2 = std::move(*merged_it);
merged_it++;
auto sst = make_sstable_containing(sst_gen, {
std::move(mut1),
std::move(mut2)
});
sstables::test(sst).set_run_identifier(run_identifier); // in order to produce multi-fragment run.
sst->set_sstable_level(1);

// every sstable will be eligible for cleanup, by having both an owned and unowned token.
owned_token_ranges.push_back(dht::token_range::make_singular(sst->get_last_decorated_key().token()));

gens.insert(sst->generation());
ssts.push_back(std::move(sst));
}

size_t last_input_sstable_count = sstables_nr;
{
auto t = env.make_table_for_tests(s);
auto stop = deferred_stop(t);
t->disable_auto_compaction().get();
const dht::token_range_vector empty_owned_ranges;
for (auto&& sst : ssts) {
testlog.info("run id {}", sst->run_identifier());
column_family_test(t).add_sstable(sst).get();
column_family_test::update_sstables_known_generation(*t, sst->generation());
observers.push_back(sst->add_on_closed_handler([&] (sstable& sst) mutable {
auto sstables = t->get_sstables();
auto input_sstable_count = std::count_if(sstables->begin(), sstables->end(), [&] (const shared_sstable& sst) {
return gens.count(sst->generation());
});

testlog.info("Closing sstable of generation {}, table set size: {}", sst.generation(), input_sstable_count);
sstables_closed++;
if (input_sstable_count < last_input_sstable_count) {
sstables_closed_during_cleanup++;
last_input_sstable_count = input_sstable_count;
}
}));
}
ssts = {}; // releases references
auto owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(std::move(owned_token_ranges));
t->perform_cleanup_compaction(std::move(owned_ranges_ptr)).get();
testlog.info("Cleanup has finished");
}

while (sstables_closed != sstables_nr) {
yield().get();
}

testlog.info("Closed sstables {}, Closed during cleanup {}", sstables_closed, sstables_closed_during_cleanup);

BOOST_REQUIRE(sstables_closed == sstables_nr);
BOOST_REQUIRE(sstables_closed_during_cleanup >= sstables_nr / 2);
});
}

0 comments on commit 23443e0

Please sign in to comment.