Skip to content

Commit

Permalink
Update wrapper implementation to support both "free" and "bound" IO.
Browse files Browse the repository at this point in the history
Free IO is not bound to any particular reactor, and can be shared between threads, but has a higher overhead for blocking operations.

Bound IO only works on the reactor it was bound to, but has better performance, and the lifetime of the wrapper is bound to the IO.
  • Loading branch information
ioquatix committed Mar 16, 2018
1 parent d9e6c89 commit 18042b5
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 20 deletions.
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ cache: bundler

matrix:
include:
- rvm: 2.0
- rvm: 2.1
- rvm: 2.2
- rvm: 2.3
- rvm: 2.4
Expand Down
6 changes: 3 additions & 3 deletions async.gemspec
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# -*- encoding: utf-8 -*-

require_relative 'lib/async/version'

Gem::Specification.new do |spec|
Expand All @@ -21,9 +21,9 @@ Gem::Specification.new do |spec|
spec.require_paths = ["lib"]
spec.has_rdoc = "yard"

spec.required_ruby_version = "~> 2.0"
spec.required_ruby_version = ">= 2.2.7"

spec.add_runtime_dependency "nio4r"
spec.add_runtime_dependency "nio4r", "~> 2.0"
spec.add_runtime_dependency "timers", "~> 4.1"

spec.add_development_dependency "async-rspec", "~> 1.1"
Expand Down
44 changes: 29 additions & 15 deletions lib/async/wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def initialize(io, reactor = nil)
@io = io

@reactor = reactor
@monitor = nil
end

# The underlying native `io`.
Expand All @@ -52,26 +53,41 @@ def wait_writable(duration = nil)
# @param interests [:r | :w | :rw] what events to wait for.
# @param duration [Float] timeout after the given duration if not `nil`.
def wait_any(interests = :rw, duration = nil)
monitor(interests, duration)
# There is value in caching this monitor - if you can reuse it, you will get about 2x the throughput, because you avoid calling Reactor#register and Monitor#close for every call. That being said, by caching it, you also introduce lifetime issues. I'm going to accept this overhead into the wrapper design because it's pretty convenient, but if you want faster IO, take a look at the performance spec which compares this method with a more direct alternative.
if @reactor
unless @monitor
@monitor = @reactor.register(@io, interests)
else
@monitor.interests = interests
end

begin
wait_for(@reactor, @monitor, duration)
ensure
@monitor.remove_interest(@monitor.interests)
end
else
reactor = Task.current.reactor
monitor = reactor.register(@io, interests)

begin
wait_for(reactor, monitor, duration)
ensure
monitor.close
end
end
end

# Close the monitor.
# Close the io and monitor.
def close
@io.close if @io
@monitor&.close

@io.close
end

private

def current_reactor
@reactor || Task.current.reactor
end

# Monitor the io for the given events
def monitor(interests, duration = nil)
reactor = current_reactor

monitor = reactor.register(@io, interests)

def wait_for(reactor, monitor, duration)
# If the user requested an explicit timeout for this operation:
if duration
reactor.timeout(duration) do
Expand All @@ -82,8 +98,6 @@ def monitor(interests, duration = nil)
end

return true
ensure
monitor.close if monitor
end
end
end

0 comments on commit 18042b5

Please sign in to comment.