diff --git a/README.md b/README.md index fe3e7cdaef..48fe5d5014 100755 --- a/README.md +++ b/README.md @@ -340,6 +340,7 @@ Remark: ## History +* v2.0, 2016-09-05, fix memory leak at source. 2.0.214 * v2.0, 2016-09-05, fix memory leak at handshake. 2.0.213 * v2.0, 2016-09-04, support valgrind for [patched st](https://github.com/ossrs/state-threads/issues/2). * v2.0, 2016-09-03, support all arm for [patched st](https://github.com/ossrs/state-threads/issues/1). 2.0.212 diff --git a/trunk/src/app/srs_app_log.cpp b/trunk/src/app/srs_app_log.cpp index 6ebf6ced61..e82319aae6 100644 --- a/trunk/src/app/srs_app_log.cpp +++ b/trunk/src/app/srs_app_log.cpp @@ -71,6 +71,15 @@ int SrsThreadContext::set_id(int v) return ov; } +void SrsThreadContext::clear_cid() +{ + st_thread_t self = st_thread_self(); + std::map::iterator it = cache.find(self); + if (it != cache.end()) { + cache.erase(it); + } +} + // the max size of a line of log. #define LOG_MAX_SIZE 4096 diff --git a/trunk/src/app/srs_app_log.hpp b/trunk/src/app/srs_app_log.hpp index 6cfd719e08..df18e2b843 100644 --- a/trunk/src/app/srs_app_log.hpp +++ b/trunk/src/app/srs_app_log.hpp @@ -54,6 +54,8 @@ class SrsThreadContext : public ISrsThreadContext virtual int generate_id(); virtual int get_id(); virtual int set_id(int v); +public: + virtual void clear_cid(); }; /** diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 4d6876b2d0..23fc68b0cd 100755 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -866,6 +866,11 @@ int SrsServer::cycle() srs_warn("main cycle terminated, system quit normally."); dispose(); srs_trace("srs terminated"); + + // for valgrind to detect. + srs_freep(_srs_config); + srs_freep(_srs_log); + exit(0); #endif diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 4295a8fd09..98b87c0eeb 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -58,6 +58,9 @@ using namespace std; // when got these videos or audios, pure audio or video, mix ok. #define SRS_MIX_CORRECT_PURE_AV 10 +// the time to cleanup source in ms. +#define SRS_SOURCE_CLEANUP 30000 + int _srs_time_jitter_string2int(std::string time_jitter) { if (time_jitter == "full") { @@ -796,16 +799,39 @@ void SrsSource::dispose_all() } int SrsSource::cycle_all() +{ + int cid = _srs_context->get_id(); + int ret = do_cycle_all(); + _srs_context->set_id(cid); + return ret; +} + +int SrsSource::do_cycle_all() { int ret = ERROR_SUCCESS; - // TODO: FIXME: support remove dead source for a long time. std::map::iterator it; - for (it = pool.begin(); it != pool.end(); ++it) { + for (it = pool.begin(); it != pool.end();) { SrsSource* source = it->second; if ((ret = source->cycle()) != ERROR_SUCCESS) { return ret; } + + if (source->expired()) { + int cid = source->source_id(); + if (cid == -1 && source->pre_source_id() > 0) { + cid = source->pre_source_id(); + } + if (cid > 0) { + _srs_context->set_id(cid); + } + srs_trace("cleanup die source, total=%d", (int)pool.size()); + + srs_freep(source); + pool.erase(it++); + } else { + ++it; + } } return ret; @@ -916,7 +942,8 @@ SrsSource::SrsSource() cache_metadata = cache_sh_video = cache_sh_audio = NULL; _can_publish = true; - _source_id = -1; + _pre_source_id = _source_id = -1; + die_at = -1; play_edge = new SrsPlayEdge(); publish_edge = new SrsPublishEdge(); @@ -1001,6 +1028,20 @@ int SrsSource::cycle() return ret; } +bool SrsSource::expired() +{ + if (!consumers.empty() || die_at == -1) { + return false; + } + + int64_t now = srs_get_system_time_ms(); + if (now > die_at + SRS_SOURCE_CLEANUP) { + return true; + } + + return false; +} + int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* hh) { int ret = ERROR_SUCCESS; @@ -1355,6 +1396,12 @@ int SrsSource::on_source_id_changed(int id) return ret; } + if (_pre_source_id == -1) { + _pre_source_id = id; + } else if (_pre_source_id != _source_id) { + _pre_source_id = _source_id; + } + _source_id = id; // notice all consumer @@ -1372,6 +1419,11 @@ int SrsSource::source_id() return _source_id; } +int SrsSource::pre_source_id() +{ + return _pre_source_id; +} + bool SrsSource::can_publish(bool is_edge) { if (is_edge) { @@ -2107,6 +2159,11 @@ int SrsSource::on_publish() void SrsSource::on_unpublish() { + // ignore when already unpublished. + if (_can_publish) { + return; + } + // destroy all forwarders destroy_forwarders(); @@ -2142,12 +2199,18 @@ void SrsSource::on_unpublish() SrsStatistic* stat = SrsStatistic::instance(); stat->on_stream_close(_req); handler->on_unpublish(this, _req); + + // no consumer, stream is die. + if (consumers.empty()) { + die_at = srs_get_system_time_ms(); + } } int SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg) { int ret = ERROR_SUCCESS; + die_at = -1; consumer = new SrsConsumer(this, conn); consumers.push_back(consumer); @@ -2224,6 +2287,7 @@ void SrsSource::on_consumer_destroy(SrsConsumer* consumer) if (consumers.empty()) { play_edge->on_all_client_stop(); + die_at = srs_get_system_time_ms(); } } diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 6ee37d5ed1..616e739040 100755 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -439,6 +439,9 @@ class SrsSource : public ISrsReloadHandler */ static void dispose_all(); static int cycle_all(); +private: + static int do_cycle_all(); +public: /** * when system exit, destroy the sources, * for gmc to analysis mem leaks. @@ -451,6 +454,8 @@ class SrsSource : public ISrsReloadHandler // when source id changed, for example, the edge reconnect, // invoke the on_source_id_changed() to let all clients know. int _source_id; + // previous source id. + int _pre_source_id; // deep copy of client request. SrsRequest* _req; // to delivery stream to clients. @@ -501,6 +506,9 @@ class SrsSource : public ISrsReloadHandler */ // TODO: FIXME: to support reload atc. bool atc; + // last die time, when all consumers quit and no publisher, + // we will remove the source when source die. + int64_t die_at; private: SrsSharedPtrMessage* cache_metadata; // the cached video sequence header. @@ -513,6 +521,8 @@ class SrsSource : public ISrsReloadHandler public: virtual void dispose(); virtual int cycle(); + // remove source when expired. + virtual bool expired(); // initialize, get and setter. public: /** @@ -543,6 +553,7 @@ class SrsSource : public ISrsReloadHandler virtual int on_source_id_changed(int id); // get current source id. virtual int source_id(); + virtual int pre_source_id(); // logic data methods public: virtual bool can_publish(bool is_edge); diff --git a/trunk/src/app/srs_app_thread.cpp b/trunk/src/app/srs_app_thread.cpp index 8a366470de..8134cdb888 100755 --- a/trunk/src/app/srs_app_thread.cpp +++ b/trunk/src/app/srs_app_thread.cpp @@ -25,6 +25,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include namespace internal { ISrsThreadHandler::ISrsThreadHandler() @@ -243,6 +244,12 @@ namespace internal { obj->thread_cycle(); + // for valgrind to detect. + SrsThreadContext* ctx = dynamic_cast(_srs_context); + if (ctx) { + ctx->clear_cid(); + } + st_thread_exit(NULL); return NULL; diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index c8e496b106..8ceeb04f8e 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 2 #define VERSION_MINOR 0 -#define VERSION_REVISION 213 +#define VERSION_REVISION 214 // generated by configure, only macros. #include