Skip to content

Commit

Permalink
Perform some cleanup
Browse files Browse the repository at this point in the history
Cleanup a few formats. Primary focus is on updating the
server get function to ensure that we provide the entire
modex info for a proc upon first request.

Signed-off-by: Ralph Castain <rhc@pmix.org>
  • Loading branch information
rhc54 committed May 10, 2024
1 parent 06c9621 commit d63c50c
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 39 deletions.
9 changes: 4 additions & 5 deletions src/mca/gds/base/gds_base_fns.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* Copyright (c) 2018-2019 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
*
* Copyright (c) 2021-2022 Nanook Consulting. All rights reserved.
* Copyright (c) 2021-2024 Nanook Consulting All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -113,11 +113,10 @@ pmix_status_t pmix_gds_base_store_modex(struct pmix_namespace_t *nspace, pmix_bu
* store them in our GDS module */
cnt = 1;
PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer, buff, &bo, &cnt, PMIX_BYTE_OBJECT);

/* If the collect flag is set, we should have some data for unpacking */
if ((PMIX_COLLECT_YES == trk->collect_type)
&& (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc)) {
goto exit;
if ((PMIX_COLLECT_YES == trk->collect_type) &&
(PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc)) {
goto exit;
}

while (PMIX_SUCCESS == rc) {
Expand Down
2 changes: 1 addition & 1 deletion src/mca/gds/gds.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ typedef pmix_status_t (*pmix_gds_base_module_store_modex_fn_t)(struct pmix_names
*
* n - pointer to the pmix_namespace_t this blob is to be stored for
*
* b - pointer to pmix_byte_object_t containing the data
* b - pointer to pmix_buffer_t containing the data
*
* t - pointer to the modex server tracker
*/
Expand Down
3 changes: 2 additions & 1 deletion src/mca/gds/hash/gds_hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -1338,7 +1338,8 @@ static pmix_status_t _hash_store_modex(pmix_gds_base_ctx_t ctx, pmix_proc_t *pro
pmix_kval_t kv;

pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
"[%s:%d] gds:hash:store_modex for nspace %s", pmix_globals.myid.nspace,
"[%s:%d] gds:hash:store_modex for nspace %s",
pmix_globals.myid.nspace,
pmix_globals.myid.rank, proc->nspace);

PMIX_HIDE_UNUSED_PARAMS(ctx);
Expand Down
64 changes: 32 additions & 32 deletions src/server/pmix_server_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ static pmix_status_t defer_response(char *nspace, pmix_rank_t rank, char *key,
}
pmix_output_verbose(2, pmix_server_globals.get_output,
"%s:%d TRACKER CREATED - WAITING TIMEOUT %d",
pmix_globals.myid.nspace, pmix_globals.myid.rank,
pmix_globals.myid.nspace, pmix_globals.myid.rank,
(NULL == tv) ? -1 : (int)tv->tv_sec);
/* if they specified a timeout, set it up now */
if (NULL != tv && 0 < tv->tv_sec) {
Expand Down Expand Up @@ -579,7 +579,8 @@ pmix_status_t pmix_server_get(pmix_buffer_t *buf, pmix_modex_cbfunc_t cbfunc, vo
return PMIX_SUCCESS;
}

pmix_output_verbose(2, pmix_server_globals.get_output, "%s:%d DATA NOT FOUND",
pmix_output_verbose(2, pmix_server_globals.get_output,
"%s:%d DATA NOT FOUND",
pmix_globals.myid.nspace, pmix_globals.myid.rank);

request:
Expand Down Expand Up @@ -846,13 +847,12 @@ static pmix_status_t _satisfy_request(pmix_namespace_t *nptr, pmix_rank_t rank,
pmix_byte_object_t bo;
char *data = NULL;
size_t sz = 0;
pmix_rank_info_t *rinfo;
pmix_peer_t *peer;

pmix_output_verbose(2, pmix_server_globals.get_output,
"%s:%d SATISFY REQUEST CALLED FOR %s:%d ON SCOPE %s",
"%s:%d SATISFY REQUEST CALLED FOR %s:%d ON SCOPE %s KEY %s",
pmix_globals.myid.nspace, pmix_globals.myid.rank,
nptr->nspace, rank, PMIx_Scope_string(scope));
nptr->nspace, rank, PMIx_Scope_string(scope),
(NULL == key) ? "NULL" : key);

PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
PMIX_LOAD_NSPACE(proc.nspace, nptr->nspace);
Expand All @@ -865,35 +865,38 @@ static pmix_status_t _satisfy_request(pmix_namespace_t *nptr, pmix_rank_t rank,
PMIX_DESTRUCT(&pbkt);
return rc;
}
if (PMIX_RANK_WILDCARD == rank) {
// we are done
found = true;
goto complete;
}
}

/* retrieve the data for the specific rank they are asking about */
proc.rank = rank;
PMIX_CONSTRUCT(&cb, pmix_cb_t);
/* this is a local request, so give the gds the option
* of returning a copy of the data, or a pointer to
* local storage */

/* if they are asking for a reserved key, we only look for
* that one specific key. If they are looking for a
* non-reserved key - return all non-reserved keys for the
* specified proc. We do this as an optimization as we
* suspect that a request for one value for a proc will be
* followed by requests for additional values
*/

if (NULL != key && PMIX_CHECK_RESERVED_KEY(key)) {
cb.key = key;
}

cb.proc = &proc;
cb.key = key;
cb.scope = scope;
cb.copy = false;
cb.info = cd->info;
cb.ninfo = cd->ninfo;
PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
/* if we didn't find it on my peer, it could have
* been stored on their peer - so check there */
/* if we didn't find it on my peer, then it isn't found */
if (PMIX_SUCCESS != rc) {
/* all procs in a given nspace must use
* the same GDS component */
peer = NULL;
rinfo = (pmix_rank_info_t*)pmix_list_get_first(&nptr->ranks);
while (NULL == peer && NULL != rinfo) {
peer = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, rinfo->peerid);
rinfo = (pmix_rank_info_t*)pmix_list_get_next(&rinfo->super);
}
if (NULL != peer) {
PMIX_GDS_FETCH_KV(rc, peer, &cb);
}
PMIX_DESTRUCT(&pbkt);
PMIX_DESTRUCT(&cb);
return PMIX_ERR_NOT_FOUND;
}
cb.info = NULL;
cb.ninfo = 0;
Expand Down Expand Up @@ -937,14 +940,12 @@ static pmix_status_t _satisfy_request(pmix_namespace_t *nptr, pmix_rank_t rank,
}
PMIX_DESTRUCT(&pkt);
} else {
// Don't unload the buffer here. Since
// it gets repacked, we'll lose the base_ptr
// to destroy pkt later.
bo.bytes = (char *) pkt.unpack_ptr;
bo.size = pkt.bytes_used;
PMIX_UNLOAD_BUFFER(&pkt, bo.bytes, bo.size);
PMIX_DESTRUCT(&pkt);

/* pack it for transmission */
PMIX_BFROPS_PACK(rc, cd->peer, &pbkt, &bo, 1, PMIX_BYTE_OBJECT);
PMIX_BYTE_OBJECT_DESTRUCT(&bo); // data has been copied
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_DESTRUCT(&pbkt);
Expand All @@ -955,14 +956,13 @@ static pmix_status_t _satisfy_request(pmix_namespace_t *nptr, pmix_rank_t rank,
}
PMIX_DESTRUCT(&cb);

complete:
PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
PMIX_DESTRUCT(&pbkt);

if (found) {
/* pass it back */
cbfunc(rc, data, sz, cbdata, relfn, data);
// Safe to free pkt.
PMIX_DESTRUCT(&pkt);
return rc;
}

Expand Down

0 comments on commit d63c50c

Please sign in to comment.