Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Non-blocking Kernel#system via fiber scheduler hook. #4595

Merged
merged 2 commits into from Sep 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/process.h
Expand Up @@ -26,6 +26,7 @@
#define RB_MAX_GROUPS (65536)

struct waitpid_state;
struct rb_process_status;
struct rb_execarg {
union {
struct {
Expand Down Expand Up @@ -56,6 +57,7 @@ struct rb_execarg {
unsigned gid_given : 1;
unsigned exception : 1;
unsigned exception_given : 1;
struct rb_process_status *status;
struct waitpid_state *waitpid_state; /* for async process management */
rb_pid_t pgroup_pgid; /* asis(-1), new pgroup(0), specified pgroup (0<V). */
VALUE rlimit_limits; /* Qfalse or [[rtype, softlim, hardlim], ...] */
Expand Down
172 changes: 107 additions & 65 deletions process.c
Expand Up @@ -3851,7 +3851,7 @@ rb_thread_sleep_that_takes_VALUE_as_sole_argument(VALUE n)
}

static int
handle_fork_error(int err, int *status, int *ep, volatile int *try_gc_p)
handle_fork_error(int err, struct rb_process_status *status, int *ep, volatile int *try_gc_p)
{
int state = 0;

Expand All @@ -3872,7 +3872,7 @@ handle_fork_error(int err, int *status, int *ep, volatile int *try_gc_p)
}
else {
rb_protect(rb_thread_sleep_that_takes_VALUE_as_sole_argument, INT2FIX(1), &state);
if (status) *status = state;
if (status) status->status = state;
if (!state) return 0;
}
break;
Expand Down Expand Up @@ -4161,7 +4161,7 @@ disable_child_handler_fork_child(struct child_handler_disabler_state *old, char
}

static rb_pid_t
retry_fork_async_signal_safe(int *status, int *ep,
retry_fork_async_signal_safe(struct rb_process_status *status, int *ep,
int (*chfunc)(void*, char *, size_t), void *charg,
char *errmsg, size_t errmsg_buflen,
struct waitpid_state *w)
Expand Down Expand Up @@ -4203,7 +4203,7 @@ retry_fork_async_signal_safe(int *status, int *ep,
_exit(127);
#endif
}
err = errno;
err = errno;
waitpid_lock = waitpid_lock_init;
if (waitpid_lock) {
if (pid > 0 && w != WAITPID_LOCK_ONLY) {
Expand All @@ -4222,39 +4222,54 @@ retry_fork_async_signal_safe(int *status, int *ep,
}

static rb_pid_t
fork_check_err(int *status, int (*chfunc)(void*, char *, size_t), void *charg,
fork_check_err(struct rb_process_status *status, int (*chfunc)(void*, char *, size_t), void *charg,
VALUE fds, char *errmsg, size_t errmsg_buflen,
struct rb_execarg *eargp)
{
rb_pid_t pid;
int err;
int ep[2];
int error_occurred;
struct waitpid_state *w;

w = eargp && eargp->waitpid_state ? eargp->waitpid_state : 0;
struct waitpid_state *w = eargp && eargp->waitpid_state ? eargp->waitpid_state : 0;

if (status) *status = 0;
if (status) status->status = 0;

if (pipe_nocrash(ep, fds)) return -1;
pid = retry_fork_async_signal_safe(status, ep, chfunc, charg,
errmsg, errmsg_buflen, w);
if (pid < 0)

pid = retry_fork_async_signal_safe(status, ep, chfunc, charg, errmsg, errmsg_buflen, w);

if (status) status->pid = pid;

if (pid < 0) {
if (status) status->error = errno;

return pid;
}

close(ep[1]);

error_occurred = recv_child_error(ep[0], &err, errmsg, errmsg_buflen);

if (error_occurred) {
if (status) {
int state = 0;
status->error = err;

VM_ASSERT((w == 0 || w == WAITPID_LOCK_ONLY) &&
"only used by extensions");
rb_protect(proc_syswait, (VALUE)pid, status);
rb_protect(proc_syswait, (VALUE)pid, &state);

status->status = state;
}
else if (!w || w == WAITPID_LOCK_ONLY) {
rb_syswait(pid);
}

errno = err;
return -1;
}

return pid;
}

Expand All @@ -4270,38 +4285,61 @@ rb_fork_async_signal_safe(int *status,
int (*chfunc)(void*, char *, size_t), void *charg,
VALUE fds, char *errmsg, size_t errmsg_buflen)
{
return fork_check_err(status, chfunc, charg, fds, errmsg, errmsg_buflen, 0);
struct rb_process_status process_status;

rb_pid_t result = fork_check_err(&process_status, chfunc, charg, fds, errmsg, errmsg_buflen, 0);

if (status) {
*status = process_status.status;
}

return result;
}

rb_pid_t
rb_fork_ruby(int *status)
{
rb_fork_ruby2(struct rb_process_status *status) {
rb_pid_t pid;
int try_gc = 1, err;
struct child_handler_disabler_state old;

if (status) *status = 0;
if (status) status->status = 0;

while (1) {
prefork();
prefork();
if (mjit_enabled) mjit_pause(false); // Don't leave locked mutex to child. Note: child_handler must be enabled to pause MJIT.
disable_child_handler_before_fork(&old);
before_fork_ruby();
pid = rb_fork();
err = errno;
disable_child_handler_before_fork(&old);
before_fork_ruby();
pid = rb_fork();
err = errno;
after_fork_ruby();
disable_child_handler_fork_parent(&old); /* yes, bad name */
disable_child_handler_fork_parent(&old); /* yes, bad name */

if (mjit_enabled && pid > 0) mjit_resume(); /* child (pid == 0) is cared by rb_thread_atfork */
if (pid >= 0) { /* fork succeed */

if (pid >= 0) { /* fork succeed */
if (pid == 0) rb_thread_atfork();
return pid;
return pid;
}

/* fork failed */
if (handle_fork_error(err, status, NULL, &try_gc)) {
return -1;
}
/* fork failed */
if (handle_fork_error(err, status, NULL, &try_gc))
return -1;
}
}

rb_pid_t
rb_fork_ruby(int *status)
{
struct rb_process_status process_status = {0};

rb_pid_t pid = rb_fork_ruby2(&process_status);

if (status) *status = process_status.status;

return pid;
}

#endif

#if defined(HAVE_WORKING_FORK) && !defined(CANNOT_FORK_WITH_PTHREAD)
Expand Down Expand Up @@ -4574,7 +4612,7 @@ rb_spawn_process(struct rb_execarg *eargp, char *errmsg, size_t errmsg_buflen)
#endif

#if defined HAVE_WORKING_FORK && !USE_SPAWNV
pid = fork_check_err(0, rb_exec_atfork, eargp, eargp->redirect_fds, errmsg, errmsg_buflen, eargp);
pid = fork_check_err(eargp->status, rb_exec_atfork, eargp, eargp->redirect_fds, errmsg, errmsg_buflen, eargp);
#else
prog = eargp->use_shell ? eargp->invoke.sh.shell_script : eargp->invoke.cmd.command_name;

Expand Down Expand Up @@ -4727,58 +4765,62 @@ rb_spawn(int argc, const VALUE *argv)
static VALUE
rb_f_system(int argc, VALUE *argv, VALUE _)
{
/*
* n.b. using alloca for now to simplify future Thread::Light code
* when we need to use malloc for non-native Fiber
*/
struct waitpid_state *w = alloca(sizeof(struct waitpid_state));
rb_pid_t pid; /* may be different from waitpid_state.pid on exec failure */
VALUE execarg_obj;
struct rb_execarg *eargp;
int exec_errnum;
VALUE execarg_obj = rb_execarg_new(argc, argv, TRUE, TRUE);
struct rb_execarg *eargp = rb_execarg_get(execarg_obj);

execarg_obj = rb_execarg_new(argc, argv, TRUE, TRUE);
eargp = rb_execarg_get(execarg_obj);
w->ec = GET_EC();
waitpid_state_init(w, 0, 0);
eargp->waitpid_state = w;
pid = rb_execarg_spawn(execarg_obj, 0, 0);
exec_errnum = pid < 0 ? errno : 0;
struct rb_process_status status = {0};
eargp->status = &status;

#if defined(HAVE_WORKING_FORK) || defined(HAVE_SPAWNV)
if (w->pid > 0) {
/* `pid' (not w->pid) may be < 0 here if execve failed in child */
if (WAITPID_USE_SIGCHLD) {
rb_ensure(waitpid_sleep, (VALUE)w, waitpid_cleanup, (VALUE)w);
rb_last_status_clear();

// This function can set the thread's last status.
// May be different from waitpid_state.pid on exec failure.
rb_pid_t pid = rb_execarg_spawn(execarg_obj, 0, 0);

if (pid > 0) {
VALUE status = rb_process_status_wait(pid, 0);
struct rb_process_status *data = RTYPEDDATA_DATA(status);

// Set the last status:
rb_obj_freeze(status);
GET_THREAD()->last_status = status;

if (data->status == EXIT_SUCCESS) {
return Qtrue;
}
else {
waitpid_no_SIGCHLD(w);

if (data->error != 0) {
if (eargp->exception) {
VALUE command = eargp->invoke.sh.shell_script;
RB_GC_GUARD(execarg_obj);
rb_syserr_fail_str(data->error, command);
}
else {
return Qnil;
}
}
rb_last_status_set(w->status, w->ret);
}
#endif
if (w->pid < 0 /* fork failure */ || pid < 0 /* exec failure */) {
if (eargp->exception) {
int err = exec_errnum ? exec_errnum : w->errnum;
else if (eargp->exception) {
VALUE command = eargp->invoke.sh.shell_script;
VALUE str = rb_str_new_cstr("Command failed with");
rb_str_cat_cstr(pst_message_status(str, data->status), ": ");
rb_str_append(str, command);
RB_GC_GUARD(execarg_obj);
rb_syserr_fail_str(err, command);
rb_exc_raise(rb_exc_new_str(rb_eRuntimeError, str));
}
else {
return Qnil;
return Qfalse;
}

RB_GC_GUARD(status);
}
if (w->status == EXIT_SUCCESS) return Qtrue;

if (eargp->exception) {
VALUE command = eargp->invoke.sh.shell_script;
VALUE str = rb_str_new_cstr("Command failed with");
rb_str_cat_cstr(pst_message_status(str, w->status), ": ");
rb_str_append(str, command);
RB_GC_GUARD(execarg_obj);
rb_exc_raise(rb_exc_new_str(rb_eRuntimeError, str));
rb_syserr_fail_str(errno, command);
}
else {
return Qfalse;
return Qnil;
}
}

Expand Down