From 6aa658ae33e0f803d1f53dc83dcb1eeca82affd6 Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Wed, 17 Aug 2016 20:14:01 -0600 Subject: [PATCH] ompi/request: change semantics of ompi request callbacks This commit changes the sematics of ompi request callbacks. If a request's callback has freed or re-posted (using start) a request the callback must return 1 instead of OMPI_SUCCESS. This indicates to ompi_request_complete that the request should not be modified further. This fixes a race condition in osc/pt2pt that could lead to the req_state being inconsistent if a request is freed between the callback and setting the request as complete. Signed-off-by: Nathan Hjelm --- ompi/mca/osc/pt2pt/osc_pt2pt.h | 26 +++-------------- ompi/mca/osc/pt2pt/osc_pt2pt_comm.c | 10 +++---- ompi/mca/osc/pt2pt/osc_pt2pt_component.c | 1 - ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c | 34 ++++++++-------------- ompi/mca/osc/pt2pt/osc_pt2pt_frag.c | 6 ++-- ompi/mca/osc/pt2pt/osc_pt2pt_module.c | 1 - ompi/request/request.h | 36 ++++++++++++++---------- 7 files changed, 42 insertions(+), 72 deletions(-) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h b/ompi/mca/osc/pt2pt/osc_pt2pt.h index 33d864d1a0d..1a636309cc7 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h @@ -265,9 +265,6 @@ struct ompi_osc_pt2pt_module_t { /** Lock for garbage collection lists */ opal_mutex_t gc_lock; - /** List of requests that need to be freed */ - opal_list_t request_gc; - /** List of buffers that need to be freed */ opal_list_t buffer_gc; }; @@ -658,14 +655,13 @@ static inline void osc_pt2pt_copy_for_send (void *target, size_t target_len, con } /** - * osc_pt2pt_request_gc_clean: + * osc_pt2pt_gc_clean: * * @short Release finished PML requests and accumulate buffers. * - * @long This function exists because it is not possible to free a PML request - * or buffer from a request completion callback. We instead put requests - * and buffers on the module's garbage collection lists and release then - * at a later time. + * @long This function exists because it is not possible to free a buffer from + * a request completion callback. We instead put requests and buffers on the + * module's garbage collection lists and release then at a later time. */ static inline void osc_pt2pt_gc_clean (ompi_osc_pt2pt_module_t *module) { @@ -673,26 +669,12 @@ static inline void osc_pt2pt_gc_clean (ompi_osc_pt2pt_module_t *module) opal_list_item_t *item; OPAL_THREAD_LOCK(&module->gc_lock); - - while (NULL != (request = (ompi_request_t *) opal_list_remove_first (&module->request_gc))) { - OPAL_THREAD_UNLOCK(&module->gc_lock); - ompi_request_free (&request); - OPAL_THREAD_LOCK(&module->gc_lock); - } - while (NULL != (item = opal_list_remove_first (&module->buffer_gc))) { OBJ_RELEASE(item); } - OPAL_THREAD_UNLOCK(&module->gc_lock); } -static inline void osc_pt2pt_gc_add_request (ompi_osc_pt2pt_module_t *module, ompi_request_t *request) -{ - OPAL_THREAD_SCOPED_LOCK(&module->gc_lock, - opal_list_append (&module->request_gc, (opal_list_item_t *) request)); -} - static inline void osc_pt2pt_gc_add_buffer (ompi_osc_pt2pt_module_t *module, opal_list_item_t *buffer) { OPAL_THREAD_SCOPED_LOCK(&module->gc_lock, diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c index 0074c9f0e9f..fc1c3c4f2a0 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c @@ -45,10 +45,9 @@ static int ompi_osc_pt2pt_comm_complete (ompi_request_t *request) mark_outgoing_completion(module); - /* put this request on the garbage colletion list */ - osc_pt2pt_gc_add_request (module, request); + ompi_request_free (&request); - return OMPI_SUCCESS; + return 1; } static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request) @@ -101,10 +100,9 @@ static int ompi_osc_pt2pt_dt_send_complete (ompi_request_t *request) OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock); assert (NULL != module); - /* put this request on the garbage colletion list */ - osc_pt2pt_gc_add_request (module, request); + ompi_request_free (&request); - return OMPI_SUCCESS; + return 1; } /* self communication optimizations */ diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c index d9833ce8af1..d96fbfedab7 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c @@ -320,7 +320,6 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit OBJ_CONSTRUCT(&module->locks_pending_lock, opal_mutex_t); OBJ_CONSTRUCT(&module->outstanding_locks, opal_hash_table_t); OBJ_CONSTRUCT(&module->pending_acc, opal_list_t); - OBJ_CONSTRUCT(&module->request_gc, opal_list_t); OBJ_CONSTRUCT(&module->buffer_gc, opal_list_t); OBJ_CONSTRUCT(&module->gc_lock, opal_mutex_t); OBJ_CONSTRUCT(&module->all_sync, ompi_osc_pt2pt_sync_t); diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c index 18a53412468..1c37d52353b 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c @@ -238,10 +238,8 @@ static int ompi_osc_pt2pt_control_send_unbuffered_cb (ompi_request_t *request) /* free the temporary buffer */ free (ctx); - /* put this request on the garbage colletion list */ - osc_pt2pt_gc_add_request (module, request); - - return OMPI_SUCCESS; + ompi_request_free (&request); + return 1; } /** @@ -437,10 +435,8 @@ static int osc_pt2pt_incoming_req_complete (ompi_request_t *request) mark_incoming_completion (module, rank); - /* put this request on the garbage colletion list */ - osc_pt2pt_gc_add_request (module, request); - - return OMPI_SUCCESS; + ompi_request_free (&request); + return 1; } struct osc_pt2pt_get_post_send_cb_data_t { @@ -460,10 +456,8 @@ static int osc_pt2pt_get_post_send_cb (ompi_request_t *request) /* mark this as a completed "incoming" request */ mark_incoming_completion (module, rank); - /* put this request on the garbage colletion list */ - osc_pt2pt_gc_add_request (module, request); - - return OMPI_SUCCESS; + ompi_request_free (&request); + return 1; } /** @@ -699,9 +693,7 @@ static int accumulate_cb (ompi_request_t *request) osc_pt2pt_gc_add_buffer (module, &acc_data->super); } - /* put this request on the garbage colletion list */ - osc_pt2pt_gc_add_request (module, request); - + ompi_request_free (&request); return ret; } @@ -771,13 +763,11 @@ static int replace_cb (ompi_request_t *request) mark_incoming_completion (module, rank); - /* put this request on the garbage colletion list */ - osc_pt2pt_gc_add_request (module, request); - /* unlock the accumulate lock */ ompi_osc_pt2pt_accumulate_unlock (module); - return OMPI_SUCCESS; + ompi_request_free (&request); + return 1; } /** @@ -1435,13 +1425,11 @@ static int process_large_datatype_request_cb (ompi_request_t *request) return OMPI_ERROR; } - /* put this request on the garbage colletion list */ - osc_pt2pt_gc_add_request (module, request); - /* free the datatype buffer */ osc_pt2pt_gc_add_buffer (module, &ddt_buffer->super); - return OMPI_SUCCESS; + ompi_request_free (&request); + return 1; } /** diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c index b926cae1579..2be1c45d9b4 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c @@ -37,11 +37,9 @@ static int frag_send_cb (ompi_request_t *request) mark_outgoing_completion(module); opal_free_list_return (&mca_osc_pt2pt_component.frags, &frag->super); + ompi_request_free (&request); - /* put this request on the garbage colletion list */ - osc_pt2pt_gc_add_request (module, request); - - return OMPI_SUCCESS; + return 1; } static int frag_send (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *frag) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_module.c b/ompi/mca/osc/pt2pt/osc_pt2pt_module.c index 564954d2daa..7b6cb61d24c 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_module.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_module.c @@ -79,7 +79,6 @@ int ompi_osc_pt2pt_free(ompi_win_t *win) OPAL_LIST_DESTRUCT(&module->pending_acc); osc_pt2pt_gc_clean (module); - OPAL_LIST_DESTRUCT(&module->request_gc); OPAL_LIST_DESTRUCT(&module->buffer_gc); OBJ_DESTRUCT(&module->gc_lock); diff --git a/ompi/request/request.h b/ompi/request/request.h index 6136119155e..e94fa11c75c 100644 --- a/ompi/request/request.h +++ b/ompi/request/request.h @@ -67,8 +67,10 @@ typedef int (*ompi_request_cancel_fn_t)(struct ompi_request_t* request, int flag /* * Optional function called when the request is completed from the MPI - * library perspective. This function is not allowed to release any - * ressources related to the request. + * library perspective. This function is allowed to release the request if + * the request will not be used with ompi_request_wait* or ompi_request_test. + * If the function reposts (using start) a request or calls ompi_request_free() + * on the request it *MUST* return 1. It should return 0 otherwise. */ typedef int (*ompi_request_complete_fn_t)(struct ompi_request_t* request); @@ -412,24 +414,28 @@ static inline void ompi_request_wait_completion(ompi_request_t *req) */ static inline int ompi_request_complete(ompi_request_t* request, bool with_signal) { + int rc = 0; + if( NULL != request->req_complete_cb) { - request->req_complete_cb( request ); + rc = request->req_complete_cb( request ); request->req_complete_cb = NULL; } - if( OPAL_LIKELY(with_signal) ) { - if(!OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, REQUEST_PENDING, REQUEST_COMPLETED)) { - ompi_wait_sync_t *tmp_sync = (ompi_wait_sync_t *) OPAL_ATOMIC_SWAP_PTR(&request->req_complete, - REQUEST_COMPLETED); - /* In the case where another thread concurrently changed the request to REQUEST_PENDING */ - if( REQUEST_PENDING != tmp_sync ) - wait_sync_update(tmp_sync, 1, request->req_status.MPI_ERROR); + if (0 == rc) { + if( OPAL_LIKELY(with_signal) ) { + if(!OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, REQUEST_PENDING, REQUEST_COMPLETED)) { + ompi_wait_sync_t *tmp_sync = (ompi_wait_sync_t *) OPAL_ATOMIC_SWAP_PTR(&request->req_complete, + REQUEST_COMPLETED); + /* In the case where another thread concurrently changed the request to REQUEST_PENDING */ + if( REQUEST_PENDING != tmp_sync ) + wait_sync_update(tmp_sync, 1, request->req_status.MPI_ERROR); + } + } else + request->req_complete = REQUEST_COMPLETED; + + if( OPAL_UNLIKELY(MPI_SUCCESS != request->req_status.MPI_ERROR) ) { + ompi_request_failed++; } - } else - request->req_complete = REQUEST_COMPLETED; - - if( OPAL_UNLIKELY(MPI_SUCCESS != request->req_status.MPI_ERROR) ) { - ompi_request_failed++; } return OMPI_SUCCESS;