From c35ca626551520d6c0ce2ce88a4bdeb72345655d Mon Sep 17 00:00:00 2001 From: Alexey Ozeritskiy Date: Sun, 26 Nov 2023 12:04:01 +0300 Subject: [PATCH] Add debug prints. Fixes --- server/server.cpp | 49 +++++++++++++++++++++++++++++++++++++++++++--- src/raft.cpp | 12 +++++++----- src/server.cpp | 35 ++++++++++++++++++++++++++------- src/server.h | 27 +++++++++++++++++++++---- test/test_raft.cpp | 2 ++ 5 files changed, 106 insertions(+), 19 deletions(-) diff --git a/server/server.cpp b/server/server.cpp index cf54e9c..d1ac556 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -1,16 +1,44 @@ +#include "socket.hpp" +#include +#include #include #include #include #include +struct THost { + std::string Address; + int Port = 0; + uint32_t Id = 0; + + THost() { } + + THost(const std::string& str) { + std::string_view s(str); + auto p = s.find(':'); + Address = s.substr(0, p); + s = s.substr(p + 1); + p = s.find(':'); + std::from_chars(s.begin(), s.begin()+p, Port); + s = s.substr(p + 1); + std::from_chars(s.begin(), s.begin()+p, Id); + + std::cout << "Addr: '" << Address << "'\n"; + std::cout << "Port: " << Port << "\n"; + std::cout << "Id: " << Id << "\n"; + } +}; + int main(int argc, char** argv) { - std::vector nodeStrings; + signal(SIGPIPE, SIG_IGN); + std::vector hosts; + THost myHost; TNodeDict nodes; - uint32_t id; + uint32_t id = 0; for (int i = 1; i < argc; i++) { if (!strcmp(argv[i], "--node") && i < argc - 1) { // address:port:id - nodeStrings.push_back(argv[++i]); + hosts.push_back(THost{argv[++i]}); } else if (!strcmp(argv[i], "--id") && i < argc - 1) { id = atoi(argv[++i]); } else if (!strcmp(argv[i], "--help")) { @@ -21,6 +49,21 @@ int main(int argc, char** argv) { std::shared_ptr timeSource = std::make_shared(); NNet::TLoop loop; + for (auto& host : hosts) { + if (host.Id == id) { + myHost = host; + } else { + nodes[host.Id] = std::make_shared( + loop.Poller(), + host.Id, + NNet::TAddress{host.Address, host.Port}, + timeSource); + } + } + + auto raft = std::make_shared(myHost.Id, nodes, timeSource); + TRaftServer server(loop.Poller(), NNet::TAddress{myHost.Address, myHost.Port}, raft, nodes, timeSource); + server.Serve(); loop.Loop(); return 0; } diff --git a/src/raft.cpp b/src/raft.cpp index 3a27c8e..dde7e20 100644 --- a/src/raft.cpp +++ b/src/raft.cpp @@ -270,8 +270,7 @@ std::vector> TRaft::CreateAppendEntries() lastIndex = prevIndex; } - auto mes = NewHoldedMessage( - static_cast(EMessageType::APPEND_ENTRIES_REQUEST), sizeof(TAppendEntriesRequest)); + auto mes = NewHoldedMessage(); mes->Src = Id; mes->Dst = nodeId; @@ -287,6 +286,7 @@ std::vector> TRaft::CreateAppendEntries() payload.push_back(State->Log[i]); } mes.Payload = std::move(payload); + res.emplace_back(std::move(mes)); } return res; } @@ -428,13 +428,15 @@ void TRaft::ApplyResult(ITimeSource::Time now, std::unique_ptr result, v->Send(messageEx); } } else { - Nodes[messageEx->Dst]->Send(messageEx); + std::cout << "Send reply to " << messageEx->Dst << "\n"; + Nodes[messageEx->Dst]->Send(std::move(messageEx)); } } } if (!result->Messages.empty()) { - for (auto& m : result->Messages) { - Nodes[m->Dst]->Send(m); + for (auto&& m : result->Messages) { + std::cout << "Send append entries to " << m->Dst << "\n"; + Nodes[m->Dst]->Send(std::move(m)); } } if (result->NextStateName != EState::NONE) { diff --git a/src/server.cpp b/src/server.cpp index 6b34b03..b9f1ea8 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -15,6 +15,9 @@ TPromise::TTask TWriter::Write(TMessageHolder message) { if (written == 0) { throw std::runtime_error("Connection closed"); } + if (written < 0) { + continue; // retry; + } p += written; len -= written; } @@ -30,9 +33,13 @@ TPromise>::TTask TReader::Read() { decltype(TMessage::Type) type; decltype(TMessage::Len) len; auto s = co_await Socket.ReadSome((char*)&type, sizeof(type)); - assert(s == sizeof(type)); + if (s != sizeof(type)) { + throw std::runtime_error("Connection closed"); + } s = co_await Socket.ReadSome((char*)&len, sizeof(len)); - assert(s == sizeof(len)); + if (s != sizeof(len)) { + throw std::runtime_error("Connection closed"); + } auto mes = NewHoldedMessage(type, len); char* p = mes->Value; len -= sizeof(TMessage); @@ -41,6 +48,9 @@ TPromise>::TTask TReader::Read() { if (s == 0) { throw std::runtime_error("Connection closed"); } + if (s < 0) { + continue; // retry + } p += s; len -= s; } @@ -93,18 +103,24 @@ void TNode::Connect() { } Socket = NNet::TSocket(Address, Poller); + Connected = false; Connector = DoConnect(); } } NNet::TTestTask TNode::DoConnect() { + std::cout << "Connecting " << Id << "\n"; while (!Connected) { try { - auto deadline = TimeSource->Now() + std::chrono::milliseconds(15000); + auto deadline = NNet::TClock::now() + std::chrono::milliseconds(100); // TODO: broken timeout in coroio co_await Socket.Connect(deadline); + std::cout << "Connected " << Id << "\n"; Connected = true; } catch (const std::exception& ex) { - std::cout << "Error on connect: " << ex.what() << "\n"; + std::cout << "Error on connect: " << Id << " " << ex.what() << "\n"; + } + if (!Connected) { + co_await Poller.Sleep(std::chrono::milliseconds(1000)); } } co_return; @@ -114,6 +130,7 @@ NNet::TSimpleTask TRaftServer::InboundConnection(NNet::TSocket socket) { try { while (true) { auto mes = co_await TReader(socket).Read(); + std::cout << "Got message " << mes->Type << "\n"; Raft->Process(std::move(mes)); DrainNodes(); } @@ -129,16 +146,19 @@ void TRaftServer::Serve() { } void TRaftServer::DrainNodes() { - for (auto [_, node] : Nodes) { + for (auto [id, node] : Nodes) { node->Drain(); } } NNet::TSimpleTask TRaftServer::InboundServe() { + std::cout << "Bind\n"; Socket.Bind(); + std::cout << "Listen\n"; Socket.Listen(); while (true) { auto client = co_await Socket.Accept(); + std::cout << "Accepted\n"; InboundConnection(std::move(client)); } co_return; @@ -147,15 +167,16 @@ NNet::TSimpleTask TRaftServer::InboundServe() { NNet::TSimpleTask TRaftServer::Idle() { auto t0 = TimeSource->Now(); auto dt = std::chrono::milliseconds(2000); - auto sleep = std::chrono::milliseconds(10); + auto sleep = std::chrono::milliseconds(100); while (true) { Raft->Process(NewTimeout()); DrainNodes(); auto t1 = TimeSource->Now(); if (t1 > t0 + dt) { + std::cout << "Idle " << (uint32_t)Raft->CurrentStateName() << "\n"; t0 = t1; } - Poller.Sleep(sleep); + co_await Poller.Sleep(sleep); } co_return; } diff --git a/src/server.h b/src/server.h index b30b4e5..d3cc003 100644 --- a/src/server.h +++ b/src/server.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -28,7 +29,11 @@ struct TPromise } T await_resume() { - return *this->promise().Value; + if (this->promise().Exception) { + std::rethrow_exception(this->promise().Exception); + } else { + return *this->promise().Value; + } } }; @@ -43,9 +48,15 @@ struct TPromise } } - void unhandled_exception() { } + void unhandled_exception() { + Exception = std::current_exception(); + if (Caller) { + Caller.resume(); + } + } std::shared_ptr Value; + std::exception_ptr Exception; std::coroutine_handle<> Caller; }; @@ -64,7 +75,11 @@ struct TPromise this->promise().Caller = caller; } - void await_resume() { } + void await_resume() { + if (this->promise().Exception) { + std::rethrow_exception(this->promise().Exception); + } + } }; TTask get_return_object() { return { TTask::from_promise(*this) }; } @@ -77,9 +92,13 @@ struct TPromise Caller.resume(); } } - void unhandled_exception() { } + void unhandled_exception() { + Exception = std::current_exception(); + return_void(); + } bool Ready = false; + std::exception_ptr Exception; std::coroutine_handle<> Caller; }; diff --git a/test/test_raft.cpp b/test/test_raft.cpp index fb90213..09b04fa 100644 --- a/test/test_raft.cpp +++ b/test/test_raft.cpp @@ -597,6 +597,8 @@ void test_commit_advance_wrong_term(void**) { assert_int_equal(s1.CommitIndex, 0); } +// TODO: test leader hearbeat + int main() { const struct CMUnitTest tests[] = { cmocka_unit_test(test_empty),