Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
SAT>IP server: implement embedded RTSP TCP data transfer mode
  • Loading branch information
perexg committed Nov 30, 2015
1 parent ec45cf3 commit 640e522
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 40 deletions.
7 changes: 6 additions & 1 deletion src/http.c
Expand Up @@ -217,7 +217,7 @@ static const char *cachemonths[12] = {
* Transmit a HTTP reply
*/
void
http_send_header(http_connection_t *hc, int rc, const char *content,
http_send_header(http_connection_t *hc, int rc, const char *content,
int64_t contentlen,
const char *encoding, const char *location,
int maxage, const char *range,
Expand All @@ -230,6 +230,8 @@ http_send_header(http_connection_t *hc, int rc, const char *content,
time_t t;
int sess = 0;

lock_assert(&hc->hc_fd_lock);

htsbuf_queue_init(&hdrs, 0);

htsbuf_qprintf(&hdrs, "%s %d %s\r\n",
Expand Down Expand Up @@ -378,6 +380,7 @@ http_send_reply(http_connection_t *hc, int rc, const char *content,
}
#endif

pthread_mutex_lock(&hc->hc_fd_lock);
http_send_header(hc, rc, content, size,
encoding, location, maxage, 0, NULL, NULL);

Expand All @@ -387,6 +390,7 @@ http_send_reply(http_connection_t *hc, int rc, const char *content,
else
tvh_write(hc->hc_fd, data, size);
}
pthread_mutex_unlock(&hc->hc_fd_lock);

free(data);
}
Expand Down Expand Up @@ -1095,6 +1099,7 @@ http_serve_requests(http_connection_t *hc)
char *argv[3], *c, *cmdline = NULL, *hdrline = NULL;
int n, r;

pthread_mutex_init(&hc->hc_fd_lock, NULL);
http_arg_init(&hc->hc_args);
http_arg_init(&hc->hc_req_args);
htsbuf_queue_init(&spill, 0);
Expand Down
1 change: 1 addition & 0 deletions src/http.h
Expand Up @@ -121,6 +121,7 @@ typedef enum http_ver {
} http_ver_t;

typedef struct http_connection {
pthread_mutex_t hc_fd_lock;
int hc_fd;
struct sockaddr_storage *hc_peer;
char *hc_peer_ipstr;
Expand Down
139 changes: 119 additions & 20 deletions src/satip/rtp.c
Expand Up @@ -31,6 +31,7 @@

#define RTP_PACKETS 128
#define RTP_PAYLOAD (7*188+12)
#define RTP_TCP_PAYLOAD (87*188+12+4) /* cca 16kB */
#define RTCP_PAYLOAD (1420)

typedef struct satip_rtp_table {
Expand Down Expand Up @@ -63,6 +64,7 @@ typedef struct satip_rtp_session {
signal_status_t sig;
int sig_lock;
pthread_mutex_t lock;
pthread_mutex_t *tcp_lock;
uint8_t *table_data;
int table_data_len;
} satip_rtp_session_t;
Expand Down Expand Up @@ -134,9 +136,8 @@ satip_rtp_pmt_cb(mpegts_psi_table_t *mt, const uint8_t *buf, int len)
}

static void
satip_rtp_header(satip_rtp_session_t *rtp)
satip_rtp_header(satip_rtp_session_t *rtp, struct iovec *v)
{
struct iovec *v = rtp->um_iovec + rtp->um_packet;
uint8_t *data = v->iov_base;
uint32_t tstamp = dispatch_clock + rtp->seq;

Expand Down Expand Up @@ -179,7 +180,7 @@ satip_rtp_send(satip_rtp_session_t *rtp)
v->iov_len = len;
}
if (v->iov_len == 0)
satip_rtp_header(rtp);
satip_rtp_header(rtp, rtp->um_iovec + rtp->um_packet);
return 0;
}

Expand All @@ -198,7 +199,7 @@ satip_rtp_append_data(satip_rtp_session_t *rtp, struct iovec **_v, uint8_t *data
return r;
} else {
rtp->um_packet++;
satip_rtp_header(rtp);
satip_rtp_header(rtp, rtp->um_iovec + rtp->um_packet);
}
*_v = rtp->um_iovec + rtp->um_packet;
} else {
Expand Down Expand Up @@ -233,7 +234,7 @@ satip_rtp_loop(satip_rtp_session_t *rtp, uint8_t *data, int len)
dvb_table_parse(&tbl->tbl, "-", data, 188, 1, 0, satip_rtp_pmt_cb);
if (rtp->table_data_len) {
for (i = 0; i < rtp->table_data_len; i += 188) {
r = satip_rtp_append_data(rtp, &v, data);
r = satip_rtp_append_data(rtp, &v, rtp->table_data + i);
if (r < 0)
return r;
}
Expand All @@ -253,6 +254,90 @@ satip_rtp_loop(satip_rtp_session_t *rtp, uint8_t *data, int len)
return 0;
}

static void
satip_rtp_tcp_data(satip_rtp_session_t *rtp, uint8_t stream, uint8_t *data, size_t data_len)
{
assert(data_len <= 0xffff);
data[0] = '$';
data[1] = stream;
data[2] = (data_len - 4) >> 8;
data[3] = data_len & 0xff;
pthread_mutex_lock(rtp->tcp_lock);
tvh_write(rtp->fd_rtp, data, data_len);
pthread_mutex_unlock(rtp->tcp_lock);
}

static inline void
satip_rtp_flush_tcp_data(satip_rtp_session_t *rtp, struct iovec *v)
{
if (v->iov_len)
satip_rtp_tcp_data(rtp, 0, v->iov_base, v->iov_len);
free(v->iov_base);
v->iov_base = NULL;
v->iov_len = 0;
}

static inline int
satip_rtp_append_tcp_data(satip_rtp_session_t *rtp, struct iovec *v, uint8_t *data, size_t len)
{
if (v->iov_base == NULL) {
v->iov_base = malloc(RTP_TCP_PAYLOAD);
v->iov_len = 4; /* keep room for RTSP embedded data header */
satip_rtp_header(rtp, v);
}
assert(v->iov_len + len <= RTP_TCP_PAYLOAD);
memcpy(v->iov_base + v->iov_len, data, len);
v->iov_len += len;
if (v->iov_len == RTP_TCP_PAYLOAD)
satip_rtp_flush_tcp_data(rtp, v);
return 0;
}


static int
satip_rtp_tcp_loop(satip_rtp_session_t *rtp, uint8_t *data, int len)
{
int i, j, pid, last_pid = -1, r;
mpegts_apid_t *pids = rtp->pids.pids;
struct iovec v = {NULL, 0};
satip_rtp_table_t *tbl;

assert((len % 188) == 0);
if (len > 0)
rtp->sig_lock = 1;
for ( ; len >= 188 ; data += 188, len -= 188) {
pid = ((data[1] & 0x1f) << 8) | data[2];
if (pid != last_pid && !rtp->pids.all) {
for (i = 0; i < rtp->pids.count; i++) {
j = pids[i].pid;
if (pid < j) break;
if (j == pid) goto found;
}
continue;
found:
TAILQ_FOREACH(tbl, &rtp->pmt_tables, link)
if (tbl->pid == pid) {
dvb_table_parse(&tbl->tbl, "-", data, 188, 1, 0, satip_rtp_pmt_cb);
if (rtp->table_data_len) {
satip_rtp_append_tcp_data(rtp, &v, rtp->table_data, rtp->table_data_len);
free(rtp->table_data);
rtp->table_data = NULL;
}
break;
}
if (tbl)
continue;
last_pid = pid;
}
r = satip_rtp_append_tcp_data(rtp, &v, data, 188);
if (r < 0)
return r;
}
if (v.iov_len)
satip_rtp_flush_tcp_data(rtp, &v);
return 0;
}

static void
satip_rtp_signal_status(satip_rtp_session_t *rtp, signal_status_t *sig)
{
Expand All @@ -271,9 +356,11 @@ satip_rtp_thread(void *aux)
pktbuf_t *pb;
char peername[50];
int alive = 1, fatal = 0, r;
int tcp = rtp->port == RTSP_TCP_DATA;

tcp_get_str_from_ip((struct sockaddr *)&rtp->peer, peername, sizeof(peername));
tvhdebug("satips", "RTP streaming to %s:%d open", peername, rtp->port);
tvhdebug("satips", "RTP streaming to %s:%d open", peername,
tcp ? IP_PORT(rtp->peer) : rtp->port);

pthread_mutex_lock(&sq->sq_mutex);
while (rtp->sq && !fatal) {
Expand All @@ -295,7 +382,10 @@ satip_rtp_thread(void *aux)
pb = sm->sm_data;
subscription_add_bytes_out(subs, pktbuf_len(pb));
pthread_mutex_lock(&rtp->lock);
r = satip_rtp_loop(rtp, pktbuf_ptr(pb), pktbuf_len(pb));
if (tcp)
r = satip_rtp_tcp_loop(rtp, pktbuf_ptr(pb), pktbuf_len(pb));
else
r = satip_rtp_loop(rtp, pktbuf_ptr(pb), pktbuf_len(pb));
pthread_mutex_unlock(&rtp->lock);
if (r) fatal = 1;
break;
Expand Down Expand Up @@ -350,42 +440,43 @@ satip_rtp_find(void *id)
*/
void satip_rtp_queue(void *id, th_subscription_t *subs,
streaming_queue_t *sq,
pthread_mutex_t *tcp_lock,
struct sockaddr_storage *peer, int port,
int fd_rtp, int fd_rtcp,
int frontend, int source, dvb_mux_conf_t *dmc,
mpegts_apids_t *pids, int perm_lock)
{
satip_rtp_session_t *rtp = calloc(1, sizeof(*rtp));
int dscp;

if (rtp == NULL)
return;

rtp->id = id;
rtp->peer = *peer;
rtp->peer2 = *peer;
IP_PORT_SET(rtp->peer2, htons(port + 1));
if (port != RTSP_TCP_DATA)
IP_PORT_SET(rtp->peer2, htons(port + 1));
rtp->port = port;
rtp->fd_rtp = fd_rtp;
rtp->fd_rtcp = fd_rtcp;
rtp->subs = subs;
rtp->sq = sq;
rtp->tcp_lock = tcp_lock;
mpegts_pid_init(&rtp->pids);
mpegts_pid_copy(&rtp->pids, pids);
TAILQ_INIT(&rtp->pmt_tables);
udp_multisend_init(&rtp->um, RTP_PACKETS, RTP_PAYLOAD, &rtp->um_iovec);
satip_rtp_header(rtp);
satip_rtp_header(rtp, rtp->um_iovec);
rtp->frontend = frontend;
rtp->dmc = *dmc;
rtp->source = source;
pthread_mutex_init(&rtp->lock, NULL);

if (config.dscp >= 0) {
socket_set_dscp(rtp->fd_rtp, config.dscp, NULL, 0);
socket_set_dscp(rtp->fd_rtcp, config.dscp, NULL, 0);
} else {
socket_set_dscp(rtp->fd_rtp, IPTOS_DSCP_EF, NULL, 0);
socket_set_dscp(rtp->fd_rtcp, IPTOS_DSCP_EF, NULL, 0);
}
dscp = config.dscp >= 0 ? config.dscp : IPTOS_DSCP_EF;
socket_set_dscp(rtp->fd_rtp, dscp, NULL, 0);
if (rtp->fd_rtcp >= 0)
socket_set_dscp(rtp->fd_rtcp, dscp, NULL, 0);

if (perm_lock) {
rtp->sig.signal_scale = SIGNAL_STATUS_SCALE_RELATIVE;
Expand Down Expand Up @@ -462,12 +553,15 @@ void satip_rtp_close(void *id)
if (rtp) {
TAILQ_REMOVE(&satip_rtp_sessions, rtp, link);
sq = rtp->sq;
pthread_mutex_lock(rtp->tcp_lock);
pthread_mutex_lock(&sq->sq_mutex);
rtp->sq = NULL;
pthread_cond_signal(&sq->sq_cond);
pthread_mutex_unlock(&sq->sq_mutex);
pthread_mutex_unlock(&satip_rtp_lock);
pthread_mutex_lock(rtp->tcp_lock);
pthread_join(rtp->tid, NULL);
pthread_mutex_unlock(rtp->tcp_lock);
udp_multisend_free(&rtp->um);
mpegts_pid_done(&rtp->pids);
while ((tbl = TAILQ_FIRST(&rtp->pmt_tables)) != NULL) {
Expand Down Expand Up @@ -776,10 +870,15 @@ satip_rtcp_thread(void *aux)
tcp_get_str_from_ip((struct sockaddr*)&rtp->peer2, addrbuf, sizeof(addrbuf));
tvhtrace("satips", "RTCP send to %s:%d : %s", addrbuf, IP_PORT(rtp->peer2), msg + 16);
}
r = sendto(rtp->fd_rtcp, msg, len, 0,
(struct sockaddr*)&rtp->peer2,
rtp->peer2.ss_family == AF_INET6 ?
sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
if (rtp->port == RTSP_TCP_DATA) {
satip_rtp_tcp_data(rtp, 1, msg, len);
r = 0;
} else {
r = sendto(rtp->fd_rtcp, msg, len, 0,
(struct sockaddr*)&rtp->peer2,
rtp->peer2.ss_family == AF_INET6 ?
sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
}
if (r < 0) {
err = errno;
tcp_get_str_from_ip((struct sockaddr*)&rtp->peer2, addrbuf, sizeof(addrbuf));
Expand Down

0 comments on commit 640e522

Please sign in to comment.