Skip to content

Commit

Permalink
Threads: Directly use hybrid thread to consume messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent 14bc7bc commit a505b6b
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 59 deletions.
23 changes: 20 additions & 3 deletions trunk/src/app/srs_app_hybrid.cpp
Expand Up @@ -205,6 +205,7 @@ srs_error_t SrsHybridServer::run()
{
srs_error_t err = srs_success;

// Run all servers, which should never block.
vector<ISrsHybridServer*>::iterator it;
for (it = servers.begin(); it != servers.end(); ++it) {
ISrsHybridServer* server = *it;
Expand All @@ -214,9 +215,25 @@ srs_error_t SrsHybridServer::run()
}
}

// TODO: FIXME: Should run the signal manager and directly quit.
// Wait for all server to quit.
srs_usleep(SRS_UTIME_NO_TIMEOUT);
// Consume the async UDP/SRTP packets.
while (true) {
int consumed = 0;

// Consume the received UDP packets.
if ((err = _srs_async_recv->consume(&consumed)) != srs_success) {
srs_error_reset(err); // Ignore any error.
}

// Consume the cooked SRTP packets.
if ((err = _srs_async_srtp->consume(&consumed)) != srs_success) {
srs_error_reset(err); // Ignore any error.
}

// Wait for a while if no packets.
if (!consumed) {
srs_usleep(20 * SRS_UTIME_MILLISECONDS);
}
}

return err;
}
Expand Down
47 changes: 0 additions & 47 deletions trunk/src/app/srs_app_threads.cpp
Expand Up @@ -158,8 +158,6 @@ SrsThreadPool::SrsThreadPool()
hybrid_critical_water_level_ = 0;
hybrid_dying_water_level_ = 0;

trd_ = new SrsFastCoroutine("pool", this);

high_threshold_ = 0;
high_pulse_ = 0;
critical_threshold_ = 0;
Expand Down Expand Up @@ -188,8 +186,6 @@ SrsThreadPool::SrsThreadPool()
// TODO: FIMXE: If free the pool, we should stop all threads.
SrsThreadPool::~SrsThreadPool()
{
srs_freep(trd_);

srs_freep(lock_);
}

Expand Down Expand Up @@ -496,49 +492,6 @@ void* SrsThreadPool::start(void* arg)
return NULL;
}

srs_error_t SrsThreadPool::consume()
{
srs_error_t err = srs_success;

if ((err = trd_->start()) != srs_success) {
return srs_error_wrap(err, "start");
}

return err;
}

srs_error_t SrsThreadPool::cycle()
{
srs_error_t err = srs_success;

while (true) {
int consumed = 0;

// Check error before consume packets.
if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "pull");
}
if ((err = _srs_async_recv->consume(&consumed)) != srs_success) {
srs_error_reset(err); // Ignore any error.
}

// Check error before consume packets.
if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "pull");
}
if ((err = _srs_async_srtp->consume(&consumed)) != srs_success) {
srs_error_reset(err); // Ignore any error.
}

if (!consumed) {
srs_usleep(20 * SRS_UTIME_MILLISECONDS);
continue;
}
}

return err;
}

// TODO: FIXME: It should be thread-local or thread-safe.
SrsThreadPool* _srs_thread_pool = new SrsThreadPool();

Expand Down
10 changes: 1 addition & 9 deletions trunk/src/app/srs_app_threads.hpp
Expand Up @@ -112,7 +112,7 @@ class SrsThreadEntry

// Allocate a(or almost) fixed thread poll to execute tasks,
// so that we can take the advantage of multiple CPUs.
class SrsThreadPool : public ISrsCoroutineHandler
class SrsThreadPool
{
private:
SrsThreadEntry* entry_;
Expand All @@ -136,9 +136,6 @@ class SrsThreadPool : public ISrsCoroutineHandler
int critical_pulse_;
int dying_threshold_;
int dying_pulse_;
private:
// A coroutine to consume cooked packets.
SrsFastCoroutine* trd_;
public:
SrsThreadPool();
virtual ~SrsThreadPool();
Expand All @@ -159,11 +156,6 @@ class SrsThreadPool : public ISrsCoroutineHandler
void stop();
private:
static void* start(void* arg);
public:
// Consume packets. Must call in worker/service thread.
virtual srs_error_t consume();
private:
srs_error_t cycle();
};

// The global thread pool.
Expand Down

0 comments on commit a505b6b

Please sign in to comment.