From f6bf852d3216d5830f2e6c4578fd1f94e42342c0 Mon Sep 17 00:00:00 2001 From: Orion Reblitz-Richardson Date: Tue, 12 Feb 2019 09:23:30 -0800 Subject: [PATCH 1/7] Support S3 and other filesystems with tensorboard-notf --- .../event_processing/directory_watcher.py | 4 +- .../event_accumulator_test.py | 8 +- .../event_multiplexer_test.py | 8 +- .../backend/event_processing/io_wrapper.py | 17 +- .../event_processing/io_wrapper_test.py | 19 +- .../plugin_event_accumulator_test.py | 8 +- .../plugin_event_multiplexer_test.py | 8 +- tensorboard/compat/tensorflow_stub/BUILD | 26 + .../compat/tensorflow_stub/__init__.py | 5 +- .../tensorflow_stub/compat/v1/__init__.py | 5 +- tensorboard/compat/tensorflow_stub/gfile.py | 450 --------------- .../compat/tensorflow_stub/io/__init__.py | 20 + .../compat/tensorflow_stub/io/gfile.py | 511 ++++++++++++++++++ .../tensorflow_stub/io/gfile_s3_test.py | 298 ++++++++++ .../compat/tensorflow_stub/io/gfile_test.py | 271 ++++++++++ .../tensorflow_stub/pywrap_tensorflow.py | 63 ++- tensorboard/plugins/core/core_plugin.py | 4 +- .../projector/projector_plugin_test.py | 94 ++-- 18 files changed, 1262 insertions(+), 557 deletions(-) delete mode 100644 tensorboard/compat/tensorflow_stub/gfile.py create mode 100644 tensorboard/compat/tensorflow_stub/io/__init__.py create mode 100644 tensorboard/compat/tensorflow_stub/io/gfile.py create mode 100644 tensorboard/compat/tensorflow_stub/io/gfile_s3_test.py create mode 100644 tensorboard/compat/tensorflow_stub/io/gfile_test.py diff --git a/tensorboard/backend/event_processing/directory_watcher.py b/tensorboard/backend/event_processing/directory_watcher.py index 01b0d4b4e2..33e31498b7 100644 --- a/tensorboard/backend/event_processing/directory_watcher.py +++ b/tensorboard/backend/event_processing/directory_watcher.py @@ -179,7 +179,7 @@ def _SetPath(self, path): path: The full path of the file to watch. """ old_path = self._path - if old_path and not io_wrapper.IsGCSPath(old_path): + if old_path and not io_wrapper.IsCloudPath(old_path): try: # We're done with the path, so store its size. size = tf.io.gfile.stat(old_path).length @@ -211,7 +211,7 @@ def _GetNextPath(self): # Don't bother checking if the paths are GCS (which we can't check) or if # we've already detected an OOO write. - if not io_wrapper.IsGCSPath(paths[0]) and not self._ooo_writes_detected: + if not io_wrapper.IsCloudPath(paths[0]) and not self._ooo_writes_detected: # Check the previous _OOO_WRITE_CHECK_COUNT paths for out of order writes. 
current_path_index = bisect.bisect_left(paths, self._path) ooo_check_start = max(0, current_path_index - self._OOO_WRITE_CHECK_COUNT) diff --git a/tensorboard/backend/event_processing/event_accumulator_test.py b/tensorboard/backend/event_processing/event_accumulator_test.py index 7a857e78e8..9352950446 100644 --- a/tensorboard/backend/event_processing/event_accumulator_test.py +++ b/tensorboard/backend/event_processing/event_accumulator_test.py @@ -767,8 +767,8 @@ def FakeScalarSummary(tag, value): directory = os.path.join(self.get_temp_dir(), 'values_dir') if tf.io.gfile.isdir(directory): - tf.io.gfile.rmtree(directory) - tf.io.gfile.mkdir(directory) + os.removedirs(directory) + os.mkdir(directory) writer = test_util.FileWriter(directory, max_queue=100) @@ -845,8 +845,8 @@ def testGraphFromMetaGraphBecomesAvailable(self): directory = os.path.join(self.get_temp_dir(), 'metagraph_test_values_dir') if tf.io.gfile.isdir(directory): - tf.io.gfile.rmtree(directory) - tf.io.gfile.mkdir(directory) + os.removedirs(directory) + os.mkdir(directory) writer = test_util.FileWriter(directory, max_queue=100) diff --git a/tensorboard/backend/event_processing/event_multiplexer_test.py b/tensorboard/backend/event_processing/event_multiplexer_test.py index 6263a7df06..e428a8dd84 100644 --- a/tensorboard/backend/event_processing/event_multiplexer_test.py +++ b/tensorboard/backend/event_processing/event_multiplexer_test.py @@ -29,7 +29,7 @@ def _AddEvents(path): if not tf.io.gfile.isdir(path): - tf.io.gfile.makedirs(path) + os.makedirs(path) fpath = os.path.join(path, 'hypothetical.tfevents.out') with tf.io.gfile.GFile(fpath, 'w') as f: f.write('') @@ -38,8 +38,8 @@ def _AddEvents(path): def _CreateCleanDirectory(path): if tf.io.gfile.isdir(path): - tf.io.gfile.rmtree(path) - tf.io.gfile.mkdir(path) + os.removedirs(path) + os.mkdir(path) class _FakeAccumulator(object): @@ -206,7 +206,7 @@ def testAddRunsFromDirectory(self): self.assertEqual(x.Runs(), {}, 'loading empty directory had no effect') path1 = join(realdir, 'path1') - tf.io.gfile.mkdir(path1) + os.mkdir(path1) x.AddRunsFromDirectory(realdir) self.assertEqual(x.Runs(), {}, 'creating empty subdirectory had no effect') diff --git a/tensorboard/backend/event_processing/io_wrapper.py b/tensorboard/backend/event_processing/io_wrapper.py index dc1a6c4770..7c78ff2bdc 100644 --- a/tensorboard/backend/event_processing/io_wrapper.py +++ b/tensorboard/backend/event_processing/io_wrapper.py @@ -32,16 +32,15 @@ _ESCAPE_GLOB_CHARACTERS_REGEX = re.compile('([*?[])') -# TODO(chihuahua): Rename this method to use camel-case for GCS (Gcs). -def IsGCSPath(path): - return path.startswith("gs://") - - -def IsCnsPath(path): - return path.startswith("/cns/") +def IsCloudPath(path): + return ( + path.startswith("gs://") or + path.startswith("s3://") or + path.startswith("/cns/") + ) def PathSeparator(path): - return '/' if IsGCSPath(path) or IsCnsPath(path) else os.sep + return '/' if IsCloudPath(path) else os.sep def IsTensorFlowEventsFile(path): """Check the path name to see if it is probably a TF Events file. @@ -184,7 +183,7 @@ def GetLogdirSubdirectories(path): raise ValueError('GetLogdirSubdirectories: path exists and is not a ' 'directory, %s' % path) - if IsGCSPath(path) or IsCnsPath(path): + if IsCloudPath(path): # Glob-ing for files can be significantly faster than recursively # walking through directories for some file systems. 
logger.info( diff --git a/tensorboard/backend/event_processing/io_wrapper_test.py b/tensorboard/backend/event_processing/io_wrapper_test.py index bd26b0edc3..3642275a7f 100644 --- a/tensorboard/backend/event_processing/io_wrapper_test.py +++ b/tensorboard/backend/event_processing/io_wrapper_test.py @@ -33,17 +33,20 @@ def setUp(self): def tearDown(self): self.stubs.CleanUp() - def testIsGcsPathIsTrue(self): - self.assertTrue(io_wrapper.IsGCSPath('gs://bucket/foo')) + def testIsCloudPathGcsIsTrue(self): + self.assertTrue(io_wrapper.IsCloudPath('gs://bucket/foo')) - def testIsGcsPathIsFalse(self): - self.assertFalse(io_wrapper.IsGCSPath('/tmp/foo')) + def testIsCloudPathS3IsTrue(self): + self.assertTrue(io_wrapper.IsCloudPath('s3://bucket/foo')) - def testIsCnsPathTrue(self): - self.assertTrue(io_wrapper.IsCnsPath('/cns/foo/bar')) + def testIsCloudPathCnsIsTrue(self): + self.assertTrue(io_wrapper.IsCloudPath('/cns/foo/bar')) - def testIsCnsPathFalse(self): - self.assertFalse(io_wrapper.IsCnsPath('/tmp/foo')) + def testIsCloudPathFileIsFalse(self): + self.assertFalse(io_wrapper.IsCloudPath('file:///tmp/foo')) + + def testIsCloudPathLocalIsFalse(self): + self.assertFalse(io_wrapper.IsCloudPath('/tmp/foo')) def testPathSeparator(self): # In nix systems, path separator would be the same as that of CNS/GCS diff --git a/tensorboard/backend/event_processing/plugin_event_accumulator_test.py b/tensorboard/backend/event_processing/plugin_event_accumulator_test.py index 20b388281e..8fd9274edc 100644 --- a/tensorboard/backend/event_processing/plugin_event_accumulator_test.py +++ b/tensorboard/backend/event_processing/plugin_event_accumulator_test.py @@ -558,8 +558,8 @@ def FakeScalarSummary(tag, value): directory = os.path.join(self.get_temp_dir(), 'values_dir') if tf.io.gfile.isdir(directory): - tf.io.gfile.rmtree(directory) - tf.io.gfile.mkdir(directory) + os.removedirs(directory) + os.mkdir(directory) writer = test_util.FileWriter(directory, max_queue=100) @@ -636,8 +636,8 @@ def testGraphFromMetaGraphBecomesAvailable(self): directory = os.path.join(self.get_temp_dir(), 'metagraph_test_values_dir') if tf.io.gfile.isdir(directory): - tf.io.gfile.rmtree(directory) - tf.io.gfile.mkdir(directory) + os.removedirs(directory) + os.mkdir(directory) writer = test_util.FileWriter(directory, max_queue=100) diff --git a/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py b/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py index 600a291604..8b3e60e6ff 100644 --- a/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py +++ b/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py @@ -29,7 +29,7 @@ def _AddEvents(path): if not tf.io.gfile.isdir(path): - tf.io.gfile.makedirs(path) + os.makedirs(path) fpath = os.path.join(path, 'hypothetical.tfevents.out') with tf.io.gfile.GFile(fpath, 'w') as f: f.write('') @@ -38,8 +38,8 @@ def _AddEvents(path): def _CreateCleanDirectory(path): if tf.io.gfile.isdir(path): - tf.io.gfile.rmtree(path) - tf.io.gfile.mkdir(path) + os.removedirs(path) + os.mkdir(path) class _FakeAccumulator(object): @@ -178,7 +178,7 @@ def testAddRunsFromDirectory(self): self.assertEqual(x.Runs(), {}, 'loading empty directory had no effect') path1 = join(realdir, 'path1') - tf.io.gfile.mkdir(path1) + os.mkdir(path1) x.AddRunsFromDirectory(realdir) self.assertEqual(x.Runs(), {}, 'creating empty subdirectory had no effect') diff --git a/tensorboard/compat/tensorflow_stub/BUILD b/tensorboard/compat/tensorflow_stub/BUILD index 
7b0384c518..750784e2cc 100644 --- a/tensorboard/compat/tensorflow_stub/BUILD +++ b/tensorboard/compat/tensorflow_stub/BUILD @@ -13,6 +13,8 @@ py_library( "*.py", "compat/__init__.py", "compat/v1/__init__.py", + "io/__init__.py", + "io/gfile.py", ]), srcs_version = "PY2AND3", visibility = ["//visibility:public"], @@ -23,3 +25,27 @@ py_library( "@org_pythonhosted_six", ], ) + +py_test( + name = "gfile_test", + size = "small", + srcs = ["io/gfile_test.py"], + srcs_version = "PY2AND3", + deps = [ + ":tensorflow_stub", + ], +) + +py_test( + name = "gfile_s3_test", + size = "small", + srcs = ["io/gfile_s3_test.py"], + srcs_version = "PY2AND3", + tags = [ + "manual", + "notap", + ], + deps = [ + ":tensorflow_stub", + ], +) diff --git a/tensorboard/compat/tensorflow_stub/__init__.py b/tensorboard/compat/tensorflow_stub/__init__.py index 685b73b838..d08dd23f4b 100644 --- a/tensorboard/compat/tensorflow_stub/__init__.py +++ b/tensorboard/compat/tensorflow_stub/__init__.py @@ -31,9 +31,12 @@ from . import error_codes # noqa from . import errors # noqa from . import flags # noqa -from . import gfile # noqa +from . import io # noqa from . import pywrap_tensorflow # noqa from . import tensor_shape # noqa +# Set pywrap_tensorflow on v1 and avoid cycles on some imports +compat.v1.pywrap_tensorflow = pywrap_tensorflow + # Set a fake __version__ to help distinguish this as our own stub API. __version__ = 'stub' diff --git a/tensorboard/compat/tensorflow_stub/compat/v1/__init__.py b/tensorboard/compat/tensorflow_stub/compat/v1/__init__.py index c18661e96f..45625a0b8a 100644 --- a/tensorboard/compat/tensorflow_stub/compat/v1/__init__.py +++ b/tensorboard/compat/tensorflow_stub/compat/v1/__init__.py @@ -17,4 +17,7 @@ from __future__ import division from __future__ import print_function -from tensorboard.compat.tensorflow_stub import pywrap_tensorflow # noqa +# Set this in tensorboard/compat/tensorflow_stub/__init__.py to eliminate +# any cycles on import +# +# from tensorboard.compat.tensorflow_stub import pywrap_tensorflow # noqa diff --git a/tensorboard/compat/tensorflow_stub/gfile.py b/tensorboard/compat/tensorflow_stub/gfile.py deleted file mode 100644 index 9acf472fc6..0000000000 --- a/tensorboard/compat/tensorflow_stub/gfile.py +++ /dev/null @@ -1,450 +0,0 @@ -# Copyright 2015 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== -"""File IO methods that wrap the C++ FileSystem API. - -The C++ FileSystem API is SWIG wrapped in file_io.i. These functions call those -to accomplish basic File IO operations. -""" -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import binascii -import collections -import glob -import os -import shutil -import six -import uuid - -from . import compat, errors - - -# A good default block size depends on the system in question. -# A somewhat conservative default chosen here. 
-_DEFAULT_BLOCK_SIZE = 16 * 1024 * 1024 - - -class FileIO(object): - # Only methods needed for TensorBoard are implemented. - - def __init__(self, filename, mode): - self.filename = compat.as_bytes(filename) - self.mode = compat.as_bytes(mode) - self.f = open(self.filename, self.mode) - - def __del__(self): - self.close() - - def __enter__(self): - return self - - def __exit__(self, *args): - self.f.close() - - def __iter__(self): - return self.f - - def size(self): - return os.stat(self.filename).st_size - - def read(self): - return self.f.read() - - def close(self): - self.f.close() - - -class GFile(FileIO): - # Same interface as FileIO but through GFile class - - def __init__(self, filename, mode): - super(GFile, self).__init__(filename, mode) - - -# @tf_export("gfile.Exists") -def Exists(filename): - """Determines whether a path exists or not. - - Args: - filename: string, a path - - Returns: - True if the path exists, whether its a file or a directory. - False if the path does not exist and there are no filesystem errors. - - Raises: - errors.OpError: Propagates any errors reported by the FileSystem API. - """ - try: - return os.path.exists(compat.as_bytes(filename)) - except errors.NotFoundError: - return False - return True - - -# @tf_export("gfile.Remove") -def Remove(filename): - """Deletes the file located at 'filename'. - - Args: - filename: string, a filename - - Raises: - errors.OpError: Propagates any errors reported by the FileSystem API. - E.g., NotFoundError if the file does not exist. - """ - fn = compat.as_bytes(filename) - if not os.path.exists(fn) or not os.path.isfile(fn): - return - os.remove(fn) - - -def read_file_to_string(filename, binary_mode=False): - """Reads the entire contents of a file to a string. - - Args: - filename: string, path to a file - binary_mode: whether to open the file in binary mode or not. This changes - the type of the object returned. - - Returns: - contents of the file as a string or bytes. - - Raises: - errors.OpError: Raises variety of errors that are subtypes e.g. - NotFoundError etc. - """ - if binary_mode: - f = FileIO(filename, mode="rb") - else: - f = FileIO(filename, mode="r") - return f.read() - - -def write_string_to_file(filename, file_content): - """Writes a string to a given file. - - Args: - filename: string, path to a file - file_content: string, contents that need to be written to the file - - Raises: - errors.OpError: If there are errors during the operation. - """ - with FileIO(filename, mode="w") as f: - f.write(file_content) - - -# @tf_export("gfile.Glob") -def Glob(filename): - """Returns a list of files that match the given pattern(s). - - Args: - filename: string or iterable of strings. The glob pattern(s). - - Returns: - A list of strings containing filenames that match the given pattern(s). - - Raises: - errors.OpError: If there are filesystem / directory listing errors. - """ - if isinstance(filename, six.string_types): - return [ - # compat the filenames to string from bytes. - compat.as_str_any(matching_filename) - for matching_filename in glob.glob(compat.as_bytes(filename)) - ] - else: - return [ - # compat the filenames to string from bytes. - compat.as_str_any(matching_filename) - for single_filename in filename - for matching_filename in glob.glob(compat.as_bytes(single_filename)) - ] - - -# @tf_export("gfile.MkDir") -def MkDir(dirname): - """Creates a directory with the name 'dirname'. - - Args: - dirname: string, name of the directory to be created - - Notes: - The parent directories need to exist. 
Use recursive_create_dir instead if - there is the possibility that the parent dirs don't exist. - - Raises: - errors.OpError: If the operation fails. - """ - os.mkdir(compat.as_bytes(dirname)) - - -# @tf_export("gfile.MakeDirs") -def MakeDirs(dirname): - """Creates a directory and all parent/intermediate directories. - - It succeeds if dirname already exists and is writable. - - Args: - dirname: string, name of the directory to be created - - Raises: - errors.OpError: If the operation fails. - """ - os.makedirs(compat.as_bytes(dirname)) - - -# @tf_export("gfile.Copy") -def Copy(oldpath, newpath, overwrite=False): - """Copies data from oldpath to newpath. - - Args: - oldpath: string, name of the file who's contents need to be copied - newpath: string, name of the file to which to copy to - overwrite: boolean, if false its an error for newpath to be occupied by an - existing file. - - Raises: - errors.OpError: If the operation fails. - """ - newpath_exists = os.path.exists(newpath) - if newpath_exists and overwrite or not newpath_exists: - shutil.copy2(compat.as_bytes(oldpath), compat.as_bytes(newpath)) - - -# @tf_export("gfile.Rename") -def Rename(oldname, newname, overwrite=False): - """Rename or move a file / directory. - - Args: - oldname: string, pathname for a file - newname: string, pathname to which the file needs to be moved - overwrite: boolean, if false it's an error for `newname` to be occupied by - an existing file. - - Raises: - errors.OpError: If the operation fails. - """ - newname_exists = os.path.exists(newname) - if newname_exists and overwrite or not newname_exists: - os.rename(compat.as_bytes(oldname), compat.as_bytes(newname)) - - -def atomic_write_string_to_file(filename, contents, overwrite=True): - """Writes to `filename` atomically. - - This means that when `filename` appears in the filesystem, it will contain - all of `contents`. With write_string_to_file, it is possible for the file - to appear in the filesystem with `contents` only partially written. - - Accomplished by writing to a temp file and then renaming it. - - Args: - filename: string, pathname for a file - contents: string, contents that need to be written to the file - overwrite: boolean, if false it's an error for `filename` to be occupied by - an existing file. - """ - temp_pathname = filename + ".tmp" + uuid.uuid4().hex - write_string_to_file(temp_pathname, contents) - try: - Rename(temp_pathname, filename, overwrite) - except Exception: - Remove(temp_pathname) - raise - - -# @tf_export("gfile.DeleteRecursively") -def DeleteRecursively(dirname): - """Deletes everything under dirname recursively. - - Args: - dirname: string, a path to a directory - - Raises: - errors.OpError: If the operation fails. - """ - os.removedirs(dirname) - - -# @tf_export("gfile.IsDirectory") -def IsDirectory(dirname): - """Returns whether the path is a directory or not. - - Args: - dirname: string, path to a potential directory - - Returns: - True, if the path is a directory; False otherwise - """ - return os.path.isdir(compat.as_bytes(dirname)) - - -# @tf_export("gfile.ListDirectory") -def ListDirectory(dirname): - """Returns a list of entries contained within a directory. - - The list is in arbitrary order. It does not contain the special entries "." - and "..". - - Args: - dirname: string, path to a directory - - Returns: - [filename1, filename2, ... 
filenameN] as strings - - Raises: - errors.NotFoundError if directory doesn't exist - """ - if not IsDirectory(dirname): - raise errors.NotFoundError(None, None, "Could not find directory") - try: - entries = os.listdir(compat.as_bytes(dirname)) - - except Exception: - entries = os.listdir(compat.as_str_any(dirname)) - - entries = [compat.as_str_any(item) for item in entries] - return entries - - -# @tf_export("gfile.Walk") -def Walk(top, in_order=True): - """Recursive directory tree generator for directories. - - Args: - top: string, a Directory name - in_order: bool, Traverse in order if True, post order if False. - - Errors that happen while listing directories are ignored. - - Yields: - Each yield is a 3-tuple: the pathname of a directory, followed by lists of - all its subdirectories and leaf files. - (dirname, [subdirname, subdirname, ...], [filename, filename, ...]) - as strings - """ - top = compat.as_str_any(top) - try: - listing = ListDirectory(top) - except errors.NotFoundError: - return - - files = [] - subdirs = [] - for item in listing: - full_path = os.path.join(top, item) - if IsDirectory(full_path): - subdirs.append(item) - else: - files.append(item) - - here = (top, subdirs, files) - - if in_order: - yield here - - for subdir in subdirs: - for subitem in os.walk(os.path.join(top, subdir), in_order): - yield subitem - - if not in_order: - yield here - - -# Data returned from the Stat call. -StatData = collections.namedtuple('StatData', ['length']) - - -# @tf_export("gfile.Stat") -def Stat(filename): - """Returns file statistics for a given path. - - Args: - filename: string, path to a file - - Returns: - FileStatistics struct that contains information about the path - - Raises: - errors.OpError: If the operation fails. - """ - result = None - try: - # Size of the file is given by .st_size as returned from - # os.stat() but TB uses .length so we set length. - len = os.stat(compat.as_bytes(filename)).st_size - result = StatData(len) - except Exception: - pass - - if result is None: - raise errors.NotFoundError(None, None, 'Unable to stat file') - return result - - -def filecmp(filename_a, filename_b): - """Compare two files, returning True if they are the same, False otherwise. - - We check size first and return False quickly if the files are different sizes. - If they are the same size, we continue to generating a crc for the whole file. - - You might wonder: why not use Python's filecmp.cmp() instead? The answer is - that the builtin library is not robust to the many different filesystems - TensorFlow runs on, and so we here perform a similar comparison with - the more robust FileIO. - - Args: - filename_a: string path to the first file. - filename_b: string path to the second file. - - Returns: - True if the files are the same, False otherwise. - """ - size_a = FileIO(filename_a, "rb").size() - size_b = FileIO(filename_b, "rb").size() - if size_a != size_b: - return False - - # Size is the same. Do a full check. - crc_a = file_crc32(filename_a) - crc_b = file_crc32(filename_b) - return crc_a == crc_b - - -def file_crc32(filename, block_size=_DEFAULT_BLOCK_SIZE): - """Get the crc32 of the passed file. - - The crc32 of a file can be used for error checking; two files with the same - crc32 are considered equivalent. Note that the entire file must be read - to produce the crc32. - - Args: - filename: string, path to a file - block_size: Integer, process the files by reading blocks of `block_size` - bytes. Use -1 to read the file as once. 
- - Returns: - hexadecimal as string, the crc32 of the passed file. - """ - crc = 0 - with FileIO(filename, mode="rb") as f: - chunk = f.read(n=block_size) - while chunk: - crc = binascii.crc32(chunk, crc) - chunk = f.read(n=block_size) - return hex(crc & 0xFFFFFFFF) diff --git a/tensorboard/compat/tensorflow_stub/io/__init__.py b/tensorboard/compat/tensorflow_stub/io/__init__.py new file mode 100644 index 0000000000..8293275506 --- /dev/null +++ b/tensorboard/compat/tensorflow_stub/io/__init__.py @@ -0,0 +1,20 @@ +# Copyright 2018 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from . import gfile # noqa diff --git a/tensorboard/compat/tensorflow_stub/io/gfile.py b/tensorboard/compat/tensorflow_stub/io/gfile.py new file mode 100644 index 0000000000..8e2f9c6ab4 --- /dev/null +++ b/tensorboard/compat/tensorflow_stub/io/gfile.py @@ -0,0 +1,511 @@ +# Copyright 2015 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""File IO methods that wrap the C++ FileSystem API. + +The C++ FileSystem API is SWIG wrapped in file_io.i. These functions call those +to accomplish basic File IO operations. +""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from collections import namedtuple +import binascii +import collections +import glob as py_glob +import io +import os +import shutil +import six +import uuid +try: + import botocore.exceptions + import boto3 + S3_ENABLED = True +except ImportError: + S3_ENABLED = False + +from tensorboard.compat.tensorflow_stub import compat, errors + + +# A good default block size depends on the system in question. +# A somewhat conservative default chosen here. +_DEFAULT_BLOCK_SIZE = 16 * 1024 * 1024 + + +# Registry of filesystems by prefix. +# +# Currently supports "s3://" URLs for S3 based on boto3 and falls +# back to local filesystem. 
+_REGISTERED_FILESYSTEMS = {}
+
+
+def register_filesystem(prefix, filesystem):
+  if ':' in prefix:
+    raise ValueError("Filesystem prefix cannot contain a :")
+  _REGISTERED_FILESYSTEMS[prefix] = filesystem
+
+
+def get_filesystem(filename):
+  """Return the registered filesystem for the given file."""
+  filename = compat.as_str_any(filename)
+  prefix = ""
+  index = filename.find("://")
+  if index >= 0:
+    prefix = filename[:index]
+  fs = _REGISTERED_FILESYSTEMS.get(prefix, None)
+  if fs is None:
+    raise ValueError("No recognized filesystem for prefix %s" % prefix)
+  return fs
+
+
+# Data returned from the Stat call.
+StatData = namedtuple("StatData", ["length"])
+
+
+class LocalFileSystem(object):
+  """Provides local filesystem access."""
+
+  def exists(self, filename):
+    """Determines whether a path exists or not."""
+    return os.path.exists(compat.as_bytes(filename))
+
+  def read(self, filename, binary_mode=False, size=None, offset=None):
+    """Reads contents of a file to a string.
+
+    Args:
+      filename: string, a path
+      binary_mode: bool, read as binary if True, otherwise text
+      size: int, number of bytes or characters to read, otherwise
+        read all the contents of the file from the offset
+      offset: int, offset into file to read from, otherwise read
+        from the very beginning
+
+    Returns:
+      Subset of the contents of the file as a string or bytes.
+    """
+    mode = "rb" if binary_mode else "r"
+    with io.open(filename, mode) as f:
+      if offset is not None:
+        f.seek(offset)
+      if size is not None:
+        return f.read(size)
+      else:
+        return f.read()
+
+  def glob(self, filename):
+    """Returns a list of files that match the given pattern(s)."""
+    if isinstance(filename, six.string_types):
+      return [
+          # Convert the filenames to string from bytes.
+          compat.as_str_any(matching_filename)
+          for matching_filename in py_glob.glob(
+              compat.as_bytes(filename))
+      ]
+    else:
+      return [
+          # Convert the filenames to string from bytes.
+          compat.as_str_any(matching_filename)
+          for single_filename in filename
+          for matching_filename in py_glob.glob(
+              compat.as_bytes(single_filename))
+      ]
+
+  def isdir(self, dirname):
+    """Returns whether the path is a directory or not."""
+    return os.path.isdir(compat.as_bytes(dirname))
+
+  def listdir(self, dirname):
+    """Returns a list of entries contained within a directory."""
+    if not self.isdir(dirname):
+      raise errors.NotFoundError(None, None, "Could not find directory")
+
+    entries = os.listdir(compat.as_str_any(dirname))
+    entries = [compat.as_str_any(item) for item in entries]
+    return entries
+
+  def stat(self, filename):
+    """Returns file statistics for a given path."""
+    # NOTE: Size of the file is given by .st_size as returned from
+    # os.stat(), but we convert to .length
+    try:
+      file_length = os.stat(compat.as_bytes(filename)).st_size
+    except OSError:
+      raise errors.NotFoundError(None, None, "Could not find file")
+    return StatData(file_length)
+
+
+class S3FileSystem(object):
+  """Provides filesystem access to S3."""
+
+  def __init__(self):
+    # Check the flag set at import time; if the boto3 import failed,
+    # the name `boto3` itself would be unbound here.
+    if not S3_ENABLED:
+      raise ImportError("boto3 must be installed for S3 support.")
+
+  def bucket_and_path(self, url):
+    """Split an S3-prefixed URL into bucket and path."""
+    url = compat.as_str_any(url)
+    if url.startswith("s3://"):
+      url = url[len("s3://"):]
+    idx = url.index("/")
+    bucket = url[:idx]
+    path = url[(idx + 1):]
+    return bucket, path
+
+  def exists(self, filename):
+    """Determines whether a path exists or not."""
+    client = boto3.client("s3")
+    bucket, path = self.bucket_and_path(filename)
+    r = client.list_objects(Bucket=bucket, Prefix=path, Delimiter="/")
+    if r.get("Contents") or r.get("CommonPrefixes"):
+      return True
+    return False
+
+  def read(self, filename, binary_mode=False, size=None, offset=None):
+    """Reads contents of a file to a string.
+
+    Args:
+      filename: string, a path
+      binary_mode: bool, read as binary if True, otherwise text
+      size: int, number of bytes or characters to read, otherwise
+        read all the contents of the file from the offset
+      offset: int, offset into file to read from, otherwise read
+        from the very beginning
+
+    Returns:
+      Subset of the contents of the file as a string or bytes.
+    """
+    s3 = boto3.resource("s3")
+    bucket, path = self.bucket_and_path(filename)
+    args = {}
+    endpoint = 0
+    if size is not None or offset is not None:
+      if offset is None:
+        offset = 0
+      endpoint = '' if size is None else (offset + size)
+      args['Range'] = 'bytes={}-{}'.format(offset, endpoint)
+    try:
+      stream = s3.Object(bucket, path).get(**args)['Body'].read()
+    except botocore.exceptions.ClientError as exc:
+      if exc.response['Error']['Code'] == '416':
+        if size is not None:
+          # Asked for too much, so request just to the end. Do this
+          # in a second request so we don't check length in all cases.
+          client = boto3.client("s3")
+          obj = client.head_object(Bucket=bucket, Key=path)
+          content_length = obj['ContentLength']
+          endpoint = min(content_length, offset + size)
+        if offset == endpoint:
+          # Asked for no bytes, so just return empty
+          stream = b''
+        else:
+          args['Range'] = 'bytes={}-{}'.format(offset, endpoint)
+          stream = s3.Object(bucket, path).get(**args)['Body'].read()
+      else:
+        raise
+    if binary_mode:
+      return bytes(stream)
+    else:
+      return stream.decode('utf-8')
+
+  def glob(self, filename):
+    """Returns a list of files that match the given pattern(s)."""
+    # Only support prefix with * at the end and no ? in the string
+    star_i = filename.find('*')
+    quest_i = filename.find('?')
+    if quest_i >= 0:
+      raise NotImplementedError(
+          "{} not supported by compat glob".format(filename))
+    if star_i != len(filename) - 1:
+      # Just return empty so we can use glob from directory watcher
+      return []
+    filename = filename[:-1]
+    client = boto3.client("s3")
+    bucket, path = self.bucket_and_path(filename)
+    p = client.get_paginator("list_objects")
+    keys = []
+    for r in p.paginate(Bucket=bucket, Prefix=path):
+      for o in r.get("Contents", []):
+        key = o["Key"][len(path):]
+        if key:  # Skip the base dir, which would add an empty string
+          keys.append(filename + key)
+    return keys
+
+  def isdir(self, dirname):
+    """Returns whether the path is a directory or not."""
+    client = boto3.client("s3")
+    bucket, path = self.bucket_and_path(dirname)
+    if not path.endswith("/"):
+      path += "/"  # This will now only retrieve subdir content
+    r = client.list_objects(Bucket=bucket, Prefix=path, Delimiter="/")
+    if r.get("Contents") or r.get("CommonPrefixes"):
+      return True
+    return False
+
+  def listdir(self, dirname):
+    """Returns a list of entries contained within a directory."""
+    client = boto3.client("s3")
+    bucket, path = self.bucket_and_path(dirname)
+    p = client.get_paginator("list_objects")
+    if not path.endswith("/"):
+      path += "/"  # This will now only retrieve subdir content
+    keys = []
+    for r in p.paginate(Bucket=bucket, Prefix=path, Delimiter="/"):
+      keys.extend(o["Prefix"][len(path):-1] for o in r.get("CommonPrefixes", []))
+      for o in r.get("Contents", []):
+        key = o["Key"][len(path):]
+        if key:  # Skip the base dir, which would add an empty string
+          keys.append(key)
+    return keys
+
+  def stat(self, filename):
+    """Returns file statistics for a given path."""
+    # NOTE: Size of the file is given by ContentLength from S3,
+    # but we convert to .length
+    client = boto3.client("s3")
+    bucket, path = self.bucket_and_path(filename)
+    try:
+      obj = client.head_object(Bucket=bucket, Key=path)
+      return StatData(obj['ContentLength'])
+    except botocore.exceptions.ClientError as exc:
+      if exc.response['Error']['Code'] == '404':
+        raise errors.NotFoundError(None, None, "Could not find file")
+      else:
+        raise
+
+
+register_filesystem("", LocalFileSystem())
+if S3_ENABLED:
+  register_filesystem("s3", S3FileSystem())
+
+
+class GFile(object):
+  # Only methods needed for TensorBoard are implemented.
+ + def __init__(self, filename, mode): + if mode not in ('r', 'rb', 'br'): + raise NotImplementedError( + "mode {} not supported by compat GFile".format(mode)) + self.filename = compat.as_bytes(filename) + self.buff_chunk_size = _DEFAULT_BLOCK_SIZE + self.buff = None + self.buff_offset = 0 + self.offset = 0 + self.binary_mode = (mode != 'r') + + def __enter__(self): + return self + + def __exit__(self, *args): + self.buff = None + self.buff_offset = 0 + + def __iter__(self): + return self + + def _read_buffer_to_offset(self, new_buff_offset): + old_buff_offset = self.buff_offset + read_size = min(len(self.buff), new_buff_offset) - old_buff_offset + self.offset += read_size + self.buff_offset += read_size + return self.buff[old_buff_offset:old_buff_offset + read_size] + + def read(self, n=None): + result = None + if self.buff is not None: + # read from local buffer + if n is not None: + chunk = self._read_buffer_to_offset(self.buff_offset + n) + if len(chunk) == n: + return chunk + result = chunk + n -= len(chunk) + else: + # add all local buffer and update offsets + result = self._read_buffer_to_offset(len(self.buff)) + + # read from filesystem + fs = get_filesystem(self.filename) + read_size = max(self.buff_chunk_size, n) if n is not None else None + self.buff = fs.read(self.filename, self.binary_mode, + read_size, self.offset) + self.buff_offset = 0 + + # add from filesystem + if n is not None: + chunk = self._read_buffer_to_offset(n) + else: + # add all local buffer and update offsets + chunk = self._read_buffer_to_offset(len(self.buff)) + result = result + chunk if result else chunk + + return result + + def __next__(self): + line = None + while True: + if not self.buff: + # read one unit into the buffer + line = self.read(1) + if line and line[-1] == '\n': + return line + if not self.buff: + raise StopIteration() + else: + index = self.buff.find('\n', self.buff_offset) + if index != -1: + # include line until now plus newline + chunk = self.read(index + 1 - self.buff_offset) + line = line + chunk if line else chunk + return line + + # read one unit past end of buffer + chunk = self.read(len(self.buff) + 1 - self.buff_offset) + line = line + chunk if line else chunk + if line and line[-1] == '\n': + return line + if not self.buff: + raise StopIteration() + + def next(self): + return self.__next__() + + def close(self): + pass + + +def exists(filename): + """Determines whether a path exists or not. + + Args: + filename: string, a path + + Returns: + True if the path exists, whether its a file or a directory. + False if the path does not exist and there are no filesystem errors. + + Raises: + errors.OpError: Propagates any errors reported by the FileSystem API. + """ + return get_filesystem(filename).exists(filename) + + +def glob(filename): + """Returns a list of files that match the given pattern(s). + + Args: + filename: string or iterable of strings. The glob pattern(s). + + Returns: + A list of strings containing filenames that match the given pattern(s). + + Raises: + errors.OpError: If there are filesystem / directory listing errors. + """ + return get_filesystem(filename).glob(filename) + + +def isdir(dirname): + """Returns whether the path is a directory or not. + + Args: + dirname: string, path to a potential directory + + Returns: + True, if the path is a directory; False otherwise + """ + return get_filesystem(dirname).isdir(dirname) + + +def listdir(dirname): + """Returns a list of entries contained within a directory. + + The list is in arbitrary order. 
It does not contain the special entries "."
+  and "..".
+
+  Args:
+    dirname: string, path to a directory
+
+  Returns:
+    [filename1, filename2, ... filenameN] as strings
+
+  Raises:
+    errors.NotFoundError if directory doesn't exist
+  """
+  return get_filesystem(dirname).listdir(dirname)
+
+
+def walk(top, topdown=True, onerror=None):
+  """Recursive directory tree generator for directories.
+
+  Args:
+    top: string, a Directory name
+    topdown: bool, Traverse pre order if True, post order if False.
+    onerror: optional handler for errors. Should be a function, it will be
+      called with the error as argument. Rethrowing the error aborts the walk.
+
+  Errors that happen while listing directories are ignored.
+
+  Yields:
+    Each yield is a 3-tuple: the pathname of a directory, followed by lists of
+    all its subdirectories and leaf files.
+    (dirname, [subdirname, subdirname, ...], [filename, filename, ...])
+    as strings
+  """
+  top = compat.as_str_any(top)
+  try:
+    listing = listdir(top)
+  except errors.NotFoundError as err:
+    if onerror:
+      onerror(err)
+    # Bail out either way; `listing` is not defined on this path.
+    return
+
+  files = []
+  subdirs = []
+  for item in listing:
+    full_path = os.path.join(top, compat.as_str_any(item))
+    if isdir(full_path):
+      subdirs.append(item)
+    else:
+      files.append(item)
+
+  here = (top, subdirs, files)
+
+  if topdown:
+    yield here
+
+  for subdir in subdirs:
+    joined_subdir = os.path.join(top, compat.as_str_any(subdir))
+    for subitem in walk(joined_subdir, topdown, onerror=onerror):
+      yield subitem
+
+  if not topdown:
+    yield here
+
+
+def stat(filename):
+  """Returns file statistics for a given path.
+
+  Args:
+    filename: string, path to a file
+
+  Returns:
+    FileStatistics struct that contains information about the path
+
+  Raises:
+    errors.OpError: If the operation fails.
+  """
+  return get_filesystem(filename).stat(filename)
diff --git a/tensorboard/compat/tensorflow_stub/io/gfile_s3_test.py b/tensorboard/compat/tensorflow_stub/io/gfile_s3_test.py
new file mode 100644
index 0000000000..0dedc54eb2
--- /dev/null
+++ b/tensorboard/compat/tensorflow_stub/io/gfile_s3_test.py
@@ -0,0 +1,298 @@
+# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import boto3
+import os
+import six
+import unittest
+from moto import mock_s3
+
+from tensorboard.compat.tensorflow_stub import errors
+from tensorboard.compat.tensorflow_stub.io import gfile
+
+# Set placeholder AWS credentials if none are configured locally, so
+# that boto3 is served by the moto mock rather than hitting real S3.
+os.environ.setdefault("AWS_ACCESS_KEY_ID", "foobar_key")
+os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "foobar_secret")
+
+
+class GFileTest(unittest.TestCase):
+
+  @mock_s3
+  def testExists(self):
+    temp_dir = self._CreateDeepS3Structure()
+    ckpt_path = os.path.join(temp_dir, 'model.ckpt')
+    self.assertTrue(gfile.exists(temp_dir))
+    self.assertTrue(gfile.exists(ckpt_path))
+
+  @mock_s3
+  def testGlob(self):
+    temp_dir = self._CreateDeepS3Structure()
+    # S3 glob includes subdirectory content, which standard
+    # filesystem does not. However, this is good for perf.
+    expected = [
+        'a.tfevents.1',
+        'bar/b.tfevents.1',
+        'bar/baz/c.tfevents.1',
+        'bar/baz/d.tfevents.1',
+        'bar/quux/some_flume_output.txt',
+        'bar/quux/some_more_flume_output.txt',
+        'bar/red_herring.txt',
+        'model.ckpt',
+        'quuz/e.tfevents.1',
+        'quuz/garply/corge/g.tfevents.1',
+        'quuz/garply/f.tfevents.1',
+        'quuz/garply/grault/h.tfevents.1',
+        'waldo/fred/i.tfevents.1',
+    ]
+    expected_listing = [os.path.join(temp_dir, f) for f in expected]
+    gotten_listing = gfile.glob(os.path.join(temp_dir, "*"))
+    six.assertCountEqual(
+        self,
+        expected_listing,
+        gotten_listing,
+        'Files must match. Expected %r. Got %r.' % (
+            expected_listing, gotten_listing))
+
+  @mock_s3
+  def testIsdir(self):
+    temp_dir = self._CreateDeepS3Structure()
+    self.assertTrue(gfile.isdir(temp_dir))
+
+  @mock_s3
+  def testListdir(self):
+    temp_dir = self._CreateDeepS3Structure()
+    expected_files = [
+        # Empty directory not returned
+        # 'foo',
+        'bar',
+        'quuz',
+        'a.tfevents.1',
+        'model.ckpt',
+        'waldo',
+    ]
+    gotten_files = gfile.listdir(temp_dir)
+    six.assertCountEqual(self, expected_files, gotten_files)
+
+  @mock_s3
+  def testWalk(self):
+    temp_dir = self._CreateDeepS3Structure()
+    expected = [
+        ['', [
+            'a.tfevents.1',
+            'model.ckpt',
+        ]],
+        # Empty directory not returned
+        # ['foo', []],
+        ['bar', [
+            'b.tfevents.1',
+            'red_herring.txt',
+        ]],
+        ['bar/baz', [
+            'c.tfevents.1',
+            'd.tfevents.1',
+        ]],
+        ['bar/quux', [
+            'some_flume_output.txt',
+            'some_more_flume_output.txt',
+        ]],
+        ['quuz', [
+            'e.tfevents.1',
+        ]],
+        ['quuz/garply', [
+            'f.tfevents.1',
+        ]],
+        ['quuz/garply/corge', [
+            'g.tfevents.1',
+        ]],
+        ['quuz/garply/grault', [
+            'h.tfevents.1',
+        ]],
+        ['waldo', []],
+        ['waldo/fred', [
+            'i.tfevents.1',
+        ]],
+    ]
+    for pair in expected:
+      # If this is not the top-level directory, prepend the high-level
+      # directory.
+      pair[0] = os.path.join(temp_dir, pair[0]) if pair[0] else temp_dir
+    gotten = gfile.walk(temp_dir)
+    self._CompareFilesPerSubdirectory(expected, gotten)
+
+  @mock_s3
+  def testStat(self):
+    ckpt_content = 'asdfasdfasdffoobarbuzz'
+    temp_dir = self._CreateDeepS3Structure(ckpt_content=ckpt_content)
+    ckpt_path = os.path.join(temp_dir, 'model.ckpt')
+    ckpt_stat = gfile.stat(ckpt_path)
+    self.assertEqual(ckpt_stat.length, len(ckpt_content))
+    bad_ckpt_path = os.path.join(temp_dir, 'bad_model.ckpt')
+    with self.assertRaises(errors.NotFoundError):
+      gfile.stat(bad_ckpt_path)
+
+  @mock_s3
+  def testRead(self):
+    ckpt_content = 'asdfasdfasdffoobarbuzz'
+    temp_dir = self._CreateDeepS3Structure(ckpt_content=ckpt_content)
+    ckpt_path = os.path.join(temp_dir, 'model.ckpt')
+    with gfile.GFile(ckpt_path, 'r') as f:
+      f.buff_chunk_size = 4  # Test buffering by reducing chunk size
+      ckpt_read = f.read()
+      self.assertEqual(ckpt_content, ckpt_read)
+
+  @mock_s3
+  def testReadLines(self):
+    ckpt_lines = (
+        ['\n'] + ['line {}\n'.format(i) for i in range(10)] + ['\n']
+    )
+    ckpt_content = ''.join(ckpt_lines)
+    temp_dir = self._CreateDeepS3Structure(ckpt_content=ckpt_content)
+    ckpt_path = os.path.join(temp_dir, 'model.ckpt')
+    with gfile.GFile(ckpt_path, 'r') as f:
+      f.buff_chunk_size = 4  # Test buffering by reducing chunk size
+      ckpt_read_lines = list(f)
+      self.assertEqual(ckpt_lines, ckpt_read_lines)
+
+  @mock_s3
+  def testReadWithOffset(self):
+    ckpt_content = 'asdfasdfasdffoobarbuzz'
+    ckpt_b_content = b'asdfasdfasdffoobarbuzz'
+    temp_dir = self._CreateDeepS3Structure(ckpt_content=ckpt_content)
+    ckpt_path = os.path.join(temp_dir, 'model.ckpt')
+    with gfile.GFile(ckpt_path, 'r') as f:
+      f.buff_chunk_size = 4  # Test buffering by reducing chunk size
+      ckpt_read = f.read(12)
+      self.assertEqual('asdfasdfasdf', ckpt_read)
+      ckpt_read = f.read(6)
+      self.assertEqual('foobar', ckpt_read)
+      ckpt_read = f.read(1)
+      self.assertEqual('b', ckpt_read)
+      ckpt_read = f.read()
+      self.assertEqual('uzz', ckpt_read)
+      ckpt_read = f.read(1000)
+      self.assertEqual('', ckpt_read)
+    with gfile.GFile(ckpt_path, 'rb') as f:
+      ckpt_read = f.read()
+      self.assertEqual(ckpt_b_content, ckpt_read)
+
+  def _CreateDeepS3Structure(self, top_directory='top_dir', ckpt_content='',
+                             region_name='us-east-1', bucket_name='test'):
+    """Creates a reasonably deep structure of S3 subdirectories with files.
+
+    Args:
+      top_directory: The path of the top level S3 directory in which
+        to create the directory structure. Defaults to 'top_dir'.
+      ckpt_content: The content to put into model.ckpt. Defaults to ''.
+      region_name: The S3 region name. Defaults to 'us-east-1'.
+      bucket_name: The S3 bucket name. Defaults to 'test'.
+
+    Returns:
+      S3 URL of the top directory in the form 's3://bucket/path'
+    """
+    s3_top_url = 's3://{}/{}'.format(bucket_name, top_directory)
+
+    # Add a few subdirectories.
+    directory_names = (
+        # An empty directory.
+        'foo',
+        # A directory with an events file (and a text file).
+        'bar',
+        # A deeper directory with events files.
+        'bar/baz',
+        # A non-empty subdir that lacks event files (should be ignored).
+        'bar/quux',
+        # This 3-level deep set of subdirectories tests logic that replaces
+        # the full glob string with an absolute path prefix if there is
+        # only 1 subdirectory in the final mapping.
+        'quuz/garply',
+        'quuz/garply/corge',
+        'quuz/garply/grault',
+        # A directory that lacks events files, but contains a subdirectory
+        # with events files (first level should be ignored, second level
+        # should be included).
+        'waldo',
+        'waldo/fred',
+    )
+    client = boto3.client('s3', region_name=region_name)
+    client.create_bucket(Bucket=bucket_name)
+    client.put_object(Body='', Bucket=bucket_name, Key=top_directory)
+    for directory_name in directory_names:
+      # Add an end slash
+      path = top_directory + '/' + directory_name + '/'
+      # Create an empty object so the location exists
+      client.put_object(Body='', Bucket=bucket_name, Key=path)
+
+    # Add a few files to the directory.
+    file_names = (
+        'a.tfevents.1',
+        'model.ckpt',
+        'bar/b.tfevents.1',
+        'bar/red_herring.txt',
+        'bar/baz/c.tfevents.1',
+        'bar/baz/d.tfevents.1',
+        'bar/quux/some_flume_output.txt',
+        'bar/quux/some_more_flume_output.txt',
+        'quuz/e.tfevents.1',
+        'quuz/garply/f.tfevents.1',
+        'quuz/garply/corge/g.tfevents.1',
+        'quuz/garply/grault/h.tfevents.1',
+        'waldo/fred/i.tfevents.1',
+    )
+    for file_name in file_names:
+      # Files live under the top directory, with no trailing slash.
+      path = top_directory + '/' + file_name
+      if file_name == 'model.ckpt':
+        content = ckpt_content
+      else:
+        content = ''
+      client.put_object(Body=content, Bucket=bucket_name, Key=path)
+    return s3_top_url
+
+  def _CompareFilesPerSubdirectory(self, expected, gotten):
+    """Compares iterables of (subdirectory path, list of absolute paths)
+
+    Args:
+      expected: The expected iterable of 2-tuples.
+      gotten: The gotten iterable of 2-tuples.
+    """
+    expected_directory_to_files = {
+        result[0]: list(result[1]) for result in expected}
+    gotten_directory_to_files = {
+        # Note we ignore subdirectories and just compare files
+        result[0]: list(result[2]) for result in gotten}
+    six.assertCountEqual(
+        self,
+        expected_directory_to_files.keys(),
+        gotten_directory_to_files.keys())
+
+    for subdir, expected_listing in expected_directory_to_files.items():
+      gotten_listing = gotten_directory_to_files[subdir]
+      six.assertCountEqual(
+          self,
+          expected_listing,
+          gotten_listing,
+          'Files for subdir %r must match. Expected %r. Got %r.' % (
+              subdir, expected_listing, gotten_listing))
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/tensorboard/compat/tensorflow_stub/io/gfile_test.py b/tensorboard/compat/tensorflow_stub/io/gfile_test.py
new file mode 100644
index 0000000000..31244684d3
--- /dev/null
+++ b/tensorboard/compat/tensorflow_stub/io/gfile_test.py
@@ -0,0 +1,271 @@
+# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================== + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import os +import shutil +import six +import tempfile +import unittest + +from tensorboard.compat.tensorflow_stub import errors +from tensorboard.compat.tensorflow_stub.io import gfile + + +class GFileTest(unittest.TestCase): + def setUp(self): + self.base_temp_dir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.base_temp_dir) + + def testExists(self): + temp_dir = tempfile.mkdtemp(prefix=self.base_temp_dir) + self._CreateDeepDirectoryStructure(temp_dir) + ckpt_path = os.path.join(temp_dir, 'model.ckpt') + self.assertTrue(gfile.exists(temp_dir)) + self.assertTrue(gfile.exists(ckpt_path)) + + def testGlob(self): + temp_dir = tempfile.mkdtemp(prefix=self.base_temp_dir) + self._CreateDeepDirectoryStructure(temp_dir) + expected = [ + 'foo', + 'bar', + 'a.tfevents.1', + 'model.ckpt', + 'quuz', + 'waldo', + ] + expected_listing = [os.path.join(temp_dir, f) for f in expected] + gotten_listing = gfile.glob(os.path.join(temp_dir, "*")) + six.assertCountEqual( + self, + expected_listing, + gotten_listing, + 'Files must match. Expected %r. Got %r.' % ( + expected_listing, gotten_listing)) + + def testIsdir(self): + temp_dir = tempfile.mkdtemp(prefix=self.base_temp_dir) + self.assertTrue(gfile.isdir(temp_dir)) + + def testListdir(self): + temp_dir = tempfile.mkdtemp(prefix=self.base_temp_dir) + self._CreateDeepDirectoryStructure(temp_dir) + expected_files = ( + 'foo', + 'bar', + 'quuz', + 'a.tfevents.1', + 'model.ckpt', + 'waldo', + ) + six.assertCountEqual( + self, + expected_files, + gfile.listdir(temp_dir)) + + def testWalk(self): + temp_dir = tempfile.mkdtemp(prefix=self.base_temp_dir) + self._CreateDeepDirectoryStructure(temp_dir) + expected = [ + ['', [ + 'a.tfevents.1', + 'model.ckpt', + ]], + ['foo', []], + ['bar', [ + 'b.tfevents.1', + 'red_herring.txt', + ]], + ['bar/baz', [ + 'c.tfevents.1', + 'd.tfevents.1', + ]], + ['bar/quux', [ + 'some_flume_output.txt', + 'some_more_flume_output.txt', + ]], + ['quuz', [ + 'e.tfevents.1', + ]], + ['quuz/garply', [ + 'f.tfevents.1', + ]], + ['quuz/garply/corge', [ + 'g.tfevents.1', + ]], + ['quuz/garply/grault', [ + 'h.tfevents.1', + ]], + ['waldo', []], + ['waldo/fred', [ + 'i.tfevents.1', + ]], + ] + for pair in expected: + # If this is not the top-level directory, prepend the high-level + # directory. 
+ pair[0] = os.path.join(temp_dir, pair[0]) if pair[0] else temp_dir + gotten = gfile.walk(temp_dir) + self._CompareFilesPerSubdirectory(expected, gotten) + + def testStat(self): + temp_dir = tempfile.mkdtemp(prefix=self.base_temp_dir) + self._CreateDeepDirectoryStructure(temp_dir) + ckpt_path = os.path.join(temp_dir, 'model.ckpt') + ckpt_content = 'asdfasdfasdffoobarbuzz' + with open(ckpt_path, 'w') as f: + f.write(ckpt_content) + ckpt_stat = gfile.stat(ckpt_path) + self.assertEqual(ckpt_stat.length, len(ckpt_content)) + bad_ckpt_path = os.path.join(temp_dir, 'bad_model.ckpt') + with self.assertRaises(errors.NotFoundError): + gfile.stat(bad_ckpt_path) + + def testRead(self): + temp_dir = tempfile.mkdtemp(prefix=self.base_temp_dir) + self._CreateDeepDirectoryStructure(temp_dir) + ckpt_path = os.path.join(temp_dir, 'model.ckpt') + ckpt_content = 'asdfasdfasdffoobarbuzz' + with open(ckpt_path, 'w') as f: + f.write(ckpt_content) + with gfile.GFile(ckpt_path, 'r') as f: + f.buff_chunk_size = 4 # Test buffering by reducing chunk size + ckpt_read = f.read() + self.assertEqual(ckpt_content, ckpt_read) + + def testReadLines(self): + temp_dir = tempfile.mkdtemp(prefix=self.base_temp_dir) + self._CreateDeepDirectoryStructure(temp_dir) + ckpt_path = os.path.join(temp_dir, 'model.ckpt') + ckpt_lines = ( + ['\n'] + ['line {}\n'.format(i) for i in range(10)] + ['\n'] + ) + with open(ckpt_path, 'w') as f: + f.write(''.join(ckpt_lines)) + with gfile.GFile(ckpt_path, 'r') as f: + f.buff_chunk_size = 4 # Test buffering by reducing chunk size + ckpt_read_lines = list(f) + self.assertEqual(ckpt_lines, ckpt_read_lines) + + def testReadWithOffset(self): + temp_dir = tempfile.mkdtemp(prefix=self.base_temp_dir) + self._CreateDeepDirectoryStructure(temp_dir) + ckpt_path = os.path.join(temp_dir, 'model.ckpt') + ckpt_content = 'asdfasdfasdffoobarbuzz' + ckpt_b_content = b'asdfasdfasdffoobarbuzz' + with open(ckpt_path, 'w') as f: + f.write(ckpt_content) + with gfile.GFile(ckpt_path, 'r') as f: + f.buff_chunk_size = 4 # Test buffering by reducing chunk size + ckpt_read = f.read(12) + self.assertEqual('asdfasdfasdf', ckpt_read) + ckpt_read = f.read(6) + self.assertEqual('foobar', ckpt_read) + ckpt_read = f.read(1) + self.assertEqual('b', ckpt_read) + ckpt_read = f.read() + self.assertEqual('uzz', ckpt_read) + ckpt_read = f.read(1000) + self.assertEqual('', ckpt_read) + with gfile.GFile(ckpt_path, 'rb') as f: + ckpt_read = f.read() + self.assertEqual(ckpt_b_content, ckpt_read) + + def _CreateDeepDirectoryStructure(self, top_directory): + """Creates a reasonable deep structure of subdirectories with files. + + Args: + top_directory: The absolute path of the top level directory in + which to create the directory structure. + """ + # Add a few subdirectories. + directory_names = ( + # An empty directory. + 'foo', + # A directory with an events file (and a text file). + 'bar', + # A deeper directory with events files. + 'bar/baz', + # A non-empty subdir that lacks event files (should be ignored). + 'bar/quux', + # This 3-level deep set of subdirectories tests logic that replaces + # the full glob string with an absolute path prefix if there is + # only 1 subdirectory in the final mapping. + 'quuz/garply', + 'quuz/garply/corge', + 'quuz/garply/grault', + # A directory that lacks events files, but contains a subdirectory + # with events files (first level should be ignored, second level + # should be included). 
+ 'waldo', + 'waldo/fred', + ) + for directory_name in directory_names: + os.makedirs(os.path.join(top_directory, directory_name)) + + # Add a few files to the directory. + file_names = ( + 'a.tfevents.1', + 'model.ckpt', + 'bar/b.tfevents.1', + 'bar/red_herring.txt', + 'bar/baz/c.tfevents.1', + 'bar/baz/d.tfevents.1', + 'bar/quux/some_flume_output.txt', + 'bar/quux/some_more_flume_output.txt', + 'quuz/e.tfevents.1', + 'quuz/garply/f.tfevents.1', + 'quuz/garply/corge/g.tfevents.1', + 'quuz/garply/grault/h.tfevents.1', + 'waldo/fred/i.tfevents.1', + ) + for file_name in file_names: + open(os.path.join(top_directory, file_name), 'w').close() + + def _CompareFilesPerSubdirectory(self, expected, gotten): + """Compares iterables of (subdirectory path, list of absolute paths) + + Args: + expected: The expected iterable of 2-tuples. + gotten: The gotten iterable of 2-tuples. + """ + expected_directory_to_files = { + result[0]: list(result[1]) for result in expected} + gotten_directory_to_files = { + # Note we ignore subdirectories and just compare files + result[0]: list(result[2]) for result in gotten} + six.assertCountEqual( + self, + expected_directory_to_files.keys(), + gotten_directory_to_files.keys()) + + for subdir, expected_listing in expected_directory_to_files.items(): + gotten_listing = gotten_directory_to_files[subdir] + six.assertCountEqual( + self, + expected_listing, + gotten_listing, + 'Files for subdir %r must match. Expected %r. Got %r.' % ( + subdir, expected_listing, gotten_listing)) + + +if __name__ == '__main__': + unittest.main() diff --git a/tensorboard/compat/tensorflow_stub/pywrap_tensorflow.py b/tensorboard/compat/tensorflow_stub/pywrap_tensorflow.py index 6a27b5df84..bd5734d681 100644 --- a/tensorboard/compat/tensorflow_stub/pywrap_tensorflow.py +++ b/tensorboard/compat/tensorflow_stub/pywrap_tensorflow.py @@ -19,10 +19,10 @@ from __future__ import print_function import array -import os import struct from . 
import errors +from .io import gfile TFE_DEVICE_PLACEMENT_WARN = 0 @@ -164,7 +164,13 @@ def crc32c(data): class PyRecordReader_New: - def __init__(self, filename=None, start_offset=0, compression_type=None, status=None): + def __init__( + self, + filename=None, + start_offset=0, + compression_type=None, + status=None + ): self.event_strs = [] self.filename = filename self.start_offset = start_offset @@ -177,51 +183,54 @@ def read(self): if self.filename is None: raise errors.NotFoundError( None, None, 'No filename provided, cannot read Events') - if not os.path.exists(self.filename): + if not gfile.exists(self.filename): raise errors.NotFoundError( None, None, '{} does not point to valid Events file'.format(self.filename)) # TODO: Handle gzip and zlib compressed files - with open(self.filename, "rb") as f: - buf = f.read() - n = self.start_offset + with gfile.GFile(self.filename, 'rb') as f: + f.read(self.start_offset) - while n < len(buf): + while True: # Read the header - header_str = buf[n:n+8] + header_str = f.read(8) + if len(header_str) != 8: + break # Hit EOF so exit header = struct.unpack('Q', header_str) - n += 12 - # Read the crc32, which is 4 bytes, and check it against the - # crc32 of the header - # crc_header_str = buf[n:n+4] - # crc_header = struct.unpack('I', crc_header_str) - # n += 4 - # header_crc_calc = masked_crc32c(header_str) - # assert header_crc_calc == crc_header[0], \ - # 'Header crc\'s dont match' + # Read the crc32, which is 4 bytes, and check it against + # the crc32 of the header + crc_header_str = f.read(4) + crc_header = struct.unpack('I', crc_header_str) + header_crc_calc = masked_crc32c(header_str) + if header_crc_calc != crc_header[0]: + raise errors.DataLossError( + None, None, + '{} failed header crc32 check'.format(self.filename) + ) # The length of the header tells us how many bytes the Event # string takes header_len = int(header[0]) - event_str = buf[n:n+header_len] - # event_crc_calc = masked_crc32c(event_str) + event_str = f.read(header_len) - n += header_len + 4 + event_crc_calc = masked_crc32c(event_str) # The next 4 bytes contain the crc32 of the Event string, # which we check for integrity. Sometimes, the last Event # has no crc32, in which case we skip. - # if len(buf[n:]) > 0: - # crc_event_str = struct.unpack('I', buf[n:n+4]) - # assert event_crc_calc == crc_event_str[0], \ - # 'Header crc\'s dont match' - - # n += 4 + crc_event_str = f.read(4) + if crc_event_str: + crc_event = struct.unpack('I', crc_event_str) + if event_crc_calc != crc_event[0]: + raise errors.DataLossError( + None, None, + '{} failed event crc32 check'.format(self.filename) + ) self.event_strs += [event_str] - self.done_reading = True + self.done_reading = True def GetNext(self): if not self.done_reading: diff --git a/tensorboard/plugins/core/core_plugin.py b/tensorboard/plugins/core/core_plugin.py index 770b03ba17..f2a1a568e5 100644 --- a/tensorboard/plugins/core/core_plugin.py +++ b/tensorboard/plugins/core/core_plugin.py @@ -162,9 +162,9 @@ def _serve_runs(self, request): def get_first_event_timestamp(run_name): try: return self._multiplexer.FirstEventTimestamp(run_name) - except ValueError: + except ValueError as e: logger.warn( - 'Unable to get first event timestamp for run %s', run_name) + 'Unable to get first event timestamp for run %s: %s', run_name, e) # Put runs without a timestamp at the end. 
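      # float('inf') compares greater than any finite timestamp, so such runs sort last.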
return float('inf') run_names.sort(key=get_first_event_timestamp) diff --git a/tensorboard/plugins/projector/projector_plugin_test.py b/tensorboard/plugins/projector/projector_plugin_test.py index 5d8094f0e7..791c26997c 100644 --- a/tensorboard/plugins/projector/projector_plugin_test.py +++ b/tensorboard/plugins/projector/projector_plugin_test.py @@ -33,6 +33,7 @@ from tensorboard.backend import application from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer # pylint: disable=line-too-long +from tensorboard.compat import tf as tf_compat from tensorboard.plugins import base_plugin from tensorboard.plugins.projector import projector_config_pb2 from tensorboard.plugins.projector import projector_plugin @@ -56,7 +57,10 @@ def testRunsWithValidCheckpoint(self): self._GenerateProjectorTestData() self._SetupWSGIApp() run_json = self._GetJson('/data/plugin/projector/runs') - self.assertTrue(run_json) + if tf_compat.__version__ != 'stub': + self.assertTrue(run_json) + else: + self.assertFalse(run_json) def testRunsWithNoCheckpoint(self): self._SetupWSGIApp() @@ -85,24 +89,26 @@ def testRunsWithInvalidModelCheckpointPathInConfig(self): self._SetupWSGIApp() run_json = self._GetJson('/data/plugin/projector/runs') - self.assertEqual(run_json, []) + if tf_compat.__version__ != 'stub': + self.assertEqual(run_json, []) def testInfoWithValidCheckpointNoEventsData(self): self._GenerateProjectorTestData() self._SetupWSGIApp() - info_json = self._GetJson('/data/plugin/projector/info?run=.') - self.assertItemsEqual(info_json['embeddings'], [{ - 'tensorShape': [1, 2], - 'tensorName': 'var1', - 'bookmarksPath': 'bookmarks.json' - }, { - 'tensorShape': [10, 10], - 'tensorName': 'var2' - }, { - 'tensorShape': [100, 100], - 'tensorName': 'var3' - }]) + if tf_compat.__version__ != 'stub': + info_json = self._GetJson('/data/plugin/projector/info?run=.') + self.assertItemsEqual(info_json['embeddings'], [{ + 'tensorShape': [1, 2], + 'tensorName': 'var1', + 'bookmarksPath': 'bookmarks.json' + }, { + 'tensorShape': [10, 10], + 'tensorName': 'var2' + }, { + 'tensorShape': [100, 100], + 'tensorName': 'var3' + }]) def testInfoWithValidCheckpointAndEventsData(self): self._GenerateProjectorTestData() @@ -110,29 +116,33 @@ def testInfoWithValidCheckpointAndEventsData(self): self._SetupWSGIApp() run_json = self._GetJson('/data/plugin/projector/runs') - self.assertTrue(run_json) - run = run_json[0] - info_json = self._GetJson('/data/plugin/projector/info?run=%s' % run) - self.assertItemsEqual(info_json['embeddings'], [{ - 'tensorShape': [1, 2], - 'tensorName': 'var1', - 'bookmarksPath': 'bookmarks.json' - }, { - 'tensorShape': [10, 10], - 'tensorName': 'var2' - }, { - 'tensorShape': [100, 100], - 'tensorName': 'var3' - }]) + if tf_compat.__version__ != 'stub': + self.assertTrue(run_json) + run = run_json[0] + info_json = self._GetJson('/data/plugin/projector/info?run=%s' % run) + self.assertItemsEqual(info_json['embeddings'], [{ + 'tensorShape': [1, 2], + 'tensorName': 'var1', + 'bookmarksPath': 'bookmarks.json' + }, { + 'tensorShape': [10, 10], + 'tensorName': 'var2' + }, { + 'tensorShape': [100, 100], + 'tensorName': 'var3' + }]) + else: + self.assertFalse(run_json) def testTensorWithValidCheckpoint(self): self._GenerateProjectorTestData() self._SetupWSGIApp() - url = '/data/plugin/projector/tensor?run=.&name=var1' - tensor_bytes = self._Get(url).data - expected_tensor = np.array([[6, 6]], dtype=np.float32) - self._AssertTensorResponse(tensor_bytes, expected_tensor) + if 
tf_compat.__version__ != 'stub': + url = '/data/plugin/projector/tensor?run=.&name=var1' + tensor_bytes = self._Get(url).data + expected_tensor = np.array([[6, 6]], dtype=np.float32) + self._AssertTensorResponse(tensor_bytes, expected_tensor) def testBookmarksRequestMissingRunAndName(self): self._GenerateProjectorTestData() @@ -173,9 +183,10 @@ def testBookmarks(self): self._GenerateProjectorTestData() self._SetupWSGIApp() - url = '/data/plugin/projector/bookmarks?run=.&name=var1' - bookmark = self._GetJson(url) - self.assertEqual(bookmark, {'a': 'b'}) + if tf_compat.__version__ != 'stub': + url = '/data/plugin/projector/bookmarks?run=.&name=var1' + bookmark = self._GetJson(url) + self.assertEqual(bookmark, {'a': 'b'}) def testEndpointsNoAssets(self): g = tf.Graph() @@ -213,13 +224,14 @@ def testPluginIsActive(self): self.plugin._thread_for_determining_is_active.run() - # The plugin later finds that embedding data is available. - self.assertTrue(self.plugin.is_active()) + if tf_compat.__version__ != 'stub': + # The plugin later finds that embedding data is available. + self.assertTrue(self.plugin.is_active()) - # Subsequent calls to is_active should not start a new thread. The mock - # should only have been called once throughout this test. - self.assertTrue(self.plugin.is_active()) - mock.assert_called_once_with(thread) + # Subsequent calls to is_active should not start a new thread. The mock + # should only have been called once throughout this test. + self.assertTrue(self.plugin.is_active()) + mock.assert_called_once_with(thread) def testPluginIsNotActive(self): self._SetupWSGIApp() From 4596fde0f31eace7aa87328cb4beab22fcb2c60b Mon Sep 17 00:00:00 2001 From: Orion Reblitz-Richardson Date: Thu, 14 Feb 2019 09:18:58 -0800 Subject: [PATCH 2/7] Some Windows and boundary condition fixes --- .../compat/tensorflow_stub/io/gfile.py | 7 ++--- .../tensorflow_stub/io/gfile_s3_test.py | 26 +++++++++++-------- .../compat/tensorflow_stub/io/gfile_test.py | 11 +++++--- 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/tensorboard/compat/tensorflow_stub/io/gfile.py b/tensorboard/compat/tensorflow_stub/io/gfile.py index 8e2f9c6ab4..6b2935a77a 100644 --- a/tensorboard/compat/tensorflow_stub/io/gfile.py +++ b/tensorboard/compat/tensorflow_stub/io/gfile.py @@ -311,6 +311,7 @@ def __enter__(self): def __exit__(self, *args): self.buff = None self.buff_offset = 0 + self.offset = 0 def __iter__(self): return self @@ -324,7 +325,7 @@ def _read_buffer_to_offset(self, new_buff_offset): def read(self, n=None): result = None - if self.buff is not None: + if self.buff and len(self.buff) > self.buff_offset: # read from local buffer if n is not None: chunk = self._read_buffer_to_offset(self.buff_offset + n) @@ -359,7 +360,7 @@ def __next__(self): if not self.buff: # read one unit into the buffer line = self.read(1) - if line and line[-1] == '\n': + if line and (line[-1] == '\n' or not self.buff): return line if not self.buff: raise StopIteration() @@ -374,7 +375,7 @@ def __next__(self): # read one unit past end of buffer chunk = self.read(len(self.buff) + 1 - self.buff_offset) line = line + chunk if line else chunk - if line and line[-1] == '\n': + if line and (line[-1] == '\n' or not self.buff): return line if not self.buff: raise StopIteration() diff --git a/tensorboard/compat/tensorflow_stub/io/gfile_s3_test.py b/tensorboard/compat/tensorflow_stub/io/gfile_s3_test.py index 0dedc54eb2..8b753dc93b 100644 --- a/tensorboard/compat/tensorflow_stub/io/gfile_s3_test.py +++ 
b/tensorboard/compat/tensorflow_stub/io/gfile_s3_test.py @@ -37,7 +37,7 @@ class GFileTest(unittest.TestCase): @mock_s3 def testExists(self): temp_dir = self._CreateDeepS3Structure() - ckpt_path = os.path.join(temp_dir, 'model.ckpt') + ckpt_path = self._PathJoin(temp_dir, 'model.ckpt') self.assertTrue(gfile.exists(temp_dir)) self.assertTrue(gfile.exists(ckpt_path)) @@ -61,8 +61,8 @@ def testGlob(self): 'quuz/garply/grault/h.tfevents.1', 'waldo/fred/i.tfevents.1' ] - expected_listing = [os.path.join(temp_dir, f) for f in expected] - gotten_listing = gfile.glob(os.path.join(temp_dir, "*")) + expected_listing = [self._PathJoin(temp_dir, f) for f in expected] + gotten_listing = gfile.glob(self._PathJoin(temp_dir, "*")) six.assertCountEqual( self, expected_listing, @@ -134,7 +134,7 @@ def testWalk(self): for pair in expected: # If this is not the top-level directory, prepend the high-level # directory. - pair[0] = os.path.join(temp_dir, pair[0]) if pair[0] else temp_dir + pair[0] = self._PathJoin(temp_dir, pair[0]) if pair[0] else temp_dir gotten = gfile.walk(temp_dir) self._CompareFilesPerSubdirectory(expected, gotten) @@ -142,10 +142,10 @@ def testWalk(self): def testStat(self): ckpt_content = 'asdfasdfasdffoobarbuzz' temp_dir = self._CreateDeepS3Structure(ckpt_content=ckpt_content) - ckpt_path = os.path.join(temp_dir, 'model.ckpt') + ckpt_path = self._PathJoin(temp_dir, 'model.ckpt') ckpt_stat = gfile.stat(ckpt_path) self.assertEqual(ckpt_stat.length, len(ckpt_content)) - bad_ckpt_path = os.path.join(temp_dir, 'bad_model.ckpt') + bad_ckpt_path = self._PathJoin(temp_dir, 'bad_model.ckpt') with self.assertRaises(errors.NotFoundError): gfile.stat(bad_ckpt_path) @@ -153,7 +153,7 @@ def testStat(self): def testRead(self): ckpt_content = 'asdfasdfasdffoobarbuzz' temp_dir = self._CreateDeepS3Structure(ckpt_content=ckpt_content) - ckpt_path = os.path.join(temp_dir, 'model.ckpt') + ckpt_path = self._PathJoin(temp_dir, 'model.ckpt') with gfile.GFile(ckpt_path, 'r') as f: f.buff_chunk_size = 4 # Test buffering by reducing chunk size ckpt_read = f.read() @@ -162,11 +162,11 @@ def testRead(self): @mock_s3 def testReadLines(self): ckpt_lines = ( - ['\n'] + ['line {}\n'.format(i) for i in range(10)] + ['\n'] + [u'\n'] + [u'line {}\n'.format(i) for i in range(10)] + [u' '] ) - ckpt_content = ''.join(ckpt_lines) + ckpt_content = u''.join(ckpt_lines) temp_dir = self._CreateDeepS3Structure(ckpt_content=ckpt_content) - ckpt_path = os.path.join(temp_dir, 'model.ckpt') + ckpt_path = self._PathJoin(temp_dir, 'model.ckpt') with gfile.GFile(ckpt_path, 'r') as f: f.buff_chunk_size = 4 # Test buffering by reducing chunk size ckpt_read_lines = list(f) @@ -177,7 +177,7 @@ def testReadWithOffset(self): ckpt_content = 'asdfasdfasdffoobarbuzz' ckpt_b_content = b'asdfasdfasdffoobarbuzz' temp_dir = self._CreateDeepS3Structure(ckpt_content=ckpt_content) - ckpt_path = os.path.join(temp_dir, 'model.ckpt') + ckpt_path = self._PathJoin(temp_dir, 'model.ckpt') with gfile.GFile(ckpt_path, 'r') as f: f.buff_chunk_size = 4 # Test buffering by reducing chunk size ckpt_read = f.read(12) @@ -194,6 +194,10 @@ def testReadWithOffset(self): ckpt_read = f.read() self.assertEqual(ckpt_b_content, ckpt_read) + def _PathJoin(self, top_directory, sub_path): + """Join directory and path with slash and not local separator""" + return top_directory + "/" + sub_path + def _CreateDeepS3Structure(self, top_directory='top_dir', ckpt_content='', region_name='us-east-1', bucket_name='test'): """Creates a reasonable deep structure of S3 subdirectories 
with files. diff --git a/tensorboard/compat/tensorflow_stub/io/gfile_test.py b/tensorboard/compat/tensorflow_stub/io/gfile_test.py index 31244684d3..bf507c7485 100644 --- a/tensorboard/compat/tensorflow_stub/io/gfile_test.py +++ b/tensorboard/compat/tensorflow_stub/io/gfile_test.py @@ -17,6 +17,7 @@ from __future__ import division from __future__ import print_function +import io import os import shutil import six @@ -122,7 +123,8 @@ def testWalk(self): for pair in expected: # If this is not the top-level directory, prepend the high-level # directory. - pair[0] = os.path.join(temp_dir, pair[0]) if pair[0] else temp_dir + pair[0] = os.path.join(temp_dir, + pair[0].replace('/', os.path.sep)) if pair[0] else temp_dir gotten = gfile.walk(temp_dir) self._CompareFilesPerSubdirectory(expected, gotten) @@ -156,10 +158,11 @@ def testReadLines(self): self._CreateDeepDirectoryStructure(temp_dir) ckpt_path = os.path.join(temp_dir, 'model.ckpt') ckpt_lines = ( - ['\n'] + ['line {}\n'.format(i) for i in range(10)] + ['\n'] + [u'\n'] + [u'line {}\n'.format(i) for i in range(10)] + [u' '] ) - with open(ckpt_path, 'w') as f: - f.write(''.join(ckpt_lines)) + # Write out \n as newline even on Windows + with io.open(ckpt_path, 'w', newline='') as f: + f.write(u''.join(ckpt_lines)) with gfile.GFile(ckpt_path, 'r') as f: f.buff_chunk_size = 4 # Test buffering by reducing chunk size ckpt_read_lines = list(f) From d230cc531a23c0f4ea99f3068bd59c0b7d66246b Mon Sep 17 00:00:00 2001 From: Orion Reblitz-Richardson Date: Thu, 14 Feb 2019 09:29:08 -0800 Subject: [PATCH 3/7] One more Windows fix --- tensorboard/compat/tensorflow_stub/io/gfile.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/tensorboard/compat/tensorflow_stub/io/gfile.py b/tensorboard/compat/tensorflow_stub/io/gfile.py index 6b2935a77a..19321ed99f 100644 --- a/tensorboard/compat/tensorflow_stub/io/gfile.py +++ b/tensorboard/compat/tensorflow_stub/io/gfile.py @@ -82,6 +82,10 @@ def exists(self, filename): """Determines whether a path exists or not.""" return os.path.exists(compat.as_bytes(filename)) + def join(self, path, *paths): + """Join paths with path delimiter.""" + return os.path.join(path, *paths) + def read(self, filename, binary_mode=False, size=None, offset=None): """Reads contents of a file to a string. @@ -173,6 +177,10 @@ def exists(self, filename): return True return False + def join(self, path, *paths): + """Join paths with a slash.""" + return path + "/" + "/".join(paths) + def read(self, filename, binary_mode=False, size=None, offset=None): """Reads contents of a file to a string. 
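For context, a minimal sketch (not part of the patch) of the join() semantics added above, assuming the LocalFileSystem and S3FileSystem class names from gfile.py and written with the tuple form that a later commit in this series settles on. The point is that walk() in the hunk below cannot use os.path.join, which would splice backslashes into s3:// or gs:// paths on Windows:

    import os

    class LocalFileSystem(object):
      def join(self, path, *paths):
        # Defer to the OS: the separator is os.sep, e.g. '\\' on Windows.
        return os.path.join(path, *paths)

    class S3FileSystem(object):
      def join(self, path, *paths):
        # Object-store keys always use forward slashes, on any platform.
        return "/".join((path,) + paths)

    # S3FileSystem().join('s3://bucket/logs', 'run1') -> 's3://bucket/logs/run1'
    # LocalFileSystem().join('logs', 'run1') -> 'logs/run1' (or 'logs\\run1').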
@@ -466,6 +474,7 @@ def walk(top, topdown=True, onerror=None): as strings """ top = compat.as_str_any(top) + fs = get_filesystem(top) try: listing = listdir(top) except errors.NotFoundError as err: @@ -477,7 +486,7 @@ def walk(top, topdown=True, onerror=None): files = [] subdirs = [] for item in listing: - full_path = os.path.join(top, compat.as_str_any(item)) + full_path = fs.join(top, compat.as_str_any(item)) if isdir(full_path): subdirs.append(item) else: @@ -489,7 +498,7 @@ def walk(top, topdown=True, onerror=None): yield here for subdir in subdirs: - joined_subdir = os.path.join(top, compat.as_str_any(subdir)) + joined_subdir = fs.join(top, compat.as_str_any(subdir)) for subitem in walk(joined_subdir, topdown, onerror=onerror): yield subitem From c7ed9b90a600e9ced5f8c0b614fa562d3ec40312 Mon Sep 17 00:00:00 2001 From: Orion Reblitz-Richardson Date: Thu, 21 Feb 2019 16:55:36 -0800 Subject: [PATCH 4/7] Code review changes --- tensorboard/compat/tensorflow_stub/io/gfile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorboard/compat/tensorflow_stub/io/gfile.py b/tensorboard/compat/tensorflow_stub/io/gfile.py index 19321ed99f..21a26451d6 100644 --- a/tensorboard/compat/tensorflow_stub/io/gfile.py +++ b/tensorboard/compat/tensorflow_stub/io/gfile.py @@ -179,7 +179,7 @@ def exists(self, filename): def join(self, path, *paths): """Join paths with a slash.""" - return path + "/" + "/".join(paths) + return "/".join([path] + paths) def read(self, filename, binary_mode=False, size=None, offset=None): """Reads contents of a file to a string. From 61612444e555fbab54e8a92ad1050b03a85a7f78 Mon Sep 17 00:00:00 2001 From: Orion Reblitz-Richardson Date: Thu, 21 Feb 2019 17:01:56 -0800 Subject: [PATCH 5/7] Revert some changes --- .../event_accumulator_test.py | 8 +- .../event_multiplexer_test.py | 8 +- .../plugin_event_accumulator_test.py | 8 +- .../plugin_event_multiplexer_test.py | 8 +- .../projector/projector_plugin_test.py | 94 ++++++++----------- 5 files changed, 57 insertions(+), 69 deletions(-) diff --git a/tensorboard/backend/event_processing/event_accumulator_test.py b/tensorboard/backend/event_processing/event_accumulator_test.py index 9352950446..7a857e78e8 100644 --- a/tensorboard/backend/event_processing/event_accumulator_test.py +++ b/tensorboard/backend/event_processing/event_accumulator_test.py @@ -767,8 +767,8 @@ def FakeScalarSummary(tag, value): directory = os.path.join(self.get_temp_dir(), 'values_dir') if tf.io.gfile.isdir(directory): - os.removedirs(directory) - os.mkdir(directory) + tf.io.gfile.rmtree(directory) + tf.io.gfile.mkdir(directory) writer = test_util.FileWriter(directory, max_queue=100) @@ -845,8 +845,8 @@ def testGraphFromMetaGraphBecomesAvailable(self): directory = os.path.join(self.get_temp_dir(), 'metagraph_test_values_dir') if tf.io.gfile.isdir(directory): - os.removedirs(directory) - os.mkdir(directory) + tf.io.gfile.rmtree(directory) + tf.io.gfile.mkdir(directory) writer = test_util.FileWriter(directory, max_queue=100) diff --git a/tensorboard/backend/event_processing/event_multiplexer_test.py b/tensorboard/backend/event_processing/event_multiplexer_test.py index e428a8dd84..6263a7df06 100644 --- a/tensorboard/backend/event_processing/event_multiplexer_test.py +++ b/tensorboard/backend/event_processing/event_multiplexer_test.py @@ -29,7 +29,7 @@ def _AddEvents(path): if not tf.io.gfile.isdir(path): - os.makedirs(path) + tf.io.gfile.makedirs(path) fpath = os.path.join(path, 'hypothetical.tfevents.out') with tf.io.gfile.GFile(fpath, 'w') as 
f: f.write('') @@ -38,8 +38,8 @@ def _AddEvents(path): def _CreateCleanDirectory(path): if tf.io.gfile.isdir(path): - os.removedirs(path) - os.mkdir(path) + tf.io.gfile.rmtree(path) + tf.io.gfile.mkdir(path) class _FakeAccumulator(object): @@ -206,7 +206,7 @@ def testAddRunsFromDirectory(self): self.assertEqual(x.Runs(), {}, 'loading empty directory had no effect') path1 = join(realdir, 'path1') - os.mkdir(path1) + tf.io.gfile.mkdir(path1) x.AddRunsFromDirectory(realdir) self.assertEqual(x.Runs(), {}, 'creating empty subdirectory had no effect') diff --git a/tensorboard/backend/event_processing/plugin_event_accumulator_test.py b/tensorboard/backend/event_processing/plugin_event_accumulator_test.py index 8fd9274edc..20b388281e 100644 --- a/tensorboard/backend/event_processing/plugin_event_accumulator_test.py +++ b/tensorboard/backend/event_processing/plugin_event_accumulator_test.py @@ -558,8 +558,8 @@ def FakeScalarSummary(tag, value): directory = os.path.join(self.get_temp_dir(), 'values_dir') if tf.io.gfile.isdir(directory): - os.removedirs(directory) - os.mkdir(directory) + tf.io.gfile.rmtree(directory) + tf.io.gfile.mkdir(directory) writer = test_util.FileWriter(directory, max_queue=100) @@ -636,8 +636,8 @@ def testGraphFromMetaGraphBecomesAvailable(self): directory = os.path.join(self.get_temp_dir(), 'metagraph_test_values_dir') if tf.io.gfile.isdir(directory): - os.removedirs(directory) - os.mkdir(directory) + tf.io.gfile.rmtree(directory) + tf.io.gfile.mkdir(directory) writer = test_util.FileWriter(directory, max_queue=100) diff --git a/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py b/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py index 8b3e60e6ff..600a291604 100644 --- a/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py +++ b/tensorboard/backend/event_processing/plugin_event_multiplexer_test.py @@ -29,7 +29,7 @@ def _AddEvents(path): if not tf.io.gfile.isdir(path): - os.makedirs(path) + tf.io.gfile.makedirs(path) fpath = os.path.join(path, 'hypothetical.tfevents.out') with tf.io.gfile.GFile(fpath, 'w') as f: f.write('') @@ -38,8 +38,8 @@ def _AddEvents(path): def _CreateCleanDirectory(path): if tf.io.gfile.isdir(path): - os.removedirs(path) - os.mkdir(path) + tf.io.gfile.rmtree(path) + tf.io.gfile.mkdir(path) class _FakeAccumulator(object): @@ -178,7 +178,7 @@ def testAddRunsFromDirectory(self): self.assertEqual(x.Runs(), {}, 'loading empty directory had no effect') path1 = join(realdir, 'path1') - os.mkdir(path1) + tf.io.gfile.mkdir(path1) x.AddRunsFromDirectory(realdir) self.assertEqual(x.Runs(), {}, 'creating empty subdirectory had no effect') diff --git a/tensorboard/plugins/projector/projector_plugin_test.py b/tensorboard/plugins/projector/projector_plugin_test.py index 791c26997c..5d8094f0e7 100644 --- a/tensorboard/plugins/projector/projector_plugin_test.py +++ b/tensorboard/plugins/projector/projector_plugin_test.py @@ -33,7 +33,6 @@ from tensorboard.backend import application from tensorboard.backend.event_processing import plugin_event_multiplexer as event_multiplexer # pylint: disable=line-too-long -from tensorboard.compat import tf as tf_compat from tensorboard.plugins import base_plugin from tensorboard.plugins.projector import projector_config_pb2 from tensorboard.plugins.projector import projector_plugin @@ -57,10 +56,7 @@ def testRunsWithValidCheckpoint(self): self._GenerateProjectorTestData() self._SetupWSGIApp() run_json = self._GetJson('/data/plugin/projector/runs') - if tf_compat.__version__ != 
'stub': - self.assertTrue(run_json) - else: - self.assertFalse(run_json) + self.assertTrue(run_json) def testRunsWithNoCheckpoint(self): self._SetupWSGIApp() @@ -89,26 +85,24 @@ def testRunsWithInvalidModelCheckpointPathInConfig(self): self._SetupWSGIApp() run_json = self._GetJson('/data/plugin/projector/runs') - if tf_compat.__version__ != 'stub': - self.assertEqual(run_json, []) + self.assertEqual(run_json, []) def testInfoWithValidCheckpointNoEventsData(self): self._GenerateProjectorTestData() self._SetupWSGIApp() - if tf_compat.__version__ != 'stub': - info_json = self._GetJson('/data/plugin/projector/info?run=.') - self.assertItemsEqual(info_json['embeddings'], [{ - 'tensorShape': [1, 2], - 'tensorName': 'var1', - 'bookmarksPath': 'bookmarks.json' - }, { - 'tensorShape': [10, 10], - 'tensorName': 'var2' - }, { - 'tensorShape': [100, 100], - 'tensorName': 'var3' - }]) + info_json = self._GetJson('/data/plugin/projector/info?run=.') + self.assertItemsEqual(info_json['embeddings'], [{ + 'tensorShape': [1, 2], + 'tensorName': 'var1', + 'bookmarksPath': 'bookmarks.json' + }, { + 'tensorShape': [10, 10], + 'tensorName': 'var2' + }, { + 'tensorShape': [100, 100], + 'tensorName': 'var3' + }]) def testInfoWithValidCheckpointAndEventsData(self): self._GenerateProjectorTestData() @@ -116,33 +110,29 @@ def testInfoWithValidCheckpointAndEventsData(self): self._SetupWSGIApp() run_json = self._GetJson('/data/plugin/projector/runs') - if tf_compat.__version__ != 'stub': - self.assertTrue(run_json) - run = run_json[0] - info_json = self._GetJson('/data/plugin/projector/info?run=%s' % run) - self.assertItemsEqual(info_json['embeddings'], [{ - 'tensorShape': [1, 2], - 'tensorName': 'var1', - 'bookmarksPath': 'bookmarks.json' - }, { - 'tensorShape': [10, 10], - 'tensorName': 'var2' - }, { - 'tensorShape': [100, 100], - 'tensorName': 'var3' - }]) - else: - self.assertFalse(run_json) + self.assertTrue(run_json) + run = run_json[0] + info_json = self._GetJson('/data/plugin/projector/info?run=%s' % run) + self.assertItemsEqual(info_json['embeddings'], [{ + 'tensorShape': [1, 2], + 'tensorName': 'var1', + 'bookmarksPath': 'bookmarks.json' + }, { + 'tensorShape': [10, 10], + 'tensorName': 'var2' + }, { + 'tensorShape': [100, 100], + 'tensorName': 'var3' + }]) def testTensorWithValidCheckpoint(self): self._GenerateProjectorTestData() self._SetupWSGIApp() - if tf_compat.__version__ != 'stub': - url = '/data/plugin/projector/tensor?run=.&name=var1' - tensor_bytes = self._Get(url).data - expected_tensor = np.array([[6, 6]], dtype=np.float32) - self._AssertTensorResponse(tensor_bytes, expected_tensor) + url = '/data/plugin/projector/tensor?run=.&name=var1' + tensor_bytes = self._Get(url).data + expected_tensor = np.array([[6, 6]], dtype=np.float32) + self._AssertTensorResponse(tensor_bytes, expected_tensor) def testBookmarksRequestMissingRunAndName(self): self._GenerateProjectorTestData() @@ -183,10 +173,9 @@ def testBookmarks(self): self._GenerateProjectorTestData() self._SetupWSGIApp() - if tf_compat.__version__ != 'stub': - url = '/data/plugin/projector/bookmarks?run=.&name=var1' - bookmark = self._GetJson(url) - self.assertEqual(bookmark, {'a': 'b'}) + url = '/data/plugin/projector/bookmarks?run=.&name=var1' + bookmark = self._GetJson(url) + self.assertEqual(bookmark, {'a': 'b'}) def testEndpointsNoAssets(self): g = tf.Graph() @@ -224,14 +213,13 @@ def testPluginIsActive(self): self.plugin._thread_for_determining_is_active.run() - if tf_compat.__version__ != 'stub': - # The plugin later finds that embedding 
data is available. - self.assertTrue(self.plugin.is_active()) + # The plugin later finds that embedding data is available. + self.assertTrue(self.plugin.is_active()) - # Subsequent calls to is_active should not start a new thread. The mock - # should only have been called once throughout this test. - self.assertTrue(self.plugin.is_active()) - mock.assert_called_once_with(thread) + # Subsequent calls to is_active should not start a new thread. The mock + # should only have been called once throughout this test. + self.assertTrue(self.plugin.is_active()) + mock.assert_called_once_with(thread) def testPluginIsNotActive(self): self._SetupWSGIApp() From 2ceb921e703cdb67b1a57207604b77eb772eb576 Mon Sep 17 00:00:00 2001 From: Orion Reblitz-Richardson Date: Thu, 21 Feb 2019 17:12:33 -0800 Subject: [PATCH 6/7] Use tuple instead of list --- tensorboard/compat/tensorflow_stub/io/gfile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorboard/compat/tensorflow_stub/io/gfile.py b/tensorboard/compat/tensorflow_stub/io/gfile.py index 21a26451d6..0ea2b9e178 100644 --- a/tensorboard/compat/tensorflow_stub/io/gfile.py +++ b/tensorboard/compat/tensorflow_stub/io/gfile.py @@ -179,7 +179,7 @@ def exists(self, filename): def join(self, path, *paths): """Join paths with a slash.""" - return "/".join([path] + paths) + return "/".join((path,) + paths) def read(self, filename, binary_mode=False, size=None, offset=None): """Reads contents of a file to a string. From 3ef3d88a5cd96e98e01ae4fc9a6ef3f94b752c9e Mon Sep 17 00:00:00 2001 From: Orion Reblitz-Richardson Date: Thu, 21 Feb 2019 17:17:39 -0800 Subject: [PATCH 7/7] Add TODO note --- tensorboard/compat/tensorflow_stub/io/gfile.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tensorboard/compat/tensorflow_stub/io/gfile.py b/tensorboard/compat/tensorflow_stub/io/gfile.py index 0ea2b9e178..161ca39489 100644 --- a/tensorboard/compat/tensorflow_stub/io/gfile.py +++ b/tensorboard/compat/tensorflow_stub/io/gfile.py @@ -238,6 +238,10 @@ def glob(self, filename): "{} not supported by compat glob".format(filename)) if star_i != len(filename) - 1: # Just return empty so we can use glob from directory watcher + # + # TODO: Remove and instead handle in GetLogdirSubdirectories. + # However, we would need to handle it for all non-local registered + # filesystems in some way. return [] filename = filename[:-1] client = boto3.client("s3")
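For reference, a condensed sketch of the record framing that the rewritten PyRecordReader_New.read() in PATCH 1/7 validates; this is an illustration, not part of the series. Each record on disk is an 8-byte length, a 4-byte masked crc32 of those length bytes, the payload, and a 4-byte masked crc32 of the payload (the reader above tolerates the final payload crc being absent). Here masked_crc32c is assumed to be the helper defined in pywrap_tensorflow.py, and the struct formats mirror the patch's 'Q' and 'I':

    import struct

    def read_records(f, masked_crc32c):
      """Yields raw event payloads from a binary file-like object f."""
      while True:
        header = f.read(8)
        if len(header) != 8:
          return  # clean EOF between records
        (length,) = struct.unpack('Q', header)
        (expected_crc,) = struct.unpack('I', f.read(4))
        if masked_crc32c(header) != expected_crc:
          raise ValueError('header crc32 mismatch')
        payload = f.read(length)
        footer = f.read(4)
        if footer:  # the last record's payload crc may be missing
          (expected_crc,) = struct.unpack('I', footer)
          if masked_crc32c(payload) != expected_crc:
            raise ValueError('payload crc32 mismatch')
        yield payload

Because the series routes file access through gfile.GFile, the same loop works whether f comes from a local path or an s3:// path, which is what lets the stub read event files without TensorFlow installed.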