Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add -j <max_jobs> and -m (all tasks multitasks) -- solution included #113

Closed
wants to merge 10 commits into from
14 changes: 13 additions & 1 deletion doc/command_line_usage.rdoc
Expand Up @@ -31,15 +31,27 @@ Options are:
[<tt>--execute-print</tt> _code_ (-p)]
Execute some Ruby code, print the result, and exit.

[<tt>--execute-continue</tt> _code_ (-p)]
[<tt>--execute-continue</tt> _code_ (-E)]
Execute some Ruby code, then continue with normal task processing.

[<tt>--help</tt> (-H)]
Display some help text and exit.

[<tt>--jobs</tt> _number_ (-j)]
Specifies the maximum number of concurrent tasks. The suggested
value is equal to the number of CPUs.

Sample values:
no -j : unlimited concurrent tasks (standard rake behavior)
only -j : 2 concurrent tasks
-j 16 : 16 concurrent tasks

[<tt>--libdir</tt> _directory_ (-I)]
Add _directory_ to the list of directories searched for require.

[<tt>--multitask</tt> (-m)]
Treat all tasks as multitasks. ('make/drake' semantics)

[<tt>--nosearch</tt> (-N)]
Do not search for a Rakefile in parent directories.

Expand Down
7 changes: 7 additions & 0 deletions lib/rake/application.rb
Expand Up @@ -325,9 +325,16 @@ def standard_rake_options
"Execute some Ruby code, then continue with normal task processing.",
lambda { |value| eval(value) }
],
['--jobs', '-j [NUMBER]',
"Specifies the maximum number of tasks to execute in parallel. (default:2)",
lambda { |value| options.thread_pool_size = [(value || 2).to_i,2].max }
],
['--libdir', '-I LIBDIR', "Include LIBDIR in the search path for required modules.",
lambda { |value| $:.push(value) }
],
['--multitask', '-m', "Treat all tasks as multitasks.",
lambda { |value| options.always_multitask = true }
],
['--no-search', '--nosearch', '-N', "Do not search parent directories for the Rakefile.",
lambda { |value| options.nosearch = true }
],
Expand Down
9 changes: 3 additions & 6 deletions lib/rake/multi_task.rb
Expand Up @@ -5,12 +5,9 @@ module Rake
#
class MultiTask < Task
private
def invoke_prerequisites(args, invocation_chain)
threads = @prerequisites.collect { |p|
Thread.new(p) { |r| application[r, @scope].invoke_with_call_chain(args, invocation_chain) }
}
threads.each { |t| t.join }
def invoke_prerequisites(task_args, invocation_chain) # :nodoc:
invoke_prerequisites_concurrently(task_args, invocation_chain)
end
end

end
end
24 changes: 19 additions & 5 deletions lib/rake/task.rb
@@ -1,4 +1,5 @@
require 'rake/invocation_exception_mixin'
require 'rake/worker_pool'

module Rake

Expand Down Expand Up @@ -171,10 +172,23 @@ def add_chain_to(exception, new_chain)

