Skip to content

Commit

Permalink
Threads-RECV: Support tunnel for recv-srtp.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent eb7ce7f commit d282ccd
Show file tree
Hide file tree
Showing 11 changed files with 234 additions and 10 deletions.
4 changes: 4 additions & 0 deletions trunk/conf/full.conf
Expand Up @@ -130,6 +130,10 @@ threads {
# Whether enable the ASYNC SEND, send udp packets in dedicate threads.
# Default: off
async_send off;
# Whether enable the tunnel, to consume packets between srtp/recv/send threads,
# without proxy by hybrid(except the few head packets).
# Default: off
async_tunnel off;
# CPU set for affinity, for example:
# 0 means CPU0
# 0-3 means CPU0, CPU1, CPU2
Expand Down
17 changes: 17 additions & 0 deletions trunk/src/app/srs_app_config.cpp
Expand Up @@ -4178,6 +4178,23 @@ bool SrsConfig::get_threads_async_send()
return SRS_CONF_PERFER_FALSE(conf->arg0());
}

bool SrsConfig::get_threads_async_tunnel()
{
static bool DEFAULT = false;

SrsConfDirective* conf = root->get("threads");
if (!conf) {
return DEFAULT;
}

conf = conf->get("async_tunnel");
if (!conf) {
return DEFAULT;
}

return SRS_CONF_PERFER_FALSE(conf->arg0());
}

bool SrsConfig::get_threads_cpu_affinity(std::string label, int* start, int* end)
{
static int DEFAULT_START = 0;
Expand Down
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_config.hpp
Expand Up @@ -481,6 +481,7 @@ class SrsConfig
virtual bool get_threads_async_srtp();
virtual bool get_threads_async_recv();
virtual bool get_threads_async_send();
virtual bool get_threads_async_tunnel();
virtual bool get_threads_cpu_affinity(std::string label, int* start, int* end);
virtual int get_threads_max_recv_queue();
virtual int get_high_threshold();
Expand Down
34 changes: 34 additions & 0 deletions trunk/src/app/srs_app_rtc_conn.cpp
Expand Up @@ -90,6 +90,10 @@ ISrsRtcTransport::~ISrsRtcTransport()
{
}

void ISrsRtcTransport::dig_tunnel(SrsUdpMuxSocket* skt)
{
}

SrsSecurityTransport::SrsSecurityTransport(SrsRtcConnection* s)
{
session_ = s;
Expand Down Expand Up @@ -201,21 +205,37 @@ srs_error_t SrsSecurityTransport::srtp_initialize()

srs_error_t SrsSecurityTransport::on_rtp_plaintext(char* plaintext, int size)
{
// We should keep alive here, because when tunnel is enabled, the connection die
// for the SrsRtcServer::on_udp_packet might be skipped.
session_->alive();

return session_->on_rtp_plaintext(plaintext, size);
}

srs_error_t SrsSecurityTransport::on_rtcp_plaintext(char* plaintext, int size)
{
// We should keep alive here, because when tunnel is enabled, the connection die
// for the SrsRtcServer::on_udp_packet might be skipped.
session_->alive();

return session_->on_rtcp_plaintext(plaintext, size);
}

srs_error_t SrsSecurityTransport::on_rtp_cipher(char* cipher, int size)
{
// We should keep alive here, because when tunnel is enabled, the connection die
// for the SrsRtcServer::on_udp_packet might be skipped.
session_->alive();

return session_->on_rtp_cipher(cipher, size);
}

srs_error_t SrsSecurityTransport::on_rtcp_cipher(char* cipher, int size)
{
// We should keep alive here, because when tunnel is enabled, the connection die
// for the SrsRtcServer::on_udp_packet might be skipped.
session_->alive();

return session_->on_rtcp_cipher(cipher, size);
}

Expand All @@ -239,6 +259,13 @@ srs_error_t SrsSecurityTransport::unprotect_rtcp(void* packet, int* nb_plaintext
return srtp_->unprotect_rtcp(packet, nb_plaintext);
}

void SrsSecurityTransport::dig_tunnel(SrsUdpMuxSocket* skt)
{
if (srtp_) {
srtp_->dig_tunnel(skt);
}
}

SrsSemiSecurityTransport::SrsSemiSecurityTransport(SrsRtcConnection* s) : SrsSecurityTransport(s)
{
}
Expand Down Expand Up @@ -1832,6 +1859,13 @@ vector<SrsUdpMuxSocket*> SrsRtcConnection::peer_addresses()
return addresses;
}

void SrsRtcConnection::dig_tunnel(SrsUdpMuxSocket* skt)
{
if (transport_) {
transport_->dig_tunnel(skt);
}
}

const SrsContextId& SrsRtcConnection::get_id()
{
return cid_;
Expand Down
8 changes: 8 additions & 0 deletions trunk/src/app/srs_app_rtc_conn.hpp
Expand Up @@ -107,6 +107,10 @@ class ISrsRtcTransport : public ISrsDtlsCallback
// The nb_plaintext should be initialized to the size of cipher.
virtual srs_error_t unprotect_rtp(void* packet, int* nb_plaintext) = 0;
virtual srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext) = 0;
public:
// Try to dig the recv tunnel, for srtp thread to consume packets from recv
// threads directly without proxying by hybrid thread.
virtual void dig_tunnel(SrsUdpMuxSocket* skt);
};

