This repository was archived by the owner on Sep 30, 2022. It is now read-only.
Merged
2 changes: 1 addition & 1 deletion ompi/mca/coll/base/base.h
@@ -23,7 +23,7 @@
* These functions are normally invoked by the back-ends of:
*
* - The back-ends of MPI_Init() and MPI_Finalize()
- * - Communuicactor constructors (e.g., MPI_Comm_split()) and
+ * - Communicator constructors (e.g., MPI_Comm_split()) and
* destructors (e.g., MPI_Comm_free())
* - The laminfo command
*/
109 changes: 57 additions & 52 deletions ompi/mca/coll/base/coll_base_alltoall.c
@@ -3,7 +3,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
- * Copyright (c) 2004-2015 The University of Tennessee and The University
+ * Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@@ -42,8 +42,8 @@ mca_coll_base_alltoall_intra_basic_inplace(const void *rbuf, int rcount,
mca_coll_base_module_t *module)
{
mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
- int i, j, size, rank, err=MPI_SUCCESS;
- MPI_Request *preq;
+ int i, j, size, rank, err = MPI_SUCCESS, line;
+ ompi_request_t **preq, **reqs;
char *tmp_buffer;
size_t max_size;
ptrdiff_t ext, true_lb, true_ext;
@@ -63,65 +63,72 @@ mca_coll_base_alltoall_intra_basic_inplace(const void *rbuf, int rcount,
ompi_datatype_get_true_extent ( rdtype, &true_lb, &true_ext);
max_size = true_ext + ext * (rcount-1);

+ /* Initiate all send/recv to/from others. */
+ reqs = coll_base_comm_get_reqs(base_module->base_data, 2);
+ if( NULL == reqs ) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto error_hndl; }
Author:
@bosilca Why not just return OMPI_ERR_OUT_OF_RESOURCE here? If you don't, you should check reqs != NULL before calling free_reqs() at error_hndl (or insert a check inside the routine free_reqs(), as proposed in the first review).

Member:
Two reasons:
  • I prefer to have a single return statement for errors
  • to get access to the error message (the OPAL_OUTPUT at error_hndl reports the file, line, rank, and error code)
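
A minimal stand-alone sketch of the single-exit pattern being defended here, with hypothetical names (EX_SUCCESS, EX_ERR_OUT_OF_RESOURCE, do_work) standing in for the OMPI equivalents:

#include <stdio.h>
#include <stdlib.h>

/* Hypothetical error codes standing in for OMPI_SUCCESS / OMPI_ERR_OUT_OF_RESOURCE. */
#define EX_SUCCESS              0
#define EX_ERR_OUT_OF_RESOURCE -2

static int do_work(void)
{
    int err = EX_SUCCESS, line = 0;
    char *buf = NULL;

    buf = malloc(64);
    if (NULL == buf) { err = EX_ERR_OUT_OF_RESOURCE; line = __LINE__; goto error_hndl; }

    /* ... further fallible steps, each recording line = __LINE__ on failure ... */

error_hndl:
    if (EX_SUCCESS != err) {
        /* The single exit is what makes this one log statement possible. */
        fprintf(stderr, "%s:%4d\tError occurred %d\n", __FILE__, line, err);
    }
    free(buf);  /* free(NULL) is a no-op, so this is safe on every path */
    return err;
}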


/* Allocate a temporary buffer */
tmp_buffer = calloc (max_size, 1);
- if (NULL == tmp_buffer) {
- return OMPI_ERR_OUT_OF_RESOURCE;
- }
+ if (NULL == tmp_buffer) { return OMPI_ERR_OUT_OF_RESOURCE; }
max_size = ext * rcount;

/* in-place alltoall slow algorithm (but works) */
for (i = 0 ; i < size ; ++i) {
for (j = i+1 ; j < size ; ++j) {
- /* Initiate all send/recv to/from others. */
- preq = coll_base_comm_get_reqs(base_module->base_data, size * 2);
+ preq = reqs;

if (i == rank) {
/* Copy the data into the temporary buffer */
Author:
@bosilca (line 77 above): there is no need to allocate size * 2 requests; we are only posting 2 requests in each turn of the loop.
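
In standard-MPI terms, the point above is that each turn of the doubly nested loop exchanges exactly one block with one peer, so two requests always suffice. A sketch under those assumptions (exchange_block is a hypothetical helper and assumes a contiguous datatype, whereas the real code copies via ompi_datatype_copy_content_same_ddt):

#include <mpi.h>
#include <string.h>

/* One turn of the in-place exchange: swap our block for `peer`'s block
 * through a scratch copy. Only 2 requests are ever in flight. */
static int exchange_block(char *rbuf, char *tmp, int count, MPI_Datatype dtype,
                          int peer, size_t block_size, MPI_Comm comm)
{
    MPI_Request reqs[2];

    memcpy(tmp, rbuf + (size_t)peer * block_size, block_size);  /* save our copy */
    MPI_Irecv(rbuf + (size_t)peer * block_size, count, dtype, peer,
              0 /* tag */, comm, &reqs[0]);
    MPI_Isend(tmp, count, dtype, peer, 0 /* tag */, comm, &reqs[1]);
    return MPI_Waitall(2, reqs, MPI_STATUSES_IGNORE);
}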

err = ompi_datatype_copy_content_same_ddt (rdtype, rcount, tmp_buffer,
(char *) rbuf + j * max_size);
- if (MPI_SUCCESS != err) { goto error_hndl; }
+ if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }

/* Exchange data with the peer */
err = MCA_PML_CALL(irecv ((char *) rbuf + max_size * j, rcount, rdtype,
j, MCA_COLL_BASE_TAG_ALLTOALL, comm, preq++));
- if (MPI_SUCCESS != err) { goto error_hndl; }
+ if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }

err = MCA_PML_CALL(isend ((char *) tmp_buffer, rcount, rdtype,
j, MCA_COLL_BASE_TAG_ALLTOALL, MCA_PML_BASE_SEND_STANDARD,
comm, preq++));
- if (MPI_SUCCESS != err) { goto error_hndl; }
+ if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
} else if (j == rank) {
/* Copy the data into the temporary buffer */
err = ompi_datatype_copy_content_same_ddt (rdtype, rcount, tmp_buffer,
(char *) rbuf + i * max_size);
- if (MPI_SUCCESS != err) { goto error_hndl; }
+ if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }

/* Exchange data with the peer */
err = MCA_PML_CALL(irecv ((char *) rbuf + max_size * i, rcount, rdtype,
i, MCA_COLL_BASE_TAG_ALLTOALL, comm, preq++));
- if (MPI_SUCCESS != err) { goto error_hndl; }
+ if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }

err = MCA_PML_CALL(isend ((char *) tmp_buffer, rcount, rdtype,
i, MCA_COLL_BASE_TAG_ALLTOALL, MCA_PML_BASE_SEND_STANDARD,
comm, preq++));
- if (MPI_SUCCESS != err) { goto error_hndl; }
+ if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
} else {
continue;
}

/* Wait for the requests to complete */
- err = ompi_request_wait_all (2, base_module->base_data->mcct_reqs, MPI_STATUSES_IGNORE);
- if (MPI_SUCCESS != err) { goto error_hndl; }
+ err = ompi_request_wait_all (2, reqs, MPI_STATUSES_IGNORE);
+ if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
}
}

error_hndl:
/* Free the temporary buffer */
free (tmp_buffer);

- /* All done */
+ if( MPI_SUCCESS != err ) {
+ OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
+ "%s:%4d\tError occurred %d, rank %2d", __FILE__, line, err,
+ rank));
+ ompi_coll_base_free_reqs(reqs, 2);
+ }

+ /* All done */
return err;
}

@@ -385,29 +392,28 @@ int ompi_coll_base_alltoall_intra_linear_sync(const void *sbuf, int scount,
total_reqs = (((max_outstanding_reqs > (size - 1)) ||
(max_outstanding_reqs <= 0)) ?
(size - 1) : (max_outstanding_reqs));
- reqs = (ompi_request_t**) malloc( 2 * total_reqs *
- sizeof(ompi_request_t*));
+ reqs = coll_base_comm_get_reqs(module->base_data, 2 * total_reqs);
+ if (NULL == reqs) { error = -1; line = __LINE__; goto error_hndl; }

prcv = (char *) rbuf;
psnd = (char *) sbuf;

/* Post first batch or ireceive and isend requests */
for (nreqs = 0, nrreqs = 0, ri = (rank + 1) % size; nreqs < total_reqs;
- ri = (ri + 1) % size, ++nreqs, ++nrreqs) {
- error =
- MCA_PML_CALL(irecv
- (prcv + (ptrdiff_t)ri * rext, rcount, rdtype, ri,
- MCA_COLL_BASE_TAG_ALLTOALL, comm, &reqs[nreqs]));
+ ri = (ri + 1) % size, ++nrreqs) {
+ nreqs++;
+ error = MCA_PML_CALL(irecv
+ (prcv + (ptrdiff_t)ri * rext, rcount, rdtype, ri,
+ MCA_COLL_BASE_TAG_ALLTOALL, comm, &reqs[nreqs]));
if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
}
- for ( nsreqs = 0, si = (rank + size - 1) % size; nreqs < 2 * total_reqs;
- si = (si + size - 1) % size, ++nreqs, ++nsreqs) {
- error =
- MCA_PML_CALL(isend
- (psnd + (ptrdiff_t)si * sext, scount, sdtype, si,
- MCA_COLL_BASE_TAG_ALLTOALL,
- MCA_PML_BASE_SEND_STANDARD, comm, &reqs[nreqs]));
+ for (nsreqs = 0, si = (rank + size - 1) % size; nreqs < 2 * total_reqs;
+ si = (si + size - 1) % size, ++nsreqs) {
+ nreqs++;
+ error = MCA_PML_CALL(isend
+ (psnd + (ptrdiff_t)si * sext, scount, sdtype, si,
+ MCA_COLL_BASE_TAG_ALLTOALL,
+ MCA_PML_BASE_SEND_STANDARD, comm, &reqs[nreqs]));
if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
}

@@ -435,11 +441,10 @@ int ompi_coll_base_alltoall_intra_linear_sync(const void *sbuf, int scount,
ncreqs++;
if (completed < total_reqs) {
if (nrreqs < (size - 1)) {
- error =
- MCA_PML_CALL(irecv
- (prcv + (ptrdiff_t)ri * rext, rcount, rdtype, ri,
- MCA_COLL_BASE_TAG_ALLTOALL, comm,
- &reqs[completed]));
+ error = MCA_PML_CALL(irecv
+ (prcv + (ptrdiff_t)ri * rext, rcount, rdtype, ri,
+ MCA_COLL_BASE_TAG_ALLTOALL, comm,
+ &reqs[completed]));
if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
++nrreqs;
ri = (ri + 1) % size;
@@ -451,24 +456,22 @@ int ompi_coll_base_alltoall_intra_linear_sync(const void *sbuf, int scount,
MCA_COLL_BASE_TAG_ALLTOALL,
MCA_PML_BASE_SEND_STANDARD, comm,
&reqs[completed]));
+ if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
++nsreqs;
si = (si + size - 1) % size;
}
}
}
}

- /* Free the reqs */
- free(reqs);

/* All done */
return MPI_SUCCESS;

error_hndl:
OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
"%s:%4d\tError occurred %d, rank %2d", __FILE__, line, error,
rank));
- if (NULL != reqs) free(reqs);
+ ompi_coll_base_free_reqs(reqs, nreqs);
return error;
}
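
The shape of linear_sync, reduced to its receive side in standard MPI: clamp the window exactly as the code above does, fill it, then refill the slot of whichever request completes. A sketch under those assumptions (windowed_recvs is a hypothetical helper; contiguous datatype of extent ext, error paths elided):

#include <mpi.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical sketch of the windowing in linear_sync: at most `window`
 * receives in flight; each completion frees a slot for the next post. */
static int windowed_recvs(char *rbuf, int rcount, MPI_Datatype rdtype,
                          MPI_Aint ext, int size, int rank,
                          int max_outstanding_reqs, MPI_Comm comm)
{
    /* Same clamping rule as the code above: fall back to size-1 when the
     * knob is unset (<= 0) or exceeds the number of peers. */
    int window = (max_outstanding_reqs > size - 1 || max_outstanding_reqs <= 0)
                 ? size - 1 : max_outstanding_reqs;
    MPI_Request *reqs = malloc(window * sizeof(MPI_Request));
    int posted = 0, done = 0, idx, ri = (rank + 1) % size;

    if (NULL == reqs) return MPI_ERR_NO_MEM;

    /* First batch: fill the window. */
    for (; posted < window; ++posted, ri = (ri + 1) % size)
        MPI_Irecv(rbuf + (ptrdiff_t)ri * rcount * ext, rcount, rdtype,
                  ri, 0, comm, &reqs[posted]);

    /* Completion loop: while peers remain, reuse the finished slot. */
    while (done < size - 1) {
        MPI_Waitany(window, reqs, &idx, MPI_STATUS_IGNORE);
        ++done;
        if (posted < size - 1) {
            MPI_Irecv(rbuf + (ptrdiff_t)ri * rcount * ext, rcount, rdtype,
                      ri, 0, comm, &reqs[idx]);
            ++posted;
            ri = (ri + 1) % size;
        }
    }
    free(reqs);
    return MPI_SUCCESS;
}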

@@ -554,7 +557,7 @@ int ompi_coll_base_alltoall_intra_basic_linear(const void *sbuf, int scount,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
- int i, rank, size, err, nreqs;
+ int i, rank, size, err, nreqs, line;
char *psnd, *prcv;
MPI_Aint lb, sndinc, rcvinc;
ompi_request_t **req, **sreq, **rreq;
@@ -605,21 +608,20 @@ int ompi_coll_base_alltoall_intra_basic_linear(const void *sbuf, int scount,
/* Initiate all send/recv to/from others. */

req = rreq = coll_base_comm_get_reqs(data, (size - 1) * 2);
+ if (NULL == req) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto err_hndl; }

prcv = (char *) rbuf;
psnd = (char *) sbuf;

/* Post all receives first -- a simple optimization */

for (nreqs = 0, i = (rank + 1) % size; i != rank;
- i = (i + 1) % size, ++rreq, ++nreqs) {
+ i = (i + 1) % size, ++rreq) {
+ nreqs++;
err = MCA_PML_CALL(irecv_init
(prcv + (ptrdiff_t)i * rcvinc, rcount, rdtype, i,
MCA_COLL_BASE_TAG_ALLTOALL, comm, rreq));
- if (MPI_SUCCESS != err) {
- ompi_coll_base_free_reqs(req, nreqs);
- return err;
- }
+ if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
}

/* Now post all sends in reverse order
Expand All @@ -628,15 +630,13 @@ int ompi_coll_base_alltoall_intra_basic_linear(const void *sbuf, int scount,
*/
sreq = rreq;
for (i = (rank + size - 1) % size; i != rank;
- i = (i + size - 1) % size, ++sreq, ++nreqs) {
+ i = (i + size - 1) % size, ++sreq) {
+ nreqs++;
err = MCA_PML_CALL(isend_init
(psnd + (ptrdiff_t)i * sndinc, scount, sdtype, i,
MCA_COLL_BASE_TAG_ALLTOALL,
MCA_PML_BASE_SEND_STANDARD, comm, sreq));
- if (MPI_SUCCESS != err) {
- ompi_coll_base_free_reqs(req, nreqs);
- return err;
- }
+ if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
}

/* Start your engines. This will never return an error. */
@@ -652,7 +652,12 @@

err = ompi_request_wait_all(nreqs, req, MPI_STATUSES_IGNORE);

- /* Free the reqs */
+ err_hndl:
+ if( MPI_SUCCESS != err ) {
+ OPAL_OUTPUT( (ompi_coll_base_framework.framework_output,"%s:%4d\tError occurred %d, rank %2d",
+ __FILE__, line, err, rank) );
+ }
+ /* Free the reqs in all cases as they are persistent requests */
ompi_coll_base_free_reqs(req, nreqs);

/* All done */
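
The new comment in this hunk ("Free the reqs in all cases as they are persistent requests") is the crux: requests created with irecv_init/isend_init are persistent, and completing them does not release them. A minimal standard-MPI sketch of that lifecycle (ping_exchange is a hypothetical helper; the OMPI code drives the same cycle through the PML and ompi_coll_base_free_reqs):

#include <mpi.h>

/* Persistent-request lifecycle: init once, start, wait, then free
 * explicitly -- completion does NOT release a persistent request. */
static int ping_exchange(void *sbuf, void *rbuf, int count, int peer,
                         MPI_Comm comm)
{
    MPI_Request reqs[2];

    MPI_Recv_init(rbuf, count, MPI_BYTE, peer, 0, comm, &reqs[0]);
    MPI_Send_init(sbuf, count, MPI_BYTE, peer, 0, comm, &reqs[1]);

    MPI_Startall(2, reqs);                      /* "start your engines" */
    MPI_Waitall(2, reqs, MPI_STATUSES_IGNORE);  /* requests go inactive, not freed */

    /* Mandatory for persistent requests, on success and error paths alike. */
    MPI_Request_free(&reqs[0]);
    MPI_Request_free(&reqs[1]);
    return MPI_SUCCESS;
}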