Skip to content

Commit

Permalink
db: Use mutation_partition_v2 in mvcc
Browse files Browse the repository at this point in the history
This patch switches memtable and cache to use mutation_partition_v2,
and all affected algorithms accordingly.

The memtable reader was changed to use the same cursor implementation
which cache uses, for improved code reuse and reducing risk of bugs
due to discrepancy of algorithms which deal with MVCC.

Range tombstone eviction in cache has now fine granularity, like with
rows.

Fixes scylladb#2578
Fixes scylladb#3288
Fixes scylladb#10587
  • Loading branch information
tgrabiec committed Dec 22, 2022
1 parent 4560dd5 commit 67b67a8
Show file tree
Hide file tree
Showing 14 changed files with 705 additions and 986 deletions.
476 changes: 256 additions & 220 deletions cache_flat_mutation_reader.hh

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions db/cache_tracker.hh
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public:
uint64_t rows_processed_from_memtable;
uint64_t rows_dropped_from_memtable;
uint64_t rows_merged_from_memtable;
uint64_t dummy_processed_from_memtable;
uint64_t rows_covered_by_range_tombstones_from_memtable;
uint64_t partition_evictions;
uint64_t partition_removals;
uint64_t row_evictions;
Expand Down Expand Up @@ -120,6 +122,7 @@ public:
mutation_cleaner& memtable_cleaner() noexcept { return _memtable_cleaner; }
uint64_t partitions() const noexcept { return _stats.partitions; }
const stats& get_stats() const noexcept { return _stats; }
stats& get_stats() noexcept { return _stats; }
void set_compaction_scheduling_group(seastar::scheduling_group);
lru& get_lru() { return _lru; }
};
Expand Down
8 changes: 8 additions & 0 deletions mutation_partition_v2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,14 @@ mutation_partition_v2::upgrade(const schema& old_schema, const schema& new_schem
*this = mutation_partition_v2(new_schema, std::move(tmp));
}

mutation_partition mutation_partition_v2::as_mutation_partition(const schema& s) const {
mutation_partition tmp(s.shared_from_this());
tmp.set_static_row_continuous(_static_row_continuous);
partition_builder v(s, tmp);
accept(s, v);
return tmp;
}

mutation_partition_v2::mutation_partition_v2(mutation_partition_v2::incomplete_tag, const schema& s, tombstone t)
: _tombstone(t)
, _static_row_continuous(!s.has_static_columns())
Expand Down
2 changes: 2 additions & 0 deletions mutation_partition_v2.hh
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ public:
//
// Strong exception guarantees.
void upgrade(const schema& old_schema, const schema& new_schema);

mutation_partition as_mutation_partition(const schema&) const;
private:
// Erases the entry if it's safe to do so without changing the logical state of the partition.
rows_type::iterator maybe_drop(const schema&, cache_tracker*, rows_type::iterator, mutation_application_stats&);
Expand Down
415 changes: 96 additions & 319 deletions partition_snapshot_reader.hh

Large diffs are not rendered by default.

184 changes: 144 additions & 40 deletions partition_snapshot_row_cursor.hh

Large diffs are not rendered by default.

230 changes: 103 additions & 127 deletions partition_version.cc

Large diffs are not rendered by default.

67 changes: 17 additions & 50 deletions partition_version.hh
Original file line number Diff line number Diff line change
Expand Up @@ -105,53 +105,25 @@ class static_row;
// snapshot on the list is marked as unique owner so that on its destruction
// it continues removal of the partition versions.

//
// Continuity merging rules.
//
// Non-evictable snapshots contain fully continuous partitions in all versions at all times.
// For evictable snapshots, that's not the case.
//
// Each version has its own continuity, fully specified in that version,
// independent of continuity of other versions. Continuity of the snapshot is a
// union of continuities of each version. This rule follows from the fact that we
// want eviction from older versions to not have to touch newer versions.
//
// It is assumed that continuous intervals in different versions are non-
// overlapping, with exceptions for points corresponding to complete rows.
// A row may overlap with another row, in which case it completely overrides
// it. A later version may have a row which falls into a continuous interval
// in the older version. A newer version cannot have a continuous interval
// which is not a row and covers a row in the older version. We make use of
// this assumption to make calculation of the union of intervals on merging
// easier.
//
// versions of evictable entries always have a dummy entry at position_in_partition::after_all_clustered_rows().
// This is needed so that they can be always made fully discontinuous by eviction, and because
// we need a way to link partitions with no rows into the LRU.
//
// Snapshots of evictable entries always have a row entry at
// position_in_partition::after_all_clustered_rows().
//

class partition_version_ref;

