Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion ompi/communicator/comm_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2010 University of Houston. All rights reserved.
* Copyright (c) 2006-2017 University of Houston. All rights reserved.
* Copyright (c) 2007-2012 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2012-2015 Los Alamos National Security, LLC.
Expand All @@ -35,6 +35,7 @@

#include "opal/util/bit_ops.h"
#include "opal/util/info_subscriber.h"
#include "opal/mca/pmix/pmix.h"
#include "ompi/constants.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/coll/base/base.h"
Expand Down Expand Up @@ -150,6 +151,23 @@ int ompi_comm_init(void)
because MPI_COMM_WORLD has some predefined attributes. */
ompi_attr_hash_init(&ompi_mpi_comm_world.comm.c_keyhash);

/* Check for the mapping policy used. We are only interested in
whether mapby-node has been set right now (could be extended later)
and only on MPI_COMM_WORLD, since for all other sub-communicators
it is virtually impossible to identify their layout across nodes
in the most generic sense. This is used by OMPIO for deciding which
ranks to use for aggregators.
*/
/* Query key: this process's own jobid combined with the wildcard vpid. */
opal_process_name_t wildcard = {ORTE_PROC_MY_NAME->jobid, OPAL_VPID_WILDCARD};
char *str=NULL;
int rc;

/* Optional lookup of the map-by string; rc receives the status and
str the value on success. */
OPAL_MODEX_RECV_VALUE_OPTIONAL(rc, OPAL_PMIX_MAPBY, &wildcard, &str, OPAL_STRING);
/* NOTE(review): 0 is presumably OPAL_SUCCESS -- confirm and prefer the
named constant. str is dereferenced below without a NULL check, and it
is never released here; if the modex receive hands back an allocated
copy (likely for OPAL_STRING -- TODO confirm), this leaks it. */
if ( 0 == rc ) {
/* Substring match: any map-by value containing "BYNODE" sets the flag. */
if ( strstr ( str, "BYNODE") ) {
OMPI_COMM_SET_MAPBY_NODE(&ompi_mpi_comm_world.comm);
}
}
/* Setup MPI_COMM_SELF */
OBJ_CONSTRUCT(&ompi_mpi_comm_self, ompi_communicator_t);
assert(ompi_mpi_comm_self.comm.c_f_to_c_index == 1);
Expand Down
5 changes: 4 additions & 1 deletion ompi/communicator/communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2017 Cisco Systems, Inc. All rights reserved
* Copyright (c) 2006-2010 University of Houston. All rights reserved.
* Copyright (c) 2006-2017 University of Houston. All rights reserved.
* Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2011-2013 Inria. All rights reserved.
* Copyright (c) 2011-2013 Universite Bordeaux 1
Expand Down Expand Up @@ -60,6 +60,7 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_communicator_t);
#define OMPI_COMM_DIST_GRAPH 0x00000400
#define OMPI_COMM_PML_ADDED 0x00001000
#define OMPI_COMM_EXTRA_RETAIN 0x00004000
/* c_flags bit: the job's map-by policy string contained "BYNODE".
   Set on MPI_COMM_WORLD in ompi_comm_init(). */
#define OMPI_COMM_MAPBY_NODE 0x00008000

/* some utility #defines */
#define OMPI_COMM_IS_INTER(comm) ((comm)->c_flags & OMPI_COMM_INTER)
Expand All @@ -76,12 +77,14 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_communicator_t);
#define OMPI_COMM_IS_TOPO(comm) (OMPI_COMM_IS_CART((comm)) || \
OMPI_COMM_IS_GRAPH((comm)) || \
OMPI_COMM_IS_DIST_GRAPH((comm)))
/* Non-zero if the OMPI_COMM_MAPBY_NODE bit is set in comm->c_flags.
   The result is the masked flag value, not a normalized 0/1. */
#define OMPI_COMM_IS_MAPBY_NODE(comm) ((comm)->c_flags & OMPI_COMM_MAPBY_NODE)

#define OMPI_COMM_SET_DYNAMIC(comm) ((comm)->c_flags |= OMPI_COMM_DYNAMIC)
#define OMPI_COMM_SET_INVALID(comm) ((comm)->c_flags |= OMPI_COMM_INVALID)

#define OMPI_COMM_SET_PML_ADDED(comm) ((comm)->c_flags |= OMPI_COMM_PML_ADDED)
#define OMPI_COMM_SET_EXTRA_RETAIN(comm) ((comm)->c_flags |= OMPI_COMM_EXTRA_RETAIN)
/* Set the OMPI_COMM_MAPBY_NODE bit in comm->c_flags. */
#define OMPI_COMM_SET_MAPBY_NODE(comm) ((comm)->c_flags |= OMPI_COMM_MAPBY_NODE)

/* a set of special tags: */

Expand Down
2 changes: 1 addition & 1 deletion ompi/mca/common/ompio/common_ompio_file_view.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ int mca_common_ompio_set_view (mca_io_ompio_file_t *fh,
if ( fh->f_flags & OMPIO_UNIFORM_FVIEW ) {
fh->f_flags &= ~OMPIO_UNIFORM_FVIEW;
}
fh->f_flags |= OMPIO_FILE_VIEW_IS_SET;
fh->f_datarep = strdup (datarep);
datatype_duplicate (filetype, &fh->f_orig_filetype );

Expand All @@ -113,6 +112,7 @@ int mca_common_ompio_set_view (mca_io_ompio_file_t *fh,
}
else {
newfiletype = filetype;
fh->f_flags |= OMPIO_FILE_VIEW_IS_SET;
}

fh->f_iov_count = 0;
Expand Down
14 changes: 11 additions & 3 deletions ompi/mca/fcoll/two_phase/fcoll_two_phase_file_read_all.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "fcoll_two_phase.h"
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/fcoll/fcoll.h"
#include "ompi/mca/io/ompio/io_ompio.h"
#include "ompi/mca/io/io.h"
Expand Down Expand Up @@ -199,7 +200,7 @@ mca_fcoll_two_phase_file_read_all (mca_io_ompio_file_t *fh,
}

if (two_phase_num_io_procs > fh->f_size){
two_phase_num_io_procs = fh->f_size;
two_phase_num_io_procs = fh->f_size;
}

aggregator_list = (int *) calloc (two_phase_num_io_procs, sizeof(int));
Expand All @@ -208,9 +209,16 @@ mca_fcoll_two_phase_file_read_all (mca_io_ompio_file_t *fh,
goto exit;
}

for (i=0; i< two_phase_num_io_procs; i++){
aggregator_list[i] = i * fh->f_size / two_phase_num_io_procs;
if ( OMPI_COMM_IS_MAPBY_NODE (&ompi_mpi_comm_world.comm) ) {
for (i =0; i< two_phase_num_io_procs; i++){
aggregator_list[i] = i;
}
}
else {
for (i =0; i< two_phase_num_io_procs; i++){
aggregator_list[i] = i * fh->f_size / two_phase_num_io_procs;
}
}

ret = fh->f_generate_current_file_view ((struct mca_io_ompio_file_t *)fh,
max_data,
Expand Down
16 changes: 12 additions & 4 deletions ompi/mca/fcoll/two_phase/fcoll_two_phase_file_write_all.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/fcoll/fcoll.h"
#include "ompi/mca/io/ompio/io_ompio.h"
#include "ompi/mca/io/io.h"
Expand Down Expand Up @@ -235,9 +236,10 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
}

if (two_phase_num_io_procs > fh->f_size){
two_phase_num_io_procs = fh->f_size;
two_phase_num_io_procs = fh->f_size;
}


#if DEBUG_ON
printf("Number of aggregators : %ld\n", two_phase_num_io_procs);
#endif
Expand All @@ -248,10 +250,16 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
goto exit;
}

for (i =0; i< two_phase_num_io_procs; i++){
aggregator_list[i] = i * fh->f_size / two_phase_num_io_procs;
if ( OMPI_COMM_IS_MAPBY_NODE (&ompi_mpi_comm_world.comm) ) {
for (i =0; i< two_phase_num_io_procs; i++){
aggregator_list[i] = i;
}
}

else {
for (i =0; i< two_phase_num_io_procs; i++){
aggregator_list[i] = i * fh->f_size / two_phase_num_io_procs;
}
}

ret = fh->f_generate_current_file_view ((struct mca_io_ompio_file_t*)fh,
max_data,
Expand Down
3 changes: 3 additions & 0 deletions ompi/mca/io/ompio/io_ompio.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ extern int mca_io_ompio_num_aggregators;
extern int mca_io_ompio_record_offset_info;
extern int mca_io_ompio_sharedfp_lazy_open;
extern int mca_io_ompio_grouping_option;
extern int mca_io_ompio_max_aggregators_ratio;
extern int mca_io_ompio_aggregators_cutoff_threshold;

OMPI_DECLSPEC extern int mca_io_ompio_coll_timing_info;

/*
Expand Down
Loading