Skip to content

Commit

Permalink
core.FillRequest kwargs buffer_input and buffer_output are obligatory…
Browse files Browse the repository at this point in the history
… (same for FillRequestSeq). Fix fill and request methods. Add bufsize argument to split._get_seq_with_type.
  • Loading branch information
ynikitenko committed Feb 23, 2022
1 parent 47603a9 commit fe84d39
Show file tree
Hide file tree
Showing 7 changed files with 275 additions and 90 deletions.
151 changes: 121 additions & 30 deletions lena/core/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class FillRequest(object):
A *FillRequest* element slices the flow during *fill*
and yields results for each chunk during *request*.
It can also call a method *reset* after each *request*.
It can also call *reset* after each *request*.
"""
# FillRequest is not deprecated, but changing its meaning.
# # They seem redundant, because mostly they are used
Expand All @@ -227,17 +227,18 @@ class FillRequest(object):
# # sometimes need to be more versatile,
# # and FillRequest is of no help for that.

def __init__(self, el,
fill="fill", request="request", reset_name="reset",
reset=None, bufsize=1, yield_on_remainder=False,
buffer_input=None, buffer_output=None):
"""Names for actual *fill*, *request* and *reset*
def __init__(self, el, bufsize=1, reset=None,
buffer_input=None, buffer_output=None,
yield_on_remainder=False,
fill="fill", request="request", reset_name="reset"
):
"""Names for actual *fill*, *request* and *reset* methods
can be provided during initialization (the latter is set
through *reset_name*).
*FillRequest* can be initialized from a *FillCompute* element.
If a callable *request* method was not found,
*el* must have a callable *compute* method.
*el* must have a method *compute*.
*request* in this case is *compute*.
*FillRequest* can also be initialized from a *Run* element.
Expand All @@ -247,10 +248,12 @@ def __init__(self, el,
*FillRequest* implements *run* method
that splits the flow into subslices of *bufsize* values
and feeds them to the *run* method of *el*.
Since we require no less than *bufsize* values
(unless *yield_on_remainder* is ``True``),
we need to store either *bufsize* values of the incoming flow
or all values produced by *el.run* for each slice.
or all values produced by *el.run* or *el.request*
for each slice.
This is set by *buffer_input* or *buffer_output*.
One and only one of them must be ``True``.
For example, if the element receives file names and produces
Expand Down Expand Up @@ -304,17 +307,19 @@ def __init__(self, el,
self.reset = None
self._reset = bool(reset)

# set buffer_input
bi = bool(buffer_input)
bo = bool(buffer_output)
# if yield_on_remainder is True, buffers are not used.
if not yield_on_remainder and int(bi) + int(bo) != 1:
raise exceptions.LenaValueError(
"one and only one of buffer_input or buffer_output "
"must be set"
)
self._buffer_input = bi

run = getattr(el, "run", None)
if run and callable(run):
bi = bool(buffer_input)
bo = bool(buffer_output)
# if yield_on_remainder is True, buffers are not used.
if not yield_on_remainder and int(bi) + int(bo) != 1:
raise exceptions.LenaValueError(
"one and only one of buffer_input or buffer_output "
"must be set"
)
self._buffer_input = bi
self.run = self._run_run
has_run = True
else:
Expand All @@ -326,14 +331,20 @@ def __init__(self, el,
el_fill = getattr(el, fill, None)
if callable(el_fill):
self._el_fill = el_fill

self._n_count = 0
if self._buffer_input:
self._buffer_in = []
else:
self._buffer_out = []

if reset is None:
raise exceptions.LenaTypeError(
"reset must be set explicitly "
"if {} method is present".format(fill)
"reset must be set explicitly for a FillCompute element"
)
elif not has_run:
raise exceptions.LenaTypeError(
"fill method {} must exist and be callable".format(fill)
"no callable run or fill method {} were found".format(fill)
)
else:
self.fill = None
Expand Down Expand Up @@ -363,19 +374,91 @@ def __init__(self, el,

self._el = el

def fill(self, value): # pylint: disable=no-self-use,unused-argument
"""Fill *el* with *value*."""
def fill(self, value):
"""Fill *el* with *value*.
If more than *bufsize* values were filled,
incoming values are stored in a buffer
(if *buffer_input* is ``True``)
or, otherwise, the output of *el.request* is stored in a buffer,
until it is requested.
"""
if self._n_count and not self._n_count % self.bufsize:
if self._buffer_input:
self._buffer_in.append(value)
return
else:
# add output to the output buffer
self._buffer_out.extend(self.request())
# don't reset because need to know that fill was called
# self._n_count = 0

self._el_fill(value)
self._n_count += 1

def request(self):
"""Yield computed values."""
for val in self._el_request():
yield val
"""Yield results (if they are available) and possibly reset.
If input or output buffers were filled, all their contents
are processed and yielded.
"""
# yield what was filled into the element
if self._n_count >= self.bufsize:
for val in self._el_request():
yield val
if self._reset:
self._el_reset()
# it is important that request is not called
# when not enough values were filled after last request
self._n_count = self._n_count % self.bufsize

# process buffers.
# Buffers are always filled after the element,
# therefore the order is correct.
if not self._buffer_input:
# all results are in _buffer_out
for val in self._buffer_out:
yield val
if self._yield_on_remainder:
for val in self._el_request():
yield val
# reset was already called when filling _buffer_out
return
else:
# fill the buffer from _buffer_in and yield
nfills = 0
bufsize = self.bufsize
buffer_in = self._buffer_in
while True:
if nfills == bufsize:
for val in self._el_request():
yield val
if self._reset:
self._el_reset()
nfills = 0
del buffer_in[:bufsize]
# should be slower, because a slice below
# copies elements
## self._buffer_in = self._buffer_in[bufsize:]
continue

# fill the element with values from buffer
try:
val = buffer_in[nfills]
except IndexError:
if self._yield_on_remainder:
for val in self._el_request():
yield val
break
else:
self._el_fill(val)
nfills += 1

if self._reset:
self._el_reset()

def reset(self):
"""Reset *el*."""
"""Reset *el* (ignoring the initialization setting)."""
self._el_reset()

def run(self, flow):
Expand Down Expand Up @@ -414,11 +497,11 @@ def _run_fill_compute(self, flow):
# if the flow was smaller than the required bufsize
break
else:
self.fill(val)
self._el_fill(val)
nfills += 1

for val in slice_:
self.fill(val)
self._el_fill(val)
nfills += 1

# Flow finished too early.
Expand All @@ -427,12 +510,18 @@ def _run_fill_compute(self, flow):
if self._yield_on_remainder:
# can't return a value from a generator in Python 2.
# return self.request()
for result in self.request():
for result in self._el_request():
yield result
# # in fact, this should be undefined
# # for the remainder.
# if self._reset:
# self._el_reset()
return

for result in self.request():
for result in self._el_request():
yield result
if self._reset:
self._el_reset()

def _run_run(self, flow):
# todo: to improve performance, one might create
Expand All @@ -458,6 +547,8 @@ def _run_run(self, flow):
islice(flow, bufsize-1))):
yield val
# usually Run elements have no reset, but...
# we call reset here, because we don't call request
# (which usually calls reset itself)
if self._reset:
self.reset()

Expand Down
37 changes: 25 additions & 12 deletions lena/core/fill_request_seq.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,24 @@ def __init__(self, *args, **kwargs):
or unknown keyword arguments were received,
:exc:`.LenaTypeError` is raised.
"""
if "bufsize" in kwargs:
self._bufsize = kwargs.pop("bufsize")
else:
self._bufsize = 1
reset = kwargs.pop("reset", True)
self._reset = reset

