Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
subscription: Add watchdog for the data reception
.. for sevices and add the timeout timer for the mux subscriptions
  • Loading branch information
perexg committed Jul 27, 2014
1 parent 65facb3 commit 9b41652
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 7 deletions.
3 changes: 2 additions & 1 deletion src/input/mpegts.h
Expand Up @@ -538,7 +538,8 @@ struct mpegts_input
* Input processing
*/

int mi_running;
uint8_t mi_running;
uint8_t mi_live;
time_t mi_last_dispatch;

/* Data input */
Expand Down
4 changes: 4 additions & 0 deletions src/input/mpegts/mpegts_input.c
Expand Up @@ -399,6 +399,8 @@ mpegts_input_started_mux
{
/* Deliver first TS packets as fast as possible */
mi->mi_last_dispatch = 0;
/* Wait for first TS packet */
mi->mi_live = 0;

/* Arm timer */
if (LIST_FIRST(&mi->mi_mux_active) == NULL)
Expand Down Expand Up @@ -635,6 +637,8 @@ mpegts_input_process
mpegts_mux_instance_t *mmi = mm->mm_active;
mpegts_pid_t *last_mp = NULL;

mi->mi_live = 1;

/* Process */
assert((len % 188) == 0);
while ( tsb < end ) {
Expand Down
1 change: 1 addition & 0 deletions src/input/mpegts/tsdemux.c
Expand Up @@ -287,6 +287,7 @@ ts_remux(mpegts_service_t *t, const uint8_t *src)
pktbuf_ref_dec(pb);

service_set_streaming_status_flags((service_t*)t, TSS_PACKETS);
t->s_streaming_live |= TSS_LIVE;

sbuf_reset(sb, TS_REMUX_BUFSIZE);
}
Expand Down
1 change: 1 addition & 0 deletions src/parsers/parsers.c
Expand Up @@ -1407,6 +1407,7 @@ parser_deliver(service_t *t, elementary_stream_t *st, th_pkt_t *pkt, int error)
* Input is ok
*/
service_set_streaming_status_flags(t, TSS_PACKETS);
t->s_streaming_live |= TSS_LIVE;

/* Forward packet */
pkt->pkt_componentindex = st->es_index;
Expand Down
21 changes: 17 additions & 4 deletions src/service.c
Expand Up @@ -553,6 +553,7 @@ service_start(service_t *t, int instance)

assert(t->s_status != SERVICE_RUNNING);
t->s_streaming_status = 0;
t->s_streaming_live = 0;
t->s_scrambled_seen = 0;
t->s_start_time = dispatch_clock;

Expand Down Expand Up @@ -943,13 +944,21 @@ static void
service_data_timeout(void *aux)
{
service_t *t = aux;
int flags = 0;

pthread_mutex_lock(&t->s_stream_mutex);

if(!(t->s_streaming_status & TSS_PACKETS))
service_set_streaming_status_flags(t, TSS_GRACEPERIOD);
flags |= TSS_GRACEPERIOD;
if(!(t->s_streaming_live & TSS_LIVE))
flags |= TSS_TIMEOUT;
if (flags)
service_set_streaming_status_flags(t, flags);
t->s_streaming_live &= ~TSS_LIVE;

pthread_mutex_unlock(&t->s_stream_mutex);

gtimer_arm(&t->s_receive_timer, service_data_timeout, t, 5);
}

/**
Expand Down Expand Up @@ -1052,15 +1061,16 @@ service_set_streaming_status_flags_(service_t *t, int set)

t->s_streaming_status = n;

tvhlog(LOG_DEBUG, "service", "%s: Status changed to %s%s%s%s%s%s%s",
tvhlog(LOG_DEBUG, "service", "%s: Status changed to %s%s%s%s%s%s%s%s",
service_nicename(t),
n & TSS_INPUT_HARDWARE ? "[Hardware input] " : "",
n & TSS_INPUT_SERVICE ? "[Input on service] " : "",
n & TSS_MUX_PACKETS ? "[Demuxed packets] " : "",
n & TSS_PACKETS ? "[Reassembled packets] " : "",
n & TSS_NO_DESCRAMBLER ? "[No available descrambler] " : "",
n & TSS_NO_ACCESS ? "[No access] " : "",
n & TSS_GRACEPERIOD ? "[Graceperiod expired] " : "");
n & TSS_GRACEPERIOD ? "[Graceperiod expired] " : "",
n & TSS_TIMEOUT ? "[Data timeout] " : "");

sm = streaming_msg_create_code(SMT_SERVICE_STATUS,
t->s_streaming_status);
Expand Down Expand Up @@ -1331,6 +1341,9 @@ service_tss2text(int flags)
if(flags & TSS_GRACEPERIOD)
return "No input detected";

if(flags & TSS_TIMEOUT)
return "Data timeout";

return "No status";
}

Expand All @@ -1347,7 +1360,7 @@ tss2errcode(int tss)
if(tss & TSS_NO_DESCRAMBLER)
return SM_CODE_NO_DESCRAMBLER;

if(tss & TSS_GRACEPERIOD)
if(tss & (TSS_GRACEPERIOD|TSS_TIMEOUT))
return SM_CODE_NO_INPUT;

return SM_CODE_OK;
Expand Down
9 changes: 9 additions & 0 deletions src/service.h
Expand Up @@ -383,10 +383,19 @@ typedef struct service {
// Errors
#define TSS_NO_DESCRAMBLER 0x10000
#define TSS_NO_ACCESS 0x20000
#define TSS_TIMEOUT 0x40000

#define TSS_ERRORS 0xffff0000


/**
*
*/
int s_streaming_live;

// Live status
#define TSS_LIVE 0x01

/**
* For simple streaming sources (such as video4linux) keeping
* track of the video and audio stream is convenient.
Expand Down
31 changes: 29 additions & 2 deletions src/subscriptions.c
Expand Up @@ -152,6 +152,8 @@ subscription_unlink_mux(th_subscription_t *s, int reason)
mpegts_mux_t *mm = mmi->mmi_mux;
mpegts_input_t *mi = mmi->mmi_input;

gtimer_disarm(&s->ths_receive_timer);

pthread_mutex_lock(&mi->mi_output_lock);
s->ths_mmi = NULL;

Expand Down Expand Up @@ -363,6 +365,14 @@ subscription_input(void *opauqe, streaming_message_t *sm)
return;
}

if (sm->sm_type == SMT_SERVICE_STATUS &&
sm->sm_code & TSS_TIMEOUT) {
error = tss2errcode(sm->sm_code);
if (error > s->ths_testing_error)
s->ths_testing_error = error;
s->ths_state = SUBSCRIPTION_BAD_SERVICE;
}

/* Pass to direct handler to log traffic */
subscription_input_direct(s, sm);
}
Expand Down Expand Up @@ -598,6 +608,20 @@ mpegts_mux_setsourceinfo ( mpegts_mux_t *mm, source_info_t *si )
}
}

