Skip to content

Loading…

Cleaned up the ThreadPool interface #135

Merged
merged 3 commits into from

3 participants

@michaeljbishop

Since the ThreadPool as currently checked into Rake 0.9.3.beta.2 is a public api, this change ensures that api is minimal, documented, and exposes the fewest dependencies necessary to function, allowing for future and gradual, expansion.

Most notably, the interface is completely documented and certain private things are hidden from documentation.

The futures returned by ThreadPool#future are of an undocumented type, but are guaranteed to have a #value method which will return the value of the future.

michaeljbishop added some commits
@michaeljbishop michaeljbishop Updated documentation for ThreadPool.
Made sure non-public methods were hidden from documentation.
Updated description of methods to be properly formatted.
4885946
@michaeljbishop michaeljbishop Removed some constants from rdoc generation
No functional change.
b28b7eb
@michaeljbishop michaeljbishop Rake::ThreadPool has a more conservative interface.
Thread.new has no default parameter. Returned futures have a #value
method.

- Rake::ThreadPool#initialize no longer documents or accepts a default
value for thread_count. The default has now been placed in
Rake::Application (and the default is FIXNUM_MAX)

- Futures returned by Rake::ThreadPool#future are still Proc instances,
but they now have an added method: #value which calls #call. #value is
the only documented method of the returned future. #value is chosen
because it matches Thread#value in behavior.
70f2b26
@jimweirich

Yes, I like the name "value" rather than "call" for this.

@jimweirich

Shouldn't this be updated to say that an opaque future object is returned, and that you need to call #value, not #call.

Yes, I should have combined a few commits into one before I pushed to GitHub. The updated documentation is in a subsequent commit.

70f2b26

@jimweirich
Owner

I see that now. Thanks.

@jimweirich jimweirich merged commit 70f2b26 into jimweirich:master
@jimweirich
Owner

