Skip to content

Commit

Permalink
All of reactor spec passing using Fiber#transfer only.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Jul 5, 2018
1 parent fd378c6 commit f881286
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 17 deletions.
64 changes: 64 additions & 0 deletions examples/reactor.rb
@@ -0,0 +1,64 @@
#!/usr/bin/env ruby

require 'fiber'

module Async
class Reactor
def initialize
@fiber = nil
@waiting = []
end

def yield
@fiber.transfer
end

def resume(fiber)
previous = @fiber

@fiber = Fiber.current
fiber.transfer

@fiber = previous
end

def async(&block)
fiber = Fiber.new do
block.call
self.yield
end

resume(fiber)
end

# Wait for some event...
def wait
@waiting << Fiber.current
self.yield
end

def run
while @waiting.any?
fiber = @waiting.pop
resume(fiber)
end
end
end
end

reactor = Async::Reactor.new

reactor.async do
puts "Hello World"
reactor.async do
puts "Goodbye World"
reactor.wait
puts "I'm back!"
end
puts "Foo Bar"
end

puts "Running"
reactor.run
puts "Finished"

4 changes: 3 additions & 1 deletion lib/async/condition.rb
Expand Up @@ -48,11 +48,13 @@ def empty?
# @see Task.yield which is responsible for handling value.
# @return [void]
def signal(value = nil)
reactor = Task.current.reactor

waiting = @waiting
@waiting = []

waiting.each do |fiber|
fiber.resume(value) if fiber.alive?
reactor.resume(fiber, value) if fiber.alive?
end

return nil
Expand Down
38 changes: 26 additions & 12 deletions lib/async/reactor.rb
Expand Up @@ -66,9 +66,28 @@ def initialize(parent = nil, selector: NIO::Selector.new)

@ready = []

# This is where we come back to when calling `#yield`.
@fiber = nil

@stopped = true
end

# Yield the current fiber and resume it on the next iteration of the event loop.
def yield(fiber = Fiber.current)
@ready << fiber if fiber

@fiber.transfer
end

def resume(fiber, *args)
previous = @fiber

@fiber = Fiber.current
fiber.transfer(*args)

@fiber = previous
end

def to_s
"<#{self.description} stopped=#{@stopped}>"
end
Expand Down Expand Up @@ -127,13 +146,6 @@ def << fiber
@ready << fiber
end

# Yield the current fiber and resume it on the next iteration of the event loop.
def yield(fiber = Fiber.current)
@ready << fiber

Fiber.yield
end

def finished?
super && @ready.empty?
end
Expand All @@ -151,7 +163,7 @@ def run(*args, &block)
@timers.wait do |interval|
if @ready.any?
@ready.each do |fiber|
fiber.resume if fiber.alive?
resume(fiber) if fiber.alive?
end

@ready.clear
Expand Down Expand Up @@ -179,7 +191,7 @@ def run(*args, &block)
# Async.logger.debug(self) {"Selecting with #{@children.count} fibers interval = #{interval.inspect}..."}
if monitors = @selector.select(interval)
monitors.each do |monitor|
monitor.value.resume
monitor.value.transfer
end
end
end until @stopped
Expand Down Expand Up @@ -215,11 +227,13 @@ def sleep(duration)

timer = self.after(duration) do
if fiber.alive?
fiber.resume
resume(fiber)
end
end

Task.yield
Task.yield do
self.yield(nil)
end
ensure
timer.cancel if timer
end
Expand All @@ -236,7 +250,7 @@ def timeout(duration)
if fiber.alive?
error = TimeoutError.new("execution expired")
error.set_backtrace backtrace
fiber.resume error
resume(fiber, error)
end
end

Expand Down
10 changes: 6 additions & 4 deletions lib/async/task.rb
Expand Up @@ -34,7 +34,7 @@ class Stop < Exception
class Task < Node
extend Forwardable

# Yield the unerlying `result` for the task. If the result
# Yield the underlying `result` for the task. If the result
# is an Exception, then that result will be raised an its
# exception.
# @return [Object] result of the task
Expand All @@ -44,7 +44,7 @@ def self.yield
if block_given?
result = yield
else
result = Fiber.yield
result = Task.current.reactor.yield
end

if result.is_a? Exception
Expand Down Expand Up @@ -86,6 +86,8 @@ def initialize(reactor, parent = Task.current?)
# Async.logger.debug("Task #{self} closing: #{$!}")
finish!
end

@reactor.yield(nil)
end
end

Expand Down Expand Up @@ -113,7 +115,7 @@ def yield
def run(*args)
if @status == :initialized
@status = :running
@fiber.resume(*args)
@reactor.resume(fiber, *args)
else
raise RuntimeError, "Task already running!"
end
Expand Down Expand Up @@ -149,7 +151,7 @@ def stop
@children.each(&:stop)

if @fiber.alive?
@fiber.resume(Stop.new)
@reactor.resume(@fiber, Stop.new)
end
end

Expand Down

0 comments on commit f881286

Please sign in to comment.