-
Notifications
You must be signed in to change notification settings - Fork 6
/
batch_job_collect.m
101 lines (94 loc) · 3.3 KB
/
batch_job_collect.m
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
%BATCH_JOB_COLLECT Output data computed by a batch job
%
%   output = batch_job_collect(h)
%
% This function outputs the data computed by batch jobs which have been
% started using:
%
%   h = batch_job_submit(job_dir, func, input, timeout, global_data);
%
% and run on workers running the function:
%
%   batch_job_worker(job_dir);
%
% If the job's parameters file still exists, the job has not yet been
% collected. In that case, when h.timeout is 0, every chunk is first
% computed locally. The function then polls the work directory until every
% chunk is marked finished or a kill signal is detected. Afterwards the job
% files and any lock files left in the work directory are deleted, and the
% chunk output files that exist are read in and assembled.
%
%IN:
%   h - batch job structure created by batch_job_submit. Any further input
%       arguments are accepted but ignored.
%
%OUT:
%   output - collected output from all the workers, as if the work had been
%            done in a single instance of MATLAB. Entries belonging to
%            chunks whose output file is missing are left empty before the
%            result is passed to matrify.
%
% See also BATCH_JOB_SUBMIT, BATCH_JOB_WORKER, PARFOR
function output = batch_job_collect(s, varargin)
% Check if we need to finish up the computation. The parameters file is
% deleted below once a job has been collected, so its presence means the
% job still needs finishing.
if exist(s.params_file, 'file')
    % Do the job initializations (this reloads the job structure)
    [s, mi, func] = setup_job(s.params_file);
    % One flag per chunk: true until the chunk is known to be finished
    num_chunks = ceil(s.N / s.chunk_size);
    chunks_unfinished = true(num_chunks, 1);
    if s.timeout == 0
        % Compute every chunk here, in order from the first one. A true
        % result from do_chunk marks the chunk as finished.
        for a = 1:num_chunks
            chunks_unfinished(a) = ~do_chunk(func, mi, s, a);
        end
    end
    % Wait for all the chunks to finish
    while any(chunks_unfinished)
        % Wait a bit - just to let all the locks be freed
        pause(0.05);
        % Check for the kill signal (user deleted!)
        if kill_signal(s)
            break;
        end
        for a = find(chunks_unfinished)'
            % Get the chunk filename
            fname = chunk_name(s.work_dir, a);
            % Encode the state as 2*(output file exists) + (lock file exists):
            % 0 = neither, 1 = lock only, 2 = output only, 3 = both
            switch (exist(fname, 'file') ~= 0) * 2 + (exist([fname '.lock'], 'file') ~= 0)
                case 0
                    if s.timeout ~= 0
                        continue;
                    end
                    % Neither exists (something went wrong!)
                    do_chunk(func, mi, s, a); % Do the chunk now if we can
                    chunks_unfinished(a) = false; % Mark as done regardless
                    % Check for the kill signal. This break leaves the for
                    % loop; the while loop re-checks the signal and exits.
                    if kill_signal(s)
                        break;
                    end
                case 1
                    % Lock file exists without an output file - see if we
                    % can grab the lock.
                    % NOTE(review): assumes get_file_lock(fname, true)
                    % takes over a stale lock - confirm against its source.
                    lock = get_file_lock(fname, true);
                    % Release it again by clearing the handle
                    clear lock;
                case 2
                    % The mat file exists and the lock doesn't - great
                    chunks_unfinished(a) = false; % Mark as read
                otherwise
                    % Computation in progress. Go on to the next chunk
                    continue;
            end
        end
    end
    clear mi lock func
end
% Tidy up files. dir() returns bare file names, so rebuild full paths in
% the searched folder; otherwise quiet_delete would target files of the
% same name in the current folder instead of the work directory.
lock_pattern = [s.work_dir '*.lock'];
lock_dir = fileparts(lock_pattern);
lock_files = dir(lock_pattern);
lock_files = cellfun(@(name) fullfile(lock_dir, name), {lock_files.name}, 'UniformOutput', false);
quiet_delete(s.params_file, s.cmd_file, s.input_mmap.name, lock_files{:});
% Read in all the outputs
output = cell(s.N, 1);
for a = 1:ceil(s.N / s.chunk_size)
    % Get the chunk filename
    fname = chunk_name(s.work_dir, a);
    % Skip chunks whose output file was never written
    if exist(fname, 'file')
        % Set the chunk indices
        ind = get_chunk_indices(a, s);
        % Load only the 'output' variable from the chunk file
        chunk = load(fname, 'output');
        output(ind) = chunk.output;
    end
end
% Reshape the output
output = matrify(output);
end