New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Parallel for Observable for using combineLatest #536

Merged
merged 8 commits into from Jan 17, 2018

Conversation

Projects
None yet
3 participants
@Avasil
Collaborator

Avasil commented Jan 16, 2018

Closes #459

Thanks to @LukaJCB for most of the implementation. :)

@codecov

This comment has been minimized.

codecov bot commented Jan 16, 2018

Codecov Report

Merging #536 into master will decrease coverage by 0.03%.
The diff coverage is 100%.

@@            Coverage Diff             @@
##           master     #536      +/-   ##
==========================================
- Coverage   90.46%   90.42%   -0.04%     
==========================================
  Files         363      366       +3     
  Lines        9623     9643      +20     
  Branches     1797     1803       +6     
==========================================
+ Hits         8705     8720      +15     
- Misses        918      923       +5

@Avasil Avasil changed the title from Add a Parallel for Observable for using combineLatest to WIP: Add a Parallel for Observable for using combineLatest Jan 16, 2018

@Avasil

This comment has been minimized.

Collaborator

Avasil commented Jan 16, 2018

Right now CombineObservable fails ApplicativeTests but I'll see if I can make it work and in worst case we can useApply as @LukaJCB suggested

@alexandru

This comment has been minimized.

Member

alexandru commented Jan 16, 2018

What laws are failing?

@Avasil

This comment has been minimized.

Collaborator

Avasil commented Jan 16, 2018

Applicative[CombineObservable].applicative.applicative interchange
fa: List(OnNext(1), OnComplete)
fb: List(OnNext(-1152585554), OnNext(1), OnComplete)

and

Applicative[CombineObservable].applicative.monoidal right identity
fa: List(OnNext(964858958), OnComplete)
fb: List(OnNext(-1), OnNext(964858958), OnComplete)
@Avasil

This comment has been minimized.

Collaborator

Avasil commented Jan 16, 2018

As for right identity:

    val obs = new CombineObservable(Observable(0, 1))
    val F = implicitly[Applicative[CombineObservable]]
    val x = (F.product(obs, F.pure(())), obs)
    val y = F.imap(x._1) { case (a, _) => a } { a => (a, ()) }

    // List(1)
    val lhs = y.value.toListL.runAsync
    // List(0, 1)
    val rhs = obs.value.toListL.runAsync
    // List((1, ()))
    val product = F.product(obs, F.pure(())).value.toListL.runAsync

I don't think it can pass because it compares entire Observable to result of combineLatest which discards unpaired elements and will take only the latest.

@Avasil

This comment has been minimized.

Collaborator

Avasil commented Jan 16, 2018

On the other hand, I feel like CombinedObservable should only take the most recent element from Observable so it might work this way

@Avasil

This comment has been minimized.

Collaborator

Avasil commented Jan 16, 2018

@alexandru The best thing I came up with so far is:

final class CombineObservable[A] private (val value: Observable[A], val latestValue: Observable[A])

object CombineObservable {
  def apply[A](value: Observable[A]): CombineObservable[A] = {
    new CombineObservable(value, value.combineLatestMap(Observable.now(()))((a, _) => a))
  }

  implicit def combineObservableApplicative: Applicative[CombineObservable] = new Applicative[CombineObservable] {
    def pure[A](x: A): CombineObservable[A] = new CombineObservable(Observable.now(x), Observable.now(x))

    def ap[A, B](ff: CombineObservable[(A) => B])(fa: CombineObservable[A]) = CombineObservable(
      ff.value.combineLatestMap(fa.value)((f, a) => f(a))
    )

    override def map[A, B](fa: CombineObservable[A])(f: A => B): CombineObservable[B] =
      CombineObservable(fa.value.map(f))

    override def product[A, B](fa: CombineObservable[A], fb: CombineObservable[B]): CombineObservable[(A, B)] = {
      CombineObservable(fa.value.combineLatest(fb.value))
    }
  }
}

