Skip to content

Commit

Permalink
Merge "Introduce flat_mutation_reader_v2" from Tomasz
Browse files Browse the repository at this point in the history
"
This series introduces a new version of the mutation fragment stream (called v2)
which aims at improving range tombstone handling in the system.

When compacting a mutation fragment stream (e.g. for sstable compaction, data query, repair),
the compactor needs to accumulate range tombstones which are relevant for the yet-to-be-processed range.
See range_tombstone_accumulator. One problem is that it has unbounded memory footprint because the
accumulator needs to keep track of all the tombstoned ranges which are still active.

Another, although more benign, problem is computational complexity needed to maintain that data structure.

The fix is to get rid of the overlap of range tombstones in the mutation fragment stream. In v2 of the
stream, there is no longer a range_tombstone fragment. Deletions of ranges of rows within a given
partition are represented with range_tombstone_change fragments. At any point in the stream there
is a single active clustered tombstone. It is initially equal to the neutral tombstone when the
stream of each partition starts. The range_tombstone_change fragment type signify changes of the
active clustered tombstone. All fragments emitted while a given clustered tombstone is active are
affected by that tombstone. Like with the old range_tombstone fragments, the clustered tombstone
is independent from the partition tombstone carried in partition_start.

The memory needed to compact a stream is now constant, because the compactor needs to only track the
current tombstone. Also, there is no need to expire ranges on each fragment because the stream emits
a fragment when the range ends.

This series doesn't convert all readers to v2. It introduces adaptors which can convert
between v1 and v2 streams. Each mutation source can be constructed with either v1 or v2 stream factory,
but it can be asked any version, performing conversion under the hood if necessary.

In order to guarantee that v1 to v2 conversion produces a well-formed stream, this series needs to
impose a constraint on v1 streams to trim range tombstones to clustering restrictions. Otherwise,
v1->v2 converted could produce range tombstone changes which lie outside query restrictions, making
the stream non-canonical.

The v2 stream is strict about range tombstone trimming. It emits range tombstone changes which reflect
range tombstones trimmed to query restrictions, and fast-forwarding ranges. This makes the stream
more canonical, meaning that for a given set of writes, querying the database should produce the
same stream of fragments for a given restrictions. There is less ambiguity in how the writes
are represented in the fragment stream. It wasn't the case with v1. For example, A given set
of deletions could be produced either as one range_tombstone, or may, split and/or deoverlapped
with other fragments. Making a stream canonical is easier for diff-calculating.

The mc sstable reader was converted to v2 because it seemed like a comparable effort to do that
versus implementing range tombstone trimming in v1.

The classes related to mutation fragment streams were cloned:
flat_mutation_reader_v2, mutation_fragment_v2, related concepts.

Refs #8625. To fully fix #8625 we need to finish the transition and get rid of the converters.
Converters accumulate range tombstones.

Tests:

 - unit [dev]
"

* tag 'flat_mutation_reader_range_tombstone_split-v3.2' of github.com:tgrabiec/scylla: (26 commits)
  tests: mutation_source_test: Run tests with conversions inserted in the middle
  tests: mutation_source_tests: Unroll run_flat_mutation_reader_tests()
  tests: Add tests for flat_mutation_reader_v2
  flat_mutation_reader: Update the doc to reflect range tombstone trimming
  sstables: Switch the mx reader to flat_mutation_reader_v2
  row_cache: Emit range tombstone adjacent to upper bound of population range
  tests: sstables: Fix test assertions to not expect more than they should
  flat_mutation_reader: Trim range tombstones in make_flat_mutation_reader_from_fragments()
  clustering_ranges_walker: Emit range tombstone changes while walking
  tests: flat_mutation_reader_assertions_v2: Adapt to the v2 stream
  Clone flat_reader_assertions into flat_reader_assertions_v2
  test: lib: simple_schema: Reuse new_tombstone()
  test: lib: simple_schema: Accept tombstone in delete_range()
  mutation_source: Introduce make_reader_v2()
  partition_snapshot_flat_reader: Trim range tombstones to query ranges
  mutation_partition: Trim range tombstones to query ranges
  sstables: reader: Inline specialization of sstable_mutation_reader
  sstables: k_l: reader: Trim range tombstones to query ranges
  clustering_ranges_walker: Introduce split_tombstone()
  position_range: Introduce contains() check for ranges
  ...
  • Loading branch information
avikivity committed Jun 16, 2021
2 parents 44f3ad8 + 3fcd1f4 commit fce124b
Show file tree
Hide file tree
Showing 32 changed files with 3,930 additions and 643 deletions.
11 changes: 4 additions & 7 deletions cache_flat_mutation_reader.hh
Expand Up @@ -362,13 +362,10 @@ future<> cache_flat_mutation_reader::read_from_underlying(db::timeout_clock::tim
}
if (_next_row_in_range) {
maybe_update_continuity();
add_to_buffer(_next_row);
try {
move_to_next_entry();
} catch (const std::bad_alloc&) {
// We cannot reenter the section, since we may have moved to the new range, and
// because add_to_buffer() should not be repeated.
_snp->region().allocator().invalidate_references(); // Invalidates _next_row
if (!_next_row.dummy()) {
_lower_bound = position_in_partition::before_key(_next_row.key());
} else {
_lower_bound = _next_row.position();
}
} else {
if (no_clustering_row_between(*_schema, _upper_bound, _next_row.position())) {
Expand Down
112 changes: 109 additions & 3 deletions clustering_ranges_walker.hh
Expand Up @@ -26,28 +26,34 @@
#include "schema.hh"
#include "query-request.hh"
#include "mutation_fragment.hh"
#include "mutation_fragment_v2.hh"

// Utility for in-order checking of overlap with position ranges.
class clustering_ranges_walker {
const schema& _schema;
const query::clustering_row_ranges& _ranges;
boost::iterator_range<query::clustering_row_ranges::const_iterator> _current_range;
bool _in_current; // next position is known to be >= _current_start
bool _past_current; // next position is known to be >= _current_end
bool _using_clustering_range; // Whether current range comes from _current_range
bool _with_static_row;
position_in_partition_view _current_start;
position_in_partition_view _current_end;
std::optional<position_in_partition> _trim;
size_t _change_counter = 1;
tombstone _tombstone;
private:
bool advance_to_next_range() {
_in_current = false;
if (!_current_start.is_static_row()) {
_past_current = false;
if (_using_clustering_range) {
if (!_current_range) {
return false;
}
_current_range.advance_begin(1);
}
++_change_counter;
_using_clustering_range = true;
if (!_current_range) {
_current_end = _current_start = position_in_partition_view::after_all_clustered_rows();
return false;
Expand All @@ -58,17 +64,20 @@ private:
}

void set_current_positions() {
_using_clustering_range = false;
if (!_with_static_row) {
if (!_current_range) {
_current_start = position_in_partition_view::before_all_clustered_rows();
} else {
_current_start = position_in_partition_view::for_range_start(_current_range.front());
_current_end = position_in_partition_view::for_range_end(_current_range.front());
_using_clustering_range = true;
}
} else {
// If the first range is contiguous with the static row, then advance _current_end as much as we can
if (_current_range && !_current_range.front().start()) {
_current_end = position_in_partition_view::for_range_end(_current_range.front());
_using_clustering_range = true;
}
}
}
Expand All @@ -79,6 +88,7 @@ public:
, _ranges(ranges)
, _current_range(ranges)
, _in_current(with_static_row)
, _past_current(false)
, _with_static_row(with_static_row)
, _current_start(position_in_partition_view::for_static_row())
, _current_end(position_in_partition_view::before_all_clustered_rows()) {
Expand All @@ -91,11 +101,33 @@ public:
clustering_ranges_walker& operator=(const clustering_ranges_walker&) = delete;
clustering_ranges_walker& operator=(clustering_ranges_walker&&) = delete;

using range_tombstones = utils::small_vector<range_tombstone_change, 1>;

// Result of advancing to a given position.
struct progress {
// True iff the position is contained in requested ranges.
bool contained;

// Range tombstone changes to emit which reflect current range tombstone
// trimmed to requested ranges, up to the advanced-to position (inclusive).
//
// It is guaranteed that the sequence of tombstones returned from all
// advance_to() calls will be the same for a given ranges no matter at
// which positions you call advance_to(), provided that you change
// the current tombstone at the same positions.
// Redundant changes will not be generated.
// This is to support the guarantees of flat_mutation_reader_v2.
range_tombstones rts;
};

// Excludes positions smaller than pos from the ranges.
// pos should be monotonic.
// No constraints between pos and positions passed to advance_to().
//
// After the invocation, when !out_of_range(), lower_bound() returns the smallest position still contained.
//
// After this, advance_to(lower_bound()) will always emit a range tombstone change for pos
// if there is an active range tombstone and !out_of_range().
void trim_front(position_in_partition pos) {
position_in_partition::less_compare less(_schema);

Expand All @@ -117,29 +149,60 @@ public:
// Must be called with monotonic positions.
// Idempotent.
bool advance_to(position_in_partition_view pos) {
return advance_to(pos, _tombstone).contained;
}

// Returns result of advancing over clustering restrictions.
// Must be called with monotonic positions.
//
// The walker tracks current clustered tombstone.
// The new_tombstone will be the current clustered tombstone after advancing, starting from pos (inclusive).
// The returned progress object contains range_tombstone_change fragments which reflect changes of
// the current clustered tombstone trimmed to the boundaries of requested ranges, up to the
// advanced-to position (inclusive).
progress advance_to(position_in_partition_view pos, tombstone new_tombstone) {
position_in_partition::less_compare less(_schema);
range_tombstones rts;

auto prev_tombstone = _tombstone;
_tombstone = new_tombstone;

do {
if (!_in_current && less(pos, _current_start)) {
break;
}

if (!_in_current && prev_tombstone) {
rts.push_back(range_tombstone_change(_current_start, prev_tombstone));
}

// All subsequent clustering keys are larger than the start of this
// range so there is no need to check that again.
_in_current = true;

if (less(pos, _current_end)) {
return true;
if (prev_tombstone != new_tombstone) {
rts.push_back(range_tombstone_change(pos, new_tombstone));
}
return progress{.contained = true, .rts = std::move(rts)};
} else {
if (!_past_current && prev_tombstone) {
rts.push_back(range_tombstone_change(_current_end, {}));
}
_past_current = true;
}
} while (advance_to_next_range());

return false;
return progress{.contained = false, .rts = std::move(rts)};
}

// Returns true if the range expressed by start and end (as in position_range) overlaps
// with clustering ranges.
// Must be called with monotonic start position. That position must also be greater than
// the last position passed to the other advance_to() overload.
// Idempotent.
// Breaks the tracking of current range tombstone, so don't use if you also use the advance_to()
// overload which tracks tombstones.
bool advance_to(position_in_partition_view start, position_in_partition_view end) {
position_in_partition::less_compare less(_schema);

Expand Down Expand Up @@ -181,6 +244,48 @@ public:
return false;
}

// Intersects rt with query ranges. The first overlap is returned and the rest is applied to dst.
// If returns a disengaged optional, there is no overlap and nothing was applied to dst.
// No monotonicity restrictions on argument values across calls.
// Does not affect lower_bound().
std::optional<range_tombstone> split_tombstone(range_tombstone rt, range_tombstone_stream& dst) const {
position_in_partition::less_compare less(_schema);

if (_trim && !rt.trim_front(_schema, *_trim)) {
return std::nullopt;
}

std::optional<range_tombstone> first;

for (const auto& rng : _current_range) {
auto range_start = position_in_partition_view::for_range_start(rng);
auto range_end = position_in_partition_view::for_range_end(rng);
if (!less(rt.position(), range_start) && !less(range_end, rt.end_position())) {
// Fully enclosed by this range.
assert(!first);
return std::move(rt);
}
auto this_range_rt = rt;
if (this_range_rt.trim(_schema, range_start, range_end)) {
if (first) {
dst.apply(std::move(this_range_rt));
} else {
first = std::move(this_range_rt);
}
}
}

return first;
}

tombstone current_tombstone() const {
return _tombstone;
}

void set_tombstone(tombstone t) {
_tombstone = t;
}

// Returns true if advanced past all contained positions. Any later advance_to() until reset() will return false.
bool out_of_range() const {
return !_in_current && !_current_range;
Expand All @@ -191,6 +296,7 @@ public:
void reset() {
_current_range = _ranges;
_in_current = _with_static_row;
_past_current = false;
_current_start = position_in_partition_view::for_static_row();
_current_end = position_in_partition_view::before_all_clustered_rows();
set_current_positions();
Expand Down

0 comments on commit fce124b

Please sign in to comment.