Skip to content

Commit

Permalink
Merge 'staging sstables: filter tokens for view update generation' fr…
Browse files Browse the repository at this point in the history
…om Benny Halevy

This mini-series introduces dht::tokens_filter and uses it for consuming staging sstable in the view_update_generator.

The tokens_filter uses the token ranges owned by the current node, as retrieved by get_keyspace_local_ranges.

Refs #9559

Closes #11932

* github.com:scylladb/scylladb:
  db: view_update_generator: always clean up staging sstables
  compaction: extract incremental_owned_ranges_checker out to dht

(cherry picked from commit 3aff59f)
  • Loading branch information
denesb committed Aug 9, 2023
1 parent 17cb69e commit 3cc5c80
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 24 deletions.
25 changes: 2 additions & 23 deletions compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "mutation_compactor.hh"
#include "leveled_manifest.hh"
#include "dht/token.hh"
#include "dht/partition_filter.hh"
#include "mutation_writer/shard_based_splitting_writer.hh"
#include "mutation_writer/partition_based_splitting_writer.hh"
#include "mutation_source_metadata.hh"
Expand Down Expand Up @@ -1080,30 +1081,8 @@ class regular_compaction : public compaction {
};

class cleanup_compaction final : public regular_compaction {
class incremental_owned_ranges_checker {
const dht::token_range_vector& _sorted_owned_ranges;
mutable dht::token_range_vector::const_iterator _it;
public:
incremental_owned_ranges_checker(const dht::token_range_vector& sorted_owned_ranges)
: _sorted_owned_ranges(sorted_owned_ranges)
, _it(_sorted_owned_ranges.begin()) {
}

// Must be called with increasing token values.
bool belongs_to_current_node(const dht::token& t) const {
// While token T is after a range Rn, advance the iterator.
// iterator will be stopped at a range which either overlaps with T (if T belongs to node),
// or at a range which is after T (if T doesn't belong to this node).
while (_it != _sorted_owned_ranges.end() && _it->after(t, dht::token_comparator())) {
_it++;
}

return _it != _sorted_owned_ranges.end() && _it->contains(t, dht::token_comparator());
}
};

owned_ranges_ptr _owned_ranges;
incremental_owned_ranges_checker _owned_ranges_checker;
mutable dht::incremental_owned_ranges_checker _owned_ranges_checker;
private:
// Called in a seastar thread
dht::partition_range_vector
Expand Down
4 changes: 3 additions & 1 deletion db/view/view_update_generator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "db/view/view_updating_consumer.hh"
#include "sstables/sstables.hh"
#include "readers/evictable.hh"
#include "dht/partition_filter.hh"

static logging::logger vug_logger("view_update_generator");

Expand Down Expand Up @@ -87,7 +88,8 @@ future<> view_update_generator::start() {
auto close_sr = deferred_close(staging_sstable_reader);

inject_failure("view_update_generator_consume_staging_sstable");
auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, std::move(permit), *t, sstables, _as, staging_sstable_reader_handle));
auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, std::move(permit), *t, sstables, _as, staging_sstable_reader_handle),
dht::incremental_owned_ranges_checker::make_partition_filter(_db.get_keyspace_local_ranges(s->ks_name())));
if (result == stop_iteration::yes) {
break;
}
Expand Down
7 changes: 7 additions & 0 deletions dht/i_partitioner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "sharder.hh"
#include <seastar/core/seastar.hh>
#include "dht/token-sharding.hh"
#include "dht/partition_filter.hh"
#include "utils/class_registrator.hh"
#include "types.hh"
#include "utils/murmur_hash.hh"
Expand Down Expand Up @@ -362,4 +363,10 @@ split_range_to_shards(dht::partition_range pr, const schema& s) {
return ret;
}

flat_mutation_reader_v2::filter incremental_owned_ranges_checker::make_partition_filter(const dht::token_range_vector& sorted_owned_ranges) {
return [checker = incremental_owned_ranges_checker(sorted_owned_ranges)] (const dht::decorated_key& dk) mutable {
return checker.belongs_to_current_node(dk.token());
};
}

}
41 changes: 41 additions & 0 deletions dht/partition_filter.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Modified by ScyllaDB
* Copyright (C) 2015-present ScyllaDB
*/

/*
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/

#pragma once

#include "dht/i_partitioner.hh"
#include "readers/flat_mutation_reader_v2.hh"

namespace dht {

class incremental_owned_ranges_checker {
const dht::token_range_vector& _sorted_owned_ranges;
mutable dht::token_range_vector::const_iterator _it;
public:
incremental_owned_ranges_checker(const dht::token_range_vector& sorted_owned_ranges)
: _sorted_owned_ranges(sorted_owned_ranges)
, _it(_sorted_owned_ranges.begin()) {
}

// Must be called with increasing token values.
bool belongs_to_current_node(const dht::token& t) {
// While token T is after a range Rn, advance the iterator.
// iterator will be stopped at a range which either overlaps with T (if T belongs to node),
// or at a range which is after T (if T doesn't belong to this node).
while (_it != _sorted_owned_ranges.end() && _it->after(t, dht::token_comparator())) {
_it++;
}

return _it != _sorted_owned_ranges.end() && _it->contains(t, dht::token_comparator());
}

static flat_mutation_reader_v2::filter make_partition_filter(const dht::token_range_vector& sorted_owned_ranges);
};

} // dht

0 comments on commit 3cc5c80

Please sign in to comment.