Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
IPTV: add pause support for the correct input data timing, file:// se…
…ems working, fixes #3240
  • Loading branch information
perexg committed Nov 13, 2015
1 parent 654a14d commit 493f458
Show file tree
Hide file tree
Showing 17 changed files with 227 additions and 62 deletions.
2 changes: 1 addition & 1 deletion src/input/mpegts.h
Expand Up @@ -894,7 +894,7 @@ void mpegts_mux_update_pids ( mpegts_mux_t *mm );

void mpegts_input_recv_packets
(mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb,
int64_t *pcr, uint16_t *pcr_pid);
int64_t *pcr_first, int64_t *pcr_last, uint16_t *pcr_pid);

int mpegts_input_get_weight ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags );
int mpegts_input_get_priority ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags );
Expand Down
95 changes: 90 additions & 5 deletions src/input/mpegts/iptv/iptv.c
Expand Up @@ -24,6 +24,7 @@
#include "htsstr.h"
#include "channels.h"
#include "bouquet.h"
#include "packet.h"

#include <sys/socket.h>
#include <sys/types.h>
Expand Down Expand Up @@ -368,6 +369,8 @@ iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )

pthread_mutex_lock(&iptv_lock);

gtimer_disarm(&im->im_pause_timer);

/* Stop */
if (im->im_handler->stop)
im->im_handler->stop(im);
Expand Down Expand Up @@ -404,10 +407,20 @@ iptv_input_display_name ( mpegts_input_t *mi, char *buf, size_t len )
snprintf(buf, len, "IPTV");
}

static void
iptv_input_unpause ( void *aux )
{
iptv_mux_t *im = aux;
pthread_mutex_lock(&iptv_lock);
tvhtrace("iptv-pcr", "unpause timer callback");
im->im_handler->pause(im, 0);
pthread_mutex_unlock(&iptv_lock);
}

static void *
iptv_input_thread ( void *aux )
{
int nfds;
int nfds, r;
ssize_t n;
iptv_mux_t *im;
tvhpoll_event_t ev;
Expand All @@ -425,6 +438,7 @@ iptv_input_thread ( void *aux )
continue;
}
im = ev.data.ptr;
r = 0;

pthread_mutex_lock(&iptv_lock);

Expand All @@ -436,20 +450,51 @@ iptv_input_thread ( void *aux )
im->im_handler->stop(im);
break;
}
iptv_input_recv_packets(im, n);
r = iptv_input_recv_packets(im, n);
if (r == 1)
im->im_handler->pause(im, 1);
}

pthread_mutex_unlock(&iptv_lock);

if (r == 1) {
pthread_mutex_lock(&global_lock);
if (im->mm_active)
gtimer_arm(&im->im_pause_timer, iptv_input_unpause, im, 1);
pthread_mutex_unlock(&global_lock);
}
}
return NULL;
}

void
iptv_input_pause_handler ( iptv_mux_t *im, int pause )
{
tvhpoll_event_t ev = { 0 };

ev.fd = im->mm_iptv_fd;
ev.events = TVHPOLL_IN;
ev.data.ptr = im;
if (pause)
tvhpoll_rem(iptv_poll, &ev, 1);
else
tvhpoll_add(iptv_poll, &ev, 1);
}

static inline int
iptv_input_pause_check ( iptv_mux_t *im )
{
/* queued more than 3 seconds? trigger the pause */
return im->im_pcr_end - im->im_pcr_start >= 3000000LL;
}

int
iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len )
{
static time_t t1 = 0, t2;
iptv_network_t *in = (iptv_network_t*)im->mm_network;
mpegts_mux_instance_t *mmi;
int64_t pcr_first = PTS_UNSET, pcr_last = PTS_UNSET, s64;

in->in_bps += len * 8;
time(&t2);
Expand All @@ -466,13 +511,50 @@ iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len )
t1 = t2;
}

/* Pass on */
/* Pass on, but with timing */
mmi = im->mm_active;
if (mmi)
if (mmi) {
if (im->im_pcr != PTS_UNSET) {
s64 = getmonoclock() - im->im_pcr_start;
im->im_pcr_start += s64;
im->im_pcr += (((s64 / 10LL) * 9LL) + 4LL) / 10LL;
im->im_pcr &= PTS_MASK;
tvhtrace("iptv-pcr", "pcr: updated %ld, time start %ld", im->im_pcr, im->im_pcr_start);
}
if (iptv_input_pause_check(im)) {
tvhtrace("iptv-pcr", "pcr: paused");
return 1;
}
mpegts_input_recv_packets((mpegts_input_t*)iptv_input, mmi,
&im->mm_iptv_buffer, NULL, NULL);
&im->mm_iptv_buffer,
&pcr_first, &pcr_last, &im->im_pcr_pid);
if (pcr_first != PTS_UNSET && pcr_last != PTS_UNSET) {
if (im->im_pcr == PTS_UNSET) {
s64 = pts_diff(pcr_first, pcr_last);
if (s64 != PTS_UNSET) {
im->im_pcr = pcr_first;
im->im_pcr_start = getmonoclock();
im->im_pcr_end = im->im_pcr_start + ((s64 * 100LL) + 50LL) / 9LL;
tvhtrace("iptv-pcr", "pcr: first %ld last %ld, time start %ld, end %ld",
pcr_first, pcr_last, im->im_pcr_start, im->im_pcr_end);
}
} else {
s64 = pts_diff(im->im_pcr, pcr_last);
if (s64 != PTS_UNSET) {
im->im_pcr_end = im->im_pcr_start + ((s64 * 100LL) + 50LL) / 9LL;
tvhtrace("iptv-pcr", "pcr: last %ld, time end %ld", pcr_last, im->im_pcr_end);
}
}
if (iptv_input_pause_check(im)) {
tvhtrace("iptv-pcr", "pcr: paused");
return 1;
}
}
}
return 0;
}


int
iptv_input_fd_started ( iptv_mux_t *im )
{
Expand Down Expand Up @@ -519,6 +601,9 @@ iptv_input_mux_started ( iptv_mux_t *im )
/* Allocate input buffer */
sbuf_reset_and_alloc(&im->mm_iptv_buffer, IPTV_BUF_SIZE);

im->im_pcr = PTS_UNSET;
im->im_pcr_pid = MPEGTS_PID_NONE;

if (iptv_input_fd_started(im))
return;

Expand Down
104 changes: 69 additions & 35 deletions src/input/mpegts/iptv/iptv_file.c
Expand Up @@ -25,64 +25,99 @@
#include <sys/types.h>
#include <fcntl.h>
#include <assert.h>
#include <signal.h>

typedef struct file_priv {
int fd;
int shutdown;
pthread_t tid;
pthread_cond_t cond;
} file_priv_t;

/*
* Connect UDP/RTP
* Read thread
*/
static void *
iptv_file_thread ( void *aux )
{
iptv_mux_t *im = aux;
file_priv_t *fp = im->im_data;
struct timespec ts;
int r, fd = fp->fd, pause = 0;
char buf[32*1024];

pthread_mutex_lock(&iptv_lock);
while (!fp->shutdown && fd > 0) {
while (!fp->shutdown && pause) {
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += 1;
if (pthread_cond_timedwait(&fp->cond, &iptv_lock, &ts) == ETIMEDOUT)
break;
}
if (fp->shutdown)
break;
pause = 0;
pthread_mutex_unlock(&iptv_lock);
r = read(fd, buf, sizeof(buf));
pthread_mutex_lock(&iptv_lock);
if (r == 0)
break;
if (r < 0) {
if (ERRNO_AGAIN(errno))
continue;
break;
}
sbuf_append(&im->mm_iptv_buffer, buf, r);
if (iptv_input_recv_packets(im, r) == 1)
pause = 1;
}
pthread_mutex_unlock(&iptv_lock);
return NULL;
}

/*
* Open file
*/
static int
iptv_file_start ( iptv_mux_t *im, const char *raw, const url_t *url )
{
int fd = tvh_open(raw + 7, O_RDONLY | O_DIRECT, 0);
file_priv_t *fp;
int fd = tvh_open(raw + 7, O_RDONLY | O_NONBLOCK, 0);

if (fd < 0) {
tvherror("iptv", "unable to open file '%s'", raw + 7);
return -1;
}

im->mm_iptv_fd = fd;

fp = calloc(1, sizeof(*fp));
fp->fd = fd;
pthread_cond_init(&fp->cond, NULL);
im->im_data = fp;
iptv_input_mux_started(im);
tvhthread_create(&fp->tid, NULL, iptv_file_thread, im, "iptvfile");
return 0;
}

static void
iptv_file_stop
( iptv_mux_t *im )
{
int rd = im->mm_iptv_fd;
file_priv_t *fp = im->im_data;
int rd = fp->fd;
if (rd > 0)
close(rd);
im->mm_iptv_fd = -1;
}

static ssize_t
iptv_file_read ( iptv_mux_t *im )
{
int r, fd = im->mm_iptv_fd;
ssize_t res = 0;

while (fd > 0) {
sbuf_alloc(&im->mm_iptv_buffer, 32*1024);
r = sbuf_read(&im->mm_iptv_buffer, fd);
if (r == 0) {
close(fd);
im->mm_iptv_fd = -1;
break;
}
if (r < 0) {
if (errno == EAGAIN)
break;
if (ERRNO_AGAIN(errno))
continue;
}
res += r;
}

return res;
fp->shutdown = 1;
pthread_cond_signal(&fp->cond);
pthread_mutex_unlock(&iptv_lock);
pthread_join(fp->tid, NULL);
pthread_cond_destroy(&fp->cond);
pthread_mutex_lock(&iptv_lock);
free(im->im_data);
im->im_data = NULL;
}

/*
* Initialise pipe handler
* Initialise file handler
*/

void
Expand All @@ -92,8 +127,7 @@ iptv_file_init ( void )
{
.scheme = "file",
.start = iptv_file_start,
.stop = iptv_file_stop,
.read = iptv_file_read,
.stop = iptv_file_stop
},
};
iptv_handler_register(ih, ARRAY_SIZE(ih));
Expand Down
18 changes: 17 additions & 1 deletion src/input/mpegts/iptv/iptv_http.c
Expand Up @@ -87,7 +87,8 @@ iptv_http_data
tsdebug_write((mpegts_mux_t *)im, buf, len);

if (len > 0)
iptv_input_recv_packets(im, len);
if (iptv_input_recv_packets(im, len) == 1)
hc->hc_pause = 1;

pthread_mutex_unlock(&iptv_lock);

Expand Down Expand Up @@ -228,6 +229,19 @@ iptv_http_stop
}


/*
* Pause/Unpause
*/
static void
iptv_http_pause
( iptv_mux_t *im, int pause )
{
http_client_t *hc = im->im_data;

assert(pause == 0);
http_client_unpause(hc);
}

/*
* Initialise HTTP handler
*/
Expand All @@ -240,11 +254,13 @@ iptv_http_init ( void )
.scheme = "http",
.start = iptv_http_start,
.stop = iptv_http_stop,
.pause = iptv_http_pause
},
{
.scheme = "https",
.start = iptv_http_start,
.stop = iptv_http_stop,
.pause = iptv_http_pause
}
};
iptv_handler_register(ih, 2);
Expand Down
1 change: 1 addition & 0 deletions src/input/mpegts/iptv/iptv_pipe.c
Expand Up @@ -165,6 +165,7 @@ iptv_pipe_init ( void )
.start = iptv_pipe_start,
.stop = iptv_pipe_stop,
.read = iptv_pipe_read,
.pause = iptv_input_pause_handler
},
};
iptv_handler_register(ih, ARRAY_SIZE(ih));
Expand Down
10 changes: 9 additions & 1 deletion src/input/mpegts/iptv/iptv_private.h
Expand Up @@ -52,6 +52,7 @@ struct iptv_handler
int (*start) ( iptv_mux_t *im, const char *raw, const url_t *url );
void (*stop) ( iptv_mux_t *im );
ssize_t (*read) ( iptv_mux_t *im );
void (*pause) ( iptv_mux_t *im, int pause );

RB_ENTRY(iptv_handler) link;
};
Expand All @@ -65,7 +66,8 @@ struct iptv_input

int iptv_input_fd_started ( iptv_mux_t *im );
void iptv_input_mux_started ( iptv_mux_t *im );
void iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len );
int iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len );
void iptv_input_pause_handler ( iptv_mux_t *im, int pause );

struct iptv_network
{
Expand Down Expand Up @@ -138,6 +140,12 @@ struct iptv_mux
sbuf_t mm_iptv_buffer;

iptv_handler_t *im_handler;
gtimer_t im_pause_timer;

int64_t im_pcr;
int64_t im_pcr_start;
int64_t im_pcr_end;
uint16_t im_pcr_pid;

void *im_data;

Expand Down

0 comments on commit 493f458

Please sign in to comment.