Skip to content

Commit

Permalink
Ensure server and client indices are in sync
Browse files Browse the repository at this point in the history
Do not include a terminator element in the pmix_dictionary, but
instead rely on the defined number of entries as the length
of the array. Since user-defined keys can be different on
each process, and/or appear in a different order, we cannot
define a unique index for them. Each process will assign
its own index when those keys are "put", and we currently
have no way to go back into the hash storage to "reset"
those values. So we have to push all returned modex values
into the local hash for now.

Signed-off-by: Ralph Castain <rhc@pmix.org>
  • Loading branch information
rhc54 committed Apr 22, 2024
1 parent 889b123 commit c38aef6
Show file tree
Hide file tree
Showing 13 changed files with 145 additions and 107 deletions.
9 changes: 3 additions & 6 deletions contrib/construct_dictionary.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# Copyright (c) 2020 Intel, Inc. All rights reserved.
# Copyright (c) 2020-2022 Cisco Systems, Inc. All rights reserved
# Copyright (c) 2021-2023 Nanook Consulting All rights reserved.
# Copyright (c) 2021-2024 Nanook Consulting All rights reserved.
# Copyright (c) 2022 Triad National Security, LLC. All rights reserved.
# $COPYRIGHT$
#
Expand Down Expand Up @@ -363,15 +363,12 @@ def main():
return 1

# mark the end of the array
constants.write(""",\n
{.index = UINT32_MAX, .name = "", .string = "", .type = PMIX_POINTER, .description = (char *[]){"NONE", NULL}}
};
""")
constants.write("""\n};""")
constants.write("\n")
constants.close()

# write the header
return _write_header(options, build_src_include_dir, index + 1)
return _write_header(options, build_src_include_dir, index)


