Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
mpegts: implemented weighted PID subscriptions (sequential PMT scan)
  • Loading branch information
perexg committed Apr 27, 2015
1 parent 4fce256 commit 2623cc5
Show file tree
Hide file tree
Showing 11 changed files with 277 additions and 113 deletions.
26 changes: 19 additions & 7 deletions src/input/mpegts.h
Expand Up @@ -37,7 +37,7 @@
#define MPEGTS_PID_NONE 0xFFFF

/* Types */
typedef int16_t mpegts_apid_t;
typedef struct mpegts_apid mpegts_apid_t;
typedef struct mpegts_apids mpegts_apids_t;
typedef struct mpegts_table mpegts_table_t;
typedef struct mpegts_network mpegts_network_t;
Expand Down Expand Up @@ -78,29 +78,39 @@ void mpegts_done ( void );
* PIDs
* *************************************************************************/

struct mpegts_apid {
uint16_t pid;
uint16_t weight;
};

struct mpegts_apids {
mpegts_apid_t *pids;
int alloc;
int count;
int all;
int sorted;
};

int mpegts_pid_init ( mpegts_apids_t *pids );
void mpegts_pid_done ( mpegts_apids_t *pids );
mpegts_apids_t *mpegts_pid_alloc ( void );
void mpegts_pid_destroy ( mpegts_apids_t **pids );
void mpegts_pid_reset ( mpegts_apids_t *pids );
int mpegts_pid_add ( mpegts_apids_t *pids, mpegts_apid_t pid );
int mpegts_pid_add ( mpegts_apids_t *pids, uint16_t pid, uint16_t weight );
int mpegts_pid_add_group ( mpegts_apids_t *pids, mpegts_apids_t *vals );
int mpegts_pid_del ( mpegts_apids_t *pids, mpegts_apid_t pid );
int mpegts_pid_del ( mpegts_apids_t *pids, uint16_t pid, uint16_t weight );
int mpegts_pid_del_group ( mpegts_apids_t *pids, mpegts_apids_t *vals );
int mpegts_pid_find_index ( mpegts_apids_t *pids, mpegts_apid_t pid );
static inline int mpegts_pid_exists ( mpegts_apids_t *pids, mpegts_apid_t pid )
{ return pids->all || mpegts_pid_find_index(pids, pid) >= 0; }
int mpegts_pid_find_windex ( mpegts_apids_t *pids, uint16_t pid, uint16_t weight );
int mpegts_pid_find_rindex ( mpegts_apids_t *pids, uint16_t pid );
static inline int mpegts_pid_wexists ( mpegts_apids_t *pids, uint16_t pid, uint16_t weight )
{ return pids->all || mpegts_pid_find_windex(pids, pid, weight) >= 0; }
static inline int mpegts_pid_rexists ( mpegts_apids_t *pids, uint16_t pid )
{ return pids->all || mpegts_pid_find_rindex(pids, pid) >= 0; }
int mpegts_pid_copy ( mpegts_apids_t *dst, mpegts_apids_t *src );
int mpegts_pid_compare ( mpegts_apids_t *dst, mpegts_apids_t *src,
mpegts_apids_t *add, mpegts_apids_t *del );
int mpegts_pid_dump ( mpegts_apids_t *pids, char *buf, int len );
int mpegts_pid_weighted( mpegts_apids_t *dst, mpegts_apids_t *src, int limit );
int mpegts_pid_dump ( mpegts_apids_t *pids, char *buf, int len, int wflag, int raw );

