diff --git a/mutation/mutation_compactor.hh b/mutation/mutation_compactor.hh index ac9580e88966..c527d554a999 100644 --- a/mutation/mutation_compactor.hh +++ b/mutation/mutation_compactor.hh @@ -177,6 +177,7 @@ private: template requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 stop_iteration do_consume(range_tombstone_change&& rtc, Consumer& consumer, GCConsumer& gc_consumer) { + _validator(mutation_fragment_v2::kind::range_tombstone_change, rtc.position(), rtc.tombstone()); stop_iteration gc_consumer_stop = stop_iteration::no; stop_iteration consumer_stop = stop_iteration::no; if (rtc.tombstone() <= _partition_tombstone) { @@ -197,7 +198,6 @@ private: if (_current_emitted_tombstone || (rtc.tombstone() && !can_purge)) { partition_is_not_empty(consumer); _current_emitted_tombstone = rtc.tombstone(); - _validator(mutation_fragment_v2::kind::range_tombstone_change, rtc.position(), rtc.tombstone()); consumer_stop = consumer.consume(std::move(rtc)); } return gc_consumer_stop || consumer_stop; @@ -460,12 +460,22 @@ public: template requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 stop_iteration consume_end_of_partition(Consumer& consumer, GCConsumer& gc_consumer) { - if (_effective_tombstone) { - auto rtc = range_tombstone_change(position_in_partition::after_key(_schema, _last_pos), tombstone{}); - // do_consume() overwrites _effective_tombstone with {}, so save and restore it. - auto prev_tombstone = _effective_tombstone; - do_consume(std::move(rtc), consumer, gc_consumer); - _effective_tombstone = prev_tombstone; + // Only check if the active tombstone has to be closed, if the partition + // was cut by the consumer. Otherwise, leave the stream as-is. + if (_stop) { + if (_effective_tombstone) { + auto rtc = range_tombstone_change(position_in_partition::after_key(_schema, _last_pos), tombstone{}); + // do_consume() overwrites _effective_tombstone with {}, so save and restore it. + auto prev_tombstone = _effective_tombstone; + do_consume(std::move(rtc), consumer, gc_consumer); + _effective_tombstone = prev_tombstone; + } else if (_validator.validator().current_tombstone()) { + // It is possible that the range-tombstone providing the active + // tombstone was purged and never got to the consumer and therefore + // didn't set `_effective_tombstone`. In this case we generate a + // closing tombstone just for the validator. + _validator(mutation_fragment_v2::kind::range_tombstone_change, position_in_partition::after_key(_schema, _last_pos), tombstone{}); + } } _validator.on_end_of_partition(); if (!_empty_partition_in_gc_consumer) { diff --git a/mutation/mutation_fragment_stream_validator.hh b/mutation/mutation_fragment_stream_validator.hh index c07ebf4751df..9998b22d910f 100644 --- a/mutation/mutation_fragment_stream_validator.hh +++ b/mutation/mutation_fragment_stream_validator.hh @@ -203,6 +203,8 @@ public: sstring full_name() const; + const mutation_fragment_stream_validator& validator() const { return _validator; } + bool operator()(const dht::decorated_key& dk); bool operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional new_current_tombstone); bool operator()(mutation_fragment::kind kind, position_in_partition_view pos); diff --git a/readers/mutation_readers.cc b/readers/mutation_readers.cc index d10990235679..70b60f2302be 100644 --- a/readers/mutation_readers.cc +++ b/readers/mutation_readers.cc @@ -1015,22 +1015,17 @@ make_flat_mutation_reader_from_mutations_v2(schema_ptr s, reader_permit permit, return make_flat_mutation_reader_from_mutations_v2(s, std::move(permit), std::move(mutations), pr, s->full_slice(), fwd); } -flat_mutation_reader_v2 -make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit, std::deque fragments) { - return make_flat_mutation_reader_from_fragments(std::move(schema), std::move(permit), std::move(fragments), query::full_partition_range); -} - -flat_mutation_reader_v2 -make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit, std::deque fragments, const dht::partition_range& pr) { +static flat_mutation_reader_v2 +make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit, std::deque fragments, const dht::partition_range* pr) { class reader : public flat_mutation_reader_v2::impl { std::deque _fragments; - const dht::partition_range* _pr; + const dht::partition_range* _pr = nullptr; dht::ring_position_comparator _cmp; private: bool end_of_range() const { return _fragments.empty() || - (_fragments.front().is_partition_start() && _pr->after(_fragments.front().as_partition_start().key(), _cmp)); + (_pr && _fragments.front().is_partition_start() && _pr->after(_fragments.front().as_partition_start().key(), _cmp)); } void do_fast_forward_to(const dht::partition_range& pr) { @@ -1043,12 +1038,13 @@ make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit } public: - reader(schema_ptr schema, reader_permit permit, std::deque fragments, const dht::partition_range& pr) + reader(schema_ptr schema, reader_permit permit, std::deque fragments, const dht::partition_range* pr) : flat_mutation_reader_v2::impl(std::move(schema), std::move(permit)) , _fragments(std::move(fragments)) - , _pr(&pr) , _cmp(*_schema) { - do_fast_forward_to(*_pr); + if (pr) { + do_fast_forward_to(*pr); + } } virtual future<> fill_buffer() override { while (!(_end_of_stream = end_of_range()) && !is_buffer_full()) { @@ -1080,6 +1076,16 @@ make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit return make_flat_mutation_reader_v2(std::move(schema), std::move(permit), std::move(fragments), pr); } +flat_mutation_reader_v2 +make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit, std::deque fragments, const dht::partition_range& pr) { + return make_flat_mutation_reader_from_fragments(std::move(schema), std::move(permit), std::move(fragments), &pr); +} + +flat_mutation_reader_v2 +make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit, std::deque fragments) { + return make_flat_mutation_reader_from_fragments(std::move(schema), std::move(permit), std::move(fragments), nullptr); +} + std::deque reverse_fragments(const schema& schema, reader_permit permit, std::deque fragments) { std::deque reversed_fragments; diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index b566111107c4..5edfe7693af4 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -3576,10 +3576,15 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_detach_state) { } }; -// Check that consumed fragments are forwarded intact to the validator. -SEASTAR_THREAD_TEST_CASE(test_compactor_validator_sanity_test) { +SEASTAR_THREAD_TEST_CASE(test_compactor_validator) { + const auto abort_ie = set_abort_on_internal_error(false); + auto reset_abort_ie = defer([abort_ie] { + set_abort_on_internal_error(abort_ie); + }); + simple_schema ss; - auto pk = ss.make_pkey(); + auto pks = ss.make_pkeys(2); + auto cks = ss.make_ckeys(3); auto s = ss.schema(); tests::reader_concurrency_semaphore_wrapper semaphore; @@ -3587,40 +3592,33 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_validator_sanity_test) { auto permit = semaphore.make_permit(); const auto expiry_point = gc_clock::now() + std::chrono::days(10); + const auto base_ts = ss.new_timestamp(); + const auto row_ts = base_ts + 1; + const auto rtc_tombstone_ts = base_ts + 4; + const auto partition_tombstone_ts = base_ts + 5; + const auto row_ts2 = base_ts + 6; - const auto marker_ts = ss.new_timestamp(); - const auto tomb_ts = ss.new_timestamp(); - const auto row_ts = ss.new_timestamp(); - - const auto query_time = gc_clock::now(); - const auto max_rows = std::numeric_limits::max(); - const auto max_partitions = std::numeric_limits::max(); - - auto make_frags = [&] { - std::deque frags; - - frags.emplace_back(*s, permit, partition_start(pk, {})); - - frags.emplace_back(*s, permit, ss.make_static_row_v2(permit, "static_row")); - + auto make_cr = [&] (const clustering_key& ckey, api::timestamp_type ts) { const auto& v_def = *s->get_column_definition(to_bytes("v")); - - for (uint32_t ck = 0; ck < 2; ++ck) { - auto ckey = ss.make_ckey(ck); - frags.emplace_back(*s, permit, range_tombstone_change(position_in_partition::before_key(ckey), tombstone{tomb_ts, expiry_point})); - auto row = clustering_row(ckey); - row.cells().apply(v_def, atomic_cell::make_live(*v_def.type, row_ts, serialized("v"))); - row.marker() = row_marker(marker_ts); - frags.emplace_back(mutation_fragment_v2(*s, permit, std::move(row))); - } - - frags.emplace_back(*s, permit, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(10)), tombstone{})); - - frags.emplace_back(*s, permit, partition_end{}); - - return frags; + auto row = clustering_row(ckey); + row.cells().apply(v_def, atomic_cell::make_live(*v_def.type, ts, serialized("v"))); + row.marker() = row_marker(ts); + return mutation_fragment_v2(*s, permit, std::move(row)); }; + mutation_fragment_v2 ps1(*s, permit, partition_start(pks[0], {})); + mutation_fragment_v2 ps1_tomb(*s, permit, partition_start(pks[0], {partition_tombstone_ts, expiry_point})); + mutation_fragment_v2 ps2(*s, permit, partition_start(pks[1], {})); + mutation_fragment_v2 sr(*s, permit, ss.make_static_row_v2(permit, "static_row")); + mutation_fragment_v2 cr1(*s, permit, make_cr(cks[0], row_ts)); + mutation_fragment_v2 cr1_high_ts(*s, permit, make_cr(cks[0], row_ts2)); + mutation_fragment_v2 cr2(*s, permit, make_cr(cks[1], row_ts)); + mutation_fragment_v2 cr3(*s, permit, make_cr(cks[2], row_ts)); + mutation_fragment_v2 rtc1(*s, permit, range_tombstone_change(position_in_partition::before_key(cks[0]), tombstone{rtc_tombstone_ts, expiry_point})); + mutation_fragment_v2 rtc2(*s, permit, range_tombstone_change(position_in_partition::before_key(cks[1]), tombstone{rtc_tombstone_ts, expiry_point})); + mutation_fragment_v2 rtc_end(*s, permit, range_tombstone_change(position_in_partition::after_key(*s, cks[2]), tombstone{})); + mutation_fragment_v2 pe(*s, permit, partition_end()); + struct consumer_v2 { void consume_new_partition(const dht::decorated_key& dk) { } void consume(const tombstone& t) { } @@ -3642,11 +3640,88 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_validator_sanity_test) { void consume_end_of_stream() { } }; - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions, - mutation_fragment_stream_validation_level::clustering_key); - auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); - auto close_reader = deferred_close(reader); - reader.consume(compact_for_query_v2(compaction_state, consumer_v2{})).get(); + auto check = [&] (std::initializer_list> frag_refs, bool expected_is_valid) { + std::deque frags; + for (const auto& frag_ref : frag_refs) { + frags.emplace_back(*s, permit, frag_ref.get()); + } + + auto compaction_state = make_lw_shared>(*s, gc_clock::now(), s->full_slice(), + std::numeric_limits::max(), std::numeric_limits::max(), mutation_fragment_stream_validation_level::clustering_key); + auto reader = make_flat_mutation_reader_from_fragments(s, permit, std::move(frags)); + auto close_reader = deferred_close(reader); + bool is_valid = true; + try { + reader.consume(compact_for_query_v2(compaction_state, consumer_v2{})).get(); + } catch (invalid_mutation_fragment_stream& ex) { + is_valid = false; + } + if (expected_is_valid != is_valid) { + auto msg = fmt::format("expected_is_valid ({}) != is_valid ({}), fragments:\n{}", + expected_is_valid, + is_valid, + fmt::join(frag_refs | boost::adaptors::transformed([&] (std::reference_wrapper mf) { + return fmt::to_string(mutation_fragment_v2::printer(*s, mf.get())); + }), "\n")); + BOOST_FAIL(msg); + } + }; + auto check_valid = [&] (std::initializer_list> frag_refs) { + return check(frag_refs, true); + }; + auto check_invalid = [&] (std::initializer_list> frag_refs) { + return check(frag_refs, false); + }; + + // Partitions + check_valid({ps1, pe}); + check_valid({ps1, pe, ps2, pe}); + check_invalid({pe, ps1, pe}); + check_invalid({ps2, pe, ps1, pe}); + check_invalid({ps1}); + check_invalid({ps1, pe, ps2}); + + // + static row + check_valid({ps1, sr, pe}); + check_valid({ps1_tomb, sr, pe}); + check_valid({ps1, sr, pe, ps2, sr, pe}); + check_invalid({ps1, pe, sr, ps2, pe}); + check_invalid({sr, ps1, pe}); + + // + clustering row + check_valid({ps1, cr1, pe}); + check_valid({ps1, sr, cr1, pe}); + check_valid({ps1, cr1, cr2, pe}); + check_valid({ps1, sr, cr1, cr2, pe}); + check_valid({ps1_tomb, cr1, pe}); + check_valid({ps1_tomb, cr1, cr2, pe}); + check_valid({ps1, cr1, pe, ps2, cr1, pe}); + check_invalid({ps1, pe, cr1, ps2, pe}); + check_invalid({cr1, ps1, pe}); + check_invalid({ps1, cr1, sr, pe}); + check_invalid({ps1_tomb, cr1, sr, pe}); + check_invalid({ps1_tomb, cr1_high_ts, sr, pe}); + check_invalid({ps1, cr2, cr1, pe}); + + // + range tombstones + check_valid({ps1, rtc1, rtc_end, pe}); + check_valid({ps1, rtc1, rtc1, rtc_end, pe}); + check_valid({ps1, rtc1, rtc1, cr1, rtc_end, pe}); + check_valid({ps1, sr, rtc1, cr1, rtc_end, pe}); + check_valid({ps1, rtc1, rtc2, rtc_end, pe}); + check_valid({ps1, sr, rtc1, cr1, rtc2, cr2, rtc_end, pe}); + check_valid({ps1_tomb, rtc1, cr1, rtc_end, pe}); + check_valid({ps1_tomb, rtc1, cr1, rtc2, cr2, rtc_end, pe}); + check_valid({ps1, rtc1, cr1, rtc_end, pe, ps2, rtc1, cr1, rtc_end, pe}); + check_invalid({ps1, rtc1, pe}); + check_invalid({ps1, pe, rtc1, rtc_end, ps2, pe}); + check_invalid({rtc1, ps1, pe}); + check_invalid({ps1, rtc1, rtc_end, sr, pe}); + check_invalid({ps1, sr, cr1, rtc1, rtc_end, pe}); + check_invalid({ps1_tomb, cr1, rtc1, rtc_end, pe}); + check_invalid({ps1_tomb, cr1_high_ts, rtc1, rtc_end, pe}); + check_invalid({ps1, rtc2, rtc1, rtc_end, pe}); + check_invalid({ps1_tomb, rtc2, rtc1, rtc_end, pe}); }; SEASTAR_TEST_CASE(test_tracing_format) {