// The security transport, use DTLS/SRTP to protect the data.
Expand Down Expand Up @@ -147,6 +151,8 @@ class SrsSecurityTransport : public ISrsRtcTransport
srs_error_t on_rtcp_plaintext(char* plaintext, int size);
srs_error_t on_rtp_cipher(char* cipher, int size);
srs_error_t on_rtcp_cipher(char* cipher, int size);
public:
void dig_tunnel(SrsUdpMuxSocket* skt);
};

// Semi security transport, setup DTLS and SRTP, with SRTP decrypt, without SRTP encrypt.
Expand Down Expand Up @@ -473,6 +479,8 @@ class SrsRtcConnection : public ISrsResource
std::string username();
// Get all addresses client used.
std::vector<SrsUdpMuxSocket*> peer_addresses();
public:
void dig_tunnel(SrsUdpMuxSocket* skt);
// Interface ISrsResource.
public:
virtual const SrsContextId& get_id();
Expand Down
4 changes: 4 additions & 0 deletions trunk/src/app/srs_app_rtc_dtls.cpp
Expand Up @@ -1181,3 +1181,7 @@ srs_error_t SrsSRTP::unprotect_rtcp(void* packet, int* nb_plaintext)
return err;
}

void SrsSRTP::dig_tunnel(SrsUdpMuxSocket* skt)
{
}

3 changes: 3 additions & 0 deletions trunk/src/app/srs_app_rtc_dtls.hpp
Expand Up @@ -35,6 +35,7 @@
#include <srs_app_st.hpp>

class SrsRequest;
class SrsUdpMuxSocket;

class SrsDtlsCertificate
{
Expand Down Expand Up @@ -245,6 +246,8 @@ class SrsSRTP
virtual srs_error_t protect_rtcp(void* packet, int* nb_cipher);
virtual srs_error_t unprotect_rtp(void* packet, int* nb_plaintext);
virtual srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext);
public:
virtual void dig_tunnel(SrsUdpMuxSocket* skt);
};

#endif
23 changes: 20 additions & 3 deletions trunk/src/app/srs_app_rtc_server.cpp
Expand Up @@ -254,6 +254,8 @@ SrsRtcServer::SrsRtcServer()
handler = NULL;
hijacker = NULL;

async_tunnel_ = false;

_srs_config->subscribe(this);
}

Expand Down Expand Up @@ -295,11 +297,14 @@ srs_error_t SrsRtcServer::initialize()
_srs_rtp_msg_cache_buffers->setup(rtp_msg_cache_enabled, rtp_msg_cache_buffer_size);
_srs_rtp_msg_cache_objs->setup(rtp_msg_cache_enabled, rtp_msg_cache_msg_size);

srs_trace("RTC: Object cache init, rtp-cache=(enabled:%d,pkt:%dm-%dw,payload:%dm-%dw-%dw), msg-cache=(enabled:%d,obj:%dm-%dw,buf:%dm-%dw)",
async_tunnel_ = _srs_config->get_threads_async_tunnel();

srs_trace("RTC: Object cache init, rtp-cache=(enabled:%d,pkt:%dm-%dw,payload:%dm-%dw-%dw), msg-cache=(enabled:%d,obj:%dm-%dw,buf:%dm-%dw), tunnel=%d",
rtp_cache_enabled, (int)(rtp_cache_pkt_size/1024/1024), _srs_rtp_cache->capacity()/10000,
(int)(rtp_cache_payload_size/1024/1024), _srs_rtp_raw_cache->capacity()/10000, _srs_rtp_fua_cache->capacity()/10000,
rtp_msg_cache_enabled, (int)(rtp_msg_cache_msg_size/1024/1024), _srs_rtp_msg_cache_objs->capacity()/10000,
(int)(rtp_msg_cache_buffer_size/1024/1024), _srs_rtp_msg_cache_buffers->capacity()/10000);
(int)(rtp_msg_cache_buffer_size/1024/1024), _srs_rtp_msg_cache_buffers->capacity()/10000,
async_tunnel_);

return err;
}
Expand Down Expand Up @@ -446,6 +451,11 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt)
ping.get_username().c_str(), peer_id.c_str(), fast_id);
}

// Try to dig tunnel for ping-pong(the address might change).
if (async_tunnel_) {
session->dig_tunnel(skt);
}

return session->on_stun(skt, &ping);
}

Expand Down Expand Up @@ -475,7 +485,14 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt)
if (srs_is_dtls((uint8_t*)data, size)) {
++_srs_pps_rstuns->sugar;

return session->on_dtls(data, size);
err = session->on_dtls(data, size);

// Try to dig tunnel for DTLS packets(when DTLS done).
if (async_tunnel_) {
session->dig_tunnel(skt);
}

return err;
}
return srs_error_new(ERROR_RTC_UDP, "unknown packet");
}
Expand Down
2 changes: 2 additions & 0 deletions trunk/src/app/srs_app_rtc_server.hpp
Expand Up @@ -91,6 +91,8 @@ class SrsRtcServer : public ISrsUdpMuxHandler, public ISrsFastTimer, public ISrs
std::vector<SrsUdpMuxListener*> listeners;
ISrsRtcServerHandler* handler;
ISrsRtcServerHijacker* hijacker;
private:
bool async_tunnel_;
public:
SrsRtcServer();
virtual ~SrsRtcServer();
Expand Down

0 comments on commit d282ccd

Please sign in to comment.