Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions examples/bugs/write_lock.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/usr/bin/env ruby

# This bug only generally shows up on Linux, when using io_uring, as it has more fine-grained locking. The issue is that `puts` can acquire and release a write lock, and if one thread releases that lock while the reactor on the waitq thread is closing, it can call `unblock` with `@selector = nil` which fails or causes odd behaviour.

require_relative '../../lib/async'

def wait_for_interrupt(thread_index, repeat)
sequence = []

events = Thread::Queue.new
reactor = Async::Reactor.new

thread = Thread.new do
if events.pop
puts "#{thread_index}+#{repeat} Sending Interrupt!"
reactor.interrupt
end
end

reactor.async do
events << true
puts "#{thread_index}+#{repeat} Reactor ready!"

# Wait to be interrupted:
sleep(1)

puts "#{thread_index}+#{repeat} Missing interrupt!"
end

reactor.run

thread.join
end

100.times.map do |thread_index|
Thread.new do
1000.times do |repeat|
wait_for_interrupt(thread_index, repeat)
end
end
end.each(&:join)
14 changes: 10 additions & 4 deletions lib/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@ def close
# We depend on GVL for consistency:
# @guard.synchronize do

@selector&.close
# We want `@selector = nil` to be a visible side effect from this point forward, specifically in `#interrupt` and `#unblock`. If the selector is closed, then we don't want to push any fibers to it.
selector = @selector
@selector = nil

selector&.close

# end

consume
Expand All @@ -69,9 +72,10 @@ def to_s
end

# Interrupt the event loop and cause it to exit.
# @asynchronous May be called from any thread.
def interrupt
@interrupted = true
@selector.wakeup
@selector&.wakeup
end

# Transfer from the calling fiber to the event loop.
Expand Down Expand Up @@ -127,8 +131,10 @@ def unblock(blocker, fiber)
# $stderr.puts "unblock(#{blocker}, #{fiber})"

# This operation is protected by the GVL:
@selector.push(fiber)
@selector.wakeup
if selector = @selector
selector.push(fiber)
selector.wakeup
end
end

# @asynchronous May be non-blocking..
Expand Down
39 changes: 39 additions & 0 deletions test-segfault.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#!/usr/bin/env ruby

require_relative 'lib/async'

def wait_for_interrupt(thread_index, repeat)
sequence = []

events = Thread::Queue.new
reactor = Async::Reactor.new

thread = Thread.new do
if events.pop
puts "#{thread_index}+#{repeat} Sending Interrupt!"
reactor.interrupt
end
end

reactor.async do
events << true
puts "#{thread_index}+#{repeat} Reactor ready!"

# Wait to be interrupted:
sleep(1)

puts "#{thread_index}+#{repeat} Missing interrupt!"
end

reactor.run

thread.join
end

32.times.map do |thread_index|
Thread.new do
100.times do |repeat|
wait_for_interrupt(thread_index, repeat)
end
end
end.each(&:join)
24 changes: 24 additions & 0 deletions test/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,28 @@
expect(duration).to be <= 0.1
end
end

with '#interrupt' do
it "can interrupt a closed scheduler" do
scheduler = Async::Scheduler.new
scheduler.close
scheduler.interrupt
end
end

with '#block' do
it "can block and unblock the scheduler after closing" do
scheduler = Async::Scheduler.new

fiber = Fiber.new do
scheduler.block(:test, nil)
end

fiber.transfer

expect do
scheduler.close
end.to raise_exception(RuntimeError, message: be =~ /Closing scheduler with blocked operations/)
end
end
end