/
variables.py
334 lines (265 loc) · 12.2 KB
/
variables.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
"""Coders for individual Variable objects."""
import warnings
from functools import partial
from typing import Any, Hashable
import numpy as np
import pandas as pd
from ..core import dtypes, duck_array_ops, indexing
from ..core.pycompat import is_duck_dask_array
from ..core.variable import Variable
class SerializationWarning(RuntimeWarning):
    """Warning category for problems met while encoding or decoding data.

    Coders in this module pass this class to ``warnings.warn`` when they
    hit suspicious metadata during serialization-related transformations.
    """
class VariableCoder:
    """Base class for encoding and decoding transformations on variables.

    We use coders for transforming variables between xarray's data model and
    a format suitable for serialization. For example, coders apply CF
    conventions for how data should be represented in netCDF files.

    Subclasses should implement encode() and decode(), which should satisfy
    the identity ``coder.decode(coder.encode(variable)) == variable``. If any
    options are necessary, they should be implemented as arguments to the
    __init__ method.

    The optional name argument to encode() and decode() exists solely for the
    sake of better error messages, and should correspond to the name of
    variables in the underlying store.
    """

    def encode(
        self, variable: Variable, name: Hashable = None
    ) -> Variable:  # pragma: no cover
        """Convert a decoded variable to an encoded variable.

        Parameters
        ----------
        variable : Variable
            Variable in its decoded (in-memory data model) form.
        name : hashable, optional
            Name of the variable, used only to improve error messages.

        Returns
        -------
        Variable
            The encoded variable, suitable for serialization.

        Raises
        ------
        NotImplementedError
            Always, in this base class; subclasses must override.
        """
        raise NotImplementedError()

    def decode(
        self, variable: Variable, name: Hashable = None
    ) -> Variable:  # pragma: no cover
        """Convert an encoded variable to a decoded variable.

        Parameters
        ----------
        variable : Variable
            Variable in its encoded (serialized) form.
        name : hashable, optional
            Name of the variable, used only to improve error messages.

        Returns
        -------
        Variable
            The decoded variable.

        Raises
        ------
        NotImplementedError
            Always, in this base class; subclasses must override.
        """
        raise NotImplementedError()
class _ElementwiseFunctionArray(indexing.ExplicitlyIndexedNDArrayMixin):
    """Array wrapper that postpones an element-wise function until needed.

    Instances are not meant to be built directly; go through
    ``lazy_elemwise_func``. Indexing yields another lazy wrapper around the
    indexed data, and the function is applied when the object is coerced to
    a NumPy array.
    """

    def __init__(self, array, func, dtype):
        # Dask-backed input must never be wrapped by this class.
        assert not is_duck_dask_array(array)
        self.array = indexing.as_indexable(array)
        self.func = func
        self._dtype = dtype

    @property
    def dtype(self):
        """The declared result dtype, normalized to ``np.dtype``."""
        return np.dtype(self._dtype)

    def __getitem__(self, key):
        # Index the wrapped array only; the function stays deferred.
        subset = self.array[key]
        return type(self)(subset, self.func, self.dtype)

    def __array__(self, dtype=None):
        # The ``dtype`` argument is accepted but not used here.
        return self.func(self.array)

    def __repr__(self):
        template = "{}({!r}, func={!r}, dtype={!r})"
        cls_name = type(self).__name__
        return template.format(cls_name, self.array, self.func, self.dtype)
def lazy_elemwise_func(array, func, dtype):
    """Apply an element-wise function to an array without computing eagerly.

    Parameters
    ----------
    array : any valid value of Variable._data
        The data to transform.
    func : callable
        Function applied to indexed slices of the array. When used with
        dask it should be a pickle-able object.
    dtype : coercible to np.dtype
        Dtype of the values produced by ``func``.

    Returns
    -------
    Either a dask.array.Array or _ElementwiseFunctionArray.
    """
    if not is_duck_dask_array(array):
        return _ElementwiseFunctionArray(array, func, dtype)

    # dask is imported only on the path that actually handles dask arrays.
    import dask.array as da

    return da.map_blocks(func, array, dtype=dtype)
def unpack_for_encoding(var):
    """Return ``(dims, data, attrs, encoding)`` of ``var`` for an encoder.

    The data is read through the public ``var.data`` attribute; ``attrs``
    and ``encoding`` are returned as copies made with ``.copy()`` so that
    callers can modify them without touching ``var``.
    """
    dims = var.dims
    data = var.data
    attrs = var.attrs.copy()
    encoding = var.encoding.copy()
    return dims, data, attrs, encoding
