Skip to content

Commit

Permalink
* eval.c, intern.h, ext/thread/thread.c: should not free queue
Browse files Browse the repository at this point in the history
  while any live threads are waiting.
  [ruby-dev:30653]


git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/branches/ruby_1_8_6@12469 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
  • Loading branch information
shyouhei committed Jun 7, 2007
1 parent c4aee4c commit 887e101
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 38 deletions.
6 changes: 6 additions & 0 deletions ChangeLog
@@ -1,3 +1,9 @@
Thu Jun 7 20:10:51 2007 Nobuyoshi Nakada <nobu@ruby-lang.org>

* eval.c, intern.h, ext/thread/thread.c: should not free queue
while any live threads are waiting.
[ruby-dev:30653]

Thu Jun 7 14:53:46 2007 URABE Shyouhei <shyouhei@ruby-lang.org>

* eval.c (method_inspect): show proper class name.
Expand Down
28 changes: 23 additions & 5 deletions eval.c
Expand Up @@ -11212,11 +11212,20 @@ rb_thread_list()
VALUE
rb_thread_wakeup(thread)
VALUE thread;
{
if (!RTEST(rb_thread_wakeup_alive(thread)))
rb_raise(rb_eThreadError, "killed thread");
return thread;
}

VALUE
rb_thread_wakeup_alive(thread)
VALUE thread;
{
rb_thread_t th = rb_thread_check(thread);

if (th->status == THREAD_KILLED)
rb_raise(rb_eThreadError, "killed thread");
return Qnil;
rb_thread_ready(th);

return thread;
Expand Down Expand Up @@ -11291,7 +11300,7 @@ rb_thread_kill(thread)
rb_thread_t th = rb_thread_check(thread);

kill_thread(th, 0);
return thread;
return thread;
}


Expand Down Expand Up @@ -11644,6 +11653,15 @@ rb_thread_abort_exc_set(thread, val)
}


enum rb_thread_status
rb_thread_status(thread)
VALUE thread;
{
rb_thread_t th = rb_thread_check(thread);
return th->status;
}


/*
* call-seq:
* thr.group => thgrp or nil
Expand Down Expand Up @@ -12152,7 +12170,7 @@ rb_thread_value(thread)
*/

