diff --git a/opal/mca/pmix/native/pmix_native.c b/opal/mca/pmix/native/pmix_native.c
index 4fa06d7ed98..4ad1861ceca 100644
--- a/opal/mca/pmix/native/pmix_native.c
+++ b/opal/mca/pmix/native/pmix_native.c
@@ -102,8 +102,7 @@ const opal_pmix_base_module_t opal_pmix_native_module = {
 
 // local variables
 static int init_cntr = 0;
-opal_process_name_t native_pname = {0};
-
+static opal_process_name_t native_pname = {0};
 
 /* callback for wait completion */
 static void wait_cbfunc(opal_buffer_t *buf, void *cbdata)
@@ -213,7 +212,7 @@ static int native_fini(void)
 
     if (1 != init_cntr) {
         --init_cntr;
-       return OPAL_SUCCESS;
+        return OPAL_SUCCESS;
     }
     init_cntr = 0;
 
@@ -275,12 +274,21 @@ static bool native_initialized(void)
     return false;
 }
 
+/* timer callback: wake up native_abort if the server never answers */
+static void timeout(int sd, short args, void *cbdata)
+{
+    pmix_cb_t *cb = (pmix_cb_t*)cbdata;
+    cb->active = false;
+}
+
 static int native_abort(int flag, const char msg[])
 {
     opal_buffer_t *bfr;
     pmix_cmd_t cmd = PMIX_ABORT_CMD;
     int rc;
     pmix_cb_t *cb;
+    opal_event_t ev;
+    struct timeval tv = {1, 0};
 
     opal_output_verbose(2, opal_pmix_base_framework.framework_output,
                         "%s pmix:native abort called",
@@ -291,39 +299,51 @@ static int native_abort(int flag, const char msg[])
         return OPAL_SUCCESS;
     }
 
-    /* create a buffer to hold the message */
-    bfr = OBJ_NEW(opal_buffer_t);
-    /* pack the cmd */
-    if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &cmd, 1, PMIX_CMD_T))) {
-        OPAL_ERROR_LOG(rc);
-        OBJ_RELEASE(bfr);
-        return rc;
-    }
-    /* pack the status flag */
-    if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &flag, 1, OPAL_INT))) {
-        OPAL_ERROR_LOG(rc);
-        OBJ_RELEASE(bfr);
-        return rc;
-    }
-    /* pack the string message - a NULL is okay */
-    if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &msg, 1, OPAL_STRING))) {
-        OPAL_ERROR_LOG(rc);
-        OBJ_RELEASE(bfr);
-        return rc;
-    }
+    if (PMIX_USOCK_CONNECTED == mca_pmix_native_component.state) {
+        /* create a buffer to hold the message */
+        bfr = OBJ_NEW(opal_buffer_t);
+        /* pack the cmd */
+        if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &cmd, 1, PMIX_CMD_T))) {
+            OPAL_ERROR_LOG(rc);
+            OBJ_RELEASE(bfr);
+            return rc;
+        }
+        /* pack the status flag */
+        if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &flag, 1, OPAL_INT))) {
+            OPAL_ERROR_LOG(rc);
+            OBJ_RELEASE(bfr);
+            return rc;
+        }
+        /* pack the string message - a NULL is okay */
+        if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &msg, 1, OPAL_STRING))) {
+            OPAL_ERROR_LOG(rc);
+            OBJ_RELEASE(bfr);
+            return rc;
+        }
 
-    /* create a callback object as we need to pass it to the
-     * recv routine so we know which callback to use when
-     * the return message is recvd */
-    cb = OBJ_NEW(pmix_cb_t);
-    cb->active = true;
+        /* create a callback object as we need to pass it to the
+         * recv routine so we know which callback to use when
+         * the return message is recvd */
+        cb = OBJ_NEW(pmix_cb_t);
+        cb->active = true;
 
-    /* push the message into our event base to send to the server */
-    PMIX_ACTIVATE_SEND_RECV(bfr, wait_cbfunc, cb);
+        /* push a timeout event to wake us up just in case this
+         * message cannot get thru - e.g., someone else may have
+         * detected the failure of the server and ordered an abort */
+        opal_event_evtimer_set(mca_pmix_native_component.evbase,
+                               &ev, timeout, cb);
+        opal_event_evtimer_add(&ev, &tv);
 
-    /* wait for the release */
-    PMIX_WAIT_FOR_COMPLETION(cb->active);
-    OBJ_RELEASE(cb);
+        /* push the message into our event base to send to the server */
+        PMIX_ACTIVATE_SEND_RECV(bfr, wait_cbfunc, cb);
+
+        /* wait for the release */
+        PMIX_WAIT_FOR_COMPLETION(cb->active);
+        /* cancel the timer: ev lives on our stack and points at cb, so it
+         * must not remain pending in the event base once we return */
+        opal_event_del(&ev);
+        OBJ_RELEASE(cb);
+    }
 
     return OPAL_SUCCESS;
 }
