Skip to content

Commit

Permalink
MultiTask no longer spawns a thread for each prerequisite
Browse files Browse the repository at this point in the history
WorkerPool:
- now has the ability to execute an array of blocks and wait for them
all to execute.
- only adds a new thread when there is no thread waiting for action.
This slows the ramp up and threads are better reused
- has a minimum and maximum size. By default, minimum is 1 and maximum
is the maximum fixnum
- removed unused #wait call

MultiTask:
- Now uses WorkerPool#execte_blocks to execute its prerequisites
  • Loading branch information
michaeljbishop committed Apr 24, 2012
1 parent faa1ff1 commit 7fa886d
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 45 deletions.
10 changes: 7 additions & 3 deletions lib/rake/multi_task.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'rake/worker_pool'

module Rake

# Same as a regular task, but the immediate prerequisites are done in
Expand All @@ -6,10 +8,12 @@ module Rake
class MultiTask < Task
private
def invoke_prerequisites(args, invocation_chain)
threads = @prerequisites.collect { |p|
Thread.new(p) { |r| application[r, @scope].invoke_with_call_chain(args, invocation_chain) }
@@wp ||= WorkerPool.new

blocks = @prerequisites.collect { |r|
lambda { application[r, @scope].invoke_with_call_chain(args, invocation_chain) }
}
threads.each { |t| t.join }
@@wp.execute_blocks blocks
end
end

Expand Down
78 changes: 36 additions & 42 deletions lib/rake/worker_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,41 @@ class WorkerPool

def initialize(max = nil)
@threads = Set.new # this holds the set of threads in the pool
@threads_mutex = Mutex.new # use this whenever r/w @threads
@ready_threads = Set.new # this holds the set of threads awaiting work
@threads_mutex = Mutex.new # use this whenever r/w @threads, @ready_threads
@queue = Queue.new # this holds blocks to be executed
@wait_cv = ConditionVariable.new # alerts threads sleeping from calling #wait
if (max && max > 0)
self.maximum_size= max
@maximum_size= max
else
self.maximum_size= (2**(0.size * 8 - 2) - 1) # FIXNUM_MAX
@maximum_size= (2**(0.size * 8 - 2) - 1) # FIXNUM_MAX
end
end

def execute_block(&block)
def execute_blocks(blocks)
mutex = Mutex.new
cv = ConditionVariable.new
exception = nil

remaining = blocks.size
mutex.synchronize {
@queue.enq lambda {
begin
block.call
rescue Exception => e
exception = e
ensure
# we *have* to have this 'ensure' because we *have* to
# call cv.signal to wake up WorkerPool#execute_block
# which is asleep because it called cv.wait(mutex)
mutex.synchronize{ cv.signal }
end
blocks.each { |block|
@queue.enq lambda {
begin
block.call
rescue Exception => e
exception = e
ensure
# we *have* to have this 'ensure' because we *have* to
# call cv.signal to wake up WorkerPool#execute_block
# which is asleep because it called cv.wait(mutex)
mutex.synchronize { remaining -= 1; cv.signal }
end
}
add_thread
}
add_thread
cv.wait(mutex)
while remaining > 0
cv.wait(mutex)
end
}
if exception
# IMPORTANT: In order to trace execution through threads,
Expand All @@ -53,26 +58,37 @@ def execute_block(&block)
# |
# caller (in our context)
#
# TODO: remove all portions of the backtrace that involve this
# file, but retain a list of full backtraces (in order)
# for the exception in each context. add (or use) an
# inspec method that shows the list of full backtraces
exception.set_backtrace exception.backtrace.concat(caller)
raise exception
end
end

def execute_block(&block)
execute_blocks [block]
end

def add_thread
@threads_mutex.synchronize {
if @threads.size >= self.maximum_size
if @threads.size >= @maximum_size || @ready_threads.size > 0
next
end
t = Thread.new do
begin
while @threads.size <= self.maximum_size
while @threads.size <= @maximum_size
@threads_mutex.synchronize{ @ready_threads.add(Thread.current) }
@queue.deq.call
@threads_mutex.synchronize{ @ready_threads.delete(Thread.current) }
end
ensure
# we *have* to have this 'ensure' because we *have* to
# call @wait_cv.signal. This wakes up the Thread
# that is sleeping because it called WorkerPool#wait
@threads_mutex.synchronize{
@ready_threads.delete(Thread.current)
@threads.delete(Thread.current)
@wait_cv.signal
}
Expand All @@ -83,27 +99,5 @@ def add_thread
end
private :add_thread

def wait
# we synchronize on @threads_mutex because we don't want
# any threads added while we wait, only removed
# we set the maximum size to 0 and then add enough blocks to the
# queue so any sleeping threads will wake up and notice there are
# more threads than the limit and exit
@threads_mutex.synchronize {
saved_maximum_size, @maximum_size = @maximum_size, 0
@threads.each { @queue.enq lambda { ; } } # wake them all up
# here, we sleep and wait for a signal off @wait_cv
# we will get it once for each sleeping thread so we watch the
# thread count
while (@threads.size > 0)
@wait_cv.wait(@threads_mutex)
end

# now everything has been executed and we are ready to
# start accepting more work so we raise the limit back
@maximum_size = saved_maximum_size
}
end

end
end

0 comments on commit 7fa886d

Please sign in to comment.