diff --git a/orte/mca/plm/base/plm_base_launch_support.c b/orte/mca/plm/base/plm_base_launch_support.c index 8bedfef7d07..49890762f2f 100644 --- a/orte/mca/plm/base/plm_base_launch_support.c +++ b/orte/mca/plm/base/plm_base_launch_support.c @@ -14,7 +14,7 @@ * et Automatique. All rights reserved. * Copyright (c) 2011-2012 Los Alamos National Security, LLC. * Copyright (c) 2013-2017 Intel, Inc. All rights reserved. - * Copyright (c) 2014-2016 Research Organization for Information Science + * Copyright (c) 2014-2017 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2016 IBM Corporation. All rights reserved. * $COPYRIGHT$ @@ -817,6 +817,10 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender, int i; uint32_t h; orte_job_t *jdata; + uint8_t flag; + size_t inlen, cmplen; + uint8_t *packed_data, *cmpdata; + opal_buffer_t datbuf, *data; OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output, "%s plm:base:daemon_topology recvd for daemon %s", @@ -832,10 +836,55 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender, orted_failed_launch = true; goto CLEANUP; } + OBJ_CONSTRUCT(&datbuf, opal_buffer_t); + /* unpack the flag to see if this payload is compressed */ + idx=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &idx, OPAL_INT8))) { + ORTE_ERROR_LOG(rc); + orted_failed_launch = true; + goto CLEANUP; + } + if (flag) { + /* unpack the data size */ + idx=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &inlen, &idx, OPAL_SIZE))) { + ORTE_ERROR_LOG(rc); + orted_failed_launch = true; + goto CLEANUP; + } + /* unpack the unpacked data size */ + idx=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &cmplen, &idx, OPAL_SIZE))) { + ORTE_ERROR_LOG(rc); + orted_failed_launch = true; + goto CLEANUP; + } + /* allocate the space */ + packed_data = (uint8_t*)malloc(inlen); + /* unpack the data blob */ + idx = inlen; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, packed_data, &idx, OPAL_UINT8))) { + ORTE_ERROR_LOG(rc); + orted_failed_launch = true; + goto CLEANUP; + } + /* decompress the data */ + if (orte_util_uncompress_block(&cmpdata, cmplen, + packed_data, inlen)) { + /* the data has been uncompressed */ + opal_dss.load(&datbuf, cmpdata, cmplen); + data = &datbuf; + } else { + data = buffer; + } + free(packed_data); + } else { + data = buffer; + } /* unpack the topology signature for this node */ idx=1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &sig, &idx, OPAL_STRING))) { + if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &sig, &idx, OPAL_STRING))) { ORTE_ERROR_LOG(rc); orted_failed_launch = true; goto CLEANUP; @@ -861,7 +910,7 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender, /* unpack the topology */ idx=1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &topo, &idx, OPAL_HWLOC_TOPO))) { + if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &topo, &idx, OPAL_HWLOC_TOPO))) { ORTE_ERROR_LOG(rc); orted_failed_launch = true; goto CLEANUP; @@ -873,7 +922,7 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender, /* unpack any coprocessors */ idx=1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &coprocessors, &idx, OPAL_STRING))) { + if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &coprocessors, &idx, OPAL_STRING))) { ORTE_ERROR_LOG(rc); orted_failed_launch = true; goto CLEANUP; @@ -900,7 +949,7 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender, } /* see if this daemon is on a coprocessor */ idx=1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &coprocessors, &idx, OPAL_STRING))) { + if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &coprocessors, &idx, OPAL_STRING))) { ORTE_ERROR_LOG(rc); orted_failed_launch = true; goto CLEANUP; @@ -1088,8 +1137,57 @@ void orte_plm_base_daemon_callback(int status, orte_process_name_t* sender, /* rank=1 always sends its topology back */ topo = NULL; if (1 == dname.vpid) { + uint8_t flag; + size_t inlen, cmplen; + uint8_t *packed_data, *cmpdata; + opal_buffer_t datbuf, *data; + OBJ_CONSTRUCT(&datbuf, opal_buffer_t); + /* unpack the flag to see if this payload is compressed */ + idx=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &idx, OPAL_INT8))) { + ORTE_ERROR_LOG(rc); + orted_failed_launch = true; + goto CLEANUP; + } + if (flag) { + /* unpack the data size */ + idx=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &inlen, &idx, OPAL_SIZE))) { + ORTE_ERROR_LOG(rc); + orted_failed_launch = true; + goto CLEANUP; + } + /* unpack the unpacked data size */ + idx=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &cmplen, &idx, OPAL_SIZE))) { + ORTE_ERROR_LOG(rc); + orted_failed_launch = true; + goto CLEANUP; + } + /* allocate the space */ + packed_data = (uint8_t*)malloc(inlen); + /* unpack the data blob */ + idx = inlen; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, packed_data, &idx, OPAL_UINT8))) { + ORTE_ERROR_LOG(rc); + orted_failed_launch = true; + goto CLEANUP; + } + /* decompress the data */ + if (orte_util_uncompress_block(&cmpdata, cmplen, + packed_data, inlen)) { + /* the data has been uncompressed */ + opal_dss.load(&datbuf, cmpdata, cmplen); + data = &datbuf; + } else { + data = buffer; + } + free(packed_data); + } else { + data = buffer; + } idx=1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &topo, &idx, OPAL_HWLOC_TOPO))) { + if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &topo, &idx, OPAL_HWLOC_TOPO))) { ORTE_ERROR_LOG(rc); orted_failed_launch = true; goto CLEANUP; diff --git a/orte/orted/orted_comm.c b/orte/orted/orted_comm.c index a1c48b811d4..4b5b7932c0e 100644 --- a/orte/orted/orted_comm.c +++ b/orte/orted/orted_comm.c @@ -15,7 +15,7 @@ * Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved. * Copyright (c) 2010-2011 Oak Ridge National Labs. All rights reserved. * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. - * Copyright (c) 2016 Research Organization for Information Science + * Copyright (c) 2016-2017 Research Organization for Information Science * and Technology (RIST). All rights reserved. * $COPYRIGHT$ * @@ -59,6 +59,7 @@ #include "orte/util/session_dir.h" #include "orte/util/name_fns.h" #include "orte/util/nidmap.h" +#include "orte/util/compress.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/grpcomm/base/base.h" @@ -101,7 +102,7 @@ void orte_daemon_recv(int status, orte_process_name_t* sender, int32_t signal; orte_jobid_t job; char *contact_info; - opal_buffer_t *answer; + opal_buffer_t data, *answer; orte_job_t *jdata; orte_process_name_t proc, proc2; orte_process_name_t *return_addr; @@ -124,6 +125,9 @@ void orte_daemon_recv(int status, orte_process_name_t* sender, char *rtmod; char *coprocessors; orte_job_map_t *map; + int8_t flag; + uint8_t *cmpdata; + size_t cmplen; /* unpack the command */ n = 1; @@ -620,23 +624,23 @@ void orte_daemon_recv(int status, orte_process_name_t* sender, /**** REPORT TOPOLOGY COMMAND ****/ case ORTE_DAEMON_REPORT_TOPOLOGY_CMD: - answer = OBJ_NEW(opal_buffer_t); + OBJ_CONSTRUCT(&data, opal_buffer_t); /* pack the topology signature */ - if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &orte_topo_signature, 1, OPAL_STRING))) { + if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &orte_topo_signature, 1, OPAL_STRING))) { ORTE_ERROR_LOG(ret); - OBJ_RELEASE(answer); + OBJ_DESTRUCT(&data); goto CLEANUP; } /* pack the topology */ - if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) { + if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) { ORTE_ERROR_LOG(ret); - OBJ_RELEASE(answer); + OBJ_DESTRUCT(&data); goto CLEANUP; } /* detect and add any coprocessors */ coprocessors = opal_hwloc_base_find_coprocessors(opal_hwloc_topology); - if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &coprocessors, 1, OPAL_STRING))) { + if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &coprocessors, 1, OPAL_STRING))) { ORTE_ERROR_LOG(ret); } if (NULL != coprocessors) { @@ -644,12 +648,54 @@ void orte_daemon_recv(int status, orte_process_name_t* sender, } /* see if I am on a coprocessor */ coprocessors = opal_hwloc_base_check_on_coprocessor(); - if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &coprocessors, 1, OPAL_STRING))) { + if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &coprocessors, 1, OPAL_STRING))) { ORTE_ERROR_LOG(ret); } if (NULL!= coprocessors) { free(coprocessors); } + answer = OBJ_NEW(opal_buffer_t); + if (orte_util_compress_block((uint8_t*)data.base_ptr, data.bytes_used, + &cmpdata, &cmplen)) { + /* the data was compressed - mark that we compressed it */ + flag = 1; + if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &flag, 1, OPAL_INT8))) { + ORTE_ERROR_LOG(ret); + free(cmpdata); + OBJ_DESTRUCT(&data); + } + /* pack the compressed length */ + if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &cmplen, 1, OPAL_SIZE))) { + ORTE_ERROR_LOG(ret); + free(cmpdata); + OBJ_DESTRUCT(&data); + } + /* pack the uncompressed length */ + if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &data.bytes_used, 1, OPAL_SIZE))) { + ORTE_ERROR_LOG(ret); + free(cmpdata); + OBJ_DESTRUCT(&data); + } + /* pack the compressed info */ + if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, cmpdata, cmplen, OPAL_UINT8))) { + ORTE_ERROR_LOG(ret); + free(cmpdata); + OBJ_DESTRUCT(&data); + } + OBJ_DESTRUCT(&data); + free(cmpdata); + } else { + /* mark that it was not compressed */ + flag = 0; + if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &flag, 1, OPAL_INT8))) { + ORTE_ERROR_LOG(ret); + OBJ_DESTRUCT(&data); + free(cmpdata); + } + /* transfer the payload across */ + opal_dss.copy_payload(answer, &data); + OBJ_DESTRUCT(&data); + } /* send the data */ if (0 > (ret = orte_rml.send_buffer_nb(orte_mgmt_conduit, sender, answer, ORTE_RML_TAG_TOPOLOGY_REPORT, diff --git a/orte/orted/orted_main.c b/orte/orted/orted_main.c index 76b62f6d1ec..c21e0f54f66 100644 --- a/orte/orted/orted_main.c +++ b/orte/orted/orted_main.c @@ -76,6 +76,7 @@ #include "orte/util/parse_options.h" #include "orte/mca/rml/base/rml_contact.h" #include "orte/util/pre_condition_transports.h" +#include "orte/util/compress.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/ess/ess.h" @@ -793,9 +794,58 @@ int orte_daemon(int argc, char *argv[]) /* if we are rank=1, then send our topology back - otherwise, mpirun * will request it if necessary */ if (1 == ORTE_PROC_MY_NAME->vpid) { - if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) { + opal_buffer_t data; + int8_t flag; + uint8_t *cmpdata; + size_t cmplen; + + /* setup an intermediate buffer */ + OBJ_CONSTRUCT(&data, opal_buffer_t); + + if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) { ORTE_ERROR_LOG(ret); } + if (orte_util_compress_block((uint8_t*)data.base_ptr, data.bytes_used, + &cmpdata, &cmplen)) { + /* the data was compressed - mark that we compressed it */ + flag = 1; + if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) { + ORTE_ERROR_LOG(ret); + free(cmpdata); + OBJ_DESTRUCT(&data); + } + /* pack the compressed length */ + if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &cmplen, 1, OPAL_SIZE))) { + ORTE_ERROR_LOG(ret); + free(cmpdata); + OBJ_DESTRUCT(&data); + } + /* pack the uncompressed length */ + if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &data.bytes_used, 1, OPAL_SIZE))) { + ORTE_ERROR_LOG(ret); + free(cmpdata); + OBJ_DESTRUCT(&data); + } + /* pack the compressed info */ + if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, cmpdata, cmplen, OPAL_UINT8))) { + ORTE_ERROR_LOG(ret); + free(cmpdata); + OBJ_DESTRUCT(&data); + } + OBJ_DESTRUCT(&data); + free(cmpdata); + } else { + /* mark that it was not compressed */ + flag = 0; + if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) { + ORTE_ERROR_LOG(ret); + OBJ_DESTRUCT(&data); + free(cmpdata); + } + /* transfer the payload across */ + opal_dss.copy_payload(buffer, &data); + OBJ_DESTRUCT(&data); + } } /* send it to the designated target */