Skip to content

Commit

Permalink
Merge 'Make SSTable cleanup more efficient by fast forwarding to next…
Browse files Browse the repository at this point in the history
… owned range' from Raphael "Raph" Carvalho

Today, SSTable cleanup skips to the next partition, one at a time, when it finds that the current partition is no longer owned by this node.

That's very inefficient because when a cluster is growing in size, existing nodes lose multiple sequential tokens in its owned ranges. Another inefficiency comes from fetching index pages spanning all unowned tokens, which was described in #14317.

To solve both problems, cleanup will now use multi range reader, to guarantee that it will only process the owned data and as a result skip unowned data. This results in cleanup scanning an owned range and then fast forwarding to the next one, until it's done with them all. This reduces significantly the amount of data in the index caching, as index will only be invoked at each range boundary instead.

Without further ado,

before:

`INFO  2023-07-01 07:10:26,281 [shard 0] compaction - [Cleanup keyspace2.standard1 701af580-17f7-11ee-8b85-a479a1a77573] Cleaned 1 sstables to [./tmp/1/keyspace2/standard1-b490ee20179f11ee9134afb16b3e10fd/me-3g7a_0s8o_06uww24drzrroaodpv-big-Data.db:level=0]. 2GB to 1GB (~50% of original) in 26248ms = 81MB/s. ~9443072 total partitions merged to 4750028.`

after:

`INFO  2023-07-01 07:07:52,354 [shard 0] compaction - [Cleanup keyspace2.standard1 199dff90-17f7-11ee-b592-b4f5d81717b9] Cleaned 1 sstables to [./tmp/1/keyspace2/standard1-b490ee20179f11ee9134afb16b3e10fd/me-3g7a_0s4m_5hehd2rejj8w15d2nt-big-Data.db:level=0]. 2GB to 1GB (~50% of original) in 17424ms = 123MB/s. ~9443072 total partitions merged to 4750028.`

Fixes #12998.
Fixes #14317.

Closes #14469

* github.com:scylladb/scylladb:
  test: Extend cleanup correctness test to cover more cases
  compaction: Make SSTable cleanup more efficient by fast forwarding to next owned range
  sstables: Close SSTable reader if index exhaustion is detected in fast forward call
  sstables: Simplify sstable reader initialization
  compaction: Extend make_sstable_reader() interface to work with mutation_source
  test: Extend sstable partition skipping test to cover fast forward using token
  • Loading branch information
avikivity committed Jul 11, 2023
2 parents 9cdae78 + 60ba1d8 commit 1545ae2
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 57 deletions.
141 changes: 98 additions & 43 deletions compaction/compaction.cc
Expand Up @@ -50,7 +50,7 @@
#include "utils/utf8.hh"
#include "utils/fmt-compat.hh"
#include "utils/error_injection.hh"
#include "readers/filtering.hh"
#include "readers/multi_range.hh"
#include "readers/compacting.hh"
#include "tombstone_gc.hh"
#include "keys.hh"
Expand Down Expand Up @@ -610,16 +610,6 @@ class compaction {
bool enable_garbage_collected_sstable_writer() const noexcept {
return _contains_multi_fragment_runs && _max_sstable_size != std::numeric_limits<uint64_t>::max();
}

flat_mutation_reader_v2::filter make_partition_filter() const {
return [this] (const dht::decorated_key& dk) {
if (!_owned_ranges_checker->belongs_to_current_node(dk.token())) {
log_trace("Token {} does not belong to this node, skipping", dk.token());
return false;
}
return true;
};
}
public:
compaction& operator=(const compaction&) = delete;
compaction(const compaction&) = delete;
Expand All @@ -631,17 +621,55 @@ class compaction {
}
private:
// Default range sstable reader that will only return mutation that belongs to current shard.
virtual flat_mutation_reader_v2 make_sstable_reader() const = 0;
virtual flat_mutation_reader_v2 make_sstable_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding) const = 0;

