Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lossless cl #34

Merged
merged 7 commits into from
Aug 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 95 additions & 18 deletions shed/event_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -1042,10 +1042,11 @@ class combine_latest(EventStream):
>>> assert len(L) == 6
"""

special_docs_names = ['start', 'descriptor', 'stop']

def __init__(self, *children, emit_on=None):
self.last = [None for _ in children]
self.special_docs_names = ['start', 'descriptor', 'stop']
self.special_docs = {k: [None for _ in children] for k in
self.special_last = {k: [None for _ in children] for k in
self.special_docs_names}
self.missing = set(children)
self.special_missing = {k: set(children) for k in
Expand All @@ -1060,25 +1061,101 @@ def __init__(self, *children, emit_on=None):

def update(self, x, who=None):
name, doc = x
idx = self.children.index(who)
if name in self.special_docs_names:
idx = self.children.index(who)
self.special_docs[name][idx] = x
if self.special_missing[name] and who in \
self.special_missing[name]:
self.special_missing[name].remove(who)

self.special_docs[name][self.children.index(who)] = x
if not self.special_missing[name] and who in self.emit_on:
tup = tuple(self.special_docs[name])
return self.emit(tup)
local_missing = self.special_missing[name]
local_last = self.special_last[name]

else:
if self.missing and who in self.missing:
self.missing.remove(who)
local_missing = self.missing
local_last = self.last

local_last[idx] = x
if local_missing and who in local_missing:
local_missing.remove(who)

# we have a document from every one or we are on the emitting stream
if not local_missing and who in self.emit_on:
tup = tuple(local_last)
return self.emit(tup)


class zip_latest(EventStream):
"""Combine multiple streams together to a stream of tuples

This will emit a new tuple of the elements from the lossless stream paired
with the latest elements from the other streams.

Parameters
----------
lossless : EventStream instance
The stream who's documents will always be emitted
children: EventStream instances
The streams to combine

Examples
--------
>>> from shed.utils import to_event_model
>>> from streams import Stream
>>> import shed.event_streams as es
>>> a = [1, 2, 3] # base data
>>> b = [4, 5, 6]
>>> g = to_event_model(a, [('det', {'dtype': 'float'})])
>>> gg = to_event_model(b, [('det', {'dtype': 'float'})])
>>> source = Stream()
>>> source2 = Stream()
>>> m = es.zip_latest(source, source2)
>>> l = m.sink(print)
>>> L = m.sink_to_list()
>>> for doc1 in g: zz = source.emit(doc1)
>>> for doc2 in gg: z = source2.emit(doc2)
>>> assert len(L) == 6
"""

special_docs_names = ['start', 'descriptor', 'stop']

def __init__(self, lossless, *children):
children = (lossless, ) + children
self.last = [None for _ in children]
self.special_last = {k: [None for _ in children] for k in
self.special_docs_names}
self.missing = set(children)
self.special_missing = {k: set(children) for k in
self.special_docs_names}
self.lossless = lossless
self.lossless_buffer = deque()
EventStream.__init__(self, children=children)

self.last[self.children.index(who)] = x
if not self.missing and who in self.emit_on:
tup = tuple(self.last)
return self.emit(tup)
def update(self, x, who=None):
name, doc = x
idx = self.children.index(who)
if name in self.special_docs_names:
local_missing = self.special_missing[name]
local_last = self.special_last[name]
local_type = 'special'

else:
local_missing = self.missing
local_last = self.last
local_type = 'event'
if who is self.lossless:
self.lossless_buffer.append(x)

local_last[idx] = x
if local_missing and who in local_missing:
local_missing.remove(who)

if not local_missing:
if local_type == 'special':
return self.emit(tuple(local_last))
# check start and descriptors emitted if not buffer
if not all([self.special_missing[k] for k in ['start',
'descriptor']]):
L = []
while self.lossless_buffer:
local_last[0] = self.lossless_buffer.popleft()
L.append(self.emit(tuple(local_last)))
return L


class Eventify(EventStream):
Expand Down
48 changes: 48 additions & 0 deletions shed/tests/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,54 @@ def test_combine_latest(exp_db, start_uid1, start_uid3):
assert n in assert_docs


def test_lossless_combine_latest(exp_db, start_uid1, start_uid3):
source = Stream()
source2 = Stream()

L = es.zip_latest(source, source2).sink_to_list()
ih1 = exp_db[start_uid1]
ih2 = exp_db[start_uid3]
s = exp_db.restream(ih1)
s2 = exp_db.restream(ih2)
for b in s2:
source2.emit(b)
for a in s:
source.emit(a)

assert_docs = set()
for l1, l2 in L:
assert l1[0] == l2[0]
assert_docs.add(l1[0])
assert l1 != l2
for n in ['start', 'descriptor', 'event', 'stop']:
assert n in assert_docs
assert len(L) == len(list(exp_db.restream(ih1)))


def test_lossless_combine_latest_reverse(exp_db, start_uid1, start_uid3):
source = Stream()
source2 = Stream()

L = es.zip_latest(source, source2).sink_to_list()
ih1 = exp_db[start_uid1]
ih2 = exp_db[start_uid3]
s = exp_db.restream(ih1)
s2 = exp_db.restream(ih2)
for a in s:
source.emit(a)
for b in s2:
source2.emit(b)

assert_docs = set()
for l1, l2 in L:
assert l1[0] == l2[0]
assert_docs.add(l1[0])
assert l1 != l2
for n in ['start', 'descriptor', 'event', 'stop']:
assert n in assert_docs
assert len(L) == len(list(exp_db.restream(ih1)))


def test_eventify(exp_db, start_uid1):
source = Stream()

Expand Down