there might be a better way to take the latest element but that's the general idea. This way all Applicative laws pass but I feel like it is too much of a hack. :P Without storing additional value NonEmptyParallelLaws.parallelRoundTrip check is failing which is:

def parallelRoundTrip[A](ma: M[A]): IsEq[M[A]] =
  P.sequential(P.parallel(ma)) <-> ma
// translates to
CombineObservable(ma).value <-> ma
@alexandru

This comment has been minimized.

Member

alexandru commented Jan 16, 2018

combineLatest is very non-deterministic. Resembles parMap2 on Task, but there only the effects can come in a different order, yet the result will always be the same, whereas for combineLatest this law does not hold for any two streams with a size greater than one:

combineLatestMap(oa, ob) <-> combineLatestMap(oa, ob)

The operation that produces deterministic results in this case is actually zip, which back-pressures one stream until the other emits a value. But then zip isn't as useful.

final class CombineObservable[A] private (val value: Observable[A], val latestValue: Observable[A])

We can't do this, no, as this is no longer a newtype.

Try to see if it passes the laws of ApplyLaws and NonEmptyParallel.
If it doesn't, then there's nothing that we can do.

Avasil added some commits Jan 16, 2018

@Avasil Avasil changed the title from WIP: Add a Parallel for Observable for using combineLatest to Add a Parallel for Observable for using combineLatest Jan 16, 2018

@Avasil

This comment has been minimized.

Collaborator

Avasil commented Jan 16, 2018

@alexandru NonEmptyParallel passes without any issues

@LukaJCB

This comment has been minimized.

LukaJCB commented Jan 16, 2018

👍 Thanks for this!

@alexandru

Thanks @Avasil, I'd like some changes to the newtype encoding, see my comments.

* [[Observable]] and provide [[cats.Apply]] instance
* which uses [[Observable.combineLatest]] to combine elements.
*/
final class CombineObservable[A](val value: Observable[A]) extends AnyVal

This comment has been minimized.

@alexandru

alexandru Jan 17, 2018

Member

This is OK-ish, but I'd like to avoid any boxing, plus I think we should be standardizing on a newtype encoding in the Cats ecosystem.

Please use the newtype encoding that I used for IO.Par in this PR: typelevel/cats-effect#115

This comment has been minimized.

@alexandru

alexandru Jan 17, 2018

Member

