-
Notifications
You must be signed in to change notification settings - Fork 21.4k
/
parallelization.rb
134 lines (106 loc) · 3.24 KB
/
parallelization.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# frozen_string_literal: true
require "drb"
require "drb/unix" unless Gem.win_platform?
require "active_support/core_ext/module/attribute_accessors"
module ActiveSupport
module Testing
class Parallelization # :nodoc:
# Job queue that is served to the worker processes over DRb.
#
# DRb::DRbUndumped is mixed in so that DRb hands out a remote reference to
# this object instead of marshalling a copy of it.
class Server
  include DRb::DRbUndumped

  def initialize
    @queue = Queue.new
  end

  # Passes +result+ on to +reporter+, inside the reporter's own
  # +synchronize+ block.
  #
  # Raises DRb::DRbConnError when +result+ is a DRb::DRbUnknown, i.e. DRb
  # could not rebuild the object on this side of the connection.
  def record(reporter, result)
    if result.is_a?(DRb::DRbUnknown)
      raise DRb::DRbConnError
    end

    reporter.synchronize { reporter.record(result) }
  end

  # Enqueues +o+. For a truthy +o+ the element at index 2 is replaced,
  # in place, by a DRbObject wrapping it before the job is queued.
  # Falsy values are queued untouched.
  def <<(o)
    if o
      o[2] = DRbObject.new(o[2])
    end

    @queue << o
  end

  # Number of entries currently waiting in the queue.
  def length
    @queue.length
  end

  # Removes and returns the next entry from the queue.
  def pop
    @queue.pop
  end
end
# Blocks registered through .after_fork_hook. #after_fork calls each of
# them with the worker value.
@@after_fork_hooks = []

# Registers +block+ as an after-fork callback.
def self.after_fork_hook(&block)
  @@after_fork_hooks.push(block)
end

cattr_reader :after_fork_hooks
# Blocks registered through .run_cleanup_hook. #run_cleanup calls each of
# them with the worker value.
@@run_cleanup_hooks = []

# Registers +block+ as a cleanup callback.
def self.run_cleanup_hook(&block)
  @@run_cleanup_hooks.push(block)
end

cattr_reader :run_cleanup_hooks
# Builds the job queue and exposes it through a DRb service.
#
# +queue_size+ is the number of worker processes that #start forks and the
# number of +nil+ sentinels that #shutdown enqueues.
def initialize(queue_size)
  @pool = []
  @queue_size = queue_size
  @queue = Server.new

  # Remember the service URI so that forked workers can connect back to the queue.
  service = DRb.start_service("drbunix:", @queue)
  @url = service.uri
end
# Calls every registered after-fork hook, in registration order, passing
# +worker+ to each one.
def after_fork(worker)
  hooks = self.class.after_fork_hooks
  hooks.each { |hook| hook.call(worker) }
end
# Calls every registered cleanup hook, in registration order, passing
# +worker+ to each one.
def run_cleanup(worker)
  hooks = self.class.run_cleanup_hooks
  hooks.each { |hook| hook.call(worker) }
end
# Forks +@queue_size+ worker processes and stores their pids in +@pool+.
#
# Each worker runs the after-fork hooks, connects to the queue through the
# DRb URI saved by #initialize, and then runs jobs until it pops a falsy
# value. The cleanup hooks run when the forked block exits, whether it
# finished normally or raised.
def start
  @pool = @queue_size.times.map do |worker|
    fork do
      # The DRb service started in #initialize belongs to the parent process.
      DRb.stop_service

      # A failing after-fork hook must not kill the worker. The error is
      # kept and attached to every result this worker reports.
      setup_exception = nil
      begin
        after_fork(worker)
      rescue => e
        setup_exception = e
      end

      queue = DRbObject.new_with_uri(@url)

      while (job = queue.pop)
        klass, method, reporter = job[0], job[1], job[2]

        result = klass.with_info_handler(reporter) do
          Minitest.run_one_method(klass, method)
        end

        if setup_exception
          add_setup_exception(result, setup_exception)
        end

        begin
          queue.record(reporter, result)
        rescue DRb::DRbConnError
          # Recording failed. Rewrap every failure around a
          # DRb::DRbRemoteError built from the original exception and
          # report the result a second time.
          result.failures.map! do |failure|
            # minitest >5.14.0 exposes the wrapped exception as #error
            cause = failure.respond_to?(:error) ? failure.error : failure.exception
            Minitest::UnexpectedError.new(DRb::DRbRemoteError.new(cause))
          end
          queue.record(reporter, result)
        end
      end
    ensure
      run_cleanup(worker)
    end
  end
end
# Enqueues +work+ on the job queue (the Server built in #initialize), from
# which the workers forked by #start pop their jobs.
def <<(work)
@queue << work
end
# Tells the workers to stop and waits for all of them to exit.
#
# One +nil+ is enqueued per worker, because a worker's job loop in #start
# ends when it pops a falsy value. Every pid in +@pool+ is then waited on.
#
# Raises RuntimeError if the queue still holds entries once every worker
# has exited.
def shutdown
  @queue_size.times do
    @queue << nil
  end

  @pool.each { |pid| Process.waitpid(pid) }

  return unless @queue.length > 0

  raise "Queue not empty, but all workers have finished. This probably means that a worker crashed and #{@queue.length} tests were missed."
end
private
# Wraps +setup_exception+ in a Minitest::UnexpectedError and inserts it at
# the front of +result.failures+.
def add_setup_exception(result, setup_exception)
  wrapped = Minitest::UnexpectedError.new(setup_exception)
  result.failures.unshift(wrapped)
end
end
end
end