Skip to content

Commit

Permalink
Merge pull request #23398 from matthewd/interlock
Browse files Browse the repository at this point in the history
Address remaining known issues in Interlock
  • Loading branch information
matthewd committed Feb 1, 2016
2 parents f3e6e80 + f836630 commit f8167ac
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 23 deletions.
56 changes: 37 additions & 19 deletions activesupport/lib/active_support/concurrency/share_lock.rb
Expand Up @@ -48,17 +48,11 @@ def initialize
def start_exclusive(purpose: nil, compatible: [], no_wait: false)
synchronize do
unless @exclusive_thread == Thread.current
if busy?(purpose)
if busy_for_exclusive?(purpose)
return false if no_wait

loose_shares = @sharing.delete(Thread.current)
@waiting[Thread.current] = compatible if loose_shares

begin
@cv.wait_while { busy?(purpose) }
ensure
@waiting.delete Thread.current
@sharing[Thread.current] = loose_shares if loose_shares
yield_shares(purpose, compatible) do
@cv.wait_while { busy_for_exclusive?(purpose) }
end
end
@exclusive_thread = Thread.current
Expand All @@ -71,22 +65,26 @@ def start_exclusive(purpose: nil, compatible: [], no_wait: false)

# Relinquish the exclusive lock. Must only be called by the thread
# that called start_exclusive (and currently holds the lock).
def stop_exclusive
def stop_exclusive(compatible: [])
synchronize do
raise "invalid unlock" if @exclusive_thread != Thread.current

@exclusive_depth -= 1
if @exclusive_depth == 0
@exclusive_thread = nil
@cv.broadcast

yield_shares(nil, compatible) do
@cv.broadcast
@cv.wait_while { @exclusive_thread || eligible_waiters?(compatible) }
end
end
end
end

def start_sharing
def start_sharing(purpose: :share)
synchronize do
if @exclusive_thread && @exclusive_thread != Thread.current
@cv.wait_while { @exclusive_thread }
if @sharing[Thread.current] == 0 && @exclusive_thread != Thread.current && busy_for_sharing?(purpose)
@cv.wait_while { busy_for_sharing?(purpose) }
end
@sharing[Thread.current] += 1
end
Expand All @@ -109,12 +107,12 @@ def stop_sharing
# the block.
#
# See +start_exclusive+ for other options.
def exclusive(purpose: nil, compatible: [], no_wait: false)
def exclusive(purpose: nil, compatible: [], after_compatible: [], no_wait: false)
if start_exclusive(purpose: purpose, compatible: compatible, no_wait: no_wait)
begin
yield
ensure
stop_exclusive
stop_exclusive(compatible: after_compatible)
end
end
end
Expand All @@ -132,11 +130,31 @@ def sharing
private

# Must be called within synchronize
def busy?(purpose)
(@exclusive_thread && @exclusive_thread != Thread.current) ||
@waiting.any? { |k, v| k != Thread.current && !v.include?(purpose) } ||
def busy_for_exclusive?(purpose)
busy_for_sharing?(purpose) ||
@sharing.size > (@sharing[Thread.current] > 0 ? 1 : 0)
end

def busy_for_sharing?(purpose)
(@exclusive_thread && @exclusive_thread != Thread.current) ||
@waiting.any? { |t, (_, c)| t != Thread.current && !c.include?(purpose) }
end

def eligible_waiters?(compatible)
@waiting.any? { |t, (p, _)| compatible.include?(p) && @waiting.all? { |t2, (_, c2)| t == t2 || c2.include?(p) } }
end

def yield_shares(purpose, compatible)
loose_shares = @sharing.delete(Thread.current)
@waiting[Thread.current] = [purpose, compatible] if loose_shares

begin
yield
ensure
@waiting.delete Thread.current
@sharing[Thread.current] = loose_shares if loose_shares
end
end
end
end
end
6 changes: 3 additions & 3 deletions activesupport/lib/active_support/dependencies/interlock.rb
Expand Up @@ -8,13 +8,13 @@ def initialize # :nodoc:
end

def loading
@lock.exclusive(purpose: :load, compatible: [:load]) do
@lock.exclusive(purpose: :load, compatible: [:load], after_compatible: [:load]) do
yield
end
end

def unloading
@lock.exclusive(purpose: :unload, compatible: [:load, :unload]) do
@lock.exclusive(purpose: :unload, compatible: [:load, :unload], after_compatible: [:load, :unload]) do
yield
end
end
Expand All @@ -24,7 +24,7 @@ def unloading
# concurrent activity, return immediately (without executing the
# block) instead of waiting.
def attempt_unloading
@lock.exclusive(purpose: :unload, compatible: [:load, :unload], no_wait: true) do
@lock.exclusive(purpose: :unload, compatible: [:load, :unload], after_compatible: [:load, :unload], no_wait: true) do
yield
end
end
Expand Down
78 changes: 78 additions & 0 deletions activesupport/test/share_lock_test.rb
Expand Up @@ -114,14 +114,17 @@ def test_exclusive_conflicting_purpose
[true, false].each do |use_upgrading|
with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
begin
together = Concurrent::CyclicBarrier.new(2)
conflicting_exclusive_threads = [
Thread.new do
@lock.send(use_upgrading ? :sharing : :tap) do
together.wait
@lock.exclusive(purpose: :red, compatible: [:green, :purple]) {}
end
end,
Thread.new do
@lock.send(use_upgrading ? :sharing : :tap) do
together.wait
@lock.exclusive(purpose: :blue, compatible: [:green]) {}
end
end
Expand Down Expand Up @@ -183,11 +186,14 @@ def test_exclusive_ordering
load_params = [:load, [:load]]
unload_params = [:unload, [:unload, :load]]

all_sharing = Concurrent::CyclicBarrier.new(4)

[load_params, load_params, unload_params, unload_params].permutation do |thread_params|
with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
threads = thread_params.map do |purpose, compatible|
Thread.new do
@lock.sharing do
all_sharing.wait
@lock.exclusive(purpose: purpose, compatible: compatible) do
scratch_pad_mutex.synchronize { scratch_pad << purpose }
end
Expand All @@ -209,6 +215,78 @@ def test_exclusive_ordering
end
end

def test_new_share_attempts_block_on_waiting_exclusive
with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
release_exclusive = Concurrent::CountDownLatch.new

waiting_exclusive = Thread.new do
@lock.sharing do
@lock.exclusive do
release_exclusive.wait
end
end
end
assert_threads_stuck waiting_exclusive

late_share_attempt = Thread.new do
@lock.sharing {}
end
assert_threads_stuck late_share_attempt

sharing_thread_release_latch.count_down
assert_threads_stuck late_share_attempt

release_exclusive.count_down
assert_threads_not_stuck late_share_attempt
end
end

def test_share_remains_reentrant_ignoring_a_waiting_exclusive
with_thread_waiting_in_lock_section(:sharing) do |sharing_thread_release_latch|
ready = Concurrent::CyclicBarrier.new(2)
attempt_reentrancy = Concurrent::CountDownLatch.new

sharer = Thread.new do
@lock.sharing do
ready.wait
attempt_reentrancy.wait
@lock.sharing {}
end
end

exclusive = Thread.new do
@lock.sharing do
ready.wait
@lock.exclusive {}
end
end

assert_threads_stuck exclusive

attempt_reentrancy.count_down

assert_threads_not_stuck sharer
assert_threads_stuck exclusive
end
end

def test_compatible_exclusives_cooperate_to_both_proceed
ready = Concurrent::CyclicBarrier.new(2)
done = Concurrent::CyclicBarrier.new(2)

threads = 2.times.map do
Thread.new do
@lock.sharing do
ready.wait
@lock.exclusive(purpose: :x, compatible: [:x], after_compatible: [:x]) {}
done.wait
end
end
end

assert_threads_not_stuck threads
end

def test_in_shared_section_incompatible_non_upgrading_threads_cannot_preempt_upgrading_threads
scratch_pad = []
scratch_pad_mutex = Mutex.new
Expand Down
2 changes: 1 addition & 1 deletion railties/lib/rails/application/finisher.rb
Expand Up @@ -86,7 +86,7 @@ module Finisher
# added in the hook are taken into account.
initializer :set_clear_dependencies_hook, group: :all do
callback = lambda do
ActiveSupport::Dependencies.interlock.attempt_unloading do
ActiveSupport::Dependencies.interlock.unloading do
ActiveSupport::DescendantsTracker.clear
ActiveSupport::Dependencies.clear
end
Expand Down

0 comments on commit f8167ac

Please sign in to comment.