Permalink
Please sign in to comment.
Browse files
+ Use a Queue for scheduling parallel tests. (tenderlove)
[git-p4: depot-paths = "//src/minitest/dev/": change = 9032]
- Loading branch information...
Showing
with
112 additions
and 144 deletions.
- +1 −1 Manifest.txt
- +35 −6 lib/minitest.rb
- +12 −0 lib/minitest/assertions.rb
- +40 −0 lib/minitest/parallel.rb
- +0 −120 lib/minitest/parallel_each.rb
- +6 −10 lib/minitest/test.rb
- +4 −2 test/minitest/metametameta.rb
- +14 −5 test/minitest/test_minitest_unit.rb
| @@ -0,0 +1,40 @@ | ||
| module Minitest | ||
| module Parallel | ||
| class Executor | ||
| attr_reader :size | ||
| def initialize size | ||
| @size = size | ||
| @queue = Queue.new | ||
| @pool = size.times.map { | ||
| Thread.new(@queue) do |queue| | ||
| Thread.current.abort_on_exception = true | ||
| while job = queue.pop | ||
| klass, method, reporter = job | ||
| result = Minitest.run_test klass, method, reporter | ||
| reporter.synchronize { reporter.record result } | ||
| end | ||
| end | ||
| } | ||
| end | ||
| def << work; @queue << work; end | ||
| def shutdown | ||
| size.times { @queue << nil } | ||
| @pool.each(&:join) | ||
| end | ||
| end | ||
| module Test | ||
| def _synchronize; Test.io_lock.synchronize { yield }; end | ||
| module ClassMethods | ||
| def run_test klass, method_name, reporter | ||
| MiniTest.parallel_executor << [klass, method_name, reporter] | ||
| end | ||
| def test_order; :parallel; end | ||
| end | ||
| end | ||
| end | ||
| end |
| @@ -1,120 +0,0 @@ | ||
| ## | ||
| # Provides a parallel #each that lets you enumerate using N threads. | ||
| # Use environment variable N to customize. Defaults to 2. Enumerable, | ||
| # so all the goodies come along (tho not all are wrapped yet to | ||
| # return another ParallelEach instance). | ||
| class Minitest::ParallelEach | ||
| require 'thread' | ||
| include Enumerable | ||
| ## | ||
| # How many Threads to use for this parallel #each. | ||
| N = (ENV['N'] || 2).to_i | ||
| ## | ||
| # Create a new ParallelEach instance over +list+. | ||
| def initialize list | ||
| @queue = Queue.new # *sigh*... the Queue api sucks sooo much... | ||
| list.each { |i| @queue << i } | ||
| N.times { @queue << nil } | ||
| end | ||
| def select(&block) # :nodoc: | ||
| self.class.new super | ||
| end | ||
| alias find_all select # :nodoc: | ||
| ## | ||
| # Starts N threads that yield each element to your block. Joins the | ||
| # threads at the end. | ||
| def each | ||
| threads = N.times.map { | ||
| Thread.new do | ||
| Thread.current.abort_on_exception = true | ||
| while job = @queue.pop | ||
| yield job | ||
| end | ||
| end | ||
| } | ||
| threads.map(&:join) | ||
| end | ||
| def count # :nodoc: | ||
| [@queue.size - N, 0].max | ||
| end | ||
| alias_method :size, :count # :nodoc: | ||
| end | ||
| module Minitest | ||
| class << self | ||
| remove_method :__run | ||
| end | ||
| class Test | ||
| @mutex = Mutex.new | ||
| def self.synchronize # :nodoc: | ||
| if @mutex then # see parallel_each.rb | ||
| @mutex.synchronize { yield } | ||
| else | ||
| yield | ||
| end | ||
| end | ||
| alias :simple_capture_io :capture_io | ||
| def capture_io(&b) | ||
| Test.synchronize do | ||
| simple_capture_io(&b) | ||
| end | ||
| end | ||
| alias :simple_capture_subprocess_io :capture_subprocess_io | ||
| def capture_subprocess_io(&b) | ||
| Test.synchronize do | ||
| simple_capture_subprocess_io(&b) | ||
| end | ||
| end | ||
| end | ||
| class Reporter | ||
| @mutex = Mutex.new | ||
| def self.synchronize # :nodoc: | ||
| if @mutex then # see parallel_each.rb | ||
| @mutex.synchronize { yield } | ||
| else | ||
| yield | ||
| end | ||
| end | ||
| alias :simple_record :record | ||
| def record result | ||
| Reporter.synchronize do | ||
| simple_record result | ||
| end | ||
| end | ||
| end | ||
| ## | ||
| # Runs all the +suites+ for a given +type+. Runs suites declaring | ||
| # a test_order of +:parallel+ in parallel, and everything else | ||
| # serial. | ||
| def self.__run reporter, options | ||
| suites = Runnable.runnables | ||
| parallel, serial = suites.partition { |s| s.test_order == :parallel } | ||
| ParallelEach.new(parallel).map { |suite| suite.run reporter, options } + | ||
| serial.map { |suite| suite.run reporter, options } | ||
| end | ||
| end |
Oops, something went wrong.
0 comments on commit
34760e3