Merge 'Support reverse reads in the row cache natively' from Tomasz Grabiec

This change makes the row cache support reverse reads natively, so that reversing wrappers are no longer needed when reading from cache and the read can be executed efficiently, at a cost similar to that of a forward-order read.

After this change, the database serves reverse reads from the cache by default. Previously, it bypassed the cache by default, since 703aed3.
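To make the cost argument concrete, here is a minimal, self-contained C++ sketch of the two strategies; it is not Scylla code, and std::map merely stands in for the cache's ordered per-partition data:

#include <cstdio>
#include <map>
#include <utility>
#include <vector>

int main() {
    std::map<int, int> rows = {{1, 10}, {2, 20}, {3, 30}};

    // Reversing wrapper: read forward into a buffer, then replay backwards.
    // Costs O(n) extra memory for the buffered fragments.
    std::vector<std::pair<int, int>> buffered(rows.begin(), rows.end());
    for (auto it = buffered.rbegin(); it != buffered.rend(); ++it) {
        std::printf("wrapped: %d\n", it->first);
    }

    // Native reverse read: walk the ordered structure backwards directly,
    // at a cost comparable to the forward read and with no buffering.
    for (auto it = rows.rbegin(); it != rows.rend(); ++it) {
        std::printf("native: %d\n", it->first);
    }
}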

Refs: #1413

Tests:

  - unit [dev]
  - manual query with build/dev/scylla and cache tracing on

Closes #9454

* github.com:scylladb/scylla:
  tests: row_cache: Extend test_concurrent_reads_and_eviction to run reverse queries
  row_cache: partition_snapshot_row_cursor: Print more details about the current version vector
  row_cache: Improve trace-level logging
  config: Use cache for reversed reads by default
  config: Adjust reversed_reads_auto_bypass_cache description
  row_cache: Support reverse reads natively
  mvcc: partition_snapshot: Support slicing range tombstones in reverse
  test: flat_mutation_reader_assertions: Consume expected range tombstones before end_of_partition
  row_cache: Log produced range tombstones
  test: Make produces_range_tombstone() report ck_ranges
  tests: lib: random_mutation_generator: Extract make_random_range_tombstone()
  partition_snapshot_row_cursor: Support reverse iteration
  utils: immutable-collection: Make movable
  intrusive_btree: Make default-initialized iterator cast to false
avikivity committed Dec 29, 2021
2 parents 4a32377 + 1c80d7f commit 9e74556
Showing 16 changed files with 1,118 additions and 173 deletions.
125 changes: 93 additions & 32 deletions cache_flat_mutation_reader.hh

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions clustering_key_filter.hh
@@ -71,6 +71,13 @@ public:
}
return clustering_key_filter_ranges(ranges);
}

// Returns all clustering ranges determined by `slice` inside partition determined by `key`.
// The ranges will be returned in the same order as stored in the slice.
static clustering_key_filter_ranges get_native_ranges(const schema& schema, const query::partition_slice& slice, const partition_key& key) {
const query::clustering_row_ranges& ranges = slice.row_ranges(schema, key);
return clustering_key_filter_ranges(ranges);
}
};

}
4 changes: 2 additions & 2 deletions db/config.cc
@@ -839,8 +839,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, cdc_dont_rewrite_streams(this, "cdc_dont_rewrite_streams", value_status::Used, false,
"Disable rewriting streams from cdc_streams_descriptions to cdc_streams_descriptions_v2. Should not be necessary, but the procedure is expensive and prone to failures; this config option is left as a backdoor in case some user requires manual intervention.")
, strict_allow_filtering(this, "strict_allow_filtering", liveness::LiveUpdate, value_status::Used, strict_allow_filtering_default(), "Match Cassandra in requiring ALLOW FILTERING on slow queries. Can be true, false, or warn. When false, Scylla accepts some slow queries even without ALLOW FILTERING that Cassandra rejects. Warn is same as false, but with warning.")
, reversed_reads_auto_bypass_cache(this, "reversed_reads_auto_bypass_cache", liveness::LiveUpdate, value_status::Used, true,
"Use a new implementation of reversed reads in sstables when performing reversed queries. The new implementation does not require unbounded memory (compared to the old implementation which had to fetch entire partitions into memory) but disables the cache. Turn this option off if your partitions are small so the old implementation is good enough; your queries can then utilize the cache and potentially be faster (since they don't need to use sstables as much). This option is temporary and will be removed as soon as the cache and sstable reverse read algorithms are updated to handle reversed queries correctly.")
, reversed_reads_auto_bypass_cache(this, "reversed_reads_auto_bypass_cache", liveness::LiveUpdate, value_status::Used, false,
"Bypass in-memory data cache (the row cache) when performing reversed queries.")
, alternator_port(this, "alternator_port", value_status::Used, 0, "Alternator API port")
, alternator_https_port(this, "alternator_https_port", value_status::Used, 0, "Alternator API HTTPS port")
, alternator_address(this, "alternator_address", value_status::Used, "0.0.0.0", "Alternator API listening address")
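With the new default of false, reversed queries now go through the row cache. Because the option is declared LiveUpdate, it can presumably be toggled at runtime as well as set in the config file; a sketch of the scylla.yaml entry that would restore the old bypass behavior (assuming the usual YAML config syntax):

# Hypothetical scylla.yaml snippet: restore the pre-change behavior in which
# reversed queries bypass the in-memory row cache.
reversed_reads_auto_bypass_cache: true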
287 changes: 232 additions & 55 deletions partition_snapshot_row_cursor.hh

Large diffs are not rendered by default.

36 changes: 30 additions & 6 deletions partition_version.cc
@@ -551,13 +551,33 @@ partition_snapshot::range_tombstones(position_in_partition_view start, position_

stop_iteration
partition_snapshot::range_tombstones(position_in_partition_view start, position_in_partition_view end,
std::function<stop_iteration(range_tombstone)> callback)
std::function<stop_iteration(range_tombstone)> callback,
bool reverse)
{
partition_version* v = &*version();

if (reverse) [[unlikely]] {
std::swap(start, end);
start = start.reversed();
end = end.reversed();
}

auto pop_stream = [&] (range_tombstone_list::iterator_range& range) -> range_tombstone {
auto rt = reverse ? std::prev(range.end())->tombstone()
: range.begin()->tombstone();
if (reverse) [[unlikely]] {
rt.reverse();
range.advance_end(-1);
} else {
range.advance_begin(1);
}
return rt;
};

if (!v->next()) { // Optimization for single-version snapshots
for (auto&& rt : v->partition().row_tombstones().slice(*_schema, start, end)) {
if (callback(rt.tombstone()) == stop_iteration::yes) {
auto range = v->partition().row_tombstones().slice(*_schema, start, end);
while (!range.empty()) {
if (callback(pop_stream(range)) == stop_iteration::yes) {
return stop_iteration::no;
}
}
@@ -568,8 +588,13 @@ partition_snapshot::range_tombstones(position_in_partition_view start, position_
position_in_partition::less_compare less(*_schema);

// Sorts ranges by first range_tombstone's starting position
// in descending order.
// in descending order (because the heap is a max-heap).
// In reverse mode, sorts by range_tombstone's end position
// in ascending order.
auto stream_less = [&] (range_tombstone_list::iterator_range left, range_tombstone_list::iterator_range right) {
if (reverse) [[unlikely]] {
return less(std::prev(left.end())->end_position(), std::prev(right.end())->end_position());
}
return less(right.begin()->position(), left.begin()->position());
};

@@ -586,10 +611,9 @@ partition_snapshot::range_tombstones(position_in_partition_view start, position_
while (!streams.empty()) {
std::pop_heap(streams.begin(), streams.end(), stream_less);
range_tombstone_list::iterator_range& stream = streams.back();
if (callback(stream.begin()->tombstone()) == stop_iteration::yes) {
if (callback(pop_stream(stream)) == stop_iteration::yes) {
return stop_iteration::no;
}
stream.advance_begin(1);
if (!stream.empty()) {
std::push_heap(streams.begin(), streams.end(), stream_less);
} else {
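The loop above is a classic k-way merge over the per-version tombstone streams, driven by std::pop_heap/std::push_heap; because those algorithms maintain a max-heap, the comparator is inverted so that the stream with the smallest head surfaces first (and in reverse mode, pop_stream() consumes from the back of each stream instead). A self-contained sketch of the same forward-order pattern over toy integer streams, not the real range_tombstone_list types:

#include <algorithm>
#include <cstdio>
#include <span>
#include <vector>

int main() {
    static const int a[] = {1, 4, 9};
    static const int b[] = {2, 3, 8};
    static const int c[] = {5, 6, 7};
    std::vector<std::span<const int>> streams = {a, b, c};

    // Inverted comparison: under a max-heap, "greatest" must mean
    // "smallest head" so that pop_heap yields the next element to emit.
    auto stream_less = [](std::span<const int> l, std::span<const int> r) {
        return r.front() < l.front();
    };

    std::make_heap(streams.begin(), streams.end(), stream_less);
    while (!streams.empty()) {
        std::pop_heap(streams.begin(), streams.end(), stream_less);
        auto& s = streams.back();
        std::printf("%d ", s.front());   // consume the head, like pop_stream()
        s = s.subspan(1);
        if (!s.empty()) {
            std::push_heap(streams.begin(), streams.end(), stream_less);
        } else {
            streams.pop_back();
        }
    }
    std::printf("\n");                   // prints: 1 2 3 4 5 6 7 8 9
}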
5 changes: 4 additions & 1 deletion partition_version.hh
@@ -414,8 +414,11 @@ public:
// Invokes the callback for every range tombstone overlapping with [start, end) until
// the callback returns stop_iteration::yes or all tombstones are exhausted.
// Returns stop_iteration::yes if all range tombstones in the range were consumed.
// When reversed is true, start and end are assumed to belong to the domain of the reversed clustering order schema,
// and the method produces range_tombstones in reverse order, conforming to the reversed schema.
stop_iteration range_tombstones(position_in_partition_view start, position_in_partition_view end,
std::function<stop_iteration(range_tombstone)> callback);
std::function<stop_iteration(range_tombstone)> callback,
bool reversed = false);
// Returns all range tombstones
range_tombstone_result range_tombstones();
};
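Note the contract spelled out above: the method returns stop_iteration::yes only when every tombstone in the range was consumed, so a callback that stops early makes the method return stop_iteration::no. A minimal sketch of this visit-until-stopped pattern, using a toy stand-in for seastar's stop_iteration rather than the real snapshot API:

#include <cstdio>
#include <functional>
#include <vector>

enum class stop_iteration : bool { no = false, yes = true };

// Visit elements until the callback asks to stop; report whether the
// whole sequence was consumed, mirroring the documented contract.
stop_iteration for_each_until(const std::vector<int>& v,
                              std::function<stop_iteration(int)> callback) {
    for (int x : v) {
        if (callback(x) == stop_iteration::yes) {
            return stop_iteration::no;   // stopped early: not fully consumed
        }
    }
    return stop_iteration::yes;          // everything was consumed
}

int main() {
    auto all = for_each_until({1, 2, 3}, [](int) { return stop_iteration::no; });
    auto part = for_each_until({1, 2, 3}, [](int x) {
        return x == 2 ? stop_iteration::yes : stop_iteration::no;
    });
    std::printf("%d %d\n", all == stop_iteration::yes, part == stop_iteration::yes); // 1 0
}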
8 changes: 8 additions & 0 deletions read_context.hh
@@ -23,6 +23,7 @@

#include "schema_fwd.hh"
#include "query-request.hh"
#include "partition_slice_builder.hh"
#include "mutation_fragment.hh"
#include "partition_version.hh"
#include "tracing/tracing.hh"
@@ -137,6 +138,7 @@ class read_context final : public enable_lw_shared_from_this<read_context> {
reader_permit _permit;
const dht::partition_range& _range;
const query::partition_slice& _slice;
std::optional<query::partition_slice> _native_slice;
const io_priority_class& _pc;
tracing::trace_state_ptr _trace_state;
mutation_reader::forwarding _fwd_mr;
@@ -177,6 +179,9 @@ public:
, _range_query(!query::is_single_partition(range))
, _underlying(_cache, *this)
{
if (_slice.options.contains(query::partition_slice::option::reversed)) {
_native_slice = query::legacy_reverse_slice_to_native_reverse_slice(*_schema, _slice);
}
++_cache._tracker._stats.reads;
if (!_range_query) {
_key = range.start()->value().as_decorated_key();
@@ -197,6 +202,9 @@ public:
reader_permit permit() const { return _permit; }
const dht::partition_range& range() const { return _range; }
const query::partition_slice& slice() const { return _slice; }
bool is_reversed() const { return _slice.options.contains(query::partition_slice::option::reversed); }
// Returns a slice in the native format (for reversed reads, in native-reversed format).
const query::partition_slice& native_slice() const { return is_reversed() ? *_native_slice : _slice; }
const io_priority_class& pc() const { return _pc; }
tracing::trace_state_ptr trace_state() const { return _trace_state; }
mutation_reader::forwarding fwd_mr() const { return _fwd_mr; }
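A reversed slice in the native format expresses its clustering ranges in the reversed schema's domain: each range's bounds are swapped and the ranges appear in the opposite order, so that they are ascending under the reversed comparator. A toy illustration with integer bounds; the helper below is hypothetical, the real conversion being query::legacy_reverse_slice_to_native_reverse_slice over query::partition_slice:

#include <algorithm>
#include <cstdio>
#include <utility>
#include <vector>

using range = std::pair<int, int>;  // inclusive [first, second] in the forward domain

// Hypothetical helper: rewrite forward-domain ranges into the
// native-reversed representation (bounds swapped, order reversed).
std::vector<range> to_native_reversed(std::vector<range> ranges) {
    for (auto& r : ranges) {
        std::swap(r.first, r.second);            // bounds swap under reversed order
    }
    std::reverse(ranges.begin(), ranges.end());  // the list order is reversed too
    return ranges;
}

int main() {
    for (auto [s, e] : to_native_reversed({{0, 3}, {5, 7}})) {
        std::printf("[%d, %d] ", s, e);          // prints: [7, 5] [3, 0]
    }
    std::printf("\n");
}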
69 changes: 31 additions & 38 deletions row_cache.cc
@@ -43,9 +43,17 @@ logging::logger clogger("cache");
using namespace std::chrono_literals;
using namespace cache;

static schema_ptr to_query_domain(const query::partition_slice& slice, schema_ptr table_domain_schema) {
if (slice.options.contains(query::partition_slice::option::reversed)) [[unlikely]] {
return table_domain_schema->make_reversed();
}
return table_domain_schema;
}

flat_mutation_reader
row_cache::create_underlying_reader(read_context& ctx, mutation_source& src, const dht::partition_range& pr) {
auto reader = src.make_reader(_schema, ctx.permit(), pr, ctx.slice(), ctx.pc(), ctx.trace_state(), streamed_mutation::forwarding::yes);
schema_ptr entry_schema = to_query_domain(ctx.slice(), _schema);
auto reader = src.make_reader(entry_schema, ctx.permit(), pr, ctx.slice(), ctx.pc(), ctx.trace_state(), streamed_mutation::forwarding::yes);
ctx.on_underlying_created();
return reader;
}
@@ -725,7 +733,7 @@ row_cache::make_scanning_reader(const dht::partition_range& range, std::unique_p
}

flat_mutation_reader
row_cache::do_make_reader(schema_ptr s,
row_cache::make_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
@@ -741,7 +749,7 @@ row_cache::do_make_reader(schema_ptr s,
if (query::is_single_partition(range) && !fwd_mr) {
tracing::trace(trace_state, "Querying cache for range {} and slice {}",
range, seastar::value_of([&slice] { return slice.get_all_ranges(); }));
return _read_section(_tracker.region(), [&] {
auto mr = _read_section(_tracker.region(), [&] {
dht::ring_position_comparator cmp(*_schema);
auto&& pos = range.start()->value();
partitions_type::bound_hint hint;
@@ -759,40 +767,22 @@ row_cache::do_make_reader(schema_ptr s,
return make_flat_mutation_reader<single_partition_populating_reader>(*this, make_context());
}
});
}

tracing::trace(trace_state, "Scanning cache for range {} and slice {}",
range, seastar::value_of([&slice] { return slice.get_all_ranges(); }));
return make_scanning_reader(range, make_context());
}

flat_mutation_reader
row_cache::make_reader(schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& query_slice,
const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) {

// We want to do the reversing on top of the cache reader so we have to
// un-reverse the slice so that underlying mutation sources don't try to
// reverse themselves. Once the cache supports reading in reverse itself,
// we can pass on the reverse slice.
std::unique_ptr<query::partition_slice> unreversed_slice;
const auto reversed = query_slice.options.contains(query::partition_slice::option::reversed);
if (reversed) {
s = s->make_reversed();
unreversed_slice = std::make_unique<query::partition_slice>(query::half_reverse_slice(*s, query_slice));
}
const auto& slice = reversed ? *unreversed_slice : query_slice;

auto rd = do_make_reader(std::move(s), permit, range, slice, pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr);

if (reversed) {
rd = make_reversing_reader(std::move(rd), permit.max_result_size(), std::move(unreversed_slice));
if (fwd == streamed_mutation::forwarding::yes) {
return make_forwardable(std::move(mr));
} else {
return mr;
}
}

tracing::trace(trace_state, "Scanning cache for range {} and slice {}",
range, seastar::value_of([&slice] { return slice.get_all_ranges(); }));
auto mr = make_scanning_reader(range, make_context());
if (fwd == streamed_mutation::forwarding::yes) {
rd = make_forwardable(std::move(rd));
return make_forwardable(std::move(mr));
} else {
return mr;
}

return rd;
}

row_cache::~row_cache() {
@@ -1300,19 +1290,22 @@ flat_mutation_reader cache_entry::read(row_cache& rc, std::unique_ptr<read_conte
// Assumes reader is in the corresponding partition
flat_mutation_reader cache_entry::do_read(row_cache& rc, read_context& reader) {
auto snp = _pe.read(rc._tracker.region(), rc._tracker.cleaner(), _schema, &rc._tracker, reader.phase());
auto ckr = query::clustering_key_filter_ranges::get_ranges(*_schema, reader.slice(), _key.key());
auto r = make_cache_flat_mutation_reader(_schema, _key, std::move(ckr), rc, reader, std::move(snp));
r.upgrade_schema(rc.schema());
auto ckr = query::clustering_key_filter_ranges::get_native_ranges(*_schema, reader.native_slice(), _key.key());
schema_ptr entry_schema = to_query_domain(reader.slice(), _schema);
auto r = make_cache_flat_mutation_reader(entry_schema, _key, std::move(ckr), rc, reader, std::move(snp));
r.upgrade_schema(to_query_domain(reader.slice(), rc.schema()));
r.upgrade_schema(reader.schema());
return r;
}

flat_mutation_reader cache_entry::do_read(row_cache& rc, std::unique_ptr<read_context> unique_ctx) {
auto snp = _pe.read(rc._tracker.region(), rc._tracker.cleaner(), _schema, &rc._tracker, unique_ctx->phase());
auto ckr = query::clustering_key_filter_ranges::get_ranges(*_schema, unique_ctx->slice(), _key.key());
auto ckr = query::clustering_key_filter_ranges::get_native_ranges(*_schema, unique_ctx->native_slice(), _key.key());
schema_ptr reader_schema = unique_ctx->schema();
auto r = make_cache_flat_mutation_reader(_schema, _key, std::move(ckr), rc, std::move(unique_ctx), std::move(snp));
r.upgrade_schema(rc.schema());
schema_ptr entry_schema = to_query_domain(unique_ctx->slice(), _schema);
auto rc_schema = to_query_domain(unique_ctx->slice(), rc.schema());
auto r = make_cache_flat_mutation_reader(entry_schema, _key, std::move(ckr), rc, std::move(unique_ctx), std::move(snp));
r.upgrade_schema(rc_schema);
r.upgrade_schema(reader_schema);
return r;
}
2 changes: 0 additions & 2 deletions row_cache.hh
@@ -360,8 +360,6 @@ private:
// internal_updater is only kept alive until its invocation returns.
future<> do_update(external_updater eu, internal_updater iu) noexcept;

flat_mutation_reader do_make_reader(schema_ptr, reader_permit permit, const dht::partition_range&, const query::partition_slice&,
const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding, mutation_reader::forwarding);
public:
~row_cache();
row_cache(schema_ptr, snapshot_source, cache_tracker&, is_continuous = is_continuous::no);