Skip to content

Commit

Permalink
Merge '[Backport 5.2] Raft snapshot fixes' from Kamil Braun
Browse files Browse the repository at this point in the history
Backports required to fix #16683 in 5.2:
- when creating first group 0 server, create a snapshot with non-empty ID, and start it at index 1 instead of 0 to force snapshot transfer to servers that join group 0
- add an API to trigger Raft snapshot
- use the API when we restart and see that the existing snapshot is at index 0, to trigger a new one --- in order to fix broken deployments that already bootstrapped with index-0 snapshot.

Closes #17087

* github.com:scylladb/scylladb:
  test_raft_snapshot_request: fix flakiness (again)
  test_raft_snapshot_request: fix flakiness
  Merge 'raft_group0: trigger snapshot if existing snapshot index is 0' from Kamil Braun
  Merge 'Add an API to trigger snapshot in Raft servers' from Kamil Braun
  raft: server: add workaround for #12972
  raft: Store snapshot update and truncate log atomically
  service: raft: force initial snapshot transfer in new cluster
  raft_sys_table_storage: give initial snapshot a non zero value
  • Loading branch information
denesb committed Feb 7, 2024
2 parents 4546d07 + 4e257c5 commit 9291eaf
Show file tree
Hide file tree
Showing 22 changed files with 704 additions and 213 deletions.
43 changes: 43 additions & 0 deletions api/api-doc/raft.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"apiVersion":"0.0.1",
"swaggerVersion":"1.2",
"basePath":"{{Protocol}}://{{Host}}",
"resourcePath":"/raft",
"produces":[
"application/json"
],
"apis":[
{
"path":"/raft/trigger_snapshot/{group_id}",
"operations":[
{
"method":"POST",
"summary":"Triggers snapshot creation and log truncation for the given Raft group",
"type":"string",
"nickname":"trigger_snapshot",
"produces":[
"application/json"
],
"parameters":[
{
"name":"group_id",
"description":"The ID of the group which should get snapshotted",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"path"
},
{
"name":"timeout",
"description":"Timeout in seconds after which the endpoint returns a failure. If not provided, 60s is used.",
"required":false,
"allowMultiple":false,
"type":"long",
"paramType":"query"
}
]
}
]
}
]
}
13 changes: 13 additions & 0 deletions api/api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "api/config.hh"
#include "task_manager.hh"
#include "task_manager_test.hh"
#include "raft.hh"

logging::logger apilog("api");

Expand Down Expand Up @@ -277,6 +278,18 @@ future<> set_server_task_manager_test(http_context& ctx, lw_shared_ptr<db::confi

#endif

future<> set_server_raft(http_context& ctx, sharded<service::raft_group_registry>& raft_gr) {
auto rb = std::make_shared<api_registry_builder>(ctx.api_doc);
return ctx.http_server.set_routes([rb, &ctx, &raft_gr] (routes& r) {
rb->register_function(r, "raft", "The Raft API");
set_raft(ctx, r, raft_gr);
});
}

future<> unset_server_raft(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_raft(ctx, r); });
}

void req_params::process(const request& req) {
// Process mandatory parameters
for (auto& [name, ent] : params) {
Expand Down
3 changes: 3 additions & 0 deletions api/api_init.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace service {
class load_meter;
class storage_proxy;
class storage_service;
class raft_group_registry;

} // namespace service

Expand Down Expand Up @@ -116,5 +117,7 @@ future<> set_server_compaction_manager(http_context& ctx);
future<> set_server_done(http_context& ctx);
future<> set_server_task_manager(http_context& ctx);
future<> set_server_task_manager_test(http_context& ctx, lw_shared_ptr<db::config> cfg);
future<> set_server_raft(http_context&, sharded<service::raft_group_registry>&);
future<> unset_server_raft(http_context&);

}
70 changes: 70 additions & 0 deletions api/raft.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/

/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

#include <seastar/core/coroutine.hh>

#include "api/api.hh"
#include "api/api-doc/raft.json.hh"

#include "service/raft/raft_group_registry.hh"

using namespace seastar::httpd;

extern logging::logger apilog;

namespace api {

namespace r = httpd::raft_json;
using namespace json;

void set_raft(http_context&, httpd::routes& r, sharded<service::raft_group_registry>& raft_gr) {
r::trigger_snapshot.set(r, [&raft_gr] (std::unique_ptr<http::request> req) -> future<json_return_type> {
raft::group_id gid{utils::UUID{req->param["group_id"]}};
auto timeout_dur = std::invoke([timeout_str = req->get_query_param("timeout")] {
if (timeout_str.empty()) {
return std::chrono::seconds{60};
}
auto dur = std::stoll(timeout_str);
if (dur <= 0) {
throw std::runtime_error{"Timeout must be a positive number."};
}
return std::chrono::seconds{dur};
});

std::atomic<bool> found_srv{false};
co_await raft_gr.invoke_on_all([gid, timeout_dur, &found_srv] (service::raft_group_registry& raft_gr) -> future<> {
auto* srv = raft_gr.find_server(gid);
if (!srv) {
co_return;
}

found_srv = true;
abort_on_expiry aoe(lowres_clock::now() + timeout_dur);
apilog.info("Triggering Raft group {} snapshot", gid);
auto result = co_await srv->trigger_snapshot(&aoe.abort_source());
if (result) {
apilog.info("New snapshot for Raft group {} created", gid);
} else {
apilog.info("Could not create new snapshot for Raft group {}, no new entries applied", gid);
}
});

if (!found_srv) {
throw std::runtime_error{fmt::format("Server for group ID {} not found", gid)};
}

co_return json_void{};
});
}

void unset_raft(http_context&, httpd::routes& r) {
r::trigger_snapshot.unset(r);
}

}

18 changes: 18 additions & 0 deletions api/raft.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/

/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

#pragma once

#include "api_init.hh"

namespace api {

void set_raft(http_context& ctx, httpd::routes& r, sharded<service::raft_group_registry>& raft_gr);
void unset_raft(http_context& ctx, httpd::routes& r);

}
2 changes: 2 additions & 0 deletions configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,8 @@ def find_headers(repodir, excluded_dirs):
Json2Code('api/api-doc/error_injection.json'),
'api/authorization_cache.cc',
Json2Code('api/api-doc/authorization_cache.json'),
'api/raft.cc',
Json2Code('api/api-doc/raft.json'),
]

