Merge 'mvcc: make schema upgrades gentle' from Michał Chojnowski
After a schema change, memtable and cache have to be upgraded to the new schema. Currently, they are upgraded (on the first access after a schema change) atomically, i.e. all rows of the entry are upgraded with one non-preemptible call. This is one of the last vestiges of the times when partitions were treated atomically, and it is a well-known source of numerous large stalls.

This series makes schema upgrades gentle (preemptible). This is done by co-opting the existing MVCC machinery.
Before the series, all partition_versions in the partition_entry chain have the same schema, and an entry upgrade replaces the entire chain with a single squashed and upgraded version.
After the series, each partition_version has its own schema. A partition entry upgrade happens simply by adding an empty version with the new schema to the head of the chain. Row entries are upgraded to the current schema on-the-fly by the cursor during reads, and by the MVCC version merge ongoing in the background after the upgrade.
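
The post-series design can be pictured with a small standalone model. This is only a sketch under stated assumptions: `toy_schema`, `toy_version`, `toy_entry`, `convert` and `read_row` are hypothetical names invented for illustration, not ScyllaDB types, and the real cursor and version merging are considerably more involved. It shows each version carrying its own schema, an upgrade that only pushes an empty version onto the head of the chain, and a read that converts older rows to the newest schema on the fly.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a schema: a version number plus column names indexed by column id.
struct toy_schema {
    int version;
    std::vector<std::string> columns;
};
using toy_schema_ptr = std::shared_ptr<const toy_schema>;

// A row maps column ids (valid only for the schema it was written with) to values.
using toy_row = std::map<std::size_t, std::string>;

// Each version in the chain has its own schema.
struct toy_version {
    toy_schema_ptr schema;
    std::map<int, toy_row> rows;   // clustering key -> row
};

struct toy_entry {
    std::list<toy_version> versions;   // front() is the newest version

    // Gentle upgrade: add an empty version with the new schema; no rows are touched.
    void upgrade(toy_schema_ptr new_schema) {
        versions.push_front(toy_version{std::move(new_schema), {}});
    }
};

// Re-key the cells of `src` from the column ids of `from` to those of `to`.
// Cells whose column does not exist in `to` are dropped.
toy_row convert(const toy_schema& to, const toy_schema& from, const toy_row& src) {
    if (to.version == from.version) {
        return src;
    }
    toy_row out;
    for (const auto& [id, value] : src) {
        const std::string& name = from.columns.at(id);
        for (std::size_t i = 0; i < to.columns.size(); ++i) {
            if (to.columns[i] == name) {
                out[i] = value;
            }
        }
    }
    return out;
}

// Read one row in the newest schema, visiting versions oldest-first so newer cells win.
std::optional<toy_row> read_row(const toy_entry& e, int key) {
    assert(!e.versions.empty());
    const toy_schema& target = *e.versions.front().schema;
    std::optional<toy_row> result;
    for (auto it = e.versions.rbegin(); it != e.versions.rend(); ++it) {
        auto r = it->rows.find(key);
        if (r == it->rows.end()) {
            continue;
        }
        if (!result) {
            result.emplace();
        }
        for (const auto& [id, value] : convert(target, *it->schema, r->second)) {
            (*result)[id] = value;
        }
    }
    return result;
}
```

In this model `upgrade()` costs the same regardless of partition size, and the per-row conversion cost is paid only when a row is actually read.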

The series:
1. Does some code cleanup in the mutation_partition area.
2. Adds a schema field to partition_version and removes it from its containers (partition_snapshot, cache_entry, memtable_entry).
3. Adds upgrading variants of constructors and apply() for `row` and its wrappers.
4. Prepares partition_snapshot_row_cursor, mutation_partition_v2::apply_monotonically and partition_snapshot::merge_partition_versions for dealing with heterogeneous version chains.
5. Modifies partition_entry::upgrade to perform upgrades by extending the version chain with a new schema instead of squashing it to a single upgraded version.
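
What "gentle (preemptible)" means for the background merge can be sketched as work done in bounded steps, with the caller free to yield between them. The names `stop_iteration` and `is_preemptible` do appear in the diff, but everything in the block below (`toy_stop_iteration`, `toy_gentle_merger`, `merge_in_steps`, the row budget) is a hypothetical simplification and not the series' actual mechanism.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <utility>

// Hypothetical analogue of a "done / not done yet" result.
enum class toy_stop_iteration { no, yes };

// Moves rows from an older version into a newer one, at most `budget` rows per call.
struct toy_gentle_merger {
    std::map<int, std::string>& older;
    std::map<int, std::string>& newer;

    toy_stop_iteration step(std::size_t budget) {
        for (; budget > 0 && !older.empty(); --budget) {
            auto node = older.extract(older.begin());
            // If the newer version already has this key, the insert fails and
            // the older row is discarded, so the newer row wins.
            newer.insert(std::move(node));
        }
        return older.empty() ? toy_stop_iteration::yes : toy_stop_iteration::no;
    }
};

// Drives the merger to completion and returns how many steps it took.
// Each loop iteration is a point where a real scheduler could run other tasks.
std::size_t merge_in_steps(toy_gentle_merger& m, std::size_t budget) {
    assert(budget > 0);
    std::size_t steps = 1;
    while (m.step(budget) == toy_stop_iteration::no) {
        ++steps;
    }
    return steps;
}
```

An atomic upgrade corresponds to a single step with an unbounded budget. The smaller the budget, the shorter the longest uninterrupted stretch of work.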

Fixes scylladb#2577

Closes scylladb#13761

* github.com:scylladb/scylladb:
  test: mvcc_test: add a test for gentle schema upgrades
  partition_version: make partition_entry::upgrade() gentle
  partition_version: handle multi-schema snapshots in merge_partition_versions
  mutation_partition_v2: handle schema upgrades in apply_monotonically()
  partition_version: remove the unused "from" argument in partition_entry::upgrade()
  row_cache_test: prepare test_eviction_after_schema_change for gentle schema upgrades
  partition_version: handle multi-schema entries in partition_entry::squashed
  partition_snapshot_row_cursor: handle multi-schema snapshots
  partiton_version: prepare partition_snapshot::squashed() for multi-schema snapshots
  partition_version: prepare partition_snapshot::static_row() for multi-schema snapshots
  partition_version: add a logalloc::region argument to partition_entry::upgrade()
  memtable: propagate the region to memtable_entry::upgrade_schema()
  mutation_partition: add an upgrading variant of lazy_row::apply()
  mutation_partition: add an upgrading variant of rows_entry::rows_entry
  mutation_partition: switch an apply() call to apply_monotonically()
  mutation_partition: add an upgrading variant of rows_entry::apply_monotonically()
  mutation_fragment: add an upgrading variant of clustering_row::apply()
  mutation_partition: add an upgrading variant of row::row
  partition_version: remove _schema from partition_entry::operator<<
  partition_version: remove the schema argument from partition_entry::read()
  memtable: remove _schema from memtable_entry
  row_cache: remove _schema from cache_entry
  partition_version: remove the _schema field from partition_snapshot
  partition_version: add a _schema field to partition_version
  mutation_partition: change schema_ptr to schema& in mutation_partition::difference
  mutation_partition: change schema_ptr to schema& in mutation_partition constructor
  mutation_partition_v2: change schema_ptr to schema& in mutation_partition_v2 constructor
  mutation_partition: add upgrading variants of row::apply()
  partition_version: update the comment to apply_to_incomplete()
  mutation_partition_v2: clean up variants of apply()
  mutation_partition: remove apply_weak()
  mutation_partition_v2: remove a misleading comment in apply_monotonically()
  row_cache_test: add schema changes to test_concurrent_reads_and_eviction
  mutation_partition: fix mixed-schema apply()
tgrabiec committed May 24, 2023
2 parents 7cdee30 + 2d1a345 commit 51e3b93
Showing 20 changed files with 697 additions and 420 deletions.
2 changes: 1 addition & 1 deletion db/view/view.cc
@@ -487,7 +487,7 @@ mutation_partition& view_updates::partition_for(partition_key&& key) {
     if (it != _updates.end()) {
         return it->second;
     }
-    return _updates.emplace(std::move(key), mutation_partition(_view)).first->second;
+    return _updates.emplace(std::move(key), mutation_partition(*_view)).first->second;
 }

 size_t view_updates::op_count() const {
4 changes: 2 additions & 2 deletions mutation/mutation.cc
@@ -13,13 +13,13 @@
 mutation::data::data(dht::decorated_key&& key, schema_ptr&& schema)
     : _schema(std::move(schema))
     , _dk(std::move(key))
-    , _p(_schema)
+    , _p(*_schema)
 { }

 mutation::data::data(partition_key&& key_, schema_ptr&& schema)
     : _schema(std::move(schema))
     , _dk(dht::decorate_key(*_schema, std::move(key_)))
-    , _p(_schema)
+    , _p(*_schema)
 { }

 mutation::data::data(schema_ptr&& schema, dht::decorated_key&& key, const mutation_partition& mp)
3 changes: 3 additions & 0 deletions mutation/mutation_fragment.hh
@@ -90,6 +90,9 @@ public:
     void apply(const schema& s, const deletable_row& r) {
         _row.apply(s, r);
     }
+    void apply(const schema& our_schema, const schema& their_schema, const deletable_row& r) {
+        _row.apply(our_schema, their_schema, r);
+    }

     position_in_partition_view position() const;

137 changes: 95 additions & 42 deletions mutation/mutation_partition.cc
@@ -243,21 +243,6 @@ void mutation_partition::ensure_last_dummy(const schema& s) {
     }
 }

-void mutation_partition::apply(const schema& s, const mutation_partition& p, const schema& p_schema,
-        mutation_application_stats& app_stats) {
-    apply_weak(s, p, p_schema, app_stats);
-}
-
-void mutation_partition::apply(const schema& s, mutation_partition&& p,
-        mutation_application_stats& app_stats) {
-    apply_weak(s, std::move(p), app_stats);
-}
-
-void mutation_partition::apply(const schema& s, mutation_partition_view p, const schema& p_schema,
-        mutation_application_stats& app_stats) {
-    apply_weak(s, p, p_schema, app_stats);
-}
-
 struct mutation_fragment_applier {
     const schema& _s;
     mutation_partition& _mp;
@@ -489,7 +474,7 @@ stop_iteration mutation_partition::apply_monotonically(const schema& s, mutation
     if (s.version() == p_schema.version()) {
         return apply_monotonically(s, std::move(p), no_cache_tracker, app_stats, preemptible, res);
     } else {
-        mutation_partition p2(s, p);
+        mutation_partition p2(p_schema, p);
         p2.upgrade(p_schema, s);
         return apply_monotonically(s, std::move(p2), no_cache_tracker, app_stats, is_preemptible::no, res); // FIXME: make preemptible
     }
@@ -508,7 +493,7 @@ stop_iteration mutation_partition::apply_monotonically(const schema& s, mutation
 }

 void
-mutation_partition::apply_weak(const schema& s, mutation_partition_view p,
+mutation_partition::apply(const schema& s, mutation_partition_view p,
         const schema& p_schema, mutation_application_stats& app_stats) {
     // FIXME: Optimize
     mutation_partition p2(*this, copy_comparators_only{});
@@ -517,13 +502,13 @@ mutation_partition::apply_weak(const schema& s, mutation_partition_view p,
     apply_monotonically(s, std::move(p2), p_schema, app_stats);
 }

-void mutation_partition::apply_weak(const schema& s, const mutation_partition& p,
+void mutation_partition::apply(const schema& s, const mutation_partition& p,
         const schema& p_schema, mutation_application_stats& app_stats) {
     // FIXME: Optimize
-    apply_monotonically(s, mutation_partition(s, p), p_schema, app_stats);
+    apply_monotonically(s, mutation_partition(p_schema, p), p_schema, app_stats);
 }

-void mutation_partition::apply_weak(const schema& s, mutation_partition&& p, mutation_application_stats& app_stats) {
+void mutation_partition::apply(const schema& s, mutation_partition&& p, mutation_application_stats& app_stats) {
     apply_monotonically(s, std::move(p), no_cache_tracker, app_stats);
 }

@@ -1161,22 +1146,42 @@ deletable_row::equal(column_kind kind, const schema& s, const deletable_row& oth
     return _cells.equal(kind, s, other._cells, other_schema);
 }

+void deletable_row::apply(const schema& s, deletable_row&& src) {
+    apply_monotonically(s, std::move(src));
+}
+
 void deletable_row::apply(const schema& s, const deletable_row& src) {
     apply_monotonically(s, src);
 }

-void deletable_row::apply(const schema& s, deletable_row&& src) {
-    apply_monotonically(s, std::move(src));
+void deletable_row::apply(const schema& our_schema, const schema& their_schema, deletable_row&& src) {
+    apply_monotonically(our_schema, their_schema, std::move(src));
 }

+void deletable_row::apply(const schema& our_schema, const schema& their_schema, const deletable_row& src) {
+    apply_monotonically(our_schema, their_schema, src);
+}
+
+void deletable_row::apply_monotonically(const schema& s, deletable_row&& src) {
+    _cells.apply_monotonically(s, column_kind::regular_column, std::move(src._cells));
+    _marker.apply(src._marker);
+    _deleted_at.apply(src._deleted_at, _marker);
+}
+
 void deletable_row::apply_monotonically(const schema& s, const deletable_row& src) {
-    _cells.apply(s, column_kind::regular_column, src._cells);
+    _cells.apply_monotonically(s, column_kind::regular_column, src._cells);
     _marker.apply(src._marker);
     _deleted_at.apply(src._deleted_at, _marker);
 }

-void deletable_row::apply_monotonically(const schema& s, deletable_row&& src) {
-    _cells.apply(s, column_kind::regular_column, std::move(src._cells));
+void deletable_row::apply_monotonically(const schema& our_schema, const schema& their_schema, deletable_row&& src) {
+    _cells.apply_monotonically(our_schema, their_schema, column_kind::regular_column, std::move(src._cells));
     _marker.apply(src._marker);
     _deleted_at.apply(src._deleted_at, _marker);
 }
+
+void deletable_row::apply_monotonically(const schema& our_schema, const schema& their_schema, const deletable_row& src) {
+    _cells.apply_monotonically(our_schema, their_schema, column_kind::regular_column, src._cells);
+    _marker.apply(src._marker);
+    _deleted_at.apply(src._deleted_at, _marker);
+}
@@ -1576,6 +1581,16 @@ row::row(const schema& s, column_kind kind, const row& o) : _size(o._size)
     _cells.clone_from(o._cells, clone_cell_and_hash);
 }

+row row::construct(const schema& our_schema, const schema& their_schema, column_kind kind, const row& o) {
+    if (our_schema.version() == their_schema.version()) {
+        return row(our_schema, kind, o);
+    } else {
+        row r;
+        r.apply(our_schema, their_schema, kind, o);
+        return r;
+    }
+}
+
 row::~row() {
 }

@@ -1640,7 +1655,32 @@ row& row::operator=(row&& other) noexcept {
     return *this;
 }

+void row::apply(const schema& s, column_kind kind, row&& other) {
+    apply_monotonically(s, kind, std::move(other));
+}
+
 void row::apply(const schema& s, column_kind kind, const row& other) {
+    apply_monotonically(s, kind, other);
+}
+
+void row::apply(const schema& our_schema, const schema& their_schema, column_kind kind, row&& other) {
+    apply_monotonically(our_schema, their_schema, kind, std::move(other));
+};
+
+void row::apply(const schema& our_schema, const schema& their_schema, column_kind kind, const row& other) {
+    apply_monotonically(our_schema, their_schema, kind, other);
+};
+
+void row::apply_monotonically(const schema& s, column_kind kind, row&& other) {
+    if (other.empty()) {
+        return;
+    }
+    other.consume_with([&] (column_id id, cell_and_hash& c_a_h) {
+        apply_monotonically(s.column_at(kind, id), std::move(c_a_h.cell), std::move(c_a_h.hash));
+    });
+}
+
+void row::apply_monotonically(const schema& s, column_kind kind, const row& other) {
     if (other.empty()) {
         return;
     }
@@ -1649,16 +1689,29 @@ void row::apply(const schema& s, column_kind kind, const row& other) {
     });
 }

-void row::apply(const schema& s, column_kind kind, row&& other) {
-    apply_monotonically(s, kind, std::move(other));
+void row::apply_monotonically(const schema& our_schema, const schema& their_schema, column_kind kind, row&& other) {
+    if (our_schema.version() == their_schema.version()) {
+        return apply_monotonically(our_schema, kind, std::move(other));
+    }
+    other.consume_with([&] (column_id id, cell_and_hash& c_a_h) {
+        const column_definition& their_col = their_schema.column_at(kind, id);
+        const column_definition* our_col = our_schema.get_column_definition(their_col.name());
+        if (our_col) {
+            converting_mutation_partition_applier::append_cell(*this, kind, *our_col, their_col, c_a_h.cell);
+        }
+    });
 }

-void row::apply_monotonically(const schema& s, column_kind kind, row&& other) {
-    if (other.empty()) {
-        return;
+void row::apply_monotonically(const schema& our_schema, const schema& their_schema, column_kind kind, const row& other) {
+    if (our_schema.version() == their_schema.version()) {
+        return apply_monotonically(our_schema, kind, other);
     }
-    other.consume_with([&] (column_id id, cell_and_hash& c_a_h) {
-        apply_monotonically(s.column_at(kind, id), std::move(c_a_h.cell), std::move(c_a_h.hash));
+    other.for_each_cell([&] (column_id id, const cell_and_hash& c_a_h) {
+        const column_definition& their_col = their_schema.column_at(kind, id);
+        const column_definition* our_col = our_schema.get_column_definition(their_col.name());
+        if (our_col) {
+            converting_mutation_partition_applier::append_cell(*this, kind, *our_col, their_col, c_a_h.cell);
+        }
     });
 }

@@ -1873,32 +1926,32 @@ bool row_marker::compact_and_expire(tombstone tomb, gc_clock::time_point now,
     return !is_missing() && _ttl != dead;
 }

-mutation_partition mutation_partition::difference(schema_ptr s, const mutation_partition& other) const
+mutation_partition mutation_partition::difference(const schema& s, const mutation_partition& other) const
 {
-    check_schema(*s);
+    check_schema(s);
     mutation_partition mp(s);
     if (_tombstone > other._tombstone) {
         mp.apply(_tombstone);
     }
-    mp._static_row = _static_row.difference(*s, column_kind::static_column, other._static_row);
+    mp._static_row = _static_row.difference(s, column_kind::static_column, other._static_row);

-    mp._row_tombstones = _row_tombstones.difference(*s, other._row_tombstones);
+    mp._row_tombstones = _row_tombstones.difference(s, other._row_tombstones);

     auto it_r = other._rows.begin();
-    rows_entry::compare cmp_r(*s);
+    rows_entry::compare cmp_r(s);
     for (auto&& r : _rows) {
         if (r.dummy()) {
             continue;
         }
         while (it_r != other._rows.end() && (it_r->dummy() || cmp_r(*it_r, r))) {
             ++it_r;
         }
-        if (it_r == other._rows.end() || !it_r->key().equal(*s, r.key())) {
-            mp.insert_row(*s, r.key(), r.row());
+        if (it_r == other._rows.end() || !it_r->key().equal(s, r.key())) {
+            mp.insert_row(s, r.key(), r.row());
         } else {
-            auto dr = r.row().difference(*s, column_kind::regular_column, it_r->row());
+            auto dr = r.row().difference(s, column_kind::regular_column, it_r->row());
             if (!dr.empty()) {
-                mp.insert_row(*s, r.key(), std::move(dr));
+                mp.insert_row(s, r.key(), std::move(dr));
             }
         }
     }
@@ -1936,7 +1989,7 @@ void mutation_partition::accept(const schema& s, mutation_partition_visitor& v)
 void
 mutation_partition::upgrade(const schema& old_schema, const schema& new_schema) {
     // We need to copy to provide strong exception guarantees.
-    mutation_partition tmp(new_schema.shared_from_this());
+    mutation_partition tmp(new_schema);
     tmp.set_static_row_continuous(_static_row_continuous);
     converting_mutation_partition_applier v(old_schema.get_column_mapping(), new_schema, tmp);
     accept(old_schema, v);
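
One plausible reading of the mixed-schema fix in this diff (the change from `mutation_partition p2(s, p)` to `mutation_partition p2(p_schema, p)` before `p2.upgrade(p_schema, s)`) is that cells are addressed by column id, and a column id only has meaning relative to the schema the data was written with. The standalone sketch below illustrates that reading with hypothetical types (`toy_columns`, `toy_cells`, `by_name`, `upgrade_cells`); it is not ScyllaDB code. It also mirrors the upgrading `row::apply_monotonically()` variants above in mapping columns by name and dropping cells whose column is absent from the target schema.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

using toy_columns = std::vector<std::string>;           // column names indexed by column id
using toy_cells = std::map<std::size_t, std::string>;   // column id -> value

// Interpret `cells` under `schema`, resolving each column id to its name.
std::map<std::string, std::string> by_name(const toy_columns& schema, const toy_cells& cells) {
    std::map<std::string, std::string> out;
    for (const auto& [id, value] : cells) {
        assert(id < schema.size());
        out[schema[id]] = value;
    }
    return out;
}

// Convert cells written with `from` into the id space of `to`, matching columns by name.
toy_cells upgrade_cells(const toy_columns& from, const toy_columns& to, const toy_cells& cells) {
    toy_cells out;
    for (const auto& [id, value] : cells) {
        assert(id < from.size());
        auto it = std::find(to.begin(), to.end(), from[id]);
        if (it != to.end()) {
            out[static_cast<std::size_t>(it - to.begin())] = value;
        }
    }
    return out;
}
```

With `from = {"a", "b"}`, `to = {"b", "c"}` and cells `{0: "x", 1: "y"}`, reading the cells directly under `to` mislabels them as `b = x, c = y`, whereas reading them under `from` and then upgrading yields the intended `b = y`.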