Skip to content
This repository has been archived by the owner on Sep 5, 2023. It is now read-only.

test: mt: introduce a mtt-framework-provided semaphores #1050

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 57 additions & 9 deletions tests/multithreaded/common/mtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct mtt_thread_args {
unsigned id; /* a thread id */
void *state; /* a thread-specific state */
struct mtt_test *test;
sem_t **sems; /* a thread's pointer to the semaphores array */
struct mtt_result ret; /* a thread return object */
};

Expand All @@ -42,7 +43,35 @@ struct mtt_thread_args {
*/
#define MTT_CALL_INIT_FINI(test, func, thread_args, result) \
(test)->func((thread_args)->id, (test)->prestate, \
&(thread_args)->state, (result))
&(thread_args)->state, (thread_args)->sems, (result))

/*
* mtt_semaphores_open -- XXX
*/
static sem_t **
mtt_semaphores_open(unsigned sems_num)
{
/*
* XXX use sem_open() in a for-loop
* XXX names of the semaphores have to be generated at runtime
* preferably in a way guaranteeing its uniqueness for a given MT test
*/

return NULL;
}

/*
* mtt_semaphores_close -- XXX
*/
static int
mtt_semaphores_close(sem_t **sems, unsigned sems_num)
{
/*
* XXX use sem_close() in a for-loop
*/

return 0;
}

