Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial version.

  • Loading branch information...
commit 8d4459a4df6dc6c5d809f8fbf77f693e56d054e7 0 parents
@wisq authored
4 .gitignore
@@ -0,0 +1,4 @@
+*.gem
+.bundle
+Gemfile.lock
+pkg/*
4 Gemfile
@@ -0,0 +1,4 @@
+# Fetch gems over HTTPS rather than plain HTTP so downloads are
+# transport-protected.
+source "https://rubygems.org"
+
+# Specify your gem's dependencies in rails_parallel.gemspec
+gemspec
33 README.markdown
@@ -0,0 +1,33 @@
+rails_parallel
+==============
+
+rails_parallel makes your Rails tests scale with the number of CPU cores available.
+
+It also speeds up the testing process in general, by making heavy use of forking to only have to load the Rails environment once.
+
+Installation
+------------
+
+To load rails_parallel, require "rails_parallel/rake" early in your Rakefile. One possibility is to load it conditionally based on an environment variable:
+
+ require 'rails_parallel/rake' if ENV['PARALLEL']
+
+You'll want to add a lib/tasks/rails_parallel.rake with at least the following:
+
+ # RailsParallel handles the DB schema.
+ Rake::Task['test:prepare'].clear_prerequisites if defined?(RailsParallel::Rake)
+
+ namespace :parallel do
+ # Run this task if you have non-test tasks to run first and you want the
+ # RailsParallel worker to start loading your environment earlier.
+ task :launch do
+ RailsParallel::Rake.launch
+ end
+
+ # RailsParallel runs this if it needs to reload the DB.
+ namespace :db do
+ task :setup => ['db:drop', 'db:create', 'db:schema:load']
+ end
+ end
+
+This gem was designed as an internal project and currently makes certain assumptions about your project setup, such as the use of MySQL and a separate versioned schema (rather than db/schema.rb). These will become more generic in future versions.
2  Rakefile
@@ -0,0 +1,2 @@
+require 'bundler' # loaded so that Bundler::GemHelper is available below
+Bundler::GemHelper.install_tasks # adds Bundler's gem helper tasks to this Rakefile
23 bin/rails_parallel_worker
@@ -0,0 +1,23 @@
+#!/usr/bin/env ruby
+
+ENV['RAILS_ENV'] = 'test'
+
+begin
+ puts 'RP: Loading RailsParallel.'
+ $LOAD_PATH << 'lib'
+ require 'rails_parallel/runner'
+ require 'rails_parallel/object_socket'
+
+ socket = ObjectSocket.new(IO.for_fd(ARGV.first.to_i))
+ socket << :started
+
+ puts 'RP: Loading Rails.'
+ require "#{ENV['RAILS_PARALLEL_ROOT']}/config/environment"
+
+ puts 'RP: Ready for testing.'
+ RailsParallel::Runner.launch(socket)
+ puts 'RP: Shutting down.'
+ Kernel.exit!(0)
+rescue Interrupt, SignalException
+ Kernel.exit!(1)
+end
3  lib/rails_parallel.rb
@@ -0,0 +1,3 @@
+module RailsParallel
+ # Namespace placeholder: nothing is defined here. Require 'rails_parallel/rake' in your Rakefile if you want RP.
+end
32 lib/rails_parallel/collector.rb
@@ -0,0 +1,32 @@
+require 'test/unit/collector'
+
+module RailsParallel
+ class Collector
+ include Test::Unit::Collector
+
+ NAME = 'collected from the ObjectSpace'
+
+ def prepare(timings, test_name)
+ @suites = {}
+ ::ObjectSpace.each_object(Class) do |klass|
+ @suites[klass.name] = klass.suite if Test::Unit::TestCase > klass
+ end
+
+ @pending = @suites.keys.sort_by do |name|
+ [
+ 0 - timings.fetch(test_name, name), # runtime, descending
+ 0 - @suites[name].size, # no. of tests, descending
+ name
+ ]
+ end
+ end
+
+ def next_suite
+ @pending.shift
+ end
+
+ def suite_for(name)
+ @suites[name]
+ end
+ end
+end
39 lib/rails_parallel/forks.rb
@@ -0,0 +1,39 @@
+module RailsParallel
+  # Fork/wait helpers mixed into the runner classes.
+  module Forks
+    # Runs the given block in a forked child and returns the child's pid.
+    #
+    # Any open ActiveRecord connection is disconnected before forking.  The
+    # child always leaves through Kernel.exit!: status 0 when the block
+    # returns, status 1 on a signal or any other exception (after printing the
+    # error and calling #before_exit).
+    def fork_and_run
+      ActiveRecord::Base.connection.disconnect! if ActiveRecord::Base.connected?
+
+      # Anything still sitting in our output buffers would be copied into the
+      # child and could be printed a second time from there.
+      flush_output
+
+      fork do
+        begin
+          yield
+          flush_output # exit! does not flush buffered output for us
+          Kernel.exit!(0)
+        rescue Interrupt, SignalException
+          flush_output
+          Kernel.exit!(1)
+        rescue Exception => e
+          puts "Error: #{e}"
+          puts(*e.backtrace.map {|t| "\t#{t}"})
+          before_exit
+          flush_output
+          Kernel.exit!(1)
+        end
+      end
+    end
+
+    # Waits for the given pid (blocking unless nonblock is true).
+    #
+    # Returns the reaped pid, or nil when nonblock is set and nothing has
+    # exited yet.  Raises RuntimeError if the reaped process was unsuccessful.
+    def wait_for(pid, nonblock = false)
+      pid = Process.waitpid(pid, nonblock ? Process::WNOHANG : 0)
+      check_status($?) if pid
+      pid
+    end
+
+    # Like #wait_for, but for any child process.  Returns nil when there are
+    # no child processes left at all, instead of raising Errno::ECHILD.
+    def wait_any(nonblock = false)
+      wait_for(-1, nonblock)
+    rescue Errno::ECHILD
+      nil
+    end
+
+    # Raises RuntimeError unless the given Process::Status reports success.
+    def check_status(stat)
+      raise "error: #{stat.inspect}" unless stat.success?
+    end
+
+    # Hook called in a child after its block raised; override for cleanup.
+    def before_exit
+      # cleanup here (in children)
+    end
+
+    private
+
+    # Best-effort flush of both standard streams; a broken stream must not
+    # stop a process that is about to fork or exit.
+    def flush_output
+      [$stdout, $stderr].each do |io|
+        begin
+          io.flush
+        rescue StandardError
+          # nothing useful can be done about it here
+        end
+      end
+    end
+  end
+end
76 lib/rails_parallel/object_socket.rb
@@ -0,0 +1,76 @@
+require 'rubygems'
+require 'socket'
+
+class ObjectSocket
+ BLOCK_SIZE = 4096
+
+ attr_reader :socket
+
+ def self.pair
+ Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM, 0).map { |s| new(s) }
+ end
+
+ def initialize(socket)
+ @socket = socket
+ @buffer = ''
+ end
+
+ def nonblock=(val)
+ @nonblock = val
+ end
+
+ def close
+ @socket.close
+ end
+
+ def nonblocking(&block)
+ with_nonblock(true, &block)
+ end
+ def blocking(&block)
+ with_nonblock(false, &block)
+ end
+
+ def each_object(&block)
+ first = true
+ loop do
+ process_buffer(&block) if first
+ first = false
+
+ @buffer += @nonblock ? @socket.read_nonblock(BLOCK_SIZE) : @socket.readpartial(BLOCK_SIZE)
+ process_buffer(&block)
+ end
+ rescue Errno::EAGAIN
+ # end of nonblocking data
+ end
+
+ def next_object
+ each_object { |o| return o }
+ nil # no pending data in nonblock mode
+ end
+
+ def <<(obj)
+ data = Marshal.dump(obj)
+ @socket.syswrite [data.size, data].pack('Na*')
+ self # chainable
+ end
+
+ private
+
+ def process_buffer
+ while @buffer.size >= 4
+ size = 4 + @buffer.unpack('N').first
+ break unless @buffer.size >= size
+
+ packet = @buffer.slice!(0, size)
+ yield Marshal.load(packet[4..-1])
+ end
+ end
+
+ def with_nonblock(value)
+ old_value = @nonblock
+ @nonblock = value
+ return yield
+ ensure
+ @nonblock = old_value
+ end
+end
182 lib/rails_parallel/rake.rb
@@ -0,0 +1,182 @@
+require 'rake/testtask'
+require 'fcntl'
+
+require 'rails_parallel/object_socket'
+
+module RailsParallel
+ class Rake
+ include Singleton
+
+ SCHEMA_DIR = 'tmp/rails_parallel/schema'
+
+ def self.launch
+ instance.launch
+ end
+
+ def self.run(name, ruby_opts, files)
+ instance.launch
+ instance.run(name, ruby_opts, files)
+ end
+
+ def launch
+ return if @pid
+ at_exit { shutdown }
+
+ create_test_db
+
+ my_socket, c_socket = ObjectSocket.pair
+ sock = c_socket.socket
+ sock.fcntl(Fcntl::F_SETFD, sock.fcntl(Fcntl::F_GETFD, 0) & ~Fcntl::FD_CLOEXEC)
+
+ @pid = fork do
+ my_socket.close
+ ENV['RAILS_PARALLEL_ROOT'] = Rails.root
+ exec('rails_parallel_worker', sock.fileno.to_s)
+ raise 'exec failed'
+ end
+
+ c_socket.close
+ @socket = my_socket
+
+ expect(:started)
+ end
+
+ def run(name, ruby_opts, files)
+ options = parse_options(ruby_opts)
+ schema = schema_file
+
+ expect(:ready)
+ @socket << {
+ :name => name,
+ :schema => schema,
+ :options => options,
+ :files => files.to_a
+ }
+ expect(:done)
+ end
+
+ def shutdown
+ if @pid
+ @socket << :shutdown
+ Process.waitpid(@pid)
+ @pid = nil
+ end
+ end
+
+ private
+
+ def expect(want)
+ got = @socket.next_object
+ raise "Expected #{want}, got #{got}" unless want == got
+ end
+
+ def parse_options(ruby_opts)
+ ruby_opts.collect do |opt|
+ case opt
+ when /^-r/
+ [:require, $']
+ else
+ raise "Unhandled Ruby option: #{opt.inspect}"
+ end
+ end
+ end
+
+ def create_test_db
+ dbconfig = YAML.load_file('config/database.yml')['test']
+ ActiveRecord::Base.establish_connection(dbconfig.merge('database' => nil))
+ ActiveRecord::Base.connection.execute("CREATE DATABASE IF NOT EXISTS #{dbconfig['database']}")
+ end
+
+ def schema_digest
+ files = FileList['db/schema.versioned.rb', 'db/migrate/*.rb'].sort
+ digest = Digest::MD5.new
+ files.each { |f| digest.update("#{f}|#{File.read(f)}|") }
+ digest.hexdigest
+ end
+
+ def schema_file
+ digest = schema_digest
+ basename = "#{digest}.sql"
+ schema = "#{SCHEMA_DIR}/#{basename}"
+
+ if File.exist? schema
+ puts "RP: Using cached schema: #{basename}"
+ else
+ puts 'RP: Building new schema ... '
+
+ silently { generate_schema(digest, schema) }
+
+ puts "RP: Generated new schema: #{basename}"
+ end
+
+ schema
+ end
+
+ def silently
+ File.open('/dev/null', 'w') do |fh|
+ $stdout = $stderr = fh
+ yield
+ end
+ ensure
+ $stdout = STDOUT
+ $stderr = STDERR
+ end
+
+ def generate_schema(digest, schema)
+ FileUtils.mkdir_p(SCHEMA_DIR)
+ Tempfile.open(["#{digest}.", ".sql"], SCHEMA_DIR) do |file|
+ ::Rake::Task['parallel:db:setup'].invoke
+ sh "mysqldump --no-data -u root shopify_dev > #{file.path}"
+
+ raise 'No schema dumped' unless file.size > 0
+ File.rename(file.path, schema)
+ $schema_dump_file = nil
+ end
+ end
+ end
+end
+
+module Rake
+ class TestTask
+ @@patched = false
+
+ def initialize(name=:test)
+ if name.kind_of? Hash
+ @name = name.keys.first
+ @depends = name.values.first
+ else
+ @name = name
+ @depends = []
+ end
+ @full_name = [Rake.application.current_scope, @name].join(':')
+
+ @libs = ["lib"]
+ @pattern = nil
+ @options = nil
+ @test_files = nil
+ @verbose = false
+ @warning = false
+ @loader = :rake
+ @ruby_opts = []
+ yield self if block_given?
+ @pattern = 'test/test*.rb' if @pattern.nil? && @test_files.nil?
+
+ if !@@patched && self.class.name == 'TestTaskWithoutDescription'
+ TestTaskWithoutDescription.class_eval { def define; super(false); end }
+ @@patched = true
+ end
+
+ define
+ end
+
+ def define(describe = true)
+ lib_path = @libs.join(File::PATH_SEPARATOR)
+ desc "Run tests" + (@full_name == :test ? "" : " for #{@name}") if describe
+ task @name => @depends do
+ files = file_list.map {|f| f =~ /[\*\?\[\]]/ ? FileList[f] : f }.flatten(1)
+ RailsParallel::Rake.run(@full_name, ruby_opts, files)
+ end
+ self
+ end
+ end
+end
38 lib/rails_parallel/runner.rb
@@ -0,0 +1,38 @@
+require 'rails_parallel/runner/parent'
+require 'rails_parallel/object_socket'
+
+module RailsParallel
+ class Runner
+ def self.launch(socket)
+ Runner.new(socket).run
+ end
+
+ def initialize(socket)
+ @socket = socket
+ end
+
+ def run
+ prepare
+
+ @socket << :ready
+ @socket.each_object do |obj|
+ break if obj == :shutdown
+ run_suite(obj)
+ @socket << :done << :ready
+ end
+ rescue EOFError
+ # shutdown
+ end
+
+ private
+
+ def prepare
+ $LOAD_PATH << 'test'
+ require 'test_helper'
+ end
+
+ def run_suite(params)
+ Parent.new(params).run
+ end
+ end
+end
84 lib/rails_parallel/runner/child.rb
@@ -0,0 +1,84 @@
+require 'rails_parallel/object_socket'
+require 'rails_parallel/runner/test_runner'
+
+module RailsParallel
+ class Runner
+ class Child
+ include Forks
+
+ attr_reader :socket, :pid, :number, :last_suite, :last_time
+
+ def initialize(number, schema, collector)
+ @number = number
+ @schema = schema
+ @collector = collector
+ @buffer = ''
+ @state = :waiting
+ end
+
+ def launch
+ parent_socket, child_socket = ObjectSocket.pair
+
+ @pid = fork_and_run do
+ parent_socket.close
+ @socket = child_socket
+ main_loop
+ end
+
+ child_socket.close
+ @socket = parent_socket
+ @socket.nonblock = true
+ end
+
+ def run_suite(name)
+ @last_suite = name
+ @last_time = Time.now
+ @socket << name
+ end
+
+ def finish
+ @socket << :finish
+ end
+
+ def close
+ @socket.close
+ end
+
+ def kill
+ Process.kill('KILL', @pid)
+ close rescue nil
+ end
+
+ def socket
+ @socket.socket
+ end
+
+ def poll
+ output = []
+ @socket.each_object { |obj| output << obj }
+ output
+ end
+
+ private
+
+ def main_loop
+ @schema.load_db(@number)
+
+ @socket << :started << :ready
+
+ @socket.each_object do |obj|
+ break if obj == :finish
+
+ ($rp_suites ||= []) << obj
+ suite = @collector.suite_for(obj)
+ runner = TestRunner.new(suite)
+ runner.start
+
+ @socket << [obj, runner.result, runner.faults] << :ready
+ end
+
+ @socket << :finished
+ end
+ end
+ end
+end
179 lib/rails_parallel/runner/parent.rb
@@ -0,0 +1,179 @@
+require 'rails_parallel/forks'
+require 'rails_parallel/collector'
+require 'rails_parallel/timings'
+require 'rails_parallel/runner/child'
+require 'rails_parallel/runner/schema'
+require 'rails_parallel/runner/test_runner'
+
+class Test::Unit::TestResult
+ attr_reader :failures, :errors
+
+ def append(other)
+ @run_count += other.run_count
+ @assertion_count += other.assertion_count
+ @failures += other.failures
+ @errors += other.errors
+ end
+end
+
+module RailsParallel
+ class Runner
+ class Parent
+ include Forks
+
+ def initialize(params)
+ @name = params[:name]
+ @schema = Schema.new(params[:schema])
+ @options = params[:options]
+ @files = params[:files]
+ @max_children = number_of_workers
+
+ @timings = Timings.new
+
+ @children = []
+ @launched = 0
+ @by_pid = {}
+ @by_socket = {}
+
+ @result = Test::Unit::TestResult.new
+ @faults = {}
+ end
+
+ def run
+ @schema.load_main_db
+
+ pid = fork_and_run do
+ status "RP: Preparing #{@name} ... "
+ handle_options
+ prepare
+ puts "ready."
+
+ puts "RP: Running #{@name}."
+ start = Time.now
+ begin
+ launch_next_child
+ monitor
+ ensure
+ @children.each(&:kill)
+ output_result(Time.now - start)
+ end
+ end
+ wait_for(pid)
+ end
+
+ private
+
+ def status(msg)
+ $stdout.print(msg)
+ $stdout.flush
+ end
+
+ def handle_options
+ @options.each do |opt, value|
+ case opt
+ when :require
+ value = 'rubygems' if value == 'ubygems'
+ status "#{value}, "
+ require value
+ else
+ raise "Unknown option type: #{opt}"
+ end
+ end
+ end
+
+ def prepare
+ status "#{@files.count} test files ... "
+ @files.each { |f| load f }
+ @collector = Collector.new
+ @collector.prepare(@timings, @name)
+ end
+
+ def launch_next_child
+ return if @launched >= @max_children
+ return if @complete
+
+ child = Child.new(@launched += 1, @schema, @collector)
+ child.launch
+
+ @children << child
+ @by_pid[child.pid] = child
+ @by_socket[child.socket] = child
+ end
+
+ def monitor
+ until @children.empty?
+ watching = @children.map(&:socket)
+ IO.select(watching).first.each do |socket|
+ child = @by_socket[socket]
+
+ begin
+ child.poll.each do |packet|
+ case packet
+ when :started
+ launch_next_child
+ when :ready
+ @timings.record(@name, child.last_suite, Time.now - child.last_time) if child.last_suite
+
+ suite = @collector.next_suite
+ if suite
+ child.run_suite(suite)
+ else
+ @complete = true
+ child.finish
+ end
+ when :finished
+ close_child(child)
+ else
+ suite, result, faults = packet
+ @result.append(result)
+ @faults[suite] = faults
+ end
+ end
+ rescue EOFError
+ close_child(child)
+ end
+ end
+
+ while pid = wait_any(true)
+ child = @by_pid[pid]
+ close_child(child) if child
+ end
+ end
+ end
+
+ def close_child(child)
+ child.close rescue nil
+ @children.delete(child)
+ @by_socket.delete(child.socket)
+ @by_pid.delete(child.pid)
+ end
+
+ def output_result(elapsed)
+ runner = TestRunner.new(nil, Test::Unit::UI::NORMAL)
+ runner.result = @result
+ runner.faults = @faults.sort.map(&:last).flatten(1)
+
+ runner.output_report(elapsed)
+ end
+
+ def number_of_workers
+ workers = number_of_cores
+ workers -= 1 if workers > 4 # reserve one core for DB
+ workers
+ end
+
+ def number_of_cores
+ if RUBY_PLATFORM =~ /linux/
+ cores = File.read('/proc/cpuinfo').split("\n\n").map do |data|
+ values = data.split("\n").map { |line| line.split(/\s*:/, 2) }
+ attrs = Hash[*values.flatten]
+ ['physical id', 'core id'].map { |key| attrs[key] }.join("/")
+ end
+ cores.uniq.count
+ elsif RUBY_PLATFORM =~ /darwin/
+ `/usr/bin/hwprefs cpu_count`.to_i
+ end
+ end
+ end
+ end
+end
93 lib/rails_parallel/runner/schema.rb
@@ -0,0 +1,93 @@
+require 'rails_parallel/object_socket'
+require 'rails_parallel/runner/test_runner'
+
+module RailsParallel
+ class Runner
+ class Schema
+ include Forks
+
+ def initialize(file)
+ @file = file
+ end
+
+ def load_main_db
+ if load_db(1)
+ failed = 0
+ ObjectSpace.each_object(Class) do |klass|
+ next unless klass < ActiveRecord::Base
+
+ klass.reset_column_information
+ begin
+ klass.columns
+ rescue StandardError => e
+ failed += 1
+ raise e if failed > 3
+ end
+ end
+ end
+ end
+
+ def load_db(number)
+ update_db_config(number)
+ if schema_loaded?
+ reconnect
+ false
+ else
+ schema_load
+ true
+ end
+ end
+
+ private
+
+ def reconnect(override = {})
+ ActiveRecord::Base.establish_connection(@dbconfig.merge(override))
+ ActiveRecord::Base.connection
+ end
+
+ def update_db_config(number)
+ config = ActiveRecord::Base.configurations[Rails.env]
+ config['database'] += "_#{number}" unless number == 1
+ @dbconfig = config.with_indifferent_access
+ end
+
+ def schema_load
+ dbname = @dbconfig[:database]
+ mysql_args = ['-u', 'root']
+
+ connection = reconnect(:database => nil)
+ connection.execute("DROP DATABASE IF EXISTS #{dbname}")
+ connection.execute("CREATE DATABASE #{dbname}")
+
+ File.open(@file) do |fh|
+ pid = fork do
+ STDIN.reopen(fh)
+ exec(*['mysql', mysql_args, dbname].flatten)
+ end
+ wait_for(pid)
+ end
+
+ reconnect
+ sm_table = ActiveRecord::Migrator.schema_migrations_table_name
+ ActiveRecord::Base.connection.execute("INSERT INTO #{sm_table} (version) VALUES ('#{@file}')")
+ end
+
+ def schema_loaded?
+ begin
+ ActiveRecord::Base.establish_connection(@dbconfig)
+ ActiveRecord::Base.connection
+ rescue StandardError
+ return false
+ end
+
+ begin
+ sm_table = ActiveRecord::Migrator.schema_migrations_table_name
+ migrated = ActiveRecord::Base.connection.select_values("SELECT version FROM #{sm_table}")
+ migrated.include?(@file)
+ rescue ActiveRecord::StatementInvalid
+ false
+ end
+ end
+ end
+ end
+end
17 lib/rails_parallel/runner/test_runner.rb
@@ -0,0 +1,17 @@
+require 'test/unit/ui/console/testrunner'
+
+module RailsParallel
+ class Runner
+ class TestRunner < Test::Unit::UI::Console::TestRunner
+ attr_accessor :result, :faults
+
+ def initialize(suite, output_level = Test::Unit::UI::PROGRESS_ONLY)
+ super(suite, output_level)
+ end
+
+ def output_report(elapsed)
+ finished(elapsed)
+ end
+ end
+ end
+end
32 lib/rails_parallel/timings.rb
@@ -0,0 +1,32 @@
+require 'rubygems'
+require 'redis'
+require 'active_support/core_ext/enumerable'
+
+module RailsParallel
+ class Timings
+ TIMING_COUNT = 10
+
+ def initialize
+ @cache = Redis.new
+ end
+
+ def record(test_name, class_name, time)
+ key = key_for(test_name, class_name)
+ @cache.lpush(key, time)
+ @cache.ltrim(key, 0, TIMING_COUNT - 1)
+ end
+
+ def fetch(test_name, class_name)
+ key = key_for(test_name, class_name)
+ times = @cache.lrange(key, 0, TIMING_COUNT - 1).map(&:to_f)
+ return 0 if times.empty?
+ times.sum / times.count
+ end
+
+ private
+
+ def key_for(test_name, class_name)
+ "timings-#{test_name}-#{class_name}"
+ end
+ end
+end
3  lib/rails_parallel/version.rb
@@ -0,0 +1,3 @@
+module RailsParallel
+ VERSION = "0.1.0" # gem version string
+end
25 rails_parallel.gemspec
@@ -0,0 +1,25 @@
+# -*- encoding: utf-8 -*-
+$:.push File.expand_path("../lib", __FILE__)
+require "rails_parallel/version"
+
+Gem::Specification.new do |s|
+ s.name = "rails_parallel"
+ s.version = RailsParallel::VERSION
+ s.platform = Gem::Platform::RUBY
+ s.authors = ["Adrian Irving-Beer"]
+ s.email = ["adrian@shopify.com"]
+ s.homepage = "" # NOTE(review): left blank; fill in a project URL
+ s.summary = %q{Runs multiple Rails tests concurrently}
+ s.description = %q{rails_parallel runs your Rails tests by forking off a worker and running multiple tests concurrently. It makes heavy use of forking to reduce memory footprint (assuming copy-on-write), only loads your Rails environment once, and automatically scales to the number of cores available. Designed to work with MySQL only. For best results, run MySQL on a tmpfs or a RAM disk.}
+
+ s.rubyforge_project = "rails_parallel"
+
+ s.files = `git ls-files`.split("\n") # file lists come from git, so this must be evaluated where the git command can list the project's files
+ s.test_files = `git ls-files -- {test,spec,features}/*`.split("\n")
+ s.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) } # base names of everything under bin/
+ s.require_paths = ["lib"]
+
+ s.add_dependency 'redis', '~> 2.2.0'
+ s.add_dependency 'rails', '~> 3.0'
+ s.add_dependency 'rake', '~> 0.9.2'
+end
Please sign in to comment.
Something went wrong with that request. Please try again.