/
pcows.rb
303 lines (277 loc) · 9.06 KB
/
pcows.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# Parallel copy-on-write streaming (PCOWS)
require 'etc'
require 'fileutils'
require 'tempfile'
require 'timeout'
class PCOWS
RUNNINGEXT = 'part' # file extension
# Set up a PCOWS scheduler.
#
# num_threads:: maximum number of concurrent workers; a falsy value
#               selects cpu_count
# chunk_size::  stored in @chunk_size (not read anywhere else in this class)
# name::        label used for the temp directory and the chunk file names
# timeout::     seconds handed to Timeout.timeout while waiting on workers
# quiet::       when true, progress messages on stderr are suppressed
# debug::       when true, announce debug mode; the cleanup helpers then
#               leave .keep files and the temp directory in place
#
# A temp directory is only created when more than one thread is requested.
def initialize(num_threads,chunk_size,name=File.basename(__FILE__),timeout=180,quiet=false,debug=false)
  @num_threads = num_threads || cpu_count
  @chunk_size = chunk_size
  @name = name
  @timeout = timeout
  @quiet = quiet
  @debug = debug
  @pid_list = []          # one [pid, count, filename] entry per submitted worker
  @last_output = 0        # index into @pid_list of the next chunk to print
  @output_locked = false  # holds the entry currently being printed, if any
  $stderr.print "PCOWS running in DEBUG MODE\n" if @debug
  @tmpdir = Dir.mktmpdir(@name + '_') if multi_threaded
end
# Run the worker 'func' on 'state'. func is a lambda closure, so besides
# the data captured in 'state' it can also see the scope it was created in.
# func.call(state) must return something that responds to #each; every
# element yielded is printed.
#
# Multi-threaded: a child process is forked with STDOUT reopened onto
# "<chunk>.part". When the worker is done the file is renamed to "<chunk>";
# that rename is what the rest of the class polls for.
#
# Single-threaded: the worker runs in this process and prints immediately;
# the entry recorded in @pid_list is then [nil, nil, nil].
#
# Always returns true.
def submit_worker(func,state)
  pid = count = fn = nil
  if multi_threaded
    count = @pid_list.size + 1
    fn = mktmpfilename(count)
    pid = fork do
      # ---- From here on we are in the copy-on-write child process
      partial = fn + '.' + RUNNINGEXT
      STDOUT.reopen(File.open(partial, 'w+'))
      func.call(state).each { |line| print line }
      STDOUT.flush
      STDOUT.close
      # Only rename once everything is written and closed, so a reader
      # never sees a half-written chunk under the final name
      FileUtils.mv(partial, fn)
      exit(0)
    end
    Process.detach(pid)
  else
    # ---- Single threaded: call in the main process, output immediately
    func.call(state).each { |line| print line }
  end
  @pid_list << [pid, count, fn]
  true
end
# Submit a worker exactly like submit_worker, after setting @final_worker
# to true. Returns whatever submit_worker returns.
# NOTE(review): nothing in the visible part of this class reads
# @final_worker - confirm whether it is still needed.
def submit_final_worker(func,state)
@final_worker = true
submit_worker(func,state)
end
# Block until fewer than @num_threads workers are busy. A worker counts as
# busy while its process is alive or its ".part" file still exists (see
# pid_or_file_running?). Polls every 0.1 seconds and raises Timeout::Error
# when no slot frees up within @timeout seconds. Returns immediately in
# single-threaded mode.
def wait_for_worker_slot()
  return if single_threaded
  Timeout.timeout(@timeout) do
    announced = false
    loop do
      # ---- count the workers that are still busy
      busy = @pid_list.count { |(pid, _count, fn)| pid_or_file_running?(pid, fn) }
      return if busy < @num_threads
      unless announced
        # only mention the wait once per call
        $stderr.print "Waiting for slot (timeout=#{@timeout})\n" unless @quiet
        announced = true
      end
      sleep 0.1
    end
  end
