diff --git a/evpp/Channel.h b/evpp/Channel.h index 17eb58b43..55e078bfb 100644 --- a/evpp/Channel.h +++ b/evpp/Channel.h @@ -13,6 +13,8 @@ namespace hv { +// Channel is a loop-bound wrapper around hio_t. +// The Channel address is stored in hio_context(io), so the object lifetime must cover all pending hio callbacks. class Channel { public: Channel(hio_t* io = NULL) { diff --git a/evpp/EventLoop.h b/evpp/EventLoop.h index e21ee1d33..5e7da9552 100644 --- a/evpp/EventLoop.h +++ b/evpp/EventLoop.h @@ -15,6 +15,8 @@ namespace hv { +// EventLoop is a loop-bound wrapper around hloop_t. +// When constructed with an external hloop_t, the caller remains responsible for that loop's lifetime. class EventLoop : public Status { public: @@ -104,6 +106,7 @@ class EventLoop : public Status { // setTimerInLoop thread-safe TimerID setTimerInLoop(int timeout_ms, TimerCallback cb, uint32_t repeat = INFINITE, TimerID timerID = INVALID_TIMER_ID) { + if (loop_ == NULL) return INVALID_TIMER_ID; if (timerID == INVALID_TIMER_ID) { timerID = generateTimerID(); } diff --git a/evpp/EventLoopThread.h b/evpp/EventLoopThread.h index 51b87b9cc..c876accaa 100644 --- a/evpp/EventLoopThread.h +++ b/evpp/EventLoopThread.h @@ -9,6 +9,7 @@ namespace hv { +// EventLoopThread owns a background thread running one EventLoop. class EventLoopThread : public Status { public: // Return 0 means OK, other failed. diff --git a/evpp/TcpClient.h b/evpp/TcpClient.h index aa3886526..fb6391211 100644 --- a/evpp/TcpClient.h +++ b/evpp/TcpClient.h @@ -11,6 +11,9 @@ namespace hv { template +// TcpClientEventLoopTmpl is a loop-bound wrapper around one outbound connection. +// When bound to an external EventLoopPtr, the caller must ensure the object is stopped and destroyed on the owner loop. +// For long-lived async usage, prefer heap allocation and use stop()/closesocket()/deleteInLoop() as the controlled teardown path. class TcpClientEventLoopTmpl { public: typedef std::shared_ptr TSocketChannelPtr; @@ -23,9 +26,11 @@ class TcpClientEventLoopTmpl { tls_setting = NULL; reconn_setting = NULL; unpack_setting = NULL; + reconn_timer_id = INVALID_TIMER_ID; } virtual ~TcpClientEventLoopTmpl() { + cancelReconnectTimer(); HV_FREE(tls_setting); HV_FREE(reconn_setting); HV_FREE(unpack_setting); @@ -36,6 +41,7 @@ class TcpClientEventLoopTmpl { } // delete thread-safe + // NOTE: This is intended for heap objects that need to be destroyed on the owner loop. void deleteInLoop() { loop_->runInLoop([this](){ delete this; @@ -104,6 +110,7 @@ class TcpClientEventLoopTmpl { } int startConnect() { + loop_->assertInLoopThread(); if (channel == NULL || channel->isClosed()) { int connfd = -1; if (reconn_setting && reconn_setting->cur_retry_cnt > 1) { @@ -172,12 +179,15 @@ class TcpClientEventLoopTmpl { } int startReconnect() { + loop_->assertInLoopThread(); if (!reconn_setting) return -1; if (!reconn_setting_can_retry(reconn_setting)) return -2; uint32_t delay = reconn_setting_calc_delay(reconn_setting); hlogi("reconnect... cnt=%d, delay=%d", reconn_setting->cur_retry_cnt, reconn_setting->cur_delay); - loop_->setTimeout(delay, [this](TimerID timerID){ - (void)(timerID); + reconn_timer_id = loop_->setTimeout(delay, [this](TimerID timerID){ + if (reconn_timer_id == timerID) { + reconn_timer_id = INVALID_TIMER_ID; + } startConnect(); }); return 0; @@ -223,6 +233,7 @@ class TcpClientEventLoopTmpl { void setReconnect(reconn_setting_t* setting) { if (setting == NULL) { + cancelReconnectTimer(); HV_FREE(reconn_setting); return; } @@ -265,7 +276,16 @@ class TcpClientEventLoopTmpl { std::function onWriteComplete; private: - EventLoopPtr loop_; + void cancelReconnectTimer() { + if (reconn_timer_id != INVALID_TIMER_ID) { + loop_->killTimer(reconn_timer_id); + reconn_timer_id = INVALID_TIMER_ID; + } + } + +private: + EventLoopPtr loop_; + TimerID reconn_timer_id; }; template @@ -297,6 +317,7 @@ class TcpClientTmpl : private EventLoopThread, public TcpClientEventLoopTmpl::closesocket(); if (is_loop_owner) { diff --git a/evpp/TcpClient_test.cpp b/evpp/TcpClient_test.cpp index 2ac096e13..c9ec30642 100644 --- a/evpp/TcpClient_test.cpp +++ b/evpp/TcpClient_test.cpp @@ -28,13 +28,13 @@ int main(int argc, char* argv[]) { remote_host = argv[2]; } - TcpClient cli; - int connfd = cli.createsocket(remote_port, remote_host); + auto cli = std::make_shared(); + int connfd = cli->createsocket(remote_port, remote_host); if (connfd < 0) { return -20; } printf("client connect to port %d, connfd=%d ...\n", remote_port, connfd); - cli.onConnection = [&cli](const SocketChannelPtr& channel) { + cli->onConnection = [cli](const SocketChannelPtr& channel) { std::string peeraddr = channel->peeraddr(); if (channel->isConnected()) { printf("connected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd()); @@ -54,11 +54,11 @@ int main(int argc, char* argv[]) { } else { printf("disconnected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd()); } - if (cli.isReconnect()) { - printf("reconnect cnt=%d, delay=%d\n", cli.reconn_setting->cur_retry_cnt, cli.reconn_setting->cur_delay); + if (cli->isReconnect()) { + printf("reconnect cnt=%d, delay=%d\n", cli->reconn_setting->cur_retry_cnt, cli->reconn_setting->cur_delay); } }; - cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) { + cli->onMessage = [](const SocketChannelPtr& channel, Buffer* buf) { printf("< %.*s\n", (int)buf->size(), (char*)buf->data()); }; @@ -69,27 +69,27 @@ int main(int argc, char* argv[]) { reconn.min_delay = 1000; reconn.max_delay = 10000; reconn.delay_policy = 2; - cli.setReconnect(&reconn); + cli->setReconnect(&reconn); #endif #if TEST_TLS - cli.withTLS(); + cli->withTLS(); #endif - cli.start(); + cli->start(); std::string str; while (std::getline(std::cin, str)) { if (str == "close") { - cli.closesocket(); + cli->closesocket(); } else if (str == "start") { - cli.start(); + cli->start(); } else if (str == "stop") { - cli.stop(); + cli->stop(true); break; } else { - if (!cli.isConnected()) break; - cli.send(str); + if (!cli->isConnected()) break; + cli->send(str); } } diff --git a/evpp/TcpServer.h b/evpp/TcpServer.h index df6a932c6..251660341 100644 --- a/evpp/TcpServer.h +++ b/evpp/TcpServer.h @@ -11,6 +11,8 @@ namespace hv { template +// TcpServerEventLoopTmpl is a loop-bound wrapper around one listening socket and its accepted channels. +// When an external EventLoopPtr is supplied, the caller remains responsible for owner-loop shutdown and destruction ordering. class TcpServerEventLoopTmpl { public: typedef std::shared_ptr TSocketChannelPtr; @@ -74,6 +76,7 @@ class TcpServerEventLoopTmpl { } int startAccept() { + acceptor_loop->assertInLoopThread(); if (listenfd < 0) { listenfd = createsocket(port, host.c_str()); if (listenfd < 0) { @@ -101,6 +104,7 @@ class TcpServerEventLoopTmpl { } int stopAccept() { + acceptor_loop->assertInLoopThread(); if (listenfd < 0) return -1; hloop_t* loop = acceptor_loop->loop(); if (loop == NULL) return -2; @@ -117,6 +121,7 @@ class TcpServerEventLoopTmpl { acceptor_loop->runInLoop(std::bind(&TcpServerEventLoopTmpl::startAccept, this)); } // stop thread-safe + // NOTE: When an external loop is supplied, this closes the listener but does not own that loop's lifetime. void stop(bool wait_threads_stopped = true) { closesocket(); if (worker_threads.threadNum() > 0) { @@ -173,6 +178,7 @@ class TcpServerEventLoopTmpl { return channels.size(); } + // NOTE: fn is executed while holding mutex_, so it must stay short and must not call server APIs that may lock channels again. int foreachChannel(std::function fn) { std::lock_guard locker(mutex_); for (auto& pair : channels) { @@ -194,16 +200,19 @@ class TcpServerEventLoopTmpl { private: static void newConnEvent(hio_t* connio) { + assert(connio != NULL); TcpServerEventLoopTmpl* server = (TcpServerEventLoopTmpl*)hevent_userdata(connio); + assert(server != NULL); + EventLoop* worker_loop = currentThreadEventLoop; + assert(worker_loop != NULL); if (server->connectionNum() >= server->max_connections) { + --worker_loop->connectionNum; hlogw("over max_connections"); hio_close(connio); return; } // NOTE: attach to worker loop - EventLoop* worker_loop = currentThreadEventLoop; - assert(worker_loop != NULL); hio_attach(worker_loop->loop(), connio); const TSocketChannelPtr& channel = server->addChannel(connio); @@ -229,7 +238,7 @@ class TcpServerEventLoopTmpl { server->onConnection(channel); } server->removeChannel(channel); - // NOTE: After removeChannel, channel may be destroyed, + // NOTE: After removeChannel, channel may be destroyed immediately, // so in this lambda function, no code should be added below. }; diff --git a/evpp/UdpClient.h b/evpp/UdpClient.h index e7bbb6e1c..c85c879cf 100644 --- a/evpp/UdpClient.h +++ b/evpp/UdpClient.h @@ -9,6 +9,8 @@ namespace hv { template +// UdpClientEventLoopTmpl is a loop-bound wrapper around one udp client socket. +// When used with an external EventLoopPtr, the caller must stop receiving and destroy the object on the owner loop. class UdpClientEventLoopTmpl { public: typedef std::shared_ptr TSocketChannelPtr; @@ -72,6 +74,7 @@ class UdpClientEventLoopTmpl { } int startRecv() { + loop_->assertInLoopThread(); if (channel == NULL || channel->isClosed()) { int sockfd = createsocket(remote_port, remote_host.c_str()); if (sockfd < 0) { @@ -179,6 +182,7 @@ class UdpClientTmpl : private EventLoopThread, public UdpClientEventLoopTmpl::closesocket(); if (is_loop_owner) { diff --git a/evpp/UdpClient_test.cpp b/evpp/UdpClient_test.cpp index 4ab91d711..a1775a128 100644 --- a/evpp/UdpClient_test.cpp +++ b/evpp/UdpClient_test.cpp @@ -25,36 +25,36 @@ int main(int argc, char* argv[]) { remote_host = argv[2]; } - UdpClient cli; - int sockfd = cli.createsocket(remote_port, remote_host); + auto cli = std::make_shared(); + int sockfd = cli->createsocket(remote_port, remote_host); if (sockfd < 0) { return -20; } printf("client sendto port %d, sockfd=%d ...\n", remote_port, sockfd); - cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) { + cli->onMessage = [](const SocketChannelPtr& channel, Buffer* buf) { printf("< %.*s\n", (int)buf->size(), (char*)buf->data()); }; - cli.start(); + cli->start(); // sendto(time) every 3s - cli.loop()->setInterval(3000, [&cli](TimerID timerID) { + cli->loop()->setInterval(3000, [cli](TimerID timerID) { char str[DATETIME_FMT_BUFLEN] = {0}; datetime_t dt = datetime_now(); datetime_fmt(&dt, str); - cli.sendto(str); + cli->sendto(str); }); std::string str; while (std::getline(std::cin, str)) { if (str == "close") { - cli.closesocket(); + cli->closesocket(); } else if (str == "start") { - cli.start(); + cli->start(); } else if (str == "stop") { - cli.stop(); + cli->stop(true); break; } else { - cli.sendto(str); + cli->sendto(str); } } diff --git a/evpp/UdpServer.h b/evpp/UdpServer.h index 798c8200d..f23d4a07f 100644 --- a/evpp/UdpServer.h +++ b/evpp/UdpServer.h @@ -9,6 +9,8 @@ namespace hv { template +// UdpServerEventLoopTmpl is a loop-bound wrapper around one udp server socket. +// When used with an external EventLoopPtr, stop receiving first and destroy the object on the owner loop. class UdpServerEventLoopTmpl { public: typedef std::shared_ptr TSocketChannelPtr; @@ -48,6 +50,7 @@ class UdpServerEventLoopTmpl { } int startRecv() { + loop_->assertInLoopThread(); if (channel == NULL || channel->isClosed()) { int bindfd = createsocket(port, host.c_str()); if (bindfd < 0) { @@ -153,6 +156,7 @@ class UdpServerTmpl : private EventLoopThread, public UdpServerEventLoopTmpl::closesocket(); if (is_loop_owner) { diff --git a/evpp/UdpServer_test.cpp b/evpp/UdpServer_test.cpp index 7e4fe4677..b7b079a15 100644 --- a/evpp/UdpServer_test.cpp +++ b/evpp/UdpServer_test.cpp @@ -20,30 +20,30 @@ int main(int argc, char* argv[]) { } int port = atoi(argv[1]); - UdpServer srv; - int bindfd = srv.createsocket(port); + auto srv = std::make_shared(); + int bindfd = srv->createsocket(port); if (bindfd < 0) { return -20; } printf("server bind on port %d, bindfd=%d ...\n", port, bindfd); - srv.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) { + srv->onMessage = [](const SocketChannelPtr& channel, Buffer* buf) { // echo printf("< %.*s\n", (int)buf->size(), (char*)buf->data()); channel->write(buf); }; - srv.start(); + srv->start(); std::string str; while (std::getline(std::cin, str)) { if (str == "close") { - srv.closesocket(); + srv->closesocket(); } else if (str == "start") { - srv.start(); + srv->start(); } else if (str == "stop") { - srv.stop(); + srv->stop(true); break; } else { - srv.sendto(str); + srv->sendto(str); } }