Skip to content

Commit

Permalink
Merge 'mutation/mutation_compactor: validate the input stream' from B…
Browse files Browse the repository at this point in the history
…otond Dénes

The mutation compactor has a validator which it uses to validate the stream of mutation fragments that passes through it. This validator is supposed to validate the stream as it enters the compactor, as opposed to its compacted form (the output). This held for all fragment kinds except range tombstones: purged range tombstones were, for the most part, not visible to the validator.

This mistake was introduced by https://github.com/scylladb/scylladb/commit/e2c9cdb, which itself was a flawed attempt at fixing an error seen because purged tombstones were not terminated by the compactor.

This patch corrects this mistake by fixing the above problem properly: on page-cut, if the validator has an active tombstone, a closing tombstone is generated for it, to avoid the false-positive error. With this, range tombstones can be validated again as they come in.

The existing unit test checking the validation in the compactor is greatly expanded to check all (I hope) different validation scenarios.

Closes #13817

* github.com:scylladb/scylladb:
  test/mutation_test: test_compactor_validator_sanity_test
  mutation/mutation_compactor: fix indentation
  mutation/mutation_compactor: validate the input stream
  mutation: mutation_fragment_stream_validating_filter: add accessor to underlying validator
  readers: reader-from-fragment: don't modify stream when created without range
  • Loading branch information
