Skip to content

Commit

Permalink
* thread.c (rb_threadptr_async_errinfo_*): manage async errors queue.
Browse files Browse the repository at this point in the history
  Async events such as an exception throwed by Thread#raise,
  Thread#kill and thread termination (after main thread termination)
  will be queued to th->async_errinfo_queue.
  - clear: clear the queue.
  - enque: enque err object into queue.
  - deque: deque err object from queue.
  - active_p: return 1 if the queue should be checked.
  rb_thread_t#thrown_errinfo was removed.
* vm_core.h: add declarations of rb_threadptr_async_errinfo_*.
  remove rb_thread_t#thrown_errinfo field and
  add rb_thread_t#async_errinfo_queue (queue body: Array),
  rb_thread_t#async_errinfo_queue_checked (flag),
  rb_thread_t#async_errinfo_mask_stack(Array, not used yet).
* vm.c (rb_thread_mark): fix a mark function.
* cont.c (rb_fiber_start): enque an error.
* process.c (after_fork): clear async errinfo queue.



git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@36430 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
  • Loading branch information
ko1 committed Jul 18, 2012
1 parent 18c04b8 commit 2814443
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 44 deletions.
24 changes: 24 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,27 @@
Wed Jul 18 14:16:51 2012 Koichi Sasada <ko1@atdot.net>

* thread.c (rb_threadptr_async_errinfo_*): manage async errors queue.
Async events such as an exception throwed by Thread#raise,
Thread#kill and thread termination (after main thread termination)
will be queued to th->async_errinfo_queue.
- clear: clear the queue.
- enque: enque err object into queue.
- deque: deque err object from queue.
- active_p: return 1 if the queue should be checked.
rb_thread_t#thrown_errinfo was removed.

* vm_core.h: add declarations of rb_threadptr_async_errinfo_*.
remove rb_thread_t#thrown_errinfo field and
add rb_thread_t#async_errinfo_queue (queue body: Array),
rb_thread_t#async_errinfo_queue_checked (flag),
rb_thread_t#async_errinfo_mask_stack(Array, not used yet).

* vm.c (rb_thread_mark): fix a mark function.

* cont.c (rb_fiber_start): enque an error.

* process.c (after_fork): clear async errinfo queue.

Wed Jul 18 14:25:55 2012 URABE Shyouhei <shyouhei@ruby-lang.org>

* pack.c: (ditto) bitwise operations are not char. Apply explicit
Expand Down
6 changes: 3 additions & 3 deletions cont.c
Original file line number Diff line number Diff line change
Expand Up @@ -1164,11 +1164,11 @@ rb_fiber_start(void)

if (state) {
if (state == TAG_RAISE) {
th->thrown_errinfo = th->errinfo;
rb_threadptr_async_errinfo_enque(th, th->errinfo);
}
else {
th->thrown_errinfo =
rb_vm_make_jump_tag_but_local_jump(state, th->errinfo);
VALUE err = rb_vm_make_jump_tag_but_local_jump(state, th->errinfo);
rb_threadptr_async_errinfo_enque(th, err);
}
RUBY_VM_SET_INTERRUPT(th);
}
Expand Down
2 changes: 1 addition & 1 deletion process.c
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ after_exec(void)
}

#define before_fork() before_exec()
#define after_fork() (GET_THREAD()->thrown_errinfo = 0, after_exec())
#define after_fork() (rb_threadptr_async_errinfo_clear(GET_THREAD()), after_exec())

#include "dln.h"

Expand Down
143 changes: 105 additions & 38 deletions thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ static void sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec);
static void sleep_forever(rb_thread_t *th, int nodeadlock);
static double timeofday(void);
static int rb_threadptr_dead(rb_thread_t *th);

static void rb_check_deadlock(rb_vm_t *vm);

#define eKillSignal INT2FIX(0)
Expand Down Expand Up @@ -131,7 +130,6 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio
blocking_region_begin(__th, &__region, (ubf), (ubfarg)); \
exec; \
blocking_region_end(__th, &__region); \
RUBY_VM_CHECK_INTS(); \
} while(0)

#if THREAD_DEBUG
Expand Down Expand Up @@ -313,9 +311,9 @@ terminate_i(st_data_t key, st_data_t val, rb_thread_t *main_thread)

if (th != main_thread) {
thread_debug("terminate_i: %p\n", (void *)th);
rb_threadptr_interrupt(th);
th->thrown_errinfo = eTerminateSignal;
rb_threadptr_async_errinfo_enque(th, eTerminateSignal);
th->status = THREAD_TO_KILL;
rb_threadptr_interrupt(th);
}
else {
thread_debug("terminate_i: main thread (%p)\n", (void *)th);
Expand Down Expand Up @@ -564,6 +562,10 @@ thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS))
th->priority = GET_THREAD()->priority;
th->thgroup = GET_THREAD()->thgroup;

