Skip to content

Commit

Permalink
Merge "Add effective_replication_map" from Benny
Browse files Browse the repository at this point in the history
"
The current api design of abstract_replication_strategy
provides a can_yield parameter to calls that may stall
when traversing the token metadata in O(n^2) and even
in O(n) for a large number of token ranges.

But, to use this option the caller must run in a seastar thread.
It can't be used if the caller runs a coroutine or plain
async tasks.

Rather than keep adding threads (e.g. in storage_service::load_and_stream
or storage_service::describe_ring), the series offers an infrastructure
change: precalculating the token->endpoints map once, using an async task,
and keeping the results in a `effective_replication_map` object.
The latter can be used for efficient and stall-free calls, like
get_natural_endpoints, or get_ranges/get_primary_range, replacing their
equivalents in abstract_replication_strategy, and dropping the public
abstract_replication_strategy::calculate_natural_endpoints and its
internal cached_endpoints map.

Other than the performance benefits of:
1. The current calls require running a thread to yield.
Precalculating the map (using async task) allows us to use synchronous calls
without stalling the rector.

2. The replication maps can and should be shared
between keyspaces that use the same replication strategy.
(Will be sent as a follow-up to the series)

The bigger benefits (courtesy of Avi Kivity) are laying the groundwork for:
1. atomic replication metadata - an operation can capture a replication map once, and then use consistent information from the map without worrying that it changes under its feet. We may even be able to s/inet_address/replica_ptr/ later.

2. establish boundaries on the use of replication information - by making a replication map not visible, and observing when its reference count drops to zero, we can tell when the new replication map is fully in use. When we start writing to a new node we'll be able to locate a point in time where all writes that were not aware of the new node were completed (this is the point where we should start streaming).

Notes:
* The get_natural_endpoints method that uses the effective_replication_map
  is still provided as a abstract_replication_strategy virtual method
  so that local_strategy can override it and privide natural endpoints
  for any search token, even in the absence of token_metadata, when\
  called early-on, before token_metadata has been established.

  The effective_replication_map materializes the replication strategy
  over a given replication strategy options and token_metadata.
  Whenever either of those change for a keyspace, we make a new
  effective_replication_map and keep it in the keyspace for latter use.

  Methods that depend on an ad-hoc token_metadata (e.g. during
  node operations like bootstrap or replace) are still provided
  by abstract_replication_strategy.

TODO:
- effective_replication_map registry
- Move pending ranges from token_metadata to replication map
- get rid of abstract_replication_strategy::get_range_addresses(token_metadata&)
  - calculate replication map and use it instead.

Test: unit(dev, debug)
Dtest: next-gating, bootstrap_test.py update_cluster_layout_tests.py alternator_tests.py -a 'dtest-full,!dtest-heavy' (release)
"

* tag 'effective_replication_strategy-v6' of github.com:bhalevy/scylla: (44 commits)
  effective_replication_map: add get_range_addresses
  abstract_replication_strategy: get rid of shared_token_metadata member and ctor param
  abstract_replication_strategy: recognized_options: pass const topology&
  abstract_replication_strategy: precacluate get_replication_factor for effective_replication_map
  token_metadata: get rid of now-unused sync methods
  abstract_replication_strategy: get rid of do_calculate_natural_endpoints
  abstract_replication_strategy: futurize get_*address_ranges
  abstract_replication_strategy: futurize get_range_addresses
  abstract_replication_strategy: futurize get_ranges(inet_address ep, token_metadata_ptr)
  abstract_replication_strategy: move get_ranges and get_primary_ranges* to effective_replication_map
  compaction_manager: pass owned_ranges via cleanup/upgrade options
  abstract_replication_strategy: get rid of cached_endpoints
  all replication strategies: get rid of do_get_natural_endpoints
  storage_proxy: use effective_replication_map token_metadata_ptr along with endpoints
  abstract_replication_strategy: move get_natural_endpoints_without_node_being_replaced to effective_replication_map
  storage_service: bootstrap: add log messages
  storage_service: get_mutable_token_metadata_ptr: always invalidate_cached_rings
  shared_token_metadata: set: check version monotonicity
  token_metadata: use static ring version
  token_metadata: get rid of copy constructor and assignment operator
  ...
  • Loading branch information
