
Added -j support to rake.

Rake now has a thread pool implementation which returns futures when passed args
and a block. MultiTask has been changed to ask the thread pool for a list of
futures, each of which completes one prerequisite. MultiTask then waits on each
future until it is complete.

The number of threads in the pool is controlled with the new -j option at the
command-line.

The thread pool is now a member of Rake.application, and Rakefile authors can
request futures for their own operations, participating in the pool.

The thread pool is special in that it spawns a new thread whenever a thread in the
pool goes to sleep waiting on a future that another thread is completing. When the
new thread finishes, the pool shrinks back to its previous size.

With this change, the pool always has a fixed number of threads actively doing work
(that number being equal to the -j parameter).

This commit also includes documentation for the new -j parameter and a test for the
ThreadPool implementation.
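The future returned by the pool is essentially a memoized, lock-guarded Proc. A minimal standalone sketch of the same technique in plain Ruby (an illustration only, not rake's actual ThreadPool code; `make_future` and the `NOT_SET` sentinel are names invented for this sketch):

```ruby
require 'thread'

NOT_SET = Object.new.freeze # sentinel meaning "not computed yet"

# Returns a lambda. Whoever calls it first (a pool worker or a waiter)
# wins the try_lock and runs the block; later callers either see the
# memoized result or block on the mutex until the winner finishes.
def make_future(&block)
  mutex  = Mutex.new
  result = NOT_SET

  lambda do
    return result unless result.equal?(NOT_SET) # already computed

    if mutex.try_lock
      begin
        result = block.call if result.equal?(NOT_SET)
      ensure
        mutex.unlock
      end
    else
      mutex.synchronize {} # another thread is computing; wait for it
    end
    result
  end
end

f = make_future { 6 * 7 }
worker = Thread.new { f.call } # the worker may compute the value...
value = f.call                 # ...or we compute it here; either way 42
worker.join
```

Rake's version additionally captures block arguments, memoizes raised exceptions, and grows the pool when a pool thread would otherwise sleep at the wait step.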
commit 173745949fa299d20a558c0511c02991f056933d (parent 50218b0)
Authored by Michael Bishop (michaeljbishop)
9 doc/command_line_usage.rdoc
@@ -37,6 +37,15 @@ Options are:
 [<tt>--help</tt> (-H)]
   Display some help text and exit.
 
+[<tt>--jobs</tt> _number_ (-j)]
+  Specifies the maximum number of concurrent tasks. The suggested
+  value is equal to the number of CPUs.
+
+  Sample values:
+    (no -j) : unlimited concurrent tasks (standard rake behavior)
+    -j      : 2 concurrent tasks (exact number may change)
+    -j 16   : 16 concurrent tasks
+
 [<tt>--libdir</tt> _directory_ (-I)]
   Add _directory_ to the list of directories searched for require.
 
10 lib/rake/application.rb
@@ -2,6 +2,7 @@
 require 'optparse'
 
 require 'rake/task_manager'
+require 'rake/thread_pool'
 require 'rake/win32'
 
 module Rake
@@ -64,6 +65,7 @@ def run
       init
       load_rakefile
       top_level
+      thread_pool.join
     end
   end
 
@@ -106,6 +108,10 @@ def options
     @options ||= OpenStruct.new
   end
 
+  def thread_pool
+    @thread_pool ||= ThreadPool.new options.thread_pool_size
+  end
+
   # private ----------------------------------------------------------------
 
   def invoke_task(task_string)
@@ -325,6 +331,10 @@ def standard_rake_options
       "Execute some Ruby code, then continue with normal task processing.",
       lambda { |value| eval(value) }
     ],
+    ['--jobs', '-j [NUMBER]',
+      "Specifies the maximum number of tasks to execute in parallel. (default:2)",
+      lambda { |value| options.thread_pool_size = [(value || 2).to_i, 2].max }
+    ],
     ['--libdir', '-I LIBDIR', "Include LIBDIR in the search path for required modules.",
       lambda { |value| $:.push(value) }
     ],
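The --jobs handler above clamps its input to a sane minimum. A quick illustration of how that expression behaves (the `parse_jobs` name exists only in this sketch):

```ruby
# The expression from the --jobs option handler: a missing value
# defaults to 2, and String#to_i maps non-numeric input to 0, which
# the [..., 2].max clamp raises back up to the minimum of 2.
parse_jobs = lambda { |value| [(value || 2).to_i, 2].max }

parse_jobs.call('16')    # => 16
parse_jobs.call('asdas') # => 2  ('asdas'.to_i is 0)
parse_jobs.call(nil)     # => 2  (bare -j with no NUMBER)
```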
10 lib/rake/multi_task.rb
@@ -6,10 +6,12 @@ module Rake
   class MultiTask < Task
     private
     def invoke_prerequisites(args, invocation_chain)
-      threads = @prerequisites.collect { |p|
-        Thread.new(p) { |r| application[r, @scope].invoke_with_call_chain(args, invocation_chain) }
-      }
-      threads.each { |t| t.join }
+      futures = @prerequisites.collect do |p|
+        application.thread_pool.future(p) do |r|
+          application[r, @scope].invoke_with_call_chain(args, invocation_chain)
+        end
+      end
+      futures.each { |f| f.call }
     end
   end
 
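The new invoke_prerequisites follows a collect-then-wait pattern: queue one future per prerequisite, then call each to wait for it. A self-contained sketch of that pattern (MiniPool is a toy invented for illustration, not rake's ThreadPool; its futures lack rake's memoization and deadlock avoidance):

```ruby
require 'thread'

# A toy fixed-size pool mirroring the collect-then-wait pattern used in
# MultiTask#invoke_prerequisites.
class MiniPool
  def initialize(size)
    @jobs = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        while (job = @jobs.pop) # a nil job tells the worker to exit
          job.call
        end
      end
    end
  end

  # Returns a lambda; calling it blocks until the result is available.
  # (Unlike rake's futures, calling it a second time would block forever.)
  def future(*args, &block)
    result = Queue.new
    @jobs << lambda { result << block.call(*args) }
    lambda { result.pop }
  end

  def join
    @workers.size.times { @jobs << nil }
    @workers.each(&:join)
  end
end

pool = MiniPool.new(2)
# collect a future per "prerequisite", then wait on each in turn
futures = %w[a b c].collect { |p| pool.future(p) { |name| name.upcase } }
results = futures.map(&:call)
pool.join
p results # ["A", "B", "C"]
```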
133 lib/rake/thread_pool.rb
@@ -0,0 +1,133 @@
+require 'thread'
+require 'set'
+
+module Rake
+
+  class ThreadPool
+
+    # Creates a ThreadPool object.
+    # The parameter is the size of the pool. By default, the pool uses unlimited threads.
+    def initialize(thread_count=nil)
+      @max_thread_count = [(thread_count || FIXNUM_MAX), 0].max
+      @threads = Set.new
+      @threads_mon = Monitor.new
+      @queue = Queue.new
+      @join_cond = @threads_mon.new_cond
+    end
+
+    # Creates a future to be executed in the ThreadPool.
+    # The args are passed to the block when executing (similarly to Thread#new).
+    # The return value is a Proc which may or may not be already executing in
+    # another thread. Calling Proc#call will sleep the current thread until
+    # the future is finished and will return the result (or raise an Exception
+    # thrown from the future).
+    def future(*args, &block)
+      # capture the local args for the block (like Thread#start)
+      local_args = args.collect { |a| begin; a.dup; rescue; a; end }
+
+      promise_mutex = Mutex.new
+      promise_result = promise_error = NOT_SET
+
+      # (promise code builds on Ben Lavender's public-domain 'promise' gem)
+      promise = lambda do
+        # return immediately if the future has already been executed
+        unless promise_result.equal?(NOT_SET) && promise_error.equal?(NOT_SET)
+          return promise_error.equal?(NOT_SET) ? promise_result : raise(promise_error)
+        end
+
+        # try to get the lock and execute the promise; otherwise, sleep
+        if promise_mutex.try_lock
+          if promise_result.equal?(NOT_SET) && promise_error.equal?(NOT_SET)
+            # execute the promise
+            begin
+              promise_result = block.call(*local_args)
+            rescue Exception => e
+              promise_error = e
+            end
+            block = local_args = nil # GC can now clean these up
+          end
+          promise_mutex.unlock
+        else
+          # Even if we didn't get the lock, we need to sleep until the promise has
+          # finished executing. If, however, the current thread is part of the thread
+          # pool, we need to free up a new thread in the pool so there will
+          # always be a thread doing work.
+
+          wait_for_promise = lambda { promise_mutex.synchronize {} }
+
+          unless @threads_mon.synchronize { @threads.include? Thread.current }
+            wait_for_promise.call
+          else
+            @threads_mon.synchronize { @max_thread_count += 1 }
+            start_thread
+            wait_for_promise.call
+            @threads_mon.synchronize { @max_thread_count -= 1 }
+          end
+        end
+        promise_error.equal?(NOT_SET) ? promise_result : raise(promise_error)
+      end
+
+      @queue.enq promise
+      start_thread
+      promise
+    end
+
+    # Waits until the queue of futures is empty and all threads have exited.
+    def join
+      @threads_mon.synchronize do
+        begin
+          @join_cond.wait unless @threads.empty?
+        rescue Exception => e
+          STDERR.puts e
+          STDERR.print "Queue contains #{@queue.size} items. Thread pool contains #{@threads.count} threads\n"
+          STDERR.print "Current Thread #{Thread.current} status = #{Thread.current.status}\n"
+          STDERR.puts e.backtrace.join("\n")
+          @threads.each do |t|
+            STDERR.print "Thread #{t} status = #{t.status}\n"
+            STDERR.puts t.backtrace.join("\n") if t.respond_to? :backtrace
+          end
+          raise e
+        end
+      end
+    end
+
+    private
+    def start_thread
+      @threads_mon.synchronize do
+        next unless @threads.count < @max_thread_count
+
+        @threads << Thread.new do
+          begin
+            while @threads.count <= @max_thread_count && !@queue.empty? do
+              # Even though we just asked if the queue was empty, it could
+              # have held an item which by this statement is now gone. For
+              # this reason we pass true to Queue#deq, since otherwise we
+              # would sleep indefinitely if it is empty.
+              @queue.deq(true).call
+            end
+          rescue ThreadError # this means the queue is empty
+          ensure
+            @threads_mon.synchronize do
+              @threads.delete Thread.current
+              @join_cond.broadcast if @threads.empty?
+            end
+          end
+        end
+      end
+    end
+
+    # for testing only
+
+    def __queue__
+      @queue
+    end
+
+    def __threads__
+      @threads.dup
+    end
+
+    NOT_SET = Object.new.freeze
+    FIXNUM_MAX = (2**(0.size * 8 - 2) - 1)
+  end
+
+end
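One detail worth noting in the file above: when no -j size is given, the pool falls back to FIXNUM_MAX threads, i.e. effectively unlimited. The constant is derived from the platform word size:

```ruby
# As computed in thread_pool.rb: 0.size is the byte width of a machine
# Fixnum (8 on a typical 64-bit build). One bit goes to the sign and
# one to Ruby's Fixnum tag, leaving 0.size * 8 - 2 usable value bits.
fixnum_max = (2**(0.size * 8 - 2) - 1)
puts fixnum_max # 4611686018854775807 on a 64-bit build
```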
13 test/test_rake_application_options.rb
@@ -40,6 +40,7 @@ def test_default_options
     assert_nil opts.show_tasks
     assert_nil opts.silent
     assert_nil opts.trace
+    assert_nil opts.thread_pool_size
     assert_equal ['rakelib'], opts.rakelib
     assert ! Rake::FileUtilsExt.verbose_flag
     assert ! Rake::FileUtilsExt.nowrite_flag
@@ -110,6 +111,18 @@ def test_help
     assert_equal :exit, @exit
   end
 
+  def test_jobs
+    flags(['--jobs', '4'], ['-j', '4']) do |opts|
+      assert_equal 4, opts.thread_pool_size
+    end
+    flags(['--jobs', 'asdas'], ['-j', 'asdas']) do |opts|
+      assert_equal 2, opts.thread_pool_size
+    end
+    flags('--jobs', '-j') do |opts|
+      assert_equal 2, opts.thread_pool_size
+    end
+  end
+
   def test_libdir
     flags(['--libdir', 'xx'], ['-I', 'xx'], ['-Ixx']) do |opts|
       $:.include?('xx')
124 test/test_rake_test_thread_pool.rb
@@ -0,0 +1,124 @@
+require File.expand_path('../helper', __FILE__)
+require 'rake/thread_pool'
+require 'test/unit/assertions'
+
+class TestRakeTestThreadPool < Rake::TestCase
+  include Rake
+
+  def test_pool_executes_in_current_thread_for_zero_threads
+    pool = ThreadPool.new(0)
+    f = pool.future { Thread.current }
+    pool.join
+    assert_equal Thread.current, f.call
+  end
+
+  def test_pool_executes_in_other_thread_for_pool_of_size_one
+    pool = ThreadPool.new(1)
+    f = pool.future { Thread.current }
+    pool.join
+    refute_equal Thread.current, f.call
+  end
+
+  def test_pool_executes_in_two_other_threads_for_pool_of_size_two
+    pool = ThreadPool.new(2)
+    threads = 2.times.collect { pool.future { sleep 0.1; Thread.current } }.collect { |f| f.call }
+
+    refute_equal threads[0], threads[1]
+    refute_equal Thread.current, threads[0]
+    refute_equal Thread.current, threads[1]
+  end
+
+  def test_pool_creates_the_correct_number_of_threads
+    pool = ThreadPool.new(2)
+    threads = Set.new
+    t_mutex = Mutex.new
+    10.times.each do
+      pool.future do
+        sleep 0.02
+        t_mutex.synchronize { threads << Thread.current }
+      end
+    end
+    pool.join
+    assert_equal 2, threads.count
+  end
+
+  def test_pool_future_captures_arguments
+    pool = ThreadPool.new(2)
+    a = 'a'
+    b = 'b'
+    c = 5 # 5 raises an exception on 5.dup; it should be ignored
+    pool.future(a, c) { |a_var, ignore| a_var.capitalize!; b.capitalize! }
+    pool.join
+    assert_equal 'a', a
+    assert_equal 'b'.capitalize, b
+  end
+
+  def test_pool_join_empties_queue
+    pool = ThreadPool.new(2)
+    repeat = 25
+    repeat.times { pool.future do
+      repeat.times { pool.future do
+        repeat.times { pool.future do
+        end }
+      end }
+    end }
+
+    pool.join
+    assert_equal true, pool.__send__(:__queue__).empty?
+  end
+
+  # test that raising an exception way down in the blocks propagates
+  # to the top
+  def test_exceptions
+    pool = ThreadPool.new(10)
+
+    deep_exception_block = lambda do |count|
+      next raise Exception.new if count < 1
+      pool.future(count - 1, &deep_exception_block).call
+    end
+
+    assert_raises(Exception) do
+      pool.future(2, &deep_exception_block).call
+    end
+  end
+
+  def test_pool_always_has_max_threads_doing_work
+    # here we need to test that even if some threads are halted, there
+    # are always at least max_threads that are not sleeping
+    pool = ThreadPool.new(2)
+    initial_sleep_time = 0.2
+    future1 = pool.future { sleep initial_sleep_time }
+    dependent_futures = 5.times.collect { pool.future { future1.call } }
+    future2 = pool.future { sleep initial_sleep_time }
+    future3 = pool.future { sleep 0.01 }
+
+    sleep initial_sleep_time / 2.0 # wait for everything to queue up
+
+    # at this point, we should have five threads sleeping on future1, and
+    # two threads doing work on future1 and future2
+    assert_equal 7, pool.__send__(:__threads__).count
+
+    # future3 is in the queue because there aren't enough active threads to work on it
+    assert_equal 1, pool.__send__(:__queue__).size
+
+    [future1, dependent_futures, future2, future3].flatten.each { |f| f.call }
+    pool.join
+  end
+
+  def test_pool_prevents_deadlock
+    pool = ThreadPool.new(5)
+
+    common_dependency_a = pool.future { sleep 0.2 }
+    futures_a = 10.times.collect { pool.future { common_dependency_a.call; sleep(rand * 0.01) } }
+
+    common_dependency_b = pool.future { futures_a.each { |f| f.call } }
+    futures_b = 10.times.collect { pool.future { common_dependency_b.call; sleep(rand * 0.01) } }
+
+    futures_b.each { |f| f.call }
+    pool.join
+  end
+end
