forked from tensorflow/tensorboard
-
Notifications
You must be signed in to change notification settings - Fork 0
/
exporter.py
491 lines (432 loc) · 19 KB
/
exporter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Downloads experiment data from TensorBoard.dev."""
import base64
import contextlib
import errno
import grpc
import json
import os
import string
import time
import numpy as np
from tensorboard.uploader.proto import blob_pb2
from tensorboard.uploader.proto import experiment_pb2
from tensorboard.uploader.proto import export_service_pb2
from tensorboard.uploader import util
from tensorboard.util import grpc_util
from tensorboard.util import tb_logging
from tensorboard.util import tensor_util
# Characters that are assumed to be safe in filenames. Note that the
# server's experiment IDs are base64 encodings of 16-byte blobs, so they
# can theoretically collide on case-insensitive filesystems. Each
# character has a ~3% chance of colliding, and so two random IDs have
# about a ~10^-33 chance of colliding. As a precaution, we'll still
# detect collision and fail fast rather than overwriting data.
_FILENAME_SAFE_CHARS = frozenset(string.ascii_letters + string.digits + "-_")
# Maximum value of a signed 64-bit integer.
_MAX_INT64 = 2 ** 63 - 1
# Output filename for experiment metadata (creation time, description,
# etc.) within an experiment directory.
_FILENAME_METADATA = "metadata.json"
# Output filename for scalar data within an experiment directory.
_FILENAME_SCALARS = "scalars.json"
# Output filename for tensors dat within an experiment directory.
# This file does not contain the actual values of the tensors. Instead,
# it holds `tensors_file_path`s, which point to numpy .npz files that
# store the actual tensor values. The json file additional additionally
# contains other metadata such as run and tag names and `wall_time`s.
_FILENAME_TENSORS = "tensors.json"
# Output filename for blob sequences data within an experiment directory.
# This file does not contain the actual contents of the blobs. Instead,
# it holds `blob_file_path`s that point to the binary files that contain
# the blobs' contents, in addition to other metadata such as run and tag names.
_FILENAME_BLOB_SEQUENCES = "blob_sequences.json"
_DIRNAME_TENSORS = "tensors"
_DIRNAME_BLOBS = "blobs"
_FILENAME_BLOBS_PREFIX = "blob_"
_FILENAME_BLOBS_SUFFIX = ".bin"
logger = tb_logging.get_logger()
class TensorBoardExporter(object):
"""Exports all of the user's experiment data from TensorBoard.dev.
Data is exported into a directory, with one file per experiment. Each
experiment file is a sequence of time series, represented as a stream
of JSON objects, one per line. Each JSON object includes a run name,
tag name, `tensorboard.compat.proto.summary_pb2.SummaryMetadata` proto
(base64-encoded, standard RFC 4648 alphabet), and set of points.
Points are stored in three equal-length lists of steps, wall times (as
seconds since epoch), and scalar values, for storage efficiency.
Such streams of JSON objects may be conveniently processed with tools
like jq(1).
For example one line of an experiment file might read (when
pretty-printed):
{
"points": {
"steps": [0, 5],
"values": [4.8935227394104, 2.5438034534454346],
"wall_times": [1563406522.669238, 1563406523.0268838]
},
"run": "lr_1E-04,conv=1,fc=2",
"summary_metadata": "CgkKB3NjYWxhcnMSC3hlbnQveGVudF8x",
"tag": "xent/xent_1"
}
This is a time series with two points, both logged on 2019-07-17, one
about 0.36 seconds after the other.
"""
def __init__(self, reader_service_client, output_directory):
"""Constructs a TensorBoardExporter.
Args:
reader_service_client: A TensorBoardExporterService stub instance.
output_directory: Path to a directory into which to write data. The
directory must not exist, to avoid stomping existing or concurrent
output. Its ancestors will be created if needed.
"""
self._api = reader_service_client
self._outdir = output_directory
parent_dir = os.path.dirname(self._outdir)
if parent_dir:
os.makedirs(parent_dir, exist_ok=True)
try:
os.mkdir(self._outdir)
except OSError as e:
if e.errno == errno.EEXIST:
# Bail to avoid stomping existing output.
raise OutputDirectoryExistsError()
def export(self, read_time=None):
"""Executes the export flow.
Args:
read_time: A fixed timestamp from which to export data, as float seconds
since epoch (like `time.time()`). Optional; defaults to the current
time.
Yields:
After each experiment is successfully downloaded, the ID of that
experiment, as a string.
"""
if read_time is None:
read_time = time.time()
experiment_metadata_mask = experiment_pb2.ExperimentMask(
create_time=True,
update_time=True,
name=True,
description=True,
)
experiments = list_experiments(
self._api, fieldmask=experiment_metadata_mask, read_time=read_time
)
for experiment in experiments:
experiment_id = experiment.experiment_id
experiment_metadata = {
"name": experiment.name,
"description": experiment.description,
"create_time": util.format_time_absolute(
experiment.create_time
),
"update_time": util.format_time_absolute(
experiment.update_time
),
}
experiment_dir = _experiment_directory(self._outdir, experiment_id)
os.mkdir(experiment_dir)
metadata_filepath = os.path.join(experiment_dir, _FILENAME_METADATA)
with open(metadata_filepath, "x") as outfile:
json.dump(experiment_metadata, outfile, sort_keys=True)
outfile.write("\n")
try:
data = self._request_json_data(experiment_id, read_time)
with contextlib.ExitStack() as stack:
file_handles = {
filename: stack.enter_context(
open(os.path.join(experiment_dir, filename), "x")
)
for filename in (
_FILENAME_SCALARS,
_FILENAME_TENSORS,
_FILENAME_BLOB_SEQUENCES,
)
}
os.mkdir(os.path.join(experiment_dir, _DIRNAME_TENSORS))
os.mkdir(os.path.join(experiment_dir, _DIRNAME_BLOBS))
for block, filename in data:
outfile = file_handles[filename]
json.dump(block, outfile, sort_keys=True)
outfile.write("\n")
outfile.flush()
yield experiment_id
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.CANCELLED:
raise GrpcTimeoutException(experiment_id)
else:
raise
def _request_json_data(self, experiment_id, read_time):
"""Given experiment id, generates JSON data and destination file name.
The JSON data describes the run, tag, metadata, in addition to
- Actual data in the case of scalars
- Pointer to binary files in the case of blob sequences.
For the case of blob sequences, this method has the side effect of
downloading the contents of the blobs and writing them to files in
a subdirectory of the experiment directory.
Args:
experiment_id: The id of the experiment to request data for.
read_time: A fixed timestamp from which to export data, as float
seconds since epoch (like `time.time()`). Optional; defaults to the
current time.
Yields:
(JSON-serializable data, destination file name) tuples.
"""
request = export_service_pb2.StreamExperimentDataRequest()
request.experiment_id = experiment_id
util.set_timestamp(request.read_timestamp, read_time)
# No special error handling as we don't expect any errors from these
# calls: all experiments should exist (read consistency timestamp)
# and be owned by the calling user (only queried for own experiment
# IDs). Any non-transient errors would be internal, and we have no
# way to efficiently resume from transient errors because the server
# does not support pagination.
stream = self._api.StreamExperimentData(
request, metadata=grpc_util.version_metadata()
)
for response in stream:
metadata = base64.b64encode(
response.tag_metadata.SerializeToString()
).decode("ascii")
json_data = {
"run": response.run_name,
"tag": response.tag_name,
"summary_metadata": metadata,
}
filename = None
if response.HasField("points"):
json_data["points"] = self._process_scalar_points(
response.points
)
filename = _FILENAME_SCALARS
elif response.HasField("tensors"):
json_data["points"] = self._process_tensor_points(
response.tensors, experiment_id
)
filename = _FILENAME_TENSORS
elif response.HasField("blob_sequences"):
json_data["points"] = self._process_blob_sequence_points(
response.blob_sequences, experiment_id
)
filename = _FILENAME_BLOB_SEQUENCES
if filename:
yield json_data, filename
def _process_scalar_points(self, points):
"""Process scalar data points.
Args:
points: `export_service_pb2.StreamExperimentDataResponse.ScalarPoints`
proto.
Returns:
A JSON-serializable `dict` for the steps, wall_times and values of the
scalar data points.
"""
wall_times = [t.ToNanoseconds() / 1e9 for t in points.wall_times]
return {
"steps": list(points.steps),
"wall_times": wall_times,
"values": list(points.values),
}
def _process_tensor_points(self, points, experiment_id):
"""Process tensor data points.
Args:
points: `export_service_pb2.StreamExperimentDataResponse.TensorPoints`
proto.
experiment_id: ID of the experiment that the `TensorPoints` is a part
of.
Returns:
A JSON-serializable `dict` for the steps, wall_times and the path to
the .npz files that contain the saved tensor values.
"""
wall_times = [t.ToNanoseconds() / 1e9 for t in points.wall_times]
json_object = {
"steps": list(points.steps),
"wall_times": wall_times,
"tensors_file_path": None,
}
if not json_object["steps"]:
return json_object
experiment_dir = _experiment_directory(self._outdir, experiment_id)
tensors_file_path = self._get_tensor_file_path(
experiment_dir, json_object["wall_times"][0]
)
ndarrays = [
tensor_util.make_ndarray(tensor_proto)
for tensor_proto in points.values
]
ndarrays = [self._fix_string_types(x) for x in ndarrays]
np.savez(os.path.join(experiment_dir, tensors_file_path), *ndarrays)
json_object["tensors_file_path"] = tensors_file_path
return json_object
def _fix_string_types(self, ndarray):
"""Change the dtype of text arrays to String rather than Object.
np.savez ends up pickling np.object arrays, while it doesn't pickle
strings. The downside is that it needs to pad the length of each string
in the array to the maximal length string. We only want to do this
type override in this final step of the export path.
Args:
ndarray: a tensor converted to an ndarray
Returns:
The original ndarray if not np.object, dtype converted to String
if np.object.
"""
if ndarray.dtype != np.object:
return ndarray
else:
return ndarray.astype("|S")
def _get_tensor_file_path(self, experiment_dir, wall_time):
"""Get a nonexistent path for a tensor value.
Args:
experiment_dir: Experiment directory.
wall_time: Timestamp of the tensor (seconds since the epoch in double).
Returns:
A nonexistent path for the tensor, relative to the experiemnt_dir.
"""
index = 0
while True:
tensor_file_path = os.path.join(
_DIRNAME_TENSORS,
"%.6f" % wall_time + ("_%d" % index if index else "") + ".npz",
)
if not os.path.exists(
os.path.join(experiment_dir, tensor_file_path)
):
return tensor_file_path
index += 1
def _process_blob_sequence_points(self, blob_sequences, experiment_id):
"""Process blob sequence points.
As a side effect, also downloads the binary contents of the blobs
to respective files. The paths to the files relative to the
experiment directory is encapsulated in the returned JSON object.
Args:
blob_sequences:
`export_service_pb2.StreamDataResponse.BlobSequencePoints` proto.
Returns:
A JSON-serializable `dict` for the steps and wall_times, as well as
the blob_file_paths, which are the relative paths to the downloaded
blob contents.
"""
wall_times = [
t.ToNanoseconds() / 1e9 for t in blob_sequences.wall_times
]
json_object = {
"steps": list(blob_sequences.steps),
"wall_times": wall_times,
"blob_file_paths": [],
}
blob_file_paths = json_object["blob_file_paths"]
for blobseq in blob_sequences.values:
seq_blob_file_paths = []
for entry in blobseq.entries:
if entry.blob.state == blob_pb2.BlobState.BLOB_STATE_CURRENT:
blob_path = self._download_blob(
entry.blob.blob_id, experiment_id
)
seq_blob_file_paths.append(blob_path)
else:
seq_blob_file_paths.append(None)
blob_file_paths.append(seq_blob_file_paths)
return json_object
def _download_blob(self, blob_id, experiment_id):
"""Download the blob via rpc.
Args:
blob_id: Id of the blob.
experiment_id: Id of the experiment that the blob belongs to.
Returns:
If the blob is downloaded successfully:
The path of the downloaded blob file relative to the experiment
directory.
Else:
`None`.
"""
# TODO(cais): Deduplicate with internal method perhaps.
experiment_dir = _experiment_directory(self._outdir, experiment_id)
request = export_service_pb2.StreamBlobDataRequest(blob_id=blob_id)
blob_abspath = os.path.join(
experiment_dir,
_DIRNAME_BLOBS,
_FILENAME_BLOBS_PREFIX + blob_id + _FILENAME_BLOBS_SUFFIX,
)
with open(blob_abspath, "xb") as f:
try:
for response in self._api.StreamBlobData(
request, metadata=grpc_util.version_metadata()
):
# TODO(cais, soergel): validate the various response fields
f.write(response.data)
except grpc.RpcError as rpc_error:
logger.error(
"Omitting blob (id: %s) due to download failure: %s",
blob_id,
rpc_error,
)
return None
return os.path.relpath(blob_abspath, experiment_dir)
def list_experiments(api_client, fieldmask=None, read_time=None):
"""Yields all of the calling user's experiments.
Args:
api_client: A TensorBoardExporterService stub instance.
fieldmask: An optional `experiment_pb2.ExperimentMask` value.
read_time: A fixed timestamp from which to export data, as float seconds
since epoch (like `time.time()`). Optional; defaults to the current
time.
Yields:
For each experiment owned by the user, an `experiment_pb2.Experiment`
value.
Raises:
RuntimeError: If the server returns experiment IDs but no experiments,
as in an old, unsupported version of the protocol.
"""
if read_time is None:
read_time = time.time()
request = export_service_pb2.StreamExperimentsRequest(limit=_MAX_INT64)
util.set_timestamp(request.read_timestamp, read_time)
if fieldmask:
request.experiments_mask.CopyFrom(fieldmask)
stream = api_client.StreamExperiments(
request, metadata=grpc_util.version_metadata()
)
for response in stream:
if response.experiments:
for experiment in response.experiments:
yield experiment
elif response.experiment_ids:
raise RuntimeError(
"Server sent experiment_ids without experiments: <%r>"
% (list(response.experiment_ids),)
)
else:
# No data: not technically a problem, but not expected.
logger.warning(
"StreamExperiments RPC returned response with no experiments: <%r>",
response,
)
class OutputDirectoryExistsError(ValueError):
pass
class GrpcTimeoutException(Exception):
def __init__(self, experiment_id):
super(GrpcTimeoutException, self).__init__(experiment_id)
self.experiment_id = experiment_id
def _experiment_directory(base_dir, experiment_id):
# Experiment IDs from the server should be filename-safe; verify
# this before creating any files.
bad_chars = frozenset(experiment_id) - _FILENAME_SAFE_CHARS
if bad_chars:
raise RuntimeError(
"Unexpected characters ({bad_chars!r}) in experiment ID {eid!r}".format(
bad_chars=sorted(bad_chars), eid=experiment_id
)
)
return os.path.join(base_dir, "experiment_%s" % experiment_id)