Skip to content

Commit

Permalink
Revert "Fix 'MatchError: null' in Iterant.fromReactivePublisher (#1181)"
Browse files Browse the repository at this point in the history
This reverts commit ec75ebc.
  • Loading branch information
Avasil committed May 17, 2020
1 parent ec75ebc commit 88a4656
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ private[tail] object IterantFromReactivePublisher {
extends Subscriber[A] {

private[this] val sub = SingleAssignSubscription()
private[this] val state = Atomic.withPadding(Uninitialized: State[F, A], LeftRight128)
private[this] val state = Atomic.withPadding(null: State[F, A], LeftRight128)

def start: F[Iterant[F, A]] =
F.async { cb =>
if (initialize()) {
if (state.compareAndSet(null, Empty(bufferSize))) {
sub.request(
// Requesting unlimited?
if (bufferSize < Int.MaxValue) bufferSize
Expand All @@ -67,9 +67,6 @@ private[tail] object IterantFromReactivePublisher {
take(cb)
}

private def initialize(): Boolean =
state.compareAndSet(Uninitialized, Empty(bufferSize))

private[this] val generate: (Int => F[Iterant[F, A]]) = {
if (eagerBuffer) {
val task = F.async[Iterant[F, A]](take)
Expand Down Expand Up @@ -99,10 +96,6 @@ private[tail] object IterantFromReactivePublisher {

@tailrec def onNext(a: A): Unit =
state.get match {
case Uninitialized =>
initialize()
onNext(a)

case current @ Enqueue(queue, length, toReceive) =>
if (!state.compareAndSet(current, Enqueue(queue.enqueue(a), length + 1, toReceive)))
onNext(a)
Expand All @@ -124,10 +117,6 @@ private[tail] object IterantFromReactivePublisher {

@tailrec private def finish(fa: Iterant[F, A]): Unit =
state.get match {
case Uninitialized =>
initialize()
finish(fa)

case current @ Enqueue(queue, length, _) =>
val update: Iterant[F, A] = length match {
case 0 => fa
Expand Down Expand Up @@ -162,12 +151,8 @@ private[tail] object IterantFromReactivePublisher {
def onComplete(): Unit =
finish(Iterant.empty)

@tailrec private def take(cb: Either[Throwable, Iterant[F, A]] => Unit): Unit =
private def take(cb: Either[Throwable, Iterant[F, A]] => Unit): Unit =
state.get match {
case Uninitialized =>
initialize()
take(cb)

case current @ Enqueue(queue, length, toReceive) =>
if (length == 0) {
val update = Take(cb, toReceive)
Expand Down Expand Up @@ -207,8 +192,6 @@ private[tail] object IterantFromReactivePublisher {

private sealed abstract class State[+F[_], +A]

private case object Uninitialized extends State[Nothing, Nothing]

private final case class Stop[F[_], A](fa: Iterant[F, A]) extends State[F, A]

private final case class Enqueue[F[_], A](queue: Queue[A], length: Int, toReceive: Int) extends State[F, A]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,18 +110,6 @@ object IterantFromReactivePublisherSuite extends BaseTestSuite {
}
}

test("fromReactivePublisher handles immediate completion") { implicit s =>
val publisher = new Publisher[Unit] {
def subscribe(subscriber: Subscriber[_ >: Unit]): Unit = {
subscriber.onComplete()
}
}
val f = Iterant[Task].fromReactivePublisher(publisher).completedL.runToFuture

s.tick()
assertEquals(f.value, Some(Success(())))
}

class RangePublisher(from: Int, until: Int, step: Int, finish: Option[Throwable], onCancel: Promise[Unit])(
implicit sc: Scheduler)
extends Publisher[Int] {
Expand Down

0 comments on commit 88a4656

Please sign in to comment.