diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 1351cf39..53bec347 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -29,6 +29,7 @@ set(PUBLIC_HEADERS include/join/socket.hpp include/join/socketstream.hpp include/join/acceptor.hpp + include/join/asyncacceptor.hpp include/join/openssl.hpp include/join/cpu.hpp include/join/clock.hpp diff --git a/core/include/join/acceptor.hpp b/core/include/join/acceptor.hpp index 23ad597e..ba4eb3dc 100644 --- a/core/include/join/acceptor.hpp +++ b/core/include/join/acceptor.hpp @@ -31,39 +31,37 @@ namespace join { /** - * @brief basic stream acceptor class. + * @brief basic acceptor class. */ template - class BasicStreamAcceptor + class BasicAcceptor { public: using Endpoint = typename Protocol::Endpoint; - using Socket = typename Protocol::Socket; - using Stream = typename Protocol::Stream; /** * @brief create the acceptor instance. */ - BasicStreamAcceptor () = default; + BasicAcceptor () = default; /** * @brief copy constructor. * @param other other object to copy. */ - BasicStreamAcceptor (const BasicStreamAcceptor& other) = delete; + BasicAcceptor (const BasicAcceptor& other) = delete; /** * @brief copy assignment operator. * @param other other object to assign. * @return assigned object. */ - BasicStreamAcceptor& operator= (const BasicStreamAcceptor& other) = delete; + BasicAcceptor& operator= (const BasicAcceptor& other) = delete; /** * @brief move constructor. * @param other other object to move. */ - BasicStreamAcceptor (BasicStreamAcceptor&& other) + BasicAcceptor (BasicAcceptor&& other) : _handle (other._handle) , _protocol (other._protocol) { @@ -76,7 +74,7 @@ namespace join * @param other other object to assign. * @return assigned object. */ - BasicStreamAcceptor& operator= (BasicStreamAcceptor&& other) + BasicAcceptor& operator= (BasicAcceptor&& other) { this->close (); @@ -92,9 +90,9 @@ namespace join /** * @brief destroy instance. */ - virtual ~BasicStreamAcceptor () + virtual ~BasicAcceptor () { - this->close (); + close (); } /** @@ -104,15 +102,15 @@ namespace join */ virtual int create (const Endpoint& endpoint) noexcept { - if (this->opened ()) + if (opened ()) { lastError = make_error_code (Errc::InUse); return -1; } - this->_handle = ::socket (endpoint.protocol ().family (), endpoint.protocol ().type () | SOCK_CLOEXEC, - endpoint.protocol ().protocol ()); - if (this->_handle == -1) + _handle = ::socket (endpoint.protocol ().family (), endpoint.protocol ().type () | SOCK_CLOEXEC, + endpoint.protocol ().protocol ()); + if (_handle == -1) { lastError = std::error_code (errno, std::generic_category ()); this->close (); @@ -123,7 +121,7 @@ namespace join { int off = 0; - if (::setsockopt (this->_handle, IPPROTO_IPV6, IPV6_V6ONLY, &off, sizeof (off)) == -1) + if (::setsockopt (_handle, IPPROTO_IPV6, IPV6_V6ONLY, &off, sizeof (off)) == -1) { lastError = std::error_code (errno, std::generic_category ()); this->close (); @@ -139,7 +137,7 @@ namespace join { int on = 1; - if (::setsockopt (this->_handle, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on)) == -1) + if (::setsockopt (_handle, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on)) == -1) { lastError = std::error_code (errno, std::generic_category ()); this->close (); @@ -147,15 +145,14 @@ namespace join } } - if ((::bind (this->_handle, endpoint.addr (), endpoint.length ()) == -1) || - (::listen (this->_handle, SOMAXCONN) == -1)) + if ((::bind (_handle, endpoint.addr (), endpoint.length ()) == -1) || (::listen (_handle, SOMAXCONN) == -1)) { lastError = std::error_code (errno, std::generic_category ()); this->close (); return -1; } - this->_protocol = endpoint.protocol (); + _protocol = endpoint.protocol (); return 0; } @@ -165,65 +162,25 @@ namespace join */ virtual void close () noexcept { - if (this->_handle != -1) + if (_handle != -1) { - ::close (this->_handle); - this->_handle = -1; + ::close (_handle); + _handle = -1; } - this->_protocol = Protocol (); - } - - /** - * @brief accept new connection and fill in the client object with connection parameters. - * @return the accepted client socket object. - */ - virtual Socket accept () const - { - struct sockaddr_storage sa; - socklen_t sa_len = sizeof (struct sockaddr_storage); - Socket sock; - - sock._handle = ::accept (this->_handle, reinterpret_cast (&sa), &sa_len); - if (sock._handle == -1) - { - lastError = std::error_code (errno, std::generic_category ()); - return sock; - } - - sock._remote = Endpoint (reinterpret_cast (&sa), sa_len); - sock._state = Socket::Connected; - - if (sock.protocol () == IPPROTO_TCP) - { - sock.setOption (Socket::NoDelay, 1); - } - sock.setMode (Socket::NonBlocking); - - return sock; - } - - /** - * @brief accept new connection and fill in the client object with connection parameters. - * @return The client stream object on success, nullptr on failure. - */ - virtual Stream acceptStream () const - { - Stream stream; - stream.socket () = this->accept (); - return stream; + _protocol = Protocol (); } /** * @brief determine the local endpoint associated with this socket. * @return local endpoint. */ - Endpoint localEndpoint () const + Endpoint localEndpoint () const noexcept { struct sockaddr_storage sa; socklen_t sa_len = sizeof (struct sockaddr_storage); - if (::getsockname (this->_handle, reinterpret_cast (&sa), &sa_len) == -1) + if (::getsockname (_handle, reinterpret_cast (&sa), &sa_len) == -1) { lastError = std::error_code (errno, std::generic_category ()); return {}; @@ -238,7 +195,7 @@ namespace join */ bool opened () const noexcept { - return (this->_handle != -1); + return (_handle != -1); } /** @@ -247,7 +204,7 @@ namespace join */ int family () const noexcept { - return this->_protocol.family (); + return _protocol.family (); } /** @@ -256,7 +213,7 @@ namespace join */ int type () const noexcept { - return this->_protocol.type (); + return _protocol.type (); } /** @@ -265,7 +222,7 @@ namespace join */ int protocol () const noexcept { - return this->_protocol.protocol (); + return _protocol.protocol (); } /** @@ -274,7 +231,7 @@ namespace join */ int handle () const noexcept { - return this->_handle; + return _handle; } protected: @@ -285,6 +242,101 @@ namespace join Protocol _protocol; }; + /** + * @brief basic stream acceptor class. + */ + template + class BasicStreamAcceptor : public BasicAcceptor + { + public: + using Endpoint = typename Protocol::Endpoint; + using Socket = typename Protocol::Socket; + using Stream = typename Protocol::Stream; + + /** + * @brief create the acceptor instance. + */ + BasicStreamAcceptor () = default; + + /** + * @brief copy constructor. + * @param other other object to copy. + */ + BasicStreamAcceptor (const BasicStreamAcceptor& other) = delete; + + /** + * @brief copy assignment operator. + * @param other other object to assign. + * @return assigned object. + */ + BasicStreamAcceptor& operator= (const BasicStreamAcceptor& other) = delete; + + /** + * @brief move constructor. + * @param other other object to move. + */ + BasicStreamAcceptor (BasicStreamAcceptor&& other) + : BasicAcceptor (std::move (other)) + { + } + + /** + * @brief move assignment operator. + * @param other other object to assign. + * @return assigned object. + */ + BasicStreamAcceptor& operator= (BasicStreamAcceptor&& other) + { + BasicAcceptor::operator= (std::move (other)); + return *this; + } + + /** + * @brief destroy instance. + */ + virtual ~BasicStreamAcceptor () = default; + + /** + * @brief accept new connection and fill in the client object with connection parameters. + * @return the accepted client socket object. + */ + virtual Socket accept () const noexcept + { + struct sockaddr_storage sa; + socklen_t sa_len = sizeof (struct sockaddr_storage); + Socket sock; + + sock._handle = ::accept4 (this->_handle, reinterpret_cast (&sa), &sa_len, + SOCK_NONBLOCK | SOCK_CLOEXEC); + if (sock._handle == -1) + { + lastError = std::error_code (errno, std::generic_category ()); + return sock; + } + + sock._remote = Endpoint (reinterpret_cast (&sa), sa_len); + sock._state = Socket::Connected; + + if (sock.protocol () == IPPROTO_TCP) + { + sock.setOption (Socket::NoDelay, 1); + } + + return sock; + } + + /** + * @brief accept new connection and fill in the client object with connection parameters. + * @return The client stream object on success, nullptr on failure. + */ + virtual Stream acceptStream () const noexcept + { + Stream stream; + stream.socket () = this->accept (); + return stream; + } + }; + /** * @brief basic TLS acceptor class. */ @@ -405,7 +457,7 @@ namespace join * @brief accept new connection and fill in the client object with connection parameters. * @return the accepted client socket object. */ - virtual Socket accept () const override + Socket accept () const noexcept override { struct sockaddr_storage sa; socklen_t sa_len = sizeof (struct sockaddr_storage); @@ -432,7 +484,7 @@ namespace join * @brief accept new connection and fill in the client object with connection parameters. * @return the accepted client socket object. */ - virtual Socket acceptEncrypted () const + Socket acceptEncrypted () const noexcept { Socket sock = this->accept (); if (!sock.connected ()) @@ -464,7 +516,7 @@ namespace join * @brief accept new connection and fill in the client object with connection parameters. * @return The client stream object on success, nullptr on failure. */ - virtual Stream acceptStreamEncrypted () const + Stream acceptStreamEncrypted () const noexcept { Stream stream; stream.socket () = this->acceptEncrypted (); @@ -477,7 +529,7 @@ namespace join * @param key private key path. * @return 0 on success, -1 on failure. */ - int setCertificate (const std::string& cert, const std::string& key = "") + int setCertificate (const std::string& cert, const std::string& key = "") noexcept { if (SSL_CTX_use_certificate_file (this->_tlsContext.get (), cert.c_str (), SSL_FILETYPE_PEM) == 0) { @@ -510,7 +562,7 @@ namespace join * @param caFile path of the trusted CA certificate file. * @return 0 on success, -1 on failure. */ - int setCaCertificate (const std::string& caFile) + int setCaCertificate (const std::string& caFile) noexcept { join::StackOfX509NamePtr certNames (SSL_load_client_CA_file (caFile.c_str ())); if (certNames == nullptr) @@ -530,7 +582,7 @@ namespace join * @param verify Enable peer verification if set to true, false otherwise. * @param depth The maximum certificate verification depth (default: no limit). */ - void setVerify (bool verify, int depth = -1) + void setVerify (bool verify, int depth = -1) noexcept { if (verify) { @@ -551,7 +603,7 @@ namespace join * @param cipher the cipher list. * @return 0 on success, -1 on failure. */ - int setCipher (const std::string& cipher) + int setCipher (const std::string& cipher) noexcept { if (SSL_CTX_set_cipher_list (this->_tlsContext.get (), cipher.c_str ()) == 0) { @@ -567,7 +619,7 @@ namespace join * @param cipher the cipher list. * @return 0 on success, -1 on failure. */ - int setCipher_1_3 (const std::string& cipher) + int setCipher_1_3 (const std::string& cipher) noexcept { if (SSL_CTX_set_ciphersuites (this->_tlsContext.get (), cipher.c_str ()) == 0) { @@ -584,7 +636,7 @@ namespace join * @param curves curve list. * @return 0 on success, -1 on failure. */ - int setCurve (const std::string& curves) + int setCurve (const std::string& curves) noexcept { if (SSL_CTX_set1_groups_list (this->_tlsContext.get (), curves.c_str ()) == 0) { @@ -602,7 +654,7 @@ namespace join * @note random Diffie-Hellman parameters generated using the command "openssl dhparam -C 2236". * @return Diffie-Hellman parameters. */ - static DH* getDh2236 () + static DH* getDh2236 () noexcept { static unsigned char dhp_2236[] = { 0x0C, 0xA5, 0x51, 0x2B, 0x8F, 0xF7, 0xA8, 0x74, 0x4D, 0x52, 0xD7, 0xED, 0x97, 0x83, 0xA4, 0xD2, 0x8B, diff --git a/core/include/join/asyncacceptor.hpp b/core/include/join/asyncacceptor.hpp new file mode 100644 index 00000000..1908fc22 --- /dev/null +++ b/core/include/join/asyncacceptor.hpp @@ -0,0 +1,247 @@ +/** + * MIT License + * + * Copyright (c) 2026 Mathieu Rabine + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#ifndef JOIN_CORE_ASYNCACCEPTOR_HPP +#define JOIN_CORE_ASYNCACCEPTOR_HPP + +// libjoin. +#include +#include + +namespace join +{ + /** + * @brief asynchronous stream acceptor class. + */ + template + class BasicAsyncStreamAcceptor : public BasicAcceptor, public CompletionHandler + { + public: + using Endpoint = typename Protocol::Endpoint; + using AsyncSocket = typename Protocol::AsyncSocket; + using Stream = typename Protocol::Stream; + using AcceptCb = void (*) (AsyncSocket&&, void*); + + /** + * @brief create the asynchronous acceptor instance. + * @param proactor event loop proactor. + */ + explicit BasicAsyncStreamAcceptor (Proactor& proactor = ProactorThread::proactor ()) + : BasicAcceptor () + , _proactor (proactor) + { + } + + /** + * @brief copy constructor. + * @param other other object to copy. + */ + BasicAsyncStreamAcceptor (const BasicAsyncStreamAcceptor& other) = delete; + + /** + * @brief copy assignment operator. + * @param other other object to assign. + * @return assigned object. + */ + BasicAsyncStreamAcceptor& operator= (const BasicAsyncStreamAcceptor& other) = delete; + + /** + * @brief move constructor. + * @param other other object to move. + */ + BasicAsyncStreamAcceptor (BasicAsyncStreamAcceptor&& other) = delete; + + /** + * @brief move assignment operator. + * @param other other object to assign. + * @return assigned object. + */ + BasicAsyncStreamAcceptor& operator= (BasicAsyncStreamAcceptor&& other) = delete; + + /** + * @brief destroy instance. + */ + virtual ~BasicAsyncStreamAcceptor () = default; + + /** + * @brief create acceptor + * @param endpoint endpoint to assign to the acceptor. + * @return 0 on success, -1 on failure. + */ + int create (const Endpoint& endpoint) noexcept override + { + if (BasicAcceptor::create (endpoint) == -1) + { + return -1; + } + + if (::fcntl (this->_handle, F_SETFL, ::fcntl (this->_handle, F_GETFL) | O_NONBLOCK) == -1) + { + // LCOV_EXCL_START + lastError = std::error_code (errno, std::generic_category ()); + this->close (); + return -1; + // LCOV_EXCL_STOP + } + + return 0; + } + + /** + * @brief close acceptor. + */ + void close () noexcept override + { + cancelAccept (); + BasicAcceptor::close (); + } + + /** + * @brief submit an asynchronous accept on the listening socket. + * @param callback completion callback. + * @param ctx opaque user context forwarded verbatim to the callback. + * @return 0 on successful submission, -1 on error. + */ + int accept (AcceptCb callback, void* ctx = nullptr) noexcept + { + if (JOIN_UNLIKELY (!callback)) + { + lastError = make_error_code (Errc::InvalidParam); + return -1; + } + + if (JOIN_UNLIKELY (_op.state != IoOperation::State::Idle)) + { + lastError = make_error_code (Errc::InUse); + return -1; + } + + _peerAddr = {}; + _peerAddrLen = sizeof (_peerAddr); + _callback = callback; + _ctx = ctx; + _op = IoOperation::makeAccept (this->_handle, reinterpret_cast (&_peerAddr), &_peerAddrLen, + SOCK_NONBLOCK | SOCK_CLOEXEC, this); + + if (JOIN_UNLIKELY (_proactor.submit (&_op) == -1)) + { + _callback = nullptr; + _ctx = nullptr; + _op = IoOperation{}; + return -1; + } + + return 0; + } + + /** + * @brief cancel a pending asynchronous accept, if any. + * @return 0 on success or if no operation was pending, -1 on error. + */ + int cancelAccept () noexcept + { + if (_op.state == IoOperation::State::Idle) + { + return 0; + } + + return _proactor.cancel (&_op); + } + + protected: + /** + * @brief method called when an operation completes successfully. + * @param op completed operation. + * @param result number of bytes transferred, or operation-specific value. + */ + void onComplete ([[maybe_unused]] IoOperation* op, int result) override + { + AcceptCb cb = _callback; + void* ctx = _ctx; + _callback = nullptr; + _ctx = nullptr; + + AsyncSocket sock; + + if (JOIN_LIKELY (result >= 0)) + { + sock._handle = result; + sock._remote = Endpoint (reinterpret_cast (&_peerAddr), _peerAddrLen); + sock._state = AsyncSocket::Connected; + + if (sock.protocol () == IPPROTO_TCP) + { + sock.setOption (AsyncSocket::NoDelay, 1); + } + } + else + { + lastError = std::error_code (-result, std::generic_category ()); // LCOV_EXCL_LINE + } + + if (cb) + { + cb (std::move (sock), ctx); + } + } + + /** + * @brief method called when an operation is cancelled. + * @param op cancelled operation. + * @param result -ECANCELED, or other negative errno on failure. + */ + void onCancel ([[maybe_unused]] IoOperation* op, [[maybe_unused]] int result) override + { + AcceptCb cb = _callback; + void* ctx = _ctx; + _callback = nullptr; + _ctx = nullptr; + + if (cb) + { + cb (AsyncSocket{}, ctx); + } + } + + /// proactor. + Proactor& _proactor; + + /// In-flight IoOperation. + IoOperation _op{}; + + /// Storage for the peer address. + sockaddr_storage _peerAddr{}; + + /// Length of the peer address. + socklen_t _peerAddrLen = 0; + + /// Pending completion callback. + AcceptCb _callback = nullptr; + + /// Opaque user context forwarded verbatim to the callback. + void* _ctx = nullptr; + }; +} + +#endif diff --git a/core/include/join/protocol.hpp b/core/include/join/protocol.hpp index ee2d53f3..ff11d177 100644 --- a/core/include/join/protocol.hpp +++ b/core/include/join/protocol.hpp @@ -51,6 +51,8 @@ namespace join template class BasicStreamAcceptor; template + class BasicAsyncStreamAcceptor; + template class BasicTlsAcceptor; template @@ -132,6 +134,8 @@ namespace join using Socket = BasicStreamSocket; using Stream = BasicSocketStream; using Acceptor = BasicStreamAcceptor; + using AsyncSocket = BasicStreamSocket; + using AsyncAcceptor = BasicAsyncStreamAcceptor; /** * @brief construct the unix stream protocol instance by default. @@ -503,6 +507,8 @@ namespace join using Socket = BasicStreamSocket; using Stream = BasicSocketStream; using Acceptor = BasicStreamAcceptor; + using AsyncSocket = BasicStreamSocket; + using AsyncAcceptor = BasicAsyncStreamAcceptor; /** * @brief create the tcp protocol instance. diff --git a/core/include/join/socket.hpp b/core/include/join/socket.hpp index 4d401318..6b740dcb 100644 --- a/core/include/join/socket.hpp +++ b/core/include/join/socket.hpp @@ -1480,6 +1480,9 @@ namespace join /// friendship with basic stream acceptor friend class BasicStreamAcceptor; + + /// friendship with basic asynchronous stream acceptor + friend class BasicAsyncStreamAcceptor; }; /** diff --git a/core/tests/CMakeLists.txt b/core/tests/CMakeLists.txt index 23aacee9..16a04290 100644 --- a/core/tests/CMakeLists.txt +++ b/core/tests/CMakeLists.txt @@ -187,16 +187,21 @@ target_link_libraries(unixstreamsocket.gtest ${JOIN_CORE} GTest::gtest_main) add_test(NAME unixstreamsocket.gtest COMMAND unixstreamsocket.gtest) install(TARGETS unixstreamsocket.gtest RUNTIME DESTINATION ${CMAKE_INSTALL_DATADIR}/${PROJECT_NAME}/test) +add_executable(unixacceptor.gtest unixacceptor_test.cpp) +target_link_libraries(unixacceptor.gtest ${JOIN_CORE} GTest::gtest_main) +add_test(NAME unixacceptor.gtest COMMAND unixacceptor.gtest) +install(TARGETS unixacceptor.gtest RUNTIME DESTINATION ${CMAKE_INSTALL_DATADIR}/${PROJECT_NAME}/test) + +add_executable(unixasyncacceptor.gtest unixasyncacceptor_test.cpp) +target_link_libraries(unixasyncacceptor.gtest ${JOIN_CORE} GTest::gtest_main) +add_test(NAME unixasyncacceptor.gtest COMMAND unixasyncacceptor.gtest) +install(TARGETS unixasyncacceptor.gtest RUNTIME DESTINATION ${CMAKE_INSTALL_DATADIR}/${PROJECT_NAME}/test) + add_executable(netlinksocket.gtest netlinksocket_test.cpp) target_link_libraries(netlinksocket.gtest ${JOIN_CORE} GTest::gtest_main) add_test(NAME netlinksocket.gtest COMMAND netlinksocket.gtest) install(TARGETS netlinksocket.gtest RUNTIME DESTINATION ${CMAKE_INSTALL_DATADIR}/${PROJECT_NAME}/test) -add_executable(unixstreamacceptor.gtest unixstreamacceptor_test.cpp) -target_link_libraries(unixstreamacceptor.gtest ${JOIN_CORE} GTest::gtest_main) -add_test(NAME unixstreamacceptor.gtest COMMAND unixstreamacceptor.gtest) -install(TARGETS unixstreamacceptor.gtest RUNTIME DESTINATION ${CMAKE_INSTALL_DATADIR}/${PROJECT_NAME}/test) - add_executable(rawsocket.gtest rawsocket_test.cpp) target_link_libraries(rawsocket.gtest ${JOIN_CORE} GTest::gtest_main) add_test(NAME rawsocket.gtest COMMAND rawsocket.gtest) @@ -227,6 +232,11 @@ target_link_libraries(tcpacceptor.gtest ${JOIN_CORE} GTest::gtest_main) add_test(NAME tcpacceptor.gtest COMMAND tcpacceptor.gtest) install(TARGETS tcpacceptor.gtest RUNTIME DESTINATION ${CMAKE_INSTALL_DATADIR}/${PROJECT_NAME}/test) +add_executable(tcpasyncacceptor.gtest tcpasyncacceptor_test.cpp) +target_link_libraries(tcpasyncacceptor.gtest ${JOIN_CORE} GTest::gtest_main) +add_test(NAME tcpasyncacceptor.gtest COMMAND tcpasyncacceptor.gtest) +install(TARGETS tcpasyncacceptor.gtest RUNTIME DESTINATION ${CMAKE_INSTALL_DATADIR}/${PROJECT_NAME}/test) + add_executable(tlserror.gtest tlserror_test.cpp) target_link_libraries(tlserror.gtest ${JOIN_CORE} GTest::gtest_main) add_test(NAME tlserror.gtest COMMAND tlserror.gtest) diff --git a/core/tests/tcpasyncacceptor_test.cpp b/core/tests/tcpasyncacceptor_test.cpp new file mode 100644 index 00000000..58ebb417 --- /dev/null +++ b/core/tests/tcpasyncacceptor_test.cpp @@ -0,0 +1,236 @@ +/** + * MIT License + * + * Copyright (c) 2026 Mathieu Rabine + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +// libjoin. +#include +#include + +// Libraries. +#include + +using join::Errc; +using join::IpAddress; +using join::Tcp; +using join::Thread; +using join::Condition; +using join::Mutex; + +IpAddress address = "::1"; +uint16_t port = 5000; + +/** + * @brief Context for async callbacks. + */ +struct AcceptContext +{ + Tcp::AsyncSocket socket; + bool called = false; + Condition cv; + Mutex mtx; +}; + +/** + * @brief Static callback for acceptance. + */ +static void onAccept (Tcp::AsyncSocket&& sock, void* ctx) noexcept +{ + auto* context = static_cast (ctx); + join::ScopedLock lock (context->mtx); + context->socket = std::move (sock); + context->called = true; + context->cv.signal (); +} + +/** + * @brief Test create method. + */ +TEST (TcpAsyncAcceptor, create) +{ + Tcp::AsyncAcceptor server1, server2; + + ASSERT_EQ (server1.create ({address, port}), 0) << join::lastError.message (); + ASSERT_EQ (server1.create ({address, port}), -1); + ASSERT_EQ (join::lastError, Errc::InUse); + ASSERT_EQ (server2.create ({address, port}), -1); + ASSERT_EQ (join::lastError, Errc::InUse); +} + +/** + * @brief Test close method. + */ +TEST (TcpAsyncAcceptor, close) +{ + Tcp::AsyncAcceptor server; + + ASSERT_EQ (server.create ({address, port}), 0) << join::lastError.message (); + ASSERT_TRUE (server.opened ()); + server.close (); + ASSERT_FALSE (server.opened ()); +} + +/** + * @brief Test accept method. + */ +TEST (TcpAsyncAcceptor, accept) +{ + Tcp::AsyncSocket clientSocket (Tcp::AsyncSocket::Blocking); + Tcp::AsyncAcceptor server; + AcceptContext context; + + ASSERT_EQ (server.accept (nullptr, &context), -1); + ASSERT_EQ (join::lastError, Errc::InvalidParam); + ASSERT_EQ (server.accept (onAccept, &context), -1); + ASSERT_EQ (join::lastError, Errc::OperationFailed); + ASSERT_EQ (server.create ({address, port}), 0) << join::lastError.message (); + ASSERT_EQ (server.accept (onAccept, &context), 0) << join::lastError.message (); + ASSERT_EQ (server.accept (onAccept, &context), -1); + ASSERT_EQ (join::lastError, Errc::InUse); + ASSERT_EQ (clientSocket.connect ({address, port}), 0) << join::lastError.message (); + { + join::ScopedLock lock (context.mtx); + while (!context.called) + { + context.cv.wait (lock); + } + } + ASSERT_TRUE (context.socket.connected ()); + ASSERT_EQ (context.socket.localEndpoint ().ip (), address); + ASSERT_EQ (context.socket.localEndpoint ().port (), port); + clientSocket.close (); + context.socket.close (); + server.close (); +} + +/** + * @brief Test cancelAccept method. + */ +TEST (TcpAsyncAcceptor, cancelAccept) +{ + Tcp::AsyncAcceptor server; + AcceptContext context; + + ASSERT_EQ (server.accept (nullptr, &context), -1); + ASSERT_EQ (join::lastError, Errc::InvalidParam); + ASSERT_EQ (server.accept (onAccept, &context), -1); + ASSERT_EQ (join::lastError, Errc::OperationFailed); + ASSERT_EQ (server.create ({address, port}), 0) << join::lastError.message (); + ASSERT_EQ (server.accept (onAccept, &context), 0) << join::lastError.message (); + ASSERT_EQ (server.accept (onAccept, &context), -1); + ASSERT_EQ (join::lastError, Errc::InUse); + ASSERT_EQ (server.cancelAccept (), 0) << join::lastError.message (); + { + join::ScopedLock lock (context.mtx); + while (!context.called) + { + context.cv.wait (lock); + } + } + ASSERT_FALSE (context.socket.connected ()); + server.close (); +} + +/** + * @brief Test localEndpoint method. + */ +TEST (TcpAsyncAcceptor, localEndpoint) +{ + Tcp::AsyncAcceptor server; + + ASSERT_EQ (server.localEndpoint (), Tcp::Endpoint{}); + ASSERT_EQ (join::lastError, Errc::OperationFailed); + ASSERT_EQ (server.create ({address, port}), 0) << join::lastError.message (); + ASSERT_EQ (server.localEndpoint ().ip (), address); + ASSERT_EQ (server.localEndpoint ().port (), port); + server.close (); +} + +/** + * @brief Test opened method. + */ +TEST (TcpAsyncAcceptor, opened) +{ + Tcp::AsyncAcceptor server; + + ASSERT_FALSE (server.opened ()); + ASSERT_EQ (server.create ({address, port}), 0) << join::lastError.message (); + ASSERT_TRUE (server.opened ()); + server.close (); + ASSERT_FALSE (server.opened ()); +} + +/** + * @brief Test family method. + */ +TEST (TcpAsyncAcceptor, family) +{ + Tcp::AsyncAcceptor server; + + ASSERT_EQ (server.create ({address, port}), 0) << join::lastError.message (); + ASSERT_EQ (server.family (), address.family ()); + server.close (); +} + +/** + * @brief Test type method. + */ +TEST (TcpAsyncAcceptor, type) +{ + Tcp::AsyncAcceptor server; + + ASSERT_EQ (server.create ({address, port}), 0) << join::lastError.message (); + ASSERT_EQ (server.type (), SOCK_STREAM); + server.close (); +} + +/** + * @brief Test protocol method. + */ +TEST (TcpAsyncAcceptor, protocol) +{ + Tcp::AsyncAcceptor server; + + ASSERT_EQ (server.create ({address, port}), 0) << join::lastError.message (); + ASSERT_EQ (server.protocol (), IPPROTO_TCP); + server.close (); +} + +/** + * @brief Test handle method. + */ +TEST (TcpAsyncAcceptor, handle) +{ + Tcp::AsyncAcceptor server; + + ASSERT_EQ (server.handle (), -1); + ASSERT_EQ (server.create ({address, port}), 0) << join::lastError.message (); + ASSERT_GT (server.handle (), -1); + server.close (); + ASSERT_EQ (server.handle (), -1); +} + +int main (int argc, char** argv) +{ + testing::InitGoogleTest (&argc, argv); + return RUN_ALL_TESTS (); +} diff --git a/core/tests/unixstreamacceptor_test.cpp b/core/tests/unixacceptor_test.cpp similarity index 100% rename from core/tests/unixstreamacceptor_test.cpp rename to core/tests/unixacceptor_test.cpp diff --git a/core/tests/unixasyncacceptor_test.cpp b/core/tests/unixasyncacceptor_test.cpp new file mode 100644 index 00000000..cf5187c8 --- /dev/null +++ b/core/tests/unixasyncacceptor_test.cpp @@ -0,0 +1,231 @@ +/** + * MIT License + * + * Copyright (c) 2026 Mathieu Rabine + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +// libjoin. +#include +#include + +// Libraries. +#include + +using join::Errc; +using join::IpAddress; +using join::UnixStream; +using join::Thread; +using join::Condition; +using join::Mutex; + +std::string path = "/tmp/unixasyncacceptor_test.sock"; + +/** + * @brief Context for async callbacks. + */ +struct AcceptContext +{ + UnixStream::AsyncSocket socket; + bool called = false; + Condition cv; + Mutex mtx; +}; + +/** + * @brief Static callback for acceptance. + */ +static void onAccept (UnixStream::AsyncSocket&& sock, void* ctx) noexcept +{ + auto* context = static_cast (ctx); + join::ScopedLock lock (context->mtx); + context->socket = std::move (sock); + context->called = true; + context->cv.signal (); +} + +/** + * @brief Test create method. + */ +TEST (UnixAsyncAcceptor, create) +{ + UnixStream::AsyncAcceptor server; + + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_EQ (server.create (path), -1); + ASSERT_EQ (join::lastError, Errc::InUse); +} + +/** + * @brief Test close method. + */ +TEST (UnixAsyncAcceptor, close) +{ + UnixStream::AsyncAcceptor server; + + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_TRUE (server.opened ()); + server.close (); + ASSERT_FALSE (server.opened ()); +} + +/** + * @brief Test accept method. + */ +TEST (UnixAsyncAcceptor, accept) +{ + UnixStream::AsyncSocket clientSocket (UnixStream::AsyncSocket::Blocking); + UnixStream::AsyncAcceptor server; + AcceptContext context; + + ASSERT_EQ (server.accept (nullptr, &context), -1); + ASSERT_EQ (join::lastError, Errc::InvalidParam); + ASSERT_EQ (server.accept (onAccept, &context), -1); + ASSERT_EQ (join::lastError, Errc::OperationFailed); + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_EQ (server.accept (onAccept, &context), 0) << join::lastError.message (); + ASSERT_EQ (server.accept (onAccept, &context), -1); + ASSERT_EQ (join::lastError, Errc::InUse); + ASSERT_EQ (clientSocket.connect (path), 0) << join::lastError.message (); + { + join::ScopedLock lock (context.mtx); + while (!context.called) + { + context.cv.wait (lock); + } + } + ASSERT_TRUE (context.socket.connected ()); + ASSERT_EQ (context.socket.localEndpoint ().device (), path); + clientSocket.close (); + context.socket.close (); + server.close (); +} + +/** + * @brief Test cancelAccept method. + */ +TEST (UnixAsyncAcceptor, cancelAccept) +{ + UnixStream::AsyncAcceptor server; + AcceptContext context; + + ASSERT_EQ (server.accept (nullptr, &context), -1); + ASSERT_EQ (join::lastError, Errc::InvalidParam); + ASSERT_EQ (server.accept (onAccept, &context), -1); + ASSERT_EQ (join::lastError, Errc::OperationFailed); + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_EQ (server.accept (onAccept, &context), 0) << join::lastError.message (); + ASSERT_EQ (server.accept (onAccept, &context), -1); + ASSERT_EQ (join::lastError, Errc::InUse); + ASSERT_EQ (server.cancelAccept (), 0) << join::lastError.message (); + { + join::ScopedLock lock (context.mtx); + while (!context.called) + { + context.cv.wait (lock); + } + } + ASSERT_FALSE (context.socket.connected ()); + server.close (); +} + +/** + * @brief Test localEndpoint method. + */ +TEST (UnixAsyncAcceptor, localEndpoint) +{ + UnixStream::AsyncAcceptor server; + + ASSERT_EQ (server.localEndpoint (), UnixStream::Endpoint{}); + ASSERT_EQ (join::lastError, Errc::OperationFailed); + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_EQ (server.localEndpoint ().device (), path); + server.close (); +} + +/** + * @brief Test opened method. + */ +TEST (UnixAsyncAcceptor, opened) +{ + UnixStream::AsyncAcceptor server; + + ASSERT_FALSE (server.opened ()); + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_TRUE (server.opened ()); + server.close (); + ASSERT_FALSE (server.opened ()); +} + +/** + * @brief Test family method. + */ +TEST (UnixAsyncAcceptor, family) +{ + UnixStream::AsyncAcceptor server; + + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_EQ (server.family (), AF_UNIX); + server.close (); +} + +/** + * @brief Test type method. + */ +TEST (UnixAsyncAcceptor, type) +{ + UnixStream::AsyncAcceptor server; + + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_EQ (server.type (), SOCK_STREAM); + server.close (); +} + +/** + * @brief Test protocol method. + */ +TEST (UnixAsyncAcceptor, protocol) +{ + UnixStream::AsyncAcceptor server; + + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_EQ (server.protocol (), 0); + server.close (); +} + +/** + * @brief Test handle method. + */ +TEST (UnixAsyncAcceptor, handle) +{ + UnixStream::AsyncAcceptor server; + + ASSERT_EQ (server.handle (), -1); + ASSERT_EQ (server.create (path), 0) << join::lastError.message (); + ASSERT_GT (server.handle (), -1); + server.close (); + ASSERT_EQ (server.handle (), -1); +} + +int main (int argc, char** argv) +{ + testing::InitGoogleTest (&argc, argv); + return RUN_ALL_TESTS (); +}