Skip to content

Commit

Permalink
Merge c7153df into 1c19348
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Aug 21, 2019
2 parents 1c19348 + c7153df commit 1404469
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 10 deletions.
43 changes: 33 additions & 10 deletions lib/async/task.rb
Expand Up @@ -27,6 +27,19 @@
module Async
# Raised when a task is explicitly stopped.
class Stop < Exception
class Later
def initialize(task)
@task = task
end

def alive?
true
end

def resume
@task.stop
end
end
end

# A task represents the state associated with the execution of an asynchronous
Expand Down Expand Up @@ -137,20 +150,28 @@ def wait
# Stop the task and all of its children.
# @return [void]
def stop
if self.stopping?
# If we are already stopping this task... don't try to stop it again.
return true
elsif self.running?
if self.stopping? || self.stopped?
# If we already stopped this task... don't try to stop it again:
return
end

if self.running?
previous_status = @status
@status = :stopping

if self.current?
raise Stop, "Stopping current fiber!"
elsif @fiber&.alive?
@fiber.resume(Stop.new)
begin
@fiber.resume(Stop.new)
rescue FiberError
@status = previous_status
@reactor << Stop::Later.new(self)
end
end
else
# We are not running, but children might be, so transition directly into stopped state:
stop!
end
ensure
@children&.each(&:stop)
end

# Lookup the {Task} for the current fiber. Raise `RuntimeError` if none is available.
Expand Down Expand Up @@ -217,7 +238,9 @@ def fail!(exception = nil, propagate = true)
end

def stop!
# logger.debug(self) {"Task was stopped with #{@children.size} children!"}
@status = :stopped
@children&.each(&:stop)
end

def make_fiber(&block)
Expand All @@ -227,15 +250,15 @@ def make_fiber(&block)
begin
@result = yield(self, *args)
@status = :complete
# logger.debug("Task #{self} completed normally.")
# logger.debug(self) {"Task was completed with #{@children.size} children!"}
rescue Stop
stop!
rescue StandardError => error
fail!(error, false)
rescue Exception => exception
fail!(exception, true)
ensure
# logger.debug("Task #{self} closing: #{$!}")
# logger.debug(self) {"Task ensure $!=#{$!} with #{@children.size} children!"}
finish!
end
end
Expand Down
25 changes: 25 additions & 0 deletions spec/async/condition_spec.rb
Expand Up @@ -41,5 +41,30 @@
task.stop
end

it 'can stop nested task' do
producer = nil

consumer = reactor.async do |task|
condition = Async::Condition.new

producer = task.async do |subtask|
subtask.yield
condition.signal
subtask.sleep(10)
end

condition.wait
expect do
producer.stop
end.to_not raise_error
end

consumer.wait
producer.wait

expect(producer).to be_stopped
expect(consumer).to be_complete
end

it_behaves_like Async::Condition
end

0 comments on commit 1404469

Please sign in to comment.