diff --git a/orte/mca/oob/tcp/oob_tcp_connection.c b/orte/mca/oob/tcp/oob_tcp_connection.c index 38ba266275..cc58ce4b0c 100644 --- a/orte/mca/oob/tcp/oob_tcp_connection.c +++ b/orte/mca/oob/tcp/oob_tcp_connection.c @@ -86,7 +86,7 @@ static int tcp_peer_create_socket(mca_oob_tcp_peer_t* peer) { int flags; - if (peer->sd > 0) { + if (peer->sd >= 0) { return ORTE_SUCCESS; } @@ -309,8 +309,8 @@ void mca_oob_tcp_peer_try_connect(int fd, short args, void *cbdata) * we and the peer are trying to connect at the same time. If I * am the higher vpid, then retry the connection - otherwise, * step aside for now */ - if (ORTE_PROC_MY_NAME->vpid > peer->name.vpid) { - connected = false; + int cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_NAME, &peer->name); + if (OPAL_VALUE1_GREATER == cmpval) { peer->state = MCA_OOB_TCP_CONNECTING; ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect); } else { @@ -395,6 +395,7 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer) static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer) { if (peer->sd >= 0) { + assert(!peer->send_ev_active && !peer->recv_ev_active); opal_event_set(mca_oob_tcp_module.ev_base, &peer->recv_event, peer->sd, @@ -544,7 +545,7 @@ static int tcp_peer_send_blocking(int sd, void* data, size_t size) * connected socket and verify the expected response. If so, move the * socket to a connected state. */ -static void retry(mca_oob_tcp_peer_t* peer, int sd) +static bool retry(mca_oob_tcp_peer_t* peer, int sd, bool fatal) { int cmpval; @@ -552,30 +553,49 @@ static void retry(mca_oob_tcp_peer_t* peer, int sd) "%s SIMUL CONNECTION WITH %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&peer->name)); - if (peer->recv_ev_active) { - opal_event_del(&peer->recv_event); - peer->recv_ev_active = false; - } - if (peer->send_ev_active) { - opal_event_del(&peer->send_event); - peer->send_ev_active = false; - } - if (0 < peer->sd) { - CLOSE_THE_SOCKET(peer->sd); - peer->sd = -1; - } - CLOSE_THE_SOCKET(sd); - if (NULL != peer->active_addr) { - peer->active_addr->retries = 0; - } cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, ORTE_PROC_MY_NAME); - if (OPAL_VALUE1_GREATER == cmpval) { - /* force the other end to retry the connection */ - peer->state = MCA_OOB_TCP_UNCONNECTED; + if (fatal) { + if (peer->send_ev_active) { + opal_event_del(&peer->send_event); + peer->send_ev_active = false; + } + if (peer->recv_ev_active) { + opal_event_del(&peer->recv_event); + peer->recv_ev_active = false; + } + if (0 < peer->sd) { + CLOSE_THE_SOCKET(peer->sd); + peer->sd = -1; + } + CLOSE_THE_SOCKET(peer->sd); + if (OPAL_VALUE1_GREATER == cmpval) { + /* force the other end to retry the connection */ + peer->state = MCA_OOB_TCP_UNCONNECTED; + } else { + /* retry the connection */ + peer->state = MCA_OOB_TCP_CONNECTING; + ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect); + } + return true; } else { - /* retry the connection */ - peer->state = MCA_OOB_TCP_CONNECTING; - ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect); + if (OPAL_VALUE1_GREATER == cmpval) { + /* The other end will retry the connection */ + if (peer->send_ev_active) { + opal_event_del(&peer->send_event); + peer->send_ev_active = false; + } + if (peer->recv_ev_active) { + opal_event_del(&peer->recv_event); + peer->recv_ev_active = false; + } + CLOSE_THE_SOCKET(peer->sd); + peer->state = MCA_OOB_TCP_UNCONNECTED; + return false; + } else { + /* The connection will be retried */ + CLOSE_THE_SOCKET(sd); + return true; + } } } @@ -626,8 +646,9 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, if (NULL != peer && (MCA_OOB_TCP_CONNECTED == peer->state || MCA_OOB_TCP_CONNECTING == peer->state || - MCA_OOB_TCP_CONNECT_ACK == peer->state)) { - retry(peer, sd); + MCA_OOB_TCP_CONNECT_ACK == peer->state || + MCA_OOB_TCP_CLOSED == peer->state)) { + retry(peer, sd, false); } return ORTE_ERR_UNREACH; } @@ -693,8 +714,9 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, if (MCA_OOB_TCP_CONNECTED == peer->state || MCA_OOB_TCP_CONNECTING == peer->state || MCA_OOB_TCP_CONNECT_ACK == peer->state) { - retry(peer, sd); - return ORTE_ERR_UNREACH; + if (retry(peer, sd, false)) { + return ORTE_ERR_UNREACH; + } } } } else { @@ -724,7 +746,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, return ORTE_ERR_OUT_OF_RESOURCE; } if (!tcp_peer_recv_blocking(peer, sd, msg, hdr.nbytes)) { - /* unable to complete the recv */ + /* unable to complete the recv but should never happen */ opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s unable to complete recv of connect-ack from %s ON SOCKET %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), @@ -738,7 +760,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, if (MCA_OOB_TCP_CONNECTED == peer->state || MCA_OOB_TCP_CONNECTING == peer->state || MCA_OOB_TCP_CONNECT_ACK == peer->state) { - retry(peer, sd); + retry(peer, sd, true); } free(msg); return ORTE_ERR_UNREACH; @@ -1085,11 +1107,6 @@ bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer) opal_event_add(&peer->recv_event, 0); peer->recv_ev_active = true; } - /* if a message is waiting to be sent, ensure the send event is active */ - if (NULL != peer->send_msg && !peer->send_ev_active) { - opal_event_add(&peer->send_event, 0); - peer->send_ev_active = true; - } if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) { mca_oob_tcp_peer_dump(peer, "accepted"); }