Skip to content

Commit

Permalink
core.FillRequest element no longer yields in case of empty flow. Add …
Browse files Browse the repository at this point in the history
…yield_on_remainder kwarg. Improve docs and exception messages.
  • Loading branch information
ynikitenko committed Feb 20, 2022
1 parent 9be50bd commit 5b58705
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 117 deletions.
152 changes: 82 additions & 70 deletions lena/core/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
>>> list(my_run.run([1, 2, 3]))
[1, 2, 3]
"""
from __future__ import print_function

import itertools

from . import exceptions
Expand Down Expand Up @@ -216,10 +214,21 @@ def _run_fill_into(self, element, value):
class FillRequest(object):
"""Adapter for a *FillRequest* element.
A *FillRequest* element has methods *fill(value)* and *request()*.
A *FillRequest* element slices the flow during *fill*
and yields results for each chunk during *request*.
"""

def __init__(self, el, fill="fill", request="request", reset=True, bufsize=1):
# FillRequest is not deprecated, but its meaning is changing.
# # They seem redundant, because mostly they are used
# # as FillCompute (and a lot of code is duplicated).
# # Memory warnings should be indicated elsewhere
# # (like special fields).
# # Moreover, FillCompute elements
# # sometimes need to be more versatile,
# # and FillRequest is of no help for that.

def __init__(self, el,
fill="fill", request="request",
reset=True, bufsize=1, yield_on_remainder=False):
"""Names for *fill* and *request* can be customized
during initialization.
Expand All @@ -234,10 +243,13 @@ def __init__(self, el, fill="fill", request="request", reset=True, bufsize=1):
it is used instead of the default one.
If a keyword argument *reset* is ``True`` (default),
*el* must have a method *reset, and in this case
*el* must have a method *reset*, and in this case
:meth:`reset` is called after each :meth:`request`
(including those during :meth:`run`).
If *reset* is ``False``, :meth:`reset` is never called.
If *yield_on_remainder* is ``True``, then the output will be yielded
even if the element was filled less than *bufsize* times
(but at least once).
**Attributes**
Expand All @@ -250,114 +262,114 @@ def __init__(self, el, fill="fill", request="request", reset=True, bufsize=1):
or if *reset* is ``True``, but *el* has no method *reset*,
:exc:`.LenaTypeError` is raised.
"""
# todo: rename bufsize to size or something cleverer
fill = getattr(el, fill, None)
request = getattr(el, request, None)
el_reset = getattr(el, "reset", None)
# we don't allow other names for reset
# el_reset = getattr(el, "reset", None)
if not callable(fill):
raise exceptions.LenaTypeError(
"fill method {} must exist and be callable".format(fill)
)
if reset and not callable(el_reset):

if reset and not callable(getattr(el, "reset", None)):
raise exceptions.LenaTypeError(
"reset must exist and be callable"
)
self.fill = fill

self._reset = reset
if callable(request):
self._request_meth = request

el_request = getattr(el, request, None)
if callable(el_request):
self._el_request = el_request
else:
# derive from compute and reset
# derive from compute
compute = getattr(el, "compute", None)
# reset = getattr(el, "reset", None)
if not callable(compute): # or not callable(reset):
if not callable(compute):
raise exceptions.LenaTypeError(
"request or compute must exist and be callable"
"request or compute methods must exist and be callable"
)
self.request = self._compute_reset
self._el_request = compute

if(bufsize != int(bufsize) or bufsize < 1):
raise exceptions.LenaValueError(
"bufsize must be a natural number, {} provided".format(bufsize)
"bufsize must be a natural number, not {}".format(bufsize)
)
self.bufsize = int(bufsize)
self._yield_on_remainder = bool(yield_on_remainder)

# looks wrong!
run = getattr(el, "run", None)
if run and callable(run):
self.run = run
self._el = el

def _compute_reset(self):
for val in self._el.compute():
yield val
if self._reset:
self._el.reset()
self._el = el

def fill(self, value):  # pylint: disable=no-self-use,unused-argument
    """Fill *el* with *value*.

    This class-level definition is only a placeholder and always raises
    :exc:`.LenaNotImplementedError`. The initializer is expected to bind
    the wrapped element's own fill method to ``self.fill`` on the
    instance, which takes precedence over this method.
    """
    raise exceptions.LenaNotImplementedError

def request(self):
    """Yield computed values.

    Values are taken from the wrapped element's request
    (or compute) method stored in ``self._el_request``.
    If the adapter was created with *reset* set to ``True``,
    the wrapped element is reset after all values were yielded.
    """
    for val in self._el_request():
        yield val
    # Reset only after the results were fully consumed,
    # so that the element is clean before the next chunk.
    if self._reset:
        self._el.reset()

def reset(self):
    """Reset *el* by calling its own *reset* method."""
    self._el.reset()

def run(self, flow):
    """Process the *flow* slice by slice.

    *fill* each value from a subslice of *flow*
    of *bufsize* length, then yield results from *request*.
    Repeat until the *flow* is exhausted.

    If *fill* was not called even once (*flow* was empty),
    nothing is yielded, because *bufsize* values were not obtained.

    The last slice may contain less than *bufsize* values.
    If there were any and if *yield_on_remainder* is ``True``,
    *request* will be called for that.

    *flow* can be any iterable: it is converted to an iterator once,
    so a list is consumed chunk by chunk like a generator.
    """
    # islice over a re-iterable object (like a list) would start
    # from its beginning on every pass and never finish;
    # a single iterator makes each slice continue the previous one.
    flow = iter(flow)
    bufsize = self.bufsize

    while True:
        # islice is lazy: the chunk is never stored as a whole,
        # so it does not occupy memory for *bufsize* values.
        slice_ = itertools.islice(flow, bufsize)
        # The counter is local to the chunk, so it never exceeds bufsize.
        nfills = 0
        for val in slice_:
            self.fill(val)
            nfills += 1

        if not nfills:
            # The flow was empty or ended exactly on a chunk boundary.
            # Unlike FillCompute, nothing is yielded in this case.
            return

        if nfills < bufsize:
            # The flow finished too early: this is the incomplete
            # last chunk, reported only if explicitly requested.
            if self._yield_on_remainder:
                for result in self.request():
                    yield result
            return

        for result in self.request():
            yield result
        # NOTE(review): request() of this class already resets the
        # element when reset is True; this second call is kept as in
        # the original code - confirm whether it is still needed.
        if self._reset:
            self._el.reset()


class Run(object):
Expand Down
6 changes: 4 additions & 2 deletions lena/core/check_sequence_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

def is_fill_compute_el(obj):
    """Object contains executable methods 'fill' and 'compute'.

    Return ``True`` only if both attributes exist and are callable,
    otherwise ``False``.
    """
    # getattr with a default combines the existence
    # and the callability checks into one lookup per attribute.
    return (callable(getattr(obj, 'fill', None))
            and callable(getattr(obj, 'compute', None)))


def is_fill_compute_seq(seq):
Expand Down
32 changes: 15 additions & 17 deletions lena/core/fill_request_seq.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
from __future__ import print_function

from . import lena_sequence
from . import fill_compute_seq
from . import check_sequence_type
from . import sequence
from . import adapters
from . import check_sequence_type
from . import exceptions
from . import fill_compute_seq
from . import lena_sequence
from . import sequence


class FillRequestSeq(lena_sequence.LenaSequence):
"""Sequence with one :class:`FillRequest` element.
"""Sequence with one *FillRequest* element.
Input flow is preprocessed with the *Sequence*
Input flow is preprocessed with the sequence
before the *FillRequest* element,
then it fills the *FillRequest* element.
When the results are yielded from the *FillRequest*,
they are postprocessed with the *Sequence* after
that element.
they are postprocessed with the elements that follow it.
"""

