diff --git a/ompi/communicator/comm_init.c b/ompi/communicator/comm_init.c
index f60dbabf4fc..c09c122a382 100644
--- a/ompi/communicator/comm_init.c
+++ b/ompi/communicator/comm_init.c
@@ -10,7 +10,7 @@
  * University of Stuttgart. All rights reserved.
  * Copyright (c) 2004-2005 The Regents of the University of California.
  *                         All rights reserved.
- * Copyright (c) 2006-2010 University of Houston. All rights reserved.
+ * Copyright (c) 2006-2017 University of Houston. All rights reserved.
  * Copyright (c) 2007-2012 Cisco Systems, Inc. All rights reserved.
  * Copyright (c) 2009      Sun Microsystems, Inc. All rights reserved.
  * Copyright (c) 2012-2015 Los Alamos National Security, LLC.
@@ -35,6 +35,7 @@
 #include "opal/util/bit_ops.h"
 #include "opal/util/info_subscriber.h"
+#include "opal/mca/pmix/pmix.h"
 #include "ompi/constants.h"
 #include "ompi/mca/pml/pml.h"
 #include "ompi/mca/coll/base/base.h"
@@ -150,6 +151,23 @@ int ompi_comm_init(void)
        because MPI_COMM_WORLD has some predefined attributes. */
     ompi_attr_hash_init(&ompi_mpi_comm_world.comm.c_keyhash);
 
+    /* Check the mapping policy used for this job. For now we are only
+       interested in whether map-by node has been set (this could be
+       extended later), and only on MPI_COMM_WORLD, since for all other
+       sub-communicators it is virtually impossible to identify their
+       layout across nodes in the most generic sense. OMPIO uses this
+       flag to decide which ranks to use as aggregators.
+    */
+    opal_process_name_t wildcard = {ORTE_PROC_MY_NAME->jobid, OPAL_VPID_WILDCARD};
+    char *str=NULL;
+    int rc;
+
+    OPAL_MODEX_RECV_VALUE_OPTIONAL(rc, OPAL_PMIX_MAPBY, &wildcard, &str, OPAL_STRING);
+    if ( 0 == rc ) {
+        if ( strstr ( str, "BYNODE") ) {
+            OMPI_COMM_SET_MAPBY_NODE(&ompi_mpi_comm_world.comm);
+        }
+    }
 
     /* Setup MPI_COMM_SELF */
     OBJ_CONSTRUCT(&ompi_mpi_comm_self, ompi_communicator_t);
     assert(ompi_mpi_comm_self.comm.c_f_to_c_index == 1);
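For context: OPAL_MODEX_RECV_VALUE_OPTIONAL retrieves the OPAL_PMIX_MAPBY string that the runtime stored for the job, and the strstr() call accepts any policy string containing "BYNODE". A minimal standalone sketch of just that test (the sample policy strings are illustrative assumptions, not values guaranteed by the runtime):

#include <stdio.h>
#include <string.h>

/* Mirrors the check in ompi_comm_init(): any mapping-policy string that
 * contains "BYNODE" is treated as a round-robin-by-node rank layout. */
static int is_mapby_node(const char *policy)
{
    return (NULL != policy) && (NULL != strstr(policy, "BYNODE"));
}

int main(void)
{
    printf("%d\n", is_mapby_node("BYNODE"));   /* prints 1 */
    printf("%d\n", is_mapby_node("BYSLOT"));   /* prints 0 */
    return 0;
}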
diff --git a/ompi/communicator/communicator.h b/ompi/communicator/communicator.h
index 101a18eb6a4..4fe4721244c 100644
--- a/ompi/communicator/communicator.h
+++ b/ompi/communicator/communicator.h
@@ -11,7 +11,7 @@
  * Copyright (c) 2004-2005 The Regents of the University of California.
  *                         All rights reserved.
  * Copyright (c) 2006-2017 Cisco Systems, Inc. All rights reserved
- * Copyright (c) 2006-2010 University of Houston. All rights reserved.
+ * Copyright (c) 2006-2017 University of Houston. All rights reserved.
  * Copyright (c) 2009      Sun Microsystems, Inc. All rights reserved.
  * Copyright (c) 2011-2013 Inria. All rights reserved.
  * Copyright (c) 2011-2013 Universite Bordeaux 1
@@ -60,6 +60,7 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_communicator_t);
 #define OMPI_COMM_DIST_GRAPH   0x00000400
 #define OMPI_COMM_PML_ADDED    0x00001000
 #define OMPI_COMM_EXTRA_RETAIN 0x00004000
+#define OMPI_COMM_MAPBY_NODE   0x00008000
 
 /* some utility #defines */
 #define OMPI_COMM_IS_INTER(comm) ((comm)->c_flags & OMPI_COMM_INTER)
@@ -76,12 +77,14 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_communicator_t);
 #define OMPI_COMM_IS_TOPO(comm) (OMPI_COMM_IS_CART((comm)) || \
                                  OMPI_COMM_IS_GRAPH((comm)) || \
                                  OMPI_COMM_IS_DIST_GRAPH((comm)))
+#define OMPI_COMM_IS_MAPBY_NODE(comm) ((comm)->c_flags & OMPI_COMM_MAPBY_NODE)
 
 #define OMPI_COMM_SET_DYNAMIC(comm) ((comm)->c_flags |= OMPI_COMM_DYNAMIC)
 #define OMPI_COMM_SET_INVALID(comm) ((comm)->c_flags |= OMPI_COMM_INVALID)
 
 #define OMPI_COMM_SET_PML_ADDED(comm) ((comm)->c_flags |= OMPI_COMM_PML_ADDED)
 #define OMPI_COMM_SET_EXTRA_RETAIN(comm) ((comm)->c_flags |= OMPI_COMM_EXTRA_RETAIN)
+#define OMPI_COMM_SET_MAPBY_NODE(comm) ((comm)->c_flags |= OMPI_COMM_MAPBY_NODE)
 
 /* a set of special tags: */
diff --git a/ompi/mca/common/ompio/common_ompio_file_view.c b/ompi/mca/common/ompio/common_ompio_file_view.c
index 62242ef0362..f4718375b90 100644
--- a/ompi/mca/common/ompio/common_ompio_file_view.c
+++ b/ompi/mca/common/ompio/common_ompio_file_view.c
@@ -96,7 +96,6 @@ int mca_common_ompio_set_view (mca_io_ompio_file_t *fh,
     if ( fh->f_flags & OMPIO_UNIFORM_FVIEW ) {
         fh->f_flags &= ~OMPIO_UNIFORM_FVIEW;
     }
-    fh->f_flags |= OMPIO_FILE_VIEW_IS_SET;
 
     fh->f_datarep = strdup (datarep);
     datatype_duplicate (filetype, &fh->f_orig_filetype );
@@ -113,6 +112,7 @@ int mca_common_ompio_set_view (mca_io_ompio_file_t *fh,
     }
     else {
         newfiletype = filetype;
+        fh->f_flags |= OMPIO_FILE_VIEW_IS_SET;
     }
 
     fh->f_iov_count = 0;
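A note on the OMPIO_FILE_VIEW_IS_SET move above: as far as this hunk shows, the flag is now raised only in the branch where the caller's filetype is adopted as-is, rather than unconditionally at the top of mca_common_ompio_set_view(). A sketch (not taken from the OMPIO sources) of how downstream code can then read the flag:

/* Sketch only: with the change above, this test distinguishes a
 * user-installed file view from the default byte-stream view. */
if (fh->f_flags & OMPIO_FILE_VIEW_IS_SET) {
    /* a non-default file view is in effect; analyze it before
       choosing aggregators */
} else {
    /* default contiguous view; simple offset arithmetic suffices */
}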
"ompi/communicator/communicator.h" #include "ompi/mca/fcoll/fcoll.h" #include "ompi/mca/io/ompio/io_ompio.h" #include "ompi/mca/io/io.h" @@ -235,9 +236,10 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh, } if (two_phase_num_io_procs > fh->f_size){ - two_phase_num_io_procs = fh->f_size; + two_phase_num_io_procs = fh->f_size; } + #if DEBUG_ON printf("Number of aggregators : %ld\n", two_phase_num_io_procs); #endif @@ -248,10 +250,16 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh, goto exit; } - for (i =0; i< two_phase_num_io_procs; i++){ - aggregator_list[i] = i * fh->f_size / two_phase_num_io_procs; + if ( OMPI_COMM_IS_MAPBY_NODE (&ompi_mpi_comm_world.comm) ) { + for (i =0; i< two_phase_num_io_procs; i++){ + aggregator_list[i] = i; + } } - + else { + for (i =0; i< two_phase_num_io_procs; i++){ + aggregator_list[i] = i * fh->f_size / two_phase_num_io_procs; + } + } ret = fh->f_generate_current_file_view ((struct mca_io_ompio_file_t*)fh, max_data, diff --git a/ompi/mca/io/ompio/io_ompio.h b/ompi/mca/io/ompio/io_ompio.h index 417863a0bf6..0268a022440 100644 --- a/ompi/mca/io/ompio/io_ompio.h +++ b/ompi/mca/io/ompio/io_ompio.h @@ -49,6 +49,9 @@ extern int mca_io_ompio_num_aggregators; extern int mca_io_ompio_record_offset_info; extern int mca_io_ompio_sharedfp_lazy_open; extern int mca_io_ompio_grouping_option; +extern int mca_io_ompio_max_aggregators_ratio; +extern int mca_io_ompio_aggregators_cutoff_threshold; + OMPI_DECLSPEC extern int mca_io_ompio_coll_timing_info; /* diff --git a/ompi/mca/io/ompio/io_ompio_aggregators.c b/ompi/mca/io/ompio/io_ompio_aggregators.c index 9bf568235c1..17f9d17ac57 100644 --- a/ompi/mca/io/ompio/io_ompio_aggregators.c +++ b/ompi/mca/io/ompio/io_ompio_aggregators.c @@ -47,41 +47,148 @@ ** ** The first group functions determines the number of aggregators based on various characteristics ** -** 1. simple_grouping:aA simple heuristic based on the amount of data written and size of -** of the temporary buffer used by aggregator processes +** 1. simple_grouping: A heuristic based on a cost model ** 2. fview_based_grouping: analysis the fileview to detect regular patterns ** 3. cart_based_grouping: uses a cartesian communicator to derive certain (probable) properties ** of the access pattern */ + +static double cost_calc (int P, int P_agg, size_t Data_proc, size_t coll_buffer, int dim ); +#define DIM1 1 +#define DIM2 2 + int mca_io_ompio_simple_grouping(mca_io_ompio_file_t *fh, - int *num_groups, + int *num_groups_out, mca_io_ompio_contg *contg_groups) { - size_t stripe_size = (size_t) fh->f_stripe_size; int group_size = 0; int k=0, p=0, g=0; int total_procs = 0; + int num_groups=1; - if ( 0 >= fh->f_stripe_size ) { - stripe_size = OMPIO_DEFAULT_STRIPE_SIZE; - } + double time=0.0, time_prev=0.0, dtime=0.0, dtime_abs=0.0, dtime_diff=0.0, dtime_prev=0.0; + double dtime_threshold=0.0; + + /* This is the threshold for absolute improvement. It is not + ** exposed as an MCA parameter to avoid overwhelming users. It is + ** mostly relevant for smaller process counts and data volumes. + */ + double time_threshold=0.001; + + int incr=1, mode=1; + int P_a, P_a_prev; + + /* The aggregator selection algorithm is based on the formulas described + ** in: Shweta Jha, Edgar Gabriel, 'Performance Models for Communication in + ** Collective I/O operations', Proceedings of the 17th IEEE/ACM Symposium + ** on Cluster, Cloud and Grid Computing, Workshop on Theoretical + ** Approaches to Performance Evaluation, Modeling and Simulation, 2017. 
diff --git a/ompi/mca/io/ompio/io_ompio_aggregators.c b/ompi/mca/io/ompio/io_ompio_aggregators.c
index 9bf568235c1..17f9d17ac57 100644
--- a/ompi/mca/io/ompio/io_ompio_aggregators.c
+++ b/ompi/mca/io/ompio/io_ompio_aggregators.c
@@ -47,41 +47,148 @@
 **
 ** The first group functions determines the number of aggregators based on various characteristics
 **
-** 1. simple_grouping:aA simple heuristic based on the amount of data written and size of
-**    of the temporary buffer used by aggregator processes
+** 1. simple_grouping: A heuristic based on a cost model
 ** 2. fview_based_grouping: analysis the fileview to detect regular patterns
 ** 3. cart_based_grouping: uses a cartesian communicator to derive certain (probable) properties
 **    of the access pattern
 */
+
+static double cost_calc (int P, int P_agg, size_t Data_proc, size_t coll_buffer, int dim );
+#define DIM1 1
+#define DIM2 2
+
 int mca_io_ompio_simple_grouping(mca_io_ompio_file_t *fh,
-                                 int *num_groups,
+                                 int *num_groups_out,
                                  mca_io_ompio_contg *contg_groups)
 {
-    size_t stripe_size = (size_t) fh->f_stripe_size;
     int group_size  = 0;
     int k=0, p=0, g=0;
     int total_procs = 0;
+    int num_groups=1;
 
-    if ( 0 >= fh->f_stripe_size ) {
-        stripe_size = OMPIO_DEFAULT_STRIPE_SIZE;
-    }
+    double time=0.0, time_prev=0.0, dtime=0.0, dtime_abs=0.0, dtime_diff=0.0, dtime_prev=0.0;
+    double dtime_threshold=0.0;
+
+    /* This is the threshold for the absolute improvement. It is not
+    ** exposed as an MCA parameter to avoid overwhelming users. It is
+    ** mostly relevant for smaller process counts and data volumes.
+    */
+    double time_threshold=0.001;
+
+    int incr=1, mode=1;
+    int P_a, P_a_prev;
+
+    /* The aggregator selection algorithm is based on the formulas described
+    ** in: Shweta Jha, Edgar Gabriel, 'Performance Models for Communication in
+    ** Collective I/O Operations', Proceedings of the 17th IEEE/ACM Symposium
+    ** on Cluster, Cloud and Grid Computing, Workshop on Theoretical
+    ** Approaches to Performance Evaluation, Modeling and Simulation, 2017.
+    **
+    ** The current implementation is based on the 1-D and 2-D models derived for the even
+    ** file partitioning strategy in the paper. Note that the formulas currently only model
+    ** the communication aspect of collective I/O operations. There are two extensions in this
+    ** implementation:
+    **
+    ** 1. Since the resulting formula has an asymptotic behavior w.r.t. the
+    ** no. of aggregators, this version determines the no. of aggregators
+    ** iteratively and stops increasing their number once the benefit of
+    ** adding aggregators drops below a certain threshold relative to the
+    ** last number tested. How aggressively the increase in the number of
+    ** aggregators is cut off is controlled by the new MCA parameter
+    ** io_ompio_aggregators_cutoff_threshold. Lower values for this
+    ** parameter lead to a higher number of aggregators (useful e.g.
+    ** for PVFS2 and GPFS file systems), while higher values lead to a
+    ** lower no. of aggregators (useful for regular UNIX or NFS file systems).
+    **
+    ** 2. The algorithm further caps the maximum no. of aggregators such that it does
+    ** not exceed (no. of processes / mca_io_ompio_max_aggregators_ratio), i.e. a higher
+    ** value for mca_io_ompio_max_aggregators_ratio decreases the maximum number of
+    ** aggregators allowed for the given no. of processes.
+    */
+    dtime_threshold = (double) mca_io_ompio_aggregators_cutoff_threshold / 100.0;
+
+    /* Determine whether to use the formula for the 1-D or the 2-D data
+    ** decomposition. Anything that is not 1-D is assumed to be 2-D in
+    ** this version.
+    */
+    mode = ( fh->f_cc_size == fh->f_view_size ) ? 1 : 2;
 
-    if ( 0 != fh->f_cc_size && stripe_size > fh->f_cc_size ) {
-        group_size  = (((int)stripe_size/(int)fh->f_cc_size) > fh->f_size ) ? fh->f_size : ((int)stripe_size/(int)fh->f_cc_size);
-        *num_groups = fh->f_size / group_size;
+    /* Determine the increment size used when searching for the optimal
+    ** no. of aggregators.
+    */
+    if ( fh->f_size < 16 ) {
+        incr = 2;
+    }
+    else if (fh->f_size < 128 ) {
+        incr = 4;
+    }
+    else if ( fh->f_size < 4096 ) {
+        incr = 16;
     }
-    else if ( fh->f_cc_size <= OMPIO_CONTG_FACTOR * stripe_size) {
-        *num_groups = fh->f_size/OMPIO_CONTG_FACTOR > 0 ? (fh->f_size/OMPIO_CONTG_FACTOR) : 1 ;
-        group_size  = OMPIO_CONTG_FACTOR;
-    }
     else {
-        *num_groups = fh->f_size;
-        group_size  = 1;
+        incr = 32;
+    }
+
+    P_a = 1;
+    time_prev = cost_calc ( fh->f_size, P_a, fh->f_view_size, (size_t) fh->f_bytes_per_agg, mode );
+    P_a_prev = P_a;
+    for ( P_a = incr; P_a <= fh->f_size; P_a += incr ) {
+        time = cost_calc ( fh->f_size, P_a, fh->f_view_size, (size_t) fh->f_bytes_per_agg, mode );
+        dtime_abs = (time_prev - time);
+        dtime = dtime_abs / time_prev;
+        dtime_diff = ( P_a == incr ) ? dtime : (dtime_prev - dtime);
+#ifdef OMPIO_DEBUG
+        if ( 0 == fh->f_rank ){
+            printf(" d_p = %ld P_a = %d time = %lf dtime = %lf dtime_abs =%lf dtime_diff=%lf\n",
+                   fh->f_view_size, P_a, time, dtime, dtime_abs, dtime_diff );
+        }
+#endif
+        if ( dtime_diff < dtime_threshold ) {
+            /* The relative improvement compared to the last number
+            ** of aggregators was below a certain threshold. This is
+            ** typically the dominating factor for large data volumes
+            ** and larger process counts.
+            */
+#ifdef OMPIO_DEBUG
+            if ( 0 == fh->f_rank ) {
+                printf("dtime_diff below threshold\n");
+            }
+#endif
+            break;
+        }
+        if ( dtime_abs < time_threshold ) {
+            /* The absolute improvement compared to the last number
+            ** of aggregators was below a given threshold. This is
+            ** typically important for small data volumes and smaller
+            ** process counts.
+            */
+#ifdef OMPIO_DEBUG
+            if ( 0 == fh->f_rank ) {
+                printf("dtime_abs below threshold\n");
+            }
+#endif
+            break;
+        }
+        time_prev = time;
+        dtime_prev = dtime;
+        P_a_prev = P_a;
+    }
+    num_groups = P_a_prev;
+#ifdef OMPIO_DEBUG
+    printf(" For P=%d d_p=%ld b_c=%d threshold=%f chosen P_a = %d \n",
+           fh->f_size, fh->f_view_size, fh->f_bytes_per_agg, dtime_threshold, P_a_prev);
+#endif
+
+    /* Cap the maximum number of aggregators. */
+    if ( num_groups > (fh->f_size/mca_io_ompio_max_aggregators_ratio)) {
+        num_groups = (fh->f_size/mca_io_ompio_max_aggregators_ratio);
+    }
+    if ( 1 >= num_groups ) {
+        num_groups = 1;
+    }
+    group_size = fh->f_size / num_groups;
 
-    for ( k=0, p=0; p<*num_groups; p++ ) {
-        if ( p == (*num_groups - 1) ) {
+    for ( k=0, p=0; p<num_groups; p++ ) {
+        if ( p == (num_groups - 1) ) {
             contg_groups[p].procs_per_contg_group = fh->f_size - total_procs;
         }
         else {
@@ -93,6 +200,8 @@ int mca_io_ompio_simple_grouping(mca_io_ompio_file_t *fh,
             k++;
         }
     }
+
+    *num_groups_out = num_groups;
     return OMPI_SUCCESS;
 }
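To make the stopping logic above concrete, here is a compilable toy version of the same diminishing-returns search. The cost function is a made-up stand-in for cost_calc() (1/x improvement plus a linear penalty per aggregator), not the model from the paper:

#include <stdio.h>

/* Toy stand-in for cost_calc(): predicted time falls roughly like 1/P_a
 * but pays a linear coordination penalty per extra aggregator. */
static double toy_cost(int p_a)
{
    return 1.0 / p_a + 0.001 * p_a;
}

int main(void)
{
    const int    nprocs = 256, incr = 16;  /* incr as chosen for f_size < 4096 */
    const double cutoff = 0.03;            /* cutoff_threshold 3 => 3/100      */
    const double time_threshold = 0.001;   /* absolute-improvement floor       */
    double time_prev = toy_cost(1);
    double dtime_prev = 0.0;
    int    p_a, chosen = 1;

    for (p_a = incr; p_a <= nprocs; p_a += incr) {
        double time       = toy_cost(p_a);
        double dtime_abs  = time_prev - time;        /* absolute gain */
        double dtime      = dtime_abs / time_prev;   /* relative gain */
        double dtime_diff = (p_a == incr) ? dtime : (dtime_prev - dtime);

        /* Stop once the gains flatten out, as in the loop in the diff. */
        if (dtime_diff < cutoff || dtime_abs < time_threshold) {
            break;
        }
        time_prev  = time;
        dtime_prev = dtime;
        chosen     = p_a;
    }
    printf("chosen number of aggregators: %d\n", chosen);  /* prints 32 */
    return 0;
}

With these toy numbers the search settles at 32 aggregators, close to the analytic optimum of the stand-in cost function; the real code does the same walk over cost_calc().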
@@ -413,6 +522,9 @@ int mca_io_ompio_set_aggregator_props (struct mca_io_ompio_file_t *fh,
         /* Forced number of aggregators
         ** calculate the offset at which each group of processes will start
         */
+        if ( num_aggregators > fh->f_size ) {
+            num_aggregators = fh->f_size;
+        }
         procs_per_group = ceil ((float)fh->f_size/num_aggregators);
 
         /* calculate the number of processes in the local group */
@@ -1295,4 +1407,77 @@ int mca_io_ompio_prepare_to_group(mca_io_ompio_file_t *fh,
 
     return ret;
 }
-
+/*
+** This is the actual formula of the cost function from the paper.
+** One change made here is to use floating point values for
+** all parameters, since the ceil() function sometimes leads to
+** unexpected jumps in the predicted execution time. Using float leads
+** to more consistent predictions for the no. of aggregators.
+*/
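As a usage illustration for the model, a hypothetical driver that prints the predicted times cost_calc() feeds into the search. It assumes the snippet is compiled in the same file as the static cost_calc() above; all the input values are made up:

/* Print predicted times for a range of aggregator counts:
 * 64 processes, 1 MiB per process, a 32 MiB collective buffer, 1-D. */
static void dump_model_predictions(void)
{
    const int    P   = 64;
    const size_t d_p = 1048576;
    const size_t b_c = 33554432;
    int p_a;

    for (p_a = 1; p_a <= P; p_a *= 2) {
        printf("P_a = %2d  predicted time = %g s\n",
               p_a, cost_calc(P, p_a, d_p, b_c, DIM1));
    }
}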
+static double cost_calc (int P, int P_a, size_t d_p, size_t b_c, int dim )
+{
+    float n_as=1.0, m_s=1.0, n_s=1.0;
+    float n_ar=1.0;
+    double t_send, t_recv, t_tot;
+
+    /* LogGP parameters based on DDR InfiniBand values */
+    double L=.00000184;
+    double o=.00000149;
+    double g=.0000119;
+    double G=.00000000067;
+
+    long file_domain = (P * d_p) / P_a;
+    float n_r = (float)file_domain/(float) b_c;
+
+    switch (dim) {
+        case DIM1:
+        {
+            if( d_p > b_c ){
+                //printf("case 1\n");
+                n_ar = 1;
+                n_as = 1;
+                m_s  = b_c;
+                n_s  = (float)d_p/(float)b_c;
+            }
+            else {
+                n_ar = (float)b_c/(float)d_p;
+                n_as = 1;
+                m_s  = d_p;
+                n_s  = 1;
+            }
+            break;
+        }
+        case DIM2:
+        {
+            int P_x, P_y, c;
+
+            P_x = P_y = (int) sqrt(P);
+            c = (float) P_a / (float)P_x;
+
+            n_ar = (float) P_y;
+            n_as = (float) c;
+            if ( d_p > (P_a*b_c/P )) {
+                m_s = fmin(b_c / P_y, d_p);
+            }
+            else {
+                m_s = fmin(d_p * P_x / P_a, d_p);
+            }
+            break;
+        }
+        default :
+            printf("stop putting random values\n");
+            break;
+    }
+
+    n_s = (float) d_p / (float)(n_as * m_s);
+
+    if( m_s < 33554432) {
+        g = .00000108;
+    }
+    t_send = n_s * (L + 2 * o + (n_as -1) * g + (m_s - 1) * n_as * G);
+    t_recv = n_r * (L + 2 * o + (n_ar -1) * g + (m_s - 1) * n_ar * G);
+    t_tot  = t_send + t_recv;
+
+    return t_tot;
+}
diff --git a/ompi/mca/io/ompio/io_ompio_component.c b/ompi/mca/io/ompio/io_ompio_component.c
index e0b89ab0088..5a93a5f3548 100644
--- a/ompi/mca/io/ompio/io_ompio_component.c
+++ b/ompi/mca/io/ompio/io_ompio_component.c
@@ -39,6 +39,8 @@ int mca_io_ompio_num_aggregators = -1;
 int mca_io_ompio_record_offset_info = 0;
 int mca_io_ompio_coll_timing_info = 0;
 int mca_io_ompio_sharedfp_lazy_open = 0;
+int mca_io_ompio_max_aggregators_ratio=8;
+int mca_io_ompio_aggregators_cutoff_threshold=3;
 
 int mca_io_ompio_grouping_option=5;
 
@@ -216,6 +218,31 @@ static int register_component(void)
                                            MCA_BASE_VAR_SCOPE_READONLY,
                                            &mca_io_ompio_grouping_option);
 
+    mca_io_ompio_max_aggregators_ratio = 8;
+    (void) mca_base_component_var_register(&mca_io_ompio_component.io_version,
+                                           "max_aggregators_ratio",
+                                           "Maximum number of processes that can act as aggregators, "
+                                           "expressed as a ratio of the number of processes used to "
+                                           "open the file, i.e. at most 1 out of n processes can be an "
+                                           "aggregator, with n specified by this MCA parameter.",
+                                           MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
+                                           OPAL_INFO_LVL_9,
+                                           MCA_BASE_VAR_SCOPE_READONLY,
+                                           &mca_io_ompio_max_aggregators_ratio);
+
+
+    mca_io_ompio_aggregators_cutoff_threshold = 3;
+    (void) mca_base_component_var_register(&mca_io_ompio_component.io_version,
+                                           "aggregators_cutoff_threshold",
+                                           "Relative cutoff threshold for incrementing the number of "
+                                           "aggregators in the simple aggregator selection algorithm (5). "
+                                           "Lower values for this parameter lead to a higher no. of aggregators.",
+                                           MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
+                                           OPAL_INFO_LVL_9,
+                                           MCA_BASE_VAR_SCOPE_READONLY,
+                                           &mca_io_ompio_aggregators_cutoff_threshold);
+
+
     return OMPI_SUCCESS;
 }
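Both new parameters are registered as regular MCA variables, so they can be set at run time without recompiling, for example (the application name is a placeholder):

mpirun --mca io_ompio_max_aggregators_ratio 4 \
       --mca io_ompio_aggregators_cutoff_threshold 1 \
       -np 64 ./my_io_benchmark

Per the comment block in io_ompio_aggregators.c, lower cutoff-threshold values keep adding aggregators longer (suited to parallel file systems such as PVFS2 or GPFS), while a larger max_aggregators_ratio lowers the cap on the number of aggregators (suited to UNIX or NFS file systems).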