Skip to content

Commit

Permalink
Faster election
Browse files Browse the repository at this point in the history
  • Loading branch information
resetius committed Nov 29, 2023
1 parent 1c67226 commit 2a44758
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 31 deletions.
3 changes: 1 addition & 2 deletions src/messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ enum class EMessageType : uint32_t {
APPEND_ENTRIES_RESPONSE = 5,
COMMAND_REQUEST = 6,
COMMAND_RESPONSE = 7,
TIMEOUT = 8,
};

struct TMessage {
Expand Down Expand Up @@ -97,7 +96,7 @@ static_assert(sizeof(TCommandResponse) == sizeof(TMessage) + 8);

struct TTimeout {
static constexpr std::chrono::milliseconds Election = std::chrono::milliseconds(5000);
static constexpr std::chrono::milliseconds Heartbeat = std::chrono::milliseconds(2000);
static constexpr std::chrono::milliseconds Heartbeat = std::chrono::milliseconds(1000);
static constexpr std::chrono::milliseconds Rpc = std::chrono::milliseconds(10000);
};

Expand Down
69 changes: 41 additions & 28 deletions src/raft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,30 +157,10 @@ std::unique_ptr<TResult> TRaft::OnRequestVote(TMessageHolder<TRequestVoteRespons
votes.insert(message->Src);
}

int nvotes = votes.size()+1;
std::cout << "Need/total: " << MinVotes << "/" << nvotes << "\n";
if (nvotes >= MinVotes) {
auto value = State->Log.size()+1;
decltype(VolatileState->NextIndex) nextIndex;
decltype(VolatileState->RpcDue) rpcDue;
for (auto [id, _] : Nodes) {
nextIndex.emplace(id, value);
rpcDue.emplace(id, ITimeSource::Max);
}
return std::make_unique<TResult>(TResult {
.NextVolatileState = std::make_unique<TVolatileState>(TVolatileState {
.CommitIndex = VolatileState->CommitIndex,
.LastApplied = VolatileState->LastApplied,
.NextIndex = nextIndex,
.RpcDue = rpcDue,
.ElectionDue = ITimeSource::Max,
}),
.NextStateName = EState::LEADER
});
}

auto nextVolatileState = *VolatileState;
nextVolatileState.SetVotes(votes);
nextVolatileState
.SetVotes(votes)
.MergeRpcDue({{message->Src, ITimeSource::Max}});
return std::make_unique<TResult>(TResult {
.NextVolatileState = std::make_unique<TVolatileState>(
nextVolatileState
Expand All @@ -190,6 +170,9 @@ std::unique_ptr<TResult> TRaft::OnRequestVote(TMessageHolder<TRequestVoteRespons

std::unique_ptr<TResult> TRaft::OnAppendEntries(ITimeSource::Time now, TMessageHolder<TAppendEntriesRequest> message) {
if (message->Term < State->CurrentTerm) {
auto nextVolatileState = *VolatileState;
nextVolatileState.ElectionDue = MakeElection(now);

auto reply = NewHoldedMessage(
TMessageEx {
.Src = Id,
Expand All @@ -201,7 +184,8 @@ std::unique_ptr<TResult> TRaft::OnAppendEntries(ITimeSource::Time now, TMessageH
.Success = false,
});
return std::make_unique<TResult>(TResult {
.Message = reply
.NextVolatileState = std::make_unique<TVolatileState>(nextVolatileState),
.Message = reply,
});
}

Expand Down Expand Up @@ -242,7 +226,6 @@ std::unique_ptr<TResult> TRaft::OnAppendEntries(ITimeSource::Time now, TMessageH
auto nextVolatileState = *VolatileState;
nextVolatileState.SetCommitIndex(commitIndex);
nextVolatileState.ElectionDue = MakeElection(now);

return std::make_unique<TResult>(TResult {
.NextState = std::move(state),
.NextVolatileState = std::make_unique<TVolatileState>(nextVolatileState),
Expand All @@ -263,13 +246,16 @@ std::unique_ptr<TResult> TRaft::OnAppendEntries(TMessageHolder<TAppendEntriesRes
nextVolatileState
.MergeMatchIndex({{nodeId, matchIndex}})
.MergeNextIndex({{nodeId, message->MatchIndex+1}})
.CommitAdvance(Nservers, State->Log.size(), *State);
.CommitAdvance(Nservers, State->Log.size(), *State)
.MergeRpcDue({{nodeId, ITimeSource::Time{}}});
return std::make_unique<TResult>(TResult {
.NextVolatileState = std::make_unique<TVolatileState>(nextVolatileState)
});
} else {
auto nextVolatileState = *VolatileState;
nextVolatileState.MergeNextIndex({{nodeId, std::max((uint64_t)1, VolatileState->NextIndex[nodeId]-1)}});
nextVolatileState
.MergeNextIndex({{nodeId, std::max((uint64_t)1, VolatileState->NextIndex[nodeId]-1)}})
.MergeRpcDue({{nodeId, ITimeSource::Time{}}});
return std::make_unique<TResult>(TResult {
.NextVolatileState = std::make_unique<TVolatileState>(nextVolatileState)
});
Expand Down Expand Up @@ -322,7 +308,9 @@ std::vector<TMessageHolder<TAppendEntriesRequest>> TRaft::CreateAppendEntries()
}

std::unique_ptr<TResult> TRaft::Follower(ITimeSource::Time now, TMessageHolder<TMessage> message) {
if (auto maybeRequestVote = message.Maybe<TRequestVoteRequest>()) {
if (auto maybeResponseVote = message.Maybe<TRequestVoteResponse>()) {
return OnRequestVote(std::move(maybeResponseVote.Cast()));
} else if (auto maybeRequestVote = message.Maybe<TRequestVoteRequest>()) {
return OnRequestVote(now, std::move(maybeRequestVote.Cast()));
} else if (auto maybeAppendEntries = message.Maybe<TAppendEntriesRequest>()) {
return OnAppendEntries(now, maybeAppendEntries.Cast());
Expand Down Expand Up @@ -496,6 +484,31 @@ void TRaft::LeaderTimeout(ITimeSource::Time now) {
}

void TRaft::ProcessTimeout(ITimeSource::Time now) {
if (StateName == EState::CANDIDATE) {
int nvotes = VolatileState->Votes.size()+1;
std::cout << "Need/total: " << MinVotes << "/" << nvotes << "\n";
if (nvotes >= MinVotes) {
auto value = State->Log.size()+1;
decltype(VolatileState->NextIndex) nextIndex;
decltype(VolatileState->RpcDue) rpcDue;
for (auto [id, _] : Nodes) {
nextIndex.emplace(id, value);
rpcDue.emplace(id, ITimeSource::Max);
}

auto nextVolatileState = std::make_unique<TVolatileState>(TVolatileState {
.CommitIndex = VolatileState->CommitIndex,
.LastApplied = VolatileState->LastApplied,
.NextIndex = nextIndex,
.RpcDue = rpcDue,
.ElectionDue = ITimeSource::Max,
});

VolatileState = std::move(nextVolatileState);
StateName = EState::LEADER;
}
}

switch (StateName) {
case EState::FOLLOWER:
FollowerTimeout(now); break;
Expand Down
2 changes: 1 addition & 1 deletion src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ NNet::TSimpleTask TRaftServer::InboundConnection(NNet::TSocket socket) {
Nodes.insert(client);
while (true) {
auto mes = co_await TReader(client->Sock()).Read();
std::cout << "Got message " << mes->Type << "\n";
// std::cout << "Got message " << mes->Type << "\n";
Raft->Process(std::move(mes), client);
Raft->ProcessTimeout(TimeSource->Now());
DrainNodes();
Expand Down

0 comments on commit 2a44758

Please sign in to comment.