Merge "raft: (service) implement group 0 as a service" from Kostja
"
To ensure consistency of schema and topology changes,
Scylla needs linearizable storage for this data,
available at every member of the database cluster.

The series introduces such storage as a service
available to all Scylla subsystems. Using this service, any
internal subsystem, such as gossip or migrations (schema), can
persist changes to cluster metadata and expect this to be done in
a consistent, linearizable way.

The series uses the built-in Raft library to implement a
dedicated Raft group, running on shard 0, which includes all
members of the cluster (group 0). It adds hooks to topology change
events, such as adding or removing cluster nodes, to update
group 0 membership, and ensures the group is started when the
server boots.

The state machine for the group, i.e. the actual storage
for cluster-wide information, still remains a stub. Extending
it to actually persist schema or token ring changes
is the subject of a subsequent series.

Another Raft-related service was implemented earlier: the Raft Group
Registry. The purpose of the registry is to allow Scylla to have an
arbitrary number of groups, each with its own subset of cluster
members and its own state machine, all sharing a common transport.
Group 0 is one (the first) group among many.
"

* 'raft-group-0-v12' of github.com:scylladb/scylla-dev:
  raft: (server) improve tracing
  raft: (metrics) fix spelling of waiters_awaken
  raft: make forwarding optional
  raft: (service) manage Raft configuration during topology changes
  raft: (service) break a dependency loop
  raft: (discovery) introduce leader discovery state machine
  system_keyspace: mark scylla_local table as always-sync commitlog
  system_keyspace: persistence for Raft Group 0 id and Raft Server Id
  raft: add a test case for adding entries on follower
  raft: (server) allow adding entries/modify config on a follower
  raft: (test) replace virtual with override in derived class
  raft: (server) fix a typo in exception message
  raft: (server) implement id() helper
  raft: (server) remove apply_dummy_entry()
  raft: (test) fix missing initialization in generator.hh
avikivity committed Nov 30, 2021
2 parents 0d5ac84 + eea82f1 commit 078f69c
Showing 36 changed files with 1,831 additions and 265 deletions.
8 changes: 8 additions & 0 deletions configure.py
@@ -563,6 +563,7 @@ def find_headers(repodir, excluded_dirs):
'test/raft/etcd_test',
'test/raft/raft_sys_table_storage_test',
'test/raft/raft_address_map_test',
'test/raft/discovery_test',
])

apps = set([
@@ -1018,6 +1019,8 @@ def find_headers(repodir, excluded_dirs):
'service/raft/raft_rpc.cc',
'service/raft/raft_gossip_failure_detector.cc',
'service/raft/raft_group_registry.cc',
'service/raft/discovery.cc',
'service/raft/raft_group0.cc',
] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] \
+ scylla_raft_core
)
@@ -1117,6 +1120,7 @@ def find_headers(repodir, excluded_dirs):
'idl/messaging_service.idl.hh',
'idl/paxos.idl.hh',
'idl/raft.idl.hh',
'idl/group0.idl.hh',
'idl/hinted_handoff.idl.hh',
]

@@ -1279,6 +1283,10 @@ def find_headers(repodir, excluded_dirs):
deps['test/raft/raft_sys_table_storage_test'] = ['test/raft/raft_sys_table_storage_test.cc'] + \
scylla_core + scylla_tests_generic_dependencies
deps['test/raft/raft_address_map_test'] = ['test/raft/raft_address_map_test.cc'] + scylla_core
deps['test/raft/discovery_test'] = ['test/raft/discovery_test.cc',
'test/raft/helpers.cc',
'test/lib/log.cc',
'service/raft/discovery.cc'] + scylla_raft_dependencies

deps['utils/gz/gen_crc_combine_table'] = ['utils/gz/gen_crc_combine_table.cc']

50 changes: 42 additions & 8 deletions db/system_keyspace.cc
@@ -649,6 +649,8 @@ schema_ptr system_keyspace::large_cells() {
"Scylla specific information about the local node"
);
builder.set_gc_grace_seconds(0);
// Raft Group id and server id updates must be synced to the commitlog
builder.set_wait_for_sync_to_commitlog(true);
builder.with_version(generate_schema_version(builder.uuid()));
return builder.build(schema_builder::compact_storage::no);
}();
@@ -1595,21 +1597,35 @@ template future<> system_keyspace::update_peer_info<sstring>(gms::inet_address e
template future<> system_keyspace::update_peer_info<utils::UUID>(gms::inet_address ep, sstring column_name, utils::UUID);
template future<> system_keyspace::update_peer_info<net::inet_address>(gms::inet_address ep, sstring column_name, net::inet_address);

future<> system_keyspace::set_scylla_local_param(const sstring& key, const sstring& value) {
sstring req = format("UPDATE system.{} SET value = ? WHERE key = ?", SCYLLA_LOCAL);
return qctx->execute_cql(req, value, key).discard_result();
template <typename T>
future<> set_scylla_local_param_as(const sstring& key, const T& value) {
sstring req = format("UPDATE system.{} SET value = ? WHERE key = ?", system_keyspace::SCYLLA_LOCAL);
auto type = data_type_for<T>();
return qctx->execute_cql(req, type->to_string_impl(data_value(value)), key).discard_result();
}

future<std::optional<sstring>> system_keyspace::get_scylla_local_param(const sstring& key){
sstring req = format("SELECT value FROM system.{} WHERE key = ?", SCYLLA_LOCAL);
return qctx->execute_cql(req, key).then([] (::shared_ptr<cql3::untyped_result_set> res) {
template <typename T>
future<std::optional<T>> get_scylla_local_param_as(const sstring& key) {
sstring req = format("SELECT value FROM system.{} WHERE key = ?", system_keyspace::SCYLLA_LOCAL);
return qctx->execute_cql(req, key).then([] (::shared_ptr<cql3::untyped_result_set> res)
-> future<std::optional<T>> {
if (res->empty() || !res->one().has("value")) {
return std::optional<sstring>();
return make_ready_future<std::optional<T>>(std::optional<T>());
}
return std::optional<sstring>(res->one().get_as<sstring>("value"));
auto type = data_type_for<T>();
return make_ready_future<std::optional<T>>(value_cast<T>(type->deserialize(
type->from_string(res->one().get_as<sstring>("value")))));
});
}

future<> system_keyspace::set_scylla_local_param(const sstring& key, const sstring& value) {
return set_scylla_local_param_as<sstring>(key, value);
}

future<std::optional<sstring>> system_keyspace::get_scylla_local_param(const sstring& key){
return get_scylla_local_param_as<sstring>(key);
}

future<> system_keyspace::update_schema_version(utils::UUID version) {
sstring req = format("INSERT INTO system.{} (key, schema_version) VALUES (?, ?)", LOCAL);
return qctx->execute_cql(req, sstring(LOCAL), version).discard_result();
@@ -2969,6 +2985,24 @@ future<> system_keyspace::enable_features_on_startup(sharded<gms::feature_servic
}
}

future<utils::UUID> system_keyspace::get_raft_group0_id() {
auto opt = co_await get_scylla_local_param_as<utils::UUID>("raft_group0_id");
co_return opt.value_or<utils::UUID>({});
}

future<utils::UUID> system_keyspace::get_raft_server_id() {
auto opt = co_await get_scylla_local_param_as<utils::UUID>("raft_server_id");
co_return opt.value_or<utils::UUID>({});
}

future<> system_keyspace::set_raft_group0_id(utils::UUID uuid) {
return set_scylla_local_param_as<utils::UUID>("raft_group0_id", uuid);
}

future<> system_keyspace::set_raft_server_id(utils::UUID uuid) {
return set_scylla_local_param_as<utils::UUID>("raft_server_id", uuid);
}

sstring system_keyspace_name() {
return system_keyspace::NAME;
}
15 changes: 14 additions & 1 deletion db/system_keyspace.hh
@@ -208,7 +208,7 @@ public:
static schema_ptr batchlog();
};

static constexpr const char* extra_durable_tables[] = { PAXOS };
static constexpr const char* extra_durable_tables[] = { PAXOS, SCYLLA_LOCAL };

static bool is_extra_durable(const sstring& name);

@@ -420,6 +420,19 @@ public:
static future<> cdc_set_rewritten(std::optional<cdc::generation_id_v1>);

static future<> enable_features_on_startup(sharded<gms::feature_service>& feat);

// Load Raft Group 0 id from scylla.local
static future<utils::UUID> get_raft_group0_id();

// Load this server id from scylla.local
static future<utils::UUID> get_raft_server_id();

// Persist Raft Group 0 id. Should be a TIMEUUID.
static future<> set_raft_group0_id(utils::UUID id);

// Called once at fresh server startup to make sure every server
// has a Raft ID
static future<> set_raft_server_id(utils::UUID id);
}; // class system_keyspace

future<> system_keyspace_make(distributed<database>& db, distributed<service::storage_service>& ss, sharded<gms::gossiper>& g);
33 changes: 33 additions & 0 deletions idl/group0.idl.hh
@@ -0,0 +1,33 @@
/*
* Copyright 2021-present ScyllaDB
*/

/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/

namespace service {

struct group0_info {
raft::group_id group0_id;
raft::server_address addr;
};

struct group0_peer_exchange {
std::variant<std::monostate, service::group0_info, std::vector<raft::server_address>> info;
};

} // namespace service
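
For reference, a compilable sketch of how a peer-exchange reply of this shape can be inspected. The `group_id` and `server_address` types below are simplified placeholders for the real raft:: types defined elsewhere in the tree, and the reading of the three variant alternatives (no information yet, an already-formed group 0, or a list of known peers) is an assumption based on this IDL and the discovery commits listed in the shortlog above.

```cpp
// Simplified placeholders; the real raft::group_id and raft::server_address
// are defined elsewhere in the Scylla tree.
#include <cstdint>
#include <iostream>
#include <string>
#include <variant>
#include <vector>

struct group_id       { uint64_t id; };
struct server_address { uint64_t id; std::string ip; };

struct group0_info {
    group_id group0_id;
    server_address addr;
};

// Mirrors the IDL struct above: one of three possible discovery answers.
using group0_peer_exchange =
    std::variant<std::monostate,               // the peer has no information yet
                 group0_info,                  // group 0 already exists; join via addr
                 std::vector<server_address>>; // the peer only knows other candidates

void handle(const group0_peer_exchange& reply) {
    if (auto* info = std::get_if<group0_info>(&reply)) {
        std::cout << "join existing group 0 via " << info->addr.ip << "\n";
    } else if (auto* peers = std::get_if<std::vector<server_address>>(&reply)) {
        std::cout << "keep discovering among " << peers->size() << " peers\n";
    } else {
        std::cout << "peer had nothing to report\n";
    }
}

int main() {
    handle(group0_peer_exchange{group0_info{{1}, {42, "10.0.0.1"}}});
    handle(group0_peer_exchange{std::vector<server_address>{{7, "10.0.0.2"}}});
    handle(group0_peer_exchange{});
    return 0;
}
```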
8 changes: 8 additions & 0 deletions idl/raft.idl.hh
@@ -131,4 +131,12 @@ struct not_a_leader {
raft::internal::tagged_id<raft::server_id_tag> leader;
};

struct commit_status_unknown {
};

struct entry_id {
raft::internal::tagged_uint64<raft::term_tag> term;
raft::internal::tagged_uint64<raft::index_tag> idx;
};

} // namespace raft
27 changes: 12 additions & 15 deletions main.cc
@@ -817,11 +817,20 @@ int main(int ac, char** av) {
});
gossiper.invoke_on_all(&gms::gossiper::start).get();

supervisor::notify("starting Raft service");
raft_gr.start(std::ref(messaging), std::ref(gossiper), std::ref(qp)).get();

raft_gr.start(cfg->check_experimental(db::experimental_features_t::RAFT),
std::ref(messaging), std::ref(gossiper)).get();
// XXX: stop_raft has to happen before query_processor
// is stopped, since some groups keep using the query
// processor until they are stopped inside stop_raft.
auto stop_raft = defer_verbose_shutdown("Raft", [&raft_gr] {
raft_gr.stop().get();
});
if (cfg->check_experimental(db::experimental_features_t::RAFT)) {
supervisor::notify("starting Raft Group Registry service");
}
raft_gr.invoke_on_all(&service::raft_group_registry::start).get();

supervisor::notify("initializing storage service");
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
@@ -1035,18 +1044,6 @@ int main(int ac, char** av) {
proxy.invoke_on_all(&service::storage_proxy::uninit_messaging_service).get();
});

const bool raft_enabled = cfg->check_experimental(db::experimental_features_t::RAFT);
if (raft_enabled) {
supervisor::notify("starting Raft RPC");
raft_gr.invoke_on_all(&service::raft_group_registry::init).get();
}
auto stop_raft_rpc = defer_verbose_shutdown("Raft RPC", [&raft_gr] {
raft_gr.invoke_on_all(&service::raft_group_registry::uninit).get();
});
if (!raft_enabled) {
stop_raft_rpc->cancel();
}

debug::the_stream_manager = &stream_manager;
supervisor::notify("starting streaming service");
stream_manager.start(std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(messaging), std::ref(mm), std::ref(gossiper)).get();
@@ -1180,7 +1177,7 @@ int main(int ac, char** av) {
}).get();

with_scheduling_group(maintenance_scheduling_group, [&] {
return ss.local().init_server();
return ss.local().init_server(qp.local());
}).get();

gossiper.local().wait_for_gossip_to_settle().get();
52 changes: 52 additions & 0 deletions message/messaging_service.cc
@@ -64,6 +64,7 @@
#include "idl/messaging_service.dist.hh"
#include "idl/paxos.dist.hh"
#include "idl/raft.dist.hh"
#include "idl/group0.dist.hh"
#include "serializer_impl.hh"
#include "serialization_visitors.hh"
#include "idl/consistency_level.dist.impl.hh"
@@ -87,6 +88,7 @@
#include "idl/messaging_service.dist.impl.hh"
#include "idl/paxos.dist.impl.hh"
#include "idl/raft.dist.impl.hh"
#include "idl/group0.dist.impl.hh"
#include <seastar/rpc/lz4_compressor.hh>
#include <seastar/rpc/lz4_fragmented_compressor.hh>
#include <seastar/rpc/multi_algo_compressor_factory.hh>
@@ -441,6 +443,10 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::GOSSIP_ECHO:
case messaging_verb::GOSSIP_GET_ENDPOINT_STATES:
case messaging_verb::GET_SCHEMA_VERSION:
// Raft peer exchange is mainly running at boot, but still
// should not be blocked by any data requests.
case messaging_verb::GROUP0_PEER_EXCHANGE:
case messaging_verb::GROUP0_MODIFY_CONFIG:
return 0;
case messaging_verb::PREPARE_MESSAGE:
case messaging_verb::PREPARE_DONE_MESSAGE:
@@ -491,6 +497,8 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::RAFT_READ_QUORUM:
case messaging_verb::RAFT_READ_QUORUM_REPLY:
case messaging_verb::RAFT_EXECUTE_READ_BARRIER_ON_LEADER:
case messaging_verb::RAFT_ADD_ENTRY:
case messaging_verb::RAFT_MODIFY_CONFIG:
return 2;
case messaging_verb::MUTATION_DONE:
case messaging_verb::MUTATION_FAILED:
@@ -1412,6 +1420,50 @@ future<raft::read_barrier_reply> messaging_service::send_raft_execute_read_barri
return send_message_timeout<future<raft::read_barrier_reply>>(this, messaging_verb::RAFT_EXECUTE_READ_BARRIER_ON_LEADER, std::move(id), timeout, std::move(gid), std::move(from_id), std::move(dst_id));
}

void messaging_service::register_raft_add_entry(std::function<future<raft::add_entry_reply> (const rpc::client_info&, rpc::opt_time_point, raft::group_id gid, raft::server_id from_id, raft::server_id dst_id, raft::command)>&& func) {
register_handler(this, netw::messaging_verb::RAFT_ADD_ENTRY, std::move(func));
}
future<> messaging_service::unregister_raft_add_entry() {
return unregister_handler(netw::messaging_verb::RAFT_ADD_ENTRY);
}
future<raft::add_entry_reply> messaging_service::send_raft_add_entry(msg_addr id, clock_type::time_point timeout, raft::group_id gid, raft::server_id from_id, raft::server_id dst_id, const raft::command& cmd) {
return send_message_timeout<raft::add_entry_reply>(this, messaging_verb::RAFT_ADD_ENTRY, std::move(id), timeout, std::move(gid), std::move(from_id), std::move(dst_id), cmd);
}

void messaging_service::register_raft_modify_config(std::function<future<raft::add_entry_reply>(const rpc::client_info&, rpc::opt_time_point, raft::group_id gid, raft::server_id from_id, raft::server_id dst_id, std::vector<raft::server_address> add, std::vector<raft::server_id> del)>&& func) {
register_handler(this, netw::messaging_verb::RAFT_MODIFY_CONFIG, std::move(func));
}

future<> messaging_service::unregister_raft_modify_config() {
return unregister_handler(netw::messaging_verb::RAFT_MODIFY_CONFIG);
}

future<raft::add_entry_reply> messaging_service::send_raft_modify_config(msg_addr id, clock_type::time_point timeout, raft::group_id gid, raft::server_id from_id, raft::server_id dst_id, const std::vector<raft::server_address>& add, const std::vector<raft::server_id>& del) {
return send_message_timeout<raft::add_entry_reply>(this, messaging_verb::RAFT_MODIFY_CONFIG, std::move(id), timeout, std::move(gid), std::move(from_id), std::move(dst_id), add, del);
}

void messaging_service::register_group0_peer_exchange(std::function<future<service::group0_peer_exchange>(const rpc::client_info&, rpc::opt_time_point, std::vector<raft::server_address>)>&& func) {
register_handler(this, netw::messaging_verb::GROUP0_PEER_EXCHANGE, std::move(func));
}
future<> messaging_service::unregister_group0_peer_exchange() {
return unregister_handler(netw::messaging_verb::GROUP0_PEER_EXCHANGE);
}
future<service::group0_peer_exchange> messaging_service::send_group0_peer_exchange(msg_addr id, clock_type::time_point timeout, const std::vector<raft::server_address>& peers) {
return send_message_timeout<service::group0_peer_exchange>(this, messaging_verb::GROUP0_PEER_EXCHANGE, std::move(id), timeout, peers);
}

void messaging_service::register_group0_modify_config(std::function<future<>(const rpc::client_info&, rpc::opt_time_point, raft::group_id gid, std::vector<raft::server_address> add, std::vector<raft::server_id> del)>&& func) {
register_handler(this, netw::messaging_verb::GROUP0_MODIFY_CONFIG, std::move(func));
}

future<> messaging_service::unregister_group0_modify_config() {
return unregister_handler(netw::messaging_verb::GROUP0_MODIFY_CONFIG);
}

future<> messaging_service::send_group0_modify_config(msg_addr id, clock_type::time_point timeout, raft::group_id gid, const std::vector<raft::server_address>& add, const std::vector<raft::server_id>& del) {
return send_message_timeout<void>(this, messaging_verb::GROUP0_MODIFY_CONFIG, std::move(id), timeout, std::move(gid), add, del);
}

void init_messaging_service(sharded<messaging_service>& ms,
messaging_service::config mscfg, netw::messaging_service::scheduling_config scfg, const db::config& db_config) {
using encrypt_what = messaging_service::encrypt_what;
23 changes: 22 additions & 1 deletion message/messaging_service.hh
@@ -41,6 +41,7 @@
#include "cache_temperature.hh"
#include "service/paxos/prepare_response.hh"
#include "raft/raft.hh"
#include "service/raft/messaging.hh"

#include <list>
#include <vector>
@@ -156,7 +157,11 @@ enum class messaging_verb : int32_t {
RAFT_READ_QUORUM = 52,
RAFT_READ_QUORUM_REPLY = 53,
RAFT_EXECUTE_READ_BARRIER_ON_LEADER = 54,
LAST = 55,
RAFT_ADD_ENTRY = 55,
RAFT_MODIFY_CONFIG = 56,
GROUP0_PEER_EXCHANGE = 57,
GROUP0_MODIFY_CONFIG = 58,
LAST = 59,
};

} // namespace netw
@@ -591,6 +596,22 @@ public:
future<> unregister_raft_execute_read_barrier_on_leader();
future<raft::read_barrier_reply> send_raft_execute_read_barrier_on_leader(msg_addr id, clock_type::time_point timeout, raft::group_id, raft::server_id from_id, raft::server_id dst_id);

void register_raft_add_entry(std::function<future<raft::add_entry_reply> (const rpc::client_info&, rpc::opt_time_point, raft::group_id, raft::server_id from_id, raft::server_id dst_id, raft::command cmd)>&& func);
future<> unregister_raft_add_entry();
future<raft::add_entry_reply> send_raft_add_entry(msg_addr id, clock_type::time_point timeout, raft::group_id, raft::server_id from_id, raft::server_id dst_id, const raft::command& cmd);

void register_raft_modify_config(std::function<future<raft::add_entry_reply>(const rpc::client_info&, rpc::opt_time_point, raft::group_id gid, raft::server_id from_id, raft::server_id dst_id, std::vector<raft::server_address> add, std::vector<raft::server_id> del)>&& func);
future<> unregister_raft_modify_config();
future<raft::add_entry_reply> send_raft_modify_config(msg_addr id, clock_type::time_point timeout, raft::group_id gid, raft::server_id from_id, raft::server_id dst_id, const std::vector<raft::server_address>& add, const std::vector<raft::server_id>& del);

void register_group0_peer_exchange(std::function<future<service::group0_peer_exchange> (const rpc::client_info&, rpc::opt_time_point, std::vector<raft::server_address>)>&& func);
future<> unregister_group0_peer_exchange();
future<service::group0_peer_exchange> send_group0_peer_exchange(msg_addr id, clock_type::time_point timeout, const std::vector<raft::server_address>& peers);

void register_group0_modify_config(std::function<future<>(const rpc::client_info&, rpc::opt_time_point, raft::group_id gid, std::vector<raft::server_address> add, std::vector<raft::server_id> del)>&& func);
future<> unregister_group0_modify_config();
future<> send_group0_modify_config(msg_addr id, clock_type::time_point timeout, raft::group_id gid, const std::vector<raft::server_address>& add, const std::vector<raft::server_id>& del);

void foreach_server_connection_stats(std::function<void(const rpc::client_info&, const rpc::stats&)>&& f) const;
private:
bool remove_rpc_client_one(clients_map& clients, msg_addr id, bool dead_only);
2 changes: 1 addition & 1 deletion raft/fsm.cc
@@ -53,7 +53,6 @@ future<> fsm::wait_max_log_size() {
}

const configuration& fsm::get_configuration() const {
check_is_leader();
return _log.get_configuration();
}

@@ -538,6 +537,7 @@ void fsm::tick_leader() {
logger.trace("tick[{}]: stepdown is active", _my_id);
auto me = leader_state().tracker.find(_my_id);
if (me == nullptr || !me->can_vote) {
logger.trace("tick[{}]: not aborting stepdown because we have been removed from the configuration", _my_id);
// Do not abort stepdown if not part of the current
// config or non voting member since the node cannot
// be a leader any longer