diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h b/ompi/mca/osc/pt2pt/osc_pt2pt.h index 51b14b7057..68ca022b7a 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. @@ -149,19 +149,20 @@ struct ompi_osc_pt2pt_module_t { uint32_t *epoch_outgoing_frag_count; /** cyclic counter for a unique tage for long messages. */ - unsigned int tag_counter; + uint32_t tag_counter; + uint32_t rtag_counter; /* Number of outgoing fragments that have completed since the begining of time */ - uint32_t outgoing_frag_count; + volatile uint32_t outgoing_frag_count; /* Next outgoing fragment count at which we want a signal on cond */ - uint32_t outgoing_frag_signal_count; + volatile uint32_t outgoing_frag_signal_count; /* Number of incoming fragments that have completed since the begining of time */ - uint32_t active_incoming_frag_count; + volatile uint32_t active_incoming_frag_count; /* Next incoming buffer count at which we want a signal on cond */ - uint32_t active_incoming_frag_signal_count; + volatile uint32_t active_incoming_frag_signal_count; /** Number of targets locked/being locked */ unsigned int passive_target_access_epoch; @@ -408,14 +409,6 @@ int ompi_osc_pt2pt_component_irecv(ompi_osc_pt2pt_module_t *module, int tag, struct ompi_communicator_t *comm); -int ompi_osc_pt2pt_component_isend(ompi_osc_pt2pt_module_t *module, - const void *buf, - size_t count, - struct ompi_datatype_t *datatype, - int dest, - int tag, - struct ompi_communicator_t *comm); - /** * ompi_osc_pt2pt_progress_pending_acc: * @@ -657,13 +650,18 @@ static inline int get_tag(ompi_osc_pt2pt_module_t *module) /* the LSB of the tag is used be the receiver to determine if the message is a passive or active target (ie, where to mark completion). */ - int tmp = module->tag_counter + !!(module->passive_target_access_epoch); - - module->tag_counter = (module->tag_counter + 2) & OSC_PT2PT_FRAG_MASK; - - return tmp; + int32_t tmp = OPAL_THREAD_ADD32((volatile int32_t *) &module->tag_counter, 4); + return (tmp & OSC_PT2PT_FRAG_MASK) | !!(module->passive_target_access_epoch); } +static inline int get_rtag(ompi_osc_pt2pt_module_t *module) +{ + /* the LSB of the tag is used be the receiver to determine if the + message is a passive or active target (ie, where to mark + completion). */ + int32_t tmp = OPAL_THREAD_ADD32((volatile int32_t *) &module->rtag_counter, 4); + return (tmp & OSC_PT2PT_FRAG_MASK) | !!(module->passive_target_access_epoch); +} /** * ompi_osc_pt2pt_accumulate_lock: * diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c index e169addb54..0b3c2e0727 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2010 IBM Corporation. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. @@ -147,6 +147,7 @@ int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win) /* short-circuit the noprecede case */ if (0 != (assert & MPI_MODE_NOPRECEDE)) { + module->comm->c_coll.coll_barrier (module->comm, module->comm->c_coll.coll_barrier); OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "osc pt2pt: fence end (short circuit)")); return ret; @@ -211,7 +212,7 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win) ompi_osc_pt2pt_module_t *module = GET_MODULE(win); ompi_osc_pt2pt_sync_t *sync = &module->all_sync; - OPAL_THREAD_LOCK(&sync->lock); + OPAL_THREAD_LOCK(&module->lock); /* check if we are already in an access epoch */ if (ompi_osc_pt2pt_access_epoch_active (module)) { diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c index 5bb3a070cf..1205767f01 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. @@ -34,27 +34,58 @@ #include /* progress an OSC request */ +static int ompi_osc_pt2pt_comm_complete (ompi_request_t *request) +{ + ompi_osc_pt2pt_module_t *module = + (ompi_osc_pt2pt_module_t*) request->req_complete_cb_data; + + OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output, + "isend_completion_cb called")); + + mark_outgoing_completion(module); + + /* put this request on the garbage colletion list */ + osc_pt2pt_gc_add_request (module, request); + + return OMPI_SUCCESS; +} + static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request) { ompi_osc_pt2pt_request_t *pt2pt_request = (ompi_osc_pt2pt_request_t *) request->req_complete_cb_data; - ompi_osc_pt2pt_module_t *module = pt2pt_request->module; OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_req_comm_complete called tag = %d", request->req_status.MPI_TAG)); - mark_outgoing_completion (module); + /* update the cbdata for ompi_osc_pt2pt_comm_complete */ + request->req_complete_cb_data = pt2pt_request->module; if (0 == OPAL_THREAD_ADD32(&pt2pt_request->outstanding_requests, -1)) { ompi_osc_pt2pt_request_complete (pt2pt_request, request->req_status.MPI_ERROR); } - /* put this request on the garbage colletion list */ - osc_pt2pt_gc_add_request (module, request); + return ompi_osc_pt2pt_comm_complete (request); +} - return OMPI_SUCCESS; +static inline int ompi_osc_pt2pt_data_isend (ompi_osc_pt2pt_module_t *module, const void *buf, + size_t count, ompi_datatype_t *datatype, int dest, + int tag, ompi_osc_pt2pt_request_t *request) +{ + /* increment the outgoing send count */ + ompi_osc_signal_outgoing (module, dest, 1); + + if (NULL != request) { + ++request->outstanding_requests; + return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, module->comm, + ompi_osc_pt2pt_req_comm_complete, request); + } + + return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, module->comm, + ompi_osc_pt2pt_comm_complete, module); } + static int ompi_osc_pt2pt_dt_send_complete (ompi_request_t *request) { ompi_datatype_t *datatype = (ompi_datatype_t *) request->req_complete_cb_data; @@ -190,8 +221,8 @@ static inline int ompi_osc_pt2pt_gacc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, c ((unsigned long) target_disp * module->disp_unit); int ret; - /* if we are in active target mode wait until all post messages arrive */ - ompi_osc_pt2pt_sync_wait (pt2pt_sync); + OPAL_OUTPUT_VERBOSE((MCA_BASE_VERBOSE_TRACE, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_gacc_self: starting local " + "get accumulate")); ompi_osc_pt2pt_accumulate_lock (module); @@ -222,6 +253,9 @@ static inline int ompi_osc_pt2pt_gacc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, c ompi_osc_pt2pt_accumulate_unlock (module); + OPAL_OUTPUT_VERBOSE((MCA_BASE_VERBOSE_TRACE, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_gacc_self: local get " + "accumulate complete")); + if (request) { /* NTH: is it ok to use an ompi error code here? */ ompi_osc_pt2pt_request_complete (request, ret); @@ -282,14 +316,14 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ payload_len = origin_dt->super.size * origin_count; frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len + payload_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, true); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, false); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { /* allocate space for the header plus space to store ddt_len */ frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, false); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -301,9 +335,8 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ tag = get_tag(module); } - /* flush will be called at the end of this function. make sure all post messages have - * arrived. */ - if ((is_long_msg || request) && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) { + if (is_long_msg) { + /* wait for eager sends to be active before starting a long put */ ompi_osc_pt2pt_sync_wait (pt2pt_sync); } @@ -361,18 +394,8 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ header->tag = tag; osc_pt2pt_hton(header, proc); - /* increase the outgoing signal count */ - ompi_osc_signal_outgoing (module, target, 1); - - if (request) { - request->outstanding_requests = 1; - ret = ompi_osc_pt2pt_isend_w_cb (origin_addr, origin_count, origin_dt, - target, tag, module->comm, ompi_osc_pt2pt_req_comm_complete, - request); - } else { - ret = ompi_osc_pt2pt_component_isend (module,origin_addr, origin_count, origin_dt, target, tag, - module->comm); - } + ret = ompi_osc_pt2pt_data_isend (module,origin_addr, origin_count, origin_dt, target, tag, + request); } } while (0); @@ -380,14 +403,7 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID; } - ret = ompi_osc_pt2pt_frag_finish(module, frag); - - if (request || is_long_msg) { - /* need to flush now in case the caller decides to wait on the request */ - ompi_osc_pt2pt_frag_flush_target (module, target); - } - - return ret; + return ompi_osc_pt2pt_frag_finish(module, frag); } int @@ -459,14 +475,14 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, payload_len = origin_dt->super.size * origin_count; frag_len = sizeof(*header) + ddt_len + payload_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, true); if (OMPI_SUCCESS != ret) { frag_len = sizeof(*header) + ddt_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, !request); if (OMPI_SUCCESS != ret) { /* allocate space for the header plus space to store ddt_len */ frag_len = sizeof(*header) + 8; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, !request); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -475,12 +491,11 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, } is_long_msg = true; - tag = get_tag (module); + tag = get_rtag (module); } - /* flush will be called at the end of this function. make sure all post messages have - * arrived. */ - if ((is_long_msg || request) && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) { + if (is_long_msg) { + /* wait for synchronization before posting a long message */ ompi_osc_pt2pt_sync_wait (pt2pt_sync); } @@ -538,18 +553,8 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "acc: starting long accumulate with tag %d", tag)); - /* increment the outgoing send count */ - ompi_osc_signal_outgoing (module, target, 1); - - if (request) { - request->outstanding_requests = 1; - ret = ompi_osc_pt2pt_isend_w_cb (origin_addr, origin_count, origin_dt, - target, tag, module->comm, ompi_osc_pt2pt_req_comm_complete, - request); - } else { - ret = ompi_osc_pt2pt_component_isend (module, origin_addr, origin_count, origin_dt, target, tag, - module->comm); - } + ret = ompi_osc_pt2pt_data_isend (module, origin_addr, origin_count, origin_dt, target, tag, + request); } } while (0); @@ -561,14 +566,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID; } - ret = ompi_osc_pt2pt_frag_finish(module, frag); - - if (is_long_msg || request) { - /* need to flush now in case the caller decides to wait on the request */ - ompi_osc_pt2pt_frag_flush_target (module, target); - } - - return ret; + return ompi_osc_pt2pt_frag_finish(module, frag); } int @@ -639,7 +637,7 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar } frag_len = sizeof(ompi_osc_pt2pt_header_cswap_t) + ddt_len + payload_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, false); if (OMPI_SUCCESS != ret) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -671,9 +669,7 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar return ret; } - ret = ompi_osc_pt2pt_frag_finish(module, frag); - - return ret; + return ompi_osc_pt2pt_frag_finish (module, frag); } @@ -787,11 +783,11 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co ddt_len = ompi_datatype_pack_description_length(target_dt); frag_len = sizeof(ompi_osc_pt2pt_header_get_t) + ddt_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, release_req); if (OMPI_SUCCESS != ret) { /* allocate space for the header plus space to store ddt_len */ frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8; - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, release_req); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -804,9 +800,8 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co /* for bookkeeping the get is "outgoing" */ ompi_osc_signal_outgoing (module, target, 1); - /* flush will be called at the end of this function. make sure all post messages have - * arrived. */ - if (!release_req && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) { + if (!release_req) { + /* wait for epoch to begin before starting rget operation */ ompi_osc_pt2pt_sync_wait (pt2pt_sync); } @@ -857,14 +852,7 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co *request = &pt2pt_request->super; } - ret = ompi_osc_pt2pt_frag_finish(module, frag); - - if (!release_req) { - /* need to flush now in case the caller decides to wait on the request */ - ompi_osc_pt2pt_frag_flush_target (module, target); - } - - return ret; + return ompi_osc_pt2pt_frag_finish(module, frag); } int ompi_osc_pt2pt_rget (void *origin_addr, int origin_count, struct ompi_datatype_t *origin_dt, @@ -977,6 +965,11 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin return OMPI_SUCCESS; } + if (!release_req) { + /* wait for epoch to begin before starting operation */ + ompi_osc_pt2pt_sync_wait (pt2pt_sync); + } + /* optimize the self case. TODO: optimize the local case */ if (ompi_comm_rank (module->comm) == target_rank) { *request = &pt2pt_request->super; @@ -1003,14 +996,14 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin } frag_len = sizeof(*header) + ddt_len + payload_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, false, release_req); if (OMPI_SUCCESS != ret) { frag_len = sizeof(*header) + ddt_len; - ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true, release_req); if (OMPI_SUCCESS != ret) { /* allocate space for the header plus space to store ddt_len */ frag_len = sizeof(*header) + 8; - ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true, release_req); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return OMPI_ERR_OUT_OF_RESOURCE; } @@ -1030,12 +1023,6 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin /* increment the number of outgoing fragments */ ompi_osc_signal_outgoing (module, target_rank, pt2pt_request->outstanding_requests); - /* flush will be called at the end of this function. make sure all post messages have - * arrived. */ - if (!release_req && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) { - ompi_osc_pt2pt_sync_wait (pt2pt_sync); - } - header = (ompi_osc_pt2pt_header_acc_t *) ptr; header->base.flags = 0; header->len = frag_len; @@ -1100,14 +1087,7 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin *request = (ompi_request_t *) pt2pt_request; } - ret = ompi_osc_pt2pt_frag_finish(module, frag); - - if (!release_req) { - /* need to flush now in case the caller decides to wait on the request */ - ompi_osc_pt2pt_frag_flush_target (module, target_rank); - } - - return ret; + return ompi_osc_pt2pt_frag_finish(module, frag); } int ompi_osc_pt2pt_get_accumulate(const void *origin_addr, int origin_count, diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c index 6a8f53ebc8..41bbe187b5 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c @@ -290,6 +290,7 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit /* fill in the function pointer part */ memcpy(module, &ompi_osc_pt2pt_module_template, sizeof(ompi_osc_base_module_t)); + module->rtag_counter = 2; /* initialize the objects, so that always free in cleanup */ OBJ_CONSTRUCT(&module->lock, opal_mutex_t); diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c index 6883d79a4a..681e73acca 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2009-2011 Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. @@ -213,7 +213,7 @@ int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target, char *ptr; int ret; - ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr); + ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr, false, true); if (OPAL_LIKELY(OMPI_SUCCESS == ret)) { memcpy (ptr, data, len); @@ -1682,33 +1682,6 @@ int ompi_osc_pt2pt_component_irecv (ompi_osc_pt2pt_module_t *module, void *buf, osc_pt2pt_incoming_req_complete, module); } - -static int -isend_completion_cb(ompi_request_t *request) -{ - ompi_osc_pt2pt_module_t *module = - (ompi_osc_pt2pt_module_t*) request->req_complete_cb_data; - - OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output, - "isend_completion_cb called")); - - mark_outgoing_completion(module); - - /* put this request on the garbage colletion list */ - osc_pt2pt_gc_add_request (module, request); - - return OMPI_SUCCESS; -} - - -int ompi_osc_pt2pt_component_isend (ompi_osc_pt2pt_module_t *module, const void *buf, - size_t count, struct ompi_datatype_t *datatype, - int dest, int tag, struct ompi_communicator_t *comm) -{ - return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, comm, - isend_completion_cb, module); -} - int ompi_osc_pt2pt_isend_w_cb (const void *ptr, int count, ompi_datatype_t *datatype, int target, int tag, ompi_communicator_t *comm, ompi_request_complete_fn_t cb, void *ctx) { diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h index 515ce82fdf..da51b7db27 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h @@ -1,7 +1,7 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2012 Sandia National Laboratories. All rights reserved. - * Copyright (c) 2014-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2014-2016 Los Alamos National Security, LLC. All rights * reserved. * $COPYRIGHT$ * @@ -33,7 +33,8 @@ struct ompi_osc_pt2pt_frag_t { char *top; /* Number of operations which have started writing into the frag, but not yet completed doing so */ - int32_t pending; + volatile int32_t pending; + int32_t pending_long_sends; ompi_osc_pt2pt_frag_header_t *header; ompi_osc_pt2pt_module_t *module; }; @@ -44,16 +45,74 @@ extern int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_p extern int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target); extern int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module); +static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t *module, + ompi_osc_pt2pt_frag_t* buffer) +{ + opal_atomic_wmb (); + if (0 == OPAL_THREAD_ADD32(&buffer->pending, -1)) { + opal_atomic_mb (); + return ompi_osc_pt2pt_frag_start(module, buffer); + } + + return OMPI_SUCCESS; +} + +static inline ompi_osc_pt2pt_frag_t *ompi_osc_pt2pt_frag_alloc_non_buffered (ompi_osc_pt2pt_module_t *module, + ompi_osc_pt2pt_peer_t *peer, + size_t request_len) +{ + ompi_osc_pt2pt_frag_t *curr; + + /* to ensure ordering flush the buffer on the peer */ + curr = peer->active_frag; + if (NULL != curr && opal_atomic_cmpset (&peer->active_frag, curr, NULL)) { + /* If there's something pending, the pending finish will + start the buffer. Otherwise, we need to start it now. */ + int ret = ompi_osc_pt2pt_frag_finish (module, curr); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + return NULL; + } + } + + curr = (ompi_osc_pt2pt_frag_t *) opal_free_list_get (&mca_osc_pt2pt_component.frags); + if (OPAL_UNLIKELY(NULL == curr)) { + return NULL; + } + + curr->target = peer->rank; + + curr->header = (ompi_osc_pt2pt_frag_header_t*) curr->buffer; + curr->top = (char*) (curr->header + 1); + curr->remain_len = mca_osc_pt2pt_component.buffer_size; + curr->module = module; + curr->pending = 1; + + curr->header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_FRAG; + curr->header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID; + if (module->passive_target_access_epoch) { + curr->header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET; + } + curr->header->source = ompi_comm_rank(module->comm); + curr->header->num_ops = 1; + + return curr; +} + /* - * Note: module lock must be held during this operation + * Note: this function takes the module lock + * + * buffered sends will cache the fragment on the peer object associated with the + * target. unbuffered-sends will cause the target fragment to be flushed and + * will not be cached on the peer. this causes the fragment to be flushed as + * soon as it is sent. this allows request-based rma fragments to be completed + * so MPI_Test/MPI_Wait/etc will work as expected. */ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, int target, size_t request_len, ompi_osc_pt2pt_frag_t **buffer, - char **ptr) + char **ptr, bool long_send, bool buffered) { ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target); ompi_osc_pt2pt_frag_t *curr; - int ret; /* osc pt2pt headers can have 64-bit values. these will need to be aligned * on an 8-byte boundary on some architectures so we up align the allocation @@ -64,51 +123,33 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in return OMPI_ERR_OUT_OF_RESOURCE; } + OPAL_OUTPUT_VERBOSE((MCA_BASE_VERBOSE_TRACE, ompi_osc_base_framework.framework_output, + "attempting to allocate buffer for %lu bytes to target %d. long send: %d, " + "buffered: %d", (unsigned long) request_len, target, long_send, buffered)); + OPAL_THREAD_LOCK(&module->lock); - curr = peer->active_frag; - if (NULL == curr || curr->remain_len < request_len) { - opal_free_list_item_t *item = NULL; - - if (NULL != curr) { - curr->remain_len = 0; - peer->active_frag = NULL; - opal_atomic_mb (); - - /* If there's something pending, the pending finish will - start the buffer. Otherwise, we need to start it now. */ - if (0 == OPAL_THREAD_ADD32(&curr->pending, -1)) { - ret = ompi_osc_pt2pt_frag_start(module, curr); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - return ret; - } + if (buffered) { + curr = peer->active_frag; + if (NULL == curr || curr->remain_len < request_len || (long_send && curr->pending_long_sends == 32)) { + curr = ompi_osc_pt2pt_frag_alloc_non_buffered (module, peer, request_len); + if (OPAL_UNLIKELY(NULL == curr)) { + OPAL_THREAD_UNLOCK(&module->lock); + return OMPI_ERR_OUT_OF_RESOURCE; } - } - - item = opal_free_list_get (&mca_osc_pt2pt_component.frags); - if (OPAL_UNLIKELY(NULL == item)) { - return OMPI_ERR_OUT_OF_RESOURCE; - } - curr = peer->active_frag = (ompi_osc_pt2pt_frag_t*) item; - - curr->target = target; - - curr->header = (ompi_osc_pt2pt_frag_header_t*) curr->buffer; - curr->top = (char*) (curr->header + 1); - curr->remain_len = mca_osc_pt2pt_component.buffer_size; - curr->module = module; - curr->pending = 1; - curr->header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_FRAG; - curr->header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID; - if (module->passive_target_access_epoch) { - curr->header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET; + curr->pending_long_sends = long_send; + peer->active_frag = curr; + } else { + OPAL_THREAD_ADD32(&curr->header->num_ops, 1); + curr->pending_long_sends += long_send; } - curr->header->source = ompi_comm_rank(module->comm); - curr->header->num_ops = 0; - if (curr->remain_len < request_len) { + OPAL_THREAD_ADD32(&curr->pending, 1); + } else { + curr = ompi_osc_pt2pt_frag_alloc_non_buffered (module, peer, request_len); + if (OPAL_UNLIKELY(NULL == curr)) { OPAL_THREAD_UNLOCK(&module->lock); - return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + return OMPI_ERR_OUT_OF_RESOURCE; } } @@ -117,24 +158,8 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in curr->top += request_len; curr->remain_len -= request_len; - OPAL_THREAD_UNLOCK(&module->lock); - - OPAL_THREAD_ADD32(&curr->pending, 1); - OPAL_THREAD_ADD32(&curr->header->num_ops, 1); - return OMPI_SUCCESS; -} - - -/* - * Note: module lock must be held for this operation - */ -static inline int ompi_osc_pt2pt_frag_finish(ompi_osc_pt2pt_module_t *module, - ompi_osc_pt2pt_frag_t* buffer) -{ - if (0 == OPAL_THREAD_ADD32(&buffer->pending, -1)) { - return ompi_osc_pt2pt_frag_start(module, buffer); - } + OPAL_THREAD_UNLOCK(&module->lock); return OMPI_SUCCESS; } diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c index 0ddc4cf326..099aa56462 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2010 IBM Corporation. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. @@ -244,6 +244,8 @@ static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module } } + } else { + lock->eager_send_active = true; } return OMPI_SUCCESS; diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_request.c b/ompi/mca/osc/pt2pt/osc_pt2pt_request.c index eddccf5b42..6741036e11 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_request.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_request.c @@ -51,6 +51,7 @@ request_construct(ompi_osc_pt2pt_request_t *request) request->super.req_status._cancelled = 0; request->super.req_free = request_free; request->super.req_cancel = request_cancel; + request->outstanding_requests = 0; } OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_request_t, diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_request.h b/ompi/mca/osc/pt2pt/osc_pt2pt_request.h index 07b9d53093..dee5c86892 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_request.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_request.h @@ -1,7 +1,7 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2012 Sandia National Laboratories. All rights reserved. - * Copyright (c) 2014-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2014-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. @@ -57,6 +57,7 @@ OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_request_t); #define OMPI_OSC_PT2PT_REQUEST_RETURN(req) \ do { \ OMPI_REQUEST_FINI(&(req)->super); \ + (req)->outstanding_requests = 0; \ opal_free_list_return (&mca_osc_pt2pt_component.requests, \ (opal_free_list_item_t *) (req)); \ } while (0) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h index eee29645c2..f4e4adcae0 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2015-2016 Los Alamos National Security, LLC. All rights * reserved. * $COPYRIGHT$ * @@ -163,8 +163,10 @@ static inline void ompi_osc_pt2pt_sync_expected (ompi_osc_pt2pt_sync_t *sync) { int32_t new_value = OPAL_THREAD_ADD32 (&sync->sync_expected, -1); if (0 == new_value) { + OPAL_THREAD_LOCK(&sync->lock); sync->eager_send_active = true; opal_condition_broadcast (&sync->cond); + OPAL_THREAD_UNLOCK(&sync->lock); } }