Add intersperse operator #368

Merged
merged 4 commits into monix:master on Jun 2, 2017

2 participants
@omainegra
Contributor

omainegra commented Jun 1, 2017

Proposal for the operator Observable.intersperse. It also includes unit tests for it. This PR resolves #367

codecov bot commented Jun 1, 2017

Codecov Report

Merging #368 into master will decrease coverage by 0.1%.
The diff coverage is 100%.

@@            Coverage Diff             @@
##           master     #368      +/-   ##
==========================================
- Coverage   86.47%   86.37%   -0.11%     
==========================================
  Files         313      314       +1     
  Lines        8406     8417      +11     
  Branches     1683     1681       -2     
==========================================
+ Hits         7269     7270       +1     
- Misses       1137     1147      +10
@alexandru

Thanks @omainegra, looks pretty good, but needs a couple of small changes - don't get discouraged by the verbose comments, this is for the purpose of passing on knowledge.

import scala.concurrent.Future
/**
* Created by omainegra on 5/29/17.

alexandru Jun 1, 2017

Member

As a general rule, I abstained from adding names in source files - you can add yourself to the AUTHORS file.

omainegra Jun 1, 2017

Contributor

Sorry about this, it's just the IntelliJ template. Fixing it.

* Created by omainegra on 5/29/17.
*/
private[reactive] final class IntersperseObservable[+A](source: Observable[A],
separator: A) extends Observable[A]{ self =>

alexandru Jun 1, 2017

Member

I'm obsessed with indentation and I'd prefer for this to be consistent with the other files. Personally I don't like things dangling so far on the right, although I can understand the preference.

out.onNext(elem)
}
else {
out.onNext(separator).flatMap {

alexandru Jun 1, 2017

Member

Doing a flatMap on a Future can be pretty inefficient since it forces an asynchronous boundary.

This is why Monix introduces a couple of special Future extension methods for avoiding async boundaries if the Future returned by onNext is already complete. Monix is efficient because it tries to collapse these async boundaries if the downstream subscriber returns a plain Continue or Stop that's immediately available.

Don't get me wrong, this code is correct, but it can be more efficient if you use syncFlatMap instead:

out.onNext(separator).syncFlatMap {
  ???
}

As a note, for user code this is actually dangerous because it is stack-unsafe (in loops), but we know what we are doing here and it's fine.
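To illustrate the idea of collapsing the async boundary, here is a minimal sketch on plain `scala.concurrent.Future`. The name `syncFlatMapSketch` is hypothetical and this is not the Monix implementation (which works on `Future[Ack]`); it only assumes that a Future's `value` can be inspected to see whether it is already complete.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

object SyncFlatMapSketch {
  // Hypothetical helper for illustration only; not the Monix implementation.
  def syncFlatMapSketch[A, B](fa: Future[A])(f: A => Future[B])(implicit ec: ExecutionContext): Future[B] =
    fa.value match {
      // Already completed successfully: run the continuation on the current
      // thread, without scheduling anything on the ExecutionContext.
      case Some(Success(a)) =>
        try f(a) catch { case NonFatal(e) => Future.failed(e) }
      // Already failed: propagate the failure unchanged.
      case Some(Failure(e)) =>
        Future.failed(e)
      // Still pending: fall back to the regular, asynchronous flatMap.
      case None =>
        fa.flatMap(f)
    }
}
```

Because the continuation runs directly on the caller's stack when the Future is already complete, chaining this in a loop can grow the stack, which is the stack-unsafety mentioned above.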

}
def onError(ex: Throwable) = {
if (!isDone.getAndSet(true)){

alexandru Jun 1, 2017

Member

Just as with firstTime, you're only modifying isDone in onComplete and onError. The calls to onNext, onComplete and onError are ordered (synchronized) already so this can be a private[this] var.

The only concurrency problem you can have is that the producer doesn't need to back-pressure for that final Future[Ack] returned by the last onNext, so depending on what you do in those flatMaps, you can end up with a concurrency problem, but you don't need anything special here.

}
}
def onComplete() = {
downstreamAck.syncOnContinue {

alexandru Jun 1, 2017

Member

In this case you don't need to back-pressure on downstreamAck, because this is legal (as an optimization of the protocol):

out.onNext(e)
out.onComplete()

But this is NOT legal of course:

out.onNext(e)
out.onNext(e)
out.onComplete()

There are operators that keep that last ack around and use it in onComplete, but that's only because they have to do more stuff.

I suspect that if you have to stream a final end, then you do need to have that ack to back-pressure on, but you don't need that here.

* @param separator is the separator
*/
def intersperse[B >: A](separator: B): Self[B] =
self.transform(self => new IntersperseObservable(self, separator))

alexandru Jun 1, 2017

Member

If we introduce this, we also need the version with the start and the end tokens and I think it can be handled by the same implementation - in that Observable source you can make them optional or something.
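As a rough picture of what the start and end tokens would mean, here is a sketch over an in-memory `List` rather than an Observable. The function and its `Option` parameters are hypothetical, chosen only to mirror the "make them optional" suggestion. How an empty source should behave is not discussed in this thread, so the sketch simply emits whichever of `start` and `end` are present.

```scala
object IntersperseSemantics {
  // Hypothetical reference function over a List, for illustration only.
  def intersperse[A](xs: List[A], start: Option[A], separator: A, end: Option[A]): List[A] = {
    // Put the separator in front of every element except the first one.
    val body = xs match {
      case Nil          => Nil
      case head :: tail => head :: tail.flatMap(x => List(separator, x))
    }
    // Optional start token first, optional end token last.
    start.toList ::: body ::: end.toList
  }
}
```

For example, `IntersperseSemantics.intersperse(List(1, 2, 3), Some(0), -1, Some(9))` yields `List(0, 1, -1, 2, -1, 3, 9)`, and passing `None` for both tokens gives the separator-only variant.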

import scala.concurrent.duration._
object IntersperseSuite extends BaseOperatorSuite {

alexandru Jun 1, 2017

Member

👍 it's good that you introduced the test as well

separator: A) extends Observable[A]{ self =>
override def unsafeSubscribeFn(out: Subscriber[A]): Cancelable = {
val firstTime = AtomicBoolean(true)

alexandru Jun 1, 2017

Member

You are modifying firstTime in onNext only, so you don't need to synchronize it, because the onNext calls are guaranteed to be ordered (the same guarantee you get with actors processing messages).

Atomics come with overhead and this can be a simple private[this] var in that Subscriber class.
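A minimal sketch of the plain-var approach, assuming a simplified synchronous observer. `SimpleObserver` is a hypothetical stand-in and not Monix's `Subscriber`, which returns `Future[Ack]` from `onNext`. The point is only that a flag touched from `onNext` alone can be an ordinary field when calls are guaranteed to be ordered.

```scala
object FirstTimeSketch {
  // Hypothetical, simplified observer; not Monix's Subscriber.
  trait SimpleObserver[-A] {
    def onNext(elem: A): Unit
    def onComplete(): Unit
  }

  final class Intersperse[A](separator: A, out: SimpleObserver[A]) extends SimpleObserver[A] {
    // Only read and written inside onNext. Since onNext calls are ordered,
    // no AtomicBoolean is required.
    private[this] var firstTime = true

    def onNext(elem: A): Unit = {
      if (firstTime) firstTime = false
      else out.onNext(separator)
      out.onNext(elem)
    }

    def onComplete(): Unit = out.onComplete()
  }
}
```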

omainegra Jun 1, 2017

Contributor

I didn't know about this, good to know

@alexandru

Looking good, just one final touch before the merge :-)

downstreamAck.syncFlatMap {
case Continue if end.isDefined => out.onNext(end.get)
case ack => ack
}.syncOnContinue {

alexandru Jun 1, 2017

Member

Looking good dude. One final touch ... there's no need for back-pressure with this syncOnContinue here, because you can do ...

out.onNext(end.get)
out.onComplete()

This optimisation (not waiting on the final onNext for sending onComplete) is good from a networking POV, but it's also why Observable.tailRecM can work well.

I would also prefer that you not wait on downstreamAck when end.isDefined == false. It's totally fine if we have multiple if branches or case statements.

omainegra Jun 1, 2017

Contributor

Ok then :-). I'm still learning about all these things. You have been a great help

alexandru Jun 1, 2017

Member

It's totally fine @omainegra, no rush, I breathe this protocol and know the optimisation opportunities because I came up with it.

And your first try was correct and pretty good.

omainegra Jun 1, 2017

Contributor

I need assistance here please. If I don't do back-pressure before onComplete, one unit test fails.

[info] monix.reactive.internal.operators.IntersperseSuite
[info] - should emit exactly the requested elements
[info] - should work for synchronous observers
[info] - should work for asynchronous observers *** FAILED ***
[info]   received 24310 != expected 24530 (BaseOperatorSuite.scala:157)
[info]     minitest.api.Asserts$class.assertEquals(Asserts.scala:67)
[info]     monix.reactive.internal.operators.BaseOperatorSuite.assertEquals(BaseOperatorSuite.scala:30)
[info]     monix.reactive.internal.operators.BaseOperatorSuite$$anonfun$3.apply(BaseOperatorSuite.scala:157)
[info]     monix.reactive.internal.operators.BaseOperatorSuite$$anonfun$3.apply(BaseOperatorSuite.scala:125)
[info]     minitest.api.TestSpec$$anonfun$from$1.apply(TestSpec.scala:54)
[info]     minitest.api.TestSpec$$anonfun$from$1.apply(TestSpec.scala:52)
[info]     minitest.api.TestSpec.apply(TestSpec.scala:28)

alexandru Jun 1, 2017

Member

Sure, but please copy/paste your newest onComplete

alexandru Jun 1, 2017

Member

It does look like a contract problem, I don't dismiss the possibility of me being wrong :-)

omainegra Jun 1, 2017

Contributor

This is the new onComplete. The back-pressured version is commented out in the else branch. As written, it doesn't work.

def onComplete() = {
  if (end.isDefined){
    downstreamAck.syncOnComplete {
      case Success(_) =>
        out.onNext(end.get)
        signalOnComplete()
      case _ =>
    }
  } else {
    signalOnComplete()
//          downstreamAck.syncOnComplete {
//            case Success(_) => signalOnComplete()
//            case _ =>
//          }
  }
}

private[this] def signalOnComplete(): Unit = {
  if (!isDone) {
    isDone = true
    out.onComplete()
  }
}

alexandru Jun 1, 2017

Member

OK, I think I know what's going on. In onNext you can end up pushing two events instead of one, so if the subscriber is slow to acknowledge the separator, elem is still queued up waiting to be sent, and you can end up triggering onNext(end) and onComplete before that happens. See, I was wrong :-)

Btw, you need to check that you receive a Continue and don't need to do anything in case of Stop or Failure(ex), so doing just one syncOnContinue there is totally fine.

ack.syncOnContinue {
  if (end.nonEmpty) out.onNext(end.get)
  out.onComplete()
}

Or something like that.
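The ordering problem described in this comment can be reproduced with plain Futures. The sketch below is a hypothetical model and not Monix code: events are recorded as strings, a `Promise` stands in for the subscriber's delayed acknowledgement of the separator, and a same-thread `ExecutionContext` keeps the run deterministic.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future, Promise}

object CompletionOrderSketch {
  // Runs callbacks on the calling thread so the recorded order is deterministic.
  implicit val sameThread: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = throw cause
  }

  // Hypothetical model: returns the order in which events reach downstream.
  def run(waitForAck: Boolean): List[String] = {
    val log = ListBuffer.empty[String]
    // The subscriber acknowledges the separator only later.
    val sepAck = Promise[Unit]()
    log += "onNext(sep)"
    // elem is queued behind the separator's pending acknowledgement.
    val downstreamAck: Future[Unit] =
      sepAck.future.map { _ => log += "onNext(elem)"; () }
    // Upstream completes while the separator is still unacknowledged.
    if (waitForAck) {
      downstreamAck.foreach { _ => log += "onComplete"; () }
    } else {
      log += "onComplete"
    }
    // The subscriber finally acknowledges the separator.
    sepAck.success(())
    log.toList
  }
}
```

In this model `run(waitForAck = false)` records `onComplete` before `onNext(elem)`, which matches the missing elements in the failing asynchronous test, while `run(waitForAck = true)` keeps the element ahead of completion.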

omainegra Jun 1, 2017

Contributor

Yes, it's as you say. One last question: is the isDone logic required? I just removed it and all tests passed.

alexandru Jun 1, 2017

Member

Yeah, you probably don't need it - isDone in onComplete is usually useful when you're doing stuff in onNext that triggers a completion and then you want to ensure that you're not going to be completing that stream a second time.

It might be OK to remove it.

start.map(out.onNext).getOrElse(Continue).syncFlatMap {
case Continue => out.onNext(elem)
case ack => ack
}

alexandru Jun 1, 2017

Member

👍 looking good

* @param separator is the separator
* @param end the last element emitted
*/
def intersperse[B >: A](start: B, separator: B, end: B): Self[B] =

@alexandru alexandru merged commit 94a556b into monix:master Jun 2, 2017

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed

@alexandru alexandru added this to the 3.0.0 milestone Jan 21, 2018