I like that you are thinking about minimizing the API surface area (it's something I wish I paid more attention to in the early days of rake).

I'm actually thinking we shouldn't publish the thread pool as part of the Rake public API, at least not yet.

@michaeljbishop
@pvdb pvdb commented on the diff
lib/rake/application.rb
@@ -604,5 +604,9 @@ def rakefile_location backtrace = caller
backtrace.find { |str| str =~ re } || ''
end
+
+ private
+ FIXNUM_MAX = (2**(0.size * 8 - 2) - 1) # :nodoc:
+
@pvdb
pvdb added a note

Hey @michaeljbishop - the private declaration doesn't actually have the intended effect of making FIXNUM_MAX a private constant; see discussion in ruby/rake#80 for more context/investigation, in case you want to chip in, or vote up / vote down my proposed "fix" ... :smiley:

@pvdb
pvdb added a note

Or go straight to ruby/rake#82 for a more future-proof implementation of the constant!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Oct 23, 2012
  1. @michaeljbishop

    Updated documentation for ThreadPool.

    michaeljbishop committed
    Made sure non-public methods were hidden from documentation.
    Updated description of methods to be properly formatted.
  2. @michaeljbishop

    Removed some constants from rdoc generation

    michaeljbishop committed
    No functional change.
  3. @michaeljbishop

    Rake::ThreadPool has a more conservative interface.

    michaeljbishop committed
    Thread.new has no default parameter. Returned futures have a #value
    method.
    
    - Rake::ThreadPool#initialize no longer documents or accepts a default
    value for thread_count. The default has now been placed in
    Rake::Application (and the default is FIXNUM_MAX)
    
    - Futures returned by Rake::ThreadPool#future are still Proc instances,
    but they now have an added method: #value which calls #call. #value is
    the only documented method of the returned future. #value is chosen
    because it matches Thread#value in behavior.
Showing with 36 additions and 29 deletions.
  1. +5 −1 lib/rake/application.rb
  2. +1 −1 lib/rake/multi_task.rb
  3. +17 −14 lib/rake/thread_pool.rb
  4. +13 −13 test/test_rake_thread_pool.rb
View
6 lib/rake/application.rb
@@ -109,7 +109,7 @@ def options
end
def thread_pool
- @thread_pool ||= ThreadPool.new options.thread_pool_size
+ @thread_pool ||= ThreadPool.new(options.thread_pool_size||FIXNUM_MAX)
end
# private ----------------------------------------------------------------
@@ -604,5 +604,9 @@ def rakefile_location backtrace = caller
backtrace.find { |str| str =~ re } || ''
end
+
+ private
+ FIXNUM_MAX = (2**(0.size * 8 - 2) - 1) # :nodoc:
+
@pvdb
pvdb added a note

Hey @michaeljbishop - the private declaration doesn't actually have the intended effect of making FIXNUM_MAX a private constant; see discussion in ruby/rake#80 for more context/investigation, in case you want to chip in, or vote up / vote down my proposed "fix" ... :smiley:

@pvdb
pvdb added a note

Or go straight to ruby/rake#82 for a more future-proof implementation of the constant!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
end
end
View
2 lib/rake/multi_task.rb
@@ -11,7 +11,7 @@ def invoke_prerequisites(args, invocation_chain)
application[r, @scope].invoke_with_call_chain(args, invocation_chain)
end
end
- futures.each { |f| f.call }
+ futures.each { |f| f.value }
end
end
View
31 lib/rake/thread_pool.rb
@@ -6,21 +6,21 @@ module Rake
class ThreadPool
# Creates a ThreadPool object.
- # The parameter is the size of the pool. By default, the pool uses unlimited threads.
- def initialize(thread_count=nil)
- @max_thread_count = [(thread_count||FIXNUM_MAX), 0].max
+ # The parameter is the size of the pool.
+ def initialize(thread_count)
+ @max_thread_count = [thread_count, 0].max
@threads = Set.new
@threads_mon = Monitor.new
@queue = Queue.new
@join_cond = @threads_mon.new_cond
end
- # Creates a future to be executed in the ThreadPool.
- # The args are passed to the block when executing (similarly to Thread#new)
- # The return value is a Proc which may or may not be already executing in
- # another thread. Calling Proc#call will sleep the current thread until
- # the future is finished and will return the result (or raise an Exception
- # thrown from the future)
+ # Creates a future executed by the +ThreadPool+.
+ # The args are passed to the block when executing (similarly to <tt>Thread#new</tt>)
+ # The return value is an object representing a future which has been created and
+ # added to the queue in the pool. Sending <tt>#value</tt> to the object will sleep
+ # the current thread until the future is finished and will return the result (or
+ # raise an exception thrown from the future)
def future(*args,&block)
# capture the local args for the block (like Thread#start)
local_args = args.collect { |a| begin; a.dup; rescue; a; end }
@@ -67,6 +67,10 @@ def future(*args,&block)
promise_error.equal?(NOT_SET) ? promise_result : raise(promise_error)
end
+ def promise.value
+ call
+ end
+
@queue.enq promise
start_thread
promise
@@ -92,7 +96,7 @@ def join
end
private
- def start_thread
+ def start_thread # :nodoc:
@threads_mon.synchronize do
next unless @threads.count < @max_thread_count
@@ -118,16 +122,15 @@ def start_thread
# for testing only
- def __queue__
+ def __queue__ # :nodoc:
@queue
end
- def __threads__
+ def __threads__ # :nodoc:
@threads.dup
end
- NOT_SET = Object.new.freeze
- FIXNUM_MAX = (2**(0.size * 8 - 2) - 1) # FIXNUM_MAX
+ NOT_SET = Object.new.freeze # :nodoc:
end
end
View
26 test/test_rake_thread_pool.rb
@@ -9,19 +9,19 @@ def test_pool_executes_in_current_thread_for_zero_threads
pool = ThreadPool.new(0)
f = pool.future{Thread.current}
pool.join
- assert_equal Thread.current, f.call
+ assert_equal Thread.current, f.value
end
def test_pool_executes_in_other_thread_for_pool_of_size_one
pool = ThreadPool.new(1)
f = pool.future{Thread.current}
pool.join
- refute_equal Thread.current, f.call
+ refute_equal Thread.current, f.value
end
def test_pool_executes_in_two_other_threads_for_pool_of_size_two
pool = ThreadPool.new(2)
- threads = 2.times.collect{ pool.future{ sleep 0.1; Thread.current } }.each{|f|f.call}
+ threads = 2.times.collect{ pool.future{ sleep 0.1; Thread.current } }.each{|f|f.value}
refute_equal threads[0], threads[1]
refute_equal Thread.current, threads[0]
@@ -75,11 +75,11 @@ def test_exceptions
deep_exception_block = lambda do |count|
next raise Exception.new if ( count < 1 )
- pool.future(count-1, &deep_exception_block).call
+ pool.future(count-1, &deep_exception_block).value
end
assert_raises(Exception) do
- pool.future(2, &deep_exception_block).call
+ pool.future(2, &deep_exception_block).value
end
end
@@ -90,7 +90,7 @@ def test_pool_always_has_max_threads_doing_work
pool = ThreadPool.new(2)
initial_sleep_time = 0.2
future1 = pool.future { sleep initial_sleep_time }
- dependent_futures = 5.times.collect { pool.future{ future1.call } }
+ dependent_futures = 5.times.collect { pool.future{ future1.value } }
future2 = pool.future { sleep initial_sleep_time }
future3 = pool.future { sleep 0.01 }
@@ -103,7 +103,7 @@ def test_pool_always_has_max_threads_doing_work
# future 3 is in the queue because there aren't enough active threads to work on it.
assert_equal pool.__send__(:__queue__).size, 1
- [future1, dependent_futures, future2, future3].flatten.each { |f| f.call }
+ [future1, dependent_futures, future2, future3].flatten.each { |f| f.value }
pool.join
end
@@ -111,12 +111,12 @@ def test_pool_prevents_deadlock
pool = ThreadPool.new(5)
common_dependency_a = pool.future { sleep 0.2 }
- futures_a = 10.times.collect { pool.future{ common_dependency_a.call; sleep(rand() * 0.01) } }
+ futures_a = 10.times.collect { pool.future{ common_dependency_a.value; sleep(rand() * 0.01) } }
- common_dependency_b = pool.future { futures_a.each { |f| f.call } }
- futures_b = 10.times.collect { pool.future{ common_dependency_b.call; sleep(rand() * 0.01) } }
+ common_dependency_b = pool.future { futures_a.each { |f| f.value } }
+ futures_b = 10.times.collect { pool.future{ common_dependency_b.value; sleep(rand() * 0.01) } }
- (futures_b).each{|f|f.call}
+ (futures_b).each{|f|f.value}
pool.join
end
@@ -131,9 +131,9 @@ def test_pool_reports_correct_results
pool.future do
b.times.collect do
pool.future { sleep rand * 0.001; c }
- end.inject(0) { |m,f| m+f.call }
+ end.inject(0) { |m,f| m+f.value }
end
- end.inject(0) { |m,f| m+f.call }
+ end.inject(0) { |m,f| m+f.value }
assert_equal( (a*b*c), result )
pool.join
Something went wrong with that request. Please try again.