
combine_latest emit_on behavior #48

Closed
CJ-Wright opened this issue Aug 4, 2017 · 26 comments
@CJ-Wright
Member

Should we buffer the combine_latest emit_on stream?

Context

Consider the following situation: we have two streams, a and b, which we combine with combine_latest, emitting only on a. Five entries come down from a, then one comes down from b, then one more from a. Because no b data had arrived, none of the five initial a entries were emitted. Now that we have b data, do we expect it to emit six times (which would require buffering the emit_on stream), or only once (which may violate the idea that the emit_on stream always emits)?

@mrocklin @ordirules @danielballan
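To make the two options concrete, here is a plain-Python sketch (not the streams API; the function name and event encoding are made up for illustration) that replays the scenario above under both behaviors:

```python
def combine_emit_on_a(events, buffer_a=False):
    """events: iterable of ('a', v) or ('b', v) pairs; emit pairs only on 'a' arrivals."""
    out, pending_a = [], []
    latest_b, seen_b = None, False
    for name, value in events:
        if name == 'b':
            latest_b = value
            if not seen_b:
                seen_b = True
                if buffer_a:
                    # option 1: flush the a items held back before any b existed
                    out.extend((x, latest_b) for x in pending_a)
                pending_a.clear()
        else:  # name == 'a'
            if seen_b:
                out.append((value, latest_b))
            elif buffer_a:
                pending_a.append(value)
    return out

events = [('a', 1), ('a', 2), ('a', 3), ('a', 4), ('a', 5), ('b', 'x'), ('a', 6)]
combine_emit_on_a(events, buffer_a=True)   # option 1: 6 emits
combine_emit_on_a(events, buffer_a=False)  # option 2: 1 emit, [(6, 'x')]
```

With `buffer_a=True` the five held-back a items are flushed when the first b arrives, plus one more for the sixth a; with `buffer_a=False` only the final a is ever emitted.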

@CJ-Wright
Member Author

CJ-Wright commented Aug 4, 2017

As a corollary, should we have an equivalent to zip_longest?

@jrmlhermitte
Contributor

good question. I think we may want to first go back and review existing streams. Here's CombineLatest from reactiveX:

The CombineLatest operator behaves in a similar way to Zip, but while Zip emits items only when each of the zipped source Observables have emitted a previously unzipped item, CombineLatest emits an item whenever any of the source Observables emits an item (so long as each of the source Observables has emitted at least one item). When any of the source Observables emits an item, CombineLatest combines the most recently emitted items from each of the other source Observables, using a function you provide, and emits the return value from that function.

We've modified it by adding emit_on. I'd say that we should stick to what they use and only emit once. Basically, our result with emit_on=a would be like that figure but with only "2A", "3D", "4D", and "5D" as a result. It would make things simpler in that we don't have to worry about buffering (and consequently backfilling).

Relatedly, should we start making figures for the methods here? I can go ahead and begin. We just need to agree on the medium. I generally use Inkscape.

@jrmlhermitte
Contributor

can you clarify about zip_longest? let's use their figure for zip here as an example. how would the result of zip_longest differ from their suggested output?

@CJ-Wright
Member Author

I am a very visual-art-challenged person (I can appreciate it, just not create it), so I'd say go ahead with whichever medium works for you, thank you!

@jrmlhermitte
Contributor

hehe same, but i guess i can get something started, later this weekend hopefully, as a PR. If we end up using it, we should remember to squash commits with image changes to save space...

@CJ-Wright
Member Author

for zip_longest we'd emit a 5D assuming that we decided on the top one being the longest.

@CJ-Wright
Member Author

CJ-Wright commented Aug 4, 2017

Maybe a better view:
Streams coming into a node can be "lossy" or "lossless". For most operator nodes (map, filter, accumulate) this is not a problem: there is only one stream, which makes it automatically lossless. However, for multi-stream nodes (zip, combine_latest) that emit a tuple, this becomes a problem.

  • combine_latest is lossy: we don't buffer any of the data coming in, so if we push 5 pieces of data into a and none into b, we will not see the first 4 (assuming the next item is from b), or all 5 if emit_on=a. If we pre-load combine_latest with data we will lose it.

  • zip is also lossy. If the streams contain unequal numbers of items, the extras may never be seen.

We may want the ability to specify lossless streams coming into the multi-stream nodes.
For example a combine_latest with the a stream set to lossless will buffer the a stream items and then when a b comes along emit a series of ab items which combine the new b with all the buffered a.

So maybe zip_longest is equivalent to a lossless combine_latest?

@CJ-Wright
Member Author

Something like
-A-B-C-D-E---------------------------------------------F------
+
-----------------1-----------------------2-3-4-5-----------------
=
-----------------A1-B1-C1-D1-E1--------------------F5-----
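One reading of this diagram can be sketched in plain Python (not the streams API; the class and method names here are made up): the a stream is buffered only until the first b arrives, after which every a item is emitted immediately with the latest b.

```python
class LosslessCombine:
    """Hypothetical 'lossless-a' combine_latest: every a item is emitted
    exactly once, paired with the latest b item."""

    def __init__(self):
        self.pending_a = []    # a items seen before any b
        self.latest_b = None
        self.have_b = False
        self.emitted = []

    def emit_a(self, x):
        if self.have_b:
            self.emitted.append((x, self.latest_b))
        else:
            self.pending_a.append(x)

    def emit_b(self, y):
        self.latest_b = y
        if not self.have_b:
            self.have_b = True
            # backfill every buffered a item with this first b
            self.emitted.extend((x, y) for x in self.pending_a)
            self.pending_a.clear()

node = LosslessCombine()
for x in 'ABCDE':
    node.emit_a(x)          # buffered: no b yet
node.emit_b(1)              # backfills A1 ... E1
for y in (2, 3, 4, 5):
    node.emit_b(y)          # only updates latest_b
node.emit_a('F')            # pairs immediately: F5
```

Under this reading, node.emitted matches the marble diagram above: A1, B1, C1, D1, E1, F5.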

@CJ-Wright
Member Author

Here is an implemented example (in the event model)
xpdAcq/shed-streaming#34

@CJ-Wright
Member Author

@ordirules you might look at tikz to make these examples, we may be able to make them with streams (which would mean that if/when we update the code the examples will also be updated) and then insert them into the docs.

@jrmlhermitte
Contributor

thanks, I used to use Asymptote which I really like but it's probably out of date :-D. I'll take a look at this later. LaTeX is always a plus!

@CJ-Wright
Member Author

Note that combine_latest without an emit_on is lossless (once both streams have emitted at least once), since every piece of data that comes through will come out at some point.

@limx0

limx0 commented Aug 9, 2017

@CJ-Wright doesn't the name combine_latest imply a lossy function? I read it as "you will always have the most recent data from each source", without any guarantee that you will see every piece of data from every source. My thought is that we should use a different function name if we want a lossless version.

@CJ-Wright
Member Author

CJ-Wright commented Aug 10, 2017

That's fair. I'm rather bad at naming things, any suggestions?
Edit: Although I didn't think that zip implied a lossy nature either.

@limx0

limx0 commented Aug 10, 2017

I agree on that - I think zip implies a lossless function that waits for all inputs before emitting a result (as we would expect with the builtin zip).

Edit: (General rumblings unrelated to topic)
TBH, in practice I'm not really sure what sort of real-time system is likely to emit values in such a perfect fashion. I personally would never use zip because I would always be worried about one of my data sources breaking and throwing out the entire stream.

Something like
-a-b-c--
-1-exception-3--

is going to yield a1, b3, which (to me) is unexpected and could throw off the sense of order in your system
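This concern can be sketched with a queue-based model of zip semantics (plain Python, not the streams API; the function name is made up): once one element is lost, every later pairing silently shifts.

```python
from collections import deque

def stream_zip(events):
    """events: iterable of ('left', v) or ('right', v) pairs.
    Emit a pair whenever both queues hold at least one item."""
    left, right = deque(), deque()
    out = []
    for side, value in events:
        (left if side == 'left' else right).append(value)
        if left and right:
            out.append((left.popleft(), right.popleft()))
    return out

# The right-hand partner of 'b' raised instead of emitting,
# so 'b' silently pairs with the next right item:
stream_zip([('left', 'a'), ('right', 1),
            ('left', 'b'),               # matching right item was lost
            ('left', 'c'), ('right', 3)])
# -> [('a', 1), ('b', 3)]; 'c' waits forever
```

Nothing signals that the pairing drifted, which is exactly the "throws off the sense of order" worry above.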

@CJ-Wright
Member Author

CJ-Wright commented Aug 10, 2017

@limx0 Can you clarify what you mean by "perfect fashion"?
Edit: I looked at the zip def again.

Putting the naming issues aside, would other people find useful a node where one stream is guaranteed to be emitted while being combined with another stream?

@limx0

limx0 commented Aug 10, 2017

By perfect fashion I mean; having multiple sources of data that emit elements at the (almost) exact same time and without fail in a way that your zip function continues to emit as intended.

This is more of a practical issue than a library/theory note. What I mean is that zip works fantastically well in a simple test case like

a = Stream()
b = Stream()
c = a.zip(b)

a.emit(1)
b.emit('a')
a.emit(2)
b.emit('b')

but in the wild, where your a and b streams are external data sources that are not perfectly reliable or synchronized, taking zip at face value may lead to unintended consequences and trip some people up.

Edit: This is more of a caution for using external sources, if you have control over all of your inputs it's probably fine

@limx0

limx0 commented Aug 10, 2017

Yep, I'm sure they would. My general thought is to leave combine_latest and zip exactly as they are. In my opinion they work as expected and I wouldn't like to see them changed

@CJ-Wright
Member Author

CJ-Wright commented Aug 10, 2017

I have some experiments where that is doable, especially where you know that the number of elements will be the same.

# cumulative average
from operator import add

a = Stream()
b = a.scan(add)                                   # running sum
c = a.scan(lambda count, _: count + 1, start=0)   # running count
d = b.zip(c)
e = d.map(lambda pair: pair[0] / pair[1])         # sum / count
for data in data_source:
    a.emit(data)

Edit: bug in scan lambda needs two args
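The intent of the pipeline above (a running sum zipped with a running count, then divided) can be checked against a plain-Python equivalent; cumulative_average is a made-up name for illustration:

```python
from operator import add

def cumulative_average(data):
    """Plain-Python equivalent of the scan/zip/map pipeline above."""
    total, count = 0, 0
    for x in data:
        total = add(total, x)   # the scan(add) branch
        count += 1              # the counting-scan branch
        yield total / count     # the zip + divide step

list(cumulative_average([1, 2, 3, 4]))  # -> [1.0, 1.5, 2.0, 2.5]
```

Because both scans fire once per input and zip is lossless, the sum/count pairs stay aligned, which is why this pattern is safe when the element counts are guaranteed equal.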

@CJ-Wright
Member Author

Maybe I should make a hybrid zip_latest node.

@limx0

limx0 commented Aug 10, 2017

Also keen to have a zip_longest as you mentioned above, I can see that this could definitely be useful in some cases.

-A-B-C-D-E---------------------------------------------F------
+
-----------------1-----------------------2-3-4-5-----------------
=
-----------------A1-B1-C1-D1-E1--------------------F5-----

What about zip_product as a name?

Edit: Nevermind I was thinking it did more of a product-like combination. I'm not 100% sure what your stream above ^^ is doing
Edit2: Yep, zip_latest seems appropriate

@CJ-Wright
Member Author

zip_product works for me. I have a version of this already in the heterogeneous sibling library: https://github.com/xpdAcq/SHED/pull/34/files#diff-65d9b3aec4fc1da045d510c0bf1badf7R979
(I can change the name, but it seems to do the job. Relevant tests: https://github.com/xpdAcq/SHED/pull/34/files#diff-dfb8639c0c54113885f051bbf83937ccR640)

@CJ-Wright
Member Author

@mrocklin @ordirules Thoughts on a name?

This was referenced Aug 10, 2017
@jrmlhermitte
Contributor

jrmlhermitte commented Aug 10, 2017

Just catching up. Actually, I think maybe if it's just for the use case you mentioned earlier with the diagram, it could be combine_latest(wait_first=True) or something. So:

    s = Stream()
    s2 = Stream()
    s3 = s.combine_latest(s2, wait_first=True)
    s3.map(print)

would give:

s1 : A--B------------C---------D-
s2 : -------1-----2---------3--4---
s3 : ------A1B1----C2-------D4-

as opposed to:

s1               : A--B------------C---------D-
s2               : -------1-----2---------3--4---
s3 (no wait)     : ------B1--------C2-------D4-

Which is just as you mentioned, combine_latest, but with some buffering.

I may have missed something. If yes, could you provide a use case and schematic?

@mrocklin
Collaborator

mrocklin commented Aug 10, 2017 via email

@CJ-Wright
Member Author

I don't have too much preference one way or another (between creating a separate node and putting a flag on combine_latest). Just let me know what the consensus is and I'll go with that (unless BDFL wants to weigh in one side or the other).

A name consensus would be nice too!
(side note: I wish GitHub supported voting for this kind of thing)

CJ-Wright self-assigned this Aug 15, 2017