Skip to content

Commit

Permalink
view: fix range tombstone handling on flushes in view_updating_consumer
Browse files Browse the repository at this point in the history
View update routines accept `mutation` objects.
But what comes out of staging sstable readers is a stream of
mutation_fragment_v2 objects.
To build view updates after a repair/streaming, we have to
convert the fragment stream into `mutation`s. This is done by piping
the stream to mutation_rebuilder_v2.

To keep memory usage limited, the stream for a single partition might
have to be split into multiple partial `mutation` objects.
view_updating_consumer does that, but in an improper way -- when the
split/flush happens inside an active range tombstone, the range
tombstone isn't closed properly. This is illegal, and triggers an
internal error.

This patch fixes the problem by closing the active range tombstone
(and reopening in the same position in the next `mutation` object).

The tombstone is closed just after the last seen clustered position.
This exact choice of position is not required for correctness -- for
example, we could instead delay all processing of the range tombstone
until we see its end bound -- but it seems like the most natural semantic.

Fixes #14503
  • Loading branch information
michoecho committed Jul 4, 2023
1 parent 1ab2bb6 commit 5ad0846
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 3 deletions.
6 changes: 4 additions & 2 deletions db/view/view.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2595,6 +2595,10 @@ void view_updating_consumer::do_flush_buffer() {
}

void view_updating_consumer::flush_builder() {
    // Drain the partial mutation accumulated in the rebuilder so far and
    // queue it for view-update generation.
    auto partial = _mut_builder->flush();
    _buffer.push_back(std::move(partial));
}

void view_updating_consumer::end_builder() {
_mut_builder->consume_end_of_partition();
if (auto mut_opt = _mut_builder->consume_end_of_stream()) {
_buffer.emplace_back(std::move(*mut_opt));
Expand All @@ -2605,9 +2609,7 @@ void view_updating_consumer::flush_builder() {
// Bounds memory usage while consuming a single (possibly huge) partition:
// once the accumulated buffer crosses the hard limit, the partition content
// consumed so far is emitted as view updates, mid-partition.
void view_updating_consumer::maybe_flush_buffer_mid_partition() {
    if (_buffer_size >= buffer_size_hard_limit) {
        // Close the in-progress partial mutation into _buffer.
        flush_builder();
        // Remember the partition key so consumption can resume on the same
        // partition after the buffer is drained.
        auto dk = _buffer.back().decorated_key();
        do_flush_buffer();
        consume_new_partition(dk);
    }
}

Expand Down
3 changes: 2 additions & 1 deletion db/view/view_updating_consumer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ private:
private:
void do_flush_buffer();
void flush_builder();
void end_builder();
void maybe_flush_buffer_mid_partition();

public:
Expand Down Expand Up @@ -115,7 +116,7 @@ public:
if (_as->abort_requested()) {
return stop_iteration::yes;
}
flush_builder();
end_builder();
if (_buffer_size >= buffer_size_soft_limit) {
do_flush_buffer();
}
Expand Down
32 changes: 32 additions & 0 deletions mutation/mutation_rebuilder.hh
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@ public:
return stop_iteration::yes;
}

// Might only be called between consume_new_partition()
// and consume_end_of_partition().
//
// Hands back the partition contents accumulated so far and resets the
// builder to an empty mutation for the same partition key. This lets the
// processing of a large mutation be split into multiple smaller `mutation`
// objects which add up to the full mutation.
mutation flush() {
    assert(_m);
    mutation drained(_s, _m->decorated_key());
    std::swap(*_m, drained);
    return drained;
}

mutation_opt consume_end_of_stream() {
return std::move(_m);
}
Expand All @@ -67,6 +78,7 @@ class mutation_rebuilder_v2 {
schema_ptr _s;
mutation_rebuilder _builder;
range_tombstone_assembler _rt_assembler;
position_in_partition _pos = position_in_partition::before_all_clustered_rows();
public:
mutation_rebuilder_v2(schema_ptr s) : _s(std::move(s)), _builder(_s) { }
public:
Expand All @@ -91,6 +103,7 @@ public:
}

stop_iteration consume(range_tombstone_change&& rt) {
_pos = rt.position();
if (auto rt_opt = _rt_assembler.consume(*_s, std::move(rt))) {
_builder.consume(std::move(*rt_opt));
}
Expand All @@ -103,6 +116,7 @@ public:
}

stop_iteration consume(clustering_row&& cr) {
    // Record the last seen clustered position (just after this row) so that
    // flush() can close an active range tombstone at a legal position.
    // Must be taken from cr before it is moved into the builder.
    _pos = position_in_partition::after_key(*_s, cr.position());
    _builder.consume(std::move(cr));
    return stop_iteration::no;
}
Expand All @@ -116,4 +130,22 @@ public:
_rt_assembler.on_end_of_stream();
return _builder.consume_end_of_stream();
}

// Might only be called between consume_new_partition()
// and consume_end_of_partition().
//
// Returns (and forgets) the partition contents consumed so far, which
// allows a large partition to be emitted as several smaller `mutation`
// objects that add up to the complete mutation.
//
// If a range tombstone is currently open, it is emitted with an end bound
// just after the last seen clustered position; the assembler keeps it
// active, so the next flushed mutation sees it restarted at the position
// it was flushed at.
mutation flush() {
    auto closed_rt = _rt_assembler.flush(*_s, _pos);
    if (closed_rt) {
        _builder.consume(std::move(*closed_rt));
    }
    return _builder.flush();
}
};
69 changes: 69 additions & 0 deletions test/boost/mutation_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3027,6 +3027,75 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_consume_position_monotonicity) {
}
}

// Tests mutation_rebuilder_v2::flush().
SEASTAR_THREAD_TEST_CASE(test_mutation_rebuilder_v2_flush) {
    simple_schema ss;
    schema_ptr s = ss.schema();
    auto pk = ss.make_pkey();
    tests::reader_concurrency_semaphore_wrapper semaphore;
    auto p = semaphore.make_permit();

    // Main idea of the test: we prepare a stream with all "interesting"
    // situations (with respect to positions), for example:
    // - RTC right before and after a key
    // - Overlapping RTCs
    // - Keys without a RTC in between, but with an active RTC from before
    // - Keys without a RTC in between, but without an active RTC from before
    // etc.
    //
    // Then we pass this stream through mutation_rebuilder_v2 with two flushes
    // in between (on all possible positions), and check that the result is
    // the same as without flushes.
    auto frags = std::vector<mutation_fragment_v2>();
    frags.emplace_back(*s, p, partition_start(pk, {}));
    frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_all_clustered_rows(), ss.new_tombstone()));
    frags.emplace_back(*s, p, clustering_row(ss.make_ckey(0)));
    frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(1)), ss.new_tombstone()));
    frags.emplace_back(*s, p, clustering_row(ss.make_ckey(1)));
    frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(1)), ss.new_tombstone()));
    frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(1)), ss.new_tombstone()));
    frags.emplace_back(*s, p, clustering_row(ss.make_ckey(2)));
    frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(3)), tombstone{}));
    frags.emplace_back(*s, p, clustering_row(ss.make_ckey(3)));
    frags.emplace_back(*s, p, clustering_row(ss.make_ckey(4)));
    frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(4)), ss.new_tombstone()));
    frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(5)), ss.new_tombstone()));
    frags.emplace_back(*s, p, clustering_row(ss.make_ckey(5)));
    frags.emplace_back(*s, p, clustering_row(ss.make_ckey(6)));
    frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_all_clustered_rows(), tombstone{}));
    frags.emplace_back(*s, p, partition_end());

    // Reference result: the whole stream consumed with no mid-stream flush.
    // Note: loop indices are size_t to avoid signed/unsigned comparison
    // against frags.size().
    mutation_rebuilder_v2 rebuilder_without_flush(s);
    for (size_t i = 0; i < frags.size(); ++i) {
        rebuilder_without_flush.consume(mutation_fragment_v2(*s, p, frags[i]));
    }
    auto m_expected = std::move(*rebuilder_without_flush.consume_end_of_stream());

    // We do two flushes (we test all possible combinations of their positions,
    // including no flush).
    // This is to test that the first flush doesn't break the rebuilder in
    // a way that prevents another flush.
    for (size_t first_flush = 0; first_flush < frags.size(); ++first_flush) {
        for (size_t second_flush = first_flush; second_flush < frags.size(); ++second_flush) {
            mutation_rebuilder_v2 rebuilder(s);
            auto m1 = mutation(s, pk); // Contents of flush 1.
            auto m2 = mutation(s, pk); // Contents of flush 2.
            auto m3 = mutation(s, pk); // Contents of final flush.
            for (size_t i = 0; i < frags.size(); ++i) {
                rebuilder.consume(mutation_fragment_v2(*s, p, frags[i]));
                if (i == first_flush) {
                    m1 = rebuilder.flush();
                }
                if (i == second_flush) {
                    m2 = rebuilder.flush();
                }
            }
            m3 = std::move(*rebuilder.consume_end_of_stream());
            // The pieces must add up to the unflushed result.
            assert_that(m1 + m2 + m3).is_equal_to(m_expected);
        }
    }
}

SEASTAR_TEST_CASE(mutation_with_dummy_clustering_row_is_consumed_monotonically) {
return seastar::async([] {
tests::reader_concurrency_semaphore_wrapper semaphore;
Expand Down

0 comments on commit 5ad0846

Please sign in to comment.