Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
mpegts input: move active list of services from input to mux, fixes #…
…2801

In last changes, the mux filter was removed from the main TS packet
decoder loop. Also, it is not good to have this filter in this
"exponed" function. The solution is quite easy - move the active list
of services from the input object to the mux object. It seems that
this change reduces the code (mux filters) all around.
  • Loading branch information
perexg committed Apr 24, 2015
1 parent 0e4a411 commit 3f4c630
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 62 deletions.
4 changes: 2 additions & 2 deletions src/input/mpegts.h
Expand Up @@ -388,7 +388,8 @@ struct mpegts_mux
*/

LIST_HEAD(, mpegts_mux_instance) mm_instances;
mpegts_mux_instance_t *mm_active;
mpegts_mux_instance_t *mm_active;
LIST_HEAD(,service) mm_transports;

/*
* Raw subscriptions
Expand Down Expand Up @@ -622,7 +623,6 @@ struct mpegts_input

/* Active sources */
LIST_HEAD(,mpegts_mux_instance) mi_mux_active;
LIST_HEAD(,service) mi_transports;

/* Table processing */
pthread_t mi_table_tid;
Expand Down
10 changes: 5 additions & 5 deletions src/input/mpegts/iptv/iptv.c
Expand Up @@ -127,18 +127,18 @@ iptv_input_get_weight ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags )
int w = 0;
const th_subscription_t *ths;
const service_t *s;
mpegts_mux_instance_t *mmi;

/* Find the "min" weight */
if (!iptv_input_is_free(mi, mm)) {
w = 1000000;

/* Service subs */
pthread_mutex_lock(&mi->mi_output_lock);
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link) {
w = MIN(w, ths->ths_weight);
}
}
LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link)
LIST_FOREACH(s, &mmi->mmi_mux->mm_transports, s_active_link)
LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link)
w = MIN(w, ths->ths_weight);
pthread_mutex_unlock(&mi->mi_output_lock);
}

