Skip to content

Commit

Permalink
[core] Clean up the CUDT processConnectRequest(..) function.
Browse files Browse the repository at this point in the history
Update listener write-ready only after the new connection.
Was changed in Haivision#1650, but must not be done at all (see Haivision#1831).
  • Loading branch information
maxsharabayko committed Apr 4, 2022
1 parent 8d1a722 commit d6d4db3
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 62 deletions.
13 changes: 8 additions & 5 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,16 @@ class CUDTUnited
/// @return The new UDT socket ID, or INVALID_SOCK.
SRTSOCKET newSocket(CUDTSocket** pps = NULL);

/// Create a new UDT connection.
/// @param [in] listen the listening UDT socket;
/// Create (listener-side) a new socket associated with the incoming connection request.
/// @param [in] listen the listening socket ID.
/// @param [in] peer peer address.
/// @param [in,out] hs handshake information from peer side (in), negotiated value (out);
/// @param [out] w_error error code when failed
/// @param [out] w_acpu entity of accepted socket, if connection already exists
/// @return If the new connection is successfully created: 1 success, 0 already exist, -1 error.
/// @param [out] w_error error code in case of failure.
/// @param [out] w_acpu reference to the existing associated socket if already exists.
/// @return 1: if the new connection was successfully created (accepted), @a w_acpu is NULL;
/// 0: the connection already exists (reference to the corresponding socket is returned in @a w_acpu).
/// -1: The connection processing failed due to memory alloation error, exceeding listener's backlog,
/// any error propagated from CUDT::open and CUDT::acceptAndRespond.
int newConnection(const SRTSOCKET listen,
const sockaddr_any& peer,
const CPacket& hspkt,
Expand Down
90 changes: 34 additions & 56 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10748,8 +10748,8 @@ int srt::CUDT::processConnectRequest(const sockaddr_any& addr, CPacket& packet)
HLOGC(cnlog.Debug, log << "processConnectRequest: ... NOT. Rejecting because broken.");
return m_RejectReason;
}
size_t exp_len =
CHandShake::m_iContentSize; // When CHandShake::m_iContentSize is used in log, the file fails to link!
// When CHandShake::m_iContentSize is used in log, the file fails to link!
size_t exp_len = CHandShake::m_iContentSize;

// NOTE!!! Old version of SRT code checks if the size of the HS packet
// is EQUAL to the above CHandShake::m_iContentSize.
Expand Down Expand Up @@ -10927,6 +10927,11 @@ int srt::CUDT::processConnectRequest(const sockaddr_any& addr, CPacket& packet)
}
else
{
// IMPORTANT!!!
// If the newConnection() detects there is already a socket connection associated with the remote peer,
// it returns the socket via `acpu`, and the `result` returned is 0.
// Else if a new connection is successfully created, the conclusion handshake response
// is sent by the function itself (it calls the acceptAndRespond(..)), the `acpu` remains null, the `result` is 1.
int error = SRT_REJ_UNKNOWN;
CUDT* acpu = NULL;
int result = uglobal().newConnection(m_SocketID, addr, packet, (hs), (error), (acpu));
Expand All @@ -10944,47 +10949,17 @@ int srt::CUDT::processConnectRequest(const sockaddr_any& addr, CPacket& packet)
LOGF(cnlog.Warn, "processConnectRequest: rsp(REJECT): %d - %s", hs.m_iReqType, srt_rejectreason_str(error));
}

// CONFUSION WARNING!
//
// The newConnection() will call acceptAndRespond() if the processing
// was successful - IN WHICH CASE THIS PROCEDURE SHOULD DO NOTHING.
// Ok, almost nothing - see update_events below.
//
// If newConnection() failed, acceptAndRespond() will not be called.
// Ok, more precisely, the thing that acceptAndRespond() is expected to do
// will not be done (this includes sending any response to the peer).
//
// Now read CAREFULLY. The newConnection() will return:
//
// - -1: The connection processing failed due to errors like:
// - memory alloation error
// - listen backlog exceeded
// - any error propagated from CUDT::open and CUDT::acceptAndRespond
// - 0: The connection already exists
// - 1: Connection accepted.
//
// So, update_events is called only if the connection is established.
// Both 0 (repeated) and -1 (error) require that a response be sent.
// The CPacket object that has arrived as a connection request is here
// reused for the connection rejection response (see URQ_ERROR_REJECT set
// as m_iReqType).