# Invoke all the prerequisites of a task.
def invoke_prerequisites(task_args, invocation_chain) # :nodoc:
prerequisite_tasks.each { |prereq|
prereq_args = task_args.new_scope(prereq.arg_names)
prereq.invoke_with_call_chain(prereq_args, invocation_chain)
if application.options.always_multitask
invoke_prerequisites_concurrently(task_args, invocation_chain)
else
prerequisite_tasks.each { |prereq|
prereq_args = task_args.new_scope(prereq.arg_names)
prereq.invoke_with_call_chain(prereq_args, invocation_chain)
}
end
end

def invoke_prerequisites_concurrently(args, invocation_chain)
@@wp ||= WorkerPool.new(application.options.thread_pool_size)

blocks = @prerequisites.collect { |r|
lambda { application[r, @scope].invoke_with_call_chain(args, invocation_chain) }
}
@@wp.execute_blocks blocks
end

# Format the trace flags for display.
Expand All @@ -200,9 +214,9 @@ def execute(args=nil)
@actions.each do |act|
case act.arity
when 1
act.call(self)
act.call(self)
else
act.call(self, args)
act.call(self, args)
end
end
end
Expand Down
111 changes: 111 additions & 0 deletions lib/rake/worker_pool.rb
@@ -0,0 +1,111 @@
require 'thread'
require 'set'

module Rake
class WorkerPool
attr_accessor :maximum_size # this is the maximum size of the pool

def initialize(max = nil)
@threads = Set.new # this holds the set of threads in the pool
@waiting_threads = Set.new # set of threads waiting in #execute_blocks
@threads_mutex = Mutex.new # use this whenever r/w @threads
@queue = Queue.new # this holds blocks to be executed
@join_cv = ConditionVariable.new # alerts threads sleeping from calling #join
if (max && max > 0)
@maximum_size = max
else
@maximum_size = (2**(0.size * 8 - 2) - 1) # FIXNUM_MAX
end
end

def execute_blocks(blocks)
mutex = Mutex.new
cv = ConditionVariable.new
exception = nil
unprocessed_block_count = blocks.count
mutex.synchronize {
blocks.each { |block|
@queue.enq lambda {
begin
block.call
rescue Exception => e
exception = e
ensure
# we *have* to have this 'ensure' because we *have* to
# call cv.signal to wake up WorkerPool#execute_blocks
# which is asleep because it called cv.wait(mutex)
mutex.synchronize { unprocessed_block_count -= 1; cv.signal }
end
}
}
was_in_set = @threads_mutex.synchronize {
@waiting_threads.add(Thread.current)
@threads.delete? Thread.current
}
ensure_thread_count(blocks.count)
cv.wait(mutex) until unprocessed_block_count == 0
@threads_mutex.synchronize {
@waiting_threads.delete(Thread.current)
@threads.add(Thread.current) if was_in_set
# shutdown the thread pool if we were the last thread
# waiting on the thread pool to process blocks
join if @waiting_threads.count == 0
}
}
# raise any exceptions that arose in the block (the last
# exception won)
raise exception if exception
end

def ensure_thread_count(count)
# here, we need to somehow make sure to add as many threads as
# are needed and no more. So (blocks.size - ready threads)
@threads_mutex.synchronize {
threads_needed = [[@maximum_size,count].min - @threads.size, 0].max
threads_needed.times do
t = Thread.new do
begin
while @threads.size <= @maximum_size
@queue.deq.call
end
ensure
@threads_mutex.synchronize {
@threads.delete(Thread.current)
@join_cv.signal
}
end
end
@threads.add t
end
}
end
private :ensure_thread_count

def join
# *** MUST BE CALLED inside @threads_mutex.synchronize{}
# because we don't want any threads added while we wait, only
# removed we set the maximum size to 0 and then add enough blocks
# to the queue so any sleeping threads will wake up and notice
# there are more threads than the limit and exit
saved_maximum_size, @maximum_size = @maximum_size, 0
@threads.each { @queue.enq lambda { ; } } # wake them all up

# here, we sleep and wait for a signal off @join_cv
# we will get it once for each sleeping thread so we watch the
# thread count
#
# avoid the temptation to change this to
# "<code> until <condition>". The condition needs to checked
# first or you will deadlock.
while (@threads.size > 0)
@join_cv.wait(@threads_mutex)
end

# now everything has been executed and we are ready to
# start accepting more work so we raise the limit back
@maximum_size = saved_maximum_size
end
private :join

end
end
20 changes: 20 additions & 0 deletions test/test_rake_application_options.rb
Expand Up @@ -33,13 +33,15 @@ def test_default_options
assert_nil opts.dryrun
assert_nil opts.ignore_system
assert_nil opts.load_system
assert_nil opts.always_multitask
assert_nil opts.nosearch
assert_equal ['rakelib'], opts.rakelib
assert_nil opts.show_prereqs
assert_nil opts.show_task_pattern
assert_nil opts.show_tasks
assert_nil opts.silent
assert_nil opts.trace
assert_nil opts.thread_pool_size
assert_equal ['rakelib'], opts.rakelib
assert ! Rake::FileUtilsExt.verbose_flag
assert ! Rake::FileUtilsExt.nowrite_flag
Expand Down Expand Up @@ -110,6 +112,18 @@ def test_help
assert_equal :exit, @exit
end

def test_jobs
flags(['--jobs', '4'], ['-j', '4']) do |opts|
assert_equal 4, opts.thread_pool_size
end
flags(['--jobs', 'asdas'], ['-j', 'asdas']) do |opts|
assert_equal 2, opts.thread_pool_size
end
flags('--jobs', '-j') do |opts|
assert_equal 2, opts.thread_pool_size
end
end

def test_libdir
flags(['--libdir', 'xx'], ['-I', 'xx'], ['-Ixx']) do |opts|
$:.include?('xx')
Expand All @@ -118,6 +132,12 @@ def test_libdir
$:.delete('xx')
end

def test_multitask
flags('--multitask', '-m') do |opts|
assert_equal opts.always_multitask, true
end
end

def test_rakefile
flags(['--rakefile', 'RF'], ['--rakefile=RF'], ['-f', 'RF'], ['-fRF']) do |opts|
assert_equal ['RF'], @app.instance_eval { @rakefiles }
Expand Down
19 changes: 19 additions & 0 deletions test/test_rake_task.rb
Expand Up @@ -223,6 +223,25 @@ def c.timestamp() Time.now + 5 end
assert_in_delta now + 10, a.timestamp, 0.1, 'computer too slow?'
end

def test_all_multitask
mx = Mutex.new
result = ""
root = task :root
('aa'..'zz').each do |c|
task(c.to_sym) { mx.synchronize{ result << c } }
task(:root => c.to_sym)
end
root.invoke

root.prerequisite_tasks.each { |p| p.reenable };
root.reenable
task_result = result.dup; result.clear

Rake.application.options.always_multitask = true
root.invoke
refute_equal task_result, result
end

def test_investigation_output
t1 = task(:t1 => [:t2, :t3]) { |t| runlist << t.name; 3321 }
task(:t2)
Expand Down
90 changes: 90 additions & 0 deletions test/test_rake_test_worker_pool.rb
@@ -0,0 +1,90 @@
require File.expand_path('../helper', __FILE__)
require 'rake/worker_pool'
require 'test/unit/assertions'

class TestRakeTestWorkerPool < Rake::TestCase
include Rake

def test_block_order
mx = Mutex.new
block = lambda {|executor,count=20,result=""|
return if (count < 1)
mx.synchronize{ result << count.to_s }
sleep(rand * 0.01)
executor.call( [lambda {block.call(executor,count-1,result)}] )
result
}

old = lambda {|b|
threads = b.collect {|c| Thread.new(c) {|d| d.call } }
threads.each {|t| t.join }
}

wp = WorkerPool.new(4)
new = lambda {|b| wp.execute_blocks b }

assert_equal(block.call(old), block.call(new))
end

# test that there are no deadlocks within the worker pool itself
def test_deadlocks
wp = WorkerPool.new(10)
blocks = []
10.times {
inner_block = lambda {|count=5|
return if (count < 1)
sleep(rand * 0.000001)
inner_blocks = []
3.times { inner_blocks << lambda {inner_block.call(count-1)} }
wp.execute_blocks inner_blocks
}
blocks << inner_block
}
wp.execute_blocks blocks
end

# test that throwing an exception way down in the blocks propagates
# to the top
def test_exceptions
wp = WorkerPool.new(4)
deep_exception_block = lambda {|count=3|
raise Exception.new if ( count < 1 )
deep_exception_block.call(count-1)
}
assert_raises(Exception) do
wp.execute_blocks [deep_exception_block]
end
end

def test_thread_count
mutex = Mutex.new
expected_thread_count = 2
wp = WorkerPool.new(expected_thread_count)

# the lambda code will determine how many threads are running in
# the pool
blocks = []
thread_count = 0
should_sleep = true
(expected_thread_count*2).times do
blocks << lambda {
mutex.synchronize do; stack_prefix = "#{__FILE__}:#{__LINE__}" # this synchronize will be on the stack
sleep 1 if should_sleep # this lets all the threads wait on the mutex
threads = Thread.list
backtraces = threads.collect {|t| t.backtrace}
# sometimes a thread doesn't return a thread count
if ( threads.count == backtraces.count )
should_sleep = false
# finds all the backtraces that contain our mutex.synchronize call
our_bt = backtraces.find_all{|bt| bt && bt.index{|tr| tr.start_with? stack_prefix}!=nil }
thread_count = [thread_count, our_bt.count].max
end
end
}
end

wp.execute_blocks blocks
assert_equal(expected_thread_count, thread_count)
end
end