diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 68f70c361e..3391c3a194 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -130,6 +130,10 @@ threads { # Whether enable the ASYNC SEND, send udp packets in dedicate threads. # Default: off async_send off; + # Whether enable the tunnel, to consume packets between srtp/recv/send threads, + # without proxy by hybrid(except the few head packets). + # Default: off + async_tunnel off; # CPU set for affinity, for example: # 0 means CPU0 # 0-3 means CPU0, CPU1, CPU2 diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 589a60d52f..75c4b46ceb 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -4178,6 +4178,23 @@ bool SrsConfig::get_threads_async_send() return SRS_CONF_PERFER_FALSE(conf->arg0()); } +bool SrsConfig::get_threads_async_tunnel() +{ + static bool DEFAULT = false; + + SrsConfDirective* conf = root->get("threads"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("async_tunnel"); + if (!conf) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + bool SrsConfig::get_threads_cpu_affinity(std::string label, int* start, int* end) { static int DEFAULT_START = 0; diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index f2d8fcf9dd..8ea82f296d 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -481,6 +481,7 @@ class SrsConfig virtual bool get_threads_async_srtp(); virtual bool get_threads_async_recv(); virtual bool get_threads_async_send(); + virtual bool get_threads_async_tunnel(); virtual bool get_threads_cpu_affinity(std::string label, int* start, int* end); virtual int get_threads_max_recv_queue(); virtual int get_high_threshold(); diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 72b820ca19..ed85e9fdec 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -90,6 +90,10 @@ ISrsRtcTransport::~ISrsRtcTransport() { } +void ISrsRtcTransport::dig_tunnel(SrsUdpMuxSocket* skt) +{ +} + SrsSecurityTransport::SrsSecurityTransport(SrsRtcConnection* s) { session_ = s; @@ -201,21 +205,37 @@ srs_error_t SrsSecurityTransport::srtp_initialize() srs_error_t SrsSecurityTransport::on_rtp_plaintext(char* plaintext, int size) { + // We should keep alive here, because when tunnel is enabled, the connection die + // for the SrsRtcServer::on_udp_packet might be skipped. + session_->alive(); + return session_->on_rtp_plaintext(plaintext, size); } srs_error_t SrsSecurityTransport::on_rtcp_plaintext(char* plaintext, int size) { + // We should keep alive here, because when tunnel is enabled, the connection die + // for the SrsRtcServer::on_udp_packet might be skipped. + session_->alive(); + return session_->on_rtcp_plaintext(plaintext, size); } srs_error_t SrsSecurityTransport::on_rtp_cipher(char* cipher, int size) { + // We should keep alive here, because when tunnel is enabled, the connection die + // for the SrsRtcServer::on_udp_packet might be skipped. + session_->alive(); + return session_->on_rtp_cipher(cipher, size); } srs_error_t SrsSecurityTransport::on_rtcp_cipher(char* cipher, int size) { + // We should keep alive here, because when tunnel is enabled, the connection die + // for the SrsRtcServer::on_udp_packet might be skipped. + session_->alive(); + return session_->on_rtcp_cipher(cipher, size); } @@ -239,6 +259,13 @@ srs_error_t SrsSecurityTransport::unprotect_rtcp(void* packet, int* nb_plaintext return srtp_->unprotect_rtcp(packet, nb_plaintext); } +void SrsSecurityTransport::dig_tunnel(SrsUdpMuxSocket* skt) +{ + if (srtp_) { + srtp_->dig_tunnel(skt); + } +} + SrsSemiSecurityTransport::SrsSemiSecurityTransport(SrsRtcConnection* s) : SrsSecurityTransport(s) { } @@ -1832,6 +1859,13 @@ vector SrsRtcConnection::peer_addresses() return addresses; } +void SrsRtcConnection::dig_tunnel(SrsUdpMuxSocket* skt) +{ + if (transport_) { + transport_->dig_tunnel(skt); + } +} + const SrsContextId& SrsRtcConnection::get_id() { return cid_; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 0150f18e9a..87373fe858 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -107,6 +107,10 @@ class ISrsRtcTransport : public ISrsDtlsCallback // The nb_plaintext should be initialized to the size of cipher. virtual srs_error_t unprotect_rtp(void* packet, int* nb_plaintext) = 0; virtual srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext) = 0; +public: + // Try to dig the recv tunnel, for srtp thread to consume packets from recv + // threads directly without proxying by hybrid thread. + virtual void dig_tunnel(SrsUdpMuxSocket* skt); }; // The security transport, use DTLS/SRTP to protect the data. @@ -147,6 +151,8 @@ class SrsSecurityTransport : public ISrsRtcTransport srs_error_t on_rtcp_plaintext(char* plaintext, int size); srs_error_t on_rtp_cipher(char* cipher, int size); srs_error_t on_rtcp_cipher(char* cipher, int size); +public: + void dig_tunnel(SrsUdpMuxSocket* skt); }; // Semi security transport, setup DTLS and SRTP, with SRTP decrypt, without SRTP encrypt. @@ -473,6 +479,8 @@ class SrsRtcConnection : public ISrsResource std::string username(); // Get all addresses client used. std::vector peer_addresses(); +public: + void dig_tunnel(SrsUdpMuxSocket* skt); // Interface ISrsResource. public: virtual const SrsContextId& get_id(); diff --git a/trunk/src/app/srs_app_rtc_dtls.cpp b/trunk/src/app/srs_app_rtc_dtls.cpp index 74a5111836..e45698242b 100644 --- a/trunk/src/app/srs_app_rtc_dtls.cpp +++ b/trunk/src/app/srs_app_rtc_dtls.cpp @@ -1181,3 +1181,7 @@ srs_error_t SrsSRTP::unprotect_rtcp(void* packet, int* nb_plaintext) return err; } +void SrsSRTP::dig_tunnel(SrsUdpMuxSocket* skt) +{ +} + diff --git a/trunk/src/app/srs_app_rtc_dtls.hpp b/trunk/src/app/srs_app_rtc_dtls.hpp index f46a462a8a..dd585e91a1 100644 --- a/trunk/src/app/srs_app_rtc_dtls.hpp +++ b/trunk/src/app/srs_app_rtc_dtls.hpp @@ -35,6 +35,7 @@ #include class SrsRequest; +class SrsUdpMuxSocket; class SrsDtlsCertificate { @@ -245,6 +246,8 @@ class SrsSRTP virtual srs_error_t protect_rtcp(void* packet, int* nb_cipher); virtual srs_error_t unprotect_rtp(void* packet, int* nb_plaintext); virtual srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext); +public: + virtual void dig_tunnel(SrsUdpMuxSocket* skt); }; #endif diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index 2196bcb739..b7ac976c5d 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -254,6 +254,8 @@ SrsRtcServer::SrsRtcServer() handler = NULL; hijacker = NULL; + async_tunnel_ = false; + _srs_config->subscribe(this); } @@ -295,11 +297,14 @@ srs_error_t SrsRtcServer::initialize() _srs_rtp_msg_cache_buffers->setup(rtp_msg_cache_enabled, rtp_msg_cache_buffer_size); _srs_rtp_msg_cache_objs->setup(rtp_msg_cache_enabled, rtp_msg_cache_msg_size); - srs_trace("RTC: Object cache init, rtp-cache=(enabled:%d,pkt:%dm-%dw,payload:%dm-%dw-%dw), msg-cache=(enabled:%d,obj:%dm-%dw,buf:%dm-%dw)", + async_tunnel_ = _srs_config->get_threads_async_tunnel(); + + srs_trace("RTC: Object cache init, rtp-cache=(enabled:%d,pkt:%dm-%dw,payload:%dm-%dw-%dw), msg-cache=(enabled:%d,obj:%dm-%dw,buf:%dm-%dw), tunnel=%d", rtp_cache_enabled, (int)(rtp_cache_pkt_size/1024/1024), _srs_rtp_cache->capacity()/10000, (int)(rtp_cache_payload_size/1024/1024), _srs_rtp_raw_cache->capacity()/10000, _srs_rtp_fua_cache->capacity()/10000, rtp_msg_cache_enabled, (int)(rtp_msg_cache_msg_size/1024/1024), _srs_rtp_msg_cache_objs->capacity()/10000, - (int)(rtp_msg_cache_buffer_size/1024/1024), _srs_rtp_msg_cache_buffers->capacity()/10000); + (int)(rtp_msg_cache_buffer_size/1024/1024), _srs_rtp_msg_cache_buffers->capacity()/10000, + async_tunnel_); return err; } @@ -446,6 +451,11 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt) ping.get_username().c_str(), peer_id.c_str(), fast_id); } + // Try to dig tunnel for ping-pong(the address might change). + if (async_tunnel_) { + session->dig_tunnel(skt); + } + return session->on_stun(skt, &ping); } @@ -475,7 +485,14 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt) if (srs_is_dtls((uint8_t*)data, size)) { ++_srs_pps_rstuns->sugar; - return session->on_dtls(data, size); + err = session->on_dtls(data, size); + + // Try to dig tunnel for DTLS packets(when DTLS done). + if (async_tunnel_) { + session->dig_tunnel(skt); + } + + return err; } return srs_error_new(ERROR_RTC_UDP, "unknown packet"); } diff --git a/trunk/src/app/srs_app_rtc_server.hpp b/trunk/src/app/srs_app_rtc_server.hpp index 075b9d1edb..693c6e2e00 100644 --- a/trunk/src/app/srs_app_rtc_server.hpp +++ b/trunk/src/app/srs_app_rtc_server.hpp @@ -91,6 +91,8 @@ class SrsRtcServer : public ISrsUdpMuxHandler, public ISrsFastTimer, public ISrs std::vector listeners; ISrsRtcServerHandler* handler; ISrsRtcServerHijacker* hijacker; +private: + bool async_tunnel_; public: SrsRtcServer(); virtual ~SrsRtcServer(); diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index 437c1bda17..8cd2791b01 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -59,6 +59,12 @@ SrsPps* _srs_thread_sync_100us = new SrsPps(); SrsPps* _srs_thread_sync_1000us = new SrsPps(); SrsPps* _srs_thread_sync_plus = new SrsPps(); +SrsPps* _srs_tunnel_recv_raw = new SrsPps(); +SrsPps* _srs_tunnel_recv_hit = new SrsPps(); + +extern bool srs_is_rtp_or_rtcp(const uint8_t* data, size_t len); +extern bool srs_is_rtcp(const uint8_t* data, size_t len); + uint64_t srs_covert_cpuset(cpu_set_t v) { #ifdef SRS_OSX @@ -243,10 +249,14 @@ srs_error_t SrsThreadPool::initialize() bool async_send = _srs_config->get_threads_async_send(); _srs_async_send->set_enabled(async_send); - srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64 ", water_level=%dx%d,%dx%d, recvQ=%d, aSend=%d", + bool async_tunnel = _srs_config->get_threads_async_tunnel(); + _srs_async_recv->set_tunnel_enabled(async_tunnel); + + srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64 ", water_level=%dx%d,%dx%d, recvQ=%d, aSend=%d, tunnel=%d", entry->num, entry->label.c_str(), entry->name.c_str(), srsu2msi(interval_), async_srtp, entry->cpuset_ok, r0, srs_covert_cpuset(entry->cpuset), r1, srs_covert_cpuset(entry->cpuset2), - high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, recv_queue, async_send); + high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, recv_queue, async_send, + async_tunnel); return err; } @@ -379,6 +389,13 @@ srs_error_t SrsThreadPool::run() sync_desc = buf; } + string tunnel_desc; + _srs_tunnel_recv_raw->update(); _srs_tunnel_recv_hit->update(); + if (_srs_tunnel_recv_raw->r10s() || _srs_tunnel_recv_hit->r10s()) { + snprintf(buf, sizeof(buf), ", tunnel=%d,%d", _srs_tunnel_recv_raw->r10s(), _srs_tunnel_recv_hit->r10s()); + tunnel_desc = buf; + } + // Show statistics for RTC server. SrsProcSelfStat* u = srs_get_self_proc_stat(); // Resident Set Size: number of pages the process has in real memory. @@ -408,9 +425,10 @@ srs_error_t SrsThreadPool::run() circuit_breaker = buf; } - srs_trace("Process: cpu=%.2f%%,%dMB, threads=%d,%.2f%%,%.2f%%%s%s%s%s", + srs_trace("Process: cpu=%.2f%%,%dMB, threads=%d,%.2f%%,%.2f%%%s%s%s%s%s", u->percent * 100, memory, (int)threads_.size(), top_percent, thread_percent, - async_logs.c_str(), sync_desc.c_str(), queue_desc.c_str(), circuit_breaker.c_str()); + async_logs.c_str(), sync_desc.c_str(), queue_desc.c_str(), circuit_breaker.c_str(), + tunnel_desc.c_str()); } return err; @@ -775,8 +793,7 @@ srs_error_t SrsAsyncSRTP::initialize(std::string recv_key, std::string send_key) return srs_error_wrap(err, "init async srtp"); } - // TODO: FIMXE: Remove it. - return SrsSRTP::initialize(recv_key, send_key); + return err; } srs_error_t SrsAsyncSRTP::protect_rtp(void* packet, int* nb_cipher) @@ -848,6 +865,8 @@ srs_error_t SrsAsyncSRTP::unprotect_rtp(void* packet, int* nb_plaintext) // Do the job asynchronously. *nb_plaintext = 0; + ++_srs_tunnel_recv_raw->sugar; + return srs_success; } @@ -870,9 +889,25 @@ srs_error_t SrsAsyncSRTP::unprotect_rtcp(void* packet, int* nb_plaintext) // Do the job asynchronously. *nb_plaintext = 0; + ++_srs_tunnel_recv_raw->sugar; + return srs_success; } +void SrsAsyncSRTP::dig_tunnel(SrsUdpMuxSocket* skt) +{ + if (!task_) { + return; + } + + uint64_t fast_id = skt->fast_id(); + if (!fast_id) { + return; + } + + _srs_async_recv->tunnels()->dig_tunnel(fast_id, task_); +} + SrsAsyncSRTPTask::SrsAsyncSRTPTask(SrsAsyncSRTP* codec) { codec_ = codec; @@ -1132,11 +1167,40 @@ SrsThreadUdpListener::~SrsThreadUdpListener() { } +SrsRecvTunnels::SrsRecvTunnels() +{ + lock_ = new SrsThreadMutex(); +} + +// TODO: FIXME: Cleanup SRTP tasks. +SrsRecvTunnels::~SrsRecvTunnels() +{ + srs_freep(lock_); +} + +void SrsRecvTunnels::dig_tunnel(uint64_t fast_id, SrsAsyncSRTPTask* task) +{ + SrsThreadLocker(lock_); + tunnels_[fast_id] = task; +} + +SrsAsyncSRTPTask* SrsRecvTunnels::find(uint64_t fast_id) +{ + SrsThreadLocker(lock_); + std::map::iterator it = tunnels_.find(fast_id); + if (it != tunnels_.end()) { + return it->second; + } + return NULL; +} + SrsAsyncRecvManager::SrsAsyncRecvManager() { lock_ = new SrsThreadMutex(); received_packets_ = new SrsThreadQueue(); max_recv_queue_ = 0; + tunnels_ = new SrsRecvTunnels(); + tunnel_enabled_ = false; } // TODO: FIXME: We should stop the thread first, then free the manager. @@ -1144,6 +1208,7 @@ SrsAsyncRecvManager::~SrsAsyncRecvManager() { srs_freep(lock_); srs_freep(received_packets_); + srs_freep(tunnels_); vector::iterator it; for (it = listeners_.begin(); it != listeners_.end(); ++it) { @@ -1201,8 +1266,15 @@ srs_error_t SrsAsyncRecvManager::do_start() continue; } - // If got packet, copy to the queue. + // OK, we got packets. got_packets = true; + + // Try to consume the packet by tunnel. + if (tunnel_enabled_ && consume_by_tunnel(listener->skt_)) { + continue; + } + + // If got packet, copy to the queue. received_packets_->push_back(listener->skt_->copy()); } } @@ -1259,6 +1331,35 @@ srs_error_t SrsAsyncRecvManager::consume(int* nn_consumed) return err; } +bool SrsAsyncRecvManager::consume_by_tunnel(SrsUdpMuxSocket* skt) +{ + uint64_t fast_id = skt->fast_id(); + SrsAsyncSRTPTask* task = tunnels_->find(fast_id); + if (!task) { + return false; + } + + char* data = skt->data(); int size = skt->size(); + bool is_rtp_or_rtcp = srs_is_rtp_or_rtcp((uint8_t*)data, size); + bool is_rtcp = srs_is_rtcp((uint8_t*)data, size); + if (!is_rtp_or_rtcp) { + return false; + } + + int nb_cipher = size; + char* buf = new char[nb_cipher]; + memcpy(buf, data, nb_cipher); + + SrsAsyncSRTPPacket* pkt = new SrsAsyncSRTPPacket(task); + pkt->msg_->wrap(buf, nb_cipher); + pkt->is_rtp_ = !is_rtcp; + pkt->do_decrypt_ = true; + _srs_async_srtp->add_packet(pkt); + + ++_srs_tunnel_recv_hit->sugar; + return true; +} + SrsAsyncRecvManager* _srs_async_recv = new SrsAsyncRecvManager(); SrsAsyncUdpPacket::SrsAsyncUdpPacket() diff --git a/trunk/src/app/srs_app_threads.hpp b/trunk/src/app/srs_app_threads.hpp index 5260f197b2..19c9900c26 100644 --- a/trunk/src/app/srs_app_threads.hpp +++ b/trunk/src/app/srs_app_threads.hpp @@ -319,6 +319,8 @@ class SrsAsyncSRTP : public SrsSRTP srs_error_t protect_rtcp(void* packet, int* nb_cipher); srs_error_t unprotect_rtp(void* packet, int* nb_plaintext); srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext); +public: + void dig_tunnel(SrsUdpMuxSocket* skt); }; // The async SRTP task, bind to the codec, managed by SrsAsyncSRTPManager, @@ -398,15 +400,37 @@ class SrsThreadUdpListener virtual ~SrsThreadUdpListener(); }; +// The tunnel for recv to directly consume packets to SRTP decrypt. +class SrsRecvTunnels +{ +private: + // The tunnel for recv to directly consume packets to SRTP decrypt. + // Key is fast_id of UDP packet, value is SRTP task. + std::map tunnels_; + SrsThreadMutex* lock_; +public: + SrsRecvTunnels(); + virtual ~SrsRecvTunnels(); +public: + // Dig a tunnel for recv thread. + void dig_tunnel(uint64_t fast_id, SrsAsyncSRTPTask* task); + // Get the SRTP task to handle packet. + SrsAsyncSRTPTask* find(uint64_t fast_id); +}; + // The async RECV manager, to recv UDP packets. class SrsAsyncRecvManager { private: // The received UDP packets. SrsThreadQueue* received_packets_; + // The tunnel for recv to directly consume packets to SRTP decrypt. + SrsRecvTunnels* tunnels_; private: // If exceed max queue, drop packet. int max_recv_queue_; + // Whether enabled tunnel. + bool tunnel_enabled_; private: std::vector listeners_; SrsThreadMutex* lock_; @@ -414,6 +438,12 @@ class SrsAsyncRecvManager SrsAsyncRecvManager(); virtual ~SrsAsyncRecvManager(); public: + // Get the recv tunnels. + // SrsAsyncRecvManager::tunnels() + SrsRecvTunnels* tunnels() { return tunnels_; } + // Enable or disable the tunnel. + // SrsAsyncRecvManager::set_tunnel_enabled() + void set_tunnel_enabled(bool v) { tunnel_enabled_ = v; } // Set the max queue size. // SrsAsyncRecvManager::set_max_recv_queue() void set_max_recv_queue(int v) { max_recv_queue_ =v; } @@ -429,6 +459,9 @@ class SrsAsyncRecvManager public: // Consume received UDP packets. Must call in worker/service thread. virtual srs_error_t consume(int* nn_consumed); +private: + // Try to consume by tunnel. + bool consume_by_tunnel(SrsUdpMuxSocket* skt); }; // The global async RECV manager.