raft: combine append_request _receive and _send
Combine structs for append request send and receive into a single
struct.

Author:    Gleb Natapov <gleb@scylladb.com>
Date:      Mon Nov 23 14:33:14 2020 +0200
Gleb Natapov authored and alecco committed Jan 18, 2021
1 parent df3ef80 commit 6d47a53
Showing 8 changed files with 41 additions and 60 deletions.
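
The net effect of the commit, condensed from the raft/raft.hh hunk below, is a single request type used on both the send and receive paths, with the entries carried as shared log-entry pointers rather than copied values. A rough sketch follows (field comments abridged; log_entry_ptr appears to be a seastar::lw_shared_ptr<log_entry>, judging by the make_lw_shared calls elsewhere in the diff):

// Sketch only -- condensed from the raft/raft.hh hunk in this commit,
// not a verbatim copy of the header.
struct append_request {
    term_t current_term;        // the leader's term
    server_id leader_id;        // so that the follower can redirect clients
    index_t prev_log_idx;       // index of the entry preceding the new ones
    term_t prev_log_term;       // term of that entry
    index_t leader_commit_idx;  // the leader's commit index
    // Log entries to store (empty for a heartbeat). Both sender and
    // receiver now hold them through log_entry_ptr, so an in-process
    // delivery shares entries instead of deep-copying them.
    std::vector<log_entry_ptr> entries;
};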
21 changes: 10 additions & 11 deletions raft/fsm.cc
@@ -47,7 +47,7 @@ const log_entry& fsm::add_entry(T command) {
// It's only possible to add entries on a leader.
check_is_leader();

_log.emplace_back(log_entry{_current_term, _log.next_idx(), std::move(command)});
_log.emplace_back(seastar::make_lw_shared<log_entry>(log_entry{_current_term, _log.next_idx(), std::move(command)}));
_sm_events.signal();

return *_log[_log.last_idx()];
@@ -326,10 +326,10 @@ void fsm::tick() {
}
}

void fsm::append_entries(server_id from, append_request_recv&& request) {
void fsm::append_entries(server_id from, append_request&& request) {
logger.trace("append_entries[{}] received ct={}, prev idx={} prev term={} commit idx={}, idx={} num entries={}",
_my_id, request.current_term, request.prev_log_idx, request.prev_log_term,
request.leader_commit_idx, request.entries.size() ? request.entries[0].idx : index_t(0), request.entries.size());
request.leader_commit_idx, request.entries.size() ? request.entries[0]->idx : index_t(0), request.entries.size());

assert(is_follower());
// 3.4. Leader election
@@ -549,14 +549,13 @@ void fsm::replicate_to(follower_progress& progress, bool allow_empty) {
prev_term = s.idx == prev_idx ? s.term : _log[prev_idx]->term;
}

append_request_send req = {{
.current_term = _current_term,
.leader_id = _my_id,
.prev_log_idx = prev_idx,
.prev_log_term = prev_term,
.leader_commit_idx = _commit_idx
},
std::vector<log_entry_ptr>()
append_request req = {
.current_term = _current_term,
.leader_id = _my_id,
.prev_log_idx = prev_idx,
.prev_log_term = prev_term,
.leader_commit_idx = _commit_idx,
.entries = std::vector<log_entry_ptr>()
};

if (next_idx) {
10 changes: 5 additions & 5 deletions raft/fsm.hh
@@ -232,7 +232,7 @@ class fsm {
// and allow_empty is true, send a heartbeat.
void replicate_to(follower_progress& progress, bool allow_empty);
void replicate();
void append_entries(server_id from, append_request_recv&& append_request);
void append_entries(server_id from, append_request&& append_request);
void append_entries_reply(server_id from, append_reply&& reply);

void request_vote(server_id from, vote_request&& vote_request);
@@ -353,7 +353,7 @@ void fsm::step(server_id from, Message&& msg) {
logger.trace("{} [term: {}] received a message with higher term from {} [term: {}]",
_my_id, _current_term, from, msg.current_term);

if constexpr (std::is_same_v<Message, append_request_recv>) {
if constexpr (std::is_same_v<Message, append_request>) {
become_follower(from);
} else {
if constexpr (std::is_same_v<Message, vote_request>) {
@@ -373,7 +373,7 @@ void fsm::step(server_id from, Message&& msg) {
update_current_term(msg.current_term);

} else if (msg.current_term < _current_term) {
if constexpr (std::is_same_v<Message, append_request_recv>) {
if constexpr (std::is_same_v<Message, append_request>) {
// Instructs the leader to step down.
append_reply reply{_current_term, _commit_idx, append_reply::rejected{msg.prev_log_idx, _log.last_idx()}};
send_to(from, std::move(reply));
@@ -387,7 +387,7 @@ void fsm::step(server_id from, Message&& msg) {
return;

} else /* _current_term == msg.current_term */ {
if constexpr (std::is_same_v<Message, append_request_recv> ||
if constexpr (std::is_same_v<Message, append_request> ||
std::is_same_v<Message, install_snapshot>) {
if (is_candidate()) {
// 3.4 Leader Election
@@ -412,7 +412,7 @@ void fsm::step(server_id from, Message&& msg) {
auto visitor = [this, from, msg = std::move(msg)](auto state) mutable {
using State = decltype(state);

if constexpr (std::is_same_v<Message, append_request_recv>) {
if constexpr (std::is_same_v<Message, append_request>) {
// Got AppendEntries RPC from self
append_entries(from, std::move(msg));
} else if constexpr (std::is_same_v<Message, append_reply>) {
26 changes: 13 additions & 13 deletions raft/log.cc
@@ -31,8 +31,8 @@ log_entry_ptr& log::operator[](size_t i) {
return get_entry(index_t(i));
}

void log::emplace_back(log_entry&& e) {
_log.emplace_back(seastar::make_lw_shared(std::move(e)));
void log::emplace_back(log_entry_ptr&& e) {
_log.emplace_back(std::move(e));
}

bool log::empty() const {
@@ -122,34 +122,34 @@ std::pair<bool, term_t> log::match_term(index_t idx, term_t term) const {
return my_term == term ? std::make_pair(true, term_t(0)) : std::make_pair(false, my_term);
}

index_t log::maybe_append(std::vector<log_entry>&& entries) {
index_t log::maybe_append(std::vector<log_entry_ptr>&& entries) {
assert(!entries.empty());

index_t last_new_idx = entries.back().idx;
index_t last_new_idx = entries.back()->idx;

// We must scan through all entries if the log already
// contains them to ensure the terms match.
for (auto& e : entries) {
if (e.idx <= last_idx()) {
if (e.idx < start_idx()) {
if (e->idx <= last_idx()) {
if (e->idx < start_idx()) {
logger.trace("append_entries: skipping entry with idx {} less than log start {}",
e.idx, start_idx());
e->idx, start_idx());
continue;
}
if (e.term == get_entry(e.idx)->term) {
logger.trace("append_entries: entries with index {} has matching terms {}", e.idx, e.term);
if (e->term == get_entry(e->idx)->term) {
logger.trace("append_entries: entries with index {} has matching terms {}", e->idx, e->term);
continue;
}
logger.trace("append_entries: entries with index {} has non matching terms e.term={}, _log[i].term = {}",
e.idx, e.term, get_entry(e.idx)->term);
e->idx, e->term, get_entry(e->idx)->term);
// If an existing entry conflicts with a new one (same
// index but different terms), delete the existing
// entry and all that follow it (§5.3).
truncate_head(e.idx);
truncate_head(e->idx);
}
// Assert log monotonicity
assert(e.idx == next_idx());
_log.emplace_back(seastar::make_lw_shared(std::move(e)));
assert(e->idx == next_idx());
emplace_back(std::move(e));
}

return last_new_idx;
4 changes: 2 additions & 2 deletions raft/log.hh
@@ -55,7 +55,7 @@ public:
// the function will abort()
log_entry_ptr& operator[](size_t i);
// Add an entry to the log.
void emplace_back(log_entry&& e);
void emplace_back(log_entry_ptr&& e);
// Mark all entries up to this index as stable.
void stable_to(index_t idx);
// Return true if in memory log is empty.
@@ -112,7 +112,7 @@ public:

// Called on a follower to append entries from a leader.
// @retval return an index of last appended entry
index_t maybe_append(std::vector<log_entry>&& entries);
index_t maybe_append(std::vector<log_entry_ptr>&& entries);

friend std::ostream& operator<<(std::ostream& os, const log& l);
};
16 changes: 4 additions & 12 deletions raft/raft.hh
@@ -129,7 +129,7 @@ struct snapshot {
snapshot_id id;
};

struct append_request_base {
struct append_request {
// The leader's term.
term_t current_term;
// So that follower can redirect clients
@@ -141,18 +141,10 @@ struct append_request_base {
term_t prev_log_term;
// The leader's commit_idx.
index_t leader_commit_idx;
};

struct append_request_send : public append_request_base {
// Log entries to store (empty vector for heartbeat; may send more
// than one entry for efficiency).
std::vector<log_entry_ptr> entries;
};
struct append_request_recv : public append_request_base {
// Same as for append_request_send but unlike it here the
// message owns the entries.
std::vector<log_entry> entries;
};

struct append_reply {
struct rejected {
@@ -206,7 +198,7 @@ struct snapshot_reply {
bool success;
};

using rpc_message = std::variant<append_request_send, append_reply, vote_request, vote_reply, install_snapshot, snapshot_reply>;
using rpc_message = std::variant<append_request, append_reply, vote_request, vote_reply, install_snapshot, snapshot_reply>;

// we need something that can be truncated form both sides.
// std::deque move constructor is not nothrow hence cannot be used
@@ -282,7 +274,7 @@ public:
// Send provided append_request to the supplied server, does
// not wait for reply. The returned future resolves when
// message is sent. It does not mean it was received.
virtual future<> send_append_entries(server_id id, const append_request_send& append_request) = 0;
virtual future<> send_append_entries(server_id id, const append_request& append_request) = 0;

// Send a reply to an append_request. The returned future
// resolves when message is sent. It does not mean it was
@@ -323,7 +315,7 @@ public:
virtual ~rpc_server() {};

// This function is called by append_entries RPC
virtual void append_entries(server_id from, append_request_recv append_request) = 0;
virtual void append_entries(server_id from, append_request append_request) = 0;

// This function is called by append_entries_reply RPC
virtual void append_entries_reply(server_id from, append_reply reply) = 0;
6 changes: 3 additions & 3 deletions raft/server.cc
@@ -48,7 +48,7 @@ class server_impl : public rpc_server, public server {
~server_impl() {}

// rpc_server interface
void append_entries(server_id from, append_request_recv append_request) override;
void append_entries(server_id from, append_request append_request) override;
void append_entries_reply(server_id from, append_reply reply) override;
void request_vote(server_id from, vote_request vote_request) override;
void request_vote_reply(server_id from, vote_reply vote_reply) override;
@@ -210,7 +210,7 @@ future<> server_impl::add_entry(command command, wait_type type) {
future<> server_impl::apply_dummy_entry() {
return add_entry_internal(log_entry::dummy(), wait_type::applied);
}
void server_impl::append_entries(server_id from, append_request_recv append_request) {
void server_impl::append_entries(server_id from, append_request append_request) {
_fsm->step(from, std::move(append_request));
}

@@ -271,7 +271,7 @@ future<> server_impl::send_message(server_id id, Message m) {
using T = std::decay_t<decltype(m)>;
if constexpr (std::is_same_v<T, append_reply>) {
return _rpc->send_append_entries_reply(id, m);
} else if constexpr (std::is_same_v<T, append_request_send>) {
} else if constexpr (std::is_same_v<T, append_request>) {
return _rpc->send_append_entries(id, m);
} else if constexpr (std::is_same_v<T, vote_request>) {
return _rpc->send_vote_request(id, m);
4 changes: 2 additions & 2 deletions test/boost/raft_fsm_test.cc
@@ -195,7 +195,7 @@ BOOST_AUTO_TEST_CASE(test_election_four_nodes) {
BOOST_CHECK(fsm.is_follower());

// Inform FSM about a new leader at a new term
fsm.step(id4, raft::append_request_recv{term_t{1}, id4, index_t{1}, term_t{1}});
fsm.step(id4, raft::append_request{term_t{1}, id4, index_t{1}, term_t{1}});

(void) fsm.get_output();

@@ -234,7 +234,7 @@ BOOST_AUTO_TEST_CASE(test_log_matching_rule) {
raft::configuration cfg({id1, id2, id3});
raft::log log(raft::snapshot{.idx = index_t{999}, .config = cfg});

log.emplace_back(raft::log_entry{term_t{10}, index_t{1000}});
log.emplace_back(seastar::make_lw_shared<raft::log_entry>(raft::log_entry{term_t{10}, index_t{1000}}));
log.stable_to(log.last_idx());

raft::fsm fsm(id1, term_t{10}, server_id{}, std::move(log), fd, fsm_cfg);
14 changes: 2 additions & 12 deletions test/raft/replication_test.cc
@@ -261,21 +261,11 @@ class rpc : public raft::rpc {
snapshots[id] = snapshots[_id];
return net[id]->_client->apply_snapshot(_id, std::move(snap));
}
virtual future<> send_append_entries(raft::server_id id, const raft::append_request_send& append_request) {
virtual future<> send_append_entries(raft::server_id id, const raft::append_request& append_request) {
if (is_disconnected(id) || is_disconnected(_id) || (drop_replication && !(rand() % 5))) {
return make_ready_future<>();
}
raft::append_request_recv req;
req.current_term = append_request.current_term;
req.leader_id = append_request.leader_id;
req.prev_log_idx = append_request.prev_log_idx;
req.prev_log_term = append_request.prev_log_term;
req.leader_commit_idx = append_request.leader_commit_idx;
for (auto&& e: append_request.entries) {
req.entries.push_back(*e);
}
net[id]->_client->append_entries(_id, std::move(req));
//co_return seastar::sleep(1us);
net[id]->_client->append_entries(_id, append_request);
return make_ready_future<>();
}
virtual future<> send_append_entries_reply(raft::server_id id, const raft::append_reply& reply) {
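
The replication_test.cc hunk above is where the unification pays off most directly: the mock transport previously rebuilt an append_request_recv and deep-copied every entry (`req.entries.push_back(*e)`) before delivering it locally, whereas now it hands the same append_request to the receiving server. A small illustrative check of the property this relies on follows; it is not code from the tree, and it assumes log_entry_ptr is seastar's lw_shared_ptr (as the make_lw_shared calls elsewhere in the diff suggest) and that raft/raft.hh is the right include:

#include <cassert>
#include "raft/raft.hh"   // assumed include path, matching this commit's tree layout

// Copying the unified request duplicates entry *pointers*, not log_entry
// payloads, so local delivery in the test rpc no longer needs the old
// field-by-field translation loop.
void check_entries_are_shared(const raft::append_request& req) {
    raft::append_request copy = req;                  // copies the vector of lw_shared_ptr
    assert(copy.entries.size() == req.entries.size());
    if (!req.entries.empty()) {
        // Both requests refer to the same underlying log_entry object.
        assert(copy.entries[0].get() == req.entries[0].get());
    }
}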
