Skip to content

Commit

Permalink
Threads-SEND: Support async send UDP packets
Browse files Browse the repository at this point in the history
1. Support async send UDP by SrsAsyncSendManager.
2. Copy UDP packet by SrsAsyncUdpPacket.
3. Support SrsUdpMuxSocket raw sendto.
4. Config the async send by async_send.
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent b0800f5 commit 56ffc28
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 3 deletions.
3 changes: 3 additions & 0 deletions trunk/conf/full.conf
Expand Up @@ -127,6 +127,9 @@ threads {
# Whether enable the ASYNC RECV, recv udp packets in dedicate threads.
# Default: off
async_recv off;
# Whether enable the ASYNC SEND, send udp packets in dedicate threads.
# Default: off
async_send off;
# CPU set for affinity, for example:
# 0 means CPU0
# 0-3 means CPU0, CPU1, CPU2
Expand Down
17 changes: 17 additions & 0 deletions trunk/src/app/srs_app_config.cpp
Expand Up @@ -4161,6 +4161,23 @@ bool SrsConfig::get_threads_async_recv()
return SRS_CONF_PERFER_FALSE(conf->arg0());
}

bool SrsConfig::get_threads_async_send()
{
static bool DEFAULT = false;

SrsConfDirective* conf = root->get("threads");
if (!conf) {
return DEFAULT;
}

conf = conf->get("async_send");
if (!conf) {
return DEFAULT;
}

return SRS_CONF_PERFER_FALSE(conf->arg0());
}

bool SrsConfig::get_threads_cpu_affinity(std::string label, int* start, int* end)
{
static int DEFAULT_START = 0;
Expand Down
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_config.hpp
Expand Up @@ -480,6 +480,7 @@ class SrsConfig
virtual srs_utime_t get_threads_interval();
virtual bool get_threads_async_srtp();
virtual bool get_threads_async_recv();
virtual bool get_threads_async_send();
virtual bool get_threads_cpu_affinity(std::string label, int* start, int* end);
virtual int get_threads_max_recv_queue();
virtual int get_high_threshold();
Expand Down
13 changes: 13 additions & 0 deletions trunk/src/app/srs_app_listener.cpp
Expand Up @@ -374,6 +374,13 @@ srs_error_t SrsUdpMuxSocket::sendto(void* data, int size, srs_utime_t timeout)
{
srs_error_t err = srs_success;

if (_srs_async_send->enabled()) {
SrsAsyncUdpPacket* pkt = new SrsAsyncUdpPacket();
pkt->from(this, (char*)data, size);
_srs_async_send->add_packet(pkt);
return err;
}

++_srs_pps_spkts->sugar;

int nb_write = srs_sendto(lfd, data, size, (sockaddr*)&from, fromlen, timeout);
Expand All @@ -396,6 +403,12 @@ srs_error_t SrsUdpMuxSocket::sendto(void* data, int size, srs_utime_t timeout)
return err;
}

int SrsUdpMuxSocket::raw_sendto(void* data, int size)
{
int osfd = srs_netfd_fileno(lfd);
return ::sendto(osfd, data, size, 0, (sockaddr*)&from, (socklen_t)fromlen);
}

srs_netfd_t SrsUdpMuxSocket::stfd()
{
return lfd;
Expand Down
2 changes: 2 additions & 0 deletions trunk/src/app/srs_app_listener.hpp
Expand Up @@ -169,6 +169,8 @@ class SrsUdpMuxSocket
int on_recvfrom();
public:
srs_error_t sendto(void* data, int size, srs_utime_t timeout);
int raw_sendto(void* data, int size);
public:
srs_netfd_t stfd();
sockaddr_in* peer_addr();
socklen_t peer_addrlen();
Expand Down
93 changes: 90 additions & 3 deletions trunk/src/app/srs_app_threads.cpp
Expand Up @@ -238,13 +238,15 @@ srs_error_t SrsThreadPool::initialize()
bool async_srtp = _srs_config->get_threads_async_srtp();

int recv_queue = _srs_config->get_threads_max_recv_queue();
srs_trace("AsyncRecv: Set max_queue_size=%d", recv_queue);
_srs_async_recv->set_max_recv_queue(recv_queue);

srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64 ", water_level=%dx%d,%dx%d, recvQ=%d",
bool async_send = _srs_config->get_threads_async_send();
_srs_async_send->set_enabled(async_send);

srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64 ", water_level=%dx%d,%dx%d, recvQ=%d, aSend=%d",
entry->num, entry->label.c_str(), entry->name.c_str(), srsu2msi(interval_), async_srtp,
entry->cpuset_ok, r0, srs_covert_cpuset(entry->cpuset), r1, srs_covert_cpuset(entry->cpuset2),
high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, recv_queue);
high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, recv_queue, async_send);

return err;
}
Expand Down Expand Up @@ -1224,3 +1226,88 @@ srs_error_t SrsAsyncRecvManager::consume(int* nn_consumed)
}

SrsAsyncRecvManager* _srs_async_recv = new SrsAsyncRecvManager();

SrsAsyncUdpPacket::SrsAsyncUdpPacket()
{
skt_ = NULL;
data_ = NULL;
size_ = 0;
}

SrsAsyncUdpPacket::~SrsAsyncUdpPacket()
{
srs_freep(skt_);
srs_freepa(data_);
}

void SrsAsyncUdpPacket::from(SrsUdpMuxSocket* skt, char* data, int size)
{
skt_ = skt->copy();
size_ = size;

if (size) {
data_ = new char[size];
memcpy(data_, data, size);
}
}

SrsAsyncSendManager::SrsAsyncSendManager()
{
enabled_ = false;
sending_packets_ = new SrsThreadQueue<SrsAsyncUdpPacket>();
}

SrsAsyncSendManager::~SrsAsyncSendManager()
{
srs_freep(sending_packets_);
}

void SrsAsyncSendManager::add_packet(SrsAsyncUdpPacket* pkt)
{
sending_packets_->push_back(pkt);
}

srs_error_t SrsAsyncSendManager::start(void* arg)
{
SrsAsyncSendManager* srtp = (SrsAsyncSendManager*)arg;
return srtp->do_start();
}

srs_error_t SrsAsyncSendManager::do_start()
{
srs_error_t err = srs_success;

// TODO: FIXME: Config it?
srs_utime_t interval = 10 * SRS_UTIME_MILLISECONDS;

while (true) {
vector<SrsAsyncUdpPacket*> flying_sending_packets;
sending_packets_->swap(flying_sending_packets);

for (int i = 0; i < (int)flying_sending_packets.size(); i++) {
SrsAsyncUdpPacket* pkt = flying_sending_packets.at(i);

int r0 = pkt->skt_->raw_sendto(pkt->data_, pkt->size_);
if (r0 <= 0) {
// Ignore any error.
}

srs_freep(pkt);
}

// Once there are packets to send, we MUST send it ASAP.
if (!flying_sending_packets.empty()) {
continue;
}

// TODO: FIXME: Maybe we should use cond wait?
timespec tv = {0};
tv.tv_sec = interval / SRS_UTIME_SECONDS;
tv.tv_nsec = (interval % SRS_UTIME_SECONDS) * 1000;
nanosleep(&tv, NULL);
}

return err;
}

SrsAsyncSendManager* _srs_async_send = new SrsAsyncSendManager();
40 changes: 40 additions & 0 deletions trunk/src/app/srs_app_threads.hpp
Expand Up @@ -438,4 +438,44 @@ class SrsAsyncRecvManager
// The global async RECV manager.
extern SrsAsyncRecvManager* _srs_async_recv;

// The async UDP packet.
class SrsAsyncUdpPacket
{
public:
SrsUdpMuxSocket* skt_;
char* data_;
int size_;
public:
SrsAsyncUdpPacket();
virtual ~SrsAsyncUdpPacket();
public:
void from(SrsUdpMuxSocket* skt, char* data, int size);
};

// The async SEND manager, to send UDP packets.
class SrsAsyncSendManager
{
private:
// By config.
bool enabled_;
// The UDP packets to sending.
SrsThreadQueue<SrsAsyncUdpPacket>* sending_packets_;
public:
SrsAsyncSendManager();
virtual ~SrsAsyncSendManager();
public:
// Whether the async manager is enabled.
bool enabled() { return enabled_; }
void set_enabled(bool v) { enabled_ = v; }
// Send the packet.
void add_packet(SrsAsyncUdpPacket* pkt);
// Start the thread.
static srs_error_t start(void* arg);
private:
srs_error_t do_start();
};

// The global async SEND manager.
extern SrsAsyncSendManager* _srs_async_send;

#endif
5 changes: 5 additions & 0 deletions trunk/src/main/srs_main_server.cpp
Expand Up @@ -500,6 +500,11 @@ srs_error_t run_in_thread_pool()
return srs_error_wrap(err, "start async recv thread");
}

// Start the async SEND worker thread, to send UDP packets.
if ((err = _srs_thread_pool->execute("send", SrsAsyncSendManager::start, _srs_async_send)) != srs_success) {
return srs_error_wrap(err, "start async send thread");
}

// Start the service worker thread, for RTMP and RTC server, etc.
if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, NULL)) != srs_success) {
return srs_error_wrap(err, "start hybrid server thread");
Expand Down

0 comments on commit 56ffc28

Please sign in to comment.