Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 57 additions & 17 deletions opal/mca/pmix/pmix120/pmix/src/server/pmix_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1056,30 +1056,55 @@ static bool match_error_registration(pmix_regevents_info_t *reginfoptr, pmix_not
size_t ninfo = reginfoptr->ninfo;
pmix_status_t error = cd->status;

if (NULL == info || ninfo <= 0) {
/* this is a general errhandler, and so it always matches.
* however, here we are looking for an exact match, and
* so we ignore general errhandlers unless the incoming
* one is also general */
if (NULL == cd->info || 0 == cd->ninfo) {
return true;
} else {
return false;
}
}

/* since this errhandler has info keys, it is not a general errhandler.
* If the incoming errhandler *is* a general one, then we must not
* match so we can store the general case */
if (NULL == cd->info || 0 == cd->ninfo) {
return false;
}

/* try to match using error name or error group keys - this indicates
* a request for a specific error state */
pmix_get_errorgroup(error, errgroup);
/* try to match using error name or error group keys */
for (i=0; i < ninfo; i++) {
// if we get a match on any key then we abort the search and return true.
if ((0 == strcmp(info[i].key, PMIX_ERROR_NAME)) &&
if ((0 == strncmp(info[i].key, PMIX_ERROR_NAME, PMIX_MAX_KEYLEN)) &&
(error == info[i].value.data.int32)) {
return true;
} else if ((0 == strcmp(info[i].key, errgroup)) &&
} else if ((0 == strncmp(info[i].key, errgroup, PMIX_MAX_KEYLEN)) &&
(true == info[i].value.data.flag)) {
return true;
}
}
/* search by node (error location) key if it is specified in the notify info list*/
for (i=0; i<cd->ninfo ; i++) {
if (0 == strcmp (cd->info[i].key, PMIX_ERROR_NODE_NAME)) {
for (j=0; j<ninfo; j++) {
if ((0 == strcmp (info[j].key, PMIX_ERROR_NODE_NAME)) &&
(0 == strcmp (info[j].value.data.string, cd->info[i].value.data.string))) {

/* if we get here, then they haven't asked for a specific error state.
* It is possible, however, that they are asking for all errors from a
* specific node, so search by node (error location) key if it is
* specified in the notify info list */
for (i=0; i < cd->ninfo ; i++) {
if (0 == strncmp(cd->info[i].key, PMIX_ERROR_NODE_NAME, PMIX_MAX_KEYLEN)) {
for (j=0; j < ninfo; j++) {
if ((0 == strncmp(info[j].key, PMIX_ERROR_NODE_NAME, PMIX_MAX_KEYLEN)) &&
(0 == strcmp(info[j].value.data.string, cd->info[i].value.data.string))) {
return true;
}
}
}
}
/* end of search return false*/

/* end of search and nothing matched, so return false */
return false;
}

Expand All @@ -1093,9 +1118,11 @@ static void _notify_error(int sd, short args, void *cbdata)
pmix_peer_t *peer;
pmix_regevents_info_t *reginfoptr;
bool notify, notifyall;

pmix_output_verbose(0, pmix_globals.debug_output,
"pmix_server: _notify_error notifying client of error %d",
cd->status);

/* pack the command */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(cd->buf, &cmd, 1, PMIX_CMD))) {
PMIX_ERROR_LOG(rc);
Expand Down Expand Up @@ -1157,6 +1184,8 @@ static void _notify_error(int sd, short args, void *cbdata)
}
}
if (!notify) {
/* if we are not notifying everyone, and this proc isn't to
* be notified, then just continue the main loop */
continue;
}
}
Expand All @@ -1173,8 +1202,9 @@ static void _notify_error(int sd, short args, void *cbdata)
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server _notify_error - match error registration returned notify =%d ", notify);
}
if (notify)
if (notify) {
break;
}
}
if (notify) {
pmix_output_verbose(2, pmix_globals.debug_output,
Expand Down Expand Up @@ -1212,6 +1242,7 @@ pmix_status_t pmix_server_notify_error(pmix_status_t status,
cd->ninfo = ninfo;
cd->cbfunc = cbfunc;
cd->cbdata = cbdata;

pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server_notify_error status =%d, nprocs = %lu, ninfo =%lu",
status, nprocs, ninfo);
Expand All @@ -1227,18 +1258,19 @@ static void reg_errhandler(int sd, short args, void *cbdata)
int index = 0;
pmix_status_t rc;
pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;

/* check if this handler is already registered if so return error */
if (PMIX_SUCCESS == pmix_lookup_errhandler (cd->err, &index)) {
if (PMIX_SUCCESS == pmix_lookup_errhandler(cd->err, &index)) {
/* complete request with error status and return its original reference */
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server_register_errhandler error - hdlr already registered index = %d",
index);
cd->cbfunc.errregcbfn (PMIX_EXISTS, index, cd->cbdata);
cd->cbfunc.errregcbfn(PMIX_EXISTS, index, cd->cbdata);
} else {
rc = pmix_add_errhandler (cd->err, cd->info, cd->ninfo, &index);
rc = pmix_add_errhandler(cd->err, cd->info, cd->ninfo, &index);
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server_register_errhandler - success index =%d", index);
cd->cbfunc.errregcbfn (rc, index, cd->cbdata);
cd->cbfunc.errregcbfn(rc, index, cd->cbdata);
}
cd->active = false;
PMIX_RELEASE(cd);
Expand All @@ -1250,24 +1282,30 @@ void pmix_server_register_errhandler(pmix_info_t info[], size_t ninfo,
void *cbdata)
{
pmix_shift_caddy_t *cd;

/* need to thread shift this request */
cd = PMIX_NEW(pmix_shift_caddy_t);
cd->info = info;
cd->ninfo = ninfo;
cd->err = errhandler;
cd->cbfunc.errregcbfn = cbfunc;
cd->cbdata = cbdata;

pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server_register_errhandler shifting to server thread");

PMIX_THREADSHIFT(cd, reg_errhandler);
}

static void dereg_errhandler(int sd, short args, void *cbdata)
{
pmix_status_t rc;
pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
rc = pmix_remove_errhandler (cd->ref);
cd->cbfunc.opcbfn(rc, cd->cbdata);

rc = pmix_remove_errhandler(cd->ref);
if (NULL != cd->cbfunc.opcbfn) {
cd->cbfunc.opcbfn(rc, cd->cbdata);
}
cd->active = false;
}

Expand All @@ -1276,12 +1314,14 @@ void pmix_server_deregister_errhandler(int errhandler_ref,
void *cbdata)
{
pmix_shift_caddy_t *cd;

/* need to thread shift this request */
cd = PMIX_NEW(pmix_shift_caddy_t);
cd->cbfunc.opcbfn = cbfunc;
cd->cbdata = cbdata;
cd->ref = errhandler_ref;
PMIX_THREADSHIFT(cd, dereg_errhandler);

PMIX_WAIT_FOR_COMPLETION(cd->active);
PMIX_RELEASE(cd);
}
Expand Down
54 changes: 33 additions & 21 deletions opal/mca/pmix/pmix120/pmix/src/util/error.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,10 @@ void pmix_errhandler_invoke(pmix_status_t status,
PMIX_INFO_CREATE(iptr, ninfo+1);
(void)strncpy(iptr[0].key, PMIX_ERROR_HANDLER_ID, PMIX_MAX_KEYLEN);
iptr[0].value.type = PMIX_INT;
for (j=0; j < ninfo; j++) {
PMIX_INFO_LOAD(&iptr[j+1], info[j].key, &info[j].value.data, info[j].value.type);
if (NULL != info) {
for (j=0; j < ninfo; j++) {
PMIX_INFO_LOAD(&iptr[j+1], info[j].key, &info[j].value.data, info[j].value.type);
}
}

for (i = 0; i < pmix_globals.errregs.size; i++) {
Expand Down Expand Up @@ -221,9 +223,10 @@ pmix_status_t pmix_lookup_errhandler(pmix_notification_fn_t err,
int i;
pmix_status_t rc = PMIX_ERR_NOT_FOUND;
pmix_error_reg_info_t *errreg = NULL;

for (i = 0; i < pmix_pointer_array_get_size(&pmix_globals.errregs) ; i++) {
errreg = (pmix_error_reg_info_t*) pmix_pointer_array_get_item (&pmix_globals.errregs, i);
if((NULL != errreg) && (err == errreg->errhandler)) {
errreg = (pmix_error_reg_info_t*)pmix_pointer_array_get_item(&pmix_globals.errregs, i);
if ((NULL != errreg) && (err == errreg->errhandler)) {
*index = i;
rc = PMIX_SUCCESS;
break;
Expand All @@ -238,36 +241,45 @@ pmix_status_t pmix_add_errhandler(pmix_notification_fn_t err,
{
int i;
pmix_status_t rc = PMIX_SUCCESS;
pmix_error_reg_info_t *errreg = PMIX_NEW(pmix_error_reg_info_t);
pmix_error_reg_info_t *errreg;

errreg = PMIX_NEW(pmix_error_reg_info_t);
errreg->errhandler = err;
errreg->ninfo = ninfo;
PMIX_INFO_CREATE(errreg->info, ninfo);
for (i=0; i < ninfo; i++) {
memcpy(errreg->info[i].key, info[i].key, PMIX_MAX_KEYLEN);
pmix_value_xfer(&errreg->info[i].value, &info[i].value);
if (NULL != info && 0 < ninfo) {
PMIX_INFO_CREATE(errreg->info, ninfo);
for (i=0; i < ninfo; i++) {
(void)strncpy(errreg->info[i].key, info[i].key, PMIX_MAX_KEYLEN);
pmix_value_xfer(&errreg->info[i].value, &info[i].value);
}
}
*index = pmix_pointer_array_add (&pmix_globals.errregs, errreg);
*index = pmix_pointer_array_add(&pmix_globals.errregs, errreg);
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_add_errhandler index =%d", *index);
if (-1 == *index)
if (*index < 0) {
PMIX_RELEASE(errreg);
rc = PMIX_ERROR;
}
return rc;
}

pmix_status_t pmix_remove_errhandler(int errhandler_ref)
{
int rc = PMIX_SUCCESS;
pmix_error_reg_info_t *errreg;
errreg = (pmix_error_reg_info_t*) pmix_pointer_array_get_item (&pmix_globals.errregs,
errhandler_ref);
if (NULL != errreg)

errreg = (pmix_error_reg_info_t*)pmix_pointer_array_get_item(&pmix_globals.errregs,
errhandler_ref);
if (NULL != errreg) {
PMIX_RELEASE(errreg);
else
pmix_pointer_array_set_item(&pmix_globals.errregs, errhandler_ref, NULL);
} else {
rc = PMIX_ERR_NOT_FOUND;
}
return rc;
}

void pmix_get_errorgroup ( pmix_status_t status, char *pmix_error_group)
void pmix_get_errorgroup(pmix_status_t status, char *pmix_error_group)
{
switch(status) {
case PMIX_ERR_UNREACH:
Expand All @@ -276,24 +288,24 @@ void pmix_get_errorgroup ( pmix_status_t status, char *pmix_error_group)
case PMIX_ERR_TIMEOUT:
case PMIX_ERR_PACK_FAILURE:
case PMIX_ERR_UNPACK_FAILURE:
strcpy(pmix_error_group, PMIX_ERROR_GROUP_COMM);
(void)strncpy(pmix_error_group, PMIX_ERROR_GROUP_COMM, PMIX_MAX_KEYLEN);
break;
case PMIX_ERR_OUT_OF_RESOURCE:
case PMIX_ERR_RESOURCE_BUSY:
case PMIX_ERR_NOMEM:
strcpy(pmix_error_group, PMIX_ERROR_GROUP_RESOURCE);
(void)strncpy(pmix_error_group, PMIX_ERROR_GROUP_RESOURCE, PMIX_MAX_KEYLEN);
break;
case PMIX_ERR_PROC_MIGRATE:
case PMIX_ERR_PROC_CHECKPOINT:
case PMIX_ERR_PROC_RESTART:
strcpy(pmix_error_group, PMIX_ERROR_GROUP_MIGRATE);
(void)strncpy(pmix_error_group, PMIX_ERROR_GROUP_MIGRATE, PMIX_MAX_KEYLEN);
break;
case PMIX_ERR_PROC_ABORTING:
case PMIX_ERR_PROC_REQUESTED_ABORT:
case PMIX_ERR_PROC_ABORTED:
strcpy(pmix_error_group, PMIX_ERROR_GROUP_ABORT);
(void)strncpy(pmix_error_group, PMIX_ERROR_GROUP_ABORT, PMIX_MAX_KEYLEN);
break;
default:
strcpy(pmix_error_group, PMIX_ERROR_GROUP_GENERAL);
(void)strncpy(pmix_error_group, PMIX_ERROR_GROUP_GENERAL, PMIX_MAX_KEYLEN);
}
}
46 changes: 28 additions & 18 deletions orte/mca/oob/tcp/oob_tcp_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -970,27 +970,26 @@ void mca_oob_tcp_component_lost_connection(int fd, short args, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&pop->peer));

MCA_OOB_TCP_CHECK_SHUTDOWN(pop);

/* Mark that we no longer support this peer */
memcpy(&ui64, (char*)&pop->peer, sizeof(uint64_t));
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
ui64, (void**)&bpr) || NULL == bpr) {
bpr = OBJ_NEW(orte_oob_base_peer_t);
}
opal_bitmap_clear_bit(&bpr->addressable, mca_oob_tcp_component.super.idx);
if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers,
if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers,
ui64, NULL))) {
ORTE_ERROR_LOG(rc);
}

/* activate the proc state */
if (ORTE_SUCCESS != orte_routed.route_lost(&pop->peer)) {
ORTE_ACTIVATE_PROC_STATE(&pop->peer, ORTE_PROC_STATE_LIFELINE_LOST);
} else {
ORTE_ACTIVATE_PROC_STATE(&pop->peer, ORTE_PROC_STATE_COMM_FAILED);
if (!orte_finalizing) {
/* activate the proc state */
if (ORTE_SUCCESS != orte_routed.route_lost(&pop->peer)) {
ORTE_ACTIVATE_PROC_STATE(&pop->peer, ORTE_PROC_STATE_LIFELINE_LOST);
} else {
ORTE_ACTIVATE_PROC_STATE(&pop->peer, ORTE_PROC_STATE_COMM_FAILED);
}
}

OBJ_RELEASE(pop);
}

Expand All @@ -1006,8 +1005,6 @@ void mca_oob_tcp_component_no_route(int fd, short args, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&mop->hop));

MCA_OOB_TCP_CHECK_SHUTDOWN(mop);

/* mark that we cannot reach this hop */
memcpy(&ui64, (char*)&(mop->hop), sizeof(uint64_t));
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
Expand All @@ -1020,11 +1017,16 @@ void mca_oob_tcp_component_no_route(int fd, short args, void *cbdata)
ORTE_ERROR_LOG(rc);
}

/* if this was a lifeline, then alert */
if (ORTE_SUCCESS != orte_routed.route_lost(&mop->hop)) {
ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_LIFELINE_LOST);
} else {
ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_COMM_FAILED);
/* report the error back to the OOB and let it try other components
* or declare a problem
*/
if (!orte_finalizing && !orte_abnormal_term_ordered) {
/* if this was a lifeline, then alert */
if (ORTE_SUCCESS != orte_routed.route_lost(&mop->hop)) {
ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_LIFELINE_LOST);
} else {
ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_COMM_FAILED);
}
}

OBJ_RELEASE(mop);
Expand All @@ -1042,7 +1044,11 @@ void mca_oob_tcp_component_hop_unknown(int fd, short args, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&mop->hop));

MCA_OOB_TCP_CHECK_SHUTDOWN(mop);
if (orte_finalizing || orte_abnormal_term_ordered) {
/* just ignore the problem */
OBJ_RELEASE(mop);
return;
}

/* mark that this component cannot reach this hop */
memcpy(&ui64, (char*)&(mop->hop), sizeof(uint64_t));
Expand Down Expand Up @@ -1110,7 +1116,11 @@ void mca_oob_tcp_component_failed_to_connect(int fd, short args, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&pop->peer));

MCA_OOB_TCP_CHECK_SHUTDOWN(pop);
/* if we are terminating, then don't attempt to reconnect */
if (orte_orteds_term_ordered || orte_finalizing || orte_abnormal_term_ordered) {
OBJ_RELEASE(pop);
return;
}

/* activate the proc state */
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
Expand Down
Loading