/
cluster_resolver.py
624 lines (497 loc) · 23.3 KB
/
cluster_resolver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Cluster Resolvers are used for dynamic cluster IP/hostname resolution."""
import abc
import collections
import six
from tensorflow.python.client import session
from tensorflow.python.eager import context
from tensorflow.python.framework import config
from tensorflow.python.framework import ops
from tensorflow.python.training.server_lib import ClusterSpec
from tensorflow.python.util.tf_export import tf_export
def format_master_url(master, rpc_layer=None):
if rpc_layer:
return '%s://%s' % (rpc_layer, master)
else:
return master
def get_accelerator_devices(master, config_proto):
"""Returns accelerator devices given a master and a configuration."""
if context.executing_eagerly():
logical_devices = config.list_logical_devices()
devices = []
for d in logical_devices:
if d.device_type == 'CPU' or d.device_type == 'XLA_CPU': # Filter CPUs
continue
devices.append(session._DeviceAttributes(d.name, d.device_type, 0, 0)) # pylint: disable=protected-access
return devices
else:
with ops.Graph().as_default():
with session.Session(master, config=config_proto) as s:
devices = s.list_devices()
return devices
@tf_export('distribute.cluster_resolver.ClusterResolver')
@six.add_metaclass(abc.ABCMeta)
class ClusterResolver(object):
"""Abstract class for all implementations of ClusterResolvers.
This defines the skeleton for all implementations of ClusterResolvers.
ClusterResolvers are a way for TensorFlow to communicate with various cluster
management systems (e.g. GCE, AWS, etc...) and gives TensorFlow necessary
information to set up distributed training.
By letting TensorFlow communicate with these systems, we will be able to
automatically discover and resolve IP addresses for various TensorFlow
workers. This will eventually allow us to automatically recover from
underlying machine failures and scale TensorFlow worker clusters up and down.
Note to Implementors of `tf.distribute.cluster_resolver.ClusterResolver`
subclass: In addition to these abstract methods, when task_type, task_id, and
rpc_layer attributes are applicable, you should also implement them either as
properties with getters or setters, or directly set the attributes
`self._task_type`, `self._task_id`, or `self._rpc_layer` so the base class'
getters and setters are used. See
`tf.distribute.cluster_resolver.SimpleClusterResolver.__init__` for an
example.
In general, multi-client tf.distribute strategies such as
`tf.distribute.experimental.MultiWorkerMirroredStrategy` require task_type and
task_id properties to be available in the `ClusterResolver` they are using. On
the other hand, these concepts are not applicable in single-client strategies,
such as `tf.distribute.experimental.TPUStrategy`, because the program is only
expected to be run on one task, so there should not be a need to have code
branches according to task type and task id.
- task_type is the name of the server's current named job (e.g. 'worker',
'ps' in a distributed parameterized training job).
- task_id is the ordinal index of the server within the task type.
- rpc_layer is the protocol used by TensorFlow to communicate with other
TensorFlow servers in a distributed environment.
"""
@abc.abstractmethod
def cluster_spec(self):
"""Retrieve the current state of the cluster and return a `tf.train.ClusterSpec`.
Returns:
A `tf.train.ClusterSpec` representing the state of the cluster at the
moment this function is called.
Implementors of this function must take care in ensuring that the
ClusterSpec returned is up-to-date at the time of calling this function.
This usually means retrieving the information from the underlying cluster
management system every time this function is invoked and reconstructing
a cluster_spec, rather than attempting to cache anything.
"""
raise NotImplementedError()
@abc.abstractmethod
def master(self, task_type=None, task_id=None, rpc_layer=None):
"""Retrieves the name or URL of the session master.
Note: this is only useful for TensorFlow 1.x.
Args:
task_type: (Optional) The type of the TensorFlow task of the master.
task_id: (Optional) The index of the TensorFlow task of the master.
rpc_layer: (Optional) The RPC protocol for the given cluster.
Returns:
The name or URL of the session master.
Implementors of this function must take care in ensuring that the master
returned is up-to-date at the time to calling this function. This usually
means retrieving the master every time this function is invoked.
"""
raise NotImplementedError()
def num_accelerators(self,
task_type=None,
task_id=None,
config_proto=None):
"""Returns the number of accelerator cores per worker.
This returns the number of accelerator cores (such as GPUs and TPUs)
available per worker.
Optionally, we allow callers to specify the task_type, and task_id, for
if they want to target a specific TensorFlow task to query
the number of accelerators. This is to support heterogenous environments,
where the number of accelerators cores per host is different.
Args:
task_type: (Optional) The type of the TensorFlow task of the machine we
want to query.
task_id: (Optional) The index of the TensorFlow task of the machine we
want to query.
config_proto: (Optional) Configuration for starting a new session to
query how many accelerator cores it has.
Returns:
A map of accelerator types to number of cores.
"""
master = self.master(task_type, task_id)
# TODO(b/126786766): in eager mode, we should check whether
# `tf.config.experimental_connect_to_cluster` is called or not.
devices = get_accelerator_devices(master, config_proto)
mapping = collections.defaultdict(int)
for device in devices:
if task_type is not None and task_id is not None:
job_path = '/job:%s' % task_type
task_path = '/task:%s' % task_id
if job_path not in device.name or task_path not in device.name:
continue
mapping[device.device_type] += 1
return mapping
@property
def environment(self):
"""Returns the current environment which TensorFlow is running in.
There are two possible return values, "google" (when TensorFlow is running
in a Google-internal environment) or an empty string (when TensorFlow is
running elsewhere).
If you are implementing a ClusterResolver that works in both the Google
environment and the open-source world (for instance, a TPU ClusterResolver
or similar), you will have to return the appropriate string depending on the
environment, which you will have to detect.
Otherwise, if you are implementing a ClusterResolver that will only work
in open-source TensorFlow, you do not need to implement this property.
"""
return ''
@property
def task_type(self):
"""Returns the task type this `ClusterResolver` indicates.
In TensorFlow distributed environment, each job may have an applicable
task type. Valid task types in TensorFlow include
'chief': a worker that is designated with more responsibility,
'worker': a regular worker for training/evaluation,
'ps': a parameter server, or
'evaluator': an evaluator that evaluates the checkpoints for metrics.
See [Multi-worker configuration](
https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras#multi-worker_configuration)
for more information about 'chief' and 'worker' task type, which are most
commonly used.
Having access to such information is useful when user needs to run specific
code according to task types. For example,
```python
cluster_spec = tf.train.ClusterSpec({
"ps": ["localhost:2222", "localhost:2223"],
"worker": ["localhost:2224", "localhost:2225", "localhost:2226"]
})
# SimpleClusterResolver is used here for illustration; other cluster
# resolvers may be used for other source of task type/id.
simple_resolver = SimpleClusterResolver(cluster_spec, task_type="worker",
task_id=1)
...
if cluster_resolver.task_type == 'worker':
# Perform something that's only applicable on workers. This block
# will run on this particular instance since we've specified this task to
# be a worker in above cluster resolver.
elif cluster_resolver.task_type == 'ps':
# Perform something that's only applicable on parameter servers. This
# block will not run on this particular instance.
```
Returns `None` if such information is not available or is not applicable
in the current distributed environment, such as training with
`tf.distribute.experimental.TPUStrategy`.
For more information, please see
`tf.distribute.cluster_resolver.ClusterResolver`'s class doc.
"""
return getattr(self, '_task_type', None)
@property
def task_id(self):
"""Returns the task id this `ClusterResolver` indicates.
In TensorFlow distributed environment, each job may have an applicable
task id, which is the index of the instance within its task type. This is
useful when user needs to run specific code according to task index. For
example,
```python
cluster_spec = tf.train.ClusterSpec({
"ps": ["localhost:2222", "localhost:2223"],
"worker": ["localhost:2224", "localhost:2225", "localhost:2226"]
})
# SimpleClusterResolver is used here for illustration; other cluster
# resolvers may be used for other source of task type/id.
simple_resolver = SimpleClusterResolver(cluster_spec, task_type="worker",
task_id=0)
...
if cluster_resolver.task_type == 'worker' and cluster_resolver.task_id == 0:
# Perform something that's only applicable on 'worker' type, id 0. This
# block will run on this particular instance since we've specified this
# task to be a 'worker', id 0 in above cluster resolver.
else:
# Perform something that's only applicable on other ids. This block will
# not run on this particular instance.
```
Returns `None` if such information is not available or is not applicable
in the current distributed environment, such as training with
`tf.distribute.cluster_resolver.TPUClusterResolver`.
For more information, please see
`tf.distribute.cluster_resolver.ClusterResolver`'s class docstring.
"""
return getattr(self, '_task_id', None)
@task_type.setter
def task_type(self, task_type):
"""Setter of `task_type` property. See `task_type` property doc."""
self._task_type = task_type
@task_id.setter
def task_id(self, task_id):
"""Setter of `task_id` property. See `task_type` property doc."""
self._task_id = task_id
@tf_export('distribute.cluster_resolver.SimpleClusterResolver')
class SimpleClusterResolver(ClusterResolver):
"""Simple implementation of ClusterResolver that accepts all attributes.
Please see the base class for documentation of arguments of its constructor.
It is useful if you want to specify some or all attributes.
Usage example with `tf.distribute.Strategy`:
```Python
cluster = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222",
"worker1.example.com:2222"]})
# On worker 0
cluster_resolver = SimpleClusterResolver(cluster, task_type="worker",
task_id=0,
num_accelerators={"GPU": 8},
rpc_layer="grpc")
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
cluster_resolver=cluster_resolver)
# On worker 1
cluster_resolver = SimpleClusterResolver(cluster, task_type="worker",
task_id=1,
num_accelerators={"GPU": 8},
rpc_layer="grpc")
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
cluster_resolver=cluster_resolver)
```
"""
def __init__(self, cluster_spec, master='', task_type=None, task_id=None,
environment='', num_accelerators=None,
rpc_layer=None):
"""Creates a SimpleClusterResolver from a ClusterSpec."""
super(SimpleClusterResolver, self).__init__()
self._task_type = task_type
self._task_id = task_id
self._environment = environment
self._num_accelerators = num_accelerators
self._rpc_layer = rpc_layer
if not isinstance(cluster_spec, ClusterSpec):
raise TypeError('cluster_spec must be a `tf.train.ClusterSpec`.')
self._cluster_spec = cluster_spec
if not isinstance(master, str):
raise TypeError('master must be a string.')
self._master = master
def cluster_spec(self):
"""Returns the ClusterSpec passed into the constructor."""
return self._cluster_spec
def master(self, task_type=None, task_id=None, rpc_layer=None):
"""Returns the master address to use when creating a session.
Note: this is only useful for TensorFlow 1.x.
Args:
task_type: (Optional) The type of the TensorFlow task of the master.
task_id: (Optional) The index of the TensorFlow task of the master.
rpc_layer: (Optional) The RPC used by distributed TensorFlow.
Returns:
The name or URL of the session master.
If a task_type and task_id is given, this will override the `master`
string passed into the initialization function.
"""
if task_type is not None and task_id is not None:
master = self.cluster_spec().task_address(task_type, task_id)
else:
master = self._master
return format_master_url(master, rpc_layer=rpc_layer or self._rpc_layer)
@property
def task_type(self):
return self._task_type
@property
def task_id(self):
return self._task_id
@task_type.setter
def task_type(self, task_type):
self._task_type = task_type
@task_id.setter
def task_id(self, task_id):
self._task_id = task_id
@property
def environment(self):
return self._environment
def num_accelerators(self,
task_type=None,
task_id=None,
config_proto=None):
"""Returns the number of accelerator cores per worker.
The SimpleClusterResolver does not do automatic detection of accelerators,
and thus all arguments are unused and we simply return the value provided
in the constructor.
Args:
task_type: Unused.
task_id: Unused.
config_proto: Unused.
"""
# Unused
del task_type, task_id, config_proto
if self._num_accelerators is None:
return {}
return self._num_accelerators
@property
def rpc_layer(self):
return self._rpc_layer
@rpc_layer.setter
def rpc_layer(self, rpc_layer):
self._rpc_layer = rpc_layer
@tf_export('distribute.cluster_resolver.UnionResolver')
class UnionClusterResolver(ClusterResolver):
"""Performs a union on underlying ClusterResolvers.
This class performs a union given two or more existing ClusterResolvers. It
merges the underlying ClusterResolvers, and returns one unified ClusterSpec
when cluster_spec is called. The details of the merge function is
documented in the cluster_spec function.
For additional ClusterResolver properties such as task type, task index,
rpc layer, environment, etc..., we will return the value from the first
ClusterResolver in the union.
An example to combine two cluster resolvers:
```Python
cluster_0 = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222",
"worker1.example.com:2222"]})
cluster_resolver_0 = SimpleClusterResolver(cluster, task_type="worker",
task_id=0,
rpc_layer="grpc")
cluster_1 = tf.train.ClusterSpec({"ps": ["ps0.example.com:2222",
"ps1.example.com:2222"]})
cluster_resolver_1 = SimpleClusterResolver(cluster, task_type="ps",
task_id=0,
rpc_layer="grpc")
# Its task type would be "worker".
cluster_resolver = UnionClusterResolver(cluster_resolver_0,
cluster_resolver_1)
```
An example to override the number of GPUs in a TFConfigClusterResolver
instance:
```Python
tf_config = TFConfigClusterResolver()
gpu_override = SimpleClusterResolver(tf_config.cluster_spec(),
num_accelerators={"GPU": 1})
cluster_resolver = UnionResolver(gpu_override, tf_config)
```
"""
def __init__(self, *args, **kwargs):
"""Initializes a UnionClusterResolver with other ClusterResolvers.
Args:
*args: `ClusterResolver` objects to be unionized.
**kwargs:
rpc_layer - (Optional) Override value for the RPC layer used by
TensorFlow.
task_type - (Optional) Override value for the current task type.
task_id - (Optional) Override value for the current task index.
Raises:
TypeError: If any argument is not a subclass of `ClusterResolvers`.
ValueError: If there are no arguments passed.
"""
super(UnionClusterResolver, self).__init__()
self._rpc_layer = kwargs.pop('rpc_layer', None)
self._task_type = kwargs.pop('task_type', None)
self._task_id = kwargs.pop('task_id', None)
if kwargs:
raise ValueError('Unexpected kwargs provided {!r}'.format(kwargs))
if not args:
raise ValueError('At least one ClusterResolver is required.')
for cluster_resolver in args:
if not isinstance(cluster_resolver, ClusterResolver):
raise TypeError('All arguments must be a sub-class of '
'`ClusterResolver.`')
self._cluster_resolvers = args
def cluster_spec(self):
"""Returns a union of all the ClusterSpecs from the ClusterResolvers.
Returns:
A ClusterSpec containing host information merged from all the underlying
ClusterResolvers.
Raises:
KeyError: If there are conflicting keys detected when merging two or
more dictionaries, this exception is raised.
Note: If there are multiple ClusterResolvers exposing ClusterSpecs with the
same job name, we will merge the list/dict of workers.
If *all* underlying ClusterSpecs expose the set of workers as lists, we will
concatenate the lists of workers, starting with the list of workers from
the first ClusterResolver passed into the constructor.
If *any* of the ClusterSpecs expose the set of workers as a dict, we will
treat all the sets of workers as dicts (even if they are returned as lists)
and will only merge them into a dict if there is no conflicting keys. If
there is a conflicting key, we will raise a `KeyError`.
"""
merged_cluster = {}
# We figure out whether it is all lists for a particular job, or whether
# there are dicts inside.
for cluster_resolver in self._cluster_resolvers:
cluster_spec = cluster_resolver.cluster_spec()
cluster_dict = cluster_spec.as_dict()
for job_name, tasks in cluster_dict.items():
if job_name in merged_cluster:
# If we see a dict, then we write a dict out regardless.
if isinstance(tasks, dict):
merged_cluster[job_name] = {}
else:
# We take whichever type is present.
if isinstance(tasks, list):
merged_cluster[job_name] = []
else:
merged_cluster[job_name] = {}
# We then do the merge as appropriate in merged_cluster[job].
for cluster_resolver in self._cluster_resolvers:
cluster_spec = cluster_resolver.cluster_spec()
cluster_dict = cluster_spec.as_dict()
for job_name, tasks in cluster_dict.items():
if isinstance(merged_cluster[job_name], list):
# We all have lists, we can just concatenate and be done.
merged_cluster[job_name].extend(tasks)
else:
if isinstance(tasks, list):
# We convert to a dictionary if the type is a list.
task_dict = dict(zip(range(0, len(tasks)), tasks))
else:
# We can simply make a copy (for update) and be done.
task_dict = tasks.copy()
# We detect if there are duplicates, and raise an error if so.
task_keys = set(task_dict)
merged_keys = set(merged_cluster[job_name].keys())
intersected_keys = task_keys.intersection(merged_keys)
if intersected_keys:
raise KeyError('Duplicate keys detected when merging two '
'ClusterSpecs: %s' % repr(intersected_keys))
# We do the merge after all the processing.
merged_cluster[job_name].update(task_dict)
return ClusterSpec(merged_cluster)
def master(self, task_type=None, task_id=None, rpc_layer=None):
"""Returns the master address to use when creating a session.
This usually returns the master from the first ClusterResolver passed in,
but you can override this by specifying the task_type and task_id.
Note: this is only useful for TensorFlow 1.x.
Args:
task_type: (Optional) The type of the TensorFlow task of the master.
task_id: (Optional) The index of the TensorFlow task of the master.
rpc_layer: (Optional) The RPC protocol for the given cluster.
Returns:
The name or URL of the session master.
"""
if task_type is not None and task_id is not None:
master = self.cluster_spec().task_address(task_type, task_id)
return format_master_url(master, rpc_layer or self._rpc_layer)
return self._cluster_resolvers[0].master(rpc_layer=rpc_layer)
@property
def task_type(self):
return self._task_type or self._cluster_resolvers[0].task_type
@property
def task_id(self):
return self._task_id or self._cluster_resolvers[0].task_id
@task_type.setter
def task_type(self, task_type):
self._task_type = task_type
@task_id.setter
def task_id(self, task_id):
self._task_id = task_id
@property
def environment(self):
return self._cluster_resolvers[0].environment
def num_accelerators(self,
task_type=None,
task_id=None,
config_proto=None):
return self._cluster_resolvers[0].num_accelerators(
task_type, task_id, config_proto)
@property
def rpc_layer(self):
return self._rpc_layer or self._cluster_resolvers[0].rpc_layer
@rpc_layer.setter
def rpc_layer(self, rpc_layer):
self._rpc_layer = rpc_layer