/* **************************************************************************
* Data / SI processing
Expand Down Expand Up @@ -453,6 +463,7 @@ struct mpegts_mux
int (*mm_is_enabled) (mpegts_mux_t *mm);
void (*mm_stop) (mpegts_mux_t *mm, int force, int reason);
void (*mm_open_table) (mpegts_mux_t*,mpegts_table_t*,int subscribe);
void (*mm_unsubscribe_table)(mpegts_mux_t*,mpegts_table_t*);
void (*mm_close_table) (mpegts_mux_t*,mpegts_table_t*);
void (*mm_create_instances) (mpegts_mux_t*);
int (*mm_is_epg) (mpegts_mux_t*);
Expand Down Expand Up @@ -826,6 +837,7 @@ int mpegts_mux_set_onid ( mpegts_mux_t *mm, uint16_t onid );
int mpegts_mux_set_crid_authority ( mpegts_mux_t *mm, const char *defauth );

void mpegts_mux_open_table ( mpegts_mux_t *mm, mpegts_table_t *mt, int subscribe );
void mpegts_mux_unsubscribe_table ( mpegts_mux_t *mm, mpegts_table_t *mt );
void mpegts_mux_close_table ( mpegts_mux_t *mm, mpegts_table_t *mt );

void mpegts_mux_remove_subscriber(mpegts_mux_t *mm, th_subscription_t *s, int reason);
Expand Down
19 changes: 11 additions & 8 deletions src/input/mpegts/mpegts_input.c
Expand Up @@ -548,6 +548,7 @@ mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int flags,
mpegts_mux_t *mm = s->s_dvb_mux;
elementary_stream_t *st;
mpegts_apids_t *pids;
mpegts_apid_t *p;
mpegts_service_t *s2;
int i;

Expand All @@ -566,8 +567,8 @@ mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int flags,

mi->mi_open_pid(mi, mm, s->s_pmt_pid, MPS_SERVICE, MPS_WEIGHT_PMT, s);
mi->mi_open_pid(mi, mm, s->s_pcr_pid, MPS_SERVICE, MPS_WEIGHT_PCR, s);
mpegts_pid_add(pids, s->s_pmt_pid);
mpegts_pid_add(pids, s->s_pcr_pid);
mpegts_pid_add(pids, s->s_pmt_pid, MPS_WEIGHT_PMT);
mpegts_pid_add(pids, s->s_pcr_pid, MPS_WEIGHT_PCR);
/* Open only filtered components here */
TAILQ_FOREACH(st, &s->s_filt_components, es_filt_link)
if (st->es_type != SCT_CA) {
Expand All @@ -578,7 +579,7 @@ mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int flags,
/* Ensure that filtered PIDs are not send in ts_recv_raw */
TAILQ_FOREACH(st, &s->s_filt_components, es_filt_link)
if (st->es_type != SCT_CA && st->es_pid >= 0 && st->es_pid < 8192)
mpegts_pid_add(pids, st->es_pid);
mpegts_pid_add(pids, st->es_pid, mps_weight(st));

LIST_FOREACH(s2, &s->s_masters, s_masters_link) {
pthread_mutex_lock(&s2->s_stream_mutex);
Expand All @@ -593,8 +594,10 @@ mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int flags,
if (pids->all) {
mi->mi_open_pid(mi, mm, MPEGTS_FULLMUX_PID, MPS_RAW | MPS_ALL, MPS_WEIGHT_RAW, s);
} else {
for (i = 0; i < pids->count; i++)
mi->mi_open_pid(mi, mm, pids->pids[i], MPS_RAW, MPS_WEIGHT_RAW, s);
for (i = 0; i < pids->count; i++) {
p = &pids->pids[i];
mi->mi_open_pid(mi, mm, p->pid, MPS_RAW, p->weight, s);
}
}
} else if (flags & SUBSCRIPTION_TABLES) {
mi->mi_open_pid(mi, mm, MPEGTS_TABLES_PID, MPS_RAW | MPS_TABLES, MPS_WEIGHT_PAT, s);
Expand Down Expand Up @@ -643,16 +646,16 @@ mpegts_input_close_service ( mpegts_input_t *mi, mpegts_service_t *s )

mi->mi_close_pid(mi, mm, s->s_pmt_pid, MPS_SERVICE, MPS_WEIGHT_PMT, s);
mi->mi_close_pid(mi, mm, s->s_pcr_pid, MPS_SERVICE, MPS_WEIGHT_PCR, s);
mpegts_pid_del(pids, s->s_pmt_pid);
mpegts_pid_del(pids, s->s_pcr_pid);
mpegts_pid_del(pids, s->s_pmt_pid, MPS_WEIGHT_PMT);
mpegts_pid_del(pids, s->s_pcr_pid, MPS_WEIGHT_PCR);
/* Close all opened PIDs (the component filter may be changed at runtime) */
TAILQ_FOREACH(st, &s->s_components, es_link) {
if (st->es_pid_opened) {
st->es_pid_opened = 0;
mi->mi_close_pid(mi, mm, st->es_pid, MPS_SERVICE, mps_weight(st), s);
}
if (st->es_pid >= 0 && st->es_pid < 8192)
mpegts_pid_del(pids, st->es_pid);
mpegts_pid_del(pids, st->es_pid, mps_weight(st));
}

LIST_FOREACH(s2, &s->s_masters, s_masters_link) {
Expand Down
38 changes: 26 additions & 12 deletions src/input/mpegts/mpegts_mux.c
Expand Up @@ -818,12 +818,35 @@ mpegts_mux_open_table ( mpegts_mux_t *mm, mpegts_table_t *mt, int subscribe )
}

void
mpegts_mux_close_table ( mpegts_mux_t *mm, mpegts_table_t *mt )
mpegts_mux_unsubscribe_table ( mpegts_mux_t *mm, mpegts_table_t *mt )
{
mpegts_input_t *mi;

lock_assert(&mm->mm_tables_lock);

mi = mm->mm_active->mmi_input;
if (mt->mt_subscribed) {
mpegts_table_grab(mt);
mt->mt_subscribed = 0;
pthread_mutex_unlock(&mm->mm_tables_lock);
pthread_mutex_lock(&mi->mi_output_lock);
mi->mi_close_pid(mi, mm, mt->mt_pid, mpegts_table_type(mt), mt->mt_weight, mt);
pthread_mutex_unlock(&mi->mi_output_lock);
pthread_mutex_lock(&mm->mm_tables_lock);
mpegts_table_release(mt);
}
if ((mt->mt_flags & MT_DEFER) && mt->mt_defer_cmd == MT_DEFER_OPEN_PID) {
TAILQ_REMOVE(&mm->mm_defer_tables, mt, mt_defer_link);
mt->mt_defer_cmd = 0;
mpegts_table_release(mt);
}
}

void
mpegts_mux_close_table ( mpegts_mux_t *mm, mpegts_table_t *mt )
{
lock_assert(&mm->mm_tables_lock);

if (!mm->mm_active || !mm->mm_active->mmi_input) {
if (mt->mt_defer_cmd) {
TAILQ_REMOVE(&mm->mm_defer_tables, mt, mt_defer_link);
Expand Down Expand Up @@ -852,19 +875,9 @@ mpegts_mux_close_table ( mpegts_mux_t *mm, mpegts_table_t *mt )
TAILQ_INSERT_TAIL(&mm->mm_defer_tables, mt, mt_defer_link);
return;
}
mi = mm->mm_active->mmi_input;
LIST_REMOVE(mt, mt_link);
mm->mm_num_tables--;
if (mt->mt_subscribed) {
mpegts_table_grab(mt);
mt->mt_subscribed = 0;
pthread_mutex_unlock(&mm->mm_tables_lock);
pthread_mutex_lock(&mi->mi_output_lock);
mi->mi_close_pid(mi, mm, mt->mt_pid, mpegts_table_type(mt), mt->mt_weight, mt);
pthread_mutex_unlock(&mi->mi_output_lock);
pthread_mutex_lock(&mm->mm_tables_lock);
mpegts_table_release(mt);
}
mm->mm_unsubscribe_table(mm, mt);
}

/* **************************************************************************
Expand Down Expand Up @@ -1030,6 +1043,7 @@ mpegts_mux_create0

/* Table processing */
mm->mm_open_table = mpegts_mux_open_table;
mm->mm_unsubscribe_table = mpegts_mux_unsubscribe_table;
mm->mm_close_table = mpegts_mux_close_table;
pthread_mutex_init(&mm->mm_tables_lock, NULL);
TAILQ_INIT(&mm->mm_table_queue);
Expand Down

0 comments on commit 2623cc5

Please sign in to comment.