end
# ---- Collect finished worker output and pass it on, strictly in
#      submission order and never with two printers at the same time.
#      Each call does at most one step, so it is meant to be called
#      repeatedly:
#
#      1. If a chunk is currently being printed (@output_locked) and its
#         file still exists, the printer is not done yet: return.
#         Otherwise remove the chunk's .keep file and move on to the
#         next chunk.
#      2. If the next chunk's file exists, print it and rename it to
#         <chunk>.keep (renaming instead of unlinking avoids losing
#         output by removing the file prematurely). If the file is not
#         there yet, sleep 0.2 seconds.
#
#      func::     only used when type is not :by_line; it is then called
#                 once with the chunk filename
#      type::     :by_line prints every line of the chunk to stdout
#      blocking:: false forks a detached printer process; true prints in
#                 the calling process
#
#      Does nothing in single-threaded mode. Raises ArgumentError when
#      type is not :by_line and no func was given.
def process_output(func=nil,type=:by_line, blocking=false)
  return if single_threaded
  # Fail here rather than with a NoMethodError inside a forked printer,
  # where the crash would leave the chunk locked forever
  if type != :by_line && func.nil?
    raise ArgumentError, "process_output needs a func when type is #{type.inspect}"
  end
  output = lambda { |fn|
    if type == :by_line
      # File.foreach closes the file once the block is done
      File.foreach(fn) { |buf| print buf }
    else
      func.call(fn)
    end
  }
  if @output_locked
    # ---- is the other printer still running? We wait until it
    #      is finished before starting the next one
    (_pid, _count, fn) = @output_locked
    $stderr.print "Checking for output_lock on existing #{fn}\n" if not @quiet
    return if File.exist?(fn) # printer still processing this chunk
    # The chunk was renamed to .keep, so it has been printed completely
    cleanup_keep_file(fn)
    @last_output += 1 # get next one in line
    @output_locked = false
  end
  # ---- process the next output chunk, if it has been submitted
  info = @pid_list[@last_output]
  return unless info
  (_pid, _count, fn) = info
  $stderr.print "Testing (#{@last_output}) for output file ",[info],"\n" if @debug
  if File.exist?(fn)
    # Yes! We have the next output, create outputter
    @output_locked = info
    $stderr.print "Set lock on ",[info],"\n" if not @quiet
    if not blocking
      $stderr.print "Processing output file #{fn} (non-blocking)\n" if not @quiet
      printer_pid = fork do
        output.call(fn)
        # after finishing output move it to .keep
        FileUtils::mv(fn,fn+'.keep')
        exit(0)
      end
      Process.detach(printer_pid)
    else
      $stderr.print "Processing output file #{fn} (blocking)\n" if not @quiet
      output.call(fn)
      FileUtils::mv(fn,fn+'.keep')
    end
  else
    sleep 0.2
  end
end
# Wait for one worker to finish. While working, the worker writes a file
# with extension .part(ial); on completion that file is renamed to the
# plain chunk name, which is what this method polls for.
#
# info is a [pid, count, filename] entry from @pid_list. Returns right
# away when the worker is neither running nor has a .part file, and also
# returns silently when the worker disappears while we are waiting.
#
# Raises Timeout::Error when the result does not appear within @timeout
# seconds; the worker is killed first. Raises RuntimeError when the wait
# ended but the result file is not there.
def wait_for_worker(info)
  (pid,count,fn) = info
  return unless pid_or_file_running?(pid,fn)
  $stderr.print "Waiting up to #{@timeout} seconds for pid=#{pid} to complete #{fn}\n" if not @quiet
  begin
    Timeout.timeout(@timeout) do
      while not File.exist?(fn)  # wait for the result to appear
        sleep 0.2
        return if not pid_or_file_running?(pid,fn) # worker is gone
      end
    end
    # Partial file should have been renamed:
    raise "FATAL: child process #{pid} appears to have crashed #{fn}" if not File.exist?(fn)
    $stderr.print "OK pid=#{pid}, processing starts of #{fn}\n" if not @quiet
  rescue Timeout::Error => timeout_error
    # Kill it to speed up exit. Workers are detached, so the process may
    # already be gone (ESRCH) or already reaped (ECHILD); neither should
    # hide the timeout from the caller.
    begin
      if pid
        Process.kill 9, pid
        Process.wait pid
      end
    rescue Errno::ESRCH, Errno::ECHILD
      # nothing left to kill or reap
    end
    $stderr.print "FATAL: child process killed because it stopped responding, pid = #{pid}, fn = #{fn}, count = #{count}\n"
    $stderr.print "Bailing out\n"
    raise timeout_error
  end
end
# Final synchronisation after the reader is done: wait, in submission
# order, for every worker recorded in @pid_list to complete. Does nothing
# in single-threaded mode. Errors raised by wait_for_worker propagate.
def wait_for_workers()
  return if single_threaded
  @pid_list.each { |entry| wait_for_worker(entry) }
end
# Print whatever output has not been printed yet and remove the temp
# directory. Does nothing in single-threaded mode.
#
# Three phases:
# 1. let a printer that is still holding the output lock finish,
# 2. for every worker, keep printing (blocking) until it is no longer
#    running and its chunk file is gone,
# 3. release the final lock, which also removes the last .keep file.
def process_remaining_output()
  return if single_threaded
  $stderr.print "Processing remaining output...\n" unless @quiet
  loop do
    break unless @output_locked
    sleep 0.2
    process_output() # keep trying
  end
  @pid_list.each do |info|
    pid, _count, fn = info
    loop do
      break unless pid_or_file_running?(pid, fn) || File.exist?(fn)
      $stderr.print "Trying: ",[info],"\n" unless @quiet
      process_output(nil, :by_line, true)
      sleep 0.2
    end
  end
  loop do
    break unless @output_locked
    sleep 0.1
    process_output(nil, :by_line, true)
  end
  cleanup_tmpdir()
