Merge 'view: fix range tombstone handling on flushes in view_updating_consumer' from Michał Chojnowski

View update routines accept `mutation` objects, but what comes out of staging sstable readers is a stream of `mutation_fragment_v2` objects. To build view updates after repair/streaming, we have to convert the fragment stream into `mutation`s. This is done by piping the stream through `mutation_rebuilder_v2`.
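
A minimal sketch of that conversion (simplified from the new tests below; `s` is the schema_ptr, `permit` a reader_permit, and `frags` a valid fragment stream ending in partition_end):

```cpp
// Fold a stream of mutation_fragment_v2 objects back into a single `mutation`.
mutation_rebuilder_v2 rebuilder(s);
for (auto& frag : frags) {
    rebuilder.consume(mutation_fragment_v2(*s, permit, std::move(frag)));
}
// Once partition_end has been consumed, this yields the rebuilt partition.
mutation m = std::move(*rebuilder.consume_end_of_stream());
```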

To keep memory usage limited, the stream for a single partition might have to be split into multiple partial `mutation` objects. `view_updating_consumer` does that, but improperly -- when the split/flush happens inside an active range tombstone, the range tombstone isn't closed. This is illegal and triggers an internal error.

This patch fixes the problem by closing the active range tombstone (and reopening it at the same position in the next `mutation` object).

The tombstone is closed just after the last seen clustered position. This is not necessary for correctness -- for example, we could instead delay all processing of the range tombstone until we see its end bound -- but it seems like the most natural semantics.
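
For illustration, a sketch of these semantics in the style of the `test_mutation_rebuilder_v2_flush` test added below (fixtures `ss`, `s`, `p`, `pk` as in that test; the tombstones and positions are illustrative):

```cpp
mutation_rebuilder_v2 b(s);
b.consume(mutation_fragment_v2(*s, p, partition_start(pk, {})));
// Open a range tombstone starting before ckey(0).
b.consume(mutation_fragment_v2(*s, p, range_tombstone_change(
        position_in_partition::before_key(ss.make_ckey(0)), ss.new_tombstone())));
b.consume(mutation_fragment_v2(*s, p, clustering_row(ss.make_ckey(0))));

// Flush while the tombstone is open: in m1 it is closed just after ckey(0),
// the last seen clustered position; inside the rebuilder it stays active.
mutation m1 = b.flush();

// The remainder of the stream sees it reopened at that same position.
b.consume(mutation_fragment_v2(*s, p, clustering_row(ss.make_ckey(1))));
b.consume(mutation_fragment_v2(*s, p, range_tombstone_change(
        position_in_partition::after_key(*s, ss.make_ckey(1)), tombstone{})));
b.consume(mutation_fragment_v2(*s, p, partition_end()));
mutation m2 = std::move(*b.consume_end_of_stream());
// m1 + m2 equals the mutation rebuilt from the same stream without a flush.
```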

Fixes #14503

Closes #14502

* github.com:scylladb/scylladb:
  test: view_build_test: add range tombstones to test_view_update_generator_buffering
  test: view_build_test: add test_view_update_generator_buffering_with_random_mutations
  view_updating_consumer: make buffer limit a variable
  view: fix range tombstone handling on flushes in view_updating_consumer

(cherry picked from commit c25201c)
tgrabiec authored and nyh committed Jul 11, 2023
1 parent 0f4f863 commit 3f2f060
Showing 5 changed files with 227 additions and 18 deletions.
11 changes: 5 additions & 6 deletions db/view/view.cc
@@ -2552,9 +2552,6 @@ future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_d
});
}

const size_t view_updating_consumer::buffer_size_soft_limit{1 * 1024 * 1024};
const size_t view_updating_consumer::buffer_size_hard_limit{2 * 1024 * 1024};

void view_updating_consumer::do_flush_buffer() {
_staging_reader_handle.pause();

@@ -2577,6 +2574,10 @@ void view_updating_consumer::do_flush_buffer() {
}

void view_updating_consumer::flush_builder() {
_buffer.emplace_back(_mut_builder->flush());
}

void view_updating_consumer::end_builder() {
_mut_builder->consume_end_of_partition();
if (auto mut_opt = _mut_builder->consume_end_of_stream()) {
_buffer.emplace_back(std::move(*mut_opt));
@@ -2585,11 +2586,9 @@
}

void view_updating_consumer::maybe_flush_buffer_mid_partition() {
if (_buffer_size >= buffer_size_hard_limit) {
if (_buffer_size >= _buffer_size_hard_limit) {
flush_builder();
auto dk = _buffer.back().decorated_key();
do_flush_buffer();
consume_new_partition(dk);
}
}

18 changes: 14 additions & 4 deletions db/view/view_updating_consumer.hh
@@ -33,8 +33,17 @@ public:
// We prefer flushing on partition boundaries, so at the end of a partition,
// we flush on reaching the soft limit. Otherwise we continue accumulating
// data. We flush mid-partition if we reach the hard limit.
static const size_t buffer_size_soft_limit;
static const size_t buffer_size_hard_limit;
static constexpr size_t buffer_size_soft_limit_default = 1 * 1024 * 1024;
static constexpr size_t buffer_size_hard_limit_default = 2 * 1024 * 1024;
private:
size_t _buffer_size_soft_limit = buffer_size_soft_limit_default;
size_t _buffer_size_hard_limit = buffer_size_hard_limit_default;
public:
// Meant only for usage in tests.
void set_buffer_size_limit_for_testing_purposes(size_t sz) {
_buffer_size_soft_limit = sz;
_buffer_size_hard_limit = sz;
}

private:
schema_ptr _schema;
@@ -49,6 +58,7 @@
private:
void do_flush_buffer();
void flush_builder();
void end_builder();
void maybe_flush_buffer_mid_partition();

public:
@@ -113,8 +123,8 @@ public:
if (_as->abort_requested()) {
return stop_iteration::yes;
}
flush_builder();
if (_buffer_size >= buffer_size_soft_limit) {
end_builder();
if (_buffer_size >= _buffer_size_soft_limit) {
do_flush_buffer();
}
return stop_iteration::no;
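
The new hook lets a test shrink both limits so that every consumed fragment crosses the hard limit and forces a mid-partition flush. A minimal usage sketch (mirroring the new test in test/boost/view_build_test.cc below, whose fixtures it assumes):

```cpp
auto vuc = db::view::view_updating_consumer(schema, permit, as, staging_reader_handle,
        consumer_verifier(schema, collected_muts, ok));
// Soft limit == hard limit == 1 byte: flush after every consumed fragment.
vuc.set_buffer_size_limit_for_testing_purposes(1);
staging_reader.consume_in_thread(std::move(vuc));
```
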
32 changes: 32 additions & 0 deletions mutation_rebuilder.hh
@@ -55,6 +55,17 @@ public:
return stop_iteration::yes;
}

// Might only be called between consume_new_partition()
// and consume_end_of_partition().
//
// Returns (and forgets) the partition contents consumed so far.
// Can be used to split the processing of a large mutation into
// multiple smaller `mutation` objects (which add up to the full mutation).
mutation flush() {
assert(_m);
return std::exchange(*_m, mutation(_s, _m->decorated_key()));
}

mutation_opt consume_end_of_stream() {
return std::move(_m);
}
@@ -67,6 +78,7 @@ class mutation_rebuilder_v2 {
schema_ptr _s;
mutation_rebuilder _builder;
range_tombstone_assembler _rt_assembler;
position_in_partition _pos = position_in_partition::before_all_clustered_rows();
public:
mutation_rebuilder_v2(schema_ptr s) : _s(std::move(s)), _builder(_s) { }
public:
@@ -91,6 +103,7 @@
}

stop_iteration consume(range_tombstone_change&& rt) {
_pos = rt.position();
if (auto rt_opt = _rt_assembler.consume(*_s, std::move(rt))) {
_builder.consume(std::move(*rt_opt));
}
@@ -103,6 +116,7 @@
}

stop_iteration consume(clustering_row&& cr) {
_pos = position_in_partition::after_key(*_s, cr.position());
_builder.consume(std::move(cr));
return stop_iteration::no;
}
@@ -116,4 +130,22 @@
_rt_assembler.on_end_of_stream();
return _builder.consume_end_of_stream();
}

// Might only be called between consume_new_partition()
// and consume_end_of_partition().
//
// Returns (and forgets) the partition contents consumed so far.
// Can be used to split the processing of a large mutation into
// multiple smaller `mutation` objects (which add up to the full mutation).
//
// The active range tombstone (if present) is flushed with end bound
// just after the last seen clustered position, but the range tombstone
// remains active, and the next mutation will see it restarted at the
// position it was flushed at.
mutation flush() {
if (auto rt_opt = _rt_assembler.flush(*_s, _pos)) {
_builder.consume(std::move(*rt_opt));
}
return _builder.flush();
}
};
69 changes: 69 additions & 0 deletions test/boost/mutation_test.cc
@@ -2620,6 +2620,75 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_consume_position_monotonicity) {
}
}

// Tests mutation_rebuilder_v2::flush().
SEASTAR_THREAD_TEST_CASE(test_mutation_rebuilder_v2_flush) {
simple_schema ss;
schema_ptr s = ss.schema();
auto pk = ss.make_pkey();
tests::reader_concurrency_semaphore_wrapper semaphore;
auto p = semaphore.make_permit();

// Main idea of the test: we prepare a stream with all "interesting"
// situations (with respect to positions), for example:
// - RTC right before and after a key
// - Overlapping RTCs
// - Keys without a RTC in between, but with an active RTC from before
// - Keys without a RTC in between, but without an active RTC from before
// etc.
//
// Then we pass this stream through mutation_rebuilder_v2 with two flushes
// in between (on all possible positions), and check that the result is
// the same as without flushes.
auto frags = std::vector<mutation_fragment_v2>();
frags.emplace_back(*s, p, partition_start(pk, {}));
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_all_clustered_rows(), ss.new_tombstone()));
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(0)));
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(1)), ss.new_tombstone()));
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(1)));
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(1)), ss.new_tombstone()));
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(1)), ss.new_tombstone()));
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(2)));
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(3)), tombstone{}));
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(3)));
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(4)));
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(4)), ss.new_tombstone()));
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(5)), ss.new_tombstone()));
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(5)));
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(6)));
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_all_clustered_rows(), tombstone{}));
frags.emplace_back(*s, p, partition_end());

mutation_rebuilder_v2 rebuilder_without_flush(s);
for (int i = 0; i < frags.size(); ++i) {
rebuilder_without_flush.consume(mutation_fragment_v2(*s, p, frags[i]));
}
auto m_expected = std::move(*rebuilder_without_flush.consume_end_of_stream());

// We do two flushes (we test all possible combinations of their positions,
// including no flush).
// This is to test that the first flush doesn't break the rebuilder in
// a way that prevents another flush.
for (int first_flush = 0; first_flush < frags.size(); ++first_flush) {
for (int second_flush = first_flush; second_flush < frags.size(); ++second_flush) {
mutation_rebuilder_v2 rebuilder(s);
auto m1 = mutation(s, pk); // Contents of flush 1.
auto m2 = mutation(s, pk); // Contents of flush 2.
auto m3 = mutation(s, pk); // Contents of final flush.
for (int i = 0; i < frags.size(); ++i) {
rebuilder.consume(mutation_fragment_v2(*s, p, frags[i]));
if (i == first_flush) {
m1 = rebuilder.flush();
}
if (i == second_flush) {
m2 = rebuilder.flush();
}
}
m3 = std::move(*rebuilder.consume_end_of_stream());
assert_that(m1 + m2 + m3).is_equal_to(m_expected);
}
}
}

SEASTAR_TEST_CASE(mutation_with_dummy_clustering_row_is_consumed_monotonically) {
return seastar::async([] {
tests::reader_concurrency_semaphore_wrapper semaphore;
115 changes: 107 additions & 8 deletions test/boost/view_build_test.cc
@@ -29,6 +29,9 @@
#include "test/lib/data_model.hh"
#include "test/lib/log.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/key_utils.hh"
#include "test/lib/mutation_source_test.hh"
#include "test/lib/mutation_assertions.hh"
#include "utils/ranges.hh"

#include "readers/from_mutations_v2.hh"
@@ -730,23 +733,25 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
total_rows,
_buffer_rows);

BOOST_REQUIRE(current_rows);
BOOST_REQUIRE(!mut.partition().empty());
BOOST_REQUIRE(current_rows <= _max_rows_hard);
BOOST_REQUIRE(_buffer_rows <= _max_rows_hard);

// The current partition doesn't have all of its rows yet, verify
// that the new mutation contains the next rows for the same
// partition
if (!_collected_muts.empty() && rows_in_mut(_collected_muts.back()) < _partition_rows.at(_collected_muts.back().decorated_key())) {
BOOST_REQUIRE(_collected_muts.back().decorated_key().equal(*mut.schema(), mut.decorated_key()));
const auto& previous_ckey = (--_collected_muts.back().partition().clustered_rows().end())->key();
const auto& next_ckey = mut.partition().clustered_rows().begin()->key();
BOOST_REQUIRE(_less_cmp(previous_ckey, next_ckey));
if (!_collected_muts.empty() && _collected_muts.back().decorated_key().equal(*mut.schema(), mut.decorated_key())) {
if (rows_in_mut(_collected_muts.back()) && rows_in_mut(mut)) {
const auto& previous_ckey = (--_collected_muts.back().partition().clustered_rows().end())->key();
const auto& next_ckey = mut.partition().clustered_rows().begin()->key();
BOOST_REQUIRE(_less_cmp(previous_ckey, next_ckey));
}
mutation_application_stats stats;
_collected_muts.back().partition().apply(*_schema, mut.partition(), *mut.schema(), stats);
// The new mutation is a new partition.
} else {
if (!_collected_muts.empty()) {
BOOST_REQUIRE(rows_in_mut(_collected_muts.back()) == _partition_rows.at(_collected_muts.back().decorated_key()));
BOOST_REQUIRE(!_collected_muts.back().decorated_key().equal(*mut.schema(), mut.decorated_key()));
}
_collected_muts.push_back(std::move(mut));
@@ -770,8 +775,8 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
, _rl(std::make_unique<row_locker>(_schema))
, _rl_stats(std::make_unique<row_locker::stats>())
, _less_cmp(*_schema)
, _max_rows_soft(rows_in_limit(db::view::view_updating_consumer::buffer_size_soft_limit))
, _max_rows_hard(rows_in_limit(db::view::view_updating_consumer::buffer_size_hard_limit))
, _max_rows_soft(rows_in_limit(db::view::view_updating_consumer::buffer_size_soft_limit_default))
, _max_rows_hard(rows_in_limit(db::view::view_updating_consumer::buffer_size_hard_limit_default))
, _ok(ok)
{ }

@@ -815,6 +820,8 @@
for (auto ck = 0; ck < partition_size_100kb; ++ck) {
mut_desc.add_clustered_cell({int32_type->decompose(data_value(ck))}, "v", tests::data_model::mutation_description::value(blob_100kb));
}
// Reproduces #14503
mut_desc.add_range_tombstone(nonwrapping_range<tests::data_model::mutation_description::key>::make_open_ended_both_sides());
muts.push_back(mut_desc.build(schema));
partition_rows.emplace(muts.back().decorated_key(), partition_size_100kb);
}
@@ -871,3 +878,95 @@
BOOST_REQUIRE(db::system_keyspace::load_view_build_progress().get0().empty());
});
}

// A random mutation test for view_updating_consumer's buffering logic.
// Passes random mutations through a view_updating_consumer with an extremely
// small buffer, which should cause a buffer flush after every mutation fragment.
// This checks that flushing works correctly at every position, regardless
// of the last fragment and the last range tombstone change.
//
// Inspired by #14503.
SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering_with_random_mutations) {
// Collects the mutations produced by the tested view_updating_consumer into a vector.
class consumer_verifier {
schema_ptr _schema;
std::vector<mutation>& _collected_muts;
std::unique_ptr<row_locker> _rl;
std::unique_ptr<row_locker::stats> _rl_stats;
bool& _ok;

private:
void check(mutation mut) {
BOOST_REQUIRE(!mut.partition().empty());
_collected_muts.push_back(std::move(mut));
}

public:
consumer_verifier(schema_ptr schema, std::vector<mutation>& collected_muts, bool& ok)
: _schema(std::move(schema))
, _collected_muts(collected_muts)
, _rl(std::make_unique<row_locker>(_schema))
, _rl_stats(std::make_unique<row_locker::stats>())
, _ok(ok)
{ }

future<row_locker::lock_holder> operator()(mutation mut) {
try {
check(std::move(mut));
} catch (...) {
_ok = false;
BOOST_FAIL(fmt::format("consumer_verifier::operator(): caught unexpected exception {}", std::current_exception()));
}
return _rl->lock_pk(_collected_muts.back().decorated_key(), true, db::no_timeout, *_rl_stats);
}
};

// Create a random mutation.
// We don't really want a random `mutation`, but a random valid mutation fragment
// stream. But I don't know a better way to get that other than to create a random
// `mutation` and shove it through readers.
random_mutation_generator gen(random_mutation_generator::generate_counters::no);
mutation mut = gen();
schema_ptr schema = gen.schema();

// Turn the random mutation into a mutation fragment stream,
// so it can be fed to the view_updating_consumer.
// Quite verbose. Perhaps there exists a simpler way to do this.
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, replica::new_reader_base_cost);
auto stop_sem = deferred_stop(sem);
const abort_source as;
auto mt = make_lw_shared<replica::memtable>(schema);
mt->apply(mut);
auto permit = sem.obtain_permit(schema.get(), get_name(), replica::new_reader_base_cost, db::no_timeout, {}).get0();
auto p = make_manually_paused_evictable_reader_v2(
mt->as_data_source(),
schema,
permit,
query::full_partition_range,
schema->full_slice(),
nullptr,
::mutation_reader::forwarding::no);
auto& staging_reader = std::get<0>(p);
auto& staging_reader_handle = std::get<1>(p);
auto close_staging_reader = deferred_close(staging_reader);

// Feed the random valid mutation fragment stream to the view_updating_consumer,
// and collect its outputs.
std::vector<mutation> collected_muts;
bool ok = true;
auto vuc = db::view::view_updating_consumer(schema, permit, as, staging_reader_handle,
consumer_verifier(schema, collected_muts, ok));
vuc.set_buffer_size_limit_for_testing_purposes(1);
staging_reader.consume_in_thread(std::move(vuc));

// Check that the outputs sum up to the initial mutation.
// We could also check that they are non-overlapping, which is
// expected from the view_updating_consumer flushes, but it's
// not necessary for correctness.
BOOST_REQUIRE(ok);
mutation total(schema, mut.decorated_key());
for (const auto& x : collected_muts) {
total += x;
}
assert_that(total).is_equal_to_compacted(mut);
}
