
Added tests. Tidied up command-line ops. WorkerPool default size is FIXNUM_MAX.

* The default WorkerPool maximum size was changed to FIXNUM_MAX. This
  is safe because only enough threads are added to the pool to cover
  the requested number of blocks (and never more than the maximum).
* Added a #join call which clears the thread pool of all threads. This
  is called inside #execute_blocks when there are no more threads
  waiting on the thread pool.
* Suppressed the "All tasks are multitasks" output when specifying -m
* Removed unnecessary exception backtrace concatenation
* -j is now more forgiving of bad input. If it has no parameter, or the
  parameter can't be parsed, it defaults to 2 (see the sketch after this
  list).
* Added a WorkerPool test
* Added tests for -j and -m to the application options test.
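
A quick sketch of the new -j handling and the FIXNUM_MAX default described
above (illustrative only; the real changes are in the diff below, and the
helper name here is hypothetical):

  # `value` is the raw string OptionParser hands to the -j handler
  # (nil when -j is given without an argument).
  def thread_pool_size_for(value)   # hypothetical helper, for illustration
    # nil.to_i and "asdas".to_i both return 0, so clamping at 2 covers
    # the "no parameter" and "unparseable parameter" cases at once.
    [(value || 2).to_i, 2].max
  end

  thread_pool_size_for(nil)     # => 2
  thread_pool_size_for("asdas") # => 2
  thread_pool_size_for("4")     # => 4

  # The new WorkerPool default maximum: the largest Fixnum on this
  # platform (0.size is the machine word size in bytes).
  FIXNUM_MAX = (2**(0.size * 8 - 2) - 1)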
1 parent 2a2bb2f commit f40087d6d508599b6c6ee98abd38ac1a184d4959 @michaeljbishop committed Apr 25, 2012
Showing with 167 additions and 36 deletions.
  1. +4 −9 lib/rake/application.rb
  2. +53 −27 lib/rake/worker_pool.rb
  3. +20 −0 test/test_rake_application_options.rb
  4. +90 −0 test/test_rake_test_worker_pool.rb
@@ -325,22 +325,17 @@ def standard_rake_options
"Execute some Ruby code, then continue with normal task processing.",
lambda { |value| eval(value) }
],
- ['--jobs', '-j NUMBER',
- "Specifies the maximum number of tasks to execute in parallel.",
+ ['--jobs', '-j [NUMBER]',
+ "Specifies the maximum number of tasks to execute in parallel. (default:2)",
lambda { |value|
- value_i = value.to_i
- if ( value_i.to_s == value && value_i > 0 )
- options.thread_pool_size = value_i
- else
- puts "received '-j #{value}'. '#{value}' should be a positive integer"
- end
+ options.thread_pool_size = [(value || 2).to_i,2].max
}
],
['--libdir', '-I LIBDIR', "Include LIBDIR in the search path for required modules.",
lambda { |value| $:.push(value) }
],
['--multitask', '-m', "Treat all tasks as multitasks.",
- lambda { |value| options.always_multitask = true; puts "All tasks are multitasks" }
+ lambda { |value| options.always_multitask = true }
],
['--no-search', '--nosearch', '-N', "Do not search parent directories for the Rakefile.",
lambda { |value| options.nosearch = true }
@@ -7,20 +7,22 @@ class WorkerPool
def initialize(max = nil)
@threads = Set.new # this holds the set of threads in the pool
+ @waiting_threads = Set.new # set of threads waiting in #execute_blocks
@threads_mutex = Mutex.new # use this whenever r/w @threads
@queue = Queue.new # this holds blocks to be executed
+ @join_cv = ConditionVariable.new # alerts threads sleeping from calling #join
if (max && max > 0)
- @maximum_size= max
+ @maximum_size = max
else
- @maximum_size= 2 # why bother if it's not at least 2?
+ @maximum_size = (2**(0.size * 8 - 2) - 1) # FIXNUM_MAX
end
end
def execute_blocks(blocks)
mutex = Mutex.new
cv = ConditionVariable.new
exception = nil
- unprocessed_block_count = blocks.size
+ unprocessed_block_count = blocks.count
mutex.synchronize {
blocks.each { |block|
@queue.enq lambda {
@@ -36,50 +38,74 @@ def execute_blocks(blocks)
end
}
}
- was_in_set = @threads_mutex.synchronize { @threads.delete? Thread.current }
- ensure_enough_threads
+ was_in_set = @threads_mutex.synchronize {
+ @waiting_threads.add(Thread.current)
+ @threads.delete? Thread.current
+ }
+ ensure_thread_count(blocks.count)
cv.wait(mutex) until unprocessed_block_count == 0
- @threads_mutex.synchronize { @threads.add Thread.current } if was_in_set
+ @threads_mutex.synchronize {
+ @waiting_threads.delete(Thread.current)
+ @threads.add(Thread.current) if was_in_set
+ # shutdown the thread pool if we were the last thread
+ # waiting on the thread pool to process blocks
+ join if @waiting_threads.count == 0
+ }
}
- # IMPORTANT: In order to trace execution through threads,
- # we concatenate the backtrace of the exception (thrown in a
- # different context), with the backtrace of the
- # current context. In this way, you can see the backtrace
- # all the way through from when you called #execute_block
- # to where it was raised in the thread that was executing
- # that block.
- #
- # backtrace looks like this:
- # exception.backtrace (in original thread context)
- # |
- # |
- # caller (in our context)
- if exception
- exception.set_backtrace exception.backtrace.concat(caller)
- raise exception
- end
+ # raise any exceptions that arose in the block (the last
+ # exception won)
+ raise exception if exception
end
- def ensure_enough_threads
+ def ensure_thread_count(count)
# here, we need to somehow make sure to add as many threads as
# are needed and no more. So (blocks.size - ready threads)
@threads_mutex.synchronize {
- threads_needed = [@maximum_size - @threads.size, 0].max
+ threads_needed = [[@maximum_size,count].min - @threads.size, 0].max
threads_needed.times do
t = Thread.new do
begin
while @threads.size <= @maximum_size
@queue.deq.call
end
ensure
- @threads_mutex.synchronize { @threads.delete(Thread.current) }
+ @threads_mutex.synchronize {
+ @threads.delete(Thread.current)
+ @join_cv.signal
+ }
end
end
@threads.add t
end
}
end
- private :ensure_enough_threads
+ private :ensure_thread_count
+ def join
+ # *** MUST BE CALLED inside @threads_mutex.synchronize{}
+ # because we don't want any threads added while we wait, only
+ # removed we set the maximum size to 0 and then add enough blocks
+ # to the queue so any sleeping threads will wake up and notice
+ # there are more threads than the limit and exit
+ saved_maximum_size, @maximum_size = @maximum_size, 0
+ @threads.each { @queue.enq lambda { ; } } # wake them all up
+
+ # here, we sleep and wait for a signal off @join_cv
+ # we will get it once for each sleeping thread so we watch the
+ # thread count
+ #
+ # avoid the temptation to change this to
+ # "<code> until <condition>". The condition needs to checked
+ # first or you will deadlock.
+ while (@threads.size > 0)
+ @join_cv.wait(@threads_mutex)
+ end
+
+ # now everything has been executed and we are ready to
+ # start accepting more work so we raise the limit back
+ @maximum_size = saved_maximum_size
+ end
+ private :join
+
end
end
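
For reference, a minimal usage sketch of the pool as it behaves after this
change (illustrative only; the block contents and variable names are made
up, and the real coverage lives in the new test file below):

  require 'rake/worker_pool'

  pool    = Rake::WorkerPool.new   # maximum size now defaults to FIXNUM_MAX
  results = Queue.new
  blocks  = (1..5).map { |n| lambda { results.enq(n * n) } }

  # Blocks until every lambda has run. Only enough threads are added to
  # cover blocks.count (never more than the maximum), and #join empties
  # the pool once the last waiting caller has finished.
  pool.execute_blocks(blocks)

  squares = []
  squares << results.deq until results.empty?
  p squares.sort                   # => [1, 4, 9, 16, 25]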
@@ -33,13 +33,15 @@ def test_default_options
assert_nil opts.dryrun
assert_nil opts.ignore_system
assert_nil opts.load_system
+ assert_nil opts.always_multitask
assert_nil opts.nosearch
assert_equal ['rakelib'], opts.rakelib
assert_nil opts.show_prereqs
assert_nil opts.show_task_pattern
assert_nil opts.show_tasks
assert_nil opts.silent
assert_nil opts.trace
+ assert_nil opts.thread_pool_size
assert_equal ['rakelib'], opts.rakelib
assert ! Rake::FileUtilsExt.verbose_flag
assert ! Rake::FileUtilsExt.nowrite_flag
@@ -110,6 +112,18 @@ def test_help
assert_equal :exit, @exit
end
+ def test_jobs
+ flags(['--jobs', '4'], ['-j', '4']) do |opts|
+ assert_equal 4, opts.thread_pool_size
+ end
+ flags(['--jobs', 'asdas'], ['-j', 'asdas']) do |opts|
+ assert_equal 2, opts.thread_pool_size
+ end
+ flags('--jobs', '-j') do |opts|
+ assert_equal 2, opts.thread_pool_size
+ end
+ end
+
def test_libdir
flags(['--libdir', 'xx'], ['-I', 'xx'], ['-Ixx']) do |opts|
$:.include?('xx')
@@ -118,6 +132,12 @@ def test_libdir
$:.delete('xx')
end
+ def test_multitask
+ flags('--multitask', '-m') do |opts|
+ assert_equal opts.always_multitask, true
+ end
+ end
+
def test_rakefile
flags(['--rakefile', 'RF'], ['--rakefile=RF'], ['-f', 'RF'], ['-fRF']) do |opts|
assert_equal ['RF'], @app.instance_eval { @rakefiles }
@@ -0,0 +1,90 @@
+require File.expand_path('../helper', __FILE__)
+require 'rake/worker_pool'
+require 'test/unit/assertions'
+
+class TestRakeTestWorkerPool < Rake::TestCase
+ include Rake
+
+ def test_block_order
+ mx = Mutex.new
+ block = lambda {|executor,count=20,result=""|
+ return if (count < 1)
+ mx.synchronize{ result << count.to_s }
+ sleep(rand * 0.01)
+ executor.call( [lambda {block.call(executor,count-1,result)}] )
+ result
+ }
+
+ old = lambda {|b|
+ threads = b.collect {|c| Thread.new(c) {|d| d.call } }
+ threads.each {|t| t.join }
+ }
+
+ wp = WorkerPool.new(4)
+ new = lambda {|b| wp.execute_blocks b }
+
+ assert_equal(block.call(old), block.call(new))
+ end
+
+ # test that there are no deadlocks within the worker pool itself
+ def test_deadlocks
+ wp = WorkerPool.new(10)
+ blocks = []
+ 10.times {
+ inner_block = lambda {|count=5|
+ return if (count < 1)
+ sleep(rand * 0.000001)
+ inner_blocks = []
+ 3.times { inner_blocks << lambda {inner_block.call(count-1)} }
+ wp.execute_blocks inner_blocks
+ }
+ blocks << inner_block
+ }
+ wp.execute_blocks blocks
+ end
+
+ # test that throwing an exception way down in the blocks propagates
+ # to the top
+ def test_exceptions
+ wp = WorkerPool.new(4)
+ deep_exception_block = lambda {|count=3|
+ raise Exception.new if ( count < 1 )
+ deep_exception_block.call(count-1)
+ }
+ assert_raises(Exception) do
+ wp.execute_blocks [deep_exception_block]
+ end
+ end
+
+ def test_thread_count
+ mutex = Mutex.new
+ expected_thread_count = 2
+ wp = WorkerPool.new(expected_thread_count)
+
+ # the lambda code will determine how many threads are running in
+ # the pool
+ blocks = []
+ thread_count = 0
+ should_sleep = true
+ (expected_thread_count*2).times do
+ blocks << lambda {
+ mutex.synchronize do; stack_prefix = "#{__FILE__}:#{__LINE__}" # this synchronize will be on the stack
+ sleep 1 if should_sleep # this lets all the threads wait on the mutex
+ threads = Thread.list
+ backtraces = threads.collect {|t| t.backtrace}
+ # sometimes a thread doesn't return a thread count
+ if ( threads.count == backtraces.count )
+ should_sleep = false
+ # finds all the backtraces that contain our mutex.synchronize call
+ our_bt = backtraces.find_all{|bt| bt && bt.index{|tr| tr.start_with? stack_prefix}!=nil }
+ thread_count = [thread_count, our_bt.count].max
+ end
+ end
+ }
+ end
+
+ wp.execute_blocks blocks
+ assert_equal(expected_thread_count, thread_count)
+ end
+end
+
