Skip to content

Commit

Permalink
Add core.LenaSplit base class for split sequences.
Browse files Browse the repository at this point in the history
  • Loading branch information
ynikitenko committed Sep 19, 2023
1 parent cc6f9c7 commit 269824e
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 76 deletions.
4 changes: 2 additions & 2 deletions lena/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from .fill_compute_seq import FillComputeSeq
from .fill_seq import FillSeq
from .fill_request_seq import FillRequestSeq
from .split import Split
from .split import Split, LenaSplit
from .meta import alter_sequence, flatten
from .check_sequence_type import (
is_source,
Expand All @@ -31,7 +31,7 @@

__all__ = [
'Call', 'FillCompute', 'FillInto', 'FillRequest', 'Run', 'SourceEl',
'LenaSequence', 'Sequence', 'Source', 'Split',
'LenaSequence', 'Sequence', 'Source', 'Split', 'LenaSplit',
'FillSeq',
'FillComputeSeq', 'FillRequestSeq',
'LenaException',
Expand Down
157 changes: 83 additions & 74 deletions lena/core/split.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,86 @@ def _remove_uc_intersection(unknown_contexts_seq, intersection):
unknown_contexts.remove(cont)


class Split(object):
class LenaSplit(object):
"""Abstract base class for split sequences."""

def __init__(self, seqs):
self._seqs = seqs

contexts = []
unknown_contexts_seq = []
# the order of sequences is not important
for seq in seqs:
# first we get all known contexts, then unknown ones
if hasattr(seq, "_get_context"):
contexts.append(seq._get_context())
if hasattr(seq, "_unknown_contexts"):
# it is important that we have links to actual lists
unknown_contexts_seq.append(seq._unknown_contexts)

if unknown_contexts_seq and len(unknown_contexts_seq) == len(seqs):
# otherwise ignore them (the intersection is empty):
# they will be set from external context.
intersection = _get_uc_intersection(unknown_contexts_seq)
self._unknown_contexts = intersection
# never update template contexts twice.
_remove_uc_intersection(unknown_contexts_seq, intersection)

# todo: if a template context updates an existing context,
# this will be wrong. But what if it is a feature?
# Don't do that if you are not sure what you are doing (I'm not)
# Maybe we shall remove some keys from the intersection
# if this ever becomes a problem.
self._context = lena.context.intersection(*contexts)

def _repr_nested(self, base_indent="", indent=" "*4, el_separ=",\n"):
# copied from LenaSequence, see the diffs
def repr_maybe_nested(el, base_indent, indent):
if hasattr(el, "_repr_nested"):
return el._repr_nested(base_indent=base_indent+indent, indent=indent)
else:
return base_indent + indent + repr(el)

elems = el_separ.join((repr_maybe_nested(el, base_indent=base_indent,
indent=indent)
# diff here
for el in self._seqs))

if "\n" in el_separ and self._seqs:
# maybe new line
mnl = "\n"
# maybe base indent
mbi = base_indent
else:
mnl = ""
mbi = ""
# diff here in name and brackets
return "".join([base_indent, "Split",
"([", mnl, elems, mnl, mbi, "])"])

def _get_context(self):
return deepcopy(self._context)

def _set_context(self, context):
# we don't update current context here,
# because Split is always within a sequence.
# todo
# If it is not, it has no external context,
# or one must first copy its context before setting a new one.
for seq in self._seqs:
if hasattr(seq, "_set_context"):
seq._set_context(context)

def __eq__(self, other):
if not isinstance(other, LenaSplit):
return NotImplemented
return self._seqs == other._seqs

def __repr__(self):
return self._repr_nested()


class Split(LenaSplit):
"""Split data flow and run analysis in parallel."""

def __init__(self, seqs, bufsize=1000, copy_buf=True):
Expand Down Expand Up @@ -153,7 +232,7 @@ def __init__(self, seqs, bufsize=1000, copy_buf=True):
)

seqs = [meta.alter_sequence(seq) for seq in seqs]
self._seqs = []
new_seqs = []
self._seq_types = []

for sequence in seqs:
Expand All @@ -165,7 +244,7 @@ def __init__(self, seqs, bufsize=1000, copy_buf=True):
"FillComputeSeq, FillRequestSeq or Source, "
"{} provided".format(sequence)
)
self._seqs.append(seq)
new_seqs.append(seq)
self._seq_types.append(seq_type)

different_seq_types = set(self._seq_types)
Expand Down Expand Up @@ -194,31 +273,7 @@ def __init__(self, seqs, bufsize=1000, copy_buf=True):
)
self._bufsize = bufsize

contexts = []
unknown_contexts_seq = []
# the order of sequences is not important
for seq in self._seqs:
# first we get all known contexts, then unknown ones
if hasattr(seq, "_get_context"):
contexts.append(seq._get_context())
if hasattr(seq, "_unknown_contexts"):
# it is important that we have links to actual lists
unknown_contexts_seq.append(seq._unknown_contexts)

if unknown_contexts_seq and len(unknown_contexts_seq) == len(self._seqs):
# otherwise ignore them (the intersection is empty):
# they will be set from external context.
intersection = _get_uc_intersection(unknown_contexts_seq)
self._unknown_contexts = intersection
# never update template contexts twice.
_remove_uc_intersection(unknown_contexts_seq, intersection)

# todo: if a template context updates an existing context,
# this will be wrong. But what if it is a feature?
# Don't do that if you are not sure what you are doing (I'm not)
# Maybe we shall remove some keys from the intersection
# if this ever becomes a problem.
self._context = lena.context.intersection(*contexts)
super(Split, self).__init__(new_seqs)

def __call__(self):
"""Each initialization sequence generates flow.
Expand Down Expand Up @@ -399,49 +454,3 @@ def run(self, flow):
if flow_was_empty:
for val in seq.run([]):
yield val

def _repr_nested(self, base_indent="", indent=" "*4, el_separ=",\n"):
# copied from LenaSequence, see the diffs
def repr_maybe_nested(el, base_indent, indent):
if hasattr(el, "_repr_nested"):
return el._repr_nested(base_indent=base_indent+indent, indent=indent)
else:
return base_indent + indent + repr(el)

elems = el_separ.join((repr_maybe_nested(el, base_indent=base_indent,
indent=indent)
# diff here
for el in self._seqs))

if "\n" in el_separ and self._seqs:
# maybe new line
mnl = "\n"
# maybe base indent
mbi = base_indent
else:
mnl = ""
mbi = ""
# diff here in name and brackets
return "".join([base_indent, "Split",
"([", mnl, elems, mnl, mbi, "])"])

def _get_context(self):
return deepcopy(self._context)

def _set_context(self, context):
# we don't update current context here,
# because Split is always within a sequence.
# todo
# If it is not, it has no external context,
# or one must first copy its context before setting a new one.
for seq in self._seqs:
if hasattr(seq, "_set_context"):
seq._set_context(context)

def __eq__(self, other):
if not isinstance(other, Split):
return NotImplemented
return self._seqs == other._seqs

def __repr__(self):
return self._repr_nested()

0 comments on commit 269824e

Please sign in to comment.