diff --git a/trunk/src/app/srs_app_hybrid.cpp b/trunk/src/app/srs_app_hybrid.cpp index d1a9940cf9..29b2a080f4 100644 --- a/trunk/src/app/srs_app_hybrid.cpp +++ b/trunk/src/app/srs_app_hybrid.cpp @@ -205,6 +205,7 @@ srs_error_t SrsHybridServer::run() { srs_error_t err = srs_success; + // Run all servers, which should never block. vector::iterator it; for (it = servers.begin(); it != servers.end(); ++it) { ISrsHybridServer* server = *it; @@ -214,9 +215,25 @@ srs_error_t SrsHybridServer::run() } } - // TODO: FIXME: Should run the signal manager and directly quit. - // Wait for all server to quit. - srs_usleep(SRS_UTIME_NO_TIMEOUT); + // Consume the async UDP/SRTP packets. + while (true) { + int consumed = 0; + + // Consume the received UDP packets. + if ((err = _srs_async_recv->consume(&consumed)) != srs_success) { + srs_error_reset(err); // Ignore any error. + } + + // Consume the cooked SRTP packets. + if ((err = _srs_async_srtp->consume(&consumed)) != srs_success) { + srs_error_reset(err); // Ignore any error. + } + + // Wait for a while if no packets. + if (!consumed) { + srs_usleep(20 * SRS_UTIME_MILLISECONDS); + } + } return err; } diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index 499bcf9d13..fd5c9eee47 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -158,8 +158,6 @@ SrsThreadPool::SrsThreadPool() hybrid_critical_water_level_ = 0; hybrid_dying_water_level_ = 0; - trd_ = new SrsFastCoroutine("pool", this); - high_threshold_ = 0; high_pulse_ = 0; critical_threshold_ = 0; @@ -188,8 +186,6 @@ SrsThreadPool::SrsThreadPool() // TODO: FIMXE: If free the pool, we should stop all threads. SrsThreadPool::~SrsThreadPool() { - srs_freep(trd_); - srs_freep(lock_); } @@ -496,49 +492,6 @@ void* SrsThreadPool::start(void* arg) return NULL; } -srs_error_t SrsThreadPool::consume() -{ - srs_error_t err = srs_success; - - if ((err = trd_->start()) != srs_success) { - return srs_error_wrap(err, "start"); - } - - return err; -} - -srs_error_t SrsThreadPool::cycle() -{ - srs_error_t err = srs_success; - - while (true) { - int consumed = 0; - - // Check error before consume packets. - if ((err = trd_->pull()) != srs_success) { - return srs_error_wrap(err, "pull"); - } - if ((err = _srs_async_recv->consume(&consumed)) != srs_success) { - srs_error_reset(err); // Ignore any error. - } - - // Check error before consume packets. - if ((err = trd_->pull()) != srs_success) { - return srs_error_wrap(err, "pull"); - } - if ((err = _srs_async_srtp->consume(&consumed)) != srs_success) { - srs_error_reset(err); // Ignore any error. - } - - if (!consumed) { - srs_usleep(20 * SRS_UTIME_MILLISECONDS); - continue; - } - } - - return err; -} - // TODO: FIXME: It should be thread-local or thread-safe. SrsThreadPool* _srs_thread_pool = new SrsThreadPool(); diff --git a/trunk/src/app/srs_app_threads.hpp b/trunk/src/app/srs_app_threads.hpp index 0c77b4eb03..7ab14873f4 100644 --- a/trunk/src/app/srs_app_threads.hpp +++ b/trunk/src/app/srs_app_threads.hpp @@ -112,7 +112,7 @@ class SrsThreadEntry // Allocate a(or almost) fixed thread poll to execute tasks, // so that we can take the advantage of multiple CPUs. -class SrsThreadPool : public ISrsCoroutineHandler +class SrsThreadPool { private: SrsThreadEntry* entry_; @@ -136,9 +136,6 @@ class SrsThreadPool : public ISrsCoroutineHandler int critical_pulse_; int dying_threshold_; int dying_pulse_; -private: - // A coroutine to consume cooked packets. - SrsFastCoroutine* trd_; public: SrsThreadPool(); virtual ~SrsThreadPool(); @@ -159,11 +156,6 @@ class SrsThreadPool : public ISrsCoroutineHandler void stop(); private: static void* start(void* arg); -public: - // Consume packets. Must call in worker/service thread. - virtual srs_error_t consume(); -private: - srs_error_t cycle(); }; // The global thread pool.