Skip to content

Commit

Permalink
Add debug prints. Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
resetius committed Nov 26, 2023
1 parent 4e08d98 commit c35ca62
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 19 deletions.
49 changes: 46 additions & 3 deletions server/server.cpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,44 @@
#include "socket.hpp"
#include <charconv>
#include <csignal>
#include <poll.hpp>
#include <timesource.h>
#include <raft.h>
#include <server.h>

struct THost {
std::string Address;
int Port = 0;
uint32_t Id = 0;

THost() { }

THost(const std::string& str) {
std::string_view s(str);
auto p = s.find(':');
Address = s.substr(0, p);
s = s.substr(p + 1);
p = s.find(':');
std::from_chars(s.begin(), s.begin()+p, Port);
s = s.substr(p + 1);
std::from_chars(s.begin(), s.begin()+p, Id);

std::cout << "Addr: '" << Address << "'\n";
std::cout << "Port: " << Port << "\n";
std::cout << "Id: " << Id << "\n";
}
};

int main(int argc, char** argv) {
std::vector<std::string> nodeStrings;
signal(SIGPIPE, SIG_IGN);
std::vector<THost> hosts;
THost myHost;
TNodeDict nodes;
uint32_t id;
uint32_t id = 0;
for (int i = 1; i < argc; i++) {
if (!strcmp(argv[i], "--node") && i < argc - 1) {
// address:port:id
nodeStrings.push_back(argv[++i]);
hosts.push_back(THost{argv[++i]});
} else if (!strcmp(argv[i], "--id") && i < argc - 1) {
id = atoi(argv[++i]);
} else if (!strcmp(argv[i], "--help")) {
Expand All @@ -21,6 +49,21 @@ int main(int argc, char** argv) {
std::shared_ptr<ITimeSource> timeSource = std::make_shared<TTimeSource>();
NNet::TLoop<NNet::TPoll> loop;

for (auto& host : hosts) {
if (host.Id == id) {
myHost = host;
} else {
nodes[host.Id] = std::make_shared<TNode>(
loop.Poller(),
host.Id,
NNet::TAddress{host.Address, host.Port},
timeSource);
}
}

auto raft = std::make_shared<TRaft>(myHost.Id, nodes, timeSource);
TRaftServer server(loop.Poller(), NNet::TAddress{myHost.Address, myHost.Port}, raft, nodes, timeSource);
server.Serve();
loop.Loop();
return 0;
}
12 changes: 7 additions & 5 deletions src/raft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,7 @@ std::vector<TMessageHolder<TAppendEntriesRequest>> TRaft::CreateAppendEntries()
lastIndex = prevIndex;
}

auto mes = NewHoldedMessage<TAppendEntriesRequest>(
static_cast<uint32_t>(EMessageType::APPEND_ENTRIES_REQUEST), sizeof(TAppendEntriesRequest));
auto mes = NewHoldedMessage<TAppendEntriesRequest>();

mes->Src = Id;
mes->Dst = nodeId;
Expand All @@ -287,6 +286,7 @@ std::vector<TMessageHolder<TAppendEntriesRequest>> TRaft::CreateAppendEntries()
payload.push_back(State->Log[i]);
}
mes.Payload = std::move(payload);
res.emplace_back(std::move(mes));
}
return res;
}
Expand Down Expand Up @@ -428,13 +428,15 @@ void TRaft::ApplyResult(ITimeSource::Time now, std::unique_ptr<TResult> result,
v->Send(messageEx);
}
} else {
Nodes[messageEx->Dst]->Send(messageEx);
std::cout << "Send reply to " << messageEx->Dst << "\n";
Nodes[messageEx->Dst]->Send(std::move(messageEx));
}
}
}
if (!result->Messages.empty()) {
for (auto& m : result->Messages) {
Nodes[m->Dst]->Send(m);
for (auto&& m : result->Messages) {
std::cout << "Send append entries to " << m->Dst << "\n";
Nodes[m->Dst]->Send(std::move(m));
}
}
if (result->NextStateName != EState::NONE) {
Expand Down
35 changes: 28 additions & 7 deletions src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ TPromise<void>::TTask TWriter::Write(TMessageHolder<TMessage> message) {
if (written == 0) {
throw std::runtime_error("Connection closed");
}
if (written < 0) {
continue; // retry;
}
p += written;
len -= written;
}
Expand All @@ -30,9 +33,13 @@ TPromise<TMessageHolder<TMessage>>::TTask TReader::Read() {
decltype(TMessage::Type) type;
decltype(TMessage::Len) len;
auto s = co_await Socket.ReadSome((char*)&type, sizeof(type));
assert(s == sizeof(type));
if (s != sizeof(type)) {
throw std::runtime_error("Connection closed");
}
s = co_await Socket.ReadSome((char*)&len, sizeof(len));
assert(s == sizeof(len));
if (s != sizeof(len)) {
throw std::runtime_error("Connection closed");
}
auto mes = NewHoldedMessage<TMessage>(type, len);
char* p = mes->Value;
len -= sizeof(TMessage);
Expand All @@ -41,6 +48,9 @@ TPromise<TMessageHolder<TMessage>>::TTask TReader::Read() {
if (s == 0) {
throw std::runtime_error("Connection closed");
}
if (s < 0) {
continue; // retry
}
p += s;
len -= s;
}
Expand Down Expand Up @@ -93,18 +103,24 @@ void TNode::Connect() {
}

Socket = NNet::TSocket(Address, Poller);
Connected = false;
Connector = DoConnect();
}
}

NNet::TTestTask TNode::DoConnect() {
std::cout << "Connecting " << Id << "\n";
while (!Connected) {
try {
auto deadline = TimeSource->Now() + std::chrono::milliseconds(15000);
auto deadline = NNet::TClock::now() + std::chrono::milliseconds(100); // TODO: broken timeout in coroio
co_await Socket.Connect(deadline);
std::cout << "Connected " << Id << "\n";
Connected = true;
} catch (const std::exception& ex) {
std::cout << "Error on connect: " << ex.what() << "\n";
std::cout << "Error on connect: " << Id << " " << ex.what() << "\n";
}
if (!Connected) {
co_await Poller.Sleep(std::chrono::milliseconds(1000));
}
}
co_return;
Expand All @@ -114,6 +130,7 @@ NNet::TSimpleTask TRaftServer::InboundConnection(NNet::TSocket socket) {
try {
while (true) {
auto mes = co_await TReader(socket).Read();
std::cout << "Got message " << mes->Type << "\n";
Raft->Process(std::move(mes));
DrainNodes();
}
Expand All @@ -129,16 +146,19 @@ void TRaftServer::Serve() {
}

void TRaftServer::DrainNodes() {
for (auto [_, node] : Nodes) {
for (auto [id, node] : Nodes) {
node->Drain();
}
}

NNet::TSimpleTask TRaftServer::InboundServe() {
std::cout << "Bind\n";
Socket.Bind();
std::cout << "Listen\n";
Socket.Listen();
while (true) {
auto client = co_await Socket.Accept();
std::cout << "Accepted\n";
InboundConnection(std::move(client));
}
co_return;
Expand All @@ -147,15 +167,16 @@ NNet::TSimpleTask TRaftServer::InboundServe() {
NNet::TSimpleTask TRaftServer::Idle() {
auto t0 = TimeSource->Now();
auto dt = std::chrono::milliseconds(2000);
auto sleep = std::chrono::milliseconds(10);
auto sleep = std::chrono::milliseconds(100);
while (true) {
Raft->Process(NewTimeout());
DrainNodes();
auto t1 = TimeSource->Now();
if (t1 > t0 + dt) {
std::cout << "Idle " << (uint32_t)Raft->CurrentStateName() << "\n";
t0 = t1;
}
Poller.Sleep(sleep);
co_await Poller.Sleep(sleep);
}
co_return;
}
27 changes: 23 additions & 4 deletions src/server.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <exception>
#include <memory>
#include <coroutine>

Expand Down Expand Up @@ -28,7 +29,11 @@ struct TPromise
}

T await_resume() {
return *this->promise().Value;
if (this->promise().Exception) {
std::rethrow_exception(this->promise().Exception);
} else {
return *this->promise().Value;
}
}
};

Expand All @@ -43,9 +48,15 @@ struct TPromise
}
}

void unhandled_exception() { }
void unhandled_exception() {
Exception = std::current_exception();
if (Caller) {
Caller.resume();
}
}

std::shared_ptr<T> Value;
std::exception_ptr Exception;
std::coroutine_handle<> Caller;
};

Expand All @@ -64,7 +75,11 @@ struct TPromise<void>
this->promise().Caller = caller;
}

void await_resume() { }
void await_resume() {
if (this->promise().Exception) {
std::rethrow_exception(this->promise().Exception);
}
}
};

TTask get_return_object() { return { TTask::from_promise(*this) }; }
Expand All @@ -77,9 +92,13 @@ struct TPromise<void>
Caller.resume();
}
}
void unhandled_exception() { }
void unhandled_exception() {
Exception = std::current_exception();
return_void();
}

bool Ready = false;
std::exception_ptr Exception;
std::coroutine_handle<> Caller;
};

Expand Down
2 changes: 2 additions & 0 deletions test/test_raft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,8 @@ void test_commit_advance_wrong_term(void**) {
assert_int_equal(s1.CommitIndex, 0);
}

// TODO: test leader hearbeat

int main() {
const struct CMUnitTest tests[] = {
cmocka_unit_test(test_empty),
Expand Down

0 comments on commit c35ca62

Please sign in to comment.