From 64764abab998c2d2cf426dd84f0ec3e54f535e07 Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Wed, 24 Aug 2016 14:35:45 -0600 Subject: [PATCH 1/3] osc/pt2pt: fix several bugs This commit fixes some bugs uncovered during thread testing of 2.0.1rc1. With these fixes the component is running cleanly with threads. Signed-off-by: Nathan Hjelm (cherry picked from commit open-mpi/ompi@70f8a6e7920785ff3ebbd76fc9e744c1633b2134) Signed-off-by: Nathan Hjelm --- ompi/mca/osc/pt2pt/osc_pt2pt.h | 5 +-- ompi/mca/osc/pt2pt/osc_pt2pt_comm.c | 16 +++++----- ompi/mca/osc/pt2pt/osc_pt2pt_frag.c | 31 +++++++++++++++++++ ompi/mca/osc/pt2pt/osc_pt2pt_frag.h | 7 +++-- ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c | 29 ++++++++++++----- 5 files changed, 68 insertions(+), 20 deletions(-) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h b/ompi/mca/osc/pt2pt/osc_pt2pt.h index 04e7a14e99..34ced329e1 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h @@ -955,13 +955,14 @@ static inline bool ompi_osc_pt2pt_access_epoch_active (ompi_osc_pt2pt_module_t * static inline bool ompi_osc_pt2pt_peer_sends_active (ompi_osc_pt2pt_module_t *module, int rank) { ompi_osc_pt2pt_sync_t *sync; + ompi_osc_pt2pt_peer_t *peer; - sync = ompi_osc_pt2pt_module_sync_lookup (module, rank, NULL); + sync = ompi_osc_pt2pt_module_sync_lookup (module, rank, &peer); if (!sync) { return false; } - return sync->eager_send_active; + return sync->eager_send_active || ompi_osc_pt2pt_peer_eager_active (peer); } END_C_DECLS diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c index 0074c9f0e9..83cfeda744 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c @@ -118,7 +118,7 @@ static inline int ompi_osc_pt2pt_put_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, co int ret; /* if we are in active target mode wait until all post messages arrive */ - ompi_osc_pt2pt_sync_wait (pt2pt_sync); + ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync); ret = ompi_datatype_sndrcv ((void *)source, source_count, source_datatype, target, target_count, target_datatype); @@ -142,7 +142,7 @@ static inline int ompi_osc_pt2pt_get_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, vo int ret; /* if we are in active target mode wait until all post messages arrive */ - ompi_osc_pt2pt_sync_wait (pt2pt_sync); + ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync); ret = ompi_datatype_sndrcv (source, source_count, source_datatype, target, target_count, target_datatype); @@ -164,7 +164,7 @@ static inline int ompi_osc_pt2pt_cas_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, co ((unsigned long) target_disp * module->disp_unit); /* if we are in active target mode wait until all post messages arrive */ - ompi_osc_pt2pt_sync_wait (pt2pt_sync); + ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync); ompi_osc_pt2pt_accumulate_lock (module); @@ -188,7 +188,7 @@ static inline int ompi_osc_pt2pt_acc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, co int ret; /* if we are in active target mode wait until all post messages arrive */ - ompi_osc_pt2pt_sync_wait (pt2pt_sync); + ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync); ompi_osc_pt2pt_accumulate_lock (module); @@ -338,7 +338,7 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ if (is_long_msg) { /* wait for eager sends to be active before starting a long put */ - ompi_osc_pt2pt_sync_wait (pt2pt_sync); + ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync); } OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, @@ -497,7 +497,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, if (is_long_msg) { /* wait for synchronization before posting a long message */ - ompi_osc_pt2pt_sync_wait (pt2pt_sync); + ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync); } header = (ompi_osc_pt2pt_header_acc_t*) ptr; @@ -804,7 +804,7 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co if (!release_req) { /* wait for epoch to begin before starting rget operation */ - ompi_osc_pt2pt_sync_wait (pt2pt_sync); + ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync); } header = (ompi_osc_pt2pt_header_get_t*) ptr; @@ -970,7 +970,7 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin if (!release_req) { /* wait for epoch to begin before starting operation */ - ompi_osc_pt2pt_sync_wait (pt2pt_sync); + ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync); } /* optimize the self case. TODO: optimize the local case */ diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c index b926cae157..ce1e481d32 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c @@ -153,6 +153,37 @@ int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int targe return ret; } +int ompi_osc_pt2pt_frag_flush_target_locked (ompi_osc_pt2pt_module_t *module, int target) +{ + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target); + ompi_osc_pt2pt_frag_t *frag; + int ret = OMPI_SUCCESS; + + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "osc pt2pt: frag flush to target target %d. queue fragments: %lu", + target, (unsigned long) opal_list_get_size (&peer->queued_frags))); + + /* walk through the pending list and send */ + while (NULL != (frag = ((ompi_osc_pt2pt_frag_t *) opal_list_remove_first (&peer->queued_frags)))) { + ret = frag_send(module, frag); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + break; + } + } + + /* XXX -- TODO -- better error handling */ + if (OMPI_SUCCESS != ret) { + return ret; + } + + /* flush the active frag */ + ret = ompi_osc_pt2pt_flush_active_frag (module, peer); + + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "osc pt2pt: frag flush target %d finished", target)); + + return ret; +} int ompi_osc_pt2pt_frag_flush_all (ompi_osc_pt2pt_module_t *module) { diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h index da51b7db27..d7a73f42e1 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h @@ -41,9 +41,10 @@ struct ompi_osc_pt2pt_frag_t { typedef struct ompi_osc_pt2pt_frag_t ompi_osc_pt2pt_frag_t; OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_frag_t); -extern int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *buffer); -extern int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target); -extern int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module); +int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *buffer); +int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target); +int ompi_osc_pt2pt_frag_flush_target_locked(ompi_osc_pt2pt_module_t *module, int target); +int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module); static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t* buffer) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c index b27bdadcbd..8ccba41e7d 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c @@ -122,6 +122,12 @@ int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, omp int ret; + OPAL_THREAD_LOCK(&peer->lock); + if (ompi_osc_pt2pt_peer_locked (peer)) { + OPAL_THREAD_UNLOCK(&peer->lock); + return OMPI_SUCCESS; + } + (void) OPAL_THREAD_ADD32(&lock->sync_expected, 1); assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK); @@ -137,16 +143,23 @@ int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, omp lock_req.lock_ptr = (uint64_t) (uintptr_t) lock; OSC_PT2PT_HTON(&lock_req, module, target); - ret = ompi_osc_pt2pt_control_send (module, target, &lock_req, sizeof (lock_req)); + do { + ret = ompi_osc_pt2pt_control_send (module, target, &lock_req, sizeof (lock_req)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + break; + } + + /* make sure the request gets sent, so we can start eager sending... */ + ret = ompi_osc_pt2pt_frag_flush_target_locked (module, target); + } while (0); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - return ret; + OPAL_THREAD_ADD32(&lock->sync_expected, -1); } - /* make sure the request gets sent, so we can start eager sending... */ - ret = ompi_osc_pt2pt_frag_flush_target (module, target); - if (OPAL_LIKELY(OMPI_SUCCESS == ret)) { - ompi_osc_pt2pt_peer_set_locked (peer, true); - } + ompi_osc_pt2pt_peer_set_locked (peer, true); + + OPAL_THREAD_UNLOCK(&peer->lock); return ret; } @@ -316,6 +329,8 @@ static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert, if (OPAL_UNLIKELY(NULL == lock)) { return OMPI_ERR_OUT_OF_RESOURCE; } + + lock->peer_list.peer = ompi_osc_pt2pt_peer_lookup (module, target); } else { lock = &module->all_sync; } From 3901cbfea9f7990eaaa5e2482afb0fa358c8b748 Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Thu, 25 Aug 2016 09:28:25 -0600 Subject: [PATCH 2/3] osc/pt2pt: fix possible race in peer locking It is possible for another thread to process a lock ack before the peer is set as locked. In this case either setting the locked or the eager active flag might clobber the other thread. To address this the flags have been made volatile and are set atomically. Since there is no a opal_atomic_or or opal_atomic_and function just use cmpset for now. Signed-off-by: Nathan Hjelm (cherry picked from commit open-mpi/ompi@7af138f83be3c1594b7b6f25419023f4048e50fd) Signed-off-by: Nathan Hjelm --- ompi/mca/osc/pt2pt/osc_pt2pt.h | 16 ++++++++++------ ompi/mca/osc/pt2pt/osc_pt2pt_sync.h | 6 +++--- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h b/ompi/mca/osc/pt2pt/osc_pt2pt.h index 34ced329e1..d579f0b40b 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h @@ -121,7 +121,7 @@ struct ompi_osc_pt2pt_peer_t { int32_t passive_incoming_frag_count; /** peer flags */ - int32_t flags; + volatile int32_t flags; }; typedef struct ompi_osc_pt2pt_peer_t ompi_osc_pt2pt_peer_t; @@ -144,11 +144,15 @@ static inline bool ompi_osc_pt2pt_peer_eager_active (ompi_osc_pt2pt_peer_t *peer static inline void ompi_osc_pt2pt_peer_set_flag (ompi_osc_pt2pt_peer_t *peer, int32_t flag, bool value) { - if (value) { - peer->flags |= flag; - } else { - peer->flags &= ~flag; - } + int32_t peer_flags, new_flags; + do { + peer_flags = peer->flags; + if (value) { + new_flags = peer_flags | flag; + } else { + new_flags = peer_flags & ~flag; + } + } while (!OPAL_ATOMIC_CMPSET_32 (&peer->flags, peer_flags, new_flags)); } static inline void ompi_osc_pt2pt_peer_set_locked (ompi_osc_pt2pt_peer_t *peer, bool value) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h index 5e1d990515..87bd1c45ad 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h @@ -74,10 +74,10 @@ struct ompi_osc_pt2pt_sync_t { int num_peers; /** number of synchronization messages expected */ - int32_t sync_expected; + volatile int32_t sync_expected; /** eager sends are active to all peers in this access epoch */ - bool eager_send_active; + volatile bool eager_send_active; /** communication has started on this epoch */ bool epoch_active; @@ -175,7 +175,7 @@ static inline void ompi_osc_pt2pt_sync_expected (ompi_osc_pt2pt_sync_t *sync) static inline void ompi_osc_pt2pt_sync_reset (ompi_osc_pt2pt_sync_t *sync) { sync->type = OMPI_OSC_PT2PT_SYNC_TYPE_NONE; - sync->eager_send_active = 0; + sync->eager_send_active = false; sync->epoch_active = 0; sync->peer_list.peers = NULL; sync->sync.pscw.group = NULL; From 43c7f8fc4ae972739b76042df50c2449d07b4876 Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Thu, 1 Sep 2016 09:57:27 -0600 Subject: [PATCH 3/3] osc/pt2pt: do not use frag send to send lock request This commit cleans up some code in the passive target path. The code used the buffered frag control send path but it is more appropriate to use the unbuffered one. This avoids checking structures that are should not be in use in this path. Signed-off-by: Nathan Hjelm (cherry picked from commit open-mpi/ompi@cb1cb5ffed25f03fabfc4af742e6e066be832a50) Signed-off-by: Nathan Hjelm --- ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c | 9 ++---- ompi/mca/osc/pt2pt/osc_pt2pt_frag.c | 32 ------------------- ompi/mca/osc/pt2pt/osc_pt2pt_frag.h | 1 - ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c | 15 ++------- 4 files changed, 6 insertions(+), 51 deletions(-) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c index 959fb474e4..a9e473c656 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c @@ -1558,12 +1558,6 @@ static inline int process_frag (ompi_osc_pt2pt_module_t *module, ret = process_acc_long (module, frag->source, &header->acc); break; - case OMPI_OSC_PT2PT_HDR_TYPE_LOCK_REQ: - ret = ompi_osc_pt2pt_process_lock(module, frag->source, &header->lock); - if (OPAL_LIKELY(OMPI_SUCCESS == ret)) { - ret = sizeof (header->lock); - } - break; case OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ: ret = process_unlock(module, frag->source, &header->unlock); break; @@ -1667,6 +1661,9 @@ int ompi_osc_pt2pt_process_receive (ompi_osc_pt2pt_receive_t *recv) case OMPI_OSC_PT2PT_HDR_TYPE_POST: osc_pt2pt_incoming_post (module, source); break; + case OMPI_OSC_PT2PT_HDR_TYPE_LOCK_REQ: + ompi_osc_pt2pt_process_lock(module, source, (ompi_osc_pt2pt_header_lock_t *) base_header); + break; case OMPI_OSC_PT2PT_HDR_TYPE_LOCK_ACK: ompi_osc_pt2pt_process_lock_ack(module, (ompi_osc_pt2pt_header_lock_ack_t *) base_header); break; diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c index ce1e481d32..c92190857c 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c @@ -153,38 +153,6 @@ int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int targe return ret; } -int ompi_osc_pt2pt_frag_flush_target_locked (ompi_osc_pt2pt_module_t *module, int target) -{ - ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target); - ompi_osc_pt2pt_frag_t *frag; - int ret = OMPI_SUCCESS; - - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "osc pt2pt: frag flush to target target %d. queue fragments: %lu", - target, (unsigned long) opal_list_get_size (&peer->queued_frags))); - - /* walk through the pending list and send */ - while (NULL != (frag = ((ompi_osc_pt2pt_frag_t *) opal_list_remove_first (&peer->queued_frags)))) { - ret = frag_send(module, frag); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - break; - } - } - - /* XXX -- TODO -- better error handling */ - if (OMPI_SUCCESS != ret) { - return ret; - } - - /* flush the active frag */ - ret = ompi_osc_pt2pt_flush_active_frag (module, peer); - - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "osc pt2pt: frag flush target %d finished", target)); - - return ret; -} - int ompi_osc_pt2pt_frag_flush_all (ompi_osc_pt2pt_module_t *module) { int ret = OMPI_SUCCESS; diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h index d7a73f42e1..42ef305f9c 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h @@ -43,7 +43,6 @@ OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_frag_t); int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *buffer); int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target); -int ompi_osc_pt2pt_frag_flush_target_locked(ompi_osc_pt2pt_module_t *module, int target); int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module); static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t *module, diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c index 8ccba41e7d..4721f82d00 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c @@ -143,22 +143,13 @@ int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, omp lock_req.lock_ptr = (uint64_t) (uintptr_t) lock; OSC_PT2PT_HTON(&lock_req, module, target); - do { - ret = ompi_osc_pt2pt_control_send (module, target, &lock_req, sizeof (lock_req)); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - break; - } - - /* make sure the request gets sent, so we can start eager sending... */ - ret = ompi_osc_pt2pt_frag_flush_target_locked (module, target); - } while (0); - + ret = ompi_osc_pt2pt_control_send_unbuffered (module, target, &lock_req, sizeof (lock_req)); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { OPAL_THREAD_ADD32(&lock->sync_expected, -1); + } else { + ompi_osc_pt2pt_peer_set_locked (peer, true); } - ompi_osc_pt2pt_peer_set_locked (peer, true); - OPAL_THREAD_UNLOCK(&peer->lock); return ret;