Skip to content

Commit

Permalink
Fix local group operations
Browse files Browse the repository at this point in the history
Group operations that are purely local go thru an
optimized path, but still need to return the
right set of info so the client can correctly
track the group.

Signed-off-by: Ralph Castain <rhc@pmix.org>
  • Loading branch information
rhc54 committed Mar 20, 2024
1 parent 73a4d3f commit 6e9417a
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 26 deletions.
3 changes: 2 additions & 1 deletion src/prted/pmix/pmix_server_dyn.c
Expand Up @@ -1314,7 +1314,8 @@ pmix_status_t pmix_server_connect_fn(const pmix_proc_t procs[], size_t nprocs,
{
prte_pmix_server_op_caddy_t *op;

pmix_output_verbose(2, prte_pmix_server_globals.output, "%s connect called with %d procs",
pmix_output_verbose(2, prte_pmix_server_globals.output,
"%s connect called with %d procs",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (int) nprocs);

/* protect ourselves */
Expand Down
105 changes: 80 additions & 25 deletions src/prted/pmix/pmix_server_group.c
Expand Up @@ -62,7 +62,7 @@ static void relcb(void *cbdata)
{
prte_pmix_mdx_caddy_t *cd = (prte_pmix_mdx_caddy_t *) cbdata;

if (NULL != cd->info) {
if (NULL != cd->info) {
PMIX_INFO_FREE(cd->info, cd->ninfo);
}
PMIX_RELEASE(cd);
Expand Down Expand Up @@ -236,6 +236,64 @@ static void group_release(int status, pmix_data_buffer_t *buf, void *cbdata)
}
}

static void local_complete(int sd, short args, void *cbdata)
{
prte_pmix_mdx_caddy_t *cd = (prte_pmix_mdx_caddy_t*)cbdata;
pmix_server_pset_t *pset;
pmix_data_array_t *members;
pmix_proc_t *p;

if (PMIX_GROUP_CONSTRUCT == cd->op) {

// construct the group membership
members = PMIx_Data_array_create(cd->nprocs, PMIX_PROC);
p = (pmix_proc_t*)members->array;
memcpy(p, cd->procs, cd->nprocs * sizeof(pmix_proc_t));
cd->ninfo = 2;
PMIX_INFO_CREATE(cd->info, cd->ninfo);
PMIX_LOAD_KEY(cd->info[0].key, PMIX_GROUP_MEMBERSHIP);
cd->info[0].value.type = PMIX_DATA_ARRAY;
cd->info[0].value.data.darray = members;

PMIX_LOAD_KEY(cd->info[1].key, PMIX_GROUP_ID);
cd->info[1].value.type = PMIX_STRING;
cd->info[1].value.data.string = strdup(cd->grpid);

/* add it to our list of known groups */
pset = PMIX_NEW(pmix_server_pset_t);
pset->name = strdup(cd->grpid);
pset->num_members = cd->nprocs;
PMIX_PROC_CREATE(pset->members, pset->num_members);
memcpy(pset->members, cd->procs, cd->nprocs * sizeof(pmix_proc_t));
pmix_list_append(&prte_pmix_server_globals.groups, &pset->super);

// protect the procs array
cd->procs = NULL;
cd->nprocs = 0;

// return this to them
cd->infocbfunc(PMIX_SUCCESS, cd->info, cd->ninfo, cd->cbdata, relcb, cd);

} else {
/* find this group ID on our list of groups and remove it */
PMIX_LIST_FOREACH(pset, &prte_pmix_server_globals.groups, pmix_server_pset_t)
{
if (0 == strcmp(pset->name, cd->grpid)) {
pmix_list_remove_item(&prte_pmix_server_globals.groups, &pset->super);
PMIX_RELEASE(pset);
break;
}
}
// return their callback
cd->infocbfunc(PMIX_SUCCESS, NULL, 0, cd->cbdata, NULL, NULL);
// protect the procs array
cd->procs = NULL;
cd->nprocs = 0;

PMIX_RELEASE(cd);
}
}

pmix_status_t pmix_server_group_fn(pmix_group_operation_t op, char *grpid,
const pmix_proc_t procs[], size_t nprocs,
const pmix_info_t directives[], size_t ndirs,
Expand Down Expand Up @@ -317,6 +375,9 @@ pmix_status_t pmix_server_group_fn(pmix_group_operation_t op, char *grpid,
}
}
if (0 < tv.tv_sec) {
if (copied) {
PMIX_PROC_FREE(members, num_members);
}
return PMIX_ERR_NOT_SUPPORTED;
}

Expand All @@ -328,32 +389,25 @@ pmix_status_t pmix_server_group_fn(pmix_group_operation_t op, char *grpid,
pmix_output_verbose(2, prte_pmix_server_globals.output,
"%s group request - purely local",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME));
if (PMIX_GROUP_CONSTRUCT == op) {
/* add it to our list of known groups */
pset = PMIX_NEW(pmix_server_pset_t);
pset->name = strdup(grpid);
pset->num_members = nprocs;
if (NULL != members) {
pset->num_members += num_members;
}
PMIX_PROC_CREATE(pset->members, pset->num_members);
memcpy(pset->members, procs, nprocs * sizeof(pmix_proc_t));
if (NULL != members) {
memcpy(&pset->members[nprocs], members, num_members * sizeof(pmix_proc_t));
}
pmix_list_append(&prte_pmix_server_globals.groups, &pset->super);
} else if (PMIX_GROUP_DESTRUCT == op) {
/* find this group ID on our list of groups */
PMIX_LIST_FOREACH(pset, &prte_pmix_server_globals.groups, pmix_server_pset_t)
{
if (0 == strcmp(pset->name, grpid)) {
pmix_list_remove_item(&prte_pmix_server_globals.groups, &pset->super);
PMIX_RELEASE(pset);
break;
}
if (force_local && assignID) {
// we cannot do that
if (copied) {
PMIX_PROC_FREE(members, num_members);
}
return PMIX_ERR_BAD_PARAM;
}
cd = PMIX_NEW(prte_pmix_mdx_caddy_t);
cd->op = op;
cd->grpid = strdup(grpid);
cd->procs = (pmix_proc_t*)procs;
cd->nprocs = nprocs;
cd->infocbfunc = cbfunc;
cd->cbdata = cbdata;
PRTE_PMIX_THREADSHIFT(cd, prte_event_base, local_complete);
if (copied) {
PMIX_PROC_FREE(members, num_members);
}
return PMIX_OPERATION_SUCCEEDED;
return PMIX_SUCCESS;
}

cd = PMIX_NEW(prte_pmix_mdx_caddy_t);
Expand Down Expand Up @@ -401,6 +455,7 @@ pmix_status_t pmix_server_group_fn(pmix_group_operation_t op, char *grpid,
PMIX_ERROR_LOG(rc);
}
}

/* pass it to the global collective algorithm */
if (PRTE_SUCCESS != (rc = prte_grpcomm.allgather(cd))) {
PRTE_ERROR_LOG(rc);
Expand Down

0 comments on commit 6e9417a

Please sign in to comment.