/
topology.py
233 lines (192 loc) · 9.43 KB
/
topology.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ======================================
"""Defines the `Topology` class, that describes a TPU fabric topology."""
import numpy as np
from tensorflow.core.protobuf.tpu import topology_pb2
from tensorflow.python.util.tf_export import tf_export
def _tpu_device_name(job, task, device):
"""Returns the device name for the TPU `device` on `task` of `job`."""
if job is None:
return "/task:%d/device:TPU:%d" % (task, device)
else:
return "/job:%s/task:%d/device:TPU:%d" % (job, task, device)
def _tpu_host_device_name(job, task):
"""Returns the device name for the CPU device on `task` of `job`."""
if job is None:
return "/task:%d/device:CPU:0" % task
else:
return "/job:%s/task:%d/device:CPU:0" % (job, task)
@tf_export("tpu.experimental.Topology")
class Topology(object):
"""Describes a set of TPU devices.
Represents both the shape of the physical mesh, and the mapping between
TensorFlow TPU devices to physical mesh coordinates.
"""
def __init__(self, serialized=None, mesh_shape=None, device_coordinates=None):
"""Builds a Topology object.
If `serialized` is not `None`, the topology is parsed from `serialized` and
the other arguments are ignored. Otherwise, the topology is computed from
`mesh_shape` and `device_coordinates`.
Args:
serialized: A serialized `TopologyProto`, or `None`. If not `None`, the
serialized proto is parsed to discover the topology.
mesh_shape: A sequence of 4 positive integers, or `None`. If not `None`,
the shape of the TPU topology, in number of cores. Ignored if
`serialized` is not `None`.
device_coordinates: A rank 4 numpy array that describes the mapping from
TensorFlow TPU devices to TPU fabric coordinates, or `None`. Ignored
if `serialized is not `None`.
Raises:
ValueError: If `serialized` does not describe a well-formed topology.
ValueError: If `serialized` is `None` and `mesh_shape` is not a sequence
of 4 positive integers.
ValueError: If `serialized` is `None` and `device_coordinates` is not a
rank 4 numpy int32 array that describes a valid coordinate mapping.
"""
self._serialized = serialized
if serialized:
self._parse_topology(serialized)
else:
self._mesh_shape = np.asarray(mesh_shape, dtype=np.int32)
self._device_coordinates = np.asarray(device_coordinates, np.int32)
if len(self._mesh_shape) != 4 or any(self._mesh_shape < 1):
raise ValueError("`mesh_shape` must be a sequence of 4 positive "
f"entries; got `mesh_shape={self._mesh_shape}`")
if (len(self._device_coordinates.shape) != 3 or
self._device_coordinates.shape[2] != len(self._mesh_shape)):
raise ValueError(
"`device_coordinates` must be a rank 3 int32 array "
"with minor dimension equal to the `mesh_shape` rank"
"got device_coordinates={} len(device_coordinates)={} device_coordinates.shape[2]={} mesh_shape={}, len(mesh_shape)={}"
.format(self._device_coordinates.shape,
len(self._device_coordinates.shape),
self._device_coordinates.shape[2], self._mesh_shape,
len(self._mesh_shape)))
self._topology_tasks, self._topology_devices = self._invert_topology()
# Coordinates of devices that are missing
self._missing_devices = np.argwhere(self._topology_tasks < 0)
def _parse_topology(self, serialized):
"""Parses a serialized `TopologyProto` into `self`."""
proto = topology_pb2.TopologyProto()
proto.ParseFromString(serialized)
self._mesh_shape = np.array(proto.mesh_shape, dtype=np.int32)
if len(self._mesh_shape) != 4 or any(self._mesh_shape < 1):
raise ValueError("`mesh_shape` must be a vector of size 4 with positive "
"entries; got {}".format(self._mesh_shape))
if proto.num_tasks < 0:
raise ValueError("`num_tasks` must be >= 0; got {}".format(
proto.num_tasks))
if proto.num_tpu_devices_per_task < 0:
raise ValueError("`num_tpu_devices_per_task` must be >= 0; got {}".format(
proto.num_tpu_devices_per_task))
expected_coordinates_size = (
proto.num_tasks * proto.num_tpu_devices_per_task * len(
proto.mesh_shape))
if len(proto.device_coordinates) != expected_coordinates_size:
raise ValueError("`device_coordinates` must have shape num_tasks ({}) * "
"num_tpu_devices_per_task ({}) * len(mesh_shape) ({}); "
"got shape {}".format(proto.num_tasks,
proto.num_tpu_devices_per_task,
proto.mesh_shape,
len(proto.device_coordinates)))
coords = np.array(proto.device_coordinates, dtype=np.int32)
if any(coords < 0):
raise ValueError(
"All values in `device_coordinates` must be >= 0, got {}"
.format(coords))
coords = coords.reshape((proto.num_tasks, proto.num_tpu_devices_per_task,
len(proto.mesh_shape)))
self._device_coordinates = coords
def _invert_topology(self):
"""Inverts a [task,device,axis] topology to [x,y,z] -> task/device maps."""
tasks = np.full(list(self.mesh_shape), -1, dtype=np.int32)
devices = np.full(list(self.mesh_shape), -1, dtype=np.int32)
for task in range(self.device_coordinates.shape[0]):
for device in range(self.device_coordinates.shape[1]):
x, y, z, core = self.device_coordinates[task, device, :]
tasks[x, y, z, core] = task
devices[x, y, z, core] = device
return tasks, devices
@property
def mesh_shape(self):
"""A rank 1 int32 array describing the shape of the TPU topology."""
return self._mesh_shape
@property
def mesh_rank(self):
"""Returns the number of dimensions in the mesh."""
return len(self._mesh_shape)
@property
def device_coordinates(self):
"""Describes the mapping from TPU devices to topology coordinates.
Returns:
A rank 3 int32 array with shape `[tasks, devices, axis]`.
`tasks` is the number of tasks in the TPU cluster, `devices` is the number
of TPU devices per task, and `axis` is the number of axes in the TPU
cluster topology. Each entry gives the `axis`-th coordinate in the
topology of a task/device pair. TPU topologies are 4-dimensional, with
dimensions `(x, y, z, core number)`.
"""
return self._device_coordinates
@property
def missing_devices(self):
"""Array of indices of missing devices."""
return self._missing_devices
def task_ordinal_at_coordinates(self, device_coordinates):
"""Returns the TensorFlow task number attached to `device_coordinates`.
Args:
device_coordinates: An integer sequence describing a device's physical
coordinates in the TPU fabric.
Returns:
Returns the TensorFlow task number that contains the TPU device with those
physical coordinates.
"""
return self._topology_tasks[tuple(device_coordinates)]
def tpu_device_ordinal_at_coordinates(self, device_coordinates):
"""Returns the TensorFlow device number at `device_coordinates`.
Args:
device_coordinates: An integer sequence describing a device's physical
coordinates in the TPU fabric.
Returns:
Returns the TensorFlow device number within the task corresponding to
attached to the device with those physical coordinates.
"""
return self._topology_devices[tuple(device_coordinates)]
def cpu_device_name_at_coordinates(self, device_coordinates, job=None):
"""Returns the CPU device attached to a logical core."""
return _tpu_host_device_name(
job, self._topology_tasks[tuple(device_coordinates)])
def tpu_device_name_at_coordinates(self, device_coordinates, job=None):
"""Returns the name of the TPU device assigned to a logical core."""
return _tpu_device_name(job,
self._topology_tasks[tuple(device_coordinates)],
self._topology_devices[tuple(device_coordinates)])
@property
def num_tasks(self):
"""Returns the number of TensorFlow tasks in the TPU slice."""
return self._device_coordinates.shape[0]
@property
def num_tpus_per_task(self):
"""Returns the number of TPU devices per task in the TPU slice."""
return self._device_coordinates.shape[1]
def serialized(self):
"""Returns the serialized form of the topology."""
if self._serialized is None:
proto = topology_pb2.TopologyProto()
proto.mesh_shape[:] = list(self._mesh_shape)
proto.num_tasks = self._device_coordinates.shape[0]
proto.num_tpu_devices_per_task = self._device_coordinates.shape[1]
proto.device_coordinates.extend(list(self._device_coordinates.flatten()))
self._serialized = proto.SerializeToString()
return self._serialized