def unpack_for_decoding(var):
    """Return ``(dims, data, attrs, encoding)`` of ``var`` for a decoder.

    Unlike ``unpack_for_encoding`` this reads the private ``var._data``
    attribute rather than ``var.data``. ``attrs`` and ``encoding`` are
    returned as copies made with ``.copy()``.
    """
    dims = var.dims
    raw_data = var._data
    attrs = var.attrs.copy()
    encoding = var.encoding.copy()
    return dims, raw_data, attrs, encoding
def safe_setitem(dest, key, value, name=None):
    """Set ``dest[key] = value`` but refuse to overwrite an existing key.

    Parameters
    ----------
    dest : mutable mapping
        Mapping to write into.
    key : hashable
        Key to set.
    value : object
        Value to store under ``key``.
    name : hashable, optional
        Name of the variable being processed; only used in the error message.

    Raises
    ------
    ValueError
        If ``key`` is already present in ``dest``. ``dest`` is left unchanged.
    """
    if key in dest:
        # Test against None rather than truthiness so that falsy but valid
        # names (e.g. ``0``) still appear in the message.
        var_str = f" on variable {name!r}" if name is not None else ""
        raise ValueError(
            "failed to prevent overwriting existing key {} in attrs{}. "
            "This is probably an encoding field used by xarray to describe "
            "how a variable is serialized. To proceed, remove this key from "
            "the variable's attributes manually.".format(key, var_str)
        )
    dest[key] = value
def pop_to(source, dest, key, name=None):
    """Move ``key`` from ``source`` into ``dest`` and return its value.

    The key is always removed from ``source``. A value of None (including
    a missing key) is not copied to ``dest``. Otherwise the value is stored
    with ``safe_setitem``, which raises if ``key`` already exists in
    ``dest``; ``name`` is forwarded for that error message.
    """
    popped = source.pop(key, None)
    if popped is None:
        return popped
    safe_setitem(dest, key, popped, name=name)
    return popped
def _apply_mask(
    data: np.ndarray, encoded_fill_values: list, decoded_fill_value: Any, dtype: Any
) -> np.ndarray:
    """Replace every element equal to an encoded fill value.

    ``data`` is first converted to a NumPy array of ``dtype``. Elements
    matching any entry of ``encoded_fill_values`` are replaced by
    ``decoded_fill_value``; all other elements are kept.
    """
    values = np.asarray(data, dtype=dtype)
    # Start from a scalar False so an empty collection masks nothing.
    is_fill = False
    for candidate in encoded_fill_values:
        is_fill = is_fill | (values == candidate)
    return np.where(is_fill, decoded_fill_value, values)
class CFMaskCoder(VariableCoder):
    """Mask or unmask fill values according to CF conventions."""

    def encode(self, variable, name=None):
        """Fill missing data and move the fill attributes from encoding to attrs.

        ``_FillValue`` and ``missing_value`` found in the encoding are cast
        to the target dtype (``encoding["dtype"]`` if given, else the data's
        dtype) and moved into attrs. Missing data is filled with
        ``_FillValue`` when present, otherwise with ``missing_value``.

        Raises
        ------
        ValueError
            If both values are given and they are not equivalent.
        """
        dims, data, attrs, encoding = unpack_for_encoding(variable)

        target_dtype = np.dtype(encoding.get("dtype", data.dtype))
        fv = encoding.get("_FillValue")
        mv = encoding.get("missing_value")
        has_fv = fv is not None
        has_mv = mv is not None

        if has_fv and has_mv and not duck_array_ops.allclose_or_equiv(fv, mv):
            raise ValueError(
                f"Variable {name!r} has conflicting _FillValue ({fv}) and missing_value ({mv}). Cannot encode data."
            )

        if has_fv:
            # Cast _FillValue to the dtype the data will be stored with.
            encoding["_FillValue"] = target_dtype.type(fv)
            fill_value = pop_to(encoding, attrs, "_FillValue", name=name)
            if not pd.isnull(fill_value):
                data = duck_array_ops.fillna(data, fill_value)

        if has_mv:
            # Cast missing_value to the dtype the data will be stored with.
            encoding["missing_value"] = target_dtype.type(mv)
            fill_value = pop_to(encoding, attrs, "missing_value", name=name)
            # missing_value only fills the data when no _FillValue was given.
            if not pd.isnull(fill_value) and not has_fv:
                data = duck_array_ops.fillna(data, fill_value)

        return Variable(dims, data, attrs, encoding)

    def decode(self, variable, name=None):
        """Move the fill attributes to encoding and lazily mask matching values.

        All non-null values found under ``missing_value`` and ``_FillValue``
        are collected; when there is at least one, the data is wrapped so
        that matching elements are replaced by the fill value returned by
        ``dtypes.maybe_promote`` for the data's dtype. A
        ``SerializationWarning`` is issued when more than one distinct fill
        value is found.
        """
        dims, data, attrs, encoding = unpack_for_decoding(variable)

        raw_fill_values = []
        for attr in ("missing_value", "_FillValue"):
            raw_fill_values.append(pop_to(attrs, encoding, attr, name=name))

        # Flatten each entry (scalar, sequence, or None) and drop null values.
        encoded_fill_values = set()
        for option in raw_fill_values:
            for candidate in np.ravel(option):
                if not pd.isnull(candidate):
                    encoded_fill_values.add(candidate)

        if len(encoded_fill_values) > 1:
            warnings.warn(
                "variable {!r} has multiple fill values {}, "
                "decoding all values to NaN.".format(name, encoded_fill_values),
                SerializationWarning,
                stacklevel=3,
            )

        dtype, decoded_fill_value = dtypes.maybe_promote(data.dtype)

        if encoded_fill_values:
            mask_func = partial(
                _apply_mask,
                encoded_fill_values=encoded_fill_values,
                decoded_fill_value=decoded_fill_value,
                dtype=dtype,
            )
            data = lazy_elemwise_func(data, mask_func, dtype)

        return Variable(dims, data, attrs, encoding)
