Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 4 additions & 22 deletions ompi/mca/osc/pt2pt/osc_pt2pt.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,6 @@ struct ompi_osc_pt2pt_module_t {
/** Lock for garbage collection lists */
opal_mutex_t gc_lock;

/** List of requests that need to be freed */
opal_list_t request_gc;

/** List of buffers that need to be freed */
opal_list_t buffer_gc;
};
Expand Down Expand Up @@ -658,41 +655,26 @@ static inline void osc_pt2pt_copy_for_send (void *target, size_t target_len, con
}

/**
* osc_pt2pt_request_gc_clean:
* osc_pt2pt_gc_clean:
*
* @short Release finished PML requests and accumulate buffers.
*
* @long This function exists because it is not possible to free a PML request
* or buffer from a request completion callback. We instead put requests
* and buffers on the module's garbage collection lists and release then
* at a later time.
* @long This function exists because it is not possible to free a buffer from
* a request completion callback. We instead put requests and buffers on the
* module's garbage collection lists and release then at a later time.
*/
static inline void osc_pt2pt_gc_clean (ompi_osc_pt2pt_module_t *module)
{
ompi_request_t *request;
opal_list_item_t *item;

OPAL_THREAD_LOCK(&module->gc_lock);

while (NULL != (request = (ompi_request_t *) opal_list_remove_first (&module->request_gc))) {
OPAL_THREAD_UNLOCK(&module->gc_lock);
ompi_request_free (&request);
OPAL_THREAD_LOCK(&module->gc_lock);
}

while (NULL != (item = opal_list_remove_first (&module->buffer_gc))) {
OBJ_RELEASE(item);
}

OPAL_THREAD_UNLOCK(&module->gc_lock);
}

static inline void osc_pt2pt_gc_add_request (ompi_osc_pt2pt_module_t *module, ompi_request_t *request)
{
OPAL_THREAD_SCOPED_LOCK(&module->gc_lock,
opal_list_append (&module->request_gc, (opal_list_item_t *) request));
}

static inline void osc_pt2pt_gc_add_buffer (ompi_osc_pt2pt_module_t *module, opal_list_item_t *buffer)
{
OPAL_THREAD_SCOPED_LOCK(&module->gc_lock,
Expand Down
10 changes: 4 additions & 6 deletions ompi/mca/osc/pt2pt/osc_pt2pt_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ static int ompi_osc_pt2pt_comm_complete (ompi_request_t *request)

mark_outgoing_completion(module);

/* put this request on the garbage colletion list */
osc_pt2pt_gc_add_request (module, request);
ompi_request_free (&request);

return OMPI_SUCCESS;
return 1;
}

static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request)
Expand Down Expand Up @@ -101,10 +100,9 @@ static int ompi_osc_pt2pt_dt_send_complete (ompi_request_t *request)
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
assert (NULL != module);

/* put this request on the garbage colletion list */
osc_pt2pt_gc_add_request (module, request);
ompi_request_free (&request);

return OMPI_SUCCESS;
return 1;
}

/* self communication optimizations */
Expand Down
1 change: 0 additions & 1 deletion ompi/mca/osc/pt2pt/osc_pt2pt_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
OBJ_CONSTRUCT(&module->locks_pending_lock, opal_mutex_t);
OBJ_CONSTRUCT(&module->outstanding_locks, opal_hash_table_t);
OBJ_CONSTRUCT(&module->pending_acc, opal_list_t);
OBJ_CONSTRUCT(&module->request_gc, opal_list_t);
OBJ_CONSTRUCT(&module->buffer_gc, opal_list_t);
OBJ_CONSTRUCT(&module->gc_lock, opal_mutex_t);
OBJ_CONSTRUCT(&module->all_sync, ompi_osc_pt2pt_sync_t);
Expand Down
34 changes: 11 additions & 23 deletions ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,8 @@ static int ompi_osc_pt2pt_control_send_unbuffered_cb (ompi_request_t *request)
/* free the temporary buffer */
free (ctx);

/* put this request on the garbage colletion list */
osc_pt2pt_gc_add_request (module, request);

return OMPI_SUCCESS;
ompi_request_free (&request);
return 1;
}

/**
Expand Down Expand Up @@ -437,10 +435,8 @@ static int osc_pt2pt_incoming_req_complete (ompi_request_t *request)

mark_incoming_completion (module, rank);

/* put this request on the garbage colletion list */
osc_pt2pt_gc_add_request (module, request);

return OMPI_SUCCESS;
ompi_request_free (&request);
return 1;
}

struct osc_pt2pt_get_post_send_cb_data_t {
Expand All @@ -460,10 +456,8 @@ static int osc_pt2pt_get_post_send_cb (ompi_request_t *request)
/* mark this as a completed "incoming" request */
mark_incoming_completion (module, rank);

/* put this request on the garbage colletion list */
osc_pt2pt_gc_add_request (module, request);

return OMPI_SUCCESS;
ompi_request_free (&request);
return 1;
}

