Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions orte/orted/pmix/pmix_server_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ int pmix_server_publish_fn(opal_process_name_t *proc,
pset = false;
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) {
range = (opal_pmix_data_range_t)iptr->data.integer;
range = (opal_pmix_data_range_t)iptr->data.uint;
if (pset) {
break;
}
Expand All @@ -136,7 +136,7 @@ int pmix_server_publish_fn(opal_process_name_t *proc,
}

/* pack the range */
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &range, 1, OPAL_INT))) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &range, 1, OPAL_PMIX_DATA_RANGE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(req);
return rc;
Expand Down Expand Up @@ -211,10 +211,17 @@ int pmix_server_lookup_fn(opal_process_name_t *proc, char **keys,
return rc;
}

/* pack the requesting process jobid */
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &proc->jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(req);
return rc;
}

/* no help for it - need to search for range */
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) {
range = (opal_pmix_data_range_t)iptr->data.integer;
range = (opal_pmix_data_range_t)iptr->data.uint;
break;
}
}
Expand Down
40 changes: 30 additions & 10 deletions orte/runtime/orte_data_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ typedef struct {
static void construct(orte_data_object_t *ptr)
{
ptr->index = -1;
ptr->uid = UINT32_MAX;
ptr->range = OPAL_PMIX_RANGE_UNDEF;
ptr->persistence = OPAL_PMIX_PERSIST_SESSION;
OBJ_CONSTRUCT(&ptr->values, opal_list_t);
}

Expand Down Expand Up @@ -172,9 +175,10 @@ void orte_data_server(int status, orte_process_name_t* sender,
char **keys = NULL, *str;
bool ret_packed = false, wait = false, data_added;
int room_number;
uint32_t uid;
uint32_t uid = UINT32_MAX;
opal_pmix_data_range_t range;
orte_data_req_t *req, *rqnext;
orte_jobid_t jobid;

OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server got message from %s",
Expand Down Expand Up @@ -206,7 +210,7 @@ void orte_data_server(int status, orte_process_name_t* sender,
switch(command) {
case ORTE_PMIX_PUBLISH_CMD:
data = OBJ_NEW(orte_data_object_t);
/* unpack the requestor */
/* unpack the publisher */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &data->owner, &count, OPAL_NAME))) {
ORTE_ERROR_LOG(rc);
Expand All @@ -221,7 +225,7 @@ void orte_data_server(int status, orte_process_name_t* sender,

/* unpack the range */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &data->range, &count, OPAL_INT))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &data->range, &count, OPAL_PMIX_DATA_RANGE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(data);
goto SEND_ERROR;
Expand Down Expand Up @@ -261,8 +265,13 @@ void orte_data_server(int status, orte_process_name_t* sender,
if (req->uid != data->uid) {
continue;
}
if (req->range != data->range) {
continue;
/* if the published range is constrained to namespace, then only
* consider this data if the publisher is
* in the same namespace as the requestor */
if (OPAL_PMIX_RANGE_NAMESPACE == data->range) {
if (jobid != data->owner.jobid) {
continue;
}
}
for (i=0; NULL != req->keys[i]; i++) {
/* cycle thru the data keys for matches */
Expand Down Expand Up @@ -344,9 +353,16 @@ void orte_data_server(int status, orte_process_name_t* sender,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));

/* unpack the requestor's jobid */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &jobid, &count, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto SEND_ERROR;
}

/* unpack the range - this sets some constraints on the range of data to be considered */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &range, &count, OPAL_INT))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &range, &count, OPAL_PMIX_DATA_RANGE))) {
ORTE_ERROR_LOG(rc);
goto SEND_ERROR;
}
Expand Down Expand Up @@ -409,13 +425,17 @@ void orte_data_server(int status, orte_process_name_t* sender,
if (NULL == data) {
continue;
}
/* can only access data posted by the same user id */
/* for security reasons, can only access data posted by the same user id */
if (uid != data->uid) {
continue;
}
/* if the range doesn't match, then we cannot consider it */
if (range != data->range) {
continue;
/* if the published range is constrained to namespace, then only
* consider this data if the publisher is
* in the same namespace as the requestor */
if (OPAL_PMIX_RANGE_NAMESPACE == data->range) {
if (jobid != data->owner.jobid) {
continue;
}
}
/* see if we have this key */
OPAL_LIST_FOREACH(iptr, &data->values, opal_value_t) {
Expand Down