From 70f8a6e7920785ff3ebbd76fc9e744c1633b2134 Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Wed, 24 Aug 2016 14:35:45 -0600 Subject: [PATCH] osc/pt2pt: fix several bugs This commit fixes some bugs uncovered during thread testing of 2.0.1rc1. With these fixes the component is running cleanly with threads. Signed-off-by: Nathan Hjelm --- ompi/mca/osc/pt2pt/osc_pt2pt.h | 5 +-- ompi/mca/osc/pt2pt/osc_pt2pt_comm.c | 16 +++++----- ompi/mca/osc/pt2pt/osc_pt2pt_frag.c | 31 +++++++++++++++++++ ompi/mca/osc/pt2pt/osc_pt2pt_frag.h | 7 +++-- ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c | 29 ++++++++++++----- 5 files changed, 68 insertions(+), 20 deletions(-) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h b/ompi/mca/osc/pt2pt/osc_pt2pt.h index 0c7e01b3ab5..c5d66ca2a3b 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h @@ -936,13 +936,14 @@ static inline bool ompi_osc_pt2pt_access_epoch_active (ompi_osc_pt2pt_module_t * static inline bool ompi_osc_pt2pt_peer_sends_active (ompi_osc_pt2pt_module_t *module, int rank) { ompi_osc_pt2pt_sync_t *sync; + ompi_osc_pt2pt_peer_t *peer; - sync = ompi_osc_pt2pt_module_sync_lookup (module, rank, NULL); + sync = ompi_osc_pt2pt_module_sync_lookup (module, rank, &peer); if (!sync) { return false; } - return sync->eager_send_active; + return sync->eager_send_active || ompi_osc_pt2pt_peer_eager_active (peer); } END_C_DECLS diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c index fc1c3c4f2a0..4d16f1a8ce9 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c @@ -116,7 +116,7 @@ static inline int ompi_osc_pt2pt_put_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, co int ret; /* if we are in active target mode wait until all post messages arrive */ - ompi_osc_pt2pt_sync_wait (pt2pt_sync); + ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync); ret = ompi_datatype_sndrcv ((void *)source, source_count, source_datatype, target, target_count, target_datatype); @@ -140,7 +140,7 @@ static inline int ompi_osc_pt2pt_get_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, vo int ret; /* if we are in active target mode wait until all post messages arrive */ - ompi_osc_pt2pt_sync_wait (pt2pt_sync); + ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync); ret = ompi_datatype_sndrcv (source, source_count, source_datatype, target, target_count, target_datatype); @@ -162,7 +162,7 @@ static inline int ompi_osc_pt2pt_cas_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, co ((unsigned long) target_disp * module->disp_unit); /* if we are in active target mode wait until all post messages arrive */ - ompi_osc_pt2pt_sync_wait (pt2pt_sync); + ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync); ompi_osc_pt2pt_accumulate_lock (module); @@ -186,7 +186,7 @@ static inline int ompi_osc_pt2pt_acc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, co int ret; /* if we are in active target mode wait until all post messages arrive */ - ompi_osc_pt2pt_sync_wait (pt2pt_sync); + ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync); ompi_osc_pt2pt_accumulate_lock (module); @@ -336,7 +336,7 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ if (is_long_msg) { /* wait for eager sends to be active before starting a long put */ - ompi_osc_pt2pt_sync_wait (pt2pt_sync); + ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync); } OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, @@ -495,7 +495,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, if (is_long_msg) { /* wait for synchronization before posting a long message */ - ompi_osc_pt2pt_sync_wait (pt2pt_sync); + ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync); } header = (ompi_osc_pt2pt_header_acc_t*) ptr; @@ -802,7 +802,7 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co if (!release_req) { /* wait for epoch to begin before starting rget operation */ - ompi_osc_pt2pt_sync_wait (pt2pt_sync); + ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync); } header = (ompi_osc_pt2pt_header_get_t*) ptr; @@ -968,7 +968,7 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin if (!release_req) { /* wait for epoch to begin before starting operation */ - ompi_osc_pt2pt_sync_wait (pt2pt_sync); + ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync); } /* optimize the self case. TODO: optimize the local case */ diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c index 2be1c45d9b4..791e34d6cbc 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c @@ -151,6 +151,37 @@ int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int targe return ret; } +int ompi_osc_pt2pt_frag_flush_target_locked (ompi_osc_pt2pt_module_t *module, int target) +{ + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target); + ompi_osc_pt2pt_frag_t *frag; + int ret = OMPI_SUCCESS; + + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "osc pt2pt: frag flush to target target %d. queue fragments: %lu", + target, (unsigned long) opal_list_get_size (&peer->queued_frags))); + + /* walk through the pending list and send */ + while (NULL != (frag = ((ompi_osc_pt2pt_frag_t *) opal_list_remove_first (&peer->queued_frags)))) { + ret = frag_send(module, frag); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + break; + } + } + + /* XXX -- TODO -- better error handling */ + if (OMPI_SUCCESS != ret) { + return ret; + } + + /* flush the active frag */ + ret = ompi_osc_pt2pt_flush_active_frag (module, peer); + + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "osc pt2pt: frag flush target %d finished", target)); + + return ret; +} int ompi_osc_pt2pt_frag_flush_all (ompi_osc_pt2pt_module_t *module) { diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h index da51b7db276..d7a73f42e1c 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h @@ -41,9 +41,10 @@ struct ompi_osc_pt2pt_frag_t { typedef struct ompi_osc_pt2pt_frag_t ompi_osc_pt2pt_frag_t; OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_frag_t); -extern int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *buffer); -extern int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target); -extern int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module); +int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *buffer); +int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target); +int ompi_osc_pt2pt_frag_flush_target_locked(ompi_osc_pt2pt_module_t *module, int target); +int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module); static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t* buffer) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c index 32f7ce47cc3..d93674a6e65 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c @@ -122,6 +122,12 @@ int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, omp int ret; + OPAL_THREAD_LOCK(&peer->lock); + if (ompi_osc_pt2pt_peer_locked (peer)) { + OPAL_THREAD_UNLOCK(&peer->lock); + return OMPI_SUCCESS; + } + (void) OPAL_THREAD_ADD32(&lock->sync_expected, 1); assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK); @@ -137,16 +143,23 @@ int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, omp lock_req.lock_ptr = (uint64_t) (uintptr_t) lock; OSC_PT2PT_HTON(&lock_req, module, target); - ret = ompi_osc_pt2pt_control_send (module, target, &lock_req, sizeof (lock_req)); + do { + ret = ompi_osc_pt2pt_control_send (module, target, &lock_req, sizeof (lock_req)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + break; + } + + /* make sure the request gets sent, so we can start eager sending... */ + ret = ompi_osc_pt2pt_frag_flush_target_locked (module, target); + } while (0); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - return ret; + OPAL_THREAD_ADD32(&lock->sync_expected, -1); } - /* make sure the request gets sent, so we can start eager sending... */ - ret = ompi_osc_pt2pt_frag_flush_target (module, target); - if (OPAL_LIKELY(OMPI_SUCCESS == ret)) { - ompi_osc_pt2pt_peer_set_locked (peer, true); - } + ompi_osc_pt2pt_peer_set_locked (peer, true); + + OPAL_THREAD_UNLOCK(&peer->lock); return ret; } @@ -316,6 +329,8 @@ static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert, if (OPAL_UNLIKELY(NULL == lock)) { return OMPI_ERR_OUT_OF_RESOURCE; } + + lock->peer_list.peer = ompi_osc_pt2pt_peer_lookup (module, target); } else { lock = &module->all_sync; }