TMP: Address first comments
Signed-off-by: George Katevenis <gkatev@ics.forth.gr>
gkatev committed Feb 22, 2023
1 parent e5e7d2d commit ce5ce73
Showing 6 changed files with 142 additions and 72 deletions.
157 changes: 103 additions & 54 deletions ompi/mca/coll/xhc/coll_xhc.c
@@ -26,20 +26,22 @@

#include "coll_xhc.h"

static int xhc_make_comms(ompi_communicator_t *ompi_comm,
static int xhc_comms_make(ompi_communicator_t *ompi_comm,
xhc_peer_info_t *peer_info, xhc_comm_t **comms_dst,
int *comm_count_dst, xhc_loc_t *hierarchy, int hierarchy_len);
static void xhc_destroy_comms(xhc_comm_t *comms, int comm_count);
static void xhc_comms_destroy(xhc_comm_t *comms, int comm_count);

static void xhc_print_info(xhc_module_t *module,
static int xhc_print_info(xhc_module_t *module,
ompi_communicator_t *comm, xhc_data_t *data);

static void *xhc_shmem_create(opal_shmem_ds_t *seg_ds, size_t size,
ompi_communicator_t *ompi_comm, const char *name_chr_s, int name_chr_i);
static void *xhc_shmem_attach(opal_shmem_ds_t *seg_ds);
static mca_smsc_endpoint_t *xhc_smsc_ep(xhc_peer_info_t *peer_info);

int xhc_lazy_init(xhc_module_t *module, ompi_communicator_t *comm) {
// ------------------------------------------------

int mca_coll_xhc_lazy_init(xhc_module_t *module, ompi_communicator_t *comm) {

int comm_size = ompi_comm_size(comm);
int rank = ompi_comm_rank(comm);
@@ -118,7 +120,7 @@ int xhc_lazy_init(xhc_module_t *module, ompi_communicator_t *comm) {
/* An XHC communicator is created for each level of the hierarchy.
* The hierarchy must be ordered from most-specific to most-general. */

ret = xhc_make_comms(comm, peer_info, &data->comms, &data->comm_count,
ret = xhc_comms_make(comm, peer_info, &data->comms, &data->comm_count,
module->hierarchy, module->hierarchy_len);
if(ret != OMPI_SUCCESS) {
RETURN_WITH_ERROR(return_code, ret, end);
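
/* Illustrative sketch (not from this commit): a hierarchy specification
 * ordered from most-specific to most-general, like the one the module's
 * hierarchy param describes. The OPAL locality flags used here are an
 * assumption for illustration. */
xhc_loc_t example_hierarchy[] = {
    OPAL_PROC_ON_NUMA,    /* level 0: ranks sharing a NUMA node */
    OPAL_PROC_ON_SOCKET,  /* level 1: NUMA leaders sharing a socket */
    OPAL_PROC_ON_NODE     /* level 2: socket leaders on the same node */
};

ret = xhc_comms_make(comm, peer_info, &data->comms,
    &data->comm_count, example_hierarchy, 3);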
@@ -144,7 +146,10 @@ int xhc_lazy_init(xhc_module_t *module, ompi_communicator_t *comm) {
// ----

if(mca_coll_xhc_component.print_info) {
xhc_print_info(module, comm, data);
ret = xhc_print_info(module, comm, data);
if(ret != OMPI_SUCCESS) {
RETURN_WITH_ERROR(return_code, ret, end);
}
}

// ----
@@ -162,18 +167,18 @@ int xhc_lazy_init(xhc_module_t *module, ompi_communicator_t *comm) {
opal_show_help("help-coll-xhc.txt", "xhc-init-failed", true,
return_code, errno, strerror(errno));

xhc_deinit(module);
xhc_fini(module);
}

return return_code;
}

void xhc_deinit(mca_coll_xhc_module_t *module) {
void mca_coll_xhc_fini(mca_coll_xhc_module_t *module) {
if(module->data) {
xhc_data_t *data = module->data;

if(data->comm_count >= 0) {
xhc_destroy_comms(data->comms, data->comm_count);
xhc_comms_destroy(data->comms, data->comm_count);
}

free(data->comms);
@@ -198,7 +203,20 @@ void xhc_deinit(mca_coll_xhc_module_t *module) {
}
}

static int xhc_make_comms(ompi_communicator_t *ompi_comm,
// ------------------------------------------------

/* This method constructs XHC's hierarchy: it receives the hierarchy
* specification (the hierarchy param) and groups ranks together according
* to it. The process begins with the first locality in the list. All
* ranks that share this locality (determined via the relative peer-to-peer
* distances) become siblings, and the one amongst them with the lowest
* rank number becomes the manager/leader of the group. The members don't
* need to keep track of the actual ranks of their siblings -- only the
* rank of the group's leader/manager, the size of the group, and their
* own member ID. The process then continues with the next locality,
* except that now only the ranks that became leaders at the previous
* level are eligible (determined via comm_candidate; see inline
* comments). */
static int xhc_comms_make(ompi_communicator_t *ompi_comm,
xhc_peer_info_t *peer_info, xhc_comm_t **comms_dst,
int *comm_count_dst, xhc_loc_t *hierarchy, int hierarchy_len) {
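
/* Illustrative sketch (not from this commit) of the grouping rule at a
 * single level; is_candidate[] and shares_locality() are placeholders
 * for the advertised candidate flags and the peer-distance check
 * described above. */
int leader = -1, size = 0, member_id = -1;

for(int r = 0; r < comm_size; r++) {
    if(!is_candidate[r] || !shares_locality(r, my_rank, level_locality)) {
        continue;
    }

    if(leader == -1) {
        leader = r;    /* the lowest-ranked sibling becomes the leader */
    }

    if(r == my_rank) {
        member_id = size;
    }

    size++;
}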

@@ -257,6 +275,10 @@ static int xhc_make_comms(ompi_communicator_t *ompi_comm,

// ----

/* Only ranks that were leaders at the previous level are candidates
* for this one. Via an Allgather, every rank advertises whether the
* others may consider it for inclusion. */

bool is_candidate = (comm_count == 0
|| comms[comm_count - 1].manager_rank == ompi_rank);
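
/* Sketch of the candidacy advertisement (the actual call is elided from
 * this hunk); the signature mirrors the ctrl_ds allgather shown further
 * below, and the comm_candidate array is assumed to be allocated with
 * one flag per rank. */
ret = ompi_comm->c_coll->coll_allgather(&is_candidate, 1, MPI_C_BOOL,
    comm_candidate, 1, MPI_C_BOOL, ompi_comm,
    ompi_comm->c_coll->coll_allgather_module);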

@@ -305,14 +327,16 @@ static int xhc_make_comms(ompi_communicator_t *ompi_comm,

assert(xc->size > 0);

/* If this process has no local peers with respect to this locality,
* no XHC comm is created for it at this level. */
if(xc->size == 1) {
opal_output_verbose(MCA_BASE_VERBOSE_WARN,
ompi_coll_base_framework.framework_output,
"coll:xhc: Warning: Locality 0x%04x does not result "
"in any new groupings; skipping it", xc->locality);

/* Must participate in this allgather, even if useless
* to this rank, since it's necessary for the rest */
/* All ranks must participate in the "control struct sharing"
* allgather, even if it is of no use to some of them */

ret = ompi_comm->c_coll->coll_allgather(&xc->ctrl_ds,
sizeof(opal_shmem_ds_t), MPI_BYTE, comm_ctrl_ds,
Expand All @@ -322,7 +346,7 @@ static int xhc_make_comms(ompi_communicator_t *ompi_comm,
RETURN_WITH_ERROR(return_code, ret, comm_error);
}

xhc_destroy_comms(xc, 1);
xhc_comms_destroy(xc, 1);
continue;
}

@@ -372,6 +396,12 @@ static int xhc_make_comms(ompi_communicator_t *ompi_comm,
+ sizeof(xhc_comm_ctrl_t) + smsc_reg_size);
}

/* The comm's managers share the details of the communication structs
* with their children, so that the latter may attach to them. Because
* no MPI communicator is formed that includes (only) the members of
* the XHC comm, the sharing is achieved with a single Allgather over
* the original communicator, instead of a Broadcast inside each XHC
* comm. */

ret = ompi_comm->c_coll->coll_allgather(&xc->ctrl_ds,
sizeof(opal_shmem_ds_t), MPI_BYTE, comm_ctrl_ds,
sizeof(opal_shmem_ds_t), MPI_BYTE, ompi_comm,
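
/* Sketch of what follows the allgather (elided from this hunk):
 * non-leader members locate their manager's segment in the gathered
 * array and attach to it; the comm_ctrl field name is an assumption
 * for illustration. */
if(xc->member_id != 0) {
    xc->comm_ctrl = xhc_shmem_attach(&comm_ctrl_ds[xc->manager_rank]);
}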
@@ -404,7 +434,7 @@ static int xhc_make_comms(ompi_communicator_t *ompi_comm,
continue;

comm_error: {
xhc_destroy_comms(comms, comm_count+1);
xhc_comms_destroy(comms, comm_count+1);
comm_count = -1;

goto end;
Expand All @@ -428,7 +458,7 @@ static int xhc_make_comms(ompi_communicator_t *ompi_comm,
return return_code;
}

static void xhc_destroy_comms(xhc_comm_t *comms, int comm_count) {
static void xhc_comms_destroy(xhc_comm_t *comms, int comm_count) {
bool is_manager = true;

for(int i = 0; i < comm_count; i++) {
@@ -458,16 +488,17 @@ static void xhc_destroy_comms(xhc_comm_t *comms, int comm_count) {
}
}

static void xhc_print_info(xhc_module_t *module,
static int xhc_print_info(xhc_module_t *module,
ompi_communicator_t *comm, xhc_data_t *data) {

int rank = ompi_comm_rank(comm);

char *drval_str = NULL;
char *lb_rla_str = NULL;
char *un_min_str = NULL;
int ret;

if(rank == 0) {
char *drval_str;
char *lb_rla_str;
char *un_min_str;

switch(mca_coll_xhc_component.dynamic_reduce) {
case OMPI_XHC_DYNAMIC_REDUCE_DISABLED:
drval_str = "OFF"; break;
@@ -492,8 +523,11 @@ static void xhc_print_info(xhc_module_t *module,
lb_rla_str = "???";
}

opal_asprintf(&un_min_str, " (min '%zu' bytes)",
ret = opal_asprintf(&un_min_str, " (min '%zu' bytes)",
mca_coll_xhc_component.uniform_chunks_min);
if(ret < 0) {
return OMPI_ERR_OUT_OF_RESOURCE;
}

printf("------------------------------------------------\n"
"OMPI coll/xhc @ %s, priority %d\n"
@@ -508,39 +542,52 @@ static void xhc_print_info(xhc_module_t *module,
(mca_coll_xhc_component.uniform_chunks ? "ON" : "OFF"),
(mca_coll_xhc_component.uniform_chunks ? un_min_str : ""),
mca_coll_xhc_component.cico_max);

free(un_min_str);
}

// TODO convert to opal_asprintf?
for(int i = 0; i < data->comm_count; i++) {
char buf[BUFSIZ] = {0};
size_t buf_idx = 0;
char *mlist = NULL;
char *tmp;

buf_idx += snprintf(buf+buf_idx, sizeof(buf) - buf_idx,
"%d", data->comms[i].manager_rank);
ret = opal_asprintf(&mlist, "%d", data->comms[i].manager_rank);
if(ret < 0) {
return OMPI_ERR_OUT_OF_RESOURCE;
}

for(int m = 1; m < data->comms[i].size; m++) {

for(int j = 1; j < data->comms[i].size; j++) {
if(j == data->comms[i].member_id) {
if(m == data->comms[i].member_id) {
if(i == 0 || data->comms[i-1].manager_rank == rank) {
buf_idx += snprintf(buf+buf_idx,
sizeof(buf) - buf_idx, " %d", rank);
ret = opal_asprintf(&tmp, "%s %d", mlist, rank);
} else {
buf_idx += snprintf(buf+buf_idx,
sizeof(buf) - buf_idx, " _");
ret = opal_asprintf(&tmp, "%s _", mlist);
}
} else {
buf_idx += snprintf(buf+buf_idx,
sizeof(buf) - buf_idx, " x");
ret = opal_asprintf(&tmp, "%s x", mlist);
}

free(mlist);
mlist = tmp;

if(ret < 0) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
}

printf("XHC comm loc=0x%08x chunk_size=%zu with %d members [%s]\n",
data->comms[i].locality, data->comms[i].chunk_size,
data->comms[i].size, buf);
data->comms[i].size, mlist);

free(mlist);
}

free(un_min_str);
return OMPI_SUCCESS;
}

// ------------------------------------------------

static void *xhc_shmem_create(opal_shmem_ds_t *seg_ds, size_t size,
ompi_communicator_t *ompi_comm, const char *name_chr_s, int name_chr_i) {

@@ -594,18 +641,6 @@ static void *xhc_shmem_attach(opal_shmem_ds_t *seg_ds) {
return addr;
}

void *xhc_get_cico(xhc_peer_info_t *peer_info, int rank) {
if(OMPI_XHC_CICO_MAX == 0) {
return NULL;
}

if(peer_info[rank].cico_buffer == NULL) {
peer_info[rank].cico_buffer = xhc_shmem_attach(&peer_info[rank].cico_ds);
}

return peer_info[rank].cico_buffer;
}

static mca_smsc_endpoint_t *xhc_smsc_ep(xhc_peer_info_t *peer_info) {
if(!peer_info->smsc_ep) {
peer_info->smsc_ep = MCA_SMSC_CALL(get_endpoint, &peer_info->proc->super);
@@ -622,7 +657,21 @@ static mca_smsc_endpoint_t *xhc_smsc_ep(xhc_peer_info_t *peer_info) {
return peer_info->smsc_ep;
}

int xhc_copy_expose_region(void *base, size_t len, xhc_copy_data_t **region_data) {
// ------------------------------------------------

void *mca_coll_xhc_get_cico(xhc_peer_info_t *peer_info, int rank) {
if(OMPI_XHC_CICO_MAX == 0) {
return NULL;
}

if(peer_info[rank].cico_buffer == NULL) {
peer_info[rank].cico_buffer = xhc_shmem_attach(&peer_info[rank].cico_ds);
}

return peer_info[rank].cico_buffer;
}
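
/* Usage sketch: CICO (copy-in/copy-out) buffers are attached lazily, on
 * first access, and NULL is returned when CICO is disabled. peer_rank,
 * dst, offset and bytes are hypothetical. */
void *peer_buf = mca_coll_xhc_get_cico(peer_info, peer_rank);

if(peer_buf != NULL) {
    memcpy(dst, (char *) peer_buf + offset, bytes);
}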

int mca_coll_xhc_copy_expose_region(void *base, size_t len, xhc_copy_data_t **region_data) {
if(mca_smsc_base_has_feature(MCA_SMSC_FEATURE_REQUIRE_REGISTATION)) {
void *data = MCA_SMSC_CALL(register_region, base, len);

@@ -640,11 +689,11 @@ int xhc_copy_expose_region(void *base, size_t len, xhc_copy_data_t **region_data
return 0;
}

void xhc_copy_region_post(void *dst, xhc_copy_data_t *region_data) {
void mca_coll_xhc_copy_region_post(void *dst, xhc_copy_data_t *region_data) {
memcpy(dst, region_data, mca_smsc_base_registration_data_size());
}

int xhc_copy_from(xhc_peer_info_t *peer_info,
int mca_coll_xhc_copy_from(xhc_peer_info_t *peer_info,
void *dst, void *src, size_t size, void *access_token) {

mca_smsc_endpoint_t *smsc_ep = xhc_smsc_ep(peer_info);
@@ -659,12 +708,12 @@ int xhc_copy_from(xhc_peer_info_t *peer_info,
return (status == OPAL_SUCCESS ? 0 : -1);
}

void xhc_copy_close_region(xhc_copy_data_t *region_data) {
void mca_coll_xhc_copy_close_region(xhc_copy_data_t *region_data) {
if(mca_smsc_base_has_feature(MCA_SMSC_FEATURE_REQUIRE_REGISTATION))
MCA_SMSC_CALL(deregister_region, region_data);
}
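
/* End-to-end sketch of the single-copy path combining the calls above;
 * ctrl->access_token is a placeholder for wherever the registration
 * data is posted in the shared control structs. */

/* Owner: expose the send buffer and post its registration data */
xhc_copy_data_t *region_data = NULL;
ret = mca_coll_xhc_copy_expose_region(sbuf, len, &region_data);

if(ret == 0 && region_data != NULL) {
    mca_coll_xhc_copy_region_post(ctrl->access_token, region_data);
}

/* Peer: copy directly out of the owner's exposed buffer */
ret = mca_coll_xhc_copy_from(&peer_info[owner_rank], local_dst,
    owner_sbuf, len, ctrl->access_token);

/* Owner: tear down the exposure once all peers are finished */
mca_coll_xhc_copy_close_region(region_data);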

void *xhc_get_registration(xhc_peer_info_t *peer_info,
void *mca_coll_xhc_get_registration(xhc_peer_info_t *peer_info,
void *peer_vaddr, size_t size, xhc_reg_t **reg) {

mca_smsc_endpoint_t *smsc_ep = xhc_smsc_ep(peer_info);
@@ -695,6 +744,6 @@ void *xhc_get_registration(xhc_peer_info_t *peer_info,

/* Won't actually unmap/detach, since we've set
* the "persist" flag while creating the mapping */
void xhc_return_registration(xhc_reg_t *reg) {
void mca_coll_xhc_return_registration(xhc_reg_t *reg) {
MCA_SMSC_CALL(unmap_peer_region, reg);
}
