Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 104 additions & 6 deletions orte/mca/plm/base/plm_base_launch_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* et Automatique. All rights reserved.
* Copyright (c) 2011-2012 Los Alamos National Security, LLC.
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2016 Research Organization for Information Science
* Copyright (c) 2014-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016 IBM Corporation. All rights reserved.
* $COPYRIGHT$
Expand Down Expand Up @@ -817,6 +817,10 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
int i;
uint32_t h;
orte_job_t *jdata;
uint8_t flag;
size_t inlen, cmplen;
uint8_t *packed_data, *cmpdata;
opal_buffer_t datbuf, *data;

OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
"%s plm:base:daemon_topology recvd for daemon %s",
Expand All @@ -832,10 +836,55 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
orted_failed_launch = true;
goto CLEANUP;
}
OBJ_CONSTRUCT(&datbuf, opal_buffer_t);
/* unpack the flag to see if this payload is compressed */
idx=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &idx, OPAL_INT8))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
if (flag) {
/* unpack the data size */
idx=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &inlen, &idx, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
/* unpack the unpacked data size */
idx=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &cmplen, &idx, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
/* allocate the space */
packed_data = (uint8_t*)malloc(inlen);
/* unpack the data blob */
idx = inlen;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, packed_data, &idx, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
/* decompress the data */
if (orte_util_uncompress_block(&cmpdata, cmplen,
packed_data, inlen)) {
/* the data has been uncompressed */
opal_dss.load(&datbuf, cmpdata, cmplen);
data = &datbuf;
} else {
data = buffer;
}
free(packed_data);
} else {
data = buffer;
}

/* unpack the topology signature for this node */
idx=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &sig, &idx, OPAL_STRING))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &sig, &idx, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
Expand All @@ -861,7 +910,7 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,

/* unpack the topology */
idx=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &topo, &idx, OPAL_HWLOC_TOPO))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &topo, &idx, OPAL_HWLOC_TOPO))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
Expand All @@ -873,7 +922,7 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,

/* unpack any coprocessors */
idx=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &coprocessors, &idx, OPAL_STRING))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &coprocessors, &idx, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
Expand All @@ -900,7 +949,7 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
}
/* see if this daemon is on a coprocessor */
idx=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &coprocessors, &idx, OPAL_STRING))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &coprocessors, &idx, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
Expand Down Expand Up @@ -1088,8 +1137,57 @@ void orte_plm_base_daemon_callback(int status, orte_process_name_t* sender,
/* rank=1 always sends its topology back */
topo = NULL;
if (1 == dname.vpid) {
uint8_t flag;
size_t inlen, cmplen;
uint8_t *packed_data, *cmpdata;
opal_buffer_t datbuf, *data;
OBJ_CONSTRUCT(&datbuf, opal_buffer_t);
/* unpack the flag to see if this payload is compressed */
idx=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &idx, OPAL_INT8))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
if (flag) {
/* unpack the data size */
idx=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &inlen, &idx, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
/* unpack the unpacked data size */
idx=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &cmplen, &idx, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
/* allocate the space */
packed_data = (uint8_t*)malloc(inlen);
/* unpack the data blob */
idx = inlen;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, packed_data, &idx, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
/* decompress the data */
if (orte_util_uncompress_block(&cmpdata, cmplen,
packed_data, inlen)) {
/* the data has been uncompressed */
opal_dss.load(&datbuf, cmpdata, cmplen);
data = &datbuf;
} else {
data = buffer;
}
free(packed_data);
} else {
data = buffer;
}
idx=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &topo, &idx, OPAL_HWLOC_TOPO))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &topo, &idx, OPAL_HWLOC_TOPO))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
Expand Down
64 changes: 55 additions & 9 deletions orte/orted/orted_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2010-2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2016 Research Organization for Information Science
* Copyright (c) 2016-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
Expand Down Expand Up @@ -59,6 +59,7 @@
#include "orte/util/session_dir.h"
#include "orte/util/name_fns.h"
#include "orte/util/nidmap.h"
#include "orte/util/compress.h"

