Skip to content

Commit

Permalink
add publish api for gb(/gb/v1/publish)
Browse files Browse the repository at this point in the history
  • Loading branch information
duiniuluantanqin committed Feb 19, 2024
1 parent d768cbf commit 9babb87
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 31 deletions.
250 changes: 221 additions & 29 deletions trunk/src/app/srs_app_gb28181.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
#include <srs_app_pithy_print.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_protocol_raw_avc.hpp>
#include <srs_protocol_http_stack.hpp>
#include <srs_app_server.hpp>
#include <srs_protocol_json.hpp>
#include <srs_app_http_api.hpp>
#include <srs_app_statistic.hpp>
#include <srs_app_http_hooks.hpp>

#include <sstream>
using namespace std;
Expand Down Expand Up @@ -455,6 +461,24 @@ srs_error_t SrsGbListener::listen()
return srs_error_wrap(err, "listen");
}

if ((err = listen_api()) != srs_success) {
return srs_error_wrap(err, "listen api");
}

return err;
}

srs_error_t SrsGbListener::listen_api()
{
srs_error_t err = srs_success;

// TODO: FIXME: Fetch api from hybrid manager, not from SRS.
ISrsHttpServeMux* http_api_mux = _srs_hybrid->srs()->instance()->api_server();

if ((err = http_api_mux->handle("/gb/v1/publish/", new SrsGoApiGbPublish(conf_))) != srs_success) {
return srs_error_wrap(err, "handle publish");
}

return err;
}

Expand All @@ -481,7 +505,7 @@ srs_error_t SrsGbListener::on_tcp_client(ISrsListener* listener, srs_netfd_t stf
} else if (listener == media_listener_) {
SrsLazyObjectWrapper<SrsLazyGbMediaTcpConn>* conn = new SrsLazyObjectWrapper<SrsLazyGbMediaTcpConn>();
SrsLazyGbMediaTcpConn* resource = dynamic_cast<SrsLazyGbMediaTcpConn*>(conn->resource());
resource->setup(conf_, stfd);
resource->setup(stfd);

if ((err = resource->start()) != srs_success) {
srs_freep(conn);
Expand Down Expand Up @@ -546,6 +570,11 @@ std::string SrsLazyGbSipTcpConn::device_id()
return register_->device_id();
}

void SrsLazyGbSipTcpConn::set_device_id(const std::string &id)
{
register_->from_address_user_ = id;
}

void SrsLazyGbSipTcpConn::set_cid(const SrsContextId& cid)
{
trd_->set_cid(cid);
Expand Down Expand Up @@ -1231,7 +1260,6 @@ SrsLazyGbMediaTcpConn::SrsLazyGbMediaTcpConn(SrsLazyObjectWrapper<SrsLazyGbMedia
trd_ = new SrsSTCoroutine("media", this);
buffer_ = new uint8_t[65535];
conn_ = NULL;
conf_ = NULL;

session_ = NULL;
connected_ = false;
Expand All @@ -1246,16 +1274,12 @@ SrsLazyGbMediaTcpConn::~SrsLazyGbMediaTcpConn()
srs_freepa(buffer_);
srs_freep(pack_);
srs_freep(session_);
srs_freep(conf_);
}

void SrsLazyGbMediaTcpConn::setup(SrsConfDirective* conf, srs_netfd_t stfd)
void SrsLazyGbMediaTcpConn::setup(srs_netfd_t stfd)
{
srs_freep(conn_);
conn_ = new SrsTcpConnection(stfd);

srs_freep(conf_);
conf_ = conf->copy();
}

bool SrsLazyGbMediaTcpConn::is_connected()
Expand Down Expand Up @@ -1477,23 +1501,8 @@ srs_error_t SrsLazyGbMediaTcpConn::bind_session(uint32_t ssrc, SrsLazyObjectWrap
srs_assert(wrapper); // It MUST never be NULL, because this method is in the cycle of coroutine.

// Find exists session for register, might be created by another object and still alive.
SrsLazyObjectWrapper<SrsLazyGbSession>* session = dynamic_cast<SrsLazyObjectWrapper<SrsLazyGbSession>*>(_srs_gb_manager->find_by_fast_id(ssrc));
if (!session) {
// Create new GB session.
session = new SrsLazyObjectWrapper<SrsLazyGbSession>();

if ((err = session->resource()->initialize(conf_)) != srs_success) {
srs_freep(session);
return srs_error_wrap(err, "initialize");
}

if ((err = session->resource()->start()) != srs_success) {
srs_freep(session);
return srs_error_wrap(err, "start");
}

_srs_gb_manager->add_with_id(std::to_string(ssrc), session);
}
SrsLazyObjectWrapper<SrsLazyGbSession>* session = dynamic_cast<SrsLazyObjectWrapper<SrsLazyGbSession>*>(_srs_gb_manager->find_by_fast_id(ssrc)); // TODO: same ssrc?
if (!session) return err;

_srs_gb_manager->add_with_fast_id(ssrc, session);
session->resource()->on_media_transport(wrapper);
Expand Down Expand Up @@ -2127,11 +2136,7 @@ srs_error_t SrsGbMuxer::connect()
// Cleanup the data before connect again.
close();

string stream = session_->sip_transport()->resource()->device_id();
if (stream.empty()) {
stream = std::to_string(session_->media_transport()->resource()->ssrc());
}
string url = srs_string_replace(output_, "[stream]", stream);
string url = srs_string_replace(output_, "[stream]", session_->sip_transport()->resource()->device_id());
srs_trace("Muxer: Convert GB to RTMP %s", url.c_str());

srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT;
Expand Down Expand Up @@ -2734,3 +2739,190 @@ void srs_sip_parse_address(const std::string& address, std::string& user, std::s

SrsResourceManager* _srs_gb_manager = NULL;

SrsGoApiGbPublish::SrsGoApiGbPublish(SrsConfDirective* conf)
{
conf_ = conf->copy();
security_ = new SrsSecurity();
}

SrsGoApiGbPublish::~SrsGoApiGbPublish()
{
srs_freep(conf_);
srs_freep(security_);
}

// Request:
// POST /gb/v1/publish/
// {
// "clientip": "...","
// "stream": "...",
// "ssrc": "..."
// }
// Response:
// {"port":9000, "is_tcp": true}
srs_error_t SrsGoApiGbPublish::serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r)
{
srs_error_t err = srs_success;

SrsJsonObject* res = SrsJsonAny::object();
SrsAutoFree(SrsJsonObject, res);

if ((err = do_serve_http(w, r, res)) != srs_success) {
srs_warn("GB error %s", srs_error_desc(err).c_str()); srs_freep(err);
return srs_api_response_code(w, r, SRS_CONSTS_HTTP_BadRequest);
}

return srs_api_response(w, r, res->dumps());
}

srs_error_t SrsGoApiGbPublish::do_serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r, SrsJsonObject *res)
{
srs_error_t err = srs_success;

// For each GB session, we use short-term HTTP connection.
w->header()->set("Connection", "Close");

// Parse req, the request json object, from body.
SrsJsonObject* req = NULL;
SrsAutoFree(SrsJsonObject, req);
if (true) {
string req_json;
if ((err = r->body_read_all(req_json)) != srs_success) {
return srs_error_wrap(err, "read body");
}

SrsJsonAny* json = SrsJsonAny::loads(req_json);
if (!json || !json->is_object()) {
return srs_error_new(ERROR_HTTP_DATA_INVALID, "invalid body %s", req_json.c_str());
}

req = json->to_object();
}

// Fetch params from req object.
SrsJsonAny* prop = NULL;
if ((prop = req->ensure_property_string("stream")) == NULL) {
return srs_error_wrap(err, "not stream");
}
string stream = prop->to_str();

if ((prop = req->ensure_property_string("ssrc")) == NULL) {
return srs_error_wrap(err, "not ssrc");
}
uint64_t ssrc = atoi(prop->to_str().c_str());

string clientip;
if ((prop = req->ensure_property_string("clientip")) != NULL) {
clientip = prop->to_str();
}
if (clientip.empty()){
clientip = dynamic_cast<SrsHttpMessage*>(r)->connection()->remote_ip();
// Overwrite by ip from proxy.
string oip = srs_get_original_ip(r);
if (!oip.empty()) {
clientip = oip;
}
}
// create temp Request, for security check and http hooks.
SrsRequest* request = new SrsRequest();
SrsAutoFree(SrsRequest, request);
request->ip = clientip;

// discovery vhost, resolve the vhost from config
SrsConfDirective* parsed_vhost = _srs_config->get_vhost(request->vhost);
if (parsed_vhost) {
request->vhost = parsed_vhost->arg0();
}

if ((err = security_->check(SrsRtmpConnFlashPublish, clientip, request)) != srs_success) {
return srs_error_wrap(err, "security check");
}

// We must do hook after stat, because depends on it.
if ((err = http_hooks_on_publish(request)) != srs_success) {
return srs_error_wrap(err, "http_hooks_on_publish");
}

if ((err = bind_session(stream, ssrc)) != srs_success) {
return srs_error_wrap(err, "bind session");
}

srs_trace("GB publish stream: %s, ssrc=%lu, clientip=%s", stream.c_str(), ssrc, clientip.c_str());

res->set("code", SrsJsonAny::integer(ERROR_SUCCESS));
res->set("server", SrsJsonAny::str(SrsStatistic::instance()->server_id().c_str()));
res->set("service", SrsJsonAny::str(SrsStatistic::instance()->service_id().c_str()));
res->set("pid", SrsJsonAny::str(SrsStatistic::instance()->service_pid().c_str()));

int port = _srs_config->get_stream_caster_listen(conf_);
res->set("port", SrsJsonAny::integer(port));
res->set("is_tcp", SrsJsonAny::boolean(true)); // only tcp supported

return err;
}

srs_error_t SrsGoApiGbPublish::bind_session(std::string stream, uint64_t ssrc)
{
srs_error_t err = srs_success;

SrsLazyObjectWrapper<SrsLazyGbSession>* session = dynamic_cast<SrsLazyObjectWrapper<SrsLazyGbSession>*>(_srs_gb_manager->find_by_id(stream));
if (session) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "stream already exists");
}

session = dynamic_cast<SrsLazyObjectWrapper<SrsLazyGbSession>*>(_srs_gb_manager->find_by_fast_id(ssrc));
if (session) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "ssrc already exists");
}

