Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
http streaming: show the HTTP streaming connections in the webui stat…
…us tab
  • Loading branch information
perexg committed Oct 6, 2014
1 parent b92231b commit 6508c94
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 15 deletions.
57 changes: 48 additions & 9 deletions src/tcp.c
Expand Up @@ -376,6 +376,7 @@ tcp_get_ip_str(const struct sockaddr *sa, char *s, size_t maxlen)
*
*/
static tvhpoll_t *tcp_server_poll;
static uint32_t tcp_server_launch_id;

typedef struct tcp_server {
int serverfd;
Expand All @@ -385,9 +386,11 @@ typedef struct tcp_server {

typedef struct tcp_server_launch {
pthread_t tid;
uint32_t id;
int fd;
tcp_server_ops_t ops;
void *opaque;
void (*status) (void *opaque, htsmsg_t *m);
struct sockaddr_storage peer;
struct sockaddr_storage self;
time_t started;
Expand All @@ -403,6 +406,43 @@ static LIST_HEAD(, tcp_server_launch) tcp_server_join = { 0 };
/**
*
*/
void *
tcp_connection_launch(int fd, void (*status) (void *opaque, htsmsg_t *m))
{
tcp_server_launch_t *tsl;

lock_assert(&global_lock);

assert(status);

LIST_FOREACH(tsl, &tcp_server_active, alink) {
if (tsl->fd == fd) {
tsl->status = status;
LIST_INSERT_HEAD(&tcp_server_launches, tsl, link);
notify_reload("connections");
return tsl;
}
}
return NULL;
}

/**
*
*/
void
tcp_connection_land(void *id)
{
tcp_server_launch_t *tsl = id;

lock_assert(&global_lock);

LIST_REMOVE(tsl, link);
notify_reload("connections");
}

/*
*
*/
static void *
tcp_server_start(void *aux)
{
Expand Down Expand Up @@ -438,21 +478,19 @@ tcp_server_start(void *aux)

/* Start */
time(&tsl->started);
pthread_mutex_lock(&global_lock);
tsl->id = ++tcp_server_launch_id;
if (!tsl->id) tsl->id = ++tcp_server_launch_id;
if (tsl->ops.status) {
pthread_mutex_lock(&global_lock);
tsl->status = tsl->ops.status;
LIST_INSERT_HEAD(&tcp_server_launches, tsl, link);
notify_reload("connections");
pthread_mutex_unlock(&global_lock);
}
pthread_mutex_lock(&global_lock);
tsl->ops.start(tsl->fd, &tsl->opaque, &tsl->peer, &tsl->self);

/* Stop */
if (tsl->ops.stop) tsl->ops.stop(tsl->opaque);
if (tsl->ops.status) {
LIST_REMOVE(tsl, link);
notify_reload("connections");
}
if (tsl->ops.status) tcp_connection_land(tsl);
LIST_REMOVE(tsl, alink);
LIST_INSERT_HEAD(&tcp_server_join, tsl, jlink);
pthread_mutex_unlock(&global_lock);
Expand Down Expand Up @@ -667,13 +705,14 @@ tcp_server_connections ( void )
/* Build list */
l = htsmsg_create_list();
LIST_FOREACH(tsl, &tcp_server_launches, link) {
if (!tsl->ops.status) continue;
if (!tsl->status) continue;
c++;
e = htsmsg_create_map();
tcp_get_ip_str((struct sockaddr*)&tsl->peer, buf, sizeof(buf));
htsmsg_add_u32(e, "id", tsl->id);
htsmsg_add_str(e, "peer", buf);
htsmsg_add_s64(e, "started", tsl->started);
tsl->ops.status(tsl->opaque, e);
tsl->status(tsl->opaque, e);
htsmsg_add_msg(l, NULL, e);
}

Expand Down
3 changes: 3 additions & 0 deletions src/tcp.h
Expand Up @@ -80,6 +80,9 @@ int tcp_read_timeout(int fd, void *buf, size_t len, int timeout);

char *tcp_get_ip_str(const struct sockaddr *sa, char *s, size_t maxlen);

void *tcp_connection_launch(int fd, void (*status) (void *opaque, htsmsg_t *m));
void tcp_connection_land(void *id);

htsmsg_t *tcp_server_connections ( void );

#endif /* TCP_H_ */
29 changes: 23 additions & 6 deletions src/webui/webui.c
Expand Up @@ -259,6 +259,18 @@ page_static_file(http_connection_t *hc, const char *remain, void *opaque)
return ret;
}

/**
* HTTP stream status callback
*/
static void
http_stream_status ( void *opaque, htsmsg_t *m )
{
http_connection_t *hc = opaque;
htsmsg_add_str(m, "type", "HTTP");
if (hc->hc_username)
htsmsg_add_str(m, "user", hc->hc_username);
}

/**
* HTTP stream loop
*/
Expand All @@ -276,6 +288,13 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
struct timeval tp;
int err = 0;
socklen_t errlen = sizeof(err);
void *tcp_id;

tcp_id = tcp_connection_launch(hc->hc_fd, http_stream_status);
if (tcp_id == NULL)
return;

pthread_mutex_unlock(&global_lock);

mux = muxer_create(mc, mcfg);
if(muxer_open_stream(mux, hc->hc_fd))
Expand Down Expand Up @@ -397,6 +416,10 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
muxer_close(mux);

muxer_destroy(mux);

pthread_mutex_lock(&global_lock);

tcp_connection_land(tcp_id);
}


Expand Down Expand Up @@ -815,9 +838,7 @@ http_stream_service(http_connection_t *hc, service_t *service, int weight)
http_arg_get(&hc->hc_args, "User-Agent"));
if(s) {
name = tvh_strdupa(service->s_nicename);
pthread_mutex_unlock(&global_lock);
http_stream_run(hc, &sq, name, mc, s, &cfg->dvr_muxcnf);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s);
}

Expand Down Expand Up @@ -863,9 +884,7 @@ http_stream_mux(http_connection_t *hc, mpegts_mux_t *mm, int weight)
if (!s)
return HTTP_STATUS_BAD_REQUEST;
name = tvh_strdupa(s->ths_title);
pthread_mutex_unlock(&global_lock);
http_stream_run(hc, &sq, name, MC_RAW, s, &muxcfg);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s);

streaming_queue_deinit(&sq);
Expand Down Expand Up @@ -941,9 +960,7 @@ http_stream_channel(http_connection_t *hc, channel_t *ch, int weight)

if(s) {
name = tvh_strdupa(channel_get_name(ch));
pthread_mutex_unlock(&global_lock);
http_stream_run(hc, &sq, name, mc, s, &cfg->dvr_muxcnf);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s);
}

Expand Down

0 comments on commit 6508c94

Please sign in to comment.