Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions bootstraptest/test_ractor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -588,8 +588,8 @@ def check obj1
r.value[:frozen]
}

# Access to global-variables are prohibited
assert_equal 'can not access global variables $gv from non-main Ractors', %q{
# Access to global-variables are prohibited (read)
assert_equal 'can not access global variable $gv from non-main Ractor', %q{
$gv = 1
r = Ractor.new do
$gv
Expand All @@ -602,8 +602,8 @@ def check obj1
end
}

# Access to global-variables are prohibited
assert_equal 'can not access global variables $gv from non-main Ractors', %q{
# Access to global-variables are prohibited (write)
assert_equal 'can not access global variable $gv from non-main Ractor', %q{
r = Ractor.new do
$gv = 1
end
Expand Down
2 changes: 2 additions & 0 deletions common.mk
Original file line number Diff line number Diff line change
Expand Up @@ -7578,6 +7578,7 @@ gc.$(OBJEXT): $(top_srcdir)/internal/class.h
gc.$(OBJEXT): $(top_srcdir)/internal/compile.h
gc.$(OBJEXT): $(top_srcdir)/internal/compilers.h
gc.$(OBJEXT): $(top_srcdir)/internal/complex.h
gc.$(OBJEXT): $(top_srcdir)/internal/concurrent_set.h
gc.$(OBJEXT): $(top_srcdir)/internal/cont.h
gc.$(OBJEXT): $(top_srcdir)/internal/error.h
gc.$(OBJEXT): $(top_srcdir)/internal/eval.h
Expand Down Expand Up @@ -19064,6 +19065,7 @@ symbol.$(OBJEXT): $(top_srcdir)/internal/array.h
symbol.$(OBJEXT): $(top_srcdir)/internal/basic_operators.h
symbol.$(OBJEXT): $(top_srcdir)/internal/class.h
symbol.$(OBJEXT): $(top_srcdir)/internal/compilers.h
symbol.$(OBJEXT): $(top_srcdir)/internal/concurrent_set.h
symbol.$(OBJEXT): $(top_srcdir)/internal/error.h
symbol.$(OBJEXT): $(top_srcdir)/internal/gc.h
symbol.$(OBJEXT): $(top_srcdir)/internal/hash.h
Expand Down
112 changes: 99 additions & 13 deletions concurrent_set.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ rb_concurrent_set_new(const struct rb_concurrent_set_funcs *funcs, int capacity)
return obj;
}

rb_atomic_t
rb_concurrent_set_size(VALUE set_obj)
{
struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);

return RUBY_ATOMIC_LOAD(set->size);
}

struct concurrent_set_probe {
int idx;
int d;
Expand Down Expand Up @@ -119,7 +127,7 @@ concurrent_set_try_resize_without_locking(VALUE old_set_obj, VALUE *set_obj_ptr)
RUBY_ASSERT(key != CONCURRENT_SET_MOVED);

if (key < CONCURRENT_SET_SPECIAL_VALUE_COUNT) continue;
if (rb_objspace_garbage_object_p(key)) continue;
if (!RB_SPECIAL_CONST_P(key) && rb_objspace_garbage_object_p(key)) continue;

VALUE hash = RUBY_ATOMIC_VALUE_LOAD(entry->hash);
if (hash == 0) {
Expand Down Expand Up @@ -167,21 +175,90 @@ concurrent_set_try_resize(VALUE old_set_obj, VALUE *set_obj_ptr)
}
}

VALUE
rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
{
RUBY_ASSERT(key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);

VALUE set_obj;
VALUE hash = 0;

retry:
set_obj = RUBY_ATOMIC_VALUE_LOAD(*set_obj_ptr);
RUBY_ASSERT(set_obj);
struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);

if (hash == 0) {
// We don't need to recompute the hash on every retry because it should
// never change.
hash = set->funcs->hash(key);
}
RUBY_ASSERT(hash == set->funcs->hash(key));

struct concurrent_set_probe probe;
int idx = concurrent_set_probe_start(&probe, set, hash);

while (true) {
struct concurrent_set_entry *entry = &set->entries[idx];
VALUE curr_key = RUBY_ATOMIC_VALUE_LOAD(entry->key);

switch (curr_key) {
case CONCURRENT_SET_EMPTY:
return 0;
case CONCURRENT_SET_DELETED:
break;
case CONCURRENT_SET_MOVED:
// Wait
RB_VM_LOCKING();

goto retry;
default: {
VALUE curr_hash = RUBY_ATOMIC_VALUE_LOAD(entry->hash);
if (curr_hash != 0 && curr_hash != hash) break;

if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
// This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
// Skip it and mark it as deleted.
RUBY_ATOMIC_VALUE_CAS(entry->key, curr_key, CONCURRENT_SET_DELETED);
break;
}

if (set->funcs->cmp(key, curr_key)) {
// We've found a match.
RB_GC_GUARD(set_obj);
return curr_key;
}

break;
}
}

idx = concurrent_set_probe_next(&probe);
}
}

VALUE
rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
{
RUBY_ASSERT(key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);

bool inserting = false;
VALUE set_obj;
VALUE hash = 0;

retry:
set_obj = RUBY_ATOMIC_VALUE_LOAD(*set_obj_ptr);
RUBY_ASSERT(set_obj);
struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);

if (hash == 0) {
// We don't need to recompute the hash on every retry because it should
// never change.
hash = set->funcs->hash(key);
}
RUBY_ASSERT(hash == set->funcs->hash(key));

struct concurrent_set_probe probe;
VALUE hash = set->funcs->hash(key);
int idx = concurrent_set_probe_start(&probe, set, hash);

while (true) {
Expand Down Expand Up @@ -229,19 +306,27 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
goto retry;
default: {
VALUE curr_hash = RUBY_ATOMIC_VALUE_LOAD(entry->hash);
if ((curr_hash == hash || curr_hash == 0) && set->funcs->cmp(key, curr_key)) {
if (curr_hash != 0 && curr_hash != hash) break;

if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
// This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
// Skip it and mark it as deleted.
RUBY_ATOMIC_VALUE_CAS(entry->key, curr_key, CONCURRENT_SET_DELETED);
break;
}

if (set->funcs->cmp(key, curr_key)) {
// We've found a match.
if (UNLIKELY(rb_objspace_garbage_object_p(curr_key))) {
// This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
// Skip it and mark it as deleted.
RUBY_ATOMIC_VALUE_CAS(entry->key, curr_key, CONCURRENT_SET_DELETED);
RB_GC_GUARD(set_obj);

// Fall through and continue our search.
}
else {
RB_GC_GUARD(set_obj);
return curr_key;
if (inserting) {
// We created key using set->funcs->create, but we didn't end
// up inserting it into the set. Free it here to prevent memory
// leaks.
if (set->funcs->free) set->funcs->free(key);
}

return curr_key;
}

break;
Expand Down Expand Up @@ -300,6 +385,7 @@ rb_concurrent_set_foreach_with_replace(VALUE set_obj, int (*callback)(VALUE *key
struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);

for (unsigned int i = 0; i < set->capacity; i++) {
struct concurrent_set_entry *entry = &set->entries[i];
VALUE key = set->entries[i].key;

switch (key) {
Expand All @@ -310,7 +396,7 @@ rb_concurrent_set_foreach_with_replace(VALUE set_obj, int (*callback)(VALUE *key
rb_bug("rb_concurrent_set_foreach_with_replace: moved entry");
break;
default: {
int ret = callback(&set->entries[i].key, data);
int ret = callback(&entry->key, data);
switch (ret) {
case ST_STOP:
return;
Expand Down
8 changes: 4 additions & 4 deletions doc/ractor.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ port.receive # get a message to the port. Only the creator Ractor can receive fr
#=> 42
```

Ractors have its own deafult port and `Ractor#send`, `Ractor.receive` will use it.
Ractors have its own default port and `Ractor#send`, `Ractor.receive` will use it.

### Copy & Move semantics to send messages

Expand Down Expand Up @@ -201,7 +201,7 @@ You can wait multiple Ractor port's receiving.
The return value of `Ractor.select()` is `[port, msg]` where `port` is a ready port and `msg` is received message.

To make convenient, `Ractor.select` can also accept Ractors to wait the termination of Ractors.
The return value of `Ractor.select()` is `[r, msg]` where `r` is a terminated Ractor and `msg` is the value of Ractor's blcok.
The return value of `Ractor.select()` is `[r, msg]` where `r` is a terminated Ractor and `msg` is the value of Ractor's block.

Wait for a single ractor (same as `Ractor#value`):

Expand Down Expand Up @@ -359,7 +359,7 @@ The following objects are shareable.

Implementation: Now shareable objects (`RVALUE`) have `FL_SHAREABLE` flag. This flag can be added lazily.

To make shareable objects, `Ractor.make_shareable(obj)` method is provided. In this case, try to make sharaeble by freezing `obj` and recursively traversable objects. This method accepts `copy:` keyword (default value is false).`Ractor.make_shareable(obj, copy: true)` tries to make a deep copy of `obj` and make the copied object shareable.
To make shareable objects, `Ractor.make_shareable(obj)` method is provided. In this case, try to make shareable by freezing `obj` and recursively traversable objects. This method accepts `copy:` keyword (default value is false).`Ractor.make_shareable(obj, copy: true)` tries to make a deep copy of `obj` and make the copied object shareable.

## Language changes to isolate unshareable objects between Ractors

Expand All @@ -384,7 +384,7 @@ rescue Ractor::RemoteError => e
end
```

Note that some special global variables, such as `$stdin`, `$stdout` and `$stderr` are Ractor-lcoal. See [[Bug #17268]](https://bugs.ruby-lang.org/issues/17268) for more details.
Note that some special global variables, such as `$stdin`, `$stdout` and `$stderr` are Ractor-local. See [[Bug #17268]](https://bugs.ruby-lang.org/issues/17268) for more details.

### Instance variables of shareable objects

Expand Down
54 changes: 17 additions & 37 deletions gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
#include "internal/class.h"
#include "internal/compile.h"
#include "internal/complex.h"
#include "internal/concurrent_set.h"
#include "internal/cont.h"
#include "internal/error.h"
#include "internal/eval.h"
Expand Down Expand Up @@ -344,6 +345,7 @@ rb_gc_shutdown_call_finalizer_p(VALUE obj)
if (rb_obj_is_fiber(obj)) return false;
if (rb_obj_is_main_ractor(obj)) return false;
if (rb_obj_is_fstring_table(obj)) return false;
if (rb_obj_is_symbol_table(obj)) return false;

return true;

Expand Down Expand Up @@ -3166,11 +3168,6 @@ rb_gc_mark_children(void *objspace, VALUE obj)
switch (BUILTIN_TYPE(obj)) {
case T_FLOAT:
case T_BIGNUM:
case T_SYMBOL:
/* Not immediates, but does not have references and singleton class.
*
* RSYMBOL(obj)->fstr intentionally not marked. See log for 96815f1e
* ("symbol.c: remove rb_gc_mark_symbols()") */
return;

case T_NIL:
Expand Down Expand Up @@ -3228,6 +3225,10 @@ rb_gc_mark_children(void *objspace, VALUE obj)
mark_hash(obj);
break;

case T_SYMBOL:
gc_mark_internal(RSYMBOL(obj)->fstr);
break;

case T_STRING:
if (STR_SHARED_P(obj)) {
if (STR_EMBED_P(RSTRING(obj)->as.heap.aux.shared)) {
Expand Down Expand Up @@ -3864,9 +3865,6 @@ update_iclass_classext(rb_classext_t *ext, bool is_prime, VALUE namespace, void
update_classext_values(objspace, ext, true);
}

extern rb_symbols_t ruby_global_symbols;
#define global_symbols ruby_global_symbols

struct global_vm_table_foreach_data {
vm_table_foreach_callback_func callback;
vm_table_update_callback_func update_callback;
Expand Down Expand Up @@ -3924,34 +3922,20 @@ vm_weak_table_cc_refinement_foreach_update_update(st_data_t *key, st_data_t data


static int
vm_weak_table_str_sym_foreach(st_data_t key, st_data_t value, st_data_t data, int error)
vm_weak_table_sym_set_foreach(VALUE *sym_ptr, void *data)
{
VALUE sym = *sym_ptr;
struct global_vm_table_foreach_data *iter_data = (struct global_vm_table_foreach_data *)data;

if (!iter_data->weak_only) {
int ret = iter_data->callback((VALUE)key, iter_data->data);
if (ret != ST_CONTINUE) return ret;
}

if (STATIC_SYM_P(value)) {
return ST_CONTINUE;
}
else {
return iter_data->callback((VALUE)value, iter_data->data);
}
}
if (RB_SPECIAL_CONST_P(sym)) return ST_CONTINUE;

static int
vm_weak_table_foreach_update_weak_value(st_data_t *key, st_data_t *value, st_data_t data, int existing)
{
struct global_vm_table_foreach_data *iter_data = (struct global_vm_table_foreach_data *)data;
int ret = iter_data->callback(sym, iter_data->data);

if (!iter_data->weak_only) {
int ret = iter_data->update_callback((VALUE *)key, iter_data->data);
if (ret != ST_CONTINUE) return ret;
if (ret == ST_REPLACE) {
ret = iter_data->update_callback(sym_ptr, iter_data->data);
}

return iter_data->update_callback((VALUE *)value, iter_data->data);
return ret;
}

struct st_table *rb_generic_fields_tbl_get(void);
Expand Down Expand Up @@ -4098,14 +4082,10 @@ rb_gc_vm_weak_table_foreach(vm_table_foreach_callback_func callback,
break;
}
case RB_GC_VM_GLOBAL_SYMBOLS_TABLE: {
if (global_symbols.str_sym) {
st_foreach_with_replace(
global_symbols.str_sym,
vm_weak_table_str_sym_foreach,
vm_weak_table_foreach_update_weak_value,
(st_data_t)&foreach_data
);
}
rb_sym_global_symbol_table_foreach_weak_reference(
vm_weak_table_sym_set_foreach,
&foreach_data
);
break;
}
case RB_GC_VM_ID2REF_TABLE: {
Expand Down
2 changes: 1 addition & 1 deletion hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ any_hash(VALUE a, st_index_t (*other_func)(VALUE))
hnum = rb_hash_start(hnum);
}
else {
hnum = RSYMBOL(a)->hashval;
hnum = RSHIFT(RSYMBOL(a)->hashval, 1);
}
break;
case T_FIXNUM:
Expand Down
14 changes: 7 additions & 7 deletions internal/concurrent_set.h
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
#ifndef RUBY_RACTOR_SAFE_TABLE_H
#define RUBY_RACTOR_SAFE_TABLE_H

#include "ruby/atomic.h"
#include "ruby/ruby.h"

typedef VALUE (*rb_concurrent_set_hash_func)(VALUE key);
typedef bool (*rb_concurrent_set_cmp_func)(VALUE a, VALUE b);
typedef VALUE (*rb_concurrent_set_create_func)(VALUE key, void *data);

struct rb_concurrent_set_funcs {
rb_concurrent_set_hash_func hash;
rb_concurrent_set_cmp_func cmp;
rb_concurrent_set_create_func create;
VALUE (*hash)(VALUE key);
bool (*cmp)(VALUE a, VALUE b);
VALUE (*create)(VALUE key, void *data);
void (*free)(VALUE key);
};

VALUE rb_concurrent_set_new(const struct rb_concurrent_set_funcs *funcs, int capacity);
rb_atomic_t rb_concurrent_set_size(VALUE set_obj);
VALUE rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key);
VALUE rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data);
VALUE rb_concurrent_set_delete_by_identity(VALUE set_obj, VALUE key);
void rb_concurrent_set_foreach_with_replace(VALUE set_obj, int (*callback)(VALUE *key, void *data), void *data);
Expand Down
Loading