Skip to content

Commit

Permalink
selftests/bpf: pin some tests to worker 0
Browse files Browse the repository at this point in the history
This patch adds a simple name list to pin some tests that fail to run in
parallel to worker 0. On encountering these tests, all other threads will wait
on a condition variable, which worker 0 will signal once the test has
finished running.

Additionally, before running the test, thread 0 also checks and waits until all
other threads have finished their work, to make sure the pinned test really is
the only test running in the system.

After this change, all tests should pass in '-j' mode.

Signed-off-by: Yucong Sun <sunyucong@gmail.com>
  • Loading branch information
thefallentree committed Sep 15, 2021
1 parent 522e159 commit f009fe8
Showing 1 changed file with 103 additions and 12 deletions.
115 changes: 103 additions & 12 deletions tools/testing/selftests/bpf/test_progs.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@
#include <sys/socket.h>
#include <sys/un.h>

/* NULL-terminated list of test names that cannot run in parallel with other
 * tests; dispatch_thread() pins them to worker 0 and serializes them.
 * File-local only, so give it internal linkage with 'static'.
 */
static char *TESTS_MUST_SERIALIZE[] = {
	"netcnt",
	"select_reuseport",
	"sockmap_listen",
	"tc_redirect",
	"xdp_bonding",
	"xdp_info",
	NULL,
};

/* Adapted from perf/util/string.c */
static bool glob_match(const char *str, const char *pat)
{
Expand Down Expand Up @@ -822,6 +832,7 @@ void crash_handler(int signum)

static int current_test_idx = 0;
static pthread_mutex_t current_test_lock;
static pthread_cond_t wait_for_worker0 = PTHREAD_COND_INITIALIZER;

struct test_result {
int error_cnt;
Expand Down Expand Up @@ -904,6 +915,29 @@ static void run_one_test(int test_num) {
cleanup_cgroup_environment();
}

static const char *get_test_name(int idx)
{
struct prog_test_def *test;

test = &prog_test_defs[idx];
return test->test_name;
}

static bool is_test_must_serialize(int idx)
{
struct prog_test_def *test;
char **p;

test = &prog_test_defs[idx];
p = &TESTS_MUST_SERIALIZE[0];
while (*p != NULL) {
if (strcmp(*p, test->test_name) == 0)
return true;
p++;
}
return false;
}

struct dispatch_data {
int worker_id;
int sock_fd;
Expand All @@ -922,6 +956,8 @@ void *dispatch_thread(void *ctx)
struct prog_test_def *test;
struct test_result *result;

env.worker_current_test[data->worker_id] = -1;

/* grab a test */
{
pthread_mutex_lock(&current_test_lock);
Expand All @@ -938,7 +974,36 @@ void *dispatch_thread(void *ctx)
}

test_to_run = current_test_idx;
current_test_idx++;

test = &prog_test_defs[test_to_run];

if (!test->should_run) {
pthread_mutex_unlock(&current_test_lock);
goto done;
}

if (is_test_must_serialize(current_test_idx)) {
if (data->worker_id != 0) {
fprintf(stderr, "[%d]: Waiting for thread 0 to finish serialized test: %d.\n",
data->worker_id, current_test_idx + 1);
/* wait for worker 0 to pick this job up and finish */
pthread_cond_wait(&wait_for_worker0, &current_test_lock);
pthread_mutex_unlock(&current_test_lock);
goto next;
} else {
/* wait until all other worker has parked */
for (int i = 1; i < env.workers; i++) {
if (env.worker_current_test[i] != -1) {
fprintf(stderr, "[%d]: Waiting for other threads to finish current test...\n", data->worker_id);
pthread_mutex_unlock(&current_test_lock);
usleep(1 * 1000 * 1000);
goto next;
}
}
}
} else {
current_test_idx++;
}

pthread_mutex_unlock(&current_test_lock);
}
Expand Down Expand Up @@ -995,7 +1060,15 @@ void *dispatch_thread(void *ctx)
fclose(log_fd);
log_fd = NULL;
}
} /* wait for test done */

/* unblock all other dispatcher threads */
if (is_test_must_serialize(test_to_run) && data->worker_id == 0) {
current_test_idx++;
pthread_cond_broadcast(&wait_for_worker0);
}
next:
continue;
} /* while (true) */
error:
fprintf(stderr, "[%d]: Protocol/IO error: %s", data->worker_id, strerror(errno));
Expand All @@ -1017,16 +1090,19 @@ static int server_main(void)
{
pthread_t *dispatcher_threads;
struct dispatch_data *data;
int all_finished = false;

dispatcher_threads = calloc(sizeof(pthread_t), env.workers);
data = calloc(sizeof(struct dispatch_data), env.workers);

env.worker_current_test = calloc(sizeof(int), env.workers);

for (int i = 0; i < env.workers; i++) {
int rc;

data[i].worker_id = i;
data[i].sock_fd = env.worker_socks[i];
env.worker_current_test[i] = -1;
rc = pthread_create(&dispatcher_threads[i], NULL, dispatch_thread, &data[i]);
if (rc < 0) {
perror("Failed to launch dispatcher thread");
Expand All @@ -1035,17 +1111,27 @@ static int server_main(void)
}

/* wait for all dispatcher to finish */
for (int i = 0; i < env.workers; i++) {
while (true) {
struct timespec timeout = {
.tv_sec = time(NULL) + 5,
.tv_nsec = 0
};
if (pthread_timedjoin_np(dispatcher_threads[i], NULL, &timeout) != ETIMEDOUT)
break;
fprintf(stderr, "Still waiting for thread %d (test %d).\n", i, env.worker_current_test[i] + 1);
while (!all_finished) {
all_finished = true;
for (int i = 0; i < env.workers; i++) {
if (!dispatcher_threads[i])
continue;

if (pthread_tryjoin_np(dispatcher_threads[i], NULL) == EBUSY) {
all_finished = false;
if (env.worker_current_test[i] == -1)
fprintf(stderr, "Still waiting for thread %d (blocked by thread 0).\n", i);
else
fprintf(stderr, "Still waiting for thread %d (test #%d:%s).\n",
i, env.worker_current_test[i] + 1,
get_test_name(env.worker_current_test[i]));
} else {
dispatcher_threads[i] = 0;
}
}
usleep(10 * 1000 * 1000);
}

free(dispatcher_threads);
free(env.worker_current_test);
free(data);
Expand Down Expand Up @@ -1121,8 +1207,11 @@ static int worker_main(int sock)

test_to_run = msg.u.message_do_test.num;

fprintf(stderr, "[%d]: Running test %d.\n",
env.worker_index, test_to_run + 1);
if (env.verbosity > VERBOSE_NONE)
fprintf(stderr, "[%d]: #%d:%s running.\n",
env.worker_index,
test_to_run + 1,
get_test_name(test_to_run));

test = &prog_test_defs[test_to_run];

Expand Down Expand Up @@ -1177,6 +1266,8 @@ static int worker_main(int sock)
env.log_buf = NULL;
env.log_cnt = 0;
}
fprintf(stderr, "[%d]: #%d:%s done.\n",
env.worker_index, test_to_run + 1, get_test_name(test_to_run));
break;
} /* case MSG_DO_TEST */
default:
Expand Down

0 comments on commit f009fe8

Please sign in to comment.