Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
streaming: change streaming_pad_deliver() semantics
  • Loading branch information
perexg committed Oct 24, 2014
1 parent ad5f59c commit 8a0078d
Show file tree
Hide file tree
Showing 11 changed files with 35 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/input/mpegts/linuxdvb/linuxdvb_frontend.c
Expand Up @@ -793,7 +793,7 @@ linuxdvb_frontend_monitor ( void *aux )
sm.sm_data = &sigstat;
LIST_FOREACH(s, &lfe->mi_transports, s_active_link) {
pthread_mutex_lock(&s->s_stream_mutex);
streaming_pad_deliver(&s->s_streaming_pad, &sm);
streaming_pad_deliver(&s->s_streaming_pad, streaming_msg_clone(&sm));
pthread_mutex_unlock(&s->s_stream_mutex);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/input/mpegts/mpegts_input.c
Expand Up @@ -814,7 +814,7 @@ mpegts_input_process
memset(&sm, 0, sizeof(sm));
sm.sm_type = SMT_MPEGTS;
sm.sm_data = pb;
streaming_pad_deliver(&mmi->mmi_streaming_pad, &sm);
streaming_pad_deliver(&mmi->mmi_streaming_pad, streaming_msg_clone(&sm));
pktbuf_ref_dec(pb);
}

Expand Down
2 changes: 1 addition & 1 deletion src/input/mpegts/satip/satip_frontend.c
Expand Up @@ -1345,7 +1345,7 @@ satip_frontend_signal_cb( void *aux )
sm.sm_data = &sigstat;
LIST_FOREACH(svc, &lfe->mi_transports, s_active_link) {
pthread_mutex_lock(&svc->s_stream_mutex);
streaming_pad_deliver(&svc->s_streaming_pad, &sm);
streaming_pad_deliver(&svc->s_streaming_pad, streaming_msg_clone(&sm));
pthread_mutex_unlock(&svc->s_stream_mutex);
}
gtimer_arm_ms(&lfe->sf_monitor_timer, satip_frontend_signal_cb, lfe, 250);
Expand Down
2 changes: 1 addition & 1 deletion src/input/mpegts/tsdemux.c
Expand Up @@ -283,7 +283,7 @@ ts_remux(mpegts_service_t *t, const uint8_t *src)

sm.sm_type = SMT_MPEGTS;
sm.sm_data = pb;
streaming_pad_deliver(&t->s_streaming_pad, &sm);
streaming_pad_deliver(&t->s_streaming_pad, streaming_msg_clone(&sm));

pktbuf_ref_dec(pb);

Expand Down
2 changes: 1 addition & 1 deletion src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c
Expand Up @@ -333,7 +333,7 @@ tvhdhomerun_frontend_monitor_cb( void *aux )