To make it clear:

  • add monix.execution.internal.Newtype1 (private to Monix, I hope will standardize on something in the community)
  • add object CombineObservable like you are doing here, but extend it from Newtype1 and so the newtype will be CombineObservable.Type[+A] (and also, there won't be a corresponding class)
  • on this object add apply and unwrap functions that work with .asInstanceOf

This comment has been minimized.

@Avasil

Avasil Jan 17, 2018

Collaborator

@alexandru
Okay, I implemented something that works but I think it could be nice to organize it into something like what you did in you PR with IO.Par instead of CombineObservable.Type.

Also I have followed your guidelines but is there any reason against implementing Newtype1 ( based on your IONewtype ) as:

private[monix] abstract class Newtype1[F[_]] { self =>
  type Base
  trait Tag extends Any
  type Type[+A] <: Base with Tag

  def apply[A](fa: F[A]): Type[A] =
    fa.asInstanceOf[Type[A]]

  def unwrap[A](fa: Type[A]): F[A] =
    fa.asInstanceOf[F[A]]
}

This comment has been minimized.

@alexandru

alexandru Jan 17, 2018

Member

Right, if it works, it's fine. Note that's probably an F[+_] in the case of Observable, but then I don't see a reason for why invariance wouldn't work here.

This comment has been minimized.

@Avasil

Avasil Jan 17, 2018

Collaborator

It actually works with just F[_] for this use case

* [[Observable]] and provide [[cats.Apply]] instance
* which uses [[Observable.combineLatest]] to combine elements.
/** Newtype encoding for an [[Observable]] datatype that has a [[cats.Apply]]
* instance which uses [[Observable.combineLatest]] to combine elements

This comment has been minimized.

@alexandru

alexandru Jan 17, 2018

Member

Do a unidoc command in sbt, make sure these references are resolved - I know that it has problems with resolving imports, so if you're not in the same package you have to do the full path, but check first:

[[monix.reactive.Observable.combineLatest Observable.combineLatest]]

This comment has been minimized.

@Avasil

Avasil Jan 17, 2018

Collaborator

Done 👍

@alexandru

Hey, looks good, but I'd like some minor changes to change some def into val, as a slight performance improvement, see comments.

def sequential = new (CombineObservable.Type ~> Observable) {
def apply[A](fa: CombineObservable.Type[A]): Observable[A] = unwrap(fa)
}
def parallel = new (Observable ~> CombineObservable.Type) {

This comment has been minimized.

@alexandru

alexandru Jan 17, 2018

Member

Please make parallel and sequential to be val instead of def, because we can reuse the FunctionK instances.

@@ -4470,6 +4470,23 @@ object Observable {
override def empty[A]: Observable[A] =
Observable.empty[A]
}
/** [[cats.NonEmptyParallel]] instance for [[Observable]]. */
implicit def observableNonEmptyParallel: NonEmptyParallel[Observable, CombineObservable.Type] =

This comment has been minimized.

@alexandru

alexandru Jan 17, 2018

Member

Instances with no dependencies should be val and not def, otherwise it's going to create an unnecessary new instance every time it's required. Make this a val too.

import CombineObservable.{apply => wrap}
def flatMap: FlatMap[Observable] = implicitly[FlatMap[Observable]]
def apply: Apply[CombineObservable.Type] = CombineObservable.combineObservableApplicative

This comment has been minimized.

@alexandru

alexandru Jan 17, 2018

Member

These can remain def, since all they do is to return an already initialized instance (hopefully).

object CombineObservable extends Newtype1[Observable] {
implicit def combineObservableApplicative: Apply[CombineObservable.Type] = new Apply[CombineObservable.Type] {

This comment has been minimized.

@alexandru

alexandru Jan 17, 2018

Member

Make this a val too.

implicit def combineObservableApplicative: Apply[CombineObservable.Type] = new Apply[CombineObservable.Type] {
import CombineObservable.{apply => wrap}
def ap[A, B](ff: CombineObservable.Type[(A) => B])(fa: CombineObservable.Type[A]) =

This comment has been minimized.

@alexandru

alexandru Jan 17, 2018

Member

Override map2 as well.

And use override on implemented methods, even if they are abstract, because it's easier to protect against API breakage that way.

import CombineObservable.unwrap
import CombineObservable.{apply => wrap}
def flatMap: FlatMap[Observable] = implicitly[FlatMap[Observable]]

This comment has been minimized.

@alexandru

alexandru Jan 17, 2018

Member

Use override even on abstract methods, because we need to protect against API breakage.

* instance which uses [[Observable.combineLatest]] to combine elements
* needed for implementing [[cats.NonEmptyParallel]]
*/

This comment has been minimized.

@alexandru

alexandru Jan 17, 2018

Member

Please delete this newline here. Personal pet peeve, plus Scaladoc might malfunction.

wrap(unwrap(fa).map(f))
override def map2[A, B, C](fa: CombineObservable.Type[A],
fb: CombineObservable.Type[B])(f: (A, B) => C): CombineObservable.Type[C] =

This comment has been minimized.

@Avasil

Avasil Jan 17, 2018

Collaborator

There might be nicer way to format it

This comment has been minimized.

@alexandru

alexandru Jan 17, 2018

Member

My preference is this:

override def map2[A, B, C](fa: CombineObservable.Type[A], fb: CombineObservable.Type[B])
  (f: (A, B) => C): CombineObservable.Type[C] = 
  wrap(unwrap(fa).combineLatestMap(unwrap(fb))(f))

Not much of a difference though.

@alexandru alexandru merged commit 3c0099d into monix:master Jan 17, 2018

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details

@Avasil Avasil deleted the Avasil:Observable#parallel branch Jan 21, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment