Skip to content

Commit 1737459

Browse files
Added -j support to rake.
Rake now has a thread_pool implementation which returns futures when passed args and a block. MultiTask has been changed to ask the thread pool for a list of futures in which inside each a prerequisite is completed. MultiTask then waits on each future until it is complete. The number of threads in the pool is controlled with the new -j option at the command-line. The thread pool is now a member of Rake.application and rakefile authors can request futures for their own operations, participating in the pool. The thread pool is special in that it will spawn a new thread when a thread in the pool is sleeping because it is waiting for a future being completed by another thread. When the new thread is finished, the pool size will shrink to where it was previously. With this change, the pool always has a number of threads actively doing work (that number being equal to the -j parameter). This commit also includes documentation for the new -j parameter and a test for the ThreadPool implementation.
1 parent 50218b0 commit 1737459

File tree

6 files changed

+295
-4
lines changed

6 files changed

+295
-4
lines changed

doc/command_line_usage.rdoc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@ Options are:
3737
[<tt>--help</tt> (-H)]
3838
Display some help text and exit.
3939

40+
[<tt>--jobs</tt> _number_ (-j)]
41+
Specifies the maximum number of concurrent tasks. The suggested
42+
value is equal to the number of CPUs.
43+
44+
Sample values:
45+
(no -j) : unlimited concurrent tasks (standard rake behavior)
46+
-j : 2 concurrent tasks (exact number may change)
47+
-j 16 : 16 concurrent tasks
48+
4049
[<tt>--libdir</tt> _directory_ (-I)]
4150
Add _directory_ to the list of directories searched for require.
4251

lib/rake/application.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
require 'optparse'
33

44
require 'rake/task_manager'
5+
require 'rake/thread_pool'
56
require 'rake/win32'
67

78
module Rake
@@ -64,6 +65,7 @@ def run
6465
init
6566
load_rakefile
6667
top_level
68+
thread_pool.join
6769
end
6870
end
6971

@@ -106,6 +108,10 @@ def options
106108
@options ||= OpenStruct.new
107109
end
108110

111+
def thread_pool
112+
@thread_pool ||= ThreadPool.new options.thread_pool_size
113+
end
114+
109115
# private ----------------------------------------------------------------
110116

111117
def invoke_task(task_string)
@@ -325,6 +331,10 @@ def standard_rake_options
325331
"Execute some Ruby code, then continue with normal task processing.",
326332
lambda { |value| eval(value) }
327333
],
334+
['--jobs', '-j [NUMBER]',
335+
"Specifies the maximum number of tasks to execute in parallel. (default:2)",
336+
lambda { |value| options.thread_pool_size = [(value || 2).to_i,2].max }
337+
],
328338
['--libdir', '-I LIBDIR', "Include LIBDIR in the search path for required modules.",
329339
lambda { |value| $:.push(value) }
330340
],

lib/rake/multi_task.rb

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ module Rake
66
class MultiTask < Task
77
private
88
def invoke_prerequisites(args, invocation_chain)
9-
threads = @prerequisites.collect { |p|
10-
Thread.new(p) { |r| application[r, @scope].invoke_with_call_chain(args, invocation_chain) }
11-
}
12-
threads.each { |t| t.join }
9+
futures = @prerequisites.collect do |p|
10+
application.thread_pool.future(p) do |r|
11+
application[r, @scope].invoke_with_call_chain(args, invocation_chain)
12+
end
13+
end
14+
futures.each { |f| f.call }
1315
end
1416
end
1517

