Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reactive Streams implementation for Iterant #417

Merged
merged 5 commits into from Aug 15, 2017

Conversation

alexandru
Copy link
Member

@alexandru alexandru commented Aug 15, 2017

Implements an Iterant conversion to org.reactivestreams.Publisher, compliant with the specification at reactive-streams.org.

sealed abstract class Iterant[F[_], A] {
  //...
  def toReactivePublisher(implicit F: Effect[F], ec: Scheduler): Publisher[A]
}

For this to work F[_] is constrained to have an Effect[F] implementation (type class from cats-effect).

Usage sample given in the ScalaDoc:

import monix.eval.Task
import monix.execution.rstreams.SingleAssignmentSubscription
import org.reactivestreams.{Publisher, Subscriber, Subscription}

def sum(source: Publisher[Int], requestSize: Int): Task[Long] =
  Task.create { (_, cb) =>
    val sub = SingleAssignmentSubscription()

    source.subscribe(new Subscriber[Int] {
      private[this] var requested = 0L
      private[this] var sum = 0L

      def onSubscribe(s: Subscription): Unit = {
        sub := s
        requested = requestSize
        s.request(requestSize)
      }

      def onNext(t: Int): Unit = {
        sum += t
        if (requestSize != Long.MaxValue) requested -= 1

        if (requested <= 0) {
          requested = requestSize
          sub.request(request)
        }
      }

      def onError(t: Throwable): Unit =
        cb.onError(t)
      def onComplete(): Unit =
        cb.onSuccess(sum)
    })

    // Cancelable that can be used by Task
    sub
  }

val pub: Publisher[Int] = Iterant[Task].of(1, 2, 3, 4).toReactivePublisher

// Yields 10
sum(pub, requestSize = 128)

Update

Since this PR, I've tuned the implementation and added comments in PR #418, view the latest implementation at IterantToReactivePublisher.

@codecov
Copy link

codecov bot commented Aug 15, 2017

Codecov Report

Merging #417 into master will increase coverage by 0.16%.
The diff coverage is 96.59%.

@@            Coverage Diff             @@
##           master     #417      +/-   ##
==========================================
+ Coverage   88.49%   88.65%   +0.16%     
==========================================
  Files         339      340       +1     
  Lines        9297     9385      +88     
  Branches     1245     1267      +22     
==========================================
+ Hits         8227     8320      +93     
+ Misses       1070     1065       -5

@alexandru alexandru merged commit 67f8ddd into monix:master Aug 15, 2017
@alexandru alexandru added this to the 3.0.0 milestone Jan 21, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant