diff --git a/ompi/mca/osc/rdma/osc_rdma.h b/ompi/mca/osc/rdma/osc_rdma.h index 5bb8bbe953..1ebef9d90c 100644 --- a/ompi/mca/osc/rdma/osc_rdma.h +++ b/ompi/mca/osc/rdma/osc_rdma.h @@ -423,8 +423,8 @@ static inline void mark_incoming_completion (ompi_osc_rdma_module_t *module, int { if (MPI_PROC_NULL == source) { OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "mark_incoming_completion marking active incoming complete. count = %d", - (int) module->active_incoming_frag_count + 1)); + "mark_incoming_completion marking active incoming complete. count = %d. signal = %d", + (int) module->active_incoming_frag_count + 1, module->active_incoming_frag_signal_count)); OPAL_THREAD_ADD32(&module->active_incoming_frag_count, 1); if (module->active_incoming_frag_count >= module->active_incoming_frag_signal_count) { opal_condition_broadcast(&module->cond); diff --git a/ompi/mca/osc/rdma/osc_rdma_active_target.c b/ompi/mca/osc/rdma/osc_rdma_active_target.c index 135aa87da2..e2b2ab4a91 100644 --- a/ompi/mca/osc/rdma/osc_rdma_active_target.c +++ b/ompi/mca/osc/rdma/osc_rdma_active_target.c @@ -461,7 +461,7 @@ ompi_osc_rdma_wait(ompi_win_t *win) OPAL_THREAD_LOCK(&module->lock); while (0 != module->num_complete_msgs || - module->active_incoming_frag_count < module->active_incoming_frag_signal_count) { + module->active_incoming_frag_count != module->active_incoming_frag_signal_count) { OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "num_complete_msgs = %d, active_incoming_frag_count = %d, active_incoming_frag_signal_count = %d", module->num_complete_msgs, module->active_incoming_frag_count, module->active_incoming_frag_signal_count)); @@ -501,7 +501,7 @@ ompi_osc_rdma_test(ompi_win_t *win, OPAL_THREAD_LOCK(&(module->lock)); if (0 != module->num_complete_msgs || - module->active_incoming_frag_count < module->active_incoming_frag_signal_count) { + module->active_incoming_frag_count != module->active_incoming_frag_signal_count) { *flag = 0; ret = OMPI_SUCCESS; goto cleanup; diff --git a/ompi/mca/osc/rdma/osc_rdma_comm.c b/ompi/mca/osc/rdma/osc_rdma_comm.c index b3de4df0c5..4f0399f792 100644 --- a/ompi/mca/osc/rdma/osc_rdma_comm.c +++ b/ompi/mca/osc/rdma/osc_rdma_comm.c @@ -349,6 +349,16 @@ static inline int ompi_osc_rdma_put_w_req (void *origin_addr, int origin_count, tag = get_tag(module); } + /* flush will be called at the end of this function. make sure the post message has + * arrived. */ + if ((is_long_msg || request) && module->sc_group) { + while (0 != module->num_post_msgs) { + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "waiting for post messages. num_post_msgs = %d", module->num_post_msgs)); + opal_condition_wait(&module->cond, &module->lock); + } + } + OPAL_THREAD_UNLOCK(&module->lock); OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, @@ -525,6 +535,16 @@ ompi_osc_rdma_accumulate_w_req (void *origin_addr, int origin_count, tag = get_tag (module); } + /* flush will be called at the end of this function. make sure the post message has + * arrived. */ + if ((is_long_msg || request) && module->sc_group) { + while (0 != module->num_post_msgs) { + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "waiting for post messages. num_post_msgs = %d", module->num_post_msgs)); + opal_condition_wait(&module->cond, &module->lock); + } + } + OPAL_THREAD_UNLOCK(&module->lock); header = (ompi_osc_rdma_header_acc_t*) ptr; @@ -607,7 +627,7 @@ ompi_osc_rdma_accumulate_w_req (void *origin_addr, int origin_count, ret = ompi_osc_rdma_frag_finish(module, frag); - if (request) { + if (is_long_msg || request) { /* need to flush now in case the caller decides to wait on the request */ ompi_osc_rdma_frag_flush_target (module, target); } @@ -858,6 +878,16 @@ static inline int ompi_osc_rdma_rget_internal (void *origin_addr, int origin_cou /* for bookkeeping the get is "outgoing" */ ompi_osc_signal_outgoing (module, target, 1); + /* flush will be called at the end of this function. make sure the post message has + * arrived. */ + if (!release_req && module->sc_group) { + while (0 != module->num_post_msgs) { + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "waiting for post messages. num_post_msgs = %d", module->num_post_msgs)); + opal_condition_wait(&module->cond, &module->lock); + } + } + OPAL_THREAD_UNLOCK(&module->lock); header = (ompi_osc_rdma_header_get_t*) ptr; @@ -1088,6 +1118,16 @@ int ompi_osc_rdma_rget_accumulate_internal (void *origin_addr, int origin_count, /* increment the number of outgoing fragments */ ompi_osc_signal_outgoing (module, target_rank, rdma_request->outstanding_requests); + /* flush will be called at the end of this function. make sure the post message has + * arrived. */ + if (!release_req && module->sc_group) { + while (0 != module->num_post_msgs) { + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "waiting for post messages. num_post_msgs = %d", module->num_post_msgs)); + opal_condition_wait(&module->cond, &module->lock); + } + } + OPAL_THREAD_UNLOCK(&module->lock); header = (ompi_osc_rdma_header_acc_t *) ptr; diff --git a/ompi/mca/osc/rdma/osc_rdma_data_move.c b/ompi/mca/osc/rdma/osc_rdma_data_move.c index f4e8d21399..833a2c6feb 100644 --- a/ompi/mca/osc/rdma/osc_rdma_data_move.c +++ b/ompi/mca/osc/rdma/osc_rdma_data_move.c @@ -1331,8 +1331,10 @@ static inline int process_complete (ompi_osc_rdma_module_t *module, int source, ompi_osc_rdma_header_complete_t *complete_header) { OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "osc rdma: process_complete got complete message from %d. expected fragment count %d", - source, complete_header->frag_count)); + "osc rdma: process_complete got complete message from %d. expected fragment count %d. " + "current signal count %d. current incomming count: %d", + source, complete_header->frag_count, module->active_incoming_frag_signal_count, + module->active_incoming_frag_count)); OPAL_THREAD_LOCK(&module->lock); @@ -1677,8 +1679,12 @@ static int ompi_osc_rdma_callback (ompi_request_t *request) /* restart the receive request */ OPAL_THREAD_LOCK(&module->lock); - mark_incoming_completion (module, (base_header->flags & OMPI_OSC_RDMA_HDR_FLAG_PASSIVE_TARGET) ? - source : MPI_PROC_NULL); + /* post messages come unbuffered and should NOT increment the incoming completion + * counters */ + if (OMPI_OSC_RDMA_HDR_TYPE_POST != base_header->type) { + mark_incoming_completion (module, (base_header->flags & OMPI_OSC_RDMA_HDR_FLAG_PASSIVE_TARGET) ? + source : MPI_PROC_NULL); + } osc_rdma_gc_clean (); diff --git a/ompi/mca/osc/rdma/osc_rdma_frag.c b/ompi/mca/osc/rdma/osc_rdma_frag.c index 09ef067d09..6ee389d9bd 100644 --- a/ompi/mca/osc/rdma/osc_rdma_frag.c +++ b/ompi/mca/osc/rdma/osc_rdma_frag.c @@ -54,7 +54,6 @@ static int frag_send_cb (ompi_request_t *request) return OMPI_SUCCESS; } - static int frag_send(ompi_osc_rdma_module_t *module, ompi_osc_rdma_frag_t *frag) @@ -67,6 +66,10 @@ frag_send(ompi_osc_rdma_module_t *module, "osc rdma: frag_send called to %d, frag = %p, count = %d", frag->target, (void *) frag, count)); + /* we need to signal now that a frag is outgoing to ensure the count sent + * with the unlock message is correct */ + ompi_osc_signal_outgoing (module, frag->target, 1); + return ompi_osc_rdma_isend_w_cb (frag->buffer, count, MPI_BYTE, frag->target, OSC_RDMA_FRAG_TAG, module->comm, frag_send_cb, frag); } @@ -81,10 +84,6 @@ ompi_osc_rdma_frag_start(ompi_osc_rdma_module_t *module, assert(0 == frag->pending); assert(module->peers[frag->target].active_frag != frag); - /* we need to signal now that a frag is outgoing to ensure the count sent - * with the unlock message is correct */ - ompi_osc_signal_outgoing (module, frag->target, 1); - /* if eager sends are not active, can't send yet, so buffer and get out... */ if (module->passive_target_access_epoch) {