From 4f73a281747aa1dd598643aa008047d2abc1a720 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Chojnowski?= Date: Mon, 19 Jun 2023 13:56:51 +0200 Subject: [PATCH 1/3] mutation: mutation_cleaner: add pause() In unit tests, we would want to delay the merging of some MVCC versions to test the transient scenarios with multiple versions present. In many cases this can be done by holding snapshots to all versions. But sometimes (i.e. during schema upgrades) versions are added and scheduled for merge immediately, without a window for the test to grab a snapshot to the new version. This patch adds a pause() method to mutation_cleaner, which ensures that no asynchronous/implicit MVCC version merges happen within the scope of the call. This functionality will be used by a test added in an upcoming patch. --- mutation/mutation_cleaner.hh | 29 ++++++++++++++++++++++++++++- mutation/mutation_partition.cc | 5 ++++- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/mutation/mutation_cleaner.hh b/mutation/mutation_cleaner.hh index 59653da8ee6f..6cad63f9360f 100644 --- a/mutation/mutation_cleaner.hh +++ b/mutation/mutation_cleaner.hh @@ -24,6 +24,7 @@ class mutation_cleaner_impl final { snapshot_list snapshots; logalloc::allocating_section alloc_section; bool done = false; // true means the worker was abandoned and cannot access the mutation_cleaner_impl instance. + int64_t merging_paused = 0; // Allows for pausing the background merging. Used only for testing purposes. }; private: logalloc::region& _region; @@ -67,6 +68,13 @@ public: _scheduling_group = sg; _worker_state->cv.broadcast(); } + auto pause() { + _worker_state->merging_paused += 1; + return defer([this] { + _worker_state->merging_paused -= 1; + _worker_state->cv.signal(); + }); + }; auto make_region_space_guard() { return defer([&, dirty_before = _region.occupancy().total_space()] { auto dirty_after = _region.occupancy().total_space(); @@ -93,7 +101,7 @@ void mutation_cleaner_impl::destroy_gently(partition_version& v) noexcept { inline void mutation_cleaner_impl::merge_and_destroy(partition_snapshot& ps) noexcept { - if (ps.slide_to_oldest() == stop_iteration::yes || merge_some(ps) == stop_iteration::yes) { + if (ps.slide_to_oldest() == stop_iteration::yes || (!_worker_state->merging_paused && merge_some(ps) == stop_iteration::yes)) { lw_shared_ptr::dispose(&ps); } else { // The snapshot must not be reachable by partitino_entry::read() after this, @@ -194,4 +202,23 @@ public: void merge_and_destroy(partition_snapshot& ps) { return _impl->merge_and_destroy(ps); } + + // Ensures the cleaner isn't doing any version merging while + // the returned guard object is alive. + // + // Example usage: + // + // mutation_cleaner cleaner; + // ... + // { + // auto pause_guard = cleaner.pause(); + // // Merging is paused here. + // ... + // } + // // Merging happens normally here. + // + // Meant only for use by unit tests. + auto pause() { + return _impl->pause(); + } }; diff --git a/mutation/mutation_partition.cc b/mutation/mutation_partition.cc index 2aa19c5cc9d7..3600d3a0d20e 100644 --- a/mutation/mutation_partition.cc +++ b/mutation/mutation_partition.cc @@ -2584,11 +2584,14 @@ void mutation_cleaner_impl::start_worker() { } return with_scheduling_group(_scheduling_group, [w, this] { return w->cv.wait([w] { - return w->done || !w->snapshots.empty(); + return (w->done || !w->snapshots.empty()) && !w->merging_paused; }).then([this, w] () noexcept { if (w->done) { return stop_iteration::yes; } + if (w->merging_paused) { + return stop_iteration::no; + } merge_some(); return stop_iteration::no; }); From d56b0c20f4feba3f58cbf1fc2cfeb4c2536d12a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Chojnowski?= Date: Mon, 19 Jun 2023 17:32:50 +0200 Subject: [PATCH 2/3] cache_flat_mutation_reader: use the correct schema in prepare_hash Since `mvcc: make schema upgrades gentle` (51e3b9321b2e), rows pointed to by the cursor can have different (older) schema than the schema of the cursor's snapshot. However, one place in the code wasn't updated accordingly, causing a row to be processed with the wrong schema in the right circumstances. This passed through unit testing because it requires a digest-computing cache read after a schema change, and no test exercised this. Fixes #14110 --- cache_flat_mutation_reader.hh | 2 +- partition_snapshot_row_cursor.hh | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cache_flat_mutation_reader.hh b/cache_flat_mutation_reader.hh index 7992e6e42bc7..f0cf3f50a2f6 100644 --- a/cache_flat_mutation_reader.hh +++ b/cache_flat_mutation_reader.hh @@ -894,7 +894,7 @@ void cache_flat_mutation_reader::add_to_buffer(const partition_snapshot_row_curs if (!row.dummy()) { _read_context.cache().on_row_hit(); if (_read_context.digest_requested()) { - row.latest_row().cells().prepare_hash(table_schema(), column_kind::regular_column); + row.latest_row_prepare_hash(); } add_clustering_row_to_buffer(mutation_fragment_v2(*_schema, _permit, row.row())); } else { diff --git a/partition_snapshot_row_cursor.hh b/partition_snapshot_row_cursor.hh index 149fc2b5c12c..745bce1dbc9e 100644 --- a/partition_snapshot_row_cursor.hh +++ b/partition_snapshot_row_cursor.hh @@ -565,8 +565,8 @@ public: } // Can be called only when cursor is valid and pointing at a row. - deletable_row& latest_row() const noexcept { - return _current_row[0].it->row(); + void latest_row_prepare_hash() const { + _current_row[0].it->row().cells().prepare_hash(*_current_row[0].schema, column_kind::regular_column); } // Can be called only when cursor is valid and pointing at a row. From 02bcb5d539af6a40d8af5a37e1470b42f0a1e313 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Chojnowski?= Date: Mon, 19 Jun 2023 18:10:25 +0200 Subject: [PATCH 3/3] test: boost/row_cache_test: add a reproducer for #14110 --- test/boost/row_cache_test.cc | 67 ++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/test/boost/row_cache_test.cc b/test/boost/row_cache_test.cc index 84807198bf97..1c59f0fe5be1 100644 --- a/test/boost/row_cache_test.cc +++ b/test/boost/row_cache_test.cc @@ -4521,3 +4521,70 @@ SEASTAR_THREAD_TEST_CASE(test_population_of_subrange_of_expired_partition) { .produces(m1) .produces_end_of_stream(); } + +// Reproducer for #14110. +// Forces a scenario where digest is calculated for rows in old MVCC +// versions, incompatible with the current schema. +// In the original issue, this crashed the node with an assert failure, +// because the digest calculation was passed the current schema, +// instead of the row's actual old schema. +SEASTAR_THREAD_TEST_CASE(test_digest_read_during_schema_upgrade) { + // The test will insert a row into the cache, + // then drop a column, and read the old row with the new schema. + // If the old row was processed with the new schema, + // the test would fail because one of the row's columns would + // have no definition. + auto s1 = schema_builder("ks", "cf") + .with_column("pk", utf8_type, column_kind::partition_key) + .with_column("ck", utf8_type, column_kind::clustering_key) + .with_column("v1", utf8_type, column_kind::regular_column) + .build(); + auto s2 = schema_builder(s1) + .remove_column("v1") + .build(); + + // Create a mutation with one row, with inconsequential keys and values. + auto pk = partition_key::from_single_value(*s1, serialized(0)); + auto m1 = std::invoke([s1, pk] { + auto x = mutation(s1, pk); + auto ck = clustering_key::from_single_value(*s1, serialized(0)); + x.set_clustered_cell(ck, "v1", "v1_value", api::new_timestamp()); + return x; + }); + + // Populate the cache with m1. + memtable_snapshot_source underlying(s1); + underlying.apply(m1); + cache_tracker tracker; + row_cache cache(s1, snapshot_source([&] { return underlying(); }), tracker); + populate_range(cache); + + // A schema upgrade of a MVCC version happens by adding an empty version + // with the new schema next to it, and merging the old-schema version into + // the new-schema version. + // + // We want to test a read of rows which are still in the old-schema + // version. To ensure that, we have to prevent mutation_cleaner from + // merging the versions until the test is done. + auto pause_background_merges = tracker.cleaner().pause(); + + // Upgrade the cache + cache.set_schema(s2); + + // Create a digest-requesting reader for the tested partition. + tests::reader_concurrency_semaphore_wrapper semaphore; + auto pr = dht::partition_range::make_singular(dht::decorate_key(*s1, pk)); + auto slice = partition_slice_builder(*s2) + .with_option() + .build(); + auto rd = cache.make_reader(s2, semaphore.make_permit(), pr, slice); + auto close_rd = deferred_close(rd); + + // In the original issue reproduced by this test, the read would crash + // on an assert. + // So what we are really testing below is that the read doesn't crash. + // The comparison with m2 is just a sanity check. + auto m2 = m1; + m2.upgrade(s2); + assert_that(std::move(rd)).produces(m2); +}