Skip to content

Commit

Permalink
Avoid race conditions in Scheduler#unblock.
Browse files Browse the repository at this point in the history
See <ruby/ruby#7460> for more details.
  • Loading branch information
ioquatix committed Mar 7, 2023
1 parent d2d3cf0 commit 2923d5c
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 4 deletions.
41 changes: 41 additions & 0 deletions examples/bugs/write_lock.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/usr/bin/env ruby

# This bug only generally shows up on Linux, when using io_uring, as it has more fine-grained locking. The issue is that `puts` can acquire and release a write lock, and if one thread releases that lock while the reactor on the waitq thread is closing, it can call `unblock` with `@selector = nil` which fails or causes odd behaviour.

require_relative '../../lib/async'

def wait_for_interrupt(thread_index, repeat)
sequence = []

events = Thread::Queue.new
reactor = Async::Reactor.new

thread = Thread.new do
if events.pop
puts "#{thread_index}+#{repeat} Sending Interrupt!"
reactor.interrupt
end
end

reactor.async do
events << true
puts "#{thread_index}+#{repeat} Reactor ready!"

# Wait to be interrupted:
sleep(1)

puts "#{thread_index}+#{repeat} Missing interrupt!"
end

reactor.run

thread.join
end

100.times.map do |thread_index|
Thread.new do
1000.times do |repeat|
wait_for_interrupt(thread_index, repeat)
end
end
end.each(&:join)
14 changes: 10 additions & 4 deletions lib/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@ def close
# We depend on GVL for consistency:
# @guard.synchronize do

@selector&.close
# We want `@selector = nil` to be a visible side effect from this point forward, specifically in `#interrupt` and `#unblock`. If the selector is closed, then we don't want to push any fibers to it.
selector = @selector
@selector = nil

selector&.close

# end

consume
Expand All @@ -69,9 +72,10 @@ def to_s
end

# Interrupt the event loop and cause it to exit.
# @asynchronous May be called from any thread.
def interrupt
@interrupted = true
@selector.wakeup
@selector&.wakeup
end

# Transfer from the calling fiber to the event loop.
Expand Down Expand Up @@ -127,8 +131,10 @@ def unblock(blocker, fiber)
# $stderr.puts "unblock(#{blocker}, #{fiber})"

# This operation is protected by the GVL:
@selector.push(fiber)
@selector.wakeup
if selector = @selector
selector.push(fiber)
selector.wakeup
end
end

# @asynchronous May be non-blocking..
Expand Down
39 changes: 39 additions & 0 deletions test-segfault.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#!/usr/bin/env ruby

require_relative 'lib/async'

def wait_for_interrupt(thread_index, repeat)
sequence = []

events = Thread::Queue.new
reactor = Async::Reactor.new

thread = Thread.new do
if events.pop
puts "#{thread_index}+#{repeat} Sending Interrupt!"
reactor.interrupt
end
end

reactor.async do
events << true
puts "#{thread_index}+#{repeat} Reactor ready!"

# Wait to be interrupted:
sleep(1)

puts "#{thread_index}+#{repeat} Missing interrupt!"
end

reactor.run

thread.join
end

32.times.map do |thread_index|
Thread.new do
100.times do |repeat|
wait_for_interrupt(thread_index, repeat)
end
end
end.each(&:join)
24 changes: 24 additions & 0 deletions test/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,28 @@
expect(duration).to be <= 0.1
end
end

with '#interrupt' do
it "can interrupt a closed scheduler" do
scheduler = Async::Scheduler.new
scheduler.close
scheduler.interrupt
end
end

with '#block' do
it "can block and unblock the scheduler after closing" do
scheduler = Async::Scheduler.new

fiber = Fiber.new do
scheduler.block(:test, nil)
end

fiber.transfer

expect do
scheduler.close
end.to raise_exception(RuntimeError, message: be =~ /Closing scheduler with blocked operations/)
end
end
end

0 comments on commit 2923d5c

Please sign in to comment.