Skip to content

Commit

Permalink
Fix/improve run method of core.FillRequest. Add keyword arguments buf…
Browse files Browse the repository at this point in the history
…fer_input, buffer_output, reset_name. Require explicit reset for FillCompute elements. Add tests (incomplete).
  • Loading branch information
ynikitenko committed Feb 21, 2022
1 parent 5b58705 commit 2048963
Show file tree
Hide file tree
Showing 7 changed files with 329 additions and 131 deletions.
206 changes: 172 additions & 34 deletions lena/core/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ class FillRequest(object):
A *FillRequest* element slices the flow during *fill*
and yields results for each chunk during *request*.
It can also call a method *reset* after each *request*.
"""
# FillRequest is not deprecated, but changing its meaning.
# # They seem redundant, because mostly they are used
Expand All @@ -227,29 +228,49 @@ class FillRequest(object):
# # and FillRequest is of no help for that.

def __init__(self, el,
fill="fill", request="request",
reset=True, bufsize=1, yield_on_remainder=False):
"""Names for *fill* and *request* can be customized
during initialization.
fill="fill", request="request", reset_name="reset",
reset=None, bufsize=1, yield_on_remainder=False,
buffer_input=None, buffer_output=None):
"""Names for actual *fill*, *request* and *reset*
can be provided during initialization (the latter is set
through *reset_name*).
*FillRequest* can be initialized from a *FillCompute* element.
If a callable *request* method was not found,
*el* must have a callable *compute* method.
*request* in this case is *compute*.
By default, *FillRequest* implements *run* method
that splits the flow into subslices of *bufsize* elements.
If *el* has a callable *run* method,
it is used instead of the default one.
If a keyword argument *reset* is ``True`` (default),
*el* must have a method *reset*, and in this case
*FillRequest* can also be initialized from a *Run* element.
In that case *el* is not required to have *fill*, *compute*
or *reset* methods
(and *FillRequest* will not have such missing methods).
*FillRequest* implements *run* method
that splits the flow into subslices of *bufsize* values
and feeds them to the *run* method of *el*.
Since we require no fewer than *bufsize* values
(unless *yield_on_remainder* is ``True``),
we need to store either *bufsize* values of the incoming flow
or all values produced by *el.run* for each slice.
This is set by *buffer_input* or *buffer_output*.
One and only one of them must be ``True``.
For example, if the element receives file names and produces
data from them, it would be wise to buffer input. If the element
receives much data and produces a histogram,
one should buffer output.
If a keyword argument *reset* is ``True``,
*el* must have a method *reset_name*, and in this case
:meth:`reset` is called after each :meth:`request`
(including those during :meth:`run`).
In general, *Run* elements have no *reset* methods,
but for *FillCompute* elements *reset* must be set explicitly.
If *yield_on_remainder* is ``True``, then the output will be yielded
If *yield_on_remainder* is ``True``,
then the output will be yielded
even if the element was filled less than *bufsize* times
(but at least once).
In that case no internal buffers are used during :meth:`run`
and corresponding attributes are not checked.
**Attributes**
Expand All @@ -261,34 +282,77 @@ def __init__(self, el,
or *FillRequest* could not be derived from *FillCompute*,
or if *reset* is ``True``, but *el* has no method *reset*,
:exc:`.LenaTypeError` is raised.
.. versionchanged:: 0.5
Add keyword arguments *yield_on_remainder*, *buffer_input*,
*buffer_output*, *reset_name*.
Require explicit *reset* for *FillCompute* elements.
"""
# todo: rename bufsize to size or something cleverer
fill = getattr(el, fill, None)
# we don't allow other names for reset
# el_reset = getattr(el, "reset", None)
if not callable(fill):

el_reset = getattr(el, reset_name, None)
if callable(el_reset):
# we set this method even if *reset* is False,
# because reset() may be called manually
self._el_reset = el_reset
elif reset:
raise exceptions.LenaTypeError(
"fill method {} must exist and be callable".format(fill)
"{} method must exist and be callable".format(reset_name)
)
else:
# disable the missing method.
self.reset = None
self._reset = bool(reset)

if reset and not callable(getattr(el, "reset", None)):
run = getattr(el, "run", None)
if run and callable(run):
bi = bool(buffer_input)
bo = bool(buffer_output)
# if yield_on_remainder is True, buffers are not used.
if not yield_on_remainder and int(bi) + int(bo) != 1:
raise exceptions.LenaValueError(
"one and only one of buffer_input or buffer_output "
"must be set"
)
self._buffer_input = bi
self.run = self._run_run
has_run = True
else:
# This method won't work in case of no fill/compute/request,
# but it will raise during initialization if so.
self.run = self._run_fill_compute
has_run = False

