From 23c8cd45486492f860bd0a21c6e62141751b70c5 Mon Sep 17 00:00:00 2001 From: Georgiy Lebedev Date: Tue, 9 Sep 2025 16:21:45 +0200 Subject: [PATCH 1/3] client: add helpers to check readiness of requested and count responses Let's pull out the code for checking the readiness of requested and count responses in `waitAll` and `waitCount` correspondingly. We will further use these helpers to check the futures before waiting. Needed for #133 --- src/Client/Connector.hpp | 64 +++++++++++++++++++++++++++++++++------- 1 file changed, 53 insertions(+), 11 deletions(-) diff --git a/src/Client/Connector.hpp b/src/Client/Connector.hpp index 232d59b21..1c15dc03d 100644 --- a/src/Client/Connector.hpp +++ b/src/Client/Connector.hpp @@ -100,6 +100,23 @@ class Connector */ int connectionDecodeResponses(Connection &conn, int req_sync = -1, Response *result = nullptr); + /** + * A helper to check the readiness of requested responses. + * Decodes new responses from the connection and checks the readiness of the requested responses. `finish` + * indicates that all the requested responses are ready, and `last_not_ready` is reused between consecutive + * calls to this function. + * Returns -1 in case of any error, 0 on success. + */ + int connectionCheckResponsesReadiness(Connection &conn, const std::vector &futures, + size_t *last_not_ready, bool *finish); + /** + * A helper to check the readiness of at least `future_count` responses. Decodes new responses from the + * connection and checks that at least `future_count` responses are ready. `finish` indicates that at least + * `future_count` responses are ready. + * Returns -1 in case of any error, 0 on success. + */ + int connectionCheckCountResponsesReadiness(Connection &conn, size_t future_count, + bool *finish); private: NetProvider m_NetProvider; @@ -205,6 +222,36 @@ Connector::connectionDecodeResponses(Connection +int +Connector::connectionCheckResponsesReadiness(Connection &conn, + const std::vector &futures, + size_t *last_not_ready, bool *finish) +{ + if (conn.hasError() || connectionDecodeResponses(conn) != 0) + return -1; + *finish = true; + for (size_t i = *last_not_ready; i < futures.size(); ++i) { + if (!conn.futureIsReady(futures[i])) { + *finish = false; + *last_not_ready = i; + break; + } + } + return 0; +} + +template +int +Connector::connectionCheckCountResponsesReadiness(Connection &conn, + size_t future_count, bool *finish) +{ + if (conn.hasError() || connectionDecodeResponses(conn) != 0) + return -1; + *finish = conn.getFutureCount() >= future_count; + return 0; +} + template int Connector::wait(Connection &conn, @@ -271,16 +318,9 @@ Connector::waitAll(Connection &conn, strerror(errno), errno); return -1; } - if (connectionDecodeResponses(conn) != 0) + bool finish = false; + if (connectionCheckResponsesReadiness(conn, futures, &last_not_ready, &finish) != 0) return -1; - bool finish = true; - for (size_t i = last_not_ready; i < futures.size(); ++i) { - if (!conn.futureIsReady(futures[i])) { - finish = false; - last_not_ready = i; - break; - } - } if (finish) return 0; if (timer.isExpired()) @@ -324,15 +364,17 @@ Connector::waitCount(Connection &conn, Timer timer{timeout}; timer.start(); size_t ready_futures = conn.getFutureCount(); + size_t expected_future_count = ready_futures + future_count; while (!conn.hasError()) { if (m_NetProvider.wait(timer.timeLeft()) != 0) { conn.setError(std::string("Failed to poll: ") + strerror(errno), errno); return -1; } - if (connectionDecodeResponses(conn) != 0) + bool finish = false; + if (connectionCheckCountResponsesReadiness(conn, expected_future_count, &finish) != 0) return -1; - if ((conn.getFutureCount() - ready_futures) >= future_count) + if (finish) return 0; if (timer.isExpired()) break; From d478b7964255ce85c02642d0ada9d801366ce0f5 Mon Sep 17 00:00:00 2001 From: Georgiy Lebedev Date: Tue, 9 Sep 2025 16:27:02 +0200 Subject: [PATCH 2/3] client: check ready futures in `waitAll` and `waitCount` before waiting Currently, `waitAll` and `waitCount` unconditionally `wait` instead of checking response readiness. If there are no more responses, it will cause them to hang indefinitely. To fix this, let's check the response readiness first. We should also move the time start to the beginning of the waiting loop, since the initial response checking overhead should not be accounted for the waiting time. Closes #133 --- src/Client/Connector.hpp | 18 +++++++++++++----- test/ClientTest.cpp | 22 ++++++++++++++++++++++ 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/src/Client/Connector.hpp b/src/Client/Connector.hpp index 1c15dc03d..4b72f3bc9 100644 --- a/src/Client/Connector.hpp +++ b/src/Client/Connector.hpp @@ -309,16 +309,20 @@ Connector::waitAll(Connection &conn, const std::vector &futures, int timeout) { + size_t last_not_ready = 0; + bool finish = false; + if (connectionCheckResponsesReadiness(conn, futures, &last_not_ready, &finish) != 0) + return -1; + if (finish) + return 0; Timer timer{timeout}; timer.start(); - size_t last_not_ready = 0; while (!conn.hasError()) { if (m_NetProvider.wait(timer.timeLeft()) != 0) { conn.setError(std::string("Failed to poll: ") + strerror(errno), errno); return -1; } - bool finish = false; if (connectionCheckResponsesReadiness(conn, futures, &last_not_ready, &finish) != 0) return -1; if (finish) @@ -361,17 +365,21 @@ int Connector::waitCount(Connection &conn, size_t future_count, int timeout) { - Timer timer{timeout}; - timer.start(); size_t ready_futures = conn.getFutureCount(); size_t expected_future_count = ready_futures + future_count; + bool finish = false; + if (connectionCheckCountResponsesReadiness(conn, expected_future_count, &finish) != 0) + return -1; + if (finish) + return 0; + Timer timer{timeout}; + timer.start(); while (!conn.hasError()) { if (m_NetProvider.wait(timer.timeLeft()) != 0) { conn.setError(std::string("Failed to poll: ") + strerror(errno), errno); return -1; } - bool finish = false; if (connectionCheckCountResponsesReadiness(conn, expected_future_count, &finish) != 0) return -1; if (finish) diff --git a/test/ClientTest.cpp b/test/ClientTest.cpp index b42ff747e..bc4860811 100644 --- a/test/ClientTest.cpp +++ b/test/ClientTest.cpp @@ -1324,6 +1324,28 @@ test_wait(Connector &client) fail_unless(result.header.sync == static_cast(f1)); fail_unless(result.header.code == 0); + TEST_CASE("wait method check future readiness before waiting (gh-133"); + f = conn.ping(); + fail_unless(client.wait(conn, f, WAIT_TIMEOUT) == 0); + fail_unless(client.wait(conn, f) == 0); + conn.getResponse(f); + f = conn.ping(); + fail_unless(client.wait(conn, f, WAIT_TIMEOUT) == 0); + fail_unless(client.waitAll(conn, {f}) == 0); + conn.getResponse(f); + f = conn.ping(); + fail_unless(client.wait(conn, f, WAIT_TIMEOUT) == 0); + /* FIXME(gh-143): test solely that we check future readiness before waiting. */ + fail_unless(client.waitCount(conn, 0) == 0); + conn.getResponse(f); + /* FIXME(gh-132): waitAny does not check connections for ready futures. */ +#if 0 + f = conn.ping(); + fail_unless(client.wait(conn, f, WAIT_TIMEOUT) == 0); + fail_unless(client.waitAny(conn).has_value()); + conn.getResponse(f); +#endif + client.close(conn); } From 804c0f547793fb0dabc10c43b38df578df551420 Mon Sep 17 00:00:00 2001 From: Georgiy Lebedev Date: Thu, 14 Aug 2025 15:55:19 +0200 Subject: [PATCH 3/3] client: check `wait` return code in `waitAny` Currently, we do not check the return code of `wait` in `waitAny`. If there is an error, we must log it and return a `nullopt`. Closes #121 --- src/Client/Connector.hpp | 5 ++- test/ClientTest.cpp | 78 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/src/Client/Connector.hpp b/src/Client/Connector.hpp index 4b72f3bc9..f0d52ded1 100644 --- a/src/Client/Connector.hpp +++ b/src/Client/Connector.hpp @@ -345,7 +345,10 @@ Connector::waitAny(int timeout) Timer timer{timeout}; timer.start(); while (m_ReadyToDecode.empty()) { - m_NetProvider.wait(timer.timeLeft()); + if (m_NetProvider.wait(timer.timeLeft()) != 0) { + LOG_ERROR("Failed to poll connections: ", strerror(errno)); + return std::nullopt; + } if (timer.isExpired()) break; } diff --git a/test/ClientTest.cpp b/test/ClientTest.cpp index bc4860811..41e1424fd 100644 --- a/test/ClientTest.cpp +++ b/test/ClientTest.cpp @@ -35,6 +35,8 @@ #include "../src/Client/LibevNetProvider.hpp" #include "../src/Client/Connector.hpp" +#include + const char *localhost = "127.0.0.1"; int port = 3301; int dummy_server_port = 3302; @@ -1144,6 +1146,47 @@ response_decoding(Connector &client) client.close(conn); } +#ifdef __linux__ +/** Sleep time for internal wait failure test. */ +static constexpr double INTERNAL_WAIT_FAILURE_SLEEP_TIME = 1e5; + +/** No-op signal handler for internal wait failure test. */ +void +sigusr_handler(int signo) +{ + fail_unless(signo != SIGINT); +} + +/** + * Helper for setting up an internal wait failure test case. Creates a request and spawns a thread that will send + * a signal to the request processing thread to interrupt its wait method. + */ +void +setup_internal_wait_failure_test_case(Connection &conn, rid_t *f, std::thread *signal_thread) +{ + *f = conn.call("remote_sleep", std::forward_as_tuple(INTERNAL_WAIT_FAILURE_SLEEP_TIME / 1e6)); + fail_unless(!conn.futureIsReady(*f)); + pthread_t tid = pthread_self(); + *signal_thread = std::thread([tid] { + usleep(INTERNAL_WAIT_FAILURE_SLEEP_TIME / 2); + pthread_kill(tid, SIGUSR1); + }); +} + +/** + * Helper for tearing down an internal wait failure test case. Gets the response for the future and joins the + * signalling thread. + */ +void +teardown_internal_wait_failure_test_case(Connection &conn, rid_t f, std::thread &signal_thread) +{ + fail_unless(conn.futureIsReady(f)); + std::optional> response = conn.getResponse(f); + fail_unless(response.has_value()); + signal_thread.join(); +} +#endif /* __linux__ */ + /** Checks all available `wait` methods of connector. */ template void @@ -1346,6 +1389,41 @@ test_wait(Connector &client) conn.getResponse(f); #endif +#ifdef __linux__ + TEST_CASE("wait methods internal wait failure (gh-121)"); + struct sigaction act; + act.sa_handler = sigusr_handler; + act.sa_flags = 0; + sigemptyset(&act.sa_mask); + sigaction(SIGUSR1, &act, nullptr); + std::thread signal_thread; + + setup_internal_wait_failure_test_case(conn, &f, &signal_thread); + fail_unless(client.wait(conn, f) != 0); + conn.reset(); + fail_unless(client.wait(conn, f) == 0); + teardown_internal_wait_failure_test_case(conn, f, signal_thread); + + setup_internal_wait_failure_test_case(conn, &f, &signal_thread); + fail_unless(client.waitAll(conn, {f}) != 0); + conn.reset(); + fail_unless(client.waitAll(conn, {f}) == 0); + teardown_internal_wait_failure_test_case(conn, f, signal_thread); + + setup_internal_wait_failure_test_case(conn, &f, &signal_thread); + fail_unless(client.waitCount(conn, 1) != 0); + conn.reset(); + fail_unless(client.waitCount(conn, 1) == 0); + teardown_internal_wait_failure_test_case(conn, f, signal_thread); + + setup_internal_wait_failure_test_case(conn, &f, &signal_thread); + fail_unless(!client.waitAny().has_value()); + fail_unless(client.waitAny().has_value()); + teardown_internal_wait_failure_test_case(conn, f, signal_thread); + + sigaction(SIGUSR1, nullptr, nullptr); +#endif /* __linux__ */ + client.close(conn); }