Skip to content

Commit

Permalink
test: Use a pipe to synchronise wait_for_xxx events
Browse files Browse the repository at this point in the history
The condition variable approach is clearly not working for us.
  • Loading branch information
chrissie-c committed Aug 26, 2021
1 parent 4257b45 commit e07b944
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 40 deletions.
2 changes: 2 additions & 0 deletions libknet/tests/fun_onwire_upgrade.c
Expand Up @@ -33,6 +33,7 @@ static int upgrade_onwire_max_ver(knet_handle_t knet_h, int nodes, uint8_t min,

if (wait_for_nodes_state(knet_h, TESTNODES, 0, seconds, logfd, std) < 0) {
printf("Failed waiting for nodes 0\n");
return -1;
}

knet_h->onwire_min_ver = min;
Expand All @@ -44,6 +45,7 @@ static int upgrade_onwire_max_ver(knet_handle_t knet_h, int nodes, uint8_t min,
if (nodes) {
if (wait_for_nodes_state(knet_h, nodes, 1, seconds, logfd, std) < 0) {
printf("Failed waiting for nodes 1\n");
return -1;
}
}

Expand Down
143 changes: 103 additions & 40 deletions libknet/tests/test-common.c
Expand Up @@ -20,6 +20,7 @@
#include <pthread.h>
#include <dirent.h>
#include <sys/select.h>
#include <sys/poll.h>

#include "libknet.h"
#include "test-common.h"
Expand Down Expand Up @@ -838,8 +839,9 @@ void knet_handle_join_nodes(knet_handle_t knet_h[], uint8_t numnodes, uint8_t nu


static int target=0;
static pthread_mutex_t wait_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t wait_cond = PTHREAD_COND_INITIALIZER;

static int state_wait_pipe[2] = {0,0};
static int host_wait_pipe[2] = {0,0};

static int count_nodes(knet_handle_t knet_h)
{
Expand All @@ -860,11 +862,30 @@ static void nodes_notify_callback(void *private_data,
{
knet_handle_t knet_h = (knet_handle_t) private_data;
int nodes;
int res;

nodes = count_nodes(knet_h);

if (nodes == target) {
pthread_cond_signal(&wait_cond);
res = write(state_wait_pipe[1], ".", 1);
if (res != 1) {
printf("***FAILed to signal wait_for_nodes_state: %s\n", strerror(errno));
}
}
}

/* Called via atexit() to close the wait pipes */
static void finish_state_pipes()
{
	/*
	 * Both wait pipes are torn down the same way, so walk them in the
	 * original order: state pipe first, then host pipe.
	 */
	int *wait_pipes[2] = { state_wait_pipe, host_wait_pipe };
	unsigned int idx;

	for (idx = 0; idx < 2; idx++) {
		int *fds = wait_pipes[idx];

		/* A read end of 0 marks a pipe that was never created (or already closed) */
		if (fds[0] == 0) {
			continue;
		}

		close(fds[0]);
		close(fds[1]);
		/* Only the read end is reset; it is the slot used as the "open" marker */
		fds[0] = 0;
	}
}

Expand All @@ -873,20 +894,69 @@ static void host_notify_callback(void *private_data,
uint8_t reachable, uint8_t remote, uint8_t external)
{
knet_handle_t knet_h = (knet_handle_t) private_data;
int res;

if (knet_h->host_index[host_id]->status.reachable == 1) {
pthread_cond_signal(&wait_cond);
res = write(host_wait_pipe[1], ".", 1);
if (res != 1) {
printf("***FAILed to signal wait_for_host: %s\n", strerror(errno));
}
}
}

/*
 * Wait up to 'seconds' seconds for data to arrive on 'pipefd', then read
 * up to sizeof(tmpbuf) bytes from it.
 *
 * Returns 0 once at least one byte has been read.
 * Returns -1 otherwise, with errno set to:
 *   ETIMEDOUT - poll() expired without any event on the descriptor
 *   EIO       - poll() flagged the descriptor without POLLIN, or read()
 *               reported end-of-file
 *   (other)   - whatever poll() or read() left in errno when it failed
 */
static int wait_for_reply(int seconds, int pipefd)
{
	struct pollfd pfds;
	char tmpbuf[32];
	int res;

	pfds.fd = pipefd;
	pfds.events = POLLIN | POLLERR | POLLHUP;
	pfds.revents = 0;

	res = poll(&pfds, 1, seconds*1000);
	if (res < 0) {
		/* errno was set by poll() */
		return -1;
	}
	if (res == 0) {
		errno = ETIMEDOUT;
		return -1;
	}

	if (!(pfds.revents & POLLIN)) {
		printf("Error on pipe poll revent = 0x%x\n", pfds.revents);
		errno = EIO;
		return -1;
	}

	res = read(pipefd, tmpbuf, sizeof(tmpbuf));
	if (res > 0) {
		return 0;
	}
	if (res == 0) {
		/*
		 * End-of-file on the pipe. Keep this distinct from the poll
		 * timeout above so callers do not see a bogus ETIMEDOUT.
		 */
		errno = EIO;
	}
	/* res < 0: errno was set by read() */
	return -1;
}

/* Wait for a cluster of 'numnodes' to come up/go down */
int wait_for_nodes_state(knet_handle_t knet_h, size_t numnodes,
uint8_t state, uint32_t timeout,
int logfd, FILE *std)
{
struct timespec ts;
int res, savederrno = 0;

if (state_wait_pipe[0] == 0) {
res = pipe(state_wait_pipe);
if (res == -1) {
savederrno = errno;
printf("Error creating host reply pipe: %s\n", strerror(errno));
errno = savederrno;
return -1;
}
if (atexit(finish_state_pipes)) {
printf("Unable to register atexit handler to close pipes: %s\n",
strerror(errno));
exit(FAIL);
}

}

if (state) {
target = numnodes-1; /* exclude us */
} else {
Expand All @@ -906,25 +976,11 @@ int wait_for_nodes_state(knet_handle_t knet_h, size_t numnodes,
return 0;
}

clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += timeout;
if (pthread_mutex_lock(&wait_mutex)) {
fprintf(stderr, "unable to get nodewait mutex: %s\n", strerror(errno));
return -1;
}

res = pthread_cond_timedwait(&wait_cond, &wait_mutex, &ts);
if (res != 0 && res != ETIMEDOUT) {
fprintf(stderr, "pthread_cond_timedwait fatal error\n");
errno = res;
return -1;
}
if (res == ETIMEDOUT) {
fprintf(stderr, "Timed-out\n");
savederrno = ETIMEDOUT;
res = -1;
res = wait_for_reply(timeout, state_wait_pipe[0]);
if (res == -1) {
savederrno = errno;
printf("Error waiting for nodes status reply: %s\n", strerror(errno));
}
pthread_mutex_unlock(&wait_mutex);

knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
flush_logs(logfd, std);
Expand All @@ -935,14 +991,30 @@ int wait_for_nodes_state(knet_handle_t knet_h, size_t numnodes,
/* Wait for a single node to come up */
int wait_for_host(knet_handle_t knet_h, uint16_t host_id, int seconds, int logfd, FILE *std)
{
int res;
struct timespec ts;
int res = 0;
int savederrno;

if (is_memcheck() || is_helgrind()) {
printf("Test suite is running under valgrind, adjusting wait_for_host timeout\n");
seconds = seconds * 16;
}

if (host_wait_pipe[0] == 0) {
res = pipe(host_wait_pipe);
if (res == -1) {
savederrno = errno;
printf("Error creating host reply pipe: %s\n", strerror(errno));
errno = savederrno;
return -1;
}
if (atexit(finish_state_pipes)) {
printf("Unable to register atexit handler to close pipes: %s\n",
strerror(errno));
exit(FAIL);
}

}

/* Set this before checking existing status or there's a race condition */
knet_host_enable_status_change_notify(knet_h,
(void *)(long)knet_h,
Expand All @@ -955,26 +1027,17 @@ int wait_for_host(knet_handle_t knet_h, uint16_t host_id, int seconds, int logfd
return 0;
}

clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += seconds;
if (pthread_mutex_lock(&wait_mutex)) {
fprintf(stderr, "unable to get nodewait mutex: %s\n", strerror(errno));
return -1;
}
res = pthread_cond_timedwait(&wait_cond, &wait_mutex, &ts);
if (res == -1 && errno == ETIMEDOUT) {
fprintf(stderr, "Timed-out\n");
knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
pthread_mutex_unlock(&wait_mutex);
flush_logs(logfd, std);
return -1;
res = wait_for_reply(seconds, host_wait_pipe[0]);
if (res == -1) {
savederrno = errno;
printf("Error waiting for host status reply: %s\n", strerror(errno));
}
pthread_mutex_unlock(&wait_mutex);

knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);

/* Still wait for it to settle */
flush_logs(logfd, std);
test_sleep(knet_h, 1);
return 0;
errno = savederrno;
return res;
}

0 comments on commit e07b944

Please sign in to comment.