diff --git a/opal/mca/pmix/native/pmix_native.h b/opal/mca/pmix/native/pmix_native.h
index 0aad4ec27ee..1e0127303bd 100644
--- a/opal/mca/pmix/native/pmix_native.h
+++ b/opal/mca/pmix/native/pmix_native.h
@@ -41,6 +41,13 @@ typedef enum {
     PMIX_USOCK_ACCEPTING
 } pmix_usock_state_t;
 
+/* define a macro for abnormal termination */
+#define PMIX_NATIVE_ABNORMAL_TERM                               \
+    do {                                                        \
+        mca_pmix_native_component.state = PMIX_USOCK_FAILED;    \
+        opal_pmix_base_errhandler(OPAL_ERR_COMM_FAILURE);       \
+    } while(0)
+
 /* define a command type for communicating to the
  * pmix server */
 typedef uint8_t pmix_cmd_t;
@@ -202,12 +209,13 @@ OPAL_MODULE_DECLSPEC int usock_send_connect_ack(void);
         opal_event_active(&ms->ev, OPAL_EV_WRITE, 1);                   \
     } while(0);
 
-#define CLOSE_THE_SOCKET(socket)                                \
-    do {                                                        \
-        shutdown(socket, 2);                                    \
-        close(socket);                                          \
-        /* notify the error handler */                          \
-        opal_pmix_base_errhandler(OPAL_ERR_COMM_FAILURE);       \
+#define CLOSE_THE_SOCKET(socket)                \
+    do {                                        \
+        if (0 <= socket) {                      \
+            shutdown(socket, 2);                \
+            close(socket);                      \
+            socket = -1;                        \
+        }                                       \
     } while(0)
 
 
diff --git a/opal/mca/pmix/native/usock.c b/opal/mca/pmix/native/usock.c
index 9f6e1353a65..6e2273cc468 100644
--- a/opal/mca/pmix/native/usock.c
+++ b/opal/mca/pmix/native/usock.c
@@ -35,6 +35,12 @@
 #ifdef HAVE_SYS_TYPES_H
 #include <sys/types.h>
 #endif
+#ifdef HAVE_SYS_STAT_H
+#include <sys/stat.h>
+#endif
+#ifdef HAVE_FCNTL_H
+#include <fcntl.h>
+#endif
 #include "opal_stdint.h"
 
 #include "opal/opal_socket_errno.h"
@@ -191,6 +197,7 @@ void pmix_usock_process_msg(int fd, short flags, void *cbdata)
 
     /* we get here if no matching recv was found - this is an error */
     opal_output(0, "%s UNEXPECTED MESSAGE", OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
+    PMIX_NATIVE_ABNORMAL_TERM;  // report the error upstream
     OBJ_RELEASE(msg);
 }
 
@@ -222,9 +229,10 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
                             "usock_peer_try_connect: attempting to connect to server on socket %d",
                             mca_pmix_native_component.sd);
         /* try to connect */
-        if (connect(mca_pmix_native_component.sd, &mca_pmix_native_component.address, addrlen) < 0) {
+        if (connect(mca_pmix_native_component.sd, (struct sockaddr*)&mca_pmix_native_component.address, addrlen) < 0) {
             if (opal_socket_errno == ETIMEDOUT) {
-                /* The server may be too busy to accept new connections */
+                /* The server may be too busy to accept new connections,
+                 * so cycle around and let it try again */
                 opal_output_verbose(2, opal_pmix_base_framework.framework_output,
                                     "timeout connecting to server");
                 CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
@@ -235,7 +243,7 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
                abort a connection that was ECONNREFUSED on the last
                attempt, without even trying to establish the
                connection. Handle that case in a semi-rational
-               way by trying twice before giving up */
+               way by trying again before giving up */
             if (ECONNABORTED == opal_socket_errno) {
                 opal_output_verbose(2, opal_pmix_base_framework.framework_output,
                                     "connection to server aborted by OS - retrying");
@@ -255,6 +263,7 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
         if (0 <= mca_pmix_native_component.sd) {
             CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
         }
+        PMIX_NATIVE_ABNORMAL_TERM;  // report the error upstream
         return;
     }
 
@@ -282,7 +291,7 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
     opal_event_set_priority(&mca_pmix_native_component.send_event, OPAL_EV_MSG_LO_PRI);
     mca_pmix_native_component.send_ev_active = false;
 
-   /* setup the socket as non-blocking */
+    /* setup the socket as non-blocking */
     if ((flags = fcntl(mca_pmix_native_component.sd, F_GETFL, 0)) < 0) {
         opal_output(0, "usock_peer_connect: fcntl(F_GETFL) failed: %s (%d)\n",
                     strerror(opal_socket_errno),
@@ -310,8 +319,8 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
                         "usock_send_connect_ack to server failed: %s (%d)",
                         OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
                         opal_strerror(rc), rc);
-            mca_pmix_native_component.state = PMIX_USOCK_FAILED;
             CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
+            PMIX_NATIVE_ABNORMAL_TERM;  // report the error upstream
             return;
         }
     }
diff --git a/opal/mca/pmix/native/usock_sendrecv.c b/opal/mca/pmix/native/usock_sendrecv.c
index b012d36be2e..8b2d21776bd 100644
--- a/opal/mca/pmix/native/usock_sendrecv.c
+++ b/opal/mca/pmix/native/usock_sendrecv.c
@@ -176,6 +176,8 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata)
                     mca_pmix_native_component.send_ev_active = false;
                     OBJ_RELEASE(msg);
                     mca_pmix_native_component.send_msg = NULL;
+                    CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
+                    PMIX_NATIVE_ABNORMAL_TERM;  // report the error upstream
                     return;
                 }
             }
