Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 156 additions & 27 deletions lib/async/list.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,27 @@ def initialize

# Print a short summary of the list.
def to_s
"#<#{self.class.name} size=#{@size}>"
sprintf("#<%s:0x%x size=%d>", self.class.name, object_id, @size)
end

alias inspect to_s

# Fast, safe, unbounded accumulation of children.
def to_a
items = []
current = self

while current.tail != self
unless current.tail.is_a?(Iterator)
items << current.tail
end

current = current.tail
end

return items
end

# Points at the end of the list.
attr_accessor :head

Expand Down Expand Up @@ -118,31 +134,47 @@ def remove(node)

# @returns [Boolean] Returns true if the list is empty.
def empty?
@tail.equal?(self)
@size == 0
end

# Iterate over each node in the linked list. It is generally safe to remove the current node, any previous node or any future node during iteration.
#
# @yields {|node| ...} Yields each node in the list.
# @returns [List] Returns self.
def each
return to_enum unless block_given?

current = self
def validate!(node = nil)
previous = self
current = @tail
found = node.equal?(self)

while true
node = current.tail
# binding.irb if node.nil? && !node.equal?(self)
break if node.equal?(self)
break if current.equal?(self)

if current.head != previous
raise "Invalid previous linked list node!"
end

yield node
if current.is_a?(List) and !current.equal?(self)
raise "Invalid list in list node!"
end

# If the node has deleted itself or any subsequent node, it will no longer be the next node, so don't use it for continued traversal:
if current.tail.equal?(node)
current = node
if node
found ||= current.equal?(node)
end

previous = current
current = current.tail
end

if node and !found
raise "Node not found in list!"
end
end

# Iterate over each node in the linked list. It is generally safe to remove the current node, any previous node or any future node during iteration.
#
# @yields {|node| ...} Yields each node in the list.
# @returns [List] Returns self.
def each(&block)
return to_enum unless block_given?

Iterator.each(self, &block)

return self
end

Expand All @@ -160,22 +192,119 @@ def include?(needle)

# @returns [Node] Returns the first node in the list, if it is not empty.
def first
unless @tail.equal?(self)
@tail
# validate!

current = @tail

while !current.equal?(self)
if current.is_a?(Iterator)
current = current.tail
else
return current
end
end

return nil
end

# @returns [Node] Returns the last node in the list, if it is not empty.
def last
unless @head.equal?(self)
@head
# validate!

current = @head

while !current.equal?(self)
if current.is_a?(Iterator)
current = current.head
else
return current
end
end

return nil
end
end

# A linked list Node.
class List::Node
attr_accessor :head
attr_accessor :tail

def shift
if node = first
remove!(node)
end
end

# A linked list Node.
class Node
attr_accessor :head
attr_accessor :tail

alias inspect to_s
end

class Iterator < Node
def initialize(list)
@list = list

# Insert the iterator as the first item in the list:
@tail = list.tail
@tail.head = self
list.tail = self
@head = list
end

def remove!
@head.tail = @tail
@tail.head = @head
@head = nil
@tail = nil
@list = nil
end

def move_next
# Move to the next item (which could be an iterator or the end):
@tail.head = @head
@head.tail = @tail
@head = @tail
@tail = @tail.tail
@head.tail = self
@tail.head = self
end

def move_current
while true
# Are we at the end of the list?
if @tail.equal?(@list)
return nil
end

if @tail.is_a?(Iterator)
move_next
else
return @tail
end
end
end

def each
while current = move_current
yield current

if current.equal?(@tail)
move_next
end
end
end

def self.each(list, &block)
list.validate!

return if list.empty?

iterator = Iterator.new(list)

iterator.each(&block)
ensure
iterator&.remove!
end
end

private_constant :Iterator
end
end
7 changes: 3 additions & 4 deletions lib/async/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,12 @@ def consume
if parent = @parent and finished?
parent.remove_child(self)

# If we have children, then we need to move them to our the parent if they are not finished:
if @children
@children.each do |child|
while child = @children.shift
if child.finished?
remove_child(child)
child.set_parent(nil)
else
# In theory we don't need to do this... because we are throwing away the list. However, if you don't correctly update the list when moving the child to the parent, it foobars the enumeration, and subsequent nodes will be skipped, or in the worst case you might start enumerating the parents nodes.
remove_child(child)
parent.add_child(child)
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/async/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ def schedule(&block)
self.root.resume(@fiber)
end

# Finish the current task, and all bound bound IO objects.
# Finish the current task, moving any children to the parent.
def finish!
# Allow the fiber to be recycled.
@fiber = nil
Expand Down
22 changes: 22 additions & 0 deletions test/async/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,28 @@

expect(items).to be == [1, 2]
end

it "can stop a child task with transient children" do
parent = child = transient = nil

reactor.run do |task|
parent = task.async do |task|
transient = task.async(transient: true) do
sleep(1)
end

child = task.async do
sleep(1)
end
end

parent.wait
expect(parent).to be(:complete?)
parent.stop
expect(parent).to be(:stopped?)
expect(transient).to be(:running?)
end.wait
end
end

with '#sleep' do
Expand Down