diff --git a/orte/orted/pmix/pmix_server_dyn.c b/orte/orted/pmix/pmix_server_dyn.c index 8eacbbfe401..4b908b990a0 100644 --- a/orte/orted/pmix/pmix_server_dyn.c +++ b/orte/orted/pmix/pmix_server_dyn.c @@ -399,7 +399,11 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata) /* restart the cnct processor */ ORTE_PMIX_OPERATION(cd->procs, cd->info, _cnct, cd->cbfunc, cd->cbdata); + /* protect the re-referenced data */ + cd->procs = NULL; + cd->info = NULL; OBJ_RELEASE(cd); + return; release: if (NULL != cd->cbfunc) { @@ -415,6 +419,7 @@ static void _cnct(int sd, short args, void *cbdata) char **keys = NULL, *key; orte_job_t *jdata; int rc = ORTE_SUCCESS; + opal_value_t *kv; ORTE_ACQUIRE_OBJECT(cd); @@ -444,6 +449,12 @@ static void _cnct(int sd, short args, void *cbdata) orte_util_convert_jobid_to_string(&key, nm->name.jobid); opal_argv_append_nosize(&keys, key); free(key); + /* we have to add the user's id to our list of info */ + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_USERID); + kv->type = OPAL_UINT32; + kv->data.uint32 = geteuid(); + opal_list_append(cd->info, &kv->super); if (ORTE_SUCCESS != (rc = pmix_server_lookup_fn(&nm->name, keys, cd->info, _cnlk, cd))) { opal_argv_free(keys); goto release; diff --git a/orte/orted/pmix/pmix_server_pub.c b/orte/orted/pmix/pmix_server_pub.c index 6a6044e6a38..46d4e97000d 100644 --- a/orte/orted/pmix/pmix_server_pub.c +++ b/orte/orted/pmix/pmix_server_pub.c @@ -394,6 +394,10 @@ int pmix_server_lookup_fn(opal_process_name_t *proc, char **keys, req->timeout = iptr->data.integer; continue; } + opal_output_verbose(2, orte_pmix_server_globals.output, + "%s lookup directive %s for proc %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key, + ORTE_NAME_PRINT(proc)); if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr, 1, OPAL_VALUE))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(req); diff --git a/orte/orted/pmix/pmix_server_register_fns.c b/orte/orted/pmix/pmix_server_register_fns.c index 5db881970a3..b1b08d8f39c 100644 --- a/orte/orted/pmix/pmix_server_register_fns.c +++ b/orte/orted/pmix/pmix_server_register_fns.c @@ -50,6 +50,8 @@ #include "pmix_server_internal.h" #include "pmix_server.h" +static void mycbfunc(int status, void *cbdata); + /* stuff proc attributes for sending back to a proc */ int orte_pmix_server_register_nspace(orte_job_t *jdata, bool force) { @@ -472,5 +474,67 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata, bool force) info, NULL, NULL); OPAL_LIST_RELEASE(info); + /* if the user has connected us to an external server, then we must + * assume there is going to be some cross-mpirun exchange, and so + * we protect against that situation by publishing the job info + * for this job - this allows any subsequent "connect" to retrieve + * the job info */ + if (NULL != orte_data_server_uri) { + opal_buffer_t buf; + + OBJ_CONSTRUCT(&buf, opal_buffer_t); + if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &jdata, 1, ORTE_JOB))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&buf); + return rc; + } + info = OBJ_NEW(opal_list_t); + /* create a key-value with the key being the string jobid + * and the value being the byte object */ + kv = OBJ_NEW(opal_value_t); + orte_util_convert_jobid_to_string(&kv->key, jdata->jobid); + kv->type = OPAL_BYTE_OBJECT; + opal_dss.unload(&buf, (void**)&kv->data.bo.bytes, &kv->data.bo.size); + OBJ_DESTRUCT(&buf); + opal_list_append(info, &kv->super); + + /* set the range to be session */ + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_RANGE); + kv->type = OPAL_UINT; + kv->data.uint = OPAL_PMIX_RANGE_SESSION; + opal_list_append(info, &kv->super); + + /* set the persistence to be app */ + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_PERSISTENCE); + kv->type = OPAL_INT; + kv->data.integer = OPAL_PMIX_PERSIST_APP; + opal_list_append(info, &kv->super); + + /* add our effective userid to the directives */ + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_USERID); + kv->type = OPAL_UINT32; + kv->data.uint32 = geteuid(); + opal_list_append(info, &kv->super); + + /* now publish it */ + if (ORTE_SUCCESS != (rc = pmix_server_publish_fn(ORTE_PROC_MY_NAME, + info, mycbfunc, info))) { + ORTE_ERROR_LOG(rc); + } + } + return rc; } + +static void mycbfunc(int status, void *cbdata) +{ + opal_list_t *info = (opal_list_t*)cbdata; + + if (ORTE_SUCCESS != status) { + ORTE_ERROR_LOG(status); + } + OPAL_LIST_RELEASE(info); +} diff --git a/orte/runtime/orte_data_server.c b/orte/runtime/orte_data_server.c index e20eb26b814..903e17c66ad 100644 --- a/orte/runtime/orte_data_server.c +++ b/orte/runtime/orte_data_server.c @@ -195,10 +195,10 @@ void orte_data_server(int status, orte_process_name_t* sender, orte_data_req_t *req, *rqnext; orte_jobid_t jobid = ORTE_JOBID_INVALID; - OPAL_OUTPUT_VERBOSE((1, orte_data_server_output, - "%s data server got message from %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(sender))); + opal_output_verbose(1, orte_data_server_output, + "%s data server got message from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(sender)); /* unpack the room number of the caller's request */ count = 1; @@ -233,10 +233,10 @@ void orte_data_server(int status, orte_process_name_t* sender, goto SEND_ERROR; } - OPAL_OUTPUT_VERBOSE((1, orte_data_server_output, - "%s data server: publishing data from %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&data->owner))); + opal_output_verbose(1, orte_data_server_output, + "%s data server: publishing data from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&data->owner)); /* unpack the range */ count = 1; @@ -260,19 +260,19 @@ void orte_data_server(int status, orte_process_name_t* sender, data->uid = iptr->data.uint32; OBJ_RELEASE(iptr); } else { - OPAL_OUTPUT_VERBOSE((10, orte_data_server_output, - "%s data server: adding %s to data from %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key, - ORTE_NAME_PRINT(&data->owner))); + opal_output_verbose(10, orte_data_server_output, + "%s data server: adding %s to data from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key, + ORTE_NAME_PRINT(&data->owner)); opal_list_append(&data->values, &iptr->super); } } data->index = opal_pointer_array_add(&orte_data_server_store, data); - OPAL_OUTPUT_VERBOSE((1, orte_data_server_output, - "%s data server: checking for pending requests", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + opal_output_verbose(1, orte_data_server_output, + "%s data server: checking for pending requests", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); /* check for pending requests that match this data */ reply = NULL; @@ -291,14 +291,14 @@ void orte_data_server(int status, orte_process_name_t* sender, for (i=0; NULL != req->keys[i]; i++) { /* cycle thru the data keys for matches */ OPAL_LIST_FOREACH(iptr, &data->values, opal_value_t) { - OPAL_OUTPUT_VERBOSE((10, orte_data_server_output, - "%s\tCHECKING %s TO %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - iptr->key, req->keys[i])); + opal_output_verbose(10, orte_data_server_output, + "%s\tCHECKING %s TO %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + iptr->key, req->keys[i]); if (0 == strcmp(iptr->key, req->keys[i])) { - OPAL_OUTPUT_VERBOSE((10, orte_data_server_output, - "%s data server: packaging return", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + opal_output_verbose(10, orte_data_server_output, + "%s data server: packaging return", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); /* found it - package it for return */ if (NULL == reply) { reply = OBJ_NEW(opal_buffer_t); @@ -318,10 +318,10 @@ void orte_data_server(int status, orte_process_name_t* sender, ORTE_ERROR_LOG(rc); break; } - OPAL_OUTPUT_VERBOSE((10, orte_data_server_output, - "%s data server: adding %s data from %s to response", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key, - ORTE_NAME_PRINT(&data->owner))); + opal_output_verbose(10, orte_data_server_output, + "%s data server: adding %s data from %s to response", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key, + ORTE_NAME_PRINT(&data->owner)); if (ORTE_SUCCESS != (rc = opal_dss.pack(reply, &iptr, 1, OPAL_VALUE))) { ORTE_ERROR_LOG(rc); break; @@ -331,10 +331,10 @@ void orte_data_server(int status, orte_process_name_t* sender, } if (NULL != reply) { /* send it back to the requestor */ - OPAL_OUTPUT_VERBOSE((1, orte_data_server_output, + opal_output_verbose(1, orte_data_server_output, "%s data server: returning data to %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&req->requestor))); + ORTE_NAME_PRINT(&req->requestor)); if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit, &req->requestor, reply, ORTE_RML_TAG_DATA_CLIENT, @@ -348,11 +348,11 @@ void orte_data_server(int status, orte_process_name_t* sender, reply = NULL; /* if the persistence is "first_read", then delete this data */ if (OPAL_PMIX_PERSIST_FIRST_READ == data->persistence) { - OPAL_OUTPUT_VERBOSE((1, orte_data_server_output, + opal_output_verbose(1, orte_data_server_output, "%s NOT STORING DATA FROM %s AT INDEX %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&data->owner), data->index); - opal_pointer_array_set_item(&orte_data_server_store, data->index, NULL)); + opal_pointer_array_set_item(&orte_data_server_store, data->index, NULL); OBJ_RELEASE(data); goto release; } @@ -371,10 +371,10 @@ void orte_data_server(int status, orte_process_name_t* sender, break; case ORTE_PMIX_LOOKUP_CMD: - OPAL_OUTPUT_VERBOSE((1, orte_data_server_output, - "%s data server: lookup data from %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(sender))); + opal_output_verbose(1, orte_data_server_output, + "%s data server: lookup data from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(sender)); /* unpack the requestor's jobid */ count = 1; @@ -429,18 +429,24 @@ void orte_data_server(int status, orte_process_name_t* sender, /* ignore anything else for now */ OBJ_RELEASE(iptr); } - if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc || UINT32_MAX == uid) { + if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { ORTE_ERROR_LOG(rc); opal_argv_free(keys); goto SEND_ERROR; } + if (UINT32_MAX == uid) { + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + rc = ORTE_ERR_NOT_FOUND; + opal_argv_free(keys); + goto SEND_ERROR; + } /* cycle across the provided keys */ ret_packed = false; for (i=0; NULL != keys[i]; i++) { - OPAL_OUTPUT_VERBOSE((10, orte_data_server_output, - "%s data server: looking for %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), keys[i])); + opal_output_verbose(10, orte_data_server_output, + "%s data server: looking for %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), keys[i]); /* cycle across the stored data, looking for a match */ for (k=0; k < orte_data_server_store.size; k++) { data_added = false; @@ -450,10 +456,10 @@ void orte_data_server(int status, orte_process_name_t* sender, } /* for security reasons, can only access data posted by the same user id */ if (uid != data->uid) { - OPAL_OUTPUT_VERBOSE((10, orte_data_server_output, - "%s\tMISMATCH UID %u %u", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (unsigned)uid, (unsigned)data->uid)); + opal_output_verbose(10, orte_data_server_output, + "%s\tMISMATCH UID %u %u", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (unsigned)uid, (unsigned)data->uid); continue; } /* if the published range is constrained to namespace, then only @@ -461,20 +467,20 @@ void orte_data_server(int status, orte_process_name_t* sender, * in the same namespace as the requestor */ if (OPAL_PMIX_RANGE_NAMESPACE == data->range) { if (jobid != data->owner.jobid) { - OPAL_OUTPUT_VERBOSE((10, orte_data_server_output, - "%s\tMISMATCH JOBID %s %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_JOBID_PRINT(jobid), - ORTE_JOBID_PRINT(data->owner.jobid))); + opal_output_verbose(10, orte_data_server_output, + "%s\tMISMATCH JOBID %s %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_JOBID_PRINT(jobid), + ORTE_JOBID_PRINT(data->owner.jobid)); continue; } } /* see if we have this key */ OPAL_LIST_FOREACH(iptr, &data->values, opal_value_t) { - OPAL_OUTPUT_VERBOSE((10, orte_data_server_output, + opal_output_verbose(10, orte_data_server_output, "%s COMPARING %s %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - keys[i], iptr->key)); + keys[i], iptr->key); if (0 == strcmp(iptr->key, keys[i])) { /* found it - package it for return */ if (!ret_packed) { @@ -492,10 +498,10 @@ void orte_data_server(int status, orte_process_name_t* sender, opal_argv_free(keys); goto SEND_ERROR; } - OPAL_OUTPUT_VERBOSE((1, orte_data_server_output, - "%s data server: adding %s to data from %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key, - ORTE_NAME_PRINT(&data->owner))); + opal_output_verbose(1, orte_data_server_output, + "%s data server: adding %s to data from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key, + ORTE_NAME_PRINT(&data->owner)); if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &iptr, 1, OPAL_VALUE))) { ORTE_ERROR_LOG(rc); opal_argv_free(keys); @@ -504,26 +510,26 @@ void orte_data_server(int status, orte_process_name_t* sender, } } if (data_added && OPAL_PMIX_PERSIST_FIRST_READ == data->persistence) { - OPAL_OUTPUT_VERBOSE((1, orte_data_server_output, + opal_output_verbose(1, orte_data_server_output, "%s REMOVING DATA FROM %s AT INDEX %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&data->owner), data->index)); + ORTE_NAME_PRINT(&data->owner), data->index); opal_pointer_array_set_item(&orte_data_server_store, data->index, NULL); OBJ_RELEASE(data); } } } if (!ret_packed) { - OPAL_OUTPUT_VERBOSE((1, orte_data_server_output, - "%s data server:lookup: data not found", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + opal_output_verbose(1, orte_data_server_output, + "%s data server:lookup: data not found", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); /* if we were told to wait for the data, then queue this up * for later processing */ if (wait) { - OPAL_OUTPUT_VERBOSE((1, orte_data_server_output, - "%s data server:lookup: pushing request to wait", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + opal_output_verbose(1, orte_data_server_output, + "%s data server:lookup: pushing request to wait", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); OBJ_RELEASE(answer); req = OBJ_NEW(orte_data_req_t); req->room_number = room_number; @@ -541,9 +547,9 @@ void orte_data_server(int status, orte_process_name_t* sender, } opal_argv_free(keys); - OPAL_OUTPUT_VERBOSE((1, orte_data_server_output, - "%s data server:lookup: data found", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + opal_output_verbose(1, orte_data_server_output, + "%s data server:lookup: data found", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); goto SEND_ANSWER; break; @@ -555,10 +561,10 @@ void orte_data_server(int status, orte_process_name_t* sender, goto SEND_ERROR; } - OPAL_OUTPUT_VERBOSE((1, orte_data_server_output, - "%s data server: unpublish data from %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&requestor))); + opal_output_verbose(1, orte_data_server_output, + "%s data server: unpublish data from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&requestor)); /* unpack the range - this sets some constraints on the range of data to be considered */ count = 1; @@ -663,10 +669,10 @@ void orte_data_server(int status, orte_process_name_t* sender, goto SEND_ERROR; } - OPAL_OUTPUT_VERBOSE((1, orte_data_server_output, - "%s data server: purge data from %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&requestor))); + opal_output_verbose(1, orte_data_server_output, + "%s data server: purge data from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&requestor)); /* cycle across the stored data, looking for a match */ for (k=0; k < orte_data_server_store.size; k++) { @@ -699,11 +705,11 @@ void orte_data_server(int status, orte_process_name_t* sender, break; } - SEND_ERROR: - OPAL_OUTPUT_VERBOSE((1, orte_data_server_output, - "%s data server: sending error %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_ERROR_NAME(rc))); + SEND_ERROR: + opal_output_verbose(1, orte_data_server_output, + "%s data server: sending error %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_ERROR_NAME(rc)); /* pack the error code */ if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &rc, 1, OPAL_INT))) { ORTE_ERROR_LOG(ret);