Skip to content

Commit

Permalink
Ractor#receive supports filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
ko1 committed Dec 8, 2020
1 parent 2749123 commit 87f06a1
Show file tree
Hide file tree
Showing 4 changed files with 287 additions and 36 deletions.
32 changes: 32 additions & 0 deletions bootstraptest/test_ractor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,38 @@
r.take
}

# Ractor#receive can filter the message
assert_equal '[2, 3, 1]', %q{
r = Ractor.new Ractor.current do |main|
main << 1
main << 2
main << 3
end
a = []
a << Ractor.receive{|msg| msg == 2}
a << Ractor.receive{|msg| msg == 3}
a << Ractor.receive
}

# Ractor#receive with exception
assert_equal '[2, :e, 1, 3]', %q{
r = Ractor.new Ractor.current do |main|
main << 1
main << 2
main << 3
end
a = []
a << Ractor.receive{|msg| msg == 2}
begin
a << Ractor.receive{|msg| raise}
rescue => e
a << :e
end
a << Ractor.receive
a << Ractor.receive
}

###
###
# Ractor still has several memory corruption so skip huge number of tests
Expand Down
275 changes: 243 additions & 32 deletions ractor.c
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,15 @@ ractor_status_p(rb_ractor_t *r, enum ractor_status status)
return rb_ractor_status_p(r, status);
}

static struct rb_ractor_basket *ractor_queue_at(struct rb_ractor_queue *rq, int i);

static void
ractor_queue_mark(struct rb_ractor_queue *rq)
{
for (int i=0; i<rq->cnt; i++) {
int idx = (rq->start + i) % rq->size;
rb_gc_mark(rq->baskets[idx].v);
rb_gc_mark(rq->baskets[idx].sender);
struct rb_ractor_basket *b = ractor_queue_at(rq, i);
rb_gc_mark(b->v);
rb_gc_mark(b->sender);
}
}

Expand Down Expand Up @@ -317,33 +319,90 @@ ractor_queue_setup(struct rb_ractor_queue *rq)
rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size);
}

static struct rb_ractor_basket *
ractor_queue_at(struct rb_ractor_queue *rq, int i)
{
return &rq->baskets[(rq->start + i) % rq->size];
}

static void
ractor_queue_advance(struct rb_ractor_queue *rq)
{
ASSERT_ractor_locking(GET_RACTOR());

if (rq->reserved_cnt == 0) {
rq->cnt--;
rq->start = (rq->start + 1) % rq->size;
rq->serial++;
}
else {
ractor_queue_at(rq, 0)->type = basket_type_deleted;
}
}

static bool
ractor_queue_skip_p(struct rb_ractor_queue *rq, int i)
{
struct rb_ractor_basket *b = ractor_queue_at(rq, i);
return b->type == basket_type_deleted ||
b->type == basket_type_reserved;
}

static void
ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq)
{
ASSERT_ractor_locking(r);

while (rq->cnt > 0 && ractor_queue_at(rq, 0)->type == basket_type_deleted) {
ractor_queue_advance(rq);
}
}

static bool
ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq)
{
ASSERT_ractor_locking(r);
return rq->cnt == 0;

if (rq->cnt == 0) {
return true;
}

ractor_queue_compact(r, rq);

for (int i=0; i<rq->cnt; i++) {
if (!ractor_queue_skip_p(rq, i)) {
return false;
}
}

return true;
}

static bool
ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
{
bool b;
bool found = false;

RACTOR_LOCK(r);
{
if (!ractor_queue_empty_p(r, rq)) {
*basket = rq->baskets[rq->start];
rq->cnt--;
rq->start = (rq->start + 1) % rq->size;
b = true;
}
else {
b = false;
for (int i=0; i<rq->cnt; i++) {
if (!ractor_queue_skip_p(rq, i)) {
struct rb_ractor_basket *b = ractor_queue_at(rq, i);
*basket = *b;

// remove from queue
b->type = basket_type_deleted;
ractor_queue_compact(r, rq);
found = true;
break;
}
}
}
}
RACTOR_UNLOCK(r);

return b;
return found;
}

static void
Expand Down Expand Up @@ -373,24 +432,29 @@ ractor_basket_clear(struct rb_ractor_basket *b)
static VALUE ractor_reset_belonging(VALUE obj); // in this file

static VALUE
ractor_basket_accept(struct rb_ractor_basket *b)
ractor_basket_value(struct rb_ractor_basket *b)
{
VALUE v;

switch (b->type) {
case basket_type_ref:
VM_ASSERT(rb_ractor_shareable_p(b->v));
v = b->v;
break;
case basket_type_copy:
case basket_type_move:
case basket_type_will:
v = ractor_reset_belonging(b->v);
b->type = basket_type_ref;
b->v = ractor_reset_belonging(b->v);
break;
default:
rb_bug("unreachable");
}

return b->v;
}

