Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 35 additions & 15 deletions ompi/communicator/comm_cid.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t
context->newcomm = newcomm;
context->comm = comm;
context->bridgecomm = bridgecomm;
context->pml_tag = 0;

/* Determine which implementation of allreduce we have to use
* for the current mode. */
Expand Down Expand Up @@ -245,8 +246,8 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request);
static int ompi_comm_checkcid (ompi_comm_request_t *request);
/* verify that the cid was available globally */
static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request);
/* lock the cid generator */
static int ompi_comm_cid_lock (ompi_comm_request_t *request);

static volatile int64_t ompi_comm_cid_lowest_id = INT64_MAX;

int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1,
Expand All @@ -271,7 +272,7 @@ int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *com

request->context = &context->super;

ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0);
ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
ompi_comm_request_start (request);

*req = &request->super;
Expand Down Expand Up @@ -299,30 +300,33 @@ int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
return rc;
}

static int ompi_comm_cid_lock (ompi_comm_request_t *request)
{
if (!OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
}

return ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0);
}

static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request)
{
ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
int64_t my_id = ((int64_t) ompi_comm_get_cid (context->comm) << 32 | context->pml_tag);
ompi_request_t *subreq;
bool flag;
int ret;

if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
}

if (ompi_comm_cid_lowest_id < my_id) {
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
}

ompi_comm_cid_lowest_id = my_id;

/**
* This is the real algorithm described in the doc
*/
flag = false;
context->nextlocal_cid = mca_pml.pml_max_contextid;
for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) {
flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators,
i, context->comm);
flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i,
context->comm);
if (true == flag) {
context->nextlocal_cid = i;
break;
Expand All @@ -332,6 +336,7 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request)
ret = context->allreduce_fn (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX,
context, &subreq);
if (OMPI_SUCCESS != ret) {
ompi_comm_cid_lowest_id = INT64_MAX;
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
return ret;
}
Expand All @@ -341,10 +346,12 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request)
if (flag) {
opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL);
}


ompi_comm_cid_lowest_id = INT64_MAX;
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
OPAL_THREAD_UNLOCK(&ompi_cid_lock);

/* next we want to verify that the resulting commid is ok */
return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1);
Expand All @@ -356,6 +363,10 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request)
ompi_request_t *subreq;
int ret;

if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, NULL, 0);
}

context->flag = (context->nextcid == context->nextlocal_cid);

if (!context->flag) {
Expand All @@ -372,19 +383,26 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request)
ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1);
}

OPAL_THREAD_UNLOCK(&ompi_cid_lock);

return ret;
}

static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request)
{
ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;

if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
return ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, NULL, 0);
}

if (1 == context->rflag) {
/* set the according values to the newcomm */
context->newcomm->c_contextid = context->nextcid;
opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, context->newcomm);

/* unlock the cid generator */
ompi_comm_cid_lowest_id = INT64_MAX;
OPAL_THREAD_UNLOCK(&ompi_cid_lock);

/* done! */
Expand All @@ -399,6 +417,8 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request)

++context->iter;

OPAL_THREAD_UNLOCK(&ompi_cid_lock);

/* try again */
return ompi_comm_allreduce_getnextcid (request);
}
Expand Down