Commit
Merge 'sstables: mx: enable position fast-forwarding in reverse mode' from Kamil Braun

Most of the machinery was already implemented since it was used when
jumping between clustering ranges of a query slice. We need only perform
one additional thing when performing an index skip during
fast-forwarding: reset the stored range tombstone in the consumer (which
may only be stored in fast-forwarding mode, so it didn't matter that it
wasn't reset earlier). Comments were added to explain the details.
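
The reset described above can be illustrated with a toy model. This is only a sketch under stated assumptions: `toy_tombstone`, `toy_consumer`, `store_for_later` and `on_index_skip` are hypothetical stand-ins, not the Scylla classes, and only the names `reset_after_reversed_read_skip` and `set_range_tombstone` are borrowed from the patch.

```cpp
#include <cassert>
#include <optional>

// Hypothetical stand-in for a range tombstone: a position and a timestamp.
struct toy_tombstone {
    int position;
    long timestamp;
};

// Hypothetical, heavily simplified consumer; not the real mp_row_consumer_m.
class toy_consumer {
    // Tombstone seen while the parser ran ahead of the forwarded-to range.
    std::optional<toy_tombstone> _stored_tombstone;
    // Tombstone currently in effect at the read position.
    std::optional<toy_tombstone> _active_tombstone;
public:
    void store_for_later(toy_tombstone t) { _stored_tombstone = t; }

    // After an index skip the stored tombstone may be stale, so drop it.
    void reset_after_reversed_read_skip() { _stored_tombstone.reset(); }

    void set_range_tombstone(std::optional<toy_tombstone> t) { _active_tombstone = t; }

    bool has_stored_tombstone() const { return _stored_tombstone.has_value(); }
    const std::optional<toy_tombstone>& active_tombstone() const { return _active_tombstone; }
};

// Models a successful index skip: forget the stored tombstone, then take
// the active tombstone (if any) from the index. Passing the index result
// unconditionally is an assumption made for this sketch.
void on_index_skip(toy_consumer& c, std::optional<toy_tombstone> from_index) {
    c.reset_after_reversed_read_skip();
    c.set_range_tombstone(from_index);
}
```

In this model, a consumer that stored a tombstone before the skip no longer holds it afterwards, and its active tombstone is whatever the index supplied.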

As a preparation for the change, we extend the sstable reversing reader
random schema test with a fast-forwarding test and include some minor
fixes.

Fixes #9427.

Closes #9484

* github.com:scylladb/scylla:
  query-request: add comment about clustering ranges with non-full prefix key bounds
  sstables: mx: enable position fast-forwarding in reverse mode
  test: sstable_conforms_to_mutation_source_test: extend `test_sstable_reversing_reader_random_schema` with fast-forwarding
  test: sstable_conforms_to_mutation_source_test: fix `vector::erase` call
  test: mutation_source_test: extract `forwardable_reader_to_mutation` function
  test: random_schema: fix clustering column printing in `random_schema::cql`
tgrabiec committed Nov 29, 2021
2 parents 80a1ebf + b2b242d commit 3226c5b
Showing 9 changed files with 279 additions and 149 deletions.
6 changes: 6 additions & 0 deletions position_in_partition.hh
@@ -659,3 +659,9 @@ inline
bool position_range::is_all_clustered_rows(const schema& s) const {
return _start.is_before_all_clustered_rows(s) && _end.is_after_all_clustered_rows(s);
}

// Assumes that the bounds of `r` are of 'clustered' type
// and that `r` is non-empty (the left bound is smaller than the right bound).
//
// If `r` does not contain any keys, returns nullopt.
std::optional<query::clustering_range> position_range_to_clustering_range(const position_range& r, const schema&);
6 changes: 6 additions & 0 deletions query-request.hh
@@ -43,6 +43,12 @@ template <typename T>
using range = wrapping_range<T>;

using ring_position = dht::ring_position;

// Note: the bounds of a clustering range don't necessarily satisfy `rb.end()->value() >= lb.end()->value()`,
// where `lb`, `rb` are the left and right bound respectively, if the bounds use non-full clustering
// key prefixes. Inclusiveness of the range's bounds must be taken into account during comparisons.
// For example, consider clustering key type consisting of two ints. Then [0:1, 0:] is a valid non-empty range
// (e.g. it includes the key 0:2) even though 0: < 0:1 w.r.t the clustering prefix order.
using clustering_range = nonwrapping_range<clustering_key_prefix>;

// If `range` was supposed to be used with a comparator `cmp`, then
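
The note on non-full prefix bounds in query-request.hh can be checked with a small model. This is a sketch, not the Scylla comparator: clustering prefixes are modelled as `std::vector<int>`, and `toy_bound`, `compare_to_prefix` and `contains` are hypothetical helpers. It assumes that an inclusive bound on a prefix covers every full key starting with that prefix, and that the key being tested is at least as long as each bound's prefix.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// A clustering key prefix modelled as a vector of ints (assumption).
using prefix = std::vector<int>;

// Hypothetical bound: a prefix plus an inclusiveness flag.
struct toy_bound {
    prefix value;
    bool inclusive;
};

// Three-way comparison of a full key against a prefix, looking only at the
// first p.size() components. Requires full_key.size() >= p.size().
int compare_to_prefix(const prefix& full_key, const prefix& p) {
    for (std::size_t i = 0; i < p.size(); ++i) {
        if (full_key[i] != p[i]) {
            return full_key[i] < p[i] ? -1 : 1;
        }
    }
    return 0; // full_key starts with p
}

// True if full_key lies within the range delimited by lb and rb.
// An inclusive bound admits every key that starts with the bound's prefix;
// an exclusive bound rejects all of them.
bool contains(const toy_bound& lb, const toy_bound& rb, const prefix& full_key) {
    const int l = compare_to_prefix(full_key, lb.value);
    const int r = compare_to_prefix(full_key, rb.value);
    const bool after_left = lb.inclusive ? l >= 0 : l > 0;
    const bool before_right = rb.inclusive ? r <= 0 : r < 0;
    return after_left && before_right;
}
```

With these definitions, the range [0:1, 0:] contains the key 0:2 but not 0:0 or 1:0, even though the right bound's prefix `{0}` compares less than the left bound's prefix `{0, 1}` under plain lexicographic order.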
49 changes: 49 additions & 0 deletions query.cc
@@ -380,3 +380,52 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
}

}

std::optional<query::clustering_range> position_range_to_clustering_range(const position_range& r, const schema& s) {
assert(r.start().get_type() == partition_region::clustered);
assert(r.end().get_type() == partition_region::clustered);

if (r.start().has_key() && r.end().has_key()
&& clustering_key_prefix::equality(s)(r.start().key(), r.end().key())) {
assert(r.start().get_bound_weight() != r.end().get_bound_weight());

if (r.end().get_bound_weight() == bound_weight::after_all_prefixed
&& r.start().get_bound_weight() != bound_weight::after_all_prefixed) {
// [before x, after x) and [for x, after x) get converted to [x, x].
return query::clustering_range::make_singular(r.start().key());
}

// [before x, for x) does not contain any keys.
return std::nullopt;
}

// position_range -> clustering_range
// (recall that position_ranges are always left-closed, right opened):
// [before x, ...), [for x, ...) -> [x, ...
// [after x, ...) -> (x, ...
// [..., before x), [..., for x) -> ..., x)
// [..., after x) -> ..., x]

auto to_bound = [&s] (const position_in_partition& p, bool left) -> std::optional<query::clustering_range::bound> {
if (p.is_before_all_clustered_rows(s)) {
assert(left);
return {};
}

if (p.is_after_all_clustered_rows(s)) {
assert(!left);
return {};
}

assert(p.has_key());

auto bw = p.get_bound_weight();
bool inclusive = left
? bw != bound_weight::after_all_prefixed
: bw == bound_weight::after_all_prefixed;

return query::clustering_range::bound{p.key(), inclusive};
};

return query::clustering_range{to_bound(r.start(), true), to_bound(r.end(), false)};
}
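
The conversion table in the comments of `position_range_to_clustering_range` can be exercised on a reduced model. This is a sketch under simplifying assumptions: keys are plain ints, the `toy_*` types and `to_toy_range` are hypothetical, and the unbounded cases (before or after all clustered rows) are left out.

```cpp
#include <cassert>
#include <optional>

// Hypothetical stand-in for bound_weight: before, for (equal) or after a key.
enum class toy_weight { before, equal, after };

struct toy_position {
    int key;
    toy_weight weight;
};

struct toy_bound {
    int key;
    bool inclusive;
};

struct toy_range {
    toy_bound start;
    toy_bound end;
};

// Converts the left-closed, right-open position range [start, end) into a
// key range. Assumes start is strictly before end. Returns nullopt if the
// position range contains no keys.
std::optional<toy_range> to_toy_range(toy_position start, toy_position end) {
    if (start.key == end.key) {
        if (end.weight == toy_weight::after && start.weight != toy_weight::after) {
            // [before x, after x) and [for x, after x) become [x, x].
            return toy_range{{start.key, true}, {start.key, true}};
        }
        // [before x, for x) contains no keys.
        return std::nullopt;
    }
    // [before x, ...), [for x, ...) -> [x, ...   and   [after x, ...) -> (x, ...
    const bool left_inclusive = start.weight != toy_weight::after;
    // [..., before x), [..., for x) -> ..., x)   and   [..., after x) -> ..., x]
    const bool right_inclusive = end.weight == toy_weight::after;
    return toy_range{{start.key, left_inclusive}, {end.key, right_inclusive}};
}
```

For instance, [before 1, after 3) maps to [1, 3], [after 1, for 3) maps to (1, 3), and [before 2, for 2) yields no range at all.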
42 changes: 32 additions & 10 deletions sstables/mx/reader.cc
@@ -625,6 +625,24 @@ class mp_row_consumer_m {
}
}

// Call after a reverse index skip is performed during reversed reads.
void reset_after_reversed_read_skip() {
// We must not reset `_in_progress_row` since rows are always consumed fully
// during reversed reads. We also don't need to reset any state that may change
// when moving between partitions as reversed skips are only performed within
// a partition.
// We must only reset the stored tombstone. A range tombstone may be stored in forwarding
// mode, when the parser gets ahead of the currently forwarded-to range and provides
// us (the consumer) a tombstone positioned after the range; we store it so we can
// process it again when (if) the read gets forwarded to a range containing this
// tombstone. But a successful index skip means that the source jumped to a later
// position, so to a position past the stored tombstone's (if there is one) position.
// The stored tombstone may no longer be relevant for the position we're at. The correct
// active tombstone, if any, is obtained from the index and will be set using
// `set_range_tombstone`.
_stored_tombstone.reset();
}

position_in_partition_view position() {
if (_inside_static_row) {
return position_in_partition_view(position_in_partition_view::static_row_tag_t{});
@@ -1210,6 +1228,15 @@ class data_consume_rows_context_m : public data_consumer::continuous_data_consum
throw std::logic_error(format("Unable to reset - unknown indexable element: {}", el));
}

// Call after a reverse index skip is performed during reversed reads.
void reset_after_reversed_read_skip() {
// During reversed reads the source is always returning whole rows
// even when we perform an index skip in the middle of a row.
// Thus we must not reset the parser state as we do in regular reset.
// We need only to inform the consumer.
_consumer.reset_after_reversed_read_skip();
}

reader_permit& permit() {
return _consumer.permit();
}
@@ -1283,9 +1310,6 @@ class mx_sstable_mutation_reader : public mp_row_consumer_reader_mx {
"mx reader: multi-partition reversed queries are not supported yet;"
" partition range: {}", pr));
}
if (fwd != streamed_mutation::forwarding::no) {
on_internal_error(sstlog, "mx reader: forwarding not yet supported in reversed queries");
}
// FIXME: if only the defaults were better...
//assert(fwd_mr == mutation_reader::forwarding::no);
}
@@ -1434,11 +1458,14 @@ class mx_sstable_mutation_reader : public mp_row_consumer_reader_mx {
auto ip = _index_reader->data_file_positions();
if (ip.end >= *_reversed_read_sstable_position) {
// The reversing data source was already ahead (in reverse - its position was smaller)
// than the index. We must not update the current range tombstone in this case,
// as it already is at least as up to date as the one given by the index end_open_marker.
// than the index. We must not update the current range tombstone in this case
// or reset the context since all fragments up to the new position of the index
// will be (or already have been) provided to the context by the source.
return;
}

_context->reset_after_reversed_read_skip();

_sst->get_stats().on_partition_seek();
auto open_end_marker = _index_reader->reverse_end_open_marker();
if (open_end_marker) {
@@ -1633,11 +1660,6 @@ class mx_sstable_mutation_reader : public mp_row_consumer_reader_mx {
// If _ds is not created then next_partition() has no effect because there was no partition_start emitted yet.
}
virtual future<> fast_forward_to(position_range cr) override {
if (reversed()) {
// FIXME
on_internal_error(sstlog, "mx reader: fast_forward_to(position_range) not supported for reversed queries");
}

forward_buffer_to(cr.start());
if (!_partition_finished) {
_end_of_stream = false;
40 changes: 18 additions & 22 deletions sstables/sstables.cc
@@ -2108,16 +2108,14 @@ sstable::make_reader(
mutation_reader::forwarding fwd_mr,
read_monitor& mon) {
const auto reversed = slice.is_reversed();
if (_version >= version_types::mc && (!reversed || (range.is_singular() && !fwd))) {
if (_version >= version_types::mc && (!reversed || range.is_singular())) {
return mx::make_reader(shared_from_this(), std::move(schema), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr, mon);
}

// The following are not yet supported natively in the mx reader:
// - multi-partition reversed queries
// - position fast-forwarding in (single-partition) reversed queries
// Therefore for these cases we delegate to make_reader_v1 which handles them (by using
// `make_reversing_reader` or `make_forwardable` appropriately which right now work only with the v1 format).
// FIXME: remove these workarounds eventually.
// Multi-partition reversed queries are not yet supported natively in the mx reader.
// Therefore in this case we delegate to make_reader_v1 which handles it (by using
// `make_reversing_reader` which right now works only with the v1 format).
// FIXME: remove this workaround eventually.

return upgrade_to_v2(make_reader_v1(std::move(schema), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr, mon));
}
@@ -2135,38 +2133,36 @@ sstable::make_reader_v1(
read_monitor& mon) {
const auto reversed = slice.options.contains(query::partition_slice::option::reversed);
auto max_result_size = permit.max_result_size();
auto fwd_sm = reversed ? streamed_mutation::forwarding::no : fwd;

if (_version >= version_types::mc) {
auto rd = (reversed && !range.is_singular())
// The mx reader does not support multi-partition reversed queries.
// Perform a forward query on it, then reverse the result.
? make_reversing_reader(downgrade_to_v1(mx::make_reader(shared_from_this(), schema->make_reversed(), std::move(permit),
range, half_reverse_slice(*schema, slice), pc, std::move(trace_state), fwd_sm, fwd_mr, mon)),
max_result_size)
: downgrade_to_v1(mx::make_reader(shared_from_this(), schema, std::move(permit),
range, slice, pc, std::move(trace_state), fwd_sm, fwd_mr, mon));
if (reversed && fwd) {
// FIXME: the mx reader does not support fast-forwarding in reverse mode yet.
rd = make_forwardable(std::move(rd));
if (reversed && !range.is_singular()) {
auto rd = make_reversing_reader(downgrade_to_v1(mx::make_reader(shared_from_this(), schema->make_reversed(), std::move(permit),
range, half_reverse_slice(*schema, slice), pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr, mon)),
max_result_size);
if (fwd) {
rd = make_forwardable(std::move(rd));
}
return rd;
}
return rd;

return downgrade_to_v1(mx::make_reader(shared_from_this(), schema, std::move(permit),
range, slice, pc, std::move(trace_state), fwd, fwd_mr, mon));
}

if (reversed) {
// The kl reader does not support reversed queries at all.
// Perform a forward query on it, then reverse the result.
// Note: we can pass a half-reversed slice, the kl reader performs an unreversed query nevertheless.
auto rd = make_reversing_reader(kl::make_reader(shared_from_this(), schema->make_reversed(), std::move(permit),
range, slice, pc, std::move(trace_state), fwd_sm, fwd_mr, mon), max_result_size);
range, slice, pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr, mon), max_result_size);
if (fwd) {
rd = make_forwardable(std::move(rd));
}
return rd;
}

return kl::make_reader(shared_from_this(), schema, std::move(permit),
range, slice, pc, std::move(trace_state), fwd_sm, fwd_mr, mon);
range, slice, pc, std::move(trace_state), fwd, fwd_mr, mon);
}

flat_mutation_reader_v2