From 1f1504ebbb2301879272a3c785f66991143d25d9 Mon Sep 17 00:00:00 2001
From: Edgar Gabriel
Date: Wed, 6 Jul 2016 12:31:49 -0500
Subject: [PATCH 1/2] remove some unused code

---
 ompi/mca/io/ompio/io_ompio.c | 745 -----------------------------------
 ompi/mca/io/ompio/io_ompio.h |  35 --
 2 files changed, 780 deletions(-)

diff --git a/ompi/mca/io/ompio/io_ompio.c b/ompi/mca/io/ompio/io_ompio.c
index 7d93402606a..1c8fa3a3618 100644
--- a/ompi/mca/io/ompio/io_ompio.c
+++ b/ompi/mca/io/ompio/io_ompio.c
@@ -1165,751 +1165,6 @@ int ompi_io_ompio_break_file_view (mca_io_ompio_file_t *fh,
     return 1;
 }
 
-int ompi_io_ompio_distribute_file_view (mca_io_ompio_file_t *fh,
-                                        struct iovec *broken_iov,
-                                        int broken_count,
-                                        int num_aggregators,
-                                        size_t stripe_size,
-                                        int **fview_count,
-                                        struct iovec **iov,
-                                        int *count)
-{
-
-    int *num_entries;
-    int *broken_index;
-    int temp = 0;
-    int *fview_cnt = NULL;
-    int global_fview_count = 0;
-    int i = 0;
-    int *displs = NULL;
-    int rc = OMPI_SUCCESS;
-    struct iovec *global_fview = NULL;
-    struct iovec **broken = NULL;
-    MPI_Request *req=NULL, *sendreq=NULL;
-
-    num_entries = (int *) malloc (sizeof (int) * num_aggregators);
-    if (NULL == num_entries) {
-        opal_output (1, "OUT OF MEMORY\n");
-        return OMPI_ERR_OUT_OF_RESOURCE;
-    }
-    broken_index = (int *) malloc (sizeof (int) * num_aggregators);
-    if (NULL == broken_index) {
-        opal_output (1, "OUT OF MEMORY\n");
-        free(num_entries);
-        return OMPI_ERR_OUT_OF_RESOURCE;
-    }
-
-    memset (num_entries, 0x0, num_aggregators * sizeof (int));
-    memset (broken_index, 0x0, num_aggregators * sizeof (int));
-
-    /* calculate how many entries in the broken iovec belong to each aggregator */
-    for (i=0 ; i<broken_count ; i++) {
-        temp = (int)((OPAL_PTRDIFF_TYPE)broken_iov[i].iov_base/stripe_size) % num_aggregators;
-        num_entries[temp]++;
-    }
-
-    if (0 == fh->f_rank%fh->f_aggregator_index) {
-        fview_cnt = (int *) malloc (sizeof (int) * fh->f_size);
-        if (NULL == fview_cnt) {
-            opal_output (1, "OUT OF MEMORY\n");
-            free(num_entries);
-            free(broken_index);
-            return OMPI_ERR_OUT_OF_RESOURCE;
-        }
-        req = (MPI_Request *)malloc (fh->f_size * sizeof(MPI_Request));
-        if (NULL == req) {
-            free(num_entries);
-            free(broken_index);
-            free(fview_cnt);
-            return OMPI_ERR_OUT_OF_RESOURCE;
-        }
-    }
-
-    sendreq = (MPI_Request *)malloc (num_aggregators * sizeof(MPI_Request));
-    if (NULL == sendreq) {
-        free(num_entries);
-        free(broken_index);
-        if (0 == fh->f_rank%fh->f_aggregator_index) {
-            free(fview_cnt);
-            free(req);
-        }
-        return OMPI_ERR_OUT_OF_RESOURCE;
-    }
-
-    /* gather at each aggregator how many entries from the broken file view it
-       expects from each process */
-    if (0 == fh->f_rank%fh->f_aggregator_index) {
-        for (i=0; i<fh->f_size ; i++) {
-            rc = MCA_PML_CALL(irecv(&fview_cnt[i], 1, MPI_INT, i,
-                                    OMPIO_TAG_GATHER, fh->f_comm, &req[i]));
-            if (OMPI_SUCCESS != rc) {
-                goto exit;
-            }
-        }
-    }
-
-    for (i=0 ; i<num_aggregators ; i++) {
-        rc = MCA_PML_CALL(isend(&num_entries[i], 1, MPI_INT,
-                                i*fh->f_aggregator_index, OMPIO_TAG_GATHER,
-                                MCA_PML_BASE_SEND_STANDARD, fh->f_comm, &sendreq[i]));
-        if (OMPI_SUCCESS != rc) {
-            goto exit;
-        }
-    }
-
-    if (0 == fh->f_rank%fh->f_aggregator_index) {
-        rc = ompi_request_wait_all (fh->f_size, req, MPI_STATUSES_IGNORE);
-        if (OMPI_SUCCESS != rc) {
-            goto exit;
-        }
-    }
-    rc = ompi_request_wait_all (num_aggregators, sendreq, MPI_STATUSES_IGNORE);
-    if (OMPI_SUCCESS != rc) {
-        goto exit;
-    }
-
-    /*
-    for (i=0 ; i<num_aggregators ; i++) {
-        fh->f_comm->c_coll.coll_gather (&num_entries[i], 1, MPI_INT,
-                                        fview_cnt, 1, MPI_INT,
-                                        i*fh->f_aggregator_index, fh->f_comm,
-                                        fh->f_comm->c_coll.coll_gather_module);
-    }
-    */
-
-    if (0 == fh->f_rank%fh->f_aggregator_index) {
-        displs = (int*) malloc (fh->f_size * sizeof (int));
-        if (NULL == displs) {
-            opal_output (1, "OUT OF MEMORY\n");
MEMORY\n"); - free(fview_cnt); - free(num_entries); - free(broken_index); - return OMPI_ERR_OUT_OF_RESOURCE; - } - displs[0] = 0; - global_fview_count = fview_cnt[0]; - for (i=1 ; if_size ; i++) { - global_fview_count += fview_cnt[i]; - displs[i] = displs[i-1] + fview_cnt[i-1]; - } - - if (global_fview_count) { - global_fview = (struct iovec*)malloc (global_fview_count * - sizeof(struct iovec)); - if (NULL == global_fview) { - opal_output (1, "OUT OF MEMORY\n"); - free(num_entries); - free(broken_index); - free(fview_cnt); - free(displs); - return OMPI_ERR_OUT_OF_RESOURCE; - } - } - } - - broken = (struct iovec**)malloc (num_aggregators * sizeof(struct iovec *)); - if (NULL == broken) { - opal_output (1, "OUT OF MEMORY\n"); - free(num_entries); - free(broken_index); - if (0 == fh->f_rank%fh->f_aggregator_index) { - free(global_fview); - free(displs); - free(fview_cnt); - } - return OMPI_ERR_OUT_OF_RESOURCE; - } - - for (i=0 ; if_rank%fh->f_aggregator_index) { - free(global_fview); - free(displs); - free(fview_cnt); - } - free(broken); - return OMPI_ERR_OUT_OF_RESOURCE; - } - } - } - - for (i=0 ; i%d: OFFSET: %d LENGTH: %d\n", - fh->f_rank, - i, - broken[i][j].iov_base, - broken[i][j].iov_len); - } - } - sleep(1); - */ - - if (0 == fh->f_rank%fh->f_aggregator_index) { - ptrdiff_t lb, extent; - rc = ompi_datatype_get_extent(fh->f_iov_type, &lb, &extent); - if (OMPI_SUCCESS != rc) { - goto exit; - } - for (i=0; if_size ; i++) { - if (fview_cnt[i]) { - char *ptmp; - ptmp = ((char *) global_fview) + (extent * displs[i]); - rc = MCA_PML_CALL(irecv(ptmp, - fview_cnt[i], - fh->f_iov_type, - i, - OMPIO_TAG_GATHERV, - fh->f_comm, - &req[i])); - if (OMPI_SUCCESS != rc) { - goto exit; - } - } - } - } - - for (i=0 ; if_iov_type, - i*fh->f_aggregator_index, - OMPIO_TAG_GATHERV, - MCA_PML_BASE_SEND_STANDARD, - fh->f_comm, - &sendreq[i])); - if (OMPI_SUCCESS != rc) { - goto exit; - } - } - } - - if (0 == fh->f_rank%fh->f_aggregator_index) { - for (i=0; if_size ; i++) { - if (fview_cnt[i]) { - rc = ompi_request_wait (&req[i], MPI_STATUS_IGNORE); - if (OMPI_SUCCESS != rc) { - goto exit; - } - } - } - } - - for (i=0; if_comm->c_coll.coll_gatherv (broken[i], - num_entries[i], - fh->f_iov_type, - global_fview, - fview_cnt, - displs, - fh->f_iov_type, - i*fh->f_aggregator_index, - fh->f_comm, - fh->f_comm->c_coll.coll_gatherv_module); - } - */ - /* - for (i=0 ; if_rank, - global_fview[i].iov_base, - global_fview[i].iov_len); - } - */ - exit: - - if (NULL != broken) { - for (i=0 ; i part) { - memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)sbuf[temp]+ - temp_position[temp]), - (IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)send_buf + - (total_bytes_sent-bytes_remaining)), - part); - bytes_remaining -= part; - temp_position[temp] += part; - part = 0; - current ++; - } - else { - memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)sbuf[temp]+ - temp_position[temp]), - (IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)send_buf + - (total_bytes_sent-bytes_remaining)), - bytes_remaining); - break; - } - } - else { - if (bytes_remaining > broken_iovec[current].iov_len) { - memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)sbuf[temp]+ - temp_position[temp]), - (IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)send_buf + - (total_bytes_sent-bytes_remaining)), - broken_iovec[current].iov_len); - bytes_remaining -= broken_iovec[current].iov_len; - temp_position[temp] += broken_iovec[current].iov_len; - current ++; - } - else { - memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)sbuf[temp]+ - temp_position[temp]), - (IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)send_buf + - 
-
-    sendreq = (MPI_Request *)malloc (num_aggregators * sizeof(MPI_Request));
-    if (NULL == sendreq) {
-        free(sbuf);
-        free(temp_position);
-        return OMPI_ERR_OUT_OF_RESOURCE;
-    }
-
-    if (0 == fh->f_rank%fh->f_aggregator_index) {
-        req = (MPI_Request *)malloc (fh->f_size * sizeof(MPI_Request));
-        if (NULL == req) {
-            free(sbuf);
-            free(temp_position);
-            free(sendreq);
-            return OMPI_ERR_OUT_OF_RESOURCE;
-        }
-        for (i=0; i<fh->f_size ; i++) {
-            if (bytes_per_process[i]) {
-                rc = MCA_PML_CALL(irecv((char *)global_buf + displs[i],
-                                        bytes_per_process[i], MPI_BYTE, i,
-                                        OMPIO_TAG_GATHERV, fh->f_comm, &req[i]));
-                if (OMPI_SUCCESS != rc) {
-                    goto exit;
-                }
-            }
-        }
-    }
-
-    for (i=0 ; i<num_aggregators ; i++) {
-        if (bytes_sent[i]) {
-            rc = MCA_PML_CALL(isend(sbuf[i], bytes_sent[i], MPI_BYTE,
-                                    i*fh->f_aggregator_index, OMPIO_TAG_GATHERV,
-                                    MCA_PML_BASE_SEND_STANDARD, fh->f_comm, &sendreq[i]));
-            if (OMPI_SUCCESS != rc) {
-                goto exit;
-            }
-        }
-    }
-
-    if (0 == fh->f_rank%fh->f_aggregator_index) {
-        for (i=0; i<fh->f_size ; i++) {
-            if (bytes_per_process[i]) {
-                rc = ompi_request_wait (&req[i], MPI_STATUS_IGNORE);
-                if (OMPI_SUCCESS != rc) {
-                    goto exit;
-                }
-            }
-        }
-    }
-    /*
-    for (i=0; i<num_aggregators ; i++) {
-        fh->f_comm->c_coll.coll_gatherv (sbuf[i], bytes_sent[i], MPI_BYTE,
-                                         global_buf, bytes_per_process, displs, MPI_BYTE,
-                                         i*fh->f_aggregator_index, fh->f_comm,
-                                         fh->f_comm->c_coll.coll_gatherv_module);
-    }
-    */
-
- exit:
-    for (i=0 ; i<num_aggregators ; i++) {
[...]
-    for (i=0 ; i<num_aggregators ; i++) {
-        if (bytes_received[i]) {
-            rc = MCA_PML_CALL(irecv(rbuf[i], bytes_received[i], MPI_BYTE,
-                                    i*fh->f_aggregator_index,
-                                    OMPIO_TAG_SCATTERV,
-                                    fh->f_comm,
-                                    &recvreq[i]));
-            if (OMPI_SUCCESS != rc) {
-                goto exit;
-            }
-        }
-    }
-
-    if (0 == fh->f_rank%fh->f_aggregator_index) {
-        req = (MPI_Request *)malloc (fh->f_size * sizeof(MPI_Request));
-        if (NULL == req) {
-            free(temp_position);
-            return OMPI_ERR_OUT_OF_RESOURCE;
-        }
-        for (i=0; i<fh->f_size ; i++) {
-            if (bytes_per_process[i]) {
-                rc = MCA_PML_CALL(isend((char *)global_buf + displs[i],
-                                        bytes_per_process[i], MPI_BYTE, i,
-                                        OMPIO_TAG_SCATTERV,
-                                        MCA_PML_BASE_SEND_STANDARD,
-                                        fh->f_comm, &req[i]));
-                if (OMPI_SUCCESS != rc) {
-                    goto exit;
-                }
-            }
-        }
-    }
-
-    for (i=0; i<num_aggregators ; i++) {
-        if (bytes_received[i]) {
-            rc = ompi_request_wait (&recvreq[i], MPI_STATUS_IGNORE);
-            if (OMPI_SUCCESS != rc) {
-                goto exit;
-            }
-        }
-    }
-
-    if (0 == fh->f_rank%fh->f_aggregator_index) {
-        for (i=0; i<fh->f_size ; i++) {
-            if (bytes_per_process[i]) {
-                rc = ompi_request_wait (&req[i], MPI_STATUS_IGNORE);
-                if (OMPI_SUCCESS != rc) {
-                    goto exit;
-                }
-            }
-        }
-    }
-    /*
-    for (i=0 ; i<num_aggregators ; i++) {
-        fh->f_comm->c_coll.coll_scatterv (global_buf, bytes_per_process, displs, MPI_BYTE,
-                                          rbuf[i], bytes_received[i], MPI_BYTE,
-                                          i*fh->f_aggregator_index, fh->f_comm,
-                                          fh->f_comm->c_coll.coll_scatterv_module);
-    }
-    */
-    bytes_remaining = total_bytes_recv;
-
-    while (bytes_remaining) {
-        temp = (int)((OPAL_PTRDIFF_TYPE)broken_iovec[current].iov_base/stripe_size)
-            % num_aggregators;
-
-        if (part) {
-            if (bytes_remaining > part) {
-                memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)receive_buf+(total_bytes_recv-bytes_remaining)),
-                        (IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)rbuf[temp]+temp_position[temp]),
-                        part);
-                bytes_remaining -= part;
-                temp_position[temp] += part;
-                part = 0;
-                current ++;
-            }
-            else {
-                memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)receive_buf+(total_bytes_recv-bytes_remaining)),
-                        (IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)rbuf[temp]+temp_position[temp]),
-                        bytes_remaining);
-                break;
-            }
-        }
-        else {
-            if (bytes_remaining > broken_iovec[current].iov_len) {
-                memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)receive_buf+(total_bytes_recv-bytes_remaining)),
-                        (IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)rbuf[temp]+temp_position[temp]),
-                        broken_iovec[current].iov_len);
-                bytes_remaining -= broken_iovec[current].iov_len;
-                temp_position[temp] += broken_iovec[current].iov_len;
-                current ++;
-            }
-            else {
-                memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)receive_buf+(total_bytes_recv-bytes_remaining)),
-                        (IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)rbuf[temp]+temp_position[temp]),
-                        bytes_remaining);
-                break;
-            }
-        }
-    }
-
- exit:
-    for (i=0 ; i<num_aggregators ; i++) {
[...]

From: Edgar Gabriel
Date: Wed, 6 Jul 2016 13:12:31 -0500
Subject: [PATCH 2/2] fcoll/base: mv coll_array functions to fcoll base

the coll_array functions are truly only used by the fcoll modules, so
move them to fcoll/base. There is currently one exception to that rule
(the number-of-aggregators logic), but that function will also be moved
to fcoll/base in the long term.
---
 ompi/mca/fcoll/base/Makefile.am                     |   6 +-
 .../base/fcoll_base_coll_array.c}                   | 221 +++++++++---------
 ompi/mca/fcoll/base/fcoll_base_coll_array.h         | 108 +++++++++
 .../dynamic/fcoll_dynamic_file_read_all.c           |  67 +++---
 .../dynamic/fcoll_dynamic_file_write_all.c          |  67 +++---
 .../fcoll_dynamic_gen2_file_read_all.c              |  65 +++---
 .../fcoll_dynamic_gen2_file_write_all.c             |  63 ++---
 ompi/mca/fcoll/static/fcoll_static_file_read_all.c  |  63 ++---
 .../static/fcoll_static_file_write_all.c            |  63 ++---
 ompi/mca/io/ompio/Makefile.am                       |   1 -
 ompi/mca/io/ompio/io_ompio.c                        | 103 ++++----
 ompi/mca/io/ompio/io_ompio.h                        | 117 ----------
 ompi/mca/io/ompio/io_ompio_file_open.c              |   5 -
 13 files changed, 472 insertions(+), 477 deletions(-)
 rename ompi/mca/{io/ompio/io_ompio_coll_array.c => fcoll/base/fcoll_base_coll_array.c} (76%)
 create mode 100644 ompi/mca/fcoll/base/fcoll_base_coll_array.h

diff --git a/ompi/mca/fcoll/base/Makefile.am b/ompi/mca/fcoll/base/Makefile.am
index 4a7f17d6e7f..afc85e56a78 100644
--- a/ompi/mca/fcoll/base/Makefile.am
+++ b/ompi/mca/fcoll/base/Makefile.am
@@ -18,10 +18,12 @@
 #
 
 headers += \
-        base/base.h
+        base/base.h \
+        base/fcoll_base_coll_array.h
 
 libmca_fcoll_la_SOURCES += \
         base/fcoll_base_frame.c \
         base/fcoll_base_file_select.c \
         base/fcoll_base_file_unselect.c \
-        base/fcoll_base_find_available.c
+        base/fcoll_base_find_available.c \
+        base/fcoll_base_coll_array.c
diff --git a/ompi/mca/io/ompio/io_ompio_coll_array.c b/ompi/mca/fcoll/base/fcoll_base_coll_array.c
similarity index 76%
rename from ompi/mca/io/ompio/io_ompio_coll_array.c
rename to ompi/mca/fcoll/base/fcoll_base_coll_array.c
index ed92eb41366..a903a43cd29 100644
--- a/ompi/mca/io/ompio/io_ompio_coll_array.c
+++ b/ompi/mca/fcoll/base/fcoll_base_coll_array.c
@@ -10,7 +10,7 @@
  * University of Stuttgart.  All rights reserved.
  * Copyright (c) 2004-2005 The Regents of the University of California.
  *                         All rights reserved.
- * Copyright (c) 2008-2011 University of Houston. All rights reserved.
+ * Copyright (c) 2008-2016 University of Houston. All rights reserved.
  * $COPYRIGHT$
  *
  * Additional copyrights may follow
@@ -28,20 +28,21 @@
 #include "ompi/request/request.h"
 #include <...>
 
-#include "io_ompio.h"
-
-
-int ompi_io_ompio_allgatherv_array (void *sbuf, int scount, ompi_datatype_t *sdtype,
-                                    void *rbuf, int *rcounts, int *disps,
-                                    ompi_datatype_t *rdtype, int root_index,
-                                    int *procs_in_group, int procs_per_group,
-                                    ompi_communicator_t *comm)
+#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
+#include "ompi/mca/io/ompio/io_ompio.h"
+
+
+int fcoll_base_coll_allgatherv_array (void *sbuf, int scount, ompi_datatype_t *sdtype,
+                                      void *rbuf, int *rcounts, int *disps,
+                                      ompi_datatype_t *rdtype, int root_index,
+                                      int *procs_in_group, int procs_per_group,
+                                      ompi_communicator_t *comm)
 {
     int err = OMPI_SUCCESS;
     OPAL_PTRDIFF_TYPE extent, lb;
@@ -73,17 +74,17 @@ int ompi_io_ompio_allgatherv_array (void *sbuf,
         send_type = sdtype;
     }
 
-    err = ompi_io_ompio_gatherv_array (send_buf, rcounts[j], send_type,
-                                       rbuf, rcounts, disps, rdtype, root_index,
-                                       procs_in_group, procs_per_group, comm);
+    err = fcoll_base_coll_gatherv_array (send_buf, rcounts[j], send_type,
+                                         rbuf, rcounts, disps, rdtype, root_index,
+                                         procs_in_group, procs_per_group, comm);
     if (OMPI_SUCCESS != err) {
         return err;
     }
@@ -100,31 +101,31 @@ int ompi_io_ompio_allgatherv_array (void *sbuf,
     if(MPI_SUCCESS != err) {
         return err;
     }
-
-    ompi_io_ompio_bcast_array (rbuf, 1, newtype, root_index,
-                               procs_in_group, procs_per_group, comm);
-
+
+    fcoll_base_coll_bcast_array (rbuf, 1, newtype, root_index,
+                                 procs_in_group, procs_per_group, comm);
+
     ompi_datatype_destroy (&newtype);
 
     return OMPI_SUCCESS;
 }
 
-int ompi_io_ompio_gatherv_array (void *sbuf, int scount, ompi_datatype_t *sdtype,
-                                 void *rbuf, int *rcounts, int *disps,
-                                 ompi_datatype_t *rdtype, int root_index,
-                                 int *procs_in_group, int procs_per_group,
-                                 struct ompi_communicator_t *comm)
+int fcoll_base_coll_gatherv_array (void *sbuf, int scount, ompi_datatype_t *sdtype,
+                                   void *rbuf, int *rcounts, int *disps,
+                                   ompi_datatype_t *rdtype, int root_index,
+                                   int *procs_in_group, int procs_per_group,
+                                   struct ompi_communicator_t *comm)
 {
     int i, rank;
     int err = OMPI_SUCCESS;
@@ -140,7 +141,7 @@ int ompi_io_ompio_gatherv_array (void *sbuf,
                                 scount, sdtype,
                                 procs_in_group[root_index],
-                                OMPIO_TAG_GATHERV,
+                                FCOLL_TAG_GATHERV,
                                 MCA_PML_BASE_SEND_STANDARD, comm));
     }
@@ -181,7 +182,7 @@ int ompi_io_ompio_gatherv_array (void *sbuf,
                                 rcounts[i], rdtype,
                                 procs_in_group[i],
-                                OMPIO_TAG_GATHERV,
+                                FCOLL_TAG_GATHERV,
                                 comm, &reqs[i]));
     }
@@ -203,17 +204,17 @@ int ompi_io_ompio_gatherv_array (void *sbuf,
     return err;
 }
 
-int ompi_io_ompio_scatterv_array (void *sbuf, int *scounts, int *disps,
-                                  ompi_datatype_t *sdtype, void *rbuf, int rcount,
-                                  ompi_datatype_t *rdtype, int root_index,
-                                  int *procs_in_group, int procs_per_group,
-                                  struct ompi_communicator_t *comm)
+int fcoll_base_coll_scatterv_array (void *sbuf, int *scounts, int *disps,
+                                    ompi_datatype_t *sdtype, void *rbuf, int rcount,
+                                    ompi_datatype_t *rdtype, int root_index,
+                                    int *procs_in_group, int procs_per_group,
+                                    struct ompi_communicator_t *comm)
 {
     int i, rank;
     int err = OMPI_SUCCESS;
@@ -229,7 +230,7 @@ int ompi_io_ompio_scatterv_array (void *sbuf,
                                 rcount, rdtype,
                                 procs_in_group[root_index],
-                                OMPIO_TAG_SCATTERV,
+                                FCOLL_TAG_SCATTERV,
                                 comm, MPI_STATUS_IGNORE));
     }
@@ -271,7 +272,7 @@ int ompi_io_ompio_scatterv_array (void *sbuf,
                                 scounts[i], sdtype,
                                 procs_in_group[i],
-                                OMPIO_TAG_SCATTERV,
+                                FCOLL_TAG_SCATTERV,
                                 MCA_PML_BASE_SEND_STANDARD,
                                 comm, &reqs[i]));
@@ -293,16 +294,16 @@ int ompi_io_ompio_scatterv_array (void *sbuf,
     return err;
 }
 
-int ompi_io_ompio_allgather_array (void *sbuf, int scount, ompi_datatype_t *sdtype,
-                                   void *rbuf, int rcount, ompi_datatype_t *rdtype,
-                                   int root_index, int *procs_in_group,
-                                   int procs_per_group, ompi_communicator_t *comm)
+int fcoll_base_coll_allgather_array (void *sbuf, int scount, ompi_datatype_t *sdtype,
+                                     void *rbuf, int rcount, ompi_datatype_t *rdtype,
+                                     int root_index, int *procs_in_group,
+                                     int procs_per_group, ompi_communicator_t *comm)
 {
     int err = OMPI_SUCCESS;
     int rank;
@@ -321,41 +322,41 @@ int ompi_io_ompio_allgather_array (void *sbuf,
     }
 
     /* Gather and broadcast. */
-    err = ompi_io_ompio_gather_array (sbuf, scount, sdtype,
-                                      rbuf, rcount, rdtype, root_index,
-                                      procs_in_group, procs_per_group, comm);
-
+    err = fcoll_base_coll_gather_array (sbuf, scount, sdtype,
+                                        rbuf, rcount, rdtype, root_index,
+                                        procs_in_group, procs_per_group, comm);
+
     if (OMPI_SUCCESS == err) {
-        err = ompi_io_ompio_bcast_array (rbuf, rcount * procs_per_group,
-                                         rdtype, root_index, procs_in_group,
-                                         procs_per_group, comm);
+        err = fcoll_base_coll_bcast_array (rbuf, rcount * procs_per_group,
+                                           rdtype, root_index, procs_in_group,
+                                           procs_per_group, comm);
     }
 
     /* All done */
     return err;
 }
 
-int ompi_io_ompio_gather_array (void *sbuf, int scount, ompi_datatype_t *sdtype,
-                                void *rbuf, int rcount, ompi_datatype_t *rdtype,
-                                int root_index, int *procs_in_group,
-                                int procs_per_group,
-                                struct ompi_communicator_t *comm)
+int fcoll_base_coll_gather_array (void *sbuf, int scount, ompi_datatype_t *sdtype,
+                                  void *rbuf, int rcount, ompi_datatype_t *rdtype,
+                                  int root_index, int *procs_in_group,
+                                  int procs_per_group,
+                                  struct ompi_communicator_t *comm)
 {
     int i;
     int rank;
@@ -373,7 +374,7 @@ int ompi_io_ompio_gather_array (void *sbuf,
                                 scount, sdtype,
                                 procs_in_group[root_index],
-                                OMPIO_TAG_GATHER,
+                                FCOLL_TAG_GATHER,
                                 MCA_PML_BASE_SEND_STANDARD, comm));
         return err;
@@ -410,7 +411,7 @@ int ompi_io_ompio_gather_array (void *sbuf,
                                 rcount, rdtype,
                                 procs_in_group[i],
-                                OMPIO_TAG_GATHER,
+                                FCOLL_TAG_GATHER,
                                 comm, &reqs[i]));
     /*
@@ -436,13 +437,13 @@ int ompi_io_ompio_gather_array (void *sbuf,
     return err;
 }
 
-int ompi_io_ompio_bcast_array (void *buff, int count,
-                               ompi_datatype_t *datatype, int root_index,
-                               int *procs_in_group, int procs_per_group,
-                               ompi_communicator_t *comm)
+int fcoll_base_coll_bcast_array (void *buff, int count,
+                                 ompi_datatype_t *datatype, int root_index,
+                                 int *procs_in_group, int procs_per_group,
+                                 ompi_communicator_t *comm)
 {
     int i, rank;
     int err = OMPI_SUCCESS;
@@ -456,7 +457,7 @@ int ompi_io_ompio_bcast_array (void *buff,
                                 count, datatype,
                                 procs_in_group[root_index],
-                                OMPIO_TAG_BCAST,
+                                FCOLL_TAG_BCAST,
                                 comm, MPI_STATUS_IGNORE));
         return err;
@@ -478,7 +479,7 @@ int ompi_io_ompio_bcast_array (void *buff,
                                 count, datatype,
                                 procs_in_group[i],
-                                OMPIO_TAG_BCAST,
+                                FCOLL_TAG_BCAST,
                                 MCA_PML_BASE_SEND_STANDARD,
                                 comm, &reqs[i]));
diff --git a/ompi/mca/fcoll/base/fcoll_base_coll_array.h b/ompi/mca/fcoll/base/fcoll_base_coll_array.h
new file mode 100644
index 00000000000..a0f97d7b2ab
--- /dev/null
+++ b/ompi/mca/fcoll/base/fcoll_base_coll_array.h
@@ -0,0 +1,108 @@
+/* -*- Mode: C; c-basic-offset:4 ; -*- */
+/*
+ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2007 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * Copyright (c) 2008-2016 University of Houston. All rights reserved.
+ * Copyright (c) 2015      Research Organization for Information Science
+ *                         and Technology (RIST). All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#ifndef MCA_FCOLL_BASE_COLL_ARRAY_H
+#define MCA_FCOLL_BASE_COLL_ARRAY_H
+
+#include "mpi.h"
+#include "opal/class/opal_list.h"
+#include "ompi/communicator/communicator.h"
+#include "ompi/info/info.h"
+#include "opal/datatype/opal_convertor.h"
+#include "ompi/datatype/ompi_datatype.h"
+#include "ompi/request/request.h"
+
+BEGIN_C_DECLS
+
+#define FCOLL_TAG_GATHER     100
+#define FCOLL_TAG_GATHERV    101
+#define FCOLL_TAG_BCAST      102
+#define FCOLL_TAG_SCATTERV   103
+
+
+/*
+ * Modified versions of Collective operations
+ * Based on an array of procs in group
+ */
+OMPI_DECLSPEC int fcoll_base_coll_gatherv_array (void *sbuf, int scount,
+                                                 ompi_datatype_t *sdtype,
+                                                 void *rbuf, int *rcounts, int *disps,
+                                                 ompi_datatype_t *rdtype, int root_index,
+                                                 int *procs_in_group, int procs_per_group,
+                                                 ompi_communicator_t *comm);
+OMPI_DECLSPEC int fcoll_base_coll_scatterv_array (void *sbuf, int *scounts, int *disps,
+                                                  ompi_datatype_t *sdtype,
+                                                  void *rbuf, int rcount,
+                                                  ompi_datatype_t *rdtype, int root_index,
+                                                  int *procs_in_group, int procs_per_group,
+                                                  ompi_communicator_t *comm);
+OMPI_DECLSPEC int fcoll_base_coll_allgather_array (void *sbuf, int scount,
+                                                   ompi_datatype_t *sdtype,
+                                                   void *rbuf, int rcount,
+                                                   ompi_datatype_t *rdtype, int root_index,
+                                                   int *procs_in_group, int procs_per_group,
+                                                   ompi_communicator_t *comm);
+
+OMPI_DECLSPEC int fcoll_base_coll_allgatherv_array (void *sbuf, int scount,
+                                                    ompi_datatype_t *sdtype,
+                                                    void *rbuf, int *rcounts, int *disps,
+                                                    ompi_datatype_t *rdtype, int root_index,
+                                                    int *procs_in_group, int procs_per_group,
+                                                    ompi_communicator_t *comm);
+OMPI_DECLSPEC int fcoll_base_coll_gather_array (void *sbuf, int scount,
+                                                ompi_datatype_t *sdtype,
+                                                void *rbuf, int rcount,
+                                                ompi_datatype_t *rdtype, int root_index,
+                                                int *procs_in_group, int procs_per_group,
+                                                ompi_communicator_t *comm);
+OMPI_DECLSPEC int fcoll_base_coll_bcast_array (void *buff, int count,
+                                               ompi_datatype_t *datatype, int root_index,
+                                               int *procs_in_group, int procs_per_group,
+                                               ompi_communicator_t *comm);
+
+END_C_DECLS
+
+#endif /* MCA_FCOLL_BASE_COLL_ARRAY_H */
diff --git a/ompi/mca/fcoll/dynamic/fcoll_dynamic_file_read_all.c b/ompi/mca/fcoll/dynamic/fcoll_dynamic_file_read_all.c
index b506c7a3391..39b9e1d2dd5 100644
--- a/ompi/mca/fcoll/dynamic/fcoll_dynamic_file_read_all.c
+++ b/ompi/mca/fcoll/dynamic/fcoll_dynamic_file_read_all.c
@@ -23,6 +23,7 @@
 #include "mpi.h"
 #include "ompi/constants.h"
 #include "ompi/mca/fcoll/fcoll.h"
+#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
 #include "ompi/mca/io/ompio/io_ompio.h"
 #include "ompi/mca/io/io.h"
 #include "math.h"
@@ -161,16 +162,16 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     start_rcomm_time = MPI_Wtime();
 #endif
-    ret = fh->f_allgather_array (&max_data, 1, MPI_LONG,
-                                 total_bytes_per_process, 1, MPI_LONG,
-                                 fh->f_aggregator_index,
-                                 fh->f_procs_in_group,
-                                 fh->f_procs_per_group,
-                                 fh->f_comm);
+    ret = fcoll_base_coll_allgather_array (&max_data, 1, MPI_LONG,
+                                           total_bytes_per_process, 1, MPI_LONG,
+                                           fh->f_aggregator_index,
+                                           fh->f_procs_in_group,
+                                           fh->f_procs_per_group,
+                                           fh->f_comm);
     if (OMPI_SUCCESS != ret){
         goto exit;
     }
@@ -213,17 +214,17 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     start_rcomm_time = MPI_Wtime();
 #endif
-    ret = fh->f_allgather_array (&local_count, 1, MPI_INT,
-                                 fview_count, 1, MPI_INT,
-                                 fh->f_aggregator_index, fh->f_procs_in_group,
-                                 fh->f_procs_per_group, fh->f_comm);
-
+    ret = fcoll_base_coll_allgather_array (&local_count, 1, MPI_INT,
+                                           fview_count, 1, MPI_INT,
+                                           fh->f_aggregator_index, fh->f_procs_in_group,
+                                           fh->f_procs_per_group, fh->f_comm);
+
     if (OMPI_SUCCESS != ret){
         goto exit;
     }
@@ -271,18 +272,18 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     start_rcomm_time = MPI_Wtime();
 #endif
-    ret = fh->f_allgatherv_array (local_iov_array, local_count, fh->f_iov_type,
-                                  global_iov_array, fview_count, displs, fh->f_iov_type,
-                                  fh->f_aggregator_index, fh->f_procs_in_group,
-                                  fh->f_procs_per_group, fh->f_comm);
-
+    ret = fcoll_base_coll_allgatherv_array (local_iov_array, local_count, fh->f_iov_type,
+                                            global_iov_array, fview_count, displs, fh->f_iov_type,
+                                            fh->f_aggregator_index, fh->f_procs_in_group,
+                                            fh->f_procs_per_group, fh->f_comm);
+
     if (OMPI_SUCCESS != ret){
         goto exit;
     }
diff --git a/ompi/mca/fcoll/dynamic/fcoll_dynamic_file_write_all.c b/ompi/mca/fcoll/dynamic/fcoll_dynamic_file_write_all.c
index 524158c35eb..9af60b606b3 100644
--- a/ompi/mca/fcoll/dynamic/fcoll_dynamic_file_write_all.c
+++ b/ompi/mca/fcoll/dynamic/fcoll_dynamic_file_write_all.c
@@ -25,6 +25,7 @@
 #include "mpi.h"
 #include "ompi/constants.h"
 #include "ompi/mca/fcoll/fcoll.h"
+#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
 #include "ompi/mca/io/ompio/io_ompio.h"
 #include "ompi/mca/io/io.h"
 #include "math.h"
@@ -167,17 +168,17 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     start_comm_time = MPI_Wtime();
 #endif
-    ret = fh->f_allgather_array (&max_data, 1, MPI_LONG,
-                                 total_bytes_per_process, 1, MPI_LONG,
-                                 fh->f_aggregator_index, fh->f_procs_in_group,
-                                 fh->f_procs_per_group, fh->f_comm);
-
+    ret = fcoll_base_coll_allgather_array (&max_data, 1, MPI_LONG,
+                                           total_bytes_per_process, 1, MPI_LONG,
+                                           fh->f_aggregator_index, fh->f_procs_in_group,
+                                           fh->f_procs_per_group, fh->f_comm);
+
     if( OMPI_SUCCESS != ret){
         goto exit;
     }
@@ -230,17 +231,17 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     start_comm_time = MPI_Wtime();
 #endif
-    ret = fh->f_allgather_array (&local_count, 1, MPI_INT,
-                                 fview_count, 1, MPI_INT,
-                                 fh->f_aggregator_index, fh->f_procs_in_group,
-                                 fh->f_procs_per_group, fh->f_comm);
-
+    ret = fcoll_base_coll_allgather_array (&local_count, 1, MPI_INT,
+                                           fview_count, 1, MPI_INT,
+                                           fh->f_aggregator_index, fh->f_procs_in_group,
+                                           fh->f_procs_per_group, fh->f_comm);
+
     if( OMPI_SUCCESS != ret){
         goto exit;
     }
@@ -292,17 +293,17 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     start_comm_time = MPI_Wtime();
 #endif
-    ret = fh->f_allgatherv_array (local_iov_array, local_count, fh->f_iov_type,
-                                  global_iov_array, fview_count, displs, fh->f_iov_type,
-                                  fh->f_aggregator_index, fh->f_procs_in_group,
-                                  fh->f_procs_per_group, fh->f_comm);
+    ret = fcoll_base_coll_allgatherv_array (local_iov_array, local_count, fh->f_iov_type,
+                                            global_iov_array, fview_count, displs, fh->f_iov_type,
+                                            fh->f_aggregator_index, fh->f_procs_in_group,
+                                            fh->f_procs_per_group, fh->f_comm);
     if (OMPI_SUCCESS != ret){
         goto exit;
     }
diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_read_all.c b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_read_all.c
index f34858ed34b..40438100a53 100644
--- a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_read_all.c
+++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_read_all.c
@@ -23,6 +23,7 @@
 #include "mpi.h"
 #include "ompi/constants.h"
 #include "ompi/mca/fcoll/fcoll.h"
+#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
 #include "ompi/mca/io/ompio/io_ompio.h"
 #include "ompi/mca/io/io.h"
 #include "math.h"
@@ -161,16 +162,16 @@ mca_fcoll_dynamic_gen2_file_read_all (mca_io_ompio_file_t *fh,
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     start_rcomm_time = MPI_Wtime();
 #endif
-    ret = fh->f_allgather_array (&max_data, 1, MPI_LONG,
-                                 total_bytes_per_process, 1, MPI_LONG,
-                                 fh->f_aggregator_index, fh->f_procs_in_group,
-                                 fh->f_procs_per_group, fh->f_comm);
+    ret = fcoll_base_coll_allgather_array (&max_data, 1, MPI_LONG,
+                                           total_bytes_per_process, 1, MPI_LONG,
+                                           fh->f_aggregator_index, fh->f_procs_in_group,
+                                           fh->f_procs_per_group, fh->f_comm);
     if (OMPI_SUCCESS != ret){
         goto exit;
     }
@@ -213,17 +214,17 @@ mca_fcoll_dynamic_gen2_file_read_all (mca_io_ompio_file_t *fh,
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     start_rcomm_time = MPI_Wtime();
 #endif
-    ret = fh->f_allgather_array (&local_count, 1, MPI_INT,
-                                 fview_count, 1, MPI_INT,
-                                 fh->f_aggregator_index, fh->f_procs_in_group,
-                                 fh->f_procs_per_group, fh->f_comm);
-
+    ret = fcoll_base_coll_allgather_array (&local_count, 1, MPI_INT,
+                                           fview_count, 1, MPI_INT,
+                                           fh->f_aggregator_index, fh->f_procs_in_group,
+                                           fh->f_procs_per_group, fh->f_comm);
+
     if (OMPI_SUCCESS != ret){
         goto exit;
     }
@@ -271,17 +272,17 @@ mca_fcoll_dynamic_gen2_file_read_all (mca_io_ompio_file_t *fh,
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     start_rcomm_time = MPI_Wtime();
 #endif
-    ret = fh->f_allgatherv_array (local_iov_array, local_count, fh->f_iov_type,
-                                  global_iov_array, fview_count, displs, fh->f_iov_type,
-                                  fh->f_aggregator_index, fh->f_procs_in_group,
-                                  fh->f_procs_per_group, fh->f_comm);
+    ret = fcoll_base_coll_allgatherv_array (local_iov_array, local_count, fh->f_iov_type,
+                                            global_iov_array, fview_count, displs, fh->f_iov_type,
+                                            fh->f_aggregator_index, fh->f_procs_in_group,
+                                            fh->f_procs_per_group, fh->f_comm);
 
     if (OMPI_SUCCESS != ret){
         goto exit;
diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c
index 409fdc4c006..8946d33652c 100644
--- a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c
+++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c
@@ -25,6 +25,7 @@
 #include "mpi.h"
 #include "ompi/constants.h"
 #include "ompi/mca/fcoll/fcoll.h"
+#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
 #include "ompi/mca/io/ompio/io_ompio.h"
 #include "ompi/mca/io/io.h"
 #include "math.h"
@@ -273,16 +274,16 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
                                            fh->f_comm->c_coll.coll_allgather_module);
     }
     else {
-        ret = fh->f_allgather_array (broken_total_lengths,
-                                     dynamic_gen2_num_io_procs, MPI_LONG,
-                                     total_bytes_per_process,
-                                     dynamic_gen2_num_io_procs, MPI_LONG,
-                                     0, fh->f_procs_in_group,
-                                     fh->f_procs_per_group, fh->f_comm);
+        ret = fcoll_base_coll_allgather_array (broken_total_lengths,
+                                               dynamic_gen2_num_io_procs, MPI_LONG,
+                                               total_bytes_per_process,
+                                               dynamic_gen2_num_io_procs, MPI_LONG,
+                                               0, fh->f_procs_in_group,
+                                               fh->f_procs_per_group, fh->f_comm);
     }
 
     if( OMPI_SUCCESS != ret){
@@ -332,16 +333,16 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
                                            fh->f_comm->c_coll.coll_allgather_module);
     }
     else {
-        ret = fh->f_allgather_array (broken_counts,
-                                     dynamic_gen2_num_io_procs, MPI_INT,
-                                     result_counts,
-                                     dynamic_gen2_num_io_procs, MPI_INT,
-                                     0, fh->f_procs_in_group,
-                                     fh->f_procs_per_group, fh->f_comm);
+        ret = fcoll_base_coll_allgather_array (broken_counts,
+                                               dynamic_gen2_num_io_procs, MPI_INT,
+                                               result_counts,
+                                               dynamic_gen2_num_io_procs, MPI_INT,
+                                               0, fh->f_procs_in_group,
+                                               fh->f_procs_per_group, fh->f_comm);
     }
     if( OMPI_SUCCESS != ret){
         goto exit;
@@ -419,17 +420,17 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
                                            fh->f_comm->c_coll.coll_allgatherv_module );
         }
         else {
-            ret = fh->f_allgatherv_array (broken_iov_arrays[i], broken_counts[i],
-                                          fh->f_iov_type,
-                                          aggr_data[i]->global_iov_array,
-                                          aggr_data[i]->fview_count, displs,
-                                          fh->f_iov_type, aggregators[i],
-                                          fh->f_procs_in_group,
-                                          fh->f_procs_per_group, fh->f_comm);
+            ret = fcoll_base_coll_allgatherv_array (broken_iov_arrays[i], broken_counts[i],
+                                                    fh->f_iov_type,
+                                                    aggr_data[i]->global_iov_array,
+                                                    aggr_data[i]->fview_count, displs,
+                                                    fh->f_iov_type, aggregators[i],
+                                                    fh->f_procs_in_group,
+                                                    fh->f_procs_per_group, fh->f_comm);
         }
         if (OMPI_SUCCESS != ret){
             goto exit;
diff --git a/ompi/mca/fcoll/static/fcoll_static_file_read_all.c b/ompi/mca/fcoll/static/fcoll_static_file_read_all.c
index ae03552327e..17206628c39 100644
--- a/ompi/mca/fcoll/static/fcoll_static_file_read_all.c
+++ b/ompi/mca/fcoll/static/fcoll_static_file_read_all.c
@@ -26,6 +26,7 @@
 #include "mpi.h"
 #include "ompi/constants.h"
 #include "ompi/mca/fcoll/fcoll.h"
+#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
 #include "ompi/mca/io/ompio/io_ompio.h"
 #include "ompi/mca/io/io.h"
 #include "math.h"
@@ -291,16 +292,16 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     start_rexch = MPI_Wtime();
 #endif
-    ret = fh->f_allgather_array (&iov_size, 1, MPI_INT,
-                                 iovec_count_per_process, 1, MPI_INT,
-                                 fh->f_aggregator_index, fh->f_procs_in_group,
-                                 fh->f_procs_per_group, fh->f_comm);
+    ret = fcoll_base_coll_allgather_array (&iov_size, 1, MPI_INT,
+                                           iovec_count_per_process, 1, MPI_INT,
+                                           fh->f_aggregator_index, fh->f_procs_in_group,
+                                           fh->f_procs_per_group, fh->f_comm);
 
     if( OMPI_SUCCESS != ret){
         goto exit;
@@ -334,17 +335,17 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
     start_rexch = MPI_Wtime();
 #endif
-    ret = fh->f_gatherv_array (local_iov_array, iov_size, io_array_type,
-                               global_iov_array, iovec_count_per_process, displs,
-                               io_array_type, fh->f_aggregator_index,
-                               fh->f_procs_in_group, fh->f_procs_per_group, fh->f_comm);
+    ret = fcoll_base_coll_gatherv_array (local_iov_array, iov_size, io_array_type,
+                                         global_iov_array, iovec_count_per_process, displs,
+                                         io_array_type, fh->f_aggregator_index,
+                                         fh->f_procs_in_group, fh->f_procs_per_group, fh->f_comm);
     if (OMPI_SUCCESS != ret){
fprintf(stderr,"global_iov_array gather error!\n"); @@ -493,16 +494,16 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh, #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN start_rexch = MPI_Wtime(); #endif - fh->f_gather_array (&bytes_to_read_in_cycle, - 1, - MPI_INT, - bytes_per_process, - 1, - MPI_INT, - fh->f_aggregator_index, - fh->f_procs_in_group, - fh->f_procs_per_group, - fh->f_comm); + fcoll_base_coll_gather_array (&bytes_to_read_in_cycle, + 1, + MPI_INT, + bytes_per_process, + 1, + MPI_INT, + fh->f_aggregator_index, + fh->f_procs_in_group, + fh->f_procs_per_group, + fh->f_comm); #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN end_rcomm_time = MPI_Wtime(); diff --git a/ompi/mca/fcoll/static/fcoll_static_file_write_all.c b/ompi/mca/fcoll/static/fcoll_static_file_write_all.c index a326e2ae549..9e817b9dacc 100644 --- a/ompi/mca/fcoll/static/fcoll_static_file_write_all.c +++ b/ompi/mca/fcoll/static/fcoll_static_file_write_all.c @@ -26,6 +26,7 @@ #include "mpi.h" #include "ompi/constants.h" #include "ompi/mca/fcoll/fcoll.h" +#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h" #include "ompi/mca/io/ompio/io_ompio.h" #include "ompi/mca/io/io.h" #include "math.h" @@ -294,16 +295,16 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh, #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN start_exch = MPI_Wtime(); #endif - ret = fh->f_allgather_array (&iov_size, - 1, - MPI_INT, - iovec_count_per_process, - 1, - MPI_INT, - fh->f_aggregator_index, - fh->f_procs_in_group, - fh->f_procs_per_group, - fh->f_comm); + ret = fcoll_base_coll_allgather_array (&iov_size, + 1, + MPI_INT, + iovec_count_per_process, + 1, + MPI_INT, + fh->f_aggregator_index, + fh->f_procs_in_group, + fh->f_procs_per_group, + fh->f_comm); if( OMPI_SUCCESS != ret){ fprintf(stderr,"iov size allgatherv array!\n"); @@ -338,17 +339,17 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh, #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN start_exch = MPI_Wtime(); #endif - ret = fh->f_gatherv_array (local_iov_array, - iov_size, - io_array_type, - global_iov_array, - iovec_count_per_process, - displs, - io_array_type, - fh->f_aggregator_index, - fh->f_procs_in_group, - fh->f_procs_per_group, - fh->f_comm); + ret = fcoll_base_coll_gatherv_array (local_iov_array, + iov_size, + io_array_type, + global_iov_array, + iovec_count_per_process, + displs, + io_array_type, + fh->f_aggregator_index, + fh->f_procs_in_group, + fh->f_procs_per_group, + fh->f_comm); if (OMPI_SUCCESS != ret){ fprintf(stderr,"global_iov_array gather error!\n"); goto exit; @@ -499,16 +500,16 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh, start_exch = MPI_Wtime(); #endif /* gather from each process how many bytes each will be sending */ - ret = fh->f_gather_array (&bytes_to_write_in_cycle, - 1, - MPI_INT, - bytes_per_process, - 1, - MPI_INT, - fh->f_aggregator_index, - fh->f_procs_in_group, - fh->f_procs_per_group, - fh->f_comm); + ret = fcoll_base_coll_gather_array (&bytes_to_write_in_cycle, + 1, + MPI_INT, + bytes_per_process, + 1, + MPI_INT, + fh->f_aggregator_index, + fh->f_procs_in_group, + fh->f_procs_per_group, + fh->f_comm); if (OMPI_SUCCESS != ret){ fprintf(stderr,"bytes_to_write_in_cycle gather error!\n"); diff --git a/ompi/mca/io/ompio/Makefile.am b/ompi/mca/io/ompio/Makefile.am index b0efdf633d5..887e4e7ebf6 100644 --- a/ompi/mca/io/ompio/Makefile.am +++ b/ompi/mca/io/ompio/Makefile.am @@ -46,7 +46,6 @@ sources = \ io_ompio.c \ io_ompio_component.c \ io_ompio_module.c \ - io_ompio_coll_array.c \ io_ompio_file_set_view.c \ io_ompio_file_open.c \ io_ompio_file_write.c \ diff 
diff --git a/ompi/mca/io/ompio/io_ompio.c b/ompi/mca/io/ompio/io_ompio.c
index 1c8fa3a3618..5ec6d9c594c 100644
--- a/ompi/mca/io/ompio/io_ompio.c
+++ b/ompi/mca/io/ompio/io_ompio.c
@@ -28,6 +28,7 @@
 #include "ompi/communicator/communicator.h"
 #include "ompi/mca/pml/pml.h"
 #include "ompi/mca/topo/topo.h"
+#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
 #include "opal/datatype/opal_convertor.h"
 #include "opal/datatype/opal_datatype.h"
 #include "ompi/datatype/ompi_datatype.h"
@@ -1885,17 +1886,17 @@ int mca_io_ompio_merge_groups(mca_io_ompio_file_t *fh,
 
     //merge_aggrs[0] is considered the new aggregator
     //New aggregator collects group sizes of the groups to be merged
-    ompi_io_ompio_allgather_array (&fh->f_init_procs_per_group, 1, MPI_INT,
-                                   sizes_old_group, 1, MPI_INT,
-                                   0, merge_aggrs, num_merge_aggrs, fh->f_comm);
-
+    fcoll_base_coll_allgather_array (&fh->f_init_procs_per_group, 1, MPI_INT,
+                                     sizes_old_group, 1, MPI_INT,
+                                     0, merge_aggrs, num_merge_aggrs, fh->f_comm);
+
     fh->f_procs_per_group = 0;
@@ -1919,18 +1920,18 @@ int mca_io_ompio_merge_groups(mca_io_ompio_file_t *fh,
     //New aggregator also collects the grouping distribution
     //This is the actual merge
     //use allgatherv array
-    ompi_io_ompio_allgatherv_array (fh->f_init_procs_in_group,
-                                    fh->f_init_procs_per_group, MPI_INT,
-                                    fh->f_procs_in_group, sizes_old_group, displs,
-                                    MPI_INT, 0, merge_aggrs, num_merge_aggrs, fh->f_comm);
-
+    fcoll_base_coll_allgatherv_array (fh->f_init_procs_in_group,
+                                      fh->f_init_procs_per_group, MPI_INT,
+                                      fh->f_procs_in_group, sizes_old_group, displs,
+                                      MPI_INT, 0, merge_aggrs, num_merge_aggrs, fh->f_comm);
+
     free(displs);
     free (sizes_old_group);
@@ -2107,16 +2108,16 @@ int mca_io_ompio_prepare_to_group(mca_io_ompio_file_t *fh,
     }
     //Gather start offsets across processes in a group on aggregator
-    ompi_io_ompio_allgather_array (start_offset_len, 3, OMPI_OFFSET_DATATYPE,
-                                   start_offsets_lens_tmp, 3, OMPI_OFFSET_DATATYPE,
-                                   0, fh->f_init_procs_in_group,
-                                   fh->f_init_procs_per_group, fh->f_comm);
+    fcoll_base_coll_allgather_array (start_offset_len, 3, OMPI_OFFSET_DATATYPE,
+                                     start_offsets_lens_tmp, 3, OMPI_OFFSET_DATATYPE,
+                                     0, fh->f_init_procs_in_group,
+                                     fh->f_init_procs_per_group, fh->f_comm);
     for( k = 0 ; k < fh->f_init_procs_per_group; k++){
         end_offsets_tmp[k] = start_offsets_lens_tmp[3*k] + start_offsets_lens_tmp[3*k+1];
     }
@@ -2150,16 +2151,16 @@ int mca_io_ompio_prepare_to_group(mca_io_ompio_file_t *fh,
         return OMPI_ERR_OUT_OF_RESOURCE;
     }
     //Communicate bytes per group between all aggregators
-    ompi_io_ompio_allgather_array (bytes_per_group, 1, OMPI_OFFSET_DATATYPE,
-                                   aggr_bytes_per_group_tmp, 1, OMPI_OFFSET_DATATYPE,
-                                   0, fh->f_init_aggr_list, fh->f_init_num_aggrs, fh->f_comm);
+    fcoll_base_coll_allgather_array (bytes_per_group, 1, OMPI_OFFSET_DATATYPE,
+                                     aggr_bytes_per_group_tmp, 1, OMPI_OFFSET_DATATYPE,
+                                     0, fh->f_init_aggr_list, fh->f_init_num_aggrs, fh->f_comm);
 
     for( i = 0; i < fh->f_init_num_aggrs; i++){
         if((size_t)(aggr_bytes_per_group_tmp[i])>
@@ -2230,14 +2231,14 @@ int mca_io_ompio_prepare_to_group(mca_io_ompio_file_t *fh,
         *decision_list = &decision_list_tmp[0];
     }
     //Communicate flag to all group members
-    ompi_io_ompio_bcast_array (ompio_grouping_flag, 1, MPI_INT,
-                               0, fh->f_init_procs_in_group,
-                               fh->f_init_procs_per_group, fh->f_comm);
-
+    fcoll_base_coll_bcast_array (ompio_grouping_flag, 1, MPI_INT,
+                                 0, fh->f_init_procs_in_group,
+                                 fh->f_init_procs_per_group, fh->f_comm);
+
     return OMPI_SUCCESS;
diff --git a/ompi/mca/io/ompio/io_ompio.h b/ompi/mca/io/ompio/io_ompio.h
index 3bd3a9fdb29..d24493b3c35 100644
--- a/ompi/mca/io/ompio/io_ompio.h
+++ b/ompi/mca/io/ompio/io_ompio.h
@@ -224,52 +224,6 @@ typedef int (*mca_io_ompio_sort_iovec_fn_t) (struct iovec *iov,
                                              int num_entries,
                                              int *sorted);
 
-/* collective operations based on list of participating ranks instead of communicators*/
-typedef int (*mca_io_ompio_allgather_array_fn_t) (void *sbuf, int scount,
-                                                  ompi_datatype_t *sdtype,
-                                                  void *rbuf, int rcount,
-                                                  ompi_datatype_t *rdtype,
-                                                  int root_index, int *procs_in_group,
-                                                  int procs_per_group,
-                                                  ompi_communicator_t *comm);
-
-typedef int (*mca_io_ompio_allgatherv_array_fn_t) (void *sbuf, int scount,
-                                                   ompi_datatype_t *sdtype,
-                                                   void *rbuf, int *rcounts, int *disps,
-                                                   ompi_datatype_t *rdtype,
-                                                   int root_index, int *procs_in_group,
-                                                   int procs_per_group,
-                                                   ompi_communicator_t *comm);
-
-typedef int (*mca_io_ompio_gather_array_fn_t) (void *sbuf, int scount,
-                                               ompi_datatype_t *sdtype,
-                                               void *rbuf, int rcount,
-                                               ompi_datatype_t *rdtype,
-                                               int root_index, int *procs_in_group,
-                                               int procs_per_group,
-                                               ompi_communicator_t *comm);
-typedef int (*mca_io_ompio_gatherv_array_fn_t) (void *sbuf, int scount,
-                                                ompi_datatype_t *sdtype,
-                                                void *rbuf, int *rcounts, int *disps,
-                                                ompi_datatype_t *rdtype,
-                                                int root_index, int *procs_in_group,
-                                                int procs_per_group,
-                                                ompi_communicator_t *comm);
-
 /* functions to retrieve the number of aggregators and the size of the
    temporary buffer on aggregators from the fcoll modules */
 typedef void (*mca_io_ompio_get_num_aggregators_fn_t) ( int *num_aggregators);
@@ -373,11 +327,6 @@ struct mca_io_ompio_file_t {
     mca_io_ompio_sort_fn_t         f_sort;
     mca_io_ompio_sort_iovec_fn_t   f_sort_iovec;
 
-    mca_io_ompio_allgather_array_fn_t  f_allgather_array;
-    mca_io_ompio_allgatherv_array_fn_t f_allgatherv_array;
-    mca_io_ompio_gather_array_fn_t     f_gather_array;
-    mca_io_ompio_gatherv_array_fn_t    f_gatherv_array;
-
     mca_io_ompio_get_num_aggregators_fn_t f_get_num_aggregators;
     mca_io_ompio_get_bytes_per_agg_fn_t   f_get_bytes_per_agg;
     mca_io_ompio_set_aggregator_props_fn_t f_set_aggregator_props;
@@ -569,72 +518,6 @@ OMPI_DECLSPEC int ompi_io_ompio_break_file_view (mca_io_ompio_file_t *fh,
                                                  int *broken_count);
 
-/*
- * Modified versions of Collective operations
- * Based on an array of procs in group
- */
-OMPI_DECLSPEC int ompi_io_ompio_gatherv_array (void *sbuf, int scount,
-                                               ompi_datatype_t *sdtype,
-                                               void *rbuf, int *rcounts, int *disps,
-                                               ompi_datatype_t *rdtype,
-                                               int root_index, int *procs_in_group,
-                                               int procs_per_group,
-                                               ompi_communicator_t *comm);
-OMPI_DECLSPEC int ompi_io_ompio_scatterv_array (void *sbuf, int *scounts, int *disps,
-                                                ompi_datatype_t *sdtype,
-                                                void *rbuf, int rcount,
-                                                ompi_datatype_t *rdtype,
-                                                int root_index, int *procs_in_group,
-                                                int procs_per_group,
-                                                ompi_communicator_t *comm);
-OMPI_DECLSPEC int ompi_io_ompio_allgather_array (void *sbuf, int scount,
-                                                 ompi_datatype_t *sdtype,
-                                                 void *rbuf, int rcount,
-                                                 ompi_datatype_t *rdtype,
-                                                 int root_index, int *procs_in_group,
-                                                 int procs_per_group,
-                                                 ompi_communicator_t *comm);
-
-OMPI_DECLSPEC int ompi_io_ompio_allgatherv_array (void *sbuf, int scount,
-                                                  ompi_datatype_t *sdtype,
-                                                  void *rbuf, int *rcounts, int *disps,
-                                                  ompi_datatype_t *rdtype,
-                                                  int root_index, int *procs_in_group,
-                                                  int procs_per_group,
-                                                  ompi_communicator_t *comm);
-OMPI_DECLSPEC int ompi_io_ompio_gather_array (void *sbuf, int scount,
-                                              ompi_datatype_t *sdtype,
-                                              void *rbuf, int rcount,
-                                              ompi_datatype_t *rdtype,
-                                              int root_index, int *procs_in_group,
-                                              int procs_per_group,
-                                              ompi_communicator_t *comm);
-OMPI_DECLSPEC int ompi_io_ompio_bcast_array (void *buff, int count,
-                                             ompi_datatype_t *datatype,
-                                             int root_index, int *procs_in_group,
-                                             int procs_per_group,
-                                             ompi_communicator_t *comm);
-
 OMPI_DECLSPEC int ompi_io_ompio_register_print_entry (int queue_type,
                                                       mca_io_ompio_print_entry x);
 
diff --git a/ompi/mca/io/ompio/io_ompio_file_open.c b/ompi/mca/io/ompio/io_ompio_file_open.c
index 234fd0bf1e0..35498307f1d 100644
--- a/ompi/mca/io/ompio/io_ompio_file_open.c
+++ b/ompi/mca/io/ompio/io_ompio_file_open.c
@@ -144,11 +144,6 @@ ompio_io_ompio_file_open (ompi_communicator_t *comm,
     ompio_fh->f_sort=ompi_io_ompio_sort;
     ompio_fh->f_sort_iovec=ompi_io_ompio_sort_iovec;
 
-    ompio_fh->f_allgather_array=ompi_io_ompio_allgather_array;
-    ompio_fh->f_allgatherv_array=ompi_io_ompio_allgatherv_array;
-    ompio_fh->f_gather_array=ompi_io_ompio_gather_array;
-    ompio_fh->f_gatherv_array=ompi_io_ompio_gatherv_array;
-
    ompio_fh->f_get_num_aggregators=mca_io_ompio_get_num_aggregators;
    ompio_fh->f_get_bytes_per_agg=mca_io_ompio_get_bytes_per_agg;
    ompio_fh->f_set_aggregator_props=ompi_io_ompio_set_aggregator_props;
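
Note for out-of-tree fcoll components: after this series the array-based
collectives are reached directly through fcoll/base instead of the
io/ompio file-handle function pointers removed above. The sketch below
mirrors the allgather replacement made in fcoll_dynamic_file_read_all.c;
the wrapper function and variable names are illustrative only and are
not part of the patch.

    #include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
    #include "ompi/mca/io/ompio/io_ompio.h"

    /* Gather each group member's local_count at the aggregator and
       replicate the result across the group, using the relocated routine
       (formerly fh->f_allgather_array / ompi_io_ompio_allgather_array). */
    static int example_exchange_counts (mca_io_ompio_file_t *fh,
                                        int local_count, int *all_counts)
    {
        /* all_counts must hold fh->f_procs_per_group entries */
        return fcoll_base_coll_allgather_array (&local_count, 1, MPI_INT,
                                                all_counts, 1, MPI_INT,
                                                fh->f_aggregator_index,
                                                fh->f_procs_in_group,
                                                fh->f_procs_per_group,
                                                fh->f_comm);
    }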