/
slurm_cluster_resolver.py
397 lines (324 loc) · 14.1 KB
/
slurm_cluster_resolver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
# Copyright 2018-2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Implementation of Cluster Resolvers for Slurm workload manager."""
import os
import re
import subprocess
from tensorflow.python.distribute.cluster_resolver.cluster_resolver import ClusterResolver
from tensorflow.python.distribute.cluster_resolver.cluster_resolver import format_master_url
from tensorflow.python.training.server_lib import ClusterSpec
from tensorflow.python.util.tf_export import tf_export
def expand_hostlist(hostlist):
"""Create a list of hosts out of a SLURM hostlist.
The order of nodes is preserved and no deduplication is done
Input: 'n[1-2],m5,o[3-4,6,7-9]')
Output: ['n1', 'n2', 'm5', 'o3', 'o4', 'o6', 'o7', 'o8', 'o9']
"""
def split_hostlist(hostlist):
"""Split hostlist at commas outside of range expressions ('[3-5]')."""
in_brackets = False
cur_host = ''
for c in hostlist:
if in_brackets:
assert c != '['
if c == ']':
in_brackets = False
elif c == '[':
in_brackets = True
elif c == ',':
assert cur_host != ''
yield cur_host
cur_host = ''
continue
cur_host += c
if cur_host:
yield cur_host
def expand_range_expression(range_exp):
"""Expand a range expression like '3-5' to values 3,4,5."""
for part in range_exp.split(','):
sub_range = part.split('-')
if len(sub_range) == 1:
sub_range = sub_range * 2
else:
assert len(sub_range) == 2
num_digits = len(sub_range[0])
for i in range(int(sub_range[0]), int(sub_range[1]) + 1):
yield str(i).zfill(num_digits)
hosts = []
try:
for part in split_hostlist(hostlist):
# Match prefix (anything but a range expression) and range expression
# Both are optional
m = re.match(r'([^,[\]]*)(\[([^\]]+)\])?$', part)
if m is None:
raise ValueError('Invalid part: %s' % part)
prefix = m.group(1) or ''
if m.group(3) is None:
hosts.append(prefix)
else:
hosts.extend(prefix + i for i in expand_range_expression(m.group(3)))
except Exception as e:
raise ValueError('Invalid hostlist format "%s": %s' % (hostlist, e))
return hosts
def expand_tasks_per_node(tasks_per_node):
"""Expands the tasks per node expression from SLURM.
The order is preserved so it can be matched to the hostlist
Input: '3(x2),2,1'
Output: [3, 3, 2, 1]
"""
result = []
try:
for part in tasks_per_node.split(','):
m = re.match(r'(\d+)(\(x(\d+)\))?$', part)
assert m is not None
num_tasks = int(m.group(1))
num_repetitions = int(m.group(3) or 1)
result.extend([num_tasks] * num_repetitions)
except Exception as e:
raise ValueError('Invalid tasks-per-node list format "%s": %s' %
(tasks_per_node, e))
return result
def _get_slurm_var(name):
"""Gets the SLURM variable from the environment.
Args:
name: Name of the step variable
Returns:
SLURM_<name> from os.environ
Raises:
RuntimeError if variable is not found
"""
name = 'SLURM_' + name
try:
return os.environ[name]
except KeyError:
raise RuntimeError('%s not found in environment. '
'Not running inside a SLURM step?' % name)
def _get_num_slurm_tasks():
"""Returns the number of SLURM tasks of the current job step.
Returns:
The number of tasks as an int
"""
return int(_get_slurm_var('STEP_NUM_TASKS'))
def _get_num_nvidia_gpus():
"""Gets the number of NVIDIA GPUs by using CUDA_VISIBLE_DEVICES and nvidia-smi.
Returns:
Number of GPUs available on the node
Raises:
RuntimeError if executing nvidia-smi failed
"""
try:
return len(os.environ['CUDA_VISIBLE_DEVICES'].split(','))
except KeyError:
pass # Ignore and fallback to using nvidia-smi
try:
output = subprocess.check_output(['nvidia-smi', '--list-gpus'],
encoding='utf-8')
return sum(l.startswith('GPU ') for l in output.strip().split('\n'))
except subprocess.CalledProcessError as e:
raise RuntimeError('Could not get number of GPUs from nvidia-smi. '
'Maybe it is missing?\nOutput: %s' % e.output)
def get_num_gpus():
"""Returns the number of GPUs visible on the current node.
Currently only implemented for NVIDIA GPUs.
"""
return _get_num_nvidia_gpus()
@tf_export('distribute.cluster_resolver.SlurmClusterResolver')
class SlurmClusterResolver(ClusterResolver):
"""ClusterResolver for system with Slurm workload manager.
This is an implementation of ClusterResolver for Slurm clusters. This allows
the specification of jobs and task counts, number of tasks per node, number
of GPUs on each node and number of GPUs for each task. It retrieves system
attributes by Slurm environment variables, resolves allocated computing node
names, constructs a cluster and returns a ClusterResolver object which can be
used for distributed TensorFlow.
"""
def __init__(self,
jobs=None,
port_base=8888,
gpus_per_node=None,
gpus_per_task=None,
tasks_per_node=None,
auto_set_gpu=True,
rpc_layer='grpc'):
"""Creates a new SlurmClusterResolver object.
For any parameter not set it will query the environment for the value.
It uses those parameters to check which nodes have processes reside on and
resolves their hostnames.
With the number tasks per node it offsets the port number for each process.
With the number of GPUs per node and per task it allocates GPUs to tasks by
setting environment variables.
Using the resolver works best (and is easier) with homogeneous tasks but
heterogeneous tasks (number of tasks varying per node) are also possible as
long as the number of GPUs per task stays constant.
Used environment variables:
- SLURM_PROCID
- (opt) SLURM_STEP_NUM_TASKS
- (opt) SLURM_STEP_NODELIST
- (opt) SLURM_STEP_TASKS_PER_NODE
Args:
jobs: Dictionary with job names as key and number of tasks in the job as
value. Defaults to as many 'worker's as there are (Slurm) tasks.
port_base: The first port number to start with for processes on a node.
gpus_per_node: Number of GPUs available on each node. Defaults to the
number of GPUs reported by nvidia-smi
gpus_per_task: Number of GPUs to be used for each task. Default is to
evenly distribute the gpus_per_node to tasks_per_node.
tasks_per_node: Number of tasks running on each node. Can be an integer if
the number of tasks per node is constant or a dictionary mapping
hostnames to number of tasks on that node. If not set the Slurm
environment is queried for the correct mapping.
auto_set_gpu: Set the visible CUDA devices automatically while resolving
the cluster by setting CUDA_VISIBLE_DEVICES environment variable.
Defaults to True.
rpc_layer: The protocol TensorFlow used to communicate between nodes.
Defaults to 'grpc'.
Returns:
A ClusterResolver object which can be used with distributed TensorFlow.
Raises:
RuntimeError: If requested more GPUs per node than available or
requested more tasks than assigned tasks or
resolving missing values from the environment failed.
"""
self._rank = self._resolve_own_rank()
if jobs is None:
jobs = {'worker': self._resolve_num_tasks()}
self._jobs = jobs
self._port_base = port_base
if tasks_per_node is None:
self._task_configuration = self._resolve_task_configuration()
elif isinstance(tasks_per_node, dict):
# User can pass in an explicit configuration as a dict
self._task_configuration = tasks_per_node
else:
# User can pass a fixed number of tasks per node
hostlist = self._resolve_hostlist()
self._task_configuration = {
host: int(tasks_per_node) for host in hostlist
}
max_tasks_per_node = max(self._task_configuration.values())
num_tasks = sum(self._task_configuration.values())
if gpus_per_node is None:
gpus_per_node = get_num_gpus()
if gpus_per_task is None:
gpus_per_task = gpus_per_node // max_tasks_per_node
self._gpus_per_node = gpus_per_node
self._gpus_per_task = gpus_per_task
self._auto_set_gpu = auto_set_gpu
self.task_type = None
self.task_id = None
self.rpc_layer = rpc_layer
self._gpu_allocation = []
self._cluster_allocation = {}
if max_tasks_per_node * self._gpus_per_task > self._gpus_per_node:
raise RuntimeError('Requested more GPUs per node than available.')
if sum(self._jobs.values()) != num_tasks:
raise RuntimeError('Requested {} tasks but only {} were assigned.'.format(
sum(self._jobs.values()), num_tasks))
def _resolve_own_rank(self):
"""Returns the rank of the current task in range [0, num_tasks)."""
return int(_get_slurm_var('PROCID'))
def _resolve_num_tasks(self):
"""Returns the number of tasks for the current job step."""
return _get_num_slurm_tasks()
def _resolve_hostlist(self):
"""Returns a list of hostnames for nodes running the current job step."""
return expand_hostlist(_get_slurm_var('STEP_NODELIST'))
def _resolve_task_configuration(self):
"""Creates a mapping of hostnames to the number of tasks allocated on it.
Reads the SLURM environment to determine the nodes involved in the current
job step and number of tasks running on each node.
Returns a dictionary mapping each hostname to the number of tasks.
"""
hostlist = self._resolve_hostlist()
tasks_per_node = expand_tasks_per_node(
_get_slurm_var('STEP_TASKS_PER_NODE'))
return {
host: num_tasks for (host, num_tasks) in zip(hostlist, tasks_per_node)
}
def cluster_spec(self):
"""Returns a ClusterSpec object based on the latest instance group info.
This returns a ClusterSpec object for use based on information from the
specified initialization parameters and Slurm environment variables. The
cluster specification is resolved each time this function is called. The
resolver extract hostnames of nodes by scontrol and pack tasks in that
order until a node a has number of tasks that is equal to specification.
GPUs on nodes are allocated to tasks by specification through setting
CUDA_VISIBLE_DEVICES environment variable.
Returns:
A ClusterSpec containing host information retrieved from Slurm's
environment variables.
"""
task_list = []
self._gpu_allocation = []
self._cluster_allocation = {}
# Sort to make sure the order is the same for each run
for host, num_tasks in sorted(self._task_configuration.items()):
for port_offset, gpu_offset in zip(
range(num_tasks), range(0, self._gpus_per_node, self._gpus_per_task)):
host_addr = '%s:%d' % (host, self._port_base + port_offset)
task_list.append(host_addr)
gpu_id_list = []
for gpu_id in range(gpu_offset, gpu_offset + self._gpus_per_task):
gpu_id_list.append(str(gpu_id))
self._gpu_allocation.append(','.join(gpu_id_list))
cluster_rank_offset_start = 0
cluster_rank_offset_end = 0
# Sort to make sure the order is the same for each run
for task_type, num_tasks in sorted(self._jobs.items()):
cluster_rank_offset_end = cluster_rank_offset_start + num_tasks
self._cluster_allocation[task_type] = (
task_list[cluster_rank_offset_start:cluster_rank_offset_end])
if cluster_rank_offset_start <= self._rank < cluster_rank_offset_end:
self.task_type = task_type
self.task_id = self._rank - cluster_rank_offset_start
cluster_rank_offset_start = cluster_rank_offset_end
if self._auto_set_gpu:
os.environ['CUDA_VISIBLE_DEVICES'] = self._gpu_allocation[self._rank]
return ClusterSpec(self._cluster_allocation)
def get_task_info(self):
"""Returns job name and task_id for the process which calls this.
This returns the job name and task index for the process which calls this
function according to its rank and cluster specification. The job name and
task index are set after a cluster is constructed by cluster_spec otherwise
defaults to None.
Returns:
A string specifying job name the process belongs to and an integer
specifying the task index the process belongs to in that job.
"""
return self.task_type, self.task_id
def master(self, task_type=None, task_id=None, rpc_layer=None):
"""Returns the master string for connecting to a TensorFlow master.
Args:
task_type: (Optional) Overrides the default auto-selected task type.
task_id: (Optional) Overrides the default auto-selected task index.
rpc_layer: (Optional) Overrides the default RPC protocol TensorFlow uses
to communicate across nodes.
Returns:
A connection string for connecting to a TensorFlow master.
"""
task_type = task_type if task_type is not None else self.task_type
task_id = task_id if task_id is not None else self.task_id
if task_type is not None and task_id is not None:
return format_master_url(
self.cluster_spec().task_address(task_type, task_id),
rpc_layer or self.rpc_layer)
return ''
def num_accelerators(self,
task_type=None,
task_id=None,
config_proto=None):
# Unused, since this is set in __init__ manually.
del task_type, task_id, config_proto
return {'GPU': self._gpus_per_task}