Skip to content

Commit

Permalink
Proposed method for dealing with stack locals which have non-local li…
Browse files Browse the repository at this point in the history
…fetime.
  • Loading branch information
ioquatix committed Dec 4, 2020
1 parent 15e2331 commit 3b5b309
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 51 deletions.
1 change: 1 addition & 0 deletions common.mk
Expand Up @@ -14011,6 +14011,7 @@ thread.$(OBJEXT): $(top_srcdir)/internal/thread.h
thread.$(OBJEXT): $(top_srcdir)/internal/time.h
thread.$(OBJEXT): $(top_srcdir)/internal/vm.h
thread.$(OBJEXT): $(top_srcdir)/internal/warnings.h
thread.$(OBJEXT): {$(VPATH)}$(COROUTINE_H)
thread.$(OBJEXT): {$(VPATH)}assert.h
thread.$(OBJEXT): {$(VPATH)}backward/2/assume.h
thread.$(OBJEXT): {$(VPATH)}backward/2/attributes.h
Expand Down
16 changes: 16 additions & 0 deletions coroutine/Stack.h
@@ -0,0 +1,16 @@
/*
* This file is part of the "Coroutine" project and released under the MIT License.
*
* Created by Samuel Williams on 10/11/2020.
* Copyright, 2020, by Samuel Williams.
*/

#include COROUTINE_H

#ifdef COROUTINE_PRIVATE_STACK
#define COROUTINE_STACK_LOCAL(type, name) type *name = ruby_xmalloc(sizeof(type))
#define COROUTINE_STACK_FREE(name) ruby_xfree(name)
#else
#define COROUTINE_STACK_LOCAL(type, name) type name##_local; type * name = &name##_local
#define COROUTINE_STACK_FREE(name)
#endif
15 changes: 8 additions & 7 deletions thread.c
Expand Up @@ -1789,23 +1789,23 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
rb_execution_context_t * volatile ec = GET_EC();
volatile int saved_errno = 0;
enum ruby_tag_type state;
struct waiting_fd wfd;
COROUTINE_STACK_LOCAL(struct waiting_fd, wfd);

wfd.fd = fd;
wfd.th = rb_ec_thread_ptr(ec);
wfd->fd = fd;
wfd->th = rb_ec_thread_ptr(ec);

RB_VM_LOCK_ENTER();
{
list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &wfd.wfd_node);
list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &wfd->wfd_node);
}
RB_VM_LOCK_LEAVE();

EC_PUSH_TAG(ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
BLOCKING_REGION(wfd.th, {
BLOCKING_REGION(wfd->th, {
val = func(data1);
saved_errno = errno;
}, ubf_select, wfd.th, FALSE);
}, ubf_select, wfd->th, FALSE);
}
EC_POP_TAG();

Expand All @@ -1815,7 +1815,8 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
*/
RB_VM_LOCK_ENTER();
{
list_del(&wfd.wfd_node);
list_del(&wfd->wfd_node);
COROUTINE_STACK_FREE(wfd);
}
RB_VM_LOCK_LEAVE();

Expand Down
95 changes: 51 additions & 44 deletions thread_sync.c
@@ -1,5 +1,6 @@
/* included by thread.c */
#include "ccan/list/list.h"
#include "coroutine/Stack.h"

static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
static VALUE rb_eClosedQueueError;
Expand Down Expand Up @@ -270,9 +271,14 @@ static VALUE call_rb_scheduler_block(VALUE mutex) {
return rb_scheduler_block(rb_scheduler_current(), mutex, Qnil);
}

static VALUE remove_from_mutex_lock_waiters(VALUE arg) {
struct list_node *node = (struct list_node*)arg;
list_del(node);
static VALUE
delete_from_waitq(VALUE v)
{
struct sync_waiter *w = (void *)v;
list_del(&w->node);

COROUTINE_STACK_FREE(w);

return Qnil;
}

Expand All @@ -291,22 +297,21 @@ do_mutex_lock(VALUE self, int interruptible_p)
}