class partition_version : public anchorless_list_base_hook<partition_version> {
partition_version_ref* _backref = nullptr;
mutation_partition _partition;
mutation_partition_v2 _partition;

friend class partition_version_ref;
friend class partition_entry;
friend class partition_snapshot;
public:
static partition_version& container_of(mutation_partition& mp) {
static partition_version& container_of(mutation_partition_v2& mp) {
return *boost::intrusive::get_parent_from_member(&mp, &partition_version::_partition);
}

using is_evictable = bool_class<class evictable_tag>;

explicit partition_version(schema_ptr s) noexcept
: _partition(std::move(s)) { }
explicit partition_version(mutation_partition mp) noexcept
explicit partition_version(mutation_partition_v2 mp) noexcept
: _partition(std::move(mp)) { }
partition_version(partition_version&& pv) noexcept;
partition_version& operator=(partition_version&& pv) noexcept;
Expand All @@ -160,8 +132,8 @@ public:
// Returns stop_iteration::yes iff there are no more elements to free.
stop_iteration clear_gently(cache_tracker* tracker) noexcept;

mutation_partition& partition() { return _partition; }
const mutation_partition& partition() const { return _partition; }
mutation_partition_v2& partition() { return _partition; }
const mutation_partition_v2& partition() const { return _partition; }

bool is_referenced() const { return _backref; }
// Returns true iff this version is directly referenced from a partition_entry (is its newset version).
Expand Down Expand Up @@ -403,19 +375,6 @@ public:

using range_tombstone_result = utils::chunked_vector<range_tombstone>;

// Returns range tombstones overlapping with [start, end)
range_tombstone_result range_tombstones(position_in_partition_view start, position_in_partition_view end);
// Invokes the callback for every range tombstones overlapping with [start, end) until
// the callback returns stop_iteration::yes or all tombstones are exhausted.
// Returns stop_iteration::yes if all range tombstones in the range were consumed.
// When reversed is true, start and end are assumed to belong to the domain of reverse clustering order schema
// and the method produces range_tombstones in reverse order, conforming to reverse schema.
stop_iteration range_tombstones(position_in_partition_view start, position_in_partition_view end,
std::function<stop_iteration(range_tombstone)> callback,
bool reversed = false);
// Returns all range tombstones
range_tombstone_result range_tombstones();

phase_type phase() const { return _phase; }
};

Expand Down Expand Up @@ -485,7 +444,8 @@ public:
// Constructs a non-evictable entry holding empty partition
partition_entry() = default;
// Constructs a non-evictable entry
explicit partition_entry(mutation_partition mp);
explicit partition_entry(mutation_partition_v2);
partition_entry(const schema&, mutation_partition);
// Returns a reference to partition_entry containing given pv,
// assuming pv.is_referenced_from_entry().
static partition_entry& container_of(partition_version& pv) {
Expand Down Expand Up @@ -550,14 +510,21 @@ public:
void apply(logalloc::region&,
mutation_cleaner&,
const schema& s,
const mutation_partition& mp,
const mutation_partition_v2& mp,
const schema& mp_schema,
mutation_application_stats& app_stats);

void apply(logalloc::region&,
mutation_cleaner&,
const schema& s,
mutation_partition_v2&& mp,
const schema& mp_schema,
mutation_application_stats& app_stats);

void apply(logalloc::region&,
mutation_cleaner&,
const schema& s,
mutation_partition&& mp,
const mutation_partition& mp,
const schema& mp_schema,
mutation_application_stats& app_stats);

Expand Down Expand Up @@ -619,7 +586,7 @@ public:
return *_version;
}

mutation_partition squashed(schema_ptr from, schema_ptr to);
mutation_partition_v2 squashed(schema_ptr from, schema_ptr to);
mutation_partition squashed(const schema&);
tombstone partition_tombstone() const;

Expand Down
2 changes: 1 addition & 1 deletion replica/memtable.hh
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public:
memtable_entry(schema_ptr s, dht::decorated_key key, mutation_partition p)
: _schema(std::move(s))
, _key(std::move(key))
, _pe(std::move(p))
, _pe(*_schema, std::move(p))
{ }

memtable_entry(memtable_entry&& o) noexcept;
Expand Down
11 changes: 6 additions & 5 deletions row_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1221,7 +1221,7 @@ void cache_entry::on_evicted(cache_tracker& tracker) noexcept {
}

void rows_entry::on_evicted(cache_tracker& tracker) noexcept {
mutation_partition::rows_type::iterator it(this);
mutation_partition_v2::rows_type::iterator it(this);

if (is_last_dummy()) {
// Every evictable partition entry must have a dummy entry at the end,
Expand All @@ -1236,18 +1236,19 @@ void rows_entry::on_evicted(cache_tracker& tracker) noexcept {
// When evicting a dummy with both sides continuous we don't need to break continuity.
//
auto still_continuous = continuous() && dummy();
mutation_partition::rows_type::key_grabber kg(it);
auto old_rt = range_tombstone();
mutation_partition_v2::rows_type::key_grabber kg(it);
kg.release(current_deleter<rows_entry>());
if (!still_continuous) {
if (!still_continuous || old_rt != it->range_tombstone()) {
it->set_continuous(false);
}
tracker.on_row_eviction();
}

mutation_partition::rows_type* rows = it.tree_if_singular();
mutation_partition_v2::rows_type* rows = it.tree_if_singular();
if (rows != nullptr) {
assert(it->is_last_dummy());
partition_version& pv = partition_version::container_of(mutation_partition::container_of(*rows));
partition_version& pv = partition_version::container_of(mutation_partition_v2::container_of(*rows));
if (pv.is_referenced_from_entry()) {
partition_entry& pe = partition_entry::container_of(pv);
if (!pe.is_locked()) {
Expand Down
18 changes: 12 additions & 6 deletions test/boost/cache_flat_mutation_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ struct expected_tombstone {
};

static void assert_cached_tombstones(partition_snapshot_ptr snp, std::deque<range_tombstone> expected, const query::clustering_row_ranges& ck_ranges) {
range_tombstone_list rts = snp->version()->partition().mutable_row_tombstones();
range_tombstone_list rts = snp->version()->partition().as_mutation_partition(*SCHEMA).mutable_row_tombstones();
rts.trim(*SCHEMA, ck_ranges);

range_tombstone_list expected_list(*SCHEMA);
Expand Down Expand Up @@ -498,7 +498,8 @@ SEASTAR_TEST_CASE(test_single_row_cached_as_noncontinuous_range_from_row_to_end)
SEASTAR_TEST_CASE(test_single_row_cached_as_noncontinuous_exclusive_range_on_the_left) {
return seastar::async([] {
test_single_row(1, true, is_continuous::no, make_slice({ query::clustering_range::make_ending_with({make_ck(1), false}) }), { }, {
expected_row(1, is_continuous::yes),
before_cont(1),
expected_row(1, is_continuous::no),
expected_row(expected_row::dummy_tag_t{}, is_continuous::no)
});
});
Expand Down Expand Up @@ -966,7 +967,8 @@ SEASTAR_TEST_CASE(test_two_rows_cached_non_continuous_exclusive_range_on_the_lef
return seastar::async([] {
test_two_rows(1, true, is_continuous::no, 3, true, is_continuous::no, make_slice({ query::clustering_range::make_ending_with({make_ck(3), false}) }), { 1 }, {
expected_row(1, is_continuous::yes),
expected_row(3, is_continuous::yes),
before_cont(3),
expected_row(3, is_continuous::no),
expected_row(expected_row::dummy_tag_t{}, is_continuous::no)
});
});
Expand All @@ -988,7 +990,8 @@ SEASTAR_TEST_CASE(test_two_rows_cached_non_continuous_exclusive_range_between_ro
test_two_rows(1, true, is_continuous::no, 3, true, is_continuous::no, make_slice({ query::clustering_range::make({make_ck(1), false}, {make_ck(3), false}) }), { }, {
expected_row(1, is_continuous::no),
after_notc(1),
expected_row(3, is_continuous::yes),
before_cont(3),
expected_row(3, is_continuous::no),
expected_row(expected_row::dummy_tag_t{}, is_continuous::no)
});
});
Expand All @@ -1009,7 +1012,8 @@ SEASTAR_TEST_CASE(test_two_rows_cached_non_continuous_exclusive_range_between_ro
return seastar::async([] {
test_two_rows(1, true, is_continuous::no, 3, true, is_continuous::no, make_slice({ query::clustering_range::make(make_ck(1), {make_ck(3), false}) }), { 1 }, {
expected_row(1, is_continuous::no),
expected_row(3, is_continuous::yes),
before_cont(3),
expected_row(3, is_continuous::no),
expected_row(expected_row::dummy_tag_t{}, is_continuous::no)
});
});
Expand Down Expand Up @@ -1298,7 +1302,8 @@ SEASTAR_TEST_CASE(test_single_row_and_tombstone_not_cached_single_row_range1) {
expected_fragment(1),
expected_fragment(position_in_partition_view::after_all_prefixed(make_ck(1)), {}),
}, {
expected_row(1, is_continuous::no),
before_notc(1),
expected_row(1, is_continuous::yes),
expected_row(expected_row::dummy_tag_t{}, is_continuous::no)
}, { rt });
});
Expand Down Expand Up @@ -1347,6 +1352,7 @@ SEASTAR_TEST_CASE(test_single_row_and_tombstone_not_cached_single_row_range3) {
expected_fragment(4)
}, {
before_notc(0),
after_cont(2), // upper bound of rt
expected_row(4, is_continuous::yes),
after_cont(5),
expected_row(expected_row::dummy_tag_t{}, is_continuous::no)
Expand Down

0 comments on commit 67b67a8

Please sign in to comment.