From fbbf743c368cad6d2422c5712a04f92446880617 Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Thu, 18 Aug 2016 08:50:06 -0600 Subject: [PATCH] comm/cid: fix threaded CID allocation This commit should restore the pre-non-blocking behavior of the CID allocator when threads are used. There are two primary changes: 1) do not hold the cid allocator lock past the end of a request callback, and 2) if a lower id communicator is detected during CID allocation back off and let the lower id communicator finish before continuing. Signed-off-by: Nathan Hjelm --- ompi/communicator/comm_cid.c | 50 +++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index 1954a27de92..deaaefc8ccd 100644 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -181,6 +181,7 @@ static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t context->newcomm = newcomm; context->comm = comm; context->bridgecomm = bridgecomm; + context->pml_tag = 0; /* Determine which implementation of allreduce we have to use * for the current mode. */ @@ -245,8 +246,8 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request); static int ompi_comm_checkcid (ompi_comm_request_t *request); /* verify that the cid was available globally */ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request); -/* lock the cid generator */ -static int ompi_comm_cid_lock (ompi_comm_request_t *request); + +static volatile int64_t ompi_comm_cid_lowest_id = INT64_MAX; int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm, ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, @@ -271,7 +272,7 @@ int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *com request->context = &context->super; - ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0); + ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); ompi_comm_request_start (request); *req = &request->super; @@ -299,30 +300,33 @@ int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm, return rc; } -static int ompi_comm_cid_lock (ompi_comm_request_t *request) -{ - if (!OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) { - return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); - } - - return ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0); -} - static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) { ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; + int64_t my_id = ((int64_t) ompi_comm_get_cid (context->comm) << 32 | context->pml_tag); ompi_request_t *subreq; bool flag; int ret; + if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) { + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); + } + + if (ompi_comm_cid_lowest_id < my_id) { + OPAL_THREAD_UNLOCK(&ompi_cid_lock); + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); + } + + ompi_comm_cid_lowest_id = my_id; + /** * This is the real algorithm described in the doc */ flag = false; context->nextlocal_cid = mca_pml.pml_max_contextid; for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) { - flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, - i, context->comm); + flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i, + context->comm); if (true == flag) { context->nextlocal_cid = i; break; @@ -332,6 +336,7 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) ret = context->allreduce_fn (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX, context, &subreq); if (OMPI_SUCCESS != ret) { + ompi_comm_cid_lowest_id = INT64_MAX; OPAL_THREAD_UNLOCK(&ompi_cid_lock); return ret; } @@ -341,10 +346,12 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) if (flag) { opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL); } - + + ompi_comm_cid_lowest_id = INT64_MAX; OPAL_THREAD_UNLOCK(&ompi_cid_lock); return OMPI_ERR_OUT_OF_RESOURCE; } + OPAL_THREAD_UNLOCK(&ompi_cid_lock); /* next we want to verify that the resulting commid is ok */ return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1); @@ -356,6 +363,10 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request) ompi_request_t *subreq; int ret; + if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) { + return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, NULL, 0); + } + context->flag = (context->nextcid == context->nextlocal_cid); if (!context->flag) { @@ -372,6 +383,8 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request) ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1); } + OPAL_THREAD_UNLOCK(&ompi_cid_lock); + return ret; } @@ -379,12 +392,17 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request) { ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; + if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) { + return ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, NULL, 0); + } + if (1 == context->rflag) { /* set the according values to the newcomm */ context->newcomm->c_contextid = context->nextcid; opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, context->newcomm); /* unlock the cid generator */ + ompi_comm_cid_lowest_id = INT64_MAX; OPAL_THREAD_UNLOCK(&ompi_cid_lock); /* done! */ @@ -399,6 +417,8 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request) ++context->iter; + OPAL_THREAD_UNLOCK(&ompi_cid_lock); + /* try again */ return ompi_comm_allreduce_getnextcid (request); }