Fix copy coroutine implementation. #3624

Merged: 2 commits, Dec 4, 2020
1 change: 1 addition & 0 deletions common.mk
@@ -14011,6 +14011,7 @@ thread.$(OBJEXT): $(top_srcdir)/internal/thread.h
thread.$(OBJEXT): $(top_srcdir)/internal/time.h
thread.$(OBJEXT): $(top_srcdir)/internal/vm.h
thread.$(OBJEXT): $(top_srcdir)/internal/warnings.h
thread.$(OBJEXT): {$(VPATH)}$(COROUTINE_H)
thread.$(OBJEXT): {$(VPATH)}assert.h
thread.$(OBJEXT): {$(VPATH)}backward/2/assume.h
thread.$(OBJEXT): {$(VPATH)}backward/2/attributes.h
16 changes: 16 additions & 0 deletions coroutine/Stack.h
@@ -0,0 +1,16 @@
/*
* This file is part of the "Coroutine" project and released under the MIT License.
*
* Created by Samuel Williams on 10/11/2020.
* Copyright, 2020, by Samuel Williams.
*/

#include COROUTINE_H

#ifdef COROUTINE_PRIVATE_STACK
#define COROUTINE_STACK_LOCAL(type, name) type *name = ruby_xmalloc(sizeof(type))
#define COROUTINE_STACK_FREE(name) ruby_xfree(name)
#else
#define COROUTINE_STACK_LOCAL(type, name) type name##_local; type * name = &name##_local
#define COROUTINE_STACK_FREE(name)
#endif
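
These macros always hand the caller a pointer named `name`, so call sites use `->` in both configurations. A minimal usage sketch, not part of the patch (the `example_waiter` type and the suspension point are placeholders):

    struct example_waiter {
        int fd;
    };

    void example(int fd)
    {
        // Heap-allocated when COROUTINE_PRIVATE_STACK is defined,
        // otherwise an ordinary local plus a pointer to it.
        COROUTINE_STACK_LOCAL(struct example_waiter, waiter);

        waiter->fd = fd;
        /* ... link the waiter somewhere and possibly suspend the coroutine ... */

        // Releases the heap allocation in the private-stack case; a no-op otherwise.
        COROUTINE_STACK_FREE(waiter);
    }
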
59 changes: 40 additions & 19 deletions coroutine/copy/Context.c
@@ -7,6 +7,8 @@

#include "Context.h"

#include <stdint.h>

// http://gcc.gnu.org/onlinedocs/gcc/Alternate-Keywords.html
#ifndef __GNUC__
#define __asm__ asm
@@ -35,13 +37,22 @@ static void coroutine_flush_register_windows() {
static void coroutine_flush_register_windows() {}
#endif

int coroutine_save_stack(struct coroutine_context * context) {
void *stack_pointer = &stack_pointer;
__attribute__((noinline))
void *coroutine_stack_pointer() {
return (void*)(
(char*)__builtin_frame_address(0)
);
}

// Save the current stack to a private area. It is likely that when restoring the stack, this stack frame will be incomplete. But that is acceptable since the previous stack frame which called `setjmp` should be correctly restored.
__attribute__((noinline))
int coroutine_save_stack_1(struct coroutine_context * context) {
assert(context->stack);
assert(context->base);

// At this point, you may need to ensure on architectures that use register windows, that all registers are flushed to the stack.
void *stack_pointer = coroutine_stack_pointer();

// At this point, you may need to ensure, on architectures that use register windows, that all registers are flushed to the stack; otherwise the copy of the stack will not contain the valid registers:
coroutine_flush_register_windows();

// Save stack to private area:
@@ -59,16 +70,30 @@ int coroutine_save_stack(struct coroutine_context * context) {
context->used = size;
}

// Save registers / restore point:
return _setjmp(context->state);
// Initialized:
return 0;
}

// Copy the current stack to a private memory buffer.
int coroutine_save_stack(struct coroutine_context * context) {
if (_setjmp(context->state)) {
// Restored.
return 1;
}

// We need to invoke the memory copy from one stack frame deeper than the one that calls setjmp. That is because if you don't do this, the setjmp might be restored into an invalid stack frame (truncated, etc):
return coroutine_save_stack_1(context);
}

__attribute__((noreturn, noinline))
static void coroutine_restore_stack_padded(struct coroutine_context *context, void * buffer) {
void *stack_pointer = &stack_pointer;
void coroutine_restore_stack_padded(struct coroutine_context *context, void * buffer) {
void *stack_pointer = coroutine_stack_pointer();

assert(context->base);

// At this point, you may need to ensure, on architectures that use register windows, that all registers are flushed to the stack; otherwise when we copy in the new stack, the registers would not be updated:
coroutine_flush_register_windows();

// Restore stack from private area:
if (stack_pointer < context->base) {
void * bottom = (char*)context->base - context->used;
@@ -82,28 +107,24 @@ static void coroutine_restore_stack_padded(struct coroutine_context *context, vo
memcpy(context->base, context->stack, context->used);
}

// Restore registers:
// The `| (int)buffer` is to force the compiler NOT to elide he buffer and `alloca`.
_longjmp(context->state, 1 | (int)buffer);
// Restore registers. The `buffer` is to force the compiler NOT to elide the buffer and `alloca`:
_longjmp(context->state, (int)(1 | (intptr_t)buffer));
}

static const size_t GAP = 128;

// In order to swap between coroutines, we need to swap the stack and registers.
// `setjmp` and `longjmp` are able to swap registers, but what about swapping stacks? You can use `memcpy` to copy the current stack to a private area and `memcpy` to copy the private stack of the next coroutine to the main stack.
// But if the stack you are copying into the main stack is bigger than the currently executing stack, the `memcpy` will clobber the current stack frame (including the context argument). So we use `alloca` to push the current stack frame *beyond* the stack we are about to copy in. This ensures the current stack frame in `coroutine_restore_stack_padded` remains valid for calling `longjmp`.
__attribute__((noreturn))
void coroutine_restore_stack(struct coroutine_context *context) {
void *stack_pointer = &stack_pointer;
void *stack_pointer = coroutine_stack_pointer();
void *buffer = NULL;
ssize_t offset = 0;

// We must ensure that the next stack frame is BEYOND the stack we are restoring:
if (stack_pointer < context->base) {
offset = (char*)stack_pointer - ((char*)context->base - context->used) + GAP;
intptr_t offset = (intptr_t)stack_pointer - ((intptr_t)context->base - context->used);
if (offset > 0) buffer = alloca(offset);
} else {
offset = ((char*)context->base + context->used) - (char*)stack_pointer + GAP;
intptr_t offset = ((intptr_t)context->base + context->used) - (intptr_t)stack_pointer;
if (offset > 0) buffer = alloca(offset);
}

@@ -128,9 +149,9 @@ struct coroutine_context *coroutine_transfer(struct coroutine_context *current,
// It's possible to come here, even though the current fiber has been terminated. We are never going to return so we don't bother saving the stack.

if (current->stack) {
if (coroutine_save_stack(current) == 0) {
coroutine_restore_stack(target);
}
if (coroutine_save_stack(current) == 0) {
coroutine_restore_stack(target);
}
} else {
coroutine_restore_stack(target);
}
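
Stripped of the Ruby-specific bookkeeping, the split between `coroutine_save_stack` and `coroutine_save_stack_1` follows the general shape below. This is a schematic sketch of the technique, not the actual implementation; the saved region and the copy itself are elided:

    #include <setjmp.h>

    static jmp_buf state;

    __attribute__((noinline))
    static int save_stack_deeper(void)
    {
        // The stack copy would run here, one frame below the setjmp frame,
        // so the frame holding the restore point is entirely inside the
        // saved region and is intact when the stack is copied back.
        /* memcpy(private_buffer, stack_start, used); */
        return 0;               // first return: just initialized
    }

    static int save_stack(void)
    {
        if (setjmp(state)) {
            return 1;           // second return: resumed via longjmp
        }
        return save_stack_deeper();
    }
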
15 changes: 8 additions & 7 deletions thread.c
@@ -1789,23 +1789,23 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
rb_execution_context_t * volatile ec = GET_EC();
volatile int saved_errno = 0;
enum ruby_tag_type state;
struct waiting_fd wfd;
COROUTINE_STACK_LOCAL(struct waiting_fd, wfd);

wfd.fd = fd;
wfd.th = rb_ec_thread_ptr(ec);
wfd->fd = fd;
wfd->th = rb_ec_thread_ptr(ec);

RB_VM_LOCK_ENTER();
{
list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &wfd.wfd_node);
list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &wfd->wfd_node);
}
RB_VM_LOCK_LEAVE();

EC_PUSH_TAG(ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
BLOCKING_REGION(wfd.th, {
BLOCKING_REGION(wfd->th, {
val = func(data1);
saved_errno = errno;
}, ubf_select, wfd.th, FALSE);
}, ubf_select, wfd->th, FALSE);
}
EC_POP_TAG();

@@ -1815,7 +1815,8 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
*/
RB_VM_LOCK_ENTER();
{
list_del(&wfd.wfd_node);
list_del(&wfd->wfd_node);
COROUTINE_STACK_FREE(wfd);
}
RB_VM_LOCK_LEAVE();

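
The reason `wfd` can no longer be a plain stack local: its list node is published in the VM-wide `waiting_fds` list, which other threads may traverse while this fiber's stack is copied aside by the copy coroutine implementation. A reduced illustration of the hazard, with hypothetical names (not the Ruby API):

    #include "ccan/list/list.h"

    struct waiter {
        struct list_node node;
        int fd;
    };

    static LIST_HEAD(global_waiters);     // visible to other threads

    void suspend_current_fiber(void);     // hypothetical suspension point

    void blocking_wait(int fd)
    {
        struct waiter w = { .fd = fd };
        list_add(&global_waiters, &w.node);   // publishes &w
        // If the running fiber's stack is now memcpy'd into a private buffer
        // and another fiber's stack is copied over it, the list still points
        // at &w, which no longer holds a valid waiter.
        suspend_current_fiber();
        list_del(&w.node);
    }
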
95 changes: 51 additions & 44 deletions thread_sync.c
@@ -1,5 +1,6 @@
/* included by thread.c */
#include "ccan/list/list.h"
#include "coroutine/Stack.h"

static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
static VALUE rb_eClosedQueueError;
@@ -270,9 +271,14 @@ static VALUE call_rb_scheduler_block(VALUE mutex) {
return rb_scheduler_block(rb_scheduler_current(), mutex, Qnil);
}

static VALUE remove_from_mutex_lock_waiters(VALUE arg) {
struct list_node *node = (struct list_node*)arg;
list_del(node);
static VALUE
delete_from_waitq(VALUE v)
{
struct sync_waiter *w = (void *)v;
list_del(&w->node);

COROUTINE_STACK_FREE(w);

return Qnil;
}

@@ -291,22 +297,21 @@ do_mutex_lock(VALUE self, int interruptible_p)
}

if (rb_mutex_trylock(self) == Qfalse) {
struct sync_waiter w = {
.self = self,
.th = th,
.fiber = fiber
};

if (mutex->fiber == fiber) {
rb_raise(rb_eThreadError, "deadlock; recursive locking");
}

while (mutex->fiber != fiber) {
VALUE scheduler = rb_scheduler_current();
if (scheduler != Qnil) {
list_add_tail(&mutex->waitq, &w.node);
COROUTINE_STACK_LOCAL(struct sync_waiter, w);
w->self = self;
w->th = th;
w->fiber = fiber;

list_add_tail(&mutex->waitq, &w->node);

rb_ensure(call_rb_scheduler_block, self, remove_from_mutex_lock_waiters, (VALUE)&w.node);
rb_ensure(call_rb_scheduler_block, self, delete_from_waitq, (VALUE)w);

if (!mutex->fiber) {
mutex->fiber = fiber;
@@ -330,11 +335,18 @@ do_mutex_lock(VALUE self, int interruptible_p)
patrol_thread = th;
}

list_add_tail(&mutex->waitq, &w.node);
COROUTINE_STACK_LOCAL(struct sync_waiter, w);
w->self = self;
w->th = th;
w->fiber = fiber;

list_add_tail(&mutex->waitq, &w->node);

native_sleep(th, timeout); /* release GVL */

list_del(&w.node);
list_del(&w->node);

COROUTINE_STACK_FREE(w);

if (!mutex->fiber) {
mutex->fiber = fiber;
@@ -949,6 +961,8 @@ queue_sleep_done(VALUE p)
list_del(&qw->w.node);
qw->as.q->num_waiting--;

COROUTINE_STACK_FREE(qw);

return Qfalse;
}

@@ -960,6 +974,8 @@
list_del(&qw->w.node);
qw->as.sq->num_waiting_push--;

COROUTINE_STACK_FREE(qw);

return Qfalse;
}

@@ -977,20 +993,21 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
}
else {
rb_execution_context_t *ec = GET_EC();
struct queue_waiter qw;

assert(RARRAY_LEN(q->que) == 0);
assert(queue_closed_p(self) == 0);

qw.w.self = self;
qw.w.th = ec->thread_ptr;
qw.w.fiber = ec->fiber_ptr;
COROUTINE_STACK_LOCAL(struct queue_waiter, qw);

qw->w.self = self;
qw->w.th = ec->thread_ptr;
qw->w.fiber = ec->fiber_ptr;

qw.as.q = q;
list_add_tail(queue_waitq(qw.as.q), &qw.w.node);
qw.as.q->num_waiting++;
qw->as.q = q;
list_add_tail(queue_waitq(qw->as.q), &qw->w.node);
qw->as.q->num_waiting++;

rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw);
rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)qw);
}
}

@@ -1223,18 +1240,18 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
}
else {
rb_execution_context_t *ec = GET_EC();
struct queue_waiter qw;
COROUTINE_STACK_LOCAL(struct queue_waiter, qw);
struct list_head *pushq = szqueue_pushq(sq);

qw.w.self = self;
qw.w.th = ec->thread_ptr;
qw.w.fiber = ec->fiber_ptr;
qw->w.self = self;
qw->w.th = ec->thread_ptr;
qw->w.fiber = ec->fiber_ptr;

qw.as.sq = sq;
list_add_tail(pushq, &qw.w.node);
qw->as.sq = sq;
list_add_tail(pushq, &qw->w.node);
sq->num_waiting_push++;

rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw);
rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)qw);
}
}

@@ -1445,15 +1462,6 @@ do_sleep(VALUE args)
return rb_funcallv(p->mutex, id_sleep, 1, &p->timeout);
}

static VALUE
delete_from_waitq(VALUE v)
{
struct sync_waiter *w = (void *)v;
list_del(&w->node);

return Qnil;
}

/*
* Document-method: ConditionVariable#wait
* call-seq: wait(mutex, timeout=nil)
@@ -1474,14 +1482,13 @@ rb_condvar_wait(int argc, VALUE *argv, VALUE self)

rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout);

struct sync_waiter w = {
.self = args.mutex,
.th = ec->thread_ptr,
.fiber = ec->fiber_ptr,
};
COROUTINE_STACK_LOCAL(struct sync_waiter, w);
w->self = args.mutex;
w->th = ec->thread_ptr;
w->fiber = ec->fiber_ptr;

list_add_tail(&cv->waitq, &w.node);
rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&w);
list_add_tail(&cv->waitq, &w->node);
rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)w);

return self;
}
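
Throughout thread_sync.c the COROUTINE_STACK_FREE sits inside the cleanup callback passed to rb_ensure (delete_from_waitq, queue_sleep_done, szqueue_sleep_done) rather than after the rb_ensure call, because the cleanup callback also runs when the sleep exits non-locally, so the allocation made under COROUTINE_PRIVATE_STACK cannot leak. The overall shape, with a placeholder wait queue and sleep body:

    static VALUE example_sleep(VALUE self);    // placeholder sleep body

    static VALUE example_cleanup(VALUE v)
    {
        struct sync_waiter *w = (struct sync_waiter *)v;
        list_del(&w->node);        // unlink before the memory can be released
        COROUTINE_STACK_FREE(w);   // runs on normal and exceptional exit alike
        return Qnil;
    }

    static VALUE example_wait(VALUE self)
    {
        COROUTINE_STACK_LOCAL(struct sync_waiter, w);
        w->self = self;
        /* ... fill in w->th and w->fiber ... */

        list_add_tail(&example_waitq, &w->node);   // example_waitq is a placeholder

        // rb_ensure guarantees example_cleanup runs even if example_sleep raises.
        return rb_ensure(example_sleep, self, example_cleanup, (VALUE)w);
    }
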