Skip to content
This repository was archived by the owner on Sep 30, 2022. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ompi/mca/osc/rdma/osc_rdma.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,8 @@ static inline void mark_incoming_completion (ompi_osc_rdma_module_t *module, int
{
if (MPI_PROC_NULL == source) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"mark_incoming_completion marking active incoming complete. count = %d",
(int) module->active_incoming_frag_count + 1));
"mark_incoming_completion marking active incoming complete. count = %d. signal = %d",
(int) module->active_incoming_frag_count + 1, module->active_incoming_frag_signal_count));
OPAL_THREAD_ADD32(&module->active_incoming_frag_count, 1);
if (module->active_incoming_frag_count >= module->active_incoming_frag_signal_count) {
opal_condition_broadcast(&module->cond);
Expand Down
4 changes: 2 additions & 2 deletions ompi/mca/osc/rdma/osc_rdma_active_target.c
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ ompi_osc_rdma_wait(ompi_win_t *win)

OPAL_THREAD_LOCK(&module->lock);
while (0 != module->num_complete_msgs ||
module->active_incoming_frag_count < module->active_incoming_frag_signal_count) {
module->active_incoming_frag_count != module->active_incoming_frag_signal_count) {
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"num_complete_msgs = %d, active_incoming_frag_count = %d, active_incoming_frag_signal_count = %d",
module->num_complete_msgs, module->active_incoming_frag_count, module->active_incoming_frag_signal_count));
Expand Down Expand Up @@ -501,7 +501,7 @@ ompi_osc_rdma_test(ompi_win_t *win,
OPAL_THREAD_LOCK(&(module->lock));

if (0 != module->num_complete_msgs ||
module->active_incoming_frag_count < module->active_incoming_frag_signal_count) {
module->active_incoming_frag_count != module->active_incoming_frag_signal_count) {
*flag = 0;
ret = OMPI_SUCCESS;
goto cleanup;
Expand Down
42 changes: 41 additions & 1 deletion ompi/mca/osc/rdma/osc_rdma_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,16 @@ static inline int ompi_osc_rdma_put_w_req (void *origin_addr, int origin_count,
tag = get_tag(module);
}

/* flush will be called at the end of this function. make sure the post message has
* arrived. */
if ((is_long_msg || request) && module->sc_group) {
while (0 != module->num_post_msgs) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
opal_condition_wait(&module->cond, &module->lock);
}
}

OPAL_THREAD_UNLOCK(&module->lock);

OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
Expand Down Expand Up @@ -525,6 +535,16 @@ ompi_osc_rdma_accumulate_w_req (void *origin_addr, int origin_count,
tag = get_tag (module);
}

/* flush will be called at the end of this function. make sure the post message has
* arrived. */
if ((is_long_msg || request) && module->sc_group) {
while (0 != module->num_post_msgs) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
opal_condition_wait(&module->cond, &module->lock);
}
}

OPAL_THREAD_UNLOCK(&module->lock);

header = (ompi_osc_rdma_header_acc_t*) ptr;
Expand Down Expand Up @@ -607,7 +627,7 @@ ompi_osc_rdma_accumulate_w_req (void *origin_addr, int origin_count,

ret = ompi_osc_rdma_frag_finish(module, frag);

if (request) {
if (is_long_msg || request) {
/* need to flush now in case the caller decides to wait on the request */
ompi_osc_rdma_frag_flush_target (module, target);
}
Expand Down Expand Up @@ -858,6 +878,16 @@ static inline int ompi_osc_rdma_rget_internal (void *origin_addr, int origin_cou
/* for bookkeeping the get is "outgoing" */
ompi_osc_signal_outgoing (module, target, 1);

/* flush will be called at the end of this function. make sure the post message has
* arrived. */
if (!release_req && module->sc_group) {
while (0 != module->num_post_msgs) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
opal_condition_wait(&module->cond, &module->lock);
}
}

OPAL_THREAD_UNLOCK(&module->lock);

header = (ompi_osc_rdma_header_get_t*) ptr;
Expand Down Expand Up @@ -1088,6 +1118,16 @@ int ompi_osc_rdma_rget_accumulate_internal (void *origin_addr, int origin_count,
/* increment the number of outgoing fragments */
ompi_osc_signal_outgoing (module, target_rank, rdma_request->outstanding_requests);

/* flush will be called at the end of this function. make sure the post message has
* arrived. */
if (!release_req && module->sc_group) {
while (0 != module->num_post_msgs) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
opal_condition_wait(&module->cond, &module->lock);
}
}

OPAL_THREAD_UNLOCK(&module->lock);

header = (ompi_osc_rdma_header_acc_t *) ptr;
Expand Down
14 changes: 10 additions & 4 deletions ompi/mca/osc/rdma/osc_rdma_data_move.c
Original file line number Diff line number Diff line change
Expand Up @@ -1331,8 +1331,10 @@ static inline int process_complete (ompi_osc_rdma_module_t *module, int source,
ompi_osc_rdma_header_complete_t *complete_header)
{
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc rdma: process_complete got complete message from %d. expected fragment count %d",
source, complete_header->frag_count));
"osc rdma: process_complete got complete message from %d. expected fragment count %d. "
"current signal count %d. current incomming count: %d",
source, complete_header->frag_count, module->active_incoming_frag_signal_count,
module->active_incoming_frag_count));

OPAL_THREAD_LOCK(&module->lock);

Expand Down Expand Up @@ -1677,8 +1679,12 @@ static int ompi_osc_rdma_callback (ompi_request_t *request)
/* restart the receive request */
OPAL_THREAD_LOCK(&module->lock);

mark_incoming_completion (module, (base_header->flags & OMPI_OSC_RDMA_HDR_FLAG_PASSIVE_TARGET) ?
source : MPI_PROC_NULL);
/* post messages come unbuffered and should NOT increment the incoming completion
* counters */
if (OMPI_OSC_RDMA_HDR_TYPE_POST != base_header->type) {
mark_incoming_completion (module, (base_header->flags & OMPI_OSC_RDMA_HDR_FLAG_PASSIVE_TARGET) ?
source : MPI_PROC_NULL);
}

osc_rdma_gc_clean ();

Expand Down
9 changes: 4 additions & 5 deletions ompi/mca/osc/rdma/osc_rdma_frag.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ static int frag_send_cb (ompi_request_t *request)
return OMPI_SUCCESS;
}


static int
frag_send(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_frag_t *frag)
Expand All @@ -67,6 +66,10 @@ frag_send(ompi_osc_rdma_module_t *module,
"osc rdma: frag_send called to %d, frag = %p, count = %d",
frag->target, (void *) frag, count));

/* we need to signal now that a frag is outgoing to ensure the count sent
* with the unlock message is correct */
ompi_osc_signal_outgoing (module, frag->target, 1);

return ompi_osc_rdma_isend_w_cb (frag->buffer, count, MPI_BYTE, frag->target, OSC_RDMA_FRAG_TAG,
module->comm, frag_send_cb, frag);
}
Expand All @@ -81,10 +84,6 @@ ompi_osc_rdma_frag_start(ompi_osc_rdma_module_t *module,
assert(0 == frag->pending);
assert(module->peers[frag->target].active_frag != frag);

/* we need to signal now that a frag is outgoing to ensure the count sent
* with the unlock message is correct */
ompi_osc_signal_outgoing (module, frag->target, 1);

/* if eager sends are not active, can't send yet, so buffer and
get out... */
if (module->passive_target_access_epoch) {
Expand Down