diff --git a/.travis.yml b/.travis.yml index 44a8c5df..66109806 100644 --- a/.travis.yml +++ b/.travis.yml @@ -29,6 +29,7 @@ matrix: env: JRUBY_OPTS="--debug -X+O" - rvm: truffleruby - rvm: ruby-head + - rvm: ruby-head-scheduler --url https://github.com/ioquatix/ruby.git --branch thread-selector allow_failures: - rvm: truffleruby - rvm: ruby-head diff --git a/lib/async/reactor.rb b/lib/async/reactor.rb index 70bad015..2acaf8e6 100644 --- a/lib/async/reactor.rb +++ b/lib/async/reactor.rb @@ -21,6 +21,7 @@ require_relative 'logger' require_relative 'task' require_relative 'wrapper' +require_relative 'scheduler' require 'nio' require 'timers' @@ -80,10 +81,18 @@ def initialize(parent = nil, selector: self.class.selector, logger: nil) @ready = [] @running = [] + if Scheduler.supported? + @scheduler = Scheduler.new(self) + else + @scheduler = nil + end + @interrupted = false @guard = Mutex.new end + attr :scheduler + def logger @logger ||= Console.logger end @@ -227,6 +236,7 @@ def run_once(timeout = nil) def run(*arguments, &block) raise RuntimeError, 'Reactor has been closed' if @selector.nil? + @scheduler&.set! initial_task = self.async(*arguments, &block) if block_given? while self.run_once @@ -235,6 +245,7 @@ def run(*arguments, &block) return initial_task ensure + @scheduler&.clear! logger.debug(self) {"Exiting run-loop because #{$! ? $! : 'finished'}."} end diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb new file mode 100644 index 00000000..391abff0 --- /dev/null +++ b/lib/async/scheduler.rb @@ -0,0 +1,96 @@ +# Copyright, 2017, by Samuel G. D. Williams. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +require_relative 'clock' + +module Async + class Scheduler + if Thread.instance_methods.include?(:scheduler) + def self.supported? + true + end + else + def self.supported? + false + end + end + + def initialize(reactor) + @reactor = reactor + @blocking_started_at = nil + end + + def set! + if thread = Thread.current + thread.scheduler = self + end + end + + def clear! + if thread = Thread.current + thread.scheduler = nil + end + end + + private def from_descriptor(fd) + io = IO.for_fd(fd, autoclose: false) + return Wrapper.new(io, @reactor) + end + + def wait_readable(fd, timeout = nil) + wrapper = from_descriptor(fd) + wrapper.wait_readable(timeout) + ensure + wrapper.reactor = nil + end + + def wait_writable(fd) + wrapper = from_descriptor(fd) + wrapper.wait_writable(timeout) + ensure + wrapper.reactor = nil + end + + def wait_for_single_fd(fd, events, duration) + wrapper = from_descriptor(fd) + wrapper.wait_any(duration) + ensure + wrapper.reactor = nil + end + + def wait_sleep(duration) + @reactor.sleep(duration) + end + + def enter_blocking_region + @blocking_started_at = Clock.now + end + + def exit_blocking_region + duration = Clock.now - @blocking_started_at + + if duration > 0.1 + what = caller.first + + warn "Blocking for #{duration.round(4)}s in #{what}." if $VERBOSE + end + end + end +end diff --git a/spec/async/scheduler_spec.rb b/spec/async/scheduler_spec.rb new file mode 100644 index 00000000..bff6fba1 --- /dev/null +++ b/spec/async/scheduler_spec.rb @@ -0,0 +1,57 @@ +# Copyright, 2020, by Samuel G. D. Williams. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +require 'async/scheduler' +require 'io/nonblock' + +RSpec.describe Async::Scheduler, if: Async::Scheduler.supported? do + include_context Async::RSpec::Reactor + + it "can intercept sleep" do + expect(reactor).to receive(:sleep).with(0.001) + + sleep(0.001) + end + + describe 'IO.pipe' do + let(:message) {"Helloooooo World!"} + + it "can send message via pipe" do + input, output = IO.pipe + input.nonblock = true + output.nonblock = true + + reactor.async do + # This causes fd leakage for some reason: + # sleep(0.001) + + message.each_char do |character| + output.write(character) + end + + output.close + end + + expect(input.read).to be == message + + input.close + end + end +end \ No newline at end of file diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 817454f1..ce2cc969 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,5 +1,6 @@ -require "covered/rspec" +require 'async/rspec' +require 'covered/rspec' RSpec.configure do |config| # Enable flags like --only-failures and --next-failure