/**
Expand Down Expand Up @@ -699,9 +693,7 @@ static int accumulate_cb (ompi_request_t *request)
osc_pt2pt_gc_add_buffer (module, &acc_data->super);
}

/* put this request on the garbage colletion list */
osc_pt2pt_gc_add_request (module, request);

ompi_request_free (&request);
return ret;
}

Expand Down Expand Up @@ -771,13 +763,11 @@ static int replace_cb (ompi_request_t *request)

mark_incoming_completion (module, rank);

/* put this request on the garbage colletion list */
osc_pt2pt_gc_add_request (module, request);

/* unlock the accumulate lock */
ompi_osc_pt2pt_accumulate_unlock (module);

return OMPI_SUCCESS;
ompi_request_free (&request);
return 1;
}

/**
Expand Down Expand Up @@ -1435,13 +1425,11 @@ static int process_large_datatype_request_cb (ompi_request_t *request)
return OMPI_ERROR;
}

/* put this request on the garbage colletion list */
osc_pt2pt_gc_add_request (module, request);

/* free the datatype buffer */
osc_pt2pt_gc_add_buffer (module, &ddt_buffer->super);

return OMPI_SUCCESS;
ompi_request_free (&request);
return 1;
}

/**
Expand Down
6 changes: 2 additions & 4 deletions ompi/mca/osc/pt2pt/osc_pt2pt_frag.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,9 @@ static int frag_send_cb (ompi_request_t *request)
mark_outgoing_completion(module);
opal_free_list_return (&mca_osc_pt2pt_component.frags, &frag->super);

ompi_request_free (&request);

/* put this request on the garbage colletion list */
osc_pt2pt_gc_add_request (module, request);

return OMPI_SUCCESS;
return 1;
}

static int frag_send (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *frag)
Expand Down
1 change: 0 additions & 1 deletion ompi/mca/osc/pt2pt/osc_pt2pt_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ int ompi_osc_pt2pt_free(ompi_win_t *win)
OPAL_LIST_DESTRUCT(&module->pending_acc);

osc_pt2pt_gc_clean (module);
OPAL_LIST_DESTRUCT(&module->request_gc);
OPAL_LIST_DESTRUCT(&module->buffer_gc);
OBJ_DESTRUCT(&module->gc_lock);

Expand Down
36 changes: 21 additions & 15 deletions ompi/request/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ typedef int (*ompi_request_cancel_fn_t)(struct ompi_request_t* request, int flag

/*
* Optional function called when the request is completed from the MPI
* library perspective. This function is not allowed to release any
* ressources related to the request.
* library perspective. This function is allowed to release the request if
* the request will not be used with ompi_request_wait* or ompi_request_test.
* If the function reposts (using start) a request or calls ompi_request_free()
* on the request it *MUST* return 1. It should return 0 otherwise.
*/
typedef int (*ompi_request_complete_fn_t)(struct ompi_request_t* request);

Expand Down Expand Up @@ -412,24 +414,28 @@ static inline void ompi_request_wait_completion(ompi_request_t *req)
*/
static inline int ompi_request_complete(ompi_request_t* request, bool with_signal)
{
int rc = 0;

if( NULL != request->req_complete_cb) {
request->req_complete_cb( request );
rc = request->req_complete_cb( request );
request->req_complete_cb = NULL;
}

if( OPAL_LIKELY(with_signal) ) {
if(!OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, REQUEST_PENDING, REQUEST_COMPLETED)) {
ompi_wait_sync_t *tmp_sync = (ompi_wait_sync_t *) OPAL_ATOMIC_SWAP_PTR(&request->req_complete,
REQUEST_COMPLETED);
/* In the case where another thread concurrently changed the request to REQUEST_PENDING */
if( REQUEST_PENDING != tmp_sync )
wait_sync_update(tmp_sync, 1, request->req_status.MPI_ERROR);
if (0 == rc) {
if( OPAL_LIKELY(with_signal) ) {
if(!OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, REQUEST_PENDING, REQUEST_COMPLETED)) {
ompi_wait_sync_t *tmp_sync = (ompi_wait_sync_t *) OPAL_ATOMIC_SWAP_PTR(&request->req_complete,
REQUEST_COMPLETED);
/* In the case where another thread concurrently changed the request to REQUEST_PENDING */
if( REQUEST_PENDING != tmp_sync )
wait_sync_update(tmp_sync, 1, request->req_status.MPI_ERROR);
}
} else
request->req_complete = REQUEST_COMPLETED;

if( OPAL_UNLIKELY(MPI_SUCCESS != request->req_status.MPI_ERROR) ) {
ompi_request_failed++;
}
} else
request->req_complete = REQUEST_COMPLETED;

if( OPAL_UNLIKELY(MPI_SUCCESS != request->req_status.MPI_ERROR) ) {
ompi_request_failed++;
}

return OMPI_SUCCESS;
Expand Down