Merge 'Make sure that cache_flat_mutation_reader::do_fill_buffer does not fast forward finished underlying reader' from Piotr Jastrzębski

It is possible that a partition is present in the cache but not in the underlying sstables.
In such a case:
1. cache_flat_mutation_reader fast-forwards the underlying reader to that partition.
2. The underlying reader then becomes empty and its is_end_of_stream() returns true.
3. Previously, cache_flat_mutation_reader::do_fill_buffer would still try to fast-forward such an empty underlying reader.
4. This PR fixes that by having do_fill_buffer skip the fast forward when the partition does not exist underneath (a simplified sketch of the idea follows below).
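
A minimal, hedged sketch of that guard: the types below (underlying_reader, read_ctx, fill_buffer) are simplified stand-ins invented for illustration; only the partition_exists check and the "don't fast-forward a finished reader" rule mirror the actual change to cache_flat_mutation_reader::do_fill_buffer.

    #include <cassert>
    #include <iostream>

    // Hypothetical stand-in for the sstable-backed (underlying) reader.
    struct underlying_reader {
        bool end_of_stream = false;
        void fast_forward_to(int /*position_range*/) {
            // The real reader must not be forwarded once it is finished.
            assert(!end_of_stream && "do not fast-forward a finished reader");
        }
    };

    // Hypothetical stand-in for read_context: remembers whether the
    // partition was found in the underlying source.
    struct read_ctx {
        bool partition_exists = false;
    };

    // Shape of do_fill_buffer's decision after the fix.
    void fill_buffer(read_ctx& ctx, underlying_reader& u) {
        if (!ctx.partition_exists) {
            // Partition lives only in cache; the underlying reader is already
            // at end of stream, so reading from it yields nothing and we must
            // not try to fast-forward it.
            return;
        }
        u.fast_forward_to(0 /* next clustering range */);
        // ... then read the forwarded range from the underlying reader.
    }

    int main() {
        read_ctx cache_only;               // partition_exists == false
        underlying_reader finished;
        finished.end_of_stream = true;     // nothing for this partition in sstables
        fill_buffer(cache_only, finished); // before the fix this path fast-forwarded anyway
        std::cout << "cache-only partition handled without fast-forward\n";
        return 0;
    }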

Test: unit(dev)

Fixes #8435
Fixes #8411

Closes #8437

* github.com:scylladb/scylla:
  row_cache: remove redundant check in make_reader
  cache_flat_mutation_reader: fix do_fill_buffer
  read_context: add _partition_exists
  read_context: remove skip_first_fragment arg from create_underlying
  read_context: skip first fragment in ensure_underlying

(cherry picked from commit 163f2be)
tgrabiec authored and avikivity committed Apr 20, 2021
1 parent 2049646 commit 97664e6
Showing 3 changed files with 22 additions and 11 deletions.
3 changes: 3 additions & 0 deletions cache_flat_mutation_reader.hh
@@ -267,6 +267,9 @@ future<> cache_flat_mutation_reader::do_fill_buffer(db::timeout_clock::time_point
     }
     _state = state::reading_from_underlying;
     _population_range_starts_before_all_rows = _lower_bound.is_before_all_clustered_rows(*_schema);
+    if (!_read_context->partition_exists()) {
+        return read_from_underlying(timeout);
+    }
     auto end = _next_row_in_range ? position_in_partition(_next_row.position())
                                   : position_in_partition(_upper_bound);
     return _underlying->fast_forward_to(position_range{_lower_bound, std::move(end)}, timeout).then([this, timeout] {
17 changes: 15 additions & 2 deletions read_context.hh
@@ -142,6 +142,7 @@ class read_context final : public enable_lw_shared_from_this<read_context> {
     mutation_source_opt _underlying_snapshot;
     dht::partition_range _sm_range;
     std::optional<dht::decorated_key> _key;
+    bool _partition_exists;
     row_cache::phase_type _phase;
 public:
     read_context(row_cache& cache,
@@ -190,22 +191,34 @@ public:
     autoupdating_underlying_reader& underlying() { return _underlying; }
     row_cache::phase_type phase() const { return _phase; }
     const dht::decorated_key& key() const { return *_key; }
+    bool partition_exists() const { return _partition_exists; }
     void on_underlying_created() { ++_underlying_created; }
     bool digest_requested() const { return _slice.options.contains<query::partition_slice::option::with_digest>(); }
 public:
     future<> ensure_underlying(db::timeout_clock::time_point timeout) {
         if (_underlying_snapshot) {
-            return create_underlying(true, timeout);
+            return create_underlying(timeout).then([this, timeout] {
+                return _underlying.underlying()(timeout).then([this] (mutation_fragment_opt&& mfopt) {
+                    _partition_exists = bool(mfopt);
+                });
+            });
         }
+        // We know that partition exists because all the callers of
+        // enter_partition(const dht::decorated_key&, row_cache::phase_type)
+        // check that and there's no other way of setting _underlying_snapshot
+        // to empty. Except for calling create_underlying.
+        _partition_exists = true;
         return make_ready_future<>();
     }
 public:
-    future<> create_underlying(bool skip_first_fragment, db::timeout_clock::time_point timeout);
+    future<> create_underlying(db::timeout_clock::time_point timeout);
     void enter_partition(const dht::decorated_key& dk, mutation_source& snapshot, row_cache::phase_type phase) {
         _phase = phase;
         _underlying_snapshot = snapshot;
         _key = dk;
     }
+    // Precondition: each caller needs to make sure that partition with |dk| key
+    // exists in underlying before calling this function.
     void enter_partition(const dht::decorated_key& dk, row_cache::phase_type phase) {
         _phase = phase;
         _underlying_snapshot = {};
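
The new ensure_underlying logic can be read as: create the underlying reader, immediately consume its first fragment, and record whether anything came back; that recorded flag replaces the old skip_first_fragment argument of create_underlying. Below is a hedged, synchronous sketch of the pattern (fragment_opt, underlying_reader and read_context_sketch are hypothetical types invented for illustration; the real code chains seastar futures and uses mutation_fragment_opt):

    #include <optional>
    #include <string>

    // Hypothetical fragment type; the real code passes mutation_fragment_opt.
    using fragment_opt = std::optional<std::string>;

    // Hypothetical underlying reader: yields one partition_start fragment if
    // the partition exists in the underlying source, otherwise nothing.
    struct underlying_reader {
        bool has_partition = false;
        bool consumed_first = false;
        fragment_opt next() {
            if (!has_partition || consumed_first) {
                return std::nullopt;               // end of stream
            }
            consumed_first = true;
            return std::string("partition_start");
        }
    };

    // Synchronous sketch of read_context::ensure_underlying after the change;
    // the real version returns a future and goes through create_underlying().
    struct read_context_sketch {
        bool partition_exists = false;
        underlying_reader reader;

        void ensure_underlying() {
            // create_underlying() no longer takes a skip_first_fragment flag;
            // the first fragment is consumed right here instead, and its
            // presence is what sets partition_exists.
            fragment_opt first = reader.next();
            partition_exists = bool(first);        // mirrors _partition_exists = bool(mfopt)
        }
    };

    int main() {
        read_context_sketch ctx;
        ctx.reader.has_partition = false;          // partition exists only in cache
        ctx.ensure_underlying();
        // ctx.partition_exists is now false, so the cache reader will not
        // fast-forward the (already finished) underlying reader.
        return ctx.partition_exists ? 1 : 0;
    }
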
13 changes: 4 additions & 9 deletions row_cache.cc
@@ -332,21 +332,16 @@ class partition_range_cursor final {
     }
 };

-future<> read_context::create_underlying(bool skip_first_fragment, db::timeout_clock::time_point timeout) {
+future<> read_context::create_underlying(db::timeout_clock::time_point timeout) {
     if (_range_query) {
         // FIXME: Singular-range mutation readers don't support fast_forward_to(), so need to use a wide range
         // here in case the same reader will need to be fast forwarded later.
         _sm_range = dht::partition_range({dht::ring_position(*_key)}, {dht::ring_position(*_key)});
     } else {
         _sm_range = dht::partition_range::make_singular({dht::ring_position(*_key)});
     }
-    return _underlying.fast_forward_to(std::move(_sm_range), *_underlying_snapshot, _phase, timeout).then([this, skip_first_fragment, timeout] {
+    return _underlying.fast_forward_to(std::move(_sm_range), *_underlying_snapshot, _phase, timeout).then([this] {
         _underlying_snapshot = {};
-        if (skip_first_fragment) {
-            return _underlying.underlying()(timeout).then([](auto &&mf) {});
-        } else {
-            return make_ready_future<>();
-        }
     });
 }

@@ -366,7 +361,7 @@ class single_partition_populating_reader final : public flat_mutation_reader::impl
         auto src_and_phase = _cache.snapshot_of(_read_context->range().start()->value());
         auto phase = src_and_phase.phase;
         _read_context->enter_partition(_read_context->range().start()->value().as_decorated_key(), src_and_phase.snapshot, phase);
-        return _read_context->create_underlying(false, timeout).then([this, phase, timeout] {
+        return _read_context->create_underlying(timeout).then([this, phase, timeout] {
             return _read_context->underlying().underlying()(timeout).then([this, phase] (auto&& mfopt) {
                 if (!mfopt) {
                     if (phase == _cache.phase_of(_read_context->range().start()->value())) {
@@ -728,7 +723,7 @@ row_cache::make_reader(schema_ptr s,
         auto&& pos = ctx->range().start()->value();
         partitions_type::bound_hint hint;
         auto i = _partitions.lower_bound(pos, cmp, hint);
-        if (i != _partitions.end() && hint.match) {
+        if (hint.match) {
             cache_entry& e = *i;
             upgrade_entry(e);
             on_partition_hit();
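
The make_reader change relies on a property of the lower_bound-with-hint lookup: when hint.match is true, the returned iterator points at the matching entry and therefore cannot equal end(), so the dropped i != _partitions.end() test was redundant. A small hedged analogy using std::map (the real container is the cache's intrusive partition tree with its own bound_hint; lower_bound_with_hint below is a hypothetical helper written only to illustrate the invariant):

    #include <cassert>
    #include <map>
    #include <string>
    #include <utility>

    // Hypothetical helper mimicking lower_bound with a "hint" that reports an
    // exact match, in the spirit of the bound_hint used by the partition tree.
    template <typename Map, typename Key>
    std::pair<typename Map::iterator, bool> lower_bound_with_hint(Map& m, const Key& k) {
        auto it = m.lower_bound(k);
        bool match = (it != m.end() && it->first == k);
        return {it, match};
    }

    int main() {
        std::map<int, std::string> partitions{{1, "a"}, {3, "b"}};
        auto [i, match] = lower_bound_with_hint(partitions, 3);
        if (match) {
            // match == true already implies a dereferenceable iterator, so an
            // extra i != partitions.end() test (the check removed in this
            // commit) cannot change the outcome.
            assert(i != partitions.end());
        }
        return 0;
    }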
