Skip to content
2 changes: 2 additions & 0 deletions evpp/Channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

namespace hv {

// Channel is a loop-bound wrapper around hio_t.
// The Channel address is stored in hio_context(io), so the object lifetime must cover all pending hio callbacks.
class Channel {
public:
Channel(hio_t* io = NULL) {
Expand Down
3 changes: 3 additions & 0 deletions evpp/EventLoop.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

namespace hv {

// EventLoop is a loop-bound wrapper around hloop_t.
// When constructed with an external hloop_t, the caller remains responsible for that loop's lifetime.
class EventLoop : public Status {
public:

Expand Down Expand Up @@ -104,6 +106,7 @@ class EventLoop : public Status {

// setTimerInLoop thread-safe
TimerID setTimerInLoop(int timeout_ms, TimerCallback cb, uint32_t repeat = INFINITE, TimerID timerID = INVALID_TIMER_ID) {
if (loop_ == NULL) return INVALID_TIMER_ID;
if (timerID == INVALID_TIMER_ID) {
timerID = generateTimerID();
}
Expand Down
1 change: 1 addition & 0 deletions evpp/EventLoopThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

namespace hv {

// EventLoopThread owns a background thread running one EventLoop.
class EventLoopThread : public Status {
public:
// Return 0 means OK, other failed.
Expand Down
27 changes: 24 additions & 3 deletions evpp/TcpClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
namespace hv {

template<class TSocketChannel = SocketChannel>
// TcpClientEventLoopTmpl is a loop-bound wrapper around one outbound connection.
// When bound to an external EventLoopPtr, the caller must ensure the object is stopped and destroyed on the owner loop.
// For long-lived async usage, prefer heap allocation and use stop()/closesocket()/deleteInLoop() as the controlled teardown path.
class TcpClientEventLoopTmpl {
public:
typedef std::shared_ptr<TSocketChannel> TSocketChannelPtr;
Expand All @@ -23,9 +26,11 @@ class TcpClientEventLoopTmpl {
tls_setting = NULL;
reconn_setting = NULL;
unpack_setting = NULL;
reconn_timer_id = INVALID_TIMER_ID;
}

virtual ~TcpClientEventLoopTmpl() {
cancelReconnectTimer();
HV_FREE(tls_setting);
HV_FREE(reconn_setting);
HV_FREE(unpack_setting);
Expand All @@ -36,6 +41,7 @@ class TcpClientEventLoopTmpl {
}

// delete thread-safe
// NOTE: This is intended for heap objects that need to be destroyed on the owner loop.
void deleteInLoop() {
loop_->runInLoop([this](){
delete this;
Expand Down Expand Up @@ -104,6 +110,7 @@ class TcpClientEventLoopTmpl {
}

int startConnect() {
loop_->assertInLoopThread();
if (channel == NULL || channel->isClosed()) {
int connfd = -1;
if (reconn_setting && reconn_setting->cur_retry_cnt > 1) {
Expand Down Expand Up @@ -172,12 +179,15 @@ class TcpClientEventLoopTmpl {
}

int startReconnect() {
loop_->assertInLoopThread();
if (!reconn_setting) return -1;
if (!reconn_setting_can_retry(reconn_setting)) return -2;
uint32_t delay = reconn_setting_calc_delay(reconn_setting);
hlogi("reconnect... cnt=%d, delay=%d", reconn_setting->cur_retry_cnt, reconn_setting->cur_delay);
loop_->setTimeout(delay, [this](TimerID timerID){
(void)(timerID);
reconn_timer_id = loop_->setTimeout(delay, [this](TimerID timerID){
if (reconn_timer_id == timerID) {
reconn_timer_id = INVALID_TIMER_ID;
}
Comment thread
ithewei marked this conversation as resolved.
startConnect();
});
return 0;
Expand Down Expand Up @@ -223,6 +233,7 @@ class TcpClientEventLoopTmpl {

void setReconnect(reconn_setting_t* setting) {
if (setting == NULL) {
cancelReconnectTimer();
HV_FREE(reconn_setting);
return;
}
Expand Down Expand Up @@ -265,7 +276,16 @@ class TcpClientEventLoopTmpl {
std::function<void(const TSocketChannelPtr&, Buffer*)> onWriteComplete;

private:
EventLoopPtr loop_;
void cancelReconnectTimer() {
if (reconn_timer_id != INVALID_TIMER_ID) {
loop_->killTimer(reconn_timer_id);
reconn_timer_id = INVALID_TIMER_ID;
}
}

private:
EventLoopPtr loop_;
TimerID reconn_timer_id;
};

template<class TSocketChannel = SocketChannel>
Expand Down Expand Up @@ -297,6 +317,7 @@ class TcpClientTmpl : private EventLoopThread, public TcpClientEventLoopTmpl<TSo
}

// stop thread-safe
// NOTE: When constructed with an external loop, this only closes the socket and does not stop that loop.
void stop(bool wait_threads_stopped = true) {
TcpClientEventLoopTmpl<TSocketChannel>::closesocket();
if (is_loop_owner) {
Expand Down
28 changes: 14 additions & 14 deletions evpp/TcpClient_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ int main(int argc, char* argv[]) {
remote_host = argv[2];
}

TcpClient cli;
int connfd = cli.createsocket(remote_port, remote_host);
auto cli = std::make_shared<TcpClient>();
int connfd = cli->createsocket(remote_port, remote_host);
if (connfd < 0) {
return -20;
}
printf("client connect to port %d, connfd=%d ...\n", remote_port, connfd);
cli.onConnection = [&cli](const SocketChannelPtr& channel) {
cli->onConnection = [cli](const SocketChannelPtr& channel) {
std::string peeraddr = channel->peeraddr();
if (channel->isConnected()) {
printf("connected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
Expand All @@ -54,11 +54,11 @@ int main(int argc, char* argv[]) {
} else {
printf("disconnected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
}
if (cli.isReconnect()) {
printf("reconnect cnt=%d, delay=%d\n", cli.reconn_setting->cur_retry_cnt, cli.reconn_setting->cur_delay);
if (cli->isReconnect()) {
printf("reconnect cnt=%d, delay=%d\n", cli->reconn_setting->cur_retry_cnt, cli->reconn_setting->cur_delay);
}
};
cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
cli->onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
};

Expand All @@ -69,27 +69,27 @@ int main(int argc, char* argv[]) {
reconn.min_delay = 1000;
reconn.max_delay = 10000;
reconn.delay_policy = 2;
cli.setReconnect(&reconn);
cli->setReconnect(&reconn);
#endif

#if TEST_TLS
cli.withTLS();
cli->withTLS();
#endif

cli.start();
cli->start();

std::string str;
while (std::getline(std::cin, str)) {
if (str == "close") {
cli.closesocket();
cli->closesocket();
} else if (str == "start") {
cli.start();
cli->start();
} else if (str == "stop") {
cli.stop();
cli->stop(true);
break;
} else {
if (!cli.isConnected()) break;
cli.send(str);
if (!cli->isConnected()) break;
cli->send(str);
}
}

Expand Down
15 changes: 12 additions & 3 deletions evpp/TcpServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
namespace hv {

template<class TSocketChannel = SocketChannel>
// TcpServerEventLoopTmpl is a loop-bound wrapper around one listening socket and its accepted channels.
// When an external EventLoopPtr is supplied, the caller remains responsible for owner-loop shutdown and destruction ordering.
class TcpServerEventLoopTmpl {
public:
typedef std::shared_ptr<TSocketChannel> TSocketChannelPtr;
Expand Down Expand Up @@ -74,6 +76,7 @@ class TcpServerEventLoopTmpl {
}

int startAccept() {
acceptor_loop->assertInLoopThread();
if (listenfd < 0) {
listenfd = createsocket(port, host.c_str());
if (listenfd < 0) {
Expand Down Expand Up @@ -101,6 +104,7 @@ class TcpServerEventLoopTmpl {
}

int stopAccept() {
acceptor_loop->assertInLoopThread();
if (listenfd < 0) return -1;
hloop_t* loop = acceptor_loop->loop();
if (loop == NULL) return -2;
Expand All @@ -117,6 +121,7 @@ class TcpServerEventLoopTmpl {
acceptor_loop->runInLoop(std::bind(&TcpServerEventLoopTmpl::startAccept, this));
}
// stop thread-safe
// NOTE: When an external loop is supplied, this closes the listener but does not own that loop's lifetime.
void stop(bool wait_threads_stopped = true) {
closesocket();
if (worker_threads.threadNum() > 0) {
Expand Down Expand Up @@ -173,6 +178,7 @@ class TcpServerEventLoopTmpl {
return channels.size();
}

// NOTE: fn is executed while holding mutex_, so it must stay short and must not call server APIs that may lock channels again.
int foreachChannel(std::function<void(const TSocketChannelPtr& channel)> fn) {
std::lock_guard<std::mutex> locker(mutex_);
for (auto& pair : channels) {
Expand All @@ -194,16 +200,19 @@ class TcpServerEventLoopTmpl {

private:
static void newConnEvent(hio_t* connio) {
assert(connio != NULL);
TcpServerEventLoopTmpl* server = (TcpServerEventLoopTmpl*)hevent_userdata(connio);
assert(server != NULL);
EventLoop* worker_loop = currentThreadEventLoop;
assert(worker_loop != NULL);
if (server->connectionNum() >= server->max_connections) {
--worker_loop->connectionNum;
hlogw("over max_connections");
hio_close(connio);
return;
}

// NOTE: attach to worker loop
EventLoop* worker_loop = currentThreadEventLoop;
assert(worker_loop != NULL);
hio_attach(worker_loop->loop(), connio);

const TSocketChannelPtr& channel = server->addChannel(connio);
Expand All @@ -229,7 +238,7 @@ class TcpServerEventLoopTmpl {
server->onConnection(channel);
}
server->removeChannel(channel);
// NOTE: After removeChannel, channel may be destroyed,
// NOTE: After removeChannel, channel may be destroyed immediately,
// so in this lambda function, no code should be added below.
};

Expand Down
4 changes: 4 additions & 0 deletions evpp/UdpClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
namespace hv {

template<class TSocketChannel = SocketChannel>
// UdpClientEventLoopTmpl is a loop-bound wrapper around one udp client socket.
// When used with an external EventLoopPtr, the caller must stop receiving and destroy the object on the owner loop.
class UdpClientEventLoopTmpl {
public:
typedef std::shared_ptr<TSocketChannel> TSocketChannelPtr;
Expand Down Expand Up @@ -72,6 +74,7 @@ class UdpClientEventLoopTmpl {
}

int startRecv() {
loop_->assertInLoopThread();
if (channel == NULL || channel->isClosed()) {
int sockfd = createsocket(remote_port, remote_host.c_str());
if (sockfd < 0) {
Expand Down Expand Up @@ -179,6 +182,7 @@ class UdpClientTmpl : private EventLoopThread, public UdpClientEventLoopTmpl<TSo
}

// stop thread-safe
// NOTE: When constructed with an external loop, this closes the socket but does not stop that loop.
void stop(bool wait_threads_stopped = true) {
UdpClientEventLoopTmpl<TSocketChannel>::closesocket();
if (is_loop_owner) {
Expand Down
20 changes: 10 additions & 10 deletions evpp/UdpClient_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,36 @@ int main(int argc, char* argv[]) {
remote_host = argv[2];
}

UdpClient cli;
int sockfd = cli.createsocket(remote_port, remote_host);
auto cli = std::make_shared<UdpClient>();
int sockfd = cli->createsocket(remote_port, remote_host);
if (sockfd < 0) {
return -20;
}
printf("client sendto port %d, sockfd=%d ...\n", remote_port, sockfd);
cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
cli->onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
};
cli.start();
cli->start();

// sendto(time) every 3s
cli.loop()->setInterval(3000, [&cli](TimerID timerID) {
cli->loop()->setInterval(3000, [cli](TimerID timerID) {
char str[DATETIME_FMT_BUFLEN] = {0};
datetime_t dt = datetime_now();
datetime_fmt(&dt, str);
cli.sendto(str);
cli->sendto(str);
});

std::string str;
while (std::getline(std::cin, str)) {
if (str == "close") {
cli.closesocket();
cli->closesocket();
} else if (str == "start") {
cli.start();
cli->start();
} else if (str == "stop") {
cli.stop();
cli->stop(true);
break;
} else {
cli.sendto(str);
cli->sendto(str);
}
}

Expand Down
4 changes: 4 additions & 0 deletions evpp/UdpServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
namespace hv {

template<class TSocketChannel = SocketChannel>
// UdpServerEventLoopTmpl is a loop-bound wrapper around one udp server socket.
// When used with an external EventLoopPtr, stop receiving first and destroy the object on the owner loop.
class UdpServerEventLoopTmpl {
public:
typedef std::shared_ptr<TSocketChannel> TSocketChannelPtr;
Expand Down Expand Up @@ -48,6 +50,7 @@ class UdpServerEventLoopTmpl {
}

int startRecv() {
loop_->assertInLoopThread();
if (channel == NULL || channel->isClosed()) {
int bindfd = createsocket(port, host.c_str());
if (bindfd < 0) {
Expand Down Expand Up @@ -153,6 +156,7 @@ class UdpServerTmpl : private EventLoopThread, public UdpServerEventLoopTmpl<TSo
}

// stop thread-safe
// NOTE: When constructed with an external loop, this closes the socket but does not stop that loop.
void stop(bool wait_threads_stopped = true) {
UdpServerEventLoopTmpl<TSocketChannel>::closesocket();
if (is_loop_owner) {
Expand Down
16 changes: 8 additions & 8 deletions evpp/UdpServer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,30 @@ int main(int argc, char* argv[]) {
}
int port = atoi(argv[1]);

UdpServer srv;
int bindfd = srv.createsocket(port);
auto srv = std::make_shared<UdpServer>();
int bindfd = srv->createsocket(port);
if (bindfd < 0) {
return -20;
}
printf("server bind on port %d, bindfd=%d ...\n", port, bindfd);
srv.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
srv->onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
// echo
printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
channel->write(buf);
};
srv.start();
srv->start();

std::string str;
while (std::getline(std::cin, str)) {
if (str == "close") {
srv.closesocket();
srv->closesocket();
} else if (str == "start") {
srv.start();
srv->start();
} else if (str == "stop") {
srv.stop();
srv->stop(true);
break;
} else {
srv.sendto(str);
srv->sendto(str);
}
}

Expand Down
Loading