-
Notifications
You must be signed in to change notification settings - Fork 4
/
write.py
195 lines (159 loc) · 7.54 KB
/
write.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
from __future__ import annotations

import os
import shutil
import sys
from typing import Any

import pandas as pd
from tqdm import tqdm
_known_extensions = (".csv", ".json", ".xlsx")


def _rm_rf_or_raise(path: str, overwrite: bool) -> None:
    """Delete the file or directory at path if overwrite is True.

    For safety, only deletes paths that look like they were created by this
    program: TensorBoard run directories (containing only event files) or
    data files with a known extension.

    Args:
        path (str): The file or directory to remove if overwrite=True.
        overwrite (bool): Whether to overwrite existing.

    Raises:
        ValueError: If overwrite=True but path doesn't look like it was
            written by this program.
        FileExistsError: If path exists and overwrite=False.
    """
    if not os.path.exists(path):  # True if path is either file or directory
        return

    # for safety, check path is either TensorBoard run or CSV file
    # to make it harder to delete files not created by this program
    is_tb_dir = os.path.isdir(path) and all(
        entry.startswith("events.out") for entry in os.listdir(path)
    )
    # use `ext in path` instead of endswith() to handle compressed files
    # (.csv.gz, .json.bz2, etc.)
    is_data_file = any(ext in path.lower() for ext in _known_extensions)

    if overwrite and (is_data_file or is_tb_dir):
        # shutil/os instead of `os.system("rm -rf ...")`: portable and immune
        # to shell injection via special characters in path
        if os.path.isdir(path):
            shutil.rmtree(path)
        else:
            os.remove(path)
    elif overwrite:
        # bug fix: original constructed this ValueError without raising it,
        # silently skipping the safety check
        raise ValueError(
            f"Received the overwrite flag but the content of '{path}' does not "
            "look like it was written by this program. Please make sure you really "
            f"want to delete '{path}' and then do so manually."
        )
    else:
        raise FileExistsError(
            f"'{path}' already exists, pass overwrite=True"
            " (-f/--overwrite in CLI) to proceed anyway"
        )
def write_tb_events(
    data_to_write: dict[str, dict[str, pd.DataFrame]],
    out_dir: str,
    overwrite: bool = False,
    verbose: bool = False,
) -> list[str]:
    """Write reduced scalar data back to disk as new TensorBoard event files.

    Takes a mapping of reduce-op names to per-tag series and materializes one
    event-file directory per reduce op, named by appending the op name to
    `out_dir`. A combined mean/std pair is additionally written as two runs
    (mean+std and mean-std). Inspired by https://stackoverflow.com/a/48774926.

    Args:
        data_to_write (dict[str, dict[str, pd.DataFrame]]): Data to write to disk.
            Assumes 1st-level keys are reduce ops (mean, std, ...) and 2nd-level
            are TensorBoard tags.
        out_dir (str): Name of the directory to save the new reduced run data.
            Will have the reduce op name (e.g. '-mean'/'-std') appended.
        overwrite (bool): Whether to overwrite existing reduction directories.
            Defaults to False.
        verbose (bool): Whether to print the paths to new TensorBoard event file.
            Defaults to False.

    Returns:
        list[str]: List of paths to the new TensorBoard event files.
    """
    try:
        from torch.utils.tensorboard import SummaryWriter
    except ImportError:
        try:
            from tensorflow.summary import SummaryWriter
        except ImportError:
            raise ImportError(
                "Cannot import SummaryWriter from torch nor tensorflow. "
                "Install either to create new TensorBoard event files."
            ) from None

    created_dirs: list[str] = []
    # shallow copy since we pop 'std' below and must not mutate the caller's dict
    data_to_write = data_to_write.copy()
    # no extra separator needed when out_dir already ends in a path separator
    joiner = "" if out_dir.endswith(("/", "\\")) else "-"

    # mean/std are handled first and together: written as mean+std and mean-std
    if "mean" in data_to_write and "std" in data_to_write:
        mean_dict = data_to_write["mean"]
        # popped so the generic loop below doesn't write std a second time
        std_dict = data_to_write.pop("std")

        for sign, symbol in ((1, "+"), (-1, "-")):
            run_dir = f"{out_dir}{joiner}mean{symbol}std"
            if verbose:
                print(f"Writing mean{symbol}std reduction to disk...", file=sys.stderr)
            _rm_rf_or_raise(run_dir, overwrite)
            created_dirs.append(run_dir)

            writer = SummaryWriter(run_dir)
            # positional zip is safe: both dicts come from the same reduction,
            # so tag order, series lengths and step values all match
            for (tag, mean_series), (_, std_series) in zip(
                mean_dict.items(), std_dict.items()
            ):
                for (step, mean_val), std_val in zip(
                    mean_series.items(), std_series.to_numpy()
                ):
                    writer.add_scalar(tag, mean_val + sign * std_val, step)
            writer.close()

    # every remaining reduce op (e.g. mean, min, max, median) gets its own run
    pbar = tqdm(data_to_write.items(), disable=not verbose)
    for op, tag_series_map in pbar:
        pbar.set_description(f"Writing {op} reduction to disk")
        run_dir = f"{out_dir}{joiner}{op}"
        created_dirs.append(run_dir)
        _rm_rf_or_raise(run_dir, overwrite)

        writer = SummaryWriter(run_dir)
        for tag, series in tag_series_map.items():
            for step, value in series.items():
                writer.add_scalar(tag, value, step)
        # closing releases the event-file handle; without it a later overwrite
        # can fail with OSError: [Errno 16] Device or resource busy
        writer.close()

    if verbose:
        out_str = "\n- ".join(created_dirs)
        print(f"Created new TensorBoard event files in\n- {out_str}")

    return created_dirs
