Skip to content

Commit

Permalink
test: view_build_test: add test_view_udate_generator_buffering_with_r…
Browse files Browse the repository at this point in the history
…andom_mutations

A random mutation test for view_updating_consumer's buffering logic.
Reproduces scylladb#14503.
  • Loading branch information
michoecho committed Jul 11, 2023
1 parent d45dec4 commit b3f1789
Showing 1 changed file with 95 additions and 0 deletions.
95 changes: 95 additions & 0 deletions test/boost/view_build_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include "test/lib/data_model.hh"
#include "test/lib/log.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/mutation_source_test.hh"
#include "test/lib/mutation_assertions.hh"
#include "utils/ranges.hh"

#include "readers/from_mutations_v2.hh"
Expand Down Expand Up @@ -871,3 +873,96 @@ SEASTAR_TEST_CASE(test_load_view_build_progress_with_values_missing) {
BOOST_REQUIRE(db::system_keyspace::load_view_build_progress().get0().empty());
});
}

// A random mutation test for view_updating_consumer's buffering logic.
// Passes random mutations through a view_updating_consumer with a extremely
// small buffer, which should cause a buffer flush after every mutation fragment.
// Should check that flushing works correctly in every position, and regardless
// of the last fragment and the last range tombstone change,
//
// Inspired by #14503.
SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering_with_random_mutations) {
// Collects the mutations produced by the tested view_updating_consumer into a vector.
class consumer_verifier {
schema_ptr _schema;
std::vector<mutation>& _collected_muts;
std::unique_ptr<row_locker> _rl;
std::unique_ptr<row_locker::stats> _rl_stats;
bool& _ok;

private:
void check(mutation mut) {
BOOST_REQUIRE(!mut.partition().empty());
_collected_muts.push_back(std::move(mut));
}

public:
consumer_verifier(schema_ptr schema, std::vector<mutation>& collected_muts, bool& ok)
: _schema(std::move(schema))
, _collected_muts(collected_muts)
, _rl(std::make_unique<row_locker>(_schema))
, _rl_stats(std::make_unique<row_locker::stats>())
, _ok(ok)
{ }

future<row_locker::lock_holder> operator()(mutation mut) {
try {
check(std::move(mut));
} catch (...) {
_ok = false;
BOOST_FAIL(fmt::format("consumer_verifier::operator(): caught unexpected exception {}", std::current_exception()));
}
return _rl->lock_pk(_collected_muts.back().decorated_key(), true, db::no_timeout, *_rl_stats);
}
};

// Create a random mutation.
// We don't really want a random `mutation`, but a random valid mutation fragment
// stream. But I don't know a better way to get that other than to create a random
// `mutation` and shove it through readers.
random_mutation_generator gen(random_mutation_generator::generate_counters::no);
mutation mut = gen();
schema_ptr schema = gen.schema();

// Turn the random mutation into a mutation fragment stream,
// so it can be fed to the view_updating_consumer.
// Quite verbose. Perhaps there exists a simpler way to do this.
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, replica::new_reader_base_cost);
auto stop_sem = deferred_stop(sem);
const abort_source as;
auto mt = make_lw_shared<replica::memtable>(schema);
mt->apply(mut);
auto permit = sem.obtain_permit(schema.get(), get_name(), replica::new_reader_base_cost, db::no_timeout).get0();
auto p = make_manually_paused_evictable_reader_v2(
mt->as_data_source(),
schema,
permit,
query::full_partition_range,
schema->full_slice(),
service::get_local_streaming_priority(),
nullptr,
::mutation_reader::forwarding::no);
auto& staging_reader = std::get<0>(p);
auto& staging_reader_handle = std::get<1>(p);
auto close_staging_reader = deferred_close(staging_reader);

// Feed the random valid mutation fragment stream to the view_updating_consumer,
// and collect its outputs.
std::vector<mutation> collected_muts;
bool ok = true;
auto vuc = db::view::view_updating_consumer(schema, permit, as, staging_reader_handle,
consumer_verifier(schema, collected_muts, ok));
vuc.set_buffer_size_limit_for_testing_purposes(1);
staging_reader.consume_in_thread(std::move(vuc));

// Check that the outputs sum up to the initial mutation.
// We could also check that they are non-overlapping, which is
// expected from the view_updating_consumer flushes, but it's
// not necessary for correctness.
BOOST_REQUIRE(ok);
mutation total(schema, mut.decorated_key());
for (const auto& x : collected_muts) {
total += x;
}
assert_that(total).is_equal_to_compacted(mut);
}

0 comments on commit b3f1789

Please sign in to comment.