diff --git a/tbparse/summary_reader.py b/tbparse/summary_reader.py index 7ea14e7..229163d 100644 --- a/tbparse/summary_reader.py +++ b/tbparse/summary_reader.py @@ -3,18 +3,19 @@ summaries in a directory contains multiple event files, or a single event file. """ +# pylint: disable=C0302 import copy import os +from collections import defaultdict from typing import Any, Dict, List, Optional, Set, Tuple, Union, cast import numpy as np import pandas as pd import tensorflow as tf from tensorboard.backend.event_processing.event_accumulator import ( - AUDIO, COMPRESSED_HISTOGRAMS, HISTOGRAMS, IMAGES, - SCALARS, STORE_EVERYTHING_SIZE_GUIDANCE, TENSORS, AudioEvent, - EventAccumulator, HistogramEvent, ImageEvent, - ScalarEvent, TensorEvent) + AUDIO, COMPRESSED_HISTOGRAMS, HISTOGRAMS, IMAGES, SCALARS, + STORE_EVERYTHING_SIZE_GUIDANCE, TENSORS, AudioEvent, EventAccumulator, + HistogramEvent, ImageEvent, ScalarEvent, TensorEvent) from tensorboard.plugins.hparams.plugin_data_pb2 import HParamsPluginData # pylint: disable=W0105 @@ -629,121 +630,160 @@ def histogram_to_bins(counts: np.ndarray, limits: np.ndarray, bins.append(bin_y) return centers, bins - def _get_scalar_row(self, tag: str, e: ScalarEvent) -> Dict[str, Any]: - """Add entries in dictionary `d` based on the ScalarEvent `e`""" - d = {'step': e.step} - if self._pivot: - d[tag] = e.value - else: - d['tag'] = tag - d['value'] = e.value - return self._add_extra_columns(d, e.wall_time) - - def _get_tensor_row(self, tag: str, e: TensorEvent) -> Dict[str, Any]: - """Add entries in dictionary `d` based on the TensorEvent `e`""" - value = tf.make_ndarray(e.tensor_proto) - if value.shape == (): - # Tensorflow histogram may have more than one items - value = value.item() - d = {'step': e.step} - if self._pivot: - d[tag] = value - else: - d['tag'] = tag - d['value'] = value - return self._add_extra_columns(d, e.wall_time) - - def _get_histogram_row(self, tag: str, e: HistogramEvent) -> \ - Dict[str, Any]: - """Add entries in dictionary `d` based on the HistogramEvent `e`""" - hv = e.histogram_value - limits = np.array(hv.bucket_limit, dtype=np.float64) - counts = np.array(hv.bucket, dtype=np.float64) - columns = { - 'counts': counts, - 'limits': limits, - 'max': hv.max, - 'min': hv.min, - 'num': hv.num, - 'sum': hv.sum, - 'sum_squares': hv.sum_squares, - } - # assert list(columns.keys()) == list(sorted(columns.keys())) - d = {'step': e.step} - if not self._pivot: - d['tag'] = tag - lst = list(self._extra_columns) + ['limits', 'counts'] - for k, v in columns.items(): - if k in lst: - key = k if not self._pivot else tag + '/' + k - d[key] = v - return self._add_extra_columns(d, e.wall_time) - - def _get_image_row(self, tag: str, e: ImageEvent) -> Dict[str, Any]: - """Add entries in dictionary `d` based on the ImageEvent `e`""" - value = tf.image.decode_image(e.encoded_image_string).numpy() - columns = { - 'height': e.height, - 'width': e.width, - } - # assert list(columns.keys()) == list(sorted(columns.keys())) - d = {'step': e.step} - if self._pivot: - d[tag] = value - else: - d['tag'] = tag - d['value'] = value - for k, v in columns.items(): - if k in self._extra_columns: - key = k if not self._pivot else tag + '/' + k - d[key] = v - return self._add_extra_columns(d, e.wall_time) - - def _get_audio_row(self, tag: str, e: AudioEvent) -> Dict[str, Any]: - """Add entries in dictionary `d` based on the AudioEvent `e`""" - audio, _ = tf.audio.decode_wav(e.encoded_audio_string) - value = audio.numpy() - columns = { - 'content_type': e.content_type, - 'length_frames': e.length_frames, - 'sample_rate': e.sample_rate, - } - # assert list(columns.keys()) == list(sorted(columns.keys())) - d = {'step': e.step} - if self._pivot: - d[tag] = value - else: - d['tag'] = tag - d['value'] = value - for k, v in columns.items(): - if k in self._extra_columns: - key = k if not self._pivot else tag + '/' + k - d[key] = v - return self._add_extra_columns(d, e.wall_time) - - def _get_hparam_row(self, tag: str, value: Any) -> Dict[str, Any]: - """Add entries in dictionary `d` based on the HParamsPluginData \ - `plugin_data`""" - d = {} - if self._pivot: - d[tag] = value - else: - d['tag'] = tag - d['value'] = value - return self._add_extra_columns(d, None) - - def _get_text_row(self, tag: str, e: TensorEvent) -> Dict[str, Any]: - """Add entries in dictionary `d` based on the TensorEvent `e`""" - value = tf.make_ndarray(e.tensor_proto).item() - assert isinstance(value, bytes) - value = value.decode('utf-8') - d = {'step': e.step} - if self._pivot: - d[tag] = value - else: - d['tag'] = tag - d['value'] = value - return self._add_extra_columns(d, e.wall_time) + def _get_scalar_cols(self, tag_to_events: Dict[str, ScalarEvent]) -> \ + Dict[str, List[Any]]: + """Return a dict of lists based on the tags and ScalarEvents.""" + cols = self._get_default_cols(tag_to_events) + idx = 0 + for tag, events in tag_to_events.items(): + for e in events: + cols['step'][idx] = e.step + if self._pivot: + cols[tag][idx] = e.value + else: + cols['tag'][idx] = tag + cols['value'][idx] = e.value + idx += 1 + return cols + + def _get_tensor_cols(self, tag_to_events: Dict[str, TensorEvent]) -> \ + Dict[str, List[Any]]: + """Return a dict of lists based on the tags and TensorEvents.""" + cols = self._get_default_cols(tag_to_events) + idx = 0 + for tag, events in tag_to_events.items(): + for e in events: + value = tf.make_ndarray(e.tensor_proto) + if value.shape == (): + # Tensorflow histogram may have more than one items + value = value.item() + cols['step'][idx] = e.step + if self._pivot: + cols[tag][idx] = value + else: + cols['tag'][idx] = tag + cols['value'][idx] = value + idx += 1 + return cols + + def _get_histogram_cols(self, tag_to_events: Dict[str, HistogramEvent]) \ + -> Dict[str, List[Any]]: + """Return a dict of lists based on the tags and HistogramEvent.""" + cols = self._get_default_cols(tag_to_events) + idx = 0 + for tag, events in tag_to_events.items(): + for e in events: + hv = e.histogram_value + limits = np.array(hv.bucket_limit, dtype=np.float64) + counts = np.array(hv.bucket, dtype=np.float64) + columns = { + 'counts': counts, + 'limits': limits, + 'max': hv.max, + 'min': hv.min, + 'num': hv.num, + 'sum': hv.sum, + 'sum_squares': hv.sum_squares, + } + # assert list(columns.keys()) == list(sorted(columns.keys())) + cols['step'][idx] = e.step + if not self._pivot: + cols['tag'][idx] = tag + lst = list(self._extra_columns) + ['limits', 'counts'] + for k, v in columns.items(): + if k in lst: + key = k if not self._pivot else tag + '/' + k + cols[key][idx] = v + idx += 1 + return cols + + def _get_image_cols(self, tag_to_events: Dict[str, ImageEvent]) -> \ + Dict[str, List[Any]]: + """Return a dict of lists based on the tags and ImageEvent.""" + + cols = self._get_default_cols(tag_to_events) + idx = 0 + for tag, events in tag_to_events.items(): + for e in events: + value = tf.image.decode_image(e.encoded_image_string).numpy() + columns = { + 'height': e.height, + 'width': e.width, + } + # assert list(columns.keys()) == list(sorted(columns.keys())) + cols['step'][idx] = e.step + if self._pivot: + cols[tag][idx] = value + else: + cols['tag'][idx] = tag + cols['value'][idx] = value + for k, v in columns.items(): + if k in self._extra_columns: + key = k if not self._pivot else tag + '/' + k + cols[key][idx] = v + idx += 1 + return cols + + def _get_audio_cols(self, tag_to_events: Dict[str, AudioEvent]) -> \ + Dict[str, List[Any]]: + """Return a dict of lists based on the tags and AudioEvent.""" + cols = self._get_default_cols(tag_to_events) + idx = 0 + for tag, events in tag_to_events.items(): + for e in events: + audio, _ = tf.audio.decode_wav(e.encoded_audio_string) + value = audio.numpy() + columns = { + 'content_type': e.content_type, + 'length_frames': e.length_frames, + 'sample_rate': e.sample_rate, + } + # assert list(columns.keys()) == list(sorted(columns.keys())) + cols['step'][idx] = e.step + if self._pivot: + cols[tag][idx] = value + else: + cols['tag'][idx] = tag + cols['value'][idx] = value + for k, v in columns.items(): + if k in self._extra_columns: + key = k if not self._pivot else tag + '/' + k + cols[key][idx] = v + idx += 1 + return cols + + def _get_hparam_cols(self, tag_to_events: Dict[str, Any]) -> \ + Dict[str, List[Any]]: + """Return a dict of lists based on the tags and HParamsPluginData.""" + cols = self._get_default_cols(tag_to_events, wall_time=False) + idx = 0 + for tag, value in tag_to_events.items(): + if self._pivot: + cols[tag][idx] = value[0] + else: + cols['tag'][idx] = tag + cols['value'][idx] = value[0] + idx += 1 + return cols + + def _get_text_cols(self, tag_to_events: Dict[str, TensorEvent]) -> \ + Dict[str, List[Any]]: + """Return a dict of lists based on the tags and TensorEvent.""" + cols = self._get_default_cols(tag_to_events) + idx = 0 + for tag, events in tag_to_events.items(): + for e in events: + value = tf.make_ndarray(e.tensor_proto).item() + assert isinstance(value, bytes) + value = value.decode('utf-8') + cols['step'][idx] = e.step + if self._pivot: + cols[tag][idx] = value + else: + cols['tag'][idx] = tag + cols['value'][idx] = value + idx += 1 + return cols def _parse_hparams(self, event_acc: EventAccumulator) -> \ Tuple[List[str], Dict[str, Any]]: @@ -764,16 +804,24 @@ def _parse_hparams(self, event_acc: EventAccumulator) -> \ values[tag] = [fields[0][1]] return tags, values - def _add_extra_columns(self, d: Dict[str, Any], - wall_time: Optional[float]) -> Dict[str, Any]: - """Add entries in dictionary `d` based on the extra columns.""" - if 'wall_time' in self._extra_columns and wall_time is not None: - d['wall_time'] = wall_time + def _get_default_cols(self, tag_to_events: Dict[str, ScalarEvent], + wall_time=True) -> Dict[str, List[Any]]: + """Get default entries based on the extra columns.""" + length = 0 + for events in tag_to_events.values(): + length += len(events) + cols: Dict[str, Any] = defaultdict(lambda: [np.NaN] * length) if 'dir_name' in self._extra_columns: - d['dir_name'] = '' + cols['dir_name'] = [''] * length if 'file_name' in self._extra_columns: - d['file_name'] = os.path.basename(self.log_path) - return d + cols['file_name'] = [os.path.basename(self.log_path)] * length + if 'wall_time' not in self._extra_columns or not wall_time: + return cols + cols['wall_time'] = [] + for events in tag_to_events.values(): + cols['wall_time'].extend([e.wall_time for e in events]) + # assert len(cols['wall_time']) == length + return cols def _parse_events(self, event_type: str, event_acc: EventAccumulator): """Parse and store `event_type` events inside a event file. @@ -784,7 +832,6 @@ def _parse_events(self, event_type: str, event_acc: EventAccumulator): """ if os.path.isdir(self.log_path): raise ValueError(f"Not an event file: {self.log_path}") - rows = [] assert self._tags is not None if event_type == HPARAMS: self._tags[event_type], all_events = self._parse_hparams(event_acc) @@ -806,26 +853,32 @@ def _parse_events(self, event_type: str, event_acc: EventAccumulator): self._tags[event_type] = \ list(filter(lambda x: x not in filtered_tags, self._tags[event_type])) - # Add columns that depend on event types - get_row = { - SCALARS: self._get_scalar_row, - TENSORS: self._get_tensor_row, - HISTOGRAMS: self._get_histogram_row, - IMAGES: self._get_image_row, - AUDIO: self._get_audio_row, - HPARAMS: self._get_hparam_row, - TEXT: self._get_text_row, + # Add columns according to the event type + get_cols = { + SCALARS: self._get_scalar_cols, + TENSORS: self._get_tensor_cols, + HISTOGRAMS: self._get_histogram_cols, + IMAGES: self._get_image_cols, + AUDIO: self._get_audio_cols, + HPARAMS: self._get_hparam_cols, + TEXT: self._get_text_cols, }[event_type] + tag_to_events = {} for tag in self._tags[event_type]: events = all_events[tag] # Rename tags if event_type == TEXT and tag.endswith(PLUGIN_RAW_TAGS[TEXT]): # Remove tag suffix for torch & tensorboardX tag = tag[:-len(PLUGIN_RAW_TAGS[TEXT])] - # Append row - for e in events: - rows.append(get_row(tag, e)) - self._events[event_type] = pd.DataFrame(rows) + tag_to_events[tag] = events + cols = get_cols(tag_to_events) + # Reorder columns + for tag in ['wall_time', 'dir_name', 'file_name']: + if tag in cols: + tmp = cols[tag] + cols.pop(tag) + cols[tag] = tmp + self._events[event_type] = pd.DataFrame.from_dict(cols) @property def children(self) -> Dict[str, 'SummaryReader']: