diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index 6d07da6b08..2c1821137a 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -22,6 +22,12 @@ #include #include #include +#include +#include +#include +#include +#include +#include #include using namespace std; @@ -455,6 +461,24 @@ srs_error_t SrsGbListener::listen() return srs_error_wrap(err, "listen"); } + if ((err = listen_api()) != srs_success) { + return srs_error_wrap(err, "listen api"); + } + + return err; +} + +srs_error_t SrsGbListener::listen_api() +{ + srs_error_t err = srs_success; + + // TODO: FIXME: Fetch api from hybrid manager, not from SRS. + ISrsHttpServeMux* http_api_mux = _srs_hybrid->srs()->instance()->api_server(); + + if ((err = http_api_mux->handle("/gb/v1/publish/", new SrsGoApiGbPublish(conf_))) != srs_success) { + return srs_error_wrap(err, "handle publish"); + } + return err; } @@ -481,7 +505,7 @@ srs_error_t SrsGbListener::on_tcp_client(ISrsListener* listener, srs_netfd_t stf } else if (listener == media_listener_) { SrsLazyObjectWrapper* conn = new SrsLazyObjectWrapper(); SrsLazyGbMediaTcpConn* resource = dynamic_cast(conn->resource()); - resource->setup(conf_, stfd); + resource->setup(stfd); if ((err = resource->start()) != srs_success) { srs_freep(conn); @@ -546,6 +570,11 @@ std::string SrsLazyGbSipTcpConn::device_id() return register_->device_id(); } +void SrsLazyGbSipTcpConn::set_device_id(const std::string &id) +{ + register_->from_address_user_ = id; +} + void SrsLazyGbSipTcpConn::set_cid(const SrsContextId& cid) { trd_->set_cid(cid); @@ -1231,7 +1260,6 @@ SrsLazyGbMediaTcpConn::SrsLazyGbMediaTcpConn(SrsLazyObjectWrappercopy(); } bool SrsLazyGbMediaTcpConn::is_connected() @@ -1477,23 +1501,8 @@ srs_error_t SrsLazyGbMediaTcpConn::bind_session(uint32_t ssrc, SrsLazyObjectWrap srs_assert(wrapper); // It MUST never be NULL, because this method is in the cycle of coroutine. // Find exists session for register, might be created by another object and still alive. - SrsLazyObjectWrapper* session = dynamic_cast*>(_srs_gb_manager->find_by_fast_id(ssrc)); - if (!session) { - // Create new GB session. - session = new SrsLazyObjectWrapper(); - - if ((err = session->resource()->initialize(conf_)) != srs_success) { - srs_freep(session); - return srs_error_wrap(err, "initialize"); - } - - if ((err = session->resource()->start()) != srs_success) { - srs_freep(session); - return srs_error_wrap(err, "start"); - } - - _srs_gb_manager->add_with_id(std::to_string(ssrc), session); - } + SrsLazyObjectWrapper* session = dynamic_cast*>(_srs_gb_manager->find_by_fast_id(ssrc)); // TODO: same ssrc? + if (!session) return err; _srs_gb_manager->add_with_fast_id(ssrc, session); session->resource()->on_media_transport(wrapper); @@ -2127,11 +2136,7 @@ srs_error_t SrsGbMuxer::connect() // Cleanup the data before connect again. close(); - string stream = session_->sip_transport()->resource()->device_id(); - if (stream.empty()) { - stream = std::to_string(session_->media_transport()->resource()->ssrc()); - } - string url = srs_string_replace(output_, "[stream]", stream); + string url = srs_string_replace(output_, "[stream]", session_->sip_transport()->resource()->device_id()); srs_trace("Muxer: Convert GB to RTMP %s", url.c_str()); srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; @@ -2734,3 +2739,190 @@ void srs_sip_parse_address(const std::string& address, std::string& user, std::s SrsResourceManager* _srs_gb_manager = NULL; +SrsGoApiGbPublish::SrsGoApiGbPublish(SrsConfDirective* conf) +{ + conf_ = conf->copy(); + security_ = new SrsSecurity(); +} + +SrsGoApiGbPublish::~SrsGoApiGbPublish() +{ + srs_freep(conf_); + srs_freep(security_); +} + +// Request: +// POST /gb/v1/publish/ +// { +// "clientip": "..."," +// "stream": "...", +// "ssrc": "..." +// } +// Response: +// {"port":9000, "is_tcp": true} +srs_error_t SrsGoApiGbPublish::serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r) +{ + srs_error_t err = srs_success; + + SrsJsonObject* res = SrsJsonAny::object(); + SrsAutoFree(SrsJsonObject, res); + + if ((err = do_serve_http(w, r, res)) != srs_success) { + srs_warn("GB error %s", srs_error_desc(err).c_str()); srs_freep(err); + return srs_api_response_code(w, r, SRS_CONSTS_HTTP_BadRequest); + } + + return srs_api_response(w, r, res->dumps()); +} + +srs_error_t SrsGoApiGbPublish::do_serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r, SrsJsonObject *res) +{ + srs_error_t err = srs_success; + + // For each GB session, we use short-term HTTP connection. + w->header()->set("Connection", "Close"); + + // Parse req, the request json object, from body. + SrsJsonObject* req = NULL; + SrsAutoFree(SrsJsonObject, req); + if (true) { + string req_json; + if ((err = r->body_read_all(req_json)) != srs_success) { + return srs_error_wrap(err, "read body"); + } + + SrsJsonAny* json = SrsJsonAny::loads(req_json); + if (!json || !json->is_object()) { + return srs_error_new(ERROR_HTTP_DATA_INVALID, "invalid body %s", req_json.c_str()); + } + + req = json->to_object(); + } + + // Fetch params from req object. + SrsJsonAny* prop = NULL; + if ((prop = req->ensure_property_string("stream")) == NULL) { + return srs_error_wrap(err, "not stream"); + } + string stream = prop->to_str(); + + if ((prop = req->ensure_property_string("ssrc")) == NULL) { + return srs_error_wrap(err, "not ssrc"); + } + uint64_t ssrc = atoi(prop->to_str().c_str()); + + string clientip; + if ((prop = req->ensure_property_string("clientip")) != NULL) { + clientip = prop->to_str(); + } + if (clientip.empty()){ + clientip = dynamic_cast(r)->connection()->remote_ip(); + // Overwrite by ip from proxy. + string oip = srs_get_original_ip(r); + if (!oip.empty()) { + clientip = oip; + } + } + // create temp Request, for security check and http hooks. + SrsRequest* request = new SrsRequest(); + SrsAutoFree(SrsRequest, request); + request->ip = clientip; + + // discovery vhost, resolve the vhost from config + SrsConfDirective* parsed_vhost = _srs_config->get_vhost(request->vhost); + if (parsed_vhost) { + request->vhost = parsed_vhost->arg0(); + } + + if ((err = security_->check(SrsRtmpConnFlashPublish, clientip, request)) != srs_success) { + return srs_error_wrap(err, "security check"); + } + + // We must do hook after stat, because depends on it. + if ((err = http_hooks_on_publish(request)) != srs_success) { + return srs_error_wrap(err, "http_hooks_on_publish"); + } + + if ((err = bind_session(stream, ssrc)) != srs_success) { + return srs_error_wrap(err, "bind session"); + } + + srs_trace("GB publish stream: %s, ssrc=%lu, clientip=%s", stream.c_str(), ssrc, clientip.c_str()); + + res->set("code", SrsJsonAny::integer(ERROR_SUCCESS)); + res->set("server", SrsJsonAny::str(SrsStatistic::instance()->server_id().c_str())); + res->set("service", SrsJsonAny::str(SrsStatistic::instance()->service_id().c_str())); + res->set("pid", SrsJsonAny::str(SrsStatistic::instance()->service_pid().c_str())); + + int port = _srs_config->get_stream_caster_listen(conf_); + res->set("port", SrsJsonAny::integer(port)); + res->set("is_tcp", SrsJsonAny::boolean(true)); // only tcp supported + + return err; +} + +srs_error_t SrsGoApiGbPublish::bind_session(std::string stream, uint64_t ssrc) +{ + srs_error_t err = srs_success; + + SrsLazyObjectWrapper* session = dynamic_cast*>(_srs_gb_manager->find_by_id(stream)); + if (session) { + return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "stream already exists"); + } + + session = dynamic_cast*>(_srs_gb_manager->find_by_fast_id(ssrc)); + if (session) { + return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "ssrc already exists"); + } + + // Create new GB session. + session = new SrsLazyObjectWrapper(); + + if ((err = session->resource()->initialize(conf_)) != srs_success) { + srs_freep(session); + return srs_error_wrap(err, "initialize"); + } + + if ((err = session->resource()->start()) != srs_success) { + srs_freep(session); + return srs_error_wrap(err, "start"); + } + + session->resource()->sip_transport()->resource()->set_device_id(stream); + + _srs_gb_manager->add_with_id(stream, session); + _srs_gb_manager->add_with_fast_id(ssrc, session); + + return err; +} + +srs_error_t SrsGoApiGbPublish::http_hooks_on_publish(SrsRequest *req) +{ + srs_error_t err = srs_success; + + if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) { + return err; + } + + // the http hooks will cause context switch, + // so we must copy all hooks for the on_connect may freed. + // @see https://github.com/ossrs/srs/issues/475 + vector hooks; + + if (true) { + SrsConfDirective* conf = _srs_config->get_vhost_on_publish(req->vhost); + if (!conf) { + return err; + } + hooks = conf->args; + } + + for (int i = 0; i < (int)hooks.size(); i++) { + std::string url = hooks.at(i); + if ((err = SrsHttpHooks::on_publish(url, req)) != srs_success) { + return srs_error_wrap(err, "on_publish %s", url.c_str()); + } + } + + return err; +} \ No newline at end of file diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp index bf31c733e6..d5a5260cf4 100644 --- a/trunk/src/app/srs_app_gb28181.hpp +++ b/trunk/src/app/srs_app_gb28181.hpp @@ -15,6 +15,8 @@ #include #include #include +#include +#include #include @@ -89,6 +91,23 @@ enum SrsGbSipState }; std::string srs_gb_sip_state(SrsGbSipState state); +class SrsGoApiGbPublish : public ISrsHttpHandler +{ +private: + SrsConfDirective* conf_; + SrsSecurity* security_; +public: + SrsGoApiGbPublish(SrsConfDirective* conf); + virtual ~SrsGoApiGbPublish(); +public: + virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); +private: + virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res); + srs_error_t bind_session(std::string stream, uint64_t ssrc); +private: + virtual srs_error_t http_hooks_on_publish(SrsRequest* req); +}; + // The main logic object for GB, the session. class SrsLazyGbSession : public SrsLazyObject, public ISrsResource, public ISrsStartable, public ISrsCoroutineHandler { @@ -180,6 +199,7 @@ class SrsGbListener : public ISrsListener, public ISrsTcpHandler public: srs_error_t initialize(SrsConfDirective* conf); srs_error_t listen(); + srs_error_t listen_api(); void close(); // Interface ISrsTcpHandler public: @@ -217,6 +237,8 @@ class SrsLazyGbSipTcpConn : public SrsLazyObject, public ISrsResource, public IS void setup(SrsConfDirective* conf, SrsTcpListener* sip, SrsTcpListener* media, srs_netfd_t stfd); // Get the SIP device id. std::string device_id(); + // Set the SIP device id. + void set_device_id(const std::string& id); // Set the cid of all coroutines. virtual void set_cid(const SrsContextId& cid); private: @@ -356,7 +378,7 @@ class SrsLazyGbMediaTcpConn : public SrsLazyObject, public ISrsResource, public virtual ~SrsLazyGbMediaTcpConn(); public: // Setup object, to keep empty constructor. - void setup(SrsConfDirective* conf, srs_netfd_t stfd); + void setup(srs_netfd_t stfd); // Whether media is connected. bool is_connected(); // Interrupt transport by session. diff --git a/trunk/src/app/srs_app_rtc_api.cpp b/trunk/src/app/srs_app_rtc_api.cpp index 46512d8517..b5ee43d79a 100644 --- a/trunk/src/app/srs_app_rtc_api.cpp +++ b/trunk/src/app/srs_app_rtc_api.cpp @@ -594,7 +594,7 @@ srs_error_t SrsGoApiRtcPublish::http_hooks_on_publish(SrsRequest* req) for (int i = 0; i < (int)hooks.size(); i++) { std::string url = hooks.at(i); if ((err = SrsHttpHooks::on_publish(url, req)) != srs_success) { - return srs_error_wrap(err, "rtmp on_publish %s", url.c_str()); + return srs_error_wrap(err, "on_publish %s", url.c_str()); } }