Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ompi/communicator/comm_cid.c
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,6 @@ static int ompi_comm_allreduce_pmix_reduce_complete (ompi_comm_request_t *reques
* needs to be reworked to take advantage of it. */
OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60);
OBJ_DESTRUCT(&info);
fprintf (stderr, "OPAL_PMIX_EXCHANGE returned %d\n", rc);
if (OPAL_SUCCESS != rc) {
OBJ_DESTRUCT(&pdat);
return rc;
Expand Down
4 changes: 2 additions & 2 deletions opal/mca/pmix/pmix2x/pmix/VERSION
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ greek=
# command, or with the date (if "git describe" fails) in the form of
# "date<date>".

repo_rev=git92df386
repo_rev=git4e10e9d

# If tarball_version is not empty, it is used as the version string in
# the tarball filename, regardless of all other versions listed in
Expand All @@ -44,7 +44,7 @@ tarball_version=

# The date when this release was created

date="Jul 05, 2016"
date="Jul 19, 2016"

# The shared library version of each of PMIx's public libraries.
# These versions are maintained in accordance with the "Library
Expand Down
126 changes: 65 additions & 61 deletions opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -296,39 +296,46 @@ static void _getnb_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,

cnt = 1;
if (PMIX_SUCCESS == (rc = pmix_bfrop.unpack(buf, &bptr, &cnt, PMIX_BUFFER))) {
cnt = 1;
cur_kval = PMIX_NEW(pmix_kval_t);
while (PMIX_SUCCESS == (rc = pmix_bfrop.unpack(bptr, cur_kval, &cnt, PMIX_KVAL))) {
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix: unpacked key %s", cur_kval->key);
if (PMIX_SUCCESS != (rc = pmix_hash_store(&nptr->modex, cur_rank, cur_kval))) {
PMIX_ERROR_LOG(rc);
}
if (NULL != cb->key && 0 == strcmp(cb->key, cur_kval->key)) {
/* if the rank is WILDCARD, then this is an nspace blob */
if (PMIX_RANK_WILDCARD == cur_rank) {
pmix_client_process_nspace_blob(cb->nspace, bptr);
} else {
cnt = 1;
cur_kval = PMIX_NEW(pmix_kval_t);
while (PMIX_SUCCESS == (rc = pmix_bfrop.unpack(bptr, cur_kval, &cnt, PMIX_KVAL))) {
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix: found requested value");
if (PMIX_SUCCESS != (rc = pmix_bfrop.copy((void**)&val, cur_kval->value, PMIX_VALUE))) {
"pmix: unpacked key %s", cur_kval->key);
if (PMIX_SUCCESS != (rc = pmix_hash_store(&nptr->modex, cur_rank, cur_kval))) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(cur_kval);
val = NULL;
goto done;
}
if (NULL != cb->key && 0 == strcmp(cb->key, cur_kval->key)) {
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix: found requested value");
if (PMIX_SUCCESS != (rc = pmix_bfrop.copy((void**)&val, cur_kval->value, PMIX_VALUE))) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(cur_kval);
val = NULL;
goto done;
}
}
PMIX_RELEASE(cur_kval); // maintain acctg - hash_store does a retain
cnt = 1;
cur_kval = PMIX_NEW(pmix_kval_t);
}
PMIX_RELEASE(cur_kval); // maintain acctg - hash_store does a retain
cnt = 1;
cur_kval = PMIX_NEW(pmix_kval_t);
PMIX_RELEASE(cur_kval);
}
cnt = 1;
PMIX_RELEASE(cur_kval);
}
PMIX_RELEASE(bptr); // free's the data region
if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc &&
PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
rc = PMIX_ERR_SILENT; // avoid error-logging twice
break;
}
}
if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc &&
PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
} else {
rc = PMIX_SUCCESS;
Expand Down Expand Up @@ -409,10 +416,11 @@ static void _getnbfn(int fd, short flags, void *cbdata)
goto request;
}