LIST_FOREACH(svc, &hfe->mi_transports, s_active_link) {
pthread_mutex_lock(&svc->s_stream_mutex);
streaming_pad_deliver(&svc->s_streaming_pad, &sm);
streaming_pad_deliver(&svc->s_streaming_pad, streaming_msg_clone(&sm));
pthread_mutex_unlock(&svc->s_stream_mutex);
}
}
Expand Down
4 changes: 1 addition & 3 deletions src/parsers/parser_teletext.c
Expand Up @@ -801,9 +801,7 @@ extract_subtitle(mpegts_service_t *t, elementary_stream_t *st,
th_pkt_t *pkt = pkt_alloc(sub, off, pts, pts);
pkt->pkt_componentindex = st->es_index;

streaming_message_t *sm = streaming_msg_create_pkt(pkt);
streaming_pad_deliver(&t->s_streaming_pad, sm);
streaming_msg_free(sm);
streaming_pad_deliver(&t->s_streaming_pad, streaming_msg_create_pkt(pkt));

/* Decrease our own reference to the packet */
pkt_ref_dec(pkt);
Expand Down
5 changes: 1 addition & 4 deletions src/parsers/parsers.c
Expand Up @@ -1408,10 +1408,7 @@ parser_deliver(service_t *t, elementary_stream_t *st, th_pkt_t *pkt, int error)
/* Forward packet */
pkt->pkt_componentindex = st->es_index;

streaming_message_t *sm = streaming_msg_create_pkt(pkt);

streaming_pad_deliver(&t->s_streaming_pad, sm);
streaming_msg_free(sm);
streaming_pad_deliver(&t->s_streaming_pad, streaming_msg_create_pkt(pkt));

/* Decrease our own reference to the packet */
pkt_ref_dec(pkt);
Expand Down
22 changes: 9 additions & 13 deletions src/service.c
Expand Up @@ -1117,7 +1117,6 @@ service_servicetype_txt ( service_t *s )
void
service_set_streaming_status_flags_(service_t *t, int set)
{
streaming_message_t *sm;
lock_assert(&t->s_stream_mutex);

if(set == t->s_streaming_status)
Expand All @@ -1136,10 +1135,9 @@ service_set_streaming_status_flags_(service_t *t, int set)
set & TSS_GRACEPERIOD ? "[Graceperiod expired] " : "",
set & TSS_TIMEOUT ? "[Data timeout] " : "");

sm = streaming_msg_create_code(SMT_SERVICE_STATUS,
t->s_streaming_status);
streaming_pad_deliver(&t->s_streaming_pad, sm);
streaming_msg_free(sm);
streaming_pad_deliver(&t->s_streaming_pad,
streaming_msg_create_code(SMT_SERVICE_STATUS,
t->s_streaming_status));

pthread_cond_broadcast(&t->s_tss_cond);
}
Expand All @@ -1153,22 +1151,20 @@ service_set_streaming_status_flags_(service_t *t, int set)
void
service_restart(service_t *t, int had_components)
{
streaming_message_t *sm;
pthread_mutex_lock(&t->s_stream_mutex);

if(had_components) {
sm = streaming_msg_create_code(SMT_STOP, SM_CODE_SOURCE_RECONFIGURED);
streaming_pad_deliver(&t->s_streaming_pad, sm);
streaming_msg_free(sm);
streaming_pad_deliver(&t->s_streaming_pad,
streaming_msg_create_code(SMT_STOP,
SM_CODE_SOURCE_RECONFIGURED));
}

service_build_filter(t);

if(TAILQ_FIRST(&t->s_filt_components) != NULL) {
sm = streaming_msg_create_data(SMT_START,
service_build_stream_start(t));
streaming_pad_deliver(&t->s_streaming_pad, sm);
streaming_msg_free(sm);
streaming_pad_deliver(&t->s_streaming_pad,
streaming_msg_create_data(SMT_START,
service_build_stream_start(t)));
}

pthread_mutex_unlock(&t->s_stream_mutex);
Expand Down
19 changes: 12 additions & 7 deletions src/streaming.c
Expand Up @@ -316,10 +316,10 @@ streaming_msg_free(streaming_message_t *sm)
void
streaming_target_deliver2(streaming_target_t *st, streaming_message_t *sm)
{
if(st->st_reject_filter & SMT_TO_MASK(sm->sm_type))
if (st->st_reject_filter & SMT_TO_MASK(sm->sm_type))
streaming_msg_free(sm);
else
st->st_cb(st->st_opaque, sm);
streaming_target_deliver(st, sm);
}

/**
Expand All @@ -328,18 +328,23 @@ streaming_target_deliver2(streaming_target_t *st, streaming_message_t *sm)
void
streaming_pad_deliver(streaming_pad_t *sp, streaming_message_t *sm)
{
streaming_target_t *st, *next;
streaming_target_t *st, *next, *run = NULL;

for(st = LIST_FIRST(&sp->sp_targets);st; st = next) {
for (st = LIST_FIRST(&sp->sp_targets); st; st = next) {
next = LIST_NEXT(st, st_link);
assert(next != st);
if(st->st_reject_filter & SMT_TO_MASK(sm->sm_type))
if (st->st_reject_filter & SMT_TO_MASK(sm->sm_type))
continue;
st->st_cb(st->st_opaque, streaming_msg_clone(sm));
if (run)
streaming_target_deliver(run, streaming_msg_clone(sm));
run = st;
}
if (run)
streaming_target_deliver(run, sm);
else
streaming_msg_free(sm);
}


/**
*
*/
Expand Down
4 changes: 3 additions & 1 deletion src/streaming.h
Expand Up @@ -101,7 +101,9 @@ streaming_message_t *streaming_msg_create_code(streaming_message_type_t type,

streaming_message_t *streaming_msg_create_pkt(th_pkt_t *pkt);

#define streaming_target_deliver(st, sm) ((st)->st_cb((st)->st_opaque, (sm)))
static inline void
streaming_target_deliver(streaming_target_t *st, streaming_message_t *sm)
{ st->st_cb(st->st_opaque, sm); }

void streaming_target_deliver2(streaming_target_t *st, streaming_message_t *sm);

Expand Down
7 changes: 4 additions & 3 deletions src/subscriptions.c
Expand Up @@ -89,9 +89,10 @@ subscription_link_service(th_subscription_t *s, service_t *t)
// Link to service output
streaming_target_connect(&t->s_streaming_pad, &s->ths_input);

sm = streaming_msg_create_code(SMT_GRACE, s->ths_postpone + t->s_grace_delay);
streaming_pad_deliver(&t->s_streaming_pad, sm);
streaming_msg_free(sm);
streaming_pad_deliver(&t->s_streaming_pad,
streaming_msg_create_code(SMT_GRACE,
s->ths_postpone +
t->s_grace_delay));

if(s->ths_start_message != NULL && t->s_streaming_status & TSS_PACKETS) {

Expand Down

0 comments on commit 8a0078d

Please sign in to comment.