def __init__(self, *args, **kwargs):
Expand All @@ -29,17 +26,15 @@ def __init__(self, *args, **kwargs):
To change that, explicitly cast the first element
to :class:`.FillInto`.
*kwargs* can contain *bufsize*, which is used during *run*.
See :class:`FillRequest` for more information on *run*.
By default *bufsize* is *1*. Other *kwargs* raise
:exc:`.LenaTypeError`.
*kwargs* can contain *bufsize* or *reset*.
See :class:`FillRequest` for more information on them.
By default *bufsize* is *1*.
If *FillRequest* element was not found,
or if the sequences before or after that
could not be correctly initialized,
the sequences could not be correctly initialized,
or unknown keyword arguments were received,
:exc:`.LenaTypeError` is raised.
"""
# *args* can consist of one tuple, which in that case is expanded.
if "bufsize" in kwargs:
self._bufsize = kwargs.pop("bufsize")
else:
Expand All @@ -51,6 +46,9 @@ def __init__(self, *args, **kwargs):
raise exceptions.LenaTypeError(
"unknown kwargs {}".format(kwargs)
)
## not sure now. Why is it not documented?
# *args* can consist of one tuple,
# which in that case is expanded.
fill_compute_seq._init_sequence_with_el(
self, args, "_fill_request",
check_sequence_type.is_fill_request_el,
Expand Down
22 changes: 11 additions & 11 deletions lena/core/split.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
"""Split data flow and run analysis in parallel."""
# Split and helper functions.
from __future__ import print_function

import copy
import itertools

Expand Down Expand Up @@ -51,7 +48,7 @@ def _get_seq_with_type(seq):
seq = sequence.Sequence(seq)
except exceptions.LenaTypeError:
raise exceptions.LenaTypeError(
"unknown argument type. Must be one of "
"unknown argument type. Must be a "
"FillComputeSeq, FillRequestSeq or Source, "
"{} provided".format(seq)
)
Expand All @@ -65,20 +62,19 @@ class Split(object):

def __init__(self, seqs, bufsize=1000, copy_buf=True):
"""*seqs* must be a list of Sequence, Source, FillComputeSeq
or FillRequestSeq sequences
(any other container will raise :exc:`.LenaTypeError`).
or FillRequestSeq sequences.
If *seqs* is empty, *Split* acts as an empty *Sequence* and
yields all values it receives.
*bufsize* is the size of the buffer for the input flow.
If *bufsize* is ``None``,
whole input flow is materialized in the buffer.
*bufsize* must be a natural number or ``None``,
otherwise :exc:`.LenaValueError` is raised.
*bufsize* must be a natural number or ``None``.
*copy_buf* sets whether the buffer should be copied during *run*.
*copy_buf* sets whether the buffer should be copied
during :meth:`run`.
This is important if different sequences can change input data
and interfere with each other.
and thus interfere with each other.
Common type:
If each sequence from *seqs* has a common type,
Expand All @@ -90,6 +86,9 @@ def __init__(self, seqs, bufsize=1000, copy_buf=True):
if *copy_buf* is True), and *compute*
yields values from all sequences in turn
(as would also do *request* or *Source.__call__*).
In case of wrong initialization arguments, :exc:`.LenaTypeError`
or :exc:`.LenaValueError` is raised.
"""
# todo: copy_buf must be always True. Isn't that?
if not isinstance(seqs, list):
Expand Down Expand Up @@ -145,13 +144,14 @@ def __call__(self):
This method is available only if each self sequence is a
:class:`.Source`,
otherwise :exc:`.LenaAttributeError` is raised during the execution.
otherwise runtime :exc:`.LenaAttributeError` is raised.
"""
if self._n_seq_types != 1 or not ct.is_source(self._sequences[0]):
raise exceptions.LenaAttributeError(
"Split has no method '__call__'. It should contain "
"only Source sequences to be callable"
)
# todo: use itertools.chain and check performance difference
for seq in self._sequences:
for result in seq():
yield result
Expand Down

0 comments on commit 5b58705

Please sign in to comment.