Skip to content

Commit

Permalink
Threads: Support Circuit-Breaker to work in storms.
Browse files Browse the repository at this point in the history
1. Config the recv queue, drop packet if exceed.
2. Config the high and critical threshold and pulse of water level.
3. If critical water level, disable NACK and TWCC.
4. If high water level, ignore for NACK insert and send.
5. Support read the CPU of thread.
6. Refine SrsPps to support r1s sample.
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent 1c90497 commit 9a05d24
Show file tree
Hide file tree
Showing 14 changed files with 419 additions and 87 deletions.
27 changes: 24 additions & 3 deletions trunk/conf/full.conf
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,6 @@ threads {
# Whether enable the ASYNC RECV, recv udp packets in dedicate threads.
# Default: off
async_recv off;
# If exceed the max size of recv queue, drop the received packet.
# Default: 5000
max_recv_queue 5000;
# CPU set for affinity, for example:
# 0 means CPU0
# 0-3 means CPU0, CPU1, CPU2
Expand All @@ -149,6 +146,30 @@ threads {
}
}

# For system circuit breaker.
circuit_breaker {
# If exceed the max size of recv queue, slow the recv to decrease packets.
# Default: 5000
max_recv_queue 5000;
# The CPU percent(0, 100) ever 1s, as system high water-level, which enable the circuit-break
# mechanism, for example, NACK will be disabled if high water-level.
# Default: 90
high_threshold 90;
# Reset the high water-level, if number of pulse under high_threshold.
# @remark 0 to disable the high water-level.
# Default: 2
high_pulse 2;
# The CPU percent(0, 100) ever 1s, as system critical water-level, which enable the circuit-break
# mechanism, for example, TWCC will be disabled if high water-level.
# @note All circuit-break mechanism of high-water-level scope are enabled in critical.
# Default: 95
critical_threshold 95;
# Reset the critical water-level, if number of pulse under critical_threshold.
# @remark 0 to disable the critical water-level.
# Default: 1
critical_pulse 1;
}

#############################################################################################
# heartbeat/stats sections
#############################################################################################
Expand Down
72 changes: 70 additions & 2 deletions trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3393,7 +3393,7 @@ srs_error_t SrsConfig::check_normal_config()
&& n != "ff_log_level" && n != "grace_final_wait" && n != "force_grace_quit"
&& n != "grace_start_wait" && n != "empty_ip_ok" && n != "disable_daemon_for_docker"
&& n != "inotify_auto_reload" && n != "auto_reload_for_docker" && n != "tcmalloc_release_rate"
&& n != "srs_log_flush_interval" && n != "threads") {
&& n != "srs_log_flush_interval" && n != "threads" && n != "circuit_breaker") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal directive %s", n.c_str());
}
}
Expand Down Expand Up @@ -4206,7 +4206,7 @@ int SrsConfig::get_threads_max_recv_queue()
{
static int DEFAULT = 5000;

SrsConfDirective* conf = root->get("threads");
SrsConfDirective* conf = root->get("circuit_breaker");
if (!conf) {
return DEFAULT;
}
Expand All @@ -4219,6 +4219,74 @@ int SrsConfig::get_threads_max_recv_queue()
return ::atoi(conf->arg0().c_str());
}

int SrsConfig::get_high_threshold()
{
static int DEFAULT = 90;

SrsConfDirective* conf = root->get("circuit_breaker");
if (!conf) {
return DEFAULT;
}

conf = conf->get("high_threshold");
if (!conf) {
return DEFAULT;
}

return ::atoi(conf->arg0().c_str());
}

int SrsConfig::get_high_pulse()
{
static int DEFAULT = 2;

SrsConfDirective* conf = root->get("circuit_breaker");
if (!conf) {
return DEFAULT;
}

conf = conf->get("high_pulse");
if (!conf) {
return DEFAULT;
}

return ::atoi(conf->arg0().c_str());
}

int SrsConfig::get_critical_threshold()
{
static int DEFAULT = 95;

SrsConfDirective* conf = root->get("circuit_breaker");
if (!conf) {
return DEFAULT;
}

conf = conf->get("critical_threshold");
if (!conf) {
return DEFAULT;
}

return ::atoi(conf->arg0().c_str());
}

int SrsConfig::get_critical_pulse()
{
static int DEFAULT = 1;

SrsConfDirective* conf = root->get("circuit_breaker");
if (!conf) {
return DEFAULT;
}

conf = conf->get("critical_pulse");
if (!conf) {
return DEFAULT;
}

return ::atoi(conf->arg0().c_str());
}

vector<SrsConfDirective*> SrsConfig::get_stream_casters()
{
srs_assert(root);
Expand Down
4 changes: 4 additions & 0 deletions trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,10 @@ class SrsConfig
virtual bool get_threads_async_recv();
virtual bool get_threads_cpu_affinity(std::string label, int* start, int* end);
virtual int get_threads_max_recv_queue();
virtual int get_high_threshold();
virtual int get_high_pulse();
virtual int get_critical_threshold();
virtual int get_critical_pulse();
// stream_caster section
public:
// Get all stream_caster in config file.
Expand Down
19 changes: 19 additions & 0 deletions trunk/src/app/srs_app_rtc_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ extern SrsPps* _srs_pps_snack2;

extern SrsPps* _srs_pps_rnack;
extern SrsPps* _srs_pps_rnack2;
extern SrsPps* _srs_pps_snack4;

#define SRS_TICKID_RTCP 0
#define SRS_TICKID_TWCC 1
Expand Down Expand Up @@ -1283,6 +1284,12 @@ srs_error_t SrsRtcPublishStream::do_on_rtp_plaintext(SrsRtpPacket2*& pkt, SrsBuf
}
}

// If circuit-breaker is enabled, disable nack.
if (_srs_thread_pool->hybrid_critical_water_level()) {
++_srs_pps_snack4->sugar;
return err;
}

