From eef5404292beb03622a093a439ff4e2917d36692 Mon Sep 17 00:00:00 2001 From: Artem Polyakov Date: Sat, 26 Nov 2016 09:45:49 +0700 Subject: [PATCH 1/3] orte/oob/tcp: Fix message dropping in case of concurrent connection. The problem was observed for direct modex used with recursive doubling algorithm (used for collective ID calculation prior to d52a2d081e9598a9ac9a50fb4b013a6d2a72375b) that has pairwise nature and counter-connections are highly likely. The following scenario was uncovering the issue: * ranks `x` and `y` want to communicate with each other, `x` < `y`; * rank `x` initiates the connection and sends the ack; * rank `y` starts to `connect()` and gets the ack from `x`; * `y` identifies that it already started connecting and `y` > `x` so it rejects incoming connection. * `x` sees that his connection was rejected in `mca_oob_tcp_peer_recv_connect_ack()` when trying to read the message header using `tcp_peer_recv_blocking()` which calls `mca_oob_tcp_peer_close()` that effectively flushes all the messages in the peer->send_queue. * `y` send the ack to `x` and the connection is established, however all the messages for the peer at `x` are vanished (except the front one in peer->send_msg). This commit introduces a "nack" function that will be used at `y` side to tell `x` that `y` has the priority and `x`'s connection should be closed. This allows to avoid "guessing" on the unexpectedly closed connection. (cherry-picked from ada93e0c026df8f28b9078d47af25c74eb398656) Signed-off-by: Artem Polyakov --- orte/mca/oob/tcp/oob_tcp_connection.c | 177 ++++++++++++++++++-------- 1 file changed, 125 insertions(+), 52 deletions(-) diff --git a/orte/mca/oob/tcp/oob_tcp_connection.c b/orte/mca/oob/tcp/oob_tcp_connection.c index f7e6f43af66..e7efafcecdf 100644 --- a/orte/mca/oob/tcp/oob_tcp_connection.c +++ b/orte/mca/oob/tcp/oob_tcp_connection.c @@ -16,6 +16,7 @@ * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. * Copyright (c) 2014-2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2016 Mellanox Technologies Ltd. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -78,6 +79,7 @@ static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer); static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer); +static int tcp_peer_send_connect_nack(int sd, orte_process_name_t name); static int tcp_peer_send_blocking(int sd, void* data, size_t size); static bool tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, int sd, void* data, size_t size); @@ -373,8 +375,9 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer) { char *msg; mca_oob_tcp_hdr_t hdr; + uint16_t ack_flag = htons(1); int rc; - size_t sdsize; + size_t sdsize, offset = 0; char *cred; size_t credsize; @@ -399,21 +402,26 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (unsigned long)credsize); - /* set the number of bytes to be read beyond the header */ - hdr.nbytes = strlen(orte_version_string) + 1 + credsize; + /* payload size */ + sdsize = sizeof(ack_flag) + strlen(orte_version_string) + 1 + credsize; + hdr.nbytes = sdsize; MCA_OOB_TCP_HDR_HTON(&hdr); /* create a space for our message */ - sdsize = sizeof(hdr) + strlen(orte_version_string) + 1 + credsize; + sdsize += sizeof(hdr); if (NULL == (msg = (char*)malloc(sdsize))) { return ORTE_ERR_OUT_OF_RESOURCE; } memset(msg, 0, sdsize); /* load the message */ - memcpy(msg, &hdr, sizeof(hdr)); - memcpy(msg+sizeof(hdr), orte_version_string, strlen(orte_version_string)); - memcpy(msg+sizeof(hdr)+strlen(orte_version_string)+1, cred, credsize); + memcpy(msg + offset, &hdr, sizeof(hdr)); + offset += sizeof(hdr); + memcpy(msg + offset, &ack_flag, sizeof(ack_flag)); + offset += sizeof(ack_flag); + memcpy(msg + offset, orte_version_string, strlen(orte_version_string)); + offset += strlen(orte_version_string)+1; + memcpy(msg + offset, cred, credsize); /* clear the memory */ if (NULL != cred) { free(cred); @@ -431,6 +439,53 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer) return ORTE_SUCCESS; } +/* Respond with refuse to the connection request */ +static int tcp_peer_send_connect_nack(int sd, orte_process_name_t name) +{ + char *msg; + mca_oob_tcp_hdr_t hdr; + uint16_t ack_flag = htons(0); + int rc = ORTE_SUCCESS; + size_t sdsize, offset = 0; + + opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s SEND CONNECT NACK", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + + /* load the header */ + hdr.origin = *ORTE_PROC_MY_NAME; + hdr.dst = name; + hdr.type = MCA_OOB_TCP_IDENT; + hdr.tag = 0; + + /* payload size */ + sdsize = sizeof(ack_flag); + hdr.nbytes = sdsize; + MCA_OOB_TCP_HDR_HTON(&hdr); + + /* create a space for our message */ + sdsize += sizeof(hdr); + if (NULL == (msg = (char*)malloc(sdsize))) { + return ORTE_ERR_OUT_OF_RESOURCE; + } + memset(msg, 0, sdsize); + + /* load the message */ + memcpy(msg + offset, &hdr, sizeof(hdr)); + offset += sizeof(hdr); + memcpy(msg + offset, &ack_flag, sizeof(ack_flag)); + offset += sizeof(ack_flag); + + /* send it */ + if (ORTE_SUCCESS != tcp_peer_send_blocking(sd, msg, sdsize)) { + /* it's ok if it fails - remote side may already + * identifiet the collision and closed the connection + */ + rc = ORTE_SUCCESS; + } + free(msg); + return rc; +} + /* * Initialize events to be used by the peer instance for TCP select/poll callbacks. */ @@ -634,6 +689,7 @@ static bool retry(mca_oob_tcp_peer_t* peer, int sd, bool fatal) return false; } else { /* The connection will be retried */ + tcp_peer_send_connect_nack(sd, peer->name); CLOSE_THE_SOCKET(sd); return true; } @@ -647,10 +703,12 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, char *version; int rc; char *cred; - size_t credsize; + size_t credsize, offset = 0; mca_oob_tcp_hdr_t hdr; mca_oob_tcp_peer_t *peer; uint64_t *ui64; + uint16_t ack_flag; + bool is_new = (NULL == pr); opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s RECV CONNECT ACK FROM %s ON SOCKET %d", @@ -679,19 +737,6 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, "%s unable to complete recv of connect-ack from %s ON SOCKET %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name), sd); - /* check for a race condition - if I was in the process of - * creating a connection to the peer, or have already established - * such a connection, then we need to reject this connection. We will - * let the higher ranked process retry - if I'm the lower ranked - * process, I'll simply defer until I receive the request - */ - if (NULL != peer && - (MCA_OOB_TCP_CONNECTED == peer->state || - MCA_OOB_TCP_CONNECTING == peer->state || - MCA_OOB_TCP_CONNECT_ACK == peer->state || - MCA_OOB_TCP_CLOSED == peer->state)) { - retry(peer, sd, false); - } return ORTE_ERR_UNREACH; } @@ -746,23 +791,8 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, CLOSE_THE_SOCKET(sd); return ORTE_ERR_OUT_OF_RESOURCE; } - } else { - /* check for a race condition - if I was in the process of - * creating a connection to the peer, or have already established - * such a connection, then we need to reject this connection. We will - * let the higher ranked process retry - if I'm the lower ranked - * process, I'll simply defer until I receive the request - */ - if (MCA_OOB_TCP_CONNECTED == peer->state || - MCA_OOB_TCP_CONNECTING == peer->state || - MCA_OOB_TCP_CONNECT_ACK == peer->state) { - if (retry(peer, sd, false)) { - return ORTE_ERR_UNREACH; - } - } } } else { - /* compare the peers name to the expected value */ if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, &hdr.origin)) { opal_output(0, "%s tcp_peer_recv_connect_ack: " @@ -793,23 +823,66 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, "%s unable to complete recv of connect-ack from %s ON SOCKET %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&peer->name), peer->sd); - /* check for a race condition - if I was in the process of - * creating a connection to the peer, or have already established - * such a connection, then we need to reject this connection. We will - * let the higher ranked process retry - if I'm the lower ranked - * process, I'll simply defer until I receive the request - */ - if (MCA_OOB_TCP_CONNECTED == peer->state || - MCA_OOB_TCP_CONNECTING == peer->state || - MCA_OOB_TCP_CONNECT_ACK == peer->state) { - retry(peer, sd, true); - } free(msg); return ORTE_ERR_UNREACH; } + /* Check the type of acknowledgement */ + memcpy(&ack_flag, msg + offset, sizeof(ack_flag)); + offset += sizeof(ack_flag); + + ack_flag = ntohs(ack_flag); + if( !ack_flag ){ + if (MCA_OOB_TCP_CONNECT_ACK == peer->state) { + /* We got nack from the remote side which means that + * it will be the initiator of the connection. + */ + + /* release the socket */ + CLOSE_THE_SOCKET(peer->sd); + peer->sd = -1; + + /* unregister active events */ + if (peer->recv_ev_active) { + opal_event_del(&peer->recv_event); + peer->recv_ev_active = false; + } + if (peer->send_ev_active) { + opal_event_del(&peer->send_event); + peer->send_ev_active = false; + } + + /* change the state so we'll accept the remote + * connection when it'll appear + */ + peer->state = MCA_OOB_TCP_UNCONNECTED; + } else { + /* FIXME: this shouldn't happen. We need to force next address + * to be tried. + */ + mca_oob_tcp_peer_close(peer); + } + return ORTE_ERR_UNREACH; + } + + /* check for a race condition - if I was in the process of + * creating a connection to the peer, or have already established + * such a connection, then we need to reject this connection. We will + * let the higher ranked process retry - if I'm the lower ranked + * process, I'll simply defer until I receive the request + */ + if (is_new && + ( MCA_OOB_TCP_CONNECTED == peer->state || + MCA_OOB_TCP_CONNECTING == peer->state || + MCA_OOB_TCP_CONNECT_ACK == peer->state ) ) { + if (retry(peer, sd, false)) { + return ORTE_ERR_UNREACH; + } + } + /* check that this is from a matching version */ - version = (char*)(msg); + version = (char*)((void*)msg + offset); + offset += strlen(version) + 1; if (0 != strcmp(version, orte_version_string)) { opal_output(0, "%s tcp_peer_recv_connect_ack: " "received different version from %s: %s instead of %s\n", @@ -828,8 +901,8 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, ORTE_NAME_PRINT(&peer->name)); /* check security token */ - cred = (char*)(msg + strlen(version) + 1); - credsize = hdr.nbytes - strlen(version) - 1; + cred = (char*)((void*)msg + offset); + credsize = hdr.nbytes - offset; if (OPAL_SUCCESS != (rc = opal_sec.authenticate(cred, credsize, &peer->auth_method))) { char *hostname; hostname = orte_get_proc_hostname(&peer->name); @@ -909,8 +982,6 @@ static void tcp_peer_connected(mca_oob_tcp_peer_t* peer) */ void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t *peer) { - mca_oob_tcp_send_t *snd; - opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s tcp_peer_close for %s sd %d state %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), @@ -962,10 +1033,12 @@ void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t *peer) * handle these recycled messages. This prevents us from unintentionally * attempting to send the message again across the now-failed interface */ + /* if (NULL != peer->send_msg) { } while (NULL != (snd = (mca_oob_tcp_send_t*)opal_list_remove_first(&peer->send_queue))) { } + */ } /* From 5c6e6f6dd37e8a45a8090f66f078b80c678c7259 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Tue, 29 Nov 2016 08:33:22 -0800 Subject: [PATCH 2/3] Silence minor warnings (cherry-picked from 30ff8be9c98da040c264b15d11b0e2827ecc0651) Signed-off-by: Artem Polyakov --- orte/mca/oob/tcp/oob_tcp_connection.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/orte/mca/oob/tcp/oob_tcp_connection.c b/orte/mca/oob/tcp/oob_tcp_connection.c index e7efafcecdf..8839707e4bb 100644 --- a/orte/mca/oob/tcp/oob_tcp_connection.c +++ b/orte/mca/oob/tcp/oob_tcp_connection.c @@ -477,7 +477,7 @@ static int tcp_peer_send_connect_nack(int sd, orte_process_name_t name) /* send it */ if (ORTE_SUCCESS != tcp_peer_send_blocking(sd, msg, sdsize)) { - /* it's ok if it fails - remote side may already + /* it's ok if it fails - remote side may already * identifiet the collision and closed the connection */ rc = ORTE_SUCCESS; @@ -830,7 +830,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, /* Check the type of acknowledgement */ memcpy(&ack_flag, msg + offset, sizeof(ack_flag)); offset += sizeof(ack_flag); - + ack_flag = ntohs(ack_flag); if( !ack_flag ){ if (MCA_OOB_TCP_CONNECT_ACK == peer->state) { @@ -871,8 +871,8 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, * let the higher ranked process retry - if I'm the lower ranked * process, I'll simply defer until I receive the request */ - if (is_new && - ( MCA_OOB_TCP_CONNECTED == peer->state || + if (is_new && + ( MCA_OOB_TCP_CONNECTED == peer->state || MCA_OOB_TCP_CONNECTING == peer->state || MCA_OOB_TCP_CONNECT_ACK == peer->state ) ) { if (retry(peer, sd, false)) { @@ -881,7 +881,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, } /* check that this is from a matching version */ - version = (char*)((void*)msg + offset); + version = (char*)((char*)msg + offset); offset += strlen(version) + 1; if (0 != strcmp(version, orte_version_string)) { opal_output(0, "%s tcp_peer_recv_connect_ack: " @@ -901,7 +901,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, ORTE_NAME_PRINT(&peer->name)); /* check security token */ - cred = (char*)((void*)msg + offset); + cred = (char*)((char*)msg + offset); credsize = hdr.nbytes - offset; if (OPAL_SUCCESS != (rc = opal_sec.authenticate(cred, credsize, &peer->auth_method))) { char *hostname; From 91bb88e6a6a2bf22218daed779a5eb33a45d9ba4 Mon Sep 17 00:00:00 2001 From: Artem Polyakov Date: Thu, 1 Dec 2016 06:39:44 +0700 Subject: [PATCH 3/3] orte/oob/tcp: Plug the memory leak. Plug coverity defect CID 1396541. (cherry-picked from bf79e83c2f192b3966f61e2ff02526a01f3548c4) Signed-off-by: Artem Polyakov --- orte/mca/oob/tcp/oob_tcp_connection.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/orte/mca/oob/tcp/oob_tcp_connection.c b/orte/mca/oob/tcp/oob_tcp_connection.c index 8839707e4bb..3ce503feb7f 100644 --- a/orte/mca/oob/tcp/oob_tcp_connection.c +++ b/orte/mca/oob/tcp/oob_tcp_connection.c @@ -862,6 +862,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, */ mca_oob_tcp_peer_close(peer); } + free(msg); return ORTE_ERR_UNREACH; } @@ -876,6 +877,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, MCA_OOB_TCP_CONNECTING == peer->state || MCA_OOB_TCP_CONNECT_ACK == peer->state ) ) { if (retry(peer, sd, false)) { + free(msg); return ORTE_ERR_UNREACH; } }