From b1cb049a9d9a486c6e08bd2966b0033e30df7055 Mon Sep 17 00:00:00 2001 From: Gilles Gouaillardet Date: Tue, 5 Jan 2016 16:57:37 +0900 Subject: [PATCH 1/4] osc/pt2pt: use two distinct "namespaces" for tags (cherry picked from open-mpi/ompi@06ecdb6aa7ee688f51de2b3ca05e9f0605a90099) Signed-off-by: Nathan Hjelm --- ompi/mca/osc/pt2pt/osc_pt2pt.h | 14 +++++++++++++- ompi/mca/osc/pt2pt/osc_pt2pt_comm.c | 2 +- ompi/mca/osc/pt2pt/osc_pt2pt_component.c | 1 + 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h b/ompi/mca/osc/pt2pt/osc_pt2pt.h index 51b14b7057..1f3c204502 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h @@ -150,6 +150,7 @@ struct ompi_osc_pt2pt_module_t { /** cyclic counter for a unique tage for long messages. */ unsigned int tag_counter; + unsigned int rtag_counter; /* Number of outgoing fragments that have completed since the begining of time */ @@ -659,11 +660,22 @@ static inline int get_tag(ompi_osc_pt2pt_module_t *module) completion). */ int tmp = module->tag_counter + !!(module->passive_target_access_epoch); - module->tag_counter = (module->tag_counter + 2) & OSC_PT2PT_FRAG_MASK; + module->tag_counter = (module->tag_counter + 4) & OSC_PT2PT_FRAG_MASK; return tmp; } +static inline int get_rtag(ompi_osc_pt2pt_module_t *module) +{ + /* the LSB of the tag is used be the receiver to determine if the + message is a passive or active target (ie, where to mark + completion). */ + int tmp = module->rtag_counter + !!(module->passive_target_access_epoch); + + module->rtag_counter = (module->rtag_counter + 4) & OSC_PT2PT_FRAG_MASK; + + return tmp; +} /** * ompi_osc_pt2pt_accumulate_lock: * diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c index 5bb3a070cf..b22f7837bf 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c @@ -475,7 +475,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, } is_long_msg = true; - tag = get_tag (module); + tag = get_rtag (module); } /* flush will be called at the end of this function. make sure all post messages have diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c index 6a8f53ebc8..41bbe187b5 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c @@ -290,6 +290,7 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit /* fill in the function pointer part */ memcpy(module, &ompi_osc_pt2pt_module_template, sizeof(ompi_osc_base_module_t)); + module->rtag_counter = 2; /* initialize the objects, so that always free in cleanup */ OBJ_CONSTRUCT(&module->lock, opal_mutex_t); From 98532dda3d642f845498a40e7e1c660f13ab67aa Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Tue, 2 Feb 2016 12:22:21 -0700 Subject: [PATCH 2/4] osc/pt2pt: various threading fixes This commit fixes several bugs identified by a new multi-threaded RMA benchmarking suite. The following bugs have been identified and fixed: - The code that signaled the actual start of an access epoch changed the eager_send_active flag on a synchronization object without holding the object's lock. This could cause another thread waiting on eager sends to block indefinitely because the entirety of ompi_osc_pt2pt_sync_expected could exectute between the check of eager_send_active and the conditon wait of ompi_osc_pt2pt_sync_wait. - The bookkeeping of fragments could get screwed up when performing long put/accumulate operations from different threads. This was caused by the fragment flush code at the end of both put and accumulate. This code was put in place to avoid sending a large number of unexpected messages to a peer. To fix the bookkeeping issue we now 1) wait for eager sends to be active before stating any large isend's, and 2) keep track of the number of large isends associated with a fragment. If the number of large isends reaches 32 the active fragment is flushed. - Use atomics to update the large receive/send tag counters. This prevents duplicate tags from being used. The tag space has also been updated to use the entire 16-bits of the tag space. These changes should also fix open-mpi/ompi#1299. Signed-off-by: Nathan Hjelm (cherry picked from open-mpi/ompi@d7264aa61394ffa278cc9ea08bc7b4704fb680e1) Signed-off-by: Nathan Hjelm --- ompi/mca/osc/pt2pt/osc_pt2pt.h | 40 ++--- ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c | 4 +- ompi/mca/osc/pt2pt/osc_pt2pt_comm.c | 150 ++++++++----------- ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c | 31 +--- ompi/mca/osc/pt2pt/osc_pt2pt_frag.h | 72 +++++---- ompi/mca/osc/pt2pt/osc_pt2pt_request.c | 1 + ompi/mca/osc/pt2pt/osc_pt2pt_request.h | 3 +- ompi/mca/osc/pt2pt/osc_pt2pt_sync.h | 4 +- 8 files changed, 120 insertions(+), 185 deletions(-) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h b/ompi/mca/osc/pt2pt/osc_pt2pt.h index 1f3c204502..409011d32b 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. @@ -149,20 +149,20 @@ struct ompi_osc_pt2pt_module_t { uint32_t *epoch_outgoing_frag_count; /** cyclic counter for a unique tage for long messages. */ - unsigned int tag_counter; - unsigned int rtag_counter; + uint32_t tag_counter; + uint32_t rtag_counter; /* Number of outgoing fragments that have completed since the begining of time */ - uint32_t outgoing_frag_count; + volatile uint32_t outgoing_frag_count; /* Next outgoing fragment count at which we want a signal on cond */ - uint32_t outgoing_frag_signal_count; + volatile uint32_t outgoing_frag_signal_count; /* Number of incoming fragments that have completed since the begining of time */ - uint32_t active_incoming_frag_count; + volatile uint32_t active_incoming_frag_count; /* Next incoming buffer count at which we want a signal on cond */ - uint32_t active_incoming_frag_signal_count; + volatile uint32_t active_incoming_frag_signal_count; /** Number of targets locked/being locked */ unsigned int passive_target_access_epoch; @@ -409,14 +409,6 @@ int ompi_osc_pt2pt_component_irecv(ompi_osc_pt2pt_module_t *module, int tag, struct ompi_communicator_t *comm); -int ompi_osc_pt2pt_component_isend(ompi_osc_pt2pt_module_t *module, - const void *buf, - size_t count, - struct ompi_datatype_t *datatype, - int dest, - int tag, - struct ompi_communicator_t *comm); - /** * ompi_osc_pt2pt_progress_pending_acc: * @@ -639,8 +631,8 @@ static inline void osc_pt2pt_add_pending (ompi_osc_pt2pt_pending_t *pending) opal_list_append (&mca_osc_pt2pt_component.pending_operations, &pending->super)); } -#define OSC_PT2PT_FRAG_TAG 0x10000 -#define OSC_PT2PT_FRAG_MASK 0x0ffff +#define OSC_PT2PT_FRAG_TAG 0x80000 +#define OSC_PT2PT_FRAG_MASK 0x7ffff /** * get_tag: @@ -658,11 +650,8 @@ static inline int get_tag(ompi_osc_pt2pt_module_t *module) /* the LSB of the tag is used be the receiver to determine if the message is a passive or active target (ie, where to mark completion). */ - int tmp = module->tag_counter + !!(module->passive_target_access_epoch); - - module->tag_counter = (module->tag_counter + 4) & OSC_PT2PT_FRAG_MASK; - - return tmp; + int32_t tmp = OPAL_THREAD_ADD32((volatile int32_t *) &module->tag_counter, 4); + return (tmp & OSC_PT2PT_FRAG_MASK) | !!(module->passive_target_access_epoch); } static inline int get_rtag(ompi_osc_pt2pt_module_t *module) @@ -670,11 +659,8 @@ static inline int get_rtag(ompi_osc_pt2pt_module_t *module) /* the LSB of the tag is used be the receiver to determine if the message is a passive or active target (ie, where to mark completion). */ - int tmp = module->rtag_counter + !!(module->passive_target_access_epoch); - - module->rtag_counter = (module->rtag_counter + 4) & OSC_PT2PT_FRAG_MASK; - - return tmp; + int32_t tmp = OPAL_THREAD_ADD32((volatile int32_t *) &module->rtag_counter, 4); + return (tmp & OSC_PT2PT_FRAG_MASK) | !!(module->passive_target_access_epoch); } /** * ompi_osc_pt2pt_accumulate_lock: diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c index e169addb54..58d6b40b76 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2010 IBM Corporation. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. @@ -211,7 +211,7 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win) ompi_osc_pt2pt_module_t *module = GET_MODULE(win); ompi_osc_pt2pt_sync_t *sync = &module->all_sync; - OPAL_THREAD_LOCK(&sync->lock); + OPAL_THREAD_LOCK(&module->lock); /* check if we are already in an access epoch */ if (ompi_osc_pt2pt_access_epoch_active (module)) { diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c index b22f7837bf..a1dcfd7172 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. @@ -34,27 +34,55 @@ #include /* progress an OSC request */ +static int ompi_osc_pt2pt_comm_complete (ompi_request_t *request) +{ + ompi_osc_pt2pt_module_t *module = + (ompi_osc_pt2pt_module_t*) request->req_complete_cb_data; + + OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output, + "isend_completion_cb called")); + + mark_outgoing_completion(module); + + /* put this request on the garbage colletion list */ + osc_pt2pt_gc_add_request (module, request); + + return OMPI_SUCCESS; +} + static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request) { ompi_osc_pt2pt_request_t *pt2pt_request = (ompi_osc_pt2pt_request_t *) request->req_complete_cb_data; - ompi_osc_pt2pt_module_t *module = pt2pt_request->module; OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_req_comm_complete called tag = %d", request->req_status.MPI_TAG)); - mark_outgoing_completion (module); - if (0 == OPAL_THREAD_ADD32(&pt2pt_request->outstanding_requests, -1)) { ompi_osc_pt2pt_request_complete (pt2pt_request, request->req_status.MPI_ERROR); } - /* put this request on the garbage colletion list */ - osc_pt2pt_gc_add_request (module, request); + return ompi_osc_pt2pt_comm_complete (request); +} - return OMPI_SUCCESS; +static inline int ompi_osc_pt2pt_data_isend (ompi_osc_pt2pt_module_t *module, const void *buf, + size_t count, ompi_datatype_t *datatype, int dest, + int tag, ompi_osc_pt2pt_request_t *request) +{ + /* increment the outgoing send count */ + ompi_osc_signal_outgoing (module, dest, 1); + + if (NULL != request) { + ++request->outstanding_requests; + return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, module->comm, + ompi_osc_pt2pt_req_comm_complete, request); + } + + return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, module->comm, + ompi_osc_pt2pt_comm_complete, module); } + static int ompi_osc_pt2pt_dt_send_complete (ompi_request_t *request) { ompi_datatype_t *datatype = (ompi_datatype_t *) request->req_complete_cb_data; @@ -282,14 +310,14 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ payload_len = origin_dt->super.size * origin_count; frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len + payload_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { /* allocate space for the header plus space to store ddt_len */ frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -301,9 +329,8 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ tag = get_tag(module); } - /* flush will be called at the end of this function. make sure all post messages have - * arrived. */ - if ((is_long_msg || request) && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) { + if (is_long_msg) { + /* wait for eager sends to be active before starting a long put */ ompi_osc_pt2pt_sync_wait (pt2pt_sync); } @@ -361,18 +388,8 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ header->tag = tag; osc_pt2pt_hton(header, proc); - /* increase the outgoing signal count */ - ompi_osc_signal_outgoing (module, target, 1); - - if (request) { - request->outstanding_requests = 1; - ret = ompi_osc_pt2pt_isend_w_cb (origin_addr, origin_count, origin_dt, - target, tag, module->comm, ompi_osc_pt2pt_req_comm_complete, - request); - } else { - ret = ompi_osc_pt2pt_component_isend (module,origin_addr, origin_count, origin_dt, target, tag, - module->comm); - } + ret = ompi_osc_pt2pt_data_isend (module,origin_addr, origin_count, origin_dt, target, tag, + request); } } while (0); @@ -380,14 +397,7 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID; } - ret = ompi_osc_pt2pt_frag_finish(module, frag); - - if (request || is_long_msg) { - /* need to flush now in case the caller decides to wait on the request */ - ompi_osc_pt2pt_frag_flush_target (module, target); - } - - return ret; + return ompi_osc_pt2pt_frag_finish(module, frag); } int @@ -459,14 +469,14 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, payload_len = origin_dt->super.size * origin_count; frag_len = sizeof(*header) + ddt_len + payload_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false); if (OMPI_SUCCESS != ret) { frag_len = sizeof(*header) + ddt_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true); if (OMPI_SUCCESS != ret) { /* allocate space for the header plus space to store ddt_len */ frag_len = sizeof(*header) + 8; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -478,9 +488,8 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, tag = get_rtag (module); } - /* flush will be called at the end of this function. make sure all post messages have - * arrived. */ - if ((is_long_msg || request) && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) { + if (is_long_msg || is_long_datatype) { + /* wait for synchronization before posting a long message */ ompi_osc_pt2pt_sync_wait (pt2pt_sync); } @@ -538,18 +547,8 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "acc: starting long accumulate with tag %d", tag)); - /* increment the outgoing send count */ - ompi_osc_signal_outgoing (module, target, 1); - - if (request) { - request->outstanding_requests = 1; - ret = ompi_osc_pt2pt_isend_w_cb (origin_addr, origin_count, origin_dt, - target, tag, module->comm, ompi_osc_pt2pt_req_comm_complete, - request); - } else { - ret = ompi_osc_pt2pt_component_isend (module, origin_addr, origin_count, origin_dt, target, tag, - module->comm); - } + ret = ompi_osc_pt2pt_data_isend (module, origin_addr, origin_count, origin_dt, target, tag, + request); } } while (0); @@ -561,14 +560,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID; } - ret = ompi_osc_pt2pt_frag_finish(module, frag); - - if (is_long_msg || request) { - /* need to flush now in case the caller decides to wait on the request */ - ompi_osc_pt2pt_frag_flush_target (module, target); - } - - return ret; + return ompi_osc_pt2pt_frag_finish(module, frag); } int @@ -639,7 +631,7 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar } frag_len = sizeof(ompi_osc_pt2pt_header_cswap_t) + ddt_len + payload_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false); if (OMPI_SUCCESS != ret) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -787,11 +779,11 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co ddt_len = ompi_datatype_pack_description_length(target_dt); frag_len = sizeof(ompi_osc_pt2pt_header_get_t) + ddt_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false); if (OMPI_SUCCESS != ret) { /* allocate space for the header plus space to store ddt_len */ frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -804,9 +796,8 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co /* for bookkeeping the get is "outgoing" */ ompi_osc_signal_outgoing (module, target, 1); - /* flush will be called at the end of this function. make sure all post messages have - * arrived. */ - if (!release_req && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) { + if (!release_req) { + /* wait for epoch to begin before starting rget operation */ ompi_osc_pt2pt_sync_wait (pt2pt_sync); } @@ -857,14 +848,7 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co *request = &pt2pt_request->super; } - ret = ompi_osc_pt2pt_frag_finish(module, frag); - - if (!release_req) { - /* need to flush now in case the caller decides to wait on the request */ - ompi_osc_pt2pt_frag_flush_target (module, target); - } - - return ret; + return ompi_osc_pt2pt_frag_finish(module, frag); } int ompi_osc_pt2pt_rget (void *origin_addr, int origin_count, struct ompi_datatype_t *origin_dt, @@ -1003,14 +987,14 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin } frag_len = sizeof(*header) + ddt_len + payload_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, false); if (OMPI_SUCCESS != ret) { frag_len = sizeof(*header) + ddt_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true); if (OMPI_SUCCESS != ret) { /* allocate space for the header plus space to store ddt_len */ frag_len = sizeof(*header) + 8; - ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -1030,9 +1014,8 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin /* increment the number of outgoing fragments */ ompi_osc_signal_outgoing (module, target_rank, pt2pt_request->outstanding_requests); - /* flush will be called at the end of this function. make sure all post messages have - * arrived. */ - if (!release_req && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) { + if (!release_req) { + /* wait for epoch to begin before starting operation */ ompi_osc_pt2pt_sync_wait (pt2pt_sync); } @@ -1100,14 +1083,7 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin *request = (ompi_request_t *) pt2pt_request; } - ret = ompi_osc_pt2pt_frag_finish(module, frag); - - if (!release_req) { - /* need to flush now in case the caller decides to wait on the request */ - ompi_osc_pt2pt_frag_flush_target (module, target_rank); - } - - return ret; + return ompi_osc_pt2pt_frag_finish(module, frag); } int ompi_osc_pt2pt_get_accumulate(const void *origin_addr, int origin_count, diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c index 6883d79a4a..09bd2859b2 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2009-2011 Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. @@ -213,7 +213,7 @@ int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target, char *ptr; int ret; - ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr, false); if (OPAL_LIKELY(OMPI_SUCCESS == ret)) { memcpy (ptr, data, len); @@ -1682,33 +1682,6 @@ int ompi_osc_pt2pt_component_irecv (ompi_osc_pt2pt_module_t *module, void *buf, osc_pt2pt_incoming_req_complete, module); } - -static int -isend_completion_cb(ompi_request_t *request) -{ - ompi_osc_pt2pt_module_t *module = - (ompi_osc_pt2pt_module_t*) request->req_complete_cb_data; - - OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output, - "isend_completion_cb called")); - - mark_outgoing_completion(module); - - /* put this request on the garbage colletion list */ - osc_pt2pt_gc_add_request (module, request); - - return OMPI_SUCCESS; -} - - -int ompi_osc_pt2pt_component_isend (ompi_osc_pt2pt_module_t *module, const void *buf, - size_t count, struct ompi_datatype_t *datatype, - int dest, int tag, struct ompi_communicator_t *comm) -{ - return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, comm, - isend_completion_cb, module); -} - int ompi_osc_pt2pt_isend_w_cb (const void *ptr, int count, ompi_datatype_t *datatype, int target, int tag, ompi_communicator_t *comm, ompi_request_complete_fn_t cb, void *ctx) { diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h index 515ce82fdf..f55e6cba09 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h @@ -1,7 +1,7 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2012 Sandia National Laboratories. All rights reserved. - * Copyright (c) 2014-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2014-2016 Los Alamos National Security, LLC. All rights * reserved. * $COPYRIGHT$ * @@ -33,7 +33,8 @@ struct ompi_osc_pt2pt_frag_t { char *top; /* Number of operations which have started writing into the frag, but not yet completed doing so */ - int32_t pending; + volatile int32_t pending; + int32_t pending_long_sends; ompi_osc_pt2pt_frag_header_t *header; ompi_osc_pt2pt_module_t *module; }; @@ -44,12 +45,24 @@ extern int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_p extern int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target); extern int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module); +static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t *module, + ompi_osc_pt2pt_frag_t* buffer) +{ + opal_atomic_wmb (); + if (0 == OPAL_THREAD_ADD32(&buffer->pending, -1)) { + opal_atomic_mb (); + return ompi_osc_pt2pt_frag_start(module, buffer); + } + + return OMPI_SUCCESS; +} + /* * Note: module lock must be held during this operation */ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, int target, size_t request_len, ompi_osc_pt2pt_frag_t **buffer, - char **ptr) + char **ptr, bool long_send) { ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target); ompi_osc_pt2pt_frag_t *curr; @@ -66,29 +79,21 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in OPAL_THREAD_LOCK(&module->lock); curr = peer->active_frag; - if (NULL == curr || curr->remain_len < request_len) { - opal_free_list_item_t *item = NULL; - - if (NULL != curr) { - curr->remain_len = 0; - peer->active_frag = NULL; - opal_atomic_mb (); - + if (NULL == curr || curr->remain_len < request_len || (long_send && curr->pending_long_sends == 32)) { + if (NULL != curr && opal_atomic_cmpset (&peer->active_frag, curr, NULL)) { /* If there's something pending, the pending finish will start the buffer. Otherwise, we need to start it now. */ - if (0 == OPAL_THREAD_ADD32(&curr->pending, -1)) { - ret = ompi_osc_pt2pt_frag_start(module, curr); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - return ret; - } + ret = ompi_osc_pt2pt_frag_finish (module, curr); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + OPAL_THREAD_UNLOCK(&module->lock); + return ret; } } - item = opal_free_list_get (&mca_osc_pt2pt_component.frags); - if (OPAL_UNLIKELY(NULL == item)) { + curr = (ompi_osc_pt2pt_frag_t *) opal_free_list_get (&mca_osc_pt2pt_component.frags); + if (OPAL_UNLIKELY(NULL == curr)) { return OMPI_ERR_OUT_OF_RESOURCE; } - curr = peer->active_frag = (ompi_osc_pt2pt_frag_t*) item; curr->target = target; @@ -96,7 +101,8 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in curr->top = (char*) (curr->header + 1); curr->remain_len = mca_osc_pt2pt_component.buffer_size; curr->module = module; - curr->pending = 1; + curr->pending = 2; + curr->pending_long_sends = long_send; curr->header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_FRAG; curr->header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID; @@ -104,12 +110,18 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in curr->header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET; } curr->header->source = ompi_comm_rank(module->comm); - curr->header->num_ops = 0; + curr->header->num_ops = 1; if (curr->remain_len < request_len) { OPAL_THREAD_UNLOCK(&module->lock); return OMPI_ERR_TEMP_OUT_OF_RESOURCE; } + + peer->active_frag = curr; + } else { + OPAL_THREAD_ADD32(&curr->pending, 1); + OPAL_THREAD_ADD32(&curr->header->num_ops, 1); + curr->pending_long_sends += long_send; } *ptr = curr->top; @@ -117,24 +129,8 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in curr->top += request_len; curr->remain_len -= request_len; - OPAL_THREAD_UNLOCK(&module->lock); - OPAL_THREAD_ADD32(&curr->pending, 1); - OPAL_THREAD_ADD32(&curr->header->num_ops, 1); - - return OMPI_SUCCESS; -} - - -/* - * Note: module lock must be held for this operation - */ -static inline int ompi_osc_pt2pt_frag_finish(ompi_osc_pt2pt_module_t *module, - ompi_osc_pt2pt_frag_t* buffer) -{ - if (0 == OPAL_THREAD_ADD32(&buffer->pending, -1)) { - return ompi_osc_pt2pt_frag_start(module, buffer); - } + OPAL_THREAD_UNLOCK(&module->lock); return OMPI_SUCCESS; } diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_request.c b/ompi/mca/osc/pt2pt/osc_pt2pt_request.c index eddccf5b42..6741036e11 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_request.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_request.c @@ -51,6 +51,7 @@ request_construct(ompi_osc_pt2pt_request_t *request) request->super.req_status._cancelled = 0; request->super.req_free = request_free; request->super.req_cancel = request_cancel; + request->outstanding_requests = 0; } OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_request_t, diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_request.h b/ompi/mca/osc/pt2pt/osc_pt2pt_request.h index 07b9d53093..dee5c86892 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_request.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_request.h @@ -1,7 +1,7 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2012 Sandia National Laboratories. All rights reserved. - * Copyright (c) 2014-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2014-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. @@ -57,6 +57,7 @@ OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_request_t); #define OMPI_OSC_PT2PT_REQUEST_RETURN(req) \ do { \ OMPI_REQUEST_FINI(&(req)->super); \ + (req)->outstanding_requests = 0; \ opal_free_list_return (&mca_osc_pt2pt_component.requests, \ (opal_free_list_item_t *) (req)); \ } while (0) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h index eee29645c2..f4e4adcae0 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2015-2016 Los Alamos National Security, LLC. All rights * reserved. * $COPYRIGHT$ * @@ -163,8 +163,10 @@ static inline void ompi_osc_pt2pt_sync_expected (ompi_osc_pt2pt_sync_t *sync) { int32_t new_value = OPAL_THREAD_ADD32 (&sync->sync_expected, -1); if (0 == new_value) { + OPAL_THREAD_LOCK(&sync->lock); sync->eager_send_active = true; opal_condition_broadcast (&sync->cond); + OPAL_THREAD_UNLOCK(&sync->lock); } } From 245147390edeb9cd9fab1d08610f83841588989b Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Tue, 2 Feb 2016 12:44:17 -0700 Subject: [PATCH 3/4] osc/pt2pt: eager sends are always active if MPI_MODE_NOCHECK is used This commit fixes open-mpi/ompi#1299. Signed-off-by: Nathan Hjelm (cherry picked from open-mpi/ompi@519fffb65e7a9502b0e5edeb72b1ad2d802daed4) Signed-off-by: Nathan Hjelm --- ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c index 0ddc4cf326..099aa56462 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2010 IBM Corporation. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. @@ -244,6 +244,8 @@ static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module } } + } else { + lock->eager_send_active = true; } return OMPI_SUCCESS; From 0e8f2675bf13dd6aa34cbf4492f92a5cddcaaf6f Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Thu, 4 Feb 2016 16:59:39 -0700 Subject: [PATCH 4/4] osc/pt2pt: bug fixes This commit fixes several bugs identified by @ggouaillardet and MTT: - Fix SEGV in long send completion caused by missing update to the request callback data. - Add an MPI_Barrier to the fence short-cut. This fixes potential semantic issues where messages may be received before fence is reached. - Ensure fragments are flushed when using request-based RMA. This allows MPI_Test/MPI_Wait/etc to work as expected. - Restore the tag space back to 16-bits. It was intended that the space be expanded to 32-bits but the required change to the fragment headers was not committed. The tag space may be expanded in a later commit. Signed-off-by: Nathan Hjelm (cherry picked from commit open-mpi/ompi@5b9c82a9648b06364b695e199711e1c26a3afeeb) Signed-off-by: Nathan Hjelm --- ompi/mca/osc/pt2pt/osc_pt2pt.h | 4 +- ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c | 1 + ompi/mca/osc/pt2pt/osc_pt2pt_comm.c | 50 +++++---- ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c | 2 +- ompi/mca/osc/pt2pt/osc_pt2pt_frag.h | 109 ++++++++++++------- 5 files changed, 100 insertions(+), 66 deletions(-) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h b/ompi/mca/osc/pt2pt/osc_pt2pt.h index 409011d32b..68ca022b7a 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h @@ -631,8 +631,8 @@ static inline void osc_pt2pt_add_pending (ompi_osc_pt2pt_pending_t *pending) opal_list_append (&mca_osc_pt2pt_component.pending_operations, &pending->super)); } -#define OSC_PT2PT_FRAG_TAG 0x80000 -#define OSC_PT2PT_FRAG_MASK 0x7ffff +#define OSC_PT2PT_FRAG_TAG 0x10000 +#define OSC_PT2PT_FRAG_MASK 0x0ffff /** * get_tag: diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c index 58d6b40b76..0b3c2e0727 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c @@ -147,6 +147,7 @@ int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win) /* short-circuit the noprecede case */ if (0 != (assert & MPI_MODE_NOPRECEDE)) { + module->comm->c_coll.coll_barrier (module->comm, module->comm->c_coll.coll_barrier); OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "osc pt2pt: fence end (short circuit)")); return ret; diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c index a1dcfd7172..1205767f01 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c @@ -58,6 +58,9 @@ static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request) "ompi_osc_pt2pt_req_comm_complete called tag = %d", request->req_status.MPI_TAG)); + /* update the cbdata for ompi_osc_pt2pt_comm_complete */ + request->req_complete_cb_data = pt2pt_request->module; + if (0 == OPAL_THREAD_ADD32(&pt2pt_request->outstanding_requests, -1)) { ompi_osc_pt2pt_request_complete (pt2pt_request, request->req_status.MPI_ERROR); } @@ -218,8 +221,8 @@ static inline int ompi_osc_pt2pt_gacc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, c ((unsigned long) target_disp * module->disp_unit); int ret; - /* if we are in active target mode wait until all post messages arrive */ - ompi_osc_pt2pt_sync_wait (pt2pt_sync); + OPAL_OUTPUT_VERBOSE((MCA_BASE_VERBOSE_TRACE, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_gacc_self: starting local " + "get accumulate")); ompi_osc_pt2pt_accumulate_lock (module); @@ -250,6 +253,9 @@ static inline int ompi_osc_pt2pt_gacc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, c ompi_osc_pt2pt_accumulate_unlock (module); + OPAL_OUTPUT_VERBOSE((MCA_BASE_VERBOSE_TRACE, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_gacc_self: local get " + "accumulate complete")); + if (request) { /* NTH: is it ok to use an ompi error code here? */ ompi_osc_pt2pt_request_complete (request, ret); @@ -310,14 +316,14 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ payload_len = origin_dt->super.size * origin_count; frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len + payload_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, true); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, false); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { /* allocate space for the header plus space to store ddt_len */ frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, false); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -469,14 +475,14 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, payload_len = origin_dt->super.size * origin_count; frag_len = sizeof(*header) + ddt_len + payload_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, true); if (OMPI_SUCCESS != ret) { frag_len = sizeof(*header) + ddt_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, !request); if (OMPI_SUCCESS != ret) { /* allocate space for the header plus space to store ddt_len */ frag_len = sizeof(*header) + 8; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, !request); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -488,7 +494,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, tag = get_rtag (module); } - if (is_long_msg || is_long_datatype) { + if (is_long_msg) { /* wait for synchronization before posting a long message */ ompi_osc_pt2pt_sync_wait (pt2pt_sync); } @@ -631,7 +637,7 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar } frag_len = sizeof(ompi_osc_pt2pt_header_cswap_t) + ddt_len + payload_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, false); if (OMPI_SUCCESS != ret) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -663,9 +669,7 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar return ret; } - ret = ompi_osc_pt2pt_frag_finish(module, frag); - - return ret; + return ompi_osc_pt2pt_frag_finish (module, frag); } @@ -779,11 +783,11 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co ddt_len = ompi_datatype_pack_description_length(target_dt); frag_len = sizeof(ompi_osc_pt2pt_header_get_t) + ddt_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, release_req); if (OMPI_SUCCESS != ret) { /* allocate space for the header plus space to store ddt_len */ frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, release_req); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -961,6 +965,11 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin return OMPI_SUCCESS; } + if (!release_req) { + /* wait for epoch to begin before starting operation */ + ompi_osc_pt2pt_sync_wait (pt2pt_sync); + } + /* optimize the self case. TODO: optimize the local case */ if (ompi_comm_rank (module->comm) == target_rank) { *request = &pt2pt_request->super; @@ -987,14 +996,14 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin } frag_len = sizeof(*header) + ddt_len + payload_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, false); + ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, false, release_req); if (OMPI_SUCCESS != ret) { frag_len = sizeof(*header) + ddt_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true); + ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true, release_req); if (OMPI_SUCCESS != ret) { /* allocate space for the header plus space to store ddt_len */ frag_len = sizeof(*header) + 8; - ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true); + ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true, release_req); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -1014,11 +1023,6 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin /* increment the number of outgoing fragments */ ompi_osc_signal_outgoing (module, target_rank, pt2pt_request->outstanding_requests); - if (!release_req) { - /* wait for epoch to begin before starting operation */ - ompi_osc_pt2pt_sync_wait (pt2pt_sync); - } - header = (ompi_osc_pt2pt_header_acc_t *) ptr; header->base.flags = 0; header->len = frag_len; diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c index 09bd2859b2..681e73acca 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c @@ -213,7 +213,7 @@ int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target, char *ptr; int ret; - ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr, false); + ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr, false, true); if (OPAL_LIKELY(OMPI_SUCCESS == ret)) { memcpy (ptr, data, len); diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h index f55e6cba09..da51b7db27 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h @@ -57,16 +57,62 @@ static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t *module, return OMPI_SUCCESS; } +static inline ompi_osc_pt2pt_frag_t *ompi_osc_pt2pt_frag_alloc_non_buffered (ompi_osc_pt2pt_module_t *module, + ompi_osc_pt2pt_peer_t *peer, + size_t request_len) +{ + ompi_osc_pt2pt_frag_t *curr; + + /* to ensure ordering flush the buffer on the peer */ + curr = peer->active_frag; + if (NULL != curr && opal_atomic_cmpset (&peer->active_frag, curr, NULL)) { + /* If there's something pending, the pending finish will + start the buffer. Otherwise, we need to start it now. */ + int ret = ompi_osc_pt2pt_frag_finish (module, curr); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + return NULL; + } + } + + curr = (ompi_osc_pt2pt_frag_t *) opal_free_list_get (&mca_osc_pt2pt_component.frags); + if (OPAL_UNLIKELY(NULL == curr)) { + return NULL; + } + + curr->target = peer->rank; + + curr->header = (ompi_osc_pt2pt_frag_header_t*) curr->buffer; + curr->top = (char*) (curr->header + 1); + curr->remain_len = mca_osc_pt2pt_component.buffer_size; + curr->module = module; + curr->pending = 1; + + curr->header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_FRAG; + curr->header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID; + if (module->passive_target_access_epoch) { + curr->header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET; + } + curr->header->source = ompi_comm_rank(module->comm); + curr->header->num_ops = 1; + + return curr; +} + /* - * Note: module lock must be held during this operation + * Note: this function takes the module lock + * + * buffered sends will cache the fragment on the peer object associated with the + * target. unbuffered-sends will cause the target fragment to be flushed and + * will not be cached on the peer. this causes the fragment to be flushed as + * soon as it is sent. this allows request-based rma fragments to be completed + * so MPI_Test/MPI_Wait/etc will work as expected. */ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, int target, size_t request_len, ompi_osc_pt2pt_frag_t **buffer, - char **ptr, bool long_send) + char **ptr, bool long_send, bool buffered) { ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target); ompi_osc_pt2pt_frag_t *curr; - int ret; /* osc pt2pt headers can have 64-bit values. these will need to be aligned * on an 8-byte boundary on some architectures so we up align the allocation @@ -77,51 +123,34 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in return OMPI_ERR_OUT_OF_RESOURCE; } + OPAL_OUTPUT_VERBOSE((MCA_BASE_VERBOSE_TRACE, ompi_osc_base_framework.framework_output, + "attempting to allocate buffer for %lu bytes to target %d. long send: %d, " + "buffered: %d", (unsigned long) request_len, target, long_send, buffered)); + OPAL_THREAD_LOCK(&module->lock); - curr = peer->active_frag; - if (NULL == curr || curr->remain_len < request_len || (long_send && curr->pending_long_sends == 32)) { - if (NULL != curr && opal_atomic_cmpset (&peer->active_frag, curr, NULL)) { - /* If there's something pending, the pending finish will - start the buffer. Otherwise, we need to start it now. */ - ret = ompi_osc_pt2pt_frag_finish (module, curr); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + if (buffered) { + curr = peer->active_frag; + if (NULL == curr || curr->remain_len < request_len || (long_send && curr->pending_long_sends == 32)) { + curr = ompi_osc_pt2pt_frag_alloc_non_buffered (module, peer, request_len); + if (OPAL_UNLIKELY(NULL == curr)) { OPAL_THREAD_UNLOCK(&module->lock); - return ret; + return OMPI_ERR_OUT_OF_RESOURCE; } - } - - curr = (ompi_osc_pt2pt_frag_t *) opal_free_list_get (&mca_osc_pt2pt_component.frags); - if (OPAL_UNLIKELY(NULL == curr)) { - return OMPI_ERR_OUT_OF_RESOURCE; - } - - curr->target = target; - curr->header = (ompi_osc_pt2pt_frag_header_t*) curr->buffer; - curr->top = (char*) (curr->header + 1); - curr->remain_len = mca_osc_pt2pt_component.buffer_size; - curr->module = module; - curr->pending = 2; - curr->pending_long_sends = long_send; - - curr->header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_FRAG; - curr->header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID; - if (module->passive_target_access_epoch) { - curr->header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET; + curr->pending_long_sends = long_send; + peer->active_frag = curr; + } else { + OPAL_THREAD_ADD32(&curr->header->num_ops, 1); + curr->pending_long_sends += long_send; } - curr->header->source = ompi_comm_rank(module->comm); - curr->header->num_ops = 1; - if (curr->remain_len < request_len) { + OPAL_THREAD_ADD32(&curr->pending, 1); + } else { + curr = ompi_osc_pt2pt_frag_alloc_non_buffered (module, peer, request_len); + if (OPAL_UNLIKELY(NULL == curr)) { OPAL_THREAD_UNLOCK(&module->lock); - return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + return OMPI_ERR_OUT_OF_RESOURCE; } - - peer->active_frag = curr; - } else { - OPAL_THREAD_ADD32(&curr->pending, 1); - OPAL_THREAD_ADD32(&curr->header->num_ops, 1); - curr->pending_long_sends += long_send; } *ptr = curr->top;