def _scale_offset_decoding(data, scale_factor, add_offset, dtype):
    """Return ``data * scale_factor + add_offset`` computed in ``dtype``.

    The input is copied into a new array of ``dtype`` first, so the
    caller's data is never modified. A step is skipped when its parameter
    is None.
    """
    decoded = np.array(data, dtype=dtype, copy=True)
    # In-place operators keep the result in ``dtype``.
    if scale_factor is not None:
        decoded *= scale_factor
    if add_offset is not None:
        decoded += add_offset
    return decoded
def _choose_float_dtype(dtype, has_offset):
    """Pick the float type used to hold scaled/offset values of ``dtype``.

    Parameters
    ----------
    dtype : np.dtype
        Dtype of the data before scaling.
    has_offset : bool
        Whether an offset is applied in addition to any scale factor.

    Returns
    -------
    ``np.float32`` or ``np.float64``.
    """
    # Floats of at most 4 bytes: float32 stays float32 and half precision is
    # upcast, because float16 is "intended for storage but not computation".
    small_float = np.issubdtype(dtype, np.floating) and dtype.itemsize <= 4
    if small_float:
        return np.float32

    # float32 represents all integers up to 24 bits exactly, so integers of
    # at most 2 bytes fit. A scale factor is safe, but a large offset could
    # lose precision, so any offset at all falls through to float64.
    small_int = np.issubdtype(dtype, np.integer) and dtype.itemsize <= 2
    if small_int and not has_offset:
        return np.float32

    # Everything else uses float64 (complex numbers are not supported in
    # NetCDF, so they need no special handling here).
    return np.float64