static void
mux_data_timeout ( void *aux )
{
th_subscription_t *s = aux;
mpegts_input_t *mi = s->ths_mmi->mmi_input;

if (!mi->mi_live) {
subscription_unlink_mux(s, SM_CODE_NO_INPUT);
return;
}
mi->mi_live = 0;

gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, 5);
}

th_subscription_t *
subscription_create_from_mux
Expand Down Expand Up @@ -639,7 +663,7 @@ subscription_create_from_mux
pthread_mutex_unlock(&mi->mi_output_lock);
}

pthread_mutex_lock(&s->ths_mmi->mmi_input->mi_output_lock);
pthread_mutex_lock(&mi->mi_output_lock);

/* Store */
LIST_INSERT_HEAD(&mm->mm_active->mmi_subs, s, ths_mmi_link);
Expand Down Expand Up @@ -672,7 +696,10 @@ subscription_create_from_mux
sm = streaming_msg_create_data(SMT_START, ss);
streaming_target_deliver(s->ths_output, sm);

pthread_mutex_unlock(&s->ths_mmi->mmi_input->mi_output_lock);
pthread_mutex_unlock(&mi->mi_output_lock);

r = (mi->mi_get_grace ? mi->mi_get_grace(mi, mm) : 0) + 20;
gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, r);

return s;
}
Expand Down
1 change: 1 addition & 0 deletions src/subscriptions.h
Expand Up @@ -93,6 +93,7 @@ typedef struct th_subscription {
// (repeated) logic elsewhere
LIST_ENTRY(th_subscription) ths_mmi_link;
struct mpegts_mux_instance *ths_mmi;
gtimer_t ths_receive_timer;
#endif

} th_subscription_t;
Expand Down

0 comments on commit 9b41652

Please sign in to comment.