Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Introduce SMT_GRACE message
  • Loading branch information
perexg committed Aug 11, 2014
1 parent 7e0d969 commit 54164ba
Show file tree
Hide file tree
Showing 12 changed files with 39 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/dvr/dvr_rec.c
Expand Up @@ -581,6 +581,7 @@ dvr_thread(void *aux)
}
break;

case SMT_GRACE:
case SMT_SPEED:
case SMT_SKIP:
case SMT_SIGNAL_STATUS:
Expand Down
1 change: 1 addition & 0 deletions src/htsp_server.c
Expand Up @@ -2912,6 +2912,7 @@ htsp_streaming_input(void *opaque, streaming_message_t *sm)
htsp_subscription_status(hs, streaming_code2txt(sm->sm_code));
break;

case SMT_GRACE:
case SMT_MPEGTS:
break;

Expand Down
2 changes: 2 additions & 0 deletions src/plumbing/globalheaders.c
Expand Up @@ -252,6 +252,7 @@ gh_hold(globalheaders_t *gh, streaming_message_t *sm)
streaming_msg_free(sm);
break;

case SMT_GRACE:
case SMT_EXIT:
case SMT_SERVICE_STATUS:
case SMT_SIGNAL_STATUS:
Expand Down Expand Up @@ -283,6 +284,7 @@ gh_pass(globalheaders_t *gh, streaming_message_t *sm)
gh->gh_passthru = 0;
gh_flush(gh);
// FALLTHRU
case SMT_GRACE:
case SMT_EXIT:
case SMT_SERVICE_STATUS:
case SMT_SIGNAL_STATUS:
Expand Down
1 change: 1 addition & 0 deletions src/plumbing/transcoding.c
Expand Up @@ -1253,6 +1253,7 @@ transcoder_input(void *opaque, streaming_message_t *sm)
transcoder_stop(t);
// Fallthrough

case SMT_GRACE:
case SMT_SPEED:
case SMT_SKIP:
case SMT_TIMESHIFT_STATUS:
Expand Down
1 change: 1 addition & 0 deletions src/plumbing/tsfix.c
Expand Up @@ -366,6 +366,7 @@ tsfix_input(void *opaque, streaming_message_t *sm)
tsfix_stop(tf);
break;

case SMT_GRACE:
case SMT_EXIT:
case SMT_SERVICE_STATUS:
case SMT_SIGNAL_STATUS:
Expand Down
1 change: 1 addition & 0 deletions src/service.c
Expand Up @@ -598,6 +598,7 @@ service_start(service_t *t, int instance)
if(t->s_grace_period != NULL)
timeout = t->s_grace_period(t);

t->s_grace_delay = timeout;
gtimer_arm(&t->s_receive_timer, service_data_timeout, t, timeout);
return 0;
}
Expand Down
1 change: 1 addition & 0 deletions src/service.h
Expand Up @@ -340,6 +340,7 @@ typedef struct service {
/**
* Stream start time
*/
int s_grace_delay;
time_t s_start_time;


Expand Down
2 changes: 2 additions & 0 deletions src/streaming.c
Expand Up @@ -235,6 +235,7 @@ streaming_msg_clone(streaming_message_t *src)
memcpy(dst->sm_data, src->sm_data, sizeof(timeshift_status_t));
break;

case SMT_GRACE:
case SMT_SPEED:
case SMT_STOP:
case SMT_SERVICE_STATUS:
Expand Down Expand Up @@ -292,6 +293,7 @@ streaming_msg_free(streaming_message_t *sm)
streaming_start_unref(sm->sm_data);
break;

case SMT_GRACE:
case SMT_STOP:
case SMT_EXIT:
case SMT_SERVICE_STATUS:
Expand Down
8 changes: 8 additions & 0 deletions src/subscriptions.c
Expand Up @@ -87,6 +87,9 @@ subscription_link_service(th_subscription_t *s, service_t *t)
// Link to service output
streaming_target_connect(&t->s_streaming_pad, &s->ths_input);

sm = streaming_msg_create_code(SMT_GRACE, t->s_grace_delay);
streaming_pad_deliver(&t->s_streaming_pad, sm);

if(s->ths_start_message != NULL && t->s_streaming_status & TSS_PACKETS) {

s->ths_state = SUBSCRIPTION_GOT_SERVICE;
Expand Down Expand Up @@ -332,6 +335,11 @@ subscription_input(void *opauqe, streaming_message_t *sm)
if(s->ths_state == SUBSCRIPTION_TESTING_SERVICE) {
// We are just testing if this service is good

if(sm->sm_type == SMT_GRACE) {
streaming_target_deliver(s->ths_output, sm);
return;
}

if(sm->sm_type == SMT_START) {
if(s->ths_start_message != NULL)
streaming_msg_free(s->ths_start_message);
Expand Down
1 change: 1 addition & 0 deletions src/timeshift/timeshift_writer.c
Expand Up @@ -253,6 +253,7 @@ static void _process_msg
break;

/* Status */
case SMT_GRACE:
case SMT_NOSTART:
case SMT_SERVICE_STATUS:
case SMT_TIMESHIFT_STATUS:
Expand Down
8 changes: 8 additions & 0 deletions src/tvheadend.h
Expand Up @@ -317,6 +317,14 @@ typedef enum {
*/
SMT_PACKET,

/**
* Stream grace period
*
* sm_code contains number of seconds to settle things down
*/

SMT_GRACE,

/**
* Stream start
*
Expand Down
19 changes: 12 additions & 7 deletions src/webui/webui.c
Expand Up @@ -228,7 +228,7 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
int run = 1;
int started = 0;
muxer_t *mux = NULL;
int timeouts = 0;
int timeouts = 0, grace = 20;
struct timespec ts;
struct timeval tp;
int err = 0;
Expand Down Expand Up @@ -256,12 +256,12 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,

//Check socket status
getsockopt(hc->hc_fd, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen);
if(err) {
tvhlog(LOG_DEBUG, "webui", "Stop streaming %s, client hung up", hc->hc_url_orig);
run = 0;
}else if(timeouts >= 20) {
tvhlog(LOG_WARNING, "webui", "Stop streaming %s, timeout waiting for packets", hc->hc_url_orig);
run = 0;
if (err) {
tvhlog(LOG_DEBUG, "webui", "Stop streaming %s, client hung up", hc->hc_url_orig);
run = 0;
} else if(timeouts >= grace) {
tvhlog(LOG_WARNING, "webui", "Stop streaming %s, timeout waiting for packets", hc->hc_url_orig);
run = 0;
}
}
pthread_mutex_unlock(&sq->sq_mutex);
Expand All @@ -287,7 +287,12 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
}
break;

case SMT_GRACE:
grace = sm->sm_code < 5 ? 5 : grace;
break;

case SMT_START:
grace = 10;
if(!started) {
tvhlog(LOG_DEBUG, "webui", "Start streaming %s", hc->hc_url_orig);
http_output_content(hc, muxer_mime(mux, sm->sm_data));
Expand Down

0 comments on commit 54164ba

Please sign in to comment.