// Make a filtering reader if needed
// FIXME: the sstable reader itself should be pass the owned ranges
// so it can skip over the disowned ranges efficiently using the index.
// Ref https://github.com/scylladb/scylladb/issues/12998
flat_mutation_reader_v2 setup_sstable_reader() const {
if (!_owned_ranges_checker) {
return make_sstable_reader();
return make_sstable_reader(_schema,
_permit,
query::full_partition_range,
_schema->full_slice(),
tracing::trace_state_ptr(),
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no);
}
return make_filtering_reader(make_sstable_reader(), make_partition_filter());

auto source = mutation_source([this] (schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
log_trace("Creating sstable set reader with range {}", range);
return make_sstable_reader(std::move(s),
std::move(permit),
range,
slice,
std::move(trace_state),
fwd,
fwd_mr);
});

auto owned_range_generator = [this] () -> std::optional<dht::partition_range> {
auto r = _owned_ranges_checker->next_owned_range();
if (r == nullptr) {
return std::nullopt;
}
log_trace("Skipping to the next owned range {}", *r);
return dht::to_partition_range(*r);
};

return make_flat_multi_range_reader(_schema, _permit, std::move(source),
std::move(owned_range_generator),
_schema->full_slice(),
tracing::trace_state_ptr());
}

virtual sstables::sstable_set make_sstable_set_for_input() const {
Expand Down Expand Up @@ -1019,14 +1047,20 @@ class reshape_compaction : public compaction {
return sstables::make_partitioned_sstable_set(_schema, false);
}

flat_mutation_reader_v2 make_sstable_reader() const override {
return _compacting->make_local_shard_sstable_reader(_schema,
_permit,
query::full_partition_range,
_schema->full_slice(),
tracing::trace_state_ptr(),
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no,
flat_mutation_reader_v2 make_sstable_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace,
streamed_mutation::forwarding sm_fwd,
mutation_reader::forwarding mr_fwd) const override {
return _compacting->make_local_shard_sstable_reader(std::move(s),
std::move(permit),
range,
slice,
std::move(trace),
sm_fwd,
mr_fwd,
default_read_monitor_generator());
}

Expand Down Expand Up @@ -1064,14 +1098,20 @@ class regular_compaction : public compaction {
{
}

flat_mutation_reader_v2 make_sstable_reader() const override {
return _compacting->make_local_shard_sstable_reader(_schema,
_permit,
query::full_partition_range,
_schema->full_slice(),
tracing::trace_state_ptr(),
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no,
flat_mutation_reader_v2 make_sstable_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace,
streamed_mutation::forwarding sm_fwd,
mutation_reader::forwarding mr_fwd) const override {
return _compacting->make_local_shard_sstable_reader(std::move(s),
std::move(permit),
range,
slice,
std::move(trace),
sm_fwd,
mr_fwd,
_monitor_generator);
}

Expand Down Expand Up @@ -1456,8 +1496,17 @@ class scrub_compaction final : public regular_compaction {
return _scrub_finish_description;
}

flat_mutation_reader_v2 make_sstable_reader() const override {
auto crawling_reader = _compacting->make_crawling_reader(_schema, _permit, nullptr);
flat_mutation_reader_v2 make_sstable_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace,
streamed_mutation::forwarding sm_fwd,
mutation_reader::forwarding mr_fwd) const override {
if (!range.is_full()) {
on_internal_error(clogger, fmt::format("Scrub compaction in mode {} expected full partition range, but got {} instead", _options.operation_mode, range));
}
auto crawling_reader = _compacting->make_crawling_reader(std::move(s), std::move(permit), nullptr);
return make_flat_mutation_reader_v2<reader>(std::move(crawling_reader), _options.operation_mode, _validation_errors);
}

Expand Down Expand Up @@ -1548,14 +1597,20 @@ class resharding_compaction final : public compaction {
~resharding_compaction() { }

// Use reader that makes sure no non-local mutation will not be filtered out.
flat_mutation_reader_v2 make_sstable_reader() const override {
return _compacting->make_range_sstable_reader(_schema,
_permit,
query::full_partition_range,
_schema->full_slice(),
flat_mutation_reader_v2 make_sstable_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace,
streamed_mutation::forwarding sm_fwd,
mutation_reader::forwarding mr_fwd) const override {
return _compacting->make_range_sstable_reader(std::move(s),
std::move(permit),
range,
slice,
nullptr,
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no);
sm_fwd,
mr_fwd);

}

Expand Down
7 changes: 7 additions & 0 deletions dht/partition_filter.hh
Expand Up @@ -39,6 +39,13 @@ public:
return _it != _sorted_owned_ranges.end() && _it->contains(t, dht::token_comparator());
}

const dht::token_range* next_owned_range() const noexcept {
if (_it == _sorted_owned_ranges.end()) {
return nullptr;
}
return &*_it++;
}

static flat_mutation_reader_v2::filter make_partition_filter(const dht::token_range_vector& sorted_owned_ranges);
};

Expand Down
1 change: 1 addition & 0 deletions readers/mutation_readers.cc
Expand Up @@ -532,6 +532,7 @@ class flat_multi_range_mutation_reader : public flat_mutation_reader_v2::impl {
return make_ready_future<>();
}
if (auto r = next()) {
mrlog.trace("flat_multi_range_mutation_reader {}: fast forwarding to range {}", fmt::ptr(this), *r);
return _reader.fast_forward_to(*r);
} else {
_end_of_stream = true;
Expand Down
45 changes: 32 additions & 13 deletions sstables/mx/reader.cc
Expand Up @@ -1511,10 +1511,25 @@ class mx_sstable_mutation_reader : public mp_row_consumer_reader_mx {
}
});
}
bool has_sstable_attached() const noexcept {
return bool(_sst);
}
bool is_initialized() const {
return bool(_context);
}
future<> initialize() {
// Returns true if reader is initialized, by either a previous or current request
future<bool> maybe_initialize() {
if (is_initialized()) {
co_return true;
}
// If the reader has no SSTable attached, the reader was proactively closed in the
// context of fast-forward calls. The higher level code has no way to know that
// underlying reader is really exhausted, so reader is responsible for releasing
// its resources beforehand. From there on, the reader has the same semantics
// as that of an empty reader.
if (!has_sstable_attached()) {
co_return false;
}
if (_single_partition_read) {
_sst->get_stats().on_single_partition_read();
const auto& key = dht::ring_position_view(_pr.start()->value());
Expand All @@ -1523,7 +1538,7 @@ class mx_sstable_mutation_reader : public mp_row_consumer_reader_mx {

if (!present) {
_sst->get_filter_tracker().add_false_positive();
co_return;
co_return false;
}

_sst->get_filter_tracker().add_true_positive();
Expand Down Expand Up @@ -1558,12 +1573,7 @@ class mx_sstable_mutation_reader : public mp_row_consumer_reader_mx {
_monitor.on_read_started(_context->reader_position());
_index_in_current_partition = true;
_will_likely_slice = will_likely_slice(_slice);
}
future<> ensure_initialized() {
if (is_initialized()) {
return make_ready_future<>();
}
return initialize();
co_return true;
}
future<> skip_to(indexable_element el, uint64_t begin) {
sstlog.trace("sstable_reader: {}: skip_to({} -> {}, el={})", fmt::ptr(_context.get()), _context->position(), begin, static_cast<int>(el));
Expand Down Expand Up @@ -1591,8 +1601,8 @@ class mx_sstable_mutation_reader : public mp_row_consumer_reader_mx {
on_internal_error(sstlog, "mx reader: fast_forward_to(partition_range) not supported for reversed queries");
}

return ensure_initialized().then([this, &pr] {
if (!is_initialized()) {
return maybe_initialize().then([this, &pr] (bool initialized) {
if (!initialized) {
_end_of_stream = true;
return make_ready_future<>();
} else {
Expand All @@ -1613,6 +1623,15 @@ class mx_sstable_mutation_reader : public mp_row_consumer_reader_mx {
}
_index_in_current_partition = false;
_read_enabled = false;
if (_index_reader->eof()) {
// Close the SSTable reader proactively, if the index is completely exhausted
// and no partition was found in the current fast-forward call. This allows
// disk space of SSTables to be reclaimed earlier if they take part in a
// long-living read and they're deleted midway.
sstlog.trace("Closing reader {} for {} after fast-forward call found that index reached EOF and there's nothing left to read",
fmt::ptr(this), _sst->get_filename());
return close();
}
return make_ready_future<>();
});
}
Expand All @@ -1623,8 +1642,8 @@ class mx_sstable_mutation_reader : public mp_row_consumer_reader_mx {
return make_ready_future<>();
}
if (!is_initialized()) {
return initialize().then([this] {
if (!is_initialized()) {
return maybe_initialize().then([this] (bool initialized) {
if (!initialized) {
_end_of_stream = true;
return make_ready_future<>();
} else {
Expand Down Expand Up @@ -1700,7 +1719,7 @@ class mx_sstable_mutation_reader : public mp_row_consumer_reader_mx {
close_index_reader = _index_reader->close().finally([_ = std::move(_index_reader)] {});
}

return when_all_succeed(std::move(close_context), std::move(close_index_reader)).discard_result().handle_exception([] (std::exception_ptr ep) {
return when_all_succeed(std::move(close_context), std::move(close_index_reader)).discard_result().handle_exception([sst = std::move(_sst)] (std::exception_ptr ep) {
// close can not fail as it is called either from the destructor or from flat_mutation_reader::close
sstlog.warn("Failed closing of sstable_mutation_reader: {}. Ignored since the reader is already done.", ep);
});
Expand Down
23 changes: 22 additions & 1 deletion test/boost/sstable_compaction_test.cc
Expand Up @@ -1966,6 +1966,8 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) {

auto total_partitions = 10000U;
auto local_keys = tests::generate_partition_keys(total_partitions, s);
dht::decorated_key::less_comparator cmp(s);
std::sort(local_keys.begin(), local_keys.end(), cmp);
std::vector<mutation> mutations;
for (auto i = 0U; i < total_partitions; i++) {
mutations.push_back(make_insert(local_keys.at(i)));
Expand All @@ -1978,14 +1980,33 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) {
cf->start();

auto local_ranges = compaction::make_owned_ranges_ptr(db.get_keyspace_local_ranges(ks_name));
auto descriptor = sstables::compaction_descriptor({std::move(sst)}, compaction_descriptor::default_level,
auto descriptor = sstables::compaction_descriptor({sst}, compaction_descriptor::default_level,
compaction_descriptor::default_max_sstable_bytes, run_identifier, compaction_type_options::make_cleanup(), std::move(local_ranges));
auto ret = compact_sstables(std::move(descriptor), cf, sst_gen).get0();

BOOST_REQUIRE(ret.new_sstables.size() == 1);
BOOST_REQUIRE(ret.new_sstables.front()->get_estimated_key_count() >= total_partitions);
BOOST_REQUIRE((ret.new_sstables.front()->get_estimated_key_count() - total_partitions) <= uint64_t(s->min_index_interval()));
BOOST_REQUIRE(ret.new_sstables.front()->run_identifier() == run_identifier);

dht::token_range_vector ranges;
ranges.push_back(dht::token_range::make_singular(local_keys.at(0).token()));
ranges.push_back(dht::token_range::make_singular(local_keys.at(10).token()));
ranges.push_back(dht::token_range::make_singular(local_keys.at(100).token()));
ranges.push_back(dht::token_range::make_singular(local_keys.at(900).token()));
local_ranges = compaction::make_owned_ranges_ptr(std::move(ranges));
descriptor = sstables::compaction_descriptor({sst}, compaction_descriptor::default_level,
compaction_descriptor::default_max_sstable_bytes, run_identifier,
compaction_type_options::make_cleanup(), std::move(local_ranges));
ret = compact_sstables(std::move(descriptor), cf, sst_gen).get0();
BOOST_REQUIRE(ret.new_sstables.size() == 1);
auto reader = ret.new_sstables[0]->as_mutation_source().make_reader_v2(s, env.make_reader_permit(), query::full_partition_range, s->full_slice());
assert_that(std::move(reader))
.produces(local_keys[0])
.produces(local_keys[10])
.produces(local_keys[100])
.produces(local_keys[900])
.produces_end_of_stream();
});
});
}
Expand Down
13 changes: 13 additions & 0 deletions test/boost/sstable_datafile_test.cc
Expand Up @@ -1643,6 +1643,19 @@ SEASTAR_TEST_CASE(test_partition_skipping) {
.produces_end_of_stream()
.fast_forward_to(dht::partition_range::make({ dht::ring_position(keys[8]), false }, { dht::ring_position(keys[9]), false }))
.produces_end_of_stream();

pr = dht::partition_range::make(dht::ring_position(keys[0]), dht::ring_position(keys[1]));
assert_that(sstable_reader_v2(sst, s, env.make_reader_permit(), pr))
.fast_forward_to(dht::partition_range::make(dht::ring_position::starting_at(keys[0].token()), dht::ring_position::ending_at(keys[1].token())))
.produces(keys[0])
.produces(keys[1])
.fast_forward_to(dht::partition_range::make(dht::ring_position::starting_at(keys[3].token()), dht::ring_position::ending_at(keys[4].token())))
.produces(keys[3])
.produces(keys[4])
.fast_forward_to(dht::partition_range::make_starting_with(dht::ring_position::starting_at(keys[8].token())))
.produces(keys[8])
.produces(keys[9])
.produces_end_of_stream();
}
});
}
Expand Down

0 comments on commit 1545ae2

Please sign in to comment.