diff --git a/tensorboard/BUILD b/tensorboard/BUILD index 6af3f23c2c..fabecc2fe6 100644 --- a/tensorboard/BUILD +++ b/tensorboard/BUILD @@ -43,6 +43,7 @@ py_library( ":notebook", ":program", "//tensorboard/summary", + "//tensorboard/summary/writer", ], ) diff --git a/tensorboard/summary/writer/BUILD b/tensorboard/summary/writer/BUILD new file mode 100644 index 0000000000..42e408901b --- /dev/null +++ b/tensorboard/summary/writer/BUILD @@ -0,0 +1,48 @@ +# Description: +# Writer interfaces for TensorBoard if tensorflow is not present +package(default_visibility = ["//tensorboard:internal"]) + +licenses(["notice"]) # Apache 2.0 + +exports_files(["LICENSE"]) + +py_library( + name = "writer", + srcs = [ + "__init__.py", + "event_file_writer.py", + "record_writer.py", + ], + srcs_version = "PY2AND3", + visibility = ["//visibility:public"], + deps = [ + "//tensorboard/compat/proto:protos_all_py_pb2", + "//tensorboard/compat/tensorflow_stub", + "@org_pythonhosted_six", + ], +) + + +py_test( + name = "event_file_writer_test", + size = "small", + srcs = ["event_file_writer_test.py"], + main = "event_file_writer_test.py", + srcs_version = "PY2AND3", + deps = [ + ":writer", + "//tensorboard:test", + ], +) + +py_test( + name = "record_writer_test", + size = "small", + srcs = ["record_writer_test.py"], + main = "record_writer_test.py", + srcs_version = "PY2AND3", + deps = [ + ":writer", + "//tensorboard:test", + ], +) \ No newline at end of file diff --git a/tensorboard/summary/writer/__init__.py b/tensorboard/summary/writer/__init__.py new file mode 100644 index 0000000000..61b9129e83 --- /dev/null +++ b/tensorboard/summary/writer/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2019 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tensorboard/summary/writer/event_file_writer.py b/tensorboard/summary/writer/event_file_writer.py
new file mode 100644
index 0000000000..dd87a23ca0
--- /dev/null
+++ b/tensorboard/summary/writer/event_file_writer.py
@@ -0,0 +1,257 @@
+# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+"""Writes events to disk in a logdir."""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import errno
+import os
+import socket
+import threading
+import time
+
+import six
+
+from tensorboard.compat.proto import event_pb2
+from tensorboard.summary.writer.record_writer import RecordWriter
+
+
+class AtomicCounter(object):
+    """Thread-safe counter that hands out consecutive values."""
+
+    def __init__(self, initial_value):
+        """Creates a counter whose first `get()` returns `initial_value`."""
+        self._value = initial_value
+        self._lock = threading.Lock()
+
+    def get(self):
+        """Returns the current value and then increments it, under the lock."""
+        with self._lock:
+            value = self._value
+            self._value += 1
+            return value
+
+
+# Supplies the last component of each event file name, so writers created by
+# the same process within the same second still get distinct files.
+_global_uid = AtomicCounter(0)
+
+
+class EventFileWriter(object):
+    """Writes `Event` protocol buffers to an event file.
+
+    The `EventFileWriter` class creates an event file in the specified directory,
+    and asynchronously writes Event protocol buffers to the file. The Event file
+    is encoded using the tfrecord format, which is similar to RecordIO.
+    """
+
+    def __init__(self, logdir, max_queue_size=10, flush_secs=120, filename_suffix=''):
+        """Creates a `EventFileWriter` and an event file to write to.
+
+        On construction the summary writer creates a new event file in `logdir`.
+        This event file will contain `Event` protocol buffers, which are written to
+        disk via the add_event method.
+        The other arguments to the constructor control the asynchronous writes to
+        the event file:
+
+        Args:
+          logdir: A string. Directory where event file will be written. Created
+            if it does not exist.
+          max_queue_size: Integer. Size of the queue for pending events and summaries.
+          flush_secs: Number. How often, in seconds, to flush the
+            pending events and summaries to disk.
+          filename_suffix: A string. Appended to the generated event file name.
+        """
+        self._logdir = logdir
+        try:
+            os.makedirs(logdir)
+        except OSError as e:
+            # An already-existing directory is fine, including one created
+            # concurrently by another thread or process; anything else is a
+            # real error.
+            if e.errno != errno.EEXIST:
+                raise
+        self._file_name = os.path.join(logdir, "events.out.tfevents.%010d.%s.%s.%s" %
+            (time.time(), socket.gethostname(), os.getpid(), _global_uid.get())) + filename_suffix  # noqa E128
+        self._general_file_writer = open(self._file_name, 'wb')
+        self._async_writer = _AsyncWriter(RecordWriter(self._general_file_writer), max_queue_size, flush_secs)
+
+        # Every event file starts with a record carrying the file version.
+        _event = event_pb2.Event(wall_time=time.time(), file_version='brain.Event:2')
+        self.add_event(_event)
+        self.flush()
+
+    def get_logdir(self):
+        """Returns the directory where event file will be written."""
+        return self._logdir
+
+    def add_event(self, event):
+        """Adds an event to the event file.
+
+        Args:
+          event: An `Event` protocol buffer.
+
+        Raises:
+          TypeError: If `event` is not an `event_pb2.Event`.
+          IOError: If the writer has already been closed.
+        """
+        if not isinstance(event, event_pb2.Event):
+            raise TypeError("Expected an event_pb2.Event proto, "
+                            "but got %s" % type(event))
+        self._async_writer.write(event.SerializeToString())
+
+    def flush(self):
+        """Flushes the event file to disk.
+
+        Call this method to make sure that all pending events have been written to
+        disk.
+        """
+        self._async_writer.flush()
+
+    def close(self):
+        """Performs a final flush of the event file to disk, stops the
+        write/flush worker and closes the file. Call this method when you do not
+        need the summary writer anymore.
+        """
+        self._async_writer.close()
+
+
+class _AsyncWriter(object):
+    '''Writes bytes to a file.'''
+
+    def __init__(self, record_writer, max_queue_size=20, flush_secs=120):
+        """Writes bytes to a file asynchronously.
+
+        An instance of this class holds a queue to keep the incoming data temporarily.
+        Data passed to the `write` function will be put to the queue and the function
+        returns immediately (it only blocks while the queue is full). This class also
+        maintains a thread to write data in the queue to disk. The first initialization
+        parameter is an instance of
+        `tensorboard.summary.writer.record_writer.RecordWriter`, which computes the CRC
+        checksum and then writes the combined result to the disk. So we use an async
+        approach to improve performance.
+
+        Args:
+          record_writer: A RecordWriter instance, or any object with `write`,
+            `flush` and `close` methods.
+          max_queue_size: Integer. Size of the queue for pending bytestrings.
+          flush_secs: Number. How often, in seconds, to flush the
+            pending bytestrings to disk.
+        """
+        self._writer = record_writer
+        self._closed = False
+        self._byte_queue = six.moves.queue.Queue(max_queue_size)
+        self._worker = _AsyncWriterThread(self._byte_queue, self._writer, flush_secs)
+        self._lock = threading.Lock()
+        self._worker.start()
+
+    def write(self, bytestring):
+        '''Enqueue the given bytes to be written asynchronously'''
+        with self._lock:
+            if self._closed:
+                raise IOError('Writer is closed')
+            self._byte_queue.put(bytestring)
+
+    def flush(self):
+        '''Write all the enqueued bytestring before this flush call to disk.
+        Block until all the above bytestring are written.
+        '''
+        with self._lock:
+            if self._closed:
+                raise IOError('Writer is closed')
+            self._byte_queue.join()
+            self._writer.flush()
+
+    def close(self):
+        '''Closes the underlying writer, flushing any pending writes first.'''
+        if not self._closed:
+            with self._lock:
+                if not self._closed:
+                    self._closed = True
+                    self._worker.stop()
+                    self._writer.flush()
+                    self._writer.close()
+
+
+class _AsyncWriterThread(threading.Thread):
+    """Thread that processes asynchronous writes for _AsyncWriter."""
+
+    def __init__(self, queue, record_writer, flush_secs):
+        """Creates an _AsyncWriterThread.
+
+        Args:
+          queue: A Queue from which to dequeue data.
+          record_writer: An instance of record_writer writer.
+          flush_secs: How often, in seconds, to flush the
+            pending file to disk.
+        """
+        threading.Thread.__init__(self)
+        self.daemon = True
+        self._queue = queue
+        self._record_writer = record_writer
+        self._flush_secs = flush_secs
+        # The first data will be flushed immediately.
+        self._next_flush_time = 0
+        self._has_pending_data = False
+        # Unique sentinel, recognized by identity in run().
+        self._shutdown_signal = object()
+
+    def stop(self):
+        """Enqueues the shutdown sentinel and waits for the thread to exit."""
+        self._queue.put(self._shutdown_signal)
+        self.join()
+
+    def run(self):
+        """Writes queued data until the shutdown sentinel arrives, flushing periodically."""
+        # Here wait on the queue until data appears, or till the next
+        # time to flush the writer, whichever is earlier. If we have
+        # data, write it. If not, an empty queue exception will be raised
+        # and we can proceed to flush the writer.
+        while True:
+            now = time.time()
+            queue_wait_duration = self._next_flush_time - now
+            data = None
+            try:
+                if queue_wait_duration > 0:
+                    data = self._queue.get(True, queue_wait_duration)
+                else:
+                    data = self._queue.get(False)
+
+                if data is self._shutdown_signal:
+                    return
+                self._record_writer.write(data)
+                self._has_pending_data = True
+            except six.moves.queue.Empty:
+                pass
+            finally:
+                # Acknowledge every dequeued item. Compare against None, not
+                # truthiness: an empty bytestring (e.g. an `Event` with no fields
+                # set serializes to b'') is a real item, and leaving it
+                # unacknowledged makes `Queue.join()` in flush() block forever.
+                if data is not None:
+                    self._queue.task_done()
+
+            now = time.time()
+            if now > self._next_flush_time:
+                if self._has_pending_data:
+                    # Small optimization - if there are no pending data,
+                    # there's no need to flush, since each flush can be
+                    # expensive (e.g. uploading a new file to a server).
+                    self._record_writer.flush()
+                    self._has_pending_data = False
+                # Do it again in flush_secs.
+                self._next_flush_time = now + self._flush_secs
diff --git a/tensorboard/summary/writer/event_file_writer_test.py b/tensorboard/summary/writer/event_file_writer_test.py
new file mode 100644
index 0000000000..acbef412b8
--- /dev/null
+++ b/tensorboard/summary/writer/event_file_writer_test.py
@@ -0,0 +1,127 @@
+# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+
+# Tests for EventFileWriter and _AsyncWriter.
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+
+import glob
+import os
+from tensorboard.summary.writer.event_file_writer import EventFileWriter
+from tensorboard.summary.writer.event_file_writer import _AsyncWriter
+from tensorboard.compat.proto import event_pb2
+from tensorboard.compat.proto.summary_pb2 import Summary
+from tensorboard.compat.tensorflow_stub.pywrap_tensorflow import PyRecordReader_New
+from tensorboard import test as tb_test
+
+
+class EventFileWriterTest(tb_test.TestCase):
+
+    def test_event_file_writer_roundtrip(self):
+        _TAGNAME = 'dummy'
+        _DUMMY_VALUE = 42
+        logdir = self.get_temp_dir()
+        w = EventFileWriter(logdir)
+        summary = Summary(value=[Summary.Value(tag=_TAGNAME, simple_value=_DUMMY_VALUE)])
+        fakeevent = event_pb2.Event(summary=summary)
+        w.add_event(fakeevent)
+        w.close()
+        event_files = sorted(glob.glob(os.path.join(logdir, '*')))
+        self.assertEqual(len(event_files), 1)
+        r = PyRecordReader_New(event_files[0])
+        r.GetNext()  # skip the file-version event that EventFileWriter writes first
+        r.GetNext()
+        self.assertEqual(fakeevent.SerializeToString(), r.record())
+
+    def test_setting_filename_suffix_works(self):
+        logdir = self.get_temp_dir()
+
+        w = EventFileWriter(logdir, filename_suffix='.event_horizon')
+        w.close()
+        event_files = sorted(glob.glob(os.path.join(logdir, '*')))
+        self.assertEqual(event_files[0].split('.')[-1], 'event_horizon')
+
+    def test_async_writer_without_write(self):
+        logdir = self.get_temp_dir()
+        w = EventFileWriter(logdir)
+        w.close()
+        event_files = sorted(glob.glob(os.path.join(logdir, '*')))
+        r = PyRecordReader_New(event_files[0])
+        r.GetNext()  # the only record: the file-version event written on construction
+        s = event_pb2.Event.FromString(r.record())
+        self.assertEqual(s.file_version, "brain.Event:2")
+
+
+class AsyncWriterTest(tb_test.TestCase):
+
+    def test_async_writer_write_once(self):
+        filename = os.path.join(self.get_temp_dir(), "async_writer_write_once")
+        w = _AsyncWriter(open(filename, 'wb'))  # a raw file object stands in for a RecordWriter
+        bytes_to_write = b"hello world"
+        w.write(bytes_to_write)
+        w.close()
+        with open(filename, 'rb') as f:
+            self.assertEqual(f.read(), bytes_to_write)
+
+    def test_async_writer_write_queue_full(self):
+        filename = os.path.join(self.get_temp_dir(), "async_writer_write_queue_full")
+        w = _AsyncWriter(open(filename, 'wb'))
+        bytes_to_write = b"hello world"
+        repeat = 100  # more writes than _AsyncWriter's default max_queue_size of 20
+        for i in range(repeat):
+            w.write(bytes_to_write)
+        w.close()
+        with open(filename, 'rb') as f:
+            self.assertEqual(f.read(), bytes_to_write * repeat)
+
+    def test_async_writer_write_one_slot_queue(self):
+        filename = os.path.join(self.get_temp_dir(), "async_writer_write_one_slot_queue")
+        w = _AsyncWriter(open(filename, 'wb'), max_queue_size=1)
+        bytes_to_write = b"hello world"
+        repeat = 10  # fewer iterations than the test above, to keep this one fast
+        for i in range(repeat):
+            w.write(bytes_to_write)
+        w.close()
+        with open(filename, 'rb') as f:
+            self.assertEqual(f.read(), bytes_to_write * repeat)
+
+    def test_async_writer_close_triggers_flush(self):
+        filename = os.path.join(self.get_temp_dir(), "async_writer_close_triggers_flush")
+        w = _AsyncWriter(open(filename, 'wb'))
+        bytes_to_write = b"x" * 64
+        w.write(bytes_to_write)
+        w.close()
+        with open(filename, 'rb') as f:
+            self.assertEqual(f.read(), bytes_to_write)
+
+    def test_write_after_async_writer_closed(self):
+        filename = os.path.join(self.get_temp_dir(), "write_after_async_writer_closed")
+        w = _AsyncWriter(open(filename, 'wb'))
+        bytes_to_write = b"x" * 64
+        w.write(bytes_to_write)
+        w.close()
+
+        with self.assertRaises(IOError):
+            w.write(bytes_to_write)
+        # the rejected write must not reach the file: it holds only the first payload
+        with open(filename, 'rb') as f:
+            self.assertEqual(f.read(), bytes_to_write)
+
+
+if __name__ == '__main__':
+    tb_test.main()
diff --git a/tensorboard/summary/writer/record_writer.py b/tensorboard/summary/writer/record_writer.py
new file mode 100644
index 0000000000..a49a6b14d1
--- /dev/null
+++ b/tensorboard/summary/writer/record_writer.py
@@ -0,0 +1,49 @@
+# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+
+import struct
+from tensorboard.compat.tensorflow_stub.pywrap_tensorflow import masked_crc32c
+
+
+class RecordWriter(object):
+    """Write encoded protobuf to a file with packing defined in tensorflow"""
+    def __init__(self, writer):
+        """Open a file to keep the tensorboard records.
+
+        Args:
+        writer: A file-like object that implements `write`, `flush` and `close`.
+        """
+        self._writer = writer
+
+    # Format of a single record: (little-endian)
+    # uint64    length
+    # uint32    masked crc of length
+    # byte      data[length]
+    # uint32    masked crc of data
+    def write(self, data):
+        header = struct.pack('