Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
spawn: add thread for fast info/error messages, fix spawn_pipe_read()
  • Loading branch information
perexg committed Nov 17, 2014
1 parent 3f7ac5b commit a563d78
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 7 deletions.
2 changes: 0 additions & 2 deletions src/main.c
Expand Up @@ -375,8 +375,6 @@ mainloop(void)
if (ts.tv_sec > dispatch_clock) {
dispatch_clock = ts.tv_sec;

spawn_reaper(); /* reap spawned processes */

comet_flush(); /* Flush idle comet mailboxes */
}

Expand Down
64 changes: 59 additions & 5 deletions src/spawn.c
Expand Up @@ -30,6 +30,7 @@
#include <dirent.h>

#include "tvheadend.h"
#include "tvhpoll.h"
#include "file.h"
#include "spawn.h"

Expand All @@ -49,6 +50,10 @@ static char *spawn_error_buf = NULL;
static th_pipe_t spawn_pipe_info;
static th_pipe_t spawn_pipe_error;

static pthread_t spawn_pipe_tid;

static int spawn_pipe_running;

typedef struct spawn {
LIST_ENTRY(spawn) link;
pid_t pid;
Expand Down Expand Up @@ -83,6 +88,8 @@ spawn_pipe_read( th_pipe_t *p, char **_buf, int level )
continue;
break;
}
buf[len + r] = '\0';
tvhlog_hexdump("spawn", buf + len, r);
while ((s = strchr(buf, '\n')) != NULL) {
*s++ = '\0';
tvhlog(level, "spawn", "%s", buf);
Expand All @@ -95,13 +102,58 @@ spawn_pipe_read( th_pipe_t *p, char **_buf, int level )
}
}

static void *
spawn_pipe_thread(void *aux)
{
tvhpoll_event_t ev[2];
tvhpoll_t *efd = tvhpoll_create(2);
int nfds;

memset(ev, 0, sizeof(ev));
ev[0].events = TVHPOLL_IN;
ev[0].fd = spawn_pipe_info.rd;
ev[0].data.ptr = &spawn_pipe_info;
ev[1].events = TVHPOLL_IN;
ev[1].fd = spawn_pipe_error.rd;
ev[1].data.ptr = &spawn_pipe_error;
tvhpoll_add(efd, ev, 2);

while (spawn_pipe_running) {

nfds = tvhpoll_wait(efd, ev, 2, 500);

if (nfds > 0) {
spawn_pipe_read(&spawn_pipe_info, &spawn_info_buf, LOG_INFO);
spawn_pipe_read(&spawn_pipe_error, &spawn_error_buf, LOG_ERR);
}
spawn_reaper();

}

tvhpoll_destroy(efd);
return NULL;
}

static void
spawn_pipe_write( th_pipe_t *p, const char *fmt, va_list ap )
{
char buf[512];
char buf[512], *s = buf;
int r;

vsnprintf(buf, sizeof(buf), fmt, ap);
(void)write(p->wr, buf, strlen(buf));
while (*s) {
r = write(p->wr, s, strlen(s));
if (r < 0) {
if (errno == EAGAIN)
break;
if (ERRNO_AGAIN(errno))
continue;
break;
}
if (!r)
break;
s += r;
}
}

void
Expand Down Expand Up @@ -176,9 +228,6 @@ spawn_reap(char *stxt, size_t stxtlen)
int status, res;
spawn_t *s;

spawn_pipe_read(&spawn_pipe_info, &spawn_info_buf, LOG_INFO);
spawn_pipe_read(&spawn_pipe_error, &spawn_error_buf, LOG_ERR);

pid = waitpid(-1, &status, WNOHANG);
if(pid < 1)
return -EAGAIN;
Expand Down Expand Up @@ -438,10 +487,15 @@ void spawn_init(void)
{
tvh_pipe(O_NONBLOCK, &spawn_pipe_info);
tvh_pipe(O_NONBLOCK, &spawn_pipe_error);
spawn_pipe_running = 1;
pthread_create(&spawn_pipe_tid, NULL, spawn_pipe_thread, NULL);
}

void spawn_done(void)
{
spawn_pipe_running = 0;
pthread_kill(spawn_pipe_tid, SIGTERM);
pthread_join(spawn_pipe_tid, NULL);
tvh_pipe_close(&spawn_pipe_error);
tvh_pipe_close(&spawn_pipe_info);
free(spawn_error_buf);
Expand Down

0 comments on commit a563d78

Please sign in to comment.