/* if the key is NULL, then we have to check both the job-data
* and the modex tables. If we don't yet have the modex data,
* then we are going to have to go get it. So let's check that
* case first */
/* The NULL==key scenario only pertains to cases where legacy
* PMI methods are being employed. In this case, we have to check
* both the job-data and the modex tables. If we don't yet have
* the modex data, then we are going to have to go get it. So let's
* check that case first */
if (NULL == cb->key) {
PMIX_CONSTRUCT(&results, pmix_pointer_array_t);
pmix_pointer_array_init(&results, 2, INT_MAX, 1);
Expand Down Expand Up @@ -451,9 +459,7 @@ static void _getnbfn(int fd, short flags, void *cbdata)
}
} else {
/* if we didn't find a modex for this rank, then we need
* to go get it. Recall that the NULL==key scenario only
* pertains to cases where legacy PMI methods are being
* employed. Thus, the caller wants -all- information for
* to go get it, since the caller wants -all- information for
* the specified rank, not just the job-level info. */
goto request;
}
Expand Down Expand Up @@ -505,43 +511,26 @@ static void _getnbfn(int fd, short flags, void *cbdata)
return;
}

/* the requested data could be in the job-data table, so let's
* just check there first. */
if (PMIX_SUCCESS == (rc = pmix_hash_fetch(&nptr->internal, PMIX_RANK_WILDCARD, cb->key, &val))) {
/* found it - we are in an event, so we can
* just execute the callback */
cb->value_cbfunc(rc, val, cb->cbdata);
/* cleanup */
if (NULL != val) {
PMIX_VALUE_RELEASE(val);
}
PMIX_RELEASE(cb);
return;
}
if (PMIX_RANK_WILDCARD == cb->rank) {
/* can't be anywhere else */
cb->value_cbfunc(PMIX_ERR_NOT_FOUND, NULL, cb->cbdata);
PMIX_RELEASE(cb);
return;
}

/* it could still be in the job-data table, only stored under its own
* rank and not WILDCARD - e.g., this is true of data returned about
* ourselves during startup */
if (PMIX_SUCCESS == (rc = pmix_hash_fetch(&nptr->internal, cb->rank, cb->key, &val))) {
/* found it - we are in an event, so we can
* just execute the callback */
cb->value_cbfunc(rc, val, cb->cbdata);
/* cleanup */
if (NULL != val) {
PMIX_VALUE_RELEASE(val);
/* if the key begins with the "pmix" prefix, then they are looking for
* data that was provided at startup */
if (0 == strncmp(cb->key, "pmix", 4)) {
/* should be in the internal hash table. */
if (PMIX_SUCCESS == (rc = pmix_hash_fetch(&nptr->internal, cb->rank, cb->key, &val))) {
/* found it - we are in an event, so we can
* just execute the callback */
cb->value_cbfunc(rc, val, cb->cbdata);
/* cleanup */
if (NULL != val) {
PMIX_VALUE_RELEASE(val);
}
PMIX_RELEASE(cb);
return;
}
PMIX_RELEASE(cb);
return;
/* if we don't have it, go request it */
goto request;
}

/* not finding it is not an error - it could be in the
* modex hash table, so check it */
/* otherwise, the data must be something they "put" */
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
if (PMIX_SUCCESS == (rc = pmix_dstore_fetch(nptr->nspace, cb->rank, cb->key, &val))) {
#else
Expand Down Expand Up @@ -606,6 +595,16 @@ static void _getnbfn(int fd, short flags, void *cbdata)
}
}

/* if we are seeking "pmix" data for our own nspace, then we must fail
* as it was provided at startup - any updates would have come via
* event notifications */
if (0 == strncmp(cb->key, "pmix", 4) &&
0 == strncmp(cb->nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN)) {
cb->value_cbfunc(PMIX_ERR_NOT_FOUND, NULL, cb->cbdata);
PMIX_RELEASE(cb);
return;
}

/* see if we already have a request in place with the server for data from
* this nspace:rank. If we do, then no need to ask again as the
* request will return _all_ data from that proc */
Expand All @@ -628,6 +627,11 @@ static void _getnbfn(int fd, short flags, void *cbdata)
return;
}

pmix_output_verbose(2, pmix_globals.debug_output,
"%s:%d REQUESTING DATA FROM SERVER FOR %s:%d KEY %s",
pmix_globals.myid.nspace, pmix_globals.myid.rank,
cb->nspace, cb->rank, cb->key);

