Thread Scheduler for lightweight concurrency #1870

Closed · wants to merge 20 commits
98 changes: 92 additions & 6 deletions cont.c
@@ -241,12 +241,17 @@ struct rb_fiber_struct {
*/
unsigned int transferred : 1;

/* Whether the fiber is allowed to implicitly yield. */
unsigned int blocking : 1;

struct coroutine_context context;
struct fiber_pool_stack stack;
};

static struct fiber_pool shared_fiber_pool = {NULL, NULL, 0, 0, 0, 0};

static ID fiber_initialize_keywords[2] = {0};

/*
* FreeBSD requires that the first (i.e. addr) argument of mmap(2) is not NULL
* if MAP_STACK is passed.
@@ -1731,7 +1736,7 @@ fiber_alloc(VALUE klass)
}

static rb_fiber_t*
fiber_t_alloc(VALUE fiber_value)
fiber_t_alloc(VALUE fiber_value, unsigned int blocking)
{
rb_fiber_t *fiber;
rb_thread_t *th = GET_THREAD();
@@ -1744,6 +1749,7 @@ fiber_t_alloc(VALUE fiber_value)
fiber = ZALLOC(rb_fiber_t);
fiber->cont.self = fiber_value;
fiber->cont.type = FIBER_CONTEXT;
fiber->blocking = blocking;
cont_init(&fiber->cont, th);

fiber->cont.saved_ec.fiber_ptr = fiber;
@@ -1761,9 +1767,9 @@
}

static VALUE
fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool)
fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking)
{
rb_fiber_t *fiber = fiber_t_alloc(self);
rb_fiber_t *fiber = fiber_t_alloc(self, blocking);

fiber->first_proc = proc;
fiber->stack.base = NULL;
@@ -1791,17 +1797,68 @@ fiber_prepare_stack(rb_fiber_t *fiber)
sec->local_storage_recursive_hash_for_trace = Qnil;
}

static struct fiber_pool *
rb_fiber_pool_default(VALUE pool)
{
return &shared_fiber_pool;
}

/* :nodoc: */
static VALUE
rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat)
{
VALUE pool = Qnil;
VALUE blocking = Qtrue;

if (kw_splat != RB_NO_KEYWORDS) {
VALUE options = Qnil;
VALUE arguments[2] = {Qundef};

argc = rb_scan_args_kw(kw_splat, argc, argv, ":", &options);
rb_get_kwargs(options, fiber_initialize_keywords, 0, 2, arguments);

blocking = arguments[0];
pool = arguments[1];
}

return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking));
}

/* :nodoc: */
static VALUE
rb_fiber_initialize(int argc, VALUE* argv, VALUE self)
{
return fiber_initialize(self, rb_block_proc(), &shared_fiber_pool);
return rb_fiber_initialize_kw(argc, argv, self, rb_keyword_given_p());
}

VALUE
rb_fiber_new(rb_block_call_func_t func, VALUE obj)
{
return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), &shared_fiber_pool);
return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), rb_fiber_pool_default(Qnil), 1);
}

static VALUE
rb_f_fiber_kw(int argc, VALUE* argv, int kw_splat)
{
rb_thread_t * th = GET_THREAD();
VALUE scheduler = th->scheduler;
VALUE fiber = Qnil;

if (scheduler != Qnil) {
fiber = rb_funcall_passing_block_kw(scheduler, rb_intern("fiber"), argc, argv, kw_splat);
} else {
fiber = fiber_initialize(fiber_alloc(rb_cFiber), rb_block_proc(), rb_fiber_pool_default(Qnil), 1);
}

rb_funcall(fiber, rb_intern("resume"), 0);

return fiber;
}

static VALUE
rb_f_fiber(int argc, VALUE *argv, VALUE obj)
{
return rb_f_fiber_kw(argc, argv, rb_keyword_given_p());
}

static void rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt);
@@ -1818,6 +1875,10 @@ rb_fiber_start(void)
VM_ASSERT(th->ec == ruby_current_execution_context_ptr);
VM_ASSERT(FIBER_RESUMED_P(fiber));

if (fiber->blocking) {
th->blocking += 1;
}

EC_PUSH_TAG(th->ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
rb_context_t *cont = &VAR_FROM_MEMORY(fiber)->cont;
@@ -1851,6 +1912,10 @@
need_interrupt = TRUE;
}

// if (fiber->blocking) {
// th->blocking -= 1;
// }

rb_fiber_terminate(fiber, need_interrupt);
VM_UNREACHABLE(rb_fiber_start);
}
@@ -1890,6 +1955,7 @@ rb_threadptr_root_fiber_setup(rb_thread_t *th)
fiber->cont.type = FIBER_CONTEXT;
fiber->cont.saved_ec.fiber_ptr = fiber;
fiber->cont.saved_ec.thread_ptr = th;
fiber->blocking = 1;
fiber_status_set(fiber, FIBER_RESUMED); /* skip CREATED */
th->ec = &fiber->cont.saved_ec;
}
@@ -2042,11 +2108,15 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume, int
}
}

VM_ASSERT(FIBER_RUNNABLE_P(fiber));

if (is_resume) {
fiber->prev = fiber_current();
}

VM_ASSERT(FIBER_RUNNABLE_P(fiber));
if (fiber_current()->blocking) {
th->blocking -= 1;
}

cont->argc = argc;
cont->kw_splat = kw_splat;
@@ -2058,6 +2128,10 @@
fiber_stack_release(fiber);
}

if (fiber_current()->blocking) {
th->blocking += 1;
}

RUBY_VM_CHECK_INTS(th->ec);

EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_FIBER_SWITCH, th->self, 0, 0, 0, Qnil);
@@ -2071,6 +2145,12 @@
return fiber_switch(fiber_ptr(fiber_value), argc, argv, 0, RB_NO_KEYWORDS);
}

VALUE
rb_fiber_blocking_p(VALUE fiber)
{
return (fiber_ptr(fiber)->blocking == 0) ? Qfalse : Qtrue;
}

void
rb_fiber_close(rb_fiber_t *fiber)
{
@@ -2440,6 +2520,9 @@ Init_Cont(void)

fiber_pool_initialize(&shared_fiber_pool, stack_size, FIBER_POOL_INITIAL_SIZE, vm_stack_size);

fiber_initialize_keywords[0] = rb_intern_const("blocking");
fiber_initialize_keywords[1] = rb_intern_const("pool");

char * fiber_shared_fiber_pool_free_stacks = getenv("RUBY_SHARED_FIBER_POOL_FREE_STACKS");
if (fiber_shared_fiber_pool_free_stacks) {
shared_fiber_pool.free_stacks = atoi(fiber_shared_fiber_pool_free_stacks);
@@ -2450,11 +2533,14 @@
rb_eFiberError = rb_define_class("FiberError", rb_eStandardError);
rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1);
rb_define_method(rb_cFiber, "initialize", rb_fiber_initialize, -1);
rb_define_method(rb_cFiber, "blocking?", rb_fiber_blocking_p, 0);
rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1);
rb_define_method(rb_cFiber, "raise", rb_fiber_raise, -1);
rb_define_method(rb_cFiber, "to_s", fiber_to_s, 0);
rb_define_alias(rb_cFiber, "inspect", "to_s");

rb_define_global_function("Fiber", rb_f_fiber, -1);

#ifdef RB_EXPERIMENTAL_FIBER_POOL
rb_cFiberPool = rb_define_class("Pool", rb_cFiber);
rb_define_alloc_func(rb_cFiberPool, fiber_pool_alloc);
167 changes: 167 additions & 0 deletions doc/fiber.rdoc
@@ -0,0 +1,167 @@
= Fiber

Fiber is a flow-control primitive which enables cooperative scheduling. This is
in contrast to threads, which can be preemptively scheduled at any time. While
fibers and threads have similar memory profiles, the cost of context switching
between fibers can be significantly lower than between threads, as it does not
involve a system call.

== Design

=== Thread Scheduler

The per-thread fiber scheduler interface is used to intercept blocking
operations. A typical implementation would be a wrapper for a gem like
EventMachine or Async. This design provides separation of concerns between the
event loop implementation and application code. It also allows for layered
schedulers which can perform instrumentation.

  class Scheduler
    # Wait for the given file descriptor to become readable.
    def wait_readable(fd)
    end

    # Wait for the given file descriptor to become writable.
    def wait_writable(fd)
    end

    # Wait for the given file descriptor to match the specified events within
    # the specified timeout.
    # @param events [Integer] a bit mask of +IO::WAIT_READABLE+,
    #   +IO::WAIT_WRITABLE+ and +IO::WAIT_PRIORITY+.
    # @param timeout [#to_f] the amount of time to wait for the event.
    def wait_for_single_fd(fd, events, timeout)
    end

    # Sleep the current task for the specified duration, or forever if not
    # specified.
    # @param duration [#to_f] the amount of time to sleep.
    def wait_sleep(duration = nil)
    end

    # The Ruby virtual machine is going to enter a system level blocking
    # operation.
    def enter_blocking_region
    end

    # The Ruby virtual machine has completed the system level blocking
    # operation.
    def exit_blocking_region
    end

    # Intercept the creation of a non-blocking fiber.
    def fiber(&block)
      Fiber.new(blocking: false, &block)
    end

    # Invoked when the thread exits.
    def run
      # Implement the event loop here.
    end
  end
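
As a usage sketch, a scheduler instance is attached to the current thread. The
+Thread.current.scheduler=+ writer is assumed here from the per-thread design;
the exact attachment API is not shown in this document.

  scheduler = Scheduler.new
  Thread.current.scheduler = scheduler # Assumed per-thread attachment point.

  Fiber.new(blocking: false) do
    sleep(1) # Deferred to scheduler.wait_sleep(1).
  end.resume

  # When the thread exits, scheduler.run is invoked to drive the event loop.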

=== Non-blocking Fibers

We introduce the concept of blocking and non-blocking fibers. By default, fibers
are blocking and there is no change in behaviour. In contrast, non-blocking
fibers may invoke specific scheduler hooks when a blocking operation occurs, and
these hooks may introduce context switching points.

  Fiber.new(blocking: false) do
    puts Fiber.current.blocking? # false

    # May invoke `Thread.scheduler&.wait_readable`.
    io.read(...)

    # May invoke `Thread.scheduler&.wait_writable`.
    io.write(...)

    # Will invoke `Thread.scheduler&.wait_sleep`.
    sleep(n)
  end.resume

We also introduce a new method which simplifies the creation of these
non-blocking fibers:

  Fiber do
    puts Fiber.current.blocking? # false
  end

The purpose of this method is to allow the scheduler to internally decide the
policy for when to start the fiber, and whether to use symmetric or asymmetric
fibers.
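
For example, a scheduler could defer execution by returning a small trampoline
fiber whose first resume merely enqueues the real work. This is a hypothetical
sketch: the +DeferredScheduler+ name and its ready-queue policy are
illustrative, not part of this proposal.

  class DeferredScheduler < Scheduler
    def initialize
      @ready = []
    end

    # `Fiber { ... }` calls this hook and resumes whatever it returns, so
    # returning a trampoline defers the user's block until #run.
    def fiber(&block)
      Fiber.new(blocking: false) do
        @ready << Fiber.new(blocking: false, &block)
      end
    end

    # Invoked when the thread exits; drain the ready queue.
    def run
      @ready.shift.resume until @ready.empty?
    end
  end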

=== Non-blocking Threads

We introduce the concept of blocking and non-blocking threads. By default,
threads are blocking. When switching to a non-blocking fiber, we track this
state, and the thread may become non-blocking. When a non-blocking thread
invokes a blocking operation, it may defer the operation to the thread
scheduler. As implemented here, the thread tracks a count of blocking fibers,
so +Thread.current.blocking?+ returns a truthy count such as +1+ rather than
+true+.

  puts Thread.current.blocking? # 1 (true)

  Fiber.new(blocking: false) do
    puts Thread.current.blocking? # false
  end.resume

In addition, locking a mutex causes the thread to become blocking:

  mutex = Mutex.new

  puts Thread.current.blocking? # 1 (true)

  Fiber.new(blocking: false) do
    puts Thread.current.blocking? # false
    mutex.synchronize do
      puts Thread.current.blocking? # 1 (true)
    end

    puts Thread.current.blocking? # false
  end.resume

=== Non-blocking Mutex

As a future feature, we may consider introducing a non-blocking mutex.

  mutex = Mutex.new(blocking: false)

  Fiber.new(blocking: false) do
    puts Thread.current.blocking? # false

    mutex.synchronize do
      puts Thread.current.blocking? # false
    end
  end.resume

Such a design will require extensions to the scheduler interface to support fair
scheduling of tasks waiting on a non-blocking mutex.
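
One hypothetical shape for such extensions, suspending waiters and waking them
in FIFO order, might look as follows; the +mutex_wait+ and +mutex_wakeup+ hook
names are illustrative, not part of this proposal.

  class Scheduler
    # Hypothetical hook: called when a fiber must wait for a locked mutex.
    def mutex_wait(mutex)
      waiters(mutex) << Fiber.current
      Fiber.yield # Suspended until mutex_wakeup resumes us.
    end

    # Hypothetical hook: called on unlock; wake the longest waiter first.
    def mutex_wakeup(mutex)
      fiber = waiters(mutex).shift
      fiber&.resume
    end

    private

    # Waiting fibers, keyed by mutex, in arrival (FIFO) order.
    def waiters(mutex)
      @waiters ||= Hash.new { |hash, key| hash[key] = [] }
      @waiters[mutex]
    end
  end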

=== Non-blocking I/O

By default, I/O should be non-blocking, so that non-blocking fibers can achieve
maximum concurrency. For example, sockets are created with the non-blocking
flag set where the platform supports it:

  static int
  rsock_socket0(int domain, int type, int proto)
  {
  #ifdef SOCK_CLOEXEC
      /* Atomically mark the descriptor close-on-exec where supported. */
      type |= SOCK_CLOEXEC;
  #endif

  #ifdef SOCK_NONBLOCK
      /* Atomically create the socket in non-blocking mode. */
      type |= SOCK_NONBLOCK;
  #endif

      int result = socket(domain, type, proto);

      if (result == -1)
          return -1;

      rb_fd_fix_cloexec(result);

  #ifndef SOCK_NONBLOCK
      /* Fall back to setting O_NONBLOCK via fcntl(2). */
      rsock_make_fd_nonblock(result);
  #endif

      return result;
  }
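
To make the interplay concrete, here is a toy scheduler built on +IO.select+.
It is a hypothetical sketch, not part of this proposal: it parks fibers in
+wait_readable+ and +wait_writable+, and resumes them from +run+ as their
descriptors become ready.

  class SelectScheduler < Scheduler
    def initialize
      @readable = {} # IO => waiting Fiber
      @writable = {} # IO => waiting Fiber
    end

    def wait_readable(fd)
      # Assume fd may be a raw descriptor; wrap it without taking ownership.
      io = fd.is_a?(IO) ? fd : IO.for_fd(fd, autoclose: false)
      @readable[io] = Fiber.current
      Fiber.yield # Parked until run sees the IO become readable.
    end

    def wait_writable(fd)
      io = fd.is_a?(IO) ? fd : IO.for_fd(fd, autoclose: false)
      @writable[io] = Fiber.current
      Fiber.yield
    end

    def run
      until @readable.empty? && @writable.empty?
        ready_r, ready_w, _ = IO.select(@readable.keys, @writable.keys)
        (ready_r || []).each { |io| @readable.delete(io).resume }
        (ready_w || []).each { |io| @writable.delete(io).resume }
      end
    end
  end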