Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
streaming: improve the client shutdown detection
  • Loading branch information
perexg committed Oct 20, 2015
1 parent 76b2320 commit d7cf558
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 13 deletions.
1 change: 1 addition & 0 deletions src/input/mpegts/tsdemux.c
Expand Up @@ -395,6 +395,7 @@ ts_skip(mpegts_service_t *t, const uint8_t *src, int len)
return;

ts_flush(t, sb);
service_send_streaming_status((service_t *)t);
}

/*
Expand Down
4 changes: 3 additions & 1 deletion src/parsers/parsers.c
Expand Up @@ -178,8 +178,10 @@ skip_mpeg_ts(service_t *t, elementary_stream_t *st, const uint8_t *data, int len
{
if(len >= 188)
sbuf_err(&st->es_buf, len / 188);
if(st->es_buf.sb_err > 1000)
if(st->es_buf.sb_err > 1000) {
parser_deliver_error(t, st);
service_send_streaming_status((service_t *)t);
}
}

/**
Expand Down
22 changes: 16 additions & 6 deletions src/service.c
Expand Up @@ -1183,6 +1183,21 @@ service_servicetype_txt ( service_t *s )
}


/**
*
*/
void
service_send_streaming_status(service_t *t)
{
lock_assert(&t->s_stream_mutex);

streaming_pad_deliver(&t->s_streaming_pad,
streaming_msg_create_code(SMT_SERVICE_STATUS,
t->s_streaming_status));

pthread_cond_broadcast(&t->s_tss_cond);
}

/**
*
*/
Expand All @@ -1208,14 +1223,9 @@ service_set_streaming_status_flags_(service_t *t, int set)
set & TSS_GRACEPERIOD ? "[Graceperiod expired] " : "",
set & TSS_TIMEOUT ? "[Data timeout] " : "");

streaming_pad_deliver(&t->s_streaming_pad,
streaming_msg_create_code(SMT_SERVICE_STATUS,
t->s_streaming_status));

pthread_cond_broadcast(&t->s_tss_cond);
service_send_streaming_status(t);
}


/**
* Restart output on a service.
* Happens if the stream composition changes.
Expand Down
4 changes: 4 additions & 0 deletions src/service.h
Expand Up @@ -550,6 +550,9 @@ void service_remove_raw(service_t *);
void service_remove_subscriber(service_t *t, struct th_subscription *s,
int reason);


void service_send_streaming_status(service_t *t);

void service_set_streaming_status_flags_(service_t *t, int flag);

static inline void
Expand All @@ -568,6 +571,7 @@ service_reset_streaming_status_flags(service_t *t, int flag)
service_set_streaming_status_flags_(t, n & ~flag);
}


struct streaming_start;
struct streaming_start *service_build_stream_start(service_t *t);

Expand Down
18 changes: 18 additions & 0 deletions src/tcp.c
Expand Up @@ -374,6 +374,24 @@ tcp_read_timeout(int fd, void *buf, size_t len, int timeout)

}

/**
*
*/
int
tcp_socket_dead(int fd)
{
int err = 0;
socklen_t errlen = sizeof(err);

if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen))
return -errno;
if (err)
return -err;
if (recv(fd, NULL, 0, MSG_PEEK | MSG_DONTWAIT) == 0)
return -EIO;
return 0;
}

/**
*
*/
Expand Down
2 changes: 2 additions & 0 deletions src/tcp.h
Expand Up @@ -91,6 +91,8 @@ char *tcp_get_str_from_ip(const struct sockaddr *sa, char *dst, size_t maxlen);

struct sockaddr *tcp_get_ip_from_str(const char *str, struct sockaddr *sa);

int tcp_socket_dead(int fd);

struct access;

uint32_t tcp_connection_count(struct access *aa);
Expand Down
10 changes: 4 additions & 6 deletions src/webui/webui.c
Expand Up @@ -308,8 +308,6 @@ http_stream_run(http_connection_t *hc, profile_chain_t *prch,
int ptimeout, grace = 20;
struct timespec ts;
struct timeval tp;
int err = 0;
socklen_t errlen = sizeof(err);
streaming_start_t *ss_copy;
int64_t mono;

Expand Down Expand Up @@ -343,7 +341,7 @@ http_stream_run(http_connection_t *hc, profile_chain_t *prch,
if(pthread_cond_timedwait(&sq->sq_cond, &sq->sq_mutex, &ts) == ETIMEDOUT) {

/* Check socket status */
if (getsockopt(hc->hc_fd, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen) || err) {
if (tcp_socket_dead(hc->hc_fd)) {
tvhlog(LOG_DEBUG, "webui", "Stop streaming %s, client hung up", hc->hc_url_orig);
run = 0;
} else if((!started && dispatch_clock - lastpkt > grace) ||
Expand All @@ -364,7 +362,7 @@ http_stream_run(http_connection_t *hc, profile_chain_t *prch,
case SMT_PACKET:
lastpkt = dispatch_clock;
if(started) {
pktbuf_t *pb;;
pktbuf_t *pb;
if (sm->sm_type == SMT_PACKET)
pb = ((th_pkt_t*)sm->sm_data)->pkt_payload;
else
Expand All @@ -390,7 +388,7 @@ http_stream_run(http_connection_t *hc, profile_chain_t *prch,
streaming_msg_free(sm);
mono = getmonoclock() + 2000000;
while (getmonoclock() < mono) {
if (getsockopt(hc->hc_fd, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen) || err)
if (tcp_socket_dead(hc->hc_fd))
break;
usleep(50000);
}
Expand All @@ -417,7 +415,7 @@ http_stream_run(http_connection_t *hc, profile_chain_t *prch,
break;

case SMT_SERVICE_STATUS:
if(getsockopt(hc->hc_fd, SOL_SOCKET, SO_ERROR, &err, &errlen) || err) {
if(tcp_socket_dead(hc->hc_fd)) {
tvhlog(LOG_DEBUG, "webui", "Stop streaming %s, client hung up",
hc->hc_url_orig);
run = 0;
Expand Down

0 comments on commit d7cf558

Please sign in to comment.