// The 'acpu' should be set to a new socket, if found;
// this means simultaneously that result == 0, but it's safest to
// check this condition only. This means that 'newConnection' found
// that the connection attempt has already been accepted, just the
// caller side somehow didn't get the answer. The rule is that every
// connection request HS must be completed with a symmetric HS response,
// so craft one here.

// Note that this function runs in the listener socket context, while 'acpu'
// is the CUDT entity for the accepted socket.
// The `acpu` not NULL means connection exists, the `result` should be 0. It is not checked here though.
// The `newConnection(..)` only sends reponse for newly created connection.
// The connection already exists (no new connection has been created, no response sent).
// Send the conclusion response manually here in case the peer has missed the first one.
// The value `result` here should be 0.
if (acpu)
{
// This is an existing connection, so the handshake is only needed
// because of the rule that every handshake request must be covered
// by the handshake response. It wouldn't be good to call interpretSrtHandshake
// here because the data from the handshake have been already interpreted
// here because the data from the handshake has been already interpreted
// and recorded. We just need to craft a response.
HLOGC(cnlog.Debug,
log << CONID() << "processConnectRequest: sending REPEATED handshake response req="
Expand Down Expand Up @@ -11029,16 +11004,31 @@ int srt::CUDT::processConnectRequest(const sockaddr_any& addr, CPacket& packet)
}
}

// send back a response if connection failed or connection already existed
// (or the above procedure failed)
if (result == -1)
if (result == 1)
{
// BUG! There is no need to update write-readiness on the listener socket once new connection is accepted.
// Only read-readiness has to be updated, but it is done so in the newConnection(..) function.
// See PR #1831 and issue #1667.
HLOGC(cnlog.Debug, log << "processConnectRequest: @" << m_SocketID
<< " accepted connection, updating epoll to write-ready");

// New connection has been accepted or an existing one has been found. Update epoll write-readiness.
// a new connection has been created, enable epoll for write
// Note: not using SRT_EPOLL_CONNECT symbol because this is a procedure
// executed for the accepted socket.
uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_OUT, true);
}
else if (result == -1)
{
// The new connection failed
// or the connection already existed, but manually sending the HS response above has failed.
// HSv4: Send the SHUTDOWN message to the peer (see PR #2010) in order to disallow then to connect.
// The HSv4 clients do not interpret the error handshake response correctly.
// HSv5: Send a handshake with an error code (hs.m_iReqType set earlier) to the peer.
if (hs.m_iVersion < HS_VERSION_SRT1)
{
HLOGC(cnlog.Debug, log << CONID() << "processConnectRequest: HSv4 caller, sending SHUTDOWN after rejection with "
<< RequestTypeStr(hs.m_iReqType));
// The HSv4 clients do not interpret the error handshake response correctly.
// In order to really disallow them to connect there's needed the shutdown response.
CPacket rsp;
setPacketTS((rsp), steady_clock::now());
rsp.pack(UMSG_SHUTDOWN);
Expand All @@ -11053,24 +11043,12 @@ int srt::CUDT::processConnectRequest(const sockaddr_any& addr, CPacket& packet)
size_t size = CHandShake::m_iContentSize;
hs.store_to((packet.m_pcData), (size));
packet.setLength(size);
packet.m_iID = id;
packet.m_iID = id;
setPacketTS(packet, steady_clock::now());
HLOGC(cnlog.Debug, log << "processConnectRequest: SENDING HS (a): " << hs.show());
m_pSndQueue->sendto(addr, packet);
}
}
// new connection response should be sent in acceptAndRespond()
// turn the socket writable if this is the first time when this was found out.
else
{
// a new connection has been created, enable epoll for write
HLOGC(cnlog.Debug, log << "processConnectRequest: @" << m_SocketID
<< " connected, setting epoll to connect:");

// Note: not using SRT_EPOLL_CONNECT symbol because this is a procedure
// executed for the accepted socket.
uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_OUT, true);
}
}
LOGC(cnlog.Note, log << "listen ret: " << hs.m_iReqType << " - " << RequestTypeStr(hs.m_iReqType));

Expand Down
2 changes: 1 addition & 1 deletion srtcore/handshake.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ class CHandShake
int32_t m_iMSS; // maximum segment size
int32_t m_iFlightFlagSize; // flow control window size
UDTRequestType m_iReqType; // handshake stage
int32_t m_iID; // socket ID
int32_t m_iID; // SRT socket ID of HS sender
int32_t m_iCookie; // cookie
uint32_t m_piPeerIP[4]; // The IP address that the peer's UDP port is bound to

Expand Down

0 comments on commit d6d4db3

Please sign in to comment.