Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for #1002 #1008

Merged
merged 9 commits into from Oct 30, 2019

Conversation

@jbracker
Copy link
Contributor

jbracker commented Sep 5, 2019

This is a fix for #1002.

All it does it replace the default implementation of Observable.combineLatestList with the custom implementation CombineLatestListObservable. The custom implementation takes special care to not cause any stack issues and tries to produce as little overhead as possible.

CombineLatestListObservable is based on the template set by CombineLatest2Observable and just expands on it to handle any number of incoming observables.

Should Observable.combineLatestList use the specialized implementations for 1 to 6 incoming Observables if possible (i.e. CombineLatest2Observable, CombineLatest3Observable, etc.)?

Jan Bracker added 2 commits Sep 5, 2019
…t observables to fix the issue. The custom observable is based on the CombineLatest2Observable and just modifies its template to work with an arbitrary number of inputs.
@Avasil

This comment has been minimized.

Copy link
Collaborator

Avasil commented Sep 10, 2019

Thank you very much for PR @jbracker
We are a bit busy with 3.0.0 release so we are slacking a bit with reviews but I will try to review it as soon as I can and this is something we can add in bugfix release.

Should Observable.combineLatestList use the specialized implementations for 1 to 6 incoming Observables if possible (i.e. CombineLatest2Observable, CombineLatest3Observable, etc.)?

It will be hard to know unless we benchmark it so I think it's OK in the current state

@alexandru

This comment has been minimized.

Copy link
Member

alexandru commented Sep 16, 2019

A review for this ticket is on my TODO 🙂

The problem with these Observable operators is that they operate with concurrent side effects.

@alexandru alexandru added the bug label Sep 16, 2019
@alexandru alexandru added this to the 3.1.0 milestone Sep 16, 2019
@jbracker

This comment has been minimized.

Copy link
Contributor Author

jbracker commented Sep 16, 2019

Glad to hear that this is making progress :)

The problem with these Observable operators is that they operate with concurrent side effects.

That's why I based the implementation on CombineLatest2Observable.

