New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Iterant.switchIfEmpty operator #504

Merged
merged 6 commits into from Jan 11, 2018

Conversation

Projects
None yet
2 participants
@oleg-py
Collaborator

oleg-py commented Jan 8, 2018

Closes #499

@codecov

This comment has been minimized.

codecov bot commented Jan 8, 2018

Codecov Report

Merging #504 into master will decrease coverage by 0.01%.
The diff coverage is 100%.

@@            Coverage Diff             @@
##           master     #504      +/-   ##
==========================================
- Coverage   89.99%   89.97%   -0.02%     
==========================================
  Files         356      357       +1     
  Lines        9281     9295      +14     
  Branches     1779     1787       +8     
==========================================
+ Hits         8352     8363      +11     
- Misses        929      932       +3
@alexandru

Hey @oleg-py, thanks for the PR, a good start, but see my comments.

Suspend(primary.earlyStop.as(backup), F.unit)
case _ =>
Suspend(backup.earlyStop.as(source), F.unit)

This comment has been minimized.

@alexandru

alexandru Jan 9, 2018

Member

We don't need to call backup.earlyStop because Iterant is lazy and we're assuming that backup wasn't evaluated yet.

We are doing the same thing in concat (++) for example, where if the first stream ends in error, we just return the error, ignoring the second stream.

So at this point you just need to return source.

def loop(source: Iterant[F, A]): Iterant[F, A] =
source match {
case Suspend(rest, stop) => Suspend(rest.map(loop), stop)
case NextBatch(batch, rest, stop) if batch.cursor().isEmpty =>

This comment has been minimized.

@alexandru

alexandru Jan 9, 2018

Member

Here we have a problem. As an implementation detail of Iterant, the NextBatch and NextCursor are wrapping the equivalent of an Iterable and Iterator, but once you start operating on iterators, you end up with side-effects.

So batch.cursor().isEmpty is side-effectful and we can't allow that without suspending it.
Take a look for example at .take where we are doing something like this:

    source match {
      case NextBatch(_, _, _) | NextCursor(_, _, _) =>
        // We can have side-effects with NextBatch/NextCursor
        // processing, so suspending execution in this case
        Suspend(F.delay(loop(n)(source)), source.earlyStop)
      case _ =>
        loop(n)(source)
    }
}

We need the same here.

This comment has been minimized.

@alexandru

alexandru Jan 9, 2018

Member

Another thing — once you evaluate batch.cursor(), given that it is side-effectful, it's wasteful to still return source in case the generated cursor is NOT empty.

It's better at this point to transform that node into a NextCursor. So...

case NextBatch(batch, rest, stop) =>
  val cursor = batch.cursor()
  if (cursor.isEmpty) 
     Suspend(rest.map(loop), stop)
  else
    NextCursor(cursor, rest, stop)

This comment has been minimized.

@alexandru

alexandru Jan 9, 2018

Member

And yet another implementation detail ... we assume that batch.cursor() is hostile and can throw (after all we expect side effects from it), the general idea being that you can take any dirty Java Iterable and wrap it into a batch or cursor.

So we also need a try / catch that wraps this pattern matching and that returns Halt(error).
See IterantTake for this one as well.

Suspend(rest.map(loop), stop)
case Halt(None) =>
Suspend(primary.earlyStop.as(backup), F.unit)

This comment has been minimized.

@alexandru

alexandru Jan 9, 2018

Member

We don't need to call primary.earlyStop here, because primary is already at Halt(None) according to your pattern matching and there is no "early stop" remaining to trigger. We know the stream is done, so at this point you can just return backup.

@alexandru alexandru referenced this pull request Jan 9, 2018

Closed

Add Iterant.takeEveryNth #500

@oleg-py

This comment has been minimized.

Collaborator

oleg-py commented Jan 9, 2018

Thank you @alexandru for your insightful comments. I've adjusted the PR.

My initial assumptions on how stop values were used were not exactly correct. I thought it should've executed always on completion, which was not right, and so my tests were not corrected.

However, once I added suspending, I was not able to construct a test case which would make the try/catch required (i.e. everything seems to work the same way without it (for EitherT[Eval, Throwable, A], IO, Coeval and Task)

oleg-py added some commits Jan 9, 2018

@alexandru

This comment has been minimized.

Member

alexandru commented Jan 10, 2018

@oleg-py that earlyStop is for early termination; when you receive a Halt then it means the producer has already terminated whatever underlying data source it had.

Think about it like this — when you're traversing a plain iterator like the following, the producer that gave you the iterator will probably close any connections when the iterator's hasNext returns false.

iterator.foreach { x => println(x) }

However if you do this ...

iterator.take(10).foreach(println)

Then in this case you're in trouble, because you're just taking 10 elements and ignore the rest. Which means the stream will never complete and the underlying data source will never close.

With Iterant however, this is an "early termination". The end of the stream hasn't been reached, but by using earlyStop you're signaling to the producer that you're no longer interested in the rest of the stream.

@alexandru

This comment has been minimized.

Member

alexandru commented Jan 10, 2018

For generating errors, built a fake Batch with ThrowExceptionBatch or a fake Cursor with ThrowExceptionCursor.

@oleg-py

This comment has been minimized.

Collaborator

oleg-py commented Jan 10, 2018

@alexandru yes, I've figured that out already 👍 .
I was assuming that

Suspend(F(stream), effect) <-> stream.doOnComplete(_ => effect)

instead of being

 Suspend(F(stream), effect) <-> stream.doOnEarlyStop(effect)

I do have tests with broken batches & cursors. These have demonstrated to me where try ... catch is not enough and I need to use suspend. I'm saying that I was not able to construct a test case that would show that suspend is not enough and try ... catch is required (although it doesn't hurt either).

@alexandru

This comment has been minimized.

Member

alexandru commented Jan 10, 2018

The purpose of a try ... catch is to treat the error using a Halt instead of letting the error be caught by the underlying F[_] (e.g. Task).

I think you can prove it by seeing if earlyStop is called in case of an error.

@alexandru alexandru merged commit f83eae2 into monix:master Jan 11, 2018

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment