diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index f58ab269f1..3ad0c32175 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 6.0 Changelog +* v6.0, 2024-06-12, Merge [#4080](https://github.com/ossrs/srs/pull/4080): SmartPtr: Use shared ptr to manage GB objects. v6.0.126 (#4080) * v6.0, 2024-06-03, Merge [#4057](https://github.com/ossrs/srs/pull/4057): RTC: Support dropping h.264 SEI from NALUs. v6.0.125 (#4057) * v6.0, 2024-04-26, Merge [#4044](https://github.com/ossrs/srs/pull/4044): fix: correct SRS_ERRNO_MAP_HTTP duplicate error code. v6.0.124 (#4044) * v6.0, 2024-04-23, Merge [#4038](https://github.com/ossrs/srs/pull/4038): RTMP: Do not response publish start message if hooks fail. v6.0.123 (#4038) diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 7652be15b7..90d818ece0 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -413,28 +413,6 @@ void SrsResourceManager::dispose(ISrsResource* c) } } -SrsLazySweepGc::SrsLazySweepGc() -{ -} - -SrsLazySweepGc::~SrsLazySweepGc() -{ -} - -srs_error_t SrsLazySweepGc::start() -{ - srs_error_t err = srs_success; - return err; -} - -void SrsLazySweepGc::remove(SrsLazyObject* c) -{ - // TODO: FIXME: MUST lazy sweep. - srs_freep(c); -} - -ISrsLazyGc* _srs_gc = NULL; - ISrsExpire::ISrsExpire() { } diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index b5aeb48da0..c0e965525f 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -20,6 +20,7 @@ #include #include #include +#include class SrsWallClock; class SrsBuffer; @@ -125,98 +126,66 @@ class SrsResourceManager : public ISrsCoroutineHandler, public ISrsResourceManag void dispose(ISrsResource* c); }; -// A simple lazy-sweep GC, just wait for a long time to delete the disposable resources. -class SrsLazySweepGc : public ISrsLazyGc -{ -public: - SrsLazySweepGc(); - virtual ~SrsLazySweepGc(); -public: - virtual srs_error_t start(); - virtual void remove(SrsLazyObject* c); -}; - -extern ISrsLazyGc* _srs_gc; - -// A wrapper template for lazy-sweep resource. -// See https://github.com/ossrs/srs/issues/3176#lazy-sweep +// This class implements the ISrsResource interface using a smart pointer, allowing the Manager to delete this +// smart pointer resource, such as by implementing delayed release. // -// Usage for resource which manages itself in coroutine cycle, see SrsLazyGbSession: -// class Resource { -// private: -// SrsLazyObjectWrapper* wrapper_; -// private: -// friend class SrsLazyObjectWrapper; -// Resource(SrsLazyObjectWrapper* wrapper) { wrapper_ = wrapper; } -// public: -// srs_error_t Resource::cycle() { -// srs_error_t err = do_cycle(); -// _srs_gb_manager->remove(wrapper_); -// return err; -// } -// }; -// SrsLazyObjectWrapper* obj = new SrsLazyObjectWrapper*(); -// _srs_gb_manager->add(obj); // Add wrapper to resource manager. -// Start a coroutine to do obj->resource()->cycle(). +// It embeds an SrsSharedPtr to provide the same interface, but it is not an inheritance relationship. Its usage +// is identical to SrsSharedPtr, but they cannot replace each other. They are not related and cannot be converted +// to one another. // -// Usage for resource managed by other object: -// class Resource { -// private: -// friend class SrsLazyObjectWrapper; -// Resource(SrsLazyObjectWrapper* /*wrapper*/) { -// } -// }; -// class Manager { -// private: -// SrsLazyObjectWrapper* wrapper_; -// public: -// Manager() { wrapper_ = new SrsLazyObjectWrapper(); } -// ~Manager() { srs_freep(wrapper_); } -// }; -// Manager* manager = new Manager(); -// srs_freep(manager); +// Note that we don't need to implement the move constructor and move assignment operator, because we directly +// use SrsSharedPtr as instance member, so we can only copy it. // -// Note that under-layer resource are destroyed by _srs_gc, which is literally equal to srs_freep. However, the root -// wrapper might be managed by other resource manager, such as _srs_gb_manager for SrsLazyGbSession. Furthermore, other -// copied out wrappers might be freed by srs_freep. All are ok, because all wrapper and resources are simply normal -// object, so if you added to manager then you should use manager to remove it, and you can also directly delete it. +// Usage: +// SrsSharedResource* ptr = new SrsSharedResource(new MyClass()); +// (*ptr)->do_something(); +// +// ISrsResourceManager* manager = ...; +// manager->remove(ptr); template -class SrsLazyObjectWrapper : public ISrsResource +class SrsSharedResource : public ISrsResource { private: - T* resource_; + SrsSharedPtr ptr_; public: - SrsLazyObjectWrapper() { - init(new T(this)); + SrsSharedResource(T* ptr) : ptr_(ptr) { } - virtual ~SrsLazyObjectWrapper() { - resource_->gc_dispose(); - if (resource_->gc_ref() == 0) { - _srs_gc->remove(resource_); - } + SrsSharedResource(const SrsSharedResource& cp) : ptr_(cp.ptr_) { } -private: - SrsLazyObjectWrapper(T* resource) { - init(resource); - } - void init(T* resource) { - resource_ = resource; - resource_->gc_use(); + virtual ~SrsSharedResource() { } public: - SrsLazyObjectWrapper* copy() { - return new SrsLazyObjectWrapper(resource_); + // Get the object. + T* get() { + return ptr_.get(); + } + // Overload the -> operator. + T* operator->() { + return ptr_.operator->(); + } + // The assign operator. + SrsSharedResource& operator=(const SrsSharedResource& cp) { + if (this != &cp) { + ptr_ = cp.ptr_; + } + return *this; + } +private: + // Overload the * operator. + T& operator*() { + return ptr_.operator*(); } - T* resource() { - return resource_; + // Overload the bool operator. + operator bool() const { + return ptr_.operator bool(); } // Interface ISrsResource public: virtual const SrsContextId& get_id() { - return resource_->get_id(); + return ptr_->get_id(); } virtual std::string desc() { - return resource_->desc(); + return ptr_->desc(); } }; diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index 99c9dbf0cd..98bfb0d2b9 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -70,11 +70,12 @@ std::string srs_sip_state(SrsGbSipState ostate, SrsGbSipState state) return srs_fmt("%s->%s", srs_gb_sip_state(ostate).c_str(), srs_gb_sip_state(state).c_str()); } -SrsLazyGbSession::SrsLazyGbSession(SrsLazyObjectWrapper* wrapper_root) +SrsGbSession::SrsGbSession() : sip_(new SrsGbSipTcpConn()), media_(new SrsGbMediaTcpConn()) { - wrapper_root_ = wrapper_root; - sip_ = new SrsLazyObjectWrapper(); - media_ = new SrsLazyObjectWrapper(); + wrapper_ = NULL; + owner_coroutine_ = NULL; + owner_cid_ = NULL; + muxer_ = new SrsGbMuxer(this); state_ = SrsGbSessionStateInit; @@ -102,41 +103,43 @@ SrsLazyGbSession::SrsLazyGbSession(SrsLazyObjectWrapper* wrapp cid_ = _srs_context->generate_id(); _srs_context->set_id(cid_); // Also change current coroutine cid as session's. - trd_ = new SrsSTCoroutine("GBS", this, cid_); } -SrsLazyGbSession::~SrsLazyGbSession() +SrsGbSession::~SrsGbSession() { - srs_freep(trd_); - srs_freep(sip_); - srs_freep(media_); srs_freep(muxer_); srs_freep(ppp_); } -srs_error_t SrsLazyGbSession::initialize(SrsConfDirective* conf) +void SrsGbSession::setup(SrsConfDirective* conf) { - srs_error_t err = srs_success; - pip_ = candidate_ = _srs_config->get_stream_caster_sip_candidate(conf); if (candidate_ == "*") { pip_ = srs_get_public_internet_address(true); } std::string output = _srs_config->get_stream_caster_output(conf); - if ((err = muxer_->initialize(output)) != srs_success) { - return srs_error_wrap(err, "muxer"); - } + muxer_->setup(output); connecting_timeout_ = _srs_config->get_stream_caster_sip_timeout(conf); reinvite_wait_ = _srs_config->get_stream_caster_sip_reinvite(conf); srs_trace("Session: Start timeout=%dms, reinvite=%dms, candidate=%s, pip=%s, output=%s", srsu2msi(connecting_timeout_), srsu2msi(reinvite_wait_), candidate_.c_str(), pip_.c_str(), output.c_str()); +} - return err; +void SrsGbSession::setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid) +{ + wrapper_ = wrapper; + owner_coroutine_ = owner_coroutine; + owner_cid_ = owner_cid; +} + +void SrsGbSession::on_executor_done(ISrsInterruptable* executor) +{ + owner_coroutine_ = NULL; } -void SrsLazyGbSession::on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const std::vector& msgs) +void SrsGbSession::on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const std::vector& msgs) { // Got a new context, that is new media transport. if (media_id_ != ctx->media_id_) { @@ -195,57 +198,47 @@ void SrsLazyGbSession::on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const st } } -void SrsLazyGbSession::on_sip_transport(SrsLazyObjectWrapper* sip) +void SrsGbSession::on_sip_transport(SrsSharedResource sip) { - srs_freep(sip_); - sip_ = sip->copy(); - + sip_ = sip; // Change id of SIP and all its child coroutines. - sip_->resource()->set_cid(cid_); + sip_->set_cid(cid_); } -SrsLazyObjectWrapper* SrsLazyGbSession::sip_transport() +SrsSharedResource SrsGbSession::sip_transport() { return sip_; } -void SrsLazyGbSession::on_media_transport(SrsLazyObjectWrapper* media) +void SrsGbSession::on_media_transport(SrsSharedResource media) { - srs_freep(media_); - media_ = media->copy(); + media_ = media; // Change id of SIP and all its child coroutines. - media_->resource()->set_cid(cid_); + media_->set_cid(cid_); } -std::string SrsLazyGbSession::pip() +std::string SrsGbSession::pip() { return pip_; } -srs_error_t SrsLazyGbSession::start() +srs_error_t SrsGbSession::cycle() { srs_error_t err = srs_success; - if ((err = trd_->start()) != srs_success) { - return srs_error_wrap(err, "coroutine"); - } + // Update all context id to cid of session. + _srs_context->set_id(cid_); + owner_cid_->set_cid(cid_); + sip_->set_cid(cid_); + media_->set_cid(cid_); - return err; -} - -srs_error_t SrsLazyGbSession::cycle() -{ - srs_error_t err = do_cycle(); + // Drive the session cycle. + err = do_cycle(); // Interrupt the SIP and media transport when session terminated. - sip_->resource()->interrupt(); - media_->resource()->interrupt(); - - // Note that we added wrapper to manager, so we must free the wrapper, not this connection. - SrsLazyObjectWrapper* wrapper = wrapper_root_; - srs_assert(wrapper); // The creator wrapper MUST never be null, because we created it. - _srs_gb_manager->remove(wrapper); + sip_->interrupt(); + media_->interrupt(); // success. if (err == srs_success) { @@ -274,12 +267,13 @@ srs_error_t SrsLazyGbSession::cycle() return srs_success; } -srs_error_t SrsLazyGbSession::do_cycle() +srs_error_t SrsGbSession::do_cycle() { srs_error_t err = srs_success; while (true) { - if ((err = trd_->pull()) != srs_success) { + if (!owner_coroutine_) return err; + if ((err = owner_coroutine_->pull()) != srs_success) { return srs_error_wrap(err, "pull"); } @@ -287,7 +281,7 @@ srs_error_t SrsLazyGbSession::do_cycle() srs_usleep(SRS_GB_SESSION_DRIVE_INTERVAL); // Client send bye, we should dispose the session. - if (sip_->resource()->is_bye()) { + if (sip_->is_bye()) { return err; } @@ -310,35 +304,33 @@ srs_error_t SrsLazyGbSession::do_cycle() return err; } -srs_error_t SrsLazyGbSession::drive_state() +srs_error_t SrsGbSession::drive_state() { srs_error_t err = srs_success; #define SRS_GB_CHANGE_STATE_TO(state) { \ SrsGbSessionState ostate = set_state(state); \ - srs_trace("Session: Change device=%s, state=%s", sip_->resource()->device_id().c_str(), \ + srs_trace("Session: Change device=%s, state=%s", sip_->device_id().c_str(), \ srs_gb_state(ostate, state_).c_str()); \ } if (state_ == SrsGbSessionStateInit) { // Set to connecting, whatever media is connected or not, because the connecting state will handle it if media // is connected, so we don't need to handle it here. - if (sip_->resource()->is_registered()) { + if (sip_->is_registered()) { SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateConnecting); connecting_starttime_ = srs_update_system_time(); } // Invite if media is not connected. - if (sip_->resource()->is_registered() && !media_->resource()->is_connected()) { + if (sip_->is_registered() && !media_->is_connected()) { uint32_t ssrc = 0; - if ((err = sip_->resource()->invite_request(&ssrc)) != srs_success) { + if ((err = sip_->invite_request(&ssrc)) != srs_success) { return srs_error_wrap(err, "invite"); } // Now, we're able to query session by ssrc, for media packets. - SrsLazyObjectWrapper* wrapper = wrapper_root_; - srs_assert(wrapper); // It MUST never be NULL, because this method is in the cycle of coroutine. - _srs_gb_manager->add_with_fast_id(ssrc, wrapper); + _srs_gb_manager->add_with_fast_id(ssrc, wrapper_); } } @@ -349,32 +341,32 @@ srs_error_t SrsLazyGbSession::drive_state() } srs_trace("Session: Connecting timeout, nn=%d, state=%s, sip=%s, media=%d", nn_timeout_, srs_gb_session_state(state_).c_str(), - srs_gb_sip_state(sip_->resource()->state()).c_str(), media_->resource()->is_connected()); - sip_->resource()->reset_to_register(); + srs_gb_sip_state(sip_->state()).c_str(), media_->is_connected()); + sip_->reset_to_register(); SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateInit); } - if (sip_->resource()->is_stable() && media_->resource()->is_connected()) { + if (sip_->is_stable() && media_->is_connected()) { SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateEstablished); } } if (state_ == SrsGbSessionStateEstablished) { - if (sip_->resource()->is_bye()) { + if (sip_->is_bye()) { srs_trace("Session: Dispose for client bye"); return err; } // When media disconnected, we wait for a while then reinvite. - if (!media_->resource()->is_connected()) { + if (!media_->is_connected()) { if (!reinviting_starttime_) { reinviting_starttime_ = srs_update_system_time(); } if (srs_get_system_time() - reinviting_starttime_ > reinvite_wait_) { reinviting_starttime_ = 0; srs_trace("Session: Re-invite for disconnect, state=%s, sip=%s, media=%d", srs_gb_session_state(state_).c_str(), - srs_gb_sip_state(sip_->resource()->state()).c_str(), media_->resource()->is_connected()); - sip_->resource()->reset_to_register(); + srs_gb_sip_state(sip_->state()).c_str(), media_->is_connected()); + sip_->reset_to_register(); SRS_GB_CHANGE_STATE_TO(SrsGbSessionStateInit); } } @@ -383,19 +375,19 @@ srs_error_t SrsLazyGbSession::drive_state() return err; } -SrsGbSessionState SrsLazyGbSession::set_state(SrsGbSessionState v) +SrsGbSessionState SrsGbSession::set_state(SrsGbSessionState v) { SrsGbSessionState state = state_; state_ = v; return state; } -const SrsContextId& SrsLazyGbSession::get_id() +const SrsContextId& SrsGbSession::get_id() { return cid_; } -std::string SrsLazyGbSession::desc() +std::string SrsGbSession::desc() { return "GBS"; } @@ -463,27 +455,33 @@ srs_error_t SrsGbListener::on_tcp_client(ISrsListener* listener, srs_netfd_t stf // Handle TCP connections. if (listener == sip_listener_) { - SrsLazyObjectWrapper* conn = new SrsLazyObjectWrapper(); - SrsLazyGbSipTcpConn* resource = dynamic_cast(conn->resource()); - resource->setup(conf_, sip_listener_, media_listener_, stfd); + SrsGbSipTcpConn* raw_conn = new SrsGbSipTcpConn(); + raw_conn->setup(conf_, sip_listener_, media_listener_, stfd); + + SrsSharedResource* conn = new SrsSharedResource(raw_conn); + _srs_gb_manager->add(conn, NULL); - if ((err = resource->start()) != srs_success) { - srs_freep(conn); + SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, conn, raw_conn, raw_conn); + raw_conn->setup_owner(conn, executor, executor); + + if ((err = executor->start()) != srs_success) { + srs_freep(executor); return srs_error_wrap(err, "gb sip"); } + } else if (listener == media_listener_) { + SrsGbMediaTcpConn* raw_conn = new SrsGbMediaTcpConn(); + raw_conn->setup(stfd); + SrsSharedResource* conn = new SrsSharedResource(raw_conn); _srs_gb_manager->add(conn, NULL); - } else if (listener == media_listener_) { - SrsLazyObjectWrapper* conn = new SrsLazyObjectWrapper(); - SrsLazyGbMediaTcpConn* resource = dynamic_cast(conn->resource()); - resource->setup(stfd); - if ((err = resource->start()) != srs_success) { - srs_freep(conn); + SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, conn, raw_conn, raw_conn); + raw_conn->setup_owner(conn, executor, executor); + + if ((err = executor->start()) != srs_success) { + srs_freep(executor); return srs_error_wrap(err, "gb media"); } - - _srs_gb_manager->add(conn, NULL); } else { srs_warn("GB: Ignore TCP client"); srs_close_stfd(stfd); @@ -492,9 +490,13 @@ srs_error_t SrsGbListener::on_tcp_client(ISrsListener* listener, srs_netfd_t stf return err; } -SrsLazyGbSipTcpConn::SrsLazyGbSipTcpConn(SrsLazyObjectWrapper* wrapper_root) +SrsGbSipTcpConn::SrsGbSipTcpConn() { - wrapper_root_ = wrapper_root; + wrapper_ = NULL; + owner_coroutine_ = NULL; + owner_cid_ = NULL; + cid_ = _srs_context->get_id(); + session_ = NULL; state_ = SrsGbSipStateInit; register_ = new SrsSipMessage(); @@ -507,54 +509,62 @@ SrsLazyGbSipTcpConn::SrsLazyGbSipTcpConn(SrsLazyObjectWrappercopy(); - session_ = NULL; sip_listener_ = sip; media_listener_ = media; conn_ = new SrsTcpConnection(stfd); - receiver_ = new SrsLazyGbSipTcpReceiver(this, conn_); - sender_ = new SrsLazyGbSipTcpSender(conn_); + receiver_ = new SrsGbSipTcpReceiver(this, conn_); + sender_ = new SrsGbSipTcpSender(conn_); +} + +void SrsGbSipTcpConn::setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid) +{ + wrapper_ = wrapper; + owner_coroutine_ = owner_coroutine; + owner_cid_ = owner_cid; } -std::string SrsLazyGbSipTcpConn::device_id() +void SrsGbSipTcpConn::on_executor_done(ISrsInterruptable* executor) +{ + owner_coroutine_ = NULL; +} + +std::string SrsGbSipTcpConn::device_id() { return register_->device_id(); } -void SrsLazyGbSipTcpConn::set_cid(const SrsContextId& cid) +void SrsGbSipTcpConn::set_cid(const SrsContextId& cid) { - trd_->set_cid(cid); + if (owner_cid_) owner_cid_->set_cid(cid); receiver_->set_cid(cid); sender_->set_cid(cid); + cid_ = cid; } -void SrsLazyGbSipTcpConn::query_ports(int* sip, int* media) +void SrsGbSipTcpConn::query_ports(int* sip, int* media) { if (sip) *sip = sip_listener_->port(); if (media) *media = media_listener_->port(); } -srs_error_t SrsLazyGbSipTcpConn::on_sip_message(SrsSipMessage* msg) +srs_error_t SrsGbSipTcpConn::on_sip_message(SrsSipMessage* msg) { srs_error_t err = srs_success; @@ -603,7 +613,7 @@ srs_error_t SrsLazyGbSipTcpConn::on_sip_message(SrsSipMessage* msg) return err; } -void SrsLazyGbSipTcpConn::enqueue_sip_message(SrsSipMessage* msg) +void SrsGbSipTcpConn::enqueue_sip_message(SrsSipMessage* msg) { // Drive state machine when enqueue message. drive_state(msg); @@ -612,7 +622,7 @@ void SrsLazyGbSipTcpConn::enqueue_sip_message(SrsSipMessage* msg) sender_->enqueue(msg); } -void SrsLazyGbSipTcpConn::drive_state(SrsSipMessage* msg) +void SrsGbSipTcpConn::drive_state(SrsSipMessage* msg) { srs_error_t err = srs_success; @@ -669,7 +679,7 @@ void SrsLazyGbSipTcpConn::drive_state(SrsSipMessage* msg) } } -void SrsLazyGbSipTcpConn::register_response(SrsSipMessage* msg) +void SrsGbSipTcpConn::register_response(SrsSipMessage* msg) { SrsSipMessage* res = new SrsSipMessage(); @@ -686,7 +696,7 @@ void SrsLazyGbSipTcpConn::register_response(SrsSipMessage* msg) enqueue_sip_message(res); } -void SrsLazyGbSipTcpConn::message_response(SrsSipMessage* msg, http_status status) +void SrsGbSipTcpConn::message_response(SrsSipMessage* msg, http_status status) { SrsSipMessage* res = new SrsSipMessage(); @@ -701,9 +711,9 @@ void SrsLazyGbSipTcpConn::message_response(SrsSipMessage* msg, http_status statu enqueue_sip_message(res); } -void SrsLazyGbSipTcpConn::invite_ack(SrsSipMessage* msg) +void SrsGbSipTcpConn::invite_ack(SrsSipMessage* msg) { - string pip = session_->resource()->pip(); // Parse from CANDIDATE + string pip = session_->pip(); // Parse from CANDIDATE int sip_port; query_ports(&sip_port, NULL); string gb_device_id = srs_fmt("sip:%s@%s", msg->to_address_user_.c_str(), msg->to_address_host_.c_str()); string branch = srs_random_str(6); @@ -722,7 +732,7 @@ void SrsLazyGbSipTcpConn::invite_ack(SrsSipMessage* msg) enqueue_sip_message(req); } -void SrsLazyGbSipTcpConn::bye_response(SrsSipMessage* msg) +void SrsGbSipTcpConn::bye_response(SrsSipMessage* msg) { SrsSipMessage* res = new SrsSipMessage(); @@ -737,7 +747,7 @@ void SrsLazyGbSipTcpConn::bye_response(SrsSipMessage* msg) enqueue_sip_message(res); } -srs_error_t SrsLazyGbSipTcpConn::invite_request(uint32_t* pssrc) +srs_error_t SrsGbSipTcpConn::invite_request(uint32_t* pssrc) { srs_error_t err = srs_success; @@ -765,7 +775,7 @@ srs_error_t SrsLazyGbSipTcpConn::invite_request(uint32_t* pssrc) if (pssrc) *pssrc = ssrc_v_; } - string pip = session_->resource()->pip(); // Parse from CANDIDATE + string pip = session_->pip(); // Parse from CANDIDATE int sip_port, media_port; query_ports(&sip_port, &media_port); string srs_device_id = srs_fmt("sip:%s@%s", register_->request_uri_user_.c_str(), register_->request_uri_host_.c_str()); string gb_device_id = srs_fmt("sip:%s@%s", register_->from_address_user_.c_str(), register_->from_address_host_.c_str()); @@ -834,63 +844,59 @@ srs_error_t SrsLazyGbSipTcpConn::invite_request(uint32_t* pssrc) return err; } -void SrsLazyGbSipTcpConn::interrupt() +void SrsGbSipTcpConn::interrupt() { receiver_->interrupt(); sender_->interrupt(); - trd_->interrupt(); + if (owner_coroutine_) owner_coroutine_->interrupt(); } -SrsGbSipState SrsLazyGbSipTcpConn::state() +SrsGbSipState SrsGbSipTcpConn::state() { return state_; } -void SrsLazyGbSipTcpConn::reset_to_register() +void SrsGbSipTcpConn::reset_to_register() { state_ = SrsGbSipStateRegistered; } -bool SrsLazyGbSipTcpConn::is_registered() +bool SrsGbSipTcpConn::is_registered() { return state_ >= SrsGbSipStateRegistered && state_ <= SrsGbSipStateStable; } -bool SrsLazyGbSipTcpConn::is_stable() +bool SrsGbSipTcpConn::is_stable() { return state_ == SrsGbSipStateStable; } -bool SrsLazyGbSipTcpConn::is_bye() +bool SrsGbSipTcpConn::is_bye() { return state_ == SrsGbSipStateBye; } -SrsGbSipState SrsLazyGbSipTcpConn::set_state(SrsGbSipState v) +SrsGbSipState SrsGbSipTcpConn::set_state(SrsGbSipState v) { SrsGbSipState state = state_; state_ = v; return state; } -const SrsContextId& SrsLazyGbSipTcpConn::get_id() +const SrsContextId& SrsGbSipTcpConn::get_id() { - return trd_->cid(); + return cid_; } -std::string SrsLazyGbSipTcpConn::desc() +std::string SrsGbSipTcpConn::desc() { return "GB-SIP-TCP"; } -srs_error_t SrsLazyGbSipTcpConn::start() +srs_error_t SrsGbSipTcpConn::cycle() { srs_error_t err = srs_success; - if ((err = trd_->start()) != srs_success) { - return srs_error_wrap(err, "sip"); - } - if ((err = receiver_->start()) != srs_success) { return srs_error_wrap(err, "receiver"); } @@ -899,22 +905,13 @@ srs_error_t SrsLazyGbSipTcpConn::start() return srs_error_wrap(err, "sender"); } - return err; -} - -srs_error_t SrsLazyGbSipTcpConn::cycle() -{ - srs_error_t err = do_cycle(); + // Wait for the SIP connection to be terminated. + err = do_cycle(); // Interrupt the receiver and sender coroutine. receiver_->interrupt(); sender_->interrupt(); - // Note that we added wrapper to manager, so we must free the wrapper, not this connection. - SrsLazyObjectWrapper* wrapper = wrapper_root_; - srs_assert(wrapper); // The creator wrapper MUST never be null, because we created it. - _srs_gb_manager->remove(wrapper); - // success. if (err == srs_success) { srs_trace("client finished."); @@ -942,23 +939,23 @@ srs_error_t SrsLazyGbSipTcpConn::cycle() return srs_success; } -srs_error_t SrsLazyGbSipTcpConn::do_cycle() +srs_error_t SrsGbSipTcpConn::do_cycle() { srs_error_t err = srs_success; while (true) { - if ((err = trd_->pull()) != srs_success) { + if (!owner_coroutine_) return err; + if ((err = owner_coroutine_->pull()) != srs_success) { return srs_error_wrap(err, "pull"); } - // TODO: Handle other messages. srs_usleep(SRS_UTIME_NO_TIMEOUT); } return err; } -srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyObjectWrapper** psession) +srs_error_t SrsGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsGbSession** psession) { srs_error_t err = srs_success; @@ -968,32 +965,29 @@ srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyObjectW // Only create session for REGISTER request. if (msg->type_ != HTTP_REQUEST || msg->method_ != HTTP_REGISTER) return err; - // The lazy-sweep wrapper for this resource. - SrsLazyObjectWrapper* wrapper = wrapper_root_; - srs_assert(wrapper); // It MUST never be NULL, because this method is in the cycle of coroutine of receiver. - // Find exists session for register, might be created by another object and still alive. - SrsLazyObjectWrapper* session = dynamic_cast*>(_srs_gb_manager->find_by_id(device)); + SrsSharedResource* session = dynamic_cast*>(_srs_gb_manager->find_by_id(device)); + SrsGbSession* raw_session = session ? (*session).get() : NULL; if (!session) { // Create new GB session. - session = new SrsLazyObjectWrapper(); + raw_session = new SrsGbSession(); + raw_session->setup(conf_); - if ((err = session->resource()->initialize(conf_)) != srs_success) { - srs_freep(session); - return srs_error_wrap(err, "initialize"); - } + session = new SrsSharedResource(raw_session); + _srs_gb_manager->add_with_id(device, session); - if ((err = session->resource()->start()) != srs_success) { - srs_freep(session); - return srs_error_wrap(err, "start"); - } + SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, session, raw_session, raw_session); + raw_session->setup_owner(session, executor, executor); - _srs_gb_manager->add_with_id(device, session); + if ((err = executor->start()) != srs_success) { + srs_freep(executor); + return srs_error_wrap(err, "gb session"); + } } // Try to load state from previous SIP connection. - SrsLazyGbSipTcpConn* pre = dynamic_cast(session->resource()->sip_transport()->resource()); - if (pre) { + SrsSharedResource pre = raw_session->sip_transport(); + if (pre.get() && pre.get() != this) { state_ = pre->state_; ssrc_str_ = pre->ssrc_str_; ssrc_v_ = pre->ssrc_v_; @@ -1001,36 +995,36 @@ srs_error_t SrsLazyGbSipTcpConn::bind_session(SrsSipMessage* msg, SrsLazyObjectW srs_freep(invite_ok_); invite_ok_ = pre->invite_ok_->copy(); } - // Notice SIP session to use current SIP connection. - session->resource()->on_sip_transport(wrapper); - *psession = session->copy(); + // Notice session to use current SIP connection. + raw_session->on_sip_transport(*wrapper_); + *psession = raw_session; return err; } -SrsLazyGbSipTcpReceiver::SrsLazyGbSipTcpReceiver(SrsLazyGbSipTcpConn* sip, SrsTcpConnection* conn) +SrsGbSipTcpReceiver::SrsGbSipTcpReceiver(SrsGbSipTcpConn* sip, SrsTcpConnection* conn) { sip_ = sip; conn_ = conn; trd_ = new SrsSTCoroutine("sip-receiver", this); } -SrsLazyGbSipTcpReceiver::~SrsLazyGbSipTcpReceiver() +SrsGbSipTcpReceiver::~SrsGbSipTcpReceiver() { srs_freep(trd_); } -void SrsLazyGbSipTcpReceiver::interrupt() +void SrsGbSipTcpReceiver::interrupt() { trd_->interrupt(); } -void SrsLazyGbSipTcpReceiver::set_cid(const SrsContextId& cid) +void SrsGbSipTcpReceiver::set_cid(const SrsContextId& cid) { trd_->set_cid(cid); } -srs_error_t SrsLazyGbSipTcpReceiver::start() +srs_error_t SrsGbSipTcpReceiver::start() { srs_error_t err = srs_success; @@ -1041,7 +1035,7 @@ srs_error_t SrsLazyGbSipTcpReceiver::start() return err; } -srs_error_t SrsLazyGbSipTcpReceiver::cycle() +srs_error_t SrsGbSipTcpReceiver::cycle() { srs_error_t err = do_cycle(); @@ -1053,7 +1047,7 @@ srs_error_t SrsLazyGbSipTcpReceiver::cycle() return err; } -srs_error_t SrsLazyGbSipTcpReceiver::do_cycle() +srs_error_t SrsGbSipTcpReceiver::do_cycle() { srs_error_t err = srs_success; @@ -1092,14 +1086,14 @@ srs_error_t SrsLazyGbSipTcpReceiver::do_cycle() return err; } -SrsLazyGbSipTcpSender::SrsLazyGbSipTcpSender(SrsTcpConnection* conn) +SrsGbSipTcpSender::SrsGbSipTcpSender(SrsTcpConnection* conn) { conn_ = conn; wait_ = srs_cond_new(); trd_ = new SrsSTCoroutine("sip-sender", this); } -SrsLazyGbSipTcpSender::~SrsLazyGbSipTcpSender() +SrsGbSipTcpSender::~SrsGbSipTcpSender() { srs_freep(trd_); srs_cond_destroy(wait_); @@ -1110,23 +1104,23 @@ SrsLazyGbSipTcpSender::~SrsLazyGbSipTcpSender() } } -void SrsLazyGbSipTcpSender::enqueue(SrsSipMessage* msg) +void SrsGbSipTcpSender::enqueue(SrsSipMessage* msg) { msgs_.push_back(msg); srs_cond_signal(wait_); } -void SrsLazyGbSipTcpSender::interrupt() +void SrsGbSipTcpSender::interrupt() { trd_->interrupt(); } -void SrsLazyGbSipTcpSender::set_cid(const SrsContextId& cid) +void SrsGbSipTcpSender::set_cid(const SrsContextId& cid) { trd_->set_cid(cid); } -srs_error_t SrsLazyGbSipTcpSender::start() +srs_error_t SrsGbSipTcpSender::start() { srs_error_t err = srs_success; @@ -1137,7 +1131,7 @@ srs_error_t SrsLazyGbSipTcpSender::start() return err; } -srs_error_t SrsLazyGbSipTcpSender::cycle() +srs_error_t SrsGbSipTcpSender::cycle() { srs_error_t err = do_cycle(); @@ -1149,7 +1143,7 @@ srs_error_t SrsLazyGbSipTcpSender::cycle() return err; } -srs_error_t SrsLazyGbSipTcpSender::do_cycle() +srs_error_t SrsGbSipTcpSender::do_cycle() { srs_error_t err = srs_success; @@ -1219,71 +1213,74 @@ ISrsPsPackHandler::~ISrsPsPackHandler() { } -SrsLazyGbMediaTcpConn::SrsLazyGbMediaTcpConn(SrsLazyObjectWrapper* wrapper_root) +SrsGbMediaTcpConn::SrsGbMediaTcpConn() { - wrapper_root_ = wrapper_root; pack_ = new SrsPackContext(this); - trd_ = new SrsSTCoroutine("media", this); buffer_ = new uint8_t[65535]; conn_ = NULL; + wrapper_ = NULL; + owner_coroutine_ = NULL; + owner_cid_ = NULL; + cid_ = _srs_context->get_id(); + session_ = NULL; connected_ = false; nn_rtcp_ = 0; } -SrsLazyGbMediaTcpConn::~SrsLazyGbMediaTcpConn() +SrsGbMediaTcpConn::~SrsGbMediaTcpConn() { - srs_freep(trd_); srs_freep(conn_); srs_freepa(buffer_); srs_freep(pack_); - srs_freep(session_); } -void SrsLazyGbMediaTcpConn::setup(srs_netfd_t stfd) +void SrsGbMediaTcpConn::setup(srs_netfd_t stfd) { srs_freep(conn_); conn_ = new SrsTcpConnection(stfd); } -bool SrsLazyGbMediaTcpConn::is_connected() +void SrsGbMediaTcpConn::setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid) { - return connected_; + wrapper_ = wrapper; + owner_coroutine_ = owner_coroutine; + owner_cid_ = owner_cid; } -void SrsLazyGbMediaTcpConn::interrupt() +void SrsGbMediaTcpConn::on_executor_done(ISrsInterruptable* executor) { - trd_->interrupt(); + owner_coroutine_ = NULL; } -void SrsLazyGbMediaTcpConn::set_cid(const SrsContextId& cid) +bool SrsGbMediaTcpConn::is_connected() { - trd_->set_cid(cid); + return connected_; } -const SrsContextId& SrsLazyGbMediaTcpConn::get_id() +void SrsGbMediaTcpConn::interrupt() { - return _srs_context->get_id(); + if (owner_coroutine_) owner_coroutine_->interrupt(); } -std::string SrsLazyGbMediaTcpConn::desc() +void SrsGbMediaTcpConn::set_cid(const SrsContextId& cid) { - return "GB-Media-TCP"; + if (owner_cid_) owner_cid_->set_cid(cid); + cid_ = cid; } -srs_error_t SrsLazyGbMediaTcpConn::start() +const SrsContextId& SrsGbMediaTcpConn::get_id() { - srs_error_t err = srs_success; - - if ((err = trd_->start()) != srs_success) { - return srs_error_wrap(err, "coroutine"); - } + return cid_; +} - return err; +std::string SrsGbMediaTcpConn::desc() +{ + return "GB-Media-TCP"; } -srs_error_t SrsLazyGbMediaTcpConn::cycle() +srs_error_t SrsGbMediaTcpConn::cycle() { srs_error_t err = do_cycle(); @@ -1295,11 +1292,6 @@ srs_error_t SrsLazyGbMediaTcpConn::cycle() connected_ = false; srs_trace("PS: Media disconnect, code=%d", srs_error_code(err)); - // Note that we added wrapper to manager, so we must free the wrapper, not this connection. - SrsLazyObjectWrapper* wrapper = wrapper_root_; - srs_assert(wrapper); // The creator wrapper MUST never be null, because we created it. - _srs_gb_manager->remove(wrapper); - // success. if (err == srs_success) { srs_trace("client finished."); @@ -1327,7 +1319,7 @@ srs_error_t SrsLazyGbMediaTcpConn::cycle() return srs_success; } -srs_error_t SrsLazyGbMediaTcpConn::do_cycle() +srs_error_t SrsGbMediaTcpConn::do_cycle() { srs_error_t err = srs_success; @@ -1341,7 +1333,8 @@ srs_error_t SrsLazyGbMediaTcpConn::do_cycle() uint32_t reserved = 0; for (;;) { - if ((err = trd_->pull()) != srs_success) { + if (!owner_coroutine_) return err; + if ((err = owner_coroutine_->pull()) != srs_success) { return srs_error_wrap(err, "pull"); } @@ -1426,7 +1419,7 @@ srs_error_t SrsLazyGbMediaTcpConn::do_cycle() return err; } -srs_error_t SrsLazyGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector& msgs) +srs_error_t SrsGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector& msgs) { srs_error_t err = srs_success; @@ -1437,7 +1430,7 @@ srs_error_t SrsLazyGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector } // Notify session about the media pack. - session_->resource()->on_ps_pack(pack_, ps, msgs); + session_->on_ps_pack(pack_, ps, msgs); //for (vector::const_iterator it = msgs.begin(); it != msgs.end(); ++it) { // SrsTsMessage* msg = *it; @@ -1450,23 +1443,22 @@ srs_error_t SrsLazyGbMediaTcpConn::on_ps_pack(SrsPsPacket* ps, const std::vector return err; } -srs_error_t SrsLazyGbMediaTcpConn::bind_session(uint32_t ssrc, SrsLazyObjectWrapper** psession) +srs_error_t SrsGbMediaTcpConn::bind_session(uint32_t ssrc, SrsGbSession** psession) { srs_error_t err = srs_success; if (!ssrc) return err; - // The lazy-sweep wrapper for this resource. - SrsLazyObjectWrapper* wrapper = wrapper_root_; - srs_assert(wrapper); // It MUST never be NULL, because this method is in the cycle of coroutine. - // Find exists session for register, might be created by another object and still alive. - SrsLazyObjectWrapper* session = dynamic_cast*>(_srs_gb_manager->find_by_fast_id(ssrc)); + SrsSharedResource* session = dynamic_cast*>(_srs_gb_manager->find_by_fast_id(ssrc)); if (!session) return err; - _srs_gb_manager->add_with_fast_id(ssrc, session); - session->resource()->on_media_transport(wrapper); - *psession = session->copy(); + SrsGbSession* raw_session = (*session).get(); + srs_assert(raw_session); + + // Notice session to use current media connection. + raw_session->on_media_transport(*wrapper_); + *psession = raw_session; return err; } @@ -1545,7 +1537,7 @@ SrsSharedPtrMessage* SrsMpegpsQueue::dequeue() return NULL; } -SrsGbMuxer::SrsGbMuxer(SrsLazyGbSession* session) +SrsGbMuxer::SrsGbMuxer(SrsGbSession* session) { sdk_ = NULL; session_ = session; @@ -1580,13 +1572,9 @@ SrsGbMuxer::~SrsGbMuxer() srs_freep(pprint_); } -srs_error_t SrsGbMuxer::initialize(std::string output) +void SrsGbMuxer::setup(std::string output) { - srs_error_t err = srs_success; - output_ = output; - - return err; } srs_error_t SrsGbMuxer::on_ts_message(SrsTsMessage* msg) @@ -2095,7 +2083,7 @@ srs_error_t SrsGbMuxer::connect() // Cleanup the data before connect again. close(); - string url = srs_string_replace(output_, "[stream]", session_->sip_transport()->resource()->device_id()); + string url = srs_string_replace(output_, "[stream]", session_->sip_transport()->device_id()); srs_trace("Muxer: Convert GB to RTMP %s", url.c_str()); srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp index 783842505f..e44b618c6d 100644 --- a/trunk/src/app/srs_app_gb28181.hpp +++ b/trunk/src/app/srs_app_gb28181.hpp @@ -26,11 +26,11 @@ class SrsCoroutine; class SrsPackContext; class SrsBuffer; class SrsSipMessage; -class SrsLazyGbSession; -class SrsLazyGbSipTcpConn; -class SrsLazyGbMediaTcpConn; -class SrsLazyGbSipTcpReceiver; -class SrsLazyGbSipTcpSender; +class SrsGbSession; +class SrsGbSipTcpConn; +class SrsGbMediaTcpConn; +class SrsGbSipTcpReceiver; +class SrsGbSipTcpSender; class SrsAlonePithyPrint; class SrsGbMuxer; class SrsSimpleRtmpClient; @@ -51,7 +51,7 @@ class SrsRawAacStream; // established: // init: media is not connected. // dispose session: sip is bye. -// Please see SrsLazyGbSession::drive_state for detail. +// Please see SrsGbSession::drive_state for detail. enum SrsGbSessionState { SrsGbSessionStateInit = 0, @@ -76,7 +76,7 @@ std::string srs_gb_session_state(SrsGbSessionState state); // to bye: Got bye SIP message from device. // re-inviting: // to inviting: Got bye OK response from deivce. -// Please see SrsLazyGbSipTcpConn::drive_state for detail. +// Please see SrsGbSipTcpConn::drive_state for detail. enum SrsGbSipState { SrsGbSipStateInit = 0, @@ -90,16 +90,23 @@ enum SrsGbSipState std::string srs_gb_sip_state(SrsGbSipState state); // The main logic object for GB, the session. -class SrsLazyGbSession : public SrsLazyObject, public ISrsResource, public ISrsStartable, public ISrsCoroutineHandler +// Each session contains a SIP object and a media object, that are managed by session. This means session always +// lives longer than SIP and media, and session will dispose SIP and media when session disposed. In another word, +// SIP and media objects use directly pointer to session, while session use shared ptr. +class SrsGbSession : public ISrsResource, public ISrsCoroutineHandler, public ISrsExecutorHandler { private: - SrsCoroutine* trd_; SrsContextId cid_; +private: + // The shared resource which own this object, we should never free it because it's managed by shared ptr. + SrsSharedResource* wrapper_; + // The owner coroutine, allow user to interrupt the loop. + ISrsInterruptable* owner_coroutine_; + ISrsContextIdSetter* owner_cid_; private: SrsGbSessionState state_; - SrsLazyObjectWrapper* wrapper_root_; - SrsLazyObjectWrapper* sip_; - SrsLazyObjectWrapper* media_; + SrsSharedResource sip_; + SrsSharedResource media_; SrsGbMuxer* muxer_; private: // The candidate for SDP in configuration. @@ -132,26 +139,27 @@ class SrsLazyGbSession : public SrsLazyObject, public ISrsResource, public ISrsS uint64_t media_recovered_; uint64_t media_msgs_dropped_; uint64_t media_reserved_; -private: - friend class SrsLazyObjectWrapper; - SrsLazyGbSession(SrsLazyObjectWrapper* wrapper_root); public: - virtual ~SrsLazyGbSession(); + SrsGbSession(); + virtual ~SrsGbSession(); public: // Initialize the GB session. - srs_error_t initialize(SrsConfDirective* conf); + void setup(SrsConfDirective* conf); + // Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id. + void setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid); +// Interface ISrsExecutorHandler +public: + virtual void on_executor_done(ISrsInterruptable* executor); +public: // When got a pack of messages. void on_ps_pack(SrsPackContext* ctx, SrsPsPacket* ps, const std::vector& msgs); // When got available SIP transport. - void on_sip_transport(SrsLazyObjectWrapper* sip); - SrsLazyObjectWrapper* sip_transport(); + void on_sip_transport(SrsSharedResource sip); + SrsSharedResource sip_transport(); // When got available media transport. - void on_media_transport(SrsLazyObjectWrapper* media); + void on_media_transport(SrsSharedResource media); // Get the candidate for SDP generation, the public IP address for device to connect to. std::string pip(); -// Interface ISrsStartable -public: - virtual srs_error_t start(); // Interface ISrsOneCycleThreadHandler public: virtual srs_error_t cycle(); @@ -186,12 +194,12 @@ class SrsGbListener : public ISrsListener, public ISrsTcpHandler }; // A GB28181 TCP SIP connection. -class SrsLazyGbSipTcpConn : public SrsLazyObject, public ISrsResource, public ISrsStartable, public ISrsCoroutineHandler +class SrsGbSipTcpConn : public ISrsResource, public ISrsCoroutineHandler, public ISrsExecutorHandler { private: SrsGbSipState state_; - SrsLazyObjectWrapper* wrapper_root_; - SrsLazyObjectWrapper* session_; + // The owner session object, note that we use the raw pointer and should never free it. + SrsGbSession* session_; SrsSipMessage* register_; SrsSipMessage* invite_ok_; private: @@ -202,18 +210,28 @@ class SrsLazyGbSipTcpConn : public SrsLazyObject, public ISrsResource, public IS SrsTcpListener* sip_listener_; SrsTcpListener* media_listener_; private: - SrsTcpConnection* conn_; - SrsLazyGbSipTcpReceiver* receiver_; - SrsLazyGbSipTcpSender* sender_; - SrsCoroutine* trd_; + // The shared resource which own this object, we should never free it because it's managed by shared ptr. + SrsSharedResource* wrapper_; + // The owner coroutine, allow user to interrupt the loop. + ISrsInterruptable* owner_coroutine_; + ISrsContextIdSetter* owner_cid_; + SrsContextId cid_; private: - friend class SrsLazyObjectWrapper; - SrsLazyGbSipTcpConn(SrsLazyObjectWrapper* wrapper_root); + SrsTcpConnection* conn_; + SrsGbSipTcpReceiver* receiver_; + SrsGbSipTcpSender* sender_; public: - virtual ~SrsLazyGbSipTcpConn(); + SrsGbSipTcpConn(); + virtual ~SrsGbSipTcpConn(); public: // Setup object, to keep empty constructor. void setup(SrsConfDirective* conf, SrsTcpListener* sip, SrsTcpListener* media, srs_netfd_t stfd); + // Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id. + void setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid); +// Interface ISrsExecutorHandler +public: + virtual void on_executor_done(ISrsInterruptable* executor); +public: // Get the SIP device id. std::string device_id(); // Set the cid of all coroutines. @@ -253,29 +271,26 @@ class SrsLazyGbSipTcpConn : public SrsLazyObject, public ISrsResource, public IS public: virtual const SrsContextId& get_id(); virtual std::string desc(); -// Interface ISrsStartable -public: - virtual srs_error_t start(); // Interface ISrsOneCycleThreadHandler public: virtual srs_error_t cycle(); private: - virtual srs_error_t do_cycle(); + srs_error_t do_cycle(); private: // Create session if no one, or bind to an existed session. - srs_error_t bind_session(SrsSipMessage* msg, SrsLazyObjectWrapper** psession); + srs_error_t bind_session(SrsSipMessage* msg, SrsGbSession** psession); }; // Start a coroutine to receive SIP messages. -class SrsLazyGbSipTcpReceiver : public ISrsStartable, public ISrsCoroutineHandler +class SrsGbSipTcpReceiver : public ISrsStartable, public ISrsCoroutineHandler { private: SrsCoroutine* trd_; SrsTcpConnection* conn_; - SrsLazyGbSipTcpConn* sip_; + SrsGbSipTcpConn* sip_; public: - SrsLazyGbSipTcpReceiver(SrsLazyGbSipTcpConn* sip, SrsTcpConnection* conn); - virtual ~SrsLazyGbSipTcpReceiver(); + SrsGbSipTcpReceiver(SrsGbSipTcpConn* sip, SrsTcpConnection* conn); + virtual ~SrsGbSipTcpReceiver(); public: // Interrupt the receiver coroutine. void interrupt(); @@ -292,7 +307,7 @@ class SrsLazyGbSipTcpReceiver : public ISrsStartable, public ISrsCoroutineHandle }; // Start a coroutine to send out SIP messages. -class SrsLazyGbSipTcpSender : public ISrsStartable, public ISrsCoroutineHandler +class SrsGbSipTcpSender : public ISrsStartable, public ISrsCoroutineHandler { private: SrsCoroutine* trd_; @@ -301,8 +316,8 @@ class SrsLazyGbSipTcpSender : public ISrsStartable, public ISrsCoroutineHandler std::vector msgs_; srs_cond_t wait_; public: - SrsLazyGbSipTcpSender(SrsTcpConnection* conn); - virtual ~SrsLazyGbSipTcpSender(); + SrsGbSipTcpSender(SrsTcpConnection* conn); + virtual ~SrsGbSipTcpSender(); public: // Push message to queue, and sender will send out in dedicate coroutine. void enqueue(SrsSipMessage* msg); @@ -333,27 +348,36 @@ class ISrsPsPackHandler }; // A GB28181 TCP media connection, for PS stream. -class SrsLazyGbMediaTcpConn : public SrsLazyObject, public ISrsResource, public ISrsStartable, public ISrsCoroutineHandler - , public ISrsPsPackHandler +class SrsGbMediaTcpConn : public ISrsResource, public ISrsCoroutineHandler, public ISrsPsPackHandler, public ISrsExecutorHandler { private: bool connected_; - SrsLazyObjectWrapper* wrapper_root_; - SrsLazyObjectWrapper* session_; + // The owner session object, note that we use the raw pointer and should never free it. + SrsGbSession* session_; uint32_t nn_rtcp_; +private: + // The shared resource which own this object, we should never free it because it's managed by shared ptr. + SrsSharedResource* wrapper_; + // The owner coroutine, allow user to interrupt the loop. + ISrsInterruptable* owner_coroutine_; + ISrsContextIdSetter* owner_cid_; + SrsContextId cid_; private: SrsPackContext* pack_; SrsTcpConnection* conn_; - SrsCoroutine* trd_; uint8_t* buffer_; -private: - friend class SrsLazyObjectWrapper; - SrsLazyGbMediaTcpConn(SrsLazyObjectWrapper* wrapper_root); public: - virtual ~SrsLazyGbMediaTcpConn(); + SrsGbMediaTcpConn(); + virtual ~SrsGbMediaTcpConn(); public: // Setup object, to keep empty constructor. void setup(srs_netfd_t stfd); + // Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id. + void setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid); +// Interface ISrsExecutorHandler +public: + virtual void on_executor_done(ISrsInterruptable* executor); +public: // Whether media is connected. bool is_connected(); // Interrupt transport by session. @@ -364,9 +388,6 @@ class SrsLazyGbMediaTcpConn : public SrsLazyObject, public ISrsResource, public public: virtual const SrsContextId& get_id(); virtual std::string desc(); -// Interface ISrsStartable -public: - virtual srs_error_t start(); // Interface ISrsOneCycleThreadHandler public: virtual srs_error_t cycle(); @@ -377,7 +398,7 @@ class SrsLazyGbMediaTcpConn : public SrsLazyObject, public ISrsResource, public virtual srs_error_t on_ps_pack(SrsPsPacket* ps, const std::vector& msgs); private: // Create session if no one, or bind to an existed session. - srs_error_t bind_session(uint32_t ssrc, SrsLazyObjectWrapper** psession); + srs_error_t bind_session(uint32_t ssrc, SrsGbSession** psession); }; // The queue for mpegts over udp to send packets. @@ -402,7 +423,8 @@ class SrsMpegpsQueue class SrsGbMuxer { private: - SrsLazyGbSession* session_; + // The owner session object, note that we use the raw pointer and should never free it. + SrsGbSession* session_; std::string output_; SrsSimpleRtmpClient* sdk_; private: @@ -428,10 +450,10 @@ class SrsGbMuxer SrsMpegpsQueue* queue_; SrsPithyPrint* pprint_; public: - SrsGbMuxer(SrsLazyGbSession* session); + SrsGbMuxer(SrsGbSession* session); virtual ~SrsGbMuxer(); public: - srs_error_t initialize(std::string output); + void setup(std::string output); srs_error_t on_ts_message(SrsTsMessage* msg); private: virtual srs_error_t on_ts_video(SrsTsMessage* msg, SrsBuffer* avs); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index d213f3f098..c5f6500751 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -1369,11 +1369,6 @@ srs_error_t SrsServerAdapter::run(SrsWaitGroup* wg) } #endif - SrsLazySweepGc* gc = dynamic_cast(_srs_gc); - if ((err = gc->start()) != srs_success) { - return srs_error_wrap(err, "start gc"); - } - return err; } diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp index 77f581a640..3e21e468cd 100755 --- a/trunk/src/app/srs_app_st.cpp +++ b/trunk/src/app/srs_app_st.cpp @@ -30,6 +30,30 @@ ISrsStartable::~ISrsStartable() { } +ISrsInterruptable::ISrsInterruptable() +{ +} + +ISrsInterruptable::~ISrsInterruptable() +{ +} + +ISrsContextIdSetter::ISrsContextIdSetter() +{ +} + +ISrsContextIdSetter::~ISrsContextIdSetter() +{ +} + +ISrsContextIdGetter::ISrsContextIdGetter() +{ +} + +ISrsContextIdGetter::~ISrsContextIdGetter() +{ +} + SrsCoroutine::SrsCoroutine() { } @@ -342,3 +366,69 @@ void SrsWaitGroup::wait() } } +ISrsExecutorHandler::ISrsExecutorHandler() +{ +} + +ISrsExecutorHandler::~ISrsExecutorHandler() +{ +} + +SrsExecutorCoroutine::SrsExecutorCoroutine(ISrsResourceManager* m, ISrsResource* r, ISrsCoroutineHandler* h, ISrsExecutorHandler* cb) +{ + resource_ = r; + handler_ = h; + manager_ = m; + callback_ = cb; + trd_ = new SrsSTCoroutine("ar", this, resource_->get_id()); +} + +SrsExecutorCoroutine::~SrsExecutorCoroutine() +{ + manager_->remove(resource_); + srs_freep(trd_); +} + +srs_error_t SrsExecutorCoroutine::start() +{ + return trd_->start(); +} + +void SrsExecutorCoroutine::interrupt() +{ + trd_->interrupt(); +} + +srs_error_t SrsExecutorCoroutine::pull() +{ + return trd_->pull(); +} + +const SrsContextId& SrsExecutorCoroutine::cid() +{ + return trd_->cid(); +} + +void SrsExecutorCoroutine::set_cid(const SrsContextId& cid) +{ + trd_->set_cid(cid); +} + +srs_error_t SrsExecutorCoroutine::cycle() +{ + srs_error_t err = handler_->cycle(); + if (callback_) callback_->on_executor_done(this); + manager_->remove(this); + return err; +} + +const SrsContextId& SrsExecutorCoroutine::get_id() +{ + return resource_->get_id(); +} + +std::string SrsExecutorCoroutine::desc() +{ + return resource_->desc(); +} + diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp index 51282bca25..d7315b20cd 100644 --- a/trunk/src/app/srs_app_st.hpp +++ b/trunk/src/app/srs_app_st.hpp @@ -15,8 +15,10 @@ #include #include #include +#include class SrsFastCoroutine; +class SrsExecutorCoroutine; // Each ST-coroutine must implements this interface, // to do the cycle job and handle some events. @@ -64,23 +66,48 @@ class ISrsStartable virtual srs_error_t start() = 0; }; -// The corotine object. -class SrsCoroutine : public ISrsStartable +// Allow user to interrupt the coroutine, for example, to stop it. +class ISrsInterruptable { public: - SrsCoroutine(); - virtual ~SrsCoroutine(); + ISrsInterruptable(); + virtual ~ISrsInterruptable(); public: - virtual void stop() = 0; virtual void interrupt() = 0; - // @return a copy of error, which should be freed by user. - // NULL if not terminated and user should pull again. virtual srs_error_t pull() = 0; - // Get and set the context id of coroutine. - virtual const SrsContextId& cid() = 0; +}; + +// Get the context id. +class ISrsContextIdSetter +{ +public: + ISrsContextIdSetter(); + virtual ~ISrsContextIdSetter(); +public: virtual void set_cid(const SrsContextId& cid) = 0; }; +// Set the context id. +class ISrsContextIdGetter +{ +public: + ISrsContextIdGetter(); + virtual ~ISrsContextIdGetter(); +public: + virtual const SrsContextId& cid() = 0; +}; + +// The coroutine object. +class SrsCoroutine : public ISrsStartable, public ISrsInterruptable + , public ISrsContextIdSetter, public ISrsContextIdGetter +{ +public: + SrsCoroutine(); + virtual ~SrsCoroutine(); +public: + virtual void stop() = 0; +}; + // An empty coroutine, user can default to this object before create any real coroutine. // @see https://github.com/ossrs/srs/pull/908 class SrsDummyCoroutine : public SrsCoroutine @@ -192,7 +219,7 @@ class SrsFastCoroutine static void* pfn(void* arg); }; -// Like goroytine sync.WaitGroup. +// Like goroutine sync.WaitGroup. class SrsWaitGroup { private: @@ -206,9 +233,72 @@ class SrsWaitGroup void add(int n); // When coroutine is done. void done(); - // Wait for all corotine to be done. + // Wait for all coroutine to be done. void wait(); }; +// The callback when executor cycle done. +class ISrsExecutorHandler +{ +public: + ISrsExecutorHandler(); + virtual ~ISrsExecutorHandler(); +public: + virtual void on_executor_done(ISrsInterruptable* executor) = 0; +}; + +// Start a coroutine for resource executor, to execute the handler and delete resource and itself when +// handler cycle done. +// +// Note that the executor will free itself by manager, then free the resource by manager. This is a helper +// that is used for a goroutine to execute a handler and free itself after the cycle is done. +// +// Generally, the handler, resource, and callback generally are the same object. But we do not define a single +// interface, because shared resource is a special interface. +// +// Note that the resource may live longer than executor, because it is shared resource, so you should process +// the callback. For example, you should never use the executor after it's stopped and deleted. +// +// Usage: +// ISrsResourceManager* manager = ...; +// ISrsResource* resource, ISrsCoroutineHandler* handler, ISrsExecutorHandler* callback = ...; // Resource, handler, and callback are the same object. +// SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(manager, resource, handler); +// if ((err = executor->start()) != srs_success) { +// srs_freep(executor); +// return err; +// } +class SrsExecutorCoroutine : public ISrsResource, public ISrsStartable, public ISrsInterruptable + , public ISrsContextIdSetter, public ISrsContextIdGetter, public ISrsCoroutineHandler +{ +private: + ISrsResourceManager* manager_; + ISrsResource* resource_; + ISrsCoroutineHandler* handler_; + ISrsExecutorHandler* callback_; +private: + SrsCoroutine* trd_; +public: + SrsExecutorCoroutine(ISrsResourceManager* m, ISrsResource* r, ISrsCoroutineHandler* h, ISrsExecutorHandler* cb); + virtual ~SrsExecutorCoroutine(); +// Interface ISrsStartable +public: + virtual srs_error_t start(); +// Interface ISrsInterruptable +public: + virtual void interrupt(); + virtual srs_error_t pull(); +// Interface ISrsContextId +public: + virtual const SrsContextId& cid(); + virtual void set_cid(const SrsContextId& cid); +// Interface ISrsOneCycleThreadHandler +public: + virtual srs_error_t cycle(); +// Interface ISrsResource +public: + virtual const SrsContextId& get_id(); + virtual std::string desc(); +}; + #endif diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index b42a163be1..d5c6e42297 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -335,7 +335,6 @@ srs_error_t srs_global_initialize() #ifdef SRS_GB28181 _srs_gb_manager = new SrsResourceManager("GB", true); #endif - _srs_gc = new SrsLazySweepGc(); // Initialize global pps, which depends on _srs_clock _srs_pps_ids = new SrsPps(); diff --git a/trunk/src/core/srs_core_autofree.hpp b/trunk/src/core/srs_core_autofree.hpp index e8f281519e..28f7662767 100644 --- a/trunk/src/core/srs_core_autofree.hpp +++ b/trunk/src/core/srs_core_autofree.hpp @@ -81,4 +81,103 @@ class impl_SrsAutoFree } }; +// Shared ptr smart pointer, see https://github.com/ossrs/srs/discussions/3667#discussioncomment-8969107 +// Usage: +// SrsSharedPtr ptr(new MyClass()); +// ptr->do_something(); +// +// SrsSharedPtr cp = ptr; +// cp->do_something(); +template +class SrsSharedPtr +{ +private: + // The pointer to the object. + T* ptr_; + // The reference count of the object. + uint32_t* ref_count_; +public: + // Create a shared ptr with the object. + SrsSharedPtr(T* ptr) { + ptr_ = ptr; + ref_count_ = new uint32_t(1); + } + // Copy the shared ptr. + SrsSharedPtr(const SrsSharedPtr& cp) { + copy(cp); + } + // Dispose and delete the shared ptr. + virtual ~SrsSharedPtr() { + reset(); + } +private: + // Reset the shared ptr. + void reset() { + if (!ref_count_) return; + + (*ref_count_)--; + if (*ref_count_ == 0) { + delete ptr_; + delete ref_count_; + } + + ptr_ = NULL; + ref_count_ = NULL; + } + // Copy from other shared ptr. + void copy(const SrsSharedPtr& cp) { + ptr_ = cp.ptr_; + ref_count_ = cp.ref_count_; + if (ref_count_) (*ref_count_)++; + } + // Move from other shared ptr. + void move(SrsSharedPtr& cp) { + ptr_ = cp.ptr_; + ref_count_ = cp.ref_count_; + cp.ptr_ = NULL; + cp.ref_count_ = NULL; + } +public: + // Get the object. + T* get() { + return ptr_; + } + // Overload the -> operator. + T* operator->() { + return ptr_; + } + // The assign operator. + SrsSharedPtr& operator=(const SrsSharedPtr& cp) { + if (this != &cp) { + reset(); + copy(cp); + } + return *this; + } +private: + // Overload the * operator. + T& operator*() { + return *ptr_; + } + // Overload the bool operator. + operator bool() const { + return ptr_ != NULL; + } +#if __cplusplus >= 201103L // C++11 +public: + // The move constructor. + SrsSharedPtr(SrsSharedPtr&& cp) { + move(cp); + }; + // The move assign operator. + SrsSharedPtr& operator=(SrsSharedPtr&& cp) { + if (this != &cp) { + reset(); + move(cp); + } + return *this; + }; +#endif +}; + #endif diff --git a/trunk/src/core/srs_core_version6.hpp b/trunk/src/core/srs_core_version6.hpp index d12f389b62..e8bac6484f 100644 --- a/trunk/src/core/srs_core_version6.hpp +++ b/trunk/src/core/srs_core_version6.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 6 #define VERSION_MINOR 0 -#define VERSION_REVISION 125 +#define VERSION_REVISION 126 #endif diff --git a/trunk/src/protocol/srs_protocol_conn.cpp b/trunk/src/protocol/srs_protocol_conn.cpp index 0c988b9b60..b0fba2dab8 100644 --- a/trunk/src/protocol/srs_protocol_conn.cpp +++ b/trunk/src/protocol/srs_protocol_conn.cpp @@ -40,35 +40,3 @@ ISrsConnection::~ISrsConnection() { } -SrsLazyObject::SrsLazyObject() -{ - gc_ref_ = 0; -} - -SrsLazyObject::~SrsLazyObject() -{ -} - -void SrsLazyObject::gc_use() -{ - gc_ref_++; -} - -void SrsLazyObject::gc_dispose() -{ - gc_ref_--; -} - -int32_t SrsLazyObject::gc_ref() -{ - return gc_ref_; -} - -ISrsLazyGc::ISrsLazyGc() -{ -} - -ISrsLazyGc::~ISrsLazyGc() -{ -} - diff --git a/trunk/src/protocol/srs_protocol_conn.hpp b/trunk/src/protocol/srs_protocol_conn.hpp index b136716adf..ee17aed56e 100644 --- a/trunk/src/protocol/srs_protocol_conn.hpp +++ b/trunk/src/protocol/srs_protocol_conn.hpp @@ -33,7 +33,9 @@ class ISrsResourceManager ISrsResourceManager(); virtual ~ISrsResourceManager(); public: - // Remove then free the specified connection. + // Remove then free the specified connection. Note that the manager always free c resource, + // in the same coroutine or another coroutine. Some manager may support add c to a map, it + // should always free it even if it's in the map. virtual void remove(ISrsResource* c) = 0; }; @@ -48,36 +50,5 @@ class ISrsConnection : public ISrsResource virtual std::string remote_ip() = 0; }; -// Lazy-sweep resource, never sweep util all wrappers are freed. -// See https://github.com/ossrs/srs/issues/3176#lazy-sweep -class SrsLazyObject -{ -private: - // The reference count of resource, 0 is no wrapper and safe to sweep. - int32_t gc_ref_; -public: - SrsLazyObject(); - virtual ~SrsLazyObject(); -public: - // For wrapper to use this resource. - virtual void gc_use(); - // For wrapper to dispose this resource. - virtual void gc_dispose(); - // The current reference count of resource. - virtual int32_t gc_ref(); -}; - -// The lazy-sweep GC, wait for a long time to dispose resource even when resource is disposable. -// See https://github.com/ossrs/srs/issues/3176#lazy-sweep -class ISrsLazyGc -{ -public: - ISrsLazyGc(); - virtual ~ISrsLazyGc(); -public: - // Remove then free the specified resource. - virtual void remove(SrsLazyObject* c) = 0; -}; - #endif diff --git a/trunk/src/utest/srs_utest_core.cpp b/trunk/src/utest/srs_utest_core.cpp index 1b399fd771..2344e911ef 100644 --- a/trunk/src/utest/srs_utest_core.cpp +++ b/trunk/src/utest/srs_utest_core.cpp @@ -8,6 +8,8 @@ using namespace std; #include +#include +#include VOID TEST(CoreAutoFreeTest, Free) { @@ -86,3 +88,343 @@ VOID TEST(CoreLogger, CheckVsnprintf) } } +VOID TEST(CoreLogger, SharedPtrTypical) +{ + if (true) { + SrsSharedPtr p(new int(100)); + EXPECT_TRUE(p); + EXPECT_EQ(100, *p); + } + + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q = p; + EXPECT_EQ(p.get(), q.get()); + } + + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q(p); + EXPECT_EQ(p.get(), q.get()); + } + + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q = p; + EXPECT_TRUE(p); + EXPECT_TRUE(q); + EXPECT_EQ(100, *p); + EXPECT_EQ(100, *q); + } +} + +VOID TEST(CoreLogger, SharedPtrReset) +{ + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q = p; + p.reset(); + EXPECT_FALSE(p); + EXPECT_TRUE(q); + EXPECT_EQ(100, *q); + } + + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q = p; + q.reset(); + EXPECT_TRUE(p); + EXPECT_FALSE(q); + EXPECT_EQ(100, *p); + } +} + +VOID TEST(CoreLogger, SharedPtrObject) +{ + SrsSharedPtr p(new MyNormalObject(100)); + EXPECT_TRUE(p); + EXPECT_EQ(100, p->id()); +} + +VOID TEST(CoreLogger, SharedPtrNullptr) +{ + SrsSharedPtr p(NULL); + EXPECT_FALSE(p); + + p.reset(); + EXPECT_FALSE(p); + + SrsSharedPtr q = p; + EXPECT_FALSE(q); +} + +class MockWrapper +{ +public: + int* ptr; +public: + MockWrapper(int* p) { + ptr = p; + *ptr = *ptr + 1; + } + ~MockWrapper() { + *ptr = *ptr - 1; + } +}; + +VOID TEST(CoreLogger, SharedPtrWrapper) +{ + int* ptr = new int(100); + SrsAutoFree(int, ptr); + EXPECT_EQ(100, *ptr); + + if (true) { + SrsSharedPtr p(new MockWrapper(ptr)); + EXPECT_EQ(101, *ptr); + EXPECT_EQ(101, *p->ptr); + + SrsSharedPtr q = p; + EXPECT_EQ(101, *ptr); + EXPECT_EQ(101, *p->ptr); + EXPECT_EQ(101, *q->ptr); + + SrsSharedPtr r(new MockWrapper(ptr)); + EXPECT_EQ(102, *ptr); + EXPECT_EQ(102, *p->ptr); + EXPECT_EQ(102, *q->ptr); + EXPECT_EQ(102, *r->ptr); + + SrsSharedPtr s(new MockWrapper(ptr)); + EXPECT_EQ(103, *ptr); + EXPECT_EQ(103, *p->ptr); + EXPECT_EQ(103, *q->ptr); + EXPECT_EQ(103, *r->ptr); + EXPECT_EQ(103, *s->ptr); + } + EXPECT_EQ(100, *ptr); + + if (true) { + SrsSharedPtr p(new MockWrapper(ptr)); + EXPECT_EQ(101, *ptr); + EXPECT_EQ(101, *p->ptr); + } + EXPECT_EQ(100, *ptr); +} + +VOID TEST(CoreLogger, SharedPtrAssign) +{ + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q(NULL); + q = p; + EXPECT_EQ(p.get(), q.get()); + } + + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q(new int(101)); + + int* q0 = q.get(); + q = p; + EXPECT_EQ(p.get(), q.get()); + EXPECT_NE(q0, q.get()); + } + + int* ptr0 = new int(100); + SrsAutoFree(int, ptr0); + EXPECT_EQ(100, *ptr0); + + int* ptr1 = new int(200); + SrsAutoFree(int, ptr1); + EXPECT_EQ(200, *ptr1); + + if (true) { + SrsSharedPtr p(new MockWrapper(ptr0)); + EXPECT_EQ(101, *ptr0); + EXPECT_EQ(101, *p->ptr); + + SrsSharedPtr q(new MockWrapper(ptr1)); + EXPECT_EQ(201, *ptr1); + EXPECT_EQ(201, *q->ptr); + + q = p; + EXPECT_EQ(200, *ptr1); + EXPECT_EQ(101, *ptr0); + EXPECT_EQ(101, *p->ptr); + EXPECT_EQ(101, *q->ptr); + } + + EXPECT_EQ(100, *ptr0); + EXPECT_EQ(200, *ptr1); +} + +template +SrsSharedPtr mock_shared_ptr_move_assign(SrsSharedPtr p) { + SrsSharedPtr q = p; + return q; +} + +template +SrsSharedPtr mock_shared_ptr_move_ctr(SrsSharedPtr p) { + return p; +} + +VOID TEST(CoreLogger, SharedPtrMove) +{ + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q(new int(101)); + q = mock_shared_ptr_move_ctr(p); + EXPECT_EQ(q.get(), p.get()); + } + + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q(new int(101)); + q = mock_shared_ptr_move_assign(p); + EXPECT_EQ(q.get(), p.get()); + } + + int* ptr = new int(100); + SrsAutoFree(int, ptr); + EXPECT_EQ(100, *ptr); + + if (true) { + SrsSharedPtr p(new MockWrapper(ptr)); + EXPECT_EQ(101, *ptr); + EXPECT_EQ(101, *p->ptr); + + SrsSharedPtr q(new MockWrapper(ptr)); + q = mock_shared_ptr_move_ctr(p); + EXPECT_EQ(101, *ptr); + EXPECT_EQ(101, *q->ptr); + } + EXPECT_EQ(100, *ptr); + + if (true) { + SrsSharedPtr p(new MockWrapper(ptr)); + EXPECT_EQ(101, *ptr); + EXPECT_EQ(101, *p->ptr); + + SrsSharedPtr q(new MockWrapper(ptr)); + q = mock_shared_ptr_move_assign(p); + EXPECT_EQ(101, *ptr); + EXPECT_EQ(101, *q->ptr); + } + EXPECT_EQ(100, *ptr); + + // Note that this will not trigger the move constructor or move assignment operator. + if (true) { + SrsSharedPtr p(new int(100)); + SrsSharedPtr q = mock_shared_ptr_move_assign(p); + EXPECT_EQ(q.get(), p.get()); + } + + // Note that this will not trigger the move constructor or move assignment operator. + if (true) { + SrsSharedPtr p = SrsSharedPtr(new int(100)); + EXPECT_TRUE(p); + EXPECT_EQ(100, *p); + } +} + +class MockIntResource : public ISrsResource +{ +public: + SrsContextId id_; + int value_; +public: + MockIntResource(int value) : value_(value) { + } + virtual ~MockIntResource() { + } +public: + virtual const SrsContextId& get_id() { + return id_; + } + virtual std::string desc() { + return id_.c_str(); + } +}; + +VOID TEST(CoreLogger, SharedResourceTypical) +{ + if (true) { + SrsSharedResource* p = new SrsSharedResource(new MockIntResource(100)); + EXPECT_TRUE(*p); + EXPECT_EQ(100, (*p)->value_); + srs_freep(p); + } + + if (true) { + SrsSharedResource p(new MockIntResource(100)); + EXPECT_TRUE(p); + EXPECT_EQ(100, p->value_); + } + + if (true) { + SrsSharedResource p = SrsSharedResource(new MockIntResource(100)); + EXPECT_TRUE(p); + EXPECT_EQ(100, p->value_); + } + + if (true) { + SrsSharedResource p(new MockIntResource(100)); + SrsSharedResource q = p; + EXPECT_EQ(p.get(), q.get()); + } + + if (true) { + SrsSharedResource p(new MockIntResource(100)); + SrsSharedResource q(NULL); + q = p; + EXPECT_EQ(p.get(), q.get()); + } + + if (true) { + SrsSharedResource p(new MockIntResource(100)); + SrsSharedResource q(new MockIntResource(200)); + q = p; + EXPECT_EQ(p.get(), q.get()); + } + + if (true) { + SrsSharedResource p(new MockIntResource(100)); + SrsSharedResource q = p; + EXPECT_TRUE(p); + EXPECT_TRUE(q); + EXPECT_EQ(100, p->value_); + EXPECT_EQ(100, q->value_); + } +} + +template +SrsSharedResource mock_shared_resource_move_assign(SrsSharedResource p) { + SrsSharedResource q = p; + return q; +} + +template +SrsSharedResource mock_shared_resource_move_ctr(SrsSharedResource p) { + return p; +} + +VOID TEST(CoreLogger, SharedResourceMove) +{ + if (true) { + SrsSharedResource p(new MockIntResource(100)); + SrsSharedResource q(new MockIntResource(101)); + q = mock_shared_resource_move_ctr(p); + EXPECT_EQ(100, q->value_); + EXPECT_EQ(q.get(), p.get()); + } + + if (true) { + SrsSharedResource p(new MockIntResource(100)); + SrsSharedResource q(new MockIntResource(101)); + q = mock_shared_resource_move_assign(p); + EXPECT_EQ(100, q->value_); + EXPECT_EQ(q.get(), p.get()); + } +} + diff --git a/trunk/src/utest/srs_utest_core.hpp b/trunk/src/utest/srs_utest_core.hpp index 1c7795b977..8c7306384d 100644 --- a/trunk/src/utest/srs_utest_core.hpp +++ b/trunk/src/utest/srs_utest_core.hpp @@ -14,5 +14,18 @@ #include +class MyNormalObject +{ +private: + int id_; +public: + MyNormalObject(int id) { + id_ = id; + } + int id() { + return id_; + } +}; + #endif