#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/base/base.h"
Expand Down Expand Up @@ -101,7 +102,7 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
int32_t signal;
orte_jobid_t job;
char *contact_info;
opal_buffer_t *answer;
opal_buffer_t data, *answer;
orte_job_t *jdata;
orte_process_name_t proc, proc2;
orte_process_name_t *return_addr;
Expand All @@ -124,6 +125,9 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
char *rtmod;
char *coprocessors;
orte_job_map_t *map;
int8_t flag;
uint8_t *cmpdata;
size_t cmplen;

/* unpack the command */
n = 1;
Expand Down Expand Up @@ -620,36 +624,78 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,

/**** REPORT TOPOLOGY COMMAND ****/
case ORTE_DAEMON_REPORT_TOPOLOGY_CMD:
answer = OBJ_NEW(opal_buffer_t);
OBJ_CONSTRUCT(&data, opal_buffer_t);
/* pack the topology signature */
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &orte_topo_signature, 1, OPAL_STRING))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &orte_topo_signature, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(answer);
OBJ_DESTRUCT(&data);
goto CLEANUP;
}
/* pack the topology */
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(answer);
OBJ_DESTRUCT(&data);
goto CLEANUP;
}

/* detect and add any coprocessors */
coprocessors = opal_hwloc_base_find_coprocessors(opal_hwloc_topology);
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &coprocessors, 1, OPAL_STRING))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &coprocessors, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
}
if (NULL != coprocessors) {
free(coprocessors);
}
/* see if I am on a coprocessor */
coprocessors = opal_hwloc_base_check_on_coprocessor();
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &coprocessors, 1, OPAL_STRING))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &coprocessors, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
}
if (NULL!= coprocessors) {
free(coprocessors);
}
answer = OBJ_NEW(opal_buffer_t);
if (orte_util_compress_block((uint8_t*)data.base_ptr, data.bytes_used,
&cmpdata, &cmplen)) {
/* the data was compressed - mark that we compressed it */
flag = 1;
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &flag, 1, OPAL_INT8))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
/* pack the compressed length */
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &cmplen, 1, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
/* pack the uncompressed length */
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &data.bytes_used, 1, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
/* pack the compressed info */
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, cmpdata, cmplen, OPAL_UINT8))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
OBJ_DESTRUCT(&data);
free(cmpdata);
} else {
/* mark that it was not compressed */
flag = 0;
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &flag, 1, OPAL_INT8))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&data);
free(cmpdata);
}
/* transfer the payload across */
opal_dss.copy_payload(answer, &data);
OBJ_DESTRUCT(&data);
}
/* send the data */
if (0 > (ret = orte_rml.send_buffer_nb(orte_mgmt_conduit,
sender, answer, ORTE_RML_TAG_TOPOLOGY_REPORT,
Expand Down
52 changes: 51 additions & 1 deletion orte/orted/orted_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
#include "orte/util/parse_options.h"
#include "orte/mca/rml/base/rml_contact.h"
#include "orte/util/pre_condition_transports.h"
#include "orte/util/compress.h"

#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
Expand Down Expand Up @@ -793,9 +794,58 @@ int orte_daemon(int argc, char *argv[])
/* if we are rank=1, then send our topology back - otherwise, mpirun
* will request it if necessary */
if (1 == ORTE_PROC_MY_NAME->vpid) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) {
opal_buffer_t data;
int8_t flag;
uint8_t *cmpdata;
size_t cmplen;

/* setup an intermediate buffer */
OBJ_CONSTRUCT(&data, opal_buffer_t);

if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) {
ORTE_ERROR_LOG(ret);
}
if (orte_util_compress_block((uint8_t*)data.base_ptr, data.bytes_used,
&cmpdata, &cmplen)) {
/* the data was compressed - mark that we compressed it */
flag = 1;
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
/* pack the compressed length */
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &cmplen, 1, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
/* pack the uncompressed length */
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &data.bytes_used, 1, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
/* pack the compressed info */
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, cmpdata, cmplen, OPAL_UINT8))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
OBJ_DESTRUCT(&data);
free(cmpdata);
} else {
/* mark that it was not compressed */
flag = 0;
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&data);
free(cmpdata);
}
/* transfer the payload across */
opal_dss.copy_payload(buffer, &data);
OBJ_DESTRUCT(&data);
}
}

/* send it to the designated target */
Expand Down