
can opal_progress() call opal_progress()? (test hanging in MPI_COMM_DUP) #2025

Open
jsquyres opened this issue Aug 27, 2016 · 29 comments

@jsquyres
Member

An Intel test is hanging in MPI_COMM_DUP (MPI_Keyval1_c and MPI_Keyval1_f), and the backtrace from one of the hung processes is a bit strange:

$ mpirun -np 4 --mca btl vader,self ./MPI_Keyval1_c
[...hangs...]

A snapshot backtrace from a hung process is:

(gdb) bt
#0  0x00002aaaaaafecb2 in opal_list_remove_first (list=0x773508) at ../opal/class/opal_list.h:670
#1  0x00002aaaaaaff885 in ompi_comm_request_progress () at communicator/comm_request.c:114
#2  0x00002aaaab1ad39e in opal_progress () at runtime/opal_progress.c:221
#3  0x00002aaaaab215a1 in ompi_request_default_test_all (count=1, requests=0x779d10, completed=0x7fffffffc00c, statuses=0x0) at request/req_test.c:214
#4  0x00002aaabbfa5095 in NBC_Progress (handle=0x7741d8) at nbc.c:326
#5  0x00002aaabbfa30ec in ompi_coll_libnbc_progress () at coll_libnbc_component.c:242
#6  0x00002aaaab1ad39e in opal_progress () at runtime/opal_progress.c:221
#7  0x00002aaaaab21d64 in ompi_request_wait_completion (req=0x773580) at ../ompi/request/request.h:397
#8  0x00002aaaaab21da2 in ompi_request_default_wait (req_ptr=0x7fffffffc180, status=0x0) at request/req_wait.c:40
#9  0x00002aaaaaaf636a in ompi_comm_set (ncomm=0x7fffffffc1f8, oldcomm=0x78ef60, local_size=0, local_ranks=0x0, remote_size=0, remote_ranks=0x0, attr=0x76d6f0, errh=0x619040 <ompi_mpi_errors_return>, copy_topocomponent=true, local_group=0x76cd10, remote_group=0x778820) at communicator/comm.c:122
#10 0x00002aaaaaaf84b2 in ompi_comm_dup_with_info (comm=0x78ef60, info=0x0, newcomm=0x7fffffffc3c8) at communicator/comm.c:988
#11 0x00002aaaaaaf83f6 in ompi_comm_dup (comm=0x78ef60, newcomm=0x7fffffffc3c8) at communicator/comm.c:969
#12 0x00002aaaaab4d2cf in PMPI_Comm_dup (comm=0x78ef60, newcomm=0x7fffffffc3c8) at pcomm_dup.c:63
#13 0x0000000000402ba0 in main (argc=1, argv=0x7fffffffc528) at MPI_Keyval1_c.c:454

Notes:

  • ompi_comm_dup_with_info() is essentially waiting on a request that never completes. This seems to be the real issue. The only communication this test does is duping communicators.
  • A secondary issue (which only happened to be noticed because of this snapshot backtrace): is opal_progress() allowed to call opal_progress()? (See the sketch below.)
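To make the secondary question concrete, here is a minimal, self-contained sketch of how the nested opal_progress() frames (#2 and #6) in the backtrace can arise: a progress engine runs a registered callback, and that callback waits on a sub-request by re-entering the engine. All names below are hypothetical stand-ins, not the actual OMPI code.

#include <stdbool.h>
#include <stdio.h>

#define MAX_CALLBACKS 8

typedef void (*progress_cb_t)(void);

static progress_cb_t callbacks[MAX_CALLBACKS];
static int num_callbacks = 0;
static int depth = 0;                     /* how deeply the engine is nested */
static bool sub_request_done = false;

/* Stand-in for opal_progress(): run every registered progress callback. */
static void engine_progress(void)
{
    depth++;
    for (int i = 0; i < num_callbacks; i++) {
        callbacks[i]();
    }
    depth--;
}

/* Stand-in for a progress callback (in the spirit of ompi_comm_request_progress()
 * or NBC_Progress()) that, while advancing its own schedule, waits on a
 * sub-request -- which means re-entering the engine from inside the engine. */
static void comm_request_progress(void)
{
    if (!sub_request_done) {
        sub_request_done = true;          /* pretend the sub-request completes */
        printf("engine depth while waiting on sub-request: %d\n", depth);
        engine_progress();                /* nested progress call */
    }
}

int main(void)
{
    callbacks[num_callbacks++] = comm_request_progress;
    engine_progress();                    /* prints a depth of 1, then nests once */
    return 0;
}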

These 2 tests (the C and Fortran versions) are not hanging on the v2.x branch.

@bosilca @hjelmn

@jsquyres jsquyres added the bug label Aug 27, 2016
@bosilca
Member

bosilca commented Aug 27, 2016

It is difficult to see the problem from this stack trace alone. Moreover, recursively calling opal_progress is not forbidden (especially in this particular context, where we need to progress non-blocking non-PML requests).
@hjelmn I looked at the ompi_comm_request_progress function, and I do not think there is a reason for it to exist. We added completion callbacks to the requests to avoid having to register/unregister specialized progress functions.
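For illustration, here is a minimal sketch of that callback-based approach, with hypothetical types and names rather than the real OMPI request API: the next step of a communicator-duplication schedule is triggered from the completion path of the underlying request, so no specialized progress function has to be registered and polled.

#include <stddef.h>
#include <stdio.h>

typedef struct request {
    int complete;
    void (*on_complete)(struct request *req, void *cbdata);   /* completion callback */
    void *cbdata;
} request_t;

/* Invoked when the request finishes; no list of pending communicator
 * requests has to be polled from a registered progress function. */
static void request_complete(request_t *req)
{
    req->complete = 1;
    if (NULL != req->on_complete) {
        req->on_complete(req, req->cbdata);
    }
}

/* Next step of a comm-dup "schedule", run directly from the completion path. */
static void next_dup_step(request_t *req, void *cbdata)
{
    (void) req;
    printf("launching next step for dup %s\n", (const char *) cbdata);
}

int main(void)
{
    request_t req = { 0, next_dup_step, (void *) "A" };
    request_complete(&req);   /* simulates the underlying allreduce finishing */
    return 0;
}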

@hjelmn
Member

hjelmn commented Aug 27, 2016

Yeah, it was written before I knew about the callback. I will update this soon.

@jsquyres
Member Author

@hjelmn You mentioned that you were going to update the request callback function. Have you had a chance to do so?

@sam6258
Contributor

sam6258 commented Feb 21, 2018

I am seeing an issue in v3.0.x, v3.1.x, and master with a stack trace that resembles this issue's; however, it involves multi-threaded support. When performing MPI_Comm_dup in a loop from multiple threads, the problem is almost instantly reproducible.

The stack trace looks as follows:

Rank 0:

Thread 1 (Thread 0x100000047050 (LWP 151366)):
#0  0x00001000001de854 in pthread_spin_lock () from /lib64/libpthread.so.0
#1  0x0000100001f5d368 in mlx5_poll_cq_1 () from /lib64/libmlx5-rdmav2.so
#2  0x0000100002276ab8 in poll_device () from /u/smiller/ompi-v3.0.x/lib/openmpi/mca_btl_openib.so
#3  0x0000100002277a8c in btl_openib_component_progress () from /u/smiller/ompi-v3.0.x/lib/openmpi/mca_btl_openib.so
#4  0x0000100000528640 in opal_progress () from /u/smiller/ompi-v3.0.x/lib/libopen-pal.so.40
#5  0x00001000005317e4 in sync_wait_mt () from /u/smiller/ompi-v3.0.x/lib/libopen-pal.so.40
#6  0x00001000000e0a14 in ompi_request_default_wait () from /u/smiller/ompi-v3.0.x/lib/libmpi.so.40
#7  0x000010000015081c in ompi_coll_base_barrier_intra_two_procs () from /u/smiller/ompi-v3.0.x/lib/libmpi.so.40
#8  0x00001000038e5124 in ompi_coll_tuned_barrier_intra_dec_fixed () from /u/smiller/ompi-v3.0.x/lib/openmpi/mca_coll_tuned.so
#9  0x00001000000fb85c in PMPI_Barrier () from /u/smiller/ompi-v3.0.x/lib/libmpi.so.40
#10 0x0000000010000dc0 in main (argc=1, argv=0x3fffe401eeb8) at mt_comm_dup_hang.c:81

Rank 1:

Thread 3 (Thread 0x10001046f1d0 (LWP 151445)):
#0  0x00001000000c1df0 in ompi_comm_request_progress () from /u/smiller/ompi-v3.0.x/lib/libmpi.so.40
#1  0x0000100000528640 in opal_progress () from /u/smiller/ompi-v3.0.x/lib/libopen-pal.so.40
#2  0x00001000000e028c in ompi_request_default_test_all () from /u/smiller/ompi-v3.0.x/lib/libmpi.so.40
#3  0x00001000039d7f90 in NBC_Progress () from /u/smiller/ompi-v3.0.x/lib/openmpi/mca_coll_libnbc.so
#4  0x00001000039d673c in ompi_coll_libnbc_progress () from /u/smiller/ompi-v3.0.x/lib/openmpi/mca_coll_libnbc.so
#5  0x0000100000528640 in opal_progress () from /u/smiller/ompi-v3.0.x/lib/libopen-pal.so.40
#6  0x00001000005317e4 in sync_wait_mt () from /u/smiller/ompi-v3.0.x/lib/libopen-pal.so.40
#7  0x00001000000c12a0 in ompi_comm_nextcid () from /u/smiller/ompi-v3.0.x/lib/libmpi.so.40
#8  0x00001000000bb380 in ompi_comm_dup_with_info () from /u/smiller/ompi-v3.0.x/lib/libmpi.so.40
#9  0x00001000000bb420 in ompi_comm_dup () from /u/smiller/ompi-v3.0.x/lib/libmpi.so.40
#10 0x0000100000100ec4 in PMPI_Comm_dup () from /u/smiller/ompi-v3.0.x/lib/libmpi.so.40
#11 0x0000000010000e48 in runfunc (targ=0x100201c0 <thread_args>) at mt_comm_dup_hang.c:96
#12 0x00001000001d8728 in start_thread () from /lib64/libpthread.so.0
#13 0x000010000032d210 in clone () from /lib64/libc.so.6

Thread 2 (Thread 0x10001006f1d0 (LWP 151446)):
#0  0x00001000001ddd60 in pthread_cond_wait@@GLIBC_2.17 () from /lib64/libpthread.so.0
#1  0x00001000005317a4 in sync_wait_mt () from /u/smiller/ompi-v3.0.x/lib/libopen-pal.so.40
#2  0x00001000000c12a0 in ompi_comm_nextcid () from /u/smiller/ompi-v3.0.x/lib/libmpi.so.40
#3  0x00001000000bb380 in ompi_comm_dup_with_info () from /u/smiller/ompi-v3.0.x/lib/libmpi.so.40
#4  0x00001000000bb420 in ompi_comm_dup () from /u/smiller/ompi-v3.0.x/lib/libmpi.so.40
#5  0x0000100000100ec4 in PMPI_Comm_dup () from /u/smiller/ompi-v3.0.x/lib/libmpi.so.40
#6  0x0000000010000e48 in runfunc (targ=0x100201d0 <thread_args+16>) at mt_comm_dup_hang.c:96
#7  0x00001000001d8728 in start_thread () from /lib64/libpthread.so.0
#8  0x000010000032d210 in clone () from /lib64/libc.so.6

Thread 1 (Thread 0x100000047050 (LWP 151367)):
#0  0x00001000001d9b34 in pthread_join () from /lib64/libpthread.so.0
#1  0x0000000010000d98 in main (argc=1, argv=0x3fffcd706188) at mt_comm_dup_hang.c:79

In opal/threads/wait_sync.c, it looks like we are stuck in the loop waiting for while (sync->count > 0) to become false or for opal_progress to exit. Re-capturing the stack traces with gdb shows opal_progress looping through the registered progress functions, so we know it is not a particular progress function that is hung.
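For reference, here is a heavily simplified, hedged sketch of the wait pattern being described (assumed behavior, not the actual wait_sync.c code): one thread spins on the progress engine while the count is positive, another blocks on a condition variable, matching threads 3 and 2 of rank 1 above.

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

typedef struct wait_sync {
    atomic_int count;
    pthread_mutex_t lock;
    pthread_cond_t condition;
} wait_sync_t;

static wait_sync_t ws = { 1, PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };
static int iterations = 0;

/* Stand-in for opal_progress(): pretend the outstanding operation completes
 * after a while, so the count drops and the sleeping waiter is signalled.
 * In the reported hang, nothing ever decrements the count. */
static void fake_progress(void)
{
    if (++iterations == 1000) {
        pthread_mutex_lock(&ws.lock);
        atomic_store(&ws.count, 0);
        pthread_cond_broadcast(&ws.condition);
        pthread_mutex_unlock(&ws.lock);
    }
}

/* The progressing thread: spins on while (count > 0), driving the engine. */
static void *progressing_waiter(void *arg)
{
    (void) arg;
    while (atomic_load(&ws.count) > 0) {
        fake_progress();
    }
    return NULL;
}

/* A non-progressing thread: blocks in pthread_cond_wait() until signalled. */
static void *sleeping_waiter(void *arg)
{
    (void) arg;
    pthread_mutex_lock(&ws.lock);
    while (atomic_load(&ws.count) > 0) {
        pthread_cond_wait(&ws.condition, &ws.lock);
    }
    pthread_mutex_unlock(&ws.lock);
    return NULL;
}

int main(void)
{
    pthread_t t1, t2;
    pthread_create(&t1, NULL, progressing_waiter, NULL);
    pthread_create(&t2, NULL, sleeping_waiter, NULL);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    puts("both waiters released");
    return 0;
}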

A gist of the testcase can be found at: https://gist.github.com/sam6258/8233342cee7c0acc93d2c9ffbf18ba35

@jsquyres
Member Author

jsquyres commented Feb 27, 2018

Recap:

  • Back at the beginning of time, we had 2 CID allocation algorithms:
    • Single threaded: simple and fast, which is now what is used everywhere
    • Multi threaded: thread safe, iterative, and slow.

It looks like we're using the single threaded/fast one everywhere now. That may be a mistake.

We may need to go back to:

  • Have both CID algorithms available at runtime
  • Always choose the safe/correct algorithm (e.g., in THREAD_MULTIPLE runs, choose the slow/correct one)
  • Allow the user to override and choose the fast one if they want (i.e., if they can guarantee that they will never overlap multiple CID allocations) -- e.g., via an info key, MCA var, MPIT, etc.

@ggouaillardet
Contributor

@jsquyres as far as I understand, we now only use the thread-safe, iterative, and slow algorithm.
At first glance, and with respect to CID allocation, I think this is now a requirement because of MPI_Comm_idup(), which adds restrictions similar to those of MPI_THREAD_MULTIPLE.
Some time ago, I prepared a PR to add a new framework, in which the MPI_THREAD_MULTIPLE value was passed to the components when they were opened.

@bosilca
Member

bosilca commented Feb 28, 2018

It is possible that these two issues are not related. The original issue has not been updated for months (years), so I will assume it has been fixed.

Let me scale back this example to try to explain what I think the problem is. Duplicating a communicator involves 2 allreduces: one with MPI_MAX (to find a suitable CID) and one with MPI_MIN (to agree on it). These reductions are done sequentially on the same communicator. In the single-threaded case everything goes smoothly: the global order of these 2 allreduces is ALWAYS respected.

In multi-threaded cases, starting multiple duplications at the same time basically results in a tuple of (max, min) allreduces being posted sequentially per dup, on the same communicator, on each process. MPI requires that collectives are posted in the same order globally, but in a distributed system we have no way to enforce a global order between the allreduce tuples from the different dup operations. The current algorithm works just fine if the global order is respected, but fails when it is not, which is exactly the reported behavior: random deadlocks.

For a toy example that tries to mimic the behavior of multiple duplications, replace the runfunc from the previous gist with the following:

void *runfunc(void * args) {
    MPI_Request req;
    MPI_Status status;
    int myrank, mysize;

    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    MPI_Comm_size(MPI_COMM_WORLD, &mysize);

    int old_value = myrank, new_value;
    int *sbuf = &old_value, *rbuf = &new_value;

    MPI_Iallreduce( sbuf, rbuf, 1, MPI_INT, MPI_MAX, MPI_COMM_WORLD, &req );
    MPI_Wait(&req, &status);
    assert( new_value == (mysize - 1) );

    MPI_Iallreduce( sbuf, rbuf, 1, MPI_INT, MPI_MIN, MPI_COMM_WORLD, &req );
    MPI_Wait(&req, &status);
    assert( new_value == 0 );

    pthread_exit(NULL);
}

So the problem with our CID selection in multi-threaded cases is that we mix the reductions in a totally nondeterministic way.

I can see a few solutions, but none of them is trivial.

  • have a tag range in libnbc that, for reductions, accounts for the MPI_Op (the handle->tag should not simply be monotonically increasing per operation, but instead monotonically increasing per collective and MPI_Op).
  • add multiple internal communicators for collectives (2 in this case), first to avoid the current collision between the internal reductions and user-posted iallreduces, and second to give a different communication context to the 2 allreduces (the max and the min would then be posted on 2 different internal communicators).

@ggouaillardet
Contributor

@bosilca @sam6258 I think this is an invalid example with respect to the standard.

MPI_Comm_dup() is a collective operation, and it is hence incorrect to invoke it on the same communicator (MPI_COMM_WORLD in this case) simultaneously on multiple threads.

Makes sense?

@sam6258
Contributor

sam6258 commented Mar 1, 2018

I think @ggouaillardet is correct. I will have to re-evaluate the larger test case from which we derived this small one to make sure it is not violating the standard. I added the necessary synchronization in runfunc to ensure two threads were not calling the same collective on COMM_WORLD at the same time, and the test case no longer hangs. Thanks Gilles, and everyone, for looking into this.
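For reference, a sketch of the kind of synchronization described, under a couple of assumptions (this is not the actual gist code): main initializes MPI with MPI_THREAD_MULTIPLE, and a process-local pthread mutex is enough to keep two threads of the same rank from being inside a collective on MPI_COMM_WORLD at the same time.

#include <mpi.h>
#include <pthread.h>

static pthread_mutex_t dup_lock = PTHREAD_MUTEX_INITIALIZER;

void *runfunc(void *args) {
    MPI_Comm newcomm;
    (void) args;

    /* Only one thread per process enters the collective at a time.  Because
     * every rank serializes its threads the same way, and every call is the
     * same MPI_Comm_dup on MPI_COMM_WORLD, the per-communicator ordering rule
     * for collectives is respected. */
    pthread_mutex_lock(&dup_lock);
    MPI_Comm_dup(MPI_COMM_WORLD, &newcomm);
    pthread_mutex_unlock(&dup_lock);

    /* The free is collective only on the newly created communicator, so it
     * needs no ordering against the other threads' frees. */
    MPI_Comm_free(&newcomm);
    pthread_exit(NULL);
}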

@jsquyres
Member Author

jsquyres commented Mar 1, 2018

Sounds like we can close this issue, then...?

@sam6258
Contributor

sam6258 commented Mar 1, 2018

I will verify with our tester tomorrow and close if I see a violation of the standard in our original testcase. Thanks!

@bosilca
Member

bosilca commented Mar 1, 2018

This particular example is indeed incorrect, but replace the blocking dup with a non-blocking dup and we are back to a valid but non-functional example.

@ggouaillardet
Contributor

@bosilca in order to avoid any confusion/misunderstanding, would you mind providing such a valid but non-functional example?

@bosilca
Member

bosilca commented Mar 1, 2018

What we need to emphasize is the non-determinism between internal steps of the operation itself. We can achieve this by changing the runfunc in the gist example to the following:

/*   Thread function */
void *runfunc(void * args) {
    MPI_Comm comms[2];
    MPI_Request reqs[1] = {MPI_REQUEST_NULL};

    MPI_Comm_idup( MPI_COMM_WORLD, &comms[0], &reqs[0] );

    MPI_Waitall(1, reqs, MPI_STATUSES_IGNORE);

    MPI_Comm_free(&comms[0]);
    pthread_exit(NULL);
}

The question I couldn't find an answer to in the MPI Standard is whether we are allowed to post 2 idups on the same communicator at the same time.

@ggouaillardet
Contributor

Well, I ran into Bill Gropp at SC'16 and asked him a related question. He replied that communicator management operations are considered collective operations. So my view is that if MPI_Iallreduce() is not allowed here because it is a collective operation, then MPI_Comm_idup() would not be allowed either, for the same reason.

@bosilca
Member

bosilca commented Mar 1, 2018

I think Bill was referring to Section 5.12 of the MPI standard. While this section clarifies some corner cases, I do not see anything in there that forbids the scenario we are talking about. In fact, here are the important bits from Section 5.12:

Multiple nonblocking collective operations can be outstanding on a single communicator.

All processes must call collective operations (blocking and nonblocking) in the same order per communicator. In particular, once a process calls a collective operation, all other processes in the communicator must eventually call the same collective operation, and no other collective operation with the same communicator in between.

According to this, it should be legal to have multiple outstanding idups on the same communicator, because from the MPI standard's viewpoint an idup is considered a single operation. Unfortunately, from OMPI's perspective this is not the case.

@ggouaillardet
Contributor

I am still a bit puzzled, but anyway ...

Would updating ompi_comm_request_progress() so that it progresses only one request per communicator at a time fix that issue?

@bosilca
Member

bosilca commented Mar 1, 2018

I am puzzled as well. I would be in favor of asking the Forum, either by sending an email or by volunteering @jsquyres (who is currently attending the MPI Forum) to ask around about what the Forum thinks of having multiple outstanding idups on the same communicator.

@jsquyres
Member Author

jsquyres commented Mar 1, 2018

Fun fact: I'm not actually at the Forum this week 😲

@ggouaillardet
Contributor

Well, regardless of whether this is valid or not, the following snippet (no need for multiple threads) might lead to incorrect results (it does not hang, but a given communicator might end up with different CIDs on different ranks); the issue can be evidenced after a few runs with 4 MPI tasks.

    MPI_Comm_idup(MPI_COMM_WORLD, comm, req);
    MPI_Comm_idup(MPI_COMM_WORLD, comm+1, req+1);
    MPI_Waitall(2, req, MPI_STATUSES_IGNORE);

As I suggested earlier, progressing the second request only after the first one completes (since they both operate on the same MPI_COMM_WORLD communicator) fixes the issue.

Attached are two examples and a proof of concept.
Note that you might have to adjust them, since I basically hardcoded offsetof(ompi_communicator_t, c_contextid) in order to retrieve the CID (and I configured with --enable-debug, FWIW).

With MPI_Comm_dup(), it always works:

#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[]) {
    MPI_Comm comm[3];
    int rank, size;
    int mycids[2];
    int cids[8];

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (4 < size) {
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    if (0 == rank) {
        MPI_Comm_dup(MPI_COMM_SELF, comm);
        MPI_Comm_dup(MPI_COMM_SELF, comm+2);
        MPI_Comm_free(comm);
    } else {
        MPI_Comm_dup(MPI_COMM_SELF, comm+2);
    }

    MPI_Comm_dup(MPI_COMM_WORLD, comm);
    MPI_Comm_dup(MPI_COMM_WORLD, comm+1);

    /* retrieve the CIDs via the hardcoded offsetof(ompi_communicator_t, c_contextid)
     * (0x138 in this --enable-debug build), as noted above */
    mycids[0] = ((int *)comm[0])[0x138/4];
    mycids[1] = ((int *)comm[1])[0x138/4];

    MPI_Gather(mycids, 2, MPI_INT, cids, 2, MPI_INT, 0, MPI_COMM_WORLD);

    if (0 == rank) {
        int i;
        for (i=1; i<size; i++) {
            if (cids[0] != cids[2*i]) {
                fprintf(stderr, "mismatch for comm 0 on rank %d, got %d but has %d\n", i, cids[0], cids[2*i]);
                 MPI_Abort(MPI_COMM_WORLD, 2);
            }
            if (cids[1] != cids[2*i+1]) {
                fprintf(stderr, "mismatch for comm 1 on rank %d, got %d but has %d\n", i, cids[1], cids[2*i+1]);
                 MPI_Abort(MPI_COMM_WORLD, 2);
            }
        }
        printf ("OK\n");
    }

    MPI_Finalize();
    return 0;
}

With MPI_Comm_idup(), it does not always work:

#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[]) {
    MPI_Comm comm[3];
    MPI_Request req[2];
    int rank, size;
    int mycids[2];
    int cids[8];

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (4 < size) {
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    if (0 == rank) {
        MPI_Comm_dup(MPI_COMM_SELF, comm);
        MPI_Comm_dup(MPI_COMM_SELF, comm+2);
        MPI_Comm_free(comm);
    } else {
        MPI_Comm_dup(MPI_COMM_SELF, comm+2);
    }

    MPI_Comm_idup(MPI_COMM_WORLD, comm, req);
    MPI_Comm_idup(MPI_COMM_WORLD, comm+1, req+1);
    MPI_Waitall(2, req, MPI_STATUSES_IGNORE);

    mycids[0] = ((int *)comm[0])[0x138/4];
    mycids[1] = ((int *)comm[1])[0x138/4];

    MPI_Gather(mycids, 2, MPI_INT, cids, 2, MPI_INT, 0, MPI_COMM_WORLD);

    if (0 == rank) {
        int i;
        for (i=1; i<size; i++) {
            if (cids[0] != cids[2*i]) {
                fprintf(stderr, "mismatch for comm 0 on rank %d, got %d but has %d\n", i, cids[0], cids[2*i]);
                 MPI_Abort(MPI_COMM_WORLD, 2);
            }
            if (cids[1] != cids[2*i+1]) {
                fprintf(stderr, "mismatch for comm 1 on rank %d, got %d but has %d\n", i, cids[1], cids[2*i+1]);
                 MPI_Abort(MPI_COMM_WORLD, 2);
            }
        }
        printf ("OK\n");
    }

    MPI_Finalize();
    return 0;
}

Here is a proof of concept that seems to fix the issue:

diff --git a/ompi/communicator/comm.c b/ompi/communicator/comm.c
index 228abae..3a7bece 100644
--- a/ompi/communicator/comm.c
+++ b/ompi/communicator/comm.c
@@ -1071,6 +1071,7 @@ static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *gro
     if (NULL == request) {
         return OMPI_ERR_OUT_OF_RESOURCE;
     }
+    request->contextid = comm->c_contextid;
 
     context = OBJ_NEW(ompi_comm_idup_with_info_context_t);
     if (NULL == context) {
diff --git a/ompi/communicator/comm_request.c b/ompi/communicator/comm_request.c
index 272fc33..6dbdd39 100644
--- a/ompi/communicator/comm_request.c
+++ b/ompi/communicator/comm_request.c
@@ -22,6 +22,7 @@
 
 static opal_free_list_t ompi_comm_requests;
 static opal_list_t ompi_comm_requests_active;
+static opal_list_t ompi_comm_requests_queue;
 static opal_mutex_t ompi_comm_request_mutex;
 bool ompi_comm_request_progress_active = false;
 bool ompi_comm_request_initialized = false;
@@ -44,6 +45,7 @@ void ompi_comm_request_init (void)
                                 NULL, 0, NULL, NULL, NULL);
 
     OBJ_CONSTRUCT(&ompi_comm_requests_active, opal_list_t);
+    OBJ_CONSTRUCT(&ompi_comm_requests_queue, opal_list_t);
     ompi_comm_request_progress_active = false;
     OBJ_CONSTRUCT(&ompi_comm_request_mutex, opal_mutex_t);
     ompi_comm_request_initialized = true;
@@ -64,6 +66,7 @@ void ompi_comm_request_fini (void)
     opal_mutex_unlock (&ompi_comm_request_mutex);
     OBJ_DESTRUCT(&ompi_comm_request_mutex);
     OBJ_DESTRUCT(&ompi_comm_requests_active);
+    OBJ_DESTRUCT(&ompi_comm_requests_queue);
     OBJ_DESTRUCT(&ompi_comm_requests);
 }
 
@@ -141,9 +144,17 @@ static int ompi_comm_request_progress (void)
 
         /* if the request schedule is empty then the request is complete */
         if (0 == opal_list_get_size (&request->schedule)) {
+            ompi_comm_request_t *req;
             opal_list_remove_item (&ompi_comm_requests_active, (opal_list_item_t *) request);
             request->super.req_status.MPI_ERROR = (OMPI_SUCCESS == rc) ? MPI_SUCCESS : MPI_ERR_INTERN;
             ompi_request_complete (&request->super, true);
+            OPAL_LIST_FOREACH(req, &ompi_comm_requests_queue, ompi_comm_request_t) {
+                if (request->contextid == req->contextid) {
+                    opal_list_remove_item(&ompi_comm_requests_queue, (opal_list_item_t *)req);
+                    opal_list_append(&ompi_comm_requests_active, (opal_list_item_t *)req);
+                    break;
+                }
+            }
         }
     }
 
@@ -161,8 +172,21 @@ static int ompi_comm_request_progress (void)
 
 void ompi_comm_request_start (ompi_comm_request_t *request)
 {
+    ompi_comm_request_t *req;
+    bool queued = false;
     opal_mutex_lock (&ompi_comm_request_mutex);
-    opal_list_append (&ompi_comm_requests_active, (opal_list_item_t *) request);
+    if (MPI_UNDEFINED != request->contextid) {
+        OPAL_LIST_FOREACH(req, &ompi_comm_requests_active, ompi_comm_request_t) {
+            if (request->contextid == req->contextid) {
+                opal_list_append(&ompi_comm_requests_queue, (opal_list_item_t *)request);
+                queued = true;
+                break;
+            }
+        }
+    }
+    if (!queued) {
+        opal_list_append (&ompi_comm_requests_active, (opal_list_item_t *) request);
+    }
 
     /* check if we need to start the communicator request progress function */
     if (!ompi_comm_request_progress_active) {
@@ -230,6 +254,8 @@ static void ompi_comm_request_construct (ompi_comm_request_t *request)
     request->super.req_cancel = ompi_comm_request_cancel;
 
     OBJ_CONSTRUCT(&request->schedule, opal_list_t);
+
+    request->contextid = MPI_UNDEFINED;
 }
 
 static void ompi_comm_request_destruct (ompi_comm_request_t *request)
diff --git a/ompi/communicator/comm_request.h b/ompi/communicator/comm_request.h
index 43082d6..cbc680e 100644
--- a/ompi/communicator/comm_request.h
+++ b/ompi/communicator/comm_request.h
@@ -23,6 +23,7 @@ typedef struct ompi_comm_request_t {
 
     opal_object_t *context;
     opal_list_t schedule;
+    uint32_t contextid;
 } ompi_comm_request_t;
 OBJ_CLASS_DECLARATION(ompi_comm_request_t);

@bosilca
Member

bosilca commented Mar 2, 2018

Nice, I thought about this case but I couldn't make it break. I see your trick: force a mismatch on the CID by creating one additional communicator on the root.

Your solution is basically to sequentialize all dups on the same communicator. Sounds reasonable.

@sam6258
Contributor

sam6258 commented Mar 5, 2018

So far, I haven't found anything in our large test case that violates the standard. I will try applying the code @ggouaillardet posted above to see if that makes a difference in our situation.

@jeorsch

jeorsch commented Sep 23, 2019

I would like to bring this issue up again, since there has been no activity for quite some time.
I am also having trouble with MPI_Comm_idup() interfering with other collectives, which results in reproducible deadlocks. In my case it might interfere with other MPI_Comm_idup() or MPI_Ibcast() calls. I cannot use the blocking MPI_Comm_dup() instead, since I work in a multi-threaded environment and cannot guarantee that I have enough threads to kick off all required MPI_Comm_dup() calls.
I actually found a workaround: protect the communicator from additional collective calls until the MPI_Test() that finishes the current MPI_Comm_idup() operation succeeds. However, this unnecessarily sequentializes my code (not too bad, since it is only a kind of setup stage) and makes it more complicated.
The proof of concept by @ggouaillardet does not help in my case either. I assume that it does not account for other collectives (in my case MPI_Ibcast()) that might interfere with MPI_Comm_idup().

@devreal
Contributor

devreal commented Sep 23, 2019

I actually found a workaround: protect the communicator from additional collective calls until the MPI_Test() that finishes the current MPI_Comm_idup() operation succeeds. However, this unnecessarily sequentializes my code (not too bad, since it is only a kind of setup stage) and makes it more complicated.

Does that mean that threads issue collective operations (idup, Ibcast) on the same communicator concurrently? Can you provide an example of what the threads are doing?

@jeorsch

jeorsch commented Sep 23, 2019

Does that mean that threads issue collective operations (idup, Ibcast) on the same communicator concurrently? Can you provide an example of what the threads are doing?

No, of course they have to obey a strict order for all collectives. I have a multi-task runtime that has to duplicate MPI communicators on demand to allow collective patterns under non-deterministic task execution.
Unfortunately, MPI offers neither tags for collectives nor (yet) persistent collectives. Therefore, I have to protect each MPI_Comm_idup() call for a specific task using a tagged inter-process mutex. Here, rank 0 of the communicator broadcasts via MPI_Ibcast() the tag of the task that should perform the next communicator duplication. The task order is generally random, but all ranks will follow rank 0's duplication sequence.
Once this broadcast has finished, one thread in each rank of the communicator enters a mutex to prevent concurrent threads from performing collective calls. After MPI_Comm_idup() is called, the tagged inter-process mutex is released. Then another thread that executes a task with a pending communicator duplication will lock the mutex and call a new MPI_Ibcast(), so that all ranks can negotiate the next MPI_Comm_idup() call.
There are no other collective operations on the parent communicator apart from these MPI_Comm_idup() and MPI_Ibcast() calls, so I can assume my call sequence is correct w.r.t. the MPI standard. At any moment there is either exactly one active MPI_Ibcast() or exactly one new MPI_Comm_idup() call. So there might be several outstanding MPI_Comm_idup() requests, but only one active MPI_Ibcast() request at a time. There might be some active non-blocking point-to-point operations as well.
As already mentioned, if I release the tagged inter-process mutex only after a successful MPI_Test() of the MPI_Comm_idup() request, there are no more deadlocks. The next MPI_Ibcast() is then performed afterwards, and there are never parallel MPI_Comm_idup() requests on this communicator.
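A rough sketch of one round of the pattern described above, under some assumptions: the helper name is hypothetical, a process-local pthread mutex stands in for the tagged inter-process mutex, and MPI is initialized with MPI_THREAD_MULTIPLE. The comment marks where the original version released the lock versus the workaround that avoids the deadlock.

#include <mpi.h>
#include <pthread.h>

static pthread_mutex_t comm_lock = PTHREAD_MUTEX_INITIALIZER;

/* Executed by the thread running the task that was selected for the next
 * communicator duplication; every rank participates in the same round. */
void duplicate_for_task(MPI_Comm parent, int my_task, MPI_Comm *newcomm)
{
    MPI_Request breq, dreq;
    int next_task = my_task;   /* only rank 0's value matters */
    int flag = 0;

    pthread_mutex_lock(&comm_lock);

    /* Rank 0 announces which task performs the next duplication; all ranks
     * post the matching MPI_Ibcast on the parent communicator. */
    MPI_Ibcast(&next_task, 1, MPI_INT, 0, parent, &breq);
    MPI_Wait(&breq, MPI_STATUS_IGNORE);

    /* All ranks now start the agreed-upon MPI_Comm_idup. */
    MPI_Comm_idup(parent, newcomm, &dreq);

    /* Original version: unlock here, so the next round's MPI_Ibcast can
     * overlap the still-pending idup -- this is what deadlocks.
     * Workaround: keep the lock until the idup request has completed. */
    while (!flag) {
        MPI_Test(&dreq, &flag, MPI_STATUS_IGNORE);
    }

    pthread_mutex_unlock(&comm_lock);
}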

The application is closed source, so I tried to break it down to a minimal reproducer; it is only single-threaded for now:

#include <mpi.h>

int main(int argc, char *argv[]) {

    MPI_Request request[3];
    MPI_Comm newComm[2];
    int data = 1;
    int flag;

    MPI_Init(&argc, &argv);

    MPI_Comm_idup(MPI_COMM_WORLD, &newComm[0], &request[0]);

    MPI_Ibcast(&data, 1, MPI_INT, 0, MPI_COMM_WORLD, &request[1]);
    
    MPI_Test(&request[0], &flag, MPI_STATUS_IGNORE);
    MPI_Test(&request[0], &flag, MPI_STATUS_IGNORE);
    MPI_Test(&request[0], &flag, MPI_STATUS_IGNORE);
    MPI_Test(&request[0], &flag, MPI_STATUS_IGNORE);
    MPI_Test(&request[0], &flag, MPI_STATUS_IGNORE);
    MPI_Test(&request[0], &flag, MPI_STATUS_IGNORE);
    MPI_Test(&request[0], &flag, MPI_STATUS_IGNORE);
    MPI_Test(&request[0], &flag, MPI_STATUS_IGNORE);
    MPI_Test(&request[0], &flag, MPI_STATUS_IGNORE);

    MPI_Wait(&request[1], MPI_STATUS_IGNORE);

    MPI_Comm_idup(MPI_COMM_WORLD, &newComm[1], &request[2]);

    MPI_Wait(&request[2], MPI_STATUS_IGNORE);
    MPI_Wait(&request[0], MPI_STATUS_IGNORE);

    MPI_Finalize();
    
    return 0;
}

We actually do not use MPI_Wait() calls, but this should not be relevant, as the example deadlocks anyhow. It does not always deadlock on my machine, but it does in most runs with two ranks. It seems that the (somewhat arbitrary) number of MPI_Test() calls on &request[0] triggers the erroneous behavior.

@bosilca
Member

bosilca commented Sep 23, 2019

I think we are going back to the discussion we had last year: because MPI_Comm_idup is implemented using multiple non-blocking collectives, we cannot allow other non-blocking collectives to be interposed between an idup and its completion. At least not without @ggouaillardet's patch above.

@jeorsch

jeorsch commented Sep 26, 2019

As I already mentioned, the patch by @ggouaillardet did not help in the case of my application.

@ggouaillardet
Contributor

@jeorsch Here is an updated version of my patch.

Long story short: only one MPI_Comm_idup() per communicator is progressed at a time, and its MPI_Iallreduce() uses a dedicated tag that does not impact other collectives on this communicator.
This is really a proof of concept and only works with intra-communicators.

diff --git a/ompi/communicator/comm.c b/ompi/communicator/comm.c
index 50b19ee..ce76610 100644
--- a/ompi/communicator/comm.c
+++ b/ompi/communicator/comm.c
@@ -1074,6 +1074,7 @@ static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *gro
     if (NULL == request) {
         return OMPI_ERR_OUT_OF_RESOURCE;
     }
+    request->contextid = comm->c_contextid;
 
     context = OBJ_NEW(ompi_comm_idup_with_info_context_t);
     if (NULL == context) {
diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c
index 3833d3a..f2cdea5 100644
--- a/ompi/communicator/comm_cid.c
+++ b/ompi/communicator/comm_cid.c
@@ -645,7 +645,7 @@ static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count, str
     ompi_communicator_t *comm = context->comm;
 
     return comm->c_coll->coll_iallreduce (inbuf, outbuf, count, MPI_INT, op, comm,
-                                         req, comm->c_coll->coll_iallreduce_module);
+                                         req, (struct mca_coll_base_module_2_3_0_t *)((ptrdiff_t)comm->c_coll->coll_iallreduce_module | 0x1));
 }
 
 /* Non-blocking version of ompi_comm_allreduce_inter */
diff --git a/ompi/communicator/comm_request.c b/ompi/communicator/comm_request.c
index 4e3288e..97b2c49 100644
--- a/ompi/communicator/comm_request.c
+++ b/ompi/communicator/comm_request.c
@@ -22,6 +22,7 @@
 
 static opal_free_list_t ompi_comm_requests;
 static opal_list_t ompi_comm_requests_active;
+static opal_list_t ompi_comm_requests_queue;
 static opal_mutex_t ompi_comm_request_mutex;
 bool ompi_comm_request_progress_active = false;
 bool ompi_comm_request_initialized = false;
@@ -44,6 +45,7 @@ void ompi_comm_request_init (void)
                                 NULL, 0, NULL, NULL, NULL);
 
     OBJ_CONSTRUCT(&ompi_comm_requests_active, opal_list_t);
+    OBJ_CONSTRUCT(&ompi_comm_requests_queue, opal_list_t);
     ompi_comm_request_progress_active = false;
     OBJ_CONSTRUCT(&ompi_comm_request_mutex, opal_mutex_t);
     ompi_comm_request_initialized = true;
@@ -64,6 +66,7 @@ void ompi_comm_request_fini (void)
     opal_mutex_unlock (&ompi_comm_request_mutex);
     OBJ_DESTRUCT(&ompi_comm_request_mutex);
     OBJ_DESTRUCT(&ompi_comm_requests_active);
+    OBJ_DESTRUCT(&ompi_comm_requests_queue);
     OBJ_DESTRUCT(&ompi_comm_requests);
 }
 
@@ -148,9 +151,17 @@ static int ompi_comm_request_progress (void)
 
         /* if the request schedule is empty then the request is complete */
         if (0 == opal_list_get_size (&request->schedule)) {
+            ompi_comm_request_t *req, *nreq;
             opal_list_remove_item (&ompi_comm_requests_active, (opal_list_item_t *) request);
             request->super.req_status.MPI_ERROR = (OMPI_SUCCESS == rc) ? MPI_SUCCESS : rc;
             ompi_request_complete (&request->super, true);
+            OPAL_LIST_FOREACH_SAFE(req, nreq, &ompi_comm_requests_queue, ompi_comm_request_t) {
+                if (request->contextid == req->contextid) {
+                    opal_list_remove_item(&ompi_comm_requests_queue, (opal_list_item_t *)req);
+                    opal_list_append(&ompi_comm_requests_active, (opal_list_item_t *)req);
+                    break;
+                }
+            }
         }
     }
 
@@ -168,8 +179,21 @@ static int ompi_comm_request_progress (void)
 
 void ompi_comm_request_start (ompi_comm_request_t *request)
 {
+    ompi_comm_request_t *req;
+    bool queued = false;
     opal_mutex_lock (&ompi_comm_request_mutex);
-    opal_list_append (&ompi_comm_requests_active, (opal_list_item_t *) request);
+    if (MPI_UNDEFINED != request->contextid) {
+        OPAL_LIST_FOREACH(req, &ompi_comm_requests_active, ompi_comm_request_t) {
+            if (request->contextid == req->contextid) {
+                opal_list_append(&ompi_comm_requests_queue, (opal_list_item_t *)request);
+                queued = true;
+                break;
+            }
+        }
+    }
+    if (!queued) {
+        opal_list_append (&ompi_comm_requests_active, (opal_list_item_t *) request);
+    }
 
     /* check if we need to start the communicator request progress function */
     if (!ompi_comm_request_progress_active) {
@@ -238,6 +262,8 @@ static void ompi_comm_request_construct (ompi_comm_request_t *request)
     request->super.req_cancel = ompi_comm_request_cancel;
 
     OBJ_CONSTRUCT(&request->schedule, opal_list_t);
+
+    request->contextid = MPI_UNDEFINED;
 }
 
 static void ompi_comm_request_destruct (ompi_comm_request_t *request)
diff --git a/ompi/communicator/comm_request.h b/ompi/communicator/comm_request.h
index 43082d6..cbc680e 100644
--- a/ompi/communicator/comm_request.h
+++ b/ompi/communicator/comm_request.h
@@ -23,6 +23,7 @@ typedef struct ompi_comm_request_t {
 
     opal_object_t *context;
     opal_list_t schedule;
+    uint32_t contextid;
 } ompi_comm_request_t;
 OBJ_CLASS_DECLARATION(ompi_comm_request_t);
 
diff --git a/ompi/mca/coll/base/coll_tags.h b/ompi/mca/coll/base/coll_tags.h
index f40f029..ec1eb1a 100644
--- a/ompi/mca/coll/base/coll_tags.h
+++ b/ompi/mca/coll/base/coll_tags.h
@@ -42,7 +42,8 @@
 #define MCA_COLL_BASE_TAG_SCATTER -25
 #define MCA_COLL_BASE_TAG_SCATTERV -26
 #define MCA_COLL_BASE_TAG_NONBLOCKING_BASE -27
-#define MCA_COLL_BASE_TAG_NONBLOCKING_END ((-1 * INT_MAX/2) + 1)
+#define MCA_COLL_BASE_TAG_NONBLOCKING_END ((-1 * INT_MAX/2) + 2)
+#define MCA_COLL_BASE_TAG_NONBLOCKING_DUP ((-1 * INT_MAX/2) + 1)
 #define MCA_COLL_BASE_TAG_HCOLL_BASE (-1 * INT_MAX/2)
 #define MCA_COLL_BASE_TAG_HCOLL_END (-1 * INT_MAX)
 #endif /* MCA_COLL_BASE_TAGS_H */
diff --git a/ompi/mca/coll/libnbc/nbc.c b/ompi/mca/coll/libnbc/nbc.c
index 171f5a3..60c8f9e 100644
--- a/ompi/mca/coll/libnbc/nbc.c
+++ b/ompi/mca/coll/libnbc/nbc.c
@@ -683,15 +683,19 @@ int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm,
       return OMPI_ERR_OUT_OF_RESOURCE;
     }
 
-    /* update the module->tag here because other processes may have operations
-     * and they may update the module->tag */
-    OPAL_THREAD_LOCK(&module->mutex);
-    tmp_tag = module->tag--;
-    if (tmp_tag == MCA_COLL_BASE_TAG_NONBLOCKING_END) {
-      tmp_tag = module->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
-      NBC_DEBUG(2,"resetting tags ...\n");
+    if ((ptrdiff_t)module & 0x1) {
+      tmp_tag = MCA_COLL_BASE_TAG_NONBLOCKING_DUP;
+    } else {
+      /* update the module->tag here because other processes may have operations
+       * and they may update the module->tag */
+      OPAL_THREAD_LOCK(&module->mutex);
+      tmp_tag = module->tag--;
+      if (tmp_tag == MCA_COLL_BASE_TAG_NONBLOCKING_END) {
+        tmp_tag = module->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
+        NBC_DEBUG(2,"resetting tags ...\n");
+      }
+      OPAL_THREAD_UNLOCK(&module->mutex);
     }
-    OPAL_THREAD_UNLOCK(&module->mutex);
 
     OBJ_RELEASE(schedule);
     free(tmpbuf);
@@ -712,18 +716,27 @@ int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm,
 
   /******************** Do the tag and shadow comm administration ...  ***************/
 
-  OPAL_THREAD_LOCK(&module->mutex);
-  tmp_tag = module->tag--;
-  if (tmp_tag == MCA_COLL_BASE_TAG_NONBLOCKING_END) {
-      tmp_tag = module->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
-      NBC_DEBUG(2,"resetting tags ...\n");
-  }
+  if ((ptrdiff_t)module & 0x1) {
+    tmp_tag = MCA_COLL_BASE_TAG_NONBLOCKING_DUP;
+    module = (ompi_coll_libnbc_module_t *)((ptrdiff_t)module & ~(ptrdiff_t)0x1);
+    if (true != module->comm_registered) {
+        module->comm_registered = true;
+        need_register = true;
+    }
+  } else {
+    OPAL_THREAD_LOCK(&module->mutex);
+    tmp_tag = module->tag--;
+    if (tmp_tag == MCA_COLL_BASE_TAG_NONBLOCKING_END) {
+        tmp_tag = module->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
+        NBC_DEBUG(2,"resetting tags ...\n");
+    }
 
-  if (true != module->comm_registered) {
-      module->comm_registered = true;
-      need_register = true;
+    if (true != module->comm_registered) {
+        module->comm_registered = true;
+        need_register = true;
+    }
+    OPAL_THREAD_UNLOCK(&module->mutex);
   }
-  OPAL_THREAD_UNLOCK(&module->mutex);
 
   handle->tag = tmp_tag;
 

@jeorsch

jeorsch commented Oct 9, 2019

@ggouaillardet Unfortunately, your updated patch does not help either. However, my application once succeeded with two ranks, which it never did with the old patch.
Currently, I am quite busy, but I will try to provide a reproducer for this communication pattern.