/* create a callback object as we need to pass it to the
* recv routine so we know which callback to use when
* the return message is recvd */
Expand Down
5 changes: 2 additions & 3 deletions opal/mca/pmix/pmix2x/pmix/src/server/pmix_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -983,10 +983,9 @@ PMIX_EXPORT pmix_status_t PMIx_server_setup_fork(const pmix_proc_t *proc, char *
pmix_setenv("PMIX_RANK", rankstr, true, env);
/* pass our rendezvous info */
PMIX_LIST_FOREACH(lt, &pmix_server_globals.listeners, pmix_listener_t) {
if (NULL == lt->uri) {
continue;
if (NULL != lt->uri && NULL != lt->varname) {
pmix_setenv(lt->varname, lt->uri, true, env);
}
pmix_setenv(lt->varname, lt->uri, true, env);
}
/* pass our active security mode */
pmix_setenv("PMIX_SECURITY_MODE", security_mode, true, env);
Expand Down
68 changes: 48 additions & 20 deletions opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ static void dmdx_cbfunc(pmix_status_t status, const char *data,
size_t ndata, void *cbdata,
pmix_release_cbfunc_t relfn, void *relcbdata);
static pmix_status_t _satisfy_request(pmix_nspace_t *ns, int rank,
pmix_server_caddy_t *cd,
pmix_modex_cbfunc_t cbfunc, void *cbdata, bool *scope);
static pmix_status_t create_local_tracker(char nspace[], int rank,
pmix_info_t info[], size_t ninfo,
Expand All @@ -110,6 +111,7 @@ pmix_status_t pmix_server_get(pmix_buffer_t *buf,
pmix_modex_cbfunc_t cbfunc,
void *cbdata)
{
pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
int32_t cnt;
pmix_status_t rc;
int rank;
Expand All @@ -120,6 +122,9 @@ pmix_status_t pmix_server_get(pmix_buffer_t *buf,
size_t ninfo=0;
pmix_dmdx_local_t *lcd;
bool local;
pmix_buffer_t pbkt;
char *data;
size_t sz;

pmix_output_verbose(2, pmix_globals.debug_output,
"recvd GET");
Expand Down Expand Up @@ -166,9 +171,11 @@ pmix_status_t pmix_server_get(pmix_buffer_t *buf,
}

pmix_output_verbose(2, pmix_globals.debug_output,
"%s:%d EXECUTE GET FOR %s:%d",
"%s:%d EXECUTE GET FOR %s:%d ON BEHALF OF %s:%d",
pmix_globals.myid.nspace,
pmix_globals.myid.rank, nspace, rank);
pmix_globals.myid.rank, nspace, rank,
cd->peer->info->nptr->nspace,
cd->peer->info->rank);

if (NULL == nptr || NULL == nptr->server) {
/* this is for an nspace we don't know about yet, so
Expand All @@ -179,6 +186,20 @@ pmix_status_t pmix_server_get(pmix_buffer_t *buf,
return rc;
}

/* if the rank is wildcard, then they are asking for the job-level
* info for this nspace - provide it */
if (PMIX_RANK_WILDCARD == rank) {
PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
pmix_bfrop.pack(&pbkt, &rank, 1, PMIX_INT);
/* the client is expecting this to arrive as a byte object
* containing a buffer, so package it accordingly */
pmix_bfrop.pack(&pbkt, &nptr->server->job_info, 1, PMIX_BUFFER);
PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
PMIX_DESTRUCT(&pbkt);
cbfunc(PMIX_SUCCESS, data, sz, cbdata, relfn, data);
return PMIX_SUCCESS;
}

/* We have to wait for all local clients to be registered before
* we can know whether this request is for data from a local or a
* remote client because one client might ask for data about another
Expand All @@ -194,27 +215,13 @@ pmix_status_t pmix_server_get(pmix_buffer_t *buf,
}

/* see if we already have this data */
rc = _satisfy_request(nptr, rank, cbfunc, cbdata, &local);
rc = _satisfy_request(nptr, rank, cd, cbfunc, cbdata, &local);
if( PMIX_SUCCESS == rc ){
/* request was successfully satisfied */
PMIX_INFO_FREE(info, ninfo);
return rc;
}

/* Do not force the dmodex logic for non-specific ranks.
* Return a not-found status instead of doing a fence with
* data exchange; the user can decide whether to make such a
* call upon receiving the not-found status.
*/
if (PMIX_RANK_UNDEF == rank || PMIX_RANK_WILDCARD == rank) {
pmix_output_verbose(2, pmix_globals.debug_output,
"%s:%d not found data for namespace = %s, rank = %d "
"(do not request resource manager server for non-specified rank)",
pmix_globals.myid.nspace,
pmix_globals.myid.rank, nspace, rank);
return PMIX_ERR_NOT_FOUND;
}

/* If we get here, then we don't have the data at this time. Check
* to see if we already have a pending request for the data - if
* we do, then we can just wait for it to arrive */
Expand Down Expand Up @@ -355,7 +362,7 @@ void pmix_pending_nspace_requests(pmix_nspace_t *nptr)
}
}

static pmix_status_t _satisfy_request(pmix_nspace_t *nptr, int rank,
static pmix_status_t _satisfy_request(pmix_nspace_t *nptr, int rank, pmix_server_caddy_t *cd,
pmix_modex_cbfunc_t cbfunc, void *cbdata, bool *scope)
{
pmix_status_t rc;
Expand All @@ -364,7 +371,7 @@ static pmix_status_t _satisfy_request(pmix_nspace_t *nptr, int rank,
size_t sz;
int cur_rank;
int found = 0;
pmix_buffer_t pbkt;
pmix_buffer_t pbkt, *pbptr;
void *last;
pmix_hash_table_t *hts[3];
pmix_hash_table_t **htptr;
Expand Down Expand Up @@ -404,6 +411,27 @@ static pmix_status_t _satisfy_request(pmix_nspace_t *nptr, int rank,
* having been committed */
htptr = hts;
PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);

/* if they are asking about a rank from an nspace different
* from their own, then include a copy of the job-level info */
if (NULL != cd &&
0 != strncmp(nptr->nspace, cd->peer->info->nptr->nspace, PMIX_MAX_NSLEN)) {
cur_rank = PMIX_RANK_WILDCARD;
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(&pbkt, &cur_rank, 1, PMIX_INT))) {
PMIX_ERROR_LOG(rc);
cbfunc(rc, NULL, 0, cbdata, relfn, data);
return rc;
}
/* the client is expecting this to arrive as a byte object
* containing a buffer, so package it accordingly */
pbptr = &nptr->server->job_info;
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(&pbkt, &pbptr, 1, PMIX_BUFFER))) {
PMIX_ERROR_LOG(rc);
cbfunc(rc, NULL, 0, cbdata, relfn, data);
return rc;
}
}

while (NULL != *htptr) {
cur_rank = rank;
if (PMIX_RANK_UNDEF == rank) {
Expand Down Expand Up @@ -494,7 +522,7 @@ pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank,
/* run through all the requests to this rank */
PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) {
pmix_status_t rc;
rc = _satisfy_request(nptr, rank, req->cbfunc, req->cbdata, NULL);
rc = _satisfy_request(nptr, rank, NULL, req->cbfunc, req->cbdata, NULL);
if( PMIX_SUCCESS != rc ){
/* if we can't satisfy this particular request (missing key?) */
req->cbfunc(rc, NULL, 0, req->cbdata, NULL, NULL);
Expand Down
6 changes: 4 additions & 2 deletions opal/mca/pmix/pmix2x/pmix/test/pmix_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ int main(int argc, char **argv)
pmix_value_t *val = &value;
test_params params;
INIT_TEST_PARAMS(params);
pmix_proc_t myproc;
pmix_proc_t myproc, proc;

parse_cmd(argc, argv, &params);

Expand Down Expand Up @@ -102,7 +102,9 @@ int main(int argc, char **argv)
}
TEST_VERBOSE((" Client ns %s rank %d: PMIx_Init success", myproc.nspace, myproc.rank));

if (PMIX_SUCCESS != (rc = PMIx_Get(&myproc,PMIX_UNIV_SIZE,NULL, 0,&val))) {
(void)strncpy(proc.nspace, myproc.nspace, PMIX_MAX_NSLEN);
proc.rank = PMIX_RANK_WILDCARD;
if (PMIX_SUCCESS != (rc = PMIx_Get(&proc, PMIX_UNIV_SIZE, NULL, 0, &val))) {
TEST_ERROR(("rank %d: PMIx_Get universe size failed: %d", myproc.rank, rc));
FREE_TEST_PARAMS(params);
exit(0);
Expand Down
4 changes: 3 additions & 1 deletion opal/mca/pmix/pmix2x/pmix/test/simple/simpclient.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ int main(int argc, char **argv)
}

/* get our universe size */
if (PMIX_SUCCESS != (rc = PMIx_Get(&myproc, PMIX_UNIV_SIZE, NULL, 0, &val))) {
(void)strncpy(proc.nspace, myproc.nspace, PMIX_MAX_NSLEN);
proc.rank = PMIX_RANK_WILDCARD;
if (PMIX_SUCCESS != (rc = PMIx_Get(&proc, PMIX_UNIV_SIZE, NULL, 0, &val))) {
pmix_output(0, "Client ns %s rank %d: PMIx_Get universe size failed: %d", myproc.nspace, myproc.rank, rc);
goto done;
}
Expand Down
Loading