diff --git a/src/Client/Connector.hpp b/src/Client/Connector.hpp index 232d59b21..f0d52ded1 100644 --- a/src/Client/Connector.hpp +++ b/src/Client/Connector.hpp @@ -100,6 +100,23 @@ class Connector */ int connectionDecodeResponses(Connection &conn, int req_sync = -1, Response *result = nullptr); + /** + * A helper to check the readiness of requested responses. + * Decodes new responses from the connection and checks the readiness of the requested responses. `finish` + * indicates that all the requested responses are ready, and `last_not_ready` is reused between consecutive + * calls to this function. + * Returns -1 in case of any error, 0 on success. + */ + int connectionCheckResponsesReadiness(Connection &conn, const std::vector &futures, + size_t *last_not_ready, bool *finish); + /** + * A helper to check the readiness of at least `future_count` responses. Decodes new responses from the + * connection and checks that at least `future_count` responses are ready. `finish` indicates that at least + * `future_count` responses are ready. + * Returns -1 in case of any error, 0 on success. + */ + int connectionCheckCountResponsesReadiness(Connection &conn, size_t future_count, + bool *finish); private: NetProvider m_NetProvider; @@ -205,6 +222,36 @@ Connector::connectionDecodeResponses(Connection +int +Connector::connectionCheckResponsesReadiness(Connection &conn, + const std::vector &futures, + size_t *last_not_ready, bool *finish) +{ + if (conn.hasError() || connectionDecodeResponses(conn) != 0) + return -1; + *finish = true; + for (size_t i = *last_not_ready; i < futures.size(); ++i) { + if (!conn.futureIsReady(futures[i])) { + *finish = false; + *last_not_ready = i; + break; + } + } + return 0; +} + +template +int +Connector::connectionCheckCountResponsesReadiness(Connection &conn, + size_t future_count, bool *finish) +{ + if (conn.hasError() || connectionDecodeResponses(conn) != 0) + return -1; + *finish = conn.getFutureCount() >= future_count; + return 0; +} + template int Connector::wait(Connection &conn, @@ -262,25 +309,22 @@ Connector::waitAll(Connection &conn, const std::vector &futures, int timeout) { + size_t last_not_ready = 0; + bool finish = false; + if (connectionCheckResponsesReadiness(conn, futures, &last_not_ready, &finish) != 0) + return -1; + if (finish) + return 0; Timer timer{timeout}; timer.start(); - size_t last_not_ready = 0; while (!conn.hasError()) { if (m_NetProvider.wait(timer.timeLeft()) != 0) { conn.setError(std::string("Failed to poll: ") + strerror(errno), errno); return -1; } - if (connectionDecodeResponses(conn) != 0) + if (connectionCheckResponsesReadiness(conn, futures, &last_not_ready, &finish) != 0) return -1; - bool finish = true; - for (size_t i = last_not_ready; i < futures.size(); ++i) { - if (!conn.futureIsReady(futures[i])) { - finish = false; - last_not_ready = i; - break; - } - } if (finish) return 0; if (timer.isExpired()) @@ -301,7 +345,10 @@ Connector::waitAny(int timeout) Timer timer{timeout}; timer.start(); while (m_ReadyToDecode.empty()) { - m_NetProvider.wait(timer.timeLeft()); + if (m_NetProvider.wait(timer.timeLeft()) != 0) { + LOG_ERROR("Failed to poll connections: ", strerror(errno)); + return std::nullopt; + } if (timer.isExpired()) break; } @@ -321,18 +368,24 @@ int Connector::waitCount(Connection &conn, size_t future_count, int timeout) { + size_t ready_futures = conn.getFutureCount(); + size_t expected_future_count = ready_futures + future_count; + bool finish = false; + if (connectionCheckCountResponsesReadiness(conn, expected_future_count, &finish) != 0) + return -1; + if (finish) + return 0; Timer timer{timeout}; timer.start(); - size_t ready_futures = conn.getFutureCount(); while (!conn.hasError()) { if (m_NetProvider.wait(timer.timeLeft()) != 0) { conn.setError(std::string("Failed to poll: ") + strerror(errno), errno); return -1; } - if (connectionDecodeResponses(conn) != 0) + if (connectionCheckCountResponsesReadiness(conn, expected_future_count, &finish) != 0) return -1; - if ((conn.getFutureCount() - ready_futures) >= future_count) + if (finish) return 0; if (timer.isExpired()) break; diff --git a/test/ClientTest.cpp b/test/ClientTest.cpp index b42ff747e..41e1424fd 100644 --- a/test/ClientTest.cpp +++ b/test/ClientTest.cpp @@ -35,6 +35,8 @@ #include "../src/Client/LibevNetProvider.hpp" #include "../src/Client/Connector.hpp" +#include + const char *localhost = "127.0.0.1"; int port = 3301; int dummy_server_port = 3302; @@ -1144,6 +1146,47 @@ response_decoding(Connector &client) client.close(conn); } +#ifdef __linux__ +/** Sleep time for internal wait failure test. */ +static constexpr double INTERNAL_WAIT_FAILURE_SLEEP_TIME = 1e5; + +/** No-op signal handler for internal wait failure test. */ +void +sigusr_handler(int signo) +{ + fail_unless(signo != SIGINT); +} + +/** + * Helper for setting up an internal wait failure test case. Creates a request and spawns a thread that will send + * a signal to the request processing thread to interrupt its wait method. + */ +void +setup_internal_wait_failure_test_case(Connection &conn, rid_t *f, std::thread *signal_thread) +{ + *f = conn.call("remote_sleep", std::forward_as_tuple(INTERNAL_WAIT_FAILURE_SLEEP_TIME / 1e6)); + fail_unless(!conn.futureIsReady(*f)); + pthread_t tid = pthread_self(); + *signal_thread = std::thread([tid] { + usleep(INTERNAL_WAIT_FAILURE_SLEEP_TIME / 2); + pthread_kill(tid, SIGUSR1); + }); +} + +/** + * Helper for tearing down an internal wait failure test case. Gets the response for the future and joins the + * signalling thread. + */ +void +teardown_internal_wait_failure_test_case(Connection &conn, rid_t f, std::thread &signal_thread) +{ + fail_unless(conn.futureIsReady(f)); + std::optional> response = conn.getResponse(f); + fail_unless(response.has_value()); + signal_thread.join(); +} +#endif /* __linux__ */ + /** Checks all available `wait` methods of connector. */ template void @@ -1324,6 +1367,63 @@ test_wait(Connector &client) fail_unless(result.header.sync == static_cast(f1)); fail_unless(result.header.code == 0); + TEST_CASE("wait method check future readiness before waiting (gh-133"); + f = conn.ping(); + fail_unless(client.wait(conn, f, WAIT_TIMEOUT) == 0); + fail_unless(client.wait(conn, f) == 0); + conn.getResponse(f); + f = conn.ping(); + fail_unless(client.wait(conn, f, WAIT_TIMEOUT) == 0); + fail_unless(client.waitAll(conn, {f}) == 0); + conn.getResponse(f); + f = conn.ping(); + fail_unless(client.wait(conn, f, WAIT_TIMEOUT) == 0); + /* FIXME(gh-143): test solely that we check future readiness before waiting. */ + fail_unless(client.waitCount(conn, 0) == 0); + conn.getResponse(f); + /* FIXME(gh-132): waitAny does not check connections for ready futures. */ +#if 0 + f = conn.ping(); + fail_unless(client.wait(conn, f, WAIT_TIMEOUT) == 0); + fail_unless(client.waitAny(conn).has_value()); + conn.getResponse(f); +#endif + +#ifdef __linux__ + TEST_CASE("wait methods internal wait failure (gh-121)"); + struct sigaction act; + act.sa_handler = sigusr_handler; + act.sa_flags = 0; + sigemptyset(&act.sa_mask); + sigaction(SIGUSR1, &act, nullptr); + std::thread signal_thread; + + setup_internal_wait_failure_test_case(conn, &f, &signal_thread); + fail_unless(client.wait(conn, f) != 0); + conn.reset(); + fail_unless(client.wait(conn, f) == 0); + teardown_internal_wait_failure_test_case(conn, f, signal_thread); + + setup_internal_wait_failure_test_case(conn, &f, &signal_thread); + fail_unless(client.waitAll(conn, {f}) != 0); + conn.reset(); + fail_unless(client.waitAll(conn, {f}) == 0); + teardown_internal_wait_failure_test_case(conn, f, signal_thread); + + setup_internal_wait_failure_test_case(conn, &f, &signal_thread); + fail_unless(client.waitCount(conn, 1) != 0); + conn.reset(); + fail_unless(client.waitCount(conn, 1) == 0); + teardown_internal_wait_failure_test_case(conn, f, signal_thread); + + setup_internal_wait_failure_test_case(conn, &f, &signal_thread); + fail_unless(!client.waitAny().has_value()); + fail_unless(client.waitAny().has_value()); + teardown_internal_wait_failure_test_case(conn, f, signal_thread); + + sigaction(SIGUSR1, nullptr, nullptr); +#endif /* __linux__ */ + client.close(conn); }