Skip to content

Commit

Permalink
Merge 'cache_flat_mutation_reader: use the correct schema in prepare_…
Browse files Browse the repository at this point in the history
…hash' from Michał Chojnowski

Since `mvcc: make schema upgrades gentle` (51e3b93),
rows pointed to by the cursor can have different (older) schema
than the schema of the cursor's snapshot.

However, one place in the code wasn't updated accordingly,
causing a row to be processed with the wrong schema in the right
circumstances.

This passed through unit testing because it requires
a digest-computing cache read after a schema change,
and no test exercised this.

This series fixes the bug and adds a unit test which reproduces the issue.

Fixes #14110

Closes #14305

* github.com:scylladb/scylladb:
  test: boost/row_cache_test: add a reproducer for #14110
  cache_flat_mutation_reader: use the correct schema in prepare_hash
  mutation: mutation_cleaner: add pause()
  • Loading branch information
tgrabiec committed Jun 19, 2023
2 parents 3889e90 + 02bcb5d commit 5fa08ad
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 5 deletions.
2 changes: 1 addition & 1 deletion cache_flat_mutation_reader.hh
Expand Up @@ -894,7 +894,7 @@ void cache_flat_mutation_reader::add_to_buffer(const partition_snapshot_row_curs
if (!row.dummy()) {
_read_context.cache().on_row_hit();
if (_read_context.digest_requested()) {
row.latest_row().cells().prepare_hash(table_schema(), column_kind::regular_column);
row.latest_row_prepare_hash();
}
add_clustering_row_to_buffer(mutation_fragment_v2(*_schema, _permit, row.row()));
} else {
Expand Down
29 changes: 28 additions & 1 deletion mutation/mutation_cleaner.hh
Expand Up @@ -24,6 +24,7 @@ class mutation_cleaner_impl final {
snapshot_list snapshots;
logalloc::allocating_section alloc_section;
bool done = false; // true means the worker was abandoned and cannot access the mutation_cleaner_impl instance.
int64_t merging_paused = 0; // Allows for pausing the background merging. Used only for testing purposes.
};
private:
logalloc::region& _region;
Expand Down Expand Up @@ -67,6 +68,13 @@ public:
_scheduling_group = sg;
_worker_state->cv.broadcast();
}
// Suspends background merging until the returned guard object is destroyed.
//
// Every call bumps the `merging_paused` counter kept in the worker state and
// the guard undoes that bump when it goes out of scope, so overlapping guards
// nest: merging stays suspended while at least one guard is alive.
// On release the worker's condition variable is signalled so that a waiter
// can re-evaluate its wake-up condition.
//
// The guard's callback captures `this`.
// Used only for testing purposes.
auto pause() {
    ++_worker_state->merging_paused;
    return defer([this] {
        --_worker_state->merging_paused;
        _worker_state->cv.signal();
    });
}
auto make_region_space_guard() {
return defer([&, dirty_before = _region.occupancy().total_space()] {
auto dirty_after = _region.occupancy().total_space();
Expand All @@ -93,7 +101,7 @@ void mutation_cleaner_impl::destroy_gently(partition_version& v) noexcept {

inline
void mutation_cleaner_impl::merge_and_destroy(partition_snapshot& ps) noexcept {
if (ps.slide_to_oldest() == stop_iteration::yes || merge_some(ps) == stop_iteration::yes) {
if (ps.slide_to_oldest() == stop_iteration::yes || (!_worker_state->merging_paused && merge_some(ps) == stop_iteration::yes)) {
lw_shared_ptr<partition_snapshot>::dispose(&ps);
} else {
// The snapshot must not be reachable by partition_entry::read() after this,
Expand Down Expand Up @@ -194,4 +202,23 @@ public:
// Hands the snapshot over to the implementation object's merge_and_destroy().
void merge_and_destroy(partition_snapshot& ps) {
    _impl->merge_and_destroy(ps);
}


// Ensures the cleaner isn't doing any version merging while
// the returned guard object is alive.
//
// This simply forwards to the implementation object's pause(). That
// implementation counts outstanding guards, so guards may overlap: merging
// resumes only once the last of them has been destroyed.
//
// Example usage:
//
// mutation_cleaner cleaner;
// ...
// {
// auto pause_guard = cleaner.pause();
// // Merging is paused here.
// ...
// }
// // Merging happens normally here.
//
// NOTE(review): the guard's callback refers back to the implementation
// object, so presumably the guard must be destroyed before the cleaner
// itself -- confirm.
//
// Meant only for use by unit tests.
auto pause() {
return _impl->pause();
}
};
5 changes: 4 additions & 1 deletion mutation/mutation_partition.cc
Expand Up @@ -2584,11 +2584,14 @@ void mutation_cleaner_impl::start_worker() {
}
return with_scheduling_group(_scheduling_group, [w, this] {
return w->cv.wait([w] {
return w->done || !w->snapshots.empty();
return (w->done || !w->snapshots.empty()) && !w->merging_paused;
}).then([this, w] () noexcept {
if (w->done) {
return stop_iteration::yes;
}
if (w->merging_paused) {
return stop_iteration::no;
}
merge_some();
return stop_iteration::no;
});
Expand Down
4 changes: 2 additions & 2 deletions partition_snapshot_row_cursor.hh
Expand Up @@ -565,8 +565,8 @@ public:
}

// Can be called only when cursor is valid and pointing at a row.
deletable_row& latest_row() const noexcept {
return _current_row[0].it->row();
// Prepares the hash of the regular-column cells of the row referenced by the
// first entry of `_current_row`.
// The hash is prepared with the schema stored in that same entry rather than
// with a schema supplied by the caller, so the cells are always interpreted
// with the schema recorded next to the row.
void latest_row_prepare_hash() const {
    const auto& latest = _current_row[0];
    latest.it->row().cells().prepare_hash(*latest.schema, column_kind::regular_column);
}

// Can be called only when cursor is valid and pointing at a row.
Expand Down
67 changes: 67 additions & 0 deletions test/boost/row_cache_test.cc
Expand Up @@ -4521,3 +4521,70 @@ SEASTAR_THREAD_TEST_CASE(test_population_of_subrange_of_expired_partition) {
.produces(m1)
.produces_end_of_stream();
}

// Regression test for #14110.
// Sets up a digest-computing read over rows that still live in an old MVCC
// version whose schema is incompatible with the cache's current schema.
// Before the fix the digest code was handed the current schema rather than
// the schema the row was actually written with, and the node died on an
// assert.
SEASTAR_THREAD_TEST_CASE(test_digest_read_during_schema_upgrade) {
    // Plan: put one row into the cache, drop one of its columns via a schema
    // change, then read the row back using the new schema.
    // Had the old row been interpreted with the new schema, one of its
    // columns would have no definition and the test would fail.
    auto old_schema = schema_builder("ks", "cf")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .with_column("ck", utf8_type, column_kind::clustering_key)
            .with_column("v1", utf8_type, column_kind::regular_column)
            .build();
    auto new_schema = schema_builder(old_schema)
            .remove_column("v1")
            .build();

    // A single-row mutation; the concrete keys and value do not matter.
    auto pk = partition_key::from_single_value(*old_schema, serialized(0));
    mutation original(old_schema, pk);
    auto ck = clustering_key::from_single_value(*old_schema, serialized(0));
    original.set_clustered_cell(ck, "v1", "v1_value", api::new_timestamp());

    // Load that mutation into the cache through the underlying source.
    memtable_snapshot_source underlying(old_schema);
    underlying.apply(original);
    cache_tracker tracker;
    row_cache cache(old_schema, snapshot_source([&underlying] { return underlying(); }), tracker);
    populate_range(cache);

    // Upgrading the schema of an MVCC version works by placing an empty
    // new-schema version next to it and then merging the old-schema version
    // into the new one.
    //
    // The read below must observe rows that are still in the old-schema
    // version, so mutation_cleaner is kept from merging the versions for
    // the remainder of the test.
    auto merge_pause_guard = tracker.cleaner().pause();

    // Switch the cache over to the new schema.
    cache.set_schema(new_schema);

    // Open a reader over the partition that asks for a digest.
    tests::reader_concurrency_semaphore_wrapper semaphore;
    auto range = dht::partition_range::make_singular(dht::decorate_key(*old_schema, pk));
    auto digest_slice = partition_slice_builder(*new_schema)
            .with_option<query::partition_slice::option::with_digest>()
            .build();
    auto reader = cache.make_reader(new_schema, semaphore.make_permit(), range, digest_slice);
    auto close_reader = deferred_close(reader);

    // The original bug manifested as an assert failure during the read, so
    // the real check here is that the read completes at all.
    // Comparing against the upgraded mutation is only a sanity check.
    auto expected = original;
    expected.upgrade(new_schema);
    assert_that(std::move(reader)).produces(expected);
}

0 comments on commit 5fa08ad

Please sign in to comment.