[dev]: reader producers can now produce between a start and stop, and mask production gains slight efficiency by testing for empty arrays
mscaudill committed Aug 28, 2023
1 parent a766bb6 commit 212c85b
Showing 2 changed files with 104 additions and 25 deletions.
61 changes: 36 additions & 25 deletions src/openseize/core/producer.py
@@ -91,12 +91,22 @@ def producer(data: Union[npt.NDArray, Iterable[npt.NDArray], Reader,
values that are False will be ignored. If None (Default),
producer will produce all values from object.
kwargs:
Keyword arguments specific to data type that ndarrays will be
produced from. For Reader instances, valid kwargs are padvalue
(see io.bases.Readers and io.edf.Reader) For generating
functions, all the positional and keyword arguments must be
passed to the function through these kwargs to avoid name
collisions with the producer func arguments.
Keyword arguments are specific to the data argument type:
- Reader:
    padvalue:
        See the reader.read method.
    start:
        The start sample to begin data production along axis.
    stop:
        The stop sample to halt data production along axis.
- Generating Function:
    All positional and keyword arguments to the generating function
    must be passed through these kwargs to avoid name collisions with
    the producer func arguments.
- Arrays:
    The kwargs are ignored.
- Sequences:
    The kwargs are ignored.
Returns: An iterable of ndarrays of shape chunksize along axis.
"""
@@ -278,14 +288,20 @@ def to_array(self, dtype=float):

return np.concatenate(list(self), axis=self.axis)

# FIXME 8/10/2023 Add start and stop for reader production so add to producer
# docs!!!!!

class ReaderProducer(Producer):
"""A Producer of ndarrays from an openseize file Reader instance.
Attrs:
see Producer attrs
kwargs: dict
Producer Attrs
start:
The start sample along production axis at which data production
begins.
stop:
The stop sample along production axis at which data production
stops.
kwargs:
Arguments passed to read method of a file reader instance.
Notes:
@@ -298,20 +314,22 @@ class ReaderProducer(Producer):
def __init__(self, data, chunksize, axis, **kwargs):
"""Initialize this Producer with a closed 'data' Reader instance."""

# remove start and stop optional kwargs for ReaderProducers
a, b = kwargs.pop('start', 0), kwargs.pop('stop', data.shape[axis])

super().__init__(data, chunksize, axis, **kwargs)
self.data.close()

# Pop the start and stop from kwargs
a = self.kwargs.pop('start', 0)
b = self.kwargs.pop('stop', self.data.shape[axis])
self.start, self.stop, _ = slice(a, b).indices(data.shape[axis])

# close for serialization
self.data.close()
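The slice(a, b).indices(n) call above is what normalizes the popped start/stop; a standalone illustration of the clamping it provides:

    # slice.indices resolves negative indices and clamps to [0, n], so
    # start and stop always land inside the reader's sample range.
    n = 5000                                # samples along production axis
    start, stop, _ = slice(-100, 10**9).indices(n)
    print(start, stop)                      # 4900 5000
    start, stop, _ = slice(1000, 3000).indices(n)
    print(start, stop)                      # 1000 3000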

@property
def shape(self):
"""Return the summed shape of all arrays in this Reader."""

# FIXME TEST SHAPE
s = list(self.data.shape)
s[axis] = self.stop - self.start
s[self.axis] = self.stop - self.start
return tuple(s)
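With the s[self.axis] fix above, the reported shape covers only the produced sub-range; a sketch with assumed reader dimensions:

    # A 4-channel reader of 10000 samples, produced with start=1000,
    # stop=4000, reports 3000 samples along the production axis.
    s = list((4, 10000))
    s[-1] = 4000 - 1000
    print(tuple(s))    # (4, 3000)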

def __iter__(self):
@@ -320,7 +338,6 @@ def __iter__(self):
# Open the data reader
self.data.open()

# FIXME TEST
starts = np.arange(self.start, self.stop, self.chunksize)
for a, b in zip_longest(starts, starts[1:], fillvalue=self.stop):
yield self.data.read(a, b, **self.kwargs)
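The zip_longest pairing above turns the chunk start indices into (start, stop) read windows, with the final partial window capped at self.stop. For example:

    # Windowing sketch: one read per chunksize samples; zip_longest's
    # fillvalue pads the last window out to the overall stop index.
    import numpy as np
    from itertools import zip_longest

    start, stop, chunksize = 3, 11, 3
    starts = np.arange(start, stop, chunksize)   # [3 6 9]
    windows = list(zip_longest(starts, starts[1:], fillvalue=stop))
    # windows pairs: (3, 6), (6, 9), (9, 11)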
@@ -429,13 +446,12 @@ def __iter__(self):
# append to tmp again
continue

# TODO TEST this change
# else runs after normal loop exit -- required here
else: #pylint: disable=useless-else-on-loop

# yield whatever is left in tmp (it's below chunksize)
if tmp_size > 0:
remaining = np.concatenate(tmp, axis=self.axis)
remaining = np.concatenate(tmp, axis=self.axis)
if remaining.size > 0:
yield remaining
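The for ... else above is easy to misread: the else suite runs once when the loop exits without a break, which is what flushes any sub-chunksize remainder. A toy illustration:

    # else after a for loop runs on normal exit (no break), a natural
    # place to flush a partially filled buffer.
    buffer = []
    for chunk in ([1, 2], [3]):
        buffer.extend(chunk)
    else:
        print('remainder:', buffer)   # remainder: [1, 2, 3]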


@@ -487,13 +503,8 @@ def __iter__(self):
"""Returns an iterator of boolean masked numpy arrays along axis."""

collector = FIFOArray(self.chunksize, self.axis)

# FIXME
# I think the issue is not related to filtering and put. Its related to
# reading from a file values you will never use
for arr, maskarr in zip(self.data, self.mask):

# TODO test that this gives a speed enhancement
if not np.any(maskarr):
continue
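This early continue is the efficiency tweak named in the commit message: chunks whose mask is entirely False are skipped before any further work is spent on them. A small sketch of the test:

    # An all-False mask chunk contributes no samples, so skip it
    # outright; np.any is a cheap vectorized test for that case.
    import numpy as np

    maskarr = np.zeros(1000, dtype=bool)
    if not np.any(maskarr):
        pass  # nothing in this chunk survives the mask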

68 changes: 68 additions & 0 deletions tests/test_producer.py
@@ -6,6 +6,7 @@
"""

import pytest
import time
import numpy as np
from itertools import zip_longest
from pathlib import Path
@@ -299,6 +300,73 @@ def test_frommaskedreader(demo_data):
slicer[-1] = slice(start, stop)
assert np.allclose(masked[tuple(slicer)], pro_arr)

def test_subread_shape(demo_data):
"""Validate that the shape of data produced between random starts & stops
matches the expected shape for 1000 random reads."""

reader = Reader(demo_data)
starts = np.random.randint(low=0, high=reader.shape[-1]-1, size=1000)
stops = starts + np.random.randint(low=1, high=int(10e6), size=1000)
# ensure largest stop is within data
stops = np.minimum(stops, reader.shape[-1])
for start, stop in zip(starts, stops):
pro = producer(reader, chunksize=30e5, axis=-1, start=start, stop=stop)
assert pro.shape[-1] == stop - start

reader.close()

def test_subread_values(demo_data):
"""Validate that the values produced between random starts & stops match the
expected values for 300 random reads."""

reader = Reader(demo_data)
starts = np.random.randint(low=0, high=reader.shape[-1]-1, size=300)
stops = starts + np.random.randint(low=1, high=int(1e6), size=300)
# ensure largest stop is within data
stops = np.minimum(stops, reader.shape[-1])
for start, stop in zip(starts, stops):

pro = producer(reader, chunksize=30e5, axis=-1, start=start, stop=stop)
assert np.allclose(pro.to_array(), reader.read(start, stop))

reader.close()

def test_subread_channels(demo_data):
"""Validate that values produced between random starts & stops for
a restricted set of channes match the expected values for 300 random
reads."""

reader = Reader(demo_data)
reader.channels = [0, 2]
starts = np.random.randint(low=0, high=reader.shape[-1]-1, size=300)
stops = starts + np.random.randint(low=1, high=int(1e6), size=300)
# ensure largest stop is within data
stops = np.minimum(stops, reader.shape[-1])
for start, stop in zip(starts, stops):

pro = producer(reader, chunksize=30e5, axis=-1, start=start, stop=stop)
assert np.allclose(pro.to_array(), reader.read(start, stop))

reader.close()

def test_subread_mask(demo_data):
"""Validate that values produced between random starts & stops with a mask
match the expected values for 300 random reads."""

reader = Reader(demo_data)
starts = np.random.randint(low=0, high=reader.shape[-1]-1, size=300)
stops = starts + np.random.randint(low=1, high=int(1e6), size=300)
# ensure largest stop is within data
stops = np.minimum(stops, reader.shape[-1])
for start, stop in zip(starts, stops):

mask = np.random.choice([True, False], size=stop-start, p=[.4, .6])
x = reader.read(start, stop)[:, mask]
pro = producer(reader, 30e5, axis=-1, start=start, stop=stop, mask=mask)
assert np.allclose(pro.to_array(), x)

reader.close()

def test_asproducer0():
"""Verify that the as_producer decorator correctly decorates
a generating function converting it into a producer type."""