avikivity committed Oct 13, 2021
2 parents d8832b9 + 17296cb commit 4f3b8f3
Show file tree
Hide file tree
Showing 49 changed files with 773 additions and 691 deletions.
2 changes: 2 additions & 0 deletions auth/authenticator.hh
Expand Up @@ -74,6 +74,8 @@ class authenticated_user;
///
class authenticator {
public:
using ptr_type = std::unique_ptr<authenticator>;

///
/// The name of the key to be used for the user-name part of password authentication with \ref authenticate.
///
Expand Down
2 changes: 2 additions & 0 deletions auth/authorizer.hh
Expand Up @@ -91,6 +91,8 @@ public:
///
class authorizer {
public:
using ptr_type = std::unique_ptr<authorizer>;

virtual ~authorizer() = default;

virtual future<> start() = 0;
Expand Down
1 change: 1 addition & 0 deletions auth/role_manager.hh
Expand Up @@ -106,6 +106,7 @@ public:
// i.e: given attribute name 'a' this map holds role name and it's assigned
// value of 'a'.
using attribute_vals = std::unordered_map<sstring, sstring>;
using ptr_type = std::unique_ptr<role_manager>;
public:
virtual ~role_manager() = default;

Expand Down
2 changes: 1 addition & 1 deletion auth/service.cc
Expand Up @@ -144,7 +144,7 @@ future<> service::create_keyspace_if_missing(::service::migration_manager& mm) c
auto& db = _qp.db();

if (!db.has_keyspace(meta::AUTH_KS)) {
std::map<sstring, sstring> opts{{"replication_factor", "1"}};
locator::replication_strategy_config_options opts{{"replication_factor", "1"}};

auto ksm = keyspace_metadata::new_keyspace(
meta::AUTH_KS,
Expand Down
6 changes: 3 additions & 3 deletions auth/service.hh
Expand Up @@ -88,11 +88,11 @@ class service final : public seastar::peering_sharded_service<service> {

::service::migration_notifier& _mnotifier;

std::unique_ptr<authorizer> _authorizer;
authorizer::ptr_type _authorizer;

std::unique_ptr<authenticator> _authenticator;
authenticator::ptr_type _authenticator;

std::unique_ptr<role_manager> _role_manager;
role_manager::ptr_type _role_manager;

// Only one of these should be registered, so we end up with some unused instances. Not the end of the world.
std::unique_ptr<::service::migration_listener> _migration_listener;
Expand Down
8 changes: 4 additions & 4 deletions compaction/compaction.cc
Expand Up @@ -1114,18 +1114,18 @@ class cleanup_compaction final : public regular_compaction {
}

private:
cleanup_compaction(database& db, column_family& cf, compaction_descriptor descriptor, compaction_data& info)
cleanup_compaction(column_family& cf, compaction_descriptor descriptor, compaction_data& info, dht::token_range_vector owned_ranges)
: regular_compaction(cf, std::move(descriptor), info)
, _owned_ranges(db.get_keyspace_local_ranges(_schema->ks_name()))
, _owned_ranges(std::move(owned_ranges))
, _owned_ranges_checker(_owned_ranges)
{
}

public:
cleanup_compaction(column_family& cf, compaction_descriptor descriptor, compaction_data& info, compaction_type_options::cleanup opts)
: cleanup_compaction(opts.db, cf, std::move(descriptor), info) {}
: cleanup_compaction(cf, std::move(descriptor), info, opts.owned_ranges) {}
cleanup_compaction(column_family& cf, compaction_descriptor descriptor, compaction_data& info, compaction_type_options::upgrade opts)
: cleanup_compaction(opts.db, cf, std::move(descriptor), info) {}
: cleanup_compaction(cf, std::move(descriptor), info, opts.owned_ranges) {}

flat_mutation_reader make_sstable_reader() const override {
return make_filtering_reader(regular_compaction::make_sstable_reader(), make_partition_filter());
Expand Down
12 changes: 6 additions & 6 deletions compaction/compaction_descriptor.hh
Expand Up @@ -67,10 +67,10 @@ public:
struct regular {
};
struct cleanup {
std::reference_wrapper<database> db;
dht::token_range_vector owned_ranges;
};
struct upgrade {
std::reference_wrapper<database> db;
dht::token_range_vector owned_ranges;
};
struct scrub {
enum class mode {
Expand Down Expand Up @@ -108,12 +108,12 @@ public:
return compaction_type_options(regular{});
}

static compaction_type_options make_cleanup(database& db) {
return compaction_type_options(cleanup{db});
static compaction_type_options make_cleanup(dht::token_range_vector&& owned_ranges) {
return compaction_type_options(cleanup{std::move(owned_ranges)});
}

static compaction_type_options make_upgrade(database& db) {
return compaction_type_options(upgrade{db});
static compaction_type_options make_upgrade(dht::token_range_vector&& owned_ranges) {
return compaction_type_options(upgrade{std::move(owned_ranges)});
}

static compaction_type_options make_scrub(scrub::mode mode) {
Expand Down
15 changes: 8 additions & 7 deletions compaction/compaction_manager.cc
Expand Up @@ -846,17 +846,16 @@ future<> compaction_manager::perform_cleanup(database& db, column_family* cf) {
}
return seastar::async([this, cf, &db] {
auto schema = cf->schema();
auto& rs = db.find_keyspace(schema->ks_name()).get_replication_strategy();
auto sorted_owned_ranges = rs.get_ranges(utils::fb_utilities::get_broadcast_address(), utils::can_yield::yes);
auto sorted_owned_ranges = db.get_keyspace_local_ranges(schema->ks_name());
auto sstables = std::vector<sstables::shared_sstable>{};
const auto candidates = get_candidates(*cf);
std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&sorted_owned_ranges, schema] (const sstables::shared_sstable& sst) {
seastar::thread::maybe_yield();
return sorted_owned_ranges.empty() || needs_cleanup(sst, sorted_owned_ranges, schema);
});
return sstables;
}).then([this, cf, &db] (std::vector<sstables::shared_sstable> sstables) {
return rewrite_sstables(cf, sstables::compaction_type_options::make_cleanup(db),
return std::tuple<dht::token_range_vector, std::vector<sstables::shared_sstable>>(sorted_owned_ranges, sstables);
}).then_unpack([this, cf, &db] (dht::token_range_vector owned_ranges, std::vector<sstables::shared_sstable> sstables) {
return rewrite_sstables(cf, sstables::compaction_type_options::make_cleanup(std::move(owned_ranges)),
[sstables = std::move(sstables)] (const table&) { return sstables; });
});
}
Expand All @@ -881,14 +880,16 @@ future<> compaction_manager::perform_sstable_upgrade(database& db, column_family
}
}
return make_ready_future<>();
}).then([this, &db, cf, &tables] {
}).then([&db, cf] {
return db.get_keyspace_local_ranges(cf->schema()->ks_name());
}).then([this, &db, cf, &tables] (dht::token_range_vector owned_ranges) {
// doing a "cleanup" is about as compacting as we need
// to be, provided we get to decide the tables to process,
// and ignoring any existing operations.
// Note that we potentially could be doing multiple
// upgrades here in parallel, but that is really the users
// problem.
return rewrite_sstables(cf, sstables::compaction_type_options::make_upgrade(db), [&](auto&) mutable {
return rewrite_sstables(cf, sstables::compaction_type_options::make_upgrade(std::move(owned_ranges)), [&](auto&) mutable {
return std::exchange(tables, {});
});
});
Expand Down
2 changes: 1 addition & 1 deletion compress.cc
Expand Up @@ -73,7 +73,7 @@ std::map<sstring, sstring> compressor::options() const {
return {};
}

shared_ptr<compressor> compressor::create(const sstring& name, const opt_getter& opts) {
compressor::ptr_type compressor::create(const sstring& name, const opt_getter& opts) {
if (name.empty()) {
return {};
}
Expand Down
15 changes: 8 additions & 7 deletions compress.hh
Expand Up @@ -74,22 +74,23 @@ public:
// to cheaply bridge sstable compression options / maps
using opt_string = std::optional<sstring>;
using opt_getter = std::function<opt_string(const sstring&)>;
using ptr_type = shared_ptr<compressor>;

static shared_ptr<compressor> create(const sstring& name, const opt_getter&);
static shared_ptr<compressor> create(const std::map<sstring, sstring>&);
static ptr_type create(const sstring& name, const opt_getter&);
static ptr_type create(const std::map<sstring, sstring>&);

static thread_local const shared_ptr<compressor> lz4;
static thread_local const shared_ptr<compressor> snappy;
static thread_local const shared_ptr<compressor> deflate;
static thread_local const ptr_type lz4;
static thread_local const ptr_type snappy;
static thread_local const ptr_type deflate;

static const sstring namespace_prefix;
};

template<typename BaseType, typename... Args>
class class_registry;

using compressor_ptr = shared_ptr<compressor>;
using compressor_registry = class_registry<compressor_ptr, const typename compressor::opt_getter&>;
using compressor_ptr = compressor::ptr_type;
using compressor_registry = class_registry<compressor, const typename compressor::opt_getter&>;

class compression_parameters {
public:
Expand Down
1 change: 0 additions & 1 deletion cql3/statements/create_keyspace_statement.cc
Expand Up @@ -150,7 +150,6 @@ future<> cql3::statements::create_keyspace_statement::grant_permissions_to_creat

using strategy_class_registry = class_registry<
locator::abstract_replication_strategy,
const locator::shared_token_metadata&,
locator::snitch_ptr&,
const std::map<sstring, sstring>&>;

Expand Down
45 changes: 26 additions & 19 deletions database.cc
Expand Up @@ -57,6 +57,7 @@

#include "utils/human_readable.hh"
#include "utils/fb_utilities.hh"
#include "utils/stall_free.hh"

#include "db/timeout_clock.hh"
#include "db/large_data_handler.hh"
Expand Down Expand Up @@ -108,7 +109,7 @@ make_compaction_manager(const db::config& cfg, database_config& dbcfg, abort_sou
lw_shared_ptr<keyspace_metadata>
keyspace_metadata::new_keyspace(std::string_view name,
std::string_view strategy_name,
std::map<sstring, sstring> options,
locator::replication_strategy_config_options options,
bool durables_writes,
std::vector<schema_ptr> cf_defs)
{
Expand Down Expand Up @@ -863,7 +864,7 @@ future<> database::update_keyspace(sharded<service::storage_proxy>& proxy, const
}
}

ks.update_from(get_shared_token_metadata(), std::move(new_ksm));
co_await ks.update_from(get_shared_token_metadata(), std::move(new_ksm));
co_await get_notifier().update_keyspace(ks.metadata());
}

Expand Down Expand Up @@ -1041,13 +1042,20 @@ bool database::column_family_exists(const utils::UUID& uuid) const {
return _column_families.contains(uuid);
}

void
keyspace::create_replication_strategy(const locator::shared_token_metadata& stm, const std::map<sstring, sstring>& options) {
future<>
keyspace::create_replication_strategy(const locator::shared_token_metadata& stm, const locator::replication_strategy_config_options& options) {
using namespace locator;

_replication_strategy =
abstract_replication_strategy::create_replication_strategy(
_metadata->strategy_name(), stm, options);
_metadata->strategy_name(), options);

update_effective_replication_map(co_await calculate_effective_replication_map(_replication_strategy, stm.get()));
}

void
keyspace::update_effective_replication_map(locator::mutable_effective_replication_map_ptr erm) {
_effective_replication_map = std::move(erm);
}

locator::abstract_replication_strategy&
Expand All @@ -1061,9 +1069,9 @@ keyspace::get_replication_strategy() const {
return *_replication_strategy;
}

void keyspace::update_from(const locator::shared_token_metadata& stm, ::lw_shared_ptr<keyspace_metadata> ksm) {
future<> keyspace::update_from(const locator::shared_token_metadata& stm, ::lw_shared_ptr<keyspace_metadata> ksm) {
_metadata = std::move(ksm);
create_replication_strategy(stm, _metadata->strategy_options());
return create_replication_strategy(stm, _metadata->strategy_options());
}

future<> keyspace::ensure_populated() const {
Expand Down Expand Up @@ -1182,13 +1190,12 @@ const column_family& database::find_column_family(const schema_ptr& schema) cons

using strategy_class_registry = class_registry<
locator::abstract_replication_strategy,
const locator::shared_token_metadata&,
locator::snitch_ptr&,
const std::map<sstring, sstring>&>;
const locator::replication_strategy_config_options&>;

keyspace_metadata::keyspace_metadata(std::string_view name,
std::string_view strategy_name,
std::map<sstring, sstring> strategy_options,
locator::replication_strategy_config_options strategy_options,
bool durable_writes,
std::vector<schema_ptr> cf_defs)
: keyspace_metadata(name,
Expand All @@ -1200,7 +1207,7 @@ keyspace_metadata::keyspace_metadata(std::string_view name,

keyspace_metadata::keyspace_metadata(std::string_view name,
std::string_view strategy_name,
std::map<sstring, sstring> strategy_options,
locator::replication_strategy_config_options strategy_options,
bool durable_writes,
std::vector<schema_ptr> cf_defs,
user_types_metadata user_types)
Expand All @@ -1215,20 +1222,20 @@ keyspace_metadata::keyspace_metadata(std::string_view name,
}
}

void keyspace_metadata::validate(const locator::shared_token_metadata& stm) const {
void keyspace_metadata::validate(const locator::topology& topology) const {
using namespace locator;
abstract_replication_strategy::validate_replication_strategy(name(), strategy_name(), stm, strategy_options());
abstract_replication_strategy::validate_replication_strategy(name(), strategy_name(), strategy_options(), topology);
}

void database::validate_keyspace_update(keyspace_metadata& ksm) {
ksm.validate(get_shared_token_metadata());
ksm.validate(get_token_metadata().get_topology());
if (!has_keyspace(ksm.name())) {
throw exceptions::configuration_exception(format("Cannot update non existing keyspace '{}'.", ksm.name()));
}
}

void database::validate_new_keyspace(keyspace_metadata& ksm) {
ksm.validate(get_shared_token_metadata());
ksm.validate(get_token_metadata().get_topology());
if (has_keyspace(ksm.name())) {
throw exceptions::already_exists_exception{ksm.name()};
}
Expand Down Expand Up @@ -1269,7 +1276,7 @@ std::vector<view_ptr> database::get_views() const {
| boost::adaptors::transformed([] (auto& cf) { return view_ptr(cf->schema()); }));
}

void database::create_in_memory_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm, system_keyspace system) {
future<> database::create_in_memory_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm, system_keyspace system) {
auto kscfg = make_keyspace_config(*ksm);
if (system == system_keyspace::yes) {
kscfg.enable_disk_reads = kscfg.enable_disk_writes = kscfg.enable_commitlog = !_cfg.volatile_system_keyspace_for_testing();
Expand All @@ -1278,7 +1285,7 @@ void database::create_in_memory_keyspace(const lw_shared_ptr<keyspace_metadata>&
kscfg.dirty_memory_manager = &_system_dirty_memory_manager;
}
keyspace ks(ksm, std::move(kscfg));
ks.create_replication_strategy(get_shared_token_metadata(), ksm->strategy_options());
co_await ks.create_replication_strategy(get_shared_token_metadata(), ksm->strategy_options());
_keyspaces.emplace(ksm->name(), std::move(ks));
}

Expand All @@ -1293,7 +1300,7 @@ database::create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm, bool is_b
co_return;
}

create_in_memory_keyspace(ksm, system);
co_await create_in_memory_keyspace(ksm, system);
auto& ks = _keyspaces.at(ksm->name());
auto& datadir = ks.datadir();

Expand Down Expand Up @@ -2181,7 +2188,7 @@ const sstring& database::get_snitch_name() const {
}

dht::token_range_vector database::get_keyspace_local_ranges(sstring ks) {
return find_keyspace(ks).get_replication_strategy().get_ranges(utils::fb_utilities::get_broadcast_address());
return find_keyspace(ks).get_effective_replication_map()->get_ranges(utils::fb_utilities::get_broadcast_address());
}

/*!
Expand Down

0 comments on commit 4f3b8f3

Please sign in to comment.