Skip to content

Commit

Permalink
[core] Added rejection handshake sent to the peer in rendezvous mode (H…
Browse files Browse the repository at this point in the history
…aivision#2667).

Fixed: send SHUTDOWN also in blocking mode.
Added fallback ROGUE reject reason case it isn't set.
  • Loading branch information
ethouris authored and maxsharabayko committed Apr 4, 2023
1 parent 46d55c6 commit e1429db
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 14 deletions.
3 changes: 3 additions & 0 deletions srtcore/buffer_tools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ void CRateEstimator::updateInputRate(const time_point& time, int pkts, int bytes
m_iInRateBytesCount = 0;
m_tsInRateStartTime = time;

LOGC(bslog.Note,
log << "updateInputRate: " << (m_iInRateBps * 8) / 1000 << "kbps interval=" << period_us << " us.");

setInputRateSmpPeriod(INPUTRATE_RUNNING_US);
}
}
Expand Down
4 changes: 4 additions & 0 deletions srtcore/congctl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,13 @@ class LiveCC: public SrtCongestionControlBase

void setMaxBW(int64_t maxbw)
{
const auto old_val = m_llSndMaxBW;
m_llSndMaxBW = maxbw > 0 ? maxbw : BW_INFINITE;
updatePktSndPeriod();

if (m_llSndMaxBW != old_val)
LOGC(rslog.Note, log << "setMaxBW: :" << 8 * maxbw << " bps, snd period " << m_dPktSndPeriod << ".");

/*
* UDT default flow control should not trigger under normal SRT operation
* UDT stops sending if the number of packets in transit (not acknowledged)
Expand Down
97 changes: 86 additions & 11 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2629,6 +2629,9 @@ bool srt::CUDT::interpretSrtHandshake(const CHandShake& hs,
{
// Cryptographic modes mismatch. Not acceptable at all.
m_RejectReason = SRT_REJ_CRYPTO;
LOGC(cnlog.Error,
log << CONID()
<< "interpretSrtHandshake: KMREQ result: Bad crypto mode - rejecting");
return false;
}
#endif
Expand Down Expand Up @@ -3682,11 +3685,23 @@ void srt::CUDT::startConnect(const sockaddr_any& serv_addr, int32_t forced_isn)
cst = processRendezvous(&response, serv_addr, RST_OK, (reqpkt));
if (cst == CONN_CONTINUE)
continue;
break;

// Just in case it wasn't set, set this as a fallback
if (m_RejectReason == SRT_REJ_UNKNOWN)
m_RejectReason = SRT_REJ_ROGUE;

// rejection or erroneous code.
reqpkt.setLength(m_iMaxSRTPayloadSize);
reqpkt.setControl(UMSG_HANDSHAKE);
sendRendezvousRejection(serv_addr, (reqpkt));
}

if (cst == CONN_REJECT)
{
HLOGC(cnlog.Debug,
log << CONID() << "startConnect: REJECTED by processConnectResponse - sending SHUTDOWN");
sendCtrl(UMSG_SHUTDOWN);
}

if (cst != CONN_CONTINUE && cst != CONN_CONFUSED)
break; // --> OUTSIDE-LOOP
Expand Down Expand Up @@ -3873,6 +3888,11 @@ bool srt::CUDT::processAsyncConnectRequest(EReadStatus rst,
LOGC(cnlog.Warn,
log << CONID()
<< "processAsyncConnectRequest: REJECT reported from processRendezvous, not processing further.");

if (m_RejectReason == SRT_REJ_UNKNOWN)
m_RejectReason = SRT_REJ_ROGUE;

sendRendezvousRejection(serv_addr, (request));
status = false;
}
}
Expand Down Expand Up @@ -3925,6 +3945,24 @@ bool srt::CUDT::processAsyncConnectRequest(EReadStatus rst,
return status;
}

void srt::CUDT::sendRendezvousRejection(const sockaddr_any& serv_addr, CPacket& r_rsppkt)
{
// We can reuse m_ConnReq because we are about to abandon the connection process.
m_ConnReq.m_iReqType = URQFailure(m_RejectReason);

// Assumed that r_rsppkt refers to a packet object that was already prepared
// to be used for storing the handshake there.
size_t size = r_rsppkt.getLength();
m_ConnReq.store_to((r_rsppkt.m_pcData), (size));
r_rsppkt.setLength(size);

HLOGC(cnlog.Debug, log << CONID() << "sendRendezvousRejection: using code=" << m_ConnReq.m_iReqType
<< " for reject reason code " << m_RejectReason << " (" << srt_rejectreason_str(m_RejectReason) << ")");

setPacketTS(r_rsppkt, steady_clock::now());
m_pSndQueue->sendto(serv_addr, r_rsppkt, m_SourceAddr);
}

void srt::CUDT::cookieContest()
{
if (m_SrtHsSide != HSD_DRAW)
Expand Down Expand Up @@ -4138,7 +4176,7 @@ EConnectStatus srt::CUDT::processRendezvous(

// The CryptoControl must be created by the prepareConnectionObjects() before interpreting and creating HSv5 extensions
// because the it will be used there.
if (!prepareConnectionObjects(m_ConnRes, m_SrtHsSide, NULL))
if (!prepareConnectionObjects(m_ConnRes, m_SrtHsSide, NULL) || !prepareBuffers(NULL))
{
// m_RejectReason already handled
HLOGC(cnlog.Debug,
Expand Down Expand Up @@ -4427,7 +4465,25 @@ EConnectStatus srt::CUDT::processConnectResponse(const CPacket& response, CUDTEx
<< "processConnectResponse: CONFUSED: expected UMSG_HANDSHAKE as connection not yet established, "
"got: "
<< MessageTypeStr(response.getType(), response.getExtendedType()));

if (response.getType() == UMSG_SHUTDOWN)
{
LOGC(cnlog.Error,
log << CONID() << "processConnectResponse: UMSG_SHUTDOWN received, rejecting connection.");
return CONN_REJECT;
}
}

if (m_config.bRendezvous)
{
// In rendezvous mode we expect that both sides are known
// to the service operator (unlike a listener, which may
// operate connections from unknown sources). This means that
// the connection process should be terminated anyway, on
// whichever side it would happen.
return CONN_REJECT;
}

return CONN_CONFUSED;
}

Expand Down Expand Up @@ -4675,6 +4731,7 @@ EConnectStatus srt::CUDT::postConnect(const CPacket* pResponse, bool rendezvous,
// In this situation the interpretation of handshake was already done earlier.
ok = ok && pResponse->isControl();
ok = ok && interpretSrtHandshake(m_ConnRes, *pResponse, 0, 0);
ok = ok && prepareBuffers(eout);

if (!ok)
{
Expand Down Expand Up @@ -5514,9 +5571,21 @@ bool srt::CUDT::prepareConnectionObjects(const CHandShake &hs, HandshakeSide hsd
}
}

if (!createCrypter(hsd, bidirectional)) // Make sure CC is created (lazy)
{
m_RejectReason = SRT_REJ_RESOURCE;
return false;
}

return true;
}

bool srt::CUDT::prepareBuffers(CUDTException* eout)
{
try
{
const int authtag = m_config.iCryptoMode == CSrtConfig::CIPHER_MODE_AES_GCM ? HAICRYPT_AUTHTAG_MAX : 0;
// CryptoControl has to be initialized and in case of RESPONDER the KM REQ must be processed (interpretSrtHandshake(..)) for the crypto mode to be deduced.
const int authtag = (m_pCryptoControl && m_pCryptoControl->getCryptoMode() == CSrtConfig::CIPHER_MODE_AES_GCM) ? HAICRYPT_AUTHTAG_MAX : 0;
m_pSndBuffer = new CSndBuffer(32, m_iMaxSRTPayloadSize, authtag);
SRT_ASSERT(m_iISN != -1);
m_pRcvBuffer = new srt::CRcvBuffer(m_iISN, m_config.iRcvBufSize, m_pRcvQueue->m_pUnitQueue, m_config.bMessageAPI);
Expand All @@ -5534,13 +5603,6 @@ bool srt::CUDT::prepareConnectionObjects(const CHandShake &hs, HandshakeSide hsd
m_RejectReason = SRT_REJ_RESOURCE;
return false;
}

if (!createCrypter(hsd, bidirectional)) // Make sure CC is created (lazy)
{
m_RejectReason = SRT_REJ_RESOURCE;
return false;
}

return true;
}

Expand Down Expand Up @@ -5651,6 +5713,19 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any&
throw CUDTException(MJ_SETUP, MN_REJECTED, 0);
}

if (!prepareBuffers(NULL))
{
HLOGC(cnlog.Debug,
log << CONID() << "acceptAndRespond: prepareConnectionObjects failed - responding with REJECT.");
// If the SRT buffers failed to be allocated,
// the connection must be rejected.
//
// Respond with the rejection message and exit with exception
// so that the caller will know that this new socket should be deleted.
w_hs.m_iReqType = URQFailure(m_RejectReason);
throw CUDTException(MJ_SETUP, MN_REJECTED, 0);
}

// Synchronize the time NOW because the following function is about
// to use the start time to pass it to the receiver buffer data.
bool have_group = false;
Expand Down Expand Up @@ -9137,7 +9212,7 @@ int srt::CUDT::packLostData(CPacket& w_packet)

// The packet has been ecrypted, thus the authentication tag is expected to be stored
// in the SND buffer as well right after the payload.
if (m_config.iCryptoMode == CSrtConfig::CIPHER_MODE_AES_GCM)
if (m_pCryptoControl && m_pCryptoControl->getCryptoMode() == CSrtConfig::CIPHER_MODE_AES_GCM)
{
w_packet.setLength(w_packet.getLength() + HAICRYPT_AUTHTAG_MAX);
}
Expand Down
5 changes: 5 additions & 0 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -485,11 +485,16 @@ class CUDT
/// @param rst Current read status to know if the HS packet was freshly received from the peer, or this is only a periodic update (RST_AGAIN)
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
EConnectStatus processRendezvous(const CPacket* response, const sockaddr_any& serv_addr, EReadStatus, CPacket& reqpkt);
void sendRendezvousRejection(const sockaddr_any& serv_addr, CPacket& request);

/// Create the CryptoControl object based on the HS packet. Allocates sender and receiver buffers and loss lists.
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
bool prepareConnectionObjects(const CHandShake &hs, HandshakeSide hsd, CUDTException *eout);

/// Create the CryptoControl object based on the HS packet. Allocates sender and receiver buffers and loss lists.
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
bool prepareBuffers(CUDTException* eout);

SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
EConnectStatus postConnect(const CPacket* response, bool rendezvous, CUDTException* eout) ATR_NOEXCEPT;

Expand Down
3 changes: 3 additions & 0 deletions srtcore/crypto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ std::string KmStateStr(SRT_KM_STATE state)
TAKE(SECURING);
TAKE(NOSECRET);
TAKE(BADSECRET);
#ifdef ENABLE_AEAD_API_PREVIEW
TAKE(BADCRYPTOMODE);
#endif
#undef TAKE
default:
{
Expand Down
7 changes: 4 additions & 3 deletions testing/testmedia.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class FileTarget: public virtual Target
{
ofile.write(data.payload.data(), data.payload.size());
#ifdef PLEASE_LOG
applog.Debug() << "FileTarget::Write: " << data.size() << " written to a file";
applog.Debug() << "FileTarget::Write: " << data.payload.size() << " written to a file";
#endif
}

Expand Down Expand Up @@ -1316,7 +1316,8 @@ void SrtCommon::ConnectClient(string host, int port)
{
int reason = srt_getrejectreason(m_sock);
#if PLEASE_LOG
LOGP(applog.Error, "ERROR reported by srt_connect - closing socket @", m_sock);
LOGP(applog.Error, "ERROR reported by srt_connect - closing socket @", m_sock,
" reject reason: ", reason, ": ", srt_rejectreason_str(reason));
#endif
if (transmit_retry_connect && (transmit_retry_always || reason == SRT_REJ_TIMEOUT))
{
Expand Down Expand Up @@ -2379,7 +2380,7 @@ MediaPacket SrtSource::Read(size_t chunk)
#if PLEASE_LOG
extern srt_logging::Logger applog;
LOGC(applog.Debug, log << "recv: #" << mctrl.msgno << " %" << mctrl.pktseq << " "
<< BufferStamp(data.data(), stat) << " BELATED: " << ((CTimer::getTime()-mctrl.srctime)/1000.0) << "ms");
<< BufferStamp(data.data(), stat) << " BELATED: " << ((srt_time_now()-mctrl.srctime)/1000.0) << "ms");
#endif

Verb() << "(#" << mctrl.msgno << " %" << mctrl.pktseq << " " << BufferStamp(data.data(), stat) << ") " << VerbNoEOL;
Expand Down

0 comments on commit e1429db

Please sign in to comment.