diff --git a/dht/i_partitioner.cc b/dht/i_partitioner.cc index 517184d04759..bc6880439631 100644 --- a/dht/i_partitioner.cc +++ b/dht/i_partitioner.cc @@ -300,19 +300,22 @@ std::strong_ordering ring_position_comparator_for_sstables::operator()(sstables: dht::partition_range to_partition_range(dht::token_range r) { + using bound = dht::partition_range::bound; using bound_opt = std::optional; auto start = r.start() - ? bound_opt(dht::ring_position(r.start()->value(), + ? bound_opt(bound(dht::ring_position(r.start()->value(), r.start()->is_inclusive() ? dht::ring_position::token_bound::start - : dht::ring_position::token_bound::end)) + : dht::ring_position::token_bound::end), + r.start()->is_inclusive())) : bound_opt(); auto end = r.end() - ? bound_opt(dht::ring_position(r.end()->value(), + ? bound_opt(bound(dht::ring_position(r.end()->value(), r.end()->is_inclusive() ? dht::ring_position::token_bound::end - : dht::ring_position::token_bound::start)) + : dht::ring_position::token_bound::start), + r.end()->is_inclusive())) : bound_opt(); return { std::move(start), std::move(end) }; diff --git a/interval.hh b/interval.hh index 45365c84beef..1c2cbe30e26a 100644 --- a/interval.hh +++ b/interval.hh @@ -154,6 +154,33 @@ public: } return false; } + // the other inverval is before this interval (works only for non wrapped intervals) + // Comparator must define a total ordering on T. + bool other_is_before(const wrapping_interval& o, IntervalComparatorFor auto&& cmp) const { + assert(!is_wrap_around(cmp)); + assert(!o.is_wrap_around(cmp)); + if (!start() || !o.end()) { + return false; + } + auto r = cmp(o.end()->value(), start()->value()); + if (r < 0) { + return true; + } + if (r > 0) { + return false; + } + // o.end()->value() == start()->value(), we decide based on inclusiveness + const auto ei = o.end()->is_inclusive(); + const auto si = start()->is_inclusive(); + if (!ei && !si) { + return true; + } + // At least one is inclusive, check that the other isn't + if (ei != si) { + return true; + } + return false; + } // the point is after the interval (works only for non wrapped intervals) // Comparator must define a total ordering on T. bool after(const T& point, IntervalComparatorFor auto&& cmp) const { @@ -482,6 +509,11 @@ public: bool before(const T& point, IntervalComparatorFor auto&& cmp) const { return _interval.before(point, std::forward(cmp)); } + // the other interval is before this interval. + // Comparator must define a total ordering on T. + bool other_is_before(const nonwrapping_interval& o, IntervalComparatorFor auto&& cmp) const { + return _interval.other_is_before(o, std::forward(cmp)); + } // the point is after the interval. // Comparator must define a total ordering on T. bool after(const T& point, IntervalComparatorFor auto&& cmp) const { diff --git a/locator/tablets.cc b/locator/tablets.cc index d944e270822d..3b46cee789cc 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -444,6 +444,58 @@ load_stats& load_stats::operator+=(const load_stats& s) { return *this; } +tablet_range_splitter::tablet_range_splitter(schema_ptr schema, const tablet_map& tablets, host_id host, const dht::partition_range_vector& ranges) + : _schema(std::move(schema)) + , _ranges(ranges) + , _ranges_it(_ranges.begin()) +{ + // Filter all tablets and save only those that have a replica on the specified host. + for (auto tid = std::optional(tablets.first_tablet()); tid; tid = tablets.next_tablet(*tid)) { + const auto& tablet_info = tablets.get_tablet_info(*tid); + + auto replica_it = std::ranges::find_if(tablet_info.replicas, [&] (auto&& r) { return r.host == host; }); + if (replica_it == tablet_info.replicas.end()) { + continue; + } + + _tablet_ranges.emplace_back(range_split_result{replica_it->shard, dht::to_partition_range(tablets.get_token_range(*tid))}); + } + _tablet_ranges_it = _tablet_ranges.begin(); +} + +std::optional tablet_range_splitter::operator()() { + if (_ranges_it == _ranges.end() || _tablet_ranges_it == _tablet_ranges.end()) { + return {}; + } + + dht::ring_position_comparator cmp(*_schema); + + while (_ranges_it != _ranges.end()) { + // First, skip all tablet-ranges that are completely before the current range. + while (_ranges_it->other_is_before(_tablet_ranges_it->range, cmp)) { + ++_tablet_ranges_it; + } + // Generate intersections with all tablet-ranges that overlap with the current range. + if (auto intersection = _ranges_it->intersection(_tablet_ranges_it->range, cmp)) { + const auto shard = _tablet_ranges_it->shard; + if (_ranges_it->end() && cmp(_ranges_it->end()->value(), _tablet_ranges_it->range.end()->value()) < 0) { + // The current tablet range extends beyond the current range, + // move to the next range. + ++_ranges_it; + } else { + // The current range extends beyond the current tablet range, + // move to the next tablet range. + ++_tablet_ranges_it; + } + return range_split_result{shard, std::move(*intersection)}; + } + // Current tablet-range is completely after the current range, move to the next range. + ++_ranges_it; + } + + return {}; +} + // Estimates the external memory usage of std::unordered_map<>. // Does not include external memory usage of elements. template diff --git a/locator/tablets.hh b/locator/tablets.hh index 85696ba9ab79..57f5a779cc6a 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -13,6 +13,7 @@ #include "locator/host_id.hh" #include "service/session.hh" #include "dht/i_partitioner_fwd.hh" +#include "dht/ring_position.hh" #include "schema/schema_fwd.hh" #include "utils/chunked_vector.hh" #include "utils/hash.hh" @@ -468,6 +469,35 @@ struct tablet_routing_info { std::pair token_range; }; +/// Split a list of ranges, such that conceptually each input range is +/// intersected with each tablet range. +/// Tablets are pre-filtered, slecting only tablets that have a replica on the +/// given host. +/// Return the resulting intersections, in order. +/// The ranges are generated lazily (one at a time). +/// +/// Note: the caller is expected to pin tablets, by keeping an +/// effective-replication-map alive. +class tablet_range_splitter { +public: + struct range_split_result { + shard_id shard; // shard where the tablet owning this range lives + dht::partition_range range; + }; + +private: + schema_ptr _schema; + const dht::partition_range_vector& _ranges; + dht::partition_range_vector::const_iterator _ranges_it; + std::vector _tablet_ranges; + std::vector::iterator _tablet_ranges_it; + +public: + tablet_range_splitter(schema_ptr schema, const tablet_map& tablets, host_id host, const dht::partition_range_vector& ranges); + /// Returns nullopt when there are no more ranges. + std::optional operator()(); +}; + } template <> diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index c4b3e1cc3f4f..342832c54d54 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -12,6 +12,7 @@ #include "replica/database.hh" #include "db/config.hh" #include "query-result-writer.hh" +#include "query_result_merger.hh" #include "readers/multishard.hh" #include @@ -716,7 +717,7 @@ future> read_page( const query::read_command& cmd, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, - noncopyable_function result_builder_factory) { + noncopyable_function result_builder_factory) { auto compaction_state = make_lw_shared(*s, cmd.timestamp, cmd.slice, cmd.get_row_limit(), cmd.partition_limit); @@ -727,11 +728,14 @@ future> read_page( } // Use coroutine::as_future to prevent exception on timesout. - auto f = co_await coroutine::as_future(query::consume_page(reader, compaction_state, cmd.slice, result_builder_factory(*compaction_state), cmd.get_row_limit(), + auto f = co_await coroutine::as_future(query::consume_page(reader, compaction_state, cmd.slice, result_builder_factory(), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp)); if (!f.failed()) { // no exceptions are thrown in this block auto result = std::move(f).get(); + if (compaction_state->are_limits_reached() || result.is_short_read()) { + ResultBuilder::maybe_set_last_position(result, compaction_state->current_full_position()); + } const auto& cstats = compaction_state->stats(); tracing::trace(trace_state, "Page stats: {} partition(s), {} static row(s) ({} live, {} dead), {} clustering row(s) ({} live, {} dead) and {} range tombstone(s)", cstats.partitions, @@ -753,14 +757,14 @@ future> read_page( } template -future do_query( +future>> do_query_vnodes( distributed& db, schema_ptr s, const query::read_command& cmd, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout, - noncopyable_function result_builder_factory) { + noncopyable_function result_builder_factory) { auto& table = db.local().find_column_family(s); auto erm = table.get_effective_replication_map(); auto ctx = seastar::make_shared(db, s, erm, cmd, ranges, trace_state, timeout); @@ -768,13 +772,13 @@ future do_query( // Use coroutine::as_future to prevent exception on timesout. auto f = co_await coroutine::as_future(ctx->lookup_readers(timeout).then([&, result_builder_factory = std::move(result_builder_factory)] () mutable { return read_page(ctx, s, cmd, ranges, trace_state, std::move(result_builder_factory)); - }).then([&] (page_consume_result r) -> future { + }).then([&] (page_consume_result r) -> future>> { if (r.compaction_state->are_limits_reached() || r.result.is_short_read()) { // Must call before calling `detach_state()`. auto last_pos = *r.compaction_state->current_full_position(); co_await ctx->save_readers(std::move(r.unconsumed_fragments), std::move(*r.compaction_state).detach_state(), std::move(last_pos)); } - co_return std::move(r.result); + co_return make_foreign(make_lw_shared(std::move(r.result))); })); co_await ctx->stop(); if (f.failed()) { @@ -783,6 +787,44 @@ future do_query( co_return f.get(); } +template +future>> do_query_tablets( + distributed& db, + schema_ptr s, + const query::read_command& cmd, + const dht::partition_range_vector& ranges, + tracing::trace_state_ptr trace_state, + db::timeout_clock::time_point timeout, + noncopyable_function result_builder_factory) { + auto& table = db.local().find_column_family(s); + auto erm = table.get_effective_replication_map(); + const auto& token_metadata = erm->get_token_metadata(); + const auto& tablets = token_metadata.tablets().get_tablet_map(s->id()); + const auto this_node_id = token_metadata.get_topology().this_node()->host_id(); + + auto query_cmd = cmd; + auto result_builder = result_builder_factory(); + + std::vector>> results; + locator::tablet_range_splitter range_splitter{s, tablets, this_node_id, ranges}; + while (auto range_opt = range_splitter()) { + auto& r = *results.emplace_back(co_await db.invoke_on(range_opt->shard, + [&result_builder, gs = global_schema_ptr(s), &query_cmd, &range_opt, gts = tracing::global_trace_state_ptr(trace_state), timeout] (replica::database& db) { + return result_builder.query(db, gs, query_cmd, range_opt->range, gts, timeout); + })); + + // Substract result from limit, watch for underflow + query_cmd.partition_limit -= std::min(query_cmd.partition_limit, ResultBuilder::get_partition_count(r)); + query_cmd.set_row_limit(query_cmd.get_row_limit() - std::min(query_cmd.get_row_limit(), ResultBuilder::get_row_count(r))); + + if (!query_cmd.partition_limit || !query_cmd.get_row_limit() || r.is_short_read()) { + break; + } + } + + co_return result_builder.merge(std::move(results)); +} + template static future>, cache_temperature>> do_query_on_all_shards( distributed& db, @@ -791,7 +833,7 @@ static future result_builder_factory) { + std::function result_builder_factory) { if (cmd.get_row_limit() == 0 || cmd.slice.partition_row_limit() == 0 || cmd.partition_limit == 0) { co_return std::tuple( make_foreign(make_lw_shared()), @@ -802,18 +844,21 @@ static future()); + const auto& keyspace = local_db.find_keyspace(s->ks_name()); + auto query_method = keyspace.get_replication_strategy().uses_tablets() ? do_query_tablets : do_query_vnodes; + try { auto accounter = co_await local_db.get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allowed); - auto result = co_await do_query(db, s, cmd, ranges, std::move(trace_state), timeout, - [result_builder_factory, accounter = std::move(accounter)] (const compact_for_query_state_v2& compaction_state) mutable { - return result_builder_factory(std::move(accounter), compaction_state); + auto result = co_await query_method(db, s, cmd, ranges, std::move(trace_state), timeout, + [result_builder_factory, accounter = std::move(accounter)] () mutable { + return result_builder_factory(std::move(accounter)); }); ++stats.total_reads; - stats.short_mutation_queries += bool(result.is_short_read()); + stats.short_mutation_queries += bool(result->is_short_read()); auto hit_rate = local_db.find_column_family(s).get_global_cache_hit_rate(); - co_return std::tuple(make_foreign(make_lw_shared(std::move(result))), hit_rate); + co_return std::tuple(std::move(result), hit_rate); } catch (...) { ++stats.total_reads_failed; throw; @@ -828,10 +873,13 @@ class mutation_query_result_builder { private: reconcilable_result_builder _builder; + schema_ptr _s; public: mutation_query_result_builder(const schema& s, const query::partition_slice& slice, query::result_memory_accounter&& accounter) - : _builder(s, slice, std::move(accounter)) { } + : _builder(s, slice, std::move(accounter)) + , _s(s.shared_from_this()) + { } void consume_new_partition(const dht::decorated_key& dk) { _builder.consume_new_partition(dk); } void consume(tombstone t) { _builder.consume(t); } @@ -840,6 +888,32 @@ class mutation_query_result_builder { stop_iteration consume(range_tombstone_change&& rtc) { return _builder.consume(std::move(rtc)); } stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); } result_type consume_end_of_stream() { return _builder.consume_end_of_stream(); } + + future>> query( + replica::database& db, + schema_ptr schema, + const query::read_command& cmd, + const dht::partition_range& range, + tracing::trace_state_ptr trace_state, + db::timeout_clock::time_point timeout) { + auto res = co_await db.query_mutations(std::move(schema), cmd, range, std::move(trace_state), timeout); + co_return make_foreign(make_lw_shared(std::get<0>(std::move(res)))); + } + + foreign_ptr> merge(std::vector>> results) { + if (results.empty()) { + return make_foreign(make_lw_shared()); + } + auto& first = results.front(); + for (auto it = results.begin() + 1; it != results.end(); ++it) { + first->merge_disjoint(_s, **it); + } + return std::move(first); + } + + static void maybe_set_last_position(result_type& r, std::optional full_position) { } + static uint32_t get_partition_count(result_type& r) { return r.partitions().size(); } + static uint64_t get_row_count(result_type& r) { return r.row_count(); } }; class data_query_result_builder { @@ -847,16 +921,17 @@ class data_query_result_builder { using result_type = query::result; private: - const compact_for_query_state_v2& _compaction_state; std::unique_ptr _res_builder; query_result_builder _builder; + query::result_options _opts; public: data_query_result_builder(const schema& s, const query::partition_slice& slice, query::result_options opts, - query::result_memory_accounter&& accounter, const compact_for_query_state_v2& compaction_state, uint64_t tombstone_limit) - : _compaction_state(compaction_state) - , _res_builder(std::make_unique(slice, opts, std::move(accounter), tombstone_limit)) - , _builder(s, *_res_builder) { } + query::result_memory_accounter&& accounter, uint64_t tombstone_limit) + : _res_builder(std::make_unique(slice, opts, std::move(accounter), tombstone_limit)) + , _builder(s, *_res_builder) + , _opts(opts) + { } void consume_new_partition(const dht::decorated_key& dk) { _builder.consume_new_partition(dk); } void consume(tombstone t) { _builder.consume(t); } @@ -866,11 +941,45 @@ class data_query_result_builder { stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); } result_type consume_end_of_stream() { _builder.consume_end_of_stream(); - if (_compaction_state.are_limits_reached() || _res_builder->is_short_read()) { - return _res_builder->build(_compaction_state.current_full_position()); - } return _res_builder->build(); } + + future>> query( + replica::database& db, + schema_ptr schema, + const query::read_command& cmd, + const dht::partition_range& range, + tracing::trace_state_ptr trace_state, + db::timeout_clock::time_point timeout) { + dht::partition_range_vector ranges; + ranges.emplace_back(range); + auto res = co_await db.query(std::move(schema), cmd, _opts, ranges, std::move(trace_state), timeout); + co_return std::get<0>(std::move(res)); + } + + foreign_ptr> merge(std::vector>> results) { + if (results.empty()) { + return make_foreign(make_lw_shared()); + } + query::result_merger merger(query::max_rows, query::max_partitions); + merger.reserve(results.size()); + for (auto&& r: results) { + merger(std::move(r)); + } + return merger.get(); + } + + static void maybe_set_last_position(result_type& r, std::optional full_position) { + r.set_last_position(std::move(full_position)); + } + static uint32_t get_partition_count(result_type& r) { + r.ensure_counts(); + return *r.partition_count(); + } + static uint64_t get_row_count(result_type& r) { + r.ensure_counts(); + return *r.row_count(); + } }; } // anonymous namespace @@ -885,7 +994,7 @@ future>, cache_tempera schema_ptr query_schema = cmd.slice.is_reversed() ? table_schema->make_reversed() : table_schema; return do_query_on_all_shards(db, query_schema, cmd, ranges, std::move(trace_state), timeout, - [table_schema, &cmd] (query::result_memory_accounter&& accounter, const compact_for_query_state_v2& compaction_state) { + [table_schema, &cmd] (query::result_memory_accounter&& accounter) { return mutation_query_result_builder(*table_schema, cmd.slice, std::move(accounter)); }); } @@ -901,7 +1010,7 @@ future>, cache_temperature>> schema_ptr query_schema = cmd.slice.is_reversed() ? table_schema->make_reversed() : table_schema; return do_query_on_all_shards(db, query_schema, cmd, ranges, std::move(trace_state), timeout, - [table_schema, &cmd, opts] (query::result_memory_accounter&& accounter, const compact_for_query_state_v2& compaction_state) { - return data_query_result_builder(*table_schema, cmd.slice, opts, std::move(accounter), compaction_state, cmd.tombstone_limit); + [table_schema, &cmd, opts] (query::result_memory_accounter&& accounter) { + return data_query_result_builder(*table_schema, cmd.slice, opts, std::move(accounter), cmd.tombstone_limit); }); } diff --git a/mutation_query.cc b/mutation_query.cc index bb2e293c8de0..e1bdf05b88b1 100644 --- a/mutation_query.cc +++ b/mutation_query.cc @@ -42,6 +42,15 @@ reconcilable_result::operator==(const reconcilable_result& other) const { return boost::equal(_partitions, other._partitions); } +void +reconcilable_result::merge_disjoint(schema_ptr schema, const reconcilable_result& other) { + std::copy(other._partitions.begin(), other._partitions.end(), std::back_inserter(_partitions)); + _short_read = _short_read || other._short_read; + uint64_t row_count = this->row_count() + other.row_count(); + _row_count_low_bits = static_cast(row_count); + _row_count_high_bits = static_cast(row_count >> 32); +} + auto fmt::formatter::format( const reconcilable_result::printer& pr, fmt::format_context& ctx) const -> decltype(ctx.out()) { diff --git a/mutation_query.hh b/mutation_query.hh index df77817a0544..87ef2697dd0d 100644 --- a/mutation_query.hh +++ b/mutation_query.hh @@ -108,6 +108,10 @@ public: bool operator==(const reconcilable_result& other) const; + // other must be disjoint with this + // does not merge or update memory trackers + void merge_disjoint(schema_ptr schema, const reconcilable_result& other); + struct printer { const reconcilable_result& self; schema_ptr schema; diff --git a/test/boost/interval_test.cc b/test/boost/interval_test.cc index a47855f25815..3cb6c2ab1c4c 100644 --- a/test/boost/interval_test.cc +++ b/test/boost/interval_test.cc @@ -424,3 +424,72 @@ BOOST_AUTO_TEST_CASE(test_intersection) { BOOST_REQUIRE_EQUAL(r13.intersection(nwr(b(0), b(123, false)), cmp), std::nullopt); BOOST_REQUIRE_EQUAL(r13.intersection(nwr(b(0), b(1000)), cmp), nwr(b(123), b(123))); } + +BOOST_AUTO_TEST_CASE(test_before_point) { + using r = interval; + using b = r::bound; + auto cmp = unsigned_comparator(); + + BOOST_REQUIRE_EQUAL(r::make_open_ended_both_sides().before(8, cmp), false); + BOOST_REQUIRE_EQUAL(r::make_ending_with(10).before(8, cmp), false); + BOOST_REQUIRE_EQUAL(r::make_ending_with(10).before(100, cmp), false); + BOOST_REQUIRE_EQUAL(r::make_starting_with(10).before(8, cmp), true); + BOOST_REQUIRE_EQUAL(r::make_starting_with(10).before(10, cmp), false); + BOOST_REQUIRE_EQUAL(r::make_starting_with({10, true}).before(10, cmp), false); + BOOST_REQUIRE_EQUAL(r::make_starting_with({10, false}).before(10, cmp), true); + BOOST_REQUIRE_EQUAL(r::make_starting_with(10).before(100, cmp), false); + BOOST_REQUIRE_EQUAL(r::make_singular(10).before(8, cmp), true); + BOOST_REQUIRE_EQUAL(r::make_singular(10).before(10, cmp), false); + BOOST_REQUIRE_EQUAL(r::make_singular(10).before(100, cmp), false); + BOOST_REQUIRE_EQUAL(r::make(b(10, true), b(11, true)).before(10, cmp), false); + BOOST_REQUIRE_EQUAL(r::make(b(10, false), b(11, true)).before(10, cmp), true); + BOOST_REQUIRE_EQUAL(r::make(b(10, true), b(11, true)).before(11, cmp), false); + BOOST_REQUIRE_EQUAL(r::make(b(10, true), b(11, true)).before(100, cmp), false); +} + +BOOST_AUTO_TEST_CASE(test_before_interval) { + using r = interval; + using b = r::bound; + auto cmp = unsigned_comparator(); + + auto check = [&] (const r& r1, const r& r2, bool is_before, std::source_location sl = std::source_location::current()) { + BOOST_TEST_MESSAGE(fmt::format("check() @ {}:{} {} before {} -> {}", sl.file_name(), sl.line(), r2, r1, is_before)); + BOOST_REQUIRE_EQUAL(r1.other_is_before(r2, cmp), is_before); + }; + + check(r::make_open_ended_both_sides(), r::make_singular(10), false); + check(r::make_open_ended_both_sides(), r::make_starting_with(10), false); + check(r::make_open_ended_both_sides(), r::make_ending_with(10), false); + check(r::make_open_ended_both_sides(), r::make(b(10, false), b(11, true)), false); + check(r::make_open_ended_both_sides(), r::make(b(10, false), b(11, false)), false); + + // Check with intervals that has inclusive start bound with value 10 + for (const auto& i : {r::make_starting_with(10), r::make_starting_with({10, true}), r::make_singular(10), r::make(b{10, true}, b{11, true})}) { + check(i, r::make_starting_with(1), false); + check(i, r::make_starting_with(10), false); + check(i, r::make_starting_with(100), false); + check(i, r::make(b(10, false), b(11, true)), false); + check(i, r::make(b(10, false), b(11, false)), false); + check(i, r::make_singular(8), true); + check(i, r::make_singular(10), false); + check(i, r::make_singular(10), false); + check(i, r::make_singular(100), false); + check(i, r::make_ending_with(8), true); + check(i, r::make_ending_with(10), false); + check(i, r::make_ending_with({10, true}), false); + check(i, r::make_ending_with({10, false}), true); + check(i, r::make_ending_with(100), false); + check(i, r::make(b{1, false}, b{10, true}), false); + check(i, r::make(b{1, false}, b{10, true}), false); + check(i, r::make(b{1, false}, b{10, false}), true); + } + + // Check with intervals that has exclusive start bound with value 10 + for (const auto& i : {r::make_starting_with({10, false}), r::make(b{10, false}, b{11, true})}) { + check(i, r::make_singular(10), true); + check(i, r::make_ending_with({10, true}), true); + check(i, r::make_ending_with({10, false}), true); + check(i, r::make(b{1, false}, b{10, true}), true); + check(i, r::make(b{1, false}, b{10, false}), true); + } +} diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index 8b0c5586160e..b67e3daa9527 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -13,6 +13,7 @@ #include #include "test/lib/cql_test_env.hh" #include "test/lib/log.hh" +#include "test/lib/simple_schema.hh" #include "db/config.hh" #include "schema/schema_builder.hh" @@ -1738,3 +1739,138 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) { } }).get(); } + +SEASTAR_THREAD_TEST_CASE(test_tablet_range_splitter) { + simple_schema ss; + + const auto dks = ss.make_pkeys(4); + + auto h1 = host_id(utils::UUID_gen::get_time_UUID()); + auto h2 = host_id(utils::UUID_gen::get_time_UUID()); + auto h3 = host_id(utils::UUID_gen::get_time_UUID()); + + tablet_map tmap(4); + auto tb = tmap.first_tablet(); + tmap.set_tablet(tb, tablet_info { + tablet_replica_set { + tablet_replica {h2, 0}, + tablet_replica {h3, 0}, + } + }); + tb = *tmap.next_tablet(tb); + tmap.set_tablet(tb, tablet_info { + tablet_replica_set { + tablet_replica {h1, 3}, + } + }); + tb = *tmap.next_tablet(tb); + tmap.set_tablet(tb, tablet_info { + tablet_replica_set { + tablet_replica {h2, 2}, + } + }); + tb = *tmap.next_tablet(tb); + tmap.set_tablet(tb, tablet_info { + tablet_replica_set { + tablet_replica {h1, 1}, + tablet_replica {h2, 1}, + } + }); + + using result = tablet_range_splitter::range_split_result; + using bound = dht::partition_range::bound; + + std::vector included_ranges; + std::vector excluded_ranges; + for (auto tid = std::optional(tmap.first_tablet()); tid; tid = tmap.next_tablet(*tid)) { + const auto& tablet_info = tmap.get_tablet_info(*tid); + auto replica_it = std::ranges::find_if(tablet_info.replicas, [&] (auto&& r) { return r.host == h1; }); + auto token_range = tmap.get_token_range(*tid); + auto range = dht::to_partition_range(token_range); + if (replica_it == tablet_info.replicas.end()) { + testlog.info("tablet#{}: {} (no replica on h1)", *tid, token_range); + excluded_ranges.emplace_back(std::move(range)); + } else { + testlog.info("tablet#{}: {} (shard {})", *tid, token_range, replica_it->shard); + included_ranges.emplace_back(result{replica_it->shard, std::move(range)}); + } + } + + dht::ring_position_comparator cmp(*ss.schema()); + + auto check = [&] (const dht::partition_range_vector& ranges, std::vector expected_result, + std::source_location sl = std::source_location::current()) { + testlog.info("check() @ {}:{} ranges={}", sl.file_name(), sl.line(), ranges); + locator::tablet_range_splitter range_splitter{ss.schema(), tmap, h1, ranges}; + auto it = expected_result.begin(); + while (auto range_opt = range_splitter()) { + testlog.debug("result: shard={} range={}", range_opt->shard, range_opt->range); + BOOST_REQUIRE(it != expected_result.end()); + testlog.debug("expected: shard={} range={}", it->shard, it->range); + BOOST_REQUIRE_EQUAL(it->shard, range_opt->shard); + BOOST_REQUIRE(it->range.equal(range_opt->range, cmp)); + ++it; + } + if (it != expected_result.end()) { + while (it != expected_result.end()) { + testlog.error("missing expected result: shard={} range={}", it->shard, it->range); + ++it; + } + BOOST_FAIL("splitter didn't provide all expected ranges"); + } + }; + auto check_single = [&] (const dht::partition_range& range, std::vector expected_result, + std::source_location sl = std::source_location::current()) { + dht::partition_range_vector ranges; + ranges.reserve(1); + ranges.push_back(std::move(range)); + check(ranges, std::move(expected_result), sl); + }; + auto intersect = [&] (const dht::partition_range& range) { + std::vector intersecting_ranges; + for (const auto& included_range : included_ranges) { + if (auto intersection = included_range.range.intersection(range, cmp)) { + intersecting_ranges.push_back({included_range.shard, std::move(*intersection)}); + } + } + return intersecting_ranges; + }; + auto check_intersection_single = [&] (const dht::partition_range& range, + std::source_location sl = std::source_location::current()) { + check_single(range, intersect(range), sl); + }; + auto check_intersection = [&] (const dht::partition_range_vector& ranges, + std::source_location sl = std::source_location::current()) { + std::vector expected_ranges; + for (const auto& range : ranges) { + auto res = intersect(range); + std::move(res.begin(), res.end(), std::back_inserter(expected_ranges)); + } + std::sort(expected_ranges.begin(), expected_ranges.end(), [&] (const auto& a, const auto& b) { + return !a.range.start() || b.range.before(a.range.start()->value(), cmp); + }); + check(ranges, expected_ranges, sl); + }; + + check_single(dht::partition_range::make_open_ended_both_sides(), included_ranges); + check(boost::copy_range(included_ranges | boost::adaptors::transformed([&] (auto& r) { return r.range; })), included_ranges); + check(excluded_ranges, {}); + + check_intersection_single({bound{dks[0], true}, bound{dks[1], false}}); + check_intersection_single({bound{dks[0], false}, bound{dks[2], true}}); + check_intersection_single({bound{dks[2], true}, bound{dks[3], false}}); + check_intersection_single({bound{dks[0], false}, bound{dks[3], false}}); + check_intersection_single(dht::partition_range::make_starting_with(bound(dks[2], true))); + check_intersection_single(dht::partition_range::make_ending_with(bound(dks[1], false))); + check_intersection_single(dht::partition_range::make_singular(dks[3])); + + check_intersection({ + dht::partition_range::make_ending_with(bound(dks[0], false)), + {bound{dks[1], true}, bound{dks[2], false}}, + dht::partition_range::make_starting_with(bound(dks[3], true))}); + + check_intersection({ + {bound{dks[0], true}, bound{dks[1], false}}, + {bound{dks[1], true}, bound{dks[2], false}}, + {bound{dks[2], true}, bound{dks[3], false}}}); +} diff --git a/test/cql-pytest/conftest.py b/test/cql-pytest/conftest.py index 9e8fee010c52..b719b1899705 100644 --- a/test/cql-pytest/conftest.py +++ b/test/cql-pytest/conftest.py @@ -258,11 +258,6 @@ def has_tablets(cql): with new_test_keyspace(cql, " WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor': 1}") as keyspace: return keyspace_has_tablets(cql, keyspace) -@pytest.fixture(scope="function") -def skip_with_tablets(has_tablets): - if has_tablets: - pytest.skip("Test may crash with tablets experimental feature on") - @pytest.fixture(scope="function") def skip_without_tablets(scylla_only, has_tablets): if not has_tablets: diff --git a/test/cql-pytest/test_select_from_mutation_fragments.py b/test/cql-pytest/test_select_from_mutation_fragments.py index da39515eae19..7a4a37187be6 100644 --- a/test/cql-pytest/test_select_from_mutation_fragments.py +++ b/test/cql-pytest/test_select_from_mutation_fragments.py @@ -27,8 +27,8 @@ def test_table(cql, test_keyspace): yield table -# skip_with_tablets due to https://github.com/scylladb/scylladb/issues/16484 -def test_smoke(cql, test_table, scylla_only, skip_with_tablets): +@pytest.mark.parametrize("test_keyspace", ["tablets", "vnodes"], indirect=True) +def test_smoke(cql, test_table, scylla_only): """ Simple smoke tests, this should fail first if something is very wrong. """ partitions = {} for i in range(0, 1): @@ -71,6 +71,7 @@ def check_partition_rows(rows, expected_rows): check_partition_rows(rows, expected_rows) +@pytest.mark.parametrize("test_keyspace", ["tablets", "vnodes"], indirect=True) def test_order_by(cql, test_table, scylla_only): """ ORDER BY is not allowed """ pk1 = util.unique_key_int() @@ -81,6 +82,7 @@ def test_order_by(cql, test_table, scylla_only): cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS({test_table}) WHERE pk1 = {pk1} AND pk2 = {pk2} ORDER BY mutation_source DESC") +@pytest.mark.parametrize("test_keyspace", ["tablets", "vnodes"], indirect=True) def test_mutation_source(cql, test_table, scylla_only): """ Manipulate where the data is located in the node, and check that the corred mutation source is reported. """ pk1 = util.unique_key_int() @@ -113,6 +115,7 @@ def expect_sources(*expected_sources): expect_sources('memtable', 'row-cache', 'sstable') +@pytest.mark.parametrize("test_keyspace", ["tablets", "vnodes"], indirect=True) def test_mutation_dump_range_tombstone_changes(cql, test_table, scylla_only): """ Range tombstones can share the same position. @@ -134,6 +137,7 @@ def test_mutation_dump_range_tombstone_changes(cql, test_table, scylla_only): assert len(res) == 2 * rts + 1 +@pytest.mark.parametrize("test_keyspace", ["tablets", "vnodes"], indirect=True) def test_count(cql, test_table, scylla_only): """ Test aggregation (COUNT). """ pk1 = util.unique_key_int() @@ -158,8 +162,8 @@ def check_count(kind, expected_count): check_count('partition end', 1) -# skip_with_tablets due to https://github.com/scylladb/scylladb/issues/16484 -def test_many_partition_scan(cql, test_keyspace, scylla_only, skip_with_tablets): +@pytest.mark.parametrize("test_keyspace", ["tablets", "vnodes"], indirect=True) +def test_many_partition_scan(cql, test_keyspace, scylla_only): """ Full scans work like secondary-index based scans. First, a query is issued to obtain partition-keys, then each partition is read individually. @@ -201,8 +205,8 @@ def test_many_partition_scan(cql, test_keyspace, scylla_only, skip_with_tablets) assert actual_partitions == partitions -# skip_with_tablets due to https://github.com/scylladb/scylladb/issues/16484 -def test_metadata_and_value(cql, test_keyspace, scylla_path, scylla_data_dir, scylla_only, skip_with_tablets): +@pytest.mark.parametrize("test_keyspace", ["tablets", "vnodes"], indirect=True) +def test_metadata_and_value(cql, test_keyspace, scylla_path, scylla_data_dir, scylla_only): """ Test that metadata + value columns allow reconstructing a full sstable dump. Meaning that their json representation of metadata and value is the same. @@ -288,7 +292,8 @@ def merged_value_into_metadata(metadata, value): assert reference_dump_json == reconstructed_dump_json -def test_paging(cql, test_table): +@pytest.mark.parametrize("test_keyspace", ["tablets", "vnodes"], indirect=True) +def test_paging(cql, test_table, scylla_only): """ Test that paging works properly. """ pk1 = util.unique_key_int() pk2 = util.unique_key_int() @@ -315,7 +320,8 @@ def test_paging(cql, test_table): result.fetch_next_page() -def test_slicing_rows(cql, test_table): +@pytest.mark.parametrize("test_keyspace", ["tablets", "vnodes"], indirect=True) +def test_slicing_rows(cql, test_table, scylla_only): """ Test that slicing rows from underlying works. """ pk1 = util.unique_key_int() pk2 = util.unique_key_int() @@ -352,7 +358,8 @@ def check_slice(ck1, ck2_start_inclusive, ck2_end_exclusive): check_slice(2, 0, 100) -def test_slicing_range_tombstone_changes(cql, test_table): +@pytest.mark.parametrize("test_keyspace", ["tablets", "vnodes"], indirect=True) +def test_slicing_range_tombstone_changes(cql, test_table, scylla_only): """ Test that slicing range-tombstone-changes from underlying works. """ pk1 = util.unique_key_int() pk2 = util.unique_key_int() @@ -384,7 +391,8 @@ def check_slice_ck1_fixed(ck2_start_inclusive, ck2_end_exclusive, expected_rtcs) check_slice_ck1_fixed(30, 38, [30, 30, 38]) -def test_ck_in_query(cql, test_table): +@pytest.mark.parametrize("test_keyspace", ["tablets", "vnodes"], indirect=True) +def test_ck_in_query(cql, test_table, scylla_only): pk1 = util.unique_key_int() pk2 = util.unique_key_int() @@ -427,8 +435,8 @@ def test_ck_in_query(cql, test_table): assert getattr(row, col_name) == expected_value -# skip_with_tablets due to https://github.com/scylladb/scylladb/issues/16484 -def test_many_partitions(cql, test_keyspace, scylla_only, skip_with_tablets): +@pytest.mark.parametrize("test_keyspace", ["tablets", "vnodes"], indirect=True) +def test_many_partitions(cql, test_keyspace, scylla_only): num_partitions = 5000 with util.new_test_table(cql, test_keyspace, 'pk int PRIMARY KEY, v int') as table: delete_id = cql.prepare(f"DELETE FROM {table} WHERE pk = ?")