nyh committed Jul 20, 2023
2 parents e00811c + a35f4f6 commit 5860820
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 57 deletions.
24 changes: 17 additions & 7 deletions mutation/mutation_compactor.hh
Expand Up @@ -177,6 +177,7 @@ private:
template <typename Consumer, typename GCConsumer>
requires CompactedFragmentsConsumerV2<Consumer> && CompactedFragmentsConsumerV2<GCConsumer>
stop_iteration do_consume(range_tombstone_change&& rtc, Consumer& consumer, GCConsumer& gc_consumer) {
_validator(mutation_fragment_v2::kind::range_tombstone_change, rtc.position(), rtc.tombstone());
stop_iteration gc_consumer_stop = stop_iteration::no;
stop_iteration consumer_stop = stop_iteration::no;
if (rtc.tombstone() <= _partition_tombstone) {
Expand All @@ -197,7 +198,6 @@ private:
if (_current_emitted_tombstone || (rtc.tombstone() && !can_purge)) {
partition_is_not_empty(consumer);
_current_emitted_tombstone = rtc.tombstone();
_validator(mutation_fragment_v2::kind::range_tombstone_change, rtc.position(), rtc.tombstone());
consumer_stop = consumer.consume(std::move(rtc));
}
return gc_consumer_stop || consumer_stop;
Expand Down Expand Up @@ -460,12 +460,22 @@ public:
template <typename Consumer, typename GCConsumer>
requires CompactedFragmentsConsumerV2<Consumer> && CompactedFragmentsConsumerV2<GCConsumer>
stop_iteration consume_end_of_partition(Consumer& consumer, GCConsumer& gc_consumer) {
if (_effective_tombstone) {
auto rtc = range_tombstone_change(position_in_partition::after_key(_schema, _last_pos), tombstone{});
// do_consume() overwrites _effective_tombstone with {}, so save and restore it.
auto prev_tombstone = _effective_tombstone;
do_consume(std::move(rtc), consumer, gc_consumer);
_effective_tombstone = prev_tombstone;
// Only check if the active tombstone has to be closed, if the partition
// was cut by the consumer. Otherwise, leave the stream as-is.
if (_stop) {
if (_effective_tombstone) {
auto rtc = range_tombstone_change(position_in_partition::after_key(_schema, _last_pos), tombstone{});
// do_consume() overwrites _effective_tombstone with {}, so save and restore it.
auto prev_tombstone = _effective_tombstone;
do_consume(std::move(rtc), consumer, gc_consumer);
_effective_tombstone = prev_tombstone;
} else if (_validator.validator().current_tombstone()) {
// It is possible that the range-tombstone providing the active
// tombstone was purged and never got to the consumer and therefore
// didn't set `_effective_tombstone`. In this case we generate a
// closing tombstone just for the validator.
_validator(mutation_fragment_v2::kind::range_tombstone_change, position_in_partition::after_key(_schema, _last_pos), tombstone{});
}
}
_validator.on_end_of_partition();
if (!_empty_partition_in_gc_consumer) {
Expand Down
2 changes: 2 additions & 0 deletions mutation/mutation_fragment_stream_validator.hh
Expand Up @@ -203,6 +203,8 @@ public:

sstring full_name() const;

const mutation_fragment_stream_validator& validator() const { return _validator; }

bool operator()(const dht::decorated_key& dk);
bool operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional<tombstone> new_current_tombstone);
bool operator()(mutation_fragment::kind kind, position_in_partition_view pos);
Expand Down
30 changes: 18 additions & 12 deletions readers/mutation_readers.cc
Expand Up @@ -1015,22 +1015,17 @@ make_flat_mutation_reader_from_mutations_v2(schema_ptr s, reader_permit permit,
return make_flat_mutation_reader_from_mutations_v2(s, std::move(permit), std::move(mutations), pr, s->full_slice(), fwd);
}

flat_mutation_reader_v2
make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments) {
return make_flat_mutation_reader_from_fragments(std::move(schema), std::move(permit), std::move(fragments), query::full_partition_range);
}

flat_mutation_reader_v2
make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments, const dht::partition_range& pr) {
static flat_mutation_reader_v2
make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments, const dht::partition_range* pr) {
class reader : public flat_mutation_reader_v2::impl {
std::deque<mutation_fragment_v2> _fragments;
const dht::partition_range* _pr;
const dht::partition_range* _pr = nullptr;
dht::ring_position_comparator _cmp;

private:
bool end_of_range() const {
return _fragments.empty() ||
(_fragments.front().is_partition_start() && _pr->after(_fragments.front().as_partition_start().key(), _cmp));
(_pr && _fragments.front().is_partition_start() && _pr->after(_fragments.front().as_partition_start().key(), _cmp));
}

void do_fast_forward_to(const dht::partition_range& pr) {
Expand All @@ -1043,12 +1038,13 @@ make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit
}

public:
reader(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments, const dht::partition_range& pr)
reader(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments, const dht::partition_range* pr)
: flat_mutation_reader_v2::impl(std::move(schema), std::move(permit))
, _fragments(std::move(fragments))
, _pr(&pr)
, _cmp(*_schema) {
do_fast_forward_to(*_pr);
if (pr) {
do_fast_forward_to(*pr);
}
}
virtual future<> fill_buffer() override {
while (!(_end_of_stream = end_of_range()) && !is_buffer_full()) {
Expand Down Expand Up @@ -1080,6 +1076,16 @@ make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit
return make_flat_mutation_reader_v2<reader>(std::move(schema), std::move(permit), std::move(fragments), pr);
}

// Creates a reader that emits the given fragments, restricted to partitions
// falling inside `pr`. Thin adapter over the pointer-taking internal overload:
// a non-null range makes the reader skip partitions outside of it.
flat_mutation_reader_v2
make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments, const dht::partition_range& pr) {
    const dht::partition_range* range_ptr = &pr;
    return make_flat_mutation_reader_from_fragments(std::move(schema), std::move(permit), std::move(fragments), range_ptr);
}

// Creates a reader that emits the given fragments with no partition-range
// restriction. Delegates to the pointer-taking internal overload; a null
// range means the stream is consumed as-is, without any filtering.
flat_mutation_reader_v2
make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments) {
    const dht::partition_range* no_range = nullptr;
    return make_flat_mutation_reader_from_fragments(std::move(schema), std::move(permit), std::move(fragments), no_range);
}

std::deque<mutation_fragment_v2> reverse_fragments(const schema& schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments) {
std::deque<mutation_fragment_v2> reversed_fragments;

Expand Down
151 changes: 113 additions & 38 deletions test/boost/mutation_test.cc
Expand Up @@ -3576,51 +3576,49 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_detach_state) {
}
};

// Check that consumed fragments are forwarded intact to the validator.
SEASTAR_THREAD_TEST_CASE(test_compactor_validator_sanity_test) {
SEASTAR_THREAD_TEST_CASE(test_compactor_validator) {
const auto abort_ie = set_abort_on_internal_error(false);
auto reset_abort_ie = defer([abort_ie] {
set_abort_on_internal_error(abort_ie);
});

simple_schema ss;
auto pk = ss.make_pkey();
auto pks = ss.make_pkeys(2);
auto cks = ss.make_ckeys(3);
auto s = ss.schema();

tests::reader_concurrency_semaphore_wrapper semaphore;

auto permit = semaphore.make_permit();

const auto expiry_point = gc_clock::now() + std::chrono::days(10);
const auto base_ts = ss.new_timestamp();
const auto row_ts = base_ts + 1;
const auto rtc_tombstone_ts = base_ts + 4;
const auto partition_tombstone_ts = base_ts + 5;
const auto row_ts2 = base_ts + 6;

const auto marker_ts = ss.new_timestamp();
const auto tomb_ts = ss.new_timestamp();
const auto row_ts = ss.new_timestamp();

const auto query_time = gc_clock::now();
const auto max_rows = std::numeric_limits<uint64_t>::max();
const auto max_partitions = std::numeric_limits<uint32_t>::max();

auto make_frags = [&] {
std::deque<mutation_fragment_v2> frags;

frags.emplace_back(*s, permit, partition_start(pk, {}));

frags.emplace_back(*s, permit, ss.make_static_row_v2(permit, "static_row"));

auto make_cr = [&] (const clustering_key& ckey, api::timestamp_type ts) {
const auto& v_def = *s->get_column_definition(to_bytes("v"));

for (uint32_t ck = 0; ck < 2; ++ck) {
auto ckey = ss.make_ckey(ck);
frags.emplace_back(*s, permit, range_tombstone_change(position_in_partition::before_key(ckey), tombstone{tomb_ts, expiry_point}));
auto row = clustering_row(ckey);
row.cells().apply(v_def, atomic_cell::make_live(*v_def.type, row_ts, serialized("v")));
row.marker() = row_marker(marker_ts);
frags.emplace_back(mutation_fragment_v2(*s, permit, std::move(row)));
}

frags.emplace_back(*s, permit, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(10)), tombstone{}));

frags.emplace_back(*s, permit, partition_end{});

return frags;
auto row = clustering_row(ckey);
row.cells().apply(v_def, atomic_cell::make_live(*v_def.type, ts, serialized("v")));
row.marker() = row_marker(ts);
return mutation_fragment_v2(*s, permit, std::move(row));
};

mutation_fragment_v2 ps1(*s, permit, partition_start(pks[0], {}));
mutation_fragment_v2 ps1_tomb(*s, permit, partition_start(pks[0], {partition_tombstone_ts, expiry_point}));
mutation_fragment_v2 ps2(*s, permit, partition_start(pks[1], {}));
mutation_fragment_v2 sr(*s, permit, ss.make_static_row_v2(permit, "static_row"));
mutation_fragment_v2 cr1(*s, permit, make_cr(cks[0], row_ts));
mutation_fragment_v2 cr1_high_ts(*s, permit, make_cr(cks[0], row_ts2));
mutation_fragment_v2 cr2(*s, permit, make_cr(cks[1], row_ts));
mutation_fragment_v2 cr3(*s, permit, make_cr(cks[2], row_ts));
mutation_fragment_v2 rtc1(*s, permit, range_tombstone_change(position_in_partition::before_key(cks[0]), tombstone{rtc_tombstone_ts, expiry_point}));
mutation_fragment_v2 rtc2(*s, permit, range_tombstone_change(position_in_partition::before_key(cks[1]), tombstone{rtc_tombstone_ts, expiry_point}));
mutation_fragment_v2 rtc_end(*s, permit, range_tombstone_change(position_in_partition::after_key(*s, cks[2]), tombstone{}));
mutation_fragment_v2 pe(*s, permit, partition_end());

struct consumer_v2 {
void consume_new_partition(const dht::decorated_key& dk) { }
void consume(const tombstone& t) { }
Expand All @@ -3642,11 +3640,88 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_validator_sanity_test) {
void consume_end_of_stream() { }
};

auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, query_time, s->full_slice(), max_rows, max_partitions,
mutation_fragment_stream_validation_level::clustering_key);
auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags());
auto close_reader = deferred_close(reader);
reader.consume(compact_for_query_v2<consumer_v2>(compaction_state, consumer_v2{})).get();
auto check = [&] (std::initializer_list<std::reference_wrapper<const mutation_fragment_v2>> frag_refs, bool expected_is_valid) {
std::deque<mutation_fragment_v2> frags;
for (const auto& frag_ref : frag_refs) {
frags.emplace_back(*s, permit, frag_ref.get());
}

auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, gc_clock::now(), s->full_slice(),
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), mutation_fragment_stream_validation_level::clustering_key);
auto reader = make_flat_mutation_reader_from_fragments(s, permit, std::move(frags));
auto close_reader = deferred_close(reader);
bool is_valid = true;
try {
reader.consume(compact_for_query_v2<consumer_v2>(compaction_state, consumer_v2{})).get();
} catch (invalid_mutation_fragment_stream& ex) {
is_valid = false;
}
if (expected_is_valid != is_valid) {
auto msg = fmt::format("expected_is_valid ({}) != is_valid ({}), fragments:\n{}",
expected_is_valid,
is_valid,
fmt::join(frag_refs | boost::adaptors::transformed([&] (std::reference_wrapper<const mutation_fragment_v2> mf) {
return fmt::to_string(mutation_fragment_v2::printer(*s, mf.get()));
}), "\n"));
BOOST_FAIL(msg);
}
};
auto check_valid = [&] (std::initializer_list<std::reference_wrapper<const mutation_fragment_v2>> frag_refs) {
return check(frag_refs, true);
};
auto check_invalid = [&] (std::initializer_list<std::reference_wrapper<const mutation_fragment_v2>> frag_refs) {
return check(frag_refs, false);
};

