Skip to content

Commit

Permalink
Issue #158 - fromIterator override taking an onFinish callback
Browse files Browse the repository at this point in the history
  • Loading branch information
alexandru committed Jun 6, 2016
1 parent 9bc77f2 commit 35a6316
Show file tree
Hide file tree
Showing 11 changed files with 616 additions and 38 deletions.
2 changes: 0 additions & 2 deletions build.sbt
Expand Up @@ -51,8 +51,6 @@ lazy val sharedSettings = warnUnusedImport ++ Seq(
scalacOptions ++= (CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, majorVersion)) if majorVersion >= 11 =>
Seq(
// Enables optimisations, but only for 2.11, because 2.10 isn't to be trusted
"-optimise",
// Turns all warnings into errors ;-)
"-Xfatal-warnings",
// Enables linter options
Expand Down
Expand Up @@ -27,7 +27,7 @@ import scala.util.{Failure, Success, Try}
/** Represents an acknowledgement of processing that a consumer
* sends back upstream. Useful to implement back-pressure.
*/
sealed abstract class Ack extends Future[Ack]
sealed abstract class Ack extends Future[Ack] with Serializable

object Ack {
/** Acknowledgement of processing that signals upstream that the
Expand Down
Expand Up @@ -479,9 +479,32 @@ object Observable {
(ref, ref)
}

/** Converts any `Iterator` into an [[Observable]]. */
/** Converts any `Iterator` into an [[Observable]].
*
* @param iterator to transform into an observable
*/
def fromIterator[A](iterator: Iterator[A]): Observable[A] =
new builders.IteratorAsObservable[A](iterator)
new builders.IteratorAsObservable[A](iterator, Cancelable.empty)

/** Converts any `Iterator` into an [[Observable]].
*
* This variant of `fromIterator` takes an `onFinish` callback that
* will be called when the streaming is finished, either with
* `onComplete`, `onError`, when the downstream signals a `Stop` or
* when the subscription gets canceled.
*
* This `onFinish` callback is guaranteed to be called only once.
*
* Useful for controlling resource deallocation (e.g. closing file
* handles).
*
* @param iterator to transform into an observable
* @param onFinish a callback that will be called for resource deallocation
* whenever the iterator is complete, or when the stream is
* canceled
*/
def fromIterator[A](iterator: Iterator[A], onFinish: () => Unit): Observable[A] =
new builders.IteratorAsObservable[A](iterator, Cancelable(onFinish))

/** Converts any `Iterable` into an [[Observable]]. */
def fromIterable[A](iterable: Iterable[A]): Observable[A] =
Expand Down
Expand Up @@ -23,9 +23,12 @@ import monix.reactive.observers.Subscriber

/** Converts any `Iterable` into an observable */
private[reactive] final
class IterableAsObservable[T](iterable: Iterable[T]) extends Observable[T] {
class IterableAsObservable[T](
iterable: Iterable[T])
extends Observable[T] {

def unsafeSubscribeFn(subscriber: Subscriber[T]): Cancelable = {
new IteratorAsObservable(iterable.iterator)
new IteratorAsObservable(iterable.iterator, Cancelable.empty)
.unsafeSubscribeFn(subscriber)
}
}
Expand Up @@ -25,82 +25,152 @@ import monix.reactive.Observable
import monix.reactive.observers.Subscriber
import scala.annotation.tailrec
import scala.concurrent.Future
import scala.util.{Success, Failure}
import scala.util.control.NonFatal
import scala.util.{Success, Failure}

/** Converts any `Iterator` into an observable */
private[reactive] final
class IteratorAsObservable[T](iterator: Iterator[T]) extends Observable[T] {
private[reactive] final class IteratorAsObservable[T](
iterator: Iterator[T],
onFinish: Cancelable) extends Observable[T] {

def unsafeSubscribeFn(subscriber: Subscriber[T]): Cancelable = {
import subscriber.{scheduler => s}
var streamError = true

// Protect against contract violations - we are only allowed to
// call onError if no other terminal event has been called.
var streamErrors = true
try {
val isEmpty = iterator.isEmpty
streamError = false

streamErrors = false
// Short-circuiting empty iterators, as there's no reason to
// start the streaming if we have no elements
if (isEmpty) {
subscriber.onComplete()
Cancelable.empty
}
else {
val cancelable = BooleanCancelable()
// Starting the synchronous loop
fastLoop(iterator, subscriber, cancelable, s.executionModel, 0)(s)
cancelable
}
} catch {
case NonFatal(ex) if streamError =>
subscriber.onError(ex)
Cancelable.empty
case NonFatal(ex) =>
// We can only stream onError events if we have a guarantee
// that no other final events happened, otherwise we could
// violate the contract.
if (streamErrors) {
subscriber.onError(ex)
Cancelable.empty
} else {
triggerCancel(s)
s.reportFailure(ex)
Cancelable.empty
}
}
}

/** Calls the onFinish callback ensuring that it doesn't throw errors,
* or if it does, log them using our `Scheduler`.
*/
private def triggerCancel(s: Scheduler): Unit =
try onFinish.cancel() catch {
case NonFatal(ex) =>
s.reportFailure(ex)
}

/** In case of an asynchronous boundary, we reschedule the the
* run-loop on another logical thread. Usage of `onComplete` takes
* care of that.
*/
private def reschedule(ack: Future[Ack], iter: Iterator[T],
out: Subscriber[T], c: BooleanCancelable, s: Scheduler, em: ExecutionModel): Unit = {
out: Subscriber[T], c: BooleanCancelable, em: ExecutionModel)
(implicit s: Scheduler): Unit = {

ack.onComplete {
case Success(next) =>
if (next == Continue)
fastLoop(iter, out, c, em, 0)(s)
// If fastLoop throws, then it's a contract violation and
// the only thing we can do is to log it
try fastLoop(iter, out, c, em, 0) catch {
case NonFatal(ex) =>
triggerCancel(s)
s.reportFailure(ex)
}
else {
// Downstream Stop happened
triggerCancel(s)
}
case Failure(ex) =>
// The subscriber's `onNext` is not allowed to throw errors
// because we don't know what to do with it. At this point the
// behavior is undefined. So if it happens, we log the error
// and trigger the `onFinish` cancelable.
triggerCancel(s)
s.reportFailure(ex)
}(s)
}
}

@tailrec
private def fastLoop(iter: Iterator[T], out: Subscriber[T], c: BooleanCancelable,
/** The `fastLoop` is a tail-recursive function that goes through the
* elements of our iterator, one by one, and tries to push them
* synchronously, for as long as the `ExecutionModel` permits.
*
* After it encounters an asynchronous boundary (i.e. an
* uncompleted `Future` returned by `onNext`), then we
* [[reschedule]] the loop on another logical thread.
*/
@tailrec private
def fastLoop(iter: Iterator[T], out: Subscriber[T], c: BooleanCancelable,
em: ExecutionModel, syncIndex: Int)(implicit s: Scheduler): Unit = {

// the result of onNext calls, on which we must do back-pressure
// The result of onNext calls, on which we must do back-pressure
var ack: Future[Ack] = Continue
// we do not want to catch errors from our interaction with our observer,
// since SafeObserver should take care of than, hence we must only
// catch and stream errors related to the interactions with the iterator
var streamError = true
// true in case our iterator is seen to be empty and we must signal onComplete
// We do not want to catch errors from our interaction with our
// observer, since SafeObserver should take care of than, hence we
// must only catch and stream errors related to the interactions
// with the iterator
var streamErrors = true
// True in case our iterator is seen to be empty and we must
// signal onComplete
var iteratorIsDone = false
// non-null in case we caught an iterator related error and we must signal onError
// non-null in case we caught an iterator related error and we
// must signal onError
var iteratorTriggeredError: Throwable = null

// We need to protect against errors, but we only take care of
// iterator-related exceptions, otherwise we are dealing with a
// contract violation and we won't take care of that
try {
if (iter.hasNext) {
val next = iter.next()
streamError = false
streamErrors = false
ack = out.onNext(next)
} else {
iteratorIsDone = true
}
} catch {
case NonFatal(ex) if streamError =>
case NonFatal(ex) if streamErrors =>
iteratorTriggeredError = ex
}

if (iteratorIsDone)
out.onComplete()
else if (iteratorTriggeredError != null)
out.onError(iteratorTriggeredError)
// Signaling onComplete
if (iteratorIsDone) {
streamErrors = true
try {
onFinish.cancel()
streamErrors = false
out.onComplete()
} catch {
case NonFatal(ex) if streamErrors =>
out.onError(ex)
}
}
else if (iteratorTriggeredError != null) {
triggerCancel(s)
// Signaling error only if the subscription isn't canceled
if (!c.isCanceled) out.onError(iteratorTriggeredError)
}
else {
// Logic for collapsing execution loops
val nextIndex =
if (ack == Continue) em.nextFrameIndex(syncIndex)
else if (ack == Stop) -1
Expand All @@ -109,7 +179,10 @@ class IteratorAsObservable[T](iterator: Iterator[T]) extends Observable[T] {
if (nextIndex > 0)
fastLoop(iter, out, c, em, nextIndex)
else if (nextIndex == 0 && !c.isCanceled)
reschedule(ack, iter, out, c, s, em)
reschedule(ack, iter, out, c, em)
else
// Downstream Stop happened
triggerCancel(s)
}
}
}
Expand Up @@ -22,7 +22,7 @@ import monix.reactive.observables.ObservableLike
import ObservableLike._
import monix.reactive.observers.Subscriber

private[reactive] final class TransformOperator[-A,+B](pipe: Pipe[A,B])
private[reactive] final class PipeThroughOperator[-A,+B](pipe: Pipe[A,B])
extends Operator[A,B] {

def apply(sb: Subscriber[B]): Subscriber[A] = {
Expand Down
Expand Up @@ -1165,7 +1165,7 @@ trait ObservableLike[+A, Self[+T] <: ObservableLike[T, Self]] { self: Self[A] =>
* the source observable with it.
*/
def pipeThrough[I >: A, B](pipe: Pipe[I,B]): Self[B] =
self.liftByOperator(new TransformOperator(pipe))
self.liftByOperator(new PipeThroughOperator(pipe))

/** Applies a binary operator to a start value and all elements of
* this Observable, going left to right and returns a new
Expand Down

0 comments on commit 35a6316

Please sign in to comment.