if __name__ == '__main__':
Expand Down
79 changes: 49 additions & 30 deletions src/client/pmix_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ static void client_iof_handler(struct pmix_peer_t *pr, pmix_ptl_hdr_t *hdr, pmix
pmix_status_t PMIx_Init(pmix_proc_t *proc,
pmix_info_t info[], size_t ninfo)
{
char *evar;
char *evar, *suri;
pmix_status_t rc = PMIX_SUCCESS;
pmix_cb_t cb;
pmix_buffer_t *req;
Expand Down Expand Up @@ -559,11 +559,12 @@ pmix_status_t PMIx_Init(pmix_proc_t *proc,
* to connect if we are currently unconnected */
if (!pmix_globals.connected) {
rc = pmix_ptl.connect_to_peer((struct pmix_peer_t *) pmix_client_globals.myserver, info,
ninfo);
ninfo, &suri);
if (PMIX_SUCCESS == rc) {
PMIX_ACQUIRE_THREAD(&pmix_global_lock);
pmix_init_result = rc;
pmix_client_globals.singleton = false;
free(suri);
PMIX_RELEASE_THREAD(&pmix_global_lock);
}
}
Expand Down Expand Up @@ -780,7 +781,8 @@ pmix_status_t PMIx_Init(pmix_proc_t *proc,
PMIX_INFO_DESTRUCT(&ginfo);

/* attempt to connect to a server */
rc = pmix_ptl.connect_to_peer((struct pmix_peer_t *) pmix_client_globals.myserver, info, ninfo);
rc = pmix_ptl.connect_to_peer((struct pmix_peer_t *) pmix_client_globals.myserver,
info, ninfo, &suri);
if (PMIX_SUCCESS != rc) {
/* mark that we couldn't connect to a server */
pmix_client_globals.singleton = true;
Expand All @@ -802,6 +804,7 @@ pmix_status_t PMIx_Init(pmix_proc_t *proc,
if (PMIX_SUCCESS != rc) {
pmix_init_result = rc;
PMIX_RELEASE_THREAD(&pmix_global_lock);
free(suri);
return rc;
}
} else {
Expand All @@ -815,6 +818,7 @@ pmix_status_t PMIx_Init(pmix_proc_t *proc,
PMIX_RELEASE(req);
pmix_init_result = rc;
PMIX_RELEASE_THREAD(&pmix_global_lock);
free(suri);
return rc;
}
/* send to the server */
Expand All @@ -823,6 +827,7 @@ pmix_status_t PMIx_Init(pmix_proc_t *proc,
if (PMIX_SUCCESS != rc) {
pmix_init_result = rc;
PMIX_RELEASE_THREAD(&pmix_global_lock);
free(suri);
return rc;
}
/* wait for the data to return */
Expand All @@ -832,6 +837,47 @@ pmix_status_t PMIx_Init(pmix_proc_t *proc,
}
pmix_init_result = rc;

/* store our server's ID */
if (NULL != pmix_client_globals.myserver &&
NULL != pmix_client_globals.myserver->info) {
kptr = PMIX_NEW(pmix_kval_t);
kptr->key = strdup(PMIX_SERVER_NSPACE);
PMIX_VALUE_CREATE(kptr->value, 1);
kptr->value->type = PMIX_STRING;
kptr->value->data.string = strdup(pmix_client_globals.myserver->info->pname.nspace);
PMIX_GDS_STORE_KV(rc, pmix_globals.mypeer, &pmix_globals.myid, PMIX_INTERNAL, kptr);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
return rc;
}
PMIX_RELEASE(kptr); // maintain accounting
kptr = PMIX_NEW(pmix_kval_t);
kptr->key = strdup(PMIX_SERVER_RANK);
PMIX_VALUE_CREATE(kptr->value, 1);
kptr->value->type = PMIX_PROC_RANK;
kptr->value->data.rank = pmix_client_globals.myserver->info->pname.rank;
PMIX_GDS_STORE_KV(rc, pmix_globals.mypeer, &pmix_globals.myid, PMIX_INTERNAL, kptr);
PMIX_RELEASE(kptr); // maintain accounting
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
return rc;
}

/* store the URI for subsequent lookups */
PMIX_KVAL_NEW(kptr, PMIX_SERVER_URI);
kptr->value->type = PMIX_STRING;
pmix_asprintf(&kptr->value->data.string, "%s.%u;%s",
pmix_client_globals.myserver->info->pname.nspace,
pmix_client_globals.myserver->info->pname.rank, suri);
free(suri);
PMIX_GDS_STORE_KV(rc, pmix_globals.mypeer, &pmix_globals.myid, PMIX_INTERNAL, kptr);
PMIX_RELEASE(kptr); // maintain accounting
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
return rc;
}
}

// enable show_help subsystem
pmix_show_help_enabled = true;
PMIX_RELEASE_THREAD(&pmix_global_lock);
Expand Down Expand Up @@ -898,33 +944,6 @@ pmix_status_t PMIx_Init(pmix_proc_t *proc,
_check_for_notify(info, ninfo);
}

/* store our server's ID */
if (NULL != pmix_client_globals.myserver &&
NULL != pmix_client_globals.myserver->info) {
kptr = PMIX_NEW(pmix_kval_t);
kptr->key = strdup(PMIX_SERVER_NSPACE);
PMIX_VALUE_CREATE(kptr->value, 1);
kptr->value->type = PMIX_STRING;
kptr->value->data.string = strdup(pmix_client_globals.myserver->info->pname.nspace);
PMIX_GDS_STORE_KV(rc, pmix_globals.mypeer, &pmix_globals.myid, PMIX_INTERNAL, kptr);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
return rc;
}
PMIX_RELEASE(kptr); // maintain accounting
kptr = PMIX_NEW(pmix_kval_t);
kptr->key = strdup(PMIX_SERVER_RANK);
PMIX_VALUE_CREATE(kptr->value, 1);
kptr->value->type = PMIX_PROC_RANK;
kptr->value->data.rank = pmix_client_globals.myserver->info->pname.rank;
PMIX_GDS_STORE_KV(rc, pmix_globals.mypeer, &pmix_globals.myid, PMIX_INTERNAL, kptr);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
return rc;
}
PMIX_RELEASE(kptr); // maintain accounting
}

/* register the client supported attrs */
rc = pmix_register_client_attrs();
if (PMIX_SUCCESS == pmix_init_result &&
Expand Down
6 changes: 4 additions & 2 deletions src/common/pmix_attributes.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* Copyright (c) 2016 Mellanox Technologies, Inc.
* All rights reserved.
* Copyright (c) 2016 IBM Corporation. All rights reserved.
* Copyright (c) 2021-2023 Nanook Consulting. All rights reserved.
* Copyright (c) 2021-2024 Nanook Consulting All rights reserved.
* Copyright (c) 2022-2024 Triad National Security, LLC. All rights reserved.
* $COPYRIGHT$
*
Expand Down Expand Up @@ -73,7 +73,8 @@ PMIX_EXPORT void pmix_init_registered_attrs(void)

/* cycle across the dictionary and load a hash
* table with translations of key -> index */
for (n=0; UINT32_MAX != pmix_dictionary[n].index; n++) {
pmix_pointer_array_set_size(pmix_globals.keyindex.table, PMIX_INDEX_BOUNDARY); // minimize realloc's
for (n=0; n < PMIX_INDEX_BOUNDARY; n++) {
p = (pmix_regattr_input_t*)pmix_malloc(sizeof(pmix_regattr_input_t));
p->index = pmix_dictionary[n].index;
p->name = strdup(pmix_dictionary[n].name);
Expand All @@ -82,6 +83,7 @@ PMIX_EXPORT void pmix_init_registered_attrs(void)
p->description = PMIx_Argv_copy(pmix_dictionary[n].description);
pmix_hash_register_key(p->index, p, &pmix_globals.keyindex);
}
pmix_globals.keyindex.next_id = PMIX_INDEX_BOUNDARY;
initialized = true;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/include/pmix_globals.c
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ static void keyindex_construct(pmix_keyindex_t *ki)
ki->table = PMIX_NEW(pmix_pointer_array_t, tma);
pmix_pointer_array_init(ki->table, 1024, INT_MAX, 128);

ki->next_id = PMIX_INDEX_BOUNDARY;
ki->next_id = 0;
}

static void keyindex_destruct(pmix_keyindex_t *ki)
Expand Down
18 changes: 10 additions & 8 deletions src/mca/gds/gds.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* All rights reserved.
* Copyright (c) 2016-2020 Intel, Inc. All rights reserved.
* Copyright (c) 2018 IBM Corporation. All rights reserved.
* Copyright (c) 2021-2023 Nanook Consulting. All rights reserved.
* Copyright (c) 2021-2024 Nanook Consulting All rights reserved.
* Copyright (c) 2022 Triad National Security, LLC. All rights reserved.
* $COPYRIGHT$
*
Expand Down Expand Up @@ -270,19 +270,21 @@ typedef pmix_status_t (*pmix_gds_base_module_store_modex_fn_t)(struct pmix_names
*
* t - pointer to the modex server tracker
*/
#define PMIX_GDS_STORE_MODEX(r, n, b, t) \
do { \
pmix_output_verbose(1, pmix_gds_base_output, "[%s:%d] GDS STORE MODEX WITH %s", __FILE__, \
__LINE__, (n)->compat.gds->name); \
(r) = (n)->compat.gds->store_modex((struct pmix_namespace_t *) n, b, t); \
#define PMIX_GDS_STORE_MODEX(r, n, b, t) \
do { \
pmix_gds_base_module_t *_g = pmix_globals.mypeer->nptr->compat.gds; \
pmix_output_verbose(1, pmix_gds_base_output, \
"[%s:%d] GDS STORE MODEX WITH %s", __FILE__, \
__LINE__, _g->name); \
(r) = _g->store_modex((struct pmix_namespace_t *)n, b, t); \
} while (0)

typedef pmix_status_t (*pmix_gds_base_module_mark_modex_complete_fn_t)(struct pmix_peer_t *peer,
pmix_list_t *nslist,
pmix_buffer_t *buff);
#define PMIX_GDS_MARK_MODEX_COMPLETE(r, p, l, b) \
do { \
pmix_gds_base_module_t *_g = (p)->nptr->compat.gds; \
pmix_gds_base_module_t *_g = pmix_globals.mypeer->nptr->compat.gds; \
pmix_output_verbose(1, pmix_gds_base_output, \
"[%s:%d] GDS MARK MODEX COMPLETE WITH %s", \
__FILE__, __LINE__, _g->name); \
Expand All @@ -292,7 +294,7 @@ typedef pmix_status_t (*pmix_gds_base_module_mark_modex_complete_fn_t)(struct pm
typedef pmix_status_t (*pmix_gds_base_module_recv_modex_complete_fn_t)(pmix_buffer_t *buff);
#define PMIX_GDS_RECV_MODEX_COMPLETE(r, p, b) \
do { \
pmix_gds_base_module_t *_g = (p)->nptr->compat.gds; \
pmix_gds_base_module_t *_g = pmix_globals.mypeer->nptr->compat.gds; \
pmix_output_verbose(1, pmix_gds_base_output, \
"[%s:%d] GDS RECV MODEX COMPLETE WITH %s", \
__FILE__, __LINE__, _g->name); \
Expand Down
19 changes: 3 additions & 16 deletions src/mca/gds/shmem2/gds_shmem2.c
Original file line number Diff line number Diff line change
Expand Up @@ -1538,15 +1538,6 @@ pack_server_keyindex_description(
return rc;
}

static inline int
dictionary_nelems(
const pmix_regattr_input_t *dict
) {
int i = 0;
for ( ; UINT32_MAX != dict[i].index; ++i) { }
return i;
}

static inline pmix_status_t
pack_server_keyindex_info(
pmix_gds_shmem2_job_t *job,
Expand Down Expand Up @@ -1577,12 +1568,11 @@ pack_server_keyindex_info(
PMIX_DESTRUCT(&kv);

// Pack the size of the server's keyindex table.
const int tabsize = dictionary_nelems(pmix_dictionary);
PMIX_CONSTRUCT(&kv, pmix_kval_t);
kv.key = strdup(SHMEM2_KIDX_TAB_SIZE_KEY);
kv.value = (pmix_value_t *)calloc(1, sizeof(pmix_value_t));
kv.value->type = PMIX_UINT32;
kv.value->data.uint32 = (uint32_t)tabsize;
kv.value->data.uint32 = (uint32_t)PMIX_INDEX_BOUNDARY;

PMIX_BFROPS_PACK(rc, peer, buffer, &kv, 1, PMIX_KVAL);
if (PMIX_UNLIKELY(PMIX_SUCCESS != rc)) {
Expand All @@ -1591,7 +1581,7 @@ pack_server_keyindex_info(
}
PMIX_DESTRUCT(&kv);

for (int i = 0; i < tabsize; ++i) {
for (int i = 0; i < PMIX_INDEX_BOUNDARY; ++i) {
const pmix_regattr_input_t *p = &pmix_dictionary[i];
PMIX_GDS_SHMEM2_VVVOUT(
"%s:keyindex=(index=%zd, type=%zd, name=%s string=%s, description=%s)",
Expand Down Expand Up @@ -1899,7 +1889,7 @@ client_update_global_keyindex_if_necessary(
return rc;
}

pmix_pointer_array_set_item(tmpindex.table, (int)tmpindex.next_id, ra);
pmix_pointer_array_set_item(tmpindex.table, (int)tmpindex.next_id, ra);
ra->index = tmpindex.next_id;
tmpindex.next_id += 1;

Expand All @@ -1918,9 +1908,6 @@ client_update_global_keyindex_if_necessary(
// just yet
for (i = 0; i < PMIX_INDEX_BOUNDARY; ++i) {
ra = (pmix_regattr_input_t*)&pmix_dictionary[i];
if (UINT32_MAX == ra->index) {
break;
}
// see if this entry is already present in the new keyindex
found = false;
for (m=0; m < tmpindex.table->size; m++) {
Expand Down
9 changes: 4 additions & 5 deletions src/mca/ptl/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* Copyright (c) 2014-2020 Intel, Inc. All rights reserved.
* Copyright (c) 2015-2020 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2021-2023 Nanook Consulting. All rights reserved.
* Copyright (c) 2021-2024 Nanook Consulting All rights reserved.
* Copyright (c) 2023 Triad National Security, LLC. All rights reserved.
* $COPYRIGHT$
*
Expand Down Expand Up @@ -122,7 +122,8 @@ PMIX_CLASS_DECLARATION(pmix_connection_t);
/* API stubs */
PMIX_EXPORT pmix_status_t pmix_ptl_base_set_notification_cbfunc(pmix_ptl_cbfunc_t cbfunc);
PMIX_EXPORT pmix_status_t pmix_ptl_base_connect_to_peer(struct pmix_peer_t *peer,
pmix_info_t info[], size_t ninfo);
pmix_info_t info[], size_t ninfo,
char **suri);
PMIX_EXPORT pmix_status_t pmix_ptl_base_parse_uri_file(char *filename,
bool optional,
pmix_list_t *connections);
Expand Down Expand Up @@ -167,16 +168,14 @@ PMIX_EXPORT pmix_rnd_flag_t pmix_ptl_base_set_flag(size_t *sz);
PMIX_EXPORT pmix_status_t pmix_ptl_base_make_connection(pmix_peer_t *peer, char *suri,
pmix_info_t *iptr, size_t niptr);
PMIX_EXPORT void pmix_ptl_base_complete_connection(pmix_peer_t *peer, char *nspace,
pmix_rank_t rank, char *uri);
pmix_rank_t rank);
PMIX_EXPORT pmix_status_t pmix_ptl_base_set_timeout(pmix_peer_t *peer, struct timeval *save,
pmix_socklen_t *sz, bool *sockopt);
PMIX_EXPORT void pmix_ptl_base_setup_socket(pmix_peer_t *peer);
PMIX_EXPORT pmix_status_t pmix_ptl_base_client_handshake(pmix_peer_t *peer, pmix_status_t reply);
PMIX_EXPORT pmix_status_t pmix_ptl_base_tool_handshake(pmix_peer_t *peer, pmix_status_t rp);
PMIX_EXPORT char **pmix_ptl_base_split_and_resolve(const char *orig_str,
const char *name);
PMIX_EXPORT pmix_status_t pmix_ptl_base_connect_to_peer(struct pmix_peer_t *pr, pmix_info_t *info,
size_t ninfo);
PMIX_EXPORT pmix_status_t pmix_ptl_base_set_peer(pmix_peer_t *peer, char *evar);
PMIX_EXPORT char *pmix_ptl_base_get_cmd_line(void);

Expand Down
12 changes: 6 additions & 6 deletions src/mca/ptl/base/ptl_base_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2015-2020 Intel, Inc. All rights reserved.
* Copyright (c) 2021-2023 Nanook Consulting. All rights reserved.
* Copyright (c) 2021-2024 Nanook Consulting All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -433,7 +433,9 @@ static pmix_status_t trysearch(pmix_peer_t *peer, char **nspace,
return rc;
}

pmix_status_t pmix_ptl_base_connect_to_peer(struct pmix_peer_t *pr, pmix_info_t *info, size_t ninfo)
pmix_status_t pmix_ptl_base_connect_to_peer(struct pmix_peer_t *pr,
pmix_info_t *info, size_t ninfo,
char **suriout)
{
char *suri = NULL, *st, *evar;
char *filename, *nspace = NULL;
Expand Down Expand Up @@ -796,9 +798,10 @@ pmix_status_t pmix_ptl_base_connect_to_peer(struct pmix_peer_t *pr, pmix_info_t
pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
"tool_peer_try_connect: Connection across to server succeeded");

pmix_ptl_base_complete_connection(peer, nspace, rank, suri);
pmix_ptl_base_complete_connection(peer, nspace, rank);

cleanup:
*suriout = suri;
if (NULL != nspace) {
free(nspace);
}
Expand All @@ -808,9 +811,6 @@ pmix_status_t pmix_ptl_base_connect_to_peer(struct pmix_peer_t *pr, pmix_info_t
if (NULL != rendfile) {
free(rendfile);
}
if (NULL != suri) {
free(suri);
}
if (NULL != server_nspace) {
free(server_nspace);
}
Expand Down

0 comments on commit c38aef6

Please sign in to comment.