lib/rake/thread_pool.rb

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
require 'thread'
2+
require 'set'
3+
4+
module Rake
5+
6+
class ThreadPool
7+
8+
# Creates a ThreadPool object.
9+
# The parameter is the size of the pool. By default, the pool uses unlimited threads.
10+
def initialize(thread_count=nil)
11+
@max_thread_count = [(thread_count||FIXNUM_MAX), 0].max
12+
@threads = Set.new
13+
@threads_mon = Monitor.new
14+
@queue = Queue.new
15+
@join_cond = @threads_mon.new_cond
16+
end
17+
18+
# Creates a future to be executed in the ThreadPool.
19+
# The args are passed to the block when executing (similarly to Thread#new)
20+
# The return value is a Proc which may or may not be already executing in
21+
# another thread. Calling Proc#call will sleep the current thread until
22+
# the future is finished and will return the result (or raise an Exception
23+
# thrown from the future)
24+
def future(*args,&block)
25+
# capture the local args for the block (like Thread#start)
26+
local_args = args.collect { |a| begin; a.dup; rescue; a; end }
27+
28+
promise_mutex = Mutex.new
29+
promise_result = promise_error = NOT_SET
30+
31+
# (promise code builds on Ben Lavender's public-domain 'promise' gem)
32+
promise = lambda do
33+
# return immediately if the future has been executed
34+
unless promise_result.equal?(NOT_SET) && promise_error.equal?(NOT_SET)
35+
return promise_error.equal?(NOT_SET) ? promise_result : raise(promise_error)
36+
end
37+
38+
# try to get the lock and execute the promise, otherwise, sleep.
39+
if promise_mutex.try_lock
40+
if promise_result.equal?(NOT_SET) && promise_error.equal?(NOT_SET)
41+
#execute the promise
42+
begin
43+
promise_result = block.call(*local_args)
44+
rescue Exception => e
45+
promise_error = e
46+
end
47+
block = local_args = nil # GC can now clean these up
48+
end
49+
promise_mutex.unlock
50+
else
51+
# Even if we didn't get the lock, we need to sleep until the promise has
52+
# finished executing. If, however, the current thread is part of the thread
53+
# pool, we need to free up a new thread in the pool so there will
54+
# always be a thread doing work.
55+
56+
wait_for_promise = lambda { promise_mutex.synchronize{} }
57+
58+
unless @threads_mon.synchronize { @threads.include? Thread.current }
59+
wait_for_promise.call
60+
else
61+
@threads_mon.synchronize { @max_thread_count += 1 }
62+
start_thread
63+
wait_for_promise.call
64+
@threads_mon.synchronize { @max_thread_count -= 1 }
65+
end
66+
end
67+
promise_error.equal?(NOT_SET) ? promise_result : raise(promise_error)
68+
end
69+
70+
@queue.enq promise
71+
start_thread
72+
promise
73+
end
74+
75+
# Waits until the queue of futures is empty and all threads have exited.
76+
def join
77+
@threads_mon.synchronize do
78+
begin
79+
@join_cond.wait unless @threads.empty?
80+
rescue Exception => e
81+
STDERR.puts e
82+
STDERR.print "Queue contains #{@queue.size} items. Thread pool contains #{@threads.count} threads\n"
83+
STDERR.print "Current Thread #{Thread.current} status = #{Thread.current.status}\n"
84+
STDERR.puts e.backtrace.join("\n")
85+
@threads.each do |t|
86+
STDERR.print "Thread #{t} status = #{t.status}\n"
87+
STDERR.puts t.backtrace.join("\n") if t.respond_to? :backtrace
88+
end
89+
raise e
90+
end
91+
end
92+
end
93+
94+
private
95+
def start_thread
96+
@threads_mon.synchronize do
97+
next unless @threads.count < @max_thread_count
98+
99+
@threads << Thread.new do
100+
begin
101+
while @threads.count <= @max_thread_count && !@queue.empty? do
102+
# Even though we just asked if the queue was empty,
103+
# it still could have had an item which by this statement is now gone.
104+
# For this reason we pass true to Queue#deq because we will sleep
105+
# indefinitely if it is empty.
106+
@queue.deq(true).call
107+
end
108+
rescue ThreadError # this means the queue is empty
109+
ensure
110+
@threads_mon.synchronize do
111+
@threads.delete Thread.current
112+
@join_cond.broadcast if @threads.empty?
113+
end
114+
end
115+
end
116+
end
117+
end
118+
119+
# for testing only
120+
121+
def __queue__
122+
@queue
123+
end
124+
125+
def __threads__
126+
@threads.dup
127+
end
128+
129+
NOT_SET = Object.new.freeze
130+
FIXNUM_MAX = (2**(0.size * 8 - 2) - 1) # FIXNUM_MAX
131+
end
132+
133+
end

test/test_rake_application_options.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def test_default_options
4040
assert_nil opts.show_tasks
4141
assert_nil opts.silent
4242
assert_nil opts.trace
43+
assert_nil opts.thread_pool_size
4344
assert_equal ['rakelib'], opts.rakelib
4445
assert ! Rake::FileUtilsExt.verbose_flag
4546
assert ! Rake::FileUtilsExt.nowrite_flag
@@ -110,6 +111,18 @@ def test_help
110111
assert_equal :exit, @exit
111112
end
112113

114+
def test_jobs
115+
flags(['--jobs', '4'], ['-j', '4']) do |opts|
116+
assert_equal 4, opts.thread_pool_size
117+
end
118+
flags(['--jobs', 'asdas'], ['-j', 'asdas']) do |opts|
119+
assert_equal 2, opts.thread_pool_size
120+
end
121+
flags('--jobs', '-j') do |opts|
122+
assert_equal 2, opts.thread_pool_size
123+
end
124+
end
125+
113126
def test_libdir
114127
flags(['--libdir', 'xx'], ['-I', 'xx'], ['-Ixx']) do |opts|
115128
$:.include?('xx')

