diff --git a/lib/async/reactor.rb b/lib/async/reactor.rb index 48016c54..609ff53f 100644 --- a/lib/async/reactor.rb +++ b/lib/async/reactor.rb @@ -23,6 +23,7 @@ require_relative 'logger' require_relative 'task' require_relative 'wrapper' +require_relative 'scheduler' require 'nio' require 'timers' @@ -82,8 +83,56 @@ def initialize(parent = nil, selector: self.class.selector, logger: nil) @ready = [] @running = [] + if Scheduler.supported? + @scheduler = Scheduler.new(self) + else + @scheduler = nil + end + @interrupted = false @guard = Mutex.new + @blocked = 0 + @unblocked = [] + end + + attr :scheduler + + # @reentrant Not thread safe. + def block(blocker, timeout) + fiber = Fiber.current + + if timeout + timer = self.after(timeout) do + if fiber.alive? + fiber.resume(false) + end + end + end + + begin + @blocked += 1 + Fiber.yield + ensure + @blocked -= 1 + end + ensure + timer&.cancel + end + + # @reentrant Thread safe. + def unblock(blocker, fiber) + @guard.synchronize do + @unblocked << fiber + @selector.wakeup + end + end + + def fiber(&block) + if @scheduler + Fiber.new(blocking: false, &block) + else + Fiber.new(&block) + end end def logger @@ -154,7 +203,7 @@ def yield(fiber = Fiber.current) def finished? # TODO I'm not sure if checking `@running.empty?` is really required. - super && @ready.empty? && @running.empty? + super && @ready.empty? && @running.empty? && @blocked.zero? end # Run one iteration of the event loop. @@ -174,6 +223,18 @@ def run_once(timeout = nil) @running.clear end + unless @blocked.zero? + unblocked = Array.new + + @guard.synchronize do + unblocked, @unblocked = @unblocked, unblocked + end + + while fiber = unblocked.pop + fiber.resume if fiber.alive? + end + end + if @ready.empty? interval = @timers.wait_interval else @@ -197,7 +258,7 @@ def run_once(timeout = nil) interval = timeout end - # logger.debug(self) {"Selecting with #{@children&.size} children with interval = #{interval ? interval.round(2) : 'infinite'}..."} + # logger.info(self) {"Selecting with #{@children&.size} children with interval = #{interval ? interval.round(2) : 'infinite'}..."} if monitors = @selector.select(interval) monitors.each do |monitor| monitor.value.resume @@ -223,6 +284,8 @@ def run_once(timeout = nil) def run(*arguments, **options, &block) raise RuntimeError, 'Reactor has been closed' if @selector.nil? + @scheduler&.set! + initial_task = self.async(*arguments, **options, &block) if block_given? while self.run_once @@ -231,6 +294,7 @@ def run(*arguments, **options, &block) return initial_task ensure + @scheduler&.clear! logger.debug(self) {"Exiting run-loop because #{$! ? $! : 'finished'}."} end diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb new file mode 100644 index 00000000..94252fff --- /dev/null +++ b/lib/async/scheduler.rb @@ -0,0 +1,102 @@ +# Copyright, 2020, by Samuel G. D. Williams. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +require_relative 'clock' + +module Async + class Scheduler + if Fiber.respond_to?(:set_scheduler) + def self.supported? + true + end + else + def self.supported? + false + end + end + + def initialize(reactor) + @reactor = reactor + @wrappers = nil + end + + def set! + @wrappers = {} + Fiber.set_scheduler(self) + end + + def clear! + # Because these instances are created with `autoclose: false`, this does not close the underlying file descriptor: + # @ios&.each_value(&:close) + + @wrappers = nil + Fiber.set_scheduler(nil) + end + + private def from_io(io) + @wrappers[io] ||= Wrapper.new(io, @reactor) + end + + def io_wait(io, events, timeout = nil) + wrapper = from_io(io) + + if events == IO::READABLE + if wrapper.wait_readable(timeout) + return IO::READABLE + end + elsif events == IO::WRITABLE + if wrapper.wait_writable(timeout) + return IO::WRITABLE + end + else + if wrapper.wait_any(timeout) + return events + end + end + + return false + ensure + wrapper.reactor = nil + end + + def kernel_sleep(duration) + @reactor.sleep(duration) + end + + def block(blocker, timeout) + @reactor.block(blocker, timeout) + end + + def unblock(blocker, fiber) + @reactor.unblock(blocker, fiber) + end + + def close + end + + def fiber(&block) + task = Task.new(&block) + + task.resume + + return task.fiber + end + end +end diff --git a/spec/async/scheduler_spec.rb b/spec/async/scheduler_spec.rb new file mode 100644 index 00000000..4d715210 --- /dev/null +++ b/spec/async/scheduler_spec.rb @@ -0,0 +1,110 @@ +# Copyright, 2020, by Samuel G. D. Williams. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +require 'async/scheduler' +require 'async/barrier' +require 'net/http' + +RSpec.describe Async::Scheduler, if: Async::Scheduler.supported? do + include_context Async::RSpec::Reactor + + it "can intercept sleep" do + expect(reactor).to receive(:sleep).with(0.001) + + sleep(0.001) + end + + describe 'IO.pipe' do + let(:message) {"Helloooooo World!"} + + it "can send message via pipe" do + input, output = IO.pipe + + reactor.async do + sleep(0.001) + + message.each_char do |character| + output.write(character) + end + + output.close + end + + expect(input.read).to be == message + + input.close + end + + it "can fetch website using Net::HTTP" do + barrier = Async::Barrier.new + events = [] + + 3.times do |i| + barrier.async do + events << i + response = Net::HTTP.get(URI "https://www.codeotaku.com/index") + expect(response).to_not be_nil + events << i + end + end + + barrier.wait + + # The requests all get started concurrently: + expect(events.first(3)).to be == [0, 1, 2] + end + end + + context "with thread" do + let(:queue) {Thread::Queue.new} + subject {Thread.new{queue.pop}} + + it "can join thread" do + waiting = 0 + + 3.times do + Async do + waiting += 1 + subject.join + waiting -= 1 + end + end + + expect(waiting).to be == 3 + queue.close + end + end + + context "with queue" do + subject {Thread::Queue.new} + let(:item) {"Hello World"} + + it "can pass items between thread and fiber" do + Async do + expect(subject.pop).to be == item + end + + Thread.new do + expect(Fiber).to be_blocking + subject.push(item) + end.join + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index d718307e..677bd84a 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true -require "covered/rspec" +require 'async/rspec' +require 'covered/rspec' if RUBY_PLATFORM =~ /darwin/ Q = 20