el_fill = getattr(el, fill, None)
if callable(el_fill):
self._el_fill = el_fill
if reset is None:
raise exceptions.LenaTypeError(
"reset must be set explicitly "
"if {} method is present".format(fill)
)
elif not has_run:
raise exceptions.LenaTypeError(
"reset must exist and be callable"
"fill method {} must exist and be callable".format(fill)
)
self.fill = fill
self._reset = reset
else:
self.fill = None

el_request = getattr(el, request, None)
if callable(el_request):
# we don't check whether it has fill method here.
self._el_request = el_request
else:
# derive from compute
compute = getattr(el, "compute", None)
if not callable(compute):
if callable(compute):
self._el_request = compute
elif not has_run:
raise exceptions.LenaTypeError(
"request or compute methods must exist and be callable"
"element must have callable methods request, compute or run"
)
self._el_request = compute
else:
self.request = None

if(bufsize != int(bufsize) or bufsize < 1):
raise exceptions.LenaValueError(
Expand All @@ -297,27 +361,22 @@ def __init__(self, el,
self.bufsize = int(bufsize)
self._yield_on_remainder = bool(yield_on_remainder)

# looks wrong!
run = getattr(el, "run", None)
if run and callable(run):
self.run = run

self._el = el

def fill(self, value): # pylint: disable=no-self-use,unused-argument
"""Fill *el* with *value*."""
raise exceptions.LenaNotImplementedError
self._el_fill(value)

def request(self):
"""Yield computed values."""
for val in self._el_request():
yield val
if self._reset:
self._el.reset()
self._el_reset()

def reset(self):
"""Reset *el*."""
self._el.reset()
self._el_reset()

def run(self, flow):
"""Process the *flow* slice by slice.
Expand All @@ -327,12 +386,16 @@ def run(self, flow):
Repeat until the *flow* is exhausted.
If *fill* was not called even once (*flow* was empty),
nothing is yielded, because *bufsize* values were not obtained.
nothing is yielded, because *bufsize* values were not obtained
(in contrast to *FillCompute*, for which output for an empty
flow is reasonable).
The last slice may contain less than *bufsize* values.
If there were any and if *yield_on_remainder* is ``True``,
*request* will be called for that.
"""
raise exceptions.LenaNotImplementedError

def _run_fill_compute(self, flow):
while True:
# A slice is a non-materialized list, which means
# that it will not take place of *bufsize* in memory.
Expand Down Expand Up @@ -371,6 +434,81 @@ def run(self, flow):
for result in self.request():
yield result

def _run_run(self, flow):
# todo: to improve performance, one might create
# a separate run method for bufsize = 1.
from itertools import islice, chain
el_run = self._el.run
bufsize = self.bufsize

# we can yield results one by one
if self._yield_on_remainder:
# it is important that flow is an iterable, not a sequence,
# so we can use islice repeatedly
while True:
try:
val = next(flow)
except StopIteration:
# empty flow gives empty results
return
else:
# at least one event present
# this would give bad performance for bufsize=1
for val in el_run(chain([val],
islice(flow, bufsize-1))):
yield val
# usually Run elements have no reset, but...
if self._reset:
self.reset()

# we can't yield results one by one,
# because we need to be sure
# that *bufsize* values were encountered

class slice_iterated_with_count():

def __init__(self, size, seq):
self.count = 0
self._size = size
self._seq = seq

def __iter__(self):
count = 0
for val in islice(self._seq, self._size):
count += 1
yield val
self.count = count

if self._buffer_input:
while True:
buffer = list(islice(flow, bufsize))

# incomplete buffer, yield nothing.
if len(buffer) < bufsize:
# _yield_on_remainder is False
return

# full buffer received, process.
for val in el_run(iter(buffer)):
yield val
# probably False for Run element.
if self._reset:
self._el_reset()
else:
# buffer output
# slice_ can be iterated multiple times
slice_ = slice_iterated_with_count(bufsize, flow)
while True:
results = list(el_run(slice_))
if slice_.count < bufsize:
return
for val in results:
yield val

# probably False for Run element.
if self._reset:
self._el_reset()


class Run(object):
"""Adapter for a *Run* element."""
Expand Down Expand Up @@ -444,7 +582,7 @@ def _call_run(self, flow):
yield self._el(val)

def run(self, flow):
"""Yield transformed elements from the incoming *flow*."""
"""Yield transformed values from the incoming *flow*."""
# will be redefined in __init__
# for val in flow:
# yield self._el(val)
Expand Down
6 changes: 3 additions & 3 deletions lena/core/fill_request_seq.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ def __init__(self, *args, **kwargs):
raise exceptions.LenaTypeError(
"unknown kwargs {}".format(kwargs)
)
## not sure now. Why is it not documented?
# *args* can consist of one tuple,
# which in that case is expanded.
##| not sure now. Why is it not documented?
# `-> *args* can consist of one tuple,
# which in that case is expanded.
fill_compute_seq._init_sequence_with_el(
self, args, "_fill_request",
check_sequence_type.is_fill_request_el,
Expand Down

0 comments on commit 2048963

Please sign in to comment.