Skip to content

Commit

Permalink
fix Ractor.yield(obj, move: true)
Browse files Browse the repository at this point in the history
Ractor.yield(obj, move: true) and
Ractor.select(..., yield_value: obj, move: true) tried to yield a
value with move semantices, but if the trial is faild, the obj
should not become a moved object.

To keep this rule, `wait_moving` wait status is introduced.

New yield/take process:
(1) If a ractor tried to yield (move:true), make taking racotr's
    wait status `wait_moving` and make a moved object by
    `ractor_move(obj)` and wakeup taking ractor.
(2) If a ractor tried to take a message from a ractor waiting fo
    yielding (move:true), wakeup the ractor and wait for (1).
  • Loading branch information
ko1 committed Jan 22, 2021
1 parent d0d6227 commit fff1edf
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 15 deletions.
25 changes: 25 additions & 0 deletions bootstraptest/test_ractor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,31 @@ def check obj1
end
}

# yield/move should not make moved object when the yield is not succeeded
assert_equal '"str"', %q{
R = Ractor.new{}
M = Ractor.current
r = Ractor.new do
s = 'str'
selected_r, v = Ractor.select R, yield_value: s, move: true
raise if selected_r != R # taken from R
M.send s.inspect # s should not be a moved object
end
Ractor.receive
}

# yield/move can fail
assert_equal "allocator undefined for Thread", %q{
r = Ractor.new do
obj = Thread.new{}
Ractor.yield obj
rescue => e
e.message
end
r.take
}

# Access to global-variables are prohibited
assert_equal 'can not access global variables $gv from non-main Ractors', %q{
$gv = 1
Expand Down
73 changes: 58 additions & 15 deletions ractor.c
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ static VALUE ractor_move(VALUE obj); // in this file
static VALUE ractor_copy(VALUE obj); // in this file

static void
ractor_basket_setup(rb_execution_context_t *ec, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc, bool is_will)
ractor_basket_setup(rb_execution_context_t *ec, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc, bool is_will, bool is_yield)
{
basket->sender = rb_ec_ractor_ptr(ec)->pub.self;
basket->exception = exc;
Expand All @@ -935,15 +935,21 @@ ractor_basket_setup(rb_execution_context_t *ec, struct rb_ractor_basket *basket,
}
else {
basket->type = basket_type_move;
basket->v = ractor_move(obj);

if (is_yield) {
basket->v = obj; // call ractor_move() when yielding timing.
}
else {
basket->v = ractor_move(obj);
}
}
}

static VALUE
ractor_send(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move)
{
struct rb_ractor_basket basket;
ractor_basket_setup(ec, &basket, obj, move, false, false);
ractor_basket_setup(ec, &basket, obj, move, false, false, false);
ractor_send_basket(ec, r, &basket);
return r->pub.self;
}
Expand All @@ -958,17 +964,23 @@ ractor_try_take(rb_execution_context_t *ec, rb_ractor_t *r)

RACTOR_LOCK(r);
{
if (ractor_wakeup(r, wait_yielding, wakeup_by_take)) {
if (ractor_sleeping_by(r, wait_yielding)) {
MAYBE_UNUSED(bool) wakeup_result;
VM_ASSERT(r->sync.wait.yielded_basket.type != basket_type_none);
basket = r->sync.wait.yielded_basket;
ractor_basket_clear(&r->sync.wait.yielded_basket);

if (r->sync.wait.yielded_basket.type == basket_type_move) {
wakeup_result = ractor_wakeup(r, wait_yielding, wakeup_by_retry);
}
else {
wakeup_result = ractor_wakeup(r, wait_yielding, wakeup_by_take);
basket = r->sync.wait.yielded_basket;
ractor_basket_clear(&r->sync.wait.yielded_basket);
}
VM_ASSERT(wakeup_result);
}
else if (r->sync.outgoing_port_closed) {
closed = true;
}
else {
// not reached.
}
}
RACTOR_UNLOCK(r);

Expand All @@ -985,6 +997,12 @@ ractor_try_take(rb_execution_context_t *ec, rb_ractor_t *r)
}
}

static VALUE
ractor_yield_move_body(VALUE v)
{
return ractor_move(v);
}

static bool
ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_basket *basket)
{
Expand All @@ -1009,8 +1027,34 @@ ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_b

RACTOR_LOCK(r);
{
if (ractor_wakeup(r, wait_taking, wakeup_by_yield)) {
if (ractor_sleeping_by(r, wait_taking)) {
VM_ASSERT(r->sync.wait.taken_basket.type == basket_type_none);

if (basket->type == basket_type_move) {
enum ractor_wait_status prev_wait_status = r->sync.wait.status;
r->sync.wait.status = wait_moving;

RACTOR_UNLOCK(r);
{
int state;
VALUE moved_value = rb_protect(ractor_yield_move_body, basket->v, &state);
if (state) {
r->sync.wait.status = prev_wait_status;
rb_jump_tag(state);
}
else {
basket->v = moved_value;
}
}
RACTOR_LOCK(r);

if (!ractor_wakeup(r, wait_moving, wakeup_by_yield)) {
// terminating?
}
}
else {
ractor_wakeup(r, wait_taking, wakeup_by_yield);
}
r->sync.wait.taken_basket = *basket;
}
else {
Expand Down Expand Up @@ -1080,16 +1124,15 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, const int rs_len, VAL
}
rs = NULL;

restart:

if (yield_p) {
actions[rs_len].type = ractor_select_action_yield;
actions[rs_len].v = Qundef;
wait_status |= wait_yielding;

ractor_basket_setup(ec, &cr->sync.wait.yielded_basket, yielded_value, move, false, false);
ractor_basket_setup(ec, &cr->sync.wait.yielded_basket, yielded_value, move, false, false, true);
}

restart:

// TODO: shuffle actions

while (1) {
Expand Down Expand Up @@ -1580,7 +1623,7 @@ ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool e
ASSERT_ractor_unlocking(cr);

struct rb_ractor_basket basket;
ractor_basket_setup(ec, &basket, v, Qfalse, exc, true);
ractor_basket_setup(ec, &basket, v, Qfalse, exc, true, true /* this flag is ignored because move is Qfalse */);

retry:
if (ractor_try_yield(ec, cr, &basket)) {
Expand Down
1 change: 1 addition & 0 deletions ractor_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ struct rb_ractor_sync {
wait_receiving = 0x01,
wait_taking = 0x02,
wait_yielding = 0x04,
wait_moving = 0x08,
} status;

enum ractor_wakeup_status {
Expand Down

0 comments on commit fff1edf

Please sign in to comment.