th->async_errinfo_queue = rb_ary_new();
th->async_errinfo_queue_checked = 0;
th->async_errinfo_mask_stack = rb_ary_new();

native_mutex_initialize(&th->interrupt_lock);
if (GET_VM()->event_hooks != NULL)
th->event_flags |= RUBY_EVENT_VM;
Expand Down Expand Up @@ -1133,6 +1135,10 @@ rb_thread_call_without_gvl(void *(*func)(void *), void *data1,
val = func(data1);
saved_errno = errno;
}, ubf, data2);

/* TODO: check */
RUBY_VM_CHECK_INTS();

errno = saved_errno;

return val;
Expand All @@ -1144,13 +1150,28 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
VALUE val;
rb_thread_t *th = GET_THREAD();
int saved_errno = 0;
int state;

th->waiting_fd = fd;
BLOCKING_REGION({
val = func(data1);
saved_errno = errno;
}, ubf_select, th);

TH_PUSH_TAG(th);
if ((state = EXEC_TAG()) == 0) {
BLOCKING_REGION({
val = func(data1);
saved_errno = errno;
}, ubf_select, th);
}
TH_POP_TAG();

/* clear waitinf_fd anytime */
th->waiting_fd = -1;

if (state) {
JUMP_TAG(state);
}
/* TODO: check func() */
RUBY_VM_CHECK_INTS();

errno = saved_errno;

return val;
Expand Down Expand Up @@ -1294,12 +1315,14 @@ rb_threadptr_execute_interrupts_common(rb_thread_t *th)
}

/* exception from another thread */
if (th->thrown_errinfo) {
VALUE err = th->thrown_errinfo;
th->thrown_errinfo = 0;
if (rb_threadptr_async_errinfo_active_p(th)) {
VALUE err = rb_threadptr_async_errinfo_deque(th);
thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err);

if (err == eKillSignal || err == eTerminateSignal) {
if (err == eKillSignal /* Thread#kill receieved */ ||
err == eTerminateSignal /* Terminate thread */ ) {
rb_threadptr_async_errinfo_clear(th);
th->status = THREAD_TO_KILL;
th->errinfo = INT2FIX(TAG_FATAL);
TH_JUMP_TAG(th, TAG_FATAL);
}
Expand Down Expand Up @@ -1353,6 +1376,59 @@ rb_gc_mark_threads(void)

/*****************************************************/

/*
* rb_threadptr_async_errinfo_* - manage async errors queue
*
* Async events such as an exception throwed by Thread#raise,
* Thread#kill and thread termination (after main thread termination)
* will be queued to th->async_errinfo_queue.
* - clear: clear the queue.
* - enque: enque err object into queue.
* - deque: deque err object from queue.
* - active_p: return 1 if the queue should be checked.
*
* All rb_threadptr_async_errinfo_* functions are called by
* a GVL acquired thread, of course.
* Note that all "rb_" prefix APIs need GVL to call.
*/

void
rb_threadptr_async_errinfo_clear(rb_thread_t *th)
{
rb_ary_clear(th->async_errinfo_queue);
}

void
rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v)
{
rb_ary_push(th->async_errinfo_queue, v);
th->async_errinfo_queue_checked = 0;
}

VALUE
rb_threadptr_async_errinfo_deque(rb_thread_t *th)
{
VALUE err = rb_ary_shift(th->async_errinfo_queue);
if (RARRAY_LEN(th->async_errinfo_queue) == 0) {
th->async_errinfo_queue_checked = 1;
}
return err;
}

int
rb_threadptr_async_errinfo_active_p(rb_thread_t *th)
{
if (th->async_errinfo_queue_checked) {
return 0;
}
else {
return RARRAY_LEN(th->async_errinfo_queue) > 0;
}
}

VALUE
rb_thread

static void
rb_threadptr_ready(rb_thread_t *th)
{
Expand All @@ -1364,19 +1440,13 @@ rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv)
{
VALUE exc;

again:
if (rb_threadptr_dead(th)) {
return Qnil;
}

if (th->thrown_errinfo != 0 || th->raised_flag) {
rb_thread_schedule();
goto again;
}

exc = rb_make_exception(argc, argv);
th->thrown_errinfo = exc;
rb_threadptr_ready(th);
rb_threadptr_async_errinfo_enque(th, exc);
rb_threadptr_interrupt(th);
return Qnil;
}

Expand Down Expand Up @@ -1436,28 +1506,17 @@ rb_threadptr_reset_raised(rb_thread_t *th)
return 1;
}

#define THREAD_IO_WAITING_P(th) ( \
((th)->status == THREAD_STOPPED || \
(th)->status == THREAD_STOPPED_FOREVER) && \
(th)->blocking_region_buffer && \
(th)->unblock.func == ubf_select && \
1)

static int
thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data)
{
int fd = (int)data;
rb_thread_t *th;
GetThreadPtr((VALUE)key, th);

if (THREAD_IO_WAITING_P(th)) {
native_mutex_lock(&th->interrupt_lock);
if (THREAD_IO_WAITING_P(th) && th->waiting_fd == fd) {
th->thrown_errinfo = th->vm->special_exceptions[ruby_error_closed_stream];
RUBY_VM_SET_INTERRUPT(th);
(th->unblock.func)(th->unblock.arg);
}
native_mutex_unlock(&th->interrupt_lock);
if (th->waiting_fd == fd) {
VALUE err = th->vm->special_exceptions[ruby_error_closed_stream];
rb_threadptr_async_errinfo_enque(th, err);
rb_threadptr_interrupt(th);
}
return ST_CONTINUE;
}
Expand Down Expand Up @@ -1530,10 +1589,9 @@ rb_thread_kill(VALUE thread)

thread_debug("rb_thread_kill: %p (%p)\n", (void *)th, (void *)th->thread_id);

rb_threadptr_interrupt(th);
th->thrown_errinfo = eKillSignal;
rb_threadptr_async_errinfo_enque(th, eKillSignal);
th->status = THREAD_TO_KILL;

rb_threadptr_interrupt(th);
return thread;
}

Expand Down Expand Up @@ -2592,6 +2650,9 @@ do_select(int n, rb_fdset_t *read, rb_fdset_t *write, rb_fdset_t *except,
result = native_fd_select(n, read, write, except, timeout, th);
if (result < 0) lerrno = errno;
}, ubf_select, th);

RUBY_VM_CHECK_INTS();

errno = lerrno;

if (result < 0) {
Expand Down Expand Up @@ -2815,6 +2876,8 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
if (result < 0) lerrno = errno;
}, ubf_select, GET_THREAD());

RUBY_VM_CHECK_INTS();

if (result < 0) {
errno = lerrno;
switch (errno) {
Expand Down Expand Up @@ -4707,6 +4770,10 @@ Init_Thread(void)
gvl_init(th->vm);
gvl_acquire(th->vm, th);
native_mutex_initialize(&th->interrupt_lock);

th->async_errinfo_queue = rb_ary_new();
th->async_errinfo_queue_checked = 0;
th->async_errinfo_mask_stack = rb_ary_new();
}
}

Expand Down
3 changes: 2 additions & 1 deletion vm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1645,7 +1645,8 @@ rb_thread_mark(void *ptr)
RUBY_MARK_UNLESS_NULL(th->thgroup);
RUBY_MARK_UNLESS_NULL(th->value);
RUBY_MARK_UNLESS_NULL(th->errinfo);
RUBY_MARK_UNLESS_NULL(th->thrown_errinfo);
RUBY_MARK_UNLESS_NULL(th->async_errinfo_queue);
RUBY_MARK_UNLESS_NULL(th->async_errinfo_mask_stack);
RUBY_MARK_UNLESS_NULL(th->root_svar);
RUBY_MARK_UNLESS_NULL(th->top_self);
RUBY_MARK_UNLESS_NULL(th->top_wrapper);
Expand Down
11 changes: 10 additions & 1 deletion vm_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,13 @@ typedef struct rb_thread_struct {
VALUE thgroup;
VALUE value;

/* temporary place of errinfo */
VALUE errinfo;
VALUE thrown_errinfo;

/* async errinfo queue */
VALUE async_errinfo_queue;
int async_errinfo_queue_checked;
VALUE async_errinfo_mask_stack;

rb_atomic_t interrupt_flag;
rb_thread_lock_t interrupt_lock;
Expand Down Expand Up @@ -767,6 +772,10 @@ void rb_threadptr_signal_exit(rb_thread_t *th);
void rb_threadptr_execute_interrupts(rb_thread_t *);
void rb_threadptr_interrupt(rb_thread_t *th);
void rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th);
void rb_threadptr_async_errinfo_clear(rb_thread_t *th);
void rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v);
VALUE rb_threadptr_async_errinfo_deque(rb_thread_t *th);
int rb_threadptr_async_errinfo_active_p(rb_thread_t *th);

void rb_thread_lock_unlock(rb_thread_lock_t *);
void rb_thread_lock_destroy(rb_thread_lock_t *);
Expand Down

0 comments on commit 2814443

Please sign in to comment.