Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
iptv: update to use new mpegts_input API
  • Loading branch information
adamsutton committed Apr 14, 2014
1 parent f579279 commit aa29bca
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 47 deletions.
35 changes: 13 additions & 22 deletions src/input/mpegts/iptv/iptv.c
Expand Up @@ -148,13 +148,13 @@ iptv_input_get_weight ( mpegts_input_t *mi )
}

/* Service subs */
pthread_mutex_lock(&mi->mi_delivery_mutex);
pthread_mutex_lock(&mi->mi_output_lock);
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link) {
w = MIN(w, ths->ths_weight);
}
}
pthread_mutex_unlock(&mi->mi_delivery_mutex);
pthread_mutex_unlock(&mi->mi_output_lock);
}

return w;
Expand All @@ -176,7 +176,7 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )

/* Do we need to stop something? */
if (!iptv_input_is_free(mi)) {
pthread_mutex_lock(&mi->mi_delivery_mutex);
pthread_mutex_lock(&mi->mi_output_lock);
mpegts_mux_instance_t *m, *s = NULL;
int w = 1000000;
LIST_FOREACH(m, &mi->mi_mux_active, mmi_active_link) {
Expand All @@ -186,7 +186,7 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
w = t;
}
}
pthread_mutex_unlock(&mi->mi_delivery_mutex);
pthread_mutex_unlock(&mi->mi_output_lock);

/* Stop */
if (s)
Expand Down Expand Up @@ -244,8 +244,7 @@ iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
}

/* Free memory */
free(im->mm_iptv_tsb);
im->mm_iptv_tsb = NULL;
free(im->mm_iptv_buffer.sb_data);

/* Clear bw limit */
LIST_FOREACH(mnl, &mi->mi_networks, mnl_mi_link) {
Expand All @@ -266,7 +265,7 @@ static void *
iptv_input_thread ( void *aux )
{
int nfds;
ssize_t len;
ssize_t n;
size_t off;
iptv_mux_t *im;
tvhpoll_event_t ev;
Expand All @@ -293,12 +292,12 @@ iptv_input_thread ( void *aux )

/* Get data */
off = 0;
if ((len = im->im_handler->read(im, &off)) < 0) {
if ((n = im->im_handler->read(im, &off)) < 0) {
tvhlog(LOG_ERR, "iptv", "read() error %s", strerror(errno));
im->im_handler->stop(im);
goto done;
}
iptv_input_recv_packets(im, off, len);
iptv_input_recv_packets(im, n, off);

done:
pthread_mutex_unlock(&iptv_lock);
Expand All @@ -307,7 +306,7 @@ iptv_input_thread ( void *aux )
}

void
iptv_input_recv_packets ( iptv_mux_t *im, size_t off, size_t len )
iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len, size_t off )
{
static time_t t1 = 0, t2;
iptv_network_t *in = (iptv_network_t*)im->mm_network;
Expand All @@ -327,12 +326,8 @@ iptv_input_recv_packets ( iptv_mux_t *im, size_t off, size_t len )
}

/* Pass on */
im->mm_iptv_pos
= mpegts_input_recv_packets((mpegts_input_t*)iptv_input,
im->mm_active,
im->mm_iptv_tsb + off,
im->mm_iptv_pos + len - off,
NULL, NULL, "iptv");
mpegts_input_recv_packets((mpegts_input_t*)iptv_input, im->mm_active,
&im->mm_iptv_buffer, off, NULL, NULL);
}

void
Expand All @@ -343,8 +338,7 @@ iptv_input_mux_started ( iptv_mux_t *im )
im->mm_display_name((mpegts_mux_t*)im, buf, sizeof(buf));

/* Allocate input buffer */
im->mm_iptv_pos = 0;
im->mm_iptv_tsb = calloc(1, IPTV_PKT_SIZE);
sbuf_init_fixed(&im->mm_iptv_buffer, IPTV_PKT_SIZE);

/* Setup poll */
if (im->mm_iptv_fd > 0) {
Expand Down Expand Up @@ -538,9 +532,6 @@ void iptv_init ( void )
/* Init Network */
iptv_network_init();

/* Set table thread */
mpegts_input_table_thread_start((mpegts_input_t *)iptv_input);

/* Setup TS thread */
iptv_poll = tvhpoll_create(10);
pthread_mutex_init(&iptv_lock, NULL);
Expand All @@ -549,12 +540,12 @@ void iptv_init ( void )

void iptv_done ( void )
{
mpegts_input_table_thread_stop((mpegts_input_t *)iptv_input);
pthread_kill(iptv_thread, SIGTERM);
pthread_join(iptv_thread, NULL);
tvhpoll_destroy(iptv_poll);
pthread_mutex_lock(&global_lock);
mpegts_network_unregister_builder(&iptv_network_class);
mpegts_input_stop_all((mpegts_input_t*)iptv_input);
mpegts_input_delete((mpegts_input_t *)iptv_input, 0);
pthread_mutex_unlock(&global_lock);
}
Expand Down
12 changes: 4 additions & 8 deletions src/input/mpegts/iptv/iptv_http.c
Expand Up @@ -41,22 +41,18 @@ static size_t
iptv_http_data
( void *p, void *buf, size_t len )
{
uint8_t *tsb;
size_t ret = len;
iptv_mux_t *im = p;

pthread_mutex_lock(&iptv_lock);

tsb = im->mm_iptv_tsb + im->mm_iptv_pos;
len = MIN(len, IPTV_PKT_SIZE - im->mm_iptv_pos);
sbuf_append(&im->mm_iptv_buffer, buf, len);

memcpy(tsb, buf, len);

iptv_input_recv_packets(im, 0, len);
if (len > 0)
iptv_input_recv_packets(im, len, 0);

pthread_mutex_unlock(&iptv_lock);

return ret;
return len;
}

/*
Expand Down
11 changes: 3 additions & 8 deletions src/input/mpegts/iptv/iptv_private.h
Expand Up @@ -39,10 +39,6 @@ struct iptv_handler
const char *scheme;
int (*start) ( iptv_mux_t *im, const url_t *url );
void (*stop) ( iptv_mux_t *im );

// Return number of available bytes, with optional offset
// from start of mux buffer (useful for things with wrapper
// around TS)
ssize_t (*read) ( iptv_mux_t *im, size_t *off );

RB_ENTRY(iptv_handler) link;
Expand All @@ -56,7 +52,7 @@ struct iptv_input
};

void iptv_input_mux_started ( iptv_mux_t *im );
void iptv_input_recv_packets ( iptv_mux_t *im, size_t off, size_t len );
void iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len, size_t off );

struct iptv_network
{
Expand All @@ -81,11 +77,10 @@ struct iptv_mux

int mm_iptv_atsc;

uint8_t *mm_iptv_tsb;
int mm_iptv_pos;

char *mm_iptv_svcname;

sbuf_t mm_iptv_buffer;

iptv_handler_t *im_handler;

void *im_data;
Expand Down
22 changes: 13 additions & 9 deletions src/input/mpegts/iptv/iptv_udp.c
Expand Up @@ -182,10 +182,10 @@ static ssize_t
iptv_udp_read ( iptv_mux_t *im, size_t *off )
{
/* UDP/RTP should not have TS packets straddling datagrams, I think! */
im->mm_iptv_pos = 0;
im->mm_iptv_buffer.sb_ptr = 0;

/* Read */
return read(im->mm_iptv_fd, im->mm_iptv_tsb, IPTV_PKT_SIZE);
return sbuf_read(&im->mm_iptv_buffer, im->mm_iptv_fd);
}

static ssize_t
Expand All @@ -201,22 +201,26 @@ iptv_rtp_read ( iptv_mux_t *im, size_t *off )
/* Strip RTP header */
if (len < 12)
return 0; // ignore
if ((im->mm_iptv_tsb[0] & 0xC0) != 0x80)
if ((im->mm_iptv_buffer.sb_data[0] & 0xC0) != 0x80)
return 0;
if ((im->mm_iptv_tsb[1] & 0x7F) != 33)
if ((im->mm_iptv_buffer.sb_data[1] & 0x7F) != 33)
return 0;
hlen = ((im->mm_iptv_tsb[0] & 0xf) * 4) + 12;
if (im->mm_iptv_tsb[0] & 0x10) {
hlen = ((im->mm_iptv_buffer.sb_data[0] & 0xf) * 4) + 12;
if (im->mm_iptv_buffer.sb_data[0] & 0x10) {
if (len < hlen+4)
return 0;
hlen += (im->mm_iptv_tsb[hlen+2] << 8) | (im->mm_iptv_tsb[hlen+3]*4);
hlen += (im->mm_iptv_buffer.sb_data[hlen+2] << 8)
| (im->mm_iptv_buffer.sb_data[hlen+3] * 4);
hlen += 4;
}
if (len < hlen || ((len - hlen) % 188) != 0)
return 0;

/* OK */
*off = hlen;
/* Cut */
sbuf_cut(&im->mm_iptv_buffer, hlen);
// TODO: would be nice to avoid this extra copy, it was possible until I
// changed the API!

return len;
}

Expand Down

0 comments on commit aa29bca

Please sign in to comment.