if kwargs:
raise exceptions.LenaTypeError(
"unknown kwargs {}".format(kwargs)
)
# if "bufsize" in kwargs:
# self._bufsize = kwargs.pop("bufsize")
# else:
# self._bufsize = 1
# reset = kwargs.pop("reset", True)

# if "buffer_input" or "buffer_output" not in kwargs:
# raise exceptions.LenaTypeError(
# "kwargs must contain buffer_input or buffer_output"
# )
# buffer_input = kwargs.pop("buffer_input", False)
# buffer_output = kwargs.pop("buffer_output", False)

# if kwargs:
# raise exceptions.LenaTypeError(
# "unknown kwargs {}".format(kwargs)
# )

##| not sure now. Why is it not documented?
# `-> *args* can consist of one tuple,
# which in that case is expanded.
Expand All @@ -54,7 +61,11 @@ def __init__(self, *args, **kwargs):
check_sequence_type.is_fill_request_el,
el_name="FillRequest", seq_name="FillRequestSeq"
)
fr = adapters.FillRequest(self, reset=reset, bufsize=self._bufsize)
fr = adapters.FillRequest(self, **kwargs)
# just for tests
self._fr = fr
self._reset = fr._reset
# fr = adapters.FillRequest(self, reset=reset, bufsize=self._bufsize)
self.run = fr.run

def fill(self, value):
Expand All @@ -80,6 +91,8 @@ def request(self):
else:
results = vals

# todo: add reset here.

# FillRequest must produce a generator, so no conversion is needed.
return results

Expand Down
15 changes: 12 additions & 3 deletions lena/core/split.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from . import meta


def _get_seq_with_type(seq):
def _get_seq_with_type(seq, bufsize=None):
"""Return a (sequence, type) pair.
Sequence is derived from *seq*
(or is *seq*, if that is of a sequence type).
Expand All @@ -37,7 +37,16 @@ def _get_seq_with_type(seq):
elif ct.is_fill_request_seq(seq):
seq_type = "fill_request"
if not ct.is_fill_request_el(seq):
seq = fill_request_seq.FillRequestSeq(*seq)
seq = fill_request_seq.FillRequestSeq(
*seq, bufsize=bufsize,
# if we have a FillRequest element inside,
# it decides itself when to reset.
reset=False,
# todo: change the interface, because
# no difference with buffer_output: we fill
# without a buffer
buffer_input=True
)
# Source is not checked,
# because it must be Source explicitly.
else:
Expand Down Expand Up @@ -102,7 +111,7 @@ def __init__(self, seqs, bufsize=1000, copy_buf=True):

for sequence in seqs:
try:
seq, seq_type = _get_seq_with_type(sequence)
seq, seq_type = _get_seq_with_type(sequence, bufsize)
except exceptions.LenaTypeError:
raise exceptions.LenaTypeError(
"unknown argument type. Must be one of "
Expand Down
6 changes: 3 additions & 3 deletions tests/core/test_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,17 @@ def test_fill_request_init():

# no fill method raises
with pytest.raises(LenaTypeError):
FillRequest(lambda _: 0)
FillRequest(lambda _: 0, buffer_input=True, reset=True)

# no reset raises
sum_ = Sum()
sum_.reset = None
with pytest.raises(LenaTypeError):
FillRequest(sum_, reset=True)
FillRequest(sum_, reset=True, buffer_input=True)

# wrong bufsize raises
with pytest.raises(LenaValueError):
FillRequest(Sum(), bufsize=0, reset=False)
FillRequest(Sum(), bufsize=0, reset=False, buffer_input=True)

# missing fill raises
class MyFillRequest(FillRequest):
Expand Down

0 comments on commit fe84d39

Please sign in to comment.