-
Notifications
You must be signed in to change notification settings - Fork 583
/
file_release_manager.py
318 lines (268 loc) · 12.5 KB
/
file_release_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# Copyright 2021, The TensorFlow Federated Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for releasing values from a federated program to a file system.
Note: This library uses `tf.io.gfile` to perform file system operations, which
means that this library:
* supports all the file systems supported by `tf.io.gfile`
* encodes files in the same way as `tf.io.gfile`
"""
import collections
import csv
import enum
import os
import os.path
import random
from typing import Any, Dict, Iterable, List, Mapping, Tuple, Sequence, Union
import numpy as np
import tensorflow as tf
import tree
from tensorflow_federated.python.common_libs import py_typecheck
from tensorflow_federated.python.program import file_utils
from tensorflow_federated.python.program import release_manager
from tensorflow_federated.python.program import structure_utils
from tensorflow_federated.python.program import value_reference
class FileReleaseManagerIncompatibleFileError(Exception):
  """Raised when an existing file cannot be used by a file release manager.

  In this module it is raised by `CSVFileReleaseManager` when the managed file
  already exists but its header does not contain the configured key fieldname.
  """
class FileReleaseManagerPermissionDeniedError(Exception):
  """Raised when a file release manager is not permitted to modify its file.

  In this module it is raised by `CSVFileReleaseManager` when appending a row
  to the managed CSV file fails.
  """
@enum.unique
class CSVSaveMode(enum.Enum):
  """The strategies a `CSVFileReleaseManager` can use to save a value."""

  # Try to add the new row to the end of the existing file.
  APPEND = 'append'
  # Read the whole file back and rewrite it with the new row included.
  WRITE = 'write'
class CSVFileReleaseManager(release_manager.ReleaseManager):
  """A `tff.program.ReleaseManager` that releases values to a CSV file.

  A `tff.program.CSVFileReleaseManager` is a utility for releasing values
  from a federated program to a CSV file and is used to release values from
  platform storage to customer storage in a federated program.

  Values are released to the file system as a CSV file and are quoted as
  strings. When the value is released, if the value is a value reference or a
  structure containing value references, each value reference is materialized.
  The value is then flattened, converted to a `numpy.ndarray`, and then
  converted to a nested list of Python scalars, and released as a CSV file.
  For example, `1` will be written as `'1'` and `tf.ones([2, 2])` will be
  written as `'[[1.0, 1.0], [1.0, 1.0]]'`.

  This manager can be configured to release values using a `save_mode` of either
  `CSVSaveMode.APPEND` or `CSVSaveMode.WRITE`.

  * In append mode, when a value is released, this manager will try and append
    the value to the CSV file instead of overwriting the existing file. While
    potentially more efficient, append mode is incompatible with compressed
    files (e.g. `.bz2` formats) and encoded directories. This mode is equivalent
    to write mode when releasing a value with a different structure than the
    currently released values, so it may not be useful when values with
    different structures are being released frequently.
  * In write mode (or in append mode when releasing new structures), when a
    value is released, this manager reads the entire CSV file and overwrites
    the existing file with the additional values. This can be slower than append
    mode, but is compatible with compressed files (e.g. `.bz2` formats) and
    encoded directories.
  """

  def __init__(self,
               file_path: Union[str, os.PathLike],
               save_mode: CSVSaveMode = CSVSaveMode.APPEND,
               key_fieldname: str = 'key'):
    """Returns an initialized `tff.program.CSVFileReleaseManager`.

    If the file already exists, its header is validated and the key of its last
    row is remembered as the latest released key. Otherwise a new file
    containing only a header with `key_fieldname` is created.

    Args:
      file_path: A path on the file system to save releases values. If this file
        does not exist it will be created.
      save_mode: A `tff.program.CSVSaveMode` specifying how to save released
        values.
      key_fieldname: A `str` specifying the fieldname used for the key when
        saving released value.

    Raises:
      ValueError: If `file_path` or `key_fieldname` is an empty string.
      FileReleaseManagerIncompatibleFileError: If the file exists but does not
        contain a fieldname of `key_fieldname`.
    """
    py_typecheck.check_type(file_path, (str, os.PathLike))
    py_typecheck.check_type(save_mode, CSVSaveMode)
    py_typecheck.check_type(key_fieldname, str)
    if not file_path:
      raise ValueError('Expected `file_path` to not be an empty string.')
    if not key_fieldname:
      raise ValueError('Expected `key_fieldname` to not be an empty string.')

    file_dir = os.path.dirname(file_path)
    # A bare filename such as 'values.csv' has an empty directory component;
    # in that case there is no directory to create.
    if file_dir and not tf.io.gfile.exists(file_dir):
      tf.io.gfile.makedirs(file_dir)

    self._file_path = file_path
    self._save_mode = save_mode
    self._key_fieldname = key_fieldname

    if tf.io.gfile.exists(self._file_path):
      fieldnames, values = self._read_values()
      if self._key_fieldname not in fieldnames:
        raise FileReleaseManagerIncompatibleFileError(
            f'The file \'{self._file_path}\' exists but does not contain a '
            f'fieldname of \'{self._key_fieldname}\'. It is possible that this '
            'file was not created by a `tff.program.CSVFileReleaseManager` or '
            'the `tff.program.CSVFileReleaseManager` was constructed with a '
            'different `key_fieldname`.')
      if values:
        # The last row of the file holds the most recently released key.
        self._latest_key = int(values[-1][self._key_fieldname])
      else:
        self._latest_key = None
    else:
      # Start a new file that contains only the header row.
      self._write_values([self._key_fieldname], [])
      self._latest_key = None

  def _read_values(self) -> Tuple[List[str], List[Dict[str, Any]]]:
    """Returns a tuple of fieldnames and values from the managed CSV.

    The fieldnames are an empty list when the file has no header row.
    """
    with tf.io.gfile.GFile(self._file_path, 'r') as file:
      reader = csv.DictReader(file)
      if reader.fieldnames is not None:
        fieldnames = list(reader.fieldnames)
      else:
        fieldnames = []
      values = list(reader)
    return fieldnames, values

  def _write_values(self, fieldnames: Sequence[str],
                    values: Iterable[Mapping[str, Any]]):
    """Writes `fieldnames` and `values` to the managed CSV.

    The content is first written to a temporary sibling file which is then
    renamed over the managed file, replacing any existing content.

    Args:
      fieldnames: The header to write, in column order.
      values: The rows to write, each one a mapping from fieldname to value.
    """
    path = os.fspath(self._file_path)

    # Create a temporary file.
    temp_path = f'{path}_temp{random.randint(1000, 9999)}'
    if tf.io.gfile.exists(temp_path):
      tf.io.gfile.remove(temp_path)

    # Write to the temporary file.
    with tf.io.gfile.GFile(temp_path, 'w') as file:
      writer = csv.DictWriter(file, fieldnames=fieldnames)
      writer.writeheader()
      writer.writerows(values)

    # Rename the temporary file to the final location atomically.
    tf.io.gfile.rename(temp_path, self._file_path, overwrite=True)

  def _write_value(self, value: Mapping[str, Any]):
    """Writes `value` to the managed CSV by rewriting the whole file.

    Any keys of `value` that are not yet in the header are added to the end of
    the header.

    Args:
      value: The row to add, a mapping from fieldname to value.
    """
    fieldnames, values = self._read_values()
    fieldnames.extend([x for x in value.keys() if x not in fieldnames])
    values.append(value)
    self._write_values(fieldnames, values)

  def _append_value(self, value: Mapping[str, Any]):
    """Appends `value` to the managed CSV.

    The row is appended in place only when every key of `value` is already in
    the header of the file; otherwise the whole file is rewritten using
    `_write_value` so that the header can be extended.

    Args:
      value: The row to add, a mapping from fieldname to value.

    Raises:
      FileReleaseManagerPermissionDeniedError: If the row could not be appended
        to the file.
    """
    with tf.io.gfile.GFile(self._file_path, 'a+') as file:
      reader = csv.DictReader(file)
      # `reader.fieldnames` is `None` when the file has no header row; treat
      # that as an empty header so the fallback below rewrites the file.
      fieldnames = reader.fieldnames or []
      can_append = all(key in fieldnames for key in value.keys())
      if can_append:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        try:
          writer.writerow(value)
        except (tf.errors.PermissionDeniedError, csv.Error) as e:
          raise FileReleaseManagerPermissionDeniedError(
              f'Could not append a value to the file \'{self._file_path}\'. It '
              'is possible that this file is compressed or encoded. Please use '
              'write mode instead of append mode to release values to this '
              'file using a `tff.program.CSVFileReleaseManager`.') from e

    # Rewrite only after the append handle has been closed, so the file is not
    # replaced while it is still open.
    if not can_append:
      self._write_value(value)

  def _remove_all_values(self):
    """Removes all values from the managed CSV, leaving only the key header."""
    self._write_values([self._key_fieldname], [])
    self._latest_key = None

  def _remove_values_after(self, key: int):
    """Removes all values after `key` from the managed CSV.

    Rows whose key is less than or equal to `key` are kept; the header is
    rebuilt from the fieldnames of the rows that are kept.

    Args:
      key: The largest key to keep.
    """
    py_typecheck.check_type(key, int)

    # The key column always comes first in the rewritten header.
    filtered_fieldnames = [self._key_fieldname]
    filtered_values = []
    _, values = self._read_values()
    for value in values:
      current_key = int(value[self._key_fieldname])
      if current_key <= key:
        fieldnames = [x for x in value.keys() if x not in filtered_fieldnames]
        filtered_fieldnames.extend(fieldnames)
        filtered_values.append(value)
    self._write_values(filtered_fieldnames, filtered_values)
    self._latest_key = key

  def release(self, value: Any, key: int):
    """Releases `value` from a federated program.

    This method will atomically update the managed CSV file by removing all
    values previously released with a key greater than or equal to `key` before
    writing `value`.

    Args:
      value: A materialized value, a value reference, or a structure of
        materialized values and value references representing the value to
        release.
      key: An integer used to reference the released `value`, `key` represents a
        step in a federated program.
    """
    py_typecheck.check_type(key, int)

    # Releasing at or before the latest key discards the rows from `key` on.
    if self._latest_key is not None and key <= self._latest_key:
      if key == 0:
        self._remove_all_values()
      else:
        self._remove_values_after(key - 1)

    materialized_value = value_reference.materialize_value(value)
    flattened_value = structure_utils.flatten_with_name(materialized_value)

    # Convert every leaf to (nested lists of) Python scalars before writing.
    normalized_value = collections.OrderedDict()
    for x, y in flattened_value.items():
      normalized_value[x] = np.array(y).tolist()
    normalized_value[self._key_fieldname] = key

    if self._save_mode == CSVSaveMode.APPEND:
      self._append_value(normalized_value)
    elif self._save_mode == CSVSaveMode.WRITE:
      self._write_value(normalized_value)
    self._latest_key = key
class SavedModelFileReleaseManager(release_manager.ReleaseManager):
  """A `tff.program.ReleaseManager` that releases values to a file system.

  A `tff.program.SavedModelFileReleaseManager` is a utility for releasing values
  from a federated program to a file system and is used to release values from
  platform storage to customer storage in a federated program.

  Values are released to the file system using the SavedModel (see
  `tf.saved_model`) format. When the value is released, if the value is a value
  reference or a structure containing value references, each value reference is
  materialized. The value is then flattened and released using the SavedModel
  format. The structure of the value is discarded.

  Note: The SavedModel format can only contain values that can be converted to a
  `tf.Tensor` (see `tf.convert_to_tensor`), releasing any other values will
  result in an error.

  See https://www.tensorflow.org/guide/saved_model for more information about
  the SavedModel format.
  """

  def __init__(self,
               root_dir: Union[str, os.PathLike],
               prefix: str = 'release_'):
    """Returns an initialized `tff.program.SavedModelFileReleaseManager`.

    Args:
      root_dir: A path on the file system under which released values are
        saved. If this path does not exist it will be created.
      prefix: A string to use as the prefix for filenames.

    Raises:
      ValueError: If `root_dir` is an empty string.
    """
    py_typecheck.check_type(root_dir, (str, os.PathLike))
    py_typecheck.check_type(prefix, str)
    if not root_dir:
      raise ValueError('Expected `root_dir` to not be an empty string.')

    directory_missing = not tf.io.gfile.exists(root_dir)
    if directory_missing:
      tf.io.gfile.makedirs(root_dir)

    self._prefix = prefix
    self._root_dir = root_dir

  def _get_path_for_key(self, key: int) -> str:
    """Returns the path for the given `key`.

    The path is `root_dir` joined with the prefix followed by `key`. This
    method does not assert that the given `key` or the returned path represent
    released values.

    Args:
      key: The key used to construct the path.
    """
    py_typecheck.check_type(key, int)
    return os.path.join(self._root_dir, f'{self._prefix}{key}')

  def release(self, value: Any, key: int):
    """Releases `value` from a federated program.

    The value is materialized, flattened, and written in the SavedModel format
    to the path for `key`, overwriting anything already at that path.

    Args:
      value: A materialized value, a value reference, or a structure of
        materialized values and value references representing the value to
        release.
      key: An integer used to reference the released `value`.
    """
    py_typecheck.check_type(key, int)
    destination = self._get_path_for_key(key)
    leaves = tree.flatten(value_reference.materialize_value(value))
    file_utils.write_saved_model(
        file_utils.ValueModule(leaves), destination, overwrite=True)