diff --git a/ompi/mca/coll/xhc/coll_xhc.c b/ompi/mca/coll/xhc/coll_xhc.c
index 5d508e7b82f..0e4afb9400a 100644
--- a/ompi/mca/coll/xhc/coll_xhc.c
+++ b/ompi/mca/coll/xhc/coll_xhc.c
@@ -26,12 +26,12 @@
 
 #include "coll_xhc.h"
 
-static int xhc_make_comms(ompi_communicator_t *ompi_comm,
+static int xhc_comms_make(ompi_communicator_t *ompi_comm,
     xhc_peer_info_t *peer_info, xhc_comm_t **comms_dst,
     int *comm_count_dst, xhc_loc_t *hierarchy, int hierarchy_len);
 
-static void xhc_destroy_comms(xhc_comm_t *comms, int comm_count);
+static void xhc_comms_destroy(xhc_comm_t *comms, int comm_count);
 
-static void xhc_print_info(xhc_module_t *module,
+static int xhc_print_info(xhc_module_t *module,
     ompi_communicator_t *comm, xhc_data_t *data);
 
 static void *xhc_shmem_create(opal_shmem_ds_t *seg_ds, size_t size,
@@ -39,7 +39,9 @@ static void *xhc_shmem_create(opal_shmem_ds_t *seg_ds, size_t size,
 static void *xhc_shmem_attach(opal_shmem_ds_t *seg_ds);
 static mca_smsc_endpoint_t *xhc_smsc_ep(xhc_peer_info_t *peer_info);
 
-int xhc_lazy_init(xhc_module_t *module, ompi_communicator_t *comm) {
+// ------------------------------------------------
+
+int mca_coll_xhc_lazy_init(xhc_module_t *module, ompi_communicator_t *comm) {
 
     int comm_size = ompi_comm_size(comm);
     int rank = ompi_comm_rank(comm);
@@ -118,7 +120,7 @@ int xhc_lazy_init(xhc_module_t *module, ompi_communicator_t *comm) {
     /* An XHC communicator is created for each level of the hierarchy.
      * The hierarchy must be in an order of most-specific to most-general. */
 
-    ret = xhc_make_comms(comm, peer_info, &data->comms, &data->comm_count,
+    ret = xhc_comms_make(comm, peer_info, &data->comms, &data->comm_count,
         module->hierarchy, module->hierarchy_len);
     if(ret != OMPI_SUCCESS) {
         RETURN_WITH_ERROR(return_code, ret, end);
     }
@@ -144,7 +146,10 @@ int xhc_lazy_init(xhc_module_t *module, ompi_communicator_t *comm) {
     // ----
 
     if(mca_coll_xhc_component.print_info) {
-        xhc_print_info(module, comm, data);
+        ret = xhc_print_info(module, comm, data);
+        if(ret != OMPI_SUCCESS) {
+            RETURN_WITH_ERROR(return_code, ret, end);
+        }
     }
 
     // ----
@@ -162,18 +167,18 @@ int xhc_lazy_init(xhc_module_t *module, ompi_communicator_t *comm) {
         opal_show_help("help-coll-xhc.txt", "xhc-init-failed", true,
             return_code, errno, strerror(errno));
 
-        xhc_deinit(module);
+        xhc_fini(module);
     }
 
     return return_code;
 }
 
-void xhc_deinit(mca_coll_xhc_module_t *module) {
+void mca_coll_xhc_fini(mca_coll_xhc_module_t *module) {
     if(module->data) {
         xhc_data_t *data = module->data;
 
         if(data->comm_count >= 0) {
-            xhc_destroy_comms(data->comms, data->comm_count);
+            xhc_comms_destroy(data->comms, data->comm_count);
         }
 
         free(data->comms);
@@ -198,7 +203,20 @@ void xhc_deinit(mca_coll_xhc_module_t *module) {
     }
 }
 
-static int xhc_make_comms(ompi_communicator_t *ompi_comm,
+// ------------------------------------------------
+
+/* This method is where the hierarchy of XHC is constructed; it receives
+ * the hierarchy specifications (hierarchy param) and groups ranks together
+ * according to them. The process begins with the first locality in the list.
+ * All ranks that share this locality (determined via the relative
+ * peer-to-peer distances) become siblings. The one among them with the
+ * lowest rank number becomes the manager/leader of the group. The members
+ * need not keep track of the actual ranks of their siblings -- only the
+ * rank of the group's leader/manager, the size of the group, and their own
+ * member ID.
+ *
+ * The process continues with the next locality, except that now only the
+ * ranks that became leaders in the previous level are eligible (determined
+ * via comm_candidate; see the inline comments). */
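The grouping rule described in the comment above can be seen in miniature in the following self-contained sketch. It is illustrative only and not part of the patch; the two-level locality table and the rank count are made up:

    /* Toy model of the level-by-level grouping: each rank has one locality
     * ID per level; ranks sharing an ID become siblings, the lowest-ranked
     * sibling becomes the leader, and only leaders advance to the next level. */
    #include <stdio.h>

    #define NRANKS 8
    #define NLEVELS 2

    int main(void) {
        int loc[NLEVELS][NRANKS] = {
            {0, 0, 1, 1, 2, 2, 3, 3},   /* level 0, e.g. NUMA node */
            {0, 0, 0, 0, 1, 1, 1, 1},   /* level 1, e.g. socket    */
        };
        int candidate[NRANKS];

        for(int r = 0; r < NRANKS; r++) candidate[r] = 1;

        for(int lv = 0; lv < NLEVELS; lv++) {
            for(int r = 0; r < NRANKS; r++) {
                if(!candidate[r]) continue;  /* only previous-level leaders */

                /* the leader is the lowest candidate rank sharing the locality */
                int leader = r;
                for(int p = 0; p < NRANKS; p++) {
                    if(candidate[p] && loc[lv][p] == loc[lv][r] && p < leader)
                        leader = p;
                }

                printf("level %d: rank %d -> leader %d\n", lv, r, leader);
                if(leader != r) candidate[r] = 0;  /* non-leaders drop out */
            }
        }

        return 0;
    }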
+static int xhc_comms_make(ompi_communicator_t *ompi_comm,
     xhc_peer_info_t *peer_info, xhc_comm_t **comms_dst,
     int *comm_count_dst, xhc_loc_t *hierarchy, int hierarchy_len) {
@@ -257,6 +275,10 @@ static int xhc_make_comms(ompi_communicator_t *ompi_comm,
 
         // ----
 
+        /* Only ranks that were leaders in the previous level are candidates
+         * for this one. Every rank advertises, via an Allgather, whether the
+         * others may consider it for inclusion. */
+
         bool is_candidate = (comm_count == 0
             || comms[comm_count - 1].manager_rank == ompi_rank);
@@ -305,14 +327,16 @@ static int xhc_make_comms(ompi_communicator_t *ompi_comm,
 
         assert(xc->size > 0);
 
+        /* If there are no local peers with regard to this locality, no
+         * XHC comm is created for this process on this level. */
         if(xc->size == 1) {
             opal_output_verbose(MCA_BASE_VERBOSE_WARN,
                 ompi_coll_base_framework.framework_output,
                 "coll:xhc: Warning: Locality 0x%04x does not result "
                 "in any new groupings; skipping it", xc->locality);
 
-            /* Must participate in this allgather, even if useless
-             * to this rank, since it's necessary for the rest */
+            /* All ranks must participate in the "control struct sharing"
+             * allgather, even if it's useless to some of them */
             ret = ompi_comm->c_coll->coll_allgather(&xc->ctrl_ds,
                 sizeof(opal_shmem_ds_t), MPI_BYTE, comm_ctrl_ds,
@@ -322,7 +346,7 @@ static int xhc_make_comms(ompi_communicator_t *ompi_comm,
                 RETURN_WITH_ERROR(return_code, ret, comm_error);
             }
 
-            xhc_destroy_comms(xc, 1);
+            xhc_comms_destroy(xc, 1);
             continue;
         }
@@ -372,6 +396,12 @@ static int xhc_make_comms(ompi_communicator_t *ompi_comm,
                 + sizeof(xhc_comm_ctrl_t) + smsc_reg_size);
         }
 
+        /* The comm's managers share the details of the communication structs
+         * with their children, so that the children may attach to them.
+         * Because no MPI communicator is formed that includes (only) the
+         * members of the XHC comm, the sharing is achieved with a single
+         * Allgather over the original communicator, instead of a Broadcast
+         * inside each XHC comm.
+         */
+
         ret = ompi_comm->c_coll->coll_allgather(&xc->ctrl_ds,
             sizeof(opal_shmem_ds_t), MPI_BYTE, comm_ctrl_ds,
             sizeof(opal_shmem_ds_t), MPI_BYTE, ompi_comm,
@@ -404,7 +434,7 @@ static int xhc_make_comms(ompi_communicator_t *ompi_comm,
         continue;
 
         comm_error: {
-            xhc_destroy_comms(comms, comm_count+1);
+            xhc_comms_destroy(comms, comm_count+1);
             comm_count = -1;
 
             goto end;
@@ -428,7 +458,7 @@ static int xhc_make_comms(ompi_communicator_t *ompi_comm,
 
     return return_code;
 }
 
-static void xhc_destroy_comms(xhc_comm_t *comms, int comm_count) {
+static void xhc_comms_destroy(xhc_comm_t *comms, int comm_count) {
     bool is_manager = true;
 
     for(int i = 0; i < comm_count; i++) {
@@ -458,16 +488,17 @@
     }
 }
 
-static void xhc_print_info(xhc_module_t *module,
+static int xhc_print_info(xhc_module_t *module,
     ompi_communicator_t *comm, xhc_data_t *data) {
 
     int rank = ompi_comm_rank(comm);
-
-    char *drval_str = NULL;
-    char *lb_rla_str = NULL;
-    char *un_min_str = NULL;
+    int ret;
 
     if(rank == 0) {
+        char *drval_str;
+        char *lb_rla_str;
+        char *un_min_str;
+
         switch(mca_coll_xhc_component.dynamic_reduce) {
             case OMPI_XHC_DYNAMIC_REDUCE_DISABLED:
                 drval_str = "OFF"; break;
@@ -492,8 +523,11 @@ static void xhc_print_info(xhc_module_t *module,
             lb_rla_str = "???";
         }
 
-        opal_asprintf(&un_min_str, " (min '%zu' bytes)",
+        ret = opal_asprintf(&un_min_str, " (min '%zu' bytes)",
             mca_coll_xhc_component.uniform_chunks_min);
+        if(ret < 0) {
+            return OMPI_ERR_OUT_OF_RESOURCE;
+        }
 
         printf("------------------------------------------------\n"
             "OMPI coll/xhc @ %s, priority %d\n"
@@ -508,39 +542,52 @@ static void xhc_print_info(xhc_module_t *module,
             (mca_coll_xhc_component.uniform_chunks ? "ON" : "OFF"),
             (mca_coll_xhc_component.uniform_chunks ? un_min_str : ""),
             mca_coll_xhc_component.cico_max);
+
+        free(un_min_str);
     }
 
-    // TODO convert to opal_asprintf?
     for(int i = 0; i < data->comm_count; i++) {
-        char buf[BUFSIZ] = {0};
-        size_t buf_idx = 0;
+        char *mlist = NULL;
+        char *tmp;
 
-        buf_idx += snprintf(buf+buf_idx, sizeof(buf) - buf_idx,
-            "%d", data->comms[i].manager_rank);
+        ret = opal_asprintf(&mlist, "%d", data->comms[i].manager_rank);
+        if(ret < 0) {
+            return OMPI_ERR_OUT_OF_RESOURCE;
+        }
 
-        for(int j = 1; j < data->comms[i].size; j++) {
-            if(j == data->comms[i].member_id) {
-                if(i == 0 || data->comms[i-1].manager_rank == rank) {
-                    buf_idx += snprintf(buf+buf_idx,
-                        sizeof(buf) - buf_idx, " %d", rank);
-                } else {
-                    buf_idx += snprintf(buf+buf_idx,
-                        sizeof(buf) - buf_idx, " _");
-                }
-            } else {
-                buf_idx += snprintf(buf+buf_idx,
-                    sizeof(buf) - buf_idx, " x");
-            }
-        }
+        for(int m = 1; m < data->comms[i].size; m++) {
+            if(m == data->comms[i].member_id) {
+                if(i == 0 || data->comms[i-1].manager_rank == rank) {
+                    ret = opal_asprintf(&tmp, "%s %d", mlist, rank);
+                } else {
+                    ret = opal_asprintf(&tmp, "%s _", mlist);
+                }
+            } else {
+                ret = opal_asprintf(&tmp, "%s x", mlist);
+            }
+
+            free(mlist);
+            mlist = tmp;
+
+            if(ret < 0) {
+                return OMPI_ERR_OUT_OF_RESOURCE;
+            }
+        }
 
         printf("XHC comm loc=0x%08x chunk_size=%zu with %d members [%s]\n",
             data->comms[i].locality, data->comms[i].chunk_size,
-            data->comms[i].size, buf);
+            data->comms[i].size, mlist);
+
+        free(mlist);
     }
 
-    free(un_min_str);
+    return OMPI_SUCCESS;
 }
 
+// ------------------------------------------------
+
 static void *xhc_shmem_create(opal_shmem_ds_t *seg_ds, size_t size,
     ompi_communicator_t *ompi_comm, const char *name_chr_s, int name_chr_i) {
@@ -594,18 +641,6 @@ static void *xhc_shmem_attach(opal_shmem_ds_t *seg_ds) {
     return addr;
 }
 
-void *xhc_get_cico(xhc_peer_info_t *peer_info, int rank) {
-    if(OMPI_XHC_CICO_MAX == 0) {
-        return NULL;
-    }
-
-    if(peer_info[rank].cico_buffer == NULL) {
-        peer_info[rank].cico_buffer = xhc_shmem_attach(&peer_info[rank].cico_ds);
-    }
-
-    return peer_info[rank].cico_buffer;
-}
-
 static mca_smsc_endpoint_t *xhc_smsc_ep(xhc_peer_info_t *peer_info) {
     if(!peer_info->smsc_ep) {
         peer_info->smsc_ep = MCA_SMSC_CALL(get_endpoint, &peer_info->proc->super);
@@ -622,7 +657,21 @@ static mca_smsc_endpoint_t *xhc_smsc_ep(xhc_peer_info_t *peer_info) {
     return peer_info->smsc_ep;
 }
 
-int xhc_copy_expose_region(void *base, size_t len, xhc_copy_data_t **region_data) {
+// ------------------------------------------------
+
+void *mca_coll_xhc_get_cico(xhc_peer_info_t *peer_info, int rank) {
+    if(OMPI_XHC_CICO_MAX == 0) {
+        return NULL;
+    }
+
+    if(peer_info[rank].cico_buffer == NULL) {
+        peer_info[rank].cico_buffer = xhc_shmem_attach(&peer_info[rank].cico_ds);
+    }
+
+    return peer_info[rank].cico_buffer;
+}
+
+int mca_coll_xhc_copy_expose_region(void *base, size_t len, xhc_copy_data_t **region_data) {
     if(mca_smsc_base_has_feature(MCA_SMSC_FEATURE_REQUIRE_REGISTATION)) {
         void *data = MCA_SMSC_CALL(register_region, base, len);
@@ -640,11 +689,11 @@ int xhc_copy_expose_region(void *base, size_t len, xhc_copy_data_t **region_data
 
     return 0;
 }
 
-void xhc_copy_region_post(void *dst, xhc_copy_data_t *region_data) {
+void mca_coll_xhc_copy_region_post(void *dst, xhc_copy_data_t *region_data) {
     memcpy(dst, region_data, mca_smsc_base_registration_data_size());
 }
 
-int xhc_copy_from(xhc_peer_info_t *peer_info,
+int mca_coll_xhc_copy_from(xhc_peer_info_t *peer_info,
     void *dst, void *src, size_t size, void *access_token) {
 
     mca_smsc_endpoint_t *smsc_ep = xhc_smsc_ep(peer_info);
@@ -659,12 +708,12 @@ int xhc_copy_from(xhc_peer_info_t *peer_info,
 
     return (status == OPAL_SUCCESS ? 0 : -1);
 }
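For reviewers tracing the relocated copy helpers above: a rough sketch of how they are meant to be sequenced, inferred purely from the signatures and bodies in this patch, not taken from it. It does not compile on its own; owner_rank, ctrl_slot, src_buf, dst_buf and len are placeholders:

    /* Owner side: expose the source buffer; if the smsc component requires
     * registration, publish the registration data where peers can see it
     * (e.g. a slot in the shared control segment). */
    xhc_copy_data_t *region = NULL;

    if(xhc_copy_expose_region(src_buf, len, &region) != 0) {
        /* registration failed; e.g. fall back to copy-in/copy-out
         * through the buffer returned by xhc_get_cico() */
    }
    if(region != NULL) {
        xhc_copy_region_post(ctrl_slot, region);
    }

    /* Peer side: single-copy directly out of the owner's buffer, passing
     * the posted registration data as the access token. */
    xhc_copy_from(&peer_info[owner_rank], dst_buf, src_buf, len, ctrl_slot);

    /* Owner side, once the collective is complete: */
    if(region != NULL) {
        xhc_copy_close_region(region);
    }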
 
-void xhc_copy_close_region(xhc_copy_data_t *region_data) {
+void mca_coll_xhc_copy_close_region(xhc_copy_data_t *region_data) {
     if(mca_smsc_base_has_feature(MCA_SMSC_FEATURE_REQUIRE_REGISTATION))
         MCA_SMSC_CALL(deregister_region, region_data);
 }
 
-void *xhc_get_registration(xhc_peer_info_t *peer_info,
+void *mca_coll_xhc_get_registration(xhc_peer_info_t *peer_info,
     void *peer_vaddr, size_t size, xhc_reg_t **reg) {
 
     mca_smsc_endpoint_t *smsc_ep = xhc_smsc_ep(peer_info);
@@ -695,6 +744,6 @@ void *xhc_get_registration(xhc_peer_info_t *peer_info,
 
 /* Won't actually unmap/detach, since we've set
  * the "persist" flag while creating the mapping */
-void xhc_return_registration(xhc_reg_t *reg) {
+void mca_coll_xhc_return_registration(xhc_reg_t *reg) {
     MCA_SMSC_CALL(unmap_peer_region, reg);
 }
diff --git a/ompi/mca/coll/xhc/coll_xhc.h b/ompi/mca/coll/xhc/coll_xhc.h
index 3d05e7cdade..efb81205d60 100644
--- a/ompi/mca/coll/xhc/coll_xhc.h
+++ b/ompi/mca/coll/xhc/coll_xhc.h
@@ -370,17 +370,23 @@ struct xhc_loc_def_t {
 
 // coll_xhc_component.c
 // --------------------
 
+#define xhc_component_parse_hierarchy(...) mca_coll_xhc_component_parse_hierarchy(__VA_ARGS__)
+#define xhc_component_parse_chunk_sizes(...) mca_coll_xhc_component_parse_chunk_sizes(__VA_ARGS__)
+
 int mca_coll_xhc_component_init_query(bool enable_progress_threads,
     bool enable_mpi_threads);
 
-int xhc_component_parse_hierarchy(const char *val_str,
+int mca_coll_xhc_component_parse_hierarchy(const char *val_str,
     opal_list_t **level_defs_dst, int *nlevel_defs_dst);
-int xhc_component_parse_chunk_sizes(const char *val_str,
+int mca_coll_xhc_component_parse_chunk_sizes(const char *val_str,
     size_t **vals_dst, int *len_dst);
 
 // coll_xhc_module.c
 // -----------------
 
+#define xhc_module_set_coll_fns(...) mca_coll_xhc_module_set_coll_fns(__VA_ARGS__)
+#define xhc_module_prepare_hierarchy(...) mca_coll_xhc_module_prepare_hierarchy(__VA_ARGS__)
+
 mca_coll_base_module_t *mca_coll_xhc_module_comm_query(
     ompi_communicator_t *comm, int *priority);
 
@@ -389,29 +395,42 @@ int mca_coll_xhc_module_enable(mca_coll_base_module_t *module,
 int mca_coll_xhc_module_disable(mca_coll_base_module_t *module,
     ompi_communicator_t *comm);
 
-xhc_coll_fns_t xhc_module_set_coll_fns(ompi_communicator_t *comm,
+xhc_coll_fns_t mca_coll_xhc_module_set_coll_fns(ompi_communicator_t *comm,
     xhc_coll_fns_t new);
 
-int xhc_module_prepare_hierarchy(mca_coll_xhc_module_t *module,
+int mca_coll_xhc_module_prepare_hierarchy(mca_coll_xhc_module_t *module,
     ompi_communicator_t *comm);
 
 // coll_xhc.c
 // ----------
 
-int xhc_lazy_init(mca_coll_xhc_module_t *module, ompi_communicator_t *comm);
-void xhc_deinit(mca_coll_xhc_module_t *module);
+#define xhc_lazy_init(...) mca_coll_xhc_lazy_init(__VA_ARGS__)
+#define xhc_fini(...) mca_coll_xhc_fini(__VA_ARGS__)
+
+#define xhc_get_cico(...) mca_coll_xhc_get_cico(__VA_ARGS__)
+
+#define xhc_copy_expose_region(...) mca_coll_xhc_copy_expose_region(__VA_ARGS__)
+#define xhc_copy_region_post(...) mca_coll_xhc_copy_region_post(__VA_ARGS__)
+#define xhc_copy_from(...) mca_coll_xhc_copy_from(__VA_ARGS__)
+#define xhc_copy_close_region(...) mca_coll_xhc_copy_close_region(__VA_ARGS__)
 
-void *xhc_get_cico(xhc_peer_info_t *peer_info, int rank);
+#define xhc_get_registration(...) mca_coll_xhc_get_registration(__VA_ARGS__)
+#define xhc_return_registration(...) mca_coll_xhc_return_registration(__VA_ARGS__)
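The #define block above is the heart of the rename: exported symbols carry the full mca_coll_xhc_ prefix, as the MCA symbol-naming convention requires, while variadic macros keep the short names usable at internal call sites. A self-contained demonstration of the pattern, with example_op as a made-up placeholder:

    #include <stdio.h>

    /* public symbol, fully prefixed */
    int mca_coll_xhc_example_op(int x) {
        return x + 1;
    }

    /* short internal alias, expanding to the prefixed symbol */
    #define xhc_example_op(...) mca_coll_xhc_example_op(__VA_ARGS__)

    int main(void) {
        printf("%d\n", xhc_example_op(41));  /* prints 42 */
        return 0;
    }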
 
-int xhc_copy_expose_region(void *base, size_t len, xhc_copy_data_t **region_data);
-void xhc_copy_region_post(void *dst, xhc_copy_data_t *region_data);
-int xhc_copy_from(xhc_peer_info_t *peer_info, void *dst,
+int mca_coll_xhc_lazy_init(mca_coll_xhc_module_t *module, ompi_communicator_t *comm);
+void mca_coll_xhc_fini(mca_coll_xhc_module_t *module);
+
+void *mca_coll_xhc_get_cico(xhc_peer_info_t *peer_info, int rank);
+
+int mca_coll_xhc_copy_expose_region(void *base, size_t len, xhc_copy_data_t **region_data);
+void mca_coll_xhc_copy_region_post(void *dst, xhc_copy_data_t *region_data);
+int mca_coll_xhc_copy_from(xhc_peer_info_t *peer_info, void *dst,
     void *src, size_t size, void *access_token);
-void xhc_copy_close_region(xhc_copy_data_t *region_data);
+void mca_coll_xhc_copy_close_region(xhc_copy_data_t *region_data);
 
-void *xhc_get_registration(xhc_peer_info_t *peer_info,
+void *mca_coll_xhc_get_registration(xhc_peer_info_t *peer_info,
     void *peer_vaddr, size_t size, xhc_reg_t **reg);
-void xhc_return_registration(xhc_reg_t *reg);
+void mca_coll_xhc_return_registration(xhc_reg_t *reg);
 
 // Primitives (respective file)
 // ----------------------------
@@ -433,6 +452,8 @@ int mca_coll_xhc_allreduce(const void *sbuf, void *rbuf,
 
 // Miscellaneous
 // -------------
 
+#define xhc_allreduce_internal(...) mca_coll_xhc_allreduce_internal(__VA_ARGS__)
+
 int mca_coll_xhc_allreduce_internal(const void *sbuf, void *rbuf, int count,
     ompi_datatype_t *datatype, ompi_op_t *op, ompi_communicator_t *ompi_comm,
     mca_coll_base_module_t *module, bool require_bcast);
diff --git a/ompi/mca/coll/xhc/coll_xhc_allreduce.c b/ompi/mca/coll/xhc/coll_xhc_allreduce.c
index fe741586d65..0d2e8dc7cef 100644
--- a/ompi/mca/coll/xhc/coll_xhc_allreduce.c
+++ b/ompi/mca/coll/xhc/coll_xhc_allreduce.c
@@ -1097,6 +1097,6 @@ int mca_coll_xhc_allreduce(const void *sbuf, void *rbuf,
     int count, ompi_datatype_t *datatype, ompi_op_t *op,
     ompi_communicator_t *ompi_comm, mca_coll_base_module_t *ompi_module) {
 
-    return mca_coll_xhc_allreduce_internal(sbuf, rbuf,
+    return xhc_allreduce_internal(sbuf, rbuf,
         count, datatype, op, ompi_comm, ompi_module, true);
 }
diff --git a/ompi/mca/coll/xhc/coll_xhc_component.c b/ompi/mca/coll/xhc/coll_xhc_component.c
index bc5a83d0528..6b3895c2f85 100644
--- a/ompi/mca/coll/xhc/coll_xhc_component.c
+++ b/ompi/mca/coll/xhc/coll_xhc_component.c
@@ -517,7 +517,7 @@ static void destruct_xhc_loc_def_combination(void *data) {
     OPAL_LIST_DESTRUCT((opal_list_t *) data);
 }
 
-int xhc_component_parse_hierarchy(const char *val_str,
+int mca_coll_xhc_component_parse_hierarchy(const char *val_str,
     opal_list_t **level_defs_dst, int *nlevel_defs_dst) {
 
     /* The hierarchy is in a comma-separated list format. Each item in the
@@ -645,7 +645,7 @@ static int conv_chunk_size(char *str, void *result) {
 
     return (legal ? OMPI_SUCCESS : OMPI_ERR_BAD_PARAM);
 }
 
-int xhc_component_parse_chunk_sizes(const char *val_str,
+int mca_coll_xhc_component_parse_chunk_sizes(const char *val_str,
     size_t **chunks_dst, int *len_dst) {
 
     if(val_str == NULL) {
diff --git a/ompi/mca/coll/xhc/coll_xhc_module.c b/ompi/mca/coll/xhc/coll_xhc_module.c
index 39f6d4e73cc..8f05c07d5a9 100644
--- a/ompi/mca/coll/xhc/coll_xhc_module.c
+++ b/ompi/mca/coll/xhc/coll_xhc_module.c
@@ -57,7 +57,7 @@ static void mca_coll_xhc_module_construct(mca_coll_xhc_module_t *module) {
 }
 
 static void mca_coll_xhc_module_destruct(mca_coll_xhc_module_t *module) {
-    xhc_deinit(module);
+    xhc_fini(module);
 
     free(module->hierarchy_string);
     free(module->hierarchy);
diff --git a/ompi/mca/coll/xhc/coll_xhc_reduce.c b/ompi/mca/coll/xhc/coll_xhc_reduce.c
index 19b3f9b0f6c..f05c1e0cfde 100644
--- a/ompi/mca/coll/xhc/coll_xhc_reduce.c
+++ b/ompi/mca/coll/xhc/coll_xhc_reduce.c
@@ -54,7 +54,7 @@ int mca_coll_xhc_reduce(const void *sbuf, void *rbuf,
      * but an rbuf is available for all ranks. */
 
     if(root == -1 || (mca_coll_xhc_component.force_reduce && root == 0)) {
-        return mca_coll_xhc_allreduce_internal(sbuf, rbuf, count,
+        return xhc_allreduce_internal(sbuf, rbuf, count,
             datatype, op, ompi_comm, ompi_module, false);
     } else {
         xhc_coll_fns_t fallback = module->prev_colls;
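A closing note on the snprintf-to-opal_asprintf conversion in the xhc_print_info hunk of coll_xhc.c above: opal_asprintf(), like asprintf(), allocates the output buffer, so a list is grown by formatting the old string plus one new element into a fresh buffer and swapping the two. A minimal standalone sketch of that pattern (plain asprintf() is used so the example builds without OPAL; the values are made up):

    #define _GNU_SOURCE  /* for asprintf() on glibc */
    #include <stdio.h>
    #include <stdlib.h>

    int main(void) {
        char *list, *tmp;

        if(asprintf(&list, "%d", 0) < 0)
            return 1;

        for(int m = 1; m < 5; m++) {
            /* format "<old list> <m>" into a new buffer, then swap */
            if(asprintf(&tmp, "%s %d", list, m) < 0) {
                free(list);
                return 1;
            }

            free(list);
            list = tmp;
        }

        printf("[%s]\n", list);  /* prints [0 1 2 3 4] */
        free(list);

        return 0;
    }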