Expand Down
2 changes: 1 addition & 1 deletion src/input/mpegts/linuxdvb/linuxdvb_frontend.c
Expand Up @@ -854,7 +854,7 @@ linuxdvb_frontend_monitor ( void *aux )
sigstat.tc_block = mmi->tii_stats.tc_block;
sm.sm_type = SMT_SIGNAL_STATUS;
sm.sm_data = &sigstat;
LIST_FOREACH(s, &lfe->mi_transports, s_active_link) {
LIST_FOREACH(s, &mmi->mmi_mux->mm_transports, s_active_link) {
pthread_mutex_lock(&s->s_stream_mutex);
streaming_pad_deliver(&s->s_streaming_pad, streaming_msg_clone(&sm));
pthread_mutex_unlock(&s->s_stream_mutex);
Expand Down
78 changes: 38 additions & 40 deletions src/input/mpegts/mpegts_input.c
Expand Up @@ -324,18 +324,19 @@ mpegts_input_display_name ( mpegts_input_t *mi, char *buf, size_t len )
int
mpegts_input_get_weight ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags )
{
const mpegts_mux_instance_t *mmi;
const service_t *s;
const th_subscription_t *ths;
int w = 0, count = 0;

/* Service subs */
pthread_mutex_lock(&mi->mi_output_lock);
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link) {
w = MAX(w, ths->ths_weight);
count++;
}
}
LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link)
LIST_FOREACH(s, &mmi->mmi_mux->mm_transports, s_active_link)
LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link) {
w = MAX(w, ths->ths_weight);
count++;
}
pthread_mutex_unlock(&mi->mi_output_lock);
return w > 0 ? w + count - 1 : 0;
}
Expand Down Expand Up @@ -530,6 +531,7 @@ mpegts_input_close_pid
void
mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int flags, int init )
{
mpegts_mux_t *mm = s->s_dvb_mux;
elementary_stream_t *st;
mpegts_apids_t *pids;
mpegts_service_t *s2;
Expand All @@ -538,7 +540,7 @@ mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int flags,
/* Add to list */
pthread_mutex_lock(&mi->mi_output_lock);
if (!s->s_dvb_active_input) {
LIST_INSERT_HEAD(&mi->mi_transports, ((service_t*)s), s_active_link);
LIST_INSERT_HEAD(&mm->mm_transports, ((service_t*)s), s_active_link);
s->s_dvb_active_input = mi;
}

Expand All @@ -548,15 +550,15 @@ mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int flags,

pids = mpegts_pid_alloc();

mi->mi_open_pid(mi, s->s_dvb_mux, s->s_pmt_pid, MPS_SERVICE, s);
mi->mi_open_pid(mi, s->s_dvb_mux, s->s_pcr_pid, MPS_SERVICE, s);
mi->mi_open_pid(mi, mm, s->s_pmt_pid, MPS_SERVICE, s);
mi->mi_open_pid(mi, mm, s->s_pcr_pid, MPS_SERVICE, s);
mpegts_pid_add(pids, s->s_pmt_pid);
mpegts_pid_add(pids, s->s_pcr_pid);
/* Open only filtered components here */
TAILQ_FOREACH(st, &s->s_filt_components, es_filt_link)
if (st->es_type != SCT_CA) {
st->es_pid_opened = 1;
mi->mi_open_pid(mi, s->s_dvb_mux, st->es_pid, MPS_SERVICE, s);
mi->mi_open_pid(mi, mm, st->es_pid, MPS_SERVICE, s);
}

/* Ensure that filtered PIDs are not send in ts_recv_raw */
Expand All @@ -575,15 +577,15 @@ mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int flags,
} else {
if ((pids = s->s_pids) != NULL) {
if (pids->all) {
mi->mi_open_pid(mi, s->s_dvb_mux, MPEGTS_FULLMUX_PID, MPS_RAW | MPS_ALL, s);
mi->mi_open_pid(mi, mm, MPEGTS_FULLMUX_PID, MPS_RAW | MPS_ALL, s);
} else {
for (i = 0; i < pids->count; i++)
mi->mi_open_pid(mi, s->s_dvb_mux, pids->pids[i], MPS_RAW, s);
mi->mi_open_pid(mi, mm, pids->pids[i], MPS_RAW, s);
}
} else if (flags & SUBSCRIPTION_TABLES) {
mi->mi_open_pid(mi, s->s_dvb_mux, MPEGTS_TABLES_PID, MPS_RAW | MPS_TABLES, s);
mi->mi_open_pid(mi, mm, MPEGTS_TABLES_PID, MPS_RAW | MPS_TABLES, s);
} else if (flags & SUBSCRIPTION_MINIMAL) {
mi->mi_open_pid(mi, s->s_dvb_mux, DVB_PAT_PID, MPS_RAW, s);
mi->mi_open_pid(mi, mm, DVB_PAT_PID, MPS_RAW, s);
}
}

