Merge pull request #222 from rhc54/topic/nid
Cleanup the hetero-topology case
rhc54 committed Mar 26, 2019
2 parents 315681d + 598c0a5 commit 716be58
Showing 3 changed files with 151 additions and 59 deletions.
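For orientation before the per-file diffs: each discovered topology now records the slot it occupies in the orte_node_topologies pointer array (the new index field), the sender packs that index next to each topology's signature and hwloc object, and the per-node assignment shrinks to one small integer per node. The receiver can then place every topology at the same slot and resolve node assignments by index alone, with no list walking or signature matching. The following self-contained C sketch models only that indexing idea; topo_entry_t, the "sig-A"/"sig-B" signatures, and the node counts are hypothetical stand-ins, not the ORTE types.

#include <stdint.h>
#include <stdio.h>

#define MAX_TOPOS 8
#define NUM_NODES 5

typedef struct {
    int index;       /* slot the sender stored it at (t->index in the diff) */
    const char *sig; /* topology signature string (t->sig) */
} topo_entry_t;

int main(void)
{
    /* sender side: two distinct topologies were discovered */
    topo_entry_t sent[] = { { 0, "sig-A" }, { 1, "sig-B" } };
    int nsent = 2;
    /* per-node assignments travel as one small integer per node */
    int8_t node_topo[NUM_NODES] = { 0, 0, 1, 0, 1 };

    /* receiver side: place each topology at the slot named by its index,
     * mirroring opal_pointer_array_set_item(orte_node_topologies, index, t2) */
    topo_entry_t *table[MAX_TOPOS] = { NULL };
    for (int n = 0; n < nsent; n++) {
        table[sent[n].index] = &sent[n];
    }

    /* resolve each node's topology purely by index */
    for (int n = 0; n < NUM_NODES; n++) {
        printf("node %d -> topology %d (%s)\n",
               n, node_topo[n], table[node_topo[n]]->sig);
    }
    return 0;
}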
2 changes: 1 addition & 1 deletion orte/mca/plm/base/plm_base_launch_support.c
@@ -1260,7 +1260,7 @@ void orte_plm_base_daemon_callback(int status, orte_process_name_t* sender,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
t = OBJ_NEW(orte_topology_t);
t->sig = sig;
opal_pointer_array_add(orte_node_topologies, t);
t->index = opal_pointer_array_add(orte_node_topologies, t);
daemon->node->topology = t;
if (NULL != topo) {
t->topo = topo;
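The one-line change above works because opal_pointer_array_add() returns the slot where it stored the item, which is exactly what the new t->index relies on. A minimal stand-alone model of that pattern follows; ptr_array_t and ptr_array_add() are simplified stand-ins for the OPAL pointer array, and the signature string is made up.

#include <stdio.h>
#include <stdlib.h>

typedef struct { void *slots[16]; int lowest_free; } ptr_array_t;

/* store the item in the next free slot and return that slot's index */
static int ptr_array_add(ptr_array_t *a, void *item)
{
    int idx = a->lowest_free++;
    a->slots[idx] = item;
    return idx;
}

typedef struct { int index; const char *sig; } topology_t;

int main(void)
{
    ptr_array_t node_topologies = { { 0 }, 0 };
    topology_t *t = malloc(sizeof(*t));
    t->sig = "example-signature";                      /* hypothetical */
    t->index = ptr_array_add(&node_topologies, t);     /* mirrors the diff */
    printf("topology stored at index %d\n", t->index);
    free(t);
    return 0;
}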
3 changes: 2 additions & 1 deletion orte/runtime/orte_globals.h
@@ -13,7 +13,7 @@
* Copyright (c) 2007-2017 Cisco Systems, Inc. All rights reserved
* Copyright (c) 2011-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2013-2018 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2019 Intel, Inc. All rights reserved.
* Copyright (c) 2017 IBM Corporation. All rights reserved.
* Copyright (c) 2017-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
@@ -220,6 +220,7 @@ struct orte_job_map_t;
/* define an object for storing node topologies */
typedef struct {
opal_object_t super;
int index;
hwloc_topology_t topo;
char *sig;
} orte_topology_t;
205 changes: 148 additions & 57 deletions orte/util/nidmap.c
@@ -349,6 +349,7 @@ int orte_util_decode_nidmap(opal_buffer_t *buf)
}
} else {
vp8 = (uint8_t*)boptr->bytes;
sz = boptr->size;
boptr->bytes = NULL;
boptr->size = 0;
}
@@ -444,27 +445,17 @@ int orte_util_decode_nidmap(opal_buffer_t *buf)
return rc;
}

typedef struct {
opal_list_item_t super;
orte_topology_t *t;
} orte_tptr_trk_t;
static OBJ_CLASS_INSTANCE(orte_tptr_trk_t,
opal_list_item_t,
NULL, NULL);

int orte_util_pass_node_info(opal_buffer_t *buffer)
{
uint16_t *slots=NULL, slot = UINT16_MAX;
uint8_t *flags=NULL, flag = UINT8_MAX, *topologies = NULL;
uint8_t *flags=NULL, flag = UINT8_MAX;
int8_t i8, ntopos;
int rc, n, nbitmap, nstart;
bool compressed, unislots = true, uniflags = true, unitopos = true;
orte_node_t *nptr;
opal_byte_object_t bo, *boptr;
size_t sz, nslots;
opal_buffer_t bucket;
orte_tptr_trk_t *trk;
opal_list_t topos;
orte_topology_t *t;

/* make room for the number of slots on each node */
@@ -493,15 +484,18 @@ int orte_util_pass_node_info(opal_buffer_t *buffer)
} else {
nstart = 0;
}
OBJ_CONSTRUCT(&topos, opal_list_t);
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
ntopos = 0;
for (n=nstart; n < orte_node_topologies->size; n++) {
if (NULL == (t = (orte_topology_t*)opal_pointer_array_get_item(orte_node_topologies, n))) {
continue;
}
trk = OBJ_NEW(orte_tptr_trk_t);
trk->t = t;
opal_list_append(&topos, &trk->super);
/* pack the index */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&bucket, &t->index, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&bucket);
goto cleanup;
}
/* pack this topology string */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&bucket, &t->sig, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
@@ -514,36 +508,65 @@ int orte_util_pass_node_info(opal_buffer_t *buffer)
OBJ_DESTRUCT(&bucket);
goto cleanup;
}
++ntopos;
}
/* pack the number of topologies in allocation */
ntopos = opal_list_get_size(&topos);
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &ntopos, 1, OPAL_INT8))) {
goto cleanup;
}
if (1 < ntopos) {
/* need to send them along */
opal_dss.copy_payload(buffer, &bucket);
/* allocate space to report them */
ntopos = orte_node_pool->size;
topologies = (uint8_t*)malloc(ntopos);
if (opal_compress.compress_block((uint8_t*)bucket.base_ptr, bucket.bytes_used,
&bo.bytes, &sz)) {
/* the data was compressed - mark that we compressed it */
compressed = true;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &compressed, 1, OPAL_BOOL))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&bucket);
goto cleanup;
}
/* pack the uncompressed length */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &bucket.bytes_used, 1, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&bucket);
goto cleanup;
}
bo.size = sz;
} else {
/* mark that it was not compressed */
compressed = false;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &compressed, 1, OPAL_BOOL))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&bucket);
goto cleanup;
}
opal_dss.unload(&bucket, (void**)&bo.bytes, &bo.size);
}
unitopos = false;
/* pack the info */
boptr = &bo;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&bucket);
goto cleanup;
}
OBJ_DESTRUCT(&bucket);
free(bo.bytes);
}
OBJ_DESTRUCT(&bucket);

/* construct the per-node info */
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
for (n=0; n < orte_node_pool->size; n++) {
if (NULL == (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, n))) {
continue;
}
/* store the topology, if required */
/* track the topology, if required */
if (!unitopos) {
topologies[n] = 0;
if (0 == nstart || 0 < n) {
OPAL_LIST_FOREACH(trk, &topos, orte_tptr_trk_t) {
if (trk->t == nptr->topology) {
break;
}
topologies[n]++;
}
i8 = nptr->topology->index;
if (ORTE_SUCCESS != (rc = opal_dss.pack(&bucket, &i8, 1, OPAL_INT8))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&bucket);
goto cleanup;
}
}
/* store the number of slots */
@@ -572,21 +595,19 @@ int orte_util_pass_node_info(opal_buffer_t *buffer)

/* deal with the topology assignments */
if (!unitopos) {
if (opal_compress.compress_block((uint8_t*)topologies, ntopos,
if (opal_compress.compress_block((uint8_t*)bucket.base_ptr, bucket.bytes_used,
(uint8_t**)&bo.bytes, &sz)) {
/* mark that this was compressed */
i8 = 1;
compressed = true;
bo.size = sz;
} else {
/* mark that this was not compressed */
i8 = 0;
compressed = false;
bo.bytes = topologies;
bo.size = nbitmap;
bo.bytes = bucket.base_ptr;
bo.size = bucket.bytes_used;
}
/* indicate compression */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &i8, 1, OPAL_INT8))) {
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &compressed, 1, OPAL_BOOL))) {
if (compressed) {
free(bo.bytes);
}
@@ -607,6 +628,7 @@ int orte_util_pass_node_info(opal_buffer_t *buffer)
free(bo.bytes);
}
}
OBJ_DESTRUCT(&bucket);

/* if we have uniform #slots, then just flag it - no
* need to pass anything */
@@ -713,16 +735,19 @@ int orte_util_pass_node_info(opal_buffer_t *buffer)
int orte_util_parse_node_info(opal_buffer_t *buf)
{
int8_t i8;
int rc = ORTE_SUCCESS, cnt, n, m;
bool compressed;
int rc = ORTE_SUCCESS, cnt, n, m, index;
orte_node_t *nptr;
size_t sz;
opal_byte_object_t *boptr;
uint16_t *slots = NULL;
uint8_t *flags = NULL;
uint8_t *topologies = NULL;
orte_topology_t *t2, **tps = NULL;
uint8_t *bytes = NULL;
orte_topology_t *t2;
hwloc_topology_t topo;
char *sig;
opal_buffer_t bucket;

/* check to see if we have uniform topologies */
cnt = 1;
@@ -733,43 +758,104 @@ int orte_util_parse_node_info(opal_buffer_t *buf)
/* we already defaulted to uniform topology, so only need to
* process this if it is non-uniform */
if (1 < i8) {
/* create an array to cache these */
tps = (orte_topology_t**)malloc(sizeof(orte_topology_t*));
/* unpack the compression flag */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &compressed, &cnt, OPAL_BOOL))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (compressed) {
/* get the uncompressed size */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &sz, &cnt, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
/* unpack the topology object */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &boptr, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}

/* if compressed, decompress */
if (compressed) {
if (!opal_compress.decompress_block((uint8_t**)&bytes, sz,
boptr->bytes, boptr->size)) {
ORTE_ERROR_LOG(ORTE_ERROR);
if (NULL != boptr->bytes) {
free(boptr->bytes);
}
free(boptr);
rc = ORTE_ERROR;
goto cleanup;
}
} else {
bytes = (uint8_t*)boptr->bytes;
sz = boptr->size;
boptr->bytes = NULL;
boptr->size = 0;
}
if (NULL != boptr->bytes) {
free(boptr->bytes);
}
/* setup to unpack */
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
opal_dss.load(&bucket, bytes, sz);

for (n=0; n < i8; n++) {
/* unpack the index */
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &sig, &cnt, OPAL_STRING))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&bucket, &index, &cnt, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* unpack the signature */
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &topo, &cnt, OPAL_HWLOC_TOPO))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&bucket, &sig, &cnt, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* new topology - record it */
/* unpack the topology */
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&bucket, &topo, &cnt, OPAL_HWLOC_TOPO))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* record it */
t2 = OBJ_NEW(orte_topology_t);
t2->index = index;
t2->sig = sig;
t2->topo = topo;
opal_pointer_array_add(orte_node_topologies, t2);
/* keep a cached copy */
tps[n] = t2;
opal_pointer_array_set_item(orte_node_topologies, index, t2);
}
OBJ_DESTRUCT(&bucket);

/* now get the array of assigned topologies */
/* if compressed, get the uncompressed size */
/* unpack the compression flag */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &sz, &cnt, OPAL_SIZE))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &compressed, &cnt, OPAL_BOOL))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (compressed) {
/* get the uncompressed size */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &sz, &cnt, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
/* unpack the topologies object */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &boptr, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* if compressed, decompress */
if (1 == i8) {
if (!opal_compress.decompress_block((uint8_t**)&topologies, sz,
if (compressed) {
if (!opal_compress.decompress_block((uint8_t**)&bytes, sz,
boptr->bytes, boptr->size)) {
ORTE_ERROR_LOG(ORTE_ERROR);
if (NULL != boptr->bytes) {
@@ -780,19 +866,27 @@ int orte_util_parse_node_info(opal_buffer_t *buf)
goto cleanup;
}
} else {
topologies = (uint8_t*)boptr->bytes;
bytes = (uint8_t*)boptr->bytes;
sz = boptr->size;
boptr->bytes = NULL;
boptr->size = 0;
}
if (NULL != boptr->bytes) {
free(boptr->bytes);
}
free(boptr);
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
opal_dss.load(&bucket, bytes, sz);
/* cycle across the node pool and assign the values */
for (n=0, m=0; n < orte_node_pool->size; n++) {
for (n=0; n < orte_node_pool->size; n++) {
if (NULL != (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, n))) {
nptr->topology = tps[topologies[m]];
++m;
/* unpack the next topology index */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&bucket, &i8, &cnt, OPAL_INT8))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
nptr->topology = opal_pointer_array_get_item(orte_node_topologies, index);
}
}
}
@@ -932,9 +1026,6 @@ int orte_util_parse_node_info(opal_buffer_t *buf)
if (NULL != flags) {
free(flags);
}
if (NULL != tps) {
free(tps);
}
if (NULL != topologies) {
free(topologies);
}
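Both the topology table and the per-node index map in this file now travel through the same framing: a boolean flag saying whether opal_compress.compress_block() managed to compress the buffer, the uncompressed length only when it did, and then the bytes themselves wrapped in a byte object (which carries its own length). The sketch below is a simplified, hypothetical model of just that framing over a flat buffer; try_compress() is a toy stand-in that only "compresses" runs of identical bytes, and the byte-object wrapper is omitted.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* toy compressor: collapse a buffer of identical bytes to one byte,
 * otherwise decline (compress_block is likewise allowed to decline) */
static bool try_compress(const uint8_t *in, size_t len,
                         uint8_t *out, size_t *outlen)
{
    if (0 == len) {
        return false;
    }
    for (size_t i = 1; i < len; i++) {
        if (in[i] != in[0]) {
            return false;
        }
    }
    out[0] = in[0];
    *outlen = 1;
    return true;
}

/* append the framed payload to a flat message buffer and return the
 * number of bytes written */
static size_t pack_blob(uint8_t *msg, const uint8_t *payload, size_t len)
{
    uint8_t scratch[256];
    size_t clen = 0;
    bool compressed = try_compress(payload, len, scratch, &clen);
    size_t off = 0;

    msg[off++] = compressed ? 1 : 0;              /* the OPAL_BOOL flag */
    if (compressed) {
        memcpy(msg + off, &len, sizeof(len));     /* uncompressed size  */
        off += sizeof(len);
        memcpy(msg + off, scratch, clen);         /* compressed bytes   */
        off += clen;
    } else {
        memcpy(msg + off, payload, len);          /* raw bytes          */
        off += len;
    }
    return off;
}

int main(void)
{
    uint8_t payload[16] = { 0 };                  /* e.g. per-node indices */
    uint8_t msg[512];
    size_t n = pack_blob(msg, payload, sizeof(payload));
    printf("framed %zu payload bytes into %zu message bytes "
           "(compressed flag = %d)\n", sizeof(payload), n, msg[0]);
    return 0;
}

On the receive side, the flag tells the caller whether to run decompress_block() before loading the bytes into an unpack buffer, which mirrors the logic added to orte_util_parse_node_info() above.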
