Skip to content

Commit

Permalink
Merge 6d4ab47 into 213f840
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Jan 11, 2020
2 parents 213f840 + 6d4ab47 commit ff3f4e9
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 1 deletion.
1 change: 1 addition & 0 deletions .travis.yml
Expand Up @@ -29,6 +29,7 @@ matrix:
env: JRUBY_OPTS="--debug -X+O"
- rvm: truffleruby
- rvm: ruby-head
- rvm: ruby-head-scheduler --url https://github.com/ioquatix/ruby.git --branch thread-selector
allow_failures:
- rvm: truffleruby
- rvm: ruby-head
Expand Down
11 changes: 11 additions & 0 deletions lib/async/reactor.rb
Expand Up @@ -21,6 +21,7 @@
require_relative 'logger'
require_relative 'task'
require_relative 'wrapper'
require_relative 'scheduler'

require 'nio'
require 'timers'
Expand Down Expand Up @@ -80,10 +81,18 @@ def initialize(parent = nil, selector: self.class.selector, logger: nil)
@ready = []
@running = []

if Scheduler.supported?
@scheduler = Scheduler.new(self)
else
@scheduler = nil
end

@interrupted = false
@guard = Mutex.new
end

attr :scheduler

def logger
@logger ||= Console.logger
end
Expand Down Expand Up @@ -227,6 +236,7 @@ def run_once(timeout = nil)
def run(*arguments, &block)
raise RuntimeError, 'Reactor has been closed' if @selector.nil?

@scheduler&.set!
initial_task = self.async(*arguments, &block) if block_given?

while self.run_once
Expand All @@ -235,6 +245,7 @@ def run(*arguments, &block)

return initial_task
ensure
@scheduler&.clear!
logger.debug(self) {"Exiting run-loop because #{$! ? $! : 'finished'}."}
end

Expand Down
96 changes: 96 additions & 0 deletions lib/async/scheduler.rb
@@ -0,0 +1,96 @@
# Copyright, 2017, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require_relative 'clock'

module Async
class Scheduler
if Thread.instance_methods.include?(:scheduler)
def self.supported?
true
end
else
def self.supported?
false
end
end

def initialize(reactor)
@reactor = reactor
@blocking_started_at = nil
end

def set!
if thread = Thread.current
thread.scheduler = self
end
end

def clear!
if thread = Thread.current
thread.scheduler = nil
end
end

private def from_descriptor(fd)
io = IO.for_fd(fd, autoclose: false)
return Wrapper.new(io, @reactor)
end

def wait_readable(fd, timeout = nil)
wrapper = from_descriptor(fd)
wrapper.wait_readable(timeout)
ensure
wrapper.reactor = nil
end

def wait_writable(fd)
wrapper = from_descriptor(fd)
wrapper.wait_writable(timeout)
ensure
wrapper.reactor = nil
end

def wait_for_single_fd(fd, events, duration)
wrapper = from_descriptor(fd)
wrapper.wait_any(duration)
ensure
wrapper.reactor = nil
end

def wait_sleep(duration)
@reactor.sleep(duration)
end

def enter_blocking_region
@blocking_started_at = Clock.now
end

def exit_blocking_region
duration = Clock.now - @blocking_started_at

if duration > 0.1
what = caller.first

warn "Blocking for #{duration.round(4)}s in #{what}." if $VERBOSE
end
end
end
end
57 changes: 57 additions & 0 deletions spec/async/scheduler_spec.rb
@@ -0,0 +1,57 @@
# Copyright, 2020, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require 'async/scheduler'
require 'io/nonblock'

RSpec.describe Async::Scheduler, if: Async::Scheduler.supported? do
include_context Async::RSpec::Reactor

it "can intercept sleep" do
expect(reactor).to receive(:sleep).with(0.001)

sleep(0.001)
end

describe 'IO.pipe' do
let(:message) {"Helloooooo World!"}

it "can send message via pipe" do
input, output = IO.pipe
input.nonblock = true
output.nonblock = true

reactor.async do
# This causes fd leakage for some reason:
# sleep(0.001)

message.each_char do |character|
output.write(character)
end

output.close
end

expect(input.read).to be == message

input.close
end
end
end
3 changes: 2 additions & 1 deletion spec/spec_helper.rb
@@ -1,5 +1,6 @@

require "covered/rspec"
require 'async/rspec'
require 'covered/rspec'

RSpec.configure do |config|
# Enable flags like --only-failures and --next-failure
Expand Down

0 comments on commit ff3f4e9

Please sign in to comment.