Skip to content

Commit

Permalink
Merge pull request #34 from CJ-Wright/lossless_cl
Browse files Browse the repository at this point in the history
Lossless cl
  • Loading branch information
CJ-Wright committed Aug 18, 2017
2 parents 0e56571 + 0ec2453 commit 7298708
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 18 deletions.
113 changes: 95 additions & 18 deletions shed/event_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -1042,10 +1042,11 @@ class combine_latest(EventStream):
>>> assert len(L) == 6
"""

special_docs_names = ['start', 'descriptor', 'stop']

def __init__(self, *children, emit_on=None):
self.last = [None for _ in children]
self.special_docs_names = ['start', 'descriptor', 'stop']
self.special_docs = {k: [None for _ in children] for k in
self.special_last = {k: [None for _ in children] for k in
self.special_docs_names}
self.missing = set(children)
self.special_missing = {k: set(children) for k in
Expand All @@ -1060,25 +1061,101 @@ def __init__(self, *children, emit_on=None):

def update(self, x, who=None):
name, doc = x
idx = self.children.index(who)
if name in self.special_docs_names:
idx = self.children.index(who)
self.special_docs[name][idx] = x
if self.special_missing[name] and who in \
self.special_missing[name]:
self.special_missing[name].remove(who)

self.special_docs[name][self.children.index(who)] = x
if not self.special_missing[name] and who in self.emit_on:
tup = tuple(self.special_docs[name])
return self.emit(tup)
local_missing = self.special_missing[name]
local_last = self.special_last[name]

else:
if self.missing and who in self.missing:
self.missing.remove(who)
local_missing = self.missing
local_last = self.last

local_last[idx] = x
if local_missing and who in local_missing:
local_missing.remove(who)

# we have a document from every one or we are on the emitting stream
if not local_missing and who in self.emit_on:
tup = tuple(local_last)
return self.emit(tup)


class zip_latest(EventStream):
"""Combine multiple streams together to a stream of tuples
This will emit a new tuple of the elements from the lossless stream paired
with the latest elements from the other streams.
Parameters
----------
lossless : EventStream instance
The stream who's documents will always be emitted
children: EventStream instances
The streams to combine
Examples
--------
>>> from shed.utils import to_event_model
>>> from streams import Stream
>>> import shed.event_streams as es
>>> a = [1, 2, 3] # base data
>>> b = [4, 5, 6]
>>> g = to_event_model(a, [('det', {'dtype': 'float'})])
>>> gg = to_event_model(b, [('det', {'dtype': 'float'})])
>>> source = Stream()
>>> source2 = Stream()
>>> m = es.zip_latest(source, source2)
>>> l = m.sink(print)
>>> L = m.sink_to_list()
>>> for doc1 in g: zz = source.emit(doc1)
>>> for doc2 in gg: z = source2.emit(doc2)
>>> assert len(L) == 6
"""

special_docs_names = ['start', 'descriptor', 'stop']

def __init__(self, lossless, *children):
children = (lossless, ) + children
self.last = [None for _ in children]
self.special_last = {k: [None for _ in children] for k in
self.special_docs_names}
self.missing = set(children)
self.special_missing = {k: set(children) for k in
self.special_docs_names}
self.lossless = lossless
self.lossless_buffer = deque()
EventStream.__init__(self, children=children)

self.last[self.children.index(who)] = x
if not self.missing and who in self.emit_on:
tup = tuple(self.last)
return self.emit(tup)
def update(self, x, who=None):
name, doc = x
idx = self.children.index(who)
if name in self.special_docs_names:
local_missing = self.special_missing[name]
local_last = self.special_last[name]
local_type = 'special'

else:
local_missing = self.missing
local_last = self.last
local_type = 'event'
if who is self.lossless:
self.lossless_buffer.append(x)

local_last[idx] = x
if local_missing and who in local_missing:
local_missing.remove(who)

if not local_missing:
if local_type == 'special':
return self.emit(tuple(local_last))
# check start and descriptors emitted if not buffer
if not all([self.special_missing[k] for k in ['start',
'descriptor']]):
L = []
while self.lossless_buffer:
local_last[0] = self.lossless_buffer.popleft()
L.append(self.emit(tuple(local_last)))
return L


class Eventify(EventStream):
Expand Down
48 changes: 48 additions & 0 deletions shed/tests/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,54 @@ def test_combine_latest(exp_db, start_uid1, start_uid3):
assert n in assert_docs


def test_lossless_combine_latest(exp_db, start_uid1, start_uid3):
source = Stream()
source2 = Stream()

L = es.zip_latest(source, source2).sink_to_list()
ih1 = exp_db[start_uid1]
ih2 = exp_db[start_uid3]
s = exp_db.restream(ih1)
s2 = exp_db.restream(ih2)
for b in s2:
source2.emit(b)
for a in s:
source.emit(a)

assert_docs = set()
for l1, l2 in L:
assert l1[0] == l2[0]
assert_docs.add(l1[0])
assert l1 != l2
for n in ['start', 'descriptor', 'event', 'stop']:
assert n in assert_docs
assert len(L) == len(list(exp_db.restream(ih1)))


def test_lossless_combine_latest_reverse(exp_db, start_uid1, start_uid3):
source = Stream()
source2 = Stream()

L = es.zip_latest(source, source2).sink_to_list()
ih1 = exp_db[start_uid1]
ih2 = exp_db[start_uid3]
s = exp_db.restream(ih1)
s2 = exp_db.restream(ih2)
for a in s:
source.emit(a)
for b in s2:
source2.emit(b)

assert_docs = set()
for l1, l2 in L:
assert l1[0] == l2[0]
assert_docs.add(l1[0])
assert l1 != l2
for n in ['start', 'descriptor', 'event', 'stop']:
assert n in assert_docs
assert len(L) == len(list(exp_db.restream(ih1)))


def test_eventify(exp_db, start_uid1):
source = Stream()

Expand Down

0 comments on commit 7298708

Please sign in to comment.