-
Notifications
You must be signed in to change notification settings - Fork 258
/
array_sequence.py
601 lines (492 loc) · 21.5 KB
/
array_sequence.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
import numbers
from operator import mul
from functools import reduce

import numpy as np

from ..deprecated import deprecate_with_version

# One mebibyte in bytes; buffer sizes elsewhere in this module are given in MB.
MEGABYTE = 1024 * 1024
def is_array_sequence(obj):
    """ Return True if `obj` is an array sequence. """
    # Duck-typing check: array sequences expose an `is_array_sequence`
    # attribute; anything lacking it is reported as False.
    return getattr(obj, 'is_array_sequence', False)
def is_ndarray_of_int_or_bool(obj):
    """ Return True if `obj` is an ndarray of integer or boolean dtype. """
    if not isinstance(obj, np.ndarray):
        return False
    dtype = obj.dtype
    return np.issubdtype(dtype, np.integer) or np.issubdtype(dtype, np.bool_)
def _safe_resize(a, shape):
""" Resize an ndarray safely, using minimal memory """
try:
a.resize(shape)
except ValueError:
a = a.copy()
a.resize(shape, refcheck=False)
return a
class _BuildCache(object):
    """ Scratch state shared across repeated ``append`` calls.

    Snapshots the offsets/lengths of `arr_seq` as plain lists so they can
    be appended to cheaply, and precomputes buffer-growth parameters.
    """

    def __init__(self, arr_seq, common_shape, dtype):
        self.offsets = list(arr_seq._offsets)
        self.lengths = list(arr_seq._lengths)
        self.next_offset = arr_seq._get_next_offset()
        self.bytes_per_buf = arr_seq._buffer_size * MEGABYTE
        # Keep the sequence's own dtype unless it holds no data yet.
        self.dtype = arr_seq._data.dtype if arr_seq._data.size != 0 else dtype
        seq_shape = arr_seq.common_shape
        if seq_shape != () and seq_shape != common_shape:
            raise ValueError(
                "All dimensions, except the first one, must match exactly")
        self.common_shape = common_shape
        # Rows per allocation chunk, derived from the per-row byte count.
        row_bytes = dtype.itemsize * reduce(mul, common_shape, 1)
        self.rows_per_buf = max(1, self.bytes_per_buf // row_bytes)

    def update_seq(self, arr_seq):
        """ Write the cached offsets/lengths back onto `arr_seq`. """
        arr_seq._offsets = np.array(self.offsets)
        arr_seq._lengths = np.array(self.lengths)
def _define_operators(cls):
""" Decorator which adds support for some Python operators. """
def _wrap(cls, op, inplace=False, unary=False):
def fn_unary_op(self):
return self._op(op)
def fn_binary_op(self, value):
return self._op(op, value, inplace=inplace)
setattr(cls, op, fn_unary_op if unary else fn_binary_op)
fn = getattr(cls, op)
fn.__name__ = op
fn.__doc__ = getattr(np.ndarray, op).__doc__
for op in ["__add__", "__sub__", "__mul__", "__mod__", "__pow__",
"__floordiv__", "__truediv__", "__lshift__", "__rshift__",
"__or__", "__and__", "__xor__"]:
_wrap(cls, op=op, inplace=False)
_wrap(cls, op=f"__i{op.strip('_')}__", inplace=True)
for op in ["__eq__", "__ne__", "__lt__", "__le__", "__gt__", "__ge__"]:
_wrap(cls, op)
for op in ["__neg__", "__abs__", "__invert__"]:
_wrap(cls, op, unary=True)
return cls
@_define_operators
class ArraySequence(object):
    """ Sequence of ndarrays having variable first dimension sizes.

    This is a container that can store multiple ndarrays where each ndarray
    might have a different first dimension size but a *common* size for the
    remaining dimensions.

    More generally, an instance of :class:`ArraySequence` of length $N$ is
    composed of $N$ ndarrays of shape $(d_1, d_2, ... d_D)$ where $d_1$
    can vary in length between arrays but $(d_2, ..., d_D)$ have to be the
    same for every ndarray.
    """

    def __init__(self, iterable=None, buffer_size=4):
        """ Initialize array sequence instance

        Parameters
        ----------
        iterable : None or iterable or :class:`ArraySequence`, optional
            If None, create an empty :class:`ArraySequence` object.
            If iterable, create a :class:`ArraySequence` object initialized
            from array-like objects yielded by the iterable.
            If :class:`ArraySequence`, create a view (no memory is allocated).
            For an actual copy use :meth:`.copy` instead.
        buffer_size : float, optional
            Size (in Mb) for memory allocation when `iterable` is a generator.
        """
        # Create new empty `ArraySequence` object.
        self._is_view = False
        # Flat storage: rows of every element, stacked along axis 0.
        self._data = np.array([])
        # Per-element start row and row count into `_data`.
        self._offsets = np.array([], dtype=np.intp)
        self._lengths = np.array([], dtype=np.intp)
        self._buffer_size = buffer_size
        self._build_cache = None

        if iterable is None:
            return

        if is_array_sequence(iterable):
            # Create a view: share the underlying arrays, allocate nothing.
            self._data = iterable._data
            self._offsets = iterable._offsets
            self._lengths = iterable._lengths
            self._is_view = True
            return

        self.extend(iterable)

    @property
    def is_array_sequence(self):
        # Duck-typing marker recognized by the module-level
        # `is_array_sequence()` helper.
        return True

    @property
    def common_shape(self):
        """ Matching shape of the elements in this array sequence. """
        return self._data.shape[1:]

    @property
    def total_nb_rows(self):
        """ Total number of rows in this array sequence. """
        return np.sum(self._lengths)

    @property
    @deprecate_with_version("'ArraySequence.data' property is deprecated.\n"
                            "Please use the 'ArraySequence.get_data()' method instead",
                            '3.0', '4.0')
    def data(self):
        """ Elements in this array sequence. """
        # Read-only view so callers cannot mutate shared storage by accident.
        view = self._data.view()
        view.setflags(write=False)
        return view

    def get_data(self):
        """ Returns a *copy* of the elements in this array sequence.

        Notes
        -----
        To modify the data on this array sequence, one can use
        in-place mathematical operators (e.g., `seq += ...`) or the use
        assignment operator (i.e, `seq[...] = value`).
        """
        # Copying also compacts: the returned array has no over-allocated
        # buffer rows and its elements are stored contiguously in order.
        return self.copy()._data

    def _check_shape(self, arrseq):
        """ Check whether this array sequence is compatible with another.

        Raises ValueError when element counts, total row counts or common
        shapes differ; returns True otherwise.
        """
        msg = "cannot perform operation - array sequences have different"
        if len(self._lengths) != len(arrseq._lengths):
            msg += f" lengths: {len(self._lengths)} vs. {len(arrseq._lengths)}."
            raise ValueError(msg)

        if self.total_nb_rows != arrseq.total_nb_rows:
            msg += f" amount of data: {self.total_nb_rows} vs. {arrseq.total_nb_rows}."
            raise ValueError(msg)

        if self.common_shape != arrseq.common_shape:
            msg += f" common shape: {self.common_shape} vs. {arrseq.common_shape}."
            raise ValueError(msg)

        return True

    def _get_next_offset(self):
        """ Offset in ``self._data`` at which to write next rowelement """
        if len(self._offsets) == 0:
            return 0
        # Elements need not be stored in order, so the next free row sits
        # just past the element with the largest offset.
        imax = np.argmax(self._offsets)
        return self._offsets[imax] + self._lengths[imax]

    def append(self, element, cache_build=False):
        """ Appends `element` to this array sequence.

        Append can be a lot faster if it knows that it is appending several
        elements instead of a single element. In that case it can cache the
        parameters it uses between append operations, in a "build cache". To
        tell append to do this, use ``cache_build=True``. If you use
        ``cache_build=True``, you need to finalize the append operations with
        :meth:`finalize_append`.

        Parameters
        ----------
        element : ndarray
            Element to append. The shape must match already inserted elements
            shape except for the first dimension.
        cache_build : {False, True}
            Whether to save the build cache from this append routine. If True,
            append can assume it is the only player updating `self`, and the
            caller must finalize `self` after all append operations, with
            ``self.finalize_append()``.

        Returns
        -------
        None

        Notes
        -----
        If you need to add multiple elements you should consider
        `ArraySequence.extend`.
        """
        element = np.asarray(element)
        if element.size == 0:
            # Empty elements are silently ignored.
            return
        el_shape = element.shape
        n_items, common_shape = el_shape[0], el_shape[1:]
        build_cache = self._build_cache
        in_cached_build = build_cache is not None
        if not in_cached_build:  # One shot append, not part of sequence
            build_cache = _BuildCache(self, common_shape, element.dtype)
        next_offset = build_cache.next_offset
        req_rows = next_offset + n_items
        if self._data.shape[0] < req_rows:
            # Grow (in buffer-sized chunks) before writing the new rows.
            self._resize_data_to(req_rows, build_cache)
        self._data[next_offset:req_rows] = element
        build_cache.offsets.append(next_offset)
        build_cache.lengths.append(n_items)
        build_cache.next_offset = req_rows
        if in_cached_build:
            # Part of an ongoing cached build; caller finalizes later.
            return
        if cache_build:
            self._build_cache = build_cache
        else:
            build_cache.update_seq(self)

    def finalize_append(self):
        """ Finalize process of appending several elements to `self`

        :meth:`append` can be a lot faster if it knows that it is appending
        several elements instead of a single element. To tell the append
        method this is the case, use ``cache_build=True``. This method
        finalizes the series of append operations after a call to
        :meth:`append` with ``cache_build=True``.
        """
        if self._build_cache is None:
            return
        self._build_cache.update_seq(self)
        self._build_cache = None
        # Drop any over-allocated buffer rows.
        self.shrink_data()

    def _resize_data_to(self, n_rows, build_cache):
        """ Resize data array if required """
        # Calculate new data shape, rounding up to nearest buffer size
        n_bufs = np.ceil(n_rows / build_cache.rows_per_buf)
        extended_n_rows = int(n_bufs * build_cache.rows_per_buf)
        new_shape = (extended_n_rows,) + build_cache.common_shape
        if self._data.size == 0:
            self._data = np.empty(new_shape, dtype=build_cache.dtype)
        else:
            self._data = _safe_resize(self._data, new_shape)

    def shrink_data(self):
        # Trim storage to exactly the rows in use (inverse of the
        # chunked over-allocation done in `_resize_data_to`).
        self._data.resize((self._get_next_offset(),) + self.common_shape,
                          refcheck=False)

    def extend(self, elements):
        """ Appends all `elements` to this array sequence.

        Parameters
        ----------
        elements : iterable of ndarrays or :class:`ArraySequence` object
            If iterable of ndarrays, each ndarray will be concatenated along
            the first dimension then appended to the data of this
            ArraySequence.
            If :class:`ArraySequence` object, its data are simply appended to
            the data of this ArraySequence.

        Returns
        -------
        None

        Notes
        -----
        The shape of the elements to be added must match the one of the data of
        this :class:`ArraySequence` except for the first dimension.
        """
        # If possible try pre-allocating memory.
        try:
            iter_len = len(elements)
        except TypeError:
            # Generator: no known length, so append will grow buffers lazily.
            pass
        else:  # We do know the iterable length
            if iter_len == 0:
                return
            e0 = np.asarray(elements[0])
            n_elements = np.sum([len(e) for e in elements])
            self._build_cache = _BuildCache(self, e0.shape[1:], e0.dtype)
            self._resize_data_to(self._get_next_offset() + n_elements,
                                 self._build_cache)

        for e in elements:
            self.append(e, cache_build=True)

        self.finalize_append()

    def copy(self):
        """ Creates a copy of this :class:`ArraySequence` object.

        Returns
        -------
        seq_copy : :class:`ArraySequence` instance
            Copy of `self`.

        Notes
        -----
        We do not simply deepcopy this object because we have a chance to use
        less memory. For example, if the array sequence being copied is the
        result of a slicing operation on an array sequence.
        """
        seq = self.__class__()
        total_lengths = np.sum(self._lengths)
        seq._data = np.empty((total_lengths,) + self._data.shape[1:],
                             dtype=self._data.dtype)

        # Compact while copying: elements are written back-to-back, in
        # order, regardless of where they sat in the source `_data`.
        next_offset = 0
        offsets = []
        for offset, length in zip(self._offsets, self._lengths):
            offsets.append(next_offset)
            chunk = self._data[offset:offset + length]
            seq._data[next_offset:next_offset + length] = chunk
            next_offset += length

        seq._offsets = np.asarray(offsets)
        seq._lengths = self._lengths.copy()
        return seq

    def __getitem__(self, idx):
        """ Get sequence(s) through standard or advanced numpy indexing.

        Parameters
        ----------
        idx : int or slice or list or ndarray
            If int, index of the element to retrieve.
            If slice, use slicing to retrieve elements.
            If list, indices of the elements to retrieve.
            If ndarray with dtype int, indices of the elements to retrieve.
            If ndarray with dtype bool, only retrieve selected elements.

        Returns
        -------
        ndarray or :class:`ArraySequence`
            If `idx` is an int, returns the selected sequence.
            Otherwise, returns a :class:`ArraySequence` object which is a view
            of the selected sequences.
        """
        if isinstance(idx, (numbers.Integral, np.integer)):
            start = self._offsets[idx]
            return self._data[start:start + self._lengths[idx]]

        seq = self.__class__()
        seq._is_view = True
        if isinstance(idx, tuple):
            # First index selects elements; the rest index into the
            # common (row) dimensions of the data.
            off_idx = idx[0]
            seq._data = self._data.__getitem__((slice(None),) + idx[1:])
        else:
            off_idx = idx
            seq._data = self._data

        if isinstance(off_idx, slice):  # Standard list slicing
            seq._offsets = self._offsets[off_idx]
            seq._lengths = self._lengths[off_idx]
            return seq

        if isinstance(off_idx, (list, range)) or is_ndarray_of_int_or_bool(off_idx):
            # Fancy indexing
            seq._offsets = self._offsets[off_idx]
            seq._lengths = self._lengths[off_idx]
            return seq

        raise TypeError("Index must be either an int, a slice, a list of int"
                        " or a ndarray of bool! Not " + str(type(idx)))

    def __setitem__(self, idx, elements):
        """ Set sequence(s) through standard or advanced numpy indexing.

        Parameters
        ----------
        idx : int or slice or list or ndarray
            If int, index of the element to retrieve.
            If slice, use slicing to retrieve elements.
            If list, indices of the elements to retrieve.
            If ndarray with dtype int, indices of the elements to retrieve.
            If ndarray with dtype bool, only retrieve selected elements.
        elements: ndarray or :class:`ArraySequence`
            Data that will overwrite selected sequences.
            If `idx` is an int, `elements` is expected to be a ndarray.
            Otherwise, `elements` is expected a :class:`ArraySequence` object.
        """
        if isinstance(idx, (numbers.Integral, np.integer)):
            start = self._offsets[idx]
            self._data[start:start + self._lengths[idx]] = elements
            return

        if isinstance(idx, tuple):
            # First index selects elements; the rest restrict the columns
            # of `_data` that will be written.
            off_idx = idx[0]
            data = self._data.__getitem__((slice(None),) + idx[1:])
        else:
            off_idx = idx
            data = self._data

        if isinstance(off_idx, slice):  # Standard list slicing
            offsets = self._offsets[off_idx]
            lengths = self._lengths[off_idx]

        elif isinstance(off_idx, (list, range)) or is_ndarray_of_int_or_bool(off_idx):
            # Fancy indexing
            offsets = self._offsets[off_idx]
            lengths = self._lengths[off_idx]

        else:
            raise TypeError("Index must be either an int, a slice, a list of int"
                            " or a ndarray of bool! Not " + str(type(idx)))

        if is_array_sequence(elements):
            if len(lengths) != len(elements):
                msg = f"Trying to set {len(lengths)} sequences with {len(elements)} sequences."
                raise ValueError(msg)

            if sum(lengths) != elements.total_nb_rows:
                msg = f"Trying to set {sum(lengths)} points with {elements.total_nb_rows} points."
                raise ValueError(msg)

            for o1, l1, o2, l2 in zip(offsets, lengths, elements._offsets, elements._lengths):
                data[o1:o1 + l1] = elements._data[o2:o2 + l2]

        elif isinstance(elements, numbers.Number):
            # Scalar broadcast over every selected element.
            for o1, l1 in zip(offsets, lengths):
                data[o1:o1 + l1] = elements

        else:  # Try to iterate over it.
            for o1, l1, element in zip(offsets, lengths, elements):
                data[o1:o1 + l1] = element

    def _op(self, op, value=None, inplace=False):
        """ Applies some operator to this arraysequence.

        This handles both unary and binary operators with a scalar or another
        array sequence. Operations are performed directly on the underlying
        data, or a copy of it, which depends on the value of `inplace`.

        Parameters
        ----------
        op : str
            Name of the Python operator (e.g., `"__add__"`).
        value : scalar or :class:`ArraySequence`, optional
            If None, the operator is assumed to be unary.
            Otherwise, that value is used in the binary operation.
        inplace: bool, optional
            If False, the operation is done on a copy of this array sequence.
            Otherwise, this array sequence gets modified directly.
        """
        seq = self if inplace else self.copy()

        if is_array_sequence(value) and seq._check_shape(value):
            # Element-wise op between two compatible array sequences.
            # Note: seq's offsets can differ from self's (copy compacts),
            # hence three (offset, length) pairs per element.
            elements = zip(seq._offsets, seq._lengths,
                           self._offsets, self._lengths,
                           value._offsets, value._lengths)

            # Change seq.dtype to match the operation resulting type.
            o0, l0, o1, l1, o2, l2 = next(elements)
            tmp = getattr(self._data[o1:o1 + l1], op)(value._data[o2:o2 + l2])
            seq._data = seq._data.astype(tmp.dtype)
            seq._data[o0:o0 + l0] = tmp

            for o0, l0, o1, l1, o2, l2 in elements:
                seq._data[o0:o0 + l0] = getattr(self._data[o1:o1 + l1], op)(value._data[o2:o2 + l2])

        else:
            args = [] if value is None else [value]  # Dealing with unary and binary ops.
            elements = zip(seq._offsets, seq._lengths, self._offsets, self._lengths)

            # Change seq.dtype to match the operation resulting type.
            o0, l0, o1, l1 = next(elements)
            tmp = getattr(self._data[o1:o1 + l1], op)(*args)
            seq._data = seq._data.astype(tmp.dtype)
            seq._data[o0:o0 + l0] = tmp

            for o0, l0, o1, l1 in elements:
                seq._data[o0:o0 + l0] = getattr(self._data[o1:o1 + l1], op)(*args)

        return seq

    def __iter__(self):
        if len(self._lengths) != len(self._offsets):
            raise ValueError("ArraySequence object corrupted:"
                             " len(self._lengths) != len(self._offsets)")
        for offset, lengths in zip(self._offsets, self._lengths):
            yield self._data[offset: offset + lengths]

    def __len__(self):
        return len(self._offsets)

    def __repr__(self):
        if len(self) > np.get_printoptions()['threshold']:
            # Show only the first and last edgeitems.
            edgeitems = np.get_printoptions()['edgeitems']
            data = str(list(self[:edgeitems]))[:-1]
            data += ", ..., "
            data += str(list(self[-edgeitems:]))[1:]
        else:
            data = str(list(self))
        return f"{self.__class__.__name__}({data})"

    def save(self, filename):
        """ Saves this :class:`ArraySequence` object to a .npz file. """
        np.savez(filename,
                 data=self._data,
                 offsets=self._offsets,
                 lengths=self._lengths)

    @classmethod
    def load(cls, filename):
        """ Loads a :class:`ArraySequence` object from a .npz file. """
        content = np.load(filename)
        seq = cls()
        seq._data = content["data"]
        seq._offsets = content["offsets"]
        seq._lengths = content["lengths"]
        return seq
def create_arraysequences_from_generator(gen, n, buffer_sizes=None):
    """ Creates :class:`ArraySequence` objects from a generator yielding tuples

    Parameters
    ----------
    gen : generator
        Generator yielding a size `n` tuple containing the values to put in the
        array sequences.
    n : int
        Number of :class:`ArraySequences` object to create.
    buffer_sizes : list of float, optional
        Sizes (in Mb) for each ArraySequence's buffer.
    """
    sizes = [4] * n if buffer_sizes is None else buffer_sizes
    seqs = [ArraySequence(buffer_size=sz) for sz in sizes]

    # Fan each yielded tuple out across the sequences, using the fast
    # cached-build append path; empty values are skipped.
    for values in gen:
        for idx, seq in enumerate(seqs):
            if values[idx].nbytes > 0:
                seq.append(values[idx], cache_build=True)

    for seq in seqs:
        seq.finalize_append()
    return seqs
def concatenate(seqs, axis):
    """ Concatenates multiple :class:`ArraySequence` objects along an axis.

    Parameters
    ----------
    seqs: iterable of :class:`ArraySequence` objects
        Sequences to concatenate.
    axis : int
        Axis along which the sequences will be concatenated.

    Returns
    -------
    new_seq: :class:`ArraySequence` object
        New :class:`ArraySequence` object which is the result of
        concatenating multiple sequences along the given axis.
    """
    new_seq = seqs[0].copy()
    if axis != 0:
        # Non-zero axis: elements keep their offsets/lengths; only the
        # common (row) dimensions of the data grow.
        new_seq._data = np.concatenate([seq._data for seq in seqs], axis=axis)
        return new_seq

    # Axis 0 is the element axis, which amounts to an extend.
    for seq in seqs[1:]:
        new_seq.extend(seq)
    return new_seq