
ThreadPool no longer creates new threads to compensate for threads sleeping on futures #139

Merged
merged 20 commits into from

2 participants

@michaeljbishop

This is an interesting, but still experimental, version of the ThreadPool in
which the thread pool no longer needs to create new threads to
compensate for the imminent sleep of a thread on a future.

Previously, the ThreadPool created futures and added them to the queue.
When a thread processed a future and had to wait for it to finish
executing, it would spawn another thread so the pool still had a
thread available to do work.

Now, a non-blocking attempt to process the future's block is what is
put on the queue, and a blocking attempt is passed back from #future.
In this way, the thread pool is free to process the core of a future
by dequeuing it and calling it. If a thread can't get the lock, it
assumes that another thread is handling that future and moves on to
the next item.
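The split described above can be sketched in a few lines. This is an illustrative, self-contained version (SketchPromise is a made-up name, not the actual Rake class): the non-blocking attempt is #work, which a pool thread calls after dequeuing, and the blocking attempt is #value, which the caller of #future receives.

```ruby
# Minimal sketch of the non-blocking/blocking promise pattern.
# Not the real Rake code; error handling and stats are omitted.
class SketchPromise
  NOT_SET = Object.new.freeze

  def initialize(&block)
    @mutex  = Mutex.new
    @result = NOT_SET
    @block  = block
  end

  # Non-blocking: a pool thread tries the lock; if another thread is
  # already running the chore, it simply moves on to the next item.
  def work
    return unless @mutex.try_lock
    begin
      chore
    ensure
      @mutex.unlock
    end
  end

  # Blocking: the caller acquires the lock (waiting if a pool thread
  # holds it), runs the chore if it hasn't run yet, returns the result.
  def value
    @mutex.synchronize { chore }
    @result
  end

  private

  # Runs the block exactly once, no matter who gets there first.
  def chore
    @result = @block.call if @result.equal?(NOT_SET)
  end
end

promise = SketchPromise.new { 6 * 7 }
promise.work    # a pool thread may or may not get here first
promise.value   # => 42 either way
```

Either path runs the chore at most once; whichever thread loses the race simply finds the result already set.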

michaeljbishop and others added some commits
@michaeljbishop michaeljbishop Added some instrumentation to retrieve stats from the threadpool
Two new methods to ThreadPool:

1 - #__begin_stats__
This turns on stat-keeping. You can only call it once.

2 - #__stats__
Retrieves all the current statistics. Locks the stats array while it
makes a duplicate. Best to call after #join.
a89a68d
@michaeljbishop michaeljbishop Merge branch 'master' of https://github.com/jimweirich/rake into no-s…
…leep-thread-pool

Conflicts:
	lib/rake/thread_pool.rb
2c04017
@michaeljbishop michaeljbishop Merge branch 'master' into no-sleep-thread-pool 93d3144
@michaeljbishop michaeljbishop ThreadPool threads no longer sleep on futures.
Previously, the ThreadPool created futures and added them to the queue.
When a thread processed a future and had to wait for it to finish
executing, it would spawn another thread so the pool still had a
thread available to do work.

Now, a non-blocking attempt to process the future's block is what is
put on the queue and a blocking attempt is passed back from #future.
In this way, the thread pool is free to process the core of a future
by dequeuing it and calling it. If the thread can't get the lock, it
is assumed that another thread is handling it and so it moves to
the next.
07272c4
@jimweirich add private_reader support 2053d6b
@jimweirich Merge branch 'master' of https://github.com/michaeljbishop/rake into …
…threadstats

* 'master' of https://github.com/michaeljbishop/rake:
  Sped up #stat sampling.
  Fixed the ThreadPool statistics code to maintain compatibility with 1.8.7.
bc546be
@jimweirich move dot to end of previous line
to maintain compatibility with Ruby 1.8
a467228
@jimweirich made the test less order dependent 841200e
@jimweirich Merge branch 'no-sleep-thread-pool' of https://github.com/michaeljbis…
…hop/rake into newpool

* 'no-sleep-thread-pool' of https://github.com/michaeljbishop/rake:
  ThreadPool threads no longer sleep on futures.
  Added some instrumentation to retrieve stats from the threadpool
be8e4ad
@jimweirich improve order independence in test e768ae7
@jimweirich add promise_complete? method 8bf696d
@jimweirich whitespace a3ed6a4
@jimweirich more with promise_complete? 7d90f8b
@jimweirich should use value for futures (not call) 5afbc00
@jimweirich add comment to assert 8dcd41a
@jimweirich add ThreadPool::Promise d658fc8
@jimweirich de-lambda-fied the promise object. 57df3f4
@jimweirich
Owner

I've locally merged the new pool changes. In going over the code, I really felt that the promise should be its own object instead of a bunch of lambdas. I created a Promise object, originally just to contain the lambdas, but as I kept refactoring, the lambdas eventually melted away.

I'm fairly happy with the result and feel I have a better grasp on how the threading / future / promise model is working.

The result is in the newpool branch. Take a look. If everything seems ok, I will merge this into master.

@michaeljbishop
@michaeljbishop
michaeljbishop added some commits
@michaeljbishop michaeljbishop Fixed small bug in ThreadHistoryDisplay where the default :data
value was created as an Array when it should have been a Hash.
e2a4aef
@michaeljbishop michaeljbishop Changed statistics to read more like a sentence.
Example:

  220823  F attempting_lock_on   item_id:20
  225342  F did_execute          item_id:20

Added more statistics to capture more detail.
9ca0481
@michaeljbishop michaeljbishop Removed inaccurate statistic about a thread awake. 02663c2
@jimweirich
Owner

Re: chore ... Yes, I originally called it core for the lambda, but at some point the name slipped out.

I'm away from my laptop at the moment, I'll review the changes later tonight.

Quick question about thread lifetime ... Looks like the threads voluntarily die if the queue is empty. This works because adding a new item to the queue tries to start a new thread. First, is my understanding correct? If so, what do you think about keeping the thread around until the entire job is done, rather than trying to shrink the pool? Currently the code seems to have the potential to shrink/grow/shrink/grow. Just thinking out loud, and maybe that's not a big concern. Thoughts welcome.
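The lifecycle being questioned here can be made concrete with a small runnable sketch (hypothetical names, not the actual Rake code): workers exit as soon as the queue is empty, and every enqueue tries to start a fresh worker, which is exactly the shrink/grow/shrink/grow potential mentioned.

```ruby
# Sketch of a self-shrinking pool. Illustrative only; the real Rake
# pool also caps active threads and records statistics.
class ShrinkingPool
  def initialize(max_threads)
    @max     = max_threads
    @queue   = Queue.new
    @threads = []
    @mutex   = Mutex.new
  end

  def enqueue(&job)
    @queue.enq(job)
    start_thread                       # grow: each enqueue may spawn a worker
  end

  # Crude join for illustration: wait until no workers and no queued jobs.
  def join
    sleep 0.01 until @mutex.synchronize { @threads.empty? } && @queue.empty?
  end

  private

  def start_thread
    @mutex.synchronize do
      next unless @threads.count < @max
      t = Thread.new do
        begin
          loop { @queue.deq(true).call }  # non-blocking dequeue
        rescue ThreadError                # queue empty: shrink by exiting
        ensure
          @mutex.synchronize { @threads.delete(Thread.current) }
          start_thread unless @queue.empty?  # re-grow if work raced in
        end
      end
      @threads << t
    end
  end
end

pool = ShrinkingPool.new(2)
done = Queue.new
4.times { |i| pool.enqueue { done.enq(i) } }
pool.join
done.size  # => 4
```

Note the re-grow check in the ensure block: without it, a job enqueued in the window between a worker seeing an empty queue and removing itself could be stranded.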

@michaeljbishop
@jimweirich jimweirich merged commit 02663c2 into jimweirich:master
@jimweirich
Owner

Hmm, just a note. I was testing with Rubinius and discovered that TestRakeTestThreadPool#test_pool_join_empties_queue will occasionally hang when running under Rubinius. I haven't investigated any further than that. Care to take a look?

@michaeljbishop
@jimweirich
Owner

The deadlock problem may be an issue with Rubinius. I've updated to the current head and haven't had a deadlock since then. There was a deadlock issue that was addressed in rubinius/rubinius@03a475f.

So, if you're not seeing anything, that might be why.

@jimweirich
Owner

I just ran 50 iterations of the tests with the latest master of Rubinius (with commit 52eba5f). I got no crashes and no deadlocks.

@michaeljbishop
Commits on Oct 25, 2012
  1. @michaeljbishop: Added some instrumentation to retrieve stats from the threadpool
Commits on Oct 27, 2012
  1. @michaeljbishop: Merge branch 'master' of https://github.com/jimweirich/rake into no-sleep-thread-pool
Commits on Oct 28, 2012
  1. @michaeljbishop
  2. @michaeljbishop: ThreadPool threads no longer sleep on futures.
Commits on Oct 29, 2012
Commits on Nov 1, 2012
  1. @jimweirich: Merge branch 'master' of https://github.com/michaeljbishop/rake into threadstats
  2. @jimweirich: move dot to end of previous line (to maintain compatibility with Ruby 1.8)
  3. @jimweirich: Merge branch 'no-sleep-thread-pool' of https://github.com/michaeljbishop/rake into newpool
  4. @jimweirich: whitespace
Commits on Nov 3, 2012
  1. @jimweirich: add comment to assert
  2. @jimweirich: add ThreadPool::Promise
  3. @michaeljbishop
  4. @michaeljbishop: Changed statistics to read more like a sentence.
  5. @michaeljbishop
20 lib/rake/private_reader.rb
@@ -0,0 +1,20 @@
+module Rake
+
+ # Include PrivateReader to use +private_reader+.
+ module PrivateReader # :nodoc: all
+
+ def self.included(base)
+ base.extend(ClassMethods)
+ end
+
+ module ClassMethods
+
+ # Declare a list of private accessors
+ def private_reader(*names)
+ attr_reader(*names)
+ private(*names)
+ end
+ end
+
+ end
+end
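As a usage illustration, here is a self-contained sketch of the declaration the new file enables: one class-level call that combines attr_reader with private. The module body is copied inline so the snippet runs without rake installed; Sample and token are made-up names.

```ruby
# Inline copy of the PrivateReader pattern so this runs standalone.
module PrivateReader
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # Declare readers that are only callable from inside the class.
    def private_reader(*names)
      attr_reader(*names)
      private(*names)
    end
  end
end

class Sample
  include PrivateReader
  private_reader :token

  def initialize
    @token = "s3cr3t"
  end

  def masked
    token.sub(/.{4}\z/, "****")  # internal access is allowed
  end
end

Sample.new.masked   # => "s3****"
# Sample.new.token  # NoMethodError: private method `token' called
```

The payoff is seen in thread_history_display.rb below, where two lines of attr_reader-plus-private collapse into one private_reader declaration.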
99 lib/rake/promise.rb
@@ -0,0 +1,99 @@
+module Rake
+
+ # A Promise object represents a promise to do work (a chore) in the
+ # future. The promise is created with a block and a list of
+ # arguments for the block. Calling value will return the value of
+ # the promised chore.
+ #
+ # Used by ThreadPool.
+ #
+ class Promise # :nodoc: all
+ NOT_SET = Object.new.freeze # :nodoc:
+
+ attr_accessor :recorder
+
+ # Create a promise to do the chore specified by the block.
+ def initialize(args, &block)
+ @mutex = Mutex.new
+ @result = NOT_SET
+ @error = NOT_SET
+ @args = args.collect { |a| begin; a.dup; rescue; a; end }
+ @block = block
+ end
+
+ # Return the value of this promise.
+ #
+ # If the promised chore is not yet complete, then do the work
+ # synchronously. We will wait.
+ def value
+ unless complete?
+ stat :sleeping_on, :item_id => object_id
+ @mutex.synchronize do
+ stat :has_lock_on, :item_id => object_id
+ chore
+ stat :releasing_lock_on, :item_id => object_id
+ end
+ end
+ error? ? raise(@error) : @result
+ end
+
+ # If no one else is working this promise, go ahead and do the chore.
+ def work
+ stat :attempting_lock_on, :item_id => object_id
+ if @mutex.try_lock
+ stat :has_lock_on, :item_id => object_id
+ chore
+ stat :releasing_lock_on, :item_id => object_id
+ @mutex.unlock
+ else
+ stat :bailed_on, :item_id => object_id
+ end
+ end
+
+ private
+
+ # Perform the chore promised
+ def chore
+ if complete?
+ stat :found_completed, :item_id => object_id
+ return
+ end
+ stat :will_execute, :item_id => object_id
+ begin
+ @result = @block.call(*@args)
+ rescue Exception => e
+ @error = e
+ end
+ stat :did_execute, :item_id => object_id
+ discard
+ end
+
+ # Do we have a result for the promise
+ def result?
+ ! @result.equal?(NOT_SET)
+ end
+
+ # Did the promise throw an error
+ def error?
+ ! @error.equal?(NOT_SET)
+ end
+
+ # Are we done with the promise
+ def complete?
+ result? || error?
+ end
+
+ # free up these items for the GC
+ def discard
+ @args = nil
+ @block = nil
+ end
+
+ # Record execution statistics if there is a recorder
+ def stat(*args)
+ @recorder.call(*args) if @recorder
+ end
+
+ end
+
+end
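The error path above (capture the exception in @error, re-raise it from #value) can be restated in a trimmed, runnable form. MiniPromise is a hypothetical name, not Rake's class, and it rescues StandardError for brevity where Rake's Promise rescues Exception.

```ruby
# Trimmed sketch of Promise's error handling: an exception raised by
# the chore is stored, then re-raised when the caller asks for the value.
class MiniPromise
  NOT_SET = Object.new.freeze

  def initialize(&block)
    @mutex  = Mutex.new
    @result = @error = NOT_SET
    @block  = block
  end

  def value
    @mutex.synchronize do
      if @result.equal?(NOT_SET) && @error.equal?(NOT_SET)
        begin
          @result = @block.call
        rescue => e        # captured here, not raised here
          @error = e
        end
      end
    end
    @error.equal?(NOT_SET) ? @result : raise(@error)
  end
end

MiniPromise.new { 2 + 2 }.value   # => 4
failing = MiniPromise.new { raise ArgumentError, "bad chore" }
failing.value rescue $!           # re-raises the captured ArgumentError
```

Storing the error rather than letting it escape matters because the chore may run on a pool thread via #work, while the exception must surface on whichever thread later calls #value.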
2  lib/rake/task.rb
@@ -196,7 +196,7 @@ def invoke_prerequisites_concurrently(args, invocation_chain) # :nodoc:
application[r, @scope].invoke_with_call_chain(args, invocation_chain)
end
end
- futures.each { |f| f.call }
+ futures.each { |f| f.value }
end
# Format the trace flags for display.
9 lib/rake/thread_history_display.rb
@@ -1,8 +1,11 @@
+require 'rake/private_reader'
+
module Rake
class ThreadHistoryDisplay # :nodoc: all
- attr_reader :stats, :items, :threads
- private :stats, :items, :threads
+ include Rake::PrivateReader
+
+ private_reader :stats, :items, :threads
def initialize(stats)
@stats = stats
@@ -13,7 +16,7 @@ def initialize(stats)
def show
puts "Job History:"
stats.each do |stat|
- stat[:data] ||= []
+ stat[:data] ||= {}
rename(stat, :thread, threads)
rename(stat[:data], :item_id, items)
rename(stat[:data], :new_thread, threads)
110 lib/rake/thread_pool.rb
@@ -1,6 +1,8 @@
require 'thread'
require 'set'
+require 'rake/promise'
+
module Rake
class ThreadPool # :nodoc: all
@@ -28,63 +30,12 @@ def initialize(thread_count)
# pool. Sending <tt>#value</tt> to the object will sleep the
# current thread until the future is finished and will return the
# result (or raise an exception thrown from the future)
- def future(*args,&block)
- # capture the local args for the block (like Thread#start)
- local_args = args.collect { |a| begin; a.dup; rescue; a; end }
-
- promise_mutex = Mutex.new
- promise_result = promise_error = NOT_SET
-
- # (promise code builds on Ben Lavender's public-domain 'promise' gem)
- promise = lambda do
- # return immediately if the future has been executed
- unless promise_result.equal?(NOT_SET) && promise_error.equal?(NOT_SET)
- return promise_error.equal?(NOT_SET) ? promise_result : raise(promise_error)
- end
-
- # try to get the lock and execute the promise, otherwise, sleep.
- if promise_mutex.try_lock
- if promise_result.equal?(NOT_SET) && promise_error.equal?(NOT_SET)
- #execute the promise
- begin
- promise_result = block.call(*local_args)
- rescue Exception => e
- promise_error = e
- end
- block = local_args = nil # GC can now clean these up
- end
- promise_mutex.unlock
- else
- # Even if we didn't get the lock, we need to sleep until the
- # promise has finished executing. If, however, the current
- # thread is part of the thread pool, we need to free up a
- # new thread in the pool so there will always be a thread
- # doing work.
-
- wait_for_promise = lambda {
- stat :waiting, :item_id => promise.object_id
- promise_mutex.synchronize {}
- stat :continue, :item_id => promise.object_id
- }
-
- unless @threads_mon.synchronize { @threads.include? Thread.current }
- wait_for_promise.call
- else
- @threads_mon.synchronize { @max_active_threads += 1 }
- start_thread
- wait_for_promise.call
- @threads_mon.synchronize { @max_active_threads -= 1 }
- end
- end
- promise_error.equal?(NOT_SET) ? promise_result : raise(promise_error)
- end
-
- def promise.value
- call
- end
+ def future(*args, &block)
+ promise = Promise.new(args, &block)
+ promise.recorder = lambda { |*stats| stat(*stats) }
@queue.enq promise
- stat :item_queued, :item_id => promise.object_id
+ stat :queued, :item_id => promise.object_id
start_thread
promise
end
@@ -93,14 +44,18 @@ def promise.value
def join
@threads_mon.synchronize do
begin
- @join_cond.wait unless @threads.empty?
+ stat :joining
+ @join_cond.wait unless @threads.empty?
+ stat :joined
rescue Exception => e
+ stat :joined
$stderr.puts e
$stderr.print "Queue contains #{@queue.size} items. Thread pool contains #{@threads.count} threads\n"
$stderr.print "Current Thread #{Thread.current} status = #{Thread.current.status}\n"
$stderr.puts e.backtrace.join("\n")
@threads.each do |t|
$stderr.print "Thread #{t} status = #{t.status}\n"
+ # 1.8 doesn't support Thread#backtrace
$stderr.puts t.backtrace.join("\n") if t.respond_to? :backtrace
end
raise e
@@ -119,47 +74,58 @@ def gather_history #:nodoc:
# (see #gather_history). Best to call this when the job is
# complete (i.e. after ThreadPool#join is called).
def history # :nodoc:
- @history_mon.synchronize { @history.dup }
- .sort_by { |i| i[:time] }
- .each { |i| i[:time] -= @history_start_time }
+ @history_mon.synchronize { @history.dup }.
+ sort_by { |i| i[:time] }.
+ each { |i| i[:time] -= @history_start_time }
end
# Return a hash of always collected statistics for the thread pool.
def statistics # :nodoc:
{
:total_threads_in_play => @total_threads_in_play,
- :max_active_threads => @max_active_threads,
+ :max_active_threads => @max_active_threads,
}
end
private
+ # processes one item on the queue. Returns true if there was an
+ # item to process, false if there was no item
+ def process_queue_item #:nodoc:
+ return false if @queue.empty?
+
+ # Even though we just asked if the queue was empty, it
+ # still could have had an item which by this statement
+ # is now gone. For this reason we pass true to Queue#deq
+ # because we will sleep indefinitely if it is empty.
+ promise = @queue.deq(true)
+ stat :dequeued, :item_id => promise.object_id
+ promise.work
+ return true
+
+ rescue ThreadError # this means the queue is empty
+ false
+ end
+
def start_thread # :nodoc:
@threads_mon.synchronize do
next unless @threads.count < @max_active_threads
t = Thread.new do
begin
- while @threads.count <= @max_active_threads && !@queue.empty? do
- # Even though we just asked if the queue was empty, it
- # still could have had an item which by this statement
- # is now gone. For this reason we pass true to Queue#deq
- # because we will sleep indefinitely if it is empty.
- block = @queue.deq(true)
- stat :item_dequeued, :item_id => block.object_id
- block.call
+ while @threads.count <= @max_active_threads
+ break unless process_queue_item
end
- rescue ThreadError # this means the queue is empty
ensure
@threads_mon.synchronize do
@threads.delete Thread.current
- stat :thread_deleted, :deleted_thread => Thread.current.object_id, :thread_count => @threads.count
+ stat :ended, :thread_count => @threads.count
@join_cond.broadcast if @threads.empty?
end
end
end
@threads << t
- stat :thread_created, :new_thread => t.object_id, :thread_count => @threads.count
+ stat :spawned, :new_thread => t.object_id, :thread_count => @threads.count
@total_threads_in_play = @threads.count if @threads.count > @total_threads_in_play
end
end
@@ -184,8 +150,6 @@ def __queue__ # :nodoc:
def __threads__ # :nodoc:
@threads.dup
end
-
- NOT_SET = Object.new.freeze # :nodoc:
end
end
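The non-blocking dequeue idiom that process_queue_item relies on is worth isolating: Queue#deq(true) raises ThreadError instead of sleeping when the queue is empty, which is why the empty queue is handled in a rescue rather than with a (racy) Queue#empty? check alone. Queue is a core class in modern Ruby; the 2012 code required 'thread' first.

```ruby
# Pop without blocking: nil means the queue was empty at that instant.
def try_deq(queue)
  queue.deq(true)     # non_block = true
rescue ThreadError    # raised when the queue is empty
  nil
end

q = Queue.new
q.enq :job
try_deq(q)   # => :job
try_deq(q)   # => nil
```

This mirrors the comment in the diff: even if Queue#empty? just returned false, another worker may drain the item first, so the pop itself must be the authoritative check.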
42 test/test_private_reader.rb
@@ -0,0 +1,42 @@
+require File.expand_path('../helper', __FILE__)
+require 'rake/private_reader'
+
+class TestPrivateAttrs < Rake::TestCase
+
+ class Sample
+ include Rake::PrivateReader
+
+ private_reader :reader, :a
+
+ def initialize
+ @reader = :RVALUE
+ end
+
+ def get_reader
+ reader
+ end
+
+ end
+
+ def setup
+ super
+ @sample = Sample.new
+ end
+
+ def test_private_reader_is_private
+ assert_private do @sample.reader end
+ assert_private do @sample.a end
+ end
+
+ def test_private_reader_returns_data
+ assert_equal :RVALUE, @sample.get_reader
+ end
+
+ private
+
+ def assert_private
+ ex = assert_raises(NoMethodError) do yield end
+ assert_match(/private/, ex.message)
+ end
+
+end
25 test/test_rake_thread_pool.rb
@@ -69,7 +69,7 @@ def test_pool_join_empties_queue
}
pool.join
- assert_equal true, pool.__send__(:__queue__).empty?
+ assert_equal true, pool.__send__(:__queue__).empty?, "queue should be empty"
end
# test that throwing an exception way down in the blocks propagates
@@ -88,29 +88,6 @@ def test_exceptions
end
- def test_pool_always_has_max_threads_doing_work
- # here we need to test that even if some threads are halted, there
- # are always at least max_threads that are not sleeping.
- pool = ThreadPool.new(2)
- initial_sleep_time = 0.2
- future1 = pool.future { sleep initial_sleep_time }
- dependent_futures = 5.times.collect { pool.future{ future1.value } }
- future2 = pool.future { sleep initial_sleep_time }
- future3 = pool.future { sleep 0.01 }
-
- sleep initial_sleep_time / 2.0 # wait for everything to queue up
-
- # at this point, we should have 5 threads sleeping depending on future1, and
- # two threads doing work on future1 and future 2.
- assert_equal pool.__send__(:__threads__).count, 7
-
- # future 3 is in the queue because there aren't enough active threads to work on it.
- assert_equal pool.__send__(:__queue__).size, 1
-
- [future1, dependent_futures, future2, future3].flatten.each { |f| f.value }
- pool.join
- end
-
def test_pool_prevents_deadlock
pool = ThreadPool.new(5)
4 test/test_thread_history_display.rb
@@ -64,7 +64,7 @@ def test_thread_deleted
out, _ = capture_io do
@display.show
end
- assert_match(/^ *1000000 +A +thread_deleted +deleted_thread:B +thread_count:12$/, out)
+ assert_match(/^ *1000000 +A +thread_deleted( +deleted_thread:B| +thread_count:12){2}$/, out)
end
def test_thread_created
@@ -72,7 +72,7 @@ def test_thread_created
out, _ = capture_io do
@display.show
end
- assert_match(/^ *1000000 +A +thread_created +new_thread:B +thread_count:13$/, out)
+ assert_match(/^ *1000000 +A +thread_created( +new_thread:B| +thread_count:13){2}$/, out)
end
private