static VALUE
ractor_basket_accept(struct rb_ractor_basket *b)
{
VALUE v = ractor_basket_value(b);

if (b->exception) {
VALUE cause = v;
VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor.");
Expand Down Expand Up @@ -616,29 +680,176 @@ ractor_waiting_list_shift(rb_ractor_t *r, struct rb_ractor_waiting_list *wl)
}
}

static void
ractor_receive_wait(rb_execution_context_t *ec, rb_ractor_t *cr)
{
VM_ASSERT(cr == rb_ec_ractor_ptr(ec));

RACTOR_LOCK(cr);
{
if (ractor_queue_empty_p(cr, &cr->incoming_queue)) {
VM_ASSERT(cr->wait.status == wait_none);
cr->wait.status = wait_receiving;
cr->wait.wakeup_status = wakeup_none;
ractor_sleep(ec, cr);
cr->wait.wakeup_status = wakeup_none;
}
}
RACTOR_UNLOCK(cr);
}

static VALUE
ractor_receive(rb_execution_context_t *ec, rb_ractor_t *r)
ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr)
{
VM_ASSERT(r == rb_ec_ractor_ptr(ec));
VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
VALUE v;

while ((v = ractor_try_receive(ec, r)) == Qundef) {
RACTOR_LOCK(r);
while ((v = ractor_try_receive(ec, cr)) == Qundef) {
ractor_receive_wait(ec, cr);
}

return v;
}

#if 0
// for debug
static const char *
basket_type_name(enum rb_ractor_basket_type type)
{
switch (type) {
#define T(t) case basket_type_##t: return #t
T(none);
T(ref);
T(copy);
T(move);
T(will);
T(deleted);
T(reserved);
default: rb_bug("unreachable");
}
}

static void
rq_dump(struct rb_ractor_queue *rq)
{
bool bug = false;
for (int i=0; i<rq->cnt; i++) {
struct rb_ractor_basket *b = ractor_queue_at(rq, i);
fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type), b, RSTRING_PTR(RARRAY_AREF(b->v, 1)));
if (b->type == basket_type_reserved) bug = true;
}
if (bug) rb_bug("!!");
}
#endif

struct receive_block_data {
rb_ractor_t *cr;
struct rb_ractor_queue *rq;
VALUE v;
int index;
bool success;
};

static VALUE
receive_block_body(VALUE ptr)
{
struct receive_block_data *data = (struct receive_block_data *)ptr;
VALUE block_result = rb_yield(data->v);

RACTOR_LOCK_SELF(data->cr);
{
struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index);
VM_ASSERT(b->type == basket_type_reserved);
data->rq->reserved_cnt--;

if (RTEST(block_result)) {
b->type = basket_type_deleted;
ractor_queue_compact(data->cr, data->rq);
}
else {
b->type = basket_type_ref;
}
}
RACTOR_UNLOCK_SELF(data->cr);

data->success = true;

if (RTEST(block_result)) {
return data->v;
}
else {
return Qundef;
}
}

static VALUE
receive_block_ensure(VALUE v)
{
struct receive_block_data *data = (struct receive_block_data *)v;

if (!data->success) {
RACTOR_LOCK_SELF(data->cr);
{
if (ractor_queue_empty_p(r, &r->incoming_queue)) {
VM_ASSERT(r->wait.status == wait_none);
r->wait.status = wait_receiving;
r->wait.wakeup_status = wakeup_none;
struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index);
VM_ASSERT(b->type == basket_type_reserved);
b->type = basket_type_ref;
data->rq->reserved_cnt--;
}
RACTOR_UNLOCK_SELF(data->cr);
}
return Qnil;
}

static VALUE
ractor_receive_block(rb_execution_context_t *ec, VALUE crv)
{
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
unsigned int serial = (unsigned int)-1;
int index = 0;
struct rb_ractor_queue *rq = &cr->incoming_queue;

while (1) {
VALUE v = Qundef;

ractor_receive_wait(ec, cr);

ractor_sleep(ec, r);
RACTOR_LOCK_SELF(cr);
{
if (serial != rq->serial) {
serial = rq->serial;
index = 0;
}

r->wait.wakeup_status = wakeup_none;
// check newer version
for (int i=index; i<rq->cnt; i++) {
if (!ractor_queue_skip_p(rq, i)) {
struct rb_ractor_basket *b = ractor_queue_at(rq, i);
v = ractor_basket_value(b);
b->type = basket_type_reserved;
rq->reserved_cnt++;
index = i;
break;
}
}
}
RACTOR_UNLOCK(r);
}
RACTOR_UNLOCK_SELF(cr);

return v;
if (v != Qundef) {
struct receive_block_data data = {
.cr = cr,
.rq = rq,
.v = v,
.index = index,
.success = false,
};

VALUE result = rb_ensure(receive_block_body, (VALUE)&data,
receive_block_ensure, (VALUE)&data);

if (result != Qundef) return result;
index++;
}
}
}

static void
Expand Down
Loading

0 comments on commit 87f06a1

Please sign in to comment.