test/test_rake_test_thread_pool.rb

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
require File.expand_path('../helper', __FILE__)
2+
require 'rake/thread_pool'
3+
require 'test/unit/assertions'
4+
5+
class TestRakeTestThreadPool < Rake::TestCase
6+
include Rake
7+
8+
def test_pool_executes_in_current_thread_for_zero_threads
9+
pool = ThreadPool.new(0)
10+
f = pool.future{Thread.current}
11+
pool.join
12+
assert_equal Thread.current, f.call
13+
end
14+
15+
def test_pool_executes_in_other_thread_for_pool_of_size_one
16+
pool = ThreadPool.new(1)
17+
f = pool.future{Thread.current}
18+
pool.join
19+
refute_equal Thread.current, f.call
20+
end
21+
22+
def test_pool_executes_in_two_other_threads_for_pool_of_size_two
23+
pool = ThreadPool.new(2)
24+
threads = 2.times.collect{ pool.future{ sleep 0.1; Thread.current } }.each{|f|f.call}
25+
26+
refute_equal threads[0], threads[1]
27+
refute_equal Thread.current, threads[0]
28+
refute_equal Thread.current, threads[1]
29+
end
30+
31+
def test_pool_creates_the_correct_number_of_threads
32+
pool = ThreadPool.new(2)
33+
threads = Set.new
34+
t_mutex = Mutex.new
35+
10.times.each do
36+
pool.future do
37+
sleep 0.02
38+
t_mutex.synchronize{ threads << Thread.current }
39+
end
40+
end
41+
pool.join
42+
assert_equal 2, threads.count
43+
end
44+
45+
def test_pool_future_captures_arguments
46+
pool = ThreadPool.new(2)
47+
a = 'a'
48+
b = 'b'
49+
c = 5 # 5 throws an execption with 5.dup. It should be ignored
50+
pool.future(a,c){ |a_var,ignore| a_var.capitalize!; b.capitalize! }
51+
pool.join
52+
assert_equal 'a', a
53+
assert_equal 'b'.capitalize, b
54+
end
55+
56+
def test_pool_join_empties_queue
57+
pool = ThreadPool.new(2)
58+
repeat = 25
59+
repeat.times { pool.future do
60+
repeat.times { pool.future do
61+
repeat.times { pool.future do
62+
;
63+
end }
64+
end }
65+
end }
66+
67+
pool.join
68+
assert_equal true, pool.__send__(:__queue__).empty?
69+
end
70+
71+
# test that throwing an exception way down in the blocks propagates
72+
# to the top
73+
def test_exceptions
74+
pool = ThreadPool.new(10)
75+
76+
deep_exception_block = lambda do |count|
77+
next raise Exception.new if ( count < 1 )
78+
pool.future(count-1, &deep_exception_block).call
79+
end
80+
81+
assert_raises(Exception) do
82+
pool.future(2, &deep_exception_block).call
83+
end
84+
85+
end
86+
87+
def test_pool_always_has_max_threads_doing_work
88+
# here we need to test that even if some threads are halted, there
89+
# are always at least max_threads that are not sleeping.
90+
pool = ThreadPool.new(2)
91+
initial_sleep_time = 0.2
92+
future1 = pool.future { sleep initial_sleep_time }
93+
dependent_futures = 5.times.collect { pool.future{ future1.call } }
94+
future2 = pool.future { sleep initial_sleep_time }
95+
future3 = pool.future { sleep 0.01 }
96+
97+
sleep initial_sleep_time / 2.0 # wait for everything to queue up
98+
99+
# at this point, we should have 5 threads sleeping depending on future1, and
100+
# two threads doing work on future1 and future 2.
101+
assert_equal pool.__send__(:__threads__).count, 7
102+
103+
# future 3 is in the queue because there aren't enough active threads to work on it.
104+
assert_equal pool.__send__(:__queue__).size, 1
105+
106+
[future1, dependent_futures, future2, future3].flatten.each { |f| f.call }
107+
pool.join
108+
end
109+
110+
def test_pool_prevents_deadlock
111+
pool = ThreadPool.new(5)
112+
113+
common_dependency_a = pool.future { sleep 0.2 }
114+
futures_a = 10.times.collect { pool.future{ common_dependency_a.call; sleep(rand() * 0.01) } }
115+
116+
common_dependency_b = pool.future { futures_a.each { |f| f.call } }
117+
futures_b = 10.times.collect { pool.future{ common_dependency_b.call; sleep(rand() * 0.01) } }
118+
119+
(futures_b).each{|f|f.call}
120+
pool.join
121+
end
122+
123+
end
124+

0 commit comments

Comments
 (0)