Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
mpegts input: add queue size checks for raw mpegts data for slow mach…
…ines
  • Loading branch information
perexg committed May 13, 2016
1 parent c195db2 commit 1936395
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 9 deletions.
4 changes: 4 additions & 0 deletions src/input/mpegts.h
Expand Up @@ -683,6 +683,8 @@ struct mpegts_input
pthread_mutex_t mi_input_lock;
tvh_cond_t mi_input_cond;
TAILQ_HEAD(,mpegts_packet) mi_input_queue;
uint64_t mi_input_queue_size;
tvhlog_limit_t mi_input_queue_loglimit;

/* Data processing/output */
// Note: this lock (mi_output_lock) protects all the remaining
Expand All @@ -696,6 +698,8 @@ struct mpegts_input
pthread_t mi_table_tid;
tvh_cond_t mi_table_cond;
mpegts_table_feed_queue_t mi_table_queue;
uint64_t mi_table_queue_size;
tvhlog_limit_t mi_table_queue_loglimit;

/* DBus */
#if ENABLE_DBUS_1
Expand Down
35 changes: 26 additions & 9 deletions src/input/mpegts/mpegts_input.c
Expand Up @@ -1123,9 +1123,16 @@ mpegts_input_recv_packets

pthread_mutex_lock(&mi->mi_input_lock);
if (mmi->mmi_mux->mm_active == mmi) {
memoryinfo_alloc(&mpegts_input_queue_memoryinfo, sizeof(mpegts_packet_t) + len2);
TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link);
tvh_cond_signal(&mi->mi_input_cond, 0);
if (mi->mi_input_queue_size < 50*1024*1024) {
mi->mi_input_queue_size += len2;
memoryinfo_alloc(&mpegts_input_queue_memoryinfo, sizeof(mpegts_packet_t) + len2);
TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link);
tvh_cond_signal(&mi->mi_input_cond, 0);
} else {
if (tvhlog_limit(&mi->mi_input_queue_loglimit, 10))
tvhwarn("mpegts", "too much queued input data (over 50MB), discarding new");
free(mp);
}
} else {
free(mp);
}
Expand Down Expand Up @@ -1373,12 +1380,18 @@ mpegts_input_process
if (type & MPS_FTABLE)
mpegts_input_table_dispatch(mm, muxname, tsb, llen);
if (type & MPS_TABLE) {
mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t)+llen);
mtf->mtf_len = llen;
memcpy(mtf->mtf_tsb, tsb, llen);
mtf->mtf_mux = mm;
TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
table_wakeup = 1;
if (mi->mi_table_queue_size >= 2*1024*1024) {
if (tvhlog_limit(&mi->mi_input_queue_loglimit, 10))
tvhwarn("mpegts", "too much queued table input data (over 2MB), discarding new");
} else {
mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t)+llen);
mtf->mtf_len = llen;
memcpy(mtf->mtf_tsb, tsb, llen);
mtf->mtf_mux = mm;
mi->mi_table_queue_size += llen;
TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
table_wakeup = 1;
}
}
} else {
//tvhdebug("tsdemux", "%s - SI packet had errors", name);
Expand Down Expand Up @@ -1473,6 +1486,7 @@ mpegts_input_thread ( void * p )
tvh_cond_wait(&mi->mi_input_cond, &mi->mi_input_lock);
continue;
}
mi->mi_input_queue_size -= mp->mp_len;
memoryinfo_free(&mpegts_input_queue_memoryinfo, sizeof(mpegts_packet_t) + mp->mp_len);
TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link);
pthread_mutex_unlock(&mi->mi_input_lock);
Expand Down Expand Up @@ -1516,6 +1530,7 @@ mpegts_input_thread ( void * p )
TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link);
free(mp);
}
mi->mi_input_queue_size = 0;
pthread_mutex_unlock(&mi->mi_input_lock);

return NULL;
Expand All @@ -1537,6 +1552,7 @@ mpegts_input_table_thread ( void *aux )
tvh_cond_wait(&mi->mi_table_cond, &mi->mi_output_lock);
continue;
}
mi->mi_table_queue_size -= mtf->mtf_len;
TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link);
pthread_mutex_unlock(&mi->mi_output_lock);

Expand All @@ -1563,6 +1579,7 @@ mpegts_input_table_thread ( void *aux )
TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link);
free(mtf);
}
mi->mi_table_queue_size = 0;
pthread_mutex_unlock(&mi->mi_output_lock);

return NULL;
Expand Down

0 comments on commit 1936395

Please sign in to comment.