/*
* mtt_thread_main -- wait for the synchronization conditional and run the test
Expand Down Expand Up @@ -86,7 +115,7 @@ mtt_thread_main(void *arg)
return tr;
}

test->thread_func(ta->id, test->prestate, ta->state, tr);
test->thread_func(ta->id, test->prestate, ta->state, ta->sems, tr);

if (test->thread_fini_func) {
/*
Expand Down Expand Up @@ -270,7 +299,7 @@ mtt_malloc_aligned(size_t size, struct mtt_result *tr)
*/
static int
mtt_start_child_process(mtt_child_process_func child_process_func,
void *prestate)
void *prestate, sem_t **sems)
{
int pid;
int ret;
Expand All @@ -285,7 +314,7 @@ mtt_start_child_process(mtt_child_process_func child_process_func,
*/
switch ((pid = fork())) {
case 0: /* this is the child */
ret = child_process_func(prestate);
ret = child_process_func(prestate, sems);
exit(ret);
case -1: /* fork failed */
perror("fork");
Expand All @@ -298,11 +327,15 @@ mtt_start_child_process(mtt_child_process_func child_process_func,

/*
* mtt_run -- run the provided test using provided number of threads
*
* Setting sems_num to a non-zero value allows requesting for a number
* of semaphores dedicated for interprocess synchronization.
*/
int
mtt_run(struct mtt_test *test, unsigned threads_num)
mtt_run(struct mtt_test *test, unsigned threads_num, unsigned sems_num)
{
pthread_t *threads;
sem_t **sems;
struct mtt_thread_args *threads_args;
struct mtt_thread_args *ta;
struct mtt_result *tr;
Expand All @@ -315,22 +348,29 @@ mtt_run(struct mtt_test *test, unsigned threads_num)
unsigned i;
int result = 0;

/* open required number of semaphores */
if (sems_num > 0) {
sems = mtt_semaphores_open(sems_num);
if (sems == NULL)
return -1;
}

/* configure logging thresholds to see more details */
rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, RPMA_LOG_LEVEL_INFO);

if (test->child_process_func) {
child_pid = mtt_start_child_process(test->child_process_func,
test->child_prestate);
test->child_prestate, sems);
if (child_pid == -1) {
MTT_INTERNAL_ERR("starting child process failed");
return -1;
goto err_sems_close;
}
}

/* initialize the prestate */
if (test->prestate_init_func) {
test->prestate_init_func(test->prestate, &tr_local);
test->prestate_init_func(test->prestate, sems, &tr_local);
if (tr_local.ret) {
MTT_INTERNAL_ERR("%s", tr_local.errmsg);
result = tr_local.ret;
Expand All @@ -356,6 +396,7 @@ mtt_run(struct mtt_test *test, unsigned threads_num)
for (i = 0; i < threads_num; i++) {
ta = &threads_args[i];
ta->id = i;
ta->sems = sems;
ta->test = test;

if (test->thread_seq_init_func) {
Expand Down Expand Up @@ -437,7 +478,7 @@ mtt_run(struct mtt_test *test, unsigned threads_num)
err_cleanup_prestate:
/* clean up the prestate */
if (test->prestate_fini_func) {
test->prestate_fini_func(test->prestate, &tr_local);
test->prestate_fini_func(test->prestate, sems, &tr_local);
if (tr_local.ret) {
MTT_INTERNAL_ERR("%s", tr_local.errmsg);
result = tr_local.ret;
Expand All @@ -454,5 +495,12 @@ mtt_run(struct mtt_test *test, unsigned threads_num)
}
}

err_sems_close:
if (sems_num > 0) {
ret = mtt_semaphores_close(sems, sems_num);
if (ret)
result = ret;
}

return result;
}
13 changes: 8 additions & 5 deletions tests/multithreaded/common/mtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#ifndef MTT
#define MTT

#include <semaphore.h>
#include <stddef.h>

/* arguments coming from the command line */
Expand Down Expand Up @@ -81,10 +82,12 @@ void *mtt_malloc_aligned(size_t size, struct mtt_result *tr);
* Arguments:
* - prestate - a pointer to the test-provided data. It is the only function
* type in which the prestate is expected to be modified.
* - sems - an array of semaphores dedicated for synchronization with
* the child process.
* - result - the result. On error the test is responsible for providing
* the error details (using e.g. MTT_ERR or MTT_RPMA_ERR macros).
*/
typedef void (*mtt_prestate_init_fini_func)(void *prestate,
typedef void (*mtt_prestate_init_fini_func)(void *prestate, sem_t **sems,
struct mtt_result *result);

/*
Expand All @@ -108,7 +111,7 @@ typedef void (*mtt_prestate_init_fini_func)(void *prestate,
* and thread_fini_func).
*/
typedef void (*mtt_thread_init_fini_func)(unsigned id, void *prestate,
void **state_ptr, struct mtt_result *result);
void **state_ptr, sem_t **sems, struct mtt_result *result);

/*
* mtt_thread_func -- a function type used for the main execution step
Expand All @@ -131,7 +134,7 @@ typedef void (*mtt_thread_init_fini_func)(unsigned id, void *prestate,
* and thread_fini_func).
*/
typedef void (*mtt_thread_func)(unsigned id, void *prestate, void *state,
struct mtt_result *result);
sem_t **sems, struct mtt_result *result);

/*
* mtt_child_process_func -- a function type used for the child process
Expand All @@ -140,7 +143,7 @@ typedef void (*mtt_thread_func)(unsigned id, void *prestate, void *state,
* Arguments:
* - prestate - a pointer to the test-provided data.
*/
typedef int (*mtt_child_process_func)(void *prestate);
typedef int (*mtt_child_process_func)(void *prestate, sem_t **sems);

struct mtt_test {
/*
Expand Down Expand Up @@ -199,6 +202,6 @@ struct mtt_test {
void *child_prestate;
};

int mtt_run(struct mtt_test *test, unsigned threads_num);
int mtt_run(struct mtt_test *test, unsigned threads_num, unsigned sems_num);

#endif /* MTT */
10 changes: 5 additions & 5 deletions tests/multithreaded/conn/rpma_conn_get_private_data.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct prestate {
* and save it in order to verify it later.
*/
static void
prestate_init(void *prestate, struct mtt_result *tr)
prestate_init(void *prestate, sem_t **sems, struct mtt_result *tr)
{
struct prestate *pr = (struct prestate *)prestate;
struct ibv_context *dev;
Expand Down Expand Up @@ -103,7 +103,7 @@ prestate_init(void *prestate, struct mtt_result *tr)
* thread -- get and verify the private data
*/
static void
thread(unsigned id, void *prestate, void *state,
thread(unsigned id, void *prestate, void *state, sem_t **sems,
struct mtt_result *result)
{
struct prestate *pr = (struct prestate *)prestate;
Expand Down Expand Up @@ -136,7 +136,7 @@ thread(unsigned id, void *prestate, void *state,
* prestate_fini -- disconnect and delete the peer object
*/
static void
prestate_fini(void *prestate, struct mtt_result *tr)
prestate_fini(void *prestate, sem_t **sems, struct mtt_result *tr)
{
struct prestate *pr = (struct prestate *)prestate;
enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
Expand Down Expand Up @@ -182,7 +182,7 @@ int server_main(int argc, char *argv[]);
* from the 02-read-to-volatile example.
*/
int
server_func(void *prestate)
server_func(void *prestate, sem_t **sems)
{
struct server_prestate *pst = prestate;
return server_main(pst->argc, pst->argv);
Expand Down Expand Up @@ -214,5 +214,5 @@ main(int argc, char *argv[])
&server_prestate
};

return mtt_run(&test, args.threads_num);
return mtt_run(&test, args.threads_num, 0);
}
8 changes: 4 additions & 4 deletions tests/multithreaded/ep/rpma_ep_get_fd.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ struct prestate {
* get the endpoint's event file descriptor
*/
static void
prestate_init(void *prestate, struct mtt_result *tr)
prestate_init(void *prestate, sem_t **sems, struct mtt_result *tr)
{
struct prestate *pr = (struct prestate *)prestate;
int ret;
Expand Down Expand Up @@ -60,7 +60,7 @@ prestate_init(void *prestate, struct mtt_result *tr)
* thread -- get the endpoint's event file descriptor
*/
static void
thread(unsigned id, void *prestate, void *state,
thread(unsigned id, void *prestate, void *state, sem_t **sems,
struct mtt_result *result)
{
struct prestate *pr = (struct prestate *)prestate;
Expand All @@ -81,7 +81,7 @@ thread(unsigned id, void *prestate, void *state,
* prestate_fini -- shutdown the endpoint and delete the peer object
*/
static void
prestate_fini(void *prestate, struct mtt_result *tr)
prestate_fini(void *prestate, sem_t **sems, struct mtt_result *tr)
{
struct prestate *pr = (struct prestate *)prestate;
int ret;
Expand Down Expand Up @@ -114,5 +114,5 @@ main(int argc, char *argv[])
prestate_fini
};

return mtt_run(&test, args.threads_num);
return mtt_run(&test, args.threads_num, 0);
}
12 changes: 6 additions & 6 deletions tests/multithreaded/ep/rpma_ep_listen.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ struct state {
* and create a new peer object
*/
static void
prestate_init(void *prestate, struct mtt_result *tr)
prestate_init(void *prestate, sem_t **sems, struct mtt_result *tr)
{
struct prestate *pr = (struct prestate *)prestate;
int ret;
Expand All @@ -45,7 +45,7 @@ prestate_init(void *prestate, struct mtt_result *tr)
* init -- allocate state
*/
void
init(unsigned id, void *prestate, void **state_ptr,
init(unsigned id, void *prestate, void **state_ptr, sem_t **sems,
struct mtt_result *tr)
{
struct state *st = (struct state *)calloc(1, sizeof(struct state));
Expand All @@ -61,7 +61,7 @@ init(unsigned id, void *prestate, void **state_ptr,
* thread -- start a listening endpoint
*/
static void
thread(unsigned id, void *prestate, void *state,
thread(unsigned id, void *prestate, void *state, sem_t **sems,
struct mtt_result *result)
{
struct prestate *pr = (struct prestate *)prestate;
Expand All @@ -79,7 +79,7 @@ thread(unsigned id, void *prestate, void *state,
* fini -- shutdown the endpoint and free the state
*/
static void
fini(unsigned id, void *prestate, void **state_ptr,
fini(unsigned id, void *prestate, void **state_ptr, sem_t **sems,
struct mtt_result *tr)
{
struct state *st = (struct state *)*state_ptr;
Expand All @@ -97,7 +97,7 @@ fini(unsigned id, void *prestate, void **state_ptr,
* prestate_fini -- delete the peer object
*/
static void
prestate_fini(void *prestate, struct mtt_result *tr)
prestate_fini(void *prestate, sem_t **sems, struct mtt_result *tr)
{
struct prestate *pr = (struct prestate *)prestate;
int ret;
Expand Down Expand Up @@ -127,5 +127,5 @@ main(int argc, char *argv[])
prestate_fini
};

return mtt_run(&test, args.threads_num);
return mtt_run(&test, args.threads_num, 0);
}
Loading