if (rb_mutex_trylock(self) == Qfalse) {
struct sync_waiter w = {
.self = self,
.th = th,
.fiber = fiber
};

if (mutex->fiber == fiber) {
rb_raise(rb_eThreadError, "deadlock; recursive locking");
}

while (mutex->fiber != fiber) {
VALUE scheduler = rb_scheduler_current();
if (scheduler != Qnil) {
list_add_tail(&mutex->waitq, &w.node);
COROUTINE_STACK_LOCAL(struct sync_waiter, w);
w->self = self;
w->th = th;
w->fiber = fiber;

list_add_tail(&mutex->waitq, &w->node);

rb_ensure(call_rb_scheduler_block, self, remove_from_mutex_lock_waiters, (VALUE)&w.node);
rb_ensure(call_rb_scheduler_block, self, delete_from_waitq, (VALUE)w);

if (!mutex->fiber) {
mutex->fiber = fiber;
Expand All @@ -330,11 +335,18 @@ do_mutex_lock(VALUE self, int interruptible_p)
patrol_thread = th;
}

list_add_tail(&mutex->waitq, &w.node);
COROUTINE_STACK_LOCAL(struct sync_waiter, w);
w->self = self;
w->th = th;
w->fiber = fiber;

list_add_tail(&mutex->waitq, &w->node);

native_sleep(th, timeout); /* release GVL */

list_del(&w.node);
list_del(&w->node);

COROUTINE_STACK_FREE(w);

if (!mutex->fiber) {
mutex->fiber = fiber;
Expand Down Expand Up @@ -949,6 +961,8 @@ queue_sleep_done(VALUE p)
list_del(&qw->w.node);
qw->as.q->num_waiting--;

COROUTINE_STACK_FREE(qw);

return Qfalse;
}

Expand All @@ -960,6 +974,8 @@ szqueue_sleep_done(VALUE p)
list_del(&qw->w.node);
qw->as.sq->num_waiting_push--;

COROUTINE_STACK_FREE(qw);

return Qfalse;
}

Expand All @@ -977,20 +993,21 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
}
else {
rb_execution_context_t *ec = GET_EC();
struct queue_waiter qw;

assert(RARRAY_LEN(q->que) == 0);
assert(queue_closed_p(self) == 0);

qw.w.self = self;
qw.w.th = ec->thread_ptr;
qw.w.fiber = ec->fiber_ptr;
COROUTINE_STACK_LOCAL(struct queue_waiter, qw);

qw->w.self = self;
qw->w.th = ec->thread_ptr;
qw->w.fiber = ec->fiber_ptr;

qw.as.q = q;
list_add_tail(queue_waitq(qw.as.q), &qw.w.node);
qw.as.q->num_waiting++;
qw->as.q = q;
list_add_tail(queue_waitq(qw->as.q), &qw->w.node);
qw->as.q->num_waiting++;

rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw);
rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)qw);
}
}

Expand Down Expand Up @@ -1223,18 +1240,18 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
}
else {
rb_execution_context_t *ec = GET_EC();
struct queue_waiter qw;
COROUTINE_STACK_LOCAL(struct queue_waiter, qw);
struct list_head *pushq = szqueue_pushq(sq);

qw.w.self = self;
qw.w.th = ec->thread_ptr;
qw.w.fiber = ec->fiber_ptr;
qw->w.self = self;
qw->w.th = ec->thread_ptr;
qw->w.fiber = ec->fiber_ptr;

qw.as.sq = sq;
list_add_tail(pushq, &qw.w.node);
qw->as.sq = sq;
list_add_tail(pushq, &qw->w.node);
sq->num_waiting_push++;

rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw);
rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)qw);
}
}

Expand Down Expand Up @@ -1445,15 +1462,6 @@ do_sleep(VALUE args)
return rb_funcallv(p->mutex, id_sleep, 1, &p->timeout);
}

static VALUE
delete_from_waitq(VALUE v)
{
struct sync_waiter *w = (void *)v;
list_del(&w->node);

return Qnil;
}

/*
* Document-method: ConditionVariable#wait
* call-seq: wait(mutex, timeout=nil)
Expand All @@ -1474,14 +1482,13 @@ rb_condvar_wait(int argc, VALUE *argv, VALUE self)

rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout);

struct sync_waiter w = {
.self = args.mutex,
.th = ec->thread_ptr,
.fiber = ec->fiber_ptr,
};
COROUTINE_STACK_LOCAL(struct sync_waiter, w);
w->self = args.mutex;
w->th = ec->thread_ptr;
w->fiber = ec->fiber_ptr;

list_add_tail(&cv->waitq, &w.node);
rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&w);
list_add_tail(&cv->waitq, &w->node);
rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)w);

return self;
}
Expand Down

0 comments on commit 3b5b309

Please sign in to comment.