Merge 'Add an API to trigger snapshot in Raft servers' from Kamil Braun
This allows the user of `raft::server` to cause it to create a snapshot
and truncate the Raft log (leaving no trailing entries; in the future we
may extend the API to specify number of trailing entries left if
needed). In a later commit we'll add a REST endpoint to Scylla to
trigger group 0 snapshots.
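
The snapshot-and-truncate behaviour described above can be sketched with a toy model. This is only an illustration under stated assumptions: `toy_log`, `append` and `snapshot` are hypothetical names and do not correspond to the real `raft::log` or `raft::server` interfaces. The sketch shows how a snapshot taken with zero trailing entries empties the covered part of the log, and how a repeated request with no newly applied entries reports that no snapshot was taken.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>

// Hypothetical toy model, NOT the real raft::log: an entry is represented
// only by its index.
struct toy_log {
    std::uint64_t snapshot_idx = 0;     // index covered by the latest snapshot
    std::deque<std::uint64_t> entries;  // indexes of entries still kept in the log

    // Append one entry after the current last index.
    void append() {
        const std::uint64_t last = entries.empty() ? snapshot_idx : entries.back();
        entries.push_back(last + 1);
    }

    // Take a snapshot at `applied_idx` and truncate the log, keeping at most
    // `max_trailing_entries` of the entries covered by the snapshot.
    // Returns false if nothing was applied since the previous snapshot.
    bool snapshot(std::uint64_t applied_idx, std::size_t max_trailing_entries) {
        if (applied_idx <= snapshot_idx) {
            return false;
        }
        snapshot_idx = applied_idx;
        std::size_t covered = 0;
        for (std::uint64_t idx : entries) {
            if (idx <= applied_idx) {
                ++covered;
            }
        }
        const std::size_t to_drop =
            covered > max_trailing_entries ? covered - max_trailing_entries : 0;
        entries.erase(entries.begin(), entries.begin() + to_drop);
        return true;
    }
};
```

With five appended entries, `snapshot(5, 0)` leaves the log empty and a second `snapshot(5, 0)` returns `false`, which loosely mirrors the "no new entries applied" case logged by the HTTP handler.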

One use case for this API is to create group 0 snapshots in Scylla
deployments which upgraded to Raft in version 5.2 and started with an
empty Raft log with no snapshot at the beginning. This causes problems,
e.g. when a new node bootstraps to the cluster, it will not receive a
snapshot that would contain both schema and group 0 history, which would
then lead to inconsistent schema state and trigger assertion failures as
observed in scylladb#16683.

In 5.4 the logic of initial group 0 setup was changed to start the Raft
log with a snapshot at index 1 (ff386e7),
but a problem remains with existing deployments coming from 5.2:
we need a way to trigger a snapshot in them (other than performing 1000
arbitrary schema changes).

Another potential use case in the future would be to trigger snapshots
based on external memory pressure in tablet Raft groups (for strongly
consistent tables).

The PR adds the API to `raft::server` and an HTTP endpoint that uses it.

In a follow-up PR, we plan to modify group 0 server startup logic to automatically
call this API if it sees that no snapshot is present yet (to automatically
fix the aforementioned 5.2 deployments once they upgrade).

Closes scylladb#16816

* github.com:scylladb/scylladb:
  raft: remove `empty()` from `fsm_output`
  test: add test for manual triggering of Raft snapshots
  api: add HTTP endpoint to trigger Raft snapshots
  raft: server: add `trigger_snapshot` API
  raft: server: track last persisted snapshot descriptor index
  raft: server: framework for handling server requests
  raft: server: inline `poll_fsm_output`
  raft: server: fix indentation
  raft: server: move `io_fiber`'s processing of `batch` to a separate function
  raft: move `poll_output()` from `fsm` to `server`
  raft: move `_sm_events` from `fsm` to `server`
  raft: fsm: remove constructor used only in tests
  raft: fsm: move trace message from `poll_output` to `has_output`
  raft: fsm: extract `has_output()`
  raft: pass `max_trailing_entries` through `fsm_output` to `store_snapshot_descriptor`
  raft: server: pass `*_aborted` to `set_exception` call

(cherry picked from commit d202d32)

Backport note: the HTTP API is only started if raft_group_registry is
started.
denesb authored and kbr-scylla committed Feb 2, 2024
1 parent e83c4cc commit 26b8120
Showing 17 changed files with 597 additions and 200 deletions.
43 changes: 43 additions & 0 deletions api/api-doc/raft.json
@@ -0,0 +1,43 @@
{
    "apiVersion":"0.0.1",
    "swaggerVersion":"1.2",
    "basePath":"{{Protocol}}://{{Host}}",
    "resourcePath":"/raft",
    "produces":[
        "application/json"
    ],
    "apis":[
        {
            "path":"/raft/trigger_snapshot/{group_id}",
            "operations":[
                {
                    "method":"POST",
                    "summary":"Triggers snapshot creation and log truncation for the given Raft group",
                    "type":"string",
                    "nickname":"trigger_snapshot",
                    "produces":[
                        "application/json"
                    ],
                    "parameters":[
                        {
                            "name":"group_id",
                            "description":"The ID of the group which should get snapshotted",
                            "required":true,
                            "allowMultiple":false,
                            "type":"string",
                            "paramType":"path"
                        },
                        {
                            "name":"timeout",
                            "description":"Timeout in seconds after which the endpoint returns a failure. If not provided, 60s is used.",
                            "required":false,
                            "allowMultiple":false,
                            "type":"long",
                            "paramType":"query"
                        }
                    ]
                }
            ]
        }
    ]
}
13 changes: 13 additions & 0 deletions api/api.cc
@@ -31,6 +31,7 @@
#include "api/config.hh"
#include "task_manager.hh"
#include "task_manager_test.hh"
#include "raft.hh"

logging::logger apilog("api");

@@ -294,6 +295,18 @@ future<> set_server_task_manager_test(http_context& ctx) {

#endif

future<> set_server_raft(http_context& ctx, sharded<service::raft_group_registry>& raft_gr) {
    auto rb = std::make_shared<api_registry_builder>(ctx.api_doc);
    return ctx.http_server.set_routes([rb, &ctx, &raft_gr] (routes& r) {
        rb->register_function(r, "raft", "The Raft API");
        set_raft(ctx, r, raft_gr);
    });
}

future<> unset_server_raft(http_context& ctx) {
    return ctx.http_server.set_routes([&ctx] (routes& r) { unset_raft(ctx, r); });
}

void req_params::process(const request& req) {
// Process mandatory parameters
for (auto& [name, ent] : params) {
3 changes: 3 additions & 0 deletions api/api_init.hh
@@ -23,6 +23,7 @@ class load_meter;
class storage_proxy;
class storage_service;
class raft_group0_client;
class raft_group_registry;

} // namespace service

@@ -117,5 +118,7 @@ future<> set_server_compaction_manager(http_context& ctx);
future<> set_server_done(http_context& ctx);
future<> set_server_task_manager(http_context& ctx, lw_shared_ptr<db::config> cfg);
future<> set_server_task_manager_test(http_context& ctx);
future<> set_server_raft(http_context&, sharded<service::raft_group_registry>&);
future<> unset_server_raft(http_context&);

}
70 changes: 70 additions & 0 deletions api/raft.cc
@@ -0,0 +1,70 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/

/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

#include <seastar/core/coroutine.hh>

#include "api/api.hh"
#include "api/api-doc/raft.json.hh"

#include "service/raft/raft_group_registry.hh"

using namespace seastar::httpd;

extern logging::logger apilog;

namespace api {

namespace r = httpd::raft_json;
using namespace json;

void set_raft(http_context&, httpd::routes& r, sharded<service::raft_group_registry>& raft_gr) {
    r::trigger_snapshot.set(r, [&raft_gr] (std::unique_ptr<http::request> req) -> future<json_return_type> {
        raft::group_id gid{utils::UUID{req->param["group_id"]}};
        auto timeout_dur = std::invoke([timeout_str = req->get_query_param("timeout")] {
            if (timeout_str.empty()) {
                return std::chrono::seconds{60};
            }
            auto dur = std::stoll(timeout_str);
            if (dur <= 0) {
                throw std::runtime_error{"Timeout must be a positive number."};
            }
            return std::chrono::seconds{dur};
        });

        std::atomic<bool> found_srv{false};
        co_await raft_gr.invoke_on_all([gid, timeout_dur, &found_srv] (service::raft_group_registry& raft_gr) -> future<> {
            auto* srv = raft_gr.find_server(gid);
            if (!srv) {
                co_return;
            }

            found_srv = true;
            abort_on_expiry aoe(lowres_clock::now() + timeout_dur);
            apilog.info("Triggering Raft group {} snapshot", gid);
            auto result = co_await srv->trigger_snapshot(&aoe.abort_source());
            if (result) {
                apilog.info("New snapshot for Raft group {} created", gid);
            } else {
                apilog.info("Could not create new snapshot for Raft group {}, no new entries applied", gid);
            }
        });

        if (!found_srv) {
            throw std::runtime_error{fmt::format("Server for group ID {} not found", gid)};
        }

        co_return json_void{};
    });
}

void unset_raft(http_context&, httpd::routes& r) {
r::trigger_snapshot.unset(r);
}

}
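
The handling of the `timeout` query parameter in the handler above can be exercised in isolation. The following is a minimal sketch that extracts the same logic into a free function; `parse_timeout` is a hypothetical name used only for illustration and is not part of the commit. It assumes the parameter arrives as a plain string that is empty when the parameter is absent.

```cpp
#include <chrono>
#include <stdexcept>
#include <string>

// Hypothetical helper mirroring the lambda in set_raft():
//  - empty string    -> default of 60 seconds
//  - positive number -> that many seconds
//  - zero/negative   -> std::runtime_error
// Non-numeric input makes std::stoll throw std::invalid_argument, and an
// out-of-range value makes it throw std::out_of_range.
inline std::chrono::seconds parse_timeout(const std::string& timeout_str) {
    if (timeout_str.empty()) {
        return std::chrono::seconds{60};
    }
    const long long dur = std::stoll(timeout_str);
    if (dur <= 0) {
        throw std::runtime_error{"Timeout must be a positive number."};
    }
    return std::chrono::seconds{dur};
}
```

Note that `std::stoll` accepts trailing non-digit characters (for example `"5s"` parses as 5), so a stricter endpoint might want to validate the whole string; the commit itself does not do so.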

18 changes: 18 additions & 0 deletions api/raft.hh
@@ -0,0 +1,18 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/

/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

#pragma once

#include "api_init.hh"

namespace api {

void set_raft(http_context& ctx, httpd::routes& r, sharded<service::raft_group_registry>& raft_gr);
void unset_raft(http_context& ctx, httpd::routes& r);

}
2 changes: 2 additions & 0 deletions configure.py
@@ -1240,6 +1240,8 @@ def find_ninja():
Json2Code('api/api-doc/error_injection.json'),
'api/authorization_cache.cc',
Json2Code('api/api-doc/authorization_cache.json'),
'api/raft.cc',
Json2Code('api/api-doc/raft.json'),
]

alternator = [
7 changes: 7 additions & 0 deletions main.cc
@@ -1402,13 +1402,19 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
sst_format_listener.stop().get();
});

std::any stop_raft_api;
if (raft_gr.local().is_enabled()) {
if (!db.local().uses_schema_commitlog()) {
startlog.error("Bad configuration: consistent_cluster_management requires schema commit log to be enabled");
throw bad_configuration_error();
}
supervisor::notify("starting Raft Group Registry service");
raft_gr.invoke_on_all(&service::raft_group_registry::start).get();

api::set_server_raft(ctx, raft_gr).get();
stop_raft_api = defer_verbose_shutdown("Raft API", [&ctx] {
api::unset_server_raft(ctx).get();
});
} else {
if (cfg->check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) {
startlog.error("Bad configuration: RAFT feature has to be enabled if BROADCAST_TABLES is enabled");
@@ -1419,6 +1425,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
throw bad_configuration_error();
}
}

group0_client.init().get();

// schema migration, if needed, is also done on shard 0
32 changes: 10 additions & 22 deletions raft/fsm.cc
@@ -19,9 +19,10 @@ leader::~leader() {
}

fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
index_t commit_idx, failure_detector& failure_detector, fsm_config config) :
index_t commit_idx, failure_detector& failure_detector, fsm_config config,
seastar::condition_variable& sm_events) :
_my_id(id), _current_term(current_term), _voted_for(voted_for),
_log(std::move(log)), _failure_detector(failure_detector), _config(config) {
_log(std::move(log)), _failure_detector(failure_detector), _config(config), _sm_events(sm_events) {
if (id == raft::server_id{}) {
throw std::invalid_argument("raft::fsm: raft instance cannot have id zero");
}
@@ -41,10 +42,6 @@ fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
}
}

fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
failure_detector& failure_detector, fsm_config config) :
fsm(id, current_term, voted_for, std::move(log), index_t{0}, failure_detector, config) {}

future<semaphore_units<>> fsm::wait_for_memory_permit(seastar::abort_source* as, size_t size) {
check_is_leader();

@@ -303,23 +300,15 @@ void fsm::become_candidate(bool is_prevote, bool is_leadership_transfer) {
}
}

future<fsm_output> fsm::poll_output() {
logger.trace("fsm::poll_output() {} stable index: {} last index: {}",
bool fsm::has_output() const {
logger.trace("fsm::has_output() {} stable index: {} last index: {}",
_my_id, _log.stable_idx(), _log.last_idx());

while (true) {
auto diff = _log.last_idx() - _log.stable_idx();
auto diff = _log.last_idx() - _log.stable_idx();

if (diff > 0 || !_messages.empty() || !_observed.is_equal(*this) || _output.max_read_id_with_quorum ||
(is_leader() && leader_state().last_read_id_changed) || _output.snp || !_output.snps_to_drop.empty() || _output.state_changed) {
break;
}
co_await _sm_events.wait();
}
while (utils::get_local_injector().enter("fsm::poll_output/pause")) {
co_await seastar::sleep(std::chrono::milliseconds(100));
}
co_return get_output();
return diff > 0 || !_messages.empty() || !_observed.is_equal(*this) || _output.max_read_id_with_quorum
|| (is_leader() && leader_state().last_read_id_changed) || _output.snp || !_output.snps_to_drop.empty()
|| _output.state_changed;
}

fsm_output fsm::get_output() {
@@ -1029,7 +1018,7 @@ bool fsm::apply_snapshot(snapshot_descriptor snp, size_t max_trailing_entries, s
// If the snapshot is local, _commit_idx is larger than snp.idx.
// Otherwise snp.idx becomes the new commit index.
_commit_idx = std::max(_commit_idx, snp.idx);
_output.snp.emplace(fsm_output::applied_snapshot{snp, local});
_output.snp.emplace(fsm_output::applied_snapshot{snp, local, max_trailing_entries});
size_t units = _log.apply_snapshot(std::move(snp), max_trailing_entries, max_trailing_bytes);
if (is_leader()) {
logger.trace("apply_snapshot[{}]: signal {} available units", _my_id, units);
@@ -1142,7 +1131,6 @@ void fsm::stop() {
// (in particular, abort waits on log_limiter_semaphore and prevent new ones).
become_follower({});
}
_sm_events.broken();
}

std::ostream& operator<<(std::ostream& os, const fsm& f) {
40 changes: 18 additions & 22 deletions raft/fsm.hh
@@ -21,6 +21,11 @@ struct fsm_output {
struct applied_snapshot {
snapshot_descriptor snp;
bool is_local;

// Always 0 for non-local snapshots.
size_t max_trailing_entries;

// FIXME: include max_trailing_bytes here and in store_snapshot_descriptor
};
std::optional<std::pair<term_t, server_id>> term_and_vote;
std::vector<log_entry_ptr> log_entries;
@@ -41,14 +46,6 @@ struct fsm_output {
bool state_changed = false;
// Set to true if a leadership transfer was aborted since the last output
bool abort_leadership_transfer;

// True if there is no new output
bool empty() const {
return !term_and_vote &&
log_entries.size() == 0 && messages.size() == 0 &&
committed.size() == 0 && !snp && snps_to_drop.empty() &&
!configuration && !max_read_id_with_quorum;
}
};

struct fsm_config {
@@ -141,9 +138,13 @@ struct leader {
// in-memory state machine with a catch-all API step(message)
// method. The method handles any kind of input and performs the
// needed state machine state transitions. To get state machine output
// poll_output() function has to be called. This call produces an output
// get_output() function has to be called. To check first if
// any new output is present, call has_output(). To wait for
// new output events, use the sm_events condition variable passed
// to fsm constructor; `fsm` signals it each time new output may appear.
// The get_output() call produces an output
// object, which encapsulates a list of actions that must be
// performed until the next poll_output() call can be made. The time is
// performed until the next get_output() call can be made. The time is
// represented with a logical timer. The client is responsible for
// periodically invoking tick() method, which advances the state
// machine time and allows it to track such events as election or
@@ -231,7 +232,7 @@ private:
std::vector<std::pair<server_id, rpc_message>> _messages;

// Signaled when there is an IO event to process.
seastar::condition_variable _sm_events;
seastar::condition_variable& _sm_events;

// Called when one of the replicas advances its match index
// so it may be the case that some entries are committed now.
@@ -343,10 +344,8 @@ protected: // For testing

public:
explicit fsm(server_id id, term_t current_term, server_id voted_for, log log,
index_t commit_idx, failure_detector& failure_detector, fsm_config conf);

explicit fsm(server_id id, term_t current_term, server_id voted_for, log log,
failure_detector& failure_detector, fsm_config conf);
index_t commit_idx, failure_detector& failure_detector, fsm_config conf,
seastar::condition_variable& sm_events);

bool is_leader() const {
return std::holds_alternative<leader>(_state);
@@ -414,12 +413,9 @@ public:
// committed to the persistent Raft log afterwards.
template<typename T> const log_entry& add_entry(T command);

// Wait until there is, and return state machine output that
// needs to be handled.
// This includes a list of the entries that need
// to be logged. The logged entries are eventually
// discarded from the state machine after applying a snapshot.
future<fsm_output> poll_output();
// Check if there is any state machine output
// that `get_output()` will return.
bool has_output() const;

// Get state machine output, if there is any. Doesn't
// wait. It is public for use in testing.
@@ -432,7 +428,7 @@ public:

// Feed one Raft RPC message into the state machine.
// Advances the state machine state and generates output,
// accessible via poll_output().
// accessible via get_output().
template <typename Message>
void step(server_id from, Message&& msg);
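
The reworked output protocol in this header (the caller owns the condition variable, waits until `has_output()` is true, then calls `get_output()`) can be sketched outside Seastar. This is a rough analogue only: `toy_events`, `toy_fsm` and the free function `poll_output` are hypothetical, and `std::condition_variable` with a mutex stands in for `seastar::condition_variable`, which needs no lock in Seastar's single-threaded-per-shard model.

```cpp
#include <condition_variable>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the condition variable owned by the server.
struct toy_events {
    std::mutex m;
    std::condition_variable cv;
};

// Hypothetical state machine that only references the caller's events
// object, as fsm now holds `seastar::condition_variable&`.
class toy_fsm {
    toy_events& _sm_events;
    std::vector<std::string> _messages;
public:
    explicit toy_fsm(toy_events& ev) : _sm_events(ev) {}

    // Feed one input; queues output and signals that output may be present.
    void step(std::string msg) {
        std::lock_guard<std::mutex> lk(_sm_events.m);
        _messages.push_back(std::move(msg));
        _sm_events.cv.notify_all();
    }

    // Cheap, non-waiting check (call with the events mutex held).
    bool has_output() const { return !_messages.empty(); }

    // Return and clear pending output without waiting
    // (call with the events mutex held).
    std::vector<std::string> get_output() {
        return std::exchange(_messages, {});
    }
};

// Waiting now lives on the caller's side: block until has_output() holds,
// then fetch the output.
inline std::vector<std::string> poll_output(toy_events& ev, toy_fsm& fsm) {
    std::unique_lock<std::mutex> lk(ev.m);
    ev.cv.wait(lk, [&] { return fsm.has_output(); });
    return fsm.get_output();
}
```

Keeping the wait outside the state machine lets the owner wake up for reasons unrelated to state machine output, which is presumably what allows the server to handle requests such as `trigger_snapshot` in the same loop.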