// Partitions
check_valid({ps1, pe});
check_valid({ps1, pe, ps2, pe});
check_invalid({pe, ps1, pe});
check_invalid({ps2, pe, ps1, pe});
check_invalid({ps1});
check_invalid({ps1, pe, ps2});

// + static row
check_valid({ps1, sr, pe});
check_valid({ps1_tomb, sr, pe});
check_valid({ps1, sr, pe, ps2, sr, pe});
check_invalid({ps1, pe, sr, ps2, pe});
check_invalid({sr, ps1, pe});

// + clustering row
check_valid({ps1, cr1, pe});
check_valid({ps1, sr, cr1, pe});
check_valid({ps1, cr1, cr2, pe});
check_valid({ps1, sr, cr1, cr2, pe});
check_valid({ps1_tomb, cr1, pe});
check_valid({ps1_tomb, cr1, cr2, pe});
check_valid({ps1, cr1, pe, ps2, cr1, pe});
check_invalid({ps1, pe, cr1, ps2, pe});
check_invalid({cr1, ps1, pe});
check_invalid({ps1, cr1, sr, pe});
check_invalid({ps1_tomb, cr1, sr, pe});
check_invalid({ps1_tomb, cr1_high_ts, sr, pe});
check_invalid({ps1, cr2, cr1, pe});

