From 9dba99a1cc6a9c70be17991a460bb43f7e3c252e Mon Sep 17 00:00:00 2001 From: Winlin Date: Fri, 14 Jun 2024 08:07:26 +0800 Subject: [PATCH] SmartPtr: Support shared ptr for RTC source. v6.0.128 (#4085) --------- Co-authored-by: Haibo Chen <495810242@qq.com> --- .github/workflows/test.yml | 2 +- .../3rdparty/srs-bench/blackbox/hevc_test.go | 4 +- trunk/doc/CHANGELOG.md | 1 + trunk/src/app/srs_app_rtc_api.cpp | 4 +- trunk/src/app/srs_app_rtc_conn.cpp | 43 +++++----- trunk/src/app/srs_app_rtc_conn.hpp | 5 +- trunk/src/app/srs_app_rtc_server.cpp | 4 +- trunk/src/app/srs_app_rtc_source.cpp | 79 ++++++++++++------- trunk/src/app/srs_app_rtc_source.hpp | 22 +++--- trunk/src/app/srs_app_rtmp_conn.cpp | 6 +- trunk/src/app/srs_app_srt_conn.cpp | 8 +- trunk/src/app/srs_app_srt_source.cpp | 12 +-- trunk/src/app/srs_app_srt_source.hpp | 6 +- trunk/src/app/srs_app_stream_bridge.cpp | 2 +- trunk/src/app/srs_app_stream_bridge.hpp | 5 +- trunk/src/core/srs_core_version6.hpp | 2 +- trunk/src/utest/srs_utest_rtc.cpp | 14 ++-- 17 files changed, 120 insertions(+), 99 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 809823152b..b316891f67 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -190,7 +190,7 @@ jobs: docker run --rm -w /srs/trunk/3rdparty/srs-bench srs:test \ ./objs/srs_blackbox_test -test.v -test.run '^TestFast' -test.parallel 64 docker run --rm -w /srs/trunk/3rdparty/srs-bench srs:test \ - ./objs/srs_blackbox_test -test.v -test.run '^TestSlow' -test.parallel 4 + ./objs/srs_blackbox_test -test.v -test.run '^TestSlow' -test.parallel 1 # For utest - name: Run SRS utest run: docker run --rm srs:test ./objs/srs_utest diff --git a/trunk/3rdparty/srs-bench/blackbox/hevc_test.go b/trunk/3rdparty/srs-bench/blackbox/hevc_test.go index 6a98f9a500..4c6571fc77 100644 --- a/trunk/3rdparty/srs-bench/blackbox/hevc_test.go +++ b/trunk/3rdparty/srs-bench/blackbox/hevc_test.go @@ -930,8 +930,8 @@ func TestSlow_SrtPublish_HttpTsPlay_HEVC_Basic(t *testing.T) { } // Note that HLS score is low, so we only check duration. - if dv := m.Duration(); dv < duration { - r5 = errors.Errorf("short duration=%v < %v, %v, %v", dv, duration, m.String(), str) + if dv := m.Duration(); dv < duration / 2 { + r5 = errors.Errorf("short duration=%v < %v, %v, %v", dv, duration / 2, m.String(), str) } if v := m.Video(); v == nil { diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 437e4d913c..5885d58d6c 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 6.0 Changelog +* v6.0, 2024-06-14, Merge [#4085](https://github.com/ossrs/srs/pull/4085): SmartPtr: Support shared ptr for RTC source. v6.0.128 (#4085) * v6.0, 2024-06-13, Merge [#4083](https://github.com/ossrs/srs/pull/4083): SmartPtr: Use shared ptr in RTC TCP connection. v6.0.127 (#4083) * v6.0, 2024-06-12, Merge [#4080](https://github.com/ossrs/srs/pull/4080): SmartPtr: Use shared ptr to manage GB objects. v6.0.126 (#4080) * v6.0, 2024-06-03, Merge [#4057](https://github.com/ossrs/srs/pull/4057): RTC: Support dropping h.264 SEI from NALUs. v6.0.125 (#4057) diff --git a/trunk/src/app/srs_app_rtc_api.cpp b/trunk/src/app/srs_app_rtc_api.cpp index 46512d8517..393f3869f3 100644 --- a/trunk/src/app/srs_app_rtc_api.cpp +++ b/trunk/src/app/srs_app_rtc_api.cpp @@ -218,8 +218,8 @@ srs_error_t SrsGoApiRtcPlay::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa // Whether RTC stream is active. bool is_rtc_stream_active = false; if (true) { - SrsRtcSource* source = _srs_rtc_sources->fetch(ruc->req_); - is_rtc_stream_active = (source && !source->can_publish()); + SrsSharedPtr source = _srs_rtc_sources->fetch(ruc->req_); + is_rtc_stream_active = (source.get() && !source->can_publish()); } // For RTMP to RTC, fail if disabled and RTMP is active, see https://github.com/ossrs/srs/issues/2728 diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 0b3e9c0d41..1ef65a8caf 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -415,13 +415,12 @@ std::string SrsRtcAsyncCallOnStop::to_string() return std::string(""); } -SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid) +SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid) : source_(new SrsRtcSource()) { cid_ = cid; trd_ = NULL; req_ = NULL; - source_ = NULL; is_started = false; session_ = s; @@ -485,7 +484,7 @@ srs_error_t SrsRtcPlayStream::initialize(SrsRequest* req, std::mapfetch_or_create(req_, &source_)) != srs_success) { + if ((err = _srs_rtc_sources->fetch_or_create(req_, source_)) != srs_success) { return srs_error_wrap(err, "rtc fetch source failed"); } @@ -642,11 +641,12 @@ srs_error_t SrsRtcPlayStream::cycle() { srs_error_t err = srs_success; - SrsRtcSource* source = source_; + SrsSharedPtr& source = source_; + srs_assert(source.get()); SrsRtcConsumer* consumer = NULL; SrsAutoFree(SrsRtcConsumer, consumer); - if ((err = source->create_consumer(consumer)) != srs_success) { + if ((err = source->create_consumer(source_, consumer)) != srs_success) { return srs_error_wrap(err, "create consumer, source=%s", req_->get_stream_url().c_str()); } @@ -933,9 +933,6 @@ srs_error_t SrsRtcPlayStream::do_request_keyframe(uint32_t ssrc, SrsContextId ci { srs_error_t err = srs_success; - // The source MUST exists, when PLI thread is running. - srs_assert(source_); - ISrsRtcPublishStream* publisher = source_->publish_stream(); if (!publisher) { return err; @@ -1076,7 +1073,7 @@ std::string SrsRtcAsyncCallOnUnpublish::to_string() return std::string(""); } -SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid) +SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid) : source_(new SrsRtcSource()) { cid_ = cid; is_started = false; @@ -1086,7 +1083,6 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsCon twcc_epp_ = new SrsErrorPithyPrint(3.0); req_ = NULL; - source = NULL; nn_simulate_nack_drop = 0; nack_enabled_ = false; nack_no_copy_ = false; @@ -1113,11 +1109,8 @@ SrsRtcPublishStream::~SrsRtcPublishStream() srs_freep(timer_rtcp_); srs_freep(timer_twcc_); - // TODO: FIXME: Should remove and delete source. - if (source) { - source->set_publish_stream(NULL); - source->on_unpublish(); - } + source_->set_publish_stream(NULL); + source_->on_unpublish(); for (int i = 0; i < (int)video_tracks_.size(); ++i) { SrsRtcVideoRecvTrack* track = video_tracks_.at(i); @@ -1203,10 +1196,10 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti } // Setup the publish stream in source to enable PLI as such. - if ((err = _srs_rtc_sources->fetch_or_create(req_, &source)) != srs_success) { + if ((err = _srs_rtc_sources->fetch_or_create(req_, source_)) != srs_success) { return srs_error_wrap(err, "create source"); } - source->set_publish_stream(this); + source_->set_publish_stream(this); // TODO: FIMXE: Check it in SrsRtcConnection::add_publisher? SrsLiveSource *rtmp = _srs_sources->fetch(r); @@ -1250,7 +1243,7 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti return srs_error_wrap(err, "create bridge"); } - source->set_bridge(bridge); + source_->set_bridge(bridge); } #endif @@ -1265,7 +1258,7 @@ srs_error_t SrsRtcPublishStream::start() return err; } - if ((err = source->on_publish()) != srs_success) { + if ((err = source_->on_publish()) != srs_success) { return srs_error_wrap(err, "on publish"); } @@ -1447,12 +1440,12 @@ srs_error_t SrsRtcPublishStream::do_on_rtp_plaintext(SrsRtpPacket*& pkt, SrsBuff SrsRtcVideoRecvTrack* video_track = get_video_track(ssrc); if (audio_track) { pkt->frame_type = SrsFrameTypeAudio; - if ((err = audio_track->on_rtp(source, pkt)) != srs_success) { + if ((err = audio_track->on_rtp(source_, pkt)) != srs_success) { return srs_error_wrap(err, "on audio"); } } else if (video_track) { pkt->frame_type = SrsFrameTypeVideo; - if ((err = video_track->on_rtp(source, pkt)) != srs_success) { + if ((err = video_track->on_rtp(source_, pkt)) != srs_success) { return srs_error_wrap(err, "on video"); } } else { @@ -1956,8 +1949,8 @@ srs_error_t SrsRtcConnection::add_publisher(SrsRtcUserConfig* ruc, SrsSdp& local return srs_error_wrap(err, "generate local sdp"); } - SrsRtcSource* source = NULL; - if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) { + SrsSharedPtr source; + if ((err = _srs_rtc_sources->fetch_or_create(req, source)) != srs_success) { return srs_error_wrap(err, "create source"); } @@ -3056,8 +3049,8 @@ srs_error_t SrsRtcConnection::negotiate_play_capability(SrsRtcUserConfig* ruc, s // TODO: FIME: Should check packetization-mode=1 also. bool has_42e01f = srs_sdp_has_h264_profile(remote_sdp, "42e01f"); - SrsRtcSource* source = NULL; - if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) { + SrsSharedPtr source; + if ((err = _srs_rtc_sources->fetch_or_create(req, source)) != srs_success) { return srs_error_wrap(err, "fetch rtc source"); } diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 815547d2c9..9a8f2aaca8 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -217,7 +218,7 @@ class SrsRtcPlayStream : public ISrsCoroutineHandler, public ISrsReloadHandler SrsRtcPLIWorker* pli_worker_; private: SrsRequest* req_; - SrsRtcSource* source_; + SrsSharedPtr source_; // key: publish_ssrc, value: send track to process rtp/rtcp std::map audio_tracks_; std::map video_tracks_; @@ -343,7 +344,7 @@ class SrsRtcPublishStream : public ISrsRtspPacketDecodeHandler SrsErrorPithyPrint* pli_epp; private: SrsRequest* req_; - SrsRtcSource* source; + SrsSharedPtr source_; // Simulators. int nn_simulate_nack_drop; private: diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index 37f018be60..aec862f817 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -502,8 +502,8 @@ srs_error_t SrsRtcServer::create_session(SrsRtcUserConfig* ruc, SrsSdp& local_sd SrsRequest* req = ruc->req_; - SrsRtcSource* source = NULL; - if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) { + SrsSharedPtr source; + if ((err = _srs_rtc_sources->fetch_or_create(req, source)) != srs_success) { return srs_error_wrap(err, "create source"); } diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 9e70ea7a69..c8abd37981 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -154,9 +154,9 @@ ISrsRtcSourceChangeCallback::~ISrsRtcSourceChangeCallback() { } -SrsRtcConsumer::SrsRtcConsumer(SrsRtcSource* s) +SrsRtcConsumer::SrsRtcConsumer(SrsSharedPtr s) { - source = s; + source_ = s; should_update_source_id = false; handler_ = NULL; @@ -167,7 +167,7 @@ SrsRtcConsumer::SrsRtcConsumer(SrsRtcSource* s) SrsRtcConsumer::~SrsRtcConsumer() { - source->on_consumer_destroy(this); + source_->on_consumer_destroy(this); vector::iterator it; for (it = queue.begin(); it != queue.end(); ++it) { @@ -205,7 +205,7 @@ srs_error_t SrsRtcConsumer::dump_packet(SrsRtpPacket** ppkt) srs_error_t err = srs_success; if (should_update_source_id) { - srs_trace("update source_id=%s/%s", source->source_id().c_str(), source->pre_source_id().c_str()); + srs_trace("update source_id=%s/%s", source_->source_id().c_str(), source_->pre_source_id().c_str()); should_update_source_id = false; } @@ -251,7 +251,7 @@ SrsRtcSourceManager::~SrsRtcSourceManager() srs_mutex_destroy(lock); } -srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsRtcSource** pps) +srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsSharedPtr& pps) { srs_error_t err = srs_success; @@ -259,50 +259,65 @@ srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsRtcSource** p // @bug https://github.com/ossrs/srs/issues/1230 SrsLocker(lock); - SrsRtcSource* source = NULL; - if ((source = fetch(r)) != NULL) { + string stream_url = r->get_stream_url(); + std::map< std::string, SrsSharedPtr >::iterator it = pool.find(stream_url); + + if (it != pool.end()) { + SrsSharedPtr source = it->second; + // we always update the request of resource, // for origin auth is on, the token in request maybe invalid, // and we only need to update the token of request, it's simple. source->update_auth(r); - *pps = source; + pps = source; + return err; } - string stream_url = r->get_stream_url(); - string vhost = r->vhost; - - // should always not exists for create a source. - srs_assert (pool.find(stream_url) == pool.end()); - + SrsSharedPtr source = SrsSharedPtr(new SrsRtcSource()); srs_trace("new rtc source, stream_url=%s", stream_url.c_str()); - source = new SrsRtcSource(); if ((err = source->initialize(r)) != srs_success) { return srs_error_wrap(err, "init source %s", r->get_stream_url().c_str()); } pool[stream_url] = source; - - *pps = source; + pps = source; return err; } -SrsRtcSource* SrsRtcSourceManager::fetch(SrsRequest* r) +SrsSharedPtr SrsRtcSourceManager::fetch(SrsRequest* r) { - SrsRtcSource* source = NULL; + // Use lock to protect coroutine switch. + // @bug https://github.com/ossrs/srs/issues/1230 + SrsLocker(lock); string stream_url = r->get_stream_url(); - if (pool.find(stream_url) == pool.end()) { - return NULL; - } + std::map< std::string, SrsSharedPtr >::iterator it = pool.find(stream_url); - source = pool[stream_url]; + SrsSharedPtr source; + if (it == pool.end()) { + return source; + } + source = it->second; return source; } +void SrsRtcSourceManager::eliminate(SrsRequest* r) +{ + // Use lock to protect coroutine switch. + // @bug https://github.com/ossrs/srs/issues/1230 + SrsLocker(lock); + + string stream_url = r->get_stream_url(); + std::map< std::string, SrsSharedPtr >::iterator it = pool.find(stream_url); + if (it != pool.end()) { + pool.erase(it); + } +} + SrsRtcSourceManager* _srs_rtc_sources = NULL; ISrsRtcPublishStream::ISrsRtcPublishStream() @@ -471,11 +486,11 @@ void SrsRtcSource::set_bridge(ISrsStreamBridge* bridge) #endif } -srs_error_t SrsRtcSource::create_consumer(SrsRtcConsumer*& consumer) +srs_error_t SrsRtcSource::create_consumer(SrsSharedPtr source, SrsRtcConsumer*& consumer) { srs_error_t err = srs_success; - consumer = new SrsRtcConsumer(this); + consumer = new SrsRtcConsumer(source); consumers.push_back(consumer); // TODO: FIXME: Implements edge cluster. @@ -508,6 +523,11 @@ void SrsRtcSource::on_consumer_destroy(SrsRtcConsumer* consumer) h->on_consumers_finished(); } } + + // Destroy and cleanup source when no publishers and consumers. + if (!is_created_ && consumers.empty()) { + _srs_rtc_sources->eliminate(req); + } } bool SrsRtcSource::can_publish() @@ -607,6 +627,11 @@ void SrsRtcSource::on_unpublish() SrsStatistic* stat = SrsStatistic::instance(); stat->on_stream_close(req); + + // Destroy and cleanup source when no publishers and consumers. + if (!is_created_ && consumers.empty()) { + _srs_rtc_sources->eliminate(req); + } } void SrsRtcSource::subscribe(ISrsRtcSourceEventHandler* h) @@ -2552,7 +2577,7 @@ void SrsRtcAudioRecvTrack::on_before_decode_payload(SrsRtpPacket* pkt, SrsBuffer *ppt = SrsRtspPacketPayloadTypeRaw; } -srs_error_t SrsRtcAudioRecvTrack::on_rtp(SrsRtcSource* source, SrsRtpPacket* pkt) +srs_error_t SrsRtcAudioRecvTrack::on_rtp(SrsSharedPtr& source, SrsRtpPacket* pkt) { srs_error_t err = srs_success; @@ -2611,7 +2636,7 @@ void SrsRtcVideoRecvTrack::on_before_decode_payload(SrsRtpPacket* pkt, SrsBuffer } } -srs_error_t SrsRtcVideoRecvTrack::on_rtp(SrsRtcSource* source, SrsRtpPacket* pkt) +srs_error_t SrsRtcVideoRecvTrack::on_rtp(SrsSharedPtr& source, SrsRtpPacket* pkt) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index 75777a38f3..f3b904556b 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -22,6 +22,7 @@ #include #include #include +#include class SrsRequest; class SrsMetaCache; @@ -79,7 +80,7 @@ class ISrsRtcSourceChangeCallback class SrsRtcConsumer { private: - SrsRtcSource* source; + SrsSharedPtr source_; std::vector queue; // when source id changed, notice all consumers bool should_update_source_id; @@ -91,7 +92,7 @@ class SrsRtcConsumer // The callback for stream change event. ISrsRtcSourceChangeCallback* handler_; public: - SrsRtcConsumer(SrsRtcSource* s); + SrsRtcConsumer(SrsSharedPtr s); virtual ~SrsRtcConsumer(); public: // When source id changed, notice client to print. @@ -112,7 +113,7 @@ class SrsRtcSourceManager { private: srs_mutex_t lock; - std::map pool; + std::map< std::string, SrsSharedPtr > pool; public: SrsRtcSourceManager(); virtual ~SrsRtcSourceManager(); @@ -120,10 +121,13 @@ class SrsRtcSourceManager // create source when fetch from cache failed. // @param r the client request. // @param pps the matched source, if success never be NULL. - virtual srs_error_t fetch_or_create(SrsRequest* r, SrsRtcSource** pps); + virtual srs_error_t fetch_or_create(SrsRequest* r, SrsSharedPtr& pps); public: // Get the exists source, NULL when not exists. - virtual SrsRtcSource* fetch(SrsRequest* r); + virtual SrsSharedPtr fetch(SrsRequest* r); +public: + // Dispose and destroy the source. + virtual void eliminate(SrsRequest* r); }; // Global singleton instance. @@ -211,7 +215,7 @@ class SrsRtcSource : public ISrsFastTimer public: // Create consumer // @param consumer, output the create consumer. - virtual srs_error_t create_consumer(SrsRtcConsumer*& consumer); + virtual srs_error_t create_consumer(SrsSharedPtr source, SrsRtcConsumer*& consumer); // Dumps packets in cache to consumer. // @param ds, whether dumps the sequence header. // @param dm, whether dumps the metadata. @@ -565,7 +569,7 @@ class SrsRtcRecvTrack // set to NULL, nack nerver copy it but set the pkt to NULL. srs_error_t on_nack(SrsRtpPacket** ppkt); public: - virtual srs_error_t on_rtp(SrsRtcSource* source, SrsRtpPacket* pkt) = 0; + virtual srs_error_t on_rtp(SrsSharedPtr& source, SrsRtpPacket* pkt) = 0; virtual srs_error_t check_send_nacks() = 0; protected: virtual srs_error_t do_check_send_nacks(uint32_t& timeout_nacks); @@ -579,7 +583,7 @@ class SrsRtcAudioRecvTrack : public SrsRtcRecvTrack, public ISrsRtspPacketDecode public: virtual void on_before_decode_payload(SrsRtpPacket* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload, SrsRtspPacketPayloadType* ppt); public: - virtual srs_error_t on_rtp(SrsRtcSource* source, SrsRtpPacket* pkt); + virtual srs_error_t on_rtp(SrsSharedPtr& source, SrsRtpPacket* pkt); virtual srs_error_t check_send_nacks(); }; @@ -591,7 +595,7 @@ class SrsRtcVideoRecvTrack : public SrsRtcRecvTrack, public ISrsRtspPacketDecode public: virtual void on_before_decode_payload(SrsRtpPacket* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload, SrsRtspPacketPayloadType* ppt); public: - virtual srs_error_t on_rtp(SrsRtcSource* source, SrsRtpPacket* pkt); + virtual srs_error_t on_rtp(SrsSharedPtr& source, SrsRtpPacket* pkt); virtual srs_error_t check_send_nacks(); }; diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 6e824c3b0f..58ed3e3caf 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -1086,11 +1086,11 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source) // Check whether RTC stream is busy. #ifdef SRS_RTC - SrsRtcSource* rtc = NULL; + SrsSharedPtr rtc; bool rtc_server_enabled = _srs_config->get_rtc_server_enabled(); bool rtc_enabled = _srs_config->get_rtc_enabled(req->vhost); if (rtc_server_enabled && rtc_enabled && !info->edge) { - if ((err = _srs_rtc_sources->fetch_or_create(req, &rtc)) != srs_success) { + if ((err = _srs_rtc_sources->fetch_or_create(req, rtc)) != srs_success) { return srs_error_wrap(err, "create source"); } @@ -1118,7 +1118,7 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source) // Bridge to RTC streaming. #if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT) - if (rtc && _srs_config->get_rtc_from_rtmp(req->vhost)) { + if (rtc.get() && _srs_config->get_rtc_from_rtmp(req->vhost)) { SrsCompositeBridge* bridge = new SrsCompositeBridge(); bridge->append(new SrsFrameToRtcBridge(rtc)); diff --git a/trunk/src/app/srs_app_srt_conn.cpp b/trunk/src/app/srs_app_srt_conn.cpp index 87897dd1e9..a1a4673381 100644 --- a/trunk/src/app/srs_app_srt_conn.cpp +++ b/trunk/src/app/srs_app_srt_conn.cpp @@ -389,12 +389,12 @@ srs_error_t SrsMpegtsSrtConn::acquire_publish() // Check whether RTC stream is busy. #ifdef SRS_RTC - SrsRtcSource* rtc = NULL; + SrsSharedPtr rtc; bool rtc_server_enabled = _srs_config->get_rtc_server_enabled(); bool rtc_enabled = _srs_config->get_rtc_enabled(req_->vhost); bool edge = _srs_config->get_vhost_is_edge(req_->vhost); if (rtc_server_enabled && rtc_enabled && ! edge) { - if ((err = _srs_rtc_sources->fetch_or_create(req_, &rtc)) != srs_success) { + if ((err = _srs_rtc_sources->fetch_or_create(req_, rtc)) != srs_success) { return srs_error_wrap(err, "create source"); } @@ -410,7 +410,7 @@ srs_error_t SrsMpegtsSrtConn::acquire_publish() bridge->append(new SrsFrameToRtmpBridge(live_source)); #if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT) - if (rtc && _srs_config->get_rtc_from_rtmp(req_->vhost)) { + if (rtc.get() && _srs_config->get_rtc_from_rtmp(req_->vhost)) { bridge->append(new SrsFrameToRtcBridge(rtc)); } #endif @@ -489,7 +489,7 @@ srs_error_t SrsMpegtsSrtConn::do_playing() SrsSrtConsumer* consumer = NULL; SrsAutoFree(SrsSrtConsumer, consumer); - if ((err = srt_source_->create_consumer(consumer)) != srs_success) { + if ((err = srt_source_->create_consumer(srt_source_, consumer)) != srs_success) { return srs_error_wrap(err, "create consumer, ts source=%s", req_->get_stream_url().c_str()); } srs_assert(consumer); diff --git a/trunk/src/app/srs_app_srt_source.cpp b/trunk/src/app/srs_app_srt_source.cpp index ea915dfb22..a0dfa60204 100644 --- a/trunk/src/app/srs_app_srt_source.cpp +++ b/trunk/src/app/srs_app_srt_source.cpp @@ -152,9 +152,9 @@ void SrsSrtSourceManager::eliminate(SrsRequest* r) SrsSrtSourceManager* _srs_srt_sources = NULL; -SrsSrtConsumer::SrsSrtConsumer(SrsSrtSource* s) +SrsSrtConsumer::SrsSrtConsumer(SrsSharedPtr s) { - source = s; + source_ = s; should_update_source_id = false; mw_wait = srs_cond_new(); @@ -164,7 +164,7 @@ SrsSrtConsumer::SrsSrtConsumer(SrsSrtSource* s) SrsSrtConsumer::~SrsSrtConsumer() { - source->on_consumer_destroy(this); + source_->on_consumer_destroy(this); vector::iterator it; for (it = queue.begin(); it != queue.end(); ++it) { @@ -202,7 +202,7 @@ srs_error_t SrsSrtConsumer::dump_packet(SrsSrtPacket** ppkt) srs_error_t err = srs_success; if (should_update_source_id) { - srs_trace("update source_id=%s/%s", source->source_id().c_str(), source->pre_source_id().c_str()); + srs_trace("update source_id=%s/%s", source_->source_id().c_str(), source_->pre_source_id().c_str()); should_update_source_id = false; } @@ -942,11 +942,11 @@ void SrsSrtSource::set_bridge(ISrsStreamBridge* bridge) frame_builder_ = new SrsSrtFrameBuilder(bridge); } -srs_error_t SrsSrtSource::create_consumer(SrsSrtConsumer*& consumer) +srs_error_t SrsSrtSource::create_consumer(SrsSharedPtr source, SrsSrtConsumer*& consumer) { srs_error_t err = srs_success; - consumer = new SrsSrtConsumer(this); + consumer = new SrsSrtConsumer(source); consumers.push_back(consumer); return err; diff --git a/trunk/src/app/srs_app_srt_source.hpp b/trunk/src/app/srs_app_srt_source.hpp index edbfa45ae7..46d5b8d7e2 100644 --- a/trunk/src/app/srs_app_srt_source.hpp +++ b/trunk/src/app/srs_app_srt_source.hpp @@ -70,10 +70,10 @@ extern SrsSrtSourceManager* _srs_srt_sources; class SrsSrtConsumer { public: - SrsSrtConsumer(SrsSrtSource* source); + SrsSrtConsumer(SrsSharedPtr source); virtual ~SrsSrtConsumer(); private: - SrsSrtSource* source; + SrsSharedPtr source_; std::vector queue; // when source id changed, notice all consumers bool should_update_source_id; @@ -167,7 +167,7 @@ class SrsSrtSource public: // Create consumer // @param consumer, output the create consumer. - virtual srs_error_t create_consumer(SrsSrtConsumer*& consumer); + virtual srs_error_t create_consumer(SrsSharedPtr source, SrsSrtConsumer*& consumer); // Dumps packets in cache to consumer. virtual srs_error_t consumer_dumps(SrsSrtConsumer* consumer); virtual void on_consumer_destroy(SrsSrtConsumer* consumer); diff --git a/trunk/src/app/srs_app_stream_bridge.cpp b/trunk/src/app/srs_app_stream_bridge.cpp index 543cd91f37..c024974ad8 100644 --- a/trunk/src/app/srs_app_stream_bridge.cpp +++ b/trunk/src/app/srs_app_stream_bridge.cpp @@ -63,7 +63,7 @@ srs_error_t SrsFrameToRtmpBridge::on_frame(SrsSharedPtrMessage* frame) } #ifdef SRS_RTC -SrsFrameToRtcBridge::SrsFrameToRtcBridge(SrsRtcSource* source) +SrsFrameToRtcBridge::SrsFrameToRtcBridge(SrsSharedPtr source) { source_ = source; diff --git a/trunk/src/app/srs_app_stream_bridge.hpp b/trunk/src/app/srs_app_stream_bridge.hpp index bbf42d3295..de9378669d 100644 --- a/trunk/src/app/srs_app_stream_bridge.hpp +++ b/trunk/src/app/srs_app_stream_bridge.hpp @@ -10,6 +10,7 @@ #include #include +#include #include @@ -59,13 +60,13 @@ class SrsFrameToRtmpBridge : public ISrsStreamBridge class SrsFrameToRtcBridge : public ISrsStreamBridge { private: - SrsRtcSource* source_; + SrsSharedPtr source_; private: #if defined(SRS_FFMPEG_FIT) SrsRtcRtpBuilder* rtp_builder_; #endif public: - SrsFrameToRtcBridge(SrsRtcSource* source); + SrsFrameToRtcBridge(SrsSharedPtr source); virtual ~SrsFrameToRtcBridge(); public: virtual srs_error_t initialize(SrsRequest* r); diff --git a/trunk/src/core/srs_core_version6.hpp b/trunk/src/core/srs_core_version6.hpp index 8232d45f71..30399a8b48 100644 --- a/trunk/src/core/srs_core_version6.hpp +++ b/trunk/src/core/srs_core_version6.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 6 #define VERSION_MINOR 0 -#define VERSION_REVISION 127 +#define VERSION_REVISION 128 #endif diff --git a/trunk/src/utest/srs_utest_rtc.cpp b/trunk/src/utest/srs_utest_rtc.cpp index e7e3dd1542..38f587c73b 100644 --- a/trunk/src/utest/srs_utest_rtc.cpp +++ b/trunk/src/utest/srs_utest_rtc.cpp @@ -968,9 +968,8 @@ VOID TEST(KernelRTCTest, SyncTimestampBySenderReportNormal) publish.set_all_tracks_status(true); - SrsRtcSource* rtc_source = new SrsRtcSource(); - SrsAutoFree(SrsRtcSource, rtc_source); - + SrsSharedPtr rtc_source(new SrsRtcSource()); + srand(time(NULL)); if (true) @@ -1036,8 +1035,7 @@ VOID TEST(KernelRTCTest, SyncTimestampBySenderReportOutOfOrder) publish.set_all_tracks_status(true); - SrsRtcSource* rtc_source = new SrsRtcSource(); - SrsAutoFree(SrsRtcSource, rtc_source); + SrsSharedPtr rtc_source(new SrsRtcSource()); srand(time(NULL)); @@ -1109,8 +1107,7 @@ VOID TEST(KernelRTCTest, SyncTimestampBySenderReportConsecutive) publish.set_all_tracks_status(true); - SrsRtcSource* rtc_source = new SrsRtcSource(); - SrsAutoFree(SrsRtcSource, rtc_source); + SrsSharedPtr rtc_source(new SrsRtcSource()); srand(time(NULL)); @@ -1215,8 +1212,7 @@ VOID TEST(KernelRTCTest, SyncTimestampBySenderReportDuplicated) publish.set_all_tracks_status(true); - SrsRtcSource* rtc_source = new SrsRtcSource(); - SrsAutoFree(SrsRtcSource, rtc_source); + SrsSharedPtr rtc_source(new SrsRtcSource()); srand(time(NULL));