Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
mpegts input: implement MPEGTS_DATA_CC_RESTART
  • Loading branch information
perexg committed Nov 19, 2015
1 parent 42fafd0 commit b5442a3
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 38 deletions.
12 changes: 11 additions & 1 deletion src/input/mpegts.h
Expand Up @@ -49,6 +49,7 @@ typedef struct mpegts_input mpegts_input_t;
typedef struct mpegts_table_feed mpegts_table_feed_t;
typedef struct mpegts_network_link mpegts_network_link_t;
typedef struct mpegts_packet mpegts_packet_t;
typedef struct mpegts_pcr mpegts_pcr_t;
typedef struct mpegts_buffer mpegts_buffer_t;

/* Lists */
Expand Down Expand Up @@ -121,9 +122,18 @@ struct mpegts_packet
TAILQ_ENTRY(mpegts_packet) mp_link;
size_t mp_len;
mpegts_mux_t *mp_mux;
uint8_t mp_cc_restart;
uint8_t mp_data[0];
};

struct mpegts_pcr {
int64_t pcr_first;
int64_t pcr_last;
uint16_t pcr_pid;
};

#define MPEGTS_DATA_CC_RESTART (1<<0)

typedef int (*mpegts_table_callback_t)
( mpegts_table_t*, const uint8_t *buf, int len, int tableid );

Expand Down Expand Up @@ -894,7 +904,7 @@ void mpegts_mux_update_pids ( mpegts_mux_t *mm );

void mpegts_input_recv_packets
(mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb,
int64_t *pcr_first, int64_t *pcr_last, uint16_t *pcr_pid);
int flags, mpegts_pcr_t *pcr);

int mpegts_input_get_weight ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags );
int mpegts_input_get_priority ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags );
Expand Down
34 changes: 25 additions & 9 deletions src/input/mpegts/iptv/iptv.c
Expand Up @@ -506,14 +506,30 @@ iptv_input_pause_handler ( iptv_mux_t *im, int pause )
tvhpoll_add(iptv_poll, &ev, 1);
}

void
iptv_input_recv_flush ( iptv_mux_t *im )
{
mpegts_mux_instance_t *mmi = im->mm_active;

if (mmi == NULL)
return;
mpegts_input_recv_packets((mpegts_input_t*)iptv_input, mmi,
&im->mm_iptv_buffer, MPEGTS_DATA_CC_RESTART,
NULL);
}

int
iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len )
{
static time_t t1 = 0, t2;
iptv_network_t *in = (iptv_network_t*)im->mm_network;
mpegts_mux_instance_t *mmi;
int64_t pcr_first = PTS_UNSET, pcr_last = PTS_UNSET, s64;
mpegts_pcr_t pcr;
int64_t s64;

pcr.pcr_first = PTS_UNSET;
pcr.pcr_last = PTS_UNSET;
pcr.pcr_pid = im->im_pcr_pid;
in->in_bps += len * 8;
time(&t2);
if (t2 != t1) {
Expand All @@ -537,23 +553,23 @@ iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len )
return 1;
}
mpegts_input_recv_packets((mpegts_input_t*)iptv_input, mmi,
&im->mm_iptv_buffer,
&pcr_first, &pcr_last, &im->im_pcr_pid);
if (pcr_first != PTS_UNSET && pcr_last != PTS_UNSET) {
&im->mm_iptv_buffer, 0, &pcr);
if (pcr.pcr_first != PTS_UNSET && pcr.pcr_last != PTS_UNSET) {
im->im_pcr_pid = pcr.pcr_pid;
if (im->im_pcr == PTS_UNSET) {
s64 = pts_diff(pcr_first, pcr_last);
s64 = pts_diff(pcr.pcr_first, pcr.pcr_last);
if (s64 != PTS_UNSET) {
im->im_pcr = pcr_first;
im->im_pcr = pcr.pcr_first;
im->im_pcr_start = getmonoclock();
im->im_pcr_end = im->im_pcr_start + ((s64 * 100LL) + 50LL) / 9LL;
tvhtrace("iptv-pcr", "pcr: first %"PRId64" last %"PRId64", time start %"PRId64", end %"PRId64,
pcr_first, pcr_last, im->im_pcr_start, im->im_pcr_end);
pcr.pcr_first, pcr.pcr_last, im->im_pcr_start, im->im_pcr_end);
}
} else {
s64 = pts_diff(im->im_pcr, pcr_last);
s64 = pts_diff(im->im_pcr, pcr.pcr_last);
if (s64 != PTS_UNSET) {
im->im_pcr_end = im->im_pcr_start + ((s64 * 100LL) + 50LL) / 9LL;
tvhtrace("iptv-pcr", "pcr: last %"PRId64", time end %"PRId64, pcr_last, im->im_pcr_end);
tvhtrace("iptv-pcr", "pcr: last %"PRId64", time end %"PRId64, pcr.pcr_last, im->im_pcr_end);
}
}
if (iptv_input_pause_check(im)) {
Expand Down
6 changes: 4 additions & 2 deletions src/input/mpegts/iptv/iptv_http.c
Expand Up @@ -166,11 +166,13 @@ iptv_http_header ( http_client_t *hc )

hp->m3u_header = 0;
hp->off = 0;
pthread_mutex_lock(&global_lock);
if (!hp->started) {
pthread_mutex_lock(&global_lock);
iptv_input_mux_started(hp->im);
pthread_mutex_unlock(&global_lock);
} else {
iptv_input_recv_flush(hp->im);
}
pthread_mutex_unlock(&global_lock);
hp->started = 1;
return 0;
}
Expand Down
1 change: 1 addition & 0 deletions src/input/mpegts/iptv/iptv_private.h
Expand Up @@ -67,6 +67,7 @@ struct iptv_input
int iptv_input_fd_started ( iptv_mux_t *im );
void iptv_input_mux_started ( iptv_mux_t *im );
int iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len );
void iptv_input_recv_flush ( iptv_mux_t *im );
void iptv_input_pause_handler ( iptv_mux_t *im, int pause );

