Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
mpegts input: cleanups in locking and queue shutdown
  • Loading branch information
perexg committed Oct 26, 2014
1 parent 16363a4 commit 46ce1de
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 46 deletions.
34 changes: 20 additions & 14 deletions src/input/mpegts/mpegts_input.c
Expand Up @@ -478,6 +478,9 @@ static void
mpegts_input_stopping_mux
( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
{
pthread_mutex_lock(&mi->mi_input_lock);
mi->mi_stop = 1;
pthread_mutex_unlock(&mi->mi_input_lock);
pthread_mutex_lock(&mi->mi_output_lock);
mi->mi_stop = 1;
pthread_mutex_unlock(&mi->mi_output_lock);
Expand Down Expand Up @@ -599,9 +602,10 @@ mpegts_input_recv_packets
off += len2;

pthread_mutex_lock(&mi->mi_input_lock);
if (TAILQ_FIRST(&mi->mi_input_queue) == NULL)
if (!mi->mi_stop) {
TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link);
pthread_cond_signal(&mi->mi_input_cond);
TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link);
}
pthread_mutex_unlock(&mi->mi_input_lock);
}

Expand Down Expand Up @@ -790,11 +794,13 @@ mpegts_input_process
// TODO: might be able to optimise this a bit by having slightly
// larger buffering and trying to aggregate data (if we get
// same PID multiple times in the loop)
mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t));
memcpy(mtf->mtf_tsb, tsb, 188);
mtf->mtf_mux = mm;
TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
table_wakeup = 1;
if (!mi->mi_stop) {
mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t));
memcpy(mtf->mtf_tsb, tsb, 188);
mtf->mtf_mux = mm;
TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
table_wakeup = 1;
}
}
} else {
//tvhdebug("tsdemux", "%s - SI packet had errors", name);
Expand Down Expand Up @@ -884,12 +890,10 @@ mpegts_input_table_thread ( void *aux )
pthread_mutex_unlock(&mi->mi_output_lock);

/* Process */
if (mtf->mtf_mux) {
pthread_mutex_lock(&global_lock);
if (!mi->mi_stop)
mpegts_input_table_dispatch(mtf->mtf_mux, mtf->mtf_tsb);
pthread_mutex_unlock(&global_lock);
}
pthread_mutex_lock(&global_lock);
if (!mi->mi_stop && mtf->mtf_mux)
mpegts_input_table_dispatch(mtf->mtf_mux, mtf->mtf_tsb);
pthread_mutex_unlock(&global_lock);

/* Cleanup */
free(mtf);
Expand Down Expand Up @@ -927,11 +931,13 @@ mpegts_input_flush_mux
}
pthread_mutex_unlock(&mi->mi_input_lock);

/* Flush table Q - the global lock is already held */
/* Flush table Q */
pthread_mutex_lock(&mi->mi_output_lock);
TAILQ_FOREACH(mtf, &mi->mi_table_queue, mtf_link) {
if (mtf->mtf_mux == mm)
mtf->mtf_mux = NULL;
}
pthread_mutex_unlock(&mi->mi_output_lock);
/* stop flag must be set here */
/* otherwise the picked mtf might be processed after mux deactivation */
assert(mi->mi_stop);
Expand Down
57 changes: 25 additions & 32 deletions src/input/mpegts/mpegts_mux.c
Expand Up @@ -691,51 +691,47 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force )

/* Stop possible recursion */
if (!mmi) return;

/* Clear */
mm->mm_active = NULL;

mpegts_mux_nice_name(mm, buf, sizeof(buf));
tvhdebug("mpegts", "%s - stopping mux", buf);

if (mmi) {
mi = mmi->mmi_input;
mi->mi_stopping_mux(mi, mmi);
LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link)
subscription_unlink_mux(sub, SM_CODE_SUBSCRIPTION_OVERRIDDEN);
mi->mi_stop_mux(mi, mmi);
mi->mi_stopped_mux(mi, mmi);
}
mi = mmi->mmi_input;
assert(mi);
mi->mi_stopping_mux(mi, mmi);
LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link)
subscription_unlink_mux(sub, SM_CODE_SUBSCRIPTION_OVERRIDDEN);
mi->mi_stop_mux(mi, mmi);
mi->mi_stopped_mux(mi, mmi);

/* Flush all tables */
tvhtrace("mpegts", "%s - flush tables", buf);
mpegts_table_flush_all(mm);

tvhtrace("mpegts", "%s - mi=%p", buf, (void *)mi);
/* Flush table data queue */
if (mi)
mpegts_input_flush_mux(mi, mm);
mpegts_input_flush_mux(mi, mm);

/* Ensure PIDs are cleared */
if (mi) {
pthread_mutex_lock(&mi->mi_output_lock);
mm->mm_last_pid = -1;
mm->mm_last_mp = NULL;
while ((mp = RB_FIRST(&mm->mm_pids))) {
assert(mi);
while ((mps = RB_FIRST(&mp->mp_subs))) {
RB_REMOVE(&mp->mp_subs, mps, mps_link);
free(mps);
}
RB_REMOVE(&mm->mm_pids, mp, mp_link);
if (mp->mp_fd != -1) {
tvhdebug("mpegts", "%s - close PID %04X (%d)", buf, mp->mp_pid, mp->mp_pid);
close(mp->mp_fd);
}
free(mp);
pthread_mutex_lock(&mi->mi_output_lock);
mm->mm_last_pid = -1;
mm->mm_last_mp = NULL;
while ((mp = RB_FIRST(&mm->mm_pids))) {
assert(mi);
while ((mps = RB_FIRST(&mp->mp_subs))) {
RB_REMOVE(&mp->mp_subs, mps, mps_link);
free(mps);
}
pthread_mutex_unlock(&mi->mi_output_lock);
} else {
assert(RB_FIRST(&mm->mm_pids) == NULL);
RB_REMOVE(&mm->mm_pids, mp, mp_link);
if (mp->mp_fd != -1) {
tvhdebug("mpegts", "%s - close PID %04X (%d)", buf, mp->mp_pid, mp->mp_pid);
close(mp->mp_fd);
}
free(mp);
}
pthread_mutex_unlock(&mi->mi_output_lock);

/* Scanning */
mpegts_network_scan_mux_cancel(mm, 1);
Expand All @@ -745,9 +741,6 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force )

/* Events */
mpegts_fire_event(mm, ml_mux_stop);

/* Clear */
mm->mm_active = NULL;
}

void
Expand Down

0 comments on commit 46ce1de

Please sign in to comment.