alternator = [
Expand Down
7 changes: 6 additions & 1 deletion main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1164,20 +1164,25 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
}).get();
cfg->host_id = sys_ks.local().load_local_host_id().get0();

std::any stop_raft_api;
if (raft_gr.local().is_enabled()) {
auto my_raft_id = raft::server_id{cfg->host_id.uuid()};
supervisor::notify("starting Raft Group Registry service");
raft_gr.invoke_on_all([my_raft_id] (service::raft_group_registry& raft_gr) {
return raft_gr.start(my_raft_id);
}).get();

api::set_server_raft(ctx, raft_gr).get();
stop_raft_api = defer_verbose_shutdown("Raft API", [&ctx] {
api::unset_server_raft(ctx).get();
});
} else {
if (cfg->check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) {
startlog.error("Bad configuration: RAFT feature has to be enabled if BROADCAST_TABLES is enabled");
throw bad_configuration_error();
}
}


group0_client.init().get();

db::sstables_format_selector sst_format_selector(gossiper.local(), feature_service, db);
Expand Down
28 changes: 9 additions & 19 deletions raft/fsm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ leader::~leader() {
}

fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
index_t commit_idx, failure_detector& failure_detector, fsm_config config) :
index_t commit_idx, failure_detector& failure_detector, fsm_config config,
seastar::condition_variable& sm_events) :
_my_id(id), _current_term(current_term), _voted_for(voted_for),
_log(std::move(log)), _failure_detector(failure_detector), _config(config) {
_log(std::move(log)), _failure_detector(failure_detector), _config(config), _sm_events(sm_events) {
if (id == raft::server_id{}) {
throw std::invalid_argument("raft::fsm: raft instance cannot have id zero");
}
Expand All @@ -41,10 +42,6 @@ fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
}
}

fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
failure_detector& failure_detector, fsm_config config) :
fsm(id, current_term, voted_for, std::move(log), index_t{0}, failure_detector, config) {}

future<semaphore_units<>> fsm::wait_for_memory_permit(seastar::abort_source* as, size_t size) {
check_is_leader();

Expand Down Expand Up @@ -296,20 +293,14 @@ void fsm::become_candidate(bool is_prevote, bool is_leadership_transfer) {
}
}

