From 8b2ac17c0ca08b9ef37fde2de9f0322cd3db627a Mon Sep 17 00:00:00 2001 From: mrabine Date: Fri, 15 May 2026 17:32:30 +0200 Subject: [PATCH 1/5] Add a proactor-based asynchronous acceptor --- core/CMakeLists.txt | 1 + core/include/join/acceptor.hpp | 214 +++++++++------ core/include/join/asyncacceptor.hpp | 247 +++++++++++++++++ core/include/join/protocol.hpp | 4 + core/include/join/socket.hpp | 3 + core/tests/CMakeLists.txt | 20 +- core/tests/tcpasyncacceptor_test.cpp | 253 ++++++++++++++++++ ...cceptor_test.cpp => unixacceptor_test.cpp} | 0 core/tests/unixasyncacceptor_test.cpp | 248 +++++++++++++++++ 9 files changed, 904 insertions(+), 86 deletions(-) create mode 100644 core/include/join/asyncacceptor.hpp create mode 100644 core/tests/tcpasyncacceptor_test.cpp rename core/tests/{unixstreamacceptor_test.cpp => unixacceptor_test.cpp} (100%) create mode 100644 core/tests/unixasyncacceptor_test.cpp diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 1351cf39..53bec347 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -29,6 +29,7 @@ set(PUBLIC_HEADERS include/join/socket.hpp include/join/socketstream.hpp include/join/acceptor.hpp + include/join/asyncacceptor.hpp include/join/openssl.hpp include/join/cpu.hpp include/join/clock.hpp diff --git a/core/include/join/acceptor.hpp b/core/include/join/acceptor.hpp index 23ad597e..04fa31c8 100644 --- a/core/include/join/acceptor.hpp +++ b/core/include/join/acceptor.hpp @@ -31,39 +31,37 @@ namespace join { /** - * @brief basic stream acceptor class. + * @brief basic acceptor class. */ template - class BasicStreamAcceptor + class BasicAcceptor { public: using Endpoint = typename Protocol::Endpoint; - using Socket = typename Protocol::Socket; - using Stream = typename Protocol::Stream; /** * @brief create the acceptor instance. */ - BasicStreamAcceptor () = default; + BasicAcceptor () = default; /** * @brief copy constructor. * @param other other object to copy. */ - BasicStreamAcceptor (const BasicStreamAcceptor& other) = delete; + BasicAcceptor (const BasicAcceptor& other) = delete; /** * @brief copy assignment operator. * @param other other object to assign. * @return assigned object. */ - BasicStreamAcceptor& operator= (const BasicStreamAcceptor& other) = delete; + BasicAcceptor& operator= (const BasicAcceptor& other) = delete; /** * @brief move constructor. * @param other other object to move. */ - BasicStreamAcceptor (BasicStreamAcceptor&& other) + BasicAcceptor (BasicAcceptor&& other) : _handle (other._handle) , _protocol (other._protocol) { @@ -76,7 +74,7 @@ namespace join * @param other other object to assign. * @return assigned object. */ - BasicStreamAcceptor& operator= (BasicStreamAcceptor&& other) + BasicAcceptor& operator= (BasicAcceptor&& other) { this->close (); @@ -92,9 +90,9 @@ namespace join /** * @brief destroy instance. */ - virtual ~BasicStreamAcceptor () + virtual ~BasicAcceptor () { - this->close (); + close (); } /** @@ -104,15 +102,15 @@ namespace join */ virtual int create (const Endpoint& endpoint) noexcept { - if (this->opened ()) + if (opened ()) { lastError = make_error_code (Errc::InUse); return -1; } - this->_handle = ::socket (endpoint.protocol ().family (), endpoint.protocol ().type () | SOCK_CLOEXEC, - endpoint.protocol ().protocol ()); - if (this->_handle == -1) + _handle = ::socket (endpoint.protocol ().family (), endpoint.protocol ().type () | SOCK_CLOEXEC, + endpoint.protocol ().protocol ()); + if (_handle == -1) { lastError = std::error_code (errno, std::generic_category ()); this->close (); @@ -123,7 +121,7 @@ namespace join { int off = 0; - if (::setsockopt (this->_handle, IPPROTO_IPV6, IPV6_V6ONLY, &off, sizeof (off)) == -1) + if (::setsockopt (_handle, IPPROTO_IPV6, IPV6_V6ONLY, &off, sizeof (off)) == -1) { lastError = std::error_code (errno, std::generic_category ()); this->close (); @@ -139,7 +137,7 @@ namespace join { int on = 1; - if (::setsockopt (this->_handle, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on)) == -1) + if (::setsockopt (_handle, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on)) == -1) { lastError = std::error_code (errno, std::generic_category ()); this->close (); @@ -147,15 +145,14 @@ namespace join } } - if ((::bind (this->_handle, endpoint.addr (), endpoint.length ()) == -1) || - (::listen (this->_handle, SOMAXCONN) == -1)) + if ((::bind (_handle, endpoint.addr (), endpoint.length ()) == -1) || (::listen (_handle, SOMAXCONN) == -1)) { lastError = std::error_code (errno, std::generic_category ()); this->close (); return -1; } - this->_protocol = endpoint.protocol (); + _protocol = endpoint.protocol (); return 0; } @@ -165,65 +162,25 @@ namespace join */ virtual void close () noexcept { - if (this->_handle != -1) - { - ::close (this->_handle); - this->_handle = -1; - } - - this->_protocol = Protocol (); - } - - /** - * @brief accept new connection and fill in the client object with connection parameters. - * @return the accepted client socket object. - */ - virtual Socket accept () const - { - struct sockaddr_storage sa; - socklen_t sa_len = sizeof (struct sockaddr_storage); - Socket sock; - - sock._handle = ::accept (this->_handle, reinterpret_cast (&sa), &sa_len); - if (sock._handle == -1) - { - lastError = std::error_code (errno, std::generic_category ()); - return sock; - } - - sock._remote = Endpoint (reinterpret_cast (&sa), sa_len); - sock._state = Socket::Connected; - - if (sock.protocol () == IPPROTO_TCP) + if (_handle != -1) { - sock.setOption (Socket::NoDelay, 1); + ::close (_handle); + _handle = -1; } - sock.setMode (Socket::NonBlocking); - return sock; - } - - /** - * @brief accept new connection and fill in the client object with connection parameters. - * @return The client stream object on success, nullptr on failure. - */ - virtual Stream acceptStream () const - { - Stream stream; - stream.socket () = this->accept (); - return stream; + _protocol = Protocol (); } /** * @brief determine the local endpoint associated with this socket. * @return local endpoint. */ - Endpoint localEndpoint () const + Endpoint localEndpoint () const noexcept { struct sockaddr_storage sa; socklen_t sa_len = sizeof (struct sockaddr_storage); - if (::getsockname (this->_handle, reinterpret_cast (&sa), &sa_len) == -1) + if (::getsockname (_handle, reinterpret_cast (&sa), &sa_len) == -1) { lastError = std::error_code (errno, std::generic_category ()); return {}; @@ -238,7 +195,7 @@ namespace join */ bool opened () const noexcept { - return (this->_handle != -1); + return (_handle != -1); } /** @@ -247,7 +204,7 @@ namespace join */ int family () const noexcept { - return this->_protocol.family (); + return _protocol.family (); } /** @@ -256,7 +213,7 @@ namespace join */ int type () const noexcept { - return this->_protocol.type (); + return _protocol.type (); } /** @@ -265,7 +222,7 @@ namespace join */ int protocol () const noexcept { - return this->_protocol.protocol (); + return _protocol.protocol (); } /** @@ -274,7 +231,7 @@ namespace join */ int handle () const noexcept { - return this->_handle; + return _handle; } protected: @@ -285,6 +242,101 @@ namespace join Protocol _protocol; }; + /** + * @brief basic stream acceptor class. + */ + template + class BasicStreamAcceptor : public BasicAcceptor + { + public: + using Endpoint = typename Protocol::Endpoint; + using Socket = typename Protocol::Socket; + using Stream = typename Protocol::Stream; + + /** + * @brief create the acceptor instance. + */ + BasicStreamAcceptor () = default; + + /** + * @brief copy constructor. + * @param other other object to copy. + */ + BasicStreamAcceptor (const BasicStreamAcceptor& other) = delete; + + /** + * @brief copy assignment operator. + * @param other other object to assign. + * @return assigned object. + */ + BasicStreamAcceptor& operator= (const BasicStreamAcceptor& other) = delete; + + /** + * @brief move constructor. + * @param other other object to move. + */ + BasicStreamAcceptor (BasicStreamAcceptor&& other) + : BasicAcceptor (std::move (other)) + { + } + + /** + * @brief move assignment operator. + * @param other other object to assign. + * @return assigned object. + */ + BasicStreamAcceptor& operator= (BasicStreamAcceptor&& other) + { + BasicAcceptor::operator= (std::move (other)); + return *this; + } + + /** + * @brief destroy instance. + */ + virtual ~BasicStreamAcceptor () = default; + + /** + * @brief accept new connection and fill in the client object with connection parameters. + * @return the accepted client socket object. + */ + virtual Socket accept () const noexcept + { + struct sockaddr_storage sa; + socklen_t sa_len = sizeof (struct sockaddr_storage); + Socket sock; + + sock._handle = ::accept (this->_handle, reinterpret_cast (&sa), &sa_len); + if (sock._handle == -1) + { + lastError = std::error_code (errno, std::generic_category ()); + return sock; + } + + sock._remote = Endpoint (reinterpret_cast (&sa), sa_len); + sock._state = Socket::Connected; + + if (sock.protocol () == IPPROTO_TCP) + { + sock.setOption (Socket::NoDelay, 1); + } + sock.setMode (Socket::NonBlocking); + + return sock; + } + + /** + * @brief accept new connection and fill in the client object with connection parameters. + * @return The client stream object on success, nullptr on failure. + */ + virtual Stream acceptStream () const noexcept + { + Stream stream; + stream.socket () = this->accept (); + return stream; + } + }; + /** * @brief basic TLS acceptor class. */ @@ -405,7 +457,7 @@ namespace join * @brief accept new connection and fill in the client object with connection parameters. * @return the accepted client socket object. */ - virtual Socket accept () const override + Socket accept () const noexcept override { struct sockaddr_storage sa; socklen_t sa_len = sizeof (struct sockaddr_storage); @@ -432,7 +484,7 @@ namespace join * @brief accept new connection and fill in the client object with connection parameters. * @return the accepted client socket object. */ - virtual Socket acceptEncrypted () const + Socket acceptEncrypted () const noexcept { Socket sock = this->accept (); if (!sock.connected ()) @@ -464,7 +516,7 @@ namespace join * @brief accept new connection and fill in the client object with connection parameters. * @return The client stream object on success, nullptr on failure. */ - virtual Stream acceptStreamEncrypted () const + Stream acceptStreamEncrypted () const noexcept { Stream stream; stream.socket () = this->acceptEncrypted (); @@ -477,7 +529,7 @@ namespace join * @param key private key path. * @return 0 on success, -1 on failure. */ - int setCertificate (const std::string& cert, const std::string& key = "") + int setCertificate (const std::string& cert, const std::string& key = "") noexcept { if (SSL_CTX_use_certificate_file (this->_tlsContext.get (), cert.c_str (), SSL_FILETYPE_PEM) == 0) { @@ -510,7 +562,7 @@ namespace join * @param caFile path of the trusted CA certificate file. * @return 0 on success, -1 on failure. */ - int setCaCertificate (const std::string& caFile) + int setCaCertificate (const std::string& caFile) noexcept { join::StackOfX509NamePtr certNames (SSL_load_client_CA_file (caFile.c_str ())); if (certNames == nullptr) @@ -530,7 +582,7 @@ namespace join * @param verify Enable peer verification if set to true, false otherwise. * @param depth The maximum certificate verification depth (default: no limit). */ - void setVerify (bool verify, int depth = -1) + void setVerify (bool verify, int depth = -1) noexcept { if (verify) { @@ -551,7 +603,7 @@ namespace join * @param cipher the cipher list. * @return 0 on success, -1 on failure. */ - int setCipher (const std::string& cipher) + int setCipher (const std::string& cipher) noexcept { if (SSL_CTX_set_cipher_list (this->_tlsContext.get (), cipher.c_str ()) == 0) { @@ -567,7 +619,7 @@ namespace join * @param cipher the cipher list. * @return 0 on success, -1 on failure. */ - int setCipher_1_3 (const std::string& cipher) + int setCipher_1_3 (const std::string& cipher) noexcept { if (SSL_CTX_set_ciphersuites (this->_tlsContext.get (), cipher.c_str ()) == 0) { @@ -584,7 +636,7 @@ namespace join * @param curves curve list. * @return 0 on success, -1 on failure. */ - int setCurve (const std::string& curves) + int setCurve (const std::string& curves) noexcept { if (SSL_CTX_set1_groups_list (this->_tlsContext.get (), curves.c_str ()) == 0) { @@ -602,7 +654,7 @@ namespace join * @note random Diffie-Hellman parameters generated using the command "openssl dhparam -C 2236". * @return Diffie-Hellman parameters. */ - static DH* getDh2236 () + static DH* getDh2236 () noexcept { static unsigned char dhp_2236[] = { 0x0C, 0xA5, 0x51, 0x2B, 0x8F, 0xF7, 0xA8, 0x74, 0x4D, 0x52, 0xD7, 0xED, 0x97, 0x83, 0xA4, 0xD2, 0x8B, diff --git a/core/include/join/asyncacceptor.hpp b/core/include/join/asyncacceptor.hpp new file mode 100644 index 00000000..2b725c20 --- /dev/null +++ b/core/include/join/asyncacceptor.hpp @@ -0,0 +1,247 @@ +/** + * MIT License + * + * Copyright (c) 2026 Mathieu Rabine + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#ifndef JOIN_CORE_ASYNCACCEPTOR_HPP +#define JOIN_CORE_ASYNCACCEPTOR_HPP + +// libjoin. +#include +#include + +namespace join +{ + /** + * @brief asynchronous stream acceptor class. + */ + template + class BasicAsyncStreamAcceptor : public BasicAcceptor, public CompletionHandler + { + public: + using Endpoint = typename Protocol::Endpoint; + using Socket = typename Protocol::Socket; + using Stream = typename Protocol::Stream; + using AcceptCb = void (*) (Socket&&, void*); + + /** + * @brief create the asynchronous acceptor instance. + * @param proactor event loop proactor. + */ + explicit BasicAsyncStreamAcceptor (Proactor& proactor = ProactorThread::proactor ()) + : BasicAcceptor () + , _proactor (proactor) + { + } + + /** + * @brief copy constructor. + * @param other other object to copy. + */ + BasicAsyncStreamAcceptor (const BasicAsyncStreamAcceptor& other) = delete; + + /** + * @brief copy assignment operator. + * @param other other object to assign. + * @return assigned object. + */ + BasicAsyncStreamAcceptor& operator= (const BasicAsyncStreamAcceptor& other) = delete; + + /** + * @brief move constructor. + * @param other other object to move. + */ + BasicAsyncStreamAcceptor (BasicAsyncStreamAcceptor&& other) = delete; + + /** + * @brief move assignment operator. + * @param other other object to assign. + * @return assigned object. + */ + BasicAsyncStreamAcceptor& operator= (BasicAsyncStreamAcceptor&& other) = delete; + + /** + * @brief destroy instance. + */ + virtual ~BasicAsyncStreamAcceptor () = default; + + /** + * @brief create acceptor + * @param endpoint endpoint to assign to the acceptor. + * @return 0 on success, -1 on failure. + */ + int create (const Endpoint& endpoint) noexcept override + { + if (BasicAcceptor::create (endpoint) == -1) + { + return -1; + } + + if (::fcntl (this->_handle, F_SETFL, ::fcntl (this->_handle, F_GETFL) | O_NONBLOCK) == -1) + { + // LCOV_EXCL_START + lastError = std::error_code (errno, std::generic_category ()); + this->close (); + return -1; + // LCOV_EXCL_STOP + } + + return 0; + } + + /** + * @brief close acceptor. + */ + void close () noexcept override + { + cancelAccept (); + BasicAcceptor::close (); + } + + /** + * @brief submit an asynchronous accept on the listening socket. + * @param callback completion callback. + * @param ctx opaque user context forwarded verbatim to the callback. + * @return 0 on successful submission, -1 on error. + */ + int accept (AcceptCb callback, void* ctx = nullptr) noexcept + { + if (JOIN_UNLIKELY (!callback)) + { + lastError = make_error_code (Errc::InvalidParam); + return -1; + } + + if (JOIN_UNLIKELY (_op.state != IoOperation::State::Idle)) + { + lastError = make_error_code (Errc::InUse); + return -1; + } + + _peerAddr = {}; + _peerAddrLen = sizeof (_peerAddr); + _callback = callback; + _ctx = ctx; + _op = IoOperation::makeAccept (this->_handle, reinterpret_cast (&_peerAddr), &_peerAddrLen, + SOCK_NONBLOCK | SOCK_CLOEXEC, this); + + if (JOIN_UNLIKELY (_proactor.submit (&_op) == -1)) + { + _callback = nullptr; + _ctx = nullptr; + _op = IoOperation{}; + return -1; + } + + return 0; + } + + /** + * @brief cancel a pending asynchronous accept, if any. + * @return 0 on success or if no operation was pending, -1 on error. + */ + int cancelAccept () noexcept + { + if (_op.state == IoOperation::State::Idle) + { + return 0; + } + + return _proactor.cancel (&_op); + } + + protected: + /** + * @brief method called when an operation completes successfully. + * @param op completed operation. + * @param result number of bytes transferred, or operation-specific value. + */ + void onComplete ([[maybe_unused]] IoOperation* op, int result) override + { + AcceptCb cb = _callback; + void* ctx = _ctx; + _callback = nullptr; + _ctx = nullptr; + + Socket sock; + + if (JOIN_LIKELY (result >= 0)) + { + sock._handle = result; + sock._remote = Endpoint (reinterpret_cast (&_peerAddr), _peerAddrLen); + sock._state = Socket::Connected; + + if (sock.protocol () == IPPROTO_TCP) + { + sock.setOption (Socket::NoDelay, 1); + } + } + else + { + lastError = std::error_code (-result, std::generic_category ()); // LCOV_EXCL_LINE + } + + if (cb) + { + cb (std::move (sock), ctx); + } + } + + /** + * @brief method called when an operation is cancelled. + * @param op cancelled operation. + * @param result -ECANCELED, or other negative errno on failure. + */ + void onCancel ([[maybe_unused]] IoOperation* op, [[maybe_unused]] int result) override + { + AcceptCb cb = _callback; + void* ctx = _ctx; + _callback = nullptr; + _ctx = nullptr; + + if (cb) + { + cb (Socket{}, ctx); + } + } + + /// proactor. + Proactor& _proactor; + + /// In-flight IoOperation. + IoOperation _op{}; + + /// Storage for the peer address. + sockaddr_storage _peerAddr{}; + + /// Length of the peer address. + socklen_t _peerAddrLen = 0; + + /// Pending completion callback. + AcceptCb _callback = nullptr; + + /// Opaque user context forwarded verbatim to the callback. + void* _ctx = nullptr; + }; +} + +#endif diff --git a/core/include/join/protocol.hpp b/core/include/join/protocol.hpp index ee2d53f3..59fa8d0f 100644 --- a/core/include/join/protocol.hpp +++ b/core/include/join/protocol.hpp @@ -51,6 +51,8 @@ namespace join template class BasicStreamAcceptor; template + class BasicAsyncStreamAcceptor; + template class BasicTlsAcceptor; template @@ -132,6 +134,7 @@ namespace join using Socket = BasicStreamSocket; using Stream = BasicSocketStream; using Acceptor = BasicStreamAcceptor; + using AsyncAcceptor = BasicAsyncStreamAcceptor; /** * @brief construct the unix stream protocol instance by default. @@ -503,6 +506,7 @@ namespace join using Socket = BasicStreamSocket; using Stream = BasicSocketStream; using Acceptor = BasicStreamAcceptor; + using AsyncAcceptor = BasicAsyncStreamAcceptor; /** * @brief create the tcp protocol instance. diff --git a/core/include/join/socket.hpp b/core/include/join/socket.hpp index 4d401318..6b740dcb 100644 --- a/core/include/join/socket.hpp +++ b/core/include/join/socket.hpp @@ -1480,6 +1480,9 @@ namespace join /// friendship with basic stream acceptor friend class BasicStreamAcceptor; + + /// friendship with basic asynchronous stream acceptor + friend class BasicAsyncStreamAcceptor; }; /** diff --git a/core/tests/CMakeLists.txt b/core/tests/CMakeLists.txt index 23aacee9..da9d1ace 100644 --- a/core/tests/CMakeLists.txt +++ b/core/tests/CMakeLists.txt @@ -182,6 +182,16 @@ target_link_libraries(unixdgramsocket.gtest ${JOIN_CORE} GTest::gtest_main) add_test(NAME unixdgramsocket.gtest COMMAND unixdgramsocket.gtest) install(TARGETS unixdgramsocket.gtest RUNTIME DESTINATION ${CMAKE_INSTALL_DATADIR}/${PROJECT_NAME}/test) +add_executable(unixacceptor.gtest unixacceptor_test.cpp) +target_link_libraries(unixacceptor.gtest ${JOIN_CORE} GTest::gtest_main) +add_test(NAME unixacceptor.gtest COMMAND unixacceptor.gtest) +install(TARGETS unixacceptor.gtest RUNTIME DESTINATION ${CMAKE_INSTALL_DATADIR}/${PROJECT_NAME}/test) + +add_executable(unixasyncacceptor.gtest unixasyncacceptor_test.cpp) +target_link_libraries(unixasyncacceptor.gtest ${JOIN_CORE} GTest::gtest_main) +add_test(NAME unixasyncacceptor.gtest COMMAND unixasyncacceptor.gtest) +install(TARGETS unixasyncacceptor.gtest RUNTIME DESTINATION ${CMAKE_INSTALL_DATADIR}/${PROJECT_NAME}/test) + add_executable(unixstreamsocket.gtest unixstreamsocket_test.cpp) target_link_libraries(unixstreamsocket.gtest ${JOIN_CORE} GTest::gtest_main) add_test(NAME unixstreamsocket.gtest COMMAND unixstreamsocket.gtest) @@ -192,11 +202,6 @@ target_link_libraries(netlinksocket.gtest ${JOIN_CORE} GTest::gtest_main) add_test(NAME netlinksocket.gtest COMMAND netlinksocket.gtest) install(TARGETS netlinksocket.gtest RUNTIME DESTINATION ${CMAKE_INSTALL_DATADIR}/${PROJECT_NAME}/test) -add_executable(unixstreamacceptor.gtest unixstreamacceptor_test.cpp) -target_link_libraries(unixstreamacceptor.gtest ${JOIN_CORE} GTest::gtest_main) -add_test(NAME unixstreamacceptor.gtest COMMAND unixstreamacceptor.gtest) -install(TARGETS unixstreamacceptor.gtest RUNTIME DESTINATION ${CMAKE_INSTALL_DATADIR}/${PROJECT_NAME}/test) - add_executable(rawsocket.gtest rawsocket_test.cpp) target_link_libraries(rawsocket.gtest ${JOIN_CORE} GTest::gtest_main) add_test(NAME rawsocket.gtest COMMAND rawsocket.gtest) @@ -227,6 +232,11 @@ target_link_libraries(tcpacceptor.gtest ${JOIN_CORE} GTest::gtest_main) add_test(NAME tcpacceptor.gtest COMMAND tcpacceptor.gtest) install(TARGETS tcpacceptor.gtest RUNTIME DESTINATION ${CMAKE_INSTALL_DATADIR}/${PROJECT_NAME}/test) +add_executable(tcpasyncacceptor.gtest tcpasyncacceptor_test.cpp) +target_link_libraries(tcpasyncacceptor.gtest ${JOIN_CORE} GTest::gtest_main) +add_test(NAME tcpasyncacceptor.gtest COMMAND tcpasyncacceptor.gtest) +install(TARGETS tcpasyncacceptor.gtest RUNTIME DESTINATION ${CMAKE_INSTALL_DATADIR}/${PROJECT_NAME}/test) + add_executable(tlserror.gtest tlserror_test.cpp) target_link_libraries(tlserror.gtest ${JOIN_CORE} GTest::gtest_main) add_test(NAME tlserror.gtest COMMAND tlserror.gtest) diff --git a/core/tests/tcpasyncacceptor_test.cpp b/core/tests/tcpasyncacceptor_test.cpp new file mode 100644 index 00000000..055023fa --- /dev/null +++ b/core/tests/tcpasyncacceptor_test.cpp @@ -0,0 +1,253 @@ +/** + * MIT License + * + * Copyright (c) 2026 Mathieu Rabine + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +// libjoin. +#include +#include + +// Libraries. +#include + +using join::Errc; +using join::IpAddress; +using join::Tcp; +using join::Condition; +using join::Mutex; +using join::Thread; +using join::Proactor; + +IpAddress address = "::1"; +uint16_t port = 5000; + +/** + * @brief Context for async callbacks. + */ +struct AcceptContext +{ + Tcp::Socket socket; + bool called = false; + Condition cv; + Mutex mtx; +}; + +/** + * @brief Static callback for acceptance. + */ +static void onAccept (Tcp::Socket&& sock, void* ctx) noexcept +{ + auto* context = static_cast (ctx); + join::ScopedLock lock (context->mtx); + context->socket = std::move (sock); + context->called = true; + context->cv.signal (); +} + +/** + * @brief Test create method. + */ +TEST (TcpAsyncAcceptor, create) +{ + Tcp::AsyncAcceptor server1, server2; + + ASSERT_EQ (server1.create ({address, port}), 0) << join::lastError.message (); + ASSERT_EQ (server1.create ({address, port}), -1); + ASSERT_EQ (join::lastError, Errc::InUse); + ASSERT_EQ (server2.create ({address, port}), -1); + ASSERT_EQ (join::lastError, Errc::InUse); +} + +/** + * @brief Test close method. + */ +TEST (TcpAsyncAcceptor, close) +{ + Tcp::AsyncAcceptor server; + + ASSERT_EQ (server.create ({address, port}), 0) << join::lastError.message (); + ASSERT_TRUE (server.opened ()); + server.close (); + ASSERT_FALSE (server.opened ()); +} + +/** + * @brief Test accept method. + */ +TEST (TcpAsyncAcceptor, accept) +{ + Proactor proactor; + Thread th ([&proactor] () { + proactor.run (); + }); + + Tcp::Socket clientSocket (Tcp::Socket::Blocking); + Tcp::AsyncAcceptor server; + AcceptContext context; + + ASSERT_EQ (server.accept (nullptr, &context), -1); + ASSERT_EQ (join::lastError, Errc::InvalidParam); + ASSERT_EQ (server.accept (onAccept, &context), -1); + ASSERT_EQ (join::lastError, Errc::OperationFailed); + ASSERT_EQ (server.create ({address, port}), 0) << join::lastError.message (); + ASSERT_EQ (server.accept (onAccept, &context), 0) << join::lastError.message (); + ASSERT_EQ (server.accept (onAccept, &context), -1); + ASSERT_EQ (join::lastError, Errc::InUse); + ASSERT_EQ (clientSocket.connect ({address, port}), 0) << join::lastError.message (); + { + join::ScopedLock lock (context.mtx); + while (!context.called) + { + context.cv.wait (lock); + } + } + ASSERT_TRUE (context.socket.connected ()); + ASSERT_EQ (context.socket.localEndpoint ().ip (), address); + ASSERT_EQ (context.socket.localEndpoint ().port (), port); + clientSocket.close (); + context.socket.close (); + server.close (); + + proactor.stop (); + th.join (); +} + +/** + * @brief Test cancelAccept method. + */ +TEST (TcpAsyncAcceptor, cancelAccept) +{ + Proactor proactor; + Thread th ([&proactor] () { + proactor.run (); + }); + + Tcp::AsyncAcceptor server; + AcceptContext context; + + ASSERT_EQ (server.accept (nullptr, &context), -1); + ASSERT_EQ (join::lastError, Errc::InvalidParam); + ASSERT_EQ (server.accept (onAccept, &context), -1); + ASSERT_EQ (join::lastError, Errc::OperationFailed); + ASSERT_EQ (server.create ({address, port}), 0) << join::lastError.message (); + ASSERT_EQ (server.accept (onAccept, &context), 0) << join::lastError.message (); + ASSERT_EQ (server.accept (onAccept, &context), -1); + ASSERT_EQ (join::lastError, Errc::InUse); + ASSERT_EQ (server.cancelAccept (), 0) << join::lastError.message (); + { + join::ScopedLock lock (context.mtx); + while (!context.called) + { + context.cv.wait (lock); + } + } + ASSERT_FALSE (context.socket.connected ()); + server.close (); + + proactor.stop (); + th.join (); +} + +/** + * @brief Test localEndpoint method. + */ +TEST (TcpAsyncAcceptor, localEndpoint) +{ + Tcp::AsyncAcceptor server; + + ASSERT_EQ (server.localEndpoint (), Tcp::Endpoint{}); + ASSERT_EQ (join::lastError, Errc::OperationFailed); + ASSERT_EQ (server.create ({address, port}), 0) << join::lastError.message (); + ASSERT_EQ (server.localEndpoint ().ip (), address); + ASSERT_EQ (server.localEndpoint ().port (), port); + server.close (); +} + +/** + * @brief Test opened method. + */ +TEST (TcpAsyncAcceptor, opened) +{ + Tcp::AsyncAcceptor server; + + ASSERT_FALSE (server.opened ()); + ASSERT_EQ (server.create ({address, port}), 0) << join::lastError.message (); + ASSERT_TRUE (server.opened ()); + server.close (); + ASSERT_FALSE (server.opened ()); +} + +/** + * @brief Test family method. + */ +TEST (TcpAsyncAcceptor, family) +{ + Tcp::AsyncAcceptor server; + + ASSERT_EQ (server.create ({address, port}), 0) << join::lastError.message (); + ASSERT_EQ (server.family (), address.family ()); + server.close (); +} + +/** + * @brief Test type method. + */ +TEST (TcpAsyncAcceptor, type) +{ + Tcp::AsyncAcceptor server; + + ASSERT_EQ (server.create ({address, port}), 0) << join::lastError.message (); + ASSERT_EQ (server.type (), SOCK_STREAM); + server.close (); +} + +/** + * @brief Test protocol method. + */ +TEST (TcpAsyncAcceptor, protocol) +{ + Tcp::AsyncAcceptor server; + + ASSERT_EQ (server.create ({address, port}), 0) << join::lastError.message (); + ASSERT_EQ (server.protocol (), IPPROTO_TCP); + server.close (); +} + +/** + * @brief Test handle method. + */ +TEST (TcpAsyncAcceptor, handle) +{ + Tcp::AsyncAcceptor server; + + ASSERT_EQ (server.handle (), -1); + ASSERT_EQ (server.create ({address, port}), 0) << join::lastError.message (); + ASSERT_GT (server.handle (), -1); + server.close (); + ASSERT_EQ (server.handle (), -1); +} + +int main (int argc, char** argv) +{ + testing::InitGoogleTest (&argc, argv); + return RUN_ALL_TESTS (); +} diff --git a/core/tests/unixstreamacceptor_test.cpp b/core/tests/unixacceptor_test.cpp similarity index 100% rename from core/tests/unixstreamacceptor_test.cpp rename to core/tests/unixacceptor_test.cpp diff --git a/core/tests/unixasyncacceptor_test.cpp b/core/tests/unixasyncacceptor_test.cpp new file mode 100644 index 00000000..d4d347cd --- /dev/null +++ b/core/tests/unixasyncacceptor_test.cpp @@ -0,0 +1,248 @@ +/** + * MIT License + * + * Copyright (c) 2026 Mathieu Rabine + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +// libjoin. +#include +#include + +// Libraries. +#include + +using join::Errc; +using join::IpAddress; +using join::UnixStream; +using join::Condition; +using join::Mutex; +using join::Thread; +using join::Proactor; + +std::string path = "/tmp/unixasyncacceptor_test.sock"; + +/** + * @brief Context for async callbacks. + */ +struct AcceptContext +{ + UnixStream::Socket socket; + bool called = false; + Condition cv; + Mutex mtx; +}; + +/** + * @brief Static callback for acceptance. + */ +static void onAccept (UnixStream::Socket&& sock, void* ctx) noexcept +{ + auto* context = static_cast (ctx); + join::ScopedLock lock (context->mtx); + context->socket = std::move (sock); + context->called = true; + context->cv.signal (); +} + +/** + * @brief Test create method. + */ +TEST (UnixAsyncAcceptor, create) +{ + UnixStream::AsyncAcceptor server; + + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_EQ (server.create (path), -1); + ASSERT_EQ (join::lastError, Errc::InUse); +} + +/** + * @brief Test close method. + */ +TEST (UnixAsyncAcceptor, close) +{ + UnixStream::AsyncAcceptor server; + + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_TRUE (server.opened ()); + server.close (); + ASSERT_FALSE (server.opened ()); +} + +/** + * @brief Test accept method. + */ +TEST (UnixAsyncAcceptor, accept) +{ + Proactor proactor; + Thread th ([&proactor] () { + proactor.run (); + }); + + UnixStream::Socket clientSocket (UnixStream::Socket::Blocking); + UnixStream::AsyncAcceptor server; + AcceptContext context; + + ASSERT_EQ (server.accept (nullptr, &context), -1); + ASSERT_EQ (join::lastError, Errc::InvalidParam); + ASSERT_EQ (server.accept (onAccept, &context), -1); + ASSERT_EQ (join::lastError, Errc::OperationFailed); + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_EQ (server.accept (onAccept, &context), 0) << join::lastError.message (); + ASSERT_EQ (server.accept (onAccept, &context), -1); + ASSERT_EQ (join::lastError, Errc::InUse); + ASSERT_EQ (clientSocket.connect (path), 0) << join::lastError.message (); + { + join::ScopedLock lock (context.mtx); + while (!context.called) + { + context.cv.wait (lock); + } + } + ASSERT_TRUE (context.socket.connected ()); + ASSERT_EQ (context.socket.localEndpoint ().device (), path); + clientSocket.close (); + context.socket.close (); + server.close (); + + proactor.stop (); + th.join (); +} + +/** + * @brief Test cancelAccept method. + */ +TEST (UnixAsyncAcceptor, cancelAccept) +{ + Proactor proactor; + Thread th ([&proactor] () { + proactor.run (); + }); + + UnixStream::AsyncAcceptor server; + AcceptContext context; + + ASSERT_EQ (server.accept (nullptr, &context), -1); + ASSERT_EQ (join::lastError, Errc::InvalidParam); + ASSERT_EQ (server.accept (onAccept, &context), -1); + ASSERT_EQ (join::lastError, Errc::OperationFailed); + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_EQ (server.accept (onAccept, &context), 0) << join::lastError.message (); + ASSERT_EQ (server.accept (onAccept, &context), -1); + ASSERT_EQ (join::lastError, Errc::InUse); + ASSERT_EQ (server.cancelAccept (), 0) << join::lastError.message (); + { + join::ScopedLock lock (context.mtx); + while (!context.called) + { + context.cv.wait (lock); + } + } + ASSERT_FALSE (context.socket.connected ()); + server.close (); + + proactor.stop (); + th.join (); +} + +/** + * @brief Test localEndpoint method. + */ +TEST (UnixAsyncAcceptor, localEndpoint) +{ + UnixStream::AsyncAcceptor server; + + ASSERT_EQ (server.localEndpoint (), UnixStream::Endpoint{}); + ASSERT_EQ (join::lastError, Errc::OperationFailed); + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_EQ (server.localEndpoint ().device (), path); + server.close (); +} + +/** + * @brief Test opened method. + */ +TEST (UnixAsyncAcceptor, opened) +{ + UnixStream::AsyncAcceptor server; + + ASSERT_FALSE (server.opened ()); + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_TRUE (server.opened ()); + server.close (); + ASSERT_FALSE (server.opened ()); +} + +/** + * @brief Test family method. + */ +TEST (UnixAsyncAcceptor, family) +{ + UnixStream::AsyncAcceptor server; + + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_EQ (server.family (), AF_UNIX); + server.close (); +} + +/** + * @brief Test type method. + */ +TEST (UnixAsyncAcceptor, type) +{ + UnixStream::AsyncAcceptor server; + + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_EQ (server.type (), SOCK_STREAM); + server.close (); +} + +/** + * @brief Test protocol method. + */ +TEST (UnixAsyncAcceptor, protocol) +{ + UnixStream::AsyncAcceptor server; + + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_EQ (server.protocol (), 0); + server.close (); +} + +/** + * @brief Test handle method. + */ +TEST (UnixAsyncAcceptor, handle) +{ + UnixStream::AsyncAcceptor server; + + ASSERT_EQ (server.handle (), -1); + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_GT (server.handle (), -1); + server.close (); + ASSERT_EQ (server.handle (), -1); +} + +int main (int argc, char** argv) +{ + testing::InitGoogleTest (&argc, argv); + return RUN_ALL_TESTS (); +} From 76723f7cd6f09f6d49f68f2a7579b9dcc4dc6ee4 Mon Sep 17 00:00:00 2001 From: mrabine Date: Fri, 15 May 2026 17:36:13 +0200 Subject: [PATCH 2/5] cleanup --- core/tests/CMakeLists.txt | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/tests/CMakeLists.txt b/core/tests/CMakeLists.txt index da9d1ace..16a04290 100644 --- a/core/tests/CMakeLists.txt +++ b/core/tests/CMakeLists.txt @@ -182,6 +182,11 @@ target_link_libraries(unixdgramsocket.gtest ${JOIN_CORE} GTest::gtest_main) add_test(NAME unixdgramsocket.gtest COMMAND unixdgramsocket.gtest) install(TARGETS unixdgramsocket.gtest RUNTIME DESTINATION ${CMAKE_INSTALL_DATADIR}/${PROJECT_NAME}/test) +add_executable(unixstreamsocket.gtest unixstreamsocket_test.cpp) +target_link_libraries(unixstreamsocket.gtest ${JOIN_CORE} GTest::gtest_main) +add_test(NAME unixstreamsocket.gtest COMMAND unixstreamsocket.gtest) +install(TARGETS unixstreamsocket.gtest RUNTIME DESTINATION ${CMAKE_INSTALL_DATADIR}/${PROJECT_NAME}/test) + add_executable(unixacceptor.gtest unixacceptor_test.cpp) target_link_libraries(unixacceptor.gtest ${JOIN_CORE} GTest::gtest_main) add_test(NAME unixacceptor.gtest COMMAND unixacceptor.gtest) @@ -192,11 +197,6 @@ target_link_libraries(unixasyncacceptor.gtest ${JOIN_CORE} GTest::gtest_main) add_test(NAME unixasyncacceptor.gtest COMMAND unixasyncacceptor.gtest) install(TARGETS unixasyncacceptor.gtest RUNTIME DESTINATION ${CMAKE_INSTALL_DATADIR}/${PROJECT_NAME}/test) -add_executable(unixstreamsocket.gtest unixstreamsocket_test.cpp) -target_link_libraries(unixstreamsocket.gtest ${JOIN_CORE} GTest::gtest_main) -add_test(NAME unixstreamsocket.gtest COMMAND unixstreamsocket.gtest) -install(TARGETS unixstreamsocket.gtest RUNTIME DESTINATION ${CMAKE_INSTALL_DATADIR}/${PROJECT_NAME}/test) - add_executable(netlinksocket.gtest netlinksocket_test.cpp) target_link_libraries(netlinksocket.gtest ${JOIN_CORE} GTest::gtest_main) add_test(NAME netlinksocket.gtest COMMAND netlinksocket.gtest) From 6795b613bacbdb1b8f48d58a0e28ea2713a42d63 Mon Sep 17 00:00:00 2001 From: mrabine Date: Fri, 15 May 2026 17:46:31 +0200 Subject: [PATCH 3/5] prepare for asynchronous soket --- core/include/join/asyncacceptor.hpp | 12 ++++++------ core/include/join/protocol.hpp | 2 ++ core/tests/tcpasyncacceptor_test.cpp | 6 +++--- core/tests/unixasyncacceptor_test.cpp | 6 +++--- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/core/include/join/asyncacceptor.hpp b/core/include/join/asyncacceptor.hpp index 2b725c20..1908fc22 100644 --- a/core/include/join/asyncacceptor.hpp +++ b/core/include/join/asyncacceptor.hpp @@ -39,9 +39,9 @@ namespace join { public: using Endpoint = typename Protocol::Endpoint; - using Socket = typename Protocol::Socket; + using AsyncSocket = typename Protocol::AsyncSocket; using Stream = typename Protocol::Stream; - using AcceptCb = void (*) (Socket&&, void*); + using AcceptCb = void (*) (AsyncSocket&&, void*); /** * @brief create the asynchronous acceptor instance. @@ -182,17 +182,17 @@ namespace join _callback = nullptr; _ctx = nullptr; - Socket sock; + AsyncSocket sock; if (JOIN_LIKELY (result >= 0)) { sock._handle = result; sock._remote = Endpoint (reinterpret_cast (&_peerAddr), _peerAddrLen); - sock._state = Socket::Connected; + sock._state = AsyncSocket::Connected; if (sock.protocol () == IPPROTO_TCP) { - sock.setOption (Socket::NoDelay, 1); + sock.setOption (AsyncSocket::NoDelay, 1); } } else @@ -220,7 +220,7 @@ namespace join if (cb) { - cb (Socket{}, ctx); + cb (AsyncSocket{}, ctx); } } diff --git a/core/include/join/protocol.hpp b/core/include/join/protocol.hpp index 59fa8d0f..ff11d177 100644 --- a/core/include/join/protocol.hpp +++ b/core/include/join/protocol.hpp @@ -134,6 +134,7 @@ namespace join using Socket = BasicStreamSocket; using Stream = BasicSocketStream; using Acceptor = BasicStreamAcceptor; + using AsyncSocket = BasicStreamSocket; using AsyncAcceptor = BasicAsyncStreamAcceptor; /** @@ -506,6 +507,7 @@ namespace join using Socket = BasicStreamSocket; using Stream = BasicSocketStream; using Acceptor = BasicStreamAcceptor; + using AsyncSocket = BasicStreamSocket; using AsyncAcceptor = BasicAsyncStreamAcceptor; /** diff --git a/core/tests/tcpasyncacceptor_test.cpp b/core/tests/tcpasyncacceptor_test.cpp index 055023fa..79ce781a 100644 --- a/core/tests/tcpasyncacceptor_test.cpp +++ b/core/tests/tcpasyncacceptor_test.cpp @@ -45,7 +45,7 @@ uint16_t port = 5000; */ struct AcceptContext { - Tcp::Socket socket; + Tcp::AsyncSocket socket; bool called = false; Condition cv; Mutex mtx; @@ -54,7 +54,7 @@ struct AcceptContext /** * @brief Static callback for acceptance. */ -static void onAccept (Tcp::Socket&& sock, void* ctx) noexcept +static void onAccept (Tcp::AsyncSocket&& sock, void* ctx) noexcept { auto* context = static_cast (ctx); join::ScopedLock lock (context->mtx); @@ -100,7 +100,7 @@ TEST (TcpAsyncAcceptor, accept) proactor.run (); }); - Tcp::Socket clientSocket (Tcp::Socket::Blocking); + Tcp::AsyncSocket clientSocket (Tcp::AsyncSocket::Blocking); Tcp::AsyncAcceptor server; AcceptContext context; diff --git a/core/tests/unixasyncacceptor_test.cpp b/core/tests/unixasyncacceptor_test.cpp index d4d347cd..69eaf1f6 100644 --- a/core/tests/unixasyncacceptor_test.cpp +++ b/core/tests/unixasyncacceptor_test.cpp @@ -44,7 +44,7 @@ std::string path = "/tmp/unixasyncacceptor_test.sock"; */ struct AcceptContext { - UnixStream::Socket socket; + UnixStream::AsyncSocket socket; bool called = false; Condition cv; Mutex mtx; @@ -53,7 +53,7 @@ struct AcceptContext /** * @brief Static callback for acceptance. */ -static void onAccept (UnixStream::Socket&& sock, void* ctx) noexcept +static void onAccept (UnixStream::AsyncSocket&& sock, void* ctx) noexcept { auto* context = static_cast (ctx); join::ScopedLock lock (context->mtx); @@ -97,7 +97,7 @@ TEST (UnixAsyncAcceptor, accept) proactor.run (); }); - UnixStream::Socket clientSocket (UnixStream::Socket::Blocking); + UnixStream::AsyncSocket clientSocket (UnixStream::AsyncSocket::Blocking); UnixStream::AsyncAcceptor server; AcceptContext context; From 90d744da4d21c94130174d0d3cd1b3ee984d7b7d Mon Sep 17 00:00:00 2001 From: mrabine Date: Fri, 15 May 2026 17:53:55 +0200 Subject: [PATCH 4/5] remove useless code --- core/tests/tcpasyncacceptor_test.cpp | 19 +------------------ core/tests/unixasyncacceptor_test.cpp | 19 +------------------ 2 files changed, 2 insertions(+), 36 deletions(-) diff --git a/core/tests/tcpasyncacceptor_test.cpp b/core/tests/tcpasyncacceptor_test.cpp index 79ce781a..58ebb417 100644 --- a/core/tests/tcpasyncacceptor_test.cpp +++ b/core/tests/tcpasyncacceptor_test.cpp @@ -32,10 +32,9 @@ using join::Errc; using join::IpAddress; using join::Tcp; +using join::Thread; using join::Condition; using join::Mutex; -using join::Thread; -using join::Proactor; IpAddress address = "::1"; uint16_t port = 5000; @@ -95,11 +94,6 @@ TEST (TcpAsyncAcceptor, close) */ TEST (TcpAsyncAcceptor, accept) { - Proactor proactor; - Thread th ([&proactor] () { - proactor.run (); - }); - Tcp::AsyncSocket clientSocket (Tcp::AsyncSocket::Blocking); Tcp::AsyncAcceptor server; AcceptContext context; @@ -126,9 +120,6 @@ TEST (TcpAsyncAcceptor, accept) clientSocket.close (); context.socket.close (); server.close (); - - proactor.stop (); - th.join (); } /** @@ -136,11 +127,6 @@ TEST (TcpAsyncAcceptor, accept) */ TEST (TcpAsyncAcceptor, cancelAccept) { - Proactor proactor; - Thread th ([&proactor] () { - proactor.run (); - }); - Tcp::AsyncAcceptor server; AcceptContext context; @@ -162,9 +148,6 @@ TEST (TcpAsyncAcceptor, cancelAccept) } ASSERT_FALSE (context.socket.connected ()); server.close (); - - proactor.stop (); - th.join (); } /** diff --git a/core/tests/unixasyncacceptor_test.cpp b/core/tests/unixasyncacceptor_test.cpp index 69eaf1f6..cf5187c8 100644 --- a/core/tests/unixasyncacceptor_test.cpp +++ b/core/tests/unixasyncacceptor_test.cpp @@ -32,10 +32,9 @@ using join::Errc; using join::IpAddress; using join::UnixStream; +using join::Thread; using join::Condition; using join::Mutex; -using join::Thread; -using join::Proactor; std::string path = "/tmp/unixasyncacceptor_test.sock"; @@ -92,11 +91,6 @@ TEST (UnixAsyncAcceptor, close) */ TEST (UnixAsyncAcceptor, accept) { - Proactor proactor; - Thread th ([&proactor] () { - proactor.run (); - }); - UnixStream::AsyncSocket clientSocket (UnixStream::AsyncSocket::Blocking); UnixStream::AsyncAcceptor server; AcceptContext context; @@ -122,9 +116,6 @@ TEST (UnixAsyncAcceptor, accept) clientSocket.close (); context.socket.close (); server.close (); - - proactor.stop (); - th.join (); } /** @@ -132,11 +123,6 @@ TEST (UnixAsyncAcceptor, accept) */ TEST (UnixAsyncAcceptor, cancelAccept) { - Proactor proactor; - Thread th ([&proactor] () { - proactor.run (); - }); - UnixStream::AsyncAcceptor server; AcceptContext context; @@ -158,9 +144,6 @@ TEST (UnixAsyncAcceptor, cancelAccept) } ASSERT_FALSE (context.socket.connected ()); server.close (); - - proactor.stop (); - th.join (); } /** From 0a450692e456e29eac49129bf01f60b7d5e084eb Mon Sep 17 00:00:00 2001 From: mrabine Date: Fri, 15 May 2026 19:28:51 +0200 Subject: [PATCH 5/5] use accept4 --- core/include/join/acceptor.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/include/join/acceptor.hpp b/core/include/join/acceptor.hpp index 04fa31c8..ba4eb3dc 100644 --- a/core/include/join/acceptor.hpp +++ b/core/include/join/acceptor.hpp @@ -306,7 +306,8 @@ namespace join socklen_t sa_len = sizeof (struct sockaddr_storage); Socket sock; - sock._handle = ::accept (this->_handle, reinterpret_cast (&sa), &sa_len); + sock._handle = ::accept4 (this->_handle, reinterpret_cast (&sa), &sa_len, + SOCK_NONBLOCK | SOCK_CLOEXEC); if (sock._handle == -1) { lastError = std::error_code (errno, std::generic_category ()); @@ -320,7 +321,6 @@ namespace join { sock.setOption (Socket::NoDelay, 1); } - sock.setMode (Socket::NonBlocking); return sock; }