import warnings
from functools import wraps, singledispatch
from typing import Mapping, Any, Sequence, Union

import h5py
import pandas as pd
import numpy as np
from scipy import sparse

from .logging import get_logger
from ._core.sparse_dataset import SparseDataset
from .compat import CupyArray, CupySparseMatrix

logger = get_logger(__name__)


@singledispatch
def asarray(x):
    """Convert x to a numpy array"""
    return np.asarray(x)


@asarray.register(sparse.spmatrix)
def asarray_sparse(x):
    return x.toarray()


@asarray.register(SparseDataset)
def asarray_sparse_dataset(x):
    return asarray(x.value)


@asarray.register(h5py.Dataset)
def asarray_h5py_dataset(x):
    return x[...]


@asarray.register(CupyArray)
def asarray_cupy(x):
    return x.get()


@asarray.register(CupySparseMatrix)
def asarray_cupy_sparse(x):
    return x.toarray().get()
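

# Illustrative behavior of the `asarray` dispatcher (a sketch; the CuPy branches
# additionally assume a GPU-enabled environment):
#
#     >>> asarray(sparse.random(3, 4, format="csr")).shape  # densified via .toarray()
#     (3, 4)
#     >>> asarray(np.arange(4))  # falls through to np.asarray
#     array([0, 1, 2, 3])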


@singledispatch
def convert_to_dict(obj) -> dict:
    return dict(obj)


@convert_to_dict.register(dict)
def convert_to_dict_dict(obj: dict):
    return obj


@convert_to_dict.register(np.ndarray)
def convert_to_dict_ndarray(obj: np.ndarray):
    if obj.dtype.fields is None:
        raise TypeError(
            "Can only convert np.ndarray with compound dtypes to dict, "
            f"passed array had “{obj.dtype}”."
        )
    return {k: obj[k] for k in obj.dtype.fields.keys()}


@convert_to_dict.register(type(None))
def convert_to_dict_nonetype(obj: None):
    return dict()
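

# Sketch of the structured-array branch of `convert_to_dict` (illustrative values):
#
#     >>> rec = np.array([(1, 2.0)], dtype=[("a", "i4"), ("b", "f8")])
#     >>> convert_to_dict(rec)
#     {'a': array([1], dtype=int32), 'b': array([2.])}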


@singledispatch
def dim_len(x, axis):
    """\
    Return the size of an array in dimension `axis`.

    Returns None if `x` is an awkward array with variable length in the
    requested dimension.
    """
    return x.shape[axis]


try:
    from .compat import awkward as ak

    def _size_at_depth(layout, depth, lateral_context, **kwargs):
        """Callback function for dim_len_awkward, resolving the dim_len for a given level"""
        if layout.is_numpy:
            # if it's an embedded rectilinear array, we have to deal with its shape,
            # which might not be 1-dimensional
            if layout.is_unknown:
                shape = (0,)
            else:
                shape = layout.shape
            numpy_axis = lateral_context["axis"] - depth + 1
            if not (1 <= numpy_axis < len(shape)):
                raise TypeError(f"axis={lateral_context['axis']} is too deep")
            lateral_context["out"] = shape[numpy_axis]
            return ak.contents.EmptyArray()
        elif layout.is_list and depth == lateral_context["axis"]:
            if layout.parameter("__array__") in ("string", "bytestring"):
                # Strings are implemented like an array of lists of uint8 (ListType(NumpyType(...)))
                # which results in an extra hierarchy level that shouldn't show up in dim_len
                # See https://github.com/scikit-hep/awkward/discussions/1654#discussioncomment-3736747
                raise TypeError(f"axis={lateral_context['axis']} is too deep")
            if layout.is_regular:
                # if it's a regular list, you want the size
                lateral_context["out"] = layout.size
            else:
                # if it's an irregular list, you want a null token
                lateral_context["out"] = -1
            return ak.contents.EmptyArray()
        elif layout.is_record and depth == lateral_context["axis"]:
            lateral_context["out"] = len(layout.fields)
            return ak.contents.EmptyArray()
        elif layout.is_record:
            # currently, we don't recurse into records
            # in theory we could, just not sure how to do it at the moment
            # Would need to consider cases like: scalars, unevenly sized values
            raise TypeError(
                f"Cannot recurse into record type found at axis={lateral_context['axis']}"
            )
        elif layout.is_union:
            # if it's a union, you could get the result of each union branch
            # separately and see if they're all the same; if not, it's an error
            result = None
            for content in layout.contents:
                context = {"axis": lateral_context["axis"]}
                ak.transform(
                    _size_at_depth,
                    content,
                    lateral_context=context,
                )
                if result is None:
                    result = context["out"]
                elif result != context["out"]:
                    # Union branches have different lengths -> return null token
                    lateral_context["out"] = -1
                    return ak.contents.EmptyArray()
            lateral_context["out"] = result
            return ak.contents.EmptyArray()

    @dim_len.register(ak.Array)
    def dim_len_awkward(array, axis):
        """Get the length of an awkward array in a given dimension.

        Returns None if the dimension is of variable length.

        Code adapted from @jpivarski's solution in
        https://github.com/scikit-hep/awkward/discussions/1654#discussioncomment-3521574
        """
        if axis < 0:  # negative axis is another can of worms... maybe later
            raise NotImplementedError("Does not support negative axis")
        elif axis == 0:
            return len(array)
        else:
            # communicate with the recursive function using a context (lateral)
            context = {"axis": axis}
            # "transform", but we don't care what kind of array it returns
            ak.transform(
                _size_at_depth,
                array,
                lateral_context=context,
            )
            # Use `None` as null token.
            return None if context["out"] == -1 else context["out"]

    @asarray.register(ak.Array)
    def asarray_awkward(x):
        return x

except ImportError:
    pass
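

# Illustrative behavior of `dim_len` (a sketch, assuming awkward is installed):
#
#     >>> arr = ak.Array([[1, 2], [3]])
#     >>> dim_len(arr, 0)
#     2
#     >>> dim_len(arr, 1)  # ragged in the second dimension -> None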


def make_index_unique(index: pd.Index, join: str = "-"):
    """
    Makes the index unique by appending a number string to each duplicate index
    element: '1', '2', etc.

    If a tentative name created by the algorithm already exists in the index, it
    tries the next integer in the sequence.

    The first occurrence of a non-unique value is ignored.

    Parameters
    ----------
    join
        The connecting string between name and integer.

    Examples
    --------
    >>> from anndata import AnnData
    >>> adata = AnnData(np.ones((2, 3)), var=pd.DataFrame(index=["a", "a", "b"]))
    >>> adata.var_names
    Index(['a', 'a', 'b'], dtype='object')
    >>> adata.var_names_make_unique()
    >>> adata.var_names
    Index(['a', 'a-1', 'b'], dtype='object')
    """
    if index.is_unique:
        return index
    from collections import Counter

    values = index.values.copy()
    indices_dup = index.duplicated(keep="first")
    values_dup = values[indices_dup]
    values_set = set(values)
    counter = Counter()
    issue_interpretation_warning = False
    example_colliding_values = []
    for i, v in enumerate(values_dup):
        while True:
            counter[v] += 1
            tentative_new_name = v + join + str(counter[v])
            if tentative_new_name not in values_set:
                values_set.add(tentative_new_name)
                values_dup[i] = tentative_new_name
                break
            issue_interpretation_warning = True
            if len(example_colliding_values) < 5:
                example_colliding_values.append(tentative_new_name)
    if issue_interpretation_warning:
        warnings.warn(
            f"Suffix used ({join}[0-9]+) to deduplicate index values may make index "
            "values difficult to interpret. There are values with similar suffixes "
            "already in the index. Consider using a different delimiter by passing "
            "`join={delimiter}`. "
            "Example key collisions generated by the make_index_unique algorithm: "
            + str(example_colliding_values)
        )
    values[indices_dup] = values_dup
    index = pd.Index(values, name=index.name)
    return index
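

# Direct usage sketch of `make_index_unique` (the docstring example goes through
# AnnData); note how an existing "a-1" forces the algorithm to try "a-2":
#
#     >>> make_index_unique(pd.Index(["a", "a", "a-1"]))  # also warns about collisions
#     Index(['a', 'a-2', 'a-1'], dtype='object')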


def warn_names_duplicates(attr: str):
    names = "Observation" if attr == "obs" else "Variable"
    warnings.warn(
        f"{names} names are not unique. "
        f"To make them unique, call `.{attr}_names_make_unique`.",
        UserWarning,
        stacklevel=2,
    )


def ensure_df_homogeneous(
    df: pd.DataFrame, name: str
) -> Union[np.ndarray, sparse.csr_matrix]:
    # TODO: rename this function, I would not expect this to return a non-dataframe
    if all(isinstance(dt, pd.SparseDtype) for dt in df.dtypes):
        arr = df.sparse.to_coo().tocsr()
    else:
        arr = df.to_numpy()
    if df.dtypes.nunique() != 1:
        warnings.warn(f"{name} converted to numpy array with dtype {arr.dtype}")
    return arr
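

# Illustrative behavior of `ensure_df_homogeneous` (the `name` label here is
# hypothetical): a mixed-dtype frame is densified under a unifying dtype, with
# a warning.
#
#     >>> df = pd.DataFrame({"x": [1, 2], "y": [0.5, 1.5]})
#     >>> ensure_df_homogeneous(df, "layers['df']").dtype  # warns about the conversion
#     dtype('float64')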


def convert_dictionary_to_structured_array(source: Mapping[str, Sequence[Any]]):
    names = list(source.keys())
    try:  # transform to byte-strings
        cols = [
            np.asarray(col)
            if np.array(col[0]).dtype.char not in {"U", "S"}
            else np.asarray(col).astype("U")
            for col in source.values()
        ]
    except UnicodeEncodeError as e:
        raise ValueError(
            "Currently, only ASCII strings are supported. "
            "Don’t use “ö” etc. for sample annotation."
        ) from e

    # if old_index_key not in source:
    #     names.append(new_index_key)
    #     cols.append(np.arange(len(cols[0]) if cols else n_row).astype("U"))
    # else:
    #     names[names.index(old_index_key)] = new_index_key
    #     cols[names.index(old_index_key)] = cols[names.index(old_index_key)].astype("U")

    dtype_list = list(
        zip(names, [str(c.dtype) for c in cols], [(c.shape[1],) for c in cols])
    )
    # might be unnecessary
    dtype = np.dtype(dtype_list)

    arr = np.zeros((len(cols[0]),), dtype)
    # here, we do not want to call BoundStructArray.__getitem__
    # but np.ndarray.__getitem__, therefore we avoid the following line
    # arr = np.ndarray.__new__(cls, (len(cols[0]),), dtype)
    for i, name in enumerate(dtype.names):
        arr[name] = np.array(cols[i], dtype=dtype_list[i][1])

    return arr
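

# Usage sketch of `convert_dictionary_to_structured_array`; the dtype spec above
# reads `c.shape[1]`, so columns are assumed to be 2-D (illustrative values):
#
#     >>> arr = convert_dictionary_to_structured_array(
#     ...     {"a": np.ones((3, 1), dtype="i8"), "b": np.zeros((3, 2))}
#     ... )
#     >>> arr.dtype.names
#     ('a', 'b')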


def deprecated(new_name: str):
    """\
    This is a decorator which can be used to mark functions
    as deprecated. It will result in a warning being emitted
    when the function is used.
    """

    def decorator(func):
        @wraps(func)
        def new_func(*args, **kwargs):
            # turn off filter
            warnings.simplefilter("always", DeprecationWarning)
            warnings.warn(
                f"Use {new_name} instead of {func.__name__}, "
                f"{func.__name__} will be removed in the future.",
                category=DeprecationWarning,
                stacklevel=2,
            )
            warnings.simplefilter("default", DeprecationWarning)  # reset filter
            return func(*args, **kwargs)

        setattr(new_func, "__deprecated", True)
        return new_func

    return decorator
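

# Usage sketch of `deprecated` (hypothetical function names):
#
#     >>> @deprecated("new_helper")
#     ... def old_helper():
#     ...     return 42
#     >>> old_helper()  # emits a DeprecationWarning pointing at new_helper
#     42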


class DeprecationMixinMeta(type):
    """\
    Use this as a metaclass so deprecated methods and properties
    do not appear in vars(MyClass)/dir(MyClass).
    """

    def __dir__(cls):
        def is_deprecated(attr):
            if isinstance(attr, property):
                attr = attr.fget
            return getattr(attr, "__deprecated", False)

        return [
            item
            for item in type.__dir__(cls)
            if not is_deprecated(getattr(cls, item, None))
        ]
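

# Usage sketch of `DeprecationMixinMeta` (hypothetical class): members marked by
# `deprecated` are hidden from dir() and hence from tab completion.
#
#     >>> class MyClass(metaclass=DeprecationMixinMeta):
#     ...     @deprecated("shiny_new")
#     ...     def crusty_old(self):
#     ...         pass
#     >>> "crusty_old" in dir(MyClass)
#     False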