static VALUE
rb_thread_status(thread)
rb_thread_status_name(thread)
VALUE thread;
{
rb_thread_t th = rb_thread_check(thread);
Expand All @@ -12179,7 +12197,7 @@ rb_thread_status(thread)
* thr.alive? #=> false
*/

static VALUE
VALUE
rb_thread_alive_p(thread)
VALUE thread;
{
Expand Down Expand Up @@ -13013,7 +13031,7 @@ Init_Thread()
rb_define_method(rb_cThread, "terminate!", rb_thread_kill_bang, 0);
rb_define_method(rb_cThread, "exit!", rb_thread_kill_bang, 0);
rb_define_method(rb_cThread, "value", rb_thread_value, 0);
rb_define_method(rb_cThread, "status", rb_thread_status, 0);
rb_define_method(rb_cThread, "status", rb_thread_status_name, 0);
rb_define_method(rb_cThread, "join", rb_thread_join_m, -1);
rb_define_method(rb_cThread, "alive?", rb_thread_alive_p, 0);
rb_define_method(rb_cThread, "stop?", rb_thread_stop_p, 0);
Expand Down
98 changes: 66 additions & 32 deletions ext/thread/thread.c
Expand Up @@ -12,6 +12,7 @@
#include <ruby.h>
#include <intern.h>
#include <rubysig.h>
#include <node.h>

static VALUE rb_cMutex;
static VALUE rb_cConditionVariable;
Expand Down Expand Up @@ -207,15 +208,16 @@ array_from_list(List const *list)
static VALUE
wake_thread(VALUE thread)
{
return rb_rescue2(rb_thread_wakeup, thread,
NULL, Qundef, rb_eThreadError, 0);
return rb_thread_wakeup_alive(thread);
}

static VALUE
run_thread(VALUE thread)
{
return rb_rescue2(rb_thread_run, thread,
NULL, Qundef, rb_eThreadError, 0);
thread = wake_thread(thread);
if (RTEST(thread) && !rb_thread_critical)
rb_thread_schedule();
return thread;
}

static VALUE
Expand All @@ -225,7 +227,9 @@ wake_one(List *list)

waking = Qnil;
while (list->entries && !RTEST(waking)) {
waking = wake_thread(shift_list(list));
waking = shift_list(list);
if (waking == Qundef) break;
waking = wake_thread(waking);
}

return waking;
Expand Down Expand Up @@ -266,10 +270,17 @@ static void
assert_no_survivors(List *waiting, const char *label, void *addr)
{
Entry *entry;
VALUE ths = 0;

for (entry = waiting->entries; entry; entry = entry->next) {
if (RTEST(wake_thread(entry->value))) {
rb_bug("%s %p freed with live thread(s) waiting", label, addr);
}
if (RTEST(wake_thread(entry->value))) {
if (!ths) ths = rb_ary_new();
rb_ary_push(ths, entry->value);
}
}
if (ths) {
rb_bug("%s %p freed with live thread(s) %s waiting",
label, addr, RSTRING_PTR(rb_inspect(ths)));
}
}

Expand Down Expand Up @@ -303,6 +314,8 @@ typedef struct _Mutex {
List waiting;
} Mutex;

#define MUTEX_LOCKED_P(mutex) (RTEST((mutex)->owner) && rb_thread_alive_p((mutex)->owner))

static void
mark_mutex(Mutex *mutex)
{
Expand Down Expand Up @@ -361,7 +374,7 @@ rb_mutex_locked_p(VALUE self)
{
Mutex *mutex;
Data_Get_Struct(self, Mutex, mutex);
return RTEST(mutex->owner) ? Qtrue : Qfalse;
return MUTEX_LOCKED_P(mutex) ? Qtrue : Qfalse;
}

/*
Expand All @@ -380,7 +393,7 @@ rb_mutex_try_lock(VALUE self)

Data_Get_Struct(self, Mutex, mutex);

if (RTEST(mutex->owner))
if (MUTEX_LOCKED_P(mutex))
return Qfalse;

mutex->owner = rb_thread_current();
Expand All @@ -403,11 +416,20 @@ lock_mutex(Mutex *mutex)

rb_thread_critical = 1;

while (RTEST(mutex->owner)) {
wait_list(&mutex->waiting);
rb_thread_critical = 1;
if (!MUTEX_LOCKED_P(mutex)) {
mutex->owner = current;
}
else {
push_list(&mutex->waiting, current);
do {
rb_thread_stop();
rb_thread_critical = 1;
if (!MUTEX_LOCKED_P(mutex)) {
mutex->owner = current;
break;
}
} while (mutex->owner != current);
}
mutex->owner = current;

rb_thread_critical = 0;
return Qnil;
Expand All @@ -422,6 +444,22 @@ rb_mutex_lock(VALUE self)
return self;
}

static VALUE
relock_mutex(Mutex *mutex)
{
VALUE current = rb_thread_current();

switch (rb_thread_status(current)) {
case THREAD_RUNNABLE:
case THREAD_STOPPED:
lock_mutex(mutex);
break;
default:
break;
}
return Qundef;
}

/*
* Document-method: unlock
*
Expand All @@ -434,16 +472,12 @@ unlock_mutex_inner(Mutex *mutex)
{
VALUE waking;

if (!RTEST(mutex->owner)) {
rb_raise(rb_eThreadError, "not owner");
}

if (mutex->owner != rb_thread_current()) {
rb_raise(rb_eThreadError, "not owner");
}

mutex->owner = Qnil;
waking = wake_one(&mutex->waiting);
mutex->owner = waking;

return waking;
}
Expand All @@ -462,14 +496,11 @@ unlock_mutex(Mutex *mutex)

rb_thread_critical = 1;
waking = rb_ensure(unlock_mutex_inner, (VALUE)mutex, set_critical, 0);

if (waking == Qundef) {
if (!RTEST(waking)) {
return Qfalse;
}

if (RTEST(waking)) {
run_thread(waking);
}
run_thread(waking);

return Qtrue;
}
Expand Down Expand Up @@ -515,13 +546,11 @@ rb_mutex_exclusive_unlock(VALUE self)
rb_thread_critical = 1;
waking = rb_ensure(rb_mutex_exclusive_unlock_inner, (VALUE)mutex, set_critical, 0);

if (waking == Qundef) {
if (waking == Qundef || !RTEST(waking)) {
return Qnil;
}

if (RTEST(waking)) {
run_thread(waking);
}
run_thread(waking);

return self;
}
Expand Down Expand Up @@ -633,13 +662,18 @@ rb_condvar_alloc(VALUE klass)
static void
wait_condvar(ConditionVariable *condvar, Mutex *mutex)
{
VALUE waking;

rb_thread_critical = 1;
if (rb_thread_current() != mutex->owner) {
rb_thread_critical = 0;
rb_raise(rb_eThreadError, "not owner of the synchronization mutex");
}
unlock_mutex_inner(mutex);
rb_ensure(wait_list, (VALUE)&condvar->waiting, lock_mutex, (VALUE)mutex);
waking = unlock_mutex_inner(mutex);
if (RTEST(waking)) {
wake_thread(waking);
}
rb_ensure(wait_list, (VALUE)&condvar->waiting, relock_mutex, (VALUE)mutex);
}

static VALUE
Expand Down Expand Up @@ -835,10 +869,10 @@ rb_queue_marshal_load(VALUE self, VALUE data)

array = rb_marshal_load(data);
if (TYPE(array) != T_ARRAY) {
rb_raise(rb_eRuntimeError, "expected Array of queue data");
rb_raise(rb_eTypeError, "expected Array of queue data");
}
if (RARRAY(array)->len < 1) {
rb_raise(rb_eRuntimeError, "missing capacity value");
rb_raise(rb_eArgError, "missing capacity value");
}
queue->capacity = NUM2ULONG(rb_ary_shift(array));
push_multiple_list(&queue->values, RARRAY(array)->ptr, (unsigned)RARRAY(array)->len);
Expand Down
3 changes: 3 additions & 0 deletions intern.h
Expand Up @@ -204,10 +204,13 @@ int rb_thread_alone _((void));
void rb_thread_polling _((void));
void rb_thread_sleep _((int));
void rb_thread_sleep_forever _((void));
enum rb_thread_status rb_thread_status _((VALUE));
VALUE rb_thread_stop _((void));
VALUE rb_thread_wakeup _((VALUE));
VALUE rb_thread_wakeup_alive _((VALUE));
VALUE rb_thread_run _((VALUE));
VALUE rb_thread_kill _((VALUE));
VALUE rb_thread_alive_p _((VALUE));
VALUE rb_thread_create _((VALUE (*)(ANYARGS), void*));
void rb_thread_interrupt _((void));
void rb_thread_trap_eval _((VALUE, int, int));
Expand Down
2 changes: 1 addition & 1 deletion version.h
Expand Up @@ -2,7 +2,7 @@
#define RUBY_RELEASE_DATE "2007-06-07"
#define RUBY_VERSION_CODE 186
#define RUBY_RELEASE_CODE 20070607
#define RUBY_PATCHLEVEL 30
#define RUBY_PATCHLEVEL 31

#define RUBY_VERSION_MAJOR 1
#define RUBY_VERSION_MINOR 8
Expand Down

0 comments on commit 887e101

Please sign in to comment.