Skip to content

Commit

Permalink
Add client
Browse files Browse the repository at this point in the history
  • Loading branch information
resetius committed Nov 26, 2023
1 parent f216135 commit 7e07f63
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 25 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ add_library(miniraft
add_executable(test_raft test/test_raft.cpp)
add_executable(test_read_write test/test_read_write.cpp)
add_executable(server server/server.cpp)
add_executable(client client/client.cpp)

include_directories(${CMAKE_SOURCE_DIR}/src ${CMAKE_SOURCE_DIR}/coroio/src)

target_link_libraries(server miniraft)
target_link_libraries(client miniraft)

target_include_directories(test_raft PRIVATE ${CMOCKA_INCLUDE_DIRS})
target_link_directories(test_raft PRIVATE ${CMOCKA_LIBRARY_DIRS})
Expand Down
24 changes: 0 additions & 24 deletions server/server.cpp
Original file line number Diff line number Diff line change
@@ -1,34 +1,10 @@
#include "socket.hpp"
#include <charconv>
#include <csignal>
#include <poll.hpp>
#include <timesource.h>
#include <raft.h>
#include <server.h>

struct THost {
std::string Address;
int Port = 0;
uint32_t Id = 0;

THost() { }

THost(const std::string& str) {
std::string_view s(str);
auto p = s.find(':');
Address = s.substr(0, p);
s = s.substr(p + 1);
p = s.find(':');
std::from_chars(s.begin(), s.begin()+p, Port);
s = s.substr(p + 1);
std::from_chars(s.begin(), s.begin()+p, Id);

std::cout << "Addr: '" << Address << "'\n";
std::cout << "Port: " << Port << "\n";
std::cout << "Id: " << Id << "\n";
}
};

int main(int argc, char** argv) {
signal(SIGPIPE, SIG_IGN);
std::vector<THost> hosts;
Expand Down
7 changes: 7 additions & 0 deletions src/messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <memory>
#include <vector>

#include <assert.h>
#include <stdint.h>
#include <typeinfo>

Expand Down Expand Up @@ -193,6 +194,12 @@ TMessageHolder<T> NewHoldedMessage() {
return NewHoldedMessage<T>(static_cast<uint32_t>(T::MessageType), sizeof(T));
}

template<typename T>
TMessageHolder<T> NewHoldedMessage(uint32_t size) {
assert(size >= sizeof(T));
return NewHoldedMessage<T>(static_cast<uint32_t>(T::MessageType), size);
}

inline TMessageHolder<TTimeout> NewTimeout() {
return NewHoldedMessage<TTimeout>(static_cast<uint32_t>(EMessageType::TIMEOUT), sizeof(TTimeout));
}
24 changes: 23 additions & 1 deletion src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,24 @@
#include <exception>
#include <iostream>
#include <stdexcept>
#include <vector>
#include "server.h"
#include "messages.h"

namespace {
class TClientNode: public INode {
public:
void Send(const TMessageHolder<TMessage>& mes) override {
Messages.push_back(mes);
}

void Drain() override { }

std::vector<TMessageHolder<TMessage>> Messages;
};

};

TPromise<void>::TTask TWriter::Write(TMessageHolder<TMessage> message) {
auto payload = std::move(message.Payload);
char* p = (char*)message.Mes; // TODO: const char
Expand Down Expand Up @@ -128,10 +143,17 @@ NNet::TTestTask TNode::DoConnect() {

NNet::TSimpleTask TRaftServer::InboundConnection(NNet::TSocket socket) {
try {
TClientNode client;
while (true) {
auto mes = co_await TReader(socket).Read();
std::cout << "Got message " << mes->Type << "\n";
Raft->Process(std::move(mes));
Raft->Process(std::move(mes), &client);
if (!client.Messages.empty()) {
auto tosend = std::move(client.Messages); client.Messages.clear();
for (auto&& mes : tosend) {
co_await TWriter(socket).Write(std::move(mes));
}
}
DrainNodes();
}
} catch (const std::exception & ex) {
Expand Down
26 changes: 26 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <exception>
#include <memory>
#include <coroutine>
#include <string_view>

#include <all.hpp>

Expand Down Expand Up @@ -126,6 +127,31 @@ class TWriter {
NNet::TPoll::TSocket& Socket;
};

struct THost {
std::string Address;
int Port = 0;
uint32_t Id = 0;

THost() { }

THost(const std::string& str) {
std::string_view s(str);
auto p = s.find(':');
Address = s.substr(0, p);
s = s.substr(p + 1);
p = s.find(':');
std::from_chars(s.begin(), s.begin()+p, Port);
s = s.substr(p + 1);
std::from_chars(s.begin(), s.begin()+p, Id);
}

void DebugPrint() const {
std::cout << "Addr: '" << Address << "'\n";
std::cout << "Port: " << Port << "\n";
std::cout << "Id: " << Id << "\n";
}
};

class TNode: public INode {
public:
TNode(NNet::TPoll& poller, uint32_t id, NNet::TAddress address, const std::shared_ptr<ITimeSource>& ts)
Expand Down

0 comments on commit 7e07f63

Please sign in to comment.