Skip to content

Commit

Permalink
Row-oriented parsing to column-oriented
Browse files Browse the repository at this point in the history
  • Loading branch information
j3soon committed May 10, 2022
1 parent 5d69fa1 commit 4bd8740
Showing 1 changed file with 194 additions and 141 deletions.
335 changes: 194 additions & 141 deletions tbparse/summary_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@
summaries in a directory contains multiple event files, or a single event file.
"""

# pylint: disable=C0302
import copy
import os
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple, Union, cast

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorboard.backend.event_processing.event_accumulator import (
AUDIO, COMPRESSED_HISTOGRAMS, HISTOGRAMS, IMAGES,
SCALARS, STORE_EVERYTHING_SIZE_GUIDANCE, TENSORS, AudioEvent,
EventAccumulator, HistogramEvent, ImageEvent,
ScalarEvent, TensorEvent)
AUDIO, COMPRESSED_HISTOGRAMS, HISTOGRAMS, IMAGES, SCALARS,
STORE_EVERYTHING_SIZE_GUIDANCE, TENSORS, AudioEvent, EventAccumulator,
HistogramEvent, ImageEvent, ScalarEvent, TensorEvent)
from tensorboard.plugins.hparams.plugin_data_pb2 import HParamsPluginData

# pylint: disable=W0105
Expand Down Expand Up @@ -629,121 +630,160 @@ def histogram_to_bins(counts: np.ndarray, limits: np.ndarray,
bins.append(bin_y)
return centers, bins

def _get_scalar_row(self, tag: str, e: ScalarEvent) -> Dict[str, Any]:
"""Add entries in dictionary `d` based on the ScalarEvent `e`"""
d = {'step': e.step}
if self._pivot:
d[tag] = e.value
else:
d['tag'] = tag
d['value'] = e.value
return self._add_extra_columns(d, e.wall_time)

def _get_tensor_row(self, tag: str, e: TensorEvent) -> Dict[str, Any]:
"""Add entries in dictionary `d` based on the TensorEvent `e`"""
value = tf.make_ndarray(e.tensor_proto)
if value.shape == ():
# Tensorflow histogram may have more than one items
value = value.item()
d = {'step': e.step}
if self._pivot:
d[tag] = value
else:
d['tag'] = tag
d['value'] = value
return self._add_extra_columns(d, e.wall_time)

def _get_histogram_row(self, tag: str, e: HistogramEvent) -> \
Dict[str, Any]:
"""Add entries in dictionary `d` based on the HistogramEvent `e`"""
hv = e.histogram_value
limits = np.array(hv.bucket_limit, dtype=np.float64)
counts = np.array(hv.bucket, dtype=np.float64)
columns = {
'counts': counts,
'limits': limits,
'max': hv.max,
'min': hv.min,
'num': hv.num,
'sum': hv.sum,
'sum_squares': hv.sum_squares,
}
# assert list(columns.keys()) == list(sorted(columns.keys()))
d = {'step': e.step}
if not self._pivot:
d['tag'] = tag
lst = list(self._extra_columns) + ['limits', 'counts']
for k, v in columns.items():
if k in lst:
key = k if not self._pivot else tag + '/' + k
d[key] = v
return self._add_extra_columns(d, e.wall_time)

def _get_image_row(self, tag: str, e: ImageEvent) -> Dict[str, Any]:
"""Add entries in dictionary `d` based on the ImageEvent `e`"""
value = tf.image.decode_image(e.encoded_image_string).numpy()
columns = {
'height': e.height,
'width': e.width,
}
# assert list(columns.keys()) == list(sorted(columns.keys()))
d = {'step': e.step}
if self._pivot:
d[tag] = value
else:
d['tag'] = tag
d['value'] = value
for k, v in columns.items():
if k in self._extra_columns:
key = k if not self._pivot else tag + '/' + k
d[key] = v
return self._add_extra_columns(d, e.wall_time)

def _get_audio_row(self, tag: str, e: AudioEvent) -> Dict[str, Any]:
"""Add entries in dictionary `d` based on the AudioEvent `e`"""
audio, _ = tf.audio.decode_wav(e.encoded_audio_string)
value = audio.numpy()
columns = {
'content_type': e.content_type,
'length_frames': e.length_frames,
'sample_rate': e.sample_rate,
}
# assert list(columns.keys()) == list(sorted(columns.keys()))
d = {'step': e.step}
if self._pivot:
d[tag] = value
else:
d['tag'] = tag
d['value'] = value
for k, v in columns.items():
if k in self._extra_columns:
key = k if not self._pivot else tag + '/' + k
d[key] = v
return self._add_extra_columns(d, e.wall_time)

def _get_hparam_row(self, tag: str, value: Any) -> Dict[str, Any]:
"""Add entries in dictionary `d` based on the HParamsPluginData \
`plugin_data`"""
d = {}
if self._pivot:
d[tag] = value
else:
d['tag'] = tag
d['value'] = value
return self._add_extra_columns(d, None)

def _get_text_row(self, tag: str, e: TensorEvent) -> Dict[str, Any]:
"""Add entries in dictionary `d` based on the TensorEvent `e`"""
value = tf.make_ndarray(e.tensor_proto).item()
assert isinstance(value, bytes)
value = value.decode('utf-8')
d = {'step': e.step}
if self._pivot:
d[tag] = value
else:
d['tag'] = tag
d['value'] = value
return self._add_extra_columns(d, e.wall_time)
def _get_scalar_cols(self, tag_to_events: Dict[str, ScalarEvent]) -> \
Dict[str, List[Any]]:
"""Return a dict of lists based on the tags and ScalarEvents."""
cols = self._get_default_cols(tag_to_events)
idx = 0
for tag, events in tag_to_events.items():
for e in events:
cols['step'][idx] = e.step
if self._pivot:
cols[tag][idx] = e.value
else:
cols['tag'][idx] = tag
cols['value'][idx] = e.value
idx += 1
return cols

def _get_tensor_cols(self, tag_to_events: Dict[str, TensorEvent]) -> \
Dict[str, List[Any]]:
"""Return a dict of lists based on the tags and TensorEvents."""
cols = self._get_default_cols(tag_to_events)
idx = 0
for tag, events in tag_to_events.items():
for e in events:
value = tf.make_ndarray(e.tensor_proto)
if value.shape == ():
# Tensorflow histogram may have more than one items
value = value.item()
cols['step'][idx] = e.step
if self._pivot:
cols[tag][idx] = value
else:
cols['tag'][idx] = tag
cols['value'][idx] = value
idx += 1
return cols

def _get_histogram_cols(self, tag_to_events: Dict[str, HistogramEvent]) \
-> Dict[str, List[Any]]:
"""Return a dict of lists based on the tags and HistogramEvent."""
cols = self._get_default_cols(tag_to_events)
idx = 0
for tag, events in tag_to_events.items():
for e in events:
hv = e.histogram_value
limits = np.array(hv.bucket_limit, dtype=np.float64)
counts = np.array(hv.bucket, dtype=np.float64)
columns = {
'counts': counts,
'limits': limits,
'max': hv.max,
'min': hv.min,
'num': hv.num,
'sum': hv.sum,
'sum_squares': hv.sum_squares,
}
# assert list(columns.keys()) == list(sorted(columns.keys()))
cols['step'][idx] = e.step
if not self._pivot:
cols['tag'][idx] = tag
lst = list(self._extra_columns) + ['limits', 'counts']
for k, v in columns.items():
if k in lst:
key = k if not self._pivot else tag + '/' + k
cols[key][idx] = v
idx += 1
return cols

def _get_image_cols(self, tag_to_events: Dict[str, ImageEvent]) -> \
Dict[str, List[Any]]:
"""Return a dict of lists based on the tags and ImageEvent."""

cols = self._get_default_cols(tag_to_events)
idx = 0
for tag, events in tag_to_events.items():
for e in events:
value = tf.image.decode_image(e.encoded_image_string).numpy()
columns = {
'height': e.height,
'width': e.width,
}
# assert list(columns.keys()) == list(sorted(columns.keys()))
cols['step'][idx] = e.step
if self._pivot:
cols[tag][idx] = value
else:
cols['tag'][idx] = tag
cols['value'][idx] = value
for k, v in columns.items():
if k in self._extra_columns:
key = k if not self._pivot else tag + '/' + k
cols[key][idx] = v
idx += 1
return cols

def _get_audio_cols(self, tag_to_events: Dict[str, AudioEvent]) -> \
Dict[str, List[Any]]:
"""Return a dict of lists based on the tags and AudioEvent."""
cols = self._get_default_cols(tag_to_events)
idx = 0
for tag, events in tag_to_events.items():
for e in events:
audio, _ = tf.audio.decode_wav(e.encoded_audio_string)
value = audio.numpy()
columns = {
'content_type': e.content_type,
'length_frames': e.length_frames,
'sample_rate': e.sample_rate,
}
# assert list(columns.keys()) == list(sorted(columns.keys()))
cols['step'][idx] = e.step
if self._pivot:
cols[tag][idx] = value
else:
cols['tag'][idx] = tag
cols['value'][idx] = value
for k, v in columns.items():
if k in self._extra_columns:
key = k if not self._pivot else tag + '/' + k
cols[key][idx] = v
idx += 1
return cols

def _get_hparam_cols(self, tag_to_events: Dict[str, Any]) -> \
Dict[str, List[Any]]:
"""Return a dict of lists based on the tags and HParamsPluginData."""
cols = self._get_default_cols(tag_to_events, wall_time=False)
idx = 0
for tag, value in tag_to_events.items():
if self._pivot:
cols[tag][idx] = value[0]
else:
cols['tag'][idx] = tag
cols['value'][idx] = value[0]
idx += 1
return cols

def _get_text_cols(self, tag_to_events: Dict[str, TensorEvent]) -> \
Dict[str, List[Any]]:
"""Return a dict of lists based on the tags and TensorEvent."""
cols = self._get_default_cols(tag_to_events)
idx = 0
for tag, events in tag_to_events.items():
for e in events:
value = tf.make_ndarray(e.tensor_proto).item()
assert isinstance(value, bytes)
value = value.decode('utf-8')
cols['step'][idx] = e.step
if self._pivot:
cols[tag][idx] = value
else:
cols['tag'][idx] = tag
cols['value'][idx] = value
idx += 1
return cols

def _parse_hparams(self, event_acc: EventAccumulator) -> \
Tuple[List[str], Dict[str, Any]]:
Expand All @@ -764,16 +804,24 @@ def _parse_hparams(self, event_acc: EventAccumulator) -> \
values[tag] = [fields[0][1]]
return tags, values

def _add_extra_columns(self, d: Dict[str, Any],
wall_time: Optional[float]) -> Dict[str, Any]:
"""Add entries in dictionary `d` based on the extra columns."""
if 'wall_time' in self._extra_columns and wall_time is not None:
d['wall_time'] = wall_time
def _get_default_cols(self, tag_to_events: Dict[str, ScalarEvent],
wall_time=True) -> Dict[str, List[Any]]:
"""Get default entries based on the extra columns."""
length = 0
for events in tag_to_events.values():
length += len(events)
cols: Dict[str, Any] = defaultdict(lambda: [np.NaN] * length)
if 'dir_name' in self._extra_columns:
d['dir_name'] = ''
cols['dir_name'] = [''] * length
if 'file_name' in self._extra_columns:
d['file_name'] = os.path.basename(self.log_path)
return d
cols['file_name'] = [os.path.basename(self.log_path)] * length
if 'wall_time' not in self._extra_columns or not wall_time:
return cols
cols['wall_time'] = []
for events in tag_to_events.values():
cols['wall_time'].extend([e.wall_time for e in events])
# assert len(cols['wall_time']) == length
return cols

def _parse_events(self, event_type: str, event_acc: EventAccumulator):
"""Parse and store `event_type` events inside a event file.
Expand All @@ -784,7 +832,6 @@ def _parse_events(self, event_type: str, event_acc: EventAccumulator):
"""
if os.path.isdir(self.log_path):
raise ValueError(f"Not an event file: {self.log_path}")
rows = []
assert self._tags is not None
if event_type == HPARAMS:
self._tags[event_type], all_events = self._parse_hparams(event_acc)
Expand All @@ -806,26 +853,32 @@ def _parse_events(self, event_type: str, event_acc: EventAccumulator):
self._tags[event_type] = \
list(filter(lambda x: x not in filtered_tags,
self._tags[event_type]))
# Add columns that depend on event types
get_row = {
SCALARS: self._get_scalar_row,
TENSORS: self._get_tensor_row,
HISTOGRAMS: self._get_histogram_row,
IMAGES: self._get_image_row,
AUDIO: self._get_audio_row,
HPARAMS: self._get_hparam_row,
TEXT: self._get_text_row,
# Add columns according to the event type
get_cols = {
SCALARS: self._get_scalar_cols,
TENSORS: self._get_tensor_cols,
HISTOGRAMS: self._get_histogram_cols,
IMAGES: self._get_image_cols,
AUDIO: self._get_audio_cols,
HPARAMS: self._get_hparam_cols,
TEXT: self._get_text_cols,
}[event_type]
tag_to_events = {}
for tag in self._tags[event_type]:
events = all_events[tag]
# Rename tags
if event_type == TEXT and tag.endswith(PLUGIN_RAW_TAGS[TEXT]):
# Remove tag suffix for torch & tensorboardX
tag = tag[:-len(PLUGIN_RAW_TAGS[TEXT])]
# Append row
for e in events:
rows.append(get_row(tag, e))
self._events[event_type] = pd.DataFrame(rows)
tag_to_events[tag] = events
cols = get_cols(tag_to_events)
# Reorder columns
for tag in ['wall_time', 'dir_name', 'file_name']:
if tag in cols:
tmp = cols[tag]
cols.pop(tag)
cols[tag] = tmp
self._events[event_type] = pd.DataFrame.from_dict(cols)

@property
def children(self) -> Dict[str, 'SummaryReader']:
Expand Down

0 comments on commit 4bd8740

Please sign in to comment.