end
# Emergency cleanup: kill every worker that is still running and remove
# its chunk file, .keep file and .part file, then remove the temp
# directory.
#
# Entries without a pid or filename (these are recorded in single-threaded
# mode) are skipped. A worker that is already gone or already reaped is
# not an error. Messages go to stderr because stdout carries the data.
def cleanup()
  @pid_list.each do |info|
    (pid,_count,fn) = info
    if pid && pid_running?(pid)
      $stderr.print "Killing child ",[info],"\n"
      begin
        Process.kill 9, pid
        Process.wait pid
      rescue Errno::ESRCH
        $stderr.print "INFO: The process #{pid} did not exist: Errno::ESRCH\n" if not @quiet
      rescue Errno::ECHILD
        # Workers are detached, so somebody else may have reaped the child
        $stderr.print "INFO: The process #{pid} was already reaped: Errno::ECHILD\n" if not @quiet
      end
    end
    next unless fn
    File.unlink(fn) if File.exist?(fn)
    cleanup_keep_file(fn,wait: false)
    tempfn = fn+'.'+RUNNINGEXT
    File.unlink(tempfn) if File.exist?(tempfn)
  end
  cleanup_tmpdir()
end
private
# Build the path of chunk number 'num' inside @tmpdir:
# "<tmpdir>/<num zero-padded to 6 digits>-<name>", followed by ".<ext>"
# when ext is given.
def mktmpfilename(num,ext=nil)
  prefix = format('/%0.6d-', num)
  suffix = ext ? '.' + ext : ''
  @tmpdir + prefix + @name + suffix
end
# A worker counts as running while its process is alive or while its
# "<fn>.part" file still exists. pid may be nil, in which case only the
# file is checked.
def pid_or_file_running?(pid,fn)
  return true if pid && pid_running?(pid)
  File.exist?(fn + '.' + RUNNINGEXT)
end
# Non-blocking check whether child process 'pid' is still alive.
# Returns false when the process is not (or no longer) a child of ours,
# or when it has exited or was terminated by a signal.
def pid_running?(pid)
  reaped, status = Process.waitpid2(pid, Process::WNOHANG)
  # With WNOHANG nothing is returned while the child is still busy
  return true if reaped.nil? && status.nil?
  !status.exited? && !status.signaled?
rescue Errno::ECHILD, Errno::ESRCH
  false
end
# True when exactly one thread was requested. Several public methods
# return early in that case because workers then run in the calling
# process and print their output directly.
def single_threaded
@num_threads == 1
end
# True when more than one thread was requested; workers are then forked
# and write their output to files in @tmpdir.
# Note that a thread count below 1 makes both this and single_threaded
# false.
def multi_threaded
@num_threads > 1
end
# Number of CPUs to use when the caller did not specify num_threads.
# Tries, in order: Etc.nprocessors, counting the processor entries in
# /proc/cpuinfo, the JVM runtime (when running under JRuby) and
# `sysctl -n hw.ncpu` (macOS/BSD). Falls back to 1 with a warning on
# stderr. Always returns an Integer >= 1.
def cpu_count
  return Etc.nprocessors if defined?(Etc) && Etc.respond_to?(:nprocessors)
  if File.exist?('/proc/cpuinfo')
    found = File.read('/proc/cpuinfo').scan(/^processor\s*:/).size
    # Not every architecture lists "processor :" lines; never return 0
    return found if found > 0
  end
  # Actually, the JVM does not allow fork...
  return Java::Java.lang.Runtime.getRuntime.availableProcessors if defined? Java::Java
  # Count on MAC; the shell swallows errors, so a failure just yields 0
  found = `sysctl -n hw.ncpu 2>/dev/null`.to_i
  return found if found > 0
  $stderr.print "Could not determine number of CPUs\n" if not @quiet
  1
end
# Remove "<fn>.keep", the name a chunk gets once it has been printed.
# Does nothing in debug mode, so the files can be inspected afterwards.
#
# opts[:wait] true (the default): poll every 0.1 seconds until the .keep
# file shows up, then remove it. There is no timeout on this wait.
# opts[:wait] false: remove the file only if it is already there.
#
# Returns nil.
def cleanup_keep_file(fn, opts = { wait: true })
  return if @debug
  keep = fn + '.keep'
  return unless opts[:wait] || File.exist?(keep)
  $stderr.print "Trying to remove #{keep}\n" unless @quiet
  sleep 0.1 until File.exist?(keep)
  $stderr.print "Removing #{keep}\n" unless @quiet
  File.unlink(keep)
  nil
end
# Remove the temp directory, which must be empty by now. Does nothing in
# debug mode, when no temp directory was created (single-threaded mode),
# or when the directory is already gone - so it is safe to call this more
# than once, e.g. from both process_remaining_output and cleanup.
def cleanup_tmpdir
  return if @debug
  return unless @tmpdir && Dir.exist?(@tmpdir)
  $stderr.print "Removing dir #{@tmpdir}\n" if not @quiet
  Dir.unlink(@tmpdir)
end
end