// For NACK to handle packet.
// @remark Note that the pkt might be set to NULL.
if (nack_enabled_) {
Expand Down Expand Up @@ -1529,6 +1536,12 @@ srs_error_t SrsRtcPublishStream::notify(int type, srs_utime_t interval, srs_utim
if (twcc_enabled_ && type == SRS_TICKID_TWCC) {
++_srs_pps_twcc->sugar;

// If circuit-breaker is dropping packet, disable TWCC.
if (_srs_thread_pool->hybrid_critical_water_level()) {
++_srs_pps_snack4->sugar;
return err;
}

// We should not depends on the received packet,
// instead we should send feedback every Nms.
if ((err = send_periodic_twcc()) != srs_success) {
Expand Down Expand Up @@ -2333,6 +2346,12 @@ srs_error_t SrsRtcConnection::notify(int type, srs_utime_t interval, srs_utime_t

// For publisher to send NACK.
if (type == SRS_TICKID_SEND_NACKS) {
// If circuit-breaker is enabled, disable nack.
if (_srs_thread_pool->hybrid_critical_water_level()) {
++_srs_pps_snack4->sugar;
return err;
}

// TODO: FIXME: Merge with hybrid system clock.
srs_update_system_time();

Expand Down
21 changes: 21 additions & 0 deletions trunk/src/app/srs_app_rtc_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ using namespace std;
#include <srs_kernel_rtc_rtp.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_app_utility.hpp>
#include <srs_app_threads.hpp>

#include <srs_protocol_kbps.hpp>

extern SrsPps* _srs_pps_snack3;
extern SrsPps* _srs_pps_snack4;

SrsRtpRingBuffer::SrsRtpRingBuffer(int capacity)
{
Expand Down Expand Up @@ -228,6 +234,12 @@ SrsRtpNackForReceiver::~SrsRtpNackForReceiver()

void SrsRtpNackForReceiver::insert(uint16_t first, uint16_t last)
{
// If circuit-breaker is enabled, disable nack.
if (_srs_thread_pool->hybrid_high_water_level()) {
++_srs_pps_snack4->sugar;
return;
}

for (uint16_t s = first; s != last; ++s) {
queue_[s] = SrsRtpNackInfo();
}
Expand Down Expand Up @@ -259,6 +271,13 @@ void SrsRtpNackForReceiver::check_queue_size()

void SrsRtpNackForReceiver::get_nack_seqs(SrsRtcpNack& seqs, uint32_t& timeout_nacks)
{
// If circuit-breaker is enabled, disable nack.
if (_srs_thread_pool->hybrid_high_water_level()) {
queue_.clear();
++_srs_pps_snack4->sugar;
return;
}

srs_utime_t now = srs_get_system_time();

srs_utime_t interval = now - pre_check_time_;
Expand Down Expand Up @@ -294,6 +313,8 @@ void SrsRtpNackForReceiver::get_nack_seqs(SrsRtcpNack& seqs, uint32_t& timeout_n
++nack_info.req_nack_count_;
nack_info.pre_req_nack_time_ = now;
seqs.add_lost_sn(seq);

++_srs_pps_snack3->sugar;
}

++iter;
Expand Down
13 changes: 7 additions & 6 deletions trunk/src/app/srs_app_rtc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ extern SrsPps* _srs_pps_rr;

extern SrsPps* _srs_pps_snack;
extern SrsPps* _srs_pps_snack2;
extern SrsPps* _srs_pps_snack3;
extern SrsPps* _srs_pps_snack4;
extern SrsPps* _srs_pps_sanack;
extern SrsPps* _srs_pps_svnack;

Expand Down Expand Up @@ -687,9 +689,9 @@ srs_error_t SrsRtcServer::on_timer(srs_utime_t interval, srs_utime_t tick)
}

string snk_desc;
_srs_pps_snack->update(); _srs_pps_snack2->update(); _srs_pps_sanack->update(); _srs_pps_svnack->update();
if (_srs_pps_snack->r10s() || _srs_pps_sanack->r10s() || _srs_pps_svnack->r10s() || _srs_pps_snack2->r10s()) {
snprintf(buf, sizeof(buf), ", snk=(%d,a:%d,v:%d,h:%d)", _srs_pps_snack->r10s(), _srs_pps_sanack->r10s(), _srs_pps_svnack->r10s(), _srs_pps_snack2->r10s());
_srs_pps_snack->update(); _srs_pps_snack2->update(); _srs_pps_snack3->update(); _srs_pps_snack4->update(); _srs_pps_sanack->update(); _srs_pps_svnack->update();
if (_srs_pps_snack->r10s() || _srs_pps_sanack->r10s() || _srs_pps_svnack->r10s() || _srs_pps_snack2->r10s() || _srs_pps_snack3->r10s() || _srs_pps_snack4->r10s()) {
snprintf(buf, sizeof(buf), ", snk=(%d,a:%d,v:%d,h:%d,h3:%d,h4:%d)", _srs_pps_snack->r10s(), _srs_pps_sanack->r10s(), _srs_pps_svnack->r10s(), _srs_pps_snack2->r10s(), _srs_pps_snack3->r10s(), _srs_pps_snack4->r10s());
snk_desc = buf;
}

Expand All @@ -702,9 +704,8 @@ srs_error_t SrsRtcServer::on_timer(srs_utime_t interval, srs_utime_t tick)

// TODO: FIXME: Should move to Hybrid server stat.
string loss_desc;
_srs_pps_aloss->update();
if (_srs_pps_rloss->r10s() || _srs_pps_sloss->r10s() || _srs_pps_aloss->r10s()) {
snprintf(buf, sizeof(buf), ", loss=(r:%d,s:%d,a:%d)", _srs_pps_rloss->r10s(), _srs_pps_sloss->r10s(), _srs_pps_aloss->r10s());
if (!snk_desc.empty() || _srs_pps_rloss->r1s() || _srs_pps_rloss->r10s() || _srs_pps_sloss->r10s() || _srs_pps_aloss->r10s()) {
snprintf(buf, sizeof(buf), ", loss=(r:%d/%d,s:%d,a:%d)", _srs_pps_rloss->r1s(), _srs_pps_rloss->r10s(), _srs_pps_sloss->r10s(), _srs_pps_aloss->r10s());
loss_desc = buf;
}

Expand Down
2 changes: 2 additions & 0 deletions trunk/src/app/srs_app_rtc_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
// The NACK sent by us(SFU).
SrsPps* _srs_pps_snack = new SrsPps();
SrsPps* _srs_pps_snack2 = new SrsPps();
SrsPps* _srs_pps_snack3 = new SrsPps();
SrsPps* _srs_pps_snack4 = new SrsPps();
SrsPps* _srs_pps_sanack = new SrsPps();
SrsPps* _srs_pps_svnack = new SrsPps();

Expand Down
10 changes: 4 additions & 6 deletions trunk/src/app/srs_app_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1282,10 +1282,6 @@ srs_error_t SrsServer::setup_ticks()
if ((err = timer_->tick(8, 3 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}

if ((err = timer_->tick(10, 5 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}
}

if (_srs_config->get_heartbeat_enabled()) {
Expand All @@ -1307,14 +1303,16 @@ srs_error_t SrsServer::notify(int event, srs_utime_t interval, srs_utime_t tick)

switch (event) {
case 2: srs_update_system_rusage(); break;
case 3: srs_update_proc_stat(); break;
case 3:
srs_update_system_proc_stat();
srs_update_self_proc_stat();
break;
case 4: srs_update_disk_stat(); break;
case 5: srs_update_meminfo(); break;
case 6: srs_update_platform_info(); break;
case 7: srs_update_network_devices(); break;
case 8: resample_kbps(); break;
case 9: http_heartbeat->heartbeat(); break;
case 10: srs_update_udp_snmp_statistic(); break;
}

return err;
Expand Down
Loading

0 comments on commit 9a05d24

Please sign in to comment.