// + range tombstones
check_valid({ps1, rtc1, rtc_end, pe});
check_valid({ps1, rtc1, rtc1, rtc_end, pe});
check_valid({ps1, rtc1, rtc1, cr1, rtc_end, pe});
check_valid({ps1, sr, rtc1, cr1, rtc_end, pe});
check_valid({ps1, rtc1, rtc2, rtc_end, pe});
check_valid({ps1, sr, rtc1, cr1, rtc2, cr2, rtc_end, pe});
check_valid({ps1_tomb, rtc1, cr1, rtc_end, pe});
check_valid({ps1_tomb, rtc1, cr1, rtc2, cr2, rtc_end, pe});
check_valid({ps1, rtc1, cr1, rtc_end, pe, ps2, rtc1, cr1, rtc_end, pe});
check_invalid({ps1, rtc1, pe});
check_invalid({ps1, pe, rtc1, rtc_end, ps2, pe});
check_invalid({rtc1, ps1, pe});
check_invalid({ps1, rtc1, rtc_end, sr, pe});
check_invalid({ps1, sr, cr1, rtc1, rtc_end, pe});
check_invalid({ps1_tomb, cr1, rtc1, rtc_end, pe});
check_invalid({ps1_tomb, cr1_high_ts, rtc1, rtc_end, pe});
check_invalid({ps1, rtc2, rtc1, rtc_end, pe});
check_invalid({ps1_tomb, rtc2, rtc1, rtc_end, pe});
};

SEASTAR_TEST_CASE(test_tracing_format) {
Expand Down

0 comments on commit 5860820

Please sign in to comment.