implement `InterleaveObservable` #168

Closed
leakyabstraction opened this Issue Jun 17, 2016 · 9 comments

@leakyabstraction
Contributor

commented Jun 17, 2016

This is essentially a way to construct an observable by deterministically interleaving items emitted from two streams, with behavior similar to that of scalaz-stream/fs2. This is my first attempt at implementing an Observable combinator in monix, so could you take a look at the gist below and comment? If it looks good, I will open a PR.

https://gist.github.com/leakyabstraction/145b57560f7acffb87ee7664d7713c71

@alexandru alexandru added this to the 2.0 milestone Jun 17, 2016

@alexandru

Member

commented Jun 17, 2016

Hey @leakyabstraction, interesting.

Please do a proper Pull Request. I'll assign this ticket to you if you want it.

On a cursory look, is this equivalent to the following?

Observable
  .zipWith2(source1, source2) { (a,b) => Observable(a,b) }
  .flatten

Do you think there would be performance benefits to implementing it manually instead of expressing it with zipWith2? It's basically the same logic, except for the added flatten, so I guess so.

TODO:

  • Please add comments. Monix is heavily commented wherever the code is problematic. For example, that select variable needs a serious explanation.
  • Needs some tests. Btw - if you look at the current tests, I tried to come up with some basic tests that all operators should pass. It's not perfect, but it works fine. You can also take inspiration from Zip2Suite, which I think will be similar, and add whatever other tests should go there. We also have ScalaCheck, so you can use it if you are inclined to specify properties. I'm actually planning for that in a later version, though for now we've started with the tests in monix-cats.
  • Method needs to be added in ObservableLike and static function needs to be added in the Observable companion object

Question: does the FS2 variant support variable arity? Should we?

A note on efficiency - as you probably saw, we strive to be efficient by collapsing pipelines synchronously when we can. There are also some private macros in Ack.scala, expressed as macros because of the JVM inlining problem. I'll expose them in the next release.

@leakyabstraction

Contributor Author

commented Jun 17, 2016

@alexandru yes, what you propose would work. However, there are already some primitive operations in the current code base that could also be implemented in terms of others. Take fromIterator/fromIterable as an example; they could alternatively be implemented as:

def fromIterator[A](it: Iterator[A]): Observable[A] = Observable.repeat(it).takeWhile(_.hasNext).map(_.next)
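
For readers without monix at hand, the same repeat / takeWhile / map shape can be tried on a plain standard-library iterator. This is only an analogue under an assumption: `Iterator.continually` stands in for `Observable.repeat`, and `FromIteratorSketch` and `drain` are hypothetical names used for illustration, not monix API.

```scala
object FromIteratorSketch {
  /** Drains `it` by repeating the iterator itself, stopping once it is
    * exhausted, and pulling one element per repetition. `drain` is a
    * hypothetical name; Iterator.continually plays the role of repeat. */
  def drain[A](it: Iterator[A]): List[A] =
    Iterator.continually(it) // an endless stream of the same iterator
      .takeWhile(_.hasNext)  // stop as soon as it has nothing left
      .map(_.next())         // pull exactly one element per step
      .toList
}
```

Every element pays for an extra `hasNext` check and a `map` step, which is the kind of overhead a hand-written operator can avoid.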

I guess manual implementations can end up being more efficient?

My next step after this is to implement mergeSort, which would merge two sorted streams into one that is also sorted. Ideally, if monix could provide some mechanism (like wye in scalaz-stream/fs2?) to deterministically interweave two (or more, for the sake of arity) streams into one, that would solve a whole series of similar problems at a high level, such as interleave or mergeSort.
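
To pin down what mergeSort is expected to produce, here is a minimal model on plain lists rather than observables. It says nothing about how a monix operator would handle concurrency or back-pressure; `MergeSortedSketch` and `mergeSorted` are hypothetical names.

```scala
import scala.annotation.tailrec

object MergeSortedSketch {
  /** Merges two already-sorted lists into one sorted list.
    * Both heads must be known before anything can be emitted. */
  def mergeSorted[A](xs: List[A], ys: List[A])(implicit ord: Ordering[A]): List[A] = {
    @tailrec
    def loop(l: List[A], r: List[A], acc: List[A]): List[A] = (l, r) match {
      case (Nil, _) => acc.reverse ::: r // left exhausted: append the rest of right
      case (_, Nil) => acc.reverse ::: l // right exhausted: append the rest of left
      case (a :: lt, b :: _) if ord.lteq(a, b) => loop(lt, r, a :: acc)
      case (_, b :: rt) => loop(l, rt, b :: acc)
    }
    loop(xs, ys, Nil)
  }
}
```

For example, `MergeSortedSketch.mergeSorted(List(1, 4, 6), List(2, 3, 7))` yields `List(1, 2, 3, 4, 6, 7)`. Unlike interleave, which side emits next depends on the values, not on whose turn it is.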

Yes, I will add comments. I was trying to sketch out something quickly that works.
Yes, I will add tests.

I don't think one can interleave more than 2 streams with fs2/scalaz-stream. That said, I think it would be worth exploring, and it becomes more important for the sake of mergeSort. In my current implementation of interleaving two streams, each stream needs to lock on the other's promise if select is not assigned to itself, i.e., -1 -> 1, 1 -> -1. With more than 2 streams, say 3, the onNext callbacks should lock like this: 1 -> 3, 2 -> 1, 3 -> 2? What do you think?
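
The proposed locking order for N streams amounts to "each stream waits for its predecessor, and the first waits for the last". A small sketch of that rule, using hypothetical names (`RoundRobinGateSketch`, `waitsOn`, `emissionOrder`) and 1-based stream numbers as in the comment above:

```scala
object RoundRobinGateSketch {
  /** For n streams numbered 1..n, returns the stream whose emission
    * stream i has to wait for before it may emit. */
  def waitsOn(i: Int, n: Int): Int = {
    require(n >= 2 && i >= 1 && i <= n, "expects n >= 2 and 1 <= i <= n")
    if (i == 1) n else i - 1
  }

  /** Order in which streams emit during the first `rounds` rounds,
    * assuming stream 1 goes first and no stream has completed. */
  def emissionOrder(n: Int, rounds: Int): List[Int] =
    List.fill(rounds)((1 to n).toList).flatten
}
```

With `n = 3` this gives 1 -> 3, 2 -> 1, 3 -> 2, and with `n = 2` it reduces to each stream waiting on the other.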

I will take all of this into consideration, and open a PR once I am satisfied with it.

A (not directly related) question, though:
I did find the manual implementations in monix kind of hard to understand and write, since they have var, locking, synchronizing and promises all twisted together. Is there some sort of best practice that someone new to the project (such as me lol) should follow, in order to make as few mistakes as possible, or none at all?

@alexandru

Member

commented Jun 17, 2016

So, many operations on observables are redundant and that's OK. It's by design actually. We strive to be useful and efficient, not minimal :)

Unfortunately you picked a first operator that is hard to implement. It's hard because it is concurrent. You basically have two streams that race each other and have to contend on something and so you need to deal with concurrency.

FS2 is probably easier to deal with for implementing this zip (or interleave) because FS2 is pull-based and you can pull from the first and when that's done then you can pull from the second. But if you parallelize the work and pull from both at the same time, you end up with the same concurrency problems. Hence the need to synchronize, or to use Atomic in places and to be careful.

And as a philosophy, the API needs to be clean and lawful and so on, but the internals can be dirty. As long as the internals are properly encapsulated, efficiency trumps clarity, because once an operator is proven and has tests, then it's done and we no longer need to touch it. Plus at some point we'll have enough useful operations available that we won't need to implement new ones from scratch. I feel like we are nearly there.

My advice is to start with something that works. Your snippet is a good start. If you find it too complex, then start with something simpler. This basically gives you a good mental model of what you have to deal with. And then you can start applying optimizations. And we can talk on a pull request, I'll be glad to give you advice and stuff.

@alexandru

Member

commented Jun 17, 2016

@leakyabstraction I was thinking ... if we implement interleave2, we can't implement interleave3, interleave4 .... and interleaveList from it, in the same way we did with zip.

So in the case of zip, even with the focus on efficiency, I just couldn't do it for zip3 and more because it's just too hard. And that's fine because doing zip3 in terms of zip2 is reasonable. But we cannot implement interleave3 in terms of interleave2. And that's a problem.

I think you should just do this in terms of zip.flatten. This way we can easily have variable arity by piggybacking on zip.
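
As a rough model of why zip followed by flatten extends to any arity, here is the same idea on plain lists. It is a sketch, not monix code: `ZipFlattenSketch` and `interleaveViaZip` are hypothetical names, and truncating to the shortest input is how zipping is assumed to behave here.

```scala
object ZipFlattenSketch {
  /** Zips any number of lists element-wise, then flattens each group
    * in source order. Like zip, it stops at the shortest input. */
  def interleaveViaZip[A](sources: List[List[A]]): List[A] =
    if (sources.isEmpty) Nil
    else {
      val n = sources.map(_.length).min    // zip stops at the shortest source
      sources.map(_.take(n)).transpose.flatten
    }
}
```

For three sources `List(1, 2)`, `List(10, 20)` and `List(100, 200)` this yields `List(1, 10, 100, 2, 20, 200)`. Elements beyond the shortest source are dropped.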

And another thing, I don't think mergeSort will work in terms of interleave. The problem with interleave (and zip) is that it blocks one observable until the other one emits. If you want to describe it in terms of what we already have, it's probably a combination of merge and scan.

@leakyabstraction

Contributor Author

commented Jun 17, 2016

@alexandru yes, I was thinking of implementing interleave, interleave2 up to interleave6 manually (just like what you have done for zipNObservable), with higher arities falling back to zip andThen flatten.

For mergeSort, I am not going to implement it based on interleave; I will try to work out a manual implementation. For mergeSort with arity greater than 2, and even for a list of lists, I will see as an intellectual exercise whether I can implement it with just merge and scan, as you suggested.

BTW - making flatMap behave the same as concatMap, and letting mergeMap do what its name says, is a big conceptual relief. I still cannot wrap my head around the decision to use flatMap for what is really mergeMap in rxscala/rxjava :)

Cheers,

@alexandru

Member

commented Jun 17, 2016

Btw, I actually forgot that zip3 (and so forth) now has its own implementation. Guess I considered it important :-)

So mergeSort in terms of merge + scan won't work that well because it won't back-pressure. Do it as an exercise if you want.

So I'll leave the implementation of interleave to your judgment. Looking forward to the PR. mergeSort also sounds good if you want to tackle that.

@leakyabstraction

Contributor Author

commented Jun 18, 2016

@alexandru It is harder than I thought, and even what you proposed, chaining zip and flatten, would not work. Take the following as an example:

Observable.range(0, 3).interleave(Observable.range(0, 2)) shouldBe Observable(0, 0, 1, 1, 2)

However, if we use zip, it would essentially throw away the last element from the first observable and leave the interleaved stream as Observable(0, 0, 1, 1), which is not right. We have to do some mind-bending condition checks: whether the composed stream should be allowed to finish when either child stream finishes, and whether the buffered element of either child stream should be emitted upon termination. The fact is that I can pass the unit tests, but the property tests mostly blow up in my face. I need to think about this more...
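
The difference can be reproduced on plain lists. The model below encodes only the expectation stated in this comment (alternate starting from the left, and stop when the stream whose turn it is has nothing left); it is not the monix implementation, and `InterleaveModel` with its two methods are hypothetical names.

```scala
import scala.annotation.tailrec

object InterleaveModel {
  /** Alternates between the two lists, starting with `left`, and stops
    * as soon as the list whose turn it is has run out of elements. */
  def interleave[A](left: List[A], right: List[A]): List[A] = {
    @tailrec
    def loop(current: List[A], other: List[A], acc: List[A]): List[A] =
      current match {
        case Nil => acc.reverse                   // nothing to emit on this turn
        case h :: t => loop(other, t, h :: acc)   // emit, then swap turns
      }
    loop(left, right, Nil)
  }

  /** The zip-then-flatten variant, which truncates to the shorter list. */
  def zipFlatten[A](left: List[A], right: List[A]): List[A] =
    left.zip(right).flatMap { case (a, b) => List(a, b) }
}
```

With `List(0, 1, 2)` on the left and `List(0, 1)` on the right, `interleave` yields `List(0, 0, 1, 1, 2)` while `zipFlatten` yields `List(0, 0, 1, 1)`.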

@alexandru

Member

commented Jun 23, 2016

@leakyabstraction it might be actually easier than zip, because now that I think about it, with zip we have extra restrictions - for example with zip2 we can only emit when we have both values, which means we have to remember those values, hence the dance we are doing with that Either.

So for example we can have afterA1 and afterA2 promises of type Promise[Ack].

If onNext happens for obsA1, we need to do afterA2.future.flatMap { ... }. And when onNext happens for obsA2, we need to do afterA1.future.flatMap { ... }. And the trick with onComplete is that if it happens for obsA1 first, we can simply complete afterA1 and obviously leave it completed, so then whenever obsA2 emits onNext and does afterA1.future.flatMap { ... }, then it will not block.
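
A rough sketch of that handshake using only the standard library is shown below. It makes several assumptions: `Promise[Unit]` stands in for `Promise[Ack]`, a plain callback stands in for the downstream subscriber, obsA1 is assumed to go first, and `PromiseInterleaveSketch`, `Interleaver`, `feed` and `run` are hypothetical names. Errors, cancellation and downstream back-pressure are ignored.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object PromiseInterleaveSketch {
  final class Interleaver[A](emit: A => Unit)(implicit ec: ExecutionContext) {
    private val lock = new AnyRef
    // afterA2 gates obsA1; it starts completed so that obsA1 emits first.
    private var afterA2: Promise[Unit] = Promise.successful(())
    // afterA1 gates obsA2; it is completed whenever obsA1 has emitted.
    private var afterA1: Promise[Unit] = Promise[Unit]()
    private var doneA1 = false
    private var doneA2 = false

    def onNextA1(a: A): Future[Unit] =
      lock.synchronized(afterA2.future).map { _ =>
        lock.synchronized {
          emit(a)
          if (!doneA2) afterA2 = Promise[Unit]() // wait for obsA2 again
          afterA1.trySuccess(())                 // release obsA2
          ()
        }
      }

    def onNextA2(a: A): Future[Unit] =
      lock.synchronized(afterA1.future).map { _ =>
        lock.synchronized {
          emit(a)
          if (!doneA1) afterA1 = Promise[Unit]() // wait for obsA1 again
          afterA2.trySuccess(())                 // release obsA1
          ()
        }
      }

    // On completion the gate is completed and left that way,
    // so the other side never blocks on it again.
    def onCompleteA1(): Unit = lock.synchronized { doneA1 = true; afterA1.trySuccess(()); () }
    def onCompleteA2(): Unit = lock.synchronized { doneA2 = true; afterA2.trySuccess(()); () }
  }

  /** Pushes items one at a time, waiting for each returned Future. */
  private def feed(items: List[String], push: String => Future[Unit], done: () => Unit)(
      implicit ec: ExecutionContext): Future[Unit] =
    items.foldLeft(Future.successful(()))((acc, x) => acc.flatMap(_ => push(x))).map(_ => done())

  def run(): List[String] = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val out = ListBuffer.empty[String]
    val il = new Interleaver[String](s => { out += s; () })
    val f1 = feed(List("a0", "a1", "a2"), il.onNextA1, () => il.onCompleteA1())
    val f2 = feed(List("b0", "b1"), il.onNextA2, () => il.onCompleteA2())
    Await.result(f1.zip(f2), 5.seconds)
    out.toList
  }
}
```

In this sketch, once one side completes the other keeps emitting without waiting, which is what leaving the promise completed implies. Whether the combined stream should instead terminate at that point is a separate decision.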

The other problem is how do we make this more efficient. But again, an implementation that works is good enough at first.

Does that make sense btw?

@alexandru

Member

commented Jun 29, 2016

The interleave operator is on master. Closing this issue.

@leakyabstraction please open a new one for the Pad operator.

@alexandru alexandru closed this Jun 29, 2016