class CFScaleOffsetCoder(VariableCoder):
    """Scale and offset variables according to CF conventions.

    Follows the formula:
        decode_values = encoded_values * scale_factor + add_offset
    """

    def encode(self, variable, name=None):
        """Apply the inverse of the CF formula and move the parameters to attrs.

        When ``scale_factor`` and/or ``add_offset`` are present in the
        variable's encoding, the data is copied to a float dtype, the
        offset is subtracted, the result is divided by the scale factor,
        and both parameters are moved from encoding to attrs.
        """
        dims, data, attrs, encoding = unpack_for_encoding(variable)

        if "scale_factor" in encoding or "add_offset" in encoding:
            # Decide the dtype before pop_to removes the keys from encoding.
            dtype = _choose_float_dtype(data.dtype, "add_offset" in encoding)
            data = data.astype(dtype=dtype, copy=True)
        if "add_offset" in encoding:
            data -= pop_to(encoding, attrs, "add_offset", name=name)
        if "scale_factor" in encoding:
            data /= pop_to(encoding, attrs, "scale_factor", name=name)

        return Variable(dims, data, attrs, encoding)

    def decode(self, variable, name=None):
        """Lazily apply the CF formula and move the parameters to encoding.

        When ``scale_factor`` and/or ``add_offset`` are present in attrs
        they are moved to encoding and the data is wrapped so that
        ``data * scale_factor + add_offset`` is computed on access, in a
        float dtype chosen by ``_choose_float_dtype``.
        """
        dims, data, attrs, encoding = unpack_for_decoding(variable)

        if "scale_factor" in attrs or "add_offset" in attrs:
            scale_factor = pop_to(attrs, encoding, "scale_factor", name=name)
            add_offset = pop_to(attrs, encoding, "add_offset", name=name)
            # pop_to has already removed "add_offset" from attrs, so testing
            # membership there would always be False; use the popped value
            # to tell whether an offset will actually be applied.
            dtype = _choose_float_dtype(data.dtype, add_offset is not None)
            # Parameters stored as arrays/sequences are reduced to scalars.
            if np.ndim(scale_factor) > 0:
                scale_factor = np.asarray(scale_factor).item()
            if np.ndim(add_offset) > 0:
                add_offset = np.asarray(add_offset).item()
            transform = partial(
                _scale_offset_decoding,
                scale_factor=scale_factor,
                add_offset=add_offset,
                dtype=dtype,
            )
            data = lazy_elemwise_func(data, transform, dtype)

        return Variable(dims, data, attrs, encoding)
class UnsignedIntegerCoder(VariableCoder):
    """Convert between signed and unsigned integer data via ``_Unsigned``."""

    def encode(self, variable, name=None):
        """Store data as signed integers when ``_Unsigned == "true"`` is encoded.

        If the encoding has ``_Unsigned`` set to ``"true"``, the attribute
        is moved to attrs, the data is rounded and cast to the signed
        integer dtype of the same item size, and an existing ``_FillValue``
        attribute is cast to that dtype as well.
        """
        dims, data, attrs, encoding = unpack_for_encoding(variable)

        # from netCDF best practices
        # https://www.unidata.ucar.edu/software/netcdf/docs/BestPractices.html
        #     "_Unsigned = "true" to indicate that
        #      integer data should be treated as unsigned"
        if encoding.get("_Unsigned", "false") == "true":
            # Forward the variable name so overwrite errors identify it.
            pop_to(encoding, attrs, "_Unsigned", name=name)
            signed_dtype = np.dtype(f"i{data.dtype.itemsize}")
            if "_FillValue" in attrs:
                new_fill = signed_dtype.type(attrs["_FillValue"])
                attrs["_FillValue"] = new_fill
            data = duck_array_ops.around(data).astype(signed_dtype)

        return Variable(dims, data, attrs, encoding)

    def decode(self, variable, name=None):
        """Lazily cast integer data according to the ``_Unsigned`` attribute.

        The attribute is moved from attrs to encoding. Signed data with
        ``_Unsigned == "true"`` is cast to the unsigned dtype of the same
        item size; unsigned data with ``_Unsigned == "false"`` is cast to
        the signed dtype. An existing ``_FillValue`` attribute is cast to
        the same dtype. For non-integer data a ``SerializationWarning`` is
        issued and the data is left unchanged.
        """
        dims, data, attrs, encoding = unpack_for_decoding(variable)

        if "_Unsigned" in attrs:
            # Forward the variable name so overwrite errors identify it.
            unsigned = pop_to(attrs, encoding, "_Unsigned", name=name)

            if data.dtype.kind == "i":
                if unsigned == "true":
                    unsigned_dtype = np.dtype(f"u{data.dtype.itemsize}")
                    transform = partial(np.asarray, dtype=unsigned_dtype)
                    data = lazy_elemwise_func(data, transform, unsigned_dtype)
                    if "_FillValue" in attrs:
                        new_fill = unsigned_dtype.type(attrs["_FillValue"])
                        attrs["_FillValue"] = new_fill
            elif data.dtype.kind == "u":
                if unsigned == "false":
                    signed_dtype = np.dtype(f"i{data.dtype.itemsize}")
                    transform = partial(np.asarray, dtype=signed_dtype)
                    data = lazy_elemwise_func(data, transform, signed_dtype)
                    if "_FillValue" in attrs:
                        new_fill = signed_dtype.type(attrs["_FillValue"])
                        attrs["_FillValue"] = new_fill
            else:
                warnings.warn(
                    f"variable {name!r} has _Unsigned attribute but is not "
                    "of integer type. Ignoring attribute.",
                    SerializationWarning,
                    stacklevel=3,
                )

        return Variable(dims, data, attrs, encoding)