Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ Note: We're only listing outstanding class updates.
associated with the AST node. [[Feature #20624]]
* Add RubyVM::AbstractSyntaxTree::Location class which holds location information. [[Feature #20624]]

* Fiber::Scheduler

* An optional `Fiber::Scheduler#blocking_operation_wait` hook allows blocking operations to be moved out of the
event loop in order to reduce latency and improve multi-core processor utilization. [[Feature #20876]]

## Stdlib updates

* Tempfile
Expand Down Expand Up @@ -236,3 +241,4 @@ details of the default gems or bundled gems.
[Feature #20497]: https://bugs.ruby-lang.org/issues/20497
[Feature #20624]: https://bugs.ruby-lang.org/issues/20624
[Feature #20775]: https://bugs.ruby-lang.org/issues/20775
[Feature #20876]: https://bugs.ruby-lang.org/issues/20876
1 change: 1 addition & 0 deletions common.mk
Original file line number Diff line number Diff line change
Expand Up @@ -16689,6 +16689,7 @@ scheduler.$(OBJEXT): {$(VPATH)}scheduler.c
scheduler.$(OBJEXT): {$(VPATH)}shape.h
scheduler.$(OBJEXT): {$(VPATH)}st.h
scheduler.$(OBJEXT): {$(VPATH)}subst.h
scheduler.$(OBJEXT): {$(VPATH)}thread.h
scheduler.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h
scheduler.$(OBJEXT): {$(VPATH)}thread_native.h
scheduler.$(OBJEXT): {$(VPATH)}vm_core.h
Expand Down
20 changes: 20 additions & 0 deletions include/ruby/fiber/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,26 @@ VALUE rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io);
*/
VALUE rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname);

struct rb_fiber_scheduler_blocking_operation_state {
void *result;
int saved_errno;
};

/**
* Defer the execution of the passed function to the scheduler.
*
* @param[in] scheduler Target scheduler.
* @param[in] function The function to run.
* @param[in] data The data to pass to the function.
* @param[in] unblock_function The unblock function to use to interrupt the operation.
* @param[in] data2 The data to pass to the unblock function.
* @param[in] flags Flags passed to `rb_nogvl`.
* @param[out] state The result and errno of the operation.
* @retval RUBY_Qundef `scheduler` doesn't have `#blocking_operation_wait`.
* @return otherwise What `scheduler.blocking_operation_wait` returns.
*/
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state);

/**
* Create and schedule a non-blocking fiber.
*
Expand Down
13 changes: 13 additions & 0 deletions include/ruby/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@
*/
#define RB_NOGVL_UBF_ASYNC_SAFE (0x2)

/**
* Passing this flag to rb_nogvl() indicates that the passed function
* is safe to offload to a background thread or work pool. In other words, the
* function is safe to run using a fiber scheduler's `blocking_operation_wait`.
* hook.
*
* If your function depends on thread-local storage, or thread-specific data
* operations/data structures, you should not set this flag, as
* these operations may behave differently (or fail) when run in a different
* thread/context (e.g. unlocking a mutex).
*/
#define RB_NOGVL_OFFLOAD_SAFE (0x4)

/** @} */

RBIMPL_SYMBOL_EXPORT_BEGIN()
Expand Down
2 changes: 1 addition & 1 deletion internal/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct rb_io;

#include "ruby/io.h" /* for rb_io_t */

#define IO_WITHOUT_GVL(func, arg) rb_thread_call_without_gvl(func, arg, RUBY_UBF_IO, 0)
#define IO_WITHOUT_GVL(func, arg) rb_nogvl(func, arg, RUBY_UBF_IO, 0, RB_NOGVL_OFFLOAD_SAFE)
#define IO_WITHOUT_GVL_INT(func, arg) (int)(VALUE)IO_WITHOUT_GVL(func, arg)

/** Ruby's IO, metadata and buffers. */
Expand Down
63 changes: 63 additions & 0 deletions scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
#include "ruby/io.h"
#include "ruby/io/buffer.h"

#include "ruby/thread.h"

// For `ruby_thread_has_gvl_p`.
#include "internal/thread.h"

static ID id_close;
Expand All @@ -33,6 +36,8 @@ static ID id_io_close;

static ID id_address_resolve;

static ID id_blocking_operation_wait;

static ID id_fiber_schedule;

/*
Expand Down Expand Up @@ -109,6 +114,8 @@ Init_Fiber_Scheduler(void)

id_address_resolve = rb_intern_const("address_resolve");

id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");

id_fiber_schedule = rb_intern_const("fiber");

#if 0 /* for RDoc */
Expand Down Expand Up @@ -693,6 +700,62 @@ rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
}

struct rb_blocking_operation_wait_arguments {
void *(*function)(void *);
void *data;
rb_unblock_function_t *unblock_function;
void *data2;
int flags;

struct rb_fiber_scheduler_blocking_operation_state *state;
};

static VALUE
rb_fiber_scheduler_blocking_operation_wait_proc(RB_BLOCK_CALL_FUNC_ARGLIST(value, _arguments))
{
struct rb_blocking_operation_wait_arguments *arguments = (struct rb_blocking_operation_wait_arguments*)_arguments;

if (arguments->state == NULL) {
rb_raise(rb_eRuntimeError, "Blocking function was already invoked!");
}

arguments->state->result = rb_nogvl(arguments->function, arguments->data, arguments->unblock_function, arguments->data2, arguments->flags);
arguments->state->saved_errno = rb_errno();

// Make sure it's only invoked once.
arguments->state = NULL;

return Qnil;
}

/*
* Document-method: Fiber::Scheduler#blocking_operation_wait
* call-seq: blocking_operation_wait(work)
*
* Invoked by Ruby's core methods to run a blocking operation in a non-blocking way.
*
* Minimal suggested implementation is:
*
* def blocking_operation_wait(work)
* Thread.new(&work).join
* end
*/
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
{
struct rb_blocking_operation_wait_arguments arguments = {
.function = function,
.data = data,
.unblock_function = unblock_function,
.data2 = data2,
.flags = flags,
.state = state
};

VALUE proc = rb_proc_new(rb_fiber_scheduler_blocking_operation_wait_proc, (VALUE)&arguments);

return rb_check_funcall(scheduler, id_blocking_operation_wait, 1, &proc);
}

/*
* Document-method: Fiber::Scheduler#fiber
* call-seq: fiber(&block)
Expand Down
10 changes: 10 additions & 0 deletions test/fiber/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,16 @@ def address_resolve(hostname)
Addrinfo.getaddrinfo(hostname, nil).map(&:ip_address).uniq
end.value
end

def blocking_operation_wait(work)
thread = Thread.new(&work)

thread.join

thread = nil
ensure
thread&.kill
end
end

# This scheduler class implements `io_read` and `io_write` hooks which require
Expand Down
14 changes: 14 additions & 0 deletions thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -1539,6 +1539,20 @@ rb_nogvl(void *(*func)(void *), void *data1,
rb_unblock_function_t *ubf, void *data2,
int flags)
{
if (flags & RB_NOGVL_OFFLOAD_SAFE) {
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
struct rb_fiber_scheduler_blocking_operation_state state;

VALUE result = rb_fiber_scheduler_blocking_operation_wait(scheduler, func, data1, ubf, data2, flags, &state);

if (!UNDEF_P(result)) {
rb_errno_set(state.saved_errno);
return state.result;
}
}
}

void *val = 0;
rb_execution_context_t *ec = GET_EC();
rb_thread_t *th = rb_ec_thread_ptr(ec);
Expand Down