Skip to content

Commit

Permalink
Make Tcp connections more robust, add fire once to timer
Browse files Browse the repository at this point in the history
  • Loading branch information
Oipo committed Feb 19, 2024
1 parent 574d82a commit c2a49c8
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 70 deletions.
23 changes: 21 additions & 2 deletions include/ichor/services/network/tcp/TcpConnectionService.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@
#include <ichor/services/timer/ITimerFactory.h>

namespace Ichor {

/**
* Service for managing a TCP connection
*
* Properties:
* - "Address" std::string - What address to connect to (required if Socket is not present)
* - "Port" uint16_t - What port to connect to (required if Socket is not present)
* - "Socket" int - An existing socket to manage (required if Address/Port are not present)
* - "Priority" uint64_t - Which priority to use for inserted events (default INTERNAL_EVENT_PRIORITY)
* - "TimeoutSendUs" int64_t - Timeout in microseconds for send calls (default 250'000)
* - "TimeoutRecvUs" int64_t - Timeout in microseconds for recv calls (default 250'000)
*/
class TcpConnectionService final : public IConnectionService, public AdvancedService<TcpConnectionService> {
public:
TcpConnectionService(DependencyRegister &reg, Properties props);
Expand All @@ -26,15 +38,22 @@ namespace Ichor {
void addDependencyInstance(ITimerFactory &logger, IService &isvc);
void removeDependencyInstance(ITimerFactory &logger, IService &isvc);

void recvHandler();

friend DependencyRegister;

static uint64_t tcpConnId;
int _socket;
int _attempts;
uint64_t _id;
uint64_t _attempts;
uint64_t _priority;
uint64_t _msgIdCounter;
uint64_t _msgIdCounter{};
int64_t _sendTimeout{250'000};
int64_t _recvTimeout{250'000};
bool _quit;
ILogger *_logger{};
ITimerFactory *_timerFactory{};
ITimer *_timer{};
};
}

Expand Down
14 changes: 14 additions & 0 deletions include/ichor/services/network/tcp/TcpHostService.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ namespace Ichor {
static constexpr std::string_view NAME = Ichor::typeName<NewSocketEvent>();
};

/**
* Service for creating a TCP host
*
* Properties:
* - "Address" std::string - What address to bind to (default INADDR_ANY)
* - "Port" uint16_t - What port to bind to (required)
* - "Priority" uint64_t - Which priority to use for inserted events (default INTERNAL_EVENT_PRIORITY)
* - "TimeoutSendUs" int64_t - Timeout in microseconds for send calls (default 250'000)
* - "TimeoutRecvUs" int64_t - Timeout in microseconds for recv calls (default 250'000)
*/
class TcpHostService final : public IHostService, public AdvancedService<TcpHostService> {
public:
TcpHostService(DependencyRegister &reg, Properties props);
Expand All @@ -43,16 +53,20 @@ namespace Ichor {
void removeDependencyInstance(ITimerFactory &logger, IService &isvc);

AsyncGenerator<IchorBehaviour> handleEvent(NewSocketEvent const &evt);
void acceptHandler();

friend DependencyRegister;
friend DependencyManager;

int _socket;
int _bindFd;
uint64_t _priority;
int64_t _sendTimeout{250'000};
int64_t _recvTimeout{250'000};
bool _quit;
ILogger *_logger{};
ITimerFactory *_timerFactory{};
ITimer *_timer{};
std::vector<NeverNull<TcpConnectionService*>> _connections;
EventHandlerRegistration _newSocketEventHandlerRegistration{};
};
Expand Down
5 changes: 5 additions & 0 deletions include/ichor/services/timer/ITimer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ namespace Ichor {
/// Thread-safe.
[[nodiscard]] virtual uint64_t getPriority() const noexcept = 0;
/// Thread-safe.
/// \param fireOnce
virtual void setFireOnce(bool fireOnce) noexcept = 0;
/// Thread-safe.
[[nodiscard]] virtual bool getFireOnce() const noexcept = 0;
/// Thread-safe.
[[nodiscard]] virtual uint64_t getTimerId() const noexcept = 0;


Expand Down
9 changes: 7 additions & 2 deletions include/ichor/services/timer/Timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,25 @@ namespace Ichor {
/// Thread-safe.
[[nodiscard]] uint64_t getPriority() const noexcept final;
/// Thread-safe.
void setFireOnce(bool fireOnce) noexcept final;
/// Thread-safe.
[[nodiscard]] bool getFireOnce() const noexcept final;
/// Thread-safe.
[[nodiscard]] uint64_t getTimerId() const noexcept final;

private:
///
/// \param timerId unique identifier for timer
/// \param svcId unique identifier for svc using this timer
Timer(IEventQueue *queue, uint64_t timerId, uint64_t svcId) noexcept;
Timer(NeverNull<IEventQueue*> queue, uint64_t timerId, uint64_t svcId) noexcept;

void insertEventLoop(bool fireImmediately);

friend class TimerFactory;

IEventQueue *_queue{};
NeverNull<IEventQueue*> _queue;
uint64_t _timerId{};
std::atomic<bool> _fireOnce{};
std::atomic<uint64_t> _intervalNanosec{1'000'000'000};
std::unique_ptr<std::thread> _eventInsertionThread{};
std::function<AsyncGenerator<IchorBehaviour>()> _fnAsync{};
Expand Down
166 changes: 134 additions & 32 deletions src/services/network/tcp/TcpConnectionService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,19 @@
#include <ichor/DependencyManager.h>
#include <ichor/services/network/tcp/TcpConnectionService.h>
#include <ichor/services/network/NetworkEvents.h>
#include <ichor/events/RunFunctionEvent.h>
#include <ichor/ScopeGuard.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <fcntl.h>
#include <poll.h>
#include <thread>

Ichor::TcpConnectionService::TcpConnectionService(DependencyRegister &reg, Properties props) : AdvancedService(std::move(props)), _socket(-1), _attempts(), _priority(INTERNAL_EVENT_PRIORITY), _quit() {
uint64_t Ichor::TcpConnectionService::tcpConnId{};

Ichor::TcpConnectionService::TcpConnectionService(DependencyRegister &reg, Properties props) : AdvancedService(std::move(props)), _socket(-1), _id(tcpConnId++), _attempts(), _priority(INTERNAL_EVENT_PRIORITY), _quit() {
reg.registerDependency<ILogger>(this, DependencyFlags::REQUIRED);
reg.registerDependency<ITimerFactory>(this, DependencyFlags::REQUIRED);
}
Expand All @@ -18,22 +24,40 @@ Ichor::Task<tl::expected<void, Ichor::StartError>> Ichor::TcpConnectionService::
if(auto propIt = getProperties().find("Priority"); propIt != getProperties().end()) {
_priority = Ichor::any_cast<uint64_t>(propIt->second);
}
if(auto propIt = getProperties().find("TimeoutSendUs"); propIt != getProperties().end()) {
_sendTimeout = Ichor::any_cast<int64_t>(propIt->second);
}
if(auto propIt = getProperties().find("TimeoutRecvUs"); propIt != getProperties().end()) {
_recvTimeout = Ichor::any_cast<int64_t>(propIt->second);
}

if(getProperties().contains("Socket")) {
if(auto propIt = getProperties().find("Socket"); propIt != getProperties().end()) {
_socket = Ichor::any_cast<int>(propIt->second);
}
ICHOR_LOG_TRACE(_logger, "Starting TCP connection for existing socket");

int setting = 1;
::setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, &setting, sizeof(setting));

timeval timeout{};
timeout.tv_usec = _recvTimeout;
setsockopt(_socket, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));
timeout.tv_usec = _sendTimeout;
setsockopt(_socket, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout));

auto flags = ::fcntl(_socket, F_GETFL, 0);
::fcntl(_socket, F_SETFL, flags | O_NONBLOCK);
ICHOR_LOG_TRACE(_logger, "[{}] Starting TCP connection for existing socket", _id);
} else {
auto addrIt = getProperties().find("Address");
auto portIt = getProperties().find("Port");

if(addrIt == getProperties().end()) {
ICHOR_LOG_ERROR(_logger, "Missing address");
ICHOR_LOG_ERROR(_logger, "[{}] Missing address", _id);
co_return tl::unexpected(StartError::FAILED);
}
if(portIt == getProperties().end()) {
ICHOR_LOG_ERROR(_logger, "Missing port");
ICHOR_LOG_ERROR(_logger, "[{}] Missing port", _id);
co_return tl::unexpected(StartError::FAILED);
}

Expand All @@ -47,6 +71,13 @@ Ichor::Task<tl::expected<void, Ichor::StartError>> Ichor::TcpConnectionService::

int setting = 1;
::setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, &setting, sizeof(setting));

timeval timeout{};
timeout.tv_usec = _recvTimeout;
setsockopt(_socket, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));
timeout.tv_usec = _sendTimeout;
setsockopt(_socket, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout));

auto flags = ::fcntl(_socket, F_GETFL, 0);
::fcntl(_socket, F_SETFL, flags | O_NONBLOCK);

Expand All @@ -60,38 +91,70 @@ Ichor::Task<tl::expected<void, Ichor::StartError>> Ichor::TcpConnectionService::
throw std::runtime_error("inet_pton invalid address for given address family (has to be ipv4-valid address)");
}

while(connect(_socket, (struct sockaddr *)&address, sizeof(address)) < 0)
{
ICHOR_LOG_ERROR(_logger, "connect error {}", errno);
if(_attempts++ >= 5) {
bool connected{};
while(!connected && connect(_socket, (struct sockaddr *)&address, sizeof(address)) < 0) {
ICHOR_LOG_ERROR(_logger, "[{}] connect error {}", _id, errno);
if(errno == EINPROGRESS) {
while(_attempts++ >= 5) {
pollfd pfd{};
pfd.fd = _socket;
pfd.events = POLLOUT;
ret = poll(&pfd, 1, static_cast<int>(_sendTimeout));

if(ret < 0) {
ICHOR_LOG_ERROR(_logger, "[{}] poll error {}", _id, errno);
continue;
}

// timeout
if(ret == 0) {
continue;
}

if(pfd.revents & POLLERR) {
ICHOR_LOG_ERROR(_logger, "[{}] POLLERR {} {} {}", _id, pfd.revents);
} else if(pfd.revents & POLLHUP) {
ICHOR_LOG_ERROR(_logger, "[{}] POLLHUP {} {} {}", _id, pfd.revents);
} else if(pfd.revents & POLLOUT) {
int connect_result{};
socklen_t result_len = sizeof(connect_result);
ret = getsockopt(_socket, SOL_SOCKET, SO_ERROR, &connect_result, &result_len);

if(ret < 0) {
throw std::runtime_error("getsocketopt error: Couldn't connect");
}

// connect failed, retry
if(connect_result < 0) {
break;
}
connected = true;
break;
}
}
} else if(errno == EALREADY) {
std::this_thread::sleep_for(std::chrono::microseconds(_sendTimeout));
} else {
_attempts++;
}

// we don't want to increment attempts in the EINPROGRESS case, but we do want to check it here
if(_attempts >= 5) {
throw std::runtime_error("Couldn't connect");
}
}

auto *ip = ::inet_ntoa(address.sin_addr);
ICHOR_LOG_TRACE(_logger, "Starting TCP connection for {}:{}", ip, ::ntohs(address.sin_port));
ICHOR_LOG_TRACE(_logger, "[{}] Starting TCP connection for {}:{}", _id, ip, ::ntohs(address.sin_port));
}

auto &timer = _timerFactory->createTimer();
timer.setChronoInterval(20ms);
timer.setCallback([this]() {
std::array<char, 1024> buf{};
auto ret = recv(_socket, buf.data(), buf.size(), 0);

if (ret == 0) {
return;
}

if(ret < 0) {
ICHOR_LOG_ERROR(_logger, "Error receiving from socket: {}", errno);
GetThreadLocalEventQueue().pushEvent<RecoverableErrorEvent>(getServiceId(), 4u, "Error receiving from socket. errno = " + std::to_string(errno));
return;
}

GetThreadLocalEventQueue().pushPrioritisedEvent<NetworkDataEvent>(getServiceId(), _priority, std::vector<uint8_t>{buf.data(), buf.data() + ret});
return;
_timer = &_timerFactory->createTimer();
_timer->setFireOnce(true);
_timer->setChronoInterval(20ms);
_timer->setCallback([this]() {
recvHandler();
});
timer.startTimer();
_timer->startTimer(true);

co_return {};
}
Expand All @@ -111,25 +174,35 @@ void Ichor::TcpConnectionService::addDependencyInstance(ILogger &logger, IServic
_logger = &logger;
}

void Ichor::TcpConnectionService::removeDependencyInstance(ILogger &logger, IService&) {
void Ichor::TcpConnectionService::removeDependencyInstance(ILogger &, IService&) {
_logger = nullptr;
}

void Ichor::TcpConnectionService::addDependencyInstance(ITimerFactory &factory, IService &) {
_timerFactory = &factory;
void Ichor::TcpConnectionService::addDependencyInstance(ITimerFactory &timerFactory, IService &) {
_timerFactory = &timerFactory;
}

void Ichor::TcpConnectionService::removeDependencyInstance(ITimerFactory &factory, IService&) {
void Ichor::TcpConnectionService::removeDependencyInstance(ITimerFactory &, IService&) {
_timerFactory = nullptr;
}

tl::expected<uint64_t, Ichor::SendErrorReason> Ichor::TcpConnectionService::sendAsync(std::vector<uint8_t> &&msg) {
auto id = ++_msgIdCounter;
size_t sent_bytes = 0;

if(_quit) {
ICHOR_LOG_TRACE(_logger, "[{}] quitting, no send", _id);
return tl::unexpected(SendErrorReason::QUITTING);
}

while(sent_bytes < msg.size()) {
auto ret = ::send(_socket, msg.data() + sent_bytes, msg.size() - sent_bytes, 0);

if(_quit) {
ICHOR_LOG_TRACE(_logger, "[{}] quitting mid-send", _id);
return tl::unexpected(SendErrorReason::QUITTING);
}

if(ret < 0) {
GetThreadLocalEventQueue().pushEvent<FailedSendMessageEvent>(getServiceId(), std::move(msg), id);
break;
Expand All @@ -149,4 +222,33 @@ uint64_t Ichor::TcpConnectionService::getPriority() {
return _priority;
}

void Ichor::TcpConnectionService::recvHandler() {
ScopeGuard sg{[this]() {
if(!_quit) {
_timer->startTimer();
} else {
ICHOR_LOG_TRACE(_logger, "[{}] quitting, no push", _id);
}
}};
std::array<char, 1024> buf{};
auto ret = recv(_socket, buf.data(), buf.size(), 0);

if (ret == 0) {
return;
}

if (_quit) {
ICHOR_LOG_TRACE(_logger, "[{}] quitting", _id);
return;
}

if(ret < 0) {
ICHOR_LOG_ERROR(_logger, "[{}] Error receiving from socket: {}", _id, errno);
GetThreadLocalEventQueue().pushEvent<RecoverableErrorEvent>(getServiceId(), 4u, "Error receiving from socket. errno = " + std::to_string(errno));
return;
}

GetThreadLocalEventQueue().pushPrioritisedEvent<NetworkDataEvent>(getServiceId(), _priority, std::vector<uint8_t>{buf.data(), buf.data() + ret});
}

#endif
Loading

0 comments on commit c2a49c8

Please sign in to comment.