Skip to content

Commit

Permalink
job: Refactor process spawning and startup arguments
Browse files Browse the repository at this point in the history
- process spawning was decoupled from the rest of the job control logic.  The
  goal is reusing it for spawning processes connected to pseudo terminal file
  descriptors.
- job_start now receives a JobOptions structure containing all the startup
  options.
  • Loading branch information
tarruda committed Feb 24, 2015
1 parent 0b8d3cb commit 1ec7db7
Show file tree
Hide file tree
Showing 9 changed files with 323 additions and 179 deletions.
2 changes: 2 additions & 0 deletions .valgrind.supp
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@
Memcheck:Leak
fun:malloc
fun:uv_spawn
fun:pipe_process_spawn
fun:process_spawn
fun:job_start
}
16 changes: 7 additions & 9 deletions src/nvim/eval.c
Original file line number Diff line number Diff line change
Expand Up @@ -10725,15 +10725,13 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)

// The last item of argv must be NULL
argv[i] = NULL;

job_start(argv,
xstrdup((char *)argvars[0].vval.v_string),
true,
on_job_stdout,
on_job_stderr,
on_job_exit,
0,
&rettv->vval.v_number);
JobOptions opts = JOB_OPTIONS_INIT;
opts.argv = argv;
opts.data = xstrdup((char *)argvars[0].vval.v_string);
opts.stdout_cb = on_job_stdout;
opts.stderr_cb = on_job_stderr;
opts.exit_cb = on_job_exit;
job_start(opts, &rettv->vval.v_number);

