New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Iterant.fromReactivePublisher #662

Merged
merged 5 commits into from May 14, 2018

Conversation

Projects
None yet
2 participants
@oleg-py
Collaborator

oleg-py commented Apr 22, 2018

Closes #618.

This uses AsyncVar from Monix itself which is very nice for providing a necessary backpressure / waiting for that sort of conversion, as compared to messing with Promises / callbacks directly.

val generate: F[Iterant[F, A]] = F.async[Iterant[F, A]] { cb =>
result.take
.onComplete(try_ => cb(try_.toEither))(immediate)
} <* timer.shift

This comment has been minimized.

@oleg-py

oleg-py Apr 22, 2018

Collaborator

This is the only thing I use Timer for. This way I can be sure that continuation goes on on a user-defined pool (Timer.shift does not require to pass a EC, unlike Async.shift), but it might not be strictly required.

private[this] var subscription: Subscription = _
private[this] var offset = 0
private[this] var buffer: Array[Any] =
if (bufferSize == 1) null else new Array(bufferSize)

This comment has been minimized.

@oleg-py

oleg-py Apr 22, 2018

Collaborator

I'm using array here for memory efficiency, but it might not be the best idea if user passes in a buffer size of Int.MaxValue. Maybe ArrayBuffer is a better solution, at least if bufferSize is greater than some reasonably small number.

This comment has been minimized.

@alexandru

alexandru Apr 30, 2018

Member

Int.MaxValue works in the reactive streams spec, however not with your chosen model.

Basically the only way to have Int.MaxValue work is to implement a pull strategy like that in Observable.bufferIntrospective, where the consumer does not wait for the buffer to fill in order to send the next batch, being able to send whatever is in the current buffer whenever the consumer is ready for it. And yes, it requires an unbounded buffer. Pretty risky.

In our case the buffer is fixed because we're only streaming batches when all requested elements of a batch have been delivered. Am I wrong about that?


This was just an answer to what you said above. It's OK to use an Array 👍

new Eq[Iterant[Task, A]] {
def eqv(lh: Iterant[Task, A], rh: Iterant[Task, A]): Boolean = {
implicit val s = TestScheduler()

This comment has been minimized.

@oleg-py

oleg-py Apr 22, 2018

Collaborator

I had to remove these local TestSchedulers and require a test suite to provide one.

The reason was that, say I wanted to use Iterant.toReactivePublisher, which takes a Scheduler, and I have one in test suite. To actually run it, I need to call a .tick() on a test scheduler. In other methods, equalityFuture will internally do it, but here it calls .tick() on a different scheduler, and the Future I got from toReactivePublisher would not be completed.

In particular, this intuitive test:

Iterant[IO].fromReactivePublisher(it.toReactivePublisher) <-> it

would fail, because LHS would not be completed.

@codecov

This comment has been minimized.

codecov bot commented Apr 23, 2018

Codecov Report

Merging #662 into master will increase coverage by 0.01%.
The diff coverage is 100%.

@@            Coverage Diff            @@
##           master    #662      +/-   ##
=========================================
+ Coverage   90.89%   90.9%   +0.01%     
=========================================
  Files         377     379       +2     
  Lines       10089   10130      +41     
  Branches     1900    1908       +8     
=========================================
+ Hits         9170    9209      +39     
- Misses        919     921       +2
@alexandru

Hey, thanks for working on this 🙂

Left some comments ...

private[this] var subscription: Subscription = _
private[this] var offset = 0
private[this] var buffer: Array[Any] =
if (bufferSize == 1) null else new Array(bufferSize)

This comment has been minimized.

@alexandru

alexandru Apr 30, 2018

Member

Int.MaxValue works in the reactive streams spec, however not with your chosen model.

Basically the only way to have Int.MaxValue work is to implement a pull strategy like that in Observable.bufferIntrospective, where the consumer does not wait for the buffer to fill in order to send the next batch, being able to send whatever is in the current buffer whenever the consumer is ready for it. And yes, it requires an unbounded buffer. Pretty risky.

In our case the buffer is fixed because we're only streaming batches when all requested elements of a batch have been delivered. Am I wrong about that?


This was just an answer to what you said above. It's OK to use an Array 👍

val Single : PullStrategy = Batched(1)
val Default: PullStrategy = Batched(64)
case class Batched(size: Int) extends PullStrategy {

This comment has been minimized.

@alexandru

alexandru Apr 30, 2018

Member

Make this case class a final.

package monix.tail
sealed abstract class PullStrategy extends Product with Serializable

This comment has been minimized.

@alexandru

alexandru Apr 30, 2018

Member

To be honest I like packages to be clean for self discovery. When people look at monix.tail or monix.reactive or monix.eval, I want them to see data types of immediate interest, as a map to follow. But a PullStrategy is only of interest within the context of Iterant.fromReactivePublisher, so in the main package it feels like noise.

So I'm not really sure, but I would move it in another package. Maybe even in monix.execution.rstreams, where we've got some utilities for reactive-streams stuff. Who knows, maybe we'll end up reusing it in monix-reactive.

I'll leave this decision to you btw, I don't care that much about it.

package monix.tail
sealed abstract class PullStrategy extends Product with Serializable

This comment has been minimized.

@alexandru

alexandru Apr 30, 2018

Member

I would also name it ReactivePullStrategy or something like that, because it's very much related to the reactivestreams.org protocol. If we'll add another Introspective strategy or something, it will only make sense via this protocol integration.

val generate: F[Iterant[F, A]] = F.async[Iterant[F, A]] { cb =>
result.take
.onComplete(Callback.fromAttempt(cb))(immediate)
} <* timer.shift

This comment has been minimized.

@alexandru

alexandru Apr 30, 2018

Member

I think this could use an optimization, this being effectively the implementation of MVar#take. Avoids going through Promise:

      F.async { cb =>
        val cb2 = Callback.fromAttempt(cb)
        av.unsafeTake(cb2) match {
          case null => () // do nothing
          case a => async(Right(a))
        }
      }

oleg-py added some commits May 1, 2018

Address feedback on PR
- Rename PullStrategy to ReactivePullStrategy and move it to rxstreams
- Add docs for ReactivePullStrategy
- Make ReactivePullStrategy.Batched class final
- Use optimized `unsafeTake` instead of `take`
Merge branch 'master' into iterant-fromreactive
# Conflicts:
#	monix-tail/shared/src/test/scala/monix/tail/ArbitraryInstances.scala
#	monix-tail/shared/src/test/scala/monix/tail/IterantBracketSuite.scala
@alexandru

Not entirely sure if this is correct, but I like the API, so Ițm going to merge it.

@alexandru alexandru merged commit 3de9830 into monix:master May 14, 2018

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment