Permalink
Browse files

rework ThreadLocal

  • Loading branch information...
1 parent 3486bc2 commit 122cbf47bab5c16373e771066409321b145ed97e @rkh committed Feb 23, 2014
Showing with 110 additions and 23 deletions.
  1. +47 −5 lib/tool/thread_local.rb
  2. +63 −18 spec/thread_local_spec.rb
View
@@ -29,17 +29,59 @@ module Tool
# Thread.new { p local }.join # [:foo]
# p local # [:foo, :bar]
class ThreadLocal < Delegator
+ @mutex ||= Mutex.new
+ @locals ||= []
+
+ # Thread finalizer.
+ # @!visibility private
+ def self.cleanup(id)
+ @locals.keep_if do |local|
+ next false unless local.weakref_alive?
+ local.__cleanup__
+ true
+ end
+ end
+
+ # Generates weak reference to thread and sets up finalizer.
+ # @return [WeakRef]
+ # @!visibility private
+ def self.ref(thread)
+ thread[:weakref] ||= begin
+ ObjectSpace.define_finalizer(thread, method(:cleanup))
+ WeakRef.new(thread)
+ end
+ end
+
+ # @see #initialize
+ # @!visibility private
+ def self.new(*)
+ result = super(default)
+ @mutex.synchronize { @locals << WeakRef.new(result) }
+ result
+ end
+
def initialize(default = {})
- @mutex = Mutex.new
@default = default.dup
- @map = {}
+ @map = {}
end
+ # @see Delegator
# @!visibility private
def __getobj__
- ref = Thread.current[:weakref] ||= WeakRef.new(Thread.current)
- @map.delete_if { |key, value| !key.weakref_alive? }
- @mutex.synchronize { @map[ref] ||= @default.dup }
+ ref = ::Tool::ThreadLocal.ref(Thread.current)
+ @map[ref] ||= @default.dup
+ end
+
+ # @return [Integer] number of threads with specific locals
+ # @!visibility private
+ def __size__
+ @map.size
+ end
+
+ # Remove locals for dead or GC'ed threads
+ # @!visibility private
+ def __cleanup__
+ @map.keep_if { |key, value| key.weakref_alive? and key.alive? }
end
end
end
View
@@ -1,26 +1,71 @@
require 'tool/thread_local'
describe Tool::ThreadLocal do
- specify 'normal access' do
- subject[:foo] = 'bar'
- expect(subject[:foo]).to be == 'bar'
- end
+ describe :__getobj__ do
+ specify 'normal access' do
+ subject[:foo] = 'bar'
+ expect(subject[:foo]).to be == 'bar'
+ end
+
+ specify 'concurrent access' do
+ subject[:foo] = 'bar'
+ value = Thread.new { subject[:foo] = 'baz' }.value
+ expect(value).to be == 'baz'
+ expect(subject[:foo]).to be == 'bar'
+ end
+
+ specify 'with an array as value' do
+ list = Tool::ThreadLocal.new([])
+ foo = Thread.new { 10.times { list << :foo; sleep(0.01) }; list.to_a }
+ bar = Thread.new { 10.times { list << :bar; sleep(0.01) }; list.to_a }
+ expect(list).to be_empty
+ list << :list
+ expect(list) .to be == [ :list ]
+ expect(foo.value) .to be == [ :foo ] * 10
+ expect(bar.value) .to be == [ :bar ] * 10
+ end
+
+ specify 'deals with garbage collected threads' do
+ subject[:a] = 'A'
+
+ Thread.new do
+ subject[:b] = 'B'
+ Thread.new do
+ subject[:c] = 'C'
+ end.value
+ end.value
- specify 'concurrent access' do
- subject[:foo] = 'bar'
- value = Thread.new { subject[:foo] = 'baz' }.value
- expect(value).to be == 'baz'
- expect(subject[:foo]).to be == 'bar'
+ GC.start
+ expect(subject[:a]).to be == 'A'
+ end
end
- specify 'with an array as value' do
- list = Tool::ThreadLocal.new([])
- foo = Thread.new { 10.times { list << :foo; sleep(0.01) }; list.to_a }
- bar = Thread.new { 10.times { list << :bar; sleep(0.01) }; list.to_a }
- expect(list).to be_empty
- list << :list
- expect(list) .to be == [ :list ]
- expect(foo.value) .to be == [ :foo ] * 10
- expect(bar.value) .to be == [ :bar ] * 10
+ describe :__size__ do
+ specify 'with one thread' do
+ subject[:a] = 'A'
+ expect(subject.__size__).to be == 1
+ end
+
+ specify 'with multiple threads' do
+ subject[:a] = 'A'
+ thread = Thread.new { subject[:b] = 'B'; sleep }
+ sleep 0.01
+ expect(subject.__size__).to be == 2
+ thread.kill
+ end
+
+ specify 'with dead threads' do
+ subject[:a] = 'A'
+
+ Thread.new do
+ subject[:b] = 'B'
+ Thread.new do
+ subject[:c] = 'C'
+ end.value
+ end.value
+
+ GC.start
+ expect(subject.__size__).to be == 1
+ end
end
end

0 comments on commit 122cbf4

Please sign in to comment.