Skip to content

Commit

Permalink
Add support for native scheduler.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Dec 24, 2020
1 parent 732278d commit e403cc2
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 3 deletions.
68 changes: 66 additions & 2 deletions lib/async/reactor.rb
Expand Up @@ -23,6 +23,7 @@
require_relative 'logger'
require_relative 'task'
require_relative 'wrapper'
require_relative 'scheduler'

require 'nio'
require 'timers'
Expand Down Expand Up @@ -82,8 +83,56 @@ def initialize(parent = nil, selector: self.class.selector, logger: nil)
@ready = []
@running = []

if Scheduler.supported?
@scheduler = Scheduler.new(self)
else
@scheduler = nil
end

@interrupted = false
@guard = Mutex.new
@blocked = 0
@unblocked = []
end

attr :scheduler

# @reentrant Not thread safe.
def block(blocker, timeout)
fiber = Fiber.current

if timeout
timer = self.after(timeout) do
if fiber.alive?
fiber.resume(false)
end
end
end

begin
@blocked += 1
Fiber.yield
ensure
@blocked -= 1
end
ensure
timer&.cancel
end

# @reentrant Thread safe.
def unblock(blocker, fiber)
@guard.synchronize do
@unblocked << fiber
@selector.wakeup
end
end

def fiber(&block)
if @scheduler
Fiber.new(blocking: false, &block)
else
Fiber.new(&block)
end
end

def logger
Expand Down Expand Up @@ -154,7 +203,7 @@ def yield(fiber = Fiber.current)

def finished?
# TODO I'm not sure if checking `@running.empty?` is really required.
super && @ready.empty? && @running.empty?
super && @ready.empty? && @running.empty? && @blocked.zero?
end

# Run one iteration of the event loop.
Expand All @@ -174,6 +223,18 @@ def run_once(timeout = nil)
@running.clear
end

unless @blocked.zero?
unblocked = Array.new

@guard.synchronize do
unblocked, @unblocked = @unblocked, unblocked
end

while fiber = unblocked.pop
fiber.resume if fiber.alive?
end
end

if @ready.empty?
interval = @timers.wait_interval
else
Expand All @@ -197,7 +258,7 @@ def run_once(timeout = nil)
interval = timeout
end

# logger.debug(self) {"Selecting with #{@children&.size} children with interval = #{interval ? interval.round(2) : 'infinite'}..."}
# logger.info(self) {"Selecting with #{@children&.size} children with interval = #{interval ? interval.round(2) : 'infinite'}..."}
if monitors = @selector.select(interval)
monitors.each do |monitor|
monitor.value.resume
Expand All @@ -223,6 +284,8 @@ def run_once(timeout = nil)
def run(*arguments, **options, &block)
raise RuntimeError, 'Reactor has been closed' if @selector.nil?

@scheduler&.set!

initial_task = self.async(*arguments, **options, &block) if block_given?

while self.run_once
Expand All @@ -231,6 +294,7 @@ def run(*arguments, **options, &block)

return initial_task
ensure
@scheduler&.clear!
logger.debug(self) {"Exiting run-loop because #{$! ? $! : 'finished'}."}
end

Expand Down
102 changes: 102 additions & 0 deletions lib/async/scheduler.rb
@@ -0,0 +1,102 @@
# Copyright, 2020, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require_relative 'clock'

module Async
class Scheduler
if Fiber.respond_to?(:set_scheduler)
def self.supported?
true
end
else
def self.supported?
false
end
end

def initialize(reactor)
@reactor = reactor
@wrappers = nil
end

def set!
@wrappers = {}
Fiber.set_scheduler(self)
end

def clear!
# Because these instances are created with `autoclose: false`, this does not close the underlying file descriptor:
# @ios&.each_value(&:close)

@wrappers = nil
Fiber.set_scheduler(nil)
end

private def from_io(io)
@wrappers[io] ||= Wrapper.new(io, @reactor)
end

def io_wait(io, events, timeout = nil)
wrapper = from_io(io)

if events == IO::READABLE
if wrapper.wait_readable(timeout)
return IO::READABLE
end
elsif events == IO::WRITABLE
if wrapper.wait_writable(timeout)
return IO::WRITABLE
end
else
if wrapper.wait_any(timeout)
return events
end
end

return false
ensure
wrapper.reactor = nil
end

def kernel_sleep(duration)
@reactor.sleep(duration)
end

def block(blocker, timeout)
@reactor.block(blocker, timeout)
end

def unblock(blocker, fiber)
@reactor.unblock(blocker, fiber)
end

def close
end

def fiber(&block)
task = Task.new(&block)

task.resume

return task.fiber
end
end
end
110 changes: 110 additions & 0 deletions spec/async/scheduler_spec.rb
@@ -0,0 +1,110 @@
# Copyright, 2020, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require 'async/scheduler'
require 'async/barrier'
require 'net/http'

RSpec.describe Async::Scheduler, if: Async::Scheduler.supported? do
include_context Async::RSpec::Reactor

it "can intercept sleep" do
expect(reactor).to receive(:sleep).with(0.001)

sleep(0.001)
end

describe 'IO.pipe' do
let(:message) {"Helloooooo World!"}

it "can send message via pipe" do
input, output = IO.pipe

reactor.async do
sleep(0.001)

message.each_char do |character|
output.write(character)
end

output.close
end

expect(input.read).to be == message

input.close
end

it "can fetch website using Net::HTTP" do
barrier = Async::Barrier.new
events = []

3.times do |i|
barrier.async do
events << i
response = Net::HTTP.get(URI "https://www.codeotaku.com/index")
expect(response).to_not be_nil
events << i
end
end

barrier.wait

# The requests all get started concurrently:
expect(events.first(3)).to be == [0, 1, 2]
end
end

context "with thread" do
let(:queue) {Thread::Queue.new}
subject {Thread.new{queue.pop}}

it "can join thread" do
waiting = 0

3.times do
Async do
waiting += 1
subject.join
waiting -= 1
end
end

expect(waiting).to be == 3
queue.close
end
end

context "with queue" do
subject {Thread::Queue.new}
let(:item) {"Hello World"}

it "can pass items between thread and fiber" do
Async do
expect(subject.pop).to be == item
end

Thread.new do
expect(Fiber).to be_blocking
subject.push(item)
end.join
end
end
end
3 changes: 2 additions & 1 deletion spec/spec_helper.rb
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require "covered/rspec"
require 'async/rspec'
require 'covered/rspec'

if RUBY_PLATFORM =~ /darwin/
Q = 20
Expand Down

0 comments on commit e403cc2

Please sign in to comment.