def write_df(*args: Any) -> None:
    """Deprecated stub kept so callers of the pre-v0.2.8 API get a clear error.

    Raises:
        NotImplementedError: Always; points users at write_data_file().
    """
    raise NotImplementedError(
        "write_df() was renamed to write_data_file() in tensorboard-reducer v0.2.8"
    )
def write_data_file(
    data_to_write: dict[str, dict[str, pd.DataFrame]],
    out_path: str,
    overwrite: bool = False,
    verbose: bool = False,
) -> str:
    """Writes reduced TensorBoard data passed as dict of dicts to a CSV file.

    Use `pandas.read_csv("path/to/file.csv", header=[0, 1], index_col=0)` to read
    CSV data back into a multi-index dataframe.

    Args:
        data_to_write (dict[str, dict[str, pd.DataFrame]]): Data to write to disk.
            Assumes 1st-level keys are reduce ops (mean, std, ...) and 2nd-level are
            TensorBoard tags.
        out_path (str): CSV, JSON or Excel file path where the reduced data will be
            written. Supports all compression formats that Pandas supports. Simply
            change the file extension. For example .csv.gz, .csv.gzip, .json.bz2, etc.
        overwrite (bool): Whether to overwrite existing reduction directories.
            Defaults to False.
        verbose (bool): Whether to print the path to new data file. Defaults to False.

    Raises:
        ValueError: If out_path has an unrecognized file extension.

    Returns:
        str: Path to the new data file.
    """
    _rm_rf_or_raise(out_path, overwrite)

    # create multi-index dataframe from event data with reduce op names as 1st-level
    # col names and tag names as 2nd level
    dict_of_dfs = {op: pd.DataFrame(dic) for op, dic in data_to_write.items()}
    df_out = pd.concat(dict_of_dfs, axis=1)
    df_out.columns = df_out.columns.swaplevel(0, 1)
    df_out.index.name = "step"

    # match only the basename so extension-like substrings in parent directory
    # names (e.g. 'xlsx-exports/data.json') can't select the wrong writer;
    # pandas infers compression from extensions (.csv.gz, .json.bz2, etc.)
    basename = os.path.basename(out_path).lower()
    if ".csv" in basename:
        df_out.to_csv(out_path)
    elif ".json" in basename:
        df_out.to_json(out_path)
    elif ".xlsx" in basename:  # bug fix: previously matched against the full path
        df_out.to_excel(out_path)
    else:
        raise ValueError(
            f"{out_path=} has unknown extension, should be one of {_known_extensions}"
            " or compressed versions thereof like '.csv.gz', '.json.bz2', etc."
        )

    if verbose:
        print(f"Created new data file at {out_path!r}")

    return out_path