// Create new GB session.
session = new SrsLazyObjectWrapper<SrsLazyGbSession>();

if ((err = session->resource()->initialize(conf_)) != srs_success) {
srs_freep(session);
return srs_error_wrap(err, "initialize");
}

if ((err = session->resource()->start()) != srs_success) {
srs_freep(session);
return srs_error_wrap(err, "start");
}

session->resource()->sip_transport()->resource()->set_device_id(stream);

_srs_gb_manager->add_with_id(stream, session);
_srs_gb_manager->add_with_fast_id(ssrc, session);

return err;
}

srs_error_t SrsGoApiGbPublish::http_hooks_on_publish(SrsRequest *req)
{
srs_error_t err = srs_success;

if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return err;
}

// the http hooks will cause context switch,
// so we must copy all hooks for the on_connect may freed.
// @see https://github.com/ossrs/srs/issues/475
vector<string> hooks;

if (true) {
SrsConfDirective* conf = _srs_config->get_vhost_on_publish(req->vhost);
if (!conf) {
return err;
}
hooks = conf->args;
}

for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
if ((err = SrsHttpHooks::on_publish(url, req)) != srs_success) {
return srs_error_wrap(err, "on_publish %s", url.c_str());
}
}

return err;
}
24 changes: 23 additions & 1 deletion trunk/src/app/srs_app_gb28181.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#include <srs_protocol_http_conn.hpp>
#include <srs_kernel_ps.hpp>
#include <srs_app_conn.hpp>
#include <srs_app_hybrid.hpp>
#include <srs_app_security.hpp>

#include <sstream>

Expand Down Expand Up @@ -89,6 +91,23 @@ enum SrsGbSipState
};
std::string srs_gb_sip_state(SrsGbSipState state);

class SrsGoApiGbPublish : public ISrsHttpHandler
{
private:
SrsConfDirective* conf_;
SrsSecurity* security_;
public:
SrsGoApiGbPublish(SrsConfDirective* conf);
virtual ~SrsGoApiGbPublish();
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
private:
virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res);
srs_error_t bind_session(std::string stream, uint64_t ssrc);
private:
virtual srs_error_t http_hooks_on_publish(SrsRequest* req);
};

// The main logic object for GB, the session.
class SrsLazyGbSession : public SrsLazyObject, public ISrsResource, public ISrsStartable, public ISrsCoroutineHandler
{
Expand Down Expand Up @@ -180,6 +199,7 @@ class SrsGbListener : public ISrsListener, public ISrsTcpHandler
public:
srs_error_t initialize(SrsConfDirective* conf);
srs_error_t listen();
srs_error_t listen_api();
void close();
// Interface ISrsTcpHandler
public:
Expand Down Expand Up @@ -217,6 +237,8 @@ class SrsLazyGbSipTcpConn : public SrsLazyObject, public ISrsResource, public IS
void setup(SrsConfDirective* conf, SrsTcpListener* sip, SrsTcpListener* media, srs_netfd_t stfd);
// Get the SIP device id.
std::string device_id();
// Set the SIP device id.
void set_device_id(const std::string& id);
// Set the cid of all coroutines.
virtual void set_cid(const SrsContextId& cid);
private:
Expand Down Expand Up @@ -356,7 +378,7 @@ class SrsLazyGbMediaTcpConn : public SrsLazyObject, public ISrsResource, public
virtual ~SrsLazyGbMediaTcpConn();
public:
// Setup object, to keep empty constructor.
void setup(SrsConfDirective* conf, srs_netfd_t stfd);
void setup(srs_netfd_t stfd);
// Whether media is connected.
bool is_connected();
// Interrupt transport by session.
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_rtc_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ srs_error_t SrsGoApiRtcPublish::http_hooks_on_publish(SrsRequest* req)
for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
if ((err = SrsHttpHooks::on_publish(url, req)) != srs_success) {
return srs_error_wrap(err, "rtmp on_publish %s", url.c_str());
return srs_error_wrap(err, "on_publish %s", url.c_str());
}
}

Expand Down

0 comments on commit 9babb87

Please sign in to comment.