Expand All @@ -593,7 +595,7 @@ mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int flags,
/* Add PMT monitor */
if(s->s_type == STYPE_STD) {
s->s_pmt_mon =
mpegts_table_add(s->s_dvb_mux, DVB_PMT_BASE, DVB_PMT_MASK,
mpegts_table_add(mm, DVB_PMT_BASE, DVB_PMT_MASK,
dvb_pmt_callback, s, "pmt",
MT_CRC, s->s_pmt_pid);
}
Expand All @@ -602,6 +604,7 @@ mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int flags,
void
mpegts_input_close_service ( mpegts_input_t *mi, mpegts_service_t *s )
{
mpegts_mux_t *mm = s->s_dvb_mux;
elementary_stream_t *st;
mpegts_apids_t *pids;
mpegts_service_t *s2;
Expand All @@ -624,15 +627,15 @@ mpegts_input_close_service ( mpegts_input_t *mi, mpegts_service_t *s )

pids = mpegts_pid_alloc();

mi->mi_close_pid(mi, s->s_dvb_mux, s->s_pmt_pid, MPS_SERVICE, s);
mi->mi_close_pid(mi, s->s_dvb_mux, s->s_pcr_pid, MPS_SERVICE, s);
mi->mi_close_pid(mi, mm, s->s_pmt_pid, MPS_SERVICE, s);
mi->mi_close_pid(mi, mm, s->s_pcr_pid, MPS_SERVICE, s);
mpegts_pid_del(pids, s->s_pmt_pid);
mpegts_pid_del(pids, s->s_pcr_pid);
/* Close all opened PIDs (the component filter may be changed at runtime) */
TAILQ_FOREACH(st, &s->s_components, es_link) {
if (st->es_pid_opened) {
st->es_pid_opened = 0;
mi->mi_close_pid(mi, s->s_dvb_mux, st->es_pid, MPS_SERVICE, s);
mi->mi_close_pid(mi, mm, st->es_pid, MPS_SERVICE, s);
}
if (st->es_pid >= 0 && st->es_pid < 8192)
mpegts_pid_del(pids, st->es_pid);
Expand All @@ -647,7 +650,7 @@ mpegts_input_close_service ( mpegts_input_t *mi, mpegts_service_t *s )
mpegts_pid_destroy(&pids);

} else {
mpegts_input_close_pids(mi, s->s_dvb_mux, s, 1);
mpegts_input_close_pids(mi, mm, s, 1);
}


Expand Down Expand Up @@ -739,6 +742,7 @@ mpegts_input_stopped_mux
{
char buf[256];
service_t *s, *s_next;
mpegts_mux_t *mm = mmi->mmi_mux;

/* no longer active */
LIST_REMOVE(mmi, mmi_active_link);
Expand All @@ -749,17 +753,15 @@ mpegts_input_stopped_mux

mi->mi_display_name(mi, buf, sizeof(buf));
tvhtrace("mpegts", "%s - flush subscribers", buf);
for (s = LIST_FIRST(&mi->mi_transports); s; s = s_next) {
for (s = LIST_FIRST(&mm->mm_transports); s; s = s_next) {
s_next = LIST_NEXT(s, s_active_link);
if (((mpegts_service_t*)s)->s_dvb_mux == mmi->mmi_mux)
service_remove_subscriber(s, NULL, SM_CODE_SUBSCRIPTION_OVERRIDDEN);
service_remove_subscriber(s, NULL, SM_CODE_SUBSCRIPTION_OVERRIDDEN);
}
notify_reload("input_status");
mpegts_input_dbus_notify(mi, 0);

#if ENABLE_TSDEBUG
tsdebug_packet_t *tp;
mpegts_mux_t *mm = mmi->mmi_mux;
if (mm->mm_tsdebug_fd >= 0)
close(mm->mm_tsdebug_fd);
if (mm->mm_tsdebug_fd2 >= 0)
Expand All @@ -781,16 +783,14 @@ mpegts_input_has_subscription ( mpegts_input_t *mi, mpegts_mux_t *mm )
const service_t *t;
const th_subscription_t *ths;
pthread_mutex_lock(&mi->mi_output_lock);
LIST_FOREACH(t, &mi->mi_transports, s_active_link) {
if (((mpegts_service_t*)t)->s_dvb_mux == mm) {
if (t->s_type == STYPE_RAW) {
LIST_FOREACH(ths, &t->s_subscriptions, ths_service_link)
if (!strcmp(ths->ths_title, "keep")) break;
if (ths) continue;
}
ret = 1;
break;
LIST_FOREACH(t, &mm->mm_transports, s_active_link) {
if (t->s_type == STYPE_RAW) {
LIST_FOREACH(ths, &t->s_subscriptions, ths_service_link)
if (!strcmp(ths->ths_title, "keep")) break;
if (ths) continue;
}
ret = 1;
break;
}
pthread_mutex_unlock(&mi->mi_output_lock);
return ret;
Expand All @@ -801,13 +801,11 @@ mpegts_input_tuning_error ( mpegts_input_t *mi, mpegts_mux_t *mm )
{
service_t *t, *t_next;
pthread_mutex_lock(&mi->mi_output_lock);
for (t = LIST_FIRST(&mi->mi_transports); t; t = t_next) {
for (t = LIST_FIRST(&mm->mm_transports); t; t = t_next) {
t_next = LIST_NEXT(t, s_active_link);
if (((mpegts_service_t*)t)->s_dvb_mux == mm) {
pthread_mutex_lock(&t->s_stream_mutex);
service_set_streaming_status_flags(t, TSS_TUNING);
pthread_mutex_unlock(&t->s_stream_mutex);
}
pthread_mutex_lock(&t->s_stream_mutex);
service_set_streaming_status_flags(t, TSS_TUNING);
pthread_mutex_unlock(&t->s_stream_mutex);
}
pthread_mutex_unlock(&mi->mi_output_lock);
}
Expand Down Expand Up @@ -1123,7 +1121,7 @@ mpegts_input_process
} else
/* Stream table data */
if (type & MPS_STREAM) {
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
LIST_FOREACH(s, &mm->mm_transports, s_active_link) {
if (s->s_type != STYPE_STD) continue;
f = (type & (MPS_TABLE|MPS_FTABLE)) ||
(pid == s->s_pmt_pid) || (pid == s->s_pcr_pid);
Expand Down Expand Up @@ -1335,7 +1333,7 @@ mpegts_input_stream_status
mpegts_mux_t *mm = mmi->mmi_mux;
mpegts_input_t *mi = mmi->mmi_input;

LIST_FOREACH(t, &mi->mi_transports, s_active_link)
LIST_FOREACH(t, &mm->mm_transports, s_active_link)
if (((mpegts_service_t *)t)->s_dvb_mux == mm)
LIST_FOREACH(ths, &t->s_subscriptions, ths_service_link) {
s++;
Expand Down
15 changes: 5 additions & 10 deletions src/input/mpegts/mpegts_mux.c
Expand Up @@ -264,18 +264,13 @@ mpegts_mux_instance_weight ( mpegts_mux_instance_t *mmi )
int w = 0;
const service_t *s;
const th_subscription_t *ths;
mpegts_input_t *mi = mmi->mmi_input;
lock_assert(&mi->mi_output_lock);
mpegts_mux_t *mm = mmi->mmi_mux;
lock_assert(&mmi->mmi_input->mi_output_lock);

/* Service subs */
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
mpegts_service_t *ms = (mpegts_service_t*)s;
if (ms->s_dvb_mux == mmi->mmi_mux) {
LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link) {
w = MAX(w, ths->ths_weight);
}
}
}
LIST_FOREACH(s, &mm->mm_transports, s_active_link)
LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link)
w = MAX(w, ths->ths_weight);

return w;
}
Expand Down
2 changes: 1 addition & 1 deletion src/input/mpegts/satip/satip_frontend.c
Expand Up @@ -88,7 +88,7 @@ satip_frontend_signal_cb( void *aux )
sigstat.tc_block = mmi->tii_stats.tc_block;
sm.sm_type = SMT_SIGNAL_STATUS;
sm.sm_data = &sigstat;
LIST_FOREACH(svc, &lfe->mi_transports, s_active_link) {
LIST_FOREACH(svc, &mmi->mmi_mux->mm_transports, s_active_link) {
pthread_mutex_lock(&svc->s_stream_mutex);
streaming_pad_deliver(&svc->s_streaming_pad, streaming_msg_clone(&sm));
pthread_mutex_unlock(&svc->s_stream_mutex);
Expand Down
2 changes: 1 addition & 1 deletion src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c
Expand Up @@ -310,7 +310,7 @@ tvhdhomerun_frontend_monitor_cb( void *aux )
sm.sm_type = SMT_SIGNAL_STATUS;
sm.sm_data = &sigstat;

LIST_FOREACH(svc, &hfe->mi_transports, s_active_link) {
LIST_FOREACH(svc, &mmi->mmi_mux->mm_transports, s_active_link) {
pthread_mutex_lock(&svc->s_stream_mutex);
streaming_pad_deliver(&svc->s_streaming_pad, streaming_msg_clone(&sm));
pthread_mutex_unlock(&svc->s_stream_mutex);
Expand Down
4 changes: 2 additions & 2 deletions src/service.c
Expand Up @@ -733,8 +733,8 @@ service_find_instance
TAILQ_FOREACH(si, sil, si_link) {
const char *name = ch ? channel_get_name(ch) : NULL;
if (!name && s) name = s->s_nicename;
tvhdebug("service", "%s si %p %s weight %d prio %d error %d",
name, si, si->si_source, si->si_weight, si->si_prio,
tvhdebug("service", "%d: %s si %p %s weight %d prio %d error %d",
si->si_instance, name, si, si->si_source, si->si_weight, si->si_prio,
si->si_error);
}

Expand Down

0 comments on commit 3f4c630

Please sign in to comment.