future<fsm_output> fsm::poll_output() {
logger.trace("fsm::poll_output() {} stable index: {} last index: {}",
bool fsm::has_output() const {
logger.trace("fsm::has_output() {} stable index: {} last index: {}",
_my_id, _log.stable_idx(), _log.last_idx());

while (true) {
auto diff = _log.last_idx() - _log.stable_idx();
auto diff = _log.last_idx() - _log.stable_idx();

if (diff > 0 || !_messages.empty() || !_observed.is_equal(*this) || _output.max_read_id_with_quorum ||
(is_leader() && leader_state().last_read_id_changed) || _output.snp || !_output.snps_to_drop.empty()) {
break;
}
co_await _sm_events.wait();
}
co_return get_output();
return diff > 0 || !_messages.empty() || !_observed.is_equal(*this) || _output.max_read_id_with_quorum
|| (is_leader() && leader_state().last_read_id_changed) || _output.snp || !_output.snps_to_drop.empty();
}

fsm_output fsm::get_output() {
Expand Down Expand Up @@ -1019,7 +1010,7 @@ bool fsm::apply_snapshot(snapshot_descriptor snp, size_t max_trailing_entries, s
// If the snapshot is local, _commit_idx is larger than snp.idx.
// Otherwise snp.idx becomes the new commit index.
_commit_idx = std::max(_commit_idx, snp.idx);
_output.snp.emplace(fsm_output::applied_snapshot{snp, local});
_output.snp.emplace(fsm_output::applied_snapshot{snp, local, max_trailing_entries});
size_t units = _log.apply_snapshot(std::move(snp), max_trailing_entries, max_trailing_bytes);
if (is_leader()) {
logger.trace("apply_snapshot[{}]: signal {} available units", _my_id, units);
Expand Down Expand Up @@ -1132,7 +1123,6 @@ void fsm::stop() {
// (in particular, abort waits on log_limiter_semaphore and prevent new ones).
become_follower({});
}
_sm_events.broken();
}

std::ostream& operator<<(std::ostream& os, const fsm& f) {
Expand Down
40 changes: 18 additions & 22 deletions raft/fsm.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ struct fsm_output {
struct applied_snapshot {
snapshot_descriptor snp;
bool is_local;

// Always 0 for non-local snapshots.
size_t max_trailing_entries;

// FIXME: include max_trailing_bytes here and in store_snapshot_descriptor
};
std::optional<std::pair<term_t, server_id>> term_and_vote;
std::vector<log_entry_ptr> log_entries;
Expand All @@ -36,14 +41,6 @@ struct fsm_output {
std::optional<read_id> max_read_id_with_quorum;
// Set to true if a leadership transfer was aborted since the last output
bool abort_leadership_transfer;

// True if there is no new output
bool empty() const {
return !term_and_vote &&
log_entries.size() == 0 && messages.size() == 0 &&
committed.size() == 0 && !snp && snps_to_drop.empty() &&
!configuration;
}
};

struct fsm_config {
Expand Down Expand Up @@ -136,9 +133,13 @@ struct leader {
// in-memory state machine with a catch-all API step(message)
// method. The method handles any kind of input and performs the
// needed state machine state transitions. To get state machine output
// poll_output() function has to be called. This call produces an output
// get_output() function has to be called. To check first if
// any new output is present, call has_output(). To wait for new
// new output events, use the sm_events condition variable passed
// to fsm constructor; fs` signals it each time new output may appear.
// The get_output() call produces an output
// object, which encapsulates a list of actions that must be
// performed until the next poll_output() call can be made. The time is
// performed until the next get_output() call can be made. The time is
// represented with a logical timer. The client is responsible for
// periodically invoking tick() method, which advances the state
// machine time and allows it to track such events as election or
Expand Down Expand Up @@ -226,7 +227,7 @@ private:
std::vector<std::pair<server_id, rpc_message>> _messages;

// Signaled when there is a IO event to process.
seastar::condition_variable _sm_events;
seastar::condition_variable& _sm_events;

// Called when one of the replicas advances its match index
// so it may be the case that some entries are committed now.
Expand Down Expand Up @@ -338,10 +339,8 @@ protected: // For testing

public:
explicit fsm(server_id id, term_t current_term, server_id voted_for, log log,
index_t commit_idx, failure_detector& failure_detector, fsm_config conf);

explicit fsm(server_id id, term_t current_term, server_id voted_for, log log,
failure_detector& failure_detector, fsm_config conf);
index_t commit_idx, failure_detector& failure_detector, fsm_config conf,
seastar::condition_variable& sm_events);

bool is_leader() const {
return std::holds_alternative<leader>(_state);
Expand Down Expand Up @@ -409,12 +408,9 @@ public:
// committed to the persistent Raft log afterwards.
template<typename T> const log_entry& add_entry(T command);

// Wait until there is, and return state machine output that
// needs to be handled.
// This includes a list of the entries that need
// to be logged. The logged entries are eventually
// discarded from the state machine after applying a snapshot.
future<fsm_output> poll_output();
// Check if there is any state machine output
// that `get_output()` will return.
bool has_output() const;

// Get state machine output, if there is any. Doesn't
// wait. It is public for use in testing.
Expand All @@ -427,7 +423,7 @@ public:

// Feed one Raft RPC message into the state machine.
// Advances the state machine state and generates output,
// accessible via poll_output().
// accessible via get_output().
template <typename Message>
void step(server_id from, Message&& msg);

Expand Down
12 changes: 12 additions & 0 deletions raft/raft.hh
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,18 @@ public:
// apply call 'state_machine::load_snapshot(snapshot::id)'
// Called during Raft server initialization only, should not
// run in parallel with store.
//
// If you want to create a Raft cluster with a non-empty state
// machine, so that joining servers always receive a snapshot,
// you should:
// - make sure that members of the initial configuration have
// the same state machine state,
// - set the initial snapshot index on members of the initial
// configuration to 1,
// - set the initial snapshot index on all subsequently joining
// servers to 0.
// This also works if you start with an empty state machine,
// so consider it as the go-to default.
virtual future<snapshot_descriptor> load_snapshot_descriptor() = 0;

// Persist given log entries.
Expand Down

0 comments on commit 9291eaf

Please sign in to comment.