Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
SAT>IP server: handle streams correctly, add more RTSP headers
  • Loading branch information
perexg committed Mar 11, 2015
1 parent 78e4ddf commit da6eb2c
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 34 deletions.
8 changes: 5 additions & 3 deletions src/satip/rtp.c
Expand Up @@ -520,7 +520,9 @@ void satip_rtp_init(void)
void satip_rtp_done(void)
{
assert(TAILQ_EMPTY(&satip_rtp_sessions));
satip_rtcp_run = 0;
pthread_kill(satip_rtcp_tid, SIGTERM);
pthread_join(satip_rtcp_tid, NULL);
if (satip_rtcp_run) {
satip_rtcp_run = 0;
pthread_kill(satip_rtcp_tid, SIGTERM);
pthread_join(satip_rtcp_tid, NULL);
}
}
110 changes: 79 additions & 31 deletions src/satip/rtsp.c
Expand Up @@ -49,6 +49,7 @@ typedef struct session {
} session_t;

static uint32_t session_number;
static uint16_t stream_id;
static char *rtsp_ip = NULL;
static int rtsp_port = -1;
static void *rtsp_server = NULL;
Expand Down Expand Up @@ -99,15 +100,19 @@ rtsp_delsys(int fe, int *findex)
*
*/
static struct session *
rtsp_new_session(int delsys)
rtsp_new_session(int delsys, uint32_t nsession, int session)
{
struct session *rs = calloc(1, sizeof(*rs));
if (rs == NULL)
return NULL;
rs->delsys = delsys;
rs->nsession = session_number;
rs->nsession = nsession ?: session_number;
snprintf(rs->session, sizeof(rs->session), "%08X", session_number);
session_number += 9876;
if (nsession) {
session_number += 9876;
if (session_number == 0)
session_number += 9876;
}
TAILQ_INSERT_TAIL(&rtsp_sessions, rs, link);
return rs;
}
Expand All @@ -116,16 +121,20 @@ rtsp_new_session(int delsys)
*
*/
static struct session *
rtsp_find_session(http_connection_t *hc)
rtsp_find_session(http_connection_t *hc, int stream)
{
struct session *rs;
struct session *rs, *first = NULL;

if (hc->hc_session == NULL)
return NULL;
TAILQ_FOREACH(rs, &rtsp_sessions, link)
if (!strcmp(rs->session, hc->hc_session))
return rs;
return NULL;
if (!strcmp(rs->session, hc->hc_session)) {
if (stream == rs->stream)
return rs;
if (first == NULL)
first = rs;
}
return first;
}

/*
Expand Down Expand Up @@ -271,11 +280,11 @@ rtsp_clean(session_t *rs)
*
*/
static int
rtsp_start(http_connection_t *hc, session_t *rs)
rtsp_start(http_connection_t *hc, session_t *rs, char *addrbuf)
{
mpegts_network_t *mn;
dvb_network_t *ln;
char buf[256], addrbuf[50];
char buf[256];
int res = HTTP_STATUS_SERVICE, qsize = 3000000;

if (rs->mux)
Expand Down Expand Up @@ -307,7 +316,6 @@ rtsp_start(http_connection_t *hc, session_t *rs)
}
if (profile_chain_raw_open(&rs->prch, (mpegts_mux_t *)rs->mux, qsize))
goto endclean;
tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, sizeof(addrbuf));
rs->subs = subscription_create_from_mux(&rs->prch, NULL,
config_get_int("satip_weight", 100),
"SAT>IP",
Expand All @@ -316,14 +324,6 @@ rtsp_start(http_connection_t *hc, session_t *rs)
http_arg_get(&hc->hc_args, "User-Agent"), NULL);
if (!rs->subs)
goto endclean;
memset(&rs->udp_rtp, 0, sizeof(rs->udp_rtp));
memset(&rs->udp_rtcp, 0, sizeof(rs->udp_rtcp));
if (udp_bind_double(&rs->udp_rtp, &rs->udp_rtcp,
"satips", "rtsp", "rtcp",
addrbuf, 0, NULL,
4*1024, 4*1024,
RTP_BUFSIZE, RTCP_BUFSIZE))
goto endclean;
satip_rtp_queue((void *)(intptr_t)rs->nsession,
rs->subs, &rs->prch.prch_sq,
hc->hc_peer, ntohs(IP_PORT(rs->udp_rtp->ip)),
Expand All @@ -348,19 +348,30 @@ rtsp_process_options(http_connection_t *hc)
http_arg_list_t args;
char *u = tvh_strdupa(hc->hc_url);
session_t *rs;
int found;

if ((u = rtsp_check_urlbase(u)) == NULL)
goto error;
if (*u)
goto error;

pthread_mutex_lock(&rtsp_lock);
rs = rtsp_find_session(hc);
if (rs)
rtsp_rearm_session_timer(rs);
pthread_mutex_unlock(&rtsp_lock);
if (hc->hc_session) {
found = 0;
pthread_mutex_lock(&rtsp_lock);
TAILQ_FOREACH(rs, &rtsp_sessions, link)
if (!strcmp(rs->session, hc->hc_session)) {
rtsp_rearm_session_timer(rs);
found = 1;
}
pthread_mutex_unlock(&rtsp_lock);
if (!found) {
http_error(hc, HTTP_STATUS_BAD_SESSION);
return 0;
}
}
http_arg_init(&args);
http_arg_set(&args, "Public", "OPTIONS,DESCRIBE,SETUP,PLAY,TEARDOWN");
http_arg_set(&args, "Session", hc->hc_session);
http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, &args);
http_arg_flush(&args);
return 0;
Expand Down Expand Up @@ -571,7 +582,10 @@ rtsp_process_play(http_connection_t *hc, int setup)
char *pids, *addpids, *delpids;
int16_t _pids[RTSP_PIDS+1], _addpids[RTSP_PIDS+1], _delpids[RTSP_PIDS+1];
dvb_mux_conf_t *dmc;
char buf[256];
char buf[256], addrbuf[50];
http_arg_list_t args;

tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, sizeof(addrbuf));

u = tvh_strdupa(hc->hc_url);
if ((u = rtsp_check_urlbase(u)) == NULL ||
Expand All @@ -596,7 +610,7 @@ rtsp_process_play(http_connection_t *hc, int setup)

pthread_mutex_lock(&rtsp_lock);

rs = rtsp_find_session(hc);
rs = rtsp_find_session(hc, stream);

if (fe > 0) {
delsys = rtsp_delsys(fe, &findex);
Expand All @@ -608,7 +622,9 @@ rtsp_process_play(http_connection_t *hc, int setup)
if (msys == DVB_SYS_NONE)
goto error;
if (!rs)
rs = rtsp_new_session(msys);
rs = rtsp_new_session(msys, 0, -1);
else if (stream != rs->stream)
rs = rtsp_new_session(msys, rs->nsession, stream);
else
rtsp_close_session(rs);
} else {
Expand Down Expand Up @@ -721,11 +737,27 @@ rtsp_process_play(http_connection_t *hc, int setup)
}

dvb_mux_conf_str(dmc, buf, sizeof(buf));
tvhdebug("satips", "%i: setup %s", rs->frontend, buf);
tvhdebug("satips", "%i/%s/%d: setup %s", rs->frontend, rs->session, rs->stream, buf);

dmc->dmc_fe_freq = freq;
dmc->dmc_fe_modulation = mtype;

stream_id++;
if (stream_id == 0)
stream_id++;
rs->stream = stream_id % 0x7fff;

memset(&rs->udp_rtp, 0, sizeof(rs->udp_rtp));
memset(&rs->udp_rtcp, 0, sizeof(rs->udp_rtcp));
if (udp_bind_double(&rs->udp_rtp, &rs->udp_rtcp,
"satips", "rtsp", "rtcp",
addrbuf, 0, NULL,
4*1024, 4*1024,
RTP_BUFSIZE, RTCP_BUFSIZE)) {
errcode = HTTP_STATUS_INTERNAL;
goto error;
}

if (setup) {
if (pids)
rtsp_addpids(rs, _pids);
Expand All @@ -737,13 +769,25 @@ rtsp_process_play(http_connection_t *hc, int setup)
rtsp_delpids(rs, _delpids);
if (addpids)
rtsp_addpids(rs, _addpids);
if ((r = rtsp_start(hc, rs)) < 0) {
if ((r = rtsp_start(hc, rs, addrbuf)) < 0) {
errcode = r;
goto error;
}
tvhdebug("satips", "%i: play", rs->frontend);
tvhdebug("satips", "%i/%s/%d: play", rs->frontend, rs->session, rs->stream);

end:

http_arg_init(&args);
snprintf(buf, sizeof(buf), "%s;timeout=%d", rs->session, RTSP_TIMEOUT);
http_arg_set(&args, "Session", buf);
r = IP_PORT(rs->udp_rtp->ip);
snprintf(buf, sizeof(buf), "RTP/AVP;unicast;client_port=%d-%d", r, r+1);
http_arg_set(&args, "Transport", buf);
snprintf(buf, sizeof(buf), "%d", rs->stream);
http_arg_set(&args, "com.ses.streamID", buf);
http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, &args);
http_arg_flush(&args);

pthread_mutex_unlock(&rtsp_lock);
return 0;

Expand All @@ -762,6 +806,7 @@ rtsp_process_teardown(http_connection_t *hc)
{
char *u = tvh_strdupa(hc->hc_url);
struct session *rs = NULL;
http_arg_list_t args;
int stream;

if ((u = rtsp_check_urlbase(u)) == NULL ||
Expand All @@ -771,15 +816,18 @@ rtsp_process_teardown(http_connection_t *hc)
}

pthread_mutex_lock(&rtsp_lock);
rs = rtsp_find_session(hc);
rs = rtsp_find_session(hc, stream);
if (!rs || stream != rs->stream) {
pthread_mutex_unlock(&rtsp_lock);
http_error(hc, !rs ? HTTP_STATUS_BAD_SESSION : HTTP_STATUS_NOT_FOUND);
} else {
rtsp_close_session(rs);
pthread_mutex_unlock(&rtsp_lock);
rtsp_free_session(rs);
http_arg_init(&args);
http_arg_set(&args, "Session", rs->session);
http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, NULL);
http_arg_flush(&args);
}
return 0;
}
Expand Down

0 comments on commit da6eb2c

Please sign in to comment.