Skip to content

Commit

Permalink
Drain node
Browse files Browse the repository at this point in the history
  • Loading branch information
resetius committed Nov 25, 2023
1 parent c8dfaea commit 9085609
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
25 changes: 25 additions & 0 deletions src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,31 @@ TPromise<TMessageHolder<TMessage>>::TTask TReader::Read() {
co_return mes;
}

void TNode::Send(const TMessageHolder<TMessage>& message) {
Messages.emplace_back(message);
}

void TNode::Drain() {
if (!Drainer || Drainer.done()) {
if (Drainer && Drainer.done()) {
Drainer.destroy();
}
Drainer = DoDrain();
}
}

NNet::TTestTask TNode::DoDrain() {
if (!Connected) {
co_return;
}
auto tosend = std::move(Messages);
for (auto&& m : tosend) {
co_await TWriter(Socket).Write(std::move(m));
}
Messages.clear();
co_return;
}

NNet::TSimpleTask TRaftServer::InboundConnection(NNet::TSocket socket) {
try {
while (true) {
Expand Down
7 changes: 7 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <all.hpp>

#include "poll.hpp"
#include "timesource.h"
#include "messages.h"
#include "raft.h"
Expand Down Expand Up @@ -118,9 +119,15 @@ class TNode: public INode {
void Drain() override;

private:
NNet::TTestTask DoDrain();

NNet::TPoll& Poller;
uint32_t Id;
NNet::TAddress Address;
NNet::TPoll::TSocket Socket;
bool Connected = false;

std::coroutine_handle<> Drainer;

std::vector<TMessageHolder<TMessage>> Messages;
};
Expand Down

0 comments on commit 9085609

Please sign in to comment.