From 6f1c74927b59074229f7bf90918e2e1b65c1214e Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 30 May 2024 14:57:55 +0800 Subject: [PATCH 01/16] SmartPtr: Support shared ptr and test. --- trunk/src/core/srs_core_autofree.hpp | 72 ++++++++++++++++++++++++++++ trunk/src/utest/srs_utest_core.cpp | 70 +++++++++++++++++++++++++++ trunk/src/utest/srs_utest_core.hpp | 13 +++++ 3 files changed, 155 insertions(+) diff --git a/trunk/src/core/srs_core_autofree.hpp b/trunk/src/core/srs_core_autofree.hpp index e8f281519e..f3bf8e2138 100644 --- a/trunk/src/core/srs_core_autofree.hpp +++ b/trunk/src/core/srs_core_autofree.hpp @@ -81,4 +81,76 @@ class impl_SrsAutoFree } }; +// Shared ptr smart pointer, see https://github.com/ossrs/srs/discussions/3667#discussioncomment-8969107 +// Usage: +// SrsSharedPtr ptr(new MyClass()); +// ptr->do_something(); +// +// SrsSharedPtr cp = ptr; +// cp->do_something(); +template +class SrsSharedPtr +{ +private: + // The pointer to the object. + T* ptr_; + // The reference count of the object. + uint32_t* ref_count_; +public: + // Create a shared ptr with the object. + SrsSharedPtr(T* ptr) { + ptr_ = ptr; + ref_count_ = new uint32_t(1); + } + // Copy the shared ptr. + SrsSharedPtr(const SrsSharedPtr& cp) { + ptr_ = cp.ptr_; + ref_count_ = cp.ref_count_; + if (ref_count_) (*ref_count_)++; + } + // Dispose and delete the shared ptr. + ~SrsSharedPtr() { + reset(); + } +private: + // Reset the shared ptr. + void reset() { + if (!ref_count_) return; + + (*ref_count_)--; + if (*ref_count_ == 0) { + delete ptr_; + delete ref_count_; + } + + ptr_ = NULL; + ref_count_ = NULL; + } +public: + // Get the object. + T* get() { + return ptr_; + } + // Overload the -> operator. + T* operator->() { + return ptr_; + } +private: + // Overload the * operator. + T& operator*() { + return *ptr_; + } + // Overload the bool operator. + operator bool() const { + return ptr_ != NULL; + } +private: + // Disable the assign operator. + SrsSharedPtr& operator=(const SrsSharedPtr&); + // Disable the move constructor. + SrsSharedPtr(SrsSharedPtr&&); + // Disable the move assign operator. + SrsSharedPtr& operator=(SrsSharedPtr&&); +}; + #endif diff --git a/trunk/src/utest/srs_utest_core.cpp b/trunk/src/utest/srs_utest_core.cpp index 1b399fd771..dc7271b0f9 100644 --- a/trunk/src/utest/srs_utest_core.cpp +++ b/trunk/src/utest/srs_utest_core.cpp @@ -86,3 +86,73 @@ VOID TEST(CoreLogger, CheckVsnprintf) } } +VOID TEST(CoreLogger, SharedPtrTypical) +{ + if (true) { + SrsSharedPtr p(new int(100)); + EXPECT_TRUE(p); + EXPECT_EQ(100, *p); + } + + if (true) { + SrsSharedPtr p = SrsSharedPtr(new int(100)); + EXPECT_TRUE(p); + EXPECT_EQ(100, *p); + } + + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q = p; + EXPECT_EQ(p.get(), q.get()); + } + + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q = p; + EXPECT_TRUE(p); + EXPECT_TRUE(q); + EXPECT_EQ(100, *p); + EXPECT_EQ(100, *q); + } +} + +VOID TEST(CoreLogger, SharedPtrReset) +{ + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q = p; + p.reset(); + EXPECT_FALSE(p); + EXPECT_TRUE(q); + EXPECT_EQ(100, *q); + } + + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q = p; + q.reset(); + EXPECT_TRUE(p); + EXPECT_FALSE(q); + EXPECT_EQ(100, *p); + } +} + +VOID TEST(CoreLogger, SharedPtrObject) +{ + SrsSharedPtr p(new MyNormalObject(100)); + EXPECT_TRUE(p); + EXPECT_EQ(100, p->id()); +} + +VOID TEST(CoreLogger, SharedPtrNullptr) +{ + SrsSharedPtr p(NULL); + EXPECT_FALSE(p); + + p.reset(); + EXPECT_FALSE(p); + + SrsSharedPtr q = p; + EXPECT_FALSE(q); +} + diff --git a/trunk/src/utest/srs_utest_core.hpp b/trunk/src/utest/srs_utest_core.hpp index 1c7795b977..8c7306384d 100644 --- a/trunk/src/utest/srs_utest_core.hpp +++ b/trunk/src/utest/srs_utest_core.hpp @@ -14,5 +14,18 @@ #include +class MyNormalObject +{ +private: + int id_; +public: + MyNormalObject(int id) { + id_ = id; + } + int id() { + return id_; + } +}; + #endif From 6a0ed789b30aed05ff8eb2828a4b6f3ad6920fdf Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 4 Jun 2024 10:52:34 +0800 Subject: [PATCH 02/16] SmartPtr: Support executor coroutine for resource. --- trunk/src/app/srs_app_conn.hpp | 3 ++ trunk/src/app/srs_app_st.cpp | 36 ++++++++++++++++ trunk/src/app/srs_app_st.hpp | 30 +++++++++++-- trunk/src/core/srs_core_autofree.hpp | 2 +- trunk/src/protocol/srs_protocol_conn.hpp | 2 + trunk/src/utest/srs_utest_core.cpp | 55 ++++++++++++++++++++++++ 6 files changed, 124 insertions(+), 4 deletions(-) diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index b5aeb48da0..c4c7a859ec 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -20,6 +20,7 @@ #include #include #include +#include class SrsWallClock; class SrsBuffer; @@ -126,6 +127,7 @@ class SrsResourceManager : public ISrsCoroutineHandler, public ISrsResourceManag }; // A simple lazy-sweep GC, just wait for a long time to delete the disposable resources. +// TODO: FIXME: Remove it. class SrsLazySweepGc : public ISrsLazyGc { public: @@ -180,6 +182,7 @@ extern ISrsLazyGc* _srs_gc; // wrapper might be managed by other resource manager, such as _srs_gb_manager for SrsLazyGbSession. Furthermore, other // copied out wrappers might be freed by srs_freep. All are ok, because all wrapper and resources are simply normal // object, so if you added to manager then you should use manager to remove it, and you can also directly delete it. +// TODO: FIXME: Remove it. template class SrsLazyObjectWrapper : public ISrsResource { diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp index 77f581a640..dae3ea2a5c 100755 --- a/trunk/src/app/srs_app_st.cpp +++ b/trunk/src/app/srs_app_st.cpp @@ -342,3 +342,39 @@ void SrsWaitGroup::wait() } } +SrsExecutorCoroutine::SrsExecutorCoroutine(ISrsResourceManager* m, ISrsResource* r, ISrsCoroutineHandler* h) +{ + resource_ = r; + handler_ = h; + manager_ = m; + trd_ = new SrsSTCoroutine("ar", this, resource_->get_id()); +} + +SrsExecutorCoroutine::~SrsExecutorCoroutine() +{ + manager_->remove(resource_); + srs_freep(trd_); +} + +srs_error_t SrsExecutorCoroutine::start() +{ + return trd_->start(); +} + +srs_error_t SrsExecutorCoroutine::cycle() +{ + srs_error_t err = handler_->cycle(); + manager_->remove(this); + return err; +} + +const SrsContextId& SrsExecutorCoroutine::get_id() +{ + return resource_->get_id(); +} + +std::string SrsExecutorCoroutine::desc() +{ + return resource_->desc(); +} + diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp index 51282bca25..aeea103003 100644 --- a/trunk/src/app/srs_app_st.hpp +++ b/trunk/src/app/srs_app_st.hpp @@ -15,6 +15,7 @@ #include #include #include +#include class SrsFastCoroutine; @@ -64,7 +65,7 @@ class ISrsStartable virtual srs_error_t start() = 0; }; -// The corotine object. +// The coroutine object. class SrsCoroutine : public ISrsStartable { public: @@ -192,7 +193,7 @@ class SrsFastCoroutine static void* pfn(void* arg); }; -// Like goroytine sync.WaitGroup. +// Like goroutine sync.WaitGroup. class SrsWaitGroup { private: @@ -206,9 +207,32 @@ class SrsWaitGroup void add(int n); // When coroutine is done. void done(); - // Wait for all corotine to be done. + // Wait for all coroutine to be done. void wait(); }; +// Start a coroutine for resource executor, to execute the handler and delete resource and itself when done. +class SrsExecutorCoroutine : public ISrsResource, public ISrsStartable, public ISrsCoroutineHandler +{ +private: + ISrsResourceManager* manager_; + ISrsResource* resource_; + ISrsCoroutineHandler* handler_; + SrsCoroutine* trd_; +public: + SrsExecutorCoroutine(ISrsResourceManager* m, ISrsResource* r, ISrsCoroutineHandler* h); + virtual ~SrsExecutorCoroutine(); +// Interface ISrsStartable +public: + virtual srs_error_t start(); +// Interface ISrsOneCycleThreadHandler +public: + virtual srs_error_t cycle(); +// Interface ISrsResource +public: + virtual const SrsContextId& get_id(); + virtual std::string desc(); +}; + #endif diff --git a/trunk/src/core/srs_core_autofree.hpp b/trunk/src/core/srs_core_autofree.hpp index f3bf8e2138..47fec3c572 100644 --- a/trunk/src/core/srs_core_autofree.hpp +++ b/trunk/src/core/srs_core_autofree.hpp @@ -109,7 +109,7 @@ class SrsSharedPtr if (ref_count_) (*ref_count_)++; } // Dispose and delete the shared ptr. - ~SrsSharedPtr() { + virtual ~SrsSharedPtr() { reset(); } private: diff --git a/trunk/src/protocol/srs_protocol_conn.hpp b/trunk/src/protocol/srs_protocol_conn.hpp index b136716adf..1a9d616975 100644 --- a/trunk/src/protocol/srs_protocol_conn.hpp +++ b/trunk/src/protocol/srs_protocol_conn.hpp @@ -50,6 +50,7 @@ class ISrsConnection : public ISrsResource // Lazy-sweep resource, never sweep util all wrappers are freed. // See https://github.com/ossrs/srs/issues/3176#lazy-sweep +// TODO: FIXME: Remove it. class SrsLazyObject { private: @@ -69,6 +70,7 @@ class SrsLazyObject // The lazy-sweep GC, wait for a long time to dispose resource even when resource is disposable. // See https://github.com/ossrs/srs/issues/3176#lazy-sweep +// TODO: FIXME: Remove it. class ISrsLazyGc { public: diff --git a/trunk/src/utest/srs_utest_core.cpp b/trunk/src/utest/srs_utest_core.cpp index dc7271b0f9..bac90e41be 100644 --- a/trunk/src/utest/srs_utest_core.cpp +++ b/trunk/src/utest/srs_utest_core.cpp @@ -8,6 +8,8 @@ using namespace std; #include +#include +#include VOID TEST(CoreAutoFreeTest, Free) { @@ -156,3 +158,56 @@ VOID TEST(CoreLogger, SharedPtrNullptr) EXPECT_FALSE(q); } +class MockWrapper +{ +public: + int* ptr; +public: + MockWrapper(int* p) { + ptr = p; + *ptr = *ptr + 1; + } + ~MockWrapper() { + *ptr = *ptr - 1; + } +}; + +VOID TEST(CoreLogger, SharedPtrWrapper) +{ + int* ptr = new int(100); + SrsAutoFree(int, ptr); + EXPECT_EQ(100, *ptr); + + { + SrsSharedPtr p(new MockWrapper(ptr)); + EXPECT_EQ(101, *ptr); + EXPECT_EQ(101, *p->ptr); + + SrsSharedPtr q = p; + EXPECT_EQ(101, *ptr); + EXPECT_EQ(101, *p->ptr); + EXPECT_EQ(101, *q->ptr); + + SrsSharedPtr r(new MockWrapper(ptr)); + EXPECT_EQ(102, *ptr); + EXPECT_EQ(102, *p->ptr); + EXPECT_EQ(102, *q->ptr); + EXPECT_EQ(102, *r->ptr); + + SrsSharedPtr s(new MockWrapper(ptr)); + EXPECT_EQ(103, *ptr); + EXPECT_EQ(103, *p->ptr); + EXPECT_EQ(103, *q->ptr); + EXPECT_EQ(103, *r->ptr); + EXPECT_EQ(103, *s->ptr); + } + EXPECT_EQ(100, *ptr); + + { + SrsSharedPtr p(new MockWrapper(ptr)); + EXPECT_EQ(101, *ptr); + EXPECT_EQ(101, *p->ptr); + } + EXPECT_EQ(100, *ptr); +} + From 197ded7679942cba0b417bfa21c20b1de071ad85 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 4 Jun 2024 10:52:34 +0800 Subject: [PATCH 03/16] SmartPtr: Support shared resource and tests. --- trunk/src/app/srs_app_conn.hpp | 51 +++++++++++++++++++++++++++ trunk/src/utest/srs_utest_core.cpp | 56 ++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+) diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index c4c7a859ec..f85c4c563e 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -223,6 +223,57 @@ class SrsLazyObjectWrapper : public ISrsResource } }; +// This class implements the ISrsResource interface using a smart pointer, allowing the Manager to delete this +// smart pointer resource, such as by implementing delayed release. +// It embeds an SrsSharedPtr to provide the same interface, but it is not an inheritance relationship. Its usage +// is identical to SrsSharedPtr, but they cannot replace each other. They are not related and cannot be converted +// to one another. +// Usage: +// SrsSharedResource* ptr = new SrsSharedResource(new MyClass()); +// (*ptr)->do_something(); +// +// ISrsResourceManager* manager = ...; +// manager->remove(ptr); +template +class SrsSharedResource : virtual public ISrsResource +{ +private: + SrsSharedPtr ptr_; +public: + SrsSharedResource(T* ptr) : ptr_(ptr) { + } + SrsSharedResource(const SrsSharedResource& cp) : ptr_(cp.ptr_) { + } + virtual ~SrsSharedResource() { + } +public: + // Get the object. + T* get() { + return ptr_.get(); + } + // Overload the -> operator. + T* operator->() { + return ptr_.operator->(); + } +private: + // Overload the * operator. + T& operator*() { + return ptr_.operator*(); + } + // Overload the bool operator. + operator bool() const { + return ptr_.operator bool(); + } +// Interface ISrsResource +public: + virtual const SrsContextId& get_id() { + return ptr_->get_id(); + } + virtual std::string desc() { + return ptr_->desc(); + } +}; + // If a connection is able be expired, user can use HTTP-API to kick-off it. class ISrsExpire { diff --git a/trunk/src/utest/srs_utest_core.cpp b/trunk/src/utest/srs_utest_core.cpp index bac90e41be..36a8e079bd 100644 --- a/trunk/src/utest/srs_utest_core.cpp +++ b/trunk/src/utest/srs_utest_core.cpp @@ -211,3 +211,59 @@ VOID TEST(CoreLogger, SharedPtrWrapper) EXPECT_EQ(100, *ptr); } +class MockResource : public ISrsResource +{ +public: + SrsContextId id_; + int value_; +public: + MockResource(int value) : value_(value) { + } + virtual ~MockResource() { + } +public: + virtual const SrsContextId& get_id() { + return id_; + } + virtual std::string desc() { + return id_.c_str(); + } +}; + +VOID TEST(CoreLogger, SharedResourceTypical) +{ + if (true) { + SrsSharedResource* p = new SrsSharedResource(new MockResource(100)); + EXPECT_TRUE(*p); + EXPECT_EQ(100, (*p)->value_); + srs_freep(p); + } + + if (true) { + SrsSharedResource p(new MockResource(100)); + EXPECT_TRUE(p); + EXPECT_EQ(100, p->value_); + } + + if (true) { + SrsSharedResource p = SrsSharedResource(new MockResource(100)); + EXPECT_TRUE(p); + EXPECT_EQ(100, p->value_); + } + + if (true) { + SrsSharedResource p(new MockResource(100)); + SrsSharedResource q = p; + EXPECT_EQ(p.get(), q.get()); + } + + if (true) { + SrsSharedResource p(new MockResource(100)); + SrsSharedResource q = p; + EXPECT_TRUE(p); + EXPECT_TRUE(q); + EXPECT_EQ(100, p->value_); + EXPECT_EQ(100, q->value_); + } +} + From 7002639a1dc95637344a9d025f30560bd9461cf1 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 7 Jun 2024 08:30:46 +0800 Subject: [PATCH 04/16] SmartPtr: Support copy assign operator. --- trunk/src/app/srs_app_conn.hpp | 7 +++ trunk/src/core/srs_core_autofree.hpp | 21 ++++++--- trunk/src/utest/srs_utest_core.cpp | 67 ++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 5 deletions(-) diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index f85c4c563e..a279e2194c 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -255,6 +255,13 @@ class SrsSharedResource : virtual public ISrsResource T* operator->() { return ptr_.operator->(); } + // The assign operator. + SrsSharedResource& operator=(const SrsSharedResource& cp) { + if (this != &cp) { + ptr_ = cp.ptr_; + } + return *this; + } private: // Overload the * operator. T& operator*() { diff --git a/trunk/src/core/srs_core_autofree.hpp b/trunk/src/core/srs_core_autofree.hpp index 47fec3c572..604a8f2301 100644 --- a/trunk/src/core/srs_core_autofree.hpp +++ b/trunk/src/core/srs_core_autofree.hpp @@ -104,9 +104,7 @@ class SrsSharedPtr } // Copy the shared ptr. SrsSharedPtr(const SrsSharedPtr& cp) { - ptr_ = cp.ptr_; - ref_count_ = cp.ref_count_; - if (ref_count_) (*ref_count_)++; + copy(cp); } // Dispose and delete the shared ptr. virtual ~SrsSharedPtr() { @@ -126,6 +124,12 @@ class SrsSharedPtr ptr_ = NULL; ref_count_ = NULL; } + // Copy from other shared ptr. + void copy(const SrsSharedPtr& cp) { + ptr_ = cp.ptr_; + ref_count_ = cp.ref_count_; + if (ref_count_) (*ref_count_)++; + } public: // Get the object. T* get() { @@ -135,6 +139,15 @@ class SrsSharedPtr T* operator->() { return ptr_; } + // The assign operator. + SrsSharedPtr& operator=(const SrsSharedPtr& cp) { + if (this != &cp) { + reset(); + copy(cp); + } + + return *this; + } private: // Overload the * operator. T& operator*() { @@ -145,8 +158,6 @@ class SrsSharedPtr return ptr_ != NULL; } private: - // Disable the assign operator. - SrsSharedPtr& operator=(const SrsSharedPtr&); // Disable the move constructor. SrsSharedPtr(SrsSharedPtr&&); // Disable the move assign operator. diff --git a/trunk/src/utest/srs_utest_core.cpp b/trunk/src/utest/srs_utest_core.cpp index 36a8e079bd..9d238830db 100644 --- a/trunk/src/utest/srs_utest_core.cpp +++ b/trunk/src/utest/srs_utest_core.cpp @@ -108,6 +108,12 @@ VOID TEST(CoreLogger, SharedPtrTypical) EXPECT_EQ(p.get(), q.get()); } + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q(p); + EXPECT_EQ(p.get(), q.get()); + } + if (true) { SrsSharedPtr p(new int(100)); SrsSharedPtr q = p; @@ -211,6 +217,53 @@ VOID TEST(CoreLogger, SharedPtrWrapper) EXPECT_EQ(100, *ptr); } +VOID TEST(CoreLogger, SharedPtrAssign) +{ + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q(NULL); + q = p; + EXPECT_EQ(p.get(), q.get()); + } + + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q(new int(101)); + + int* q0 = q.get(); + q = p; + EXPECT_EQ(p.get(), q.get()); + EXPECT_NE(q0, q.get()); + } + + int* ptr0 = new int(100); + SrsAutoFree(int, ptr0); + EXPECT_EQ(100, *ptr0); + + int* ptr1 = new int(200); + SrsAutoFree(int, ptr1); + EXPECT_EQ(200, *ptr1); + + { + SrsSharedPtr p(new MockWrapper(ptr0)); + EXPECT_EQ(101, *ptr0); + EXPECT_EQ(101, *p->ptr); + + SrsSharedPtr q(new MockWrapper(ptr1)); + EXPECT_EQ(201, *ptr1); + EXPECT_EQ(201, *q->ptr); + + q = p; + EXPECT_EQ(200, *ptr1); + EXPECT_EQ(101, *ptr0); + EXPECT_EQ(101, *p->ptr); + EXPECT_EQ(101, *q->ptr); + } + + EXPECT_EQ(100, *ptr0); + EXPECT_EQ(200, *ptr1); +} + class MockResource : public ISrsResource { public: @@ -257,6 +310,20 @@ VOID TEST(CoreLogger, SharedResourceTypical) EXPECT_EQ(p.get(), q.get()); } + if (true) { + SrsSharedResource p(new MockResource(100)); + SrsSharedResource q(NULL); + q = p; + EXPECT_EQ(p.get(), q.get()); + } + + if (true) { + SrsSharedResource p(new MockResource(100)); + SrsSharedResource q(new MockResource(200)); + q = p; + EXPECT_EQ(p.get(), q.get()); + } + if (true) { SrsSharedResource p(new MockResource(100)); SrsSharedResource q = p; From d6d9002739eaebc6f70a8aa7d63404642bb50dd4 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 7 Jun 2024 11:39:28 +0800 Subject: [PATCH 05/16] SmartPtr: Use shared ptr in GB. --- trunk/src/app/srs_app_conn.cpp | 22 -- trunk/src/app/srs_app_conn.hpp | 97 ------- trunk/src/app/srs_app_gb28181.cpp | 335 +++++++++++------------ trunk/src/app/srs_app_gb28181.hpp | 102 ++++--- trunk/src/app/srs_app_server.cpp | 5 - trunk/src/app/srs_app_st.cpp | 56 +++- trunk/src/app/srs_app_st.hpp | 88 +++++- trunk/src/app/srs_app_threads.cpp | 1 - trunk/src/protocol/srs_protocol_conn.cpp | 32 --- trunk/src/protocol/srs_protocol_conn.hpp | 33 --- 10 files changed, 352 insertions(+), 419 deletions(-) diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 7652be15b7..90d818ece0 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -413,28 +413,6 @@ void SrsResourceManager::dispose(ISrsResource* c) } } -SrsLazySweepGc::SrsLazySweepGc() -{ -} - -SrsLazySweepGc::~SrsLazySweepGc() -{ -} - -srs_error_t SrsLazySweepGc::start() -{ - srs_error_t err = srs_success; - return err; -} - -void SrsLazySweepGc::remove(SrsLazyObject* c) -{ - // TODO: FIXME: MUST lazy sweep. - srs_freep(c); -} - -ISrsLazyGc* _srs_gc = NULL; - ISrsExpire::ISrsExpire() { } diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index a279e2194c..9b37886c86 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -126,103 +126,6 @@ class SrsResourceManager : public ISrsCoroutineHandler, public ISrsResourceManag void dispose(ISrsResource* c); }; -// A simple lazy-sweep GC, just wait for a long time to delete the disposable resources. -// TODO: FIXME: Remove it. -class SrsLazySweepGc : public ISrsLazyGc -{ -public: - SrsLazySweepGc(); - virtual ~SrsLazySweepGc(); -public: - virtual srs_error_t start(); - virtual void remove(SrsLazyObject* c); -}; - -extern ISrsLazyGc* _srs_gc; - -// A wrapper template for lazy-sweep resource. -// See https://github.com/ossrs/srs/issues/3176#lazy-sweep -// -// Usage for resource which manages itself in coroutine cycle, see SrsLazyGbSession: -// class Resource { -// private: -// SrsLazyObjectWrapper* wrapper_; -// private: -// friend class SrsLazyObjectWrapper; -// Resource(SrsLazyObjectWrapper* wrapper) { wrapper_ = wrapper; } -// public: -// srs_error_t Resource::cycle() { -// srs_error_t err = do_cycle(); -// _srs_gb_manager->remove(wrapper_); -// return err; -// } -// }; -// SrsLazyObjectWrapper* obj = new SrsLazyObjectWrapper*(); -// _srs_gb_manager->add(obj); // Add wrapper to resource manager. -// Start a coroutine to do obj->resource()->cycle(). -// -// Usage for resource managed by other object: -// class Resource { -// private: -// friend class SrsLazyObjectWrapper; -// Resource(SrsLazyObjectWrapper* /*wrapper*/) { -// } -// }; -// class Manager { -// private: -// SrsLazyObjectWrapper* wrapper_; -// public: -// Manager() { wrapper_ = new SrsLazyObjectWrapper(); } -// ~Manager() { srs_freep(wrapper_); } -// }; -// Manager* manager = new Manager(); -// srs_freep(manager); -// -// Note that under-layer resource are destroyed by _srs_gc, which is literally equal to srs_freep. However, the root -// wrapper might be managed by other resource manager, such as _srs_gb_manager for SrsLazyGbSession. Furthermore, other -// copied out wrappers might be freed by srs_freep. All are ok, because all wrapper and resources are simply normal -// object, so if you added to manager then you should use manager to remove it, and you can also directly delete it. -// TODO: FIXME: Remove it. -template -class SrsLazyObjectWrapper : public ISrsResource -{ -private: - T* resource_; -public: - SrsLazyObjectWrapper() { - init(new T(this)); - } - virtual ~SrsLazyObjectWrapper() { - resource_->gc_dispose(); - if (resource_->gc_ref() == 0) { - _srs_gc->remove(resource_); - } - } -private: - SrsLazyObjectWrapper(T* resource) { - init(resource); - } - void init(T* resource) { - resource_ = resource; - resource_->gc_use(); - } -public: - SrsLazyObjectWrapper* copy() { - return new SrsLazyObjectWrapper(resource_); - } - T* resource() { - return resource_; - } -// Interface ISrsResource -public: - virtual const SrsContextId& get_id() { - return resource_->get_id(); - } - virtual std::string desc() { - return resource_->desc(); - } -}; - // This class implements the ISrsResource interface using a smart pointer, allowing the Manager to delete this // smart pointer resource, such as by implementing delayed release. // It embeds an SrsSharedPtr to provide the same interface, but it is not an inheritance relationship. Its usage diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index 99c9dbf0cd..b2c765fd26 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -70,11 +70,15 @@ std::string srs_sip_state(SrsGbSipState ostate, SrsGbSipState state) return srs_fmt("%s->%s", srs_gb_sip_state(ostate).c_str(), srs_gb_sip_state(state).c_str()); } -SrsLazyGbSession::SrsLazyGbSession(SrsLazyObjectWrapper* wrapper_root) +SrsLazyGbSession::SrsLazyGbSession() : sip_(NULL), media_(NULL) { - wrapper_root_ = wrapper_root; - sip_ = new SrsLazyObjectWrapper(); - media_ = new SrsLazyObjectWrapper(); + wrapper_ = NULL; + owner_coroutine_ = NULL; + owner_cid_ = NULL; + + sip_ = SrsSharedResource(new SrsLazyGbSipTcpConn()); + media_ = SrsSharedResource(new SrsLazyGbMediaTcpConn()); + muxer_ = new SrsGbMuxer(this); state_ = SrsGbSessionStateInit; @@ -102,38 +106,40 @@ SrsLazyGbSession::SrsLazyGbSession(SrsLazyObjectWrapper* wrapp cid_ = _srs_context->generate_id(); _srs_context->set_id(cid_); // Also change current coroutine cid as session's. - trd_ = new SrsSTCoroutine("GBS", this, cid_); } SrsLazyGbSession::~SrsLazyGbSession() { - srs_freep(trd_); - srs_freep(sip_); - srs_freep(media_); srs_freep(muxer_); srs_freep(ppp_); } -srs_error_t SrsLazyGbSession::initialize(SrsConfDirective* conf) +void SrsLazyGbSession::setup(SrsConfDirective* conf) { - srs_error_t err = srs_success; - pip_ = candidate_ = _srs_config->get_stream_caster_sip_candidate(conf); if (candidate_ == "*") { pip_ = srs_get_public_internet_address(true); } std::string output = _srs_config->get_stream_caster_output(conf); - if ((err = muxer_->initialize(output)) != srs_success) { - return srs_error_wrap(err, "muxer"); - } + muxer_->setup(output); connecting_timeout_ = _srs_config->get_stream_caster_sip_timeout(conf); reinvite_wait_ = _srs_config->get_stream_caster_sip_reinvite(conf); srs_trace("Session: Start timeout=%dms, reinvite=%dms, candidate=%s, pip=%s, output=%s", srsu2msi(connecting_timeout_), srsu2msi(reinvite_wait_), candidate_.c_str(), pip_.c_str(), output.c_str()); +} - return err; +void SrsLazyGbSession::setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid) +{ + wrapper_ = wrapper; + owner_coroutine_ = owner_coroutine; + owner_cid_ = owner_cid; +} + +void SrsLazyGbSession::on_executor_done(ISrsInterruptable* executor) +{ + owner_coroutine_ = NULL; } void SrsLazyGbSession::on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const std::vector& msgs) @@ -195,27 +201,24 @@ void SrsLazyGbSession::on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const st } } -void SrsLazyGbSession::on_sip_transport(SrsLazyObjectWrapper* sip) +void SrsLazyGbSession::on_sip_transport(SrsSharedResource sip) { - srs_freep(sip_); - sip_ = sip->copy(); - + sip_ = sip; // Change id of SIP and all its child coroutines. - sip_->resource()->set_cid(cid_); + sip_->set_cid(cid_); } -SrsLazyObjectWrapper* SrsLazyGbSession::sip_transport() +SrsSharedResource SrsLazyGbSession::sip_transport() { return sip_; } -void SrsLazyGbSession::on_media_transport(SrsLazyObjectWrapper* media) +void SrsLazyGbSession::on_media_transport(SrsSharedResource media) { - srs_freep(media_); - media_ = media->copy(); + media_ = media; // Change id of SIP and all its child coroutines. - media_->resource()->set_cid(cid_); + media_->set_cid(cid_); } std::string SrsLazyGbSession::pip() @@ -223,29 +226,22 @@ std::string SrsLazyGbSession::pip() return pip_; } -srs_error_t SrsLazyGbSession::start() +srs_error_t SrsLazyGbSession::cycle() { srs_error_t err = srs_success; - if ((err = trd_->start()) != srs_success) { - return srs_error_wrap(err, "coroutine"); - } - - return err; -} + // Update all context id to cid of session. + _srs_context->set_id(cid_); + owner_cid_->set_cid(cid_); + sip_->set_cid(cid_); + media_->set_cid(cid_); -srs_error_t SrsLazyGbSession::cycle() -{ - srs_error_t err = do_cycle(); + // Drive the session cycle. + err = do_cycle(); // Interrupt the SIP and media transport when session terminated. - sip_->resource()->interrupt(); - media_->resource()->interrupt(); - - // Note that we added wrapper to manager, so we must free the wrapper, not this connection. - SrsLazyObjectWrapper* wrapper = wrapper_root_; - srs_assert(wrapper); // The creator wrapper MUST never be null, because we created it. - _srs_gb_manager->remove(wrapper); + sip_->interrupt(); + media_->interrupt(); // success. if (err == srs_success) { @@ -279,7 +275,8 @@ srs_error_t SrsLazyGbSession::do_cycle() srs_error_t err = srs_success; while (true) { - if ((err = trd_->pull()) != srs_success) { + if (!owner_coroutine_) return err; + if ((err = owner_coroutine_->pull()) != srs_success) { return srs_error_wrap(err, "pull"); } @@ -287,7 +284,7 @@ srs_error_t SrsLazyGbSession::do_cycle() srs_usleep(SRS_GB_SESSION_DRIVE_INTERVAL); // Client send bye, we should dispose the session. - if (sip_->resource()->is_bye()) { + if (sip_->is_bye()) { return err; } @@ -316,29 +313,27 @@ srs_error_t SrsLazyGbSession::drive_state() #define SRS_GB_CHANGE_STATE_TO(state) { \ SrsGbSessionState ostate = set_state(state); \ - srs_trace("Session: Change device=%s, state=%s", sip_->resource()->device_id().c_str(), \ + srs_trace("Session: Change device=%s, state=%s", sip_->device_id().c_str(), \ srs_gb_state(ostate, state_).c_str()); \ } if (state_ == SrsGbSessionStateInit) { // Set to connecting, whatever media is connected or not, because the connecting state will handle it if media // is connected, so we don't need to handle it here. - if (sip_->resource()->is_registered()) { + if (sip_->is_registered()) { SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateConnecting); connecting_starttime_ = srs_update_system_time(); } // Invite if media is not connected. - if (sip_->resource()->is_registered() && !media_->resource()->is_connected()) { + if (sip_->is_registered() && !media_->is_connected()) { uint32_t ssrc = 0; - if ((err = sip_->resource()->invite_request(&ssrc)) != srs_success) { + if ((err = sip_->invite_request(&ssrc)) != srs_success) { return srs_error_wrap(err, "invite"); } // Now, we're able to query session by ssrc, for media packets. - SrsLazyObjectWrapper* wrapper = wrapper_root_; - srs_assert(wrapper); // It MUST never be NULL, because this method is in the cycle of coroutine. - _srs_gb_manager->add_with_fast_id(ssrc, wrapper); + _srs_gb_manager->add_with_fast_id(ssrc, wrapper_); } } @@ -349,32 +344,32 @@ srs_error_t SrsLazyGbSession::drive_state() } srs_trace("Session: Connecting timeout, nn=%d, state=%s, sip=%s, media=%d", nn_timeout_, srs_gb_session_state(state_).c_str(), - srs_gb_sip_state(sip_->resource()->state()).c_str(), media_->resource()->is_connected()); - sip_->resource()->reset_to_register(); + srs_gb_sip_state(sip_->state()).c_str(), media_->is_connected()); + sip_->reset_to_register(); SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateInit); } - if (sip_->resource()->is_stable() && media_->resource()->is_connected()) { + if (sip_->is_stable() && media_->is_connected()) { SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateEstablished); } } if (state_ == SrsGbSessionStateEstablished) { - if (sip_->resource()->is_bye()) { + if (sip_->is_bye()) { srs_trace("Session: Dispose for client bye"); return err; } // When media disconnected, we wait for a while then reinvite. - if (!media_->resource()->is_connected()) { + if (!media_->is_connected()) { if (!reinviting_starttime_) { reinviting_starttime_ = srs_update_system_time(); } if (srs_get_system_time() - reinviting_starttime_ > reinvite_wait_) { reinviting_starttime_ = 0; srs_trace("Session: Re-invite for disconnect, state=%s, sip=%s, media=%d", srs_gb_session_state(state_).c_str(), - srs_gb_sip_state(sip_->resource()->state()).c_str(), media_->resource()->is_connected()); - sip_->resource()->reset_to_register(); + srs_gb_sip_state(sip_->state()).c_str(), media_->is_connected()); + sip_->reset_to_register(); SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateInit); } } @@ -463,27 +458,33 @@ srs_error_t SrsGbListener::on_tcp_client(ISrsListener* listener, srs_netfd_t stf // Handle TCP connections. if (listener == sip_listener_) { - SrsLazyObjectWrapper* conn = new SrsLazyObjectWrapper(); - SrsLazyGbSipTcpConn* resource = dynamic_cast(conn->resource()); - resource->setup(conf_, sip_listener_, media_listener_, stfd); + SrsLazyGbSipTcpConn* raw_conn = new SrsLazyGbSipTcpConn(); + raw_conn->setup(conf_, sip_listener_, media_listener_, stfd); + + SrsSharedResource* conn = new SrsSharedResource(raw_conn); + _srs_gb_manager->add(conn, NULL); - if ((err = resource->start()) != srs_success) { - srs_freep(conn); + SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, conn, raw_conn, raw_conn); + raw_conn->setup_owner(conn, executor, executor); + + if ((err = executor->start()) != srs_success) { + srs_freep(executor); return srs_error_wrap(err, "gb sip"); } + } else if (listener == media_listener_) { + SrsLazyGbMediaTcpConn* raw_conn = new SrsLazyGbMediaTcpConn(); + raw_conn->setup(stfd); + SrsSharedResource* conn = new SrsSharedResource(raw_conn); _srs_gb_manager->add(conn, NULL); - } else if (listener == media_listener_) { - SrsLazyObjectWrapper* conn = new SrsLazyObjectWrapper(); - SrsLazyGbMediaTcpConn* resource = dynamic_cast(conn->resource()); - resource->setup(stfd); - if ((err = resource->start()) != srs_success) { - srs_freep(conn); + SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, conn, raw_conn, raw_conn); + raw_conn->setup_owner(conn, executor, executor); + + if ((err = executor->start()) != srs_success) { + srs_freep(executor); return srs_error_wrap(err, "gb media"); } - - _srs_gb_manager->add(conn, NULL); } else { srs_warn("GB: Ignore TCP client"); srs_close_stfd(stfd); @@ -492,9 +493,13 @@ srs_error_t SrsGbListener::on_tcp_client(ISrsListener* listener, srs_netfd_t stf return err; } -SrsLazyGbSipTcpConn::SrsLazyGbSipTcpConn(SrsLazyObjectWrapper* wrapper_root) +SrsLazyGbSipTcpConn::SrsLazyGbSipTcpConn() { - wrapper_root_ = wrapper_root; + wrapper_ = NULL; + owner_coroutine_ = NULL; + owner_cid_ = NULL; + cid_ = _srs_context->get_id(); + session_ = NULL; state_ = SrsGbSipStateInit; register_ = new SrsSipMessage(); @@ -507,17 +512,13 @@ SrsLazyGbSipTcpConn::SrsLazyGbSipTcpConn(SrsLazyObjectWrappercopy(); - session_ = NULL; sip_listener_ = sip; media_listener_ = media; conn_ = new SrsTcpConnection(stfd); @@ -536,6 +536,18 @@ void SrsLazyGbSipTcpConn::setup(SrsConfDirective* conf, SrsTcpListener* sip, Srs sender_ = new SrsLazyGbSipTcpSender(conn_); } +void SrsLazyGbSipTcpConn::setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid) +{ + wrapper_ = wrapper; + owner_coroutine_ = owner_coroutine; + owner_cid_ = owner_cid; +} + +void SrsLazyGbSipTcpConn::on_executor_done(ISrsInterruptable* executor) +{ + owner_coroutine_ = NULL; +} + std::string SrsLazyGbSipTcpConn::device_id() { return register_->device_id(); @@ -543,9 +555,10 @@ std::string SrsLazyGbSipTcpConn::device_id() void SrsLazyGbSipTcpConn::set_cid(const SrsContextId& cid) { - trd_->set_cid(cid); + if (owner_cid_) owner_cid_->set_cid(cid); receiver_->set_cid(cid); sender_->set_cid(cid); + cid_ = cid; } void SrsLazyGbSipTcpConn::query_ports(int* sip, int* media) @@ -703,7 +716,7 @@ void SrsLazyGbSipTcpConn::message_response(SrsSipMessage* msg, http_status statu void SrsLazyGbSipTcpConn::invite_ack(SrsSipMessage* msg) { - string pip = session_->resource()->pip(); // Parse from CANDIDATE + string pip = session_->pip(); // Parse from CANDIDATE int sip_port; query_ports(&sip_port, NULL); string gb_device_id = srs_fmt("sip:%s@%s", msg->to_address_user_.c_str(), msg->to_address_host_.c_str()); string branch = srs_random_str(6); @@ -765,7 +778,7 @@ srs_error_t SrsLazyGbSipTcpConn::invite_request(uint32_t* pssrc) if (pssrc) *pssrc = ssrc_v_; } - string pip = session_->resource()->pip(); // Parse from CANDIDATE + string pip = session_->pip(); // Parse from CANDIDATE int sip_port, media_port; query_ports(&sip_port, &media_port); string srs_device_id = srs_fmt("sip:%s@%s", register_->request_uri_user_.c_str(), register_->request_uri_host_.c_str()); string gb_device_id = srs_fmt("sip:%s@%s", register_->from_address_user_.c_str(), register_->from_address_host_.c_str()); @@ -838,7 +851,7 @@ void SrsLazyGbSipTcpConn::interrupt() { receiver_->interrupt(); sender_->interrupt(); - trd_->interrupt(); + if (owner_coroutine_) owner_coroutine_->interrupt(); } SrsGbSipState SrsLazyGbSipTcpConn::state() @@ -875,7 +888,7 @@ SrsGbSipState SrsLazyGbSipTcpConn::set_state(SrsGbSipState v) const SrsContextId& SrsLazyGbSipTcpConn::get_id() { - return trd_->cid(); + return cid_; } std::string SrsLazyGbSipTcpConn::desc() @@ -883,14 +896,10 @@ std::string SrsLazyGbSipTcpConn::desc() return "GB-SIP-TCP"; } -srs_error_t SrsLazyGbSipTcpConn::start() +srs_error_t SrsLazyGbSipTcpConn::cycle() { srs_error_t err = srs_success; - if ((err = trd_->start()) != srs_success) { - return srs_error_wrap(err, "sip"); - } - if ((err = receiver_->start()) != srs_success) { return srs_error_wrap(err, "receiver"); } @@ -899,22 +908,20 @@ srs_error_t SrsLazyGbSipTcpConn::start() return srs_error_wrap(err, "sender"); } - return err; -} + // Wait for the SIP connection to be terminated. + while (true) { + if (!owner_coroutine_) return err; + if ((err = owner_coroutine_->pull()) != srs_success) { + return srs_error_wrap(err, "pull"); + } -srs_error_t SrsLazyGbSipTcpConn::cycle() -{ - srs_error_t err = do_cycle(); + srs_usleep(SRS_UTIME_NO_TIMEOUT); + } // Interrupt the receiver and sender coroutine. receiver_->interrupt(); sender_->interrupt(); - // Note that we added wrapper to manager, so we must free the wrapper, not this connection. - SrsLazyObjectWrapper* wrapper = wrapper_root_; - srs_assert(wrapper); // The creator wrapper MUST never be null, because we created it. - _srs_gb_manager->remove(wrapper); - // success. if (err == srs_success) { srs_trace("client finished."); @@ -942,23 +949,7 @@ srs_error_t SrsLazyGbSipTcpConn::cycle() return srs_success; } -srs_error_t SrsLazyGbSipTcpConn::do_cycle() -{ - srs_error_t err = srs_success; - - while (true) { - if ((err = trd_->pull()) != srs_success) { - return srs_error_wrap(err, "pull"); - } - - // TODO: Handle other messages. - srs_usleep(SRS_UTIME_NO_TIMEOUT); - } - - return err; -} - -srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyObjectWrapper** psession) +srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyGbSession** psession) { srs_error_t err = srs_success; @@ -968,32 +959,32 @@ srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyObjectW // Only create session for REGISTER request. if (msg->type_ != HTTP_REQUEST || msg->method_ != HTTP_REGISTER) return err; - // The lazy-sweep wrapper for this resource. - SrsLazyObjectWrapper* wrapper = wrapper_root_; - srs_assert(wrapper); // It MUST never be NULL, because this method is in the cycle of coroutine of receiver. - // Find exists session for register, might be created by another object and still alive. - SrsLazyObjectWrapper* session = dynamic_cast*>(_srs_gb_manager->find_by_id(device)); + SrsSharedResource* session = dynamic_cast*>(_srs_gb_manager->find_by_id(device)); + SrsLazyGbSession* raw_session = session ? (*session).get() : NULL; if (!session) { // Create new GB session. - session = new SrsLazyObjectWrapper(); + raw_session = new SrsLazyGbSession(); + raw_session->setup(conf_); - if ((err = session->resource()->initialize(conf_)) != srs_success) { - srs_freep(session); - return srs_error_wrap(err, "initialize"); - } + session = new SrsSharedResource(raw_session); + _srs_gb_manager->add_with_id(device, session); - if ((err = session->resource()->start()) != srs_success) { - srs_freep(session); - return srs_error_wrap(err, "start"); - } + // Notice SIP session to use current SIP connection. + raw_session->on_sip_transport(*wrapper_); - _srs_gb_manager->add_with_id(device, session); + SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, session, raw_session, raw_session); + raw_session->setup_owner(session, executor, executor); + + if ((err = executor->start()) != srs_success) { + srs_freep(executor); + return srs_error_wrap(err, "gb session"); + } } // Try to load state from previous SIP connection. - SrsLazyGbSipTcpConn* pre = dynamic_cast(session->resource()->sip_transport()->resource()); - if (pre) { + SrsSharedResource pre = raw_session->sip_transport(); + if (pre.get() && pre.get() != this) { state_ = pre->state_; ssrc_str_ = pre->ssrc_str_; ssrc_v_ = pre->ssrc_v_; @@ -1001,9 +992,8 @@ srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyObjectW srs_freep(invite_ok_); invite_ok_ = pre->invite_ok_->copy(); } - // Notice SIP session to use current SIP connection. - session->resource()->on_sip_transport(wrapper); - *psession = session->copy(); + // Save session for SIP transport. + *psession = raw_session; return err; } @@ -1219,14 +1209,17 @@ ISrsPsPackHandler::~ISrsPsPackHandler() { } -SrsLazyGbMediaTcpConn::SrsLazyGbMediaTcpConn(SrsLazyObjectWrapper* wrapper_root) +SrsLazyGbMediaTcpConn::SrsLazyGbMediaTcpConn() { - wrapper_root_ = wrapper_root; pack_ = new SrsPackContext(this); - trd_ = new SrsSTCoroutine("media", this); buffer_ = new uint8_t[65535]; conn_ = NULL; + wrapper_ = NULL; + owner_coroutine_ = NULL; + owner_cid_ = NULL; + cid_ = _srs_context->get_id(); + session_ = NULL; connected_ = false; nn_rtcp_ = 0; @@ -1234,11 +1227,9 @@ SrsLazyGbMediaTcpConn::SrsLazyGbMediaTcpConn(SrsLazyObjectWrapper* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid) +{ + wrapper_ = wrapper; + owner_coroutine_ = owner_coroutine; + owner_cid_ = owner_cid; +} + +void SrsLazyGbMediaTcpConn::on_executor_done(ISrsInterruptable* executor) +{ + owner_coroutine_ = NULL; +} + bool SrsLazyGbMediaTcpConn::is_connected() { return connected_; @@ -1254,17 +1257,18 @@ bool SrsLazyGbMediaTcpConn::is_connected() void SrsLazyGbMediaTcpConn::interrupt() { - trd_->interrupt(); + if (owner_coroutine_) owner_coroutine_->interrupt(); } void SrsLazyGbMediaTcpConn::set_cid(const SrsContextId& cid) { - trd_->set_cid(cid); + if (owner_cid_) owner_cid_->set_cid(cid); + cid_ = cid; } const SrsContextId& SrsLazyGbMediaTcpConn::get_id() { - return _srs_context->get_id(); + return cid_; } std::string SrsLazyGbMediaTcpConn::desc() @@ -1272,17 +1276,6 @@ std::string SrsLazyGbMediaTcpConn::desc() return "GB-Media-TCP"; } -srs_error_t SrsLazyGbMediaTcpConn::start() -{ - srs_error_t err = srs_success; - - if ((err = trd_->start()) != srs_success) { - return srs_error_wrap(err, "coroutine"); - } - - return err; -} - srs_error_t SrsLazyGbMediaTcpConn::cycle() { srs_error_t err = do_cycle(); @@ -1295,11 +1288,6 @@ srs_error_t SrsLazyGbMediaTcpConn::cycle() connected_ = false; srs_trace("PS: Media disconnect, code=%d", srs_error_code(err)); - // Note that we added wrapper to manager, so we must free the wrapper, not this connection. - SrsLazyObjectWrapper* wrapper = wrapper_root_; - srs_assert(wrapper); // The creator wrapper MUST never be null, because we created it. - _srs_gb_manager->remove(wrapper); - // success. if (err == srs_success) { srs_trace("client finished."); @@ -1341,7 +1329,8 @@ srs_error_t SrsLazyGbMediaTcpConn::do_cycle() uint32_t reserved = 0; for (;;) { - if ((err = trd_->pull()) != srs_success) { + if (!owner_coroutine_) return err; + if ((err = owner_coroutine_->pull()) != srs_success) { return srs_error_wrap(err, "pull"); } @@ -1437,7 +1426,7 @@ srs_error_t SrsLazyGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector } // Notify session about the media pack. - session_->resource()->on_ps_pack(pack_, ps, msgs); + session_->on_ps_pack(pack_, ps, msgs); //for (vector::const_iterator it = msgs.begin(); it != msgs.end(); ++it) { // SrsTsMessage* msg = *it; @@ -1450,23 +1439,21 @@ srs_error_t SrsLazyGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector return err; } -srs_error_t SrsLazyGbMediaTcpConn::bind_session(uint32_t ssrc, SrsLazyObjectWrapper** psession) +srs_error_t SrsLazyGbMediaTcpConn::bind_session(uint32_t ssrc, SrsLazyGbSession** psession) { srs_error_t err = srs_success; if (!ssrc) return err; - // The lazy-sweep wrapper for this resource. - SrsLazyObjectWrapper* wrapper = wrapper_root_; - srs_assert(wrapper); // It MUST never be NULL, because this method is in the cycle of coroutine. - // Find exists session for register, might be created by another object and still alive. - SrsLazyObjectWrapper* session = dynamic_cast*>(_srs_gb_manager->find_by_fast_id(ssrc)); + SrsSharedResource* session = dynamic_cast*>(_srs_gb_manager->find_by_fast_id(ssrc)); if (!session) return err; - _srs_gb_manager->add_with_fast_id(ssrc, session); - session->resource()->on_media_transport(wrapper); - *psession = session->copy(); + SrsLazyGbSession* raw_session = (*session).get(); + srs_assert(raw_session); + + raw_session->on_media_transport(*wrapper_); + *psession = raw_session; return err; } @@ -1580,13 +1567,9 @@ SrsGbMuxer::~SrsGbMuxer() srs_freep(pprint_); } -srs_error_t SrsGbMuxer::initialize(std::string output) +void SrsGbMuxer::setup(std::string output) { - srs_error_t err = srs_success; - output_ = output; - - return err; } srs_error_t SrsGbMuxer::on_ts_message(SrsTsMessage* msg) @@ -2095,7 +2078,7 @@ srs_error_t SrsGbMuxer::connect() // Cleanup the data before connect again. close(); - string url = srs_string_replace(output_, "[stream]", session_->sip_transport()->resource()->device_id()); + string url = srs_string_replace(output_, "[stream]", session_->sip_transport()->device_id()); srs_trace("Muxer: Convert GB to RTMP %s", url.c_str()); srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp index 783842505f..fbe4067160 100644 --- a/trunk/src/app/srs_app_gb28181.hpp +++ b/trunk/src/app/srs_app_gb28181.hpp @@ -90,16 +90,23 @@ enum SrsGbSipState std::string srs_gb_sip_state(SrsGbSipState state); // The main logic object for GB, the session. -class SrsLazyGbSession : public SrsLazyObject, public ISrsResource, public ISrsStartable, public ISrsCoroutineHandler +// Each session contains a SIP object and a media object, that are managed by session. This means session always +// lives longer than SIP and media, and session will dispose SIP and media when session disposed. In another word, +// SIP and media objects use directly pointer to session, while session use shared ptr. +class SrsLazyGbSession : public ISrsResource, public ISrsCoroutineHandler, public ISrsExecutorHandler { private: - SrsCoroutine* trd_; SrsContextId cid_; +private: + // The shared resource which own this object, we should never free it because it's managed by shared ptr. + SrsSharedResource* wrapper_; + // The owner coroutine, allow user to interrupt the loop. + ISrsInterruptable* owner_coroutine_; + ISrsContextIdSetter* owner_cid_; private: SrsGbSessionState state_; - SrsLazyObjectWrapper* wrapper_root_; - SrsLazyObjectWrapper* sip_; - SrsLazyObjectWrapper* media_; + SrsSharedResource sip_; + SrsSharedResource media_; SrsGbMuxer* muxer_; private: // The candidate for SDP in configuration. @@ -132,26 +139,27 @@ class SrsLazyGbSession : public SrsLazyObject, public ISrsResource, public ISrsS uint64_t media_recovered_; uint64_t media_msgs_dropped_; uint64_t media_reserved_; -private: - friend class SrsLazyObjectWrapper; - SrsLazyGbSession(SrsLazyObjectWrapper* wrapper_root); public: + SrsLazyGbSession(); virtual ~SrsLazyGbSession(); public: // Initialize the GB session. - srs_error_t initialize(SrsConfDirective* conf); + void setup(SrsConfDirective* conf); + // Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id. + void setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid); +// Interface ISrsExecutorHandler +public: + virtual void on_executor_done(ISrsInterruptable* executor); +public: // When got a pack of messages. void on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const std::vector& msgs); // When got available SIP transport. - void on_sip_transport(SrsLazyObjectWrapper* sip); - SrsLazyObjectWrapper* sip_transport(); + void on_sip_transport(SrsSharedResource sip); + SrsSharedResource sip_transport(); // When got available media transport. - void on_media_transport(SrsLazyObjectWrapper* media); + void on_media_transport(SrsSharedResource media); // Get the candidate for SDP generation, the public IP address for device to connect to. std::string pip(); -// Interface ISrsStartable -public: - virtual srs_error_t start(); // Interface ISrsOneCycleThreadHandler public: virtual srs_error_t cycle(); @@ -186,12 +194,12 @@ class SrsGbListener : public ISrsListener, public ISrsTcpHandler }; // A GB28181 TCP SIP connection. -class SrsLazyGbSipTcpConn : public SrsLazyObject, public ISrsResource, public ISrsStartable, public ISrsCoroutineHandler +class SrsLazyGbSipTcpConn : public ISrsResource, public ISrsCoroutineHandler, public ISrsExecutorHandler { private: SrsGbSipState state_; - SrsLazyObjectWrapper* wrapper_root_; - SrsLazyObjectWrapper* session_; + // The owner session object, note that we use the raw pointer and should never free it. + SrsLazyGbSession* session_; SrsSipMessage* register_; SrsSipMessage* invite_ok_; private: @@ -201,19 +209,29 @@ class SrsLazyGbSipTcpConn : public SrsLazyObject, public ISrsResource, public IS SrsConfDirective* conf_; SrsTcpListener* sip_listener_; SrsTcpListener* media_listener_; +private: + // The shared resource which own this object, we should never free it because it's managed by shared ptr. + SrsSharedResource* wrapper_; + // The owner coroutine, allow user to interrupt the loop. + ISrsInterruptable* owner_coroutine_; + ISrsContextIdSetter* owner_cid_; + SrsContextId cid_; private: SrsTcpConnection* conn_; SrsLazyGbSipTcpReceiver* receiver_; SrsLazyGbSipTcpSender* sender_; - SrsCoroutine* trd_; -private: - friend class SrsLazyObjectWrapper; - SrsLazyGbSipTcpConn(SrsLazyObjectWrapper* wrapper_root); public: + SrsLazyGbSipTcpConn(); virtual ~SrsLazyGbSipTcpConn(); public: // Setup object, to keep empty constructor. void setup(SrsConfDirective* conf, SrsTcpListener* sip, SrsTcpListener* media, srs_netfd_t stfd); + // Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id. + void setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid); +// Interface ISrsExecutorHandler +public: + virtual void on_executor_done(ISrsInterruptable* executor); +public: // Get the SIP device id. std::string device_id(); // Set the cid of all coroutines. @@ -253,17 +271,12 @@ class SrsLazyGbSipTcpConn : public SrsLazyObject, public ISrsResource, public IS public: virtual const SrsContextId& get_id(); virtual std::string desc(); -// Interface ISrsStartable -public: - virtual srs_error_t start(); // Interface ISrsOneCycleThreadHandler public: virtual srs_error_t cycle(); -private: - virtual srs_error_t do_cycle(); private: // Create session if no one, or bind to an existed session. - srs_error_t bind_session(SrsSipMessage* msg, SrsLazyObjectWrapper** psession); + srs_error_t bind_session(SrsSipMessage* msg, SrsLazyGbSession** psession); }; // Start a coroutine to receive SIP messages. @@ -333,27 +346,36 @@ class ISrsPsPackHandler }; // A GB28181 TCP media connection, for PS stream. -class SrsLazyGbMediaTcpConn : public SrsLazyObject, public ISrsResource, public ISrsStartable, public ISrsCoroutineHandler - , public ISrsPsPackHandler +class SrsLazyGbMediaTcpConn : public ISrsResource, public ISrsCoroutineHandler, public ISrsPsPackHandler, public ISrsExecutorHandler { private: bool connected_; - SrsLazyObjectWrapper* wrapper_root_; - SrsLazyObjectWrapper* session_; + // The owner session object, note that we use the raw pointer and should never free it. + SrsLazyGbSession* session_; uint32_t nn_rtcp_; +private: + // The shared resource which own this object, we should never free it because it's managed by shared ptr. + SrsSharedResource* wrapper_; + // The owner coroutine, allow user to interrupt the loop. + ISrsInterruptable* owner_coroutine_; + ISrsContextIdSetter* owner_cid_; + SrsContextId cid_; private: SrsPackContext* pack_; SrsTcpConnection* conn_; - SrsCoroutine* trd_; uint8_t* buffer_; -private: - friend class SrsLazyObjectWrapper; - SrsLazyGbMediaTcpConn(SrsLazyObjectWrapper* wrapper_root); public: + SrsLazyGbMediaTcpConn(); virtual ~SrsLazyGbMediaTcpConn(); public: // Setup object, to keep empty constructor. void setup(srs_netfd_t stfd); + // Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id. + void setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid); +// Interface ISrsExecutorHandler +public: + virtual void on_executor_done(ISrsInterruptable* executor); +public: // Whether media is connected. bool is_connected(); // Interrupt transport by session. @@ -364,9 +386,6 @@ class SrsLazyGbMediaTcpConn : public SrsLazyObject, public ISrsResource, public public: virtual const SrsContextId& get_id(); virtual std::string desc(); -// Interface ISrsStartable -public: - virtual srs_error_t start(); // Interface ISrsOneCycleThreadHandler public: virtual srs_error_t cycle(); @@ -377,7 +396,7 @@ class SrsLazyGbMediaTcpConn : public SrsLazyObject, public ISrsResource, public virtual srs_error_t on_ps_pack(SrsPsPacket* ps, const std::vector& msgs); private: // Create session if no one, or bind to an existed session. - srs_error_t bind_session(uint32_t ssrc, SrsLazyObjectWrapper** psession); + srs_error_t bind_session(uint32_t ssrc, SrsLazyGbSession** psession); }; // The queue for mpegts over udp to send packets. @@ -402,6 +421,7 @@ class SrsMpegpsQueue class SrsGbMuxer { private: + // The owner session object, note that we use the raw pointer and should never free it. SrsLazyGbSession* session_; std::string output_; SrsSimpleRtmpClient* sdk_; @@ -431,7 +451,7 @@ class SrsGbMuxer SrsGbMuxer(SrsLazyGbSession* session); virtual ~SrsGbMuxer(); public: - srs_error_t initialize(std::string output); + void setup(std::string output); srs_error_t on_ts_message(SrsTsMessage* msg); private: virtual srs_error_t on_ts_video(SrsTsMessage* msg, SrsBuffer* avs); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index d213f3f098..c5f6500751 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -1369,11 +1369,6 @@ srs_error_t SrsServerAdapter::run(SrsWaitGroup* wg) } #endif - SrsLazySweepGc* gc = dynamic_cast(_srs_gc); - if ((err = gc->start()) != srs_success) { - return srs_error_wrap(err, "start gc"); - } - return err; } diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp index dae3ea2a5c..3e21e468cd 100755 --- a/trunk/src/app/srs_app_st.cpp +++ b/trunk/src/app/srs_app_st.cpp @@ -30,6 +30,30 @@ ISrsStartable::~ISrsStartable() { } +ISrsInterruptable::ISrsInterruptable() +{ +} + +ISrsInterruptable::~ISrsInterruptable() +{ +} + +ISrsContextIdSetter::ISrsContextIdSetter() +{ +} + +ISrsContextIdSetter::~ISrsContextIdSetter() +{ +} + +ISrsContextIdGetter::ISrsContextIdGetter() +{ +} + +ISrsContextIdGetter::~ISrsContextIdGetter() +{ +} + SrsCoroutine::SrsCoroutine() { } @@ -342,11 +366,20 @@ void SrsWaitGroup::wait() } } -SrsExecutorCoroutine::SrsExecutorCoroutine(ISrsResourceManager* m, ISrsResource* r, ISrsCoroutineHandler* h) +ISrsExecutorHandler::ISrsExecutorHandler() +{ +} + +ISrsExecutorHandler::~ISrsExecutorHandler() +{ +} + +SrsExecutorCoroutine::SrsExecutorCoroutine(ISrsResourceManager* m, ISrsResource* r, ISrsCoroutineHandler* h, ISrsExecutorHandler* cb) { resource_ = r; handler_ = h; manager_ = m; + callback_ = cb; trd_ = new SrsSTCoroutine("ar", this, resource_->get_id()); } @@ -361,9 +394,30 @@ srs_error_t SrsExecutorCoroutine::start() return trd_->start(); } +void SrsExecutorCoroutine::interrupt() +{ + trd_->interrupt(); +} + +srs_error_t SrsExecutorCoroutine::pull() +{ + return trd_->pull(); +} + +const SrsContextId& SrsExecutorCoroutine::cid() +{ + return trd_->cid(); +} + +void SrsExecutorCoroutine::set_cid(const SrsContextId& cid) +{ + trd_->set_cid(cid); +} + srs_error_t SrsExecutorCoroutine::cycle() { srs_error_t err = handler_->cycle(); + if (callback_) callback_->on_executor_done(this); manager_->remove(this); return err; } diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp index aeea103003..d7315b20cd 100644 --- a/trunk/src/app/srs_app_st.hpp +++ b/trunk/src/app/srs_app_st.hpp @@ -18,6 +18,7 @@ #include class SrsFastCoroutine; +class SrsExecutorCoroutine; // Each ST-coroutine must implements this interface, // to do the cycle job and handle some events. @@ -65,21 +66,46 @@ class ISrsStartable virtual srs_error_t start() = 0; }; +// Allow user to interrupt the coroutine, for example, to stop it. +class ISrsInterruptable +{ +public: + ISrsInterruptable(); + virtual ~ISrsInterruptable(); +public: + virtual void interrupt() = 0; + virtual srs_error_t pull() = 0; +}; + +// Get the context id. +class ISrsContextIdSetter +{ +public: + ISrsContextIdSetter(); + virtual ~ISrsContextIdSetter(); +public: + virtual void set_cid(const SrsContextId& cid) = 0; +}; + +// Set the context id. +class ISrsContextIdGetter +{ +public: + ISrsContextIdGetter(); + virtual ~ISrsContextIdGetter(); +public: + virtual const SrsContextId& cid() = 0; +}; + // The coroutine object. -class SrsCoroutine : public ISrsStartable +class SrsCoroutine : public ISrsStartable, public ISrsInterruptable + , public ISrsContextIdSetter, public ISrsContextIdGetter { public: SrsCoroutine(); virtual ~SrsCoroutine(); public: virtual void stop() = 0; - virtual void interrupt() = 0; - // @return a copy of error, which should be freed by user. - // NULL if not terminated and user should pull again. - virtual srs_error_t pull() = 0; - // Get and set the context id of coroutine. - virtual const SrsContextId& cid() = 0; - virtual void set_cid(const SrsContextId& cid) = 0; }; // An empty coroutine, user can default to this object before create any real coroutine. @@ -211,20 +237,60 @@ class SrsWaitGroup void wait(); }; -// Start a coroutine for resource executor, to execute the handler and delete resource and itself when done. -class SrsExecutorCoroutine : public ISrsResource, public ISrsStartable, public ISrsCoroutineHandler +// The callback when executor cycle done. +class ISrsExecutorHandler +{ +public: + ISrsExecutorHandler(); + virtual ~ISrsExecutorHandler(); +public: + virtual void on_executor_done(ISrsInterruptable* executor) = 0; +}; + +// Start a coroutine for resource executor, to execute the handler and delete resource and itself when +// handler cycle done. +// +// Note that the executor will free itself by manager, then free the resource by manager. This is a helper +// that is used for a goroutine to execute a handler and free itself after the cycle is done. +// +// Generally, the handler, resource, and callback generally are the same object. But we do not define a single +// interface, because shared resource is a special interface. +// +// Note that the resource may live longer than executor, because it is shared resource, so you should process +// the callback. For example, you should never use the executor after it's stopped and deleted. +// +// Usage: +// ISrsResourceManager* manager = ...; +// ISrsResource* resource, ISrsCoroutineHandler* handler, ISrsExecutorHandler* callback = ...; // Resource, handler, and callback are the same object. +// SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(manager, resource, handler); +// if ((err = executor->start()) != srs_success) { +// srs_freep(executor); +// return err; +// } +class SrsExecutorCoroutine : public ISrsResource, public ISrsStartable, public ISrsInterruptable + , public ISrsContextIdSetter, public ISrsContextIdGetter, public ISrsCoroutineHandler { private: ISrsResourceManager* manager_; ISrsResource* resource_; ISrsCoroutineHandler* handler_; + ISrsExecutorHandler* callback_; +private: SrsCoroutine* trd_; public: - SrsExecutorCoroutine(ISrsResourceManager* m, ISrsResource* r, ISrsCoroutineHandler* h); + SrsExecutorCoroutine(ISrsResourceManager* m, ISrsResource* r, ISrsCoroutineHandler* h, ISrsExecutorHandler* cb); virtual ~SrsExecutorCoroutine(); // Interface ISrsStartable public: virtual srs_error_t start(); +// Interface ISrsInterruptable +public: + virtual void interrupt(); + virtual srs_error_t pull(); +// Interface ISrsContextId +public: + virtual const SrsContextId& cid(); + virtual void set_cid(const SrsContextId& cid); // Interface ISrsOneCycleThreadHandler public: virtual srs_error_t cycle(); diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index b42a163be1..d5c6e42297 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -335,7 +335,6 @@ srs_error_t srs_global_initialize() #ifdef SRS_GB28181 _srs_gb_manager = new SrsResourceManager("GB", true); #endif - _srs_gc = new SrsLazySweepGc(); // Initialize global pps, which depends on _srs_clock _srs_pps_ids = new SrsPps(); diff --git a/trunk/src/protocol/srs_protocol_conn.cpp b/trunk/src/protocol/srs_protocol_conn.cpp index 0c988b9b60..b0fba2dab8 100644 --- a/trunk/src/protocol/srs_protocol_conn.cpp +++ b/trunk/src/protocol/srs_protocol_conn.cpp @@ -40,35 +40,3 @@ ISrsConnection::~ISrsConnection() { } -SrsLazyObject::SrsLazyObject() -{ - gc_ref_ = 0; -} - -SrsLazyObject::~SrsLazyObject() -{ -} - -void SrsLazyObject::gc_use() -{ - gc_ref_++; -} - -void SrsLazyObject::gc_dispose() -{ - gc_ref_--; -} - -int32_t SrsLazyObject::gc_ref() -{ - return gc_ref_; -} - -ISrsLazyGc::ISrsLazyGc() -{ -} - -ISrsLazyGc::~ISrsLazyGc() -{ -} - diff --git a/trunk/src/protocol/srs_protocol_conn.hpp b/trunk/src/protocol/srs_protocol_conn.hpp index 1a9d616975..e9088145c3 100644 --- a/trunk/src/protocol/srs_protocol_conn.hpp +++ b/trunk/src/protocol/srs_protocol_conn.hpp @@ -48,38 +48,5 @@ class ISrsConnection : public ISrsResource virtual std::string remote_ip() = 0; }; -// Lazy-sweep resource, never sweep util all wrappers are freed. -// See https://github.com/ossrs/srs/issues/3176#lazy-sweep -// TODO: FIXME: Remove it. -class SrsLazyObject -{ -private: - // The reference count of resource, 0 is no wrapper and safe to sweep. - int32_t gc_ref_; -public: - SrsLazyObject(); - virtual ~SrsLazyObject(); -public: - // For wrapper to use this resource. - virtual void gc_use(); - // For wrapper to dispose this resource. - virtual void gc_dispose(); - // The current reference count of resource. - virtual int32_t gc_ref(); -}; - -// The lazy-sweep GC, wait for a long time to dispose resource even when resource is disposable. -// See https://github.com/ossrs/srs/issues/3176#lazy-sweep -// TODO: FIXME: Remove it. -class ISrsLazyGc -{ -public: - ISrsLazyGc(); - virtual ~ISrsLazyGc(); -public: - // Remove then free the specified resource. - virtual void remove(SrsLazyObject* c) = 0; -}; - #endif From 3a3cea5817e7f8c7ca7f46024c2973e785cd6439 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 7 Jun 2024 15:54:05 +0800 Subject: [PATCH 06/16] SmartPtr: Fix utest failed. --- trunk/src/app/srs_app_conn.hpp | 7 +++++++ trunk/src/core/srs_core_autofree.hpp | 2 ++ trunk/src/utest/srs_utest_core.cpp | 28 ++++++++++++++-------------- 3 files changed, 23 insertions(+), 14 deletions(-) diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index 9b37886c86..549ed2e7c0 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -174,6 +174,13 @@ class SrsSharedResource : virtual public ISrsResource operator bool() const { return ptr_.operator bool(); } +#if __cplusplus >= 201103L +private: + // Disable the move constructor. + SrsSharedResource(SrsSharedResource&&); + // Disable the move assign operator. + SrsSharedResource& operator=(SrsSharedResource&&); +#endif // Interface ISrsResource public: virtual const SrsContextId& get_id() { diff --git a/trunk/src/core/srs_core_autofree.hpp b/trunk/src/core/srs_core_autofree.hpp index 604a8f2301..59ef247065 100644 --- a/trunk/src/core/srs_core_autofree.hpp +++ b/trunk/src/core/srs_core_autofree.hpp @@ -157,11 +157,13 @@ class SrsSharedPtr operator bool() const { return ptr_ != NULL; } +#if __cplusplus >= 201103L private: // Disable the move constructor. SrsSharedPtr(SrsSharedPtr&&); // Disable the move assign operator. SrsSharedPtr& operator=(SrsSharedPtr&&); +#endif }; #endif diff --git a/trunk/src/utest/srs_utest_core.cpp b/trunk/src/utest/srs_utest_core.cpp index 9d238830db..25980539f2 100644 --- a/trunk/src/utest/srs_utest_core.cpp +++ b/trunk/src/utest/srs_utest_core.cpp @@ -264,15 +264,15 @@ VOID TEST(CoreLogger, SharedPtrAssign) EXPECT_EQ(200, *ptr1); } -class MockResource : public ISrsResource +class MockIntResource : public ISrsResource { public: SrsContextId id_; int value_; public: - MockResource(int value) : value_(value) { + MockIntResource(int value) : value_(value) { } - virtual ~MockResource() { + virtual ~MockIntResource() { } public: virtual const SrsContextId& get_id() { @@ -286,47 +286,47 @@ class MockResource : public ISrsResource VOID TEST(CoreLogger, SharedResourceTypical) { if (true) { - SrsSharedResource* p = new SrsSharedResource(new MockResource(100)); + SrsSharedResource* p = new SrsSharedResource(new MockIntResource(100)); EXPECT_TRUE(*p); EXPECT_EQ(100, (*p)->value_); srs_freep(p); } if (true) { - SrsSharedResource p(new MockResource(100)); + SrsSharedResource p(new MockIntResource(100)); EXPECT_TRUE(p); EXPECT_EQ(100, p->value_); } if (true) { - SrsSharedResource p = SrsSharedResource(new MockResource(100)); + SrsSharedResource p = SrsSharedResource(new MockIntResource(100)); EXPECT_TRUE(p); EXPECT_EQ(100, p->value_); } if (true) { - SrsSharedResource p(new MockResource(100)); - SrsSharedResource q = p; + SrsSharedResource p(new MockIntResource(100)); + SrsSharedResource q = p; EXPECT_EQ(p.get(), q.get()); } if (true) { - SrsSharedResource p(new MockResource(100)); - SrsSharedResource q(NULL); + SrsSharedResource p(new MockIntResource(100)); + SrsSharedResource q(NULL); q = p; EXPECT_EQ(p.get(), q.get()); } if (true) { - SrsSharedResource p(new MockResource(100)); - SrsSharedResource q(new MockResource(200)); + SrsSharedResource p(new MockIntResource(100)); + SrsSharedResource q(new MockIntResource(200)); q = p; EXPECT_EQ(p.get(), q.get()); } if (true) { - SrsSharedResource p(new MockResource(100)); - SrsSharedResource q = p; + SrsSharedResource p(new MockIntResource(100)); + SrsSharedResource q = p; EXPECT_TRUE(p); EXPECT_TRUE(q); EXPECT_EQ(100, p->value_); From 9c076f8c0588e221f90fa2ebd774c1f5ca70d887 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 7 Jun 2024 17:03:30 +0800 Subject: [PATCH 07/16] SmartPtr: Support move assign and constructor for C++11. --- trunk/src/app/srs_app_conn.hpp | 18 +++-- trunk/src/core/srs_core_autofree.hpp | 28 +++++-- trunk/src/utest/srs_utest_core.cpp | 114 ++++++++++++++++++++++++--- 3 files changed, 138 insertions(+), 22 deletions(-) diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index 549ed2e7c0..9cdb6d1adb 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -174,12 +174,18 @@ class SrsSharedResource : virtual public ISrsResource operator bool() const { return ptr_.operator bool(); } -#if __cplusplus >= 201103L -private: - // Disable the move constructor. - SrsSharedResource(SrsSharedResource&&); - // Disable the move assign operator. - SrsSharedResource& operator=(SrsSharedResource&&); +#if __cplusplus >= 201103L // C++11 +public: + // The move constructor. + SrsSharedResource(SrsSharedResource&& cp) : ptr_(cp.ptr_) { + }; + // The move assign operator. + SrsSharedResource& operator=(SrsSharedResource&& cp) { + if (this != &cp) { + ptr_ = cp.ptr_; + } + return *this; + } #endif // Interface ISrsResource public: diff --git a/trunk/src/core/srs_core_autofree.hpp b/trunk/src/core/srs_core_autofree.hpp index 59ef247065..28f7662767 100644 --- a/trunk/src/core/srs_core_autofree.hpp +++ b/trunk/src/core/srs_core_autofree.hpp @@ -130,6 +130,13 @@ class SrsSharedPtr ref_count_ = cp.ref_count_; if (ref_count_) (*ref_count_)++; } + // Move from other shared ptr. + void move(SrsSharedPtr& cp) { + ptr_ = cp.ptr_; + ref_count_ = cp.ref_count_; + cp.ptr_ = NULL; + cp.ref_count_ = NULL; + } public: // Get the object. T* get() { @@ -145,7 +152,6 @@ class SrsSharedPtr reset(); copy(cp); } - return *this; } private: @@ -157,12 +163,20 @@ class SrsSharedPtr operator bool() const { return ptr_ != NULL; } -#if __cplusplus >= 201103L -private: - // Disable the move constructor. - SrsSharedPtr(SrsSharedPtr&&); - // Disable the move assign operator. - SrsSharedPtr& operator=(SrsSharedPtr&&); +#if __cplusplus >= 201103L // C++11 +public: + // The move constructor. + SrsSharedPtr(SrsSharedPtr&& cp) { + move(cp); + }; + // The move assign operator. + SrsSharedPtr& operator=(SrsSharedPtr&& cp) { + if (this != &cp) { + reset(); + move(cp); + } + return *this; + }; #endif }; diff --git a/trunk/src/utest/srs_utest_core.cpp b/trunk/src/utest/srs_utest_core.cpp index 25980539f2..7eadb7a464 100644 --- a/trunk/src/utest/srs_utest_core.cpp +++ b/trunk/src/utest/srs_utest_core.cpp @@ -96,12 +96,6 @@ VOID TEST(CoreLogger, SharedPtrTypical) EXPECT_EQ(100, *p); } - if (true) { - SrsSharedPtr p = SrsSharedPtr(new int(100)); - EXPECT_TRUE(p); - EXPECT_EQ(100, *p); - } - if (true) { SrsSharedPtr p(new int(100)); SrsSharedPtr q = p; @@ -184,7 +178,7 @@ VOID TEST(CoreLogger, SharedPtrWrapper) SrsAutoFree(int, ptr); EXPECT_EQ(100, *ptr); - { + if (true) { SrsSharedPtr p(new MockWrapper(ptr)); EXPECT_EQ(101, *ptr); EXPECT_EQ(101, *p->ptr); @@ -209,7 +203,7 @@ VOID TEST(CoreLogger, SharedPtrWrapper) } EXPECT_EQ(100, *ptr); - { + if (true) { SrsSharedPtr p(new MockWrapper(ptr)); EXPECT_EQ(101, *ptr); EXPECT_EQ(101, *p->ptr); @@ -244,7 +238,7 @@ VOID TEST(CoreLogger, SharedPtrAssign) SrsAutoFree(int, ptr1); EXPECT_EQ(200, *ptr1); - { + if (true) { SrsSharedPtr p(new MockWrapper(ptr0)); EXPECT_EQ(101, *ptr0); EXPECT_EQ(101, *p->ptr); @@ -264,6 +258,78 @@ VOID TEST(CoreLogger, SharedPtrAssign) EXPECT_EQ(200, *ptr1); } +template +SrsSharedPtr mock_shared_ptr_move_assign(SrsSharedPtr p) +{ + SrsSharedPtr q = p; + return q; +} + +template +SrsSharedPtr mock_shared_ptr_move_ctr(SrsSharedPtr p) +{ + return p; +} + +VOID TEST(CoreLogger, SharedPtrMove) +{ + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q(new int(101)); + q = mock_shared_ptr_move_ctr(p); + EXPECT_EQ(q.get(), p.get()); + } + + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q(new int(101)); + q = mock_shared_ptr_move_assign(p); + EXPECT_EQ(q.get(), p.get()); + } + + int* ptr = new int(100); + SrsAutoFree(int, ptr); + EXPECT_EQ(100, *ptr); + + if (true) { + SrsSharedPtr p(new MockWrapper(ptr)); + EXPECT_EQ(101, *ptr); + EXPECT_EQ(101, *p->ptr); + + SrsSharedPtr q(new MockWrapper(ptr)); + q = mock_shared_ptr_move_ctr(p); + EXPECT_EQ(101, *ptr); + EXPECT_EQ(101, *q->ptr); + } + EXPECT_EQ(100, *ptr); + + if (true) { + SrsSharedPtr p(new MockWrapper(ptr)); + EXPECT_EQ(101, *ptr); + EXPECT_EQ(101, *p->ptr); + + SrsSharedPtr q(new MockWrapper(ptr)); + q = mock_shared_ptr_move_assign(p); + EXPECT_EQ(101, *ptr); + EXPECT_EQ(101, *q->ptr); + } + EXPECT_EQ(100, *ptr); + + // Note that this will not trigger the move constructor or move assignment operator. + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q = mock_shared_ptr_move_assign(p); + EXPECT_EQ(q.get(), p.get()); + } + + // Note that this will not trigger the move constructor or move assignment operator. + if (true) { + SrsSharedPtr p = SrsSharedPtr(new int(100)); + EXPECT_TRUE(p); + EXPECT_EQ(100, *p); + } +} + class MockIntResource : public ISrsResource { public: @@ -334,3 +400,33 @@ VOID TEST(CoreLogger, SharedResourceTypical) } } +template +SrsSharedResource mock_shared_resource_move_assign(SrsSharedResource p) +{ + SrsSharedResource q = p; + return q; +} + +template +SrsSharedResource mock_shared_resource_move_ctr(SrsSharedResource p) +{ + return p; +} + +VOID TEST(CoreLogger, SharedResourceMove) +{ + if (true) { + SrsSharedResource p(new MockIntResource(100)); + SrsSharedResource q(new MockIntResource(101)); + q = mock_shared_resource_move_ctr(p); + EXPECT_EQ(q.get(), p.get()); + } + + if (true) { + SrsSharedResource p(new MockIntResource(100)); + SrsSharedResource q(new MockIntResource(101)); + q = mock_shared_resource_move_assign(p); + EXPECT_EQ(q.get(), p.get()); + } +} + From 79bbdc7a826c047547cd92fdbcf6127a3442932d Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 7 Jun 2024 20:19:56 +0800 Subject: [PATCH 08/16] SmartPtr: Fix utest fail. --- trunk/src/app/srs_app_gb28181.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index b2c765fd26..f7d2560bb3 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -970,9 +970,6 @@ srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyGbSessi session = new SrsSharedResource(raw_session); _srs_gb_manager->add_with_id(device, session); - // Notice SIP session to use current SIP connection. - raw_session->on_sip_transport(*wrapper_); - SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, session, raw_session, raw_session); raw_session->setup_owner(session, executor, executor); @@ -992,7 +989,8 @@ srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyGbSessi srs_freep(invite_ok_); invite_ok_ = pre->invite_ok_->copy(); } - // Save session for SIP transport. + // Notice session to use current SIP connection. + raw_session->on_sip_transport(*wrapper_); *psession = raw_session; return err; @@ -1452,6 +1450,7 @@ srs_error_t SrsLazyGbMediaTcpConn::bind_session(uint32_t ssrc, SrsLazyGbSession* SrsLazyGbSession* raw_session = (*session).get(); srs_assert(raw_session); + // Notice session to use current media connection. raw_session->on_media_transport(*wrapper_); *psession = raw_session; From cec4026a8294835c20f7e3a4c9b2dc1fdd1f08ca Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 10 Jun 2024 08:51:05 +0800 Subject: [PATCH 09/16] GB28181: Remove lazy from names. --- trunk/src/app/srs_app_gb28181.cpp | 176 +++++++++++++++--------------- trunk/src/app/srs_app_gb28181.hpp | 84 +++++++------- 2 files changed, 130 insertions(+), 130 deletions(-) diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index f7d2560bb3..e95b95cf6b 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -70,14 +70,14 @@ std::string srs_sip_state(SrsGbSipState ostate, SrsGbSipState state) return srs_fmt("%s->%s", srs_gb_sip_state(ostate).c_str(), srs_gb_sip_state(state).c_str()); } -SrsLazyGbSession::SrsLazyGbSession() : sip_(NULL), media_(NULL) +SrsGbSession::SrsGbSession() : sip_(NULL), media_(NULL) { wrapper_ = NULL; owner_coroutine_ = NULL; owner_cid_ = NULL; - sip_ = SrsSharedResource(new SrsLazyGbSipTcpConn()); - media_ = SrsSharedResource(new SrsLazyGbMediaTcpConn()); + sip_ = SrsSharedResource(new SrsGbSipTcpConn()); + media_ = SrsSharedResource(new SrsGbMediaTcpConn()); muxer_ = new SrsGbMuxer(this); state_ = SrsGbSessionStateInit; @@ -108,13 +108,13 @@ SrsLazyGbSession::SrsLazyGbSession() : sip_(NULL), media_(NULL) _srs_context->set_id(cid_); // Also change current coroutine cid as session's. } -SrsLazyGbSession::~SrsLazyGbSession() +SrsGbSession::~SrsGbSession() { srs_freep(muxer_); srs_freep(ppp_); } -void SrsLazyGbSession::setup(SrsConfDirective* conf) +void SrsGbSession::setup(SrsConfDirective* conf) { pip_ = candidate_ = _srs_config->get_stream_caster_sip_candidate(conf); if (candidate_ == "*") { @@ -130,19 +130,19 @@ void SrsLazyGbSession::setup(SrsConfDirective* conf) srsu2msi(reinvite_wait_), candidate_.c_str(), pip_.c_str(), output.c_str()); } -void SrsLazyGbSession::setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid) +void SrsGbSession::setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid) { wrapper_ = wrapper; owner_coroutine_ = owner_coroutine; owner_cid_ = owner_cid; } -void SrsLazyGbSession::on_executor_done(ISrsInterruptable* executor) +void SrsGbSession::on_executor_done(ISrsInterruptable* executor) { owner_coroutine_ = NULL; } -void SrsLazyGbSession::on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const std::vector& msgs) +void SrsGbSession::on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const std::vector& msgs) { // Got a new context, that is new media transport. if (media_id_ != ctx->media_id_) { @@ -201,19 +201,19 @@ void SrsLazyGbSession::on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const st } } -void SrsLazyGbSession::on_sip_transport(SrsSharedResource sip) +void SrsGbSession::on_sip_transport(SrsSharedResource sip) { sip_ = sip; // Change id of SIP and all its child coroutines. sip_->set_cid(cid_); } -SrsSharedResource SrsLazyGbSession::sip_transport() +SrsSharedResource SrsGbSession::sip_transport() { return sip_; } -void SrsLazyGbSession::on_media_transport(SrsSharedResource media) +void SrsGbSession::on_media_transport(SrsSharedResource media) { media_ = media; @@ -221,12 +221,12 @@ void SrsLazyGbSession::on_media_transport(SrsSharedResourceset_cid(cid_); } -std::string SrsLazyGbSession::pip() +std::string SrsGbSession::pip() { return pip_; } -srs_error_t SrsLazyGbSession::cycle() +srs_error_t SrsGbSession::cycle() { srs_error_t err = srs_success; @@ -270,7 +270,7 @@ srs_error_t SrsLazyGbSession::cycle() return srs_success; } -srs_error_t SrsLazyGbSession::do_cycle() +srs_error_t SrsGbSession::do_cycle() { srs_error_t err = srs_success; @@ -307,7 +307,7 @@ srs_error_t SrsLazyGbSession::do_cycle() return err; } -srs_error_t SrsLazyGbSession::drive_state() +srs_error_t SrsGbSession::drive_state() { srs_error_t err = srs_success; @@ -378,19 +378,19 @@ srs_error_t SrsLazyGbSession::drive_state() return err; } -SrsGbSessionState SrsLazyGbSession::set_state(SrsGbSessionState v) +SrsGbSessionState SrsGbSession::set_state(SrsGbSessionState v) { SrsGbSessionState state = state_; state_ = v; return state; } -const SrsContextId& SrsLazyGbSession::get_id() +const SrsContextId& SrsGbSession::get_id() { return cid_; } -std::string SrsLazyGbSession::desc() +std::string SrsGbSession::desc() { return "GBS"; } @@ -458,10 +458,10 @@ srs_error_t SrsGbListener::on_tcp_client(ISrsListener* listener, srs_netfd_t stf // Handle TCP connections. if (listener == sip_listener_) { - SrsLazyGbSipTcpConn* raw_conn = new SrsLazyGbSipTcpConn(); + SrsGbSipTcpConn* raw_conn = new SrsGbSipTcpConn(); raw_conn->setup(conf_, sip_listener_, media_listener_, stfd); - SrsSharedResource* conn = new SrsSharedResource(raw_conn); + SrsSharedResource* conn = new SrsSharedResource(raw_conn); _srs_gb_manager->add(conn, NULL); SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, conn, raw_conn, raw_conn); @@ -472,10 +472,10 @@ srs_error_t SrsGbListener::on_tcp_client(ISrsListener* listener, srs_netfd_t stf return srs_error_wrap(err, "gb sip"); } } else if (listener == media_listener_) { - SrsLazyGbMediaTcpConn* raw_conn = new SrsLazyGbMediaTcpConn(); + SrsGbMediaTcpConn* raw_conn = new SrsGbMediaTcpConn(); raw_conn->setup(stfd); - SrsSharedResource* conn = new SrsSharedResource(raw_conn); + SrsSharedResource* conn = new SrsSharedResource(raw_conn); _srs_gb_manager->add(conn, NULL); SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, conn, raw_conn, raw_conn); @@ -493,7 +493,7 @@ srs_error_t SrsGbListener::on_tcp_client(ISrsListener* listener, srs_netfd_t stf return err; } -SrsLazyGbSipTcpConn::SrsLazyGbSipTcpConn() +SrsGbSipTcpConn::SrsGbSipTcpConn() { wrapper_ = NULL; owner_coroutine_ = NULL; @@ -514,7 +514,7 @@ SrsLazyGbSipTcpConn::SrsLazyGbSipTcpConn() sender_ = NULL; } -SrsLazyGbSipTcpConn::~SrsLazyGbSipTcpConn() +SrsGbSipTcpConn::~SrsGbSipTcpConn() { srs_freep(receiver_); srs_freep(sender_); @@ -524,7 +524,7 @@ SrsLazyGbSipTcpConn::~SrsLazyGbSipTcpConn() srs_freep(conf_); } -void SrsLazyGbSipTcpConn::setup(SrsConfDirective* conf, SrsTcpListener* sip, SrsTcpListener* media, srs_netfd_t stfd) +void SrsGbSipTcpConn::setup(SrsConfDirective* conf, SrsTcpListener* sip, SrsTcpListener* media, srs_netfd_t stfd) { srs_freep(conf_); conf_ = conf->copy(); @@ -532,28 +532,28 @@ void SrsLazyGbSipTcpConn::setup(SrsConfDirective* conf, SrsTcpListener* sip, Srs sip_listener_ = sip; media_listener_ = media; conn_ = new SrsTcpConnection(stfd); - receiver_ = new SrsLazyGbSipTcpReceiver(this, conn_); - sender_ = new SrsLazyGbSipTcpSender(conn_); + receiver_ = new SrsGbSipTcpReceiver(this, conn_); + sender_ = new SrsGbSipTcpSender(conn_); } -void SrsLazyGbSipTcpConn::setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid) +void SrsGbSipTcpConn::setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid) { wrapper_ = wrapper; owner_coroutine_ = owner_coroutine; owner_cid_ = owner_cid; } -void SrsLazyGbSipTcpConn::on_executor_done(ISrsInterruptable* executor) +void SrsGbSipTcpConn::on_executor_done(ISrsInterruptable* executor) { owner_coroutine_ = NULL; } -std::string SrsLazyGbSipTcpConn::device_id() +std::string SrsGbSipTcpConn::device_id() { return register_->device_id(); } -void SrsLazyGbSipTcpConn::set_cid(const SrsContextId& cid) +void SrsGbSipTcpConn::set_cid(const SrsContextId& cid) { if (owner_cid_) owner_cid_->set_cid(cid); receiver_->set_cid(cid); @@ -561,13 +561,13 @@ void SrsLazyGbSipTcpConn::set_cid(const SrsContextId& cid) cid_ = cid; } -void SrsLazyGbSipTcpConn::query_ports(int* sip, int* media) +void SrsGbSipTcpConn::query_ports(int* sip, int* media) { if (sip) *sip = sip_listener_->port(); if (media) *media = media_listener_->port(); } -srs_error_t SrsLazyGbSipTcpConn::on_sip_message(SrsSipMessage* msg) +srs_error_t SrsGbSipTcpConn::on_sip_message(SrsSipMessage* msg) { srs_error_t err = srs_success; @@ -616,7 +616,7 @@ srs_error_t SrsLazyGbSipTcpConn::on_sip_message(SrsSipMessage* msg) return err; } -void SrsLazyGbSipTcpConn::enqueue_sip_message(SrsSipMessage* msg) +void SrsGbSipTcpConn::enqueue_sip_message(SrsSipMessage* msg) { // Drive state machine when enqueue message. drive_state(msg); @@ -625,7 +625,7 @@ void SrsLazyGbSipTcpConn::enqueue_sip_message(SrsSipMessage* msg) sender_->enqueue(msg); } -void SrsLazyGbSipTcpConn::drive_state(SrsSipMessage* msg) +void SrsGbSipTcpConn::drive_state(SrsSipMessage* msg) { srs_error_t err = srs_success; @@ -682,7 +682,7 @@ void SrsLazyGbSipTcpConn::drive_state(SrsSipMessage* msg) } } -void SrsLazyGbSipTcpConn::register_response(SrsSipMessage* msg) +void SrsGbSipTcpConn::register_response(SrsSipMessage* msg) { SrsSipMessage* res = new SrsSipMessage(); @@ -699,7 +699,7 @@ void SrsLazyGbSipTcpConn::register_response(SrsSipMessage* msg) enqueue_sip_message(res); } -void SrsLazyGbSipTcpConn::message_response(SrsSipMessage* msg, http_status status) +void SrsGbSipTcpConn::message_response(SrsSipMessage* msg, http_status status) { SrsSipMessage* res = new SrsSipMessage(); @@ -714,7 +714,7 @@ void SrsLazyGbSipTcpConn::message_response(SrsSipMessage* msg, http_status statu enqueue_sip_message(res); } -void SrsLazyGbSipTcpConn::invite_ack(SrsSipMessage* msg) +void SrsGbSipTcpConn::invite_ack(SrsSipMessage* msg) { string pip = session_->pip(); // Parse from CANDIDATE int sip_port; query_ports(&sip_port, NULL); @@ -735,7 +735,7 @@ void SrsLazyGbSipTcpConn::invite_ack(SrsSipMessage* msg) enqueue_sip_message(req); } -void SrsLazyGbSipTcpConn::bye_response(SrsSipMessage* msg) +void SrsGbSipTcpConn::bye_response(SrsSipMessage* msg) { SrsSipMessage* res = new SrsSipMessage(); @@ -750,7 +750,7 @@ void SrsLazyGbSipTcpConn::bye_response(SrsSipMessage* msg) enqueue_sip_message(res); } -srs_error_t SrsLazyGbSipTcpConn::invite_request(uint32_t* pssrc) +srs_error_t SrsGbSipTcpConn::invite_request(uint32_t* pssrc) { srs_error_t err = srs_success; @@ -847,56 +847,56 @@ srs_error_t SrsLazyGbSipTcpConn::invite_request(uint32_t* pssrc) return err; } -void SrsLazyGbSipTcpConn::interrupt() +void SrsGbSipTcpConn::interrupt() { receiver_->interrupt(); sender_->interrupt(); if (owner_coroutine_) owner_coroutine_->interrupt(); } -SrsGbSipState SrsLazyGbSipTcpConn::state() +SrsGbSipState SrsGbSipTcpConn::state() { return state_; } -void SrsLazyGbSipTcpConn::reset_to_register() +void SrsGbSipTcpConn::reset_to_register() { state_ = SrsGbSipStateRegistered; } -bool SrsLazyGbSipTcpConn::is_registered() +bool SrsGbSipTcpConn::is_registered() { return state_ >= SrsGbSipStateRegistered && state_ <= SrsGbSipStateStable; } -bool SrsLazyGbSipTcpConn::is_stable() +bool SrsGbSipTcpConn::is_stable() { return state_ == SrsGbSipStateStable; } -bool SrsLazyGbSipTcpConn::is_bye() +bool SrsGbSipTcpConn::is_bye() { return state_ == SrsGbSipStateBye; } -SrsGbSipState SrsLazyGbSipTcpConn::set_state(SrsGbSipState v) +SrsGbSipState SrsGbSipTcpConn::set_state(SrsGbSipState v) { SrsGbSipState state = state_; state_ = v; return state; } -const SrsContextId& SrsLazyGbSipTcpConn::get_id() +const SrsContextId& SrsGbSipTcpConn::get_id() { return cid_; } -std::string SrsLazyGbSipTcpConn::desc() +std::string SrsGbSipTcpConn::desc() { return "GB-SIP-TCP"; } -srs_error_t SrsLazyGbSipTcpConn::cycle() +srs_error_t SrsGbSipTcpConn::cycle() { srs_error_t err = srs_success; @@ -949,7 +949,7 @@ srs_error_t SrsLazyGbSipTcpConn::cycle() return srs_success; } -srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyGbSession** psession) +srs_error_t SrsGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsGbSession** psession) { srs_error_t err = srs_success; @@ -960,14 +960,14 @@ srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyGbSessi if (msg->type_ != HTTP_REQUEST || msg->method_ != HTTP_REGISTER) return err; // Find exists session for register, might be created by another object and still alive. - SrsSharedResource* session = dynamic_cast*>(_srs_gb_manager->find_by_id(device)); - SrsLazyGbSession* raw_session = session ? (*session).get() : NULL; + SrsSharedResource* session = dynamic_cast*>(_srs_gb_manager->find_by_id(device)); + SrsGbSession* raw_session = session ? (*session).get() : NULL; if (!session) { // Create new GB session. - raw_session = new SrsLazyGbSession(); + raw_session = new SrsGbSession(); raw_session->setup(conf_); - session = new SrsSharedResource(raw_session); + session = new SrsSharedResource(raw_session); _srs_gb_manager->add_with_id(device, session); SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, session, raw_session, raw_session); @@ -980,7 +980,7 @@ srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyGbSessi } // Try to load state from previous SIP connection. - SrsSharedResource pre = raw_session->sip_transport(); + SrsSharedResource pre = raw_session->sip_transport(); if (pre.get() && pre.get() != this) { state_ = pre->state_; ssrc_str_ = pre->ssrc_str_; @@ -996,29 +996,29 @@ srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyGbSessi return err; } -SrsLazyGbSipTcpReceiver::SrsLazyGbSipTcpReceiver(SrsLazyGbSipTcpConn* sip, SrsTcpConnection* conn) +SrsGbSipTcpReceiver::SrsGbSipTcpReceiver(SrsGbSipTcpConn* sip, SrsTcpConnection* conn) { sip_ = sip; conn_ = conn; trd_ = new SrsSTCoroutine("sip-receiver", this); } -SrsLazyGbSipTcpReceiver::~SrsLazyGbSipTcpReceiver() +SrsGbSipTcpReceiver::~SrsGbSipTcpReceiver() { srs_freep(trd_); } -void SrsLazyGbSipTcpReceiver::interrupt() +void SrsGbSipTcpReceiver::interrupt() { trd_->interrupt(); } -void SrsLazyGbSipTcpReceiver::set_cid(const SrsContextId& cid) +void SrsGbSipTcpReceiver::set_cid(const SrsContextId& cid) { trd_->set_cid(cid); } -srs_error_t SrsLazyGbSipTcpReceiver::start() +srs_error_t SrsGbSipTcpReceiver::start() { srs_error_t err = srs_success; @@ -1029,7 +1029,7 @@ srs_error_t SrsLazyGbSipTcpReceiver::start() return err; } -srs_error_t SrsLazyGbSipTcpReceiver::cycle() +srs_error_t SrsGbSipTcpReceiver::cycle() { srs_error_t err = do_cycle(); @@ -1041,7 +1041,7 @@ srs_error_t SrsLazyGbSipTcpReceiver::cycle() return err; } -srs_error_t SrsLazyGbSipTcpReceiver::do_cycle() +srs_error_t SrsGbSipTcpReceiver::do_cycle() { srs_error_t err = srs_success; @@ -1080,14 +1080,14 @@ srs_error_t SrsLazyGbSipTcpReceiver::do_cycle() return err; } -SrsLazyGbSipTcpSender::SrsLazyGbSipTcpSender(SrsTcpConnection* conn) +SrsGbSipTcpSender::SrsGbSipTcpSender(SrsTcpConnection* conn) { conn_ = conn; wait_ = srs_cond_new(); trd_ = new SrsSTCoroutine("sip-sender", this); } -SrsLazyGbSipTcpSender::~SrsLazyGbSipTcpSender() +SrsGbSipTcpSender::~SrsGbSipTcpSender() { srs_freep(trd_); srs_cond_destroy(wait_); @@ -1098,23 +1098,23 @@ SrsLazyGbSipTcpSender::~SrsLazyGbSipTcpSender() } } -void SrsLazyGbSipTcpSender::enqueue(SrsSipMessage* msg) +void SrsGbSipTcpSender::enqueue(SrsSipMessage* msg) { msgs_.push_back(msg); srs_cond_signal(wait_); } -void SrsLazyGbSipTcpSender::interrupt() +void SrsGbSipTcpSender::interrupt() { trd_->interrupt(); } -void SrsLazyGbSipTcpSender::set_cid(const SrsContextId& cid) +void SrsGbSipTcpSender::set_cid(const SrsContextId& cid) { trd_->set_cid(cid); } -srs_error_t SrsLazyGbSipTcpSender::start() +srs_error_t SrsGbSipTcpSender::start() { srs_error_t err = srs_success; @@ -1125,7 +1125,7 @@ srs_error_t SrsLazyGbSipTcpSender::start() return err; } -srs_error_t SrsLazyGbSipTcpSender::cycle() +srs_error_t SrsGbSipTcpSender::cycle() { srs_error_t err = do_cycle(); @@ -1137,7 +1137,7 @@ srs_error_t SrsLazyGbSipTcpSender::cycle() return err; } -srs_error_t SrsLazyGbSipTcpSender::do_cycle() +srs_error_t SrsGbSipTcpSender::do_cycle() { srs_error_t err = srs_success; @@ -1207,7 +1207,7 @@ ISrsPsPackHandler::~ISrsPsPackHandler() { } -SrsLazyGbMediaTcpConn::SrsLazyGbMediaTcpConn() +SrsGbMediaTcpConn::SrsGbMediaTcpConn() { pack_ = new SrsPackContext(this); buffer_ = new uint8_t[65535]; @@ -1223,58 +1223,58 @@ SrsLazyGbMediaTcpConn::SrsLazyGbMediaTcpConn() nn_rtcp_ = 0; } -SrsLazyGbMediaTcpConn::~SrsLazyGbMediaTcpConn() +SrsGbMediaTcpConn::~SrsGbMediaTcpConn() { srs_freep(conn_); srs_freepa(buffer_); srs_freep(pack_); } -void SrsLazyGbMediaTcpConn::setup(srs_netfd_t stfd) +void SrsGbMediaTcpConn::setup(srs_netfd_t stfd) { srs_freep(conn_); conn_ = new SrsTcpConnection(stfd); } -void SrsLazyGbMediaTcpConn::setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid) +void SrsGbMediaTcpConn::setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid) { wrapper_ = wrapper; owner_coroutine_ = owner_coroutine; owner_cid_ = owner_cid; } -void SrsLazyGbMediaTcpConn::on_executor_done(ISrsInterruptable* executor) +void SrsGbMediaTcpConn::on_executor_done(ISrsInterruptable* executor) { owner_coroutine_ = NULL; } -bool SrsLazyGbMediaTcpConn::is_connected() +bool SrsGbMediaTcpConn::is_connected() { return connected_; } -void SrsLazyGbMediaTcpConn::interrupt() +void SrsGbMediaTcpConn::interrupt() { if (owner_coroutine_) owner_coroutine_->interrupt(); } -void SrsLazyGbMediaTcpConn::set_cid(const SrsContextId& cid) +void SrsGbMediaTcpConn::set_cid(const SrsContextId& cid) { if (owner_cid_) owner_cid_->set_cid(cid); cid_ = cid; } -const SrsContextId& SrsLazyGbMediaTcpConn::get_id() +const SrsContextId& SrsGbMediaTcpConn::get_id() { return cid_; } -std::string SrsLazyGbMediaTcpConn::desc() +std::string SrsGbMediaTcpConn::desc() { return "GB-Media-TCP"; } -srs_error_t SrsLazyGbMediaTcpConn::cycle() +srs_error_t SrsGbMediaTcpConn::cycle() { srs_error_t err = do_cycle(); @@ -1313,7 +1313,7 @@ srs_error_t SrsLazyGbMediaTcpConn::cycle() return srs_success; } -srs_error_t SrsLazyGbMediaTcpConn::do_cycle() +srs_error_t SrsGbMediaTcpConn::do_cycle() { srs_error_t err = srs_success; @@ -1413,7 +1413,7 @@ srs_error_t SrsLazyGbMediaTcpConn::do_cycle() return err; } -srs_error_t SrsLazyGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector& msgs) +srs_error_t SrsGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector& msgs) { srs_error_t err = srs_success; @@ -1437,17 +1437,17 @@ srs_error_t SrsLazyGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector return err; } -srs_error_t SrsLazyGbMediaTcpConn::bind_session(uint32_t ssrc, SrsLazyGbSession** psession) +srs_error_t SrsGbMediaTcpConn::bind_session(uint32_t ssrc, SrsGbSession** psession) { srs_error_t err = srs_success; if (!ssrc) return err; // Find exists session for register, might be created by another object and still alive. - SrsSharedResource* session = dynamic_cast*>(_srs_gb_manager->find_by_fast_id(ssrc)); + SrsSharedResource* session = dynamic_cast*>(_srs_gb_manager->find_by_fast_id(ssrc)); if (!session) return err; - SrsLazyGbSession* raw_session = (*session).get(); + SrsGbSession* raw_session = (*session).get(); srs_assert(raw_session); // Notice session to use current media connection. @@ -1531,7 +1531,7 @@ SrsSharedPtrMessage* SrsMpegpsQueue::dequeue() return NULL; } -SrsGbMuxer::SrsGbMuxer(SrsLazyGbSession* session) +SrsGbMuxer::SrsGbMuxer(SrsGbSession* session) { sdk_ = NULL; session_ = session; diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp index fbe4067160..47793347cd 100644 --- a/trunk/src/app/srs_app_gb28181.hpp +++ b/trunk/src/app/srs_app_gb28181.hpp @@ -26,11 +26,11 @@ class SrsCoroutine; class SrsPackContext; class SrsBuffer; class SrsSipMessage; -class SrsLazyGbSession; -class SrsLazyGbSipTcpConn; -class SrsLazyGbMediaTcpConn; -class SrsLazyGbSipTcpReceiver; -class SrsLazyGbSipTcpSender; +class SrsGbSession; +class SrsGbSipTcpConn; +class SrsGbMediaTcpConn; +class SrsGbSipTcpReceiver; +class SrsGbSipTcpSender; class SrsAlonePithyPrint; class SrsGbMuxer; class SrsSimpleRtmpClient; @@ -51,7 +51,7 @@ class SrsRawAacStream; // established: // init: media is not connected. // dispose session: sip is bye. -// Please see SrsLazyGbSession::drive_state for detail. +// Please see SrsGbSession::drive_state for detail. enum SrsGbSessionState { SrsGbSessionStateInit = 0, @@ -76,7 +76,7 @@ std::string srs_gb_session_state(SrsGbSessionState state); // to bye: Got bye SIP message from device. // re-inviting: // to inviting: Got bye OK response from deivce. -// Please see SrsLazyGbSipTcpConn::drive_state for detail. +// Please see SrsGbSipTcpConn::drive_state for detail. enum SrsGbSipState { SrsGbSipStateInit = 0, @@ -93,20 +93,20 @@ std::string srs_gb_sip_state(SrsGbSipState state); // Each session contains a SIP object and a media object, that are managed by session. This means session always // lives longer than SIP and media, and session will dispose SIP and media when session disposed. In another word, // SIP and media objects use directly pointer to session, while session use shared ptr. -class SrsLazyGbSession : public ISrsResource, public ISrsCoroutineHandler, public ISrsExecutorHandler +class SrsGbSession : public ISrsResource, public ISrsCoroutineHandler, public ISrsExecutorHandler { private: SrsContextId cid_; private: // The shared resource which own this object, we should never free it because it's managed by shared ptr. - SrsSharedResource* wrapper_; + SrsSharedResource* wrapper_; // The owner coroutine, allow user to interrupt the loop. ISrsInterruptable* owner_coroutine_; ISrsContextIdSetter* owner_cid_; private: SrsGbSessionState state_; - SrsSharedResource sip_; - SrsSharedResource media_; + SrsSharedResource sip_; + SrsSharedResource media_; SrsGbMuxer* muxer_; private: // The candidate for SDP in configuration. @@ -140,13 +140,13 @@ class SrsLazyGbSession : public ISrsResource, public ISrsCoroutineHandler, publi uint64_t media_msgs_dropped_; uint64_t media_reserved_; public: - SrsLazyGbSession(); - virtual ~SrsLazyGbSession(); + SrsGbSession(); + virtual ~SrsGbSession(); public: // Initialize the GB session. void setup(SrsConfDirective* conf); // Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id. - void setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid); + void setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid); // Interface ISrsExecutorHandler public: virtual void on_executor_done(ISrsInterruptable* executor); @@ -154,10 +154,10 @@ class SrsLazyGbSession : public ISrsResource, public ISrsCoroutineHandler, publi // When got a pack of messages. void on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const std::vector& msgs); // When got available SIP transport. - void on_sip_transport(SrsSharedResource sip); - SrsSharedResource sip_transport(); + void on_sip_transport(SrsSharedResource sip); + SrsSharedResource sip_transport(); // When got available media transport. - void on_media_transport(SrsSharedResource media); + void on_media_transport(SrsSharedResource media); // Get the candidate for SDP generation, the public IP address for device to connect to. std::string pip(); // Interface ISrsOneCycleThreadHandler @@ -194,12 +194,12 @@ class SrsGbListener : public ISrsListener, public ISrsTcpHandler }; // A GB28181 TCP SIP connection. -class SrsLazyGbSipTcpConn : public ISrsResource, public ISrsCoroutineHandler, public ISrsExecutorHandler +class SrsGbSipTcpConn : public ISrsResource, public ISrsCoroutineHandler, public ISrsExecutorHandler { private: SrsGbSipState state_; // The owner session object, note that we use the raw pointer and should never free it. - SrsLazyGbSession* session_; + SrsGbSession* session_; SrsSipMessage* register_; SrsSipMessage* invite_ok_; private: @@ -211,23 +211,23 @@ class SrsLazyGbSipTcpConn : public ISrsResource, public ISrsCoroutineHandler, pu SrsTcpListener* media_listener_; private: // The shared resource which own this object, we should never free it because it's managed by shared ptr. - SrsSharedResource* wrapper_; + SrsSharedResource* wrapper_; // The owner coroutine, allow user to interrupt the loop. ISrsInterruptable* owner_coroutine_; ISrsContextIdSetter* owner_cid_; SrsContextId cid_; private: SrsTcpConnection* conn_; - SrsLazyGbSipTcpReceiver* receiver_; - SrsLazyGbSipTcpSender* sender_; + SrsGbSipTcpReceiver* receiver_; + SrsGbSipTcpSender* sender_; public: - SrsLazyGbSipTcpConn(); - virtual ~SrsLazyGbSipTcpConn(); + SrsGbSipTcpConn(); + virtual ~SrsGbSipTcpConn(); public: // Setup object, to keep empty constructor. void setup(SrsConfDirective* conf, SrsTcpListener* sip, SrsTcpListener* media, srs_netfd_t stfd); // Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id. - void setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid); + void setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid); // Interface ISrsExecutorHandler public: virtual void on_executor_done(ISrsInterruptable* executor); @@ -276,19 +276,19 @@ class SrsLazyGbSipTcpConn : public ISrsResource, public ISrsCoroutineHandler, pu virtual srs_error_t cycle(); private: // Create session if no one, or bind to an existed session. - srs_error_t bind_session(SrsSipMessage* msg, SrsLazyGbSession** psession); + srs_error_t bind_session(SrsSipMessage* msg, SrsGbSession** psession); }; // Start a coroutine to receive SIP messages. -class SrsLazyGbSipTcpReceiver : public ISrsStartable, public ISrsCoroutineHandler +class SrsGbSipTcpReceiver : public ISrsStartable, public ISrsCoroutineHandler { private: SrsCoroutine* trd_; SrsTcpConnection* conn_; - SrsLazyGbSipTcpConn* sip_; + SrsGbSipTcpConn* sip_; public: - SrsLazyGbSipTcpReceiver(SrsLazyGbSipTcpConn* sip, SrsTcpConnection* conn); - virtual ~SrsLazyGbSipTcpReceiver(); + SrsGbSipTcpReceiver(SrsGbSipTcpConn* sip, SrsTcpConnection* conn); + virtual ~SrsGbSipTcpReceiver(); public: // Interrupt the receiver coroutine. void interrupt(); @@ -305,7 +305,7 @@ class SrsLazyGbSipTcpReceiver : public ISrsStartable, public ISrsCoroutineHandle }; // Start a coroutine to send out SIP messages. -class SrsLazyGbSipTcpSender : public ISrsStartable, public ISrsCoroutineHandler +class SrsGbSipTcpSender : public ISrsStartable, public ISrsCoroutineHandler { private: SrsCoroutine* trd_; @@ -314,8 +314,8 @@ class SrsLazyGbSipTcpSender : public ISrsStartable, public ISrsCoroutineHandler std::vector msgs_; srs_cond_t wait_; public: - SrsLazyGbSipTcpSender(SrsTcpConnection* conn); - virtual ~SrsLazyGbSipTcpSender(); + SrsGbSipTcpSender(SrsTcpConnection* conn); + virtual ~SrsGbSipTcpSender(); public: // Push message to queue, and sender will send out in dedicate coroutine. void enqueue(SrsSipMessage* msg); @@ -346,16 +346,16 @@ class ISrsPsPackHandler }; // A GB28181 TCP media connection, for PS stream. -class SrsLazyGbMediaTcpConn : public ISrsResource, public ISrsCoroutineHandler, public ISrsPsPackHandler, public ISrsExecutorHandler +class SrsGbMediaTcpConn : public ISrsResource, public ISrsCoroutineHandler, public ISrsPsPackHandler, public ISrsExecutorHandler { private: bool connected_; // The owner session object, note that we use the raw pointer and should never free it. - SrsLazyGbSession* session_; + SrsGbSession* session_; uint32_t nn_rtcp_; private: // The shared resource which own this object, we should never free it because it's managed by shared ptr. - SrsSharedResource* wrapper_; + SrsSharedResource* wrapper_; // The owner coroutine, allow user to interrupt the loop. ISrsInterruptable* owner_coroutine_; ISrsContextIdSetter* owner_cid_; @@ -365,13 +365,13 @@ class SrsLazyGbMediaTcpConn : public ISrsResource, public ISrsCoroutineHandler, SrsTcpConnection* conn_; uint8_t* buffer_; public: - SrsLazyGbMediaTcpConn(); - virtual ~SrsLazyGbMediaTcpConn(); + SrsGbMediaTcpConn(); + virtual ~SrsGbMediaTcpConn(); public: // Setup object, to keep empty constructor. void setup(srs_netfd_t stfd); // Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id. - void setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid); + void setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid); // Interface ISrsExecutorHandler public: virtual void on_executor_done(ISrsInterruptable* executor); @@ -396,7 +396,7 @@ class SrsLazyGbMediaTcpConn : public ISrsResource, public ISrsCoroutineHandler, virtual srs_error_t on_ps_pack(SrsPsPacket* ps, const std::vector& msgs); private: // Create session if no one, or bind to an existed session. - srs_error_t bind_session(uint32_t ssrc, SrsLazyGbSession** psession); + srs_error_t bind_session(uint32_t ssrc, SrsGbSession** psession); }; // The queue for mpegts over udp to send packets. @@ -422,7 +422,7 @@ class SrsGbMuxer { private: // The owner session object, note that we use the raw pointer and should never free it. - SrsLazyGbSession* session_; + SrsGbSession* session_; std::string output_; SrsSimpleRtmpClient* sdk_; private: @@ -448,7 +448,7 @@ class SrsGbMuxer SrsMpegpsQueue* queue_; SrsPithyPrint* pprint_; public: - SrsGbMuxer(SrsLazyGbSession* session); + SrsGbMuxer(SrsGbSession* session); virtual ~SrsGbMuxer(); public: void setup(std::string output); From ea3b366e163f6be65a3854972170a6916cb46164 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 11 Jun 2024 10:12:51 +0800 Subject: [PATCH 10/16] SmartPtr: Refine code. --- trunk/src/app/srs_app_conn.hpp | 18 +++++------------- trunk/src/utest/srs_utest_core.cpp | 14 ++++++-------- 2 files changed, 11 insertions(+), 21 deletions(-) diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index 9cdb6d1adb..1877b35e8c 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -128,9 +128,14 @@ class SrsResourceManager : public ISrsCoroutineHandler, public ISrsResourceManag // This class implements the ISrsResource interface using a smart pointer, allowing the Manager to delete this // smart pointer resource, such as by implementing delayed release. +// // It embeds an SrsSharedPtr to provide the same interface, but it is not an inheritance relationship. Its usage // is identical to SrsSharedPtr, but they cannot replace each other. They are not related and cannot be converted // to one another. +// +// Note that we don't need to implement the move constructor and move assignment operator, because we directly +// use SrsSharedPtr as instance member, so we can only copy it. +// // Usage: // SrsSharedResource* ptr = new SrsSharedResource(new MyClass()); // (*ptr)->do_something(); @@ -174,19 +179,6 @@ class SrsSharedResource : virtual public ISrsResource operator bool() const { return ptr_.operator bool(); } -#if __cplusplus >= 201103L // C++11 -public: - // The move constructor. - SrsSharedResource(SrsSharedResource&& cp) : ptr_(cp.ptr_) { - }; - // The move assign operator. - SrsSharedResource& operator=(SrsSharedResource&& cp) { - if (this != &cp) { - ptr_ = cp.ptr_; - } - return *this; - } -#endif // Interface ISrsResource public: virtual const SrsContextId& get_id() { diff --git a/trunk/src/utest/srs_utest_core.cpp b/trunk/src/utest/srs_utest_core.cpp index 7eadb7a464..2344e911ef 100644 --- a/trunk/src/utest/srs_utest_core.cpp +++ b/trunk/src/utest/srs_utest_core.cpp @@ -259,15 +259,13 @@ VOID TEST(CoreLogger, SharedPtrAssign) } template -SrsSharedPtr mock_shared_ptr_move_assign(SrsSharedPtr p) -{ +SrsSharedPtr mock_shared_ptr_move_assign(SrsSharedPtr p) { SrsSharedPtr q = p; return q; } template -SrsSharedPtr mock_shared_ptr_move_ctr(SrsSharedPtr p) -{ +SrsSharedPtr mock_shared_ptr_move_ctr(SrsSharedPtr p) { return p; } @@ -401,15 +399,13 @@ VOID TEST(CoreLogger, SharedResourceTypical) } template -SrsSharedResource mock_shared_resource_move_assign(SrsSharedResource p) -{ +SrsSharedResource mock_shared_resource_move_assign(SrsSharedResource p) { SrsSharedResource q = p; return q; } template -SrsSharedResource mock_shared_resource_move_ctr(SrsSharedResource p) -{ +SrsSharedResource mock_shared_resource_move_ctr(SrsSharedResource p) { return p; } @@ -419,6 +415,7 @@ VOID TEST(CoreLogger, SharedResourceMove) SrsSharedResource p(new MockIntResource(100)); SrsSharedResource q(new MockIntResource(101)); q = mock_shared_resource_move_ctr(p); + EXPECT_EQ(100, q->value_); EXPECT_EQ(q.get(), p.get()); } @@ -426,6 +423,7 @@ VOID TEST(CoreLogger, SharedResourceMove) SrsSharedResource p(new MockIntResource(100)); SrsSharedResource q(new MockIntResource(101)); q = mock_shared_resource_move_assign(p); + EXPECT_EQ(100, q->value_); EXPECT_EQ(q.get(), p.get()); } } From 596c8c3307441aba19c2d68cedfc3b7caeb2af44 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 11 Jun 2024 10:44:07 +0800 Subject: [PATCH 11/16] SmartPtr: Fix bug. --- trunk/src/app/srs_app_gb28181.cpp | 25 +++++++++++++++++-------- trunk/src/app/srs_app_gb28181.hpp | 2 ++ 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index e95b95cf6b..2ab6c11963 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -909,14 +909,7 @@ srs_error_t SrsGbSipTcpConn::cycle() } // Wait for the SIP connection to be terminated. - while (true) { - if (!owner_coroutine_) return err; - if ((err = owner_coroutine_->pull()) != srs_success) { - return srs_error_wrap(err, "pull"); - } - - srs_usleep(SRS_UTIME_NO_TIMEOUT); - } + err = do_cycle(); // Interrupt the receiver and sender coroutine. receiver_->interrupt(); @@ -949,6 +942,22 @@ srs_error_t SrsGbSipTcpConn::cycle() return srs_success; } +srs_error_t SrsGbSipTcpConn::do_cycle() +{ + srs_error_t err = srs_success; + + while (true) { + if (!owner_coroutine_) return err; + if ((err = owner_coroutine_->pull()) != srs_success) { + return srs_error_wrap(err, "pull"); + } + + srs_usleep(SRS_UTIME_NO_TIMEOUT); + } + + return err; +} + srs_error_t SrsGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsGbSession** psession) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp index 47793347cd..e44b618c6d 100644 --- a/trunk/src/app/srs_app_gb28181.hpp +++ b/trunk/src/app/srs_app_gb28181.hpp @@ -274,6 +274,8 @@ class SrsGbSipTcpConn : public ISrsResource, public ISrsCoroutineHandler, public // Interface ISrsOneCycleThreadHandler public: virtual srs_error_t cycle(); +private: + srs_error_t do_cycle(); private: // Create session if no one, or bind to an existed session. srs_error_t bind_session(SrsSipMessage* msg, SrsGbSession** psession); From 0bf1ead64afd5d8ad1b93043a04b50333649cd4b Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 11 Jun 2024 10:51:35 +0800 Subject: [PATCH 12/16] SmartPtr: Refine interface for resource manager. --- trunk/src/protocol/srs_protocol_conn.hpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/trunk/src/protocol/srs_protocol_conn.hpp b/trunk/src/protocol/srs_protocol_conn.hpp index e9088145c3..ee17aed56e 100644 --- a/trunk/src/protocol/srs_protocol_conn.hpp +++ b/trunk/src/protocol/srs_protocol_conn.hpp @@ -33,7 +33,9 @@ class ISrsResourceManager ISrsResourceManager(); virtual ~ISrsResourceManager(); public: - // Remove then free the specified connection. + // Remove then free the specified connection. Note that the manager always free c resource, + // in the same coroutine or another coroutine. Some manager may support add c to a map, it + // should always free it even if it's in the map. virtual void remove(ISrsResource* c) = 0; }; From 801fea3216bc1f01ff82a85e6dd5089273951d51 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 11 Jun 2024 10:56:17 +0800 Subject: [PATCH 13/16] SmartPtr: Refine code. --- trunk/src/app/srs_app_conn.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index 1877b35e8c..c0e965525f 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -143,7 +143,7 @@ class SrsResourceManager : public ISrsCoroutineHandler, public ISrsResourceManag // ISrsResourceManager* manager = ...; // manager->remove(ptr); template -class SrsSharedResource : virtual public ISrsResource +class SrsSharedResource : public ISrsResource { private: SrsSharedPtr ptr_; From 3088ff69c5514ba5f837eada2f86fc485c5ab7ac Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 12 Jun 2024 19:55:03 +0800 Subject: [PATCH 14/16] Refine code. --- trunk/src/app/srs_app_gb28181.cpp | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index 2ab6c11963..98bfb0d2b9 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -70,15 +70,12 @@ std::string srs_sip_state(SrsGbSipState ostate, SrsGbSipState state) return srs_fmt("%s->%s", srs_gb_sip_state(ostate).c_str(), srs_gb_sip_state(state).c_str()); } -SrsGbSession::SrsGbSession() : sip_(NULL), media_(NULL) +SrsGbSession::SrsGbSession() : sip_(new SrsGbSipTcpConn()), media_(new SrsGbMediaTcpConn()) { wrapper_ = NULL; owner_coroutine_ = NULL; owner_cid_ = NULL; - sip_ = SrsSharedResource(new SrsGbSipTcpConn()); - media_ = SrsSharedResource(new SrsGbMediaTcpConn()); - muxer_ = new SrsGbMuxer(this); state_ = SrsGbSessionStateInit; From 29b688dd6c315a21ba5936812c3551d1a1350608 Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 12 Jun 2024 22:35:49 +0800 Subject: [PATCH 15/16] Update release to v5.0.214 v6.0.126 --- trunk/doc/CHANGELOG.md | 2 ++ trunk/src/core/srs_core_version5.hpp | 2 +- trunk/src/core/srs_core_version6.hpp | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index f58ab269f1..4922dfe7d4 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 6.0 Changelog +* v6.0, 2024-06-12, Merge [#4080](https://github.com/ossrs/srs/pull/4080): SmartPtr: Use shared ptr to manage GB objects. v6.0.126 (#4080) * v6.0, 2024-06-03, Merge [#4057](https://github.com/ossrs/srs/pull/4057): RTC: Support dropping h.264 SEI from NALUs. v6.0.125 (#4057) * v6.0, 2024-04-26, Merge [#4044](https://github.com/ossrs/srs/pull/4044): fix: correct SRS_ERRNO_MAP_HTTP duplicate error code. v6.0.124 (#4044) * v6.0, 2024-04-23, Merge [#4038](https://github.com/ossrs/srs/pull/4038): RTMP: Do not response publish start message if hooks fail. v6.0.123 (#4038) @@ -136,6 +137,7 @@ The changelog for SRS. ## SRS 5.0 Changelog +* v5.0, 2024-06-12, Merge [#4080](https://github.com/ossrs/srs/pull/4080): SmartPtr: Use shared ptr to manage GB objects. v5.0.214 (#4080) * v5.0, 2024-06-03, Merge [#4057](https://github.com/ossrs/srs/pull/4057): RTC: Support dropping h.264 SEI from NALUs. v5.0.213 (#4057) * v5.0, 2024-04-23, Merge [#4038](https://github.com/ossrs/srs/pull/4038): RTMP: Do not response publish start message if hooks fail. v5.0.212 (#4038) * v5.0, 2024-04-22, Merge [#4033](https://github.com/ossrs/srs/pull/4033): issue #3967: support x509 certification chiain in single pem file. v5.0.211 (#4033) diff --git a/trunk/src/core/srs_core_version5.hpp b/trunk/src/core/srs_core_version5.hpp index 8325968f7c..ef74aa44d3 100644 --- a/trunk/src/core/srs_core_version5.hpp +++ b/trunk/src/core/srs_core_version5.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 5 #define VERSION_MINOR 0 -#define VERSION_REVISION 213 +#define VERSION_REVISION 214 #endif diff --git a/trunk/src/core/srs_core_version6.hpp b/trunk/src/core/srs_core_version6.hpp index d12f389b62..e8bac6484f 100644 --- a/trunk/src/core/srs_core_version6.hpp +++ b/trunk/src/core/srs_core_version6.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 6 #define VERSION_MINOR 0 -#define VERSION_REVISION 125 +#define VERSION_REVISION 126 #endif From 5c8b2922e1776a3b13ba76906516c61efeb00dfe Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 12 Jun 2024 22:38:46 +0800 Subject: [PATCH 16/16] Update. --- trunk/doc/CHANGELOG.md | 1 - trunk/src/core/srs_core_version5.hpp | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 4922dfe7d4..3ad0c32175 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -137,7 +137,6 @@ The changelog for SRS. ## SRS 5.0 Changelog -* v5.0, 2024-06-12, Merge [#4080](https://github.com/ossrs/srs/pull/4080): SmartPtr: Use shared ptr to manage GB objects. v5.0.214 (#4080) * v5.0, 2024-06-03, Merge [#4057](https://github.com/ossrs/srs/pull/4057): RTC: Support dropping h.264 SEI from NALUs. v5.0.213 (#4057) * v5.0, 2024-04-23, Merge [#4038](https://github.com/ossrs/srs/pull/4038): RTMP: Do not response publish start message if hooks fail. v5.0.212 (#4038) * v5.0, 2024-04-22, Merge [#4033](https://github.com/ossrs/srs/pull/4033): issue #3967: support x509 certification chiain in single pem file. v5.0.211 (#4033) diff --git a/trunk/src/core/srs_core_version5.hpp b/trunk/src/core/srs_core_version5.hpp index ef74aa44d3..8325968f7c 100644 --- a/trunk/src/core/srs_core_version5.hpp +++ b/trunk/src/core/srs_core_version5.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 5 #define VERSION_MINOR 0 -#define VERSION_REVISION 214 +#define VERSION_REVISION 213 #endif