@@ -5859,14 +5859,10 @@ object Observable extends ObservableDeprecatedBuilders {
* returning a new observable that generates sequences.

This comment has been minimized.

Copy link
@jakoschiko

jakoschiko Sep 17, 2019

The doc mentions that the implemenation is based on combineLatestMap2. That's not true anymore.

@Avasil
Avasil approved these changes Sep 30, 2019
Copy link
Collaborator

Avasil left a comment

I've left few comments about potential optimizations - though as it is with performance, my intuition can be completely wrong :D

Would be nice to check with a quick benchmark but overall I think this solution should work. If you don't have time for trying out my ideas then it's fine for me, we can just open an issue for it and check it eventually

I will leave it hanging for a while to give @alexandru or @oleg-py a chance to take a look but we'll push it forward whenever there's a time for a new release

Thank you so much for PR and patience @jbracker

// MUST BE synchronized by `lock`
val observables: Array[Observable[(A, Int)]] = new Array(numberOfObservables)
observables.indices.foreach { i =>
observables(i) = obss(i).map(x => (x, i))

This comment has been minimized.

Copy link
@Avasil

Avasil Sep 30, 2019

Collaborator

I feel like it could be a bit slow if we pass a big list.
Maybe we could use iterator or just use toArray? Although I don't know about performance difference without any benchmark

import scala.util.Success
import scala.util.control.NonFatal

/** Only used in [[Observable.combineLatestList()]]. Is tested by `CombineLatestListSuite`. */

This comment has been minimized.

Copy link
@Avasil

Avasil Sep 30, 2019

Collaborator

Redundant comment IMO

This comment has been minimized.

Copy link
@oleg-py

oleg-py Oct 25, 2019

Collaborator

Yeah, we don't really mention tests in comments, but we do say "implementation for Observable.combineLatestList"

@jbracker

This comment has been minimized.

Copy link
Contributor Author

jbracker commented Oct 17, 2019

@Avasil @jakoschiko I have addressed some of the comments:

  • Comments have been updated. I did not see any harm leaving the class comment for CombineLatestListObservable.
  • toArray is not an option because we don't have a ClassTag for Observable[A] and I don't think we wan't one there. I could make it an ArraySeq or a Vector though if you think that would be preferable.
  • Using Array.fill now for initialization of hasElems.
  • Yes, you are right I don't need to map the index into the observable. Having it on the outside is sufficient.
  • I created the counter hasElemCount to allow fast checks.
…need for class tags and make everything compatible with scala 2.12 and 2.13. This addresses the failed travis build.
@Avasil

This comment has been minimized.

Copy link
Collaborator

Avasil commented Oct 24, 2019

toArray is not an option because we don't have a ClassTag for Observable[A] and I don't think we wan't one there. I could make it an ArraySeq or a Vector though if you think that would be preferable.

My main worry about

val observables: Array[(Int, Observable[A])] = new Array(numberOfObservables)
  observables.indices.foreach { i =>
    observables(i) = (i, obss(i))
  }

is that if obss is a List (likely scenario) then our complexity is 1 + 2 + 3, ... + n

Anyway, fantastic job with PR!

Avasil and others added 3 commits Oct 24, 2019
@jbracker

This comment has been minimized.

Copy link
Contributor Author

jbracker commented Oct 24, 2019

You are absolutely right. I was blind to that. Made an update that should prevent the described issue with possibly quadratic construction times.

observables.foreach { case (obs, index) =>
composite += obs.unsafeSubscribeFn(new Subscriber[A] {
implicit val scheduler: Scheduler = out.scheduler

Comment on lines 133 to 136

This comment has been minimized.

Copy link
@oleg-py

oleg-py Oct 24, 2019

Collaborator

Correct me if I wrong, but why do we need these tuples in observables? It seems that index is only used here, so one should be able to drop these allocations just by doing:

observables.foreach { obs =>
  val index = i // close over proper index
  i += 1

and thus you don't need to reallocate collection at all (can just use original obss coming to you)

This comment has been minimized.

Copy link
@jakoschiko

jakoschiko Oct 24, 2019

Or

observables.iterator.zipWithIndex.foreach { case (obs, index) =>
    ???
}}}

This comment has been minimized.

Copy link
@oleg-py

oleg-py Oct 25, 2019

Collaborator

@jakoschiko that is indeed an option, but it's only halfway - you skip the allocation of an extra collection, but not the tuples for each instance.

Copy link
Collaborator

oleg-py left a comment

@jbracker sorry for my piecemeal nitpicking. I think it's good to merge as is, but I have some final minor comments :)

import scala.util.Success
import scala.util.control.NonFatal

/** Only used in [[Observable.combineLatestList()]]. Is tested by `CombineLatestListSuite`. */

This comment has been minimized.

Copy link
@oleg-py

oleg-py Oct 25, 2019

Collaborator

Yeah, we don't really mention tests in comments, but we do say "implementation for Observable.combineLatestList"

import scala.util.control.NonFatal

/** Only used in [[Observable.combineLatestList()]]. Is tested by `CombineLatestListSuite`. */
private[reactive] final class CombineLatestListObservable[A, +R](observables: Seq[Observable[A]])(f: Seq[A] => R)

This comment has been minimized.

Copy link
@oleg-py

oleg-py Oct 25, 2019

Collaborator

I think we can get away without the f here.

combineLatestMapX helps avoid tuple allocations and induced primitive boxing, but we probably aren't buying much over combineLatestList(...).map as you do in your tests.

If you go with dropping it, you won't need to guard against user errors in f, so:

def rawOnNext(as: Seq[A]): Future[Ack] = if (isDone) Stop else out.onNext(as)
@jbracker

This comment has been minimized.

Copy link
Contributor Author

jbracker commented Oct 30, 2019

@oleg-py No problem, I think its important for good code to be thorough and your remarks are reasonable improvements. Both remarks should be addressed by my latest commit.

@Avasil Avasil merged commit 9ad1c04 into monix:master Oct 30, 2019
1 check passed
1 check passed
continuous-integration/travis-ci/pr The Travis CI build passed
Details
@Avasil

This comment has been minimized.

Copy link
Collaborator

Avasil commented Oct 30, 2019

Many thanks @jbracker :)

@jbracker jbracker deleted the jbracker:issue-1002 branch Oct 30, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants
You can’t perform that action at this time.