@@ -208,6 +210,8 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata)
                     mca_pmix_native_component.send_ev_active = false;
                     OBJ_RELEASE(msg);
                     mca_pmix_native_component.send_msg = NULL;
+                    CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
+                    PMIX_NATIVE_ABNORMAL_TERM;  // report the error upstream
                     return;
                 }
             }
@@ -239,6 +243,8 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata)
             opal_event_del(&mca_pmix_native_component.send_event);
             mca_pmix_native_component.send_ev_active = false;
         }
+        CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
+        PMIX_NATIVE_ABNORMAL_TERM;  // report the error upstream
         break;
     }
 }
@@ -356,6 +362,8 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
                             OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
                 opal_event_del(&mca_pmix_native_component.recv_event);
                 mca_pmix_native_component.recv_ev_active = false;
+                CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
+                PMIX_NATIVE_ABNORMAL_TERM;  // report the error upstream
                 return;
             }
             break;
@@ -372,6 +380,8 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
             if (NULL == mca_pmix_native_component.recv_msg) {
                 opal_output(0, "%s usock_recv_handler: unable to allocate recv message\n",
                             OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
+                CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
+                PMIX_NATIVE_ABNORMAL_TERM;  // report the error upstream
                 return;
             }
             /* start by reading the header */
@@ -416,6 +426,7 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
                                     "%s usock:recv:handler error reading bytes - closing connection",
                                     OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
                 CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
+                PMIX_NATIVE_ABNORMAL_TERM;  // report the error upstream
                 return;
             }
         }
@@ -447,6 +458,7 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
                 opal_event_del(&mca_pmix_native_component.recv_event);
                 mca_pmix_native_component.recv_ev_active = false;
                 CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
+                PMIX_NATIVE_ABNORMAL_TERM;  // report the error upstream
                 return;
             }
         }
@@ -456,6 +468,7 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
                     OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
                     mca_pmix_native_component.state);
         CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
+        PMIX_NATIVE_ABNORMAL_TERM;  // report the error upstream
         break;
     }
 }
@@ -689,6 +702,7 @@ static void usock_complete_connect(void)
                     opal_socket_errno);
         mca_pmix_native_component.state = PMIX_USOCK_FAILED;
         CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
+        PMIX_NATIVE_ABNORMAL_TERM;
         return;
     }
 
@@ -703,8 +717,8 @@ static void usock_complete_connect(void)
                             OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
                             strerror(so_error),
                             so_error);
-        mca_pmix_native_component.state = PMIX_USOCK_FAILED;
         CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
+        PMIX_NATIVE_ABNORMAL_TERM;  // report the error upstream
         return;
     } else if (so_error != 0) {
         /* No need to worry about the return code here - we return regardless
@@ -715,8 +729,8 @@ static void usock_complete_connect(void)
                             "connection to server failed with error %d",
                             OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
                             so_error);
-        mca_pmix_native_component.state = PMIX_USOCK_FAILED;
         CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
+        PMIX_NATIVE_ABNORMAL_TERM;  // report the error upstream
         return;
     }
 
@@ -737,8 +751,8 @@ static void usock_complete_connect(void)
     } else {
         opal_output(0, "%s usock_complete_connect: unable to send connect ack to server",
                     OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
-        mca_pmix_native_component.state = PMIX_USOCK_FAILED;
         CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
+        PMIX_NATIVE_ABNORMAL_TERM;  // report the error upstream
     }
 }
 