Merge 'raft: topology: outside topology-on-raft mode, make sure not to use its RPCs' from Piotr Dulikowski

Topology on raft is still an experimental feature. The RPC verbs
introduced in that mode shouldn't be used while it is disabled; otherwise
we lose the right to make breaking changes to those verbs.

First, make sure that the aforementioned verbs are not sent outside the
mode. It turns out that `raft_pull_topology_snapshot` could be sent
outside topology-on-raft mode; after this PR, it no longer can.
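
The send-side guard is easiest to see in the `group0_state_machine.cc`
hunk below: the snapshot variable becomes a `std::optional` that is only
filled when the mode is on. Here is a minimal self-contained sketch of
that pattern; the `pull_topology_snapshot()` stand-in is hypothetical,
not Scylla's actual RPC API:

```cpp
#include <iostream>
#include <optional>
#include <string>

// Hypothetical stand-in for the raft_pull_topology_snapshot RPC verb.
std::string pull_topology_snapshot() {
    return "<topology snapshot>";
}

// Issue the RPC only when topology-on-raft is enabled; callers check the
// optional before merging, as transfer_snapshot() does after this change.
std::optional<std::string> maybe_pull_snapshot(bool topology_change_enabled) {
    std::optional<std::string> snp;
    if (topology_change_enabled) {
        snp = pull_topology_snapshot();
    }
    return snp;
}

int main() {
    if (auto snp = maybe_pull_snapshot(false)) {
        std::cout << "merging " << *snp << '\n';  // skipped: the mode is off
    }
}
```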

Second, the verbs introduced by topology-on-raft mode are now not
registered at all on the receiving side when the mode is disabled.
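
On the receiving side the guard amounts to skipping registration
entirely, so an unregistered verb is rejected rather than served. A rough
sketch under the same caveat (the verb table below is a simplification,
not Scylla's messaging service):

```cpp
#include <functional>
#include <stdexcept>
#include <string>
#include <unordered_map>

// Simplified verb table standing in for the messaging service.
using verb_table = std::unordered_map<std::string, std::function<std::string()>>;

void init_topology_verbs(verb_table& verbs, bool topology_change_enabled) {
    if (!topology_change_enabled) {
        return;  // verbs stay unregistered; dispatch() below rejects them
    }
    verbs.emplace("raft_pull_topology_snapshot", [] {
        return std::string{"<topology snapshot>"};
    });
}

std::string dispatch(const verb_table& verbs, const std::string& name) {
    auto it = verbs.find(name);
    if (it == verbs.end()) {
        throw std::runtime_error("unknown verb: " + name);
    }
    return it->second();
}

int main() {
    verb_table verbs;
    init_topology_verbs(verbs, /*topology_change_enabled=*/false);
    try {
        dispatch(verbs, "raft_pull_topology_snapshot");
    } catch (const std::exception&) {
        // Expected with the mode off: the verb was never registered.
    }
}
```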

Additionally tested by running `topology/` tests with
`consistent_cluster_management: True` but with experimental features
disabled.

Fixes: #15862

Closes #15917

* github.com:scylladb/scylladb:
  storage_service: fix indentation
  raft: topology: only register verbs in topology-on-raft mode
  raft: topology: only pull topology snapshot in topology-on-raft mode
kbr-scylla committed Nov 2, 2023
2 parents 798eede + 6d15f02 commit 5cf18b1
Showing 6 changed files with 108 additions and 108 deletions.
2 changes: 1 addition & 1 deletion main.cc
@@ -1674,7 +1674,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// Need to do it before allowing incomming messaging service connections since
// storage proxy's and migration manager's verbs may access group0.
// This will also disable migration manager schema pulls if needed.
-    group0_service.setup_group0_if_exist(sys_ks.local(), ss.local(), qp.local(), mm.local()).get();
+    group0_service.setup_group0_if_exist(sys_ks.local(), ss.local(), qp.local(), mm.local(), raft_topology_change_enabled).get();

// It's essential to load fencing_version prior to starting the messaging service,
// since incoming messages may require fencing.
9 changes: 6 additions & 3 deletions service/raft/group0_state_machine.cc
@@ -229,7 +229,10 @@ future<> group0_state_machine::transfer_snapshot(raft::server_id from_id, raft::
on_internal_error(slogger, "Expected MIGRATION_REQUEST to return canonical mutations");
}

-    auto topology_snp = co_await ser::storage_service_rpc_verbs::send_raft_pull_topology_snapshot(&_mm._messaging, addr, as, from_id, service::raft_topology_pull_params{});
+    std::optional<service::raft_topology_snapshot> topology_snp;
+    if (_topology_change_enabled) {
+        topology_snp = co_await ser::storage_service_rpc_verbs::send_raft_pull_topology_snapshot(&_mm._messaging, addr, as, from_id, service::raft_topology_pull_params{});
+    }

auto history_mut = extract_history_mutation(*cm, _sp.data_dictionary());

@@ -239,8 +242,8 @@ future<> group0_state_machine::transfer_snapshot(raft::server_id from_id, raft::

co_await _mm.merge_schema_from(addr, std::move(*cm));

-    if (!topology_snp.topology_mutations.empty()) {
-        co_await _ss.merge_topology_snapshot(std::move(topology_snp));
+    if (topology_snp && !topology_snp->topology_mutations.empty()) {
+        co_await _ss.merge_topology_snapshot(std::move(*topology_snp));
// Flush so that current supported and enabled features are readable before commitlog replay
co_await _sp.get_db().local().flush(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
}
5 changes: 3 additions & 2 deletions service/raft/group0_state_machine.hh
@@ -90,11 +90,12 @@ class group0_state_machine : public raft_state_machine {
raft_address_map& _address_map;
seastar::gate _gate;
abort_source _abort_source;
+    bool _topology_change_enabled;

future<> merge_and_apply(group0_state_machine_merger& merger);
public:
-    group0_state_machine(raft_group0_client& client, migration_manager& mm, storage_proxy& sp, storage_service& ss, raft_address_map& address_map)
-        : _client(client), _mm(mm), _sp(sp), _ss(ss), _address_map(address_map) {}
+    group0_state_machine(raft_group0_client& client, migration_manager& mm, storage_proxy& sp, storage_service& ss, raft_address_map& address_map, bool topology_change_enabled)
+        : _client(client), _mm(mm), _sp(sp), _ss(ss), _address_map(address_map), _topology_change_enabled(topology_change_enabled) {}
future<> apply(std::vector<raft::command_cref> command) override;
future<raft::snapshot_id> take_snapshot() override;
void drop_snapshot(raft::snapshot_id id) override;
42 changes: 21 additions & 21 deletions service/raft/raft_group0.cc
@@ -197,8 +197,8 @@ const raft::server_id& raft_group0::load_my_id() {
}

raft_server_for_group raft_group0::create_server_for_group0(raft::group_id gid, raft::server_id my_id, service::storage_service& ss, cql3::query_processor& qp,
-        service::migration_manager& mm) {
-    auto state_machine = std::make_unique<group0_state_machine>(_client, mm, qp.proxy(), ss, _raft_gr.address_map());
+        service::migration_manager& mm, bool topology_change_enabled) {
+    auto state_machine = std::make_unique<group0_state_machine>(_client, mm, qp.proxy(), ss, _raft_gr.address_map(), topology_change_enabled);
auto rpc = std::make_unique<group0_rpc>(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.address_map(), gid, my_id);
// Keep a reference to a specific RPC class.
auto& rpc_ref = *rpc;
@@ -378,7 +378,7 @@ future<> raft_group0::abort() {
co_await stop_group0();
}

- future<> raft_group0::start_server_for_group0(raft::group_id group0_id, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm) {
+ future<> raft_group0::start_server_for_group0(raft::group_id group0_id, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled) {
assert(group0_id != raft::group_id{});
// The address map may miss our own id in case we connect
// to an existing Raft Group 0 leader.
@@ -390,7 +390,7 @@ future<> raft_group0::start_server_for_group0(raft::group_id group0_id, service:
// we ensure we haven't missed any IP update in the map.
load_initial_raft_address_map();
group0_log.info("Server {} is starting group 0 with id {}", my_id, group0_id);
-    co_await _raft_gr.start_server_for_group(create_server_for_group0(group0_id, my_id, ss, qp, mm));
+    co_await _raft_gr.start_server_for_group(create_server_for_group0(group0_id, my_id, ss, qp, mm, topology_change_enabled));
_group0.emplace<raft::group_id>(group0_id);
}

@@ -416,14 +416,14 @@ future<> raft_group0::leadership_monitor_fiber() {
}

future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, shared_ptr<service::group0_handshaker> handshaker, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm,
-        db::system_keyspace& sys_ks) {
+        db::system_keyspace& sys_ks, bool topology_change_enabled) {
assert(this_shard_id() == 0);
assert(!joined_group0());

auto group0_id = raft::group_id{co_await sys_ks.get_raft_group0_id()};
if (group0_id) {
// Group 0 ID present means we've already joined group 0 before.
-        co_return co_await start_server_for_group0(group0_id, ss, qp, mm);
+        co_return co_await start_server_for_group0(group0_id, ss, qp, mm, topology_change_enabled);
}

raft::server* server = nullptr;
@@ -467,7 +467,7 @@ future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, shared_p
// Bootstrap the initial configuration
co_await raft_sys_table_storage(qp, group0_id, my_id)
.bootstrap(std::move(initial_configuration), nontrivial_snapshot);
-        co_await start_server_for_group0(group0_id, ss, qp, mm);
+        co_await start_server_for_group0(group0_id, ss, qp, mm, topology_change_enabled);
server = &_raft_gr.group0();
// FIXME if we crash now or after getting added to the config but before storing group 0 ID,
// we'll end with a bootstrapped server that possibly added some entries, but we won't remember that we have such a server
@@ -608,7 +608,7 @@ future<bool> raft_group0::use_raft() {
co_return true;
}

- future<> raft_group0::setup_group0_if_exist(db::system_keyspace& sys_ks, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm) {
+ future<> raft_group0::setup_group0_if_exist(db::system_keyspace& sys_ks, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled) {
if (!co_await use_raft()) {
co_return;
}
@@ -627,7 +627,7 @@ future<> raft_group0::setup_group0_if_exist(db::system_keyspace& sys_ks, service
if (group0_id) {
// Group 0 ID is present => we've already joined group 0 earlier.
group0_log.info("setup_group0: group 0 ID present. Starting existing Raft server.");
-        co_await start_server_for_group0(group0_id, ss, qp, mm);
+        co_await start_server_for_group0(group0_id, ss, qp, mm, topology_change_enabled);

// Start group 0 leadership monitor fiber.
_leadership_monitor = leadership_monitor_fiber();
@@ -656,7 +656,7 @@ future<> raft_group0::setup_group0_if_exist(db::system_keyspace& sys_ks, service

future<> raft_group0::setup_group0(
db::system_keyspace& sys_ks, const std::unordered_set<gms::inet_address>& initial_contact_nodes, shared_ptr<group0_handshaker> handshaker,
-        std::optional<replace_info> replace_info, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm) {
+        std::optional<replace_info> replace_info, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled) {
if (!co_await use_raft()) {
co_return;
}
@@ -691,7 +691,7 @@ future<> raft_group0::setup_group0(
}

group0_log.info("setup_group0: joining group 0...");
-    co_await join_group0(std::move(seeds), std::move(handshaker), ss, qp, mm, sys_ks);
+    co_await join_group0(std::move(seeds), std::move(handshaker), ss, qp, mm, sys_ks, topology_change_enabled);
group0_log.info("setup_group0: successfully joined group 0.");

// Start group 0 leadership monitor fiber.
@@ -757,7 +757,7 @@ void raft_group0::load_initial_raft_address_map() {
});
}

- future<> raft_group0::finish_setup_after_join(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm) {
+ future<> raft_group0::finish_setup_after_join(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled) {
if (joined_group0()) {
group0_log.info("finish_setup_after_join: group 0 ID present, loading server info.");
auto my_id = load_my_id();
@@ -792,10 +792,10 @@ future<> raft_group0::finish_setup_after_join(service::storage_service& ss, cql3
}

// The listener may fire immediately, create a thread for that case.
-    co_await seastar::async([this, &ss, &qp, &mm] {
-        _raft_support_listener = _feat.supports_raft_cluster_mgmt.when_enabled([this, &ss, &qp, &mm] {
+    co_await seastar::async([this, &ss, &qp, &mm, topology_change_enabled] {
+        _raft_support_listener = _feat.supports_raft_cluster_mgmt.when_enabled([this, &ss, &qp, &mm, topology_change_enabled] {
group0_log.info("finish_setup_after_join: SUPPORTS_RAFT feature enabled. Starting internal upgrade-to-raft procedure.");
-            upgrade_to_group0(ss, qp, mm).get();
+            upgrade_to_group0(ss, qp, mm, topology_change_enabled).get();
});
});
}
@@ -1566,7 +1566,7 @@ static auto warn_if_upgrade_takes_too_long() {
});
}

- future<> raft_group0::upgrade_to_group0(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm) {
+ future<> raft_group0::upgrade_to_group0(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled) {
assert(this_shard_id() == 0);

// The SUPPORTS_RAFT cluster feature is enabled, so the local RAFT feature must also be enabled
@@ -1592,10 +1592,10 @@ future<> raft_group0::upgrade_to_group0(service::storage_service& ss, cql3::quer
}

(void)[] (raft_group0& self, abort_source& as, group0_upgrade_state start_state, gate::holder pause_shutdown, service::storage_service& ss, cql3::query_processor& qp,
-            service::migration_manager& mm) -> future<> {
+            service::migration_manager& mm, bool topology_change_enabled) -> future<> {
auto warner = warn_if_upgrade_takes_too_long();
try {
-            co_await self.do_upgrade_to_group0(start_state, ss, qp, mm);
+            co_await self.do_upgrade_to_group0(start_state, ss, qp, mm, topology_change_enabled);
co_await self._client.set_group0_upgrade_state(group0_upgrade_state::use_post_raft_procedures);
upgrade_log.info("Raft upgrade finished. Disabling migration_manager schema pulls.");
co_await mm.disable_schema_pulls();
@@ -1605,11 +1605,11 @@ future<> raft_group0::upgrade_to_group0(service::storage_service& ss, cql3::quer
" If the procedure gets stuck, manual recovery may be required."
" Consult the relevant documentation: {}", std::current_exception(), raft_upgrade_doc);
}
-    }(std::ref(*this), std::ref(_abort_source), start_state, _shutdown_gate.hold(), ss, qp, mm);
+    }(std::ref(*this), std::ref(_abort_source), start_state, _shutdown_gate.hold(), ss, qp, mm, topology_change_enabled);
}

// `start_state` is either `use_pre_raft_procedures` or `synchronize`.
- future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm) {
+ future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled) {
future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled) {
assert(this_shard_id() == 0);

// Check if every peer knows about the upgrade procedure.
@@ -1637,7 +1637,7 @@ future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state, ser
if (!joined_group0()) {
upgrade_log.info("Joining group 0...");
auto handshaker = make_legacy_handshaker(true); // Voter
-        co_await join_group0(co_await _sys_ks.load_peers(), std::move(handshaker), ss, qp, mm, _sys_ks);
+        co_await join_group0(co_await _sys_ks.load_peers(), std::move(handshaker), ss, qp, mm, _sys_ks, topology_change_enabled);
} else {
upgrade_log.info(
"We're already a member of group 0."
16 changes: 8 additions & 8 deletions service/raft/raft_group0.hh
@@ -163,7 +163,7 @@ public:
//
// Also make sure to call `finish_setup_after_join` after the node has joined the cluster and entered NORMAL state.
future<> setup_group0(db::system_keyspace&, const std::unordered_set<gms::inet_address>& initial_contact_nodes, shared_ptr<group0_handshaker> handshaker,
-            std::optional<replace_info>, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm);
+            std::optional<replace_info>, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled);

// Call during the startup procedure before networking is enabled.
//
@@ -174,7 +174,7 @@
//
// Cannot be called twice.
//
-    future<> setup_group0_if_exist(db::system_keyspace&, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm);
+    future<> setup_group0_if_exist(db::system_keyspace&, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled);

// Call at the end of the startup procedure, after the node entered NORMAL state.
// `setup_group0()` must have finished earlier.
@@ -183,7 +183,7 @@
//
// If the node has just upgraded, enables a feature listener for the RAFT feature
// which will start a procedure to create group 0 and switch administrative operations to use it.
-    future<> finish_setup_after_join(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm);
+    future<> finish_setup_after_join(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled);

// If Raft is disabled or in RECOVERY mode, returns `false`.
// Otherwise:
@@ -282,7 +282,7 @@ private:
future<group0_peer_exchange> peer_exchange(discovery::peer_list peers);

raft_server_for_group create_server_for_group0(raft::group_id id, raft::server_id my_id, service::storage_service& ss, cql3::query_processor& qp,
-            service::migration_manager& mm);
+            service::migration_manager& mm, bool topology_change_enabled);

// Run the discovery algorithm.
//
@@ -302,10 +302,10 @@ private:
// from places which must not block.
//
// Precondition: the SUPPORTS_RAFT cluster feature is enabled.
-    future<> upgrade_to_group0(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm);
+    future<> upgrade_to_group0(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled);

// Blocking part of `upgrade_to_group0`, runs in background.
-    future<> do_upgrade_to_group0(group0_upgrade_state start_state, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm);
+    future<> do_upgrade_to_group0(group0_upgrade_state start_state, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled);

// Start a Raft server for the cluster-wide group 0 and join it to the group.
// Called during bootstrap or upgrade.
@@ -334,7 +334,7 @@ private:
// Preconditions: Raft local feature enabled
// and we haven't initialized group 0 yet after last Scylla start (`joined_group0()` is false).
// Postcondition: `joined_group0()` is true.
-    future<> join_group0(std::vector<gms::inet_address> seeds, shared_ptr<group0_handshaker> handshaker, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, db::system_keyspace& sys_ks);
+    future<> join_group0(std::vector<gms::inet_address> seeds, shared_ptr<group0_handshaker> handshaker, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, db::system_keyspace& sys_ks, bool topology_change_enabled);

// Start an existing Raft server for the cluster-wide group 0.
// Assumes the server was already added to the group earlier so we don't attempt to join it again.
Expand All @@ -347,7 +347,7 @@ private:
// XXX: perhaps it would be good to make this function callable multiple times,
// if we want to handle crashes of the group 0 server without crashing the entire Scylla process
// (we could then try restarting the server internally).
-    future<> start_server_for_group0(raft::group_id group0_id, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm);
+    future<> start_server_for_group0(raft::group_id group0_id, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled);

// Make the given server a non-voter in Raft group 0 configuration.
// Retries on raft::commit_status_unknown.