if (rettv->vval.v_number <= 0) {
if (rettv->vval.v_number == 0) {
Expand Down
15 changes: 7 additions & 8 deletions src/nvim/msgpack_rpc/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,13 @@ uint64_t channel_from_job(char **argv)
incref(channel); // job channels are only closed by the exit_cb

int status;
channel->data.job = job_start(argv,
channel,
true,
job_out,
job_err,
job_exit,
0,
&status);
JobOptions opts = JOB_OPTIONS_INIT;
opts.argv = argv;
opts.data = channel;
opts.stdout_cb = job_out;
opts.stderr_cb = job_err;
opts.exit_cb = job_exit;
channel->data.job = job_start(opts, &status);

if (status <= 0) {
if (status == 0) { // Two decrefs needed if status == 0.
Expand Down
195 changes: 41 additions & 154 deletions src/nvim/os/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
#include "nvim/os/uv_helpers.h"
#include "nvim/os/job.h"
#include "nvim/os/job_defs.h"
#include "nvim/os/job_private.h"
#include "nvim/os/rstream.h"
#include "nvim/os/rstream_defs.h"
#include "nvim/os/wstream.h"
#include "nvim/os/wstream_defs.h"
#include "nvim/os/event.h"
#include "nvim/os/event_defs.h"
#include "nvim/os/shell.h"
#include "nvim/os/time.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
Expand All @@ -29,8 +29,8 @@
if (job->stream) { \
type##stream_free(job->stream); \
job->stream = NULL; \
if (!uv_is_closing((uv_handle_t *)&job->proc_std##stream)) { \
uv_close((uv_handle_t *)&job->proc_std##stream, close_cb); \
if (!uv_is_closing((uv_handle_t *)job->proc_std##stream)) { \
uv_close((uv_handle_t *)job->proc_std##stream, close_cb); \
} \
} \
} while (0)
Expand All @@ -39,37 +39,9 @@
#define close_job_out(job) close_job_stream(job, out, r)
#define close_job_err(job) close_job_stream(job, err, r)

struct job {
// Job id the index in the job table plus one.
int id;
// Exit status code of the job process
int status;
// Number of references to the job. The job resources will only be freed by
// close_cb when this is 0
int refcount;
// Time when job_stop was called for the job.
uint64_t stopped_time;
// If SIGTERM was already sent to the job(only send one before SIGKILL)
bool term_sent;
// Data associated with the job
void *data;
// Callbacks
job_exit_cb exit_cb;
rstream_cb stdout_cb, stderr_cb;
// Readable streams(std{out,err})
RStream *out, *err;
// Writable stream(stdin)
WStream *in;
// Structures for process spawning/management used by libuv
uv_process_t proc;
uv_process_options_t proc_opts;
uv_stdio_container_t stdio[3];
uv_pipe_t proc_stdin, proc_stdout, proc_stderr;
};

static Job *table[MAX_RUNNING_JOBS] = {NULL};
Job *table[MAX_RUNNING_JOBS] = {NULL};
size_t stop_requests = 0;
static uv_timer_t job_stop_timer;
uv_timer_t job_stop_timer;

// Some helpers shared in this module

Expand Down Expand Up @@ -106,29 +78,10 @@ void job_teardown(void)

/// Tries to start a new job.
///
/// @param argv Argument vector for the process. The first item is the
/// executable to run.
/// [consumed]
/// @param data Caller data that will be associated with the job
/// @param writable If true the job stdin will be available for writing with
/// job_write, otherwise it will be redirected to /dev/null
/// @param stdout_cb Callback that will be invoked when data is available
/// on stdout. If NULL stdout will be redirected to /dev/null.
/// @param stderr_cb Callback that will be invoked when data is available
/// on stderr. If NULL stderr will be redirected to /dev/null.
/// @param job_exit_cb Callback that will be invoked when the job exits
/// @param maxmem Maximum amount of memory used by the job WStream
/// @param[out] status The job id if the job started successfully, 0 if the job
/// table is full, -1 if the program could not be executed.
/// @return The job pointer if the job started successfully, NULL otherwise
Job *job_start(char **argv,
void *data,
bool writable,
rstream_cb stdout_cb,
rstream_cb stderr_cb,
job_exit_cb job_exit_cb,
size_t maxmem,
int *status)
Job *job_start(JobOptions opts, int *status)
{
int i;
Job *job;
Expand All @@ -142,7 +95,7 @@ Job *job_start(char **argv,

if (i == MAX_RUNNING_JOBS) {
// No free slots
shell_free_argv(argv);
shell_free_argv(opts.argv);
*status = 0;
return NULL;
}
Expand All @@ -153,92 +106,64 @@ Job *job_start(char **argv,
*status = job->id;
job->status = -1;
job->refcount = 1;
job->data = data;
job->stdout_cb = stdout_cb;
job->stderr_cb = stderr_cb;
job->exit_cb = job_exit_cb;
job->stopped_time = 0;
job->term_sent = false;
job->proc_opts.file = argv[0];
job->proc_opts.args = argv;
job->proc_opts.stdio = job->stdio;
job->proc_opts.stdio_count = 3;
job->proc_opts.flags = UV_PROCESS_WINDOWS_HIDE;
job->proc_opts.exit_cb = exit_cb;
job->proc_opts.cwd = NULL;
job->proc_opts.env = NULL;
job->proc.data = NULL;
job->proc_stdin.data = NULL;
job->proc_stdout.data = NULL;
job->proc_stderr.data = NULL;
job->in = NULL;
job->out = NULL;
job->err = NULL;
job->opts = opts;
job->closed = false;

// Initialize the job std{in,out,err}
job->stdio[0].flags = UV_IGNORE;
job->stdio[1].flags = UV_IGNORE;
job->stdio[2].flags = UV_IGNORE;
process_init(job);

if (writable) {
uv_pipe_init(uv_default_loop(), &job->proc_stdin, 0);
job->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin;
handle_set_job((uv_handle_t *)&job->proc_stdin, job);
if (opts.writable) {
handle_set_job((uv_handle_t *)job->proc_stdin, job);
job->refcount++;
}

if (stdout_cb) {
uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0);
job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout;
handle_set_job((uv_handle_t *)&job->proc_stdout, job);
if (opts.stdout_cb) {
handle_set_job((uv_handle_t *)job->proc_stdout, job);
job->refcount++;
}

if (stderr_cb) {
uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0);
job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr;
handle_set_job((uv_handle_t *)&job->proc_stderr, job);
if (opts.stderr_cb) {
handle_set_job((uv_handle_t *)job->proc_stderr, job);
job->refcount++;
}

handle_set_job((uv_handle_t *)&job->proc, job);

// Spawn the job
if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) {
if (writable) {
if (!process_spawn(job)) {
if (opts.writable) {
uv_close((uv_handle_t *)&job->proc_stdin, close_cb);
}
if (stdout_cb) {
if (opts.stdout_cb) {
uv_close((uv_handle_t *)&job->proc_stdout, close_cb);
}
if (stderr_cb) {
if (opts.stderr_cb) {
uv_close((uv_handle_t *)&job->proc_stderr, close_cb);
}
uv_close((uv_handle_t *)&job->proc, close_cb);
process_close(job);
event_poll(0);
// Manually invoke the close_cb to free the job resources
*status = -1;
return NULL;
}

if (writable) {
job->in = wstream_new(maxmem);
wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
if (opts.writable) {
job->in = wstream_new(opts.maxmem);
wstream_set_stream(job->in, job->proc_stdin);
}

// Start the readable streams
if (stdout_cb) {
if (opts.stdout_cb) {
job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
rstream_set_stream(job->out, job->proc_stdout);
rstream_start(job->out);
}

if (stderr_cb) {
if (opts.stderr_cb) {
job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_set_stream(job->err, job->proc_stderr);
rstream_start(job->err);
}
// Save the job to the table
Expand Down Expand Up @@ -327,7 +252,8 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
// Job exited, collect status and manually invoke close_cb to free the job
// resources
status = job->status;
close_cb((uv_handle_t *)&job->proc);
job_close_streams(job);
job_decref(job);
} else {
job->refcount--;
}
Expand Down Expand Up @@ -391,25 +317,7 @@ int job_id(Job *job)
/// @return The job data
void *job_data(Job *job)
{
return job->data;
}

static void job_exit_callback(Job *job)
{
// Free the slot now, 'exit_cb' may want to start another job to replace
// this one
table[job->id - 1] = NULL;

if (job->exit_cb) {
// Invoke the exit callback
job->exit_cb(job, job->data);
}

if (stop_requests && !--stop_requests) {
// Stop the timer if no more stop requests are pending
DLOG("Stopping job kill timer");
uv_timer_stop(&job_stop_timer);
}
return job->opts.data;
}

/// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those
Expand All @@ -428,11 +336,12 @@ static void job_stop_timer_cb(uv_timer_t *handle)

if (!job->term_sent && elapsed >= TERM_TIMEOUT) {
ILOG("Sending SIGTERM to job(id: %d)", job->id);
uv_process_kill(&job->proc, SIGTERM);
uv_kill(job->pid, SIGTERM);
job->term_sent = true;
} else if (elapsed >= KILL_TIMEOUT) {
ILOG("Sending SIGKILL to job(id: %d)", job->id);
uv_process_kill(&job->proc, SIGKILL);
uv_kill(job->pid, SIGKILL);
process_close(job);
}
}
}
Expand All @@ -443,48 +352,26 @@ static void read_cb(RStream *rstream, void *data, bool eof)
Job *job = data;

if (rstream == job->out) {
job->stdout_cb(rstream, data, eof);
job->opts.stdout_cb(rstream, data, eof);
if (eof) {
close_job_out(job);
}
} else {
job->stderr_cb(rstream, data, eof);
job->opts.stderr_cb(rstream, data, eof);
if (eof) {
close_job_err(job);
}
}
}

// Emits a JobExit event if both rstreams are closed
static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
void job_close_streams(Job *job)
{
Job *job = handle_get_job((uv_handle_t *)proc);

job->status = (int)status;
uv_close((uv_handle_t *)&job->proc, close_cb);
close_job_in(job);
close_job_out(job);
close_job_err(job);
}

static void close_cb(uv_handle_t *handle)
{
Job *job = handle_get_job(handle);

if (handle == (uv_handle_t *)&job->proc) {
// Make sure all streams are properly closed to trigger callback invocation
// when job->proc is closed
close_job_in(job);
close_job_out(job);
close_job_err(job);
}

if (--job->refcount == 0) {
// Invoke the exit_cb
job_exit_callback(job);
// Free all memory allocated for the job
free(job->proc.data);
free(job->proc_stdin.data);
free(job->proc_stdout.data);
free(job->proc_stderr.data);
shell_free_argv(job->proc_opts.args);
free(job);
}
job_decref(handle_get_job(handle));
}
Loading

0 comments on commit 1ec7db7

Please sign in to comment.