Skip to content

Commit

Permalink
Redocument non-blocking Fibers and scheduler
Browse files Browse the repository at this point in the history
* Document Fiber's method related to scheduling;
* Extend Fiber's class docs with concepts of non-blocking
  fibers;
* Introduce "imaginary" (documentation-only) class
  Fiber::SchedulerInterface to properly document how
  scheduler's methods should look.
  • Loading branch information
zverok authored and marcandre committed Dec 24, 2020
1 parent 1729fd8 commit 1415653
Showing 1 changed file with 339 additions and 1 deletion.
340 changes: 339 additions & 1 deletion cont.c
Original file line number Diff line number Diff line change
Expand Up @@ -1734,6 +1734,28 @@ rb_cont_call(int argc, VALUE *argv, VALUE contval)
* 1000000
* FiberError: dead fiber called
*
* == Non-blocking Fibers
*
* Ruby 3.0 introduced the concept of a <em>non-blocking fiber</em>.
* A non-blocking fiber, when reaching any potentially blocking operation (like
* sleep, wait for another process, wait for I/O data to be ready), instead
* of just freezing itself and all execution in the thread, yields control
* to other fibers, and allows the <em>scheduler</em> to handle waiting and waking
* (resuming) the fiber when it can proceed.
*
* For a Fiber to behave as non-blocking, it should be created by Fiber.new with
* <tt>blocking: false</tt> (which is the default now), and Fiber.scheduler
* should be set with Fiber.set_scheduler. If Fiber.scheduler is not set in
* the current thread, blocking and non-blocking fibers behave identically.
*
* Ruby doesn't provide a scheduler class: it is expected to be implemented by
* the user and correspond to Fiber::SchedulerInterface.
*
* There is also the Fiber.schedule method, which is expected to immediately run
* the passed block in a non-blocking manner (but its actual implementation is up to
* the scheduler).
*
*
*/

static const rb_data_type_t fiber_data_type = {
Expand Down Expand Up @@ -1842,7 +1864,29 @@ rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat)
return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking));
}

/* :nodoc: */
/*
* call-seq:
* Fiber.new(blocking: false) { |*args| ... } -> fiber
*
* Creates new Fiber. Initially, fiber is not running, but can be resumed with
* #resume. Arguments to the first #resume call would be passed to the block:
*
* f = Fiber.new do |initial|
* current = initial
* loop do
* puts "current: #{current.inspect}"
* current = Fiber.yield
* end
* end
* f.resume(100) # prints: current: 100
* f.resume(1, 2, 3) # prints: current: [1, 2, 3]
* f.resume # prints: current: nil
* # ... and so on ...
*
* If <tt>blocking: false</tt> is passed to <tt>Fiber.new</tt>, _and_ the current thread
* has Fiber.scheduler defined, the Fiber becomes non-blocking (see "Non-blocking
* fibers" section in class docs).
*/
static VALUE
rb_fiber_initialize(int argc, VALUE* argv, VALUE self)
{
Expand Down Expand Up @@ -1871,18 +1915,84 @@ rb_f_fiber_kw(int argc, VALUE* argv, int kw_splat)
return fiber;
}

/*
* call-seq:
* Fiber.schedule { |*args| ... } -> fiber
*
* The method is <em>expected</em> to immediately run the provided block of code in a
* separate non-blocking fiber.
*
* puts "Go to sleep!"
*
* Fiber.set_scheduler(MyScheduler.new)
*
* Fiber.schedule do
* puts "Going to sleep"
* sleep(1)
* puts "I slept well"
* end
*
* puts "Wakey-wakey, sleepyhead"
*
* Assuming MyScheduler is properly implemented, this program will produce:
*
* Go to sleep!
* Going to sleep
* Wakey-wakey, sleepyhead
* ...1 sec pause here...
* I slept well
*
* ...i.e. on the first blocking operation inside the Fiber (<tt>sleep(1)</tt>),
* control is yielded to the outside code (main fiber), and <em>at the end
* of the execution</em>, the scheduler takes care of properly resuming all the
* blocked fibers.
*
* Note that the behavior described above is how the method is <em>expected</em>
* to behave, actual behavior is up to the current scheduler's implementation of
* Fiber::SchedulerInterface#fiber method. Ruby doesn't enforce this method to
* behave in any particular way.
*
* If the scheduler is not set, the method raises
* <tt>RuntimeError (No scheduler is available!)</tt>.
*
*/
static VALUE
rb_f_fiber(int argc, VALUE *argv, VALUE obj)
{
    /* obj is unused; the work is delegated to rb_f_fiber_kw(), with the
     * result of rb_keyword_given_p() supplied as its kw_splat argument. */
    const int kw_splat = rb_keyword_given_p();

    return rb_f_fiber_kw(argc, argv, kw_splat);
}

/*
* call-seq:
* Fiber.scheduler -> obj or nil
*
* Fiber scheduler, set in the current thread with Fiber.set_scheduler. If the scheduler
* is +nil+ (which is the default), non-blocking fibers behave the same as blocking ones
* (see "Non-blocking fibers" section in class docs for details about the scheduler concept).
*
*/
static VALUE
rb_fiber_scheduler(VALUE klass)
{
    /* klass is not consulted; the value comes straight from rb_scheduler_get(). */
    VALUE scheduler = rb_scheduler_get();

    return scheduler;
}

/*
* call-seq:
* Fiber.set_scheduler(scheduler) -> scheduler
*
* Sets Fiber scheduler for the current thread. If the scheduler is set, non-blocking
* fibers (created by Fiber.new with <tt>blocking: false</tt>, or by Fiber.schedule)
* call that scheduler's hook methods on potentially blocking operations, and the current
* thread will call scheduler's +close+ method on finalization (allowing the scheduler to
* properly manage all non-finished fibers).
*
* +scheduler+ can be an object of any class corresponding to Fiber::SchedulerInterface. Its
* implementation is up to the user.
*
* See also the "Non-blocking fibers" section in class docs.
*
*/
static VALUE
rb_fiber_set_scheduler(VALUE klass, VALUE scheduler)
{
Expand Down Expand Up @@ -2196,12 +2306,44 @@ rb_fiber_transfer(VALUE fiber_value, int argc, const VALUE *argv)
return fiber_switch(fiber_ptr(fiber_value), argc, argv, RB_NO_KEYWORDS, Qfalse, false);
}

/*
* call-seq:
* fiber.blocking? -> true or false
*
* Returns +true+ if +fiber+ is blocking and +false+ otherwise.
* Fiber is non-blocking if it was created via passing <tt>blocking: false</tt>
* to Fiber.new, or via Fiber.schedule.
*
* Note that even if the method returns +false+, the Fiber behaves differently
* only if Fiber.scheduler is set in the current thread.
*
* See the "Non-blocking fibers" section in class docs for details.
*
*/
VALUE
rb_fiber_blocking_p(VALUE fiber)
{
    /* Any non-zero value of the fiber's `blocking` field is reported as true. */
    if (fiber_ptr(fiber)->blocking != 0) {
        return Qtrue;
    }

    return Qfalse;
}

/*
* call-seq:
* Fiber.blocking? -> false or number
*
* Returns +false+ if the current fiber is non-blocking.
* Fiber is non-blocking if it was created via passing <tt>blocking: false</tt>
* to Fiber.new, or via Fiber.schedule.
*
* If the current Fiber is blocking, the method, unlike usual
* predicate methods, returns a *number* of blocking fibers currently
* running (TBD: always 1?).
*
* Note that even if the method returns +false+, the Fiber behaves differently
* only if Fiber.scheduler is set in the current thread.
*
* See the "Non-blocking fibers" section in class docs for details.
*
*/
static VALUE
rb_f_fiber_blocking_p(VALUE klass)
{
Expand Down Expand Up @@ -2707,6 +2849,191 @@ rb_fiber_pool_initialize(int argc, VALUE* argv, VALUE self)
* fiber.resume #=> FiberError: dead fiber called
*/

/*
* Document-class: Fiber::SchedulerInterface
*
* This is not an existing class, but documentation of the interface that a scheduler
* object should comply with in order to be used as Fiber.scheduler and handle non-blocking
* fibers. See also the "Non-blocking fibers" section in Fiber class docs for explanations
* of some concepts.
*
* Scheduler's behavior and usage are expected to be as follows:
*
* * When the execution in the non-blocking Fiber reaches some blocking operation (like
* sleep, wait for a process, or a non-ready I/O), it calls some of the scheduler's
* hook methods, listed below.
* * The scheduler somehow registers what the current fiber is waiting on, and yields control
* to other fibers with Fiber.yield (so the fiber is suspended while expecting its
* wait to end, and other fibers in the same thread can run).
* * At the end of the current thread's execution, the scheduler's method #close is called.
* * The scheduler runs into a wait loop, checking all the blocked fibers (which it has
* registered on hook calls) and resuming them when the awaited resource is ready (I/O
* ready, sleep time passed).
*
* A typical implementation would probably rely for this closing loop on a gem like
* EventMachine[https://github.com/eventmachine/eventmachine] or
* Async[https://github.com/socketry/async].
*
* This way concurrent execution will be achieved in a way that is transparent for every
* individual Fiber's code.
*
* Hook methods are:
*
* * #io_wait
* * #process_wait
* * #kernel_sleep
* * #block and #unblock
* * (the list is expanded as Ruby developers make more methods having non-blocking calls)
*
* When not specified otherwise, the hook implementations are mandatory: if they are not
* implemented, the methods trying to call the hook will fail. To provide backward compatibility,
* in the future hooks will be optional (if they are not implemented, due to the scheduler
* being created for the older Ruby version, the code which needs this hook will not fail,
* and will just behave in a blocking fashion).
*
* It is also strongly suggested that the scheduler implement the #fiber method, which is
* delegated to by Fiber.schedule.
*
* Sample _toy_ implementation of the scheduler can be found in Ruby's code, in
* <tt>test/fiber/scheduler.rb</tt>
*
*/

#if 0 /* for RDoc */
/*
*
* Document-method: Fiber::SchedulerInterface#close
*
* Called when the current thread exits. The scheduler is expected to implement this
* method in order to allow all waiting fibers to finalize their execution.
*
* The suggested pattern is to implement the main event loop in the #close method.
*
*/
static VALUE
rb_fiber_scheduler_interface_close(VALUE self)
{
    /* Documentation-only stub: it sits inside "#if 0" and is never compiled. */
}

/*
* Document-method: SchedulerInterface#process_wait
* call-seq: process_wait(pid, flags)
*
* Invoked by Process::Status.wait in order to wait for a specified process.
* See that method's documentation for a description of the arguments.
*
* Suggested minimal implementation:
*
* Thread.new do
* Process::Status.wait(pid, flags)
* end.value
*
* This hook is optional: if it is not present in the current scheduler,
* Process::Status.wait will behave as a blocking method.
*
* Expected to return a Process::Status instance.
*/
static VALUE
rb_fiber_scheduler_interface_process_wait(VALUE self)
{
    /* Documentation-only stub: it sits inside "#if 0" and is never compiled. */
}

/*
* Document-method: SchedulerInterface#io_wait
* call-seq: io_wait(io, events, timeout)
*
* Invoked by IO#wait, IO#wait_readable, IO#wait_writable to ask whether the
* specified descriptor is ready for specified events within
* the specified +timeout+.
*
* +events+ is a bit mask of <tt>IO::READABLE</tt>, <tt>IO::WRITABLE</tt>, and
* <tt>IO::PRIORITY</tt>.
*
* Suggested implementation should register which Fiber is waiting for which
* resources and immediately call Fiber.yield to pass control to other
* fibers. Then, in the #close method, the scheduler might dispatch all the
* I/O resources to fibers waiting for them.
*
* Expected to return the subset of events that are ready immediately.
*
*/
static VALUE
rb_fiber_scheduler_interface_io_wait(VALUE self)
{
    /* Documentation-only stub: it sits inside "#if 0" and is never compiled. */
}

/*
* Document-method: SchedulerInterface#kernel_sleep
* call-seq: kernel_sleep(duration = nil)
*
* Invoked by Kernel#sleep and Mutex#sleep and is expected to provide
* an implementation of sleeping in a non-blocking way. Implementation might
* register the current fiber in some list of "what fiber waits till what
* moment", call Fiber.yield to pass control, and then in #close resume
* the fibers whose wait period has ended.
*
*/
static VALUE
rb_fiber_scheduler_interface_kernel_sleep(VALUE self)
{
    /* Documentation-only stub: it sits inside "#if 0" and is never compiled. */
}

/*
* Document-method: SchedulerInterface#block
* call-seq: block(blocker, timeout = nil)
*
* Invoked by methods like Thread.join, and by Mutex, to signify that the current
* Fiber is blocked until further notice (e.g. #unblock) or until +timeout+ has
* elapsed.
*
* +blocker+ is what we are waiting on, informational only (for debugging and
* logging). There are no guarantees about its value.
*
* Expected to return boolean, specifying whether the blocking operation was
* successful or not.
*/
static VALUE
rb_fiber_scheduler_interface_block(VALUE self)
{
    /* Documentation-only stub: it sits inside "#if 0" and is never compiled. */
}

/*
* Document-method: SchedulerInterface#unblock
* call-seq: unblock(blocker, fiber)
*
* Invoked to wake up a Fiber previously blocked with #block (for example, Mutex#lock
* calls #block and Mutex#unlock calls #unblock). The scheduler should use
* the +fiber+ parameter to understand which fiber is unblocked.
*
* +blocker+ is what was waited for, but it is informational only (for debugging
* and logging), and it is not guaranteed to be the same value as the +blocker+ for
* #block.
*
*/
static VALUE
rb_fiber_scheduler_interface_unblock(VALUE self)
{
    /* Documentation-only stub: it sits inside "#if 0" and is never compiled. */
}

/*
* Document-method: SchedulerInterface#fiber
* call-seq: fiber(&block)
*
* Implementation of Fiber.schedule. The method is <em>expected</em> to immediately
* run the passed block of code in a separate non-blocking fiber, and to return that Fiber.
*
* Minimal suggested implementation is:
*
* def fiber(&block)
* Fiber.new(blocking: false, &block).tap(&:resume)
* end
*/
static VALUE
rb_fiber_scheduler_interface_fiber(VALUE self)
{
    /* Documentation-only stub: it sits inside "#if 0" and is never compiled. */
}
#endif

void
Init_Cont(void)
{
Expand Down Expand Up @@ -2754,6 +3081,17 @@ Init_Cont(void)
rb_define_singleton_method(rb_cFiber, "schedule", rb_f_fiber, -1);
//rb_define_global_function("Fiber", rb_f_fiber, -1);

#if 0 /* for RDoc */
rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "SchedulerInterface", rb_cObject);
rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_interface_close, 0);
rb_define_method(rb_cFiberScheduler, "process_wait", rb_fiber_scheduler_interface_process_wait, 0);
rb_define_method(rb_cFiberScheduler, "io_wait", rb_fiber_scheduler_interface_io_wait, 0);
rb_define_method(rb_cFiberScheduler, "kernel_sleep", rb_fiber_scheduler_interface_kernel_sleep, 0);
rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_interface_block, 0);
rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_interface_unblock, 0);
rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_interface_fiber, 0);
#endif

#ifdef RB_EXPERIMENTAL_FIBER_POOL
rb_cFiberPool = rb_define_class("Pool", rb_cFiber);
rb_define_alloc_func(rb_cFiberPool, fiber_pool_alloc);
Expand Down

0 comments on commit 1415653

Please sign in to comment.