Skip to content

Commit

Permalink
Merge '[Backport 5.4] Raft snapshot fixes' from Kamil Braun
Browse files Browse the repository at this point in the history
Backports required to fix #16683 in 5.4:
- add an API to trigger Raft snapshot
- use the API when we restart and see that the existing snapshot is at index 0, to trigger a new one --- in order to fix broken deployments that already bootstrapped with index-0 snapshot (we may get such deployments by upgrading from 5.2)

Closes #17123

* github.com:scylladb/scylladb:
  test_raft_snapshot_request: fix flakiness (again)
  test_raft_snapshot_request: fix flakiness
  Merge 'raft_group0: trigger snapshot if existing snapshot index is 0' from Kamil Braun
  Merge 'Add an API to trigger snapshot in Raft servers' from Kamil Braun
  • Loading branch information
denesb committed Feb 7, 2024
2 parents 8398f36 + 311e31b commit 8080c15
Show file tree
Hide file tree
Showing 20 changed files with 710 additions and 201 deletions.
43 changes: 43 additions & 0 deletions api/api-doc/raft.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"apiVersion":"0.0.1",
"swaggerVersion":"1.2",
"basePath":"{{Protocol}}://{{Host}}",
"resourcePath":"/raft",
"produces":[
"application/json"
],
"apis":[
{
"path":"/raft/trigger_snapshot/{group_id}",
"operations":[
{
"method":"POST",
"summary":"Triggers snapshot creation and log truncation for the given Raft group",
"type":"string",
"nickname":"trigger_snapshot",
"produces":[
"application/json"
],
"parameters":[
{
"name":"group_id",
"description":"The ID of the group which should get snapshotted",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"path"
},
{
"name":"timeout",
"description":"Timeout in seconds after which the endpoint returns a failure. If not provided, 60s is used.",
"required":false,
"allowMultiple":false,
"type":"long",
"paramType":"query"
}
]
}
]
}
]
}
13 changes: 13 additions & 0 deletions api/api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "api/config.hh"
#include "task_manager.hh"
#include "task_manager_test.hh"
#include "raft.hh"

logging::logger apilog("api");

Expand Down Expand Up @@ -294,6 +295,18 @@ future<> set_server_task_manager_test(http_context& ctx) {

#endif

future<> set_server_raft(http_context& ctx, sharded<service::raft_group_registry>& raft_gr) {
auto rb = std::make_shared<api_registry_builder>(ctx.api_doc);
return ctx.http_server.set_routes([rb, &ctx, &raft_gr] (routes& r) {
rb->register_function(r, "raft", "The Raft API");
set_raft(ctx, r, raft_gr);
});
}

future<> unset_server_raft(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_raft(ctx, r); });
}

void req_params::process(const request& req) {
// Process mandatory parameters
for (auto& [name, ent] : params) {
Expand Down
3 changes: 3 additions & 0 deletions api/api_init.hh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class load_meter;
class storage_proxy;
class storage_service;
class raft_group0_client;
class raft_group_registry;

} // namespace service

Expand Down Expand Up @@ -117,5 +118,7 @@ future<> set_server_compaction_manager(http_context& ctx);
future<> set_server_done(http_context& ctx);
future<> set_server_task_manager(http_context& ctx, lw_shared_ptr<db::config> cfg);
future<> set_server_task_manager_test(http_context& ctx);
future<> set_server_raft(http_context&, sharded<service::raft_group_registry>&);
future<> unset_server_raft(http_context&);

}
70 changes: 70 additions & 0 deletions api/raft.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/

/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

#include <seastar/core/coroutine.hh>

#include "api/api.hh"
#include "api/api-doc/raft.json.hh"

#include "service/raft/raft_group_registry.hh"

using namespace seastar::httpd;

extern logging::logger apilog;

namespace api {

namespace r = httpd::raft_json;
using namespace json;

void set_raft(http_context&, httpd::routes& r, sharded<service::raft_group_registry>& raft_gr) {
r::trigger_snapshot.set(r, [&raft_gr] (std::unique_ptr<http::request> req) -> future<json_return_type> {
raft::group_id gid{utils::UUID{req->param["group_id"]}};
auto timeout_dur = std::invoke([timeout_str = req->get_query_param("timeout")] {
if (timeout_str.empty()) {
return std::chrono::seconds{60};
}
auto dur = std::stoll(timeout_str);
if (dur <= 0) {
throw std::runtime_error{"Timeout must be a positive number."};
}
return std::chrono::seconds{dur};
});

std::atomic<bool> found_srv{false};
co_await raft_gr.invoke_on_all([gid, timeout_dur, &found_srv] (service::raft_group_registry& raft_gr) -> future<> {
auto* srv = raft_gr.find_server(gid);
if (!srv) {
co_return;
}

found_srv = true;
abort_on_expiry aoe(lowres_clock::now() + timeout_dur);
apilog.info("Triggering Raft group {} snapshot", gid);
auto result = co_await srv->trigger_snapshot(&aoe.abort_source());
if (result) {
apilog.info("New snapshot for Raft group {} created", gid);
} else {
apilog.info("Could not create new snapshot for Raft group {}, no new entries applied", gid);
}
});

if (!found_srv) {
throw std::runtime_error{fmt::format("Server for group ID {} not found", gid)};
}

co_return json_void{};
});
}

void unset_raft(http_context&, httpd::routes& r) {
r::trigger_snapshot.unset(r);
}

}

18 changes: 18 additions & 0 deletions api/raft.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/

/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

#pragma once

#include "api_init.hh"

namespace api {

void set_raft(http_context& ctx, httpd::routes& r, sharded<service::raft_group_registry>& raft_gr);
void unset_raft(http_context& ctx, httpd::routes& r);

}
2 changes: 2 additions & 0 deletions configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,8 @@ def find_ninja():
Json2Code('api/api-doc/error_injection.json'),
'api/authorization_cache.cc',
Json2Code('api/api-doc/authorization_cache.json'),
'api/raft.cc',
Json2Code('api/api-doc/raft.json'),
]

alternator = [
Expand Down
7 changes: 7 additions & 0 deletions main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1402,13 +1402,19 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
sst_format_listener.stop().get();
});

std::any stop_raft_api;
if (raft_gr.local().is_enabled()) {
if (!db.local().uses_schema_commitlog()) {
startlog.error("Bad configuration: consistent_cluster_management requires schema commit log to be enabled");
throw bad_configuration_error();
}
supervisor::notify("starting Raft Group Registry service");
raft_gr.invoke_on_all(&service::raft_group_registry::start).get();

api::set_server_raft(ctx, raft_gr).get();
stop_raft_api = defer_verbose_shutdown("Raft API", [&ctx] {
api::unset_server_raft(ctx).get();
});
} else {
if (cfg->check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) {
startlog.error("Bad configuration: RAFT feature has to be enabled if BROADCAST_TABLES is enabled");
Expand All @@ -1419,6 +1425,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
throw bad_configuration_error();
}
}

group0_client.init().get();

// schema migration, if needed, is also done on shard 0
Expand Down
32 changes: 10 additions & 22 deletions raft/fsm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ leader::~leader() {
}

fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
index_t commit_idx, failure_detector& failure_detector, fsm_config config) :
index_t commit_idx, failure_detector& failure_detector, fsm_config config,
seastar::condition_variable& sm_events) :
_my_id(id), _current_term(current_term), _voted_for(voted_for),
_log(std::move(log)), _failure_detector(failure_detector), _config(config) {
_log(std::move(log)), _failure_detector(failure_detector), _config(config), _sm_events(sm_events) {
if (id == raft::server_id{}) {
throw std::invalid_argument("raft::fsm: raft instance cannot have id zero");
}
Expand All @@ -41,10 +42,6 @@ fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
}
}

fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
failure_detector& failure_detector, fsm_config config) :
fsm(id, current_term, voted_for, std::move(log), index_t{0}, failure_detector, config) {}

future<semaphore_units<>> fsm::wait_for_memory_permit(seastar::abort_source* as, size_t size) {
check_is_leader();

Expand Down Expand Up @@ -303,23 +300,15 @@ void fsm::become_candidate(bool is_prevote, bool is_leadership_transfer) {
}
}

future<fsm_output> fsm::poll_output() {
logger.trace("fsm::poll_output() {} stable index: {} last index: {}",
bool fsm::has_output() const {
logger.trace("fsm::has_output() {} stable index: {} last index: {}",
_my_id, _log.stable_idx(), _log.last_idx());

while (true) {
auto diff = _log.last_idx() - _log.stable_idx();
auto diff = _log.last_idx() - _log.stable_idx();

if (diff > 0 || !_messages.empty() || !_observed.is_equal(*this) || _output.max_read_id_with_quorum ||
(is_leader() && leader_state().last_read_id_changed) || _output.snp || !_output.snps_to_drop.empty() || _output.state_changed) {
break;
}
co_await _sm_events.wait();
}
while (utils::get_local_injector().enter("fsm::poll_output/pause")) {
co_await seastar::sleep(std::chrono::milliseconds(100));
}
co_return get_output();
return diff > 0 || !_messages.empty() || !_observed.is_equal(*this) || _output.max_read_id_with_quorum
|| (is_leader() && leader_state().last_read_id_changed) || _output.snp || !_output.snps_to_drop.empty()
|| _output.state_changed;
}

fsm_output fsm::get_output() {
Expand Down Expand Up @@ -1029,7 +1018,7 @@ bool fsm::apply_snapshot(snapshot_descriptor snp, size_t max_trailing_entries, s
// If the snapshot is local, _commit_idx is larger than snp.idx.
// Otherwise snp.idx becomes the new commit index.
_commit_idx = std::max(_commit_idx, snp.idx);
_output.snp.emplace(fsm_output::applied_snapshot{snp, local});
_output.snp.emplace(fsm_output::applied_snapshot{snp, local, max_trailing_entries});
size_t units = _log.apply_snapshot(std::move(snp), max_trailing_entries, max_trailing_bytes);
if (is_leader()) {
logger.trace("apply_snapshot[{}]: signal {} available units", _my_id, units);
Expand Down Expand Up @@ -1142,7 +1131,6 @@ void fsm::stop() {
// (in particular, abort waits on log_limiter_semaphore and prevent new ones).
become_follower({});
}
_sm_events.broken();
}

std::ostream& operator<<(std::ostream& os, const fsm& f) {
Expand Down
40 changes: 18 additions & 22 deletions raft/fsm.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ struct fsm_output {
struct applied_snapshot {
snapshot_descriptor snp;
bool is_local;

// Always 0 for non-local snapshots.
size_t max_trailing_entries;

// FIXME: include max_trailing_bytes here and in store_snapshot_descriptor
};
std::optional<std::pair<term_t, server_id>> term_and_vote;
std::vector<log_entry_ptr> log_entries;
Expand All @@ -41,14 +46,6 @@ struct fsm_output {
bool state_changed = false;
// Set to true if a leadership transfer was aborted since the last output
bool abort_leadership_transfer;

// True if there is no new output
bool empty() const {
return !term_and_vote &&
log_entries.size() == 0 && messages.size() == 0 &&
committed.size() == 0 && !snp && snps_to_drop.empty() &&
!configuration && !max_read_id_with_quorum;
}
};

struct fsm_config {
Expand Down Expand Up @@ -141,9 +138,13 @@ struct leader {
// in-memory state machine with a catch-all API step(message)
// method. The method handles any kind of input and performs the
// needed state machine state transitions. To get state machine output
// poll_output() function has to be called. This call produces an output
// get_output() function has to be called. To check first if
// any new output is present, call has_output(). To wait for new
// new output events, use the sm_events condition variable passed
// to fsm constructor; fs` signals it each time new output may appear.
// The get_output() call produces an output
// object, which encapsulates a list of actions that must be
// performed until the next poll_output() call can be made. The time is
// performed until the next get_output() call can be made. The time is
// represented with a logical timer. The client is responsible for
// periodically invoking tick() method, which advances the state
// machine time and allows it to track such events as election or
Expand Down Expand Up @@ -231,7 +232,7 @@ private:
std::vector<std::pair<server_id, rpc_message>> _messages;

// Signaled when there is a IO event to process.
seastar::condition_variable _sm_events;
seastar::condition_variable& _sm_events;

// Called when one of the replicas advances its match index
// so it may be the case that some entries are committed now.
Expand Down Expand Up @@ -343,10 +344,8 @@ protected: // For testing

public:
explicit fsm(server_id id, term_t current_term, server_id voted_for, log log,
index_t commit_idx, failure_detector& failure_detector, fsm_config conf);

explicit fsm(server_id id, term_t current_term, server_id voted_for, log log,
failure_detector& failure_detector, fsm_config conf);
index_t commit_idx, failure_detector& failure_detector, fsm_config conf,
seastar::condition_variable& sm_events);

bool is_leader() const {
return std::holds_alternative<leader>(_state);
Expand Down Expand Up @@ -414,12 +413,9 @@ public:
// committed to the persistent Raft log afterwards.
template<typename T> const log_entry& add_entry(T command);

// Wait until there is, and return state machine output that
// needs to be handled.
// This includes a list of the entries that need
// to be logged. The logged entries are eventually
// discarded from the state machine after applying a snapshot.
future<fsm_output> poll_output();
// Check if there is any state machine output
// that `get_output()` will return.
bool has_output() const;

// Get state machine output, if there is any. Doesn't
// wait. It is public for use in testing.
Expand All @@ -432,7 +428,7 @@ public:

// Feed one Raft RPC message into the state machine.
// Advances the state machine state and generates output,
// accessible via poll_output().
// accessible via get_output().
template <typename Message>
void step(server_id from, Message&& msg);

Expand Down

0 comments on commit 8080c15

Please sign in to comment.