Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 67 additions & 14 deletions src/Client/Connector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,23 @@ class Connector
*/
int connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn, int req_sync = -1,
Response<BUFFER> *result = nullptr);
/**
* A helper to check the readiness of requested responses.
* Decodes new responses from the connection and checks the readiness of the requested responses. `finish`
* indicates that all the requested responses are ready, and `last_not_ready` is reused between consecutive
* calls to this function.
* Returns -1 in case of any error, 0 on success.
*/
int connectionCheckResponsesReadiness(Connection<BUFFER, NetProvider> &conn, const std::vector<rid_t> &futures,
size_t *last_not_ready, bool *finish);
/**
* A helper to check the readiness of at least `future_count` responses. Decodes new responses from the
* connection and checks that at least `future_count` responses are ready. `finish` indicates that at least
* `future_count` responses are ready.
* Returns -1 in case of any error, 0 on success.
*/
int connectionCheckCountResponsesReadiness(Connection<BUFFER, NetProvider> &conn, size_t future_count,
bool *finish);

private:
NetProvider m_NetProvider;
Expand Down Expand Up @@ -205,6 +222,36 @@ Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, Net
return rc;
}

template <class BUFFER, class NetProvider>
int
Connector<BUFFER, NetProvider>::connectionCheckResponsesReadiness(Connection<BUFFER, NetProvider> &conn,
const std::vector<rid_t> &futures,
size_t *last_not_ready, bool *finish)
{
if (conn.hasError() || connectionDecodeResponses(conn) != 0)
return -1;
*finish = true;
for (size_t i = *last_not_ready; i < futures.size(); ++i) {
if (!conn.futureIsReady(futures[i])) {
*finish = false;
*last_not_ready = i;
break;
}
}
return 0;
}

template <class BUFFER, class NetProvider>
int
Connector<BUFFER, NetProvider>::connectionCheckCountResponsesReadiness(Connection<BUFFER, NetProvider> &conn,
size_t future_count, bool *finish)
{
if (conn.hasError() || connectionDecodeResponses(conn) != 0)
return -1;
*finish = conn.getFutureCount() >= future_count;
return 0;
}

template<class BUFFER, class NetProvider>
int
Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
Expand Down Expand Up @@ -262,25 +309,22 @@ Connector<BUFFER, NetProvider>::waitAll(Connection<BUFFER, NetProvider> &conn,
const std::vector<rid_t> &futures,
int timeout)
{
size_t last_not_ready = 0;
bool finish = false;
if (connectionCheckResponsesReadiness(conn, futures, &last_not_ready, &finish) != 0)
return -1;
if (finish)
return 0;
Timer timer{timeout};
timer.start();
size_t last_not_ready = 0;
while (!conn.hasError()) {
if (m_NetProvider.wait(timer.timeLeft()) != 0) {
conn.setError(std::string("Failed to poll: ") +
strerror(errno), errno);
return -1;
}
if (connectionDecodeResponses(conn) != 0)
if (connectionCheckResponsesReadiness(conn, futures, &last_not_ready, &finish) != 0)
return -1;
bool finish = true;
for (size_t i = last_not_ready; i < futures.size(); ++i) {
if (!conn.futureIsReady(futures[i])) {
finish = false;
last_not_ready = i;
break;
}
}
if (finish)
return 0;
if (timer.isExpired())
Expand All @@ -301,7 +345,10 @@ Connector<BUFFER, NetProvider>::waitAny(int timeout)
Timer timer{timeout};
timer.start();
while (m_ReadyToDecode.empty()) {
m_NetProvider.wait(timer.timeLeft());
if (m_NetProvider.wait(timer.timeLeft()) != 0) {
LOG_ERROR("Failed to poll connections: ", strerror(errno));
return std::nullopt;
}
if (timer.isExpired())
break;
}
Expand All @@ -321,18 +368,24 @@ int
Connector<BUFFER, NetProvider>::waitCount(Connection<BUFFER, NetProvider> &conn,
size_t future_count, int timeout)
{
size_t ready_futures = conn.getFutureCount();
size_t expected_future_count = ready_futures + future_count;
bool finish = false;
if (connectionCheckCountResponsesReadiness(conn, expected_future_count, &finish) != 0)
return -1;
if (finish)
return 0;
Timer timer{timeout};
timer.start();
size_t ready_futures = conn.getFutureCount();
while (!conn.hasError()) {
if (m_NetProvider.wait(timer.timeLeft()) != 0) {
conn.setError(std::string("Failed to poll: ") +
strerror(errno), errno);
return -1;
}
if (connectionDecodeResponses(conn) != 0)
if (connectionCheckCountResponsesReadiness(conn, expected_future_count, &finish) != 0)
return -1;
if ((conn.getFutureCount() - ready_futures) >= future_count)
if (finish)
return 0;
if (timer.isExpired())
break;
Expand Down
100 changes: 100 additions & 0 deletions test/ClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include "../src/Client/LibevNetProvider.hpp"
#include "../src/Client/Connector.hpp"

#include <thread>

const char *localhost = "127.0.0.1";
int port = 3301;
int dummy_server_port = 3302;
Expand Down Expand Up @@ -1144,6 +1146,47 @@ response_decoding(Connector<BUFFER, NetProvider> &client)
client.close(conn);
}

#ifdef __linux__
/** Sleep time for internal wait failure test. */
static constexpr double INTERNAL_WAIT_FAILURE_SLEEP_TIME = 1e5;

/** No-op signal handler for internal wait failure test. */
void
sigusr_handler(int signo)
{
fail_unless(signo != SIGINT);
}

/**
* Helper for setting up an internal wait failure test case. Creates a request and spawns a thread that will send
* a signal to the request processing thread to interrupt its wait method.
*/
void
setup_internal_wait_failure_test_case(Connection<Buf_t, NetProvider> &conn, rid_t *f, std::thread *signal_thread)
{
*f = conn.call("remote_sleep", std::forward_as_tuple(INTERNAL_WAIT_FAILURE_SLEEP_TIME / 1e6));
fail_unless(!conn.futureIsReady(*f));
pthread_t tid = pthread_self();
*signal_thread = std::thread([tid] {
usleep(INTERNAL_WAIT_FAILURE_SLEEP_TIME / 2);
pthread_kill(tid, SIGUSR1);
});
}

/**
* Helper for tearing down an internal wait failure test case. Gets the response for the future and joins the
* signalling thread.
*/
void
teardown_internal_wait_failure_test_case(Connection<Buf_t, NetProvider> &conn, rid_t f, std::thread &signal_thread)
{
fail_unless(conn.futureIsReady(f));
std::optional<Response<Buf_t>> response = conn.getResponse(f);
fail_unless(response.has_value());
signal_thread.join();
}
#endif /* __linux__ */

/** Checks all available `wait` methods of connector. */
template <class BUFFER, class NetProvider>
void
Expand Down Expand Up @@ -1324,6 +1367,63 @@ test_wait(Connector<BUFFER, NetProvider> &client)
fail_unless(result.header.sync == static_cast<int>(f1));
fail_unless(result.header.code == 0);

TEST_CASE("wait method check future readiness before waiting (gh-133");
f = conn.ping();
fail_unless(client.wait(conn, f, WAIT_TIMEOUT) == 0);
fail_unless(client.wait(conn, f) == 0);
conn.getResponse(f);
f = conn.ping();
fail_unless(client.wait(conn, f, WAIT_TIMEOUT) == 0);
fail_unless(client.waitAll(conn, {f}) == 0);
conn.getResponse(f);
f = conn.ping();
fail_unless(client.wait(conn, f, WAIT_TIMEOUT) == 0);
/* FIXME(gh-143): test solely that we check future readiness before waiting. */
fail_unless(client.waitCount(conn, 0) == 0);
conn.getResponse(f);
/* FIXME(gh-132): waitAny does not check connections for ready futures. */
#if 0
f = conn.ping();
fail_unless(client.wait(conn, f, WAIT_TIMEOUT) == 0);
fail_unless(client.waitAny(conn).has_value());
conn.getResponse(f);
#endif

#ifdef __linux__
TEST_CASE("wait methods internal wait failure (gh-121)");
struct sigaction act;
act.sa_handler = sigusr_handler;
act.sa_flags = 0;
sigemptyset(&act.sa_mask);
sigaction(SIGUSR1, &act, nullptr);
std::thread signal_thread;

setup_internal_wait_failure_test_case(conn, &f, &signal_thread);
fail_unless(client.wait(conn, f) != 0);
conn.reset();
fail_unless(client.wait(conn, f) == 0);
teardown_internal_wait_failure_test_case(conn, f, signal_thread);

setup_internal_wait_failure_test_case(conn, &f, &signal_thread);
fail_unless(client.waitAll(conn, {f}) != 0);
conn.reset();
fail_unless(client.waitAll(conn, {f}) == 0);
teardown_internal_wait_failure_test_case(conn, f, signal_thread);

setup_internal_wait_failure_test_case(conn, &f, &signal_thread);
fail_unless(client.waitCount(conn, 1) != 0);
conn.reset();
fail_unless(client.waitCount(conn, 1) == 0);
teardown_internal_wait_failure_test_case(conn, f, signal_thread);

setup_internal_wait_failure_test_case(conn, &f, &signal_thread);
fail_unless(!client.waitAny().has_value());
fail_unless(client.waitAny().has_value());
teardown_internal_wait_failure_test_case(conn, f, signal_thread);

sigaction(SIGUSR1, nullptr, nullptr);
#endif /* __linux__ */

client.close(conn);
}

Expand Down