Merge 'multishard_mutation_query: add tablets support' from Botond Dénes
When reading a list of ranges with tablets, we don't need a multishard reader. Instead, we intersect the range list with the local node's tablet ranges, then read each resulting sub-range from the respective shard. The individual ranges are read sequentially, with database::query[_mutations](), and the results are merged into a single result instance. This keeps the code simple. With tablets, multishard_mutation_query.cc is no longer on the hot path: range scans on tables with tablets fork off to a different code path in the coordinator. The only remaining users of multishard_mutation_query.cc are forced, replica-local scans, like those issued by SELECT * FROM MUTATION_FRAGMENTS(). These are mainly used for diagnostics and tests, so we optimize for simplicity, not performance.
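
A minimal sketch of that read path, under assumed names: query_one_range stands in for the database::query_mutations() call, and merge_disjoint() is the helper added by this series, with its exact signature assumed. None of this is the actual multishard_mutation_query.cc code.

future<reconcilable_result> scan_with_tablets_sketch(
        distributed<replica::database>& db,
        schema_ptr schema,
        const locator::tablet_map& tablets,
        locator::host_id this_host,
        const dht::partition_range_vector& ranges) {
    // Intersect the requested ranges with this node's tablet ranges; each
    // piece comes back tagged with the shard owning that tablet replica.
    locator::tablet_range_splitter split(schema, tablets, this_host, ranges);
    reconcilable_result merged; // assumed default-constructible for the sketch
    while (auto piece = split()) {
        // Read one piece on the owning shard. A real implementation must pass
        // the schema across shards with global_schema_ptr; elided here.
        auto partial = co_await db.invoke_on(piece->shard,
                [schema, range = piece->range] (replica::database& local) {
            return query_one_range(local, schema, range); // hypothetical helper
        });
        // The pieces are disjoint and arrive in ring order, so a simple
        // merge suffices; exact merge_disjoint() signature assumed.
        merged.merge_disjoint(std::move(partial));
    }
    co_return merged;
}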

Fixes: #16484

Closes #16802

* github.com:scylladb/scylladb:
  test/cql-pytest: remove skip_with_tablets fixture
  test/cql-pytest: test_select_from_mutation_fragments.py parameterize tests
  test/cql-pytest: test_select_from_mutation_fragments.py: remove skip_with_tablets
  multishard_mutation_query: add tablets support
  multishard_mutation_query: remove compaction-state from result-builder factory
  multishard_mutation_query: do_query(): return foreign_ptr<lw_shared_ptr<result>>
  mutation_query: reconcilable_result: add merge_disjoint()
  locator: introduce tablet_range_splitter
  dht/i_partitioner: to_partition_range(): don't assume input is fully inclusive
  interval: add before() overload which takes another interval
avikivity committed Feb 21, 2024
2 parents 94dac43 + ca58590 commit 4be70bf
Showing 11 changed files with 492 additions and 45 deletions.
11 changes: 7 additions & 4 deletions dht/i_partitioner.cc
@@ -300,19 +300,22 @@ std::strong_ordering ring_position_comparator_for_sstables::operator()(sstables:

 dht::partition_range
 to_partition_range(dht::token_range r) {
+    using bound = dht::partition_range::bound;
     using bound_opt = std::optional<dht::partition_range::bound>;
     auto start = r.start()
-        ? bound_opt(dht::ring_position(r.start()->value(),
+        ? bound_opt(bound(dht::ring_position(r.start()->value(),
                 r.start()->is_inclusive()
                     ? dht::ring_position::token_bound::start
-                    : dht::ring_position::token_bound::end))
+                    : dht::ring_position::token_bound::end),
+            r.start()->is_inclusive()))
         : bound_opt();
 
     auto end = r.end()
-        ? bound_opt(dht::ring_position(r.end()->value(),
+        ? bound_opt(bound(dht::ring_position(r.end()->value(),
                 r.end()->is_inclusive()
                     ? dht::ring_position::token_bound::end
-                    : dht::ring_position::token_bound::start))
+                    : dht::ring_position::token_bound::start),
+            r.end()->is_inclusive()))
         : bound_opt();
 
     return { std::move(start), std::move(end) };
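The practical effect of the change above, sketched in comments (an assumed example, not a test from this series): an exclusive token bound is no longer widened to an inclusive partition-range bound.

// Assumed example of the behavior change:
//   dht::token_range r = ...;            // say (t1, t2] -- exclusive start
//   auto pr = dht::to_partition_range(r);
// Before: both bounds relied on bound's default inclusiveness (inclusive),
// losing r's exclusivity.
// After:  pr.start() == {position after all keys of t1, inclusive = false}
//         pr.end()   == {position after all keys of t2, inclusive = true}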
32 changes: 32 additions & 0 deletions interval.hh
@@ -154,6 +154,33 @@ public:
}
return false;
}
// the other interval is before this interval (works only for non-wrapped intervals)
// Comparator must define a total ordering on T.
bool other_is_before(const wrapping_interval<T>& o, IntervalComparatorFor<T> auto&& cmp) const {
assert(!is_wrap_around(cmp));
assert(!o.is_wrap_around(cmp));
if (!start() || !o.end()) {
return false;
}
auto r = cmp(o.end()->value(), start()->value());
if (r < 0) {
return true;
}
if (r > 0) {
return false;
}
// o.end()->value() == start()->value(): decide based on bound inclusiveness.
const auto ei = o.end()->is_inclusive();
const auto si = start()->is_inclusive();
if (!ei && !si) {
// Neither bound includes the shared point.
return true;
}
// Exactly one bound is inclusive: the shared point belongs to only one
// interval, so they still don't overlap.
if (ei != si) {
return true;
}
// Both bounds are inclusive: the intervals share the point.
return false;
}
// the point is after the interval (works only for non-wrapped intervals)
// Comparator must define a total ordering on T.
bool after(const T& point, IntervalComparatorFor<T> auto&& cmp) const {
@@ -482,6 +509,11 @@ public:
bool before(const T& point, IntervalComparatorFor<T> auto&& cmp) const {
return _interval.before(point, std::forward<decltype(cmp)>(cmp));
}
// the other interval is before this interval.
// Comparator must define a total ordering on T.
bool other_is_before(const nonwrapping_interval<T>& o, IntervalComparatorFor<T> auto&& cmp) const {
return _interval.other_is_before(o, std::forward<decltype(cmp)>(cmp));
}
// the point is after the interval.
// Comparator must define a total ordering on T.
bool after(const T& point, IntervalComparatorFor<T> auto&& cmp) const {
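The tie-breaking at a shared endpoint is easy to get wrong, so here is a self-contained illustration of the rule implemented above, using plain ints rather than the real wrapping_interval type. The rule reduces to: at a shared endpoint, o is before unless both bounds include the point.

#include <cassert>

struct simple_bound { int value; bool inclusive; };

// Mirrors the tie-break in other_is_before() when o.end equals this->start.
bool before_at_shared_point(simple_bound o_end, simple_bound this_start) {
    return !(o_end.inclusive && this_start.inclusive);
}

int main() {
    // [1, 5) vs [5, 10]  -> 5 excluded from o, so o is before.
    assert(before_at_shared_point({5, false}, {5, true}));
    // [1, 5] vs (5, 10]  -> 5 excluded from this, so o is before.
    assert(before_at_shared_point({5, true}, {5, false}));
    // [1, 5] vs [5, 10]  -> both include 5: they overlap, o is NOT before.
    assert(!before_at_shared_point({5, true}, {5, true}));
}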
52 changes: 52 additions & 0 deletions locator/tablets.cc
@@ -444,6 +444,58 @@ load_stats& load_stats::operator+=(const load_stats& s) {
return *this;
}

tablet_range_splitter::tablet_range_splitter(schema_ptr schema, const tablet_map& tablets, host_id host, const dht::partition_range_vector& ranges)
: _schema(std::move(schema))
, _ranges(ranges)
, _ranges_it(_ranges.begin())
{
// Filter all tablets and save only those that have a replica on the specified host.
for (auto tid = std::optional(tablets.first_tablet()); tid; tid = tablets.next_tablet(*tid)) {
const auto& tablet_info = tablets.get_tablet_info(*tid);

auto replica_it = std::ranges::find_if(tablet_info.replicas, [&] (auto&& r) { return r.host == host; });
if (replica_it == tablet_info.replicas.end()) {
continue;
}

_tablet_ranges.emplace_back(range_split_result{replica_it->shard, dht::to_partition_range(tablets.get_token_range(*tid))});
}
_tablet_ranges_it = _tablet_ranges.begin();
}

std::optional<tablet_range_splitter::range_split_result> tablet_range_splitter::operator()() {
if (_ranges_it == _ranges.end() || _tablet_ranges_it == _tablet_ranges.end()) {
return {};
}

dht::ring_position_comparator cmp(*_schema);

while (_ranges_it != _ranges.end()) {
// First, skip all tablet-ranges that are completely before the current range.
while (_tablet_ranges_it != _tablet_ranges.end() && _ranges_it->other_is_before(_tablet_ranges_it->range, cmp)) {
++_tablet_ranges_it;
}
if (_tablet_ranges_it == _tablet_ranges.end()) {
// This host's tablet-ranges are exhausted, no more intersections possible.
return {};
}
// Generate intersections with all tablet-ranges that overlap with the current range.
if (auto intersection = _ranges_it->intersection(_tablet_ranges_it->range, cmp)) {
const auto shard = _tablet_ranges_it->shard;
if (_ranges_it->end() && cmp(_ranges_it->end()->value(), _tablet_ranges_it->range.end()->value()) < 0) {
// The current tablet range extends beyond the current range,
// move to the next range.
++_ranges_it;
} else {
// The current range extends beyond the current tablet range,
// move to the next tablet range.
++_tablet_ranges_it;
}
return range_split_result{shard, std::move(*intersection)};
}
// Current tablet-range is completely after the current range, move to the next range.
++_ranges_it;
}

return {};
}

// Estimates the external memory usage of std::unordered_map<>.
// Does not include external memory usage of elements.
template <typename K, typename V>
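To make the splitting order concrete, a worked example under an assumed tablet layout (not a real test from this series):

// Suppose this host holds replicas of two tablets whose partition ranges
// are, conceptually:
//   T0 = (min, 100]  owned by shard 0
//   T1 = (100, 200]  owned by shard 1
// and the caller passes the single range (50, 150]. Successive calls yield:
locator::tablet_range_splitter split(schema, tablets, this_host, ranges);
auto a = split(); // {shard 0, (50, 100]}  -- intersection with T0
auto b = split(); // {shard 1, (100, 150]} -- intersection with T1
auto c = split(); // std::nullopt          -- input range exhausted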
30 changes: 30 additions & 0 deletions locator/tablets.hh
@@ -13,6 +13,7 @@
#include "locator/host_id.hh"
#include "service/session.hh"
#include "dht/i_partitioner_fwd.hh"
#include "dht/ring_position.hh"
#include "schema/schema_fwd.hh"
#include "utils/chunked_vector.hh"
#include "utils/hash.hh"
@@ -468,6 +469,35 @@ struct tablet_routing_info {
std::pair<dht::token, dht::token> token_range;
};

/// Split a list of ranges, such that conceptually each input range is
/// intersected with each tablet range.
/// Tablets are pre-filtered, selecting only tablets that have a replica on the
/// given host.
/// Return the resulting intersections, in order.
/// The ranges are generated lazily (one at a time).
///
/// Note: the caller is expected to pin tablets, by keeping an
/// effective-replication-map alive.
class tablet_range_splitter {
public:
struct range_split_result {
shard_id shard; // shard where the tablet owning this range lives
dht::partition_range range;
};

private:
schema_ptr _schema;
const dht::partition_range_vector& _ranges;
dht::partition_range_vector::const_iterator _ranges_it;
std::vector<range_split_result> _tablet_ranges;
std::vector<range_split_result>::iterator _tablet_ranges_it;

public:
tablet_range_splitter(schema_ptr schema, const tablet_map& tablets, host_id host, const dht::partition_range_vector& ranges);
/// Returns nullopt when there are no more ranges.
std::optional<range_split_result> operator()();
};

}

template <>
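A minimal consumption sketch for this API (pinned_erm and process_on_shard are illustrative names; the pinning requirement is the one stated in the class comment above):

// Keep the effective-replication-map alive for the whole scan, so the
// tablet_map referenced by the splitter stays valid.
auto pinned_erm = ...; // obtained from the table, held for the duration
locator::tablet_range_splitter split(schema, tablets, this_host, ranges);
while (auto r = split()) {
    // r->shard says where to run the read; r->range is the intersection.
    co_await process_on_shard(r->shard, r->range); // hypothetical helper
}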
