diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 4786486a8d..5944c5f85b 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -416,6 +416,8 @@ jobs: tag: ${{ github.ref }} name: Release ${{ env.SRS_TAG }} body: | + If you would like to support SRS, please consider contributing to our [OpenCollective](https://opencollective.com/srs-server). + [${{ github.sha }}](https://github.com/ossrs/srs/commit/${{ github.sha }}) ${{ github.event.head_commit.message }} diff --git a/trunk/conf/http.api.auth.conf b/trunk/conf/http.api.auth.conf new file mode 100644 index 0000000000..6ba7dee688 --- /dev/null +++ b/trunk/conf/http.api.auth.conf @@ -0,0 +1,21 @@ + +listen 1935; +max_connections 1000; +daemon off; +srs_log_tank console; +http_api { + enabled on; + listen 1985; + auth { + enabled on; + username admin; + password admin; + } +} +http_server { + enabled on; + listen 8080; + dir ./objs/nginx/html; +} +vhost __defaultVhost__ { +} diff --git a/trunk/configure b/trunk/configure index 29b10e998d..d35e13b802 100755 --- a/trunk/configure +++ b/trunk/configure @@ -294,7 +294,7 @@ if [[ $SRS_FFMPEG_FIT == YES ]]; then fi MODULE_FILES=("srs_app_server" "srs_app_conn" "srs_app_rtmp_conn" "srs_app_source" "srs_app_refer" "srs_app_hls" "srs_app_forward" "srs_app_encoder" "srs_app_http_stream" - "srs_app_st" "srs_app_log" "srs_app_config" + "srs_app_st" "srs_app_log" "srs_app_config" "srs_app_stream_bridge" "srs_app_pithy_print" "srs_app_reload" "srs_app_http_api" "srs_app_http_conn" "srs_app_http_hooks" "srs_app_ingest" "srs_app_ffmpeg" "srs_app_utility" "srs_app_edge" "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_http_static" diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index bebc38b065..5f21d1450e 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -8,9 +8,10 @@ The changelog for SRS. 
## SRS 6.0 Changelog +* v6.0, 2023-04-01, Merge [#3392](https://github.com/ossrs/srs/pull/3392): Support composited bridges for 1:N protocols converting. v6.0.41 (#3392) * v5.0, 2023-04-01, Merge [#3458](https://github.com/ossrs/srs/pull/3450): API: Support HTTP basic authentication for API. v6.0.40 (#3458) * v6.0, 2023-03-27, Merge [#3450](https://github.com/ossrs/srs/pull/3450): WebRTC: Error message carries the SDP when failed. v6.0.39 (#3450) -* v6.0, 2023-03-25, Merge [#3477](https://github.com/ossrs/srs/pull/3477): Remove unneccessary NULL check in srs_freep. v6.0.38 (#3477) +* v6.0, 2023-03-25, Merge [#3477](https://github.com/ossrs/srs/pull/3477): Remove unnecessary NULL check in srs_freep. v6.0.38 (#3477) * v6.0, 2023-03-25, Merge [#3455](https://github.com/ossrs/srs/pull/3455): RTC: Call on_play before create session, for it might be freed for timeout. v6.0.37 (#3455) * v6.0, 2023-03-22, Merge [#3427](https://github.com/ossrs/srs/pull/3427): WHIP: Support DELETE resource for Larix Broadcaster. v6.0.36 (#3427) * v6.0, 2023-03-20, Merge [#3460](https://github.com/ossrs/srs/pull/3460): WebRTC: Support WHIP/WHEP players. v6.0.35 (#3460) diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 87e9b170d6..469c459254 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -692,6 +692,7 @@ class SrsConfig private: SrsConfDirective* get_srt(std::string vhost); public: + // TODO: FIXME: Rename to get_vhost_srt_enabled. 
bool get_srt_enabled(std::string vhost); bool get_srt_to_rtmp(std::string vhost); diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index e124c9cea1..3ca1ac1640 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -48,6 +48,7 @@ using namespace std; #include #include #include +#include SrsPps* _srs_pps_sstuns = NULL; SrsPps* _srs_pps_srtcps = NULL; @@ -1185,6 +1186,22 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp stream %s busy", r->get_stream_url().c_str()); } + // Check whether SRT stream is busy. +#ifdef SRS_SRT + SrsSrtSource* srt = NULL; + bool srt_server_enabled = _srs_config->get_srt_enabled(); + bool srt_enabled = _srs_config->get_srt_enabled(r->vhost); + if (srt_server_enabled && srt_enabled) { + if ((err = _srs_srt_sources->fetch_or_create(r, &srt)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + + if (!srt->can_publish()) { + return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "srt stream %s busy", r->get_stream_url().c_str()); + } + } +#endif + // Bridge to rtmp #if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT) bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req_->vhost); @@ -1197,7 +1214,9 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti // especially for stream merging. 
rtmp->set_cache(false); - SrsRtmpFromRtcBridge *bridge = new SrsRtmpFromRtcBridge(rtmp); + SrsCompositeBridge* bridge = new SrsCompositeBridge(); + bridge->append(new SrsFrameToRtmpBridge(rtmp)); + if ((err = bridge->initialize(r)) != srs_success) { srs_freep(bridge); return srs_error_wrap(err, "create bridge"); diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 3e023bf9e9..db0c2b02fa 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -52,27 +52,23 @@ SrsPps* _srs_pps_rmnack = NULL; extern SrsPps* _srs_pps_aloss2; -// Firefox defaults as 109, Chrome is 111. -const int kAudioPayloadType = 111; const int kAudioChannel = 2; const int kAudioSamplerate = 48000; -// Firefox defaults as 126, Chrome is 102. -const int kVideoPayloadType = 102; const int kVideoSamplerate = 90000; +using namespace std; + // The RTP payload max size, reserved some paddings for SRTP as such: // kRtpPacketSize = kRtpMaxPayloadSize + paddings // For example, if kRtpPacketSize is 1500, recommend to set kRtpMaxPayloadSize to 1400, // which reserves 100 bytes for SRTP or paddings. -// otherwise, the kRtpPacketSize must less than MTU, in webrtc source code, +// otherwise, the kRtpPacketSize must less than MTU, in webrtc source code, // the rtp max size is assigned by kVideoMtu = 1200. // so we set kRtpMaxPayloadSize = 1200. // see @doc https://groups.google.com/g/discuss-webrtc/c/gH5ysR3SoZI const int kRtpMaxPayloadSize = kRtpPacketSize - 300; -using namespace std; - // TODO: Add this function into SrsRtpMux class. 
srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage* shared_audio, SrsFormat* format, char** pbuf, int* pnn_buf) { @@ -323,14 +319,6 @@ ISrsRtcSourceEventHandler::~ISrsRtcSourceEventHandler() { } -ISrsRtcSourceBridge::ISrsRtcSourceBridge() -{ -} - -ISrsRtcSourceBridge::~ISrsRtcSourceBridge() -{ -} - SrsRtcSource::SrsRtcSource() { is_created_ = false; @@ -341,6 +329,9 @@ SrsRtcSource::SrsRtcSource() req = NULL; bridge_ = NULL; +#ifdef SRS_FFMPEG_FIT + frame_builder_ = NULL; +#endif pli_for_rtmp_ = pli_elapsed_ = 0; } @@ -351,6 +342,9 @@ SrsRtcSource::~SrsRtcSource() // for all consumers are auto free. consumers.clear(); +#ifdef SRS_FFMPEG_FIT + srs_freep(frame_builder_); +#endif srs_freep(bridge_); srs_freep(req); srs_freep(stream_desc_); @@ -464,10 +458,15 @@ SrsContextId SrsRtcSource::pre_source_id() return _pre_source_id; } -void SrsRtcSource::set_bridge(ISrsRtcSourceBridge *bridge) +void SrsRtcSource::set_bridge(ISrsStreamBridge* bridge) { srs_freep(bridge_); bridge_ = bridge; + +#ifdef SRS_FFMPEG_FIT + srs_freep(frame_builder_); + frame_builder_ = new SrsRtcFrameBuilder(bridge); +#endif } srs_error_t SrsRtcSource::create_consumer(SrsRtcConsumer*& consumer) @@ -541,6 +540,16 @@ srs_error_t SrsRtcSource::on_publish() // If bridge to other source, handle event and start timer to request PLI. 
if (bridge_) { +#ifdef SRS_FFMPEG_FIT + if ((err = frame_builder_->initialize(req)) != srs_success) { + return srs_error_wrap(err, "frame builder initialize"); + } + + if ((err = frame_builder_->on_publish()) != srs_success) { + return srs_error_wrap(err, "frame builder on publish"); + } +#endif + if ((err = bridge_->on_publish()) != srs_success) { return srs_error_wrap(err, "bridge on publish"); } @@ -585,6 +594,11 @@ void SrsRtcSource::on_unpublish() // For SrsRtcSource::on_timer() _srs_hybrid->timer100ms()->unsubscribe(this); +#ifdef SRS_FFMPEG_FIT + frame_builder_->on_unpublish(); + srs_freep(frame_builder_); +#endif + bridge_->on_unpublish(); srs_freep(bridge_); } @@ -636,9 +650,11 @@ srs_error_t SrsRtcSource::on_rtp(SrsRtpPacket* pkt) } } - if (bridge_ && (err = bridge_->on_rtp(pkt)) != srs_success) { - return srs_error_wrap(err, "bridge consume message"); +#ifdef SRS_FFMPEG_FIT + if (frame_builder_ && (err = frame_builder_->on_rtp(pkt)) != srs_success) { + return srs_error_wrap(err, "frame builder consume packet"); } +#endif return err; } @@ -712,85 +728,56 @@ srs_error_t SrsRtcSource::on_timer(srs_utime_t interval) #ifdef SRS_FFMPEG_FIT -SrsRtcFromRtmpBridge::SrsRtcFromRtmpBridge(SrsRtcSource* source) +SrsRtcRtpBuilder::SrsRtcRtpBuilder(SrsFrameToRtcBridge* bridge, uint32_t assrc, uint8_t apt, uint32_t vssrc, uint8_t vpt) { req = NULL; - source_ = source; + bridge_ = bridge; format = new SrsRtmpFormat(); codec_ = new SrsAudioTranscoder(); latest_codec_ = SrsAudioCodecIdForbidden; - rtmp_to_rtc = false; keep_bframe = false; merge_nalus = false; meta = new SrsMetaCache(); audio_sequence = 0; video_sequence = 0; - // audio track ssrc - if (true) { - std::vector descs = source->get_track_desc("audio", "opus"); - if (!descs.empty()) { - audio_ssrc = descs.at(0)->ssrc_; - } - // Note we must use the PT of source, see https://github.com/ossrs/srs/pull/3079 - audio_payload_type_ = descs.empty() ? 
kAudioPayloadType : descs.front()->media_->pt_; - } - - // video track ssrc - if (true) { - std::vector descs = source->get_track_desc("video", "H264"); - if (!descs.empty()) { - video_ssrc = descs.at(0)->ssrc_; - } - // Note we must use the PT of source, see https://github.com/ossrs/srs/pull/3079 - video_payload_type_ = descs.empty() ? kVideoPayloadType : descs.front()->media_->pt_; - } + audio_ssrc_ = assrc; + audio_payload_type_ = apt; + video_ssrc_ = vssrc; + video_payload_type_ = vpt; } -SrsRtcFromRtmpBridge::~SrsRtcFromRtmpBridge() +SrsRtcRtpBuilder::~SrsRtcRtpBuilder() { srs_freep(format); srs_freep(codec_); srs_freep(meta); } -srs_error_t SrsRtcFromRtmpBridge::initialize(SrsRequest* r) +srs_error_t SrsRtcRtpBuilder::initialize(SrsRequest* r) { srs_error_t err = srs_success; req = r; - rtmp_to_rtc = _srs_config->get_rtc_from_rtmp(req->vhost); - if (rtmp_to_rtc) { - if ((err = format->initialize()) != srs_success) { - return srs_error_wrap(err, "format initialize"); - } - - // Setup the SPS/PPS parsing strategy. - format->try_annexb_first = _srs_config->try_annexb_first(r->vhost); + if ((err = format->initialize()) != srs_success) { + return srs_error_wrap(err, "format initialize"); } + // Setup the SPS/PPS parsing strategy. + format->try_annexb_first = _srs_config->try_annexb_first(r->vhost); + keep_bframe = _srs_config->get_rtc_keep_bframe(req->vhost); merge_nalus = _srs_config->get_rtc_server_merge_nalus(); - srs_trace("RTC bridge from RTMP, rtmp2rtc=%d, keep_bframe=%d, merge_nalus=%d", - rtmp_to_rtc, keep_bframe, merge_nalus); + srs_trace("RTC bridge from RTMP, keep_bframe=%d, merge_nalus=%d", keep_bframe, merge_nalus); return err; } -srs_error_t SrsRtcFromRtmpBridge::on_publish() +srs_error_t SrsRtcRtpBuilder::on_publish() { srs_error_t err = srs_success; - if (!rtmp_to_rtc) { - return err; - } - - // TODO: FIXME: Should sync with bridge? 
- if ((err = source_->on_publish()) != srs_success) { - return srs_error_wrap(err, "source publish"); - } - // Reset the metadata cache, to make VLC happy when disable/enable stream. // @see https://github.com/ossrs/srs/issues/1630#issuecomment-597979448 meta->clear(); @@ -798,30 +785,28 @@ srs_error_t SrsRtcFromRtmpBridge::on_publish() return err; } -void SrsRtcFromRtmpBridge::on_unpublish() +void SrsRtcRtpBuilder::on_unpublish() { - if (!rtmp_to_rtc) { - return; - } - // Reset the metadata cache, to make VLC happy when disable/enable stream. // @see https://github.com/ossrs/srs/issues/1630#issuecomment-597979448 meta->update_previous_vsh(); meta->update_previous_ash(); +} - // @remark This bridge might be disposed here, so never use it. - // TODO: FIXME: Should sync with bridge? - source_->on_unpublish(); +srs_error_t SrsRtcRtpBuilder::on_frame(SrsSharedPtrMessage* frame) +{ + if (frame->is_audio()) { + return on_audio(frame); + } else if (frame->is_video()) { + return on_video(frame); + } + return srs_success; } -srs_error_t SrsRtcFromRtmpBridge::on_audio(SrsSharedPtrMessage* msg) +srs_error_t SrsRtcRtpBuilder::on_audio(SrsSharedPtrMessage* msg) { srs_error_t err = srs_success; - if (!rtmp_to_rtc) { - return err; - } - // TODO: FIXME: Support parsing OPUS for RTC. 
if ((err = format->on_audio(msg)) != srs_success) { return srs_error_wrap(err, "format consume audio"); @@ -880,7 +865,7 @@ srs_error_t SrsRtcFromRtmpBridge::on_audio(SrsSharedPtrMessage* msg) return err; } -srs_error_t SrsRtcFromRtmpBridge::init_codec(SrsAudioCodecId codec) +srs_error_t SrsRtcRtpBuilder::init_codec(SrsAudioCodecId codec) { srs_error_t err = srs_success; @@ -902,18 +887,18 @@ srs_error_t SrsRtcFromRtmpBridge::init_codec(SrsAudioCodecId codec) srs_trace("RTMP2RTC: Init audio codec to %d(%s)", codec, srs_audio_codec_id2str(codec).c_str()); } else { srs_trace("RTMP2RTC: Switch audio codec %d(%s) to %d(%s)", latest_codec_, srs_audio_codec_id2str(latest_codec_).c_str(), - codec, srs_audio_codec_id2str(codec).c_str()); + codec, srs_audio_codec_id2str(codec).c_str()); } latest_codec_ = codec; return err; } -srs_error_t SrsRtcFromRtmpBridge::transcode(SrsAudioFrame* audio) +srs_error_t SrsRtcRtpBuilder::transcode(SrsAudioFrame* audio) { srs_error_t err = srs_success; - std::vector out_audios; + std::vector out_audios; if ((err = codec_->transcode(audio, out_audios)) != srs_success) { return srs_error_wrap(err, "recode error"); } @@ -934,7 +919,7 @@ srs_error_t SrsRtcFromRtmpBridge::transcode(SrsAudioFrame* audio) break; } - if ((err = source_->on_rtp(pkt)) != srs_success) { + if ((err = bridge_->on_rtp(pkt)) != srs_success) { err = srs_error_wrap(err, "consume opus"); break; } @@ -945,12 +930,12 @@ srs_error_t SrsRtcFromRtmpBridge::transcode(SrsAudioFrame* audio) return err; } -srs_error_t SrsRtcFromRtmpBridge::package_opus(SrsAudioFrame* audio, SrsRtpPacket* pkt) +srs_error_t SrsRtcRtpBuilder::package_opus(SrsAudioFrame* audio, SrsRtpPacket* pkt) { srs_error_t err = srs_success; pkt->header.set_payload_type(audio_payload_type_); - pkt->header.set_ssrc(audio_ssrc); + pkt->header.set_ssrc(audio_ssrc_); pkt->frame_type = SrsFrameTypeAudio; pkt->header.set_marker(true); pkt->header.set_sequence(audio_sequence++); @@ -966,21 +951,10 @@ srs_error_t 
SrsRtcFromRtmpBridge::package_opus(SrsAudioFrame* audio, SrsRtpPacke return err; } -srs_error_t SrsRtcFromRtmpBridge::on_video(SrsSharedPtrMessage* msg) +srs_error_t SrsRtcRtpBuilder::on_video(SrsSharedPtrMessage* msg) { srs_error_t err = srs_success; - if (!rtmp_to_rtc) { - return err; - } - - // WebRTC NOT support HEVC. -#ifdef SRS_H265 - if (format->vcodec->id == SrsVideoCodecIdHEVC) { - return err; - } -#endif - // cache the sequence header if h264 bool is_sequence_header = SrsFlvVideo::sh(msg->payload, msg->size); if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) { @@ -997,6 +971,13 @@ srs_error_t SrsRtcFromRtmpBridge::on_video(SrsSharedPtrMessage* msg) return err; } + // WebRTC does NOT support HEVC. +#ifdef SRS_H265 + if (format->vcodec->id == SrsVideoCodecIdHEVC) { + return err; + } +#endif + bool has_idr = false; vector samples; if ((err = filter(msg, format, has_idr, samples)) != srs_success) { @@ -1009,11 +990,11 @@ srs_error_t SrsRtcFromRtmpBridge::on_video(SrsSharedPtrMessage* msg) SrsRtpPacket* pkt = new SrsRtpPacket(); SrsAutoFree(SrsRtpPacket, pkt); - if ((err = package_stap_a(source_, msg, pkt)) != srs_success) { + if ((err = package_stap_a(msg, pkt)) != srs_success) { return srs_error_wrap(err, "package stap-a"); } - if ((err = source_->on_rtp(pkt)) != srs_success) { + if ((err = bridge_->on_rtp(pkt)) != srs_success) { return srs_error_wrap(err, "consume sps/pps"); } } @@ -1054,7 +1035,7 @@ srs_error_t SrsRtcFromRtmpBridge::on_video(SrsSharedPtrMessage* msg) return consume_packets(pkts); } -srs_error_t SrsRtcFromRtmpBridge::filter(SrsSharedPtrMessage* msg, SrsFormat* format, bool& has_idr, vector& samples) +srs_error_t SrsRtcRtpBuilder::filter(SrsSharedPtrMessage* msg, SrsFormat* format, bool& has_idr, vector& samples) { srs_error_t err = srs_success; @@ -1084,7 +1065,7 @@ srs_error_t SrsRtcFromRtmpBridge::filter(SrsSharedPtrMessage* msg, SrsFormat* fo return err; } -srs_error_t 
SrsRtcFromRtmpBridge::package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket* pkt) +srs_error_t SrsRtcRtpBuilder::package_stap_a(SrsSharedPtrMessage* msg, SrsRtpPacket* pkt) { srs_error_t err = srs_success; @@ -1101,7 +1082,7 @@ srs_error_t SrsRtcFromRtmpBridge::package_stap_a(SrsRtcSource* source, SrsShared } pkt->header.set_payload_type(video_payload_type_); - pkt->header.set_ssrc(video_ssrc); + pkt->header.set_ssrc(video_ssrc_); pkt->frame_type = SrsFrameTypeVideo; pkt->nalu_type = (SrsAvcNaluType)kStapA; pkt->header.set_marker(false); @@ -1143,7 +1124,7 @@ srs_error_t SrsRtcFromRtmpBridge::package_stap_a(SrsRtcSource* source, SrsShared return err; } -srs_error_t SrsRtcFromRtmpBridge::package_nalus(SrsSharedPtrMessage* msg, const vector& samples, vector& pkts) +srs_error_t SrsRtcRtpBuilder::package_nalus(SrsSharedPtrMessage* msg, const vector& samples, vector& pkts) { srs_error_t err = srs_success; @@ -1183,7 +1164,7 @@ srs_error_t SrsRtcFromRtmpBridge::package_nalus(SrsSharedPtrMessage* msg, const pkts.push_back(pkt); pkt->header.set_payload_type(video_payload_type_); - pkt->header.set_ssrc(video_ssrc); + pkt->header.set_ssrc(video_ssrc_); pkt->frame_type = SrsFrameTypeVideo; pkt->nalu_type = (SrsAvcNaluType)first_nalu_type; pkt->header.set_sequence(video_sequence++); @@ -1217,7 +1198,7 @@ srs_error_t SrsRtcFromRtmpBridge::package_nalus(SrsSharedPtrMessage* msg, const pkts.push_back(pkt); pkt->header.set_payload_type(video_payload_type_); - pkt->header.set_ssrc(video_ssrc); + pkt->header.set_ssrc(video_ssrc_); pkt->frame_type = SrsFrameTypeVideo; pkt->nalu_type = (SrsAvcNaluType)kFuA; pkt->header.set_sequence(video_sequence++); @@ -1239,7 +1220,7 @@ srs_error_t SrsRtcFromRtmpBridge::package_nalus(SrsSharedPtrMessage* msg, const } // Single NAL Unit Packet @see https://tools.ietf.org/html/rfc6184#section-5.6 -srs_error_t SrsRtcFromRtmpBridge::package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, vector& pkts) +srs_error_t 
SrsRtcRtpBuilder::package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, vector& pkts) { srs_error_t err = srs_success; @@ -1247,7 +1228,7 @@ srs_error_t SrsRtcFromRtmpBridge::package_single_nalu(SrsSharedPtrMessage* msg, pkts.push_back(pkt); pkt->header.set_payload_type(video_payload_type_); - pkt->header.set_ssrc(video_ssrc); + pkt->header.set_ssrc(video_ssrc_); pkt->frame_type = SrsFrameTypeVideo; pkt->header.set_sequence(video_sequence++); pkt->header.set_timestamp(msg->timestamp * 90); @@ -1263,7 +1244,7 @@ srs_error_t SrsRtcFromRtmpBridge::package_single_nalu(SrsSharedPtrMessage* msg, return err; } -srs_error_t SrsRtcFromRtmpBridge::package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, vector& pkts) +srs_error_t SrsRtcRtpBuilder::package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, vector& pkts) { srs_error_t err = srs_success; @@ -1280,7 +1261,7 @@ srs_error_t SrsRtcFromRtmpBridge::package_fu_a(SrsSharedPtrMessage* msg, SrsSamp pkts.push_back(pkt); pkt->header.set_payload_type(video_payload_type_); - pkt->header.set_ssrc(video_ssrc); + pkt->header.set_ssrc(video_ssrc_); pkt->frame_type = SrsFrameTypeVideo; pkt->header.set_sequence(video_sequence++); pkt->header.set_timestamp(msg->timestamp * 90); @@ -1305,14 +1286,14 @@ srs_error_t SrsRtcFromRtmpBridge::package_fu_a(SrsSharedPtrMessage* msg, SrsSamp return err; } -srs_error_t SrsRtcFromRtmpBridge::consume_packets(vector& pkts) +srs_error_t SrsRtcRtpBuilder::consume_packets(vector& pkts) { srs_error_t err = srs_success; // TODO: FIXME: Consume a range of packets. 
for (int i = 0; i < (int)pkts.size(); i++) { SrsRtpPacket* pkt = pkts[i]; - if ((err = source_->on_rtp(pkt)) != srs_success) { + if ((err = bridge_->on_rtp(pkt)) != srs_success) { err = srs_error_wrap(err, "consume sps/pps"); break; } @@ -1326,31 +1307,28 @@ srs_error_t SrsRtcFromRtmpBridge::consume_packets(vector& pkts) return err; } -SrsRtmpFromRtcBridge::SrsRtmpFromRtcBridge(SrsLiveSource *src) +SrsRtcFrameBuilder::SrsRtcFrameBuilder(ISrsStreamBridge* bridge) { - source_ = src; + bridge_ = bridge; + is_first_audio_ = true; codec_ = NULL; - is_first_audio = true; - is_first_video = true; - format = NULL; - rtp_key_frame_ts_ = -1; header_sn_ = 0; memset(cache_video_pkts_, 0, sizeof(cache_video_pkts_)); + rtp_key_frame_ts_ = -1; } -SrsRtmpFromRtcBridge::~SrsRtmpFromRtcBridge() +SrsRtcFrameBuilder::~SrsRtcFrameBuilder() { srs_freep(codec_); - srs_freep(format); clear_cached_video(); } -srs_error_t SrsRtmpFromRtcBridge::initialize(SrsRequest* r) +srs_error_t SrsRtcFrameBuilder::initialize(SrsRequest* r) { srs_error_t err = srs_success; + srs_freep(codec_); codec_ = new SrsAudioTranscoder(); - format = new SrsRtmpFormat(); SrsAudioCodecId from = SrsAudioCodecIdOpus; // TODO: From SDP? SrsAudioCodecId to = SrsAudioCodecIdAAC; // The output audio codec. @@ -1361,32 +1339,21 @@ srs_error_t SrsRtmpFromRtcBridge::initialize(SrsRequest* r) return srs_error_wrap(err, "bridge initialize"); } - if ((err = format->initialize()) != srs_success) { - return srs_error_wrap(err, "format initialize"); - } - - // Setup the SPS/PPS parsing strategy. - format->try_annexb_first = _srs_config->try_annexb_first(r->vhost); - return err; } -srs_error_t SrsRtmpFromRtcBridge::on_publish() +srs_error_t SrsRtcFrameBuilder::on_publish() { - srs_error_t err = srs_success; - - is_first_audio = true; - is_first_video = true; + is_first_audio_ = true; - // TODO: FIXME: Should sync with bridge? 
- if ((err = source_->on_publish()) != srs_success) { - return srs_error_wrap(err, "source publish"); - } + return srs_success; +} - return err; +void SrsRtcFrameBuilder::on_unpublish() +{ } -srs_error_t SrsRtmpFromRtcBridge::on_rtp(SrsRtpPacket *pkt) +srs_error_t SrsRtcFrameBuilder::on_rtp(SrsRtpPacket *pkt) { srs_error_t err = srs_success; @@ -1394,7 +1361,7 @@ srs_error_t SrsRtmpFromRtcBridge::on_rtp(SrsRtpPacket *pkt) return err; } - // Have no received any sender report, can't calculate avsync_time, + // Have no received any sender report, can't calculate avsync_time, // discard it to avoid timestamp problem in live source if (pkt->get_avsync_time() <= 0) { return err; @@ -1409,35 +1376,34 @@ srs_error_t SrsRtmpFromRtcBridge::on_rtp(SrsRtpPacket *pkt) return err; } -void SrsRtmpFromRtcBridge::on_unpublish() -{ - // TODO: FIXME: Should sync with bridge? - source_->on_unpublish(); -} - -srs_error_t SrsRtmpFromRtcBridge::transcode_audio(SrsRtpPacket *pkt) +srs_error_t SrsRtcFrameBuilder::transcode_audio(SrsRtpPacket *pkt) { srs_error_t err = srs_success; // to common message. 
uint32_t ts = pkt->get_avsync_time(); - if (is_first_audio) { + if (is_first_audio_) { int header_len = 0; uint8_t* header = NULL; codec_->aac_codec_header(&header, &header_len); SrsCommonMessage out_rtmp; - packet_aac(&out_rtmp, (char *)header, header_len, ts, is_first_audio); + packet_aac(&out_rtmp, (char *)header, header_len, ts, is_first_audio_); - if ((err = source_->on_audio(&out_rtmp)) != srs_success) { + SrsSharedPtrMessage msg; + if ((err = msg.create(&out_rtmp)) != srs_success) { + return srs_error_wrap(err, "create message"); + } + + if ((err = bridge_->on_frame(&msg)) != srs_success) { return srs_error_wrap(err, "source on audio"); } - is_first_audio = false; + is_first_audio_ = false; } - std::vector out_pkts; - SrsRtpRawPayload *payload = dynamic_cast(pkt->payload()); + std::vector out_pkts; + SrsRtpRawPayload *payload = dynamic_cast(pkt->payload()); SrsAudioFrame frame; frame.add_sample(payload->payload, payload->nn_payload); @@ -1449,12 +1415,17 @@ srs_error_t SrsRtmpFromRtcBridge::transcode_audio(SrsRtpPacket *pkt) return err; } - for (std::vector::iterator it = out_pkts.begin(); it != out_pkts.end(); ++it) { + for (std::vector::iterator it = out_pkts.begin(); it != out_pkts.end(); ++it) { SrsCommonMessage out_rtmp; out_rtmp.header.timestamp = (*it)->dts; - packet_aac(&out_rtmp, (*it)->samples[0].bytes, (*it)->samples[0].size, ts, is_first_audio); + packet_aac(&out_rtmp, (*it)->samples[0].bytes, (*it)->samples[0].size, ts, is_first_audio_); + + SrsSharedPtrMessage msg; + if ((err = msg.create(&out_rtmp)) != srs_success) { + return srs_error_wrap(err, "create message"); + } - if ((err = source_->on_audio(&out_rtmp)) != srs_success) { + if ((err = bridge_->on_frame(&msg)) != srs_success) { err = srs_error_wrap(err, "source on audio"); break; } @@ -1464,7 +1435,7 @@ srs_error_t SrsRtmpFromRtcBridge::transcode_audio(SrsRtpPacket *pkt) return err; } -void SrsRtmpFromRtcBridge::packet_aac(SrsCommonMessage* audio, char* data, int len, uint32_t pts, bool 
is_header) +void SrsRtcFrameBuilder::packet_aac(SrsCommonMessage* audio, char* data, int len, uint32_t pts, bool is_header) { int rtmp_len = len + 2; audio->header.initialize_audio(rtmp_len, pts, 1); @@ -1481,7 +1452,7 @@ void SrsRtmpFromRtcBridge::packet_aac(SrsCommonMessage* audio, char* data, int l audio->size = rtmp_len; } -srs_error_t SrsRtmpFromRtcBridge::packet_video(SrsRtpPacket* src) +srs_error_t SrsRtcFrameBuilder::packet_video(SrsRtpPacket* src) { srs_error_t err = srs_success; @@ -1521,7 +1492,7 @@ srs_error_t SrsRtmpFromRtcBridge::packet_video(SrsRtpPacket* src) return err; } -srs_error_t SrsRtmpFromRtcBridge::packet_video_key_frame(SrsRtpPacket* pkt) +srs_error_t SrsRtcFrameBuilder::packet_video_key_frame(SrsRtpPacket* pkt) { srs_error_t err = srs_success; @@ -1546,7 +1517,7 @@ srs_error_t SrsRtmpFromRtcBridge::packet_video_key_frame(SrsRtpPacket* pkt) char* flv = NULL; int nb_flv = 0; if ((err = avc->mux_avc2flv(sh, SrsVideoAvcFrameTypeKeyFrame, SrsVideoAvcFrameTraitSequenceHeader, pkt->get_avsync_time(), - pkt->get_avsync_time(), &flv, &nb_flv)) != srs_success) { + pkt->get_avsync_time(), &flv, &nb_flv)) != srs_success) { return srs_error_wrap(err, "avc to flv"); } @@ -1557,7 +1528,12 @@ srs_error_t SrsRtmpFromRtcBridge::packet_video_key_frame(SrsRtpPacket* pkt) return srs_error_wrap(err, "create rtmp"); } - if ((err = source_->on_video(&rtmp)) != srs_success) { + SrsSharedPtrMessage msg; + if ((err = msg.create(&rtmp)) != srs_success) { + return srs_error_wrap(err, "create message"); + } + + if ((err = bridge_->on_frame(&msg)) != srs_success) { return err; } } @@ -1580,7 +1556,7 @@ srs_error_t SrsRtmpFromRtcBridge::packet_video_key_frame(SrsRtpPacket* pkt) lost_sn_ = header_sn_ + 1; clear_cached_video(); srs_warn("drop old ts=%u, header=%hu, lost=%hu, set new ts=%u, header=%hu, lost=%hu", - (uint32_t)old_ts, old_header_sn, old_lost_sn, (uint32_t)rtp_key_frame_ts_, header_sn_, lost_sn_); + (uint32_t)old_ts, old_header_sn, old_lost_sn, 
(uint32_t)rtp_key_frame_ts_, header_sn_, lost_sn_); } uint16_t index = cache_index(pkt->header.get_sequence()); @@ -1616,7 +1592,7 @@ srs_error_t SrsRtmpFromRtcBridge::packet_video_key_frame(SrsRtpPacket* pkt) return err; } -srs_error_t SrsRtmpFromRtcBridge::packet_video_rtmp(const uint16_t start, const uint16_t end) +srs_error_t SrsRtcFrameBuilder::packet_video_rtmp(const uint16_t start, const uint16_t end) { srs_error_t err = srs_success; @@ -1646,7 +1622,7 @@ srs_error_t SrsRtmpFromRtcBridge::packet_video_rtmp(const uint16_t start, const if (stap_payload) { for (int j = 0; j < (int)stap_payload->nalus.size(); ++j) { SrsSample* sample = stap_payload->nalus.at(j); - if (sample->size > 0) { + if (sample->size > 0) { nb_payload += 4 + sample->size; } } @@ -1664,7 +1640,7 @@ srs_error_t SrsRtmpFromRtcBridge::packet_video_rtmp(const uint16_t start, const srs_warn("empty nalu"); return err; } - + //type_codec1 + avc_type + composition time + nalu size + nalu nb_payload += 1 + 1 + 3; @@ -1725,7 +1701,7 @@ srs_error_t SrsRtmpFromRtcBridge::packet_video_rtmp(const uint16_t start, const if (stap_payload) { for (int j = 0; j < (int)stap_payload->nalus.size(); ++j) { SrsSample* sample = stap_payload->nalus.at(j); - if (sample->size > 0) { + if (sample->size > 0) { payload.write_4bytes(sample->size); payload.write_bytes(sample->bytes, sample->size); } @@ -1745,7 +1721,12 @@ srs_error_t SrsRtmpFromRtcBridge::packet_video_rtmp(const uint16_t start, const srs_freep(pkt); } - if ((err = source_->on_video(&rtmp)) != srs_success) { + SrsSharedPtrMessage msg; + if ((err = msg.create(&rtmp)) != srs_success) { + return srs_error_wrap(err, "create message"); + } + + if ((err = bridge_->on_frame(&msg)) != srs_success) { srs_warn("fail to pack video frame"); } @@ -1765,7 +1746,7 @@ srs_error_t SrsRtmpFromRtcBridge::packet_video_rtmp(const uint16_t start, const return err; } -int32_t SrsRtmpFromRtcBridge::find_next_lost_sn(uint16_t current_sn, uint16_t& end_sn) +int32_t 
SrsRtcFrameBuilder::find_next_lost_sn(uint16_t current_sn, uint16_t& end_sn) { uint32_t last_rtp_ts = cache_video_pkts_[cache_index(header_sn_)].rtp_ts; for (int i = 0; i < s_cache_size; ++i) { @@ -1791,7 +1772,7 @@ int32_t SrsRtmpFromRtcBridge::find_next_lost_sn(uint16_t current_sn, uint16_t& e return -2; } -void SrsRtmpFromRtcBridge::clear_cached_video() +void SrsRtcFrameBuilder::clear_cached_video() { for (size_t i = 0; i < s_cache_size; i++) { @@ -1805,7 +1786,7 @@ void SrsRtmpFromRtcBridge::clear_cached_video() } } -bool SrsRtmpFromRtcBridge::check_frame_complete(const uint16_t start, const uint16_t end) +bool SrsRtcFrameBuilder::check_frame_complete(const uint16_t start, const uint16_t end) { int16_t cnt = srs_rtp_seq_distance(start, end) + 1; srs_assert(cnt >= 1); @@ -1833,6 +1814,7 @@ bool SrsRtmpFromRtcBridge::check_frame_complete(const uint16_t start, const uint return fu_s_c == fu_e_c; } + #endif SrsCodecPayload::SrsCodecPayload() diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index 7f702ff2e2..8e8453a4e4 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -18,8 +18,10 @@ #include #include -#include #include +#include +#include +#include class SrsRequest; class SrsMetaCache; @@ -27,7 +29,7 @@ class SrsSharedPtrMessage; class SrsCommonMessage; class SrsMessageArray; class SrsRtcSource; -class SrsRtcFromRtmpBridge; +class SrsFrameToRtcBridge; class SrsAudioTranscoder; class SrsRtpPacket; class SrsSample; @@ -38,6 +40,13 @@ class SrsRtpRingBuffer; class SrsRtpNackForReceiver; class SrsJsonObject; class SrsErrorPithyPrint; +class SrsRtcFrameBuilder; +class SrsLiveSource; + +// Firefox defaults as 109, Chrome is 111. +const int kAudioPayloadType = 111; +// Firefox defaults as 126, Chrome is 102. 
+const int kVideoPayloadType = 102; class SrsNtp { @@ -145,18 +154,6 @@ class ISrsRtcSourceEventHandler virtual void on_consumers_finished() = 0; }; -// SrsRtcSource bridge to SrsLiveSource -class ISrsRtcSourceBridge -{ -public: - ISrsRtcSourceBridge(); - virtual ~ISrsRtcSourceBridge(); -public: - virtual srs_error_t on_publish() = 0; - virtual srs_error_t on_rtp(SrsRtpPacket *pkt) = 0; - virtual void on_unpublish() = 0; -}; - // A Source is a stream, to publish and to play with, binding to SrsRtcPublishStream and SrsRtcPlayStream. class SrsRtcSource : public ISrsFastTimer { @@ -172,8 +169,13 @@ class SrsRtcSource : public ISrsFastTimer ISrsRtcPublishStream* publish_stream_; // Steam description for this steam. SrsRtcSourceDescription* stream_desc_; +private: +#ifdef SRS_FFMPEG_FIT + // Collect and build WebRTC RTP packets to AV frames. + SrsRtcFrameBuilder* frame_builder_; +#endif // The Source bridge, bridge stream to other source. - ISrsRtcSourceBridge* bridge_; + ISrsStreamBridge* bridge_; private: // To delivery stream to clients. std::vector consumers; @@ -205,7 +207,7 @@ class SrsRtcSource : public ISrsFastTimer virtual SrsContextId source_id(); virtual SrsContextId pre_source_id(); public: - void set_bridge(ISrsRtcSourceBridge *bridge); + void set_bridge(ISrsStreamBridge* bridge); public: // Create consumer // @param consumer, output the create consumer. @@ -245,60 +247,64 @@ class SrsRtcSource : public ISrsFastTimer }; #ifdef SRS_FFMPEG_FIT -class SrsRtcFromRtmpBridge : public ISrsLiveSourceBridge + +// Convert AV frame to RTC RTP packets. +class SrsRtcRtpBuilder { private: SrsRequest* req; - SrsRtcSource* source_; + SrsFrameToRtcBridge* bridge_; // The format, codec information. SrsRtmpFormat* format; // The metadata cache. 
SrsMetaCache* meta; private: - bool rtmp_to_rtc; SrsAudioCodecId latest_codec_; SrsAudioTranscoder* codec_; bool keep_bframe; bool merge_nalus; uint16_t audio_sequence; uint16_t video_sequence; - uint32_t audio_ssrc; - uint32_t video_ssrc; +private: + uint32_t audio_ssrc_; + uint32_t video_ssrc_; uint8_t audio_payload_type_; uint8_t video_payload_type_; public: - SrsRtcFromRtmpBridge(SrsRtcSource* source); - virtual ~SrsRtcFromRtmpBridge(); + SrsRtcRtpBuilder(SrsFrameToRtcBridge* bridge, uint32_t assrc, uint8_t apt, uint32_t vssrc, uint8_t vpt); + virtual ~SrsRtcRtpBuilder(); public: virtual srs_error_t initialize(SrsRequest* r); virtual srs_error_t on_publish(); virtual void on_unpublish(); + virtual srs_error_t on_frame(SrsSharedPtrMessage* frame); +private: virtual srs_error_t on_audio(SrsSharedPtrMessage* msg); private: srs_error_t init_codec(SrsAudioCodecId codec); srs_error_t transcode(SrsAudioFrame* audio); srs_error_t package_opus(SrsAudioFrame* audio, SrsRtpPacket* pkt); -public: +private: virtual srs_error_t on_video(SrsSharedPtrMessage* msg); private: srs_error_t filter(SrsSharedPtrMessage* msg, SrsFormat* format, bool& has_idr, std::vector& samples); - srs_error_t package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket* pkt); + srs_error_t package_stap_a(SrsSharedPtrMessage* msg, SrsRtpPacket* pkt); srs_error_t package_nalus(SrsSharedPtrMessage* msg, const std::vector& samples, std::vector& pkts); srs_error_t package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, std::vector& pkts); srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, std::vector& pkts); srs_error_t consume_packets(std::vector& pkts); }; -class SrsRtmpFromRtcBridge : public ISrsRtcSourceBridge +// Collect and build WebRTC RTP packets to AV frames. 
+class SrsRtcFrameBuilder { private: - SrsLiveSource *source_; + ISrsStreamBridge* bridge_; +private: + bool is_first_audio_; SrsAudioTranscoder *codec_; - bool is_first_audio; - bool is_first_video; - // The format, codec information. - SrsRtmpFormat* format; - +private: + const static uint16_t s_cache_size = 512; //TODO:use SrsRtpRingBuffer //TODO:jitter buffer class struct RtcPacketCache { @@ -308,33 +314,33 @@ class SrsRtmpFromRtcBridge : public ISrsRtcSourceBridge uint32_t rtp_ts; SrsRtpPacket* pkt; }; - const static uint16_t s_cache_size = 512; RtcPacketCache cache_video_pkts_[s_cache_size]; uint16_t header_sn_; uint16_t lost_sn_; int64_t rtp_key_frame_ts_; public: - SrsRtmpFromRtcBridge(SrsLiveSource *src); - virtual ~SrsRtmpFromRtcBridge(); + SrsRtcFrameBuilder(ISrsStreamBridge* bridge); + virtual ~SrsRtcFrameBuilder(); public: srs_error_t initialize(SrsRequest* r); -public: virtual srs_error_t on_publish(); - virtual srs_error_t on_rtp(SrsRtpPacket *pkt); virtual void on_unpublish(); + virtual srs_error_t on_rtp(SrsRtpPacket *pkt); private: srs_error_t transcode_audio(SrsRtpPacket *pkt); void packet_aac(SrsCommonMessage* audio, char* data, int len, uint32_t pts, bool is_header); +private: srs_error_t packet_video(SrsRtpPacket* pkt); srs_error_t packet_video_key_frame(SrsRtpPacket* pkt); - srs_error_t packet_video_rtmp(const uint16_t start, const uint16_t end); - int32_t find_next_lost_sn(uint16_t current_sn, uint16_t& end_sn); - void clear_cached_video(); inline uint16_t cache_index(uint16_t current_sn) { return current_sn % s_cache_size; } + int32_t find_next_lost_sn(uint16_t current_sn, uint16_t& end_sn); bool check_frame_complete(const uint16_t start, const uint16_t end); + srs_error_t packet_video_rtmp(const uint16_t start, const uint16_t end); + void clear_cached_video(); }; + #endif // TODO: FIXME: Rename it. 
diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 6411548a7f..54a231168c 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -39,6 +39,7 @@ using namespace std; #include #include #include +#include // the timeout in srs_utime_t to wait encoder to republish // if timeout, close the connection. @@ -1081,7 +1082,7 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source) // Check whether RTC stream is busy. #ifdef SRS_RTC - SrsRtcSource *rtc = NULL; + SrsRtcSource* rtc = NULL; bool rtc_server_enabled = _srs_config->get_rtc_server_enabled(); bool rtc_enabled = _srs_config->get_rtc_enabled(req->vhost); if (rtc_server_enabled && rtc_enabled && !info->edge) { @@ -1095,10 +1096,28 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source) } #endif + // Check whether SRT stream is busy. +#ifdef SRS_SRT + SrsSrtSource* srt = NULL; + bool srt_server_enabled = _srs_config->get_srt_enabled(); + bool srt_enabled = _srs_config->get_srt_enabled(req->vhost); + if (srt_server_enabled && srt_enabled && !info->edge) { + if ((err = _srs_srt_sources->fetch_or_create(req, &srt)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + + if (!srt->can_publish()) { + return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "srt stream %s busy", req->get_stream_url().c_str()); + } + } +#endif + // Bridge to RTC streaming. 
#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT) - if (rtc) { - SrsRtcFromRtmpBridge *bridge = new SrsRtcFromRtmpBridge(rtc); + if (rtc && _srs_config->get_rtc_from_rtmp(req->vhost)) { + SrsCompositeBridge* bridge = new SrsCompositeBridge(); + bridge->append(new SrsFrameToRtcBridge(rtc)); + if ((err = bridge->initialize(req)) != srs_success) { srs_freep(bridge); return srs_error_wrap(err, "bridge init"); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 302ef907e0..ba9e9e7020 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1910,14 +1910,6 @@ void SrsLiveSourceManager::destroy() pool.clear(); } -ISrsLiveSourceBridge::ISrsLiveSourceBridge() -{ -} - -ISrsLiveSourceBridge::~ISrsLiveSourceBridge() -{ -} - SrsLiveSource::SrsLiveSource() { req = NULL; @@ -2060,7 +2052,7 @@ srs_error_t SrsLiveSource::initialize(SrsRequest* r, ISrsLiveSourceHandler* h) return err; } -void SrsLiveSource::set_bridge(ISrsLiveSourceBridge* v) +void SrsLiveSource::set_bridge(ISrsStreamBridge* v) { srs_freep(bridge_); bridge_ = v; @@ -2257,31 +2249,42 @@ srs_error_t SrsLiveSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPack srs_error_t SrsLiveSource::on_audio(SrsCommonMessage* shared_audio) { srs_error_t err = srs_success; - + // Detect where stream is monotonically increasing. if (!mix_correct && is_monotonically_increase) { if (last_packet_time > 0 && shared_audio->header.timestamp < last_packet_time) { is_monotonically_increase = false; srs_warn("AUDIO: Timestamp %" PRId64 "=>%" PRId64 ", may need mix_correct.", - last_packet_time, shared_audio->header.timestamp); + last_packet_time, shared_audio->header.timestamp); } } last_packet_time = shared_audio->header.timestamp; - + // convert shared_audio to msg, user should not use shared_audio again. // the payload is transfer to msg, and set to NULL in shared_audio. 
SrsSharedPtrMessage msg; if ((err = msg.create(shared_audio)) != srs_success) { return srs_error_wrap(err, "create message"); } + + return on_frame(&msg); +} + +srs_error_t SrsLiveSource::on_frame(SrsSharedPtrMessage* msg) +{ + srs_error_t err = srs_success; // directly process the audio message. if (!mix_correct) { - return on_audio_imp(&msg); + if (msg->is_audio()) { + return on_audio_imp(msg); + } else { + return on_video_imp(msg); + } } // insert msg to the queue. - mix_queue->push(msg.copy()); + mix_queue->push(msg->copy()); // fetch someone from mix queue. SrsSharedPtrMessage* m = mix_queue->pop(); @@ -2333,7 +2336,7 @@ srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage* msg) } // For bridge to consume the message. - if (bridge_ && (err = bridge_->on_audio(msg)) != srs_success) { + if (bridge_ && (err = bridge_->on_frame(msg)) != srs_success) { return srs_error_wrap(err, "bridge consume audio"); } @@ -2386,11 +2389,11 @@ srs_error_t SrsLiveSource::on_video(SrsCommonMessage* shared_video) if (last_packet_time > 0 && shared_video->header.timestamp < last_packet_time) { is_monotonically_increase = false; srs_warn("VIDEO: Timestamp %" PRId64 "=>%" PRId64 ", may need mix_correct.", - last_packet_time, shared_video->header.timestamp); + last_packet_time, shared_video->header.timestamp); } } last_packet_time = shared_video->header.timestamp; - + // drop any unknown header video. // @see https://github.com/ossrs/srs/issues/421 if (!SrsFlvVideo::acceptable(shared_video->payload, shared_video->size)) { @@ -2398,41 +2401,19 @@ srs_error_t SrsLiveSource::on_video(SrsCommonMessage* shared_video) if (shared_video->size > 0) { b0 = shared_video->payload[0]; } - + srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0); return err; } - + // convert shared_video to msg, user should not use shared_video again. // the payload is transfer to msg, and set to NULL in shared_video. 
SrsSharedPtrMessage msg; if ((err = msg.create(shared_video)) != srs_success) { return srs_error_wrap(err, "create message"); } - - // directly process the video message. - if (!mix_correct) { - return on_video_imp(&msg); - } - - // insert msg to the queue. - mix_queue->push(msg.copy()); - - // fetch someone from mix queue. - SrsSharedPtrMessage* m = mix_queue->pop(); - if (!m) { - return err; - } - - // consume the monotonically increase message. - if (m->is_audio()) { - err = on_audio_imp(m); - } else { - err = on_video_imp(m); - } - srs_freep(m); - - return err; + + return on_frame(&msg); } srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage* msg) @@ -2478,7 +2459,7 @@ srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage* msg) } // For bridge to consume the message. - if (bridge_ && (err = bridge_->on_video(msg)) != srs_success) { + if (bridge_ && (err = bridge_->on_frame(msg)) != srs_success) { return srs_error_wrap(err, "bridge consume video"); } diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index ae4d6da83d..daae1f0346 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -18,6 +18,7 @@ #include #include #include +#include class SrsFormat; class SrsRtmpFormat; @@ -471,19 +472,6 @@ class SrsLiveSourceManager : public ISrsHourGlass // Global singleton instance. extern SrsLiveSourceManager* _srs_sources; -// For RTMP2RTC, bridge SrsLiveSource to SrsRtcSource -class ISrsLiveSourceBridge -{ -public: - ISrsLiveSourceBridge(); - virtual ~ISrsLiveSourceBridge(); -public: - virtual srs_error_t on_publish() = 0; - virtual srs_error_t on_audio(SrsSharedPtrMessage* audio) = 0; - virtual srs_error_t on_video(SrsSharedPtrMessage* video) = 0; - virtual void on_unpublish() = 0; -}; - // The live streaming source. class SrsLiveSource : public ISrsReloadHandler { @@ -518,7 +506,7 @@ class SrsLiveSource : public ISrsReloadHandler // The event handler. 
ISrsLiveSourceHandler* handler; // The source bridge for other source. - ISrsLiveSourceBridge* bridge_; + ISrsStreamBridge* bridge_; // The edge control service SrsPlayEdge* play_edge; SrsPublishEdge* publish_edge; @@ -551,7 +539,7 @@ class SrsLiveSource : public ISrsReloadHandler // Initialize the hls with handlers. virtual srs_error_t initialize(SrsRequest* r, ISrsLiveSourceHandler* h); // Bridge to other source, forward packets to it. - void set_bridge(ISrsLiveSourceBridge* v); + void set_bridge(ISrsStreamBridge* v); // Interface ISrsReloadHandler public: virtual srs_error_t on_reload_vhost_play(std::string vhost); @@ -572,6 +560,7 @@ class SrsLiveSource : public ISrsReloadHandler public: // TODO: FIXME: Use SrsSharedPtrMessage instead. virtual srs_error_t on_audio(SrsCommonMessage* audio); + srs_error_t on_frame(SrsSharedPtrMessage* msg); private: virtual srs_error_t on_audio_imp(SrsSharedPtrMessage* audio); public: diff --git a/trunk/src/app/srs_app_srt_conn.cpp b/trunk/src/app/srs_app_srt_conn.cpp index df51160597..d3bc88aa23 100644 --- a/trunk/src/app/srs_app_srt_conn.cpp +++ b/trunk/src/app/srs_app_srt_conn.cpp @@ -357,64 +357,60 @@ srs_error_t SrsMpegtsSrtConn::acquire_publish() return srs_error_new(ERROR_SRT_SOURCE_BUSY, "srt stream %s busy", req_->get_stream_url().c_str()); } - if (_srs_config->get_srt_to_rtmp(req_->vhost)) { - // Check rtmp stream is busy. - SrsLiveSource *live_source = _srs_sources->fetch(req_); - if (live_source && !live_source->can_publish(false)) { - return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "live_source stream %s busy", req_->get_stream_url().c_str()); - } + // Check rtmp stream is busy. 
+ SrsLiveSource *live_source = _srs_sources->fetch(req_); + if (live_source && !live_source->can_publish(false)) { + return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "live_source stream %s busy", req_->get_stream_url().c_str()); + } - if ((err = _srs_sources->fetch_or_create(req_, _srs_hybrid->srs()->instance(), &live_source)) != srs_success) { - return srs_error_wrap(err, "create source"); - } + if ((err = _srs_sources->fetch_or_create(req_, _srs_hybrid->srs()->instance(), &live_source)) != srs_success) { + return srs_error_wrap(err, "create source"); + } - srs_assert(live_source != NULL); - - bool enabled_cache = _srs_config->get_gop_cache(req_->vhost); - int gcmf = _srs_config->get_gop_cache_max_frames(req_->vhost); - live_source->set_cache(enabled_cache); - live_source->set_gop_cache_max_frames(gcmf); + srs_assert(live_source != NULL); - // srt->rtmp->rtc - // TODO: FIXME: the code below is repeat in srs_app_rtmp_conn.cpp, refactor it later, use function instead. + bool enabled_cache = _srs_config->get_gop_cache(req_->vhost); + int gcmf = _srs_config->get_gop_cache_max_frames(req_->vhost); + live_source->set_cache(enabled_cache); + live_source->set_gop_cache_max_frames(gcmf); - // Check whether RTC stream is busy. + // srt->rtmp->rtc + // TODO: FIXME: the code below is repeat in srs_app_rtmp_conn.cpp, refactor it later, use function instead. + + // Check whether RTC stream is busy. #ifdef SRS_RTC - SrsRtcSource *rtc = NULL; - bool rtc_server_enabled = _srs_config->get_rtc_server_enabled(); - bool rtc_enabled = _srs_config->get_rtc_enabled(req_->vhost); - bool edge = _srs_config->get_vhost_is_edge(req_->vhost); - if (rtc_server_enabled && rtc_enabled && ! 
edge) { - if ((err = _srs_rtc_sources->fetch_or_create(req_, &rtc)) != srs_success) { - return srs_error_wrap(err, "create source"); - } + SrsRtcSource* rtc = NULL; + bool rtc_server_enabled = _srs_config->get_rtc_server_enabled(); + bool rtc_enabled = _srs_config->get_rtc_enabled(req_->vhost); + bool edge = _srs_config->get_vhost_is_edge(req_->vhost); + if (rtc_server_enabled && rtc_enabled && ! edge) { + if ((err = _srs_rtc_sources->fetch_or_create(req_, &rtc)) != srs_success) { + return srs_error_wrap(err, "create source"); + } - if (!rtc->can_publish()) { - return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtc stream %s busy", req_->get_stream_url().c_str()); - } + if (!rtc->can_publish()) { + return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtc stream %s busy", req_->get_stream_url().c_str()); } + } #endif - // Bridge to RTC streaming. -#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT) - if (rtc) { - SrsRtcFromRtmpBridge *bridge = new SrsRtcFromRtmpBridge(rtc); - if ((err = bridge->initialize(req_)) != srs_success) { - srs_freep(bridge); - return srs_error_wrap(err, "bridge init"); - } + if (_srs_config->get_srt_to_rtmp(req_->vhost)) { + // Bridge to RTMP and RTC streaming. 
+ SrsCompositeBridge* bridge = new SrsCompositeBridge(); + bridge->append(new SrsFrameToRtmpBridge(live_source)); - live_source->set_bridge(bridge); +#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT) + if (rtc && _srs_config->get_rtc_from_rtmp(req_->vhost)) { + bridge->append(new SrsFrameToRtcBridge(rtc)); } #endif - SrsRtmpFromSrtBridge *bridger = new SrsRtmpFromSrtBridge(live_source); - if ((err = bridger->initialize(req_)) != srs_success) { - srs_freep(bridger); - return srs_error_wrap(err, "create bridger"); + if ((err = bridge->initialize(req_)) != srs_success) { + srs_freep(bridge); + return srs_error_wrap(err, "create bridge"); } - srt_source_->set_bridge(bridger); + srt_source_->set_bridge(bridge); } if ((err = srt_source_->on_publish()) != srs_success) { diff --git a/trunk/src/app/srs_app_srt_source.cpp b/trunk/src/app/srs_app_srt_source.cpp index b88f5774b8..bfa05eaa2f 100644 --- a/trunk/src/app/srs_app_srt_source.cpp +++ b/trunk/src/app/srs_app_srt_source.cpp @@ -235,15 +235,7 @@ void SrsSrtConsumer::wait(int nb_msgs, srs_utime_t timeout) srs_cond_timedwait(mw_wait, timeout); } -ISrsSrtSourceBridge::ISrsSrtSourceBridge() -{ -} - -ISrsSrtSourceBridge::~ISrsSrtSourceBridge() -{ -} - -SrsRtmpFromSrtBridge::SrsRtmpFromSrtBridge(SrsLiveSource* source) : ISrsSrtSourceBridge() +SrsSrtFrameBuilder::SrsSrtFrameBuilder(ISrsStreamBridge* bridge) { ts_ctx_ = new SrsTsContext(); @@ -252,7 +244,7 @@ SrsRtmpFromSrtBridge::SrsRtmpFromSrtBridge(SrsLiveSource* source) : ISrsSrtSourc pps_ = ""; req_ = NULL; - live_source_ = source; + bridge_ = bridge; video_streamid_ = 1; audio_streamid_ = 2; @@ -260,7 +252,7 @@ SrsRtmpFromSrtBridge::SrsRtmpFromSrtBridge(SrsLiveSource* source) : ISrsSrtSourc pp_audio_duration_ = new SrsAlonePithyPrint(); } -SrsRtmpFromSrtBridge::~SrsRtmpFromSrtBridge() +SrsSrtFrameBuilder::~SrsSrtFrameBuilder() { srs_freep(ts_ctx_); srs_freep(req_); @@ -268,18 +260,12 @@ SrsRtmpFromSrtBridge::~SrsRtmpFromSrtBridge() srs_freep(pp_audio_duration_); } 
-srs_error_t SrsRtmpFromSrtBridge::on_publish() +srs_error_t SrsSrtFrameBuilder::on_publish() { - srs_error_t err = srs_success; - - if ((err = live_source_->on_publish()) != srs_success) { - return srs_error_wrap(err, "on publish"); - } - - return err; + return srs_success; } -srs_error_t SrsRtmpFromSrtBridge::on_packet(SrsSrtPacket *pkt) +srs_error_t SrsSrtFrameBuilder::on_packet(SrsSrtPacket *pkt) { srs_error_t err = srs_success; @@ -290,10 +276,10 @@ srs_error_t SrsRtmpFromSrtBridge::on_packet(SrsSrtPacket *pkt) int nb_packet = nb_buf / SRS_TS_PACKET_SIZE; for (int i = 0; i < nb_packet; i++) { char* p = buf + (i * SRS_TS_PACKET_SIZE); - + SrsBuffer* stream = new SrsBuffer(p, SRS_TS_PACKET_SIZE); SrsAutoFree(SrsBuffer, stream); - + // Process each ts packet. Note that the jitter of UDP may cause video glitch when packet loss or wrong seq. We // don't handle it because SRT will, see tlpktdrop at https://ossrs.net/lts/zh-cn/docs/v4/doc/srt-params if ((err = ts_ctx_->decode(stream, this)) != srs_success) { @@ -302,16 +288,15 @@ srs_error_t SrsRtmpFromSrtBridge::on_packet(SrsSrtPacket *pkt) continue; } } - + return err; } -void SrsRtmpFromSrtBridge::on_unpublish() +void SrsSrtFrameBuilder::on_unpublish() { - live_source_->on_unpublish(); } -srs_error_t SrsRtmpFromSrtBridge::initialize(SrsRequest* req) +srs_error_t SrsSrtFrameBuilder::initialize(SrsRequest* req) { srs_error_t err = srs_success; @@ -321,7 +306,7 @@ srs_error_t SrsRtmpFromSrtBridge::initialize(SrsRequest* req) return err; } -srs_error_t SrsRtmpFromSrtBridge::on_ts_message(SrsTsMessage* msg) +srs_error_t SrsSrtFrameBuilder::on_ts_message(SrsTsMessage* msg) { srs_error_t err = srs_success; @@ -369,7 +354,7 @@ srs_error_t SrsRtmpFromSrtBridge::on_ts_message(SrsTsMessage* msg) return err; } -srs_error_t SrsRtmpFromSrtBridge::on_ts_video_avc(SrsTsMessage* msg, SrsBuffer* avs) +srs_error_t SrsSrtFrameBuilder::on_ts_video_avc(SrsTsMessage* msg, SrsBuffer* avs) { srs_error_t err = srs_success; @@ -430,7 +415,7 
@@ srs_error_t SrsRtmpFromSrtBridge::on_ts_video_avc(SrsTsMessage* msg, SrsBuffer* return on_h264_frame(msg, ipb_frames); } -srs_error_t SrsRtmpFromSrtBridge::check_sps_pps_change(SrsTsMessage* msg) +srs_error_t SrsSrtFrameBuilder::check_sps_pps_change(SrsTsMessage* msg) { srs_error_t err = srs_success; @@ -470,14 +455,19 @@ srs_error_t SrsRtmpFromSrtBridge::check_sps_pps_change(SrsTsMessage* msg) return srs_error_wrap(err, "create rtmp"); } - if ((err = live_source_->on_video(&rtmp)) != srs_success) { + SrsSharedPtrMessage frame; + if ((err = frame.create(&rtmp)) != srs_success) { + return srs_error_wrap(err, "create frame"); + } + + if ((err = bridge_->on_frame(&frame)) != srs_success) { return srs_error_wrap(err, "srt to rtmp sps/pps"); } return err; } -srs_error_t SrsRtmpFromSrtBridge::on_h264_frame(SrsTsMessage* msg, vector >& ipb_frames) +srs_error_t SrsSrtFrameBuilder::on_h264_frame(SrsTsMessage* msg, vector >& ipb_frames) { srs_error_t err = srs_success; @@ -526,7 +516,12 @@ srs_error_t SrsRtmpFromSrtBridge::on_h264_frame(SrsTsMessage* msg, vectoron_video(&rtmp)) != srs_success) { + SrsSharedPtrMessage frame; + if ((err = frame.create(&rtmp)) != srs_success) { + return srs_error_wrap(err, "create frame"); + } + + if ((err = bridge_->on_frame(&frame)) != srs_success) { return srs_error_wrap(err ,"srt ts video to rtmp"); } @@ -534,7 +529,7 @@ srs_error_t SrsRtmpFromSrtBridge::on_h264_frame(SrsTsMessage* msg, vectoron_video(&rtmp)) != srs_success) { + SrsSharedPtrMessage frame; + if ((err = frame.create(&rtmp)) != srs_success) { + return srs_error_wrap(err, "create frame"); + } + + if ((err = bridge_->on_frame(&frame)) != srs_success) { return srs_error_wrap(err, "srt to rtmp vps/sps/pps"); } return err; } -srs_error_t SrsRtmpFromSrtBridge::on_hevc_frame(SrsTsMessage* msg, vector >& ipb_frames) +srs_error_t SrsSrtFrameBuilder::on_hevc_frame(SrsTsMessage* msg, vector >& ipb_frames) { srs_error_t err = srs_success; @@ -713,7 +713,12 @@ srs_error_t 
SrsRtmpFromSrtBridge::on_hevc_frame(SrsTsMessage* msg, vectoron_video(&rtmp)) != srs_success) { + SrsSharedPtrMessage frame; + if ((err = frame.create(&rtmp)) != srs_success) { + return srs_error_wrap(err, "create frame"); + } + + if ((err = bridge_->on_frame(&frame)) != srs_success) { return srs_error_wrap(err ,"srt ts hevc video to rtmp"); } @@ -721,7 +726,7 @@ srs_error_t SrsRtmpFromSrtBridge::on_hevc_frame(SrsTsMessage* msg, vectoron_audio(&rtmp)) != srs_success) { + if ((err = bridge_->on_frame(&frame)) != srs_success) { return srs_error_wrap(err, "srt to rtmp audio sh"); } return err; } -srs_error_t SrsRtmpFromSrtBridge::on_aac_frame(SrsTsMessage* msg, uint32_t pts, char* frame, int frame_size) +srs_error_t SrsSrtFrameBuilder::on_aac_frame(SrsTsMessage* msg, uint32_t pts, char* data, int data_size) { srs_error_t err = srs_success; - int rtmp_len = frame_size + 2/* 2 bytes of flv audio tag header*/; + int rtmp_len = data_size + 2/* 2 bytes of flv audio tag header*/; SrsCommonMessage rtmp; rtmp.header.initialize_audio(rtmp_len, pts, audio_streamid_); @@ -842,9 +852,14 @@ srs_error_t SrsRtmpFromSrtBridge::on_aac_frame(SrsTsMessage* msg, uint32_t pts, stream.write_1bytes(aac_flag); stream.write_1bytes(1); // Write audio frame. - stream.write_bytes(frame, frame_size); + stream.write_bytes(data, data_size); + + SrsSharedPtrMessage frame; + if ((err = frame.create(&rtmp)) != srs_success) { + return srs_error_wrap(err, "create frame"); + } - if ((err = live_source_->on_audio(&rtmp)) != srs_success) { + if ((err = bridge_->on_frame(&frame)) != srs_success) { return srs_error_wrap(err, "srt to rtmp audio sh"); } @@ -855,6 +870,7 @@ SrsSrtSource::SrsSrtSource() { req = NULL; can_publish_ = true; + frame_builder_ = NULL; bridge_ = NULL; } @@ -864,6 +880,7 @@ SrsSrtSource::~SrsSrtSource() // for all consumers are auto free. 
consumers.clear(); + srs_freep(frame_builder_); srs_freep(bridge_); srs_freep(req); } @@ -915,10 +932,13 @@ void SrsSrtSource::update_auth(SrsRequest* r) req->update_auth(r); } -void SrsSrtSource::set_bridge(ISrsSrtSourceBridge* bridge) +void SrsSrtSource::set_bridge(ISrsStreamBridge* bridge) { srs_freep(bridge_); bridge_ = bridge; + + srs_freep(frame_builder_); + frame_builder_ = new SrsSrtFrameBuilder(bridge); } srs_error_t SrsSrtSource::create_consumer(SrsSrtConsumer*& consumer) @@ -965,8 +985,18 @@ srs_error_t SrsSrtSource::on_publish() return srs_error_wrap(err, "source id change"); } - if (bridge_ && (err = bridge_->on_publish()) != srs_success) { - return srs_error_wrap(err, "bridge on publish"); + if (bridge_) { + if ((err = frame_builder_->initialize(req)) != srs_success) { + return srs_error_wrap(err, "frame builder initialize"); + } + + if ((err = frame_builder_->on_publish()) != srs_success) { + return srs_error_wrap(err, "frame builder on publish"); + } + + if ((err = bridge_->on_publish()) != srs_success) { + return srs_error_wrap(err, "bridge on publish"); + } } SrsStatistic* stat = SrsStatistic::instance(); @@ -985,9 +1015,12 @@ void SrsSrtSource::on_unpublish() can_publish_ = true; if (bridge_) { + frame_builder_->on_unpublish(); + srs_freep(frame_builder_); + bridge_->on_unpublish(); + srs_freep(bridge_); } - srs_freep(bridge_); } srs_error_t SrsSrtSource::on_packet(SrsSrtPacket* packet) @@ -1001,7 +1034,7 @@ srs_error_t SrsSrtSource::on_packet(SrsSrtPacket* packet) } } - if (bridge_ && (err = bridge_->on_packet(packet)) != srs_success) { + if (frame_builder_ && (err = frame_builder_->on_packet(packet)) != srs_success) { return srs_error_wrap(err, "bridge consume message"); } diff --git a/trunk/src/app/srs_app_srt_source.hpp b/trunk/src/app/srs_app_srt_source.hpp index 9faf72636a..825a74981d 100644 --- a/trunk/src/app/srs_app_srt_source.hpp +++ b/trunk/src/app/srs_app_srt_source.hpp @@ -14,13 +14,14 @@ #include #include -#include +#include class 
SrsSharedPtrMessage; class SrsRequest; class SrsLiveSource; class SrsSrtSource; class SrsAlonePithyPrint; +class SrsSrtFrameBuilder; // The SRT packet with shared message. class SrsSrtPacket @@ -91,28 +92,18 @@ class SrsSrtConsumer virtual void wait(int nb_msgs, srs_utime_t timeout); }; -class ISrsSrtSourceBridge +// Collect and build SRT TS packet to AV frames. +class SrsSrtFrameBuilder : public ISrsTsHandler { public: - ISrsSrtSourceBridge(); - virtual ~ISrsSrtSourceBridge(); + SrsSrtFrameBuilder(ISrsStreamBridge* bridge); + virtual ~SrsSrtFrameBuilder(); public: - virtual srs_error_t on_publish() = 0; - virtual srs_error_t on_packet(SrsSrtPacket *pkt) = 0; - virtual void on_unpublish() = 0; -}; - -class SrsRtmpFromSrtBridge : public ISrsSrtSourceBridge, public ISrsTsHandler -{ -public: - SrsRtmpFromSrtBridge(SrsLiveSource* source); - virtual ~SrsRtmpFromSrtBridge(); + srs_error_t initialize(SrsRequest* r); public: virtual srs_error_t on_publish(); - virtual srs_error_t on_packet(SrsSrtPacket *pkt); + virtual srs_error_t on_packet(SrsSrtPacket* pkt); virtual void on_unpublish(); -public: - srs_error_t initialize(SrsRequest* req); // Interface ISrsTsHandler public: virtual srs_error_t on_ts_message(SrsTsMessage* msg); @@ -123,35 +114,31 @@ class SrsRtmpFromSrtBridge : public ISrsSrtSourceBridge, public ISrsTsHandler srs_error_t on_h264_frame(SrsTsMessage* msg, std::vector >& ipb_frames); srs_error_t check_audio_sh_change(SrsTsMessage* msg, uint32_t pts); srs_error_t on_aac_frame(SrsTsMessage* msg, uint32_t pts, char* frame, int frame_size); - #ifdef SRS_H265 srs_error_t on_ts_video_hevc(SrsTsMessage *msg, SrsBuffer *avs); srs_error_t check_vps_sps_pps_change(SrsTsMessage *msg); srs_error_t on_hevc_frame(SrsTsMessage *msg, std::vector> &ipb_frames); #endif - +private: + ISrsStreamBridge* bridge_; private: SrsTsContext* ts_ctx_; - // Record sps/pps had changed, if change, need to generate new video sh frame. 
bool sps_pps_change_; std::string sps_; std::string pps_; - #ifdef SRS_H265 bool vps_sps_pps_change_; std::string hevc_vps_; std::string hevc_sps_; std::string hevc_pps_; #endif - // Record audio sepcific config had changed, if change, need to generate new audio sh frame. bool audio_sh_change_; std::string audio_sh_; - +private: SrsRequest* req_; - SrsLiveSource* live_source_; - +private: // SRT to rtmp, video stream id. int video_streamid_; // SRT to rtmp, audio stream id. @@ -176,7 +163,7 @@ class SrsSrtSource // Update the authentication information in request. virtual void update_auth(SrsRequest* r); public: - void set_bridge(ISrsSrtSourceBridge *bridger); + void set_bridge(ISrsStreamBridge* bridge); public: // Create consumer // @param consumer, output the create consumer. @@ -201,7 +188,9 @@ class SrsSrtSource // To delivery packets to clients. std::vector consumers; bool can_publish_; - ISrsSrtSourceBridge* bridge_; +private: + SrsSrtFrameBuilder* frame_builder_; + ISrsStreamBridge* bridge_; }; #endif diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index a355b84976..d5dba71584 100644 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -178,7 +178,7 @@ srs_error_t SrsStatisticStream::dumps(SrsJsonObject* obj) void SrsStatisticStream::publish(std::string id) { - // To prevent duplicated publish event by bridger. + // To prevent duplicated publish event by bridge. if (active) { return; } @@ -490,14 +490,14 @@ void SrsStatistic::cleanup_stream(SrsStatisticStream* stream) // Do cleanup streams. 
if (true) { - std::map::iterator it; + std::map::iterator it; if ((it = streams.find(stream->id)) != streams.end()) { streams.erase(it); } } if (true) { - std::map::iterator it; + std::map::iterator it; if ((it = rstreams.find(stream->url)) != rstreams.end()) { rstreams.erase(it); } diff --git a/trunk/src/app/srs_app_stream_bridge.cpp b/trunk/src/app/srs_app_stream_bridge.cpp new file mode 100644 index 0000000000..82eee47529 --- /dev/null +++ b/trunk/src/app/srs_app_stream_bridge.cpp @@ -0,0 +1,236 @@ +// +// Copyright (c) 2013-2023 The SRS Authors +// +// SPDX-License-Identifier: MIT or MulanPSL-2.0 +// + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +using namespace std; + +ISrsStreamBridge::ISrsStreamBridge() +{ +} + +ISrsStreamBridge::~ISrsStreamBridge() +{ +} + +SrsFrameToRtmpBridge::SrsFrameToRtmpBridge(SrsLiveSource *src) +{ + source_ = src; +} + +SrsFrameToRtmpBridge::~SrsFrameToRtmpBridge() +{ +} + +srs_error_t SrsFrameToRtmpBridge::initialize(SrsRequest* r) +{ + return srs_success; +} + +srs_error_t SrsFrameToRtmpBridge::on_publish() +{ + srs_error_t err = srs_success; + + // TODO: FIXME: Should sync with bridge? + if ((err = source_->on_publish()) != srs_success) { + return srs_error_wrap(err, "source publish"); + } + + return err; +} + +void SrsFrameToRtmpBridge::on_unpublish() +{ + // TODO: FIXME: Should sync with bridge? 
+ source_->on_unpublish(); +} + +srs_error_t SrsFrameToRtmpBridge::on_frame(SrsSharedPtrMessage* frame) +{ + return source_->on_frame(frame); +} + +SrsFrameToRtcBridge::SrsFrameToRtcBridge(SrsRtcSource* source) +{ +#ifdef SRS_RTC + source_ = source; +#endif + +#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT) + uint32_t audio_ssrc = 0; + uint8_t audio_payload_type = 0; + uint32_t video_ssrc = 0; + uint8_t video_payload_type = 0; + + // audio track ssrc + if (true) { + std::vector descs = source->get_track_desc("audio", "opus"); + if (!descs.empty()) { + audio_ssrc = descs.at(0)->ssrc_; + } + // Note we must use the PT of source, see https://github.com/ossrs/srs/pull/3079 + audio_payload_type = descs.empty() ? kAudioPayloadType : descs.front()->media_->pt_; + } + + // video track ssrc + if (true) { + std::vector descs = source->get_track_desc("video", "H264"); + if (!descs.empty()) { + video_ssrc = descs.at(0)->ssrc_; + } + // Note we must use the PT of source, see https://github.com/ossrs/srs/pull/3079 + video_payload_type = descs.empty() ? kVideoPayloadType : descs.front()->media_->pt_; + } + + rtp_builder_ = new SrsRtcRtpBuilder(this, audio_ssrc, audio_payload_type, video_ssrc, video_payload_type); +#endif +} + +SrsFrameToRtcBridge::~SrsFrameToRtcBridge() +{ +#ifdef SRS_FFMPEG_FIT + srs_freep(rtp_builder_); +#endif +} + +srs_error_t SrsFrameToRtcBridge::initialize(SrsRequest* r) +{ +#ifdef SRS_FFMPEG_FIT + return rtp_builder_->initialize(r); +#else + return srs_success; +#endif +} + +srs_error_t SrsFrameToRtcBridge::on_publish() +{ + srs_error_t err = srs_success; + +#ifdef SRS_RTC + // TODO: FIXME: Should sync with bridge? 
+    if ((err = source_->on_publish()) != srs_success) {
+        return srs_error_wrap(err, "source publish");
+    }
+#endif
+
+#ifdef SRS_FFMPEG_FIT
+    if ((err = rtp_builder_->on_publish()) != srs_success) {
+        return srs_error_wrap(err, "rtp builder publish");
+    }
+#endif
+
+    return err;
+}
+
+void SrsFrameToRtcBridge::on_unpublish()
+{
+#ifdef SRS_FFMPEG_FIT
+    rtp_builder_->on_unpublish();
+#endif
+
+#ifdef SRS_RTC
+    // @remark This bridge might be disposed here, so never use it.
+    // TODO: FIXME: Should sync with bridge?
+    source_->on_unpublish();
+#endif
+}
+
+srs_error_t SrsFrameToRtcBridge::on_frame(SrsSharedPtrMessage* frame)
+{
+#ifdef SRS_FFMPEG_FIT
+    return rtp_builder_->on_frame(frame);
+#else
+    return srs_success;
+#endif
+}
+
+srs_error_t SrsFrameToRtcBridge::on_rtp(SrsRtpPacket* pkt)
+{
+#ifdef SRS_RTC
+    return source_->on_rtp(pkt);
+#else
+    return srs_success;
+#endif
+}
+
+SrsCompositeBridge::SrsCompositeBridge()
+{
+}
+
+SrsCompositeBridge::~SrsCompositeBridge()
+{
+    for (vector<ISrsStreamBridge*>::iterator it = bridges_.begin(); it != bridges_.end(); ++it) {
+        ISrsStreamBridge* bridge = *it;
+        srs_freep(bridge);
+    }
+}
+
+srs_error_t SrsCompositeBridge::initialize(SrsRequest* r)
+{
+    srs_error_t err = srs_success;
+
+    for (vector<ISrsStreamBridge*>::iterator it = bridges_.begin(); it != bridges_.end(); ++it) {
+        ISrsStreamBridge* bridge = *it;
+        if ((err = bridge->initialize(r)) != srs_success) {
+            return err;
+        }
+    }
+
+    return err;
+}
+
+srs_error_t SrsCompositeBridge::on_publish()
+{
+    srs_error_t err = srs_success;
+
+    for (vector<ISrsStreamBridge*>::iterator it = bridges_.begin(); it != bridges_.end(); ++it) {
+        ISrsStreamBridge* bridge = *it;
+        if ((err = bridge->on_publish()) != srs_success) {
+            return err;
+        }
+    }
+
+    return err;
+}
+
+void SrsCompositeBridge::on_unpublish()
+{
+    for (vector<ISrsStreamBridge*>::iterator it = bridges_.begin(); it != bridges_.end(); ++it) {
+        ISrsStreamBridge* bridge = *it;
+        bridge->on_unpublish();
+    }
+}
+
+srs_error_t SrsCompositeBridge::on_frame(SrsSharedPtrMessage* frame)
+{
+    srs_error_t err = srs_success;
+
+    for (vector<ISrsStreamBridge*>::iterator it = bridges_.begin(); it != bridges_.end(); ++it) {
+        ISrsStreamBridge* bridge = *it;
+        if ((err = bridge->on_frame(frame)) != srs_success) {
+            return err;
+        }
+    }
+
+    return err;
+}
+
+SrsCompositeBridge* SrsCompositeBridge::append(ISrsStreamBridge* bridge)
+{
+    bridges_.push_back(bridge);
+    return this;
+}
+
diff --git a/trunk/src/app/srs_app_stream_bridge.hpp b/trunk/src/app/srs_app_stream_bridge.hpp
new file mode 100644
index 0000000000..5af2ead10f
--- /dev/null
+++ b/trunk/src/app/srs_app_stream_bridge.hpp
@@ -0,0 +1,95 @@
+//
+// Copyright (c) 2013-2023 The SRS Authors
+//
+// SPDX-License-Identifier: MIT or MulanPSL-2.0
+//
+
+#ifndef SRS_APP_STREAM_BRIDGE_HPP
+#define SRS_APP_STREAM_BRIDGE_HPP
+
+#include <srs_core.hpp>
+
+#include <srs_kernel_codec.hpp>
+
+#include <vector>
+
+class SrsRequest;
+class SrsSharedPtrMessage;
+class SrsLiveSource;
+class SrsRtcSource;
+class SrsRtmpFormat;
+class SrsMetaCache;
+class SrsAudioTranscoder;
+class SrsRtpPacket;
+class SrsRtcRtpBuilder;
+
+// A stream bridge is used to convert stream via different protocols, such as bridge for RTMP and RTC. Generally, we use
+// frame as message for bridge. A frame is a audio or video frame, such as an I/B/P frame, a general frame for decoder.
+// So you must assemble RTP or TS packets to a video frame if WebRTC or SRT.
+class ISrsStreamBridge
+{
+public:
+    ISrsStreamBridge();
+    virtual ~ISrsStreamBridge();
+public:
+    virtual srs_error_t initialize(SrsRequest* r) = 0;
+    virtual srs_error_t on_publish() = 0;
+    virtual srs_error_t on_frame(SrsSharedPtrMessage* frame) = 0;
+    virtual void on_unpublish() = 0;
+};
+
+// A bridge to feed AV frame to RTMP stream.
+class SrsFrameToRtmpBridge : public ISrsStreamBridge
+{
+private:
+    SrsLiveSource *source_;
+public:
+    SrsFrameToRtmpBridge(SrsLiveSource *src);
+    virtual ~SrsFrameToRtmpBridge();
+public:
+    srs_error_t initialize(SrsRequest* r);
+public:
+    virtual srs_error_t on_publish();
+    virtual void on_unpublish();
+public:
+    virtual srs_error_t on_frame(SrsSharedPtrMessage* frame);
+};
+
+// A bridge to covert AV frame to WebRTC stream.
+class SrsFrameToRtcBridge : public ISrsStreamBridge
+{
+private:
+    SrsRtcSource* source_;
+    SrsRtcRtpBuilder* rtp_builder_;
+public:
+    SrsFrameToRtcBridge(SrsRtcSource* source);
+    virtual ~SrsFrameToRtcBridge();
+public:
+    virtual srs_error_t initialize(SrsRequest* r);
+    virtual srs_error_t on_publish();
+    virtual void on_unpublish();
+    virtual srs_error_t on_frame(SrsSharedPtrMessage* frame);
+    srs_error_t on_rtp(SrsRtpPacket* pkt);
+};
+
+// A bridge chain, a set of bridges.
+class SrsCompositeBridge : public ISrsStreamBridge
+{
+public:
+    SrsCompositeBridge();
+    virtual ~SrsCompositeBridge();
+public:
+    srs_error_t initialize(SrsRequest* r);
+public:
+    virtual srs_error_t on_publish();
+    virtual void on_unpublish();
+public:
+    virtual srs_error_t on_frame(SrsSharedPtrMessage* frame);
+public:
+    SrsCompositeBridge* append(ISrsStreamBridge* bridge);
+private:
+    std::vector<ISrsStreamBridge*> bridges_;
+};
+
+#endif
+
diff --git a/trunk/src/core/srs_core_version6.hpp b/trunk/src/core/srs_core_version6.hpp
index 4082877c12..f9df4209e9 100644
--- a/trunk/src/core/srs_core_version6.hpp
+++ b/trunk/src/core/srs_core_version6.hpp
@@ -9,6 +9,6 @@
 
 #define VERSION_MAJOR 6
 #define VERSION_MINOR 0
-#define VERSION_REVISION 40
+#define VERSION_REVISION 41
 
 #endif
diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp
index d338dfbbe9..bda0432516 100644
--- a/trunk/src/kernel/srs_kernel_error.hpp
+++ b/trunk/src/kernel/srs_kernel_error.hpp
@@ -362,7 +362,7 @@
     XX(ERROR_RTC_DISABLED               , 5021, "RtcDisabled", "RTC is disabled by configuration") \
     XX(ERROR_RTC_NO_SESSION             , 5022, "RtcNoSession", "Invalid packet for no RTC session matched") \
     XX(ERROR_RTC_INVALID_PARAMS         , 5023, "RtcInvalidParams", "Invalid API parameters for RTC") \
-    XX(ERROR_RTC_DUMMY_BRIDGER          , 5024, "RtcDummyBridger", "RTC dummy bridger error") \
+    XX(ERROR_RTC_DUMMY_BRIDGE           , 5024, "RtcDummyBridge", "RTC dummy bridge error") \
     XX(ERROR_RTC_STREM_STARTED          , 5025, "RtcStreamStarted", "RTC stream already started") \
     XX(ERROR_RTC_TRACK_CODEC            , 5026, "RtcTrackCodec", "RTC track codec error") \
     XX(ERROR_RTC_NO_PLAYER              , 5027, "RtcNoPlayer", "RTC player not found") \