struct iptv_network
Expand Down
2 changes: 1 addition & 1 deletion src/input/mpegts/linuxdvb/linuxdvb_frontend.c
Expand Up @@ -1136,7 +1136,7 @@ linuxdvb_frontend_input_thread ( void *aux )
}

/* Process */
mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi, &sb, NULL, NULL, NULL);
mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi, &sb, 0, NULL);
}

sbuf_free(&sb);
Expand Down
36 changes: 22 additions & 14 deletions src/input/mpegts/mpegts_input.c
Expand Up @@ -977,7 +977,7 @@ ts_sync_count ( const uint8_t *tsb, int len )
void
mpegts_input_recv_packets
( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb,
int64_t *pcr_first, int64_t *pcr_last, uint16_t *pcr_pid )
int flags, mpegts_pcr_t *pcr )
{
int len2 = 0, off = 0;
mpegts_packet_t *mp;
Expand All @@ -986,7 +986,7 @@ mpegts_input_recv_packets
#define MIN_TS_PKT 100
#define MIN_TS_SYN (5*188)

if (len < (MIN_TS_PKT * 188)) {
if (len < (MIN_TS_PKT * 188) && (flags & MPEGTS_DATA_CC_RESTART) == 0) {
/* For slow streams, check also against the clock */
if (dispatch_clock == mi->mi_last_dispatch)
return;
Expand All @@ -1008,24 +1008,24 @@ mpegts_input_recv_packets
// require per mmi buffers, where this is generally not required)

/* Extract PCR on demand */
if (pcr_first && pcr_last && pcr_pid) {
if (pcr) {
uint8_t *tmp, *end;
uint16_t pid;
for (tmp = tsb, end = tsb + len2; tmp < end; tmp += 188) {
pid = ((tmp[1] & 0x1f) << 8) | tmp[2];
if (*pcr_pid == MPEGTS_PID_NONE || *pcr_pid == pid) {
if (get_pcr(tmp, pcr_first)) {
*pcr_pid = pid;
if (pcr->pcr_pid == MPEGTS_PID_NONE || pcr->pcr_pid == pid) {
if (get_pcr(tmp, &pcr->pcr_first)) {
pcr->pcr_pid = pid;
break;
}
}
}
if (*pcr_pid != MPEGTS_PID_NONE) {
if (pcr->pcr_pid != MPEGTS_PID_NONE) {
for (tmp = tsb + len2 - 188; tmp >= tsb; tmp -= 188) {
pid = ((tmp[1] & 0x1f) << 8) | tmp[2];
if (*pcr_pid == pid) {
if (get_pcr(tmp, pcr_last)) {
*pcr_pid = pid;
if (pcr->pcr_pid == pid) {
if (get_pcr(tmp, &pcr->pcr_last)) {
pcr->pcr_pid = pid;
break;
}
}
Expand All @@ -1034,10 +1034,11 @@ mpegts_input_recv_packets
}

/* Pass */
if (len2 >= MIN_TS_SYN) {
if (len2 >= MIN_TS_SYN || (flags & MPEGTS_DATA_CC_RESTART)) {
mp = malloc(sizeof(mpegts_packet_t) + len2);
mp->mp_mux = mmi->mmi_mux;
mp->mp_len = len2;
mp->mp_mux = mmi->mmi_mux;
mp->mp_len = len2;
mp->mp_cc_restart = (flags & MPEGTS_DATA_CC_RESTART) ? 1 : 0;
memcpy(mp->mp_data, tsb, len2);

len -= len2;
Expand All @@ -1054,7 +1055,7 @@ mpegts_input_recv_packets
}

/* Adjust buffer */
if (len)
if (len && (flags & MPEGTS_DATA_CC_RESTART) == 0)
sbuf_cut(sb, off); // cut off the bottom
else
sb->sb_ptr = 0; // clear
Expand Down Expand Up @@ -1183,6 +1184,7 @@ mpegts_input_process
mpegts_pid_t *mp;
mpegts_pid_sub_t *mps;
service_t *s;
elementary_stream_t *st;
int table_wakeup = 0;
mpegts_mux_t *mm = mpkt->mp_mux;
mpegts_mux_instance_t *mmi;
Expand Down Expand Up @@ -1352,6 +1354,12 @@ mpegts_input_process
}
#endif

if (mpkt->mp_cc_restart) {
LIST_FOREACH(s, &mm->mm_transports, s_active_link)
TAILQ_FOREACH(st, &s->s_components, es_link)
st->es_cc = -1;
}

/* Wake table */
if (table_wakeup)
pthread_cond_signal(&mi->mi_table_cond);
Expand Down
5 changes: 2 additions & 3 deletions src/input/mpegts/satip/satip_frontend.c
Expand Up @@ -1024,7 +1024,7 @@ satip_frontend_rtp_data_received( http_client_t *hc, void *buf, size_t len )
mmi = lfe->sf_req->sf_mmi;
mmi->tii_stats.unc += unc;
mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi,
&lfe->sf_sbuf, NULL, NULL, NULL);
&lfe->sf_sbuf, 0, NULL);
}
pthread_mutex_unlock(&lfe->sf_dvr_lock);
lfe->sf_last_data_tstamp = dispatch_clock;
Expand Down Expand Up @@ -1572,8 +1572,7 @@ satip_frontend_input_thread ( void *aux )
pthread_mutex_lock(&lfe->sf_dvr_lock);
if (lfe->sf_req == lfe->sf_req_thread) {
mmi->tii_stats.unc += unc;
mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi,
sb, NULL, NULL, NULL);
mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi, sb, 0, NULL);
} else
fatal = 1;
pthread_mutex_unlock(&lfe->sf_dvr_lock);
Expand Down
18 changes: 11 additions & 7 deletions src/input/mpegts/tsfile/tsfile_input.c
Expand Up @@ -44,7 +44,8 @@ tsfile_input_thread ( void *aux )
tvhpoll_event_t ev;
struct stat st;
sbuf_t buf;
int64_t pcr, pcr2, pcr_last = PTS_UNSET;
mpegts_pcr_t pcr;
int64_t pcr_last = PTS_UNSET;
#if PLATFORM_LINUX
int64_t pcr_last_realtime = 0;
#endif
Expand Down Expand Up @@ -130,17 +131,20 @@ tsfile_input_thread ( void *aux )

/* Process */
if (c > 0) {
pcr = PTS_UNSET;
mpegts_input_recv_packets((mpegts_input_t*)mi, mmi, &buf,
&pcr, &pcr2, &tmi->mmi_tsfile_pcr_pid);
pcr.pcr_first = PTS_UNSET;
pcr.pcr_last = PTS_UNSET;
pcr.pcr_pid = tmi->mmi_tsfile_pcr_pid;
mpegts_input_recv_packets((mpegts_input_t*)mi, mmi, &buf, 0, &pcr);
if (pcr.pcr_pid)
tmi->mmi_tsfile_pcr_pid = pcr.pcr_pid;

/* Delay */
if (pcr != PTS_UNSET) {
if (pcr.pcr_first != PTS_UNSET) {
if (pcr_last != PTS_UNSET) {
struct timespec slp;
int64_t delta;

delta = pcr - pcr_last;
delta = pcr.pcr_first - pcr_last;

if (delta < 0)
delta = 0;
Expand All @@ -159,7 +163,7 @@ tsfile_input_thread ( void *aux )
nanosleep(&slp, NULL);
#endif
}
pcr_last = pcr;
pcr_last = pcr.pcr_first;
#if PLATFORM_LINUX
pcr_last_realtime = getmonoclock();
#endif
Expand Down
2 changes: 1 addition & 1 deletion src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c
Expand Up @@ -196,7 +196,7 @@ tvhdhomerun_frontend_input_thread ( void *aux )

//tvhdebug("tvhdhomerun", "got r=%d (thats %d)", r, (r == 7*188));

mpegts_input_recv_packets((mpegts_input_t*) hfe, mmi, &sb, NULL, NULL, NULL);
mpegts_input_recv_packets((mpegts_input_t*) hfe, mmi, &sb, 0, NULL);
}

tvhdebug("tvhdhomerun", "setting target to none");
Expand Down

0 comments on commit b5442a3

Please sign in to comment.