diff --git a/.gitignore b/.gitignore index 6216052aad..f869a35448 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,8 @@ *.pyc *.swp .DS_Store +.vscode +.vscode/* /trunk/Makefile /trunk/objs /trunk/src/build-qt-Desktop-Debug diff --git a/trunk/auto/auto_headers.sh b/trunk/auto/auto_headers.sh index 10e76e99f7..96f5526d34 100755 --- a/trunk/auto/auto_headers.sh +++ b/trunk/auto/auto_headers.sh @@ -70,6 +70,12 @@ else srs_undefine_macro "SRS_AUTO_HDS" $SRS_AUTO_HEADERS_H fi +if [ $SRS_SRT = YES ]; then + srs_define_macro "SRS_AUTO_SRT" $SRS_AUTO_HEADERS_H +else + srs_undefine_macro "SRS_AUTO_SRT" $SRS_AUTO_HEADERS_H +fi + if [ $SRS_MEM_WATCH = YES ]; then srs_define_macro "SRS_AUTO_MEM_WATCH" $SRS_AUTO_HEADERS_H else diff --git a/trunk/auto/options.sh b/trunk/auto/options.sh index a6436535c6..a46685e6f8 100755 --- a/trunk/auto/options.sh +++ b/trunk/auto/options.sh @@ -16,6 +16,7 @@ help=no ################################################################ # feature options SRS_HDS=NO +SRS_SRT=NO SRS_NGINX=NO SRS_FFMPEG_TOOL=NO SRS_LIBRTMP=NO @@ -121,6 +122,7 @@ Options: --with-librtmp enable srs-librtmp, library for client. --with-research build the research tools. --with-utest build the utest for SRS. + --with-srt build the srt for SRS. --with-gperf build SRS with gperf tools(no gmd/gmc/gmp/gcp, with tcmalloc only). --with-gmc build memory check for SRS with gperf tools. --with-gmd build memory defense(corrupt memory) for SRS with gperf tools. @@ -141,6 +143,7 @@ Options: --without-librtmp disable srs-librtmp, library for client. --without-research do not build the research tools. --without-utest do not build the utest for SRS. + --without-srt do not build the srt for SRS. --without-gperf do not build SRS with gperf tools(without tcmalloc and gmd/gmc/gmp/gcp). --without-gmc do not build memory check for SRS with gperf tools. --without-gmd do not build memory defense for SRS with gperf tools. @@ -231,6 +234,7 @@ function parse_user_option() { --with-librtmp) SRS_LIBRTMP=YES ;; --with-research) SRS_RESEARCH=YES ;; --with-utest) SRS_UTEST=YES ;; + --with-srt) SRS_SRT=YES ;; --with-gperf) SRS_GPERF=YES ;; --with-gmc) SRS_GPERF_MC=YES ;; --with-gmd) SRS_GPERF_MD=YES ;; @@ -251,6 +255,7 @@ function parse_user_option() { --without-librtmp) SRS_LIBRTMP=NO ;; --without-research) SRS_RESEARCH=NO ;; --without-utest) SRS_UTEST=NO ;; + --without-srt) SRS_SRT=NO ;; --without-gperf) SRS_GPERF=NO ;; --without-gmc) SRS_GPERF_MC=NO ;; --without-gmd) SRS_GPERF_MD=NO ;; @@ -536,6 +541,7 @@ SRS_AUTO_CONFIGURE="--prefix=${SRS_PREFIX}" if [ $SRS_LIBRTMP = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-librtmp"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-librtmp"; fi if [ $SRS_RESEARCH = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-research"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-research"; fi if [ $SRS_UTEST = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-utest"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-utest"; fi + if [ $SRS_SRT = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-srt"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-srt"; fi if [ $SRS_GPERF = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-gperf"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-gperf"; fi if [ $SRS_GPERF_MC = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-gmc"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-gmc"; fi if [ $SRS_GPERF_MD = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-gmd"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-gmd"; fi diff --git a/trunk/auto/utest.sh b/trunk/auto/utest.sh index d658bf7f4a..ab96caea28 100755 --- a/trunk/auto/utest.sh +++ b/trunk/auto/utest.sh @@ -58,7 +58,7 @@ USER_DIR = . CPPFLAGS += -I\$(GTEST_DIR)/include # Flags passed to the C++ compiler. -CXXFLAGS += -g -Wall -Wextra -O0 ${EXTRA_DEFINES} +CXXFLAGS += ${CXXFLAGS} -Wextra ${EXTRA_DEFINES} # All tests produced by this Makefile. Remember to add new tests you # created to the list. diff --git a/trunk/conf/srt.conf b/trunk/conf/srt.conf new file mode 100644 index 0000000000..9dc1e564e6 --- /dev/null +++ b/trunk/conf/srt.conf @@ -0,0 +1,32 @@ +# SRT config. + +listen 1935; +max_connections 1000; +srs_log_tank console; +daemon off; + +http_api { + enabled on; + listen 1985; +} +http_server { + enabled on; + listen 8080; + dir ./objs/nginx/html; +} + +srt_server { + enabled on; + listen 10080; +} + +# @doc https://github.com/ossrs/srs/issues/1147#issuecomment-577492117 +vhost __defaultVhost__ { +} +vhost srs.srt.com.cn { +} + +stats { + network 0; + disk sda sdb xvda xvdb; +} diff --git a/trunk/configure b/trunk/configure index 3b366397f3..5b8cd4b0f6 100755 --- a/trunk/configure +++ b/trunk/configure @@ -98,6 +98,9 @@ GDBDebug=" -g -O0" WarnLevel=" -Wall" # the compile standard. CppStd="-ansi" +if [[ $SRS_SRT == YES ]]; then + CppStd="-std=c++11" +fi # for library compile if [[ $SRS_EXPORT_LIBRTMP_PROJECT == YES ]]; then LibraryCompile=" -fPIC" @@ -157,8 +160,15 @@ fi if [ $SRS_GPERF_MD = YES ]; then LibGperfFile="${SRS_OBJS_DIR}/gperf/lib/libtcmalloc_debug.a"; fi +# srt code path +if [[ $SRS_SRT == YES ]]; then + LibSRTRoot="${SRS_WORKDIR}/src/srt" +fi # the link options, always use static link SrsLinkOptions="-ldl"; +if [[ $SRS_SRT == YES ]]; then + SrsLinkOptions="${SrsLinkOptions} -pthread -lsrt"; +fi if [[ $SRS_SSL == YES && $SRS_USE_SYS_SSL == YES ]]; then SrsLinkOptions="${SrsLinkOptions} -lssl -lcrypto"; fi @@ -205,6 +215,17 @@ MODULE_FILES=("srs_protocol_amf0" "srs_protocol_io" "srs_rtmp_stack" "srs_protocol_format") PROTOCOL_INCS="src/protocol"; MODULE_DIR=${PROTOCOL_INCS} . auto/modules.sh PROTOCOL_OBJS="${MODULE_OBJS[@]}" +# +#srt protocol features. +if [ $SRS_SRT = YES ]; then + MODULE_ID="SRT" + MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "SERVICE" "APP") + ModuleLibIncs=(${SRS_OBJS_DIR}) + MODULE_FILES=("srt_server" "srt_handle" "srt_conn" "srt_to_rtmp" "ts_demux" "srt_data") + SRT_INCS=${LibSRTRoot}; MODULE_DIR=${LibSRTRoot} . auto/modules.sh + SRT_OBJS="${MODULE_OBJS[@]}" +fi + # #Service Module, for both Server and Client Modules. if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then @@ -224,6 +245,9 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then MODULE_ID="APP" MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "SERVICE") ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR} ${LibSSLRoot}) + if [[ $SRS_SRT == YES ]]; then + ModuleLibIncs+=("${LibSRTRoot[*]}") + fi MODULE_FILES=("srs_app_server" "srs_app_conn" "srs_app_rtmp_conn" "srs_app_source" "srs_app_refer" "srs_app_hls" "srs_app_forward" "srs_app_encoder" "srs_app_http_stream" "srs_app_thread" "srs_app_bandwidth" "srs_app_st" "srs_app_log" "srs_app_config" @@ -258,7 +282,13 @@ LIBS_OBJS="${MODULE_OBJS[@]}" if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then MODULE_ID="SERVER" MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "SERVICE" "APP") + if [[ $SRS_SRT == YES ]]; then + MODULE_DEPENDS+=("SRT") + fi ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR} ${LibGperfRoot} ${LibSSLRoot}) + if [[ $SRS_SRT == YES ]]; then + ModuleLibIncs+=("${LibSRTRoot[*]}") + fi MODULE_FILES=("srs_main_server") SERVER_INCS="src/main"; MODULE_DIR=${SERVER_INCS} . auto/modules.sh SERVER_OBJS="${MODULE_OBJS[@]}" @@ -298,6 +328,10 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then ModuleLibFiles=(${LibSTfile} ${LibSSLfile} ${LibGperfFile}) # all depends objects MODULE_OBJS="${CORE_OBJS[@]} ${KERNEL_OBJS[@]} ${PROTOCOL_OBJS[@]} ${SERVICE_OBJS[@]} ${APP_OBJS[@]} ${SERVER_OBJS[@]}" + ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR} ${LibGperfRoot} ${LibSSLRoot}) + if [[ $SRS_SRT == YES ]]; then + MODULE_OBJS="${MODULE_OBJS} ${SRT_OBJS[@]}" + fi LINK_OPTIONS="${SrsLinkOptions}${SrsGprofLink}${SrsGperfLink}" # # srs: srs(simple rtmp server) over st(state-threads) @@ -325,9 +359,15 @@ if [ $SRS_UTEST = YES ]; then "srs_utest_config" "srs_utest_rtmp" "srs_utest_http" "srs_utest_avc" "srs_utest_reload" "srs_utest_mp4" "srs_utest_service" "srs_utest_app") ModuleLibIncs=(${SRS_OBJS_DIR} ${LibSTRoot} ${LibSSLRoot}) + if [[ $SRS_SRT == YES ]]; then + ModuleLibIncs+=("${LibSRTRoot[*]}") + fi ModuleLibFiles=(${LibSTfile} ${LibSSLfile}) MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "SERVICE" "APP") - MODULE_OBJS="${CORE_OBJS[@]} ${KERNEL_OBJS[@]} ${PROTOCOL_OBJS[@]} ${SERVICE_OBJS[@]} ${APP_OBJS[@]}" + if [[ $SRS_SRT == YES ]]; then + MODULE_DEPENDS+=("SRT") + fi + MODULE_OBJS="${CORE_OBJS[@]} ${KERNEL_OBJS[@]} ${PROTOCOL_OBJS[@]} ${SERVICE_OBJS[@]} ${APP_OBJS[@]} ${SRT_OBJS[@]}" LINK_OPTIONS="-lpthread ${SrsLinkOptions}" MODULE_DIR="src/utest" APP_NAME="srs_utest" . auto/utest.sh fi diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index a0277f3c95..2987b48916 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -479,7 +479,7 @@ srs_error_t srs_config_transform_vhost(SrsConfDirective* root) ++it; continue; } - + // SRS3.0, change the folowing like a shadow: // mode, origin, token_traverse, vhost, debug_srs_upnode // SRS1/2: @@ -3470,7 +3470,7 @@ srs_error_t SrsConfig::check_normal_config() && n != "srs_log_tank" && n != "srs_log_level" && n != "srs_log_file" && n != "max_connections" && n != "daemon" && n != "heartbeat" && n != "http_api" && n != "stats" && n != "vhost" && n != "pithy_print_ms" - && n != "http_server" && n != "stream_caster" + && n != "http_server" && n != "stream_caster" && n != "srt_server" && n != "utc_time" && n != "work_dir" && n != "asprocess" ) { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal directive %s", n.c_str()); @@ -3504,6 +3504,15 @@ srs_error_t SrsConfig::check_normal_config() } } } + if (true) { + SrsConfDirective* conf = root->get("srt_server"); + for (int i = 0; conf && i < (int)conf->directives.size(); i++) { + string n = conf->at(i)->name; + if (n != "enabled" && n != "listen") { + return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal srt_stream.%s", n.c_str()); + } + } + } if (true) { SrsConfDirective* conf = get_heartbeart(); for (int i = 0; conf && i < (int)conf->directives.size(); i++) { @@ -3631,6 +3640,7 @@ srs_error_t SrsConfig::check_normal_config() get_vhosts(vhosts); for (int n = 0; n < (int)vhosts.size(); n++) { SrsConfDirective* vhost = vhosts[n]; + for (int i = 0; vhost && i < (int)vhost->directives.size(); i++) { SrsConfDirective* conf = vhost->at(i); string n = conf->name; @@ -6629,6 +6639,39 @@ bool SrsConfig::get_raw_api_allow_update() return SRS_CONF_PERFER_FALSE(conf->arg0()); } + +bool SrsConfig::get_srt_enabled() +{ + static bool DEFAULT = false; + + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("enabled"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + +unsigned short SrsConfig::get_srt_listen_port() +{ + static unsigned short DEFAULT = 10080; + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("listen"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return (unsigned short)atoi(conf->arg0().c_str()); +} + bool SrsConfig::get_http_stream_enabled() { SrsConfDirective* conf = root->get("http_server"); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index c9c03e24f5..cc936affcb 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -588,6 +588,13 @@ class SrsConfig virtual bool get_forward_enabled(std::string vhost); // Get the forward directive of vhost. virtual SrsConfDirective* get_forwards(std::string vhost); + +public: + // Whether the srt sevice enabled + virtual bool get_srt_enabled(); + // Get the srt service listen port + virtual unsigned short get_srt_listen_port(); + // http_hooks section private: // Get the http_hooks directive of vhost. diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 8c6d214702..c85930da37 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -688,7 +688,12 @@ srs_error_t SrsServer::listen() if ((err = conn_manager->start()) != srs_success) { return srs_error_wrap(err, "connection manager"); } - +#ifdef SRS_AUTO_SRT + if ((err = listen_srt()) != srs_success) { + return srs_error_wrap(err, "srt listen"); + } +#endif + return err; } @@ -1004,6 +1009,32 @@ srs_error_t SrsServer::do_cycle() return err; } +#ifdef SRS_AUTO_SRT +srs_error_t SrsServer::listen_srt() { + srs_error_t err = srs_success; + + if(_srs_config->get_srt_enabled()) { + srs_trace("srt server is enabled..."); + unsigned short srt_port = _srs_config->get_srt_listen_port(); + srs_trace("srt server listen port:%d", srt_port); + err = srt2rtmp::get_instance()->init(); + if (err != srs_success) { + srs_error_wrap(err, "srt start srt2rtmp error"); + return err; + } + + srt_ptr = std::make_shared(srt_port); + if (!srt_ptr) { + srs_error_wrap(err, "srt listen %d", srt_port); + } + srt_ptr->start(); + } else { + srs_trace("srt server is disabled..."); + } + return err; +} +#endif + srs_error_t SrsServer::listen_rtmp() { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index fd8cbe3ad7..e08a5f8d96 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -36,6 +36,10 @@ #include #include #include +#ifdef SRS_AUTO_SRT +#include +#include +#endif class SrsServer; class SrsConnection; @@ -209,6 +213,10 @@ class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHan SrsHttpHeartbeat* http_heartbeat; SrsIngester* ingester; SrsCoroutineManager* conn_manager; +#ifdef SRS_AUTO_SRT + // srt server + SRT_SERVER_PTR srt_ptr; +#endif private: // The pid file fd, lock the file write when server is running. // @remark the init.d script should cleanup the pid file, when stop service, @@ -279,6 +287,10 @@ class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHan virtual srs_error_t listen_http_api(); virtual srs_error_t listen_http_stream(); virtual srs_error_t listen_stream_caster(); +#ifdef SRS_AUTO_SRT + //start listen srt udp port + virtual srs_error_t listen_srt(); +#endif // Close the listeners for specified type, // Remove the listen object from manager. virtual void close_listeners(SrsListenerType type); diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index 3bfd9ebb13..797e0d90ae 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -214,6 +214,7 @@ void show_macro_features() ss << ", dash:" << "on"; ss << ", hls:" << srs_bool2switch(true); ss << ", hds:" << srs_bool2switch(SRS_AUTO_HDS_BOOL); + ss << ", srt:" << srs_bool2switch(SRS_AUTO_SRT_BOOL); // hc(http callback) ss << ", hc:" << srs_bool2switch(true); // ha(http api) diff --git a/trunk/src/srt/srt_conn.cpp b/trunk/src/srt/srt_conn.cpp new file mode 100644 index 0000000000..2df80b011f --- /dev/null +++ b/trunk/src/srt/srt_conn.cpp @@ -0,0 +1,169 @@ +#include "srt_conn.hpp" +#include "time_help.h" +#include "stringex.hpp" +#include + +bool is_streamid_valid(const std::string& streamid) { + int mode = ERR_SRT_MODE; + std::string url_subpash; + + bool ret = get_streamid_info(streamid, mode, url_subpash); + if (!ret) { + return ret; + } + + if ((mode != PULL_SRT_MODE) && (mode != PUSH_SRT_MODE)) { + return false; + } + + if (url_subpash.empty()) { + return false; + } + + std::vector info_vec; + string_split(url_subpash, "/", info_vec); + if (info_vec.size() < 2) { + return false; + } + + return true; +} + +bool get_key_value(const std::string& info, std::string& key, std::string& value) { + size_t pos = info.find("="); + + if (pos == info.npos) { + return false; + } + + key = info.substr(0, pos); + value = info.substr(pos+1); + + if (key.empty() || value.empty()) { + return false; + } + return true; +} + +//eg. streamid=#!::h:live/livestream,m:publish +bool get_streamid_info(const std::string& streamid, int& mode, std::string& url_subpash) { + std::vector info_vec; + std::string real_streamid; + + size_t pos = streamid.find("#!::h"); + if (pos != 0) { + return false; + } + real_streamid = streamid.substr(4); + + string_split(real_streamid, ",", info_vec); + if (info_vec.size() < 2) { + return false; + } + + for (int index = 0; index < info_vec.size(); index++) { + std::string key; + std::string value; + + bool ret = get_key_value(info_vec[index], key, value); + if (!ret) { + continue; + } + + if (key == "h") { + url_subpash = value;//eg. h=live/stream + } else if (key == "m") { + std::string mode_str = string_lower(value);//m=publish or m=request + if (mode_str == "publish") { + mode = PUSH_SRT_MODE; + } else if (mode_str == "request") { + mode = PULL_SRT_MODE; + } else { + mode = ERR_SRT_MODE; + return false; + } + } else {//not suport + continue; + } + } + + return true; +} + +srt_conn::srt_conn(SRTSOCKET conn_fd, const std::string& streamid):_conn_fd(conn_fd), + _streamid(streamid) { + get_streamid_info(streamid, _mode, _url_subpath); + _update_timestamp = now_ms(); + + std::vector path_vec; + + string_split(_url_subpath, "/", path_vec); + if (path_vec.size() >= 3) { + _vhost = path_vec[0]; + } else { + _vhost = "__default_host__"; + } + srs_trace("srt connect construct streamid:%s, mode:%d, subpath:%s, vhost:%s", + streamid.c_str(), _mode, _url_subpath.c_str(), _vhost.c_str()); +} + +srt_conn::~srt_conn() { + close(); +} + +std::string srt_conn::get_vhost() { + return _vhost; +} + +void srt_conn::update_timestamp(long long now_ts) { + _update_timestamp = now_ts; +} + +long long srt_conn::get_last_ts() { + return _update_timestamp; +} + +void srt_conn::close() { + if (_conn_fd == SRT_INVALID_SOCK) { + return; + } + srt_close(_conn_fd); + _conn_fd = SRT_INVALID_SOCK; +} + +SRTSOCKET srt_conn::get_conn() { + return _conn_fd; +} +int srt_conn::get_mode() { + return _mode; +} + +std::string srt_conn::get_streamid() { + return _streamid; +} + +std::string srt_conn::get_subpath() { + return _url_subpath; +} + +int srt_conn::read(unsigned char* data, int len) { + int ret = 0; + + ret = srt_recv(_conn_fd, (char*)data, len); + if (ret <= 0) { + srs_error("srt read error:%d, socket fd:%d", ret, _conn_fd); + return ret; + } + return ret; +} + +int srt_conn::write(unsigned char* data, int len) { + int ret = 0; + + ret = srt_send(_conn_fd, (char*)data, len); + if (ret <= 0) { + srs_error("srt write error:%d, socket fd:%d", ret, _conn_fd); + return ret; + } + return ret; +} \ No newline at end of file diff --git a/trunk/src/srt/srt_conn.hpp b/trunk/src/srt/srt_conn.hpp new file mode 100644 index 0000000000..adc79eaf47 --- /dev/null +++ b/trunk/src/srt/srt_conn.hpp @@ -0,0 +1,50 @@ +#ifndef SRT_CONN_H +#define SRT_CONN_H +#include "stringex.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define ERR_SRT_MODE 0x00 +#define PULL_SRT_MODE 0x01 +#define PUSH_SRT_MODE 0x02 + +bool is_streamid_valid(const std::string& streamid); +bool get_key_value(const std::string& info, std::string& key, std::string& value); +bool get_streamid_info(const std::string& streamid, int& mode, std::string& url_subpash); + +class srt_conn { +public: + srt_conn(SRTSOCKET conn_fd, const std::string& streamid); + ~srt_conn(); + + void close(); + SRTSOCKET get_conn(); + int get_mode(); + std::string get_streamid(); + std::string get_subpath(); + std::string get_vhost(); + int read(unsigned char* data, int len); + int write(unsigned char* data, int len); + + void update_timestamp(long long now_ts); + long long get_last_ts(); + +private: + SRTSOCKET _conn_fd; + std::string _streamid; + std::string _url_subpath; + std::string _vhost; + int _mode; + long long _update_timestamp; +}; + +typedef std::shared_ptr SRT_CONN_PTR; + +#endif //SRT_CONN_H \ No newline at end of file diff --git a/trunk/src/srt/srt_data.cpp b/trunk/src/srt/srt_data.cpp new file mode 100644 index 0000000000..075064b142 --- /dev/null +++ b/trunk/src/srt/srt_data.cpp @@ -0,0 +1,31 @@ +#include "srt_data.hpp" +#include + +SRT_DATA_MSG::SRT_DATA_MSG(unsigned int len, const std::string& path):_len(len) + ,_key_path(path) { + _data_p = new unsigned char[len]; + memset(_data_p, 0, len); +} + +SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path):_len(len) + ,_key_path(path) +{ + _data_p = new unsigned char[len]; + memcpy(_data_p, data_p, len); +} + +SRT_DATA_MSG::~SRT_DATA_MSG() { + delete _data_p; +} + +std::string SRT_DATA_MSG::get_path() { + return _key_path; +} + +unsigned int SRT_DATA_MSG::data_len() { + return _len; +} + +unsigned char* SRT_DATA_MSG::get_data() { + return _data_p; +} diff --git a/trunk/src/srt/srt_data.hpp b/trunk/src/srt/srt_data.hpp new file mode 100644 index 0000000000..ab9cf81ea5 --- /dev/null +++ b/trunk/src/srt/srt_data.hpp @@ -0,0 +1,24 @@ +#ifndef SRT_DATA_H +#define SRT_DATA_H +#include +#include + +class SRT_DATA_MSG { +public: + SRT_DATA_MSG(unsigned int len, const std::string& path); + SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path); + ~SRT_DATA_MSG(); + + unsigned int data_len(); + unsigned char* get_data(); + std::string get_path(); + +private: + unsigned int _len; + unsigned char* _data_p; + std::string _key_path; +}; + +typedef std::shared_ptr SRT_DATA_MSG_PTR; + +#endif \ No newline at end of file diff --git a/trunk/src/srt/srt_handle.cpp b/trunk/src/srt/srt_handle.cpp new file mode 100644 index 0000000000..ea25b2c831 --- /dev/null +++ b/trunk/src/srt/srt_handle.cpp @@ -0,0 +1,473 @@ + +#include "srt_handle.hpp" +#include "time_help.h" +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +static bool MONITOR_STATICS_ENABLE = false; +static long long MONITOR_TIMEOUT = 5000; +const unsigned int DEF_DATA_SIZE = 188*7; +const long long CHECK_ALIVE_INTERVAL = 10*1000; +const long long CHECK_ALIVE_TIMEOUT = 15*1000; + +long long srt_now_ms = 0; + +srt_handle::srt_handle():_run_flag(false) + ,_last_timestamp(0) + ,_last_check_alive_ts(0) { +} + +srt_handle::~srt_handle() { + +} + +int srt_handle::start() { + _handle_pollid = srt_epoll_create(); + if (_handle_pollid < -1) { + srs_error("srt handle srt_epoll_create error, _handle_pollid=%d", _handle_pollid); + return -1; + } + + _run_flag = true; + srs_trace("srt handle is starting..."); + _work_thread_ptr = std::make_shared(&srt_handle::onwork, this); + + return 0; +} + +void srt_handle::stop() { + _run_flag = false; + _work_thread_ptr->join(); + return; +} + +void srt_handle::debug_statics(SRTSOCKET srtsocket, const std::string& streamid) { + SRT_TRACEBSTATS mon; + srt_bstats(srtsocket, &mon, 1); + std::ostringstream output; + long long now_ul = now_ms(); + + if (!MONITOR_STATICS_ENABLE) { + return; + } + if (_last_timestamp == 0) { + _last_timestamp = now_ul; + return; + } + + if ((now_ul - _last_timestamp) < MONITOR_TIMEOUT) { + return; + } + _last_timestamp = now_ul; + output << "======= SRT STATS: sid=" << streamid << std::endl; + output << "PACKETS SENT: " << std::setw(11) << mon.pktSent << " RECEIVED: " << std::setw(11) << mon.pktRecv << std::endl; + output << "LOST PKT SENT: " << std::setw(11) << mon.pktSndLoss << " RECEIVED: " << std::setw(11) << mon.pktRcvLoss << std::endl; + output << "REXMIT SENT: " << std::setw(11) << mon.pktRetrans << " RECEIVED: " << std::setw(11) << mon.pktRcvRetrans << std::endl; + output << "DROP PKT SENT: " << std::setw(11) << mon.pktSndDrop << " RECEIVED: " << std::setw(11) << mon.pktRcvDrop << std::endl; + output << "RATE SENDING: " << std::setw(11) << mon.mbpsSendRate << " RECEIVING: " << std::setw(11) << mon.mbpsRecvRate << std::endl; + output << "BELATED RECEIVED: " << std::setw(11) << mon.pktRcvBelated << " AVG TIME: " << std::setw(11) << mon.pktRcvAvgBelatedTime << std::endl; + output << "REORDER DISTANCE: " << std::setw(11) << mon.pktReorderDistance << std::endl; + output << "WINDOW FLOW: " << std::setw(11) << mon.pktFlowWindow << " CONGESTION: " << std::setw(11) << mon.pktCongestionWindow << " FLIGHT: " << std::setw(11) << mon.pktFlightSize << std::endl; + output << "LINK RTT: " << std::setw(9) << mon.msRTT << "ms BANDWIDTH: " << std::setw(7) << mon.mbpsBandwidth << "Mb/s " << std::endl; + output << "BUFFERLEFT: SND: " << std::setw(11) << mon.byteAvailSndBuf << " RCV: " << std::setw(11) << mon.byteAvailRcvBuf << std::endl; + + srs_trace("\r\n%s", output.str().c_str()); + return; +} + +void srt_handle::add_new_puller(SRT_CONN_PTR conn_ptr, std::string stream_id) { + _conn_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr)); + + auto iter = _streamid_map.find(stream_id); + if (iter == _streamid_map.end()) { + std::unordered_map srtsocket_map; + srtsocket_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr)); + + _streamid_map.insert(std::make_pair(stream_id, srtsocket_map)); + srs_trace("add new puller fd:%d, streamid:%s", conn_ptr->get_conn(), stream_id.c_str()); + } else { + iter->second.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr)); + srs_trace("add new puller fd:%d, streamid:%s, size:%d", + conn_ptr->get_conn(), stream_id.c_str(), iter->second.size()); + } + + return; +} + +void srt_handle::close_pull_conn(SRTSOCKET srtsocket, std::string stream_id) { + srs_warn("close_pull_conn read_fd=%d, streamid=%s", srtsocket, stream_id.c_str()); + srt_epoll_remove_usock(_handle_pollid, srtsocket); + + auto streamid_iter = _streamid_map.find(stream_id); + if (streamid_iter != _streamid_map.end()) { + auto srtsocket_map = streamid_iter->second; + if (srtsocket_map.size() == 0) { + _streamid_map.erase(stream_id); + } else if (srtsocket_map.size() == 1) { + srtsocket_map.erase(srtsocket); + _streamid_map.erase(stream_id); + } else { + srtsocket_map.erase(srtsocket); + } + } else { + assert(0); + } + + auto conn_iter = _conn_map.find(srtsocket); + if (conn_iter != _conn_map.end()) { + _conn_map.erase(conn_iter); + return; + } else { + assert(0); + } + + return; +} + +SRT_CONN_PTR srt_handle::get_srt_conn(SRTSOCKET conn_srt_socket) { + SRT_CONN_PTR ret_conn; + + auto iter = _conn_map.find(conn_srt_socket); + if (iter == _conn_map.end()) { + return ret_conn; + } + + ret_conn = iter->second; + + return ret_conn; +} + +void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) { + int val_i; + int opt_len = sizeof(int); + + val_i = 1000; + srt_setsockopt(conn_ptr->get_conn(), 0, SRTO_LATENCY, &val_i, opt_len); + val_i = 2048; + srt_setsockopt(conn_ptr->get_conn(), 0, SRTO_MAXBW, &val_i, opt_len); + + srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_LATENCY, &val_i, &opt_len); + srs_trace("srto SRTO_LATENCY=%d", val_i); + srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_SNDBUF, &val_i, &opt_len); + srs_trace("srto SRTO_SNDBUF=%d", val_i); + srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_RCVBUF, &val_i, &opt_len); + srs_trace("srto SRTO_RCVBUF=%d", val_i); + srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_MAXBW, &val_i, &opt_len); + srs_trace("srto SRTO_MAXBW=%d", val_i); + + if (conn_ptr->get_mode() == PULL_SRT_MODE) { + add_new_puller(conn_ptr, conn_ptr->get_subpath()); + } else { + if(add_new_pusher(conn_ptr) == false) { + srs_trace("push connection is repeated and rejected, fd:%d, streamid:%s", + conn_ptr->get_conn(), conn_ptr->get_streamid().c_str()); + conn_ptr->close(); + return; + } + } + srs_trace("new conn added fd:%d, event:0x%08x", conn_ptr->get_conn(), events); + int ret = srt_epoll_add_usock(_handle_pollid, conn_ptr->get_conn(), &events); + if (ret < 0) { + srs_error("srt handle run add epoll error:%d", ret); + return; + } + + return; +} + +int srt_handle::get_srt_mode(SRTSOCKET conn_srt_socket) { + auto iter = _conn_map.find(conn_srt_socket); + if (iter == _conn_map.end()) { + return 0; + } + return iter->second->get_mode(); +} + +void srt_handle::insert_message_queue(request_message_t msg) { + std::unique_lock lock(_queue_mutex); + _message_queue.push(msg); +} + +bool srt_handle::get_message_from_queue(request_message_t& msg) { + std::unique_lock lock(_queue_mutex); + bool ret = false; + + if (_message_queue.empty()) { + return ret; + } + ret = true; + msg = _message_queue.front(); + _message_queue.pop(); + + return ret; +} + +void srt_handle::onwork() +{ + const unsigned int SRT_FD_MAX = 1024; + SRT_SOCKSTATUS status = SRTS_INIT; + std::string streamid; + int ret; + const int64_t DEF_TIMEOUT_INTERVAL = 30; + + srs_trace("srt handle epoll work is starting..."); + while(_run_flag) + { + SRTSOCKET read_fds[SRT_FD_MAX]; + SRTSOCKET write_fds[SRT_FD_MAX]; + int rfd_num = SRT_FD_MAX; + int wfd_num = SRT_FD_MAX; + + srt_now_ms = now_ms(); + + request_message_t msg; + + if (get_message_from_queue(msg)) { + add_newconn(msg.conn_ptr, msg.events); + } + + check_alive(); + + ret = srt_epoll_wait(_handle_pollid, read_fds, &rfd_num, write_fds, &wfd_num, + DEF_TIMEOUT_INTERVAL, 0, 0, 0, 0); + if (ret < 0) { + srs_info("srt handle epoll is timeout, ret=%d, srt_now_ms=%ld", + ret, srt_now_ms); + std::this_thread::sleep_for(std::chrono::milliseconds(30)); + continue; + } + + for (int index = 0; index < rfd_num; index++) + { + status = srt_getsockstate(read_fds[index]); + srs_info("srt handle read(push) rfd num:%d, status:%d, streamid:%s, read_fd", + rfd_num, status, streamid.c_str(), read_fds[index]); + handle_srt_socket(status, read_fds[index]); + } + + for (int index = 0; index < wfd_num; index++) + { + status = srt_getsockstate(write_fds[index]); + streamid = UDT::getstreamid(write_fds[index]); + srs_info("srt handle write(puller) wfd num:%d, status:%d, streamid:%s, write_fd", + wfd_num, status, streamid.c_str(), write_fds[index]); + handle_srt_socket(status, write_fds[index]); + } + + } +} + +void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) { + SRT_CONN_PTR srt_conn_ptr; + unsigned char data[DEF_DATA_SIZE]; + int ret; + srt_conn_ptr = get_srt_conn(conn_fd); + + if (!srt_conn_ptr) { + srs_error("handle_push_data fd:%d fail to find srt connection.", conn_fd); + return; + } + + if (status != SRTS_CONNECTED) { + srs_error("handle_push_data error status:%d fd:%d", status, conn_fd); + close_push_conn(conn_fd); + return; + } + + ret = srt_conn_ptr->read(data, DEF_DATA_SIZE); + if (ret <= 0) { + srs_error("handle_push_data srt connect read error:%d, fd:%d", ret, conn_fd); + close_push_conn(conn_fd); + return; + } + srt_conn_ptr->update_timestamp(srt_now_ms); + + srt2rtmp::get_instance()->insert_data_message(data, ret, subpath); + + //send data to subscriber(players) + //streamid, play map + auto streamid_iter = _streamid_map.find(subpath); + if (streamid_iter == _streamid_map.end()) {//no puler + srs_info("receive data size(%d) from pusher(%d) but no puller", ret, conn_fd); + return; + } + srs_info("receive data size(%d) from pusher(%d) to pullers, count:%d", + ret, conn_fd, streamid_iter->second.size()); + + for (auto puller_iter = streamid_iter->second.begin(); + puller_iter != streamid_iter->second.end(); + puller_iter++) { + auto player_conn = puller_iter->second; + if (!player_conn) { + srs_error("handle_push_data get srt connect error from fd:%d", puller_iter->first); + continue; + } + int write_ret = player_conn->write(data, ret); + srs_info("send data size(%d) to puller fd:%d", write_ret, puller_iter->first); + if (write_ret > 0) { + puller_iter->second->update_timestamp(srt_now_ms); + } + } + + return; +} + +void srt_handle::check_alive() { + long long diff_t; + std::list conn_list; + + if (_last_check_alive_ts == 0) { + _last_check_alive_ts = srt_now_ms; + return; + } + diff_t = srt_now_ms - _last_check_alive_ts; + if (diff_t < CHECK_ALIVE_INTERVAL) { + return; + } + + for (auto conn_iter = _conn_map.begin(); + conn_iter != _conn_map.end(); + conn_iter++) + { + long long timeout = srt_now_ms - conn_iter->second->get_last_ts(); + if (timeout > CHECK_ALIVE_TIMEOUT) { + conn_list.push_back(conn_iter->second); + } + } + + for (auto del_iter = conn_list.begin(); + del_iter != conn_list.end(); + del_iter++) + { + SRT_CONN_PTR conn_ptr = *del_iter; + if (conn_ptr->get_mode() == PUSH_SRT_MODE) { + srs_warn("check alive close pull connection fd:%d, streamid:%s", + conn_ptr->get_conn(), conn_ptr->get_subpath().c_str()); + close_push_conn(conn_ptr->get_conn()); + } else if (conn_ptr->get_mode() == PULL_SRT_MODE) { + srs_warn("check alive close pull connection fd:%d, streamid:%s", + conn_ptr->get_conn(), conn_ptr->get_subpath().c_str()); + close_pull_conn(conn_ptr->get_conn(), conn_ptr->get_subpath()); + } else { + srs_error("check_alive get unkown srt mode:%d, fd:%d", + conn_ptr->get_mode(), conn_ptr->get_conn()); + assert(0); + } + } +} + +void srt_handle::close_push_conn(SRTSOCKET srtsocket) { + auto iter = _conn_map.find(srtsocket); + + if (iter != _conn_map.end()) { + SRT_CONN_PTR conn_ptr = iter->second; + auto push_iter = _push_conn_map.find(conn_ptr->get_subpath()); + if (push_iter != _push_conn_map.end()) { + _push_conn_map.erase(push_iter); + } + _conn_map.erase(iter); + conn_ptr->close(); + } + + srt_epoll_remove_usock(_handle_pollid, srtsocket); + + return; +} + +bool srt_handle::add_new_pusher(SRT_CONN_PTR conn_ptr) { + auto push_iter = _push_conn_map.find(conn_ptr->get_subpath()); + if (push_iter != _push_conn_map.end()) { + return false; + } + _push_conn_map.insert(std::make_pair(conn_ptr->get_subpath(), conn_ptr)); + _conn_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr)); + srs_trace("srt_handle add new pusher streamid:%s, subpath:%s", + conn_ptr->get_streamid().c_str(), conn_ptr->get_subpath().c_str()); + return true; +} + +void srt_handle::handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) { + srs_info("handle_pull_data status:%d, subpath:%s, fd:%d", + status, subpath.c_str(), conn_fd); + auto conn_ptr = get_srt_conn(conn_fd); + if (!conn_ptr) { + srs_error("handle_pull_data fail to find fd(%d)", conn_fd); + assert(0); + return; + } + conn_ptr->update_timestamp(srt_now_ms); +} + +void srt_handle::handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd) +{ + std::string subpath; + int mode; + auto conn_ptr = get_srt_conn(conn_fd); + + if (!conn_ptr) { + if (status != SRTS_CLOSED) { + srs_error("handle_srt_socket find srt connection error, fd:%d, status:%d", + conn_fd, status); + } + return; + } + bool ret = get_streamid_info(conn_ptr->get_streamid(), mode, subpath); + if (!ret) { + conn_ptr->close(); + conn_ptr = nullptr; + return; + } + + if (mode == PUSH_SRT_MODE) { + switch (status) + { + case SRTS_CONNECTED: + { + handle_push_data(status, subpath, conn_fd); + break; + } + case SRTS_BROKEN: + { + srs_warn("srt push disconnected event fd:%d, streamid:%s", + conn_fd, conn_ptr->get_streamid().c_str()); + close_push_conn(conn_fd); + break; + } + default: + srs_error("push mode unkown status:%d, fd:%d", status, conn_fd); + break; + } + } else if (mode == PULL_SRT_MODE) { + switch (status) + { + case SRTS_CONNECTED: + { + handle_pull_data(status, subpath, conn_fd); + break; + } + case SRTS_BROKEN: + { + srs_warn("srt pull disconnected fd:%d, streamid:%s", + conn_fd, conn_ptr->get_streamid().c_str()); + close_pull_conn(conn_fd, subpath); + break; + } + default: + srs_error("pull mode unkown status:%d, fd:%d", status, conn_fd); + break; + } + } else { + assert(0); + } + return; +} \ No newline at end of file diff --git a/trunk/src/srt/srt_handle.hpp b/trunk/src/srt/srt_handle.hpp new file mode 100644 index 0000000000..da555b7149 --- /dev/null +++ b/trunk/src/srt/srt_handle.hpp @@ -0,0 +1,80 @@ +#ifndef SRT_HANDLE_H +#define SRT_HANDLE_H +#include + +#include +#include +#include +#include +#include +#include + +#include "srt_conn.hpp" +#include "srt_to_rtmp.hpp" + +typedef struct { + SRT_CONN_PTR conn_ptr; + int events; +} request_message_t; + +class srt_handle { +public: + srt_handle(); + ~srt_handle(); + + int start();//create srt epoll and create epoll thread + void stop();//close srt epoll and end epoll thread + + void insert_message_queue(request_message_t msg); + bool get_message_from_queue(request_message_t& msg); + +private: + //add new srt connection into epoll event + void add_newconn(SRT_CONN_PTR conn_ptr, int events); + //get srt conn object by srt socket + SRT_CONN_PTR get_srt_conn(SRTSOCKET conn_srt_socket); + //get srt connect mode: push or pull + int get_srt_mode(SRTSOCKET conn_srt_socket); + + void onwork();//epoll thread loop + //handle recv/send srt socket + void handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd); + void handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd); + void handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd); + + //add new puller into puller list and conn_map + void add_new_puller(SRT_CONN_PTR, std::string stream_id); + //remove pull srt from play list + void close_pull_conn(SRTSOCKET srtsocket, std::string stream_id); + + //add new pusher into pusher map: + bool add_new_pusher(SRT_CONN_PTR conn_ptr); + //remove push connection and remove epoll + void close_push_conn(SRTSOCKET srtsocket); + + //check srt connection whether it's still alive. + void check_alive(); + + //debug statics + void debug_statics(SRTSOCKET srtsocket, const std::string& streamid); + +private: + bool _run_flag; + int _handle_pollid; + + std::unordered_map _conn_map;//save all srt connection: pull or push + std::shared_ptr _work_thread_ptr; + + //save push srt connection for prevent from repeat push connection + std::unordered_map _push_conn_map;//key:streamid, value:SRT_CONN_PTR + //streamid, play map + std::unordered_map> _streamid_map; + + std::mutex _queue_mutex; + std::queue _message_queue; + + long long _last_timestamp; + long long _last_check_alive_ts; +}; + +#endif //SRT_HANDLE_H \ No newline at end of file diff --git a/trunk/src/srt/srt_server.cpp b/trunk/src/srt/srt_server.cpp new file mode 100644 index 0000000000..b938f3678f --- /dev/null +++ b/trunk/src/srt/srt_server.cpp @@ -0,0 +1,201 @@ +#include "srt_server.hpp" +#include "srt_handle.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +srt_server::srt_server(unsigned short port):listen_port(port) + ,server_socket(-1) +{ + handle_ptr = std::make_shared(); +} + +srt_server::~srt_server() +{ + +} + +int srt_server::init_srt() { + if (server_socket != -1) { + return -1; + } + + server_socket = srt_create_socket(); + sockaddr_in sa; + memset(&sa, 0, sizeof sa); + sa.sin_family = AF_INET; + sa.sin_port = htons(listen_port); + + sockaddr* psa = (sockaddr*)&sa; + + int ret = srt_bind(server_socket, psa, sizeof(sa)); + if ( ret == SRT_ERROR ) + { + srt_close(server_socket); + srs_error("srt bind error: %d", ret); + return -1; + } + + ret = srt_listen(server_socket, 5); + if (ret == SRT_ERROR) + { + srt_close(server_socket); + srs_error("srt listen error: %d", ret); + return -2; + } + + _pollid = srt_epoll_create(); + if (_pollid < -1) { + srs_error("srt server srt_epoll_create error, port=%d", listen_port); + return -1; + } + + int events = SRT_EPOLL_IN | SRT_EPOLL_ERR; + ret = srt_epoll_add_usock(_pollid, server_socket, &events); + if (ret < 0) { + srs_error("srt server run add epoll error:%d", ret); + return ret; + } + + srs_trace("srt server listen port=%d, server_fd=%d", listen_port, server_socket); + + return 0; +} + +int srt_server::start() +{ + int ret; + + if ((ret = init_srt()) < 0) { + return ret; + } + ret = handle_ptr->start(); + if (ret < 0) { + return ret; + } + + run_flag = true; + srs_trace("srt server is starting... port(%d)", listen_port); + thread_run_ptr = std::make_shared(&srt_server::on_work, this); + return 0; +} + +void srt_server::stop() +{ + run_flag = false; + if (!thread_run_ptr) { + return; + } + thread_run_ptr->join(); + + handle_ptr->stop(); + return; +} + +void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr) { + SRTSOCKET conn_fd = -1; + sockaddr_in scl; + int sclen = sizeof(scl); + int conn_event;// = SRT_EPOLL_IN |SRT_EPOLL_OUT| SRT_EPOLL_ERR; + + switch(status) { + case SRTS_LISTENING: + { + conn_fd = srt_accept(input_fd, (sockaddr*)&scl, &sclen); + if (conn_fd == -1) { + return; + } + //add new srt connect into srt handle + std::string streamid = UDT::getstreamid(conn_fd); + if (!is_streamid_valid(streamid)) { + srs_trace("srt streamid(%s) error, fd:%d", streamid.c_str(), conn_fd); + srt_close(conn_fd); + return; + } + SRT_CONN_PTR srt_conn_ptr = std::make_shared(conn_fd, streamid); + + std::string vhost_str = srt_conn_ptr->get_vhost(); + srs_trace("new srt connection streamid:%s, fd:%d, vhost:%s", + streamid.c_str(), conn_fd, vhost_str.c_str()); + SrsConfDirective* vhost_p = _srs_config->get_vhost(vhost_str, true); + if (!vhost_p) { + srs_trace("srt streamid(%s): no vhost %s, fd:%d", + streamid.c_str(), vhost_str.c_str(), conn_fd); + srt_conn_ptr->close(); + return; + } + if (srt_conn_ptr->get_mode() == PULL_SRT_MODE) { + //add SRT_EPOLL_IN for information notify + conn_event = SRT_EPOLL_IN | SRT_EPOLL_ERR;//not inlucde SRT_EPOLL_OUT for save cpu + } else if (srt_conn_ptr->get_mode() == PUSH_SRT_MODE) { + conn_event = SRT_EPOLL_IN | SRT_EPOLL_ERR; + } else { + srs_trace("stream mode error, it shoulde be m=push or m=pull, streamid:%s", + srt_conn_ptr->get_streamid().c_str()); + srt_conn_ptr->close(); + return; + } + request_message_t msg = {srt_conn_ptr, conn_event}; + handle_ptr->insert_message_queue(msg); + break; + } + case SRTS_CONNECTED: + { + srs_trace("srt connected: socket=%d, mode:%s", input_fd, dscr.c_str()); + break; + } + case SRTS_BROKEN: + { + srt_epoll_remove_usock(_pollid, input_fd); + srt_close(input_fd); + srs_warn("srt close: socket=%d", input_fd); + break; + } + default: + { + srs_error("srt server unkown status:%d", status); + } + } +} + +void srt_server::on_work() +{ + const unsigned int SRT_FD_MAX = 100; + srs_trace("srt server is working port(%d)", listen_port); + while (run_flag) + { + SRTSOCKET read_fds[SRT_FD_MAX]; + SRTSOCKET write_fds[SRT_FD_MAX]; + int rfd_num = SRT_FD_MAX; + int wfd_num = SRT_FD_MAX; + + int ret = srt_epoll_wait(_pollid, read_fds, &rfd_num, write_fds, &wfd_num, -1, + nullptr, nullptr, nullptr, nullptr); + if (ret < 0) { + continue; + } + srs_trace("srt server epoll get: ret=%d, rfd_num=%d, wfd_num=%d", + ret, rfd_num, wfd_num); + + for (int index = 0; index < rfd_num; index++) { + SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]); + srt_handle_connection(status, read_fds[index], "read fd"); + } + + for (int index = 0; index < wfd_num; index++) { + SRT_SOCKSTATUS status = srt_getsockstate(write_fds[index]); + srt_handle_connection(status, read_fds[index], "write fd"); + } + } +} diff --git a/trunk/src/srt/srt_server.hpp b/trunk/src/srt/srt_server.hpp new file mode 100644 index 0000000000..7201b72633 --- /dev/null +++ b/trunk/src/srt/srt_server.hpp @@ -0,0 +1,37 @@ +#ifndef SRT_SERVER_H +#define SRT_SERVER_H +#include + +#include +#include + +class srt_handle; + +class srt_server { +public: + srt_server(unsigned short port); + ~srt_server(); + + int start();//init srt handl and create srt main thread loop + void stop();//stop srt main thread loop + +private: + //init srt socket and srt epoll + int init_srt(); + //srt main epoll loop + void on_work(); + //accept new srt connection + void srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr); + +private: + unsigned short listen_port; + SRTSOCKET server_socket; + int _pollid; + bool run_flag; + std::shared_ptr thread_run_ptr; + std::shared_ptr handle_ptr; +}; + +typedef std::shared_ptr SRT_SERVER_PTR; + +#endif//SRT_SERVER_H \ No newline at end of file diff --git a/trunk/src/srt/srt_to_rtmp.cpp b/trunk/src/srt/srt_to_rtmp.cpp new file mode 100644 index 0000000000..ecb348b201 --- /dev/null +++ b/trunk/src/srt/srt_to_rtmp.cpp @@ -0,0 +1,446 @@ +#include "srt_to_rtmp.hpp" +#include +#include +#include +#include +#include +#include +#include "stringex.hpp" + +std::shared_ptr srt2rtmp::s_srt2rtmp_ptr; + +std::shared_ptr srt2rtmp::get_instance() { + if (!s_srt2rtmp_ptr) { + s_srt2rtmp_ptr = std::make_shared(); + } + return s_srt2rtmp_ptr; +} + +srt2rtmp::srt2rtmp() { + +} + +srt2rtmp::~srt2rtmp() { + release(); +} + +srs_error_t srt2rtmp::init() { + srs_error_t err = srs_success; + + if (_trd_ptr.get() != nullptr) { + return srs_error_wrap(err, "don't start thread again"); + } + + _trd_ptr = std::make_shared("srt2rtmp", this); + + if ((err = _trd_ptr->start()) != srs_success) { + return srs_error_wrap(err, "start thread"); + } + srs_trace("srt2rtmp start coroutine..."); + + return err; +} + +void srt2rtmp::release() { + if (!_trd_ptr) { + return; + } + _trd_ptr->stop(); + _trd_ptr = nullptr; +} + +void srt2rtmp::insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path) { + std::unique_lock locker(_mutex); + + SRT_DATA_MSG_PTR msg_ptr = std::make_shared(data_p, len, key_path); + _msg_queue.push(msg_ptr); + //_notify_cond.notify_one(); + return; +} + +SRT_DATA_MSG_PTR srt2rtmp::get_data_message() { + std::unique_lock locker(_mutex); + SRT_DATA_MSG_PTR msg_ptr; + + if (_msg_queue.empty()) + { + return msg_ptr; + } + //while (_msg_queue.empty()) { + // _notify_cond.wait(locker); + //} + + msg_ptr = _msg_queue.front(); + _msg_queue.pop(); + return msg_ptr; +} + +//the cycle is running in srs coroutine +srs_error_t srt2rtmp::cycle() { + srs_error_t err = srs_success; + + while(true) { + SRT_DATA_MSG_PTR msg_ptr = get_data_message(); + + if (!msg_ptr) { + srs_usleep((30 * SRS_UTIME_MILLISECONDS)); + } else { + handle_ts_data(msg_ptr); + } + + if ((err = _trd_ptr->pull()) != srs_success) { + return srs_error_wrap(err, "forwarder"); + } + } +} + +void srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) { + RTMP_CLIENT_PTR rtmp_ptr; + auto iter = _rtmp_client_map.find(data_ptr->get_path()); + if (iter == _rtmp_client_map.end()) { + srs_trace("new rtmp client for srt upstream, key_path:%s", data_ptr->get_path().c_str()); + rtmp_ptr = std::make_shared(data_ptr->get_path()); + _rtmp_client_map.insert(std::make_pair(data_ptr->get_path(), rtmp_ptr)); + } else { + rtmp_ptr = iter->second; + } + + rtmp_ptr->receive_ts_data(data_ptr); + + return; +} + +rtmp_client::rtmp_client(std::string key_path):_key_path(key_path) + , _connect_flag(false) { + _ts_demux_ptr = std::make_shared(); + _avc_ptr = std::make_shared(); + _aac_ptr = std::make_shared(); + std::vector ret_vec; + + string_split(key_path, "/", ret_vec); + + if (ret_vec.size() >= 3) { + _vhost = ret_vec[0]; + _appname = ret_vec[1]; + _streamname = ret_vec[2]; + } else { + _vhost = "DEFAULT_VHOST"; + _appname = ret_vec[0]; + _streamname = ret_vec[1]; + } + char url_sz[128]; + sprintf(url_sz, "rtmp://127.0.0.1/%s?vhost=%s/%s", + _appname.c_str(), _vhost.c_str(), _streamname.c_str()); + _url = url_sz; + + _h264_sps_changed = false; + _h264_pps_changed = false; + _h264_sps_pps_sent = false; + srs_trace("rtmp client construct url:%s", url_sz); +} + +rtmp_client::~rtmp_client() { + +} + +void rtmp_client::close() { + _connect_flag = false; + if (!_rtmp_conn_ptr) { + return; + } + _rtmp_conn_ptr->close(); + _rtmp_conn_ptr = nullptr; + +} + +srs_error_t rtmp_client::connect() { + srs_error_t err = srs_success; + srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; + srs_utime_t sto = SRS_CONSTS_RTMP_PULSE; + + if (_connect_flag) { + return srs_success; + } + + if (_rtmp_conn_ptr.get() != nullptr) { + return srs_error_wrap(err, "repeated connect %s failed, cto=%dms, sto=%dms.", + _url.c_str(), srsu2msi(cto), srsu2msi(sto)); + } + + _rtmp_conn_ptr = std::make_shared(_url, cto, sto); + + if ((err = _rtmp_conn_ptr->connect()) != srs_success) { + close(); + return srs_error_wrap(err, "connect %s failed, cto=%dms, sto=%dms.", + _url.c_str(), srsu2msi(cto), srsu2msi(sto)); + } + + if ((err = _rtmp_conn_ptr->publish(SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) { + close(); + return srs_error_wrap(err, "publish error, url:%s", _url.c_str()); + } + _connect_flag = true; + return err; +} + +void rtmp_client::receive_ts_data(SRT_DATA_MSG_PTR data_ptr) { + _ts_demux_ptr->decode(data_ptr, shared_from_this());//on_data_callback is the decode callback + return; +} + +srs_error_t rtmp_client::write_h264_sps_pps(uint32_t dts, uint32_t pts) { + srs_error_t err = srs_success; + + // TODO: FIMXE: there exists bug, see following comments. + // when sps or pps changed, update the sequence header, + // for the pps maybe not changed while sps changed. + // so, we must check when each video ts message frame parsed. + if (!_h264_sps_changed || !_h264_pps_changed) { + return err; + } + + // h264 raw to h264 packet. + std::string sh; + if ((err = _avc_ptr->mux_sequence_header(_h264_sps, _h264_pps, dts, pts, sh)) != srs_success) { + return srs_error_wrap(err, "mux sequence header"); + } + + // h264 packet to flv packet. + int8_t frame_type = SrsVideoAvcFrameTypeKeyFrame; + int8_t avc_packet_type = SrsVideoAvcFrameTraitSequenceHeader; + char* flv = NULL; + int nb_flv = 0; + if ((err = _avc_ptr->mux_avc2flv(sh, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) { + return srs_error_wrap(err, "avc to flv"); + } + + // the timestamp in rtmp message header is dts. + uint32_t timestamp = dts; + if ((err = rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv)) != srs_success) { + return srs_error_wrap(err, "write packet"); + } + + // reset sps and pps. + _h264_sps_changed = false; + _h264_pps_changed = false; + _h264_sps_pps_sent = true; + + return err; +} + +srs_error_t rtmp_client::write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts) { + srs_error_t err = srs_success; + + // when sps or pps not sent, ignore the packet. + // @see https://github.com/ossrs/srs/issues/203 + if (!_h264_sps_pps_sent) { + return srs_error_new(ERROR_H264_DROP_BEFORE_SPS_PPS, "drop sps/pps"); + } + + // 5bits, 7.3.1 NAL unit syntax, + // ISO_IEC_14496-10-AVC-2003.pdf, page 44. + // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame + SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f); + + // for IDR frame, the frame is keyframe. + SrsVideoAvcFrameType frame_type = SrsVideoAvcFrameTypeInterFrame; + if (nal_unit_type == SrsAvcNaluTypeIDR) { + frame_type = SrsVideoAvcFrameTypeKeyFrame; + } + + std::string ibp; + if ((err = _avc_ptr->mux_ipb_frame(frame, frame_size, ibp)) != srs_success) { + return srs_error_wrap(err, "mux frame"); + } + + int8_t avc_packet_type = SrsVideoAvcFrameTraitNALU; + char* flv = NULL; + int nb_flv = 0; + if ((err = _avc_ptr->mux_avc2flv(ibp, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) { + return srs_error_wrap(err, "mux avc to flv"); + } + + // the timestamp in rtmp message header is dts. + uint32_t timestamp = dts; + return rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv); +} + +srs_error_t rtmp_client::write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts) { + srs_error_t err = srs_success; + + char* data = NULL; + int size = 0; + if ((err = _aac_ptr->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != srs_success) { + return srs_error_wrap(err, "mux aac to flv"); + } + + return rtmp_write_packet(SrsFrameTypeAudio, dts, data, size); +} + +srs_error_t rtmp_client::rtmp_write_packet(char type, uint32_t timestamp, char* data, int size) { + srs_error_t err = srs_success; + SrsSharedPtrMessage* msg = NULL; + + if ((err = srs_rtmp_create_msg(type, timestamp, data, size, _rtmp_conn_ptr->sid(), &msg)) != srs_success) { + return srs_error_wrap(err, "create message"); + } + srs_assert(msg); + + // send out encoded msg. + if ((err = _rtmp_conn_ptr->send_and_free_message(msg)) != srs_success) { + close(); + return srs_error_wrap(err, "send messages"); + } + + return err; +} + +srs_error_t rtmp_client::on_ts_video(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts) { + srs_error_t err = srs_success; + + // ensure rtmp connected. + if ((err = connect()) != srs_success) { + return srs_error_wrap(err, "connect"); + } + + // send each frame. + while (!avs_ptr->empty()) { + char* frame = NULL; + int frame_size = 0; + if ((err = _avc_ptr->annexb_demux(avs_ptr.get(), &frame, &frame_size)) != srs_success) { + return srs_error_wrap(err, "demux annexb"); + } + + //srs_trace_data(frame, frame_size, "video annexb demux:"); + // 5bits, 7.3.1 NAL unit syntax, + // ISO_IEC_14496-10-AVC-2003.pdf, page 44. + // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame + SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f); + + // ignore the nalu type sps(7), pps(8), aud(9) + if (nal_unit_type == SrsAvcNaluTypeAccessUnitDelimiter) { + continue; + } + + // for sps + if (_avc_ptr->is_sps(frame, frame_size)) { + std::string sps; + if ((err = _avc_ptr->sps_demux(frame, frame_size, sps)) != srs_success) { + return srs_error_wrap(err, "demux sps"); + } + + if (_h264_sps == sps) { + continue; + } + _h264_sps_changed = true; + _h264_sps = sps; + + if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { + return srs_error_wrap(err, "write sps/pps"); + } + continue; + } + + // for pps + if (_avc_ptr->is_pps(frame, frame_size)) { + std::string pps; + if ((err = _avc_ptr->pps_demux(frame, frame_size, pps)) != srs_success) { + return srs_error_wrap(err, "demux pps"); + } + + if (_h264_pps == pps) { + continue; + } + _h264_pps_changed = true; + _h264_pps = pps; + + if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { + return srs_error_wrap(err, "write sps/pps"); + } + continue; + } + + // ibp frame. + // TODO: FIXME: we should group all frames to a rtmp/flv message from one ts message. + srs_info("mpegts: demux avc ibp frame size=%d, dts=%d", frame_size, dts); + if ((err = write_h264_ipb_frame(frame, frame_size, dts, pts)) != srs_success) { + return srs_error_wrap(err, "write frame"); + } + } + + return err; +} + +srs_error_t rtmp_client::on_ts_audio(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts) { + srs_error_t err = srs_success; + + // ensure rtmp connected. + if ((err = connect()) != srs_success) { + return srs_error_wrap(err, "connect"); + } + + // send each frame. + while (!avs_ptr->empty()) { + char* frame = NULL; + int frame_size = 0; + SrsRawAacStreamCodec codec; + if ((err = _aac_ptr->adts_demux(avs_ptr.get(), &frame, &frame_size, codec)) != srs_success) { + return srs_error_wrap(err, "demux adts"); + } + //srs_trace("audio annexb demux sampling_frequency_index:%d, aac_packet_type:%d, sound_rate:%d, sound_size:%d", + // codec.sampling_frequency_index, codec.aac_packet_type, codec.sound_rate, + // codec.sound_size); + //srs_trace_data(frame, frame_size, "audio annexb demux:"); + // ignore invalid frame, + // * atleast 1bytes for aac to decode the data. + if (frame_size <= 0) { + continue; + } + + // generate sh. + if (_aac_specific_config.empty()) { + std::string sh; + if ((err = _aac_ptr->mux_sequence_header(&codec, sh)) != srs_success) { + return srs_error_wrap(err, "mux sequence header"); + } + _aac_specific_config = sh; + + codec.aac_packet_type = 0; + + if ((err = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), &codec, dts)) != srs_success) { + return srs_error_wrap(err, "write raw audio frame"); + } + } + + // audio raw data. + codec.aac_packet_type = 1; + if ((err = write_audio_raw_frame(frame, frame_size, &codec, dts)) != srs_success) { + return srs_error_wrap(err, "write audio raw frame"); + } + } + + return err; +} + +void rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, + uint64_t dts, uint64_t pts) +{ + if (!data_ptr || (data_ptr->get_data() == nullptr) || (data_ptr->data_len() == 0)) { + assert(0); + return; + } + + auto avs_ptr = std::make_shared((char*)data_ptr->get_data(), data_ptr->data_len()); + dts = dts / 90; + pts = pts / 90; + + if (media_type == STREAM_TYPE_VIDEO_H264) { + on_ts_video(avs_ptr, dts, pts); + } else if (media_type == STREAM_TYPE_AUDIO_AAC) { + on_ts_audio(avs_ptr, dts, pts); + } else { + srs_error("mpegts demux unkown stream type:0x%02x", media_type); + assert(0); + } + return; +} diff --git a/trunk/src/srt/srt_to_rtmp.hpp b/trunk/src/srt/srt_to_rtmp.hpp new file mode 100644 index 0000000000..040b832b74 --- /dev/null +++ b/trunk/src/srt/srt_to_rtmp.hpp @@ -0,0 +1,101 @@ +#ifndef SRT_TO_RTMP_H +#define SRT_TO_RTMP_H +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "srt_data.hpp" +#include "ts_demux.hpp" + +#define SRT_VIDEO_MSG_TYPE 0x01 +#define SRT_AUDIO_MSG_TYPE 0x02 + +typedef std::shared_ptr RTMP_CONN_PTR; +typedef std::shared_ptr AVC_PTR; +typedef std::shared_ptr AAC_PTR; + +#define DEFAULT_VHOST "__default_host__" + +class rtmp_client : public ts_media_data_callback_I, public std::enable_shared_from_this { +public: + rtmp_client(std::string key_path); + ~rtmp_client(); + + void receive_ts_data(SRT_DATA_MSG_PTR data_ptr); + +private: + virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts); + + srs_error_t connect(); + void close(); + +private: + srs_error_t on_ts_video(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts); + srs_error_t on_ts_audio(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts); + virtual srs_error_t write_h264_sps_pps(uint32_t dts, uint32_t pts); + virtual srs_error_t write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts); + virtual srs_error_t write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts); + +private: + virtual srs_error_t rtmp_write_packet(char type, uint32_t timestamp, char* data, int size); + +private: + std::string _key_path; + std::string _url; + std::string _vhost; + std::string _appname; + std::string _streamname; + TS_DEMUX_PTR _ts_demux_ptr; + +private: + AVC_PTR _avc_ptr; + std::string _h264_sps; + bool _h264_sps_changed; + std::string _h264_pps; + bool _h264_pps_changed; + bool _h264_sps_pps_sent; +private: + std::string _aac_specific_config; + AAC_PTR _aac_ptr; +private: + RTMP_CONN_PTR _rtmp_conn_ptr; + bool _connect_flag; +}; + +typedef std::shared_ptr RTMP_CLIENT_PTR; + +class srt2rtmp : public ISrsCoroutineHandler { +public: + static std::shared_ptr get_instance(); + srt2rtmp(); + virtual ~srt2rtmp(); + + srs_error_t init(); + void release(); + + void insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path); + +private: + SRT_DATA_MSG_PTR get_data_message(); + virtual srs_error_t cycle(); + void handle_ts_data(SRT_DATA_MSG_PTR data_ptr); + +private: + static std::shared_ptr s_srt2rtmp_ptr; + std::shared_ptr _trd_ptr; + std::mutex _mutex; + //std::condition_variable_any _notify_cond; + std::queue _msg_queue; + + std::unordered_map _rtmp_client_map; +}; + +#endif \ No newline at end of file diff --git a/trunk/src/srt/stringex.hpp b/trunk/src/srt/stringex.hpp new file mode 100644 index 0000000000..36e8eb8b46 --- /dev/null +++ b/trunk/src/srt/stringex.hpp @@ -0,0 +1,39 @@ +#ifndef STRING_EX_H +#define STRING_EX_H +#include +#include +#include +#include +#include +#include + +inline int string_split(const std::string& input_str, const std::string& split_str, std::vector& output_vec) { + if (input_str.length() == 0) { + return 0; + } + + std::string tempString(input_str); + do { + + size_t pos = tempString.find(split_str); + if (pos == tempString.npos) { + output_vec.push_back(tempString); + break; + } + std::string seg_str = tempString.substr(0, pos); + tempString = tempString.substr(pos+split_str.size()); + output_vec.push_back(seg_str); + } while(tempString.size() > 0); + + return output_vec.size(); +} + +inline std::string string_lower(const std::string input_str) { + std::string output_str(input_str); + + std::transform(input_str.begin(), input_str.end(), output_str.begin(), ::tolower); + + return output_str; +} + +#endif//STRING_EX_H \ No newline at end of file diff --git a/trunk/src/srt/time_help.h b/trunk/src/srt/time_help.h new file mode 100644 index 0000000000..301139df7a --- /dev/null +++ b/trunk/src/srt/time_help.h @@ -0,0 +1,10 @@ +#ifndef TIME_HELP_H +#define TIME_HELP_H +#include + +inline long long now_ms() { + return std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); +} + +#endif //TIME_HELP_H \ No newline at end of file diff --git a/trunk/src/srt/ts_demux.cpp b/trunk/src/srt/ts_demux.cpp new file mode 100644 index 0000000000..4c10f871b2 --- /dev/null +++ b/trunk/src/srt/ts_demux.cpp @@ -0,0 +1,577 @@ +#include "ts_demux.hpp" +#include +#include + +ts_demux::ts_demux():_data_total(0) + ,_last_pid(0) + ,_last_dts(0) + ,_last_pts(0) +{ + +} + +ts_demux::~ts_demux() { + +} + +int ts_demux::decode_unit(unsigned char* data_p, std::string key_path, TS_DATA_CALLBACK_PTR callback) +{ + int pos = 0; + int npos = 0; + ts_header ts_header_info; + + ts_header_info._sync_byte = data_p[pos]; + pos++; + + ts_header_info._transport_error_indicator = (data_p[pos]&0x80)>>7; + ts_header_info._payload_unit_start_indicator = (data_p[pos]&0x40)>>6; + ts_header_info._transport_priority = (data_p[pos]&0x20)>>5; + ts_header_info._PID = ((data_p[pos]<<8)|data_p[pos+1])&0x1FFF; + pos += 2; + + ts_header_info._transport_scrambling_control = (data_p[pos]&0xC0)>>6; + ts_header_info._adaptation_field_control = (data_p[pos]&0x30)>>4; + ts_header_info._continuity_counter = (data_p[pos]&0x0F); + pos++; + npos = pos; + + //printf("ts header(0x%02x) payload_unit_start_indicator:%d, pid:%d, adaptation_field_control:%d, pos:%d\r\n", + // ts_header_info._sync_byte, + // ts_header_info._payload_unit_start_indicator, ts_header_info._PID, + // ts_header_info._adaptation_field_control, pos); + + adaptation_field* field_p = &(ts_header_info._adaptation_field_info); + // adaptation field + // 0x01 No adaptation_field, payload only + // 0x02 Adaptation_field only, no payload + // 0x03 Adaptation_field followed by payload + if( ts_header_info._adaptation_field_control == 2 + || ts_header_info._adaptation_field_control == 3 ){ + // adaptation_field() + field_p->_adaptation_field_length = data_p[pos]; + pos++; + + if( field_p->_adaptation_field_length > 0 ){ + field_p->_discontinuity_indicator = (data_p[pos]&0x80)>>7; + field_p->_random_access_indicator = (data_p[pos]&0x40)>>6; + field_p->_elementary_stream_priority_indicator = (data_p[pos]&0x20)>>5; + field_p->_PCR_flag = (data_p[pos]&0x10)>>4; + field_p->_OPCR_flag = (data_p[pos]&0x08)>>3; + field_p->_splicing_point_flag = (data_p[pos]&0x04)>>2; + field_p->_transport_private_data_flag = (data_p[pos]&0x02)>>1; + field_p->_adaptation_field_extension_flag = (data_p[pos]&0x01); + pos++; + + if( field_p->_PCR_flag == 1 ) { // PCR info + //program_clock_reference_base 33 uimsbf + //reserved 6 bslbf + //program_clock_reference_extension 9 uimsbf + pos += 6; + } + if( field_p->_OPCR_flag == 1 ) { + //original_program_clock_reference_base 33 uimsbf + //reserved 6 bslbf + //original_program_clock_reference_extension 9 uimsbf + pos += 6; + } + if( field_p->_splicing_point_flag == 1 ) { + //splice_countdown 8 tcimsbf + pos++; + } + if( field_p->_transport_private_data_flag == 1 ) { + //transport_private_data_length 8 uimsbf + field_p->_transport_private_data_length = data_p[pos]; + pos++; + memcpy(field_p->_private_data_byte, data_p + pos, field_p->_transport_private_data_length); + } + if( field_p->_adaptation_field_extension_flag == 1 ) { + //adaptation_field_extension_length 8 uimsbf + field_p->_adaptation_field_extension_length = data_p[pos]; + pos++; + //ltw_flag 1 bslbf + field_p->_ltw_flag = (data_p[pos]&0x80)>>7; + //piecewise_rate_flag 1 bslbf + field_p->_piecewise_rate_flag = (data_p[pos]&0x40)>>6; + //seamless_splice_flag 1 bslbf + field_p->_seamless_splice_flag = (data_p[pos]&0x20)>>5; + //reserved 5 bslbf + pos++; + if (field_p->_ltw_flag == 1) { + //ltw_valid_flag 1 bslbf + //ltw_offset 15 uimsbf + pos += 2; + } + if (field_p->_piecewise_rate_flag == 1) { + //reserved 2 bslbf + //piecewise_rate 22 uimsbf + pos += 3; + } + if (field_p->_seamless_splice_flag == 1) { + //splice_type 4 bslbf + //DTS_next_AU[32..30] 3 bslbf + //marker_bit 1 bslbf + //DTS_next_AU[29..15] 15 bslbf + //marker_bit 1 bslbf + //DTS_next_AU[14..0] 15 bslbf + //marker_bit 1 bslbf + pos += 5; + } + } + } + npos += sizeof(field_p->_adaptation_field_length) + field_p->_adaptation_field_length; + } + + if(ts_header_info._adaptation_field_control == 1 + || ts_header_info._adaptation_field_control == 3 ) { + // data_byte with placeholder + // payload parser + if(ts_header_info._PID == 0x00){ + // PAT // program association table + if(ts_header_info._payload_unit_start_indicator) { + pos++; + } + _pat._table_id = data_p[pos]; + pos++; + _pat._section_syntax_indicator = (data_p[pos]>>7)&0x01; + // skip 3 bits of 1 zero and 2 reserved + _pat._section_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF; + pos += 2; + _pat._transport_stream_id = (data_p[pos]<<8)|data_p[pos+1]; + pos += 2; + // reserved 2 bits + _pat._version_number = (data_p[pos]&0x3E)>>1; + _pat._current_next_indicator = data_p[pos]&0x01; + pos++; + _pat._section_number = data_p[pos]; + pos++; + _pat._last_section_number = data_p[pos]; + assert(_pat._table_id == 0x00); + assert((188 - npos) > (_pat._section_length+3)); // PAT = section_length + 3 + pos++; + _pat._pid_vec.clear(); + for (;pos+4 <= _pat._section_length-5-4+9 + npos;) { // 4:CRC, 5:follow section_length item rpos + 4(following unit length) section_length + 9(above field and unit_start_first_byte ) + PID_INFO pid_info; + //program_number 16 uimsbf + pid_info._program_number = data_p[pos]<<8|data_p[pos+1]; + pos += 2; +// reserved 3 bslbf + + if (pid_info._program_number == 0) { +// // network_PID 13 uimsbf + pid_info._network_id = (data_p[pos]<<8|data_p[pos+1])&0x1FFF; + //printf("#### network id:%d.\r\n", pid_info._network_id); + pos += 2; + } + else { +// // program_map_PID 13 uimsbf + pid_info._pid = (data_p[pos]<<8|data_p[pos+1])&0x1FFF; + //printf("#### pmt id:%d.\r\n", pid_info._pid); + pos += 2; + } + _pat._pid_vec.push_back(pid_info); + // network_PID and program_map_PID save to list + } +// CRC_32 use pat to calc crc32, eq + pos += 4; + }else if(ts_header_info._PID == 0x01){ + // CAT // conditional access table + }else if(ts_header_info._PID == 0x02){ + //TSDT // transport stream description table + }else if(ts_header_info._PID == 0x03){ + //IPMP // IPMP control information table + // 0x0004-0x000F Reserved + // 0x0010-0x1FFE May be assigned as network_PID, Program_map_PID, elementary_PID, or for other purposes + }else if(ts_header_info._PID == 0x11){ + // SDT // https://en.wikipedia.org/wiki/Service_Description_Table / https://en.wikipedia.org/wiki/MPEG_transport_stream + }else if(is_pmt(ts_header_info._PID)) { + if(ts_header_info._payload_unit_start_indicator) + pos++; + _pmt._table_id = data_p[pos]; + pos++; + _pmt._section_syntax_indicator = (data_p[pos]>>7)&0x01; + // skip 3 bits of 1 zero and 2 reserved + _pmt._section_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF; + pos += 2; + _pmt._program_number = (data_p[pos]<<8)|data_p[pos+1]; + pos += 2; + // reserved 2 bits + _pmt._version_number = (data_p[pos]&0x3E)>>1; + _pmt._current_next_indicator = data_p[pos]&0x01; + pos++; + _pmt._section_number = data_p[pos]; + pos++; + _pmt._last_section_number = data_p[pos]; + pos++; + // skip 3 bits for reserved 3 bslbf + _pmt._PCR_PID = ((data_p[pos]<<8)|data_p[pos+1])&0x1FFF; //PCR_PID 13 uimsbf + pos += 2; + + //reserved 4 bslbf + _pmt._program_info_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF;//program_info_length 12 uimsbf + pos += 2; + assert(_pmt._table_id==0x02); // 0x02, // TS_program_map_section + memcpy(_pmt._dscr, data_p+pos, _pmt._program_info_length); +// for (i = 0; i < N; i++) { +// descriptor() +// } + pos += _pmt._program_info_length; + _pmt._stream_pid_vec.clear(); + _pmt._pid2steamtype.clear(); + + for (; pos + 5 <= _pmt._section_length + 4 - 4 + npos; ) { // pos(above field length) i+5(following unit length) section_length +3(PMT begin three bytes)+1(payload_unit_start_indicator) -4(crc32) + STREAM_PID_INFO pid_info; + pid_info._stream_type = data_p[pos];//stream_type 8 uimsbf 0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video + pos++; + //reserved 3 bslbf + pid_info._elementary_PID = ((data_p[pos]<<8)|data_p[pos+1])&0x1FFF; //elementary_PID 13 uimsbf + pos += 2; + //reserved 4 bslbf + pid_info._ES_info_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF; //ES_info_length 12 uimsbf + pos += 2; + if( pos + pid_info._ES_info_length > _pmt._section_length + 4 - 4 + npos ) + break; + int absES_info_length = pos + pid_info._ES_info_length; + for (; pos< absES_info_length; ) { + //descriptor() + int descriptor_tag = data_p[pos]; + (void)descriptor_tag; + pos++; + int descriptor_length = data_p[pos]; + pos++; + memcpy(pid_info._dscr, data_p + pos, descriptor_length); + pos += descriptor_length; + } + // save program_number(stream num) elementary_PID(PES PID) stream_type(stream codec) + //printf("pmt pid:%d, streamtype:%d, pos:%d\r\n", pid_info._elementary_PID, pid_info._stream_type, pos); + _pmt._stream_pid_vec.push_back(pid_info); + _pmt._pid2steamtype.insert(std::make_pair((unsigned short)pid_info._elementary_PID, pid_info._stream_type)); + } + pos += 4;//CRC_32 + }else if(ts_header_info._PID == 0x0042){ + // USER + }else if(ts_header_info._PID == 0x1FFF){ + // Null packet + }else{//pes packet or pure data packet + //bool isFound = false; + for (size_t i = 0; i < _pmt._stream_pid_vec.size(); i++) { + if(ts_header_info._PID == _pmt._stream_pid_vec[i]._elementary_PID){ + //isFound = true; + if(ts_header_info._payload_unit_start_indicator){ + unsigned char* ret_data_p = nullptr; + size_t ret_size = 0; + + //callback last media data in data buffer + on_callback(callback, _last_pid, key_path, _last_dts, _last_pts); + + pes_parse(data_p+npos, npos, &ret_data_p, ret_size, _last_dts, _last_pts); + if ((ret_data_p != nullptr) && (ret_size > 0)) { + insert_into_databuf(ret_data_p, ret_size, key_path, ts_header_info._PID); + } + }else{ + //fwrite(p, 1, 188-(npos+pos), pes_info[i].fd); + insert_into_databuf(data_p + npos, 188-npos, key_path, ts_header_info._PID); + } + } + } + //if(!isFound){ + // printf("unknown PID = %X \n", ts_header_info._PID); + //} + } + } + + return 0; +} +int ts_demux::decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback) +{ + int ret = -1; + std::string path; + + if (!data_ptr || (data_ptr->data_len() < 188) || (data_ptr->data_len()%188 != 0)) + { + return -1; + } + + + unsigned int count = data_ptr->data_len()/188; + path = data_ptr->get_path(); + for (unsigned int index = 0; index < count; index++) + { + ret = decode_unit(data_ptr->get_data() + 188*index, path, callback); + if (ret < 0) + { + break; + } + } + return ret; +} + +void ts_demux::insert_into_databuf(unsigned char* data_p, size_t data_size, std::string key_path, unsigned short pid) { + _last_pid = pid; + _data_total += data_size; + _data_buffer_vec.push_back(std::make_shared(data_p, data_size, key_path)); + return; +} + +void ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, std::string key_path, + uint64_t dts, uint64_t pts) { + if ((_data_total <=0 ) || (_data_buffer_vec.empty())) { + return; + } + + auto iter = _pmt._pid2steamtype.find(pid); + if (iter == _pmt._pid2steamtype.end()) { + return; + } + unsigned char stream_type = iter->second; + auto total_data_ptr = std::make_shared(_data_total, key_path); + size_t pos = 0; + + for (size_t index = 0; index < _data_buffer_vec.size(); index++) { + memcpy(total_data_ptr->get_data() + pos, + _data_buffer_vec[index]->get_data(), + _data_buffer_vec[index]->data_len()); + pos += _data_buffer_vec[index]->data_len(); + } + _data_buffer_vec.clear(); + _data_total = 0; + + callback->on_data_callback(total_data_ptr, stream_type, dts, pts); + return; +} + +bool ts_demux::is_pmt(unsigned short pid) { + for (size_t index = 0; index < _pat._pid_vec.size(); index++) { + if (_pat._pid_vec[index]._program_number != 0) { + if (_pat._pid_vec[index]._pid == pid) { + return true; + } + } + } + return false; +} + + +int ts_demux::pes_parse(unsigned char* p, size_t npos, + unsigned char** ret_pp, size_t& ret_size, + uint64_t& dts, uint64_t& pts) { + int pos = 0; + int packet_start_code_prefix = (p[pos]<<16)|(p[pos+1]<<8)|p[pos+2]; //packet_start_code_prefix 24 bslbf + pos += 3; + int stream_id = p[pos]; //stream_id 8 uimsbf + pos++; + //printf("pes parse %02x %02x.\r\n", p[pos], p[pos+1]); + int PES_packet_length = ((unsigned int)p[pos]<<8)|p[pos+1]; //PES_packet_length 16 uimsbf + (void)PES_packet_length; + pos += 2; + //printf("pes parse packet_start_code_prefix:%d, npos:%lu, PES_packet_length:%d, stream_id:%d.\r\n", + // packet_start_code_prefix, npos, PES_packet_length, stream_id); + assert(0x00000001 == packet_start_code_prefix); + if (stream_id != 188//program_stream_map 1011 1100 + && stream_id != 190//padding_stream 1011 1110 + && stream_id != 191//private_stream_2 1011 1111 + && stream_id != 240//ECM 1111 0000 + && stream_id != 241//EMM 1111 0001 + && stream_id != 255//program_stream_directory 1111 1111 + && stream_id != 242//DSMCC_stream 1111 0010 + && stream_id != 248//ITU-T Rec. H.222.1 type E stream 1111 1000 + ) + { + assert(0x80 == p[pos]); + //skip 2bits//'10' 2 bslbf + int PES_scrambling_control = (p[pos]&30)>>4; //PES_scrambling_control 2 bslbf + (void)PES_scrambling_control; + int PES_priority = (p[pos]&0x08)>>3; //PES_priority 1 bslbf + (void)PES_priority; + int data_alignment_indicator = (p[pos]&0x04)>>2;//data_alignment_indicator 1 bslbf + (void)data_alignment_indicator; + int copyright = (p[pos]&0x02)>>1; //copyright 1 bslbf + (void)copyright; + int original_or_copy = (p[pos]&0x01);//original_or_copy 1 bslbf + (void)original_or_copy; + pos++; + int PTS_DTS_flags = (p[pos]&0xC0)>>6; //PTS_DTS_flags 2 bslbf + int ESCR_flag = (p[pos]&0x20)>>5; // ESCR_flag 1 bslbf + int ES_rate_flag = (p[pos]&0x10)>>4;//ES_rate_flag 1 bslbf + int DSM_trick_mode_flag = (p[pos]&0x08)>>3;//DSM_trick_mode_flag 1 bslbf + int additional_copy_info_flag = (p[pos]&0x04)>>2; //additional_copy_info_flag 1 bslbf + int PES_CRC_flag = (p[pos]&0x02)>>1; //PES_CRC_flag 1 bslbf + int PES_extension_flag = (p[pos]&0x01);//PES_extension_flag 1 bslbf + pos++; + int PES_header_data_length = p[pos]; //PES_header_data_length 8 uimsbf + (void)PES_header_data_length; + pos++; + + if (PTS_DTS_flags == 2) { + // skip 4 bits '0010' 4 bslbf + // PTS [32..30] 3 bslbf + // marker_bit 1 bslbf + // PTS [29..15] 15 bslbf + // marker_bit 1 bslbf + // PTS [14..0] 15 bslbf + // marker_bit 1 bslbf + pts = (((p[pos]>>1)&0x07) << 30) | (p[pos+1]<<22) | (((p[pos+2]>>1)&0x7F)<<15) | (p[pos+3]<<7) | ((p[pos+4]>>1)&0x7F); + pos += 5; + } + if (PTS_DTS_flags == 3) { + // '0011' 4 bslbf + // PTS [32..30] 3 bslbf + // marker_bit 1 bslbf + //PTS [29..15] 15 bslbf + //marker_bit 1 bslbf + // PTS [14..0] 15 bslbf + // marker_bit 1 bslbf + pts = (((p[pos]>>1)&0x07) << 30) | (p[pos+1]<<22) | (((p[pos+2]>>1)&0x7F)<<15) | (p[pos+3]<<7) | ((p[pos+4]>>1)&0x7F); + pos += 5; + // '0001' 4 bslbf + // DTS [32..30] 3 bslbf + // marker_bit 1 bslbf + // DTS [29..15] 15 bslbf + // marker_bit 1 bslbf + // DTS [14..0] 15 bslbf + // marker_bit 1 bslbf + dts = (((p[pos]>>1)&0x07) << 30) | (p[pos+1]<<22) | (((p[pos+2]>>1)&0x7F)<<15) | (p[pos+3]<<7) | ((p[pos+4]>>1)&0x7F); + pos += 5; + } + if (ESCR_flag == 1) { + // reserved 2 bslbf + // ESCR_base[32..30] 3 bslbf + // marker_bit 1 bslbf + // ESCR_base[29..15] 15 bslbf + // marker_bit 1 bslbf + // ESCR_base[14..0] 15 bslbf + // marker_bit 1 bslbf + // ESCR_extension 9 uimsbf + // marker_bit 1 bslbf + uint64_t ESCR_base = ((((uint64_t)p[pos] >> 3) & 0x07) << 30) | (((uint64_t)p[pos] & 0x03) << 28) | ((uint64_t)p[pos + 1] << 20) | ((((uint64_t)p[pos + 2] >> 3) & 0x1F) << 15) | (((uint64_t)p[pos + 2] & 0x3) << 13) | ((uint64_t)p[pos + 3] << 5) | ((p[pos + 4] >> 3) & 0x1F); + int ESCR_extension = ((p[pos + 4] & 0x03) << 7) | ((p[pos + 5] >> 1) & 0x7F); + (void)ESCR_base; + (void)ESCR_extension; + pos += 6; + } + if (ES_rate_flag == 1) { + // marker_bit 1 bslbf + // ES_rate 22 uimsbf + // marker_bit 1 bslbf + int ES_rate = (p[pos]&0x7F)<<15 | (p[pos+1])<<7 | (p[pos+2]&0x7F)>>1; + (void)ES_rate; + pos += 3; + } + if (DSM_trick_mode_flag == 1) { // ignore + int trick_mode_control = (p[pos]&0xE0)>>5;//trick_mode_control 3 uimsbf + if ( trick_mode_control == 0/*fast_forward*/ ) { + // field_id 2 bslbf + // intra_slice_refresh 1 bslbf + // frequency_truncation 2 bslbf + } + else if ( trick_mode_control == 1/*slow_motion*/ ) { + //rep_cntrl 5 uimsbf + } + else if ( trick_mode_control == 2/*freeze_frame*/ ) { + // field_id 2 uimsbf + // reserved 3 bslbf + } + else if ( trick_mode_control == 3/*fast_reverse*/ ) { + // field_id 2 bslbf + // intra_slice_refresh 1 bslbf + // frequency_truncation 2 bslbf + }else if ( trick_mode_control == 4/*slow_reverse*/ ) { + // rep_cntrl 5 uimsbf + } + else{ + //reserved 5 bslbf + } + pos++; + } + if ( additional_copy_info_flag == 1) { // ignore + // marker_bit 1 bslbf + // additional_copy_info 7 bslbf + pos++; + } + if ( PES_CRC_flag == 1) { // ignore + // previous_PES_packet_CRC 16 bslbf + pos += 2; + } + if ( PES_extension_flag == 1) { // ignore + int PES_private_data_flag = (p[pos]&0x80)>>7;// PES_private_data_flag 1 bslbf + int pack_header_field_flag = (p[pos]&0x40)>>6;// pack_header_field_flag 1 bslbf + int program_packet_sequence_counter_flag = (p[pos]&0x20)>>5;// program_packet_sequence_counter_flag 1 bslbf + int P_STD_buffer_flag = (p[pos]&0x10)>>4; // P-STD_buffer_flag 1 bslbf + // reserved 3 bslbf + int PES_extension_flag_2 = (p[pos]&0x01);// PES_extension_flag_2 1 bslbf + pos++; + + if ( PES_private_data_flag == 1) { + // PES_private_data 128 bslbf + pos += 16; + } + if (pack_header_field_flag == 1) { + // pack_field_length 8 uimsbf + // pack_header() + } + if (program_packet_sequence_counter_flag == 1) { + // marker_bit 1 bslbf + // program_packet_sequence_counter 7 uimsbf + // marker_bit 1 bslbf + // MPEG1_MPEG2_identifier 1 bslbf + // original_stuff_length 6 uimsbf + pos += 2; + } + if ( P_STD_buffer_flag == 1) { + // '01' 2 bslbf + // P-STD_buffer_scale 1 bslbf + // P-STD_buffer_size 13 uimsbf + pos += 2; + } + if ( PES_extension_flag_2 == 1) { + // marker_bit 1 bslbf + int PES_extension_field_length = (p[pos]&0x7F);// PES_extension_field_length 7 uimsbf + pos++; + for (int i = 0; i < PES_extension_field_length; i++) { + // reserved 8 bslbf + pos++; + } + } + } + +// for (int i = 0; i < N1; i++) { + //stuffing_byte 8 bslbf +// rpos++; +// } +// for (int i = 0; i < N2; i++) { + //PES_packet_data_byte 8 bslbf +// rpos++; +// } + *ret_pp = p+pos; + ret_size = 188-(npos+pos); + //printf("pes parse body size:%lu, data:0x%02x 0x%02x 0x%02x 0x%02x 0x%02x 0x%02x, dts:%lu(%lu), pts:%lu(%lu)\r\n", + // ret_size, p[pos], p[pos+1], p[pos+2], p[pos+3], p[pos+4], p[pos+5], + // dts, dts/90, pts, pts/90); + } + else if ( stream_id == 188//program_stream_map 1011 1100 BC + || stream_id == 191//private_stream_2 1011 1111 BF + || stream_id == 240//ECM 1111 0000 F0 + || stream_id == 241//EMM 1111 0001 F1 + || stream_id == 255//program_stream_directory 1111 1111 FF + || stream_id == 242//DSMCC_stream 1111 0010 F2 + || stream_id == 248//ITU-T Rec. H.222.1 type E stream 1111 1000 F8 + ) { +// for (i = 0; i < PES_packet_length; i++) { + //PES_packet_data_byte 8 bslbf +// rpos++; +// } + *ret_pp = p+pos; + ret_size = 188-(npos+pos); + //fwrite(p, 1, 188-(npos+rpos), fd); + } + else if ( stream_id == 190//padding_stream 1011 1110 + ) { +// for (i = 0; i < PES_packet_length; i++) { + // padding_byte 8 bslbf +// rpos++; + *ret_pp = p+pos; + ret_size = 188-(npos+pos); +// } + } + + return pos; +} \ No newline at end of file diff --git a/trunk/src/srt/ts_demux.hpp b/trunk/src/srt/ts_demux.hpp new file mode 100644 index 0000000000..4d7c9fd5e6 --- /dev/null +++ b/trunk/src/srt/ts_demux.hpp @@ -0,0 +1,237 @@ +#ifndef TS_DEMUX_H +#define TS_DEMUX_H +#include "srt_data.hpp" +#include +#include +#include +#include + +/* mpegts stream type in ts pmt +Value Description +0x00 ITU-T | ISO/IEC Reserved +0x01 ISO/IEC 11172-2 Video (mpeg video v1) +0x02 ITU-T Rec. H.262 | ISO/IEC 13818-2 Video(mpeg video v2)or ISO/IEC 11172-2 constrained parameter video stream +0x03 ISO/IEC 11172-3 Audio (MPEG 1 Audio codec Layer I, Layer II and Layer III audio specifications) +0x04 ISO/IEC 13818-3 Audio (BC Audio Codec) +0x05 ITU-T Rec. H.222.0 | ISO/IEC 13818-1 private_sections +0x06 ITU-T Rec. H.222.0 | ISO/IEC 13818-1 PES packets containing private data +0x07 ISO/IEC 13522 MHEG +0x08 ITU-T Rec. H.222.0 | ISO/IEC 13818-1 Annex A DSM-CC +0x09 ITU-T Rec. H.222.1 +0x0A ISO/IEC 13818-6 type A +0x0B ISO/IEC 13818-6 type B +0x0C ISO/IEC 13818-6 type C +0x0D ISO/IEC 13818-6 type D +0x0E ITU-T Rec. H.222.0 | ISO/IEC 13818-1 auxiliary +0x0F ISO/IEC 13818-7 Audio with ADTS transport syntax +0x10 ISO/IEC 14496-2 Visual +0x11 ISO/IEC 14496-3 Audio with the LATM transport syntax as defined in ISO/IEC 14496-3/Amd.1 +0x12 ISO/IEC 14496-1 SL-packetized stream or FlexMux stream carried in PES packets +0x13 ISO/IEC 14496-1 SL-packetized stream or FlexMux stream carried in ISO/IEC 14496_sections +0x14 ISO/IEC 13818-6 Synchronized Download Protocol +0x15 Metadata carried in PES packets +0x16 Metadata carried in metadata_sections +0x17 Metadata carried in ISO/IEC 13818-6 Data Carousel +0x18 Metadata carried in ISO/IEC 13818-6 Object Carousel +0x19 Metadata carried in ISO/IEC 13818-6 Synchronized Download Protocol +0x1A IPMP stream (defined in ISO/IEC 13818-11, MPEG-2 IPMP) +0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video (h.264) +0x1C ISO/IEC 14496-3 Audio, without using any additional transport syntax, such as DST, ALS and SLS +0x1D ISO/IEC 14496-17 Text +0x1E Auxiliary video stream as defined in ISO/IEC 23002-3 (AVS) +0x1F-0x7E ITU-T Rec. H.222.0 | ISO/IEC 13818-1 Reserved +0x7F IPMP stream 0x80-0xFF User Private +*/ +#define STREAM_TYPE_VIDEO_MPEG1 0x01 +#define STREAM_TYPE_VIDEO_MPEG2 0x02 +#define STREAM_TYPE_AUDIO_MPEG1 0x03 +#define STREAM_TYPE_AUDIO_MPEG2 0x04 +#define STREAM_TYPE_PRIVATE_SECTION 0x05 +#define STREAM_TYPE_PRIVATE_DATA 0x06 +#define STREAM_TYPE_AUDIO_AAC 0x0f +#define STREAM_TYPE_AUDIO_AAC_LATM 0x11 +#define STREAM_TYPE_VIDEO_MPEG4 0x10 +#define STREAM_TYPE_METADATA 0x15 +#define STREAM_TYPE_VIDEO_H264 0x1b +#define STREAM_TYPE_VIDEO_HEVC 0x24 +#define STREAM_TYPE_VIDEO_CAVS 0x42 +#define STREAM_TYPE_VIDEO_VC1 0xea +#define STREAM_TYPE_VIDEO_DIRAC 0xd1 + +#define STREAM_TYPE_AUDIO_AC3 0x81 +#define STREAM_TYPE_AUDIO_DTS 0x82 +#define STREAM_TYPE_AUDIO_TRUEHD 0x83 +#define STREAM_TYPE_AUDIO_EAC3 0x87 + +class ts_media_data_callback_I { +public: + virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts) = 0; +}; + +typedef std::shared_ptr TS_DATA_CALLBACK_PTR; + +class adaptation_field { +public: + adaptation_field(){}; + ~adaptation_field(){}; + +public: + unsigned char _adaptation_field_length; + + unsigned char _discontinuity_indicator:1; + unsigned char _random_access_indicator:1; + unsigned char _elementary_stream_priority_indicator:1; + unsigned char _PCR_flag:1; + unsigned char _OPCR_flag:1; + unsigned char _splicing_point_flag:1; + unsigned char _transport_private_data_flag:1; + unsigned char _adaptation_field_extension_flag:1; + + //if(PCR_flag == '1') + unsigned long _program_clock_reference_base;//33 bits + unsigned short _program_clock_reference_extension;//9bits + //if (OPCR_flag == '1') + unsigned long _original_program_clock_reference_base;//33 bits + unsigned short _original_program_clock_reference_extension;//9bits + //if (splicing_point_flag == '1') + unsigned char _splice_countdown; + //if (transport_private_data_flag == '1') + unsigned char _transport_private_data_length; + unsigned char _private_data_byte[256]; + //if (adaptation_field_extension_flag == '1') + unsigned char _adaptation_field_extension_length; + unsigned char _ltw_flag; + unsigned char _piecewise_rate_flag; + unsigned char _seamless_splice_flag; + unsigned char _reserved0; + //if (ltw_flag == '1') + unsigned short _ltw_valid_flag:1; + unsigned short _ltw_offset:15; + //if (piecewise_rate_flag == '1') + unsigned int _piecewise_rate;//22bits + //if (seamless_splice_flag == '1') + unsigned char _splice_type;//4bits + unsigned char _DTS_next_AU1;//3bits + unsigned char _marker_bit1;//1bit + unsigned short _DTS_next_AU2;//15bit + unsigned char _marker_bit2;//1bit + unsigned short _DTS_next_AU3;//15bit +}; + +class ts_header { +public: + ts_header(){} + ~ts_header(){} + +public: + unsigned char _sync_byte; + + unsigned short _transport_error_indicator:1; + unsigned short _payload_unit_start_indicator:1; + unsigned short _transport_priority:1; + unsigned short _PID:13; + + unsigned char _transport_scrambling_control:2; + unsigned char _adaptation_field_control:2; + unsigned char _continuity_counter:4; + + adaptation_field _adaptation_field_info; +}; + +typedef struct { + unsigned short _program_number; + unsigned short _pid; + unsigned short _network_id; +} PID_INFO; + +class pat_info { +public: + pat_info(){}; + ~pat_info(){}; + +public: + unsigned char _table_id; + + unsigned short _section_syntax_indicator:1; + unsigned short _reserved0:1; + unsigned short _reserved1:2; + unsigned short _section_length:12; + + unsigned short _transport_stream_id; + + unsigned char _reserved3:2; + unsigned char _version_number:5; + unsigned char _current_next_indicator:1; + + unsigned char _section_number; + unsigned char _last_section_number; + std::vector _pid_vec; +}; + +typedef struct { + unsigned char _stream_type; + unsigned short _reserved1:3; + unsigned short _elementary_PID:13; + unsigned short _reserved:4; + unsigned short _ES_info_length; + unsigned char _dscr[4096]; + unsigned int _crc_32; +} STREAM_PID_INFO; + +class pmt_info { +public: + pmt_info(){}; + ~pmt_info(){}; +public: + unsigned char _table_id; + unsigned short _section_syntax_indicator:1; + unsigned short _reserved1:1; + unsigned short _reserved2:2; + unsigned short _section_length:12; + unsigned short _program_number:16; + unsigned char _reserved:2; + unsigned char _version_number:5; + unsigned char _current_next_indicator:5; + unsigned char _section_number; + unsigned char _last_section_number; + unsigned short _reserved3:3; + unsigned short _PCR_PID:13; + unsigned short _reserved4:4; + unsigned short _program_info_length:12; + unsigned char _dscr[4096]; + + std::unordered_map _pid2steamtype; + std::vector _stream_pid_vec; +}; + +class ts_demux { +public: + ts_demux(); + ~ts_demux(); + + int decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback); + +private: + int decode_unit(unsigned char* data_p, std::string key_path, TS_DATA_CALLBACK_PTR callback); + bool is_pmt(unsigned short pmt_id); + int pes_parse(unsigned char* p, size_t npos, unsigned char** ret_pp, size_t& ret_size, + uint64_t& dts, uint64_t& pts); + void insert_into_databuf(unsigned char* data_p, size_t data_size, std::string key_path, unsigned short pid); + void on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, + std::string key_path, uint64_t dts, uint64_t pts); + +private: + std::string _key_path;//only for srt + + pat_info _pat; + pmt_info _pmt; + std::vector _data_buffer_vec; + size_t _data_total; + unsigned short _last_pid; + uint64_t _last_dts; + uint64_t _last_pts; +}; + +typedef std::shared_ptr TS_DEMUX_PTR; + +#endif \ No newline at end of file diff --git a/trunk/src/srt/ts_demux_test.cpp b/trunk/src/srt/ts_demux_test.cpp new file mode 100644 index 0000000000..e68d06218b --- /dev/null +++ b/trunk/src/srt/ts_demux_test.cpp @@ -0,0 +1,55 @@ +#include "ts_demux.hpp" +#include +#include + +#define TS_MAX 188 + +class media_data_get : public ts_media_data_callback_I { +public: + media_data_get() {}; + virtual ~media_data_get() {}; + +public: + virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type + , uint64_t dts, uint64_t pts) { + printf("media type:%d, data len:%d, key_path:%s, dts:%lu(%lu), pts:%lu(%lu)\r\n", + media_type, data_ptr->data_len(), data_ptr->get_path().c_str(), dts, dts/90, pts, pts/90); + FILE* file_p; + char filename[80]; + + sprintf(filename, "%u.media", media_type); + file_p = fopen(filename, "ab+"); + if (file_p) { + fwrite(data_ptr->get_data(), data_ptr->data_len(), 1, file_p); + fclose(file_p); + } + return; + } +}; + +int main(int argn, char** argv) { + unsigned char data[TS_MAX]; + ts_demux demux_obj; + auto callback_ptr = std::make_shared(); + FILE* file_p; + if (argn < 2) { + printf("please input ts name.\r\n"); + return 0; + } + + const char* file_name = argv[1]; + printf("input ts name:%s.\r\n", file_name); + + file_p = fopen(file_name, "r"); + fseek(file_p, 0L, SEEK_END); /* 定位到文件末尾 */ + size_t flen = ftell(file_p); /* 得到文件大小 */ + fseek(file_p, 0L, SEEK_SET); /* 定位到文件开头 */ + + do { + fread(data, TS_MAX, 1, file_p); + auto input_ptr = std::make_shared((unsigned char*)data, (unsigned int)TS_MAX, std::string("live/shiwei")); + demux_obj.decode(input_ptr, callback_ptr); + flen -= TS_MAX; + } while(flen > 0); + return 1; +} \ No newline at end of file