Permalink
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
5580 lines (5225 sloc) 217 KB
/*
* Copyright (c) 2014-2019 by The Monix Project Developers.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monix.reactive
import java.io.{BufferedReader, InputStream, PrintStream, Reader}
import cats.{Alternative, Applicative, Apply, CoflatMap, Eq, Eval, FlatMap, Functor, FunctorFilter, Monoid, NonEmptyParallel, Order, ~>}
import cats.effect.{Bracket, Effect, ExitCase, IO, Resource}
import monix.eval.{Coeval, Task, TaskLift, TaskLike}
import monix.eval.Task.defaultOptions
import monix.execution.Ack.{Continue, Stop}
import monix.execution.ChannelType.MultiProducer
import monix.execution._
import monix.execution.annotations.{UnsafeBecauseImpure, UnsafeProtocol}
import monix.execution.cancelables.{BooleanCancelable, SingleAssignCancelable}
import monix.execution.exceptions.UpstreamTimeoutException
import monix.reactive.Observable.Operator
import monix.reactive.OverflowStrategy.Synchronous
import monix.reactive.internal.builders
import monix.reactive.internal.builders._
import monix.reactive.internal.deprecated.{ObservableDeprecatedBuilders, ObservableDeprecatedMethods}
import monix.reactive.internal.operators._
import monix.reactive.internal.subscribers.ForeachSubscriber
import monix.reactive.observables._
import monix.reactive.observers._
import monix.reactive.subjects._
import org.reactivestreams.{Publisher => RPublisher, Subscriber => RSubscriber}
import scala.collection.mutable
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}
/** The `Observable` type that implements the Reactive Pattern.
*
* Provides methods of subscribing to the Observable and operators
* for combining observable sources, filtering, modifying,
* throttling, buffering, error handling and others.
*
* See the available documentation at: [[https://monix.io]]
*
* @define concatMergeDifference ==Concat vs Merge==
*
* The difference between the [[Observable!.concat concat]]
* operation and [[Observable!.merge merge]] is that `concat`
* cares about the ordering of sequences (e.g. all items
* emitted by the first observable in the sequence will come
* before the elements emitted by the second observable),
* whereas `merge` doesn't care about that (elements get
* emitted as they come). Because of back-pressure applied to
* observables, `concat` is safe to use in all contexts,
* whereas `merge` requires buffering. Or in other words
* `concat` has deterministic, lawful behavior (being the
* "monadic bind"), whereas `merge` has non-deterministic
* behavior.
*
* @define concatDescription Concatenates the sequence of observables
* emitted by the source into one observable, without any
* transformation.
*
* You can combine the items emitted by multiple observables
* so that they act like a single sequence by using this
* operator.
*
* This operation is the "monadic bind", implementing the
* `flatMap` operation of [[cats.Monad]].
*
* $concatMergeDifference
*
* @define delayErrorsDescription ==Delaying Errors==
*
* This version is reserving `onError` notifications until
* all of the observables complete and only then passing the
* issued error(s) downstream. Note that the streamed error is a
* [[monix.execution.exceptions.CompositeException CompositeException]],
* since multiple errors from multiple streams can happen.
*
* @define concatReturn an observable that emits the merged events of all
* streams created by the source
*
* @define switchMapDescription Returns a new observable that emits the items
* emitted by the observable most recently generated by the
* mapping function.
*
* @define overflowStrategyParam the [[OverflowStrategy overflow strategy]]
* used for buffering, which specifies what to do in case
* we're dealing with a slow consumer - should an unbounded
* buffer be used, should back-pressure be applied, should
* the pipeline drop newer or older events, should it drop
* the whole buffer? See [[OverflowStrategy]] for more
* details.
*
* @define defaultOverflowStrategy this operation needs to do buffering
* and by not specifying an [[OverflowStrategy]], the
* [[OverflowStrategy.Default default strategy]] is being
* used.
*
* @define mergeMapDescription Creates a new observable by applying a
* function that you supply to each item emitted by the
* source observable, where that function returns an
* observable, and then merging those resulting observable
* and emitting the results of this merger.
*
* $concatMergeDifference
*
* @define mergeMapReturn an observable that emits the result of applying the
* transformation function to each item emitted by the source
* observable and merging the results of the observables
* obtained from this transformation.
*
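* @define mergeDescription Merges the events of all streams created by the source into one observable, emitting elements as they come.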
*
* @define mergeReturn an observable containing the merged events of all
* streams created by the source
*
* @define asyncBoundaryDescription Forces a buffered asynchronous boundary.
*
* Internally it wraps the observer implementation given to
* `onSubscribe` into a
* [[monix.reactive.observers.BufferedSubscriber BufferedSubscriber]].
*
* Normally Monix's implementation guarantees that events are
* not emitted concurrently, and that the publisher MUST NOT
* emit the next event without acknowledgement from the
* consumer that it may proceed, however for badly behaved
* publishers, this wrapper provides the guarantee that the
* downstream [[monix.reactive.Observer Observer]] given in
* `subscribe` will not receive concurrent events.
*
* WARNING: if the buffer created by this operator is
* unbounded, it can blow up the process if the data source
* is pushing events faster than what the observer can
* consume, as it introduces an asynchronous boundary that
* eliminates the back-pressure requirements of the data
* source. Unbounded is the default
* [[monix.reactive.OverflowStrategy overflowStrategy]], see
* [[monix.reactive.OverflowStrategy OverflowStrategy]] for
* options.
*
* @define onOverflowParam a function that is used for signaling a special
* event used to inform the consumers that an overflow event
* happened, function that receives the number of dropped
* events as a parameter (see [[OverflowStrategy.Evicted]])
*
* @define bufferWithSelectorDesc Periodically gather items emitted by
* an observable into bundles and emit these bundles rather than
* emitting the items one at a time, whenever the `selector`
* observable signals an event.
*
* The resulting observable collects the elements of the source
* in a buffer and emits that buffer whenever the given `selector`
* observable emits an `onNext` event, when the buffer is emitted
* as a sequence downstream and then reset. Thus the resulting
* observable emits connected, non-overlapping bundles triggered
* by the given `selector`.
*
* If `selector` terminates with an `onComplete`, then the resulting
* observable also terminates normally. If `selector` terminates with
* an `onError`, then the resulting observable also terminates with an
* error.
*
* If the source observable completes, then the current buffer gets
* signaled downstream. If the source triggers an error then the
* current buffer is being dropped and the error gets propagated
* immediately.
*
* @define unsafeBecauseImpure '''UNSAFE WARNING''':
* this operation can trigger the execution of side effects, which
* breaks referential transparency and is thus not a pure function.
*
* For FP code these functions shouldn't be called until
* "the end of the world", which is to say at the end of
* the program (for a console app), or at the end of a web
* request.
*
* Otherwise for modifying or operating on streams, prefer
* its pure functions like [[publishSelector]] for sharing
* the data source, or [[map]] or [[flatMap]] for operating
* on its events. Or in case of specialized logic, prefer
* to suspend these side effects via
* [[monix.reactive.Observable.suspend Observable.suspend]].
* Monix also provides [[monix.eval.Task Task]] which can
* also be used for suspending side effects and the `Task`
* was built to interop well with `Observable`.
*
* @define unsafeSubscribe '''UNSAFE PROTOCOL:''' This function is
* "unsafe" to call because it does not protect the calls to
* the given [[Observer]] implementation and thus knowledge
* of the protocol is needed.
*
* Prefer normal
* [[monix.reactive.Observable!.subscribe(subscriber* subscribe]]
* when consuming a stream, these unsafe subscription methods
* being useful when building operators and for testing
* purposes.
*
* Normal `subscribe` protects users in these ways:
*
* - it does a best effort attempt to catch and report
* exceptions that violate the protocol
* - the final `onComplete` or `onError` message is
* guaranteed to be signaled after the completion
* of the [[monix.execution.Ack acknowledgement]]
* received from the last `onNext`; the internal
* protocol doesn't require back-pressuring of
* this last message for performance reasons
*
* @define catsOrderInterop ==Cats Order and Scala Interop==
*
* Monix prefers to work with [[cats.Order]] for assessing the order
* of elements that have an ordering defined, instead of
* [[scala.math.Ordering]].
*
* We do this for consistency, as Monix is now building on top of Cats.
* This may change in the future, depending on what happens with
* [[https://github.com/typelevel/cats/issues/2455 typelevel/cats#2455]].
*
* Building a `cats.Order` is easy to do if you already have a
* Scala `Ordering` instance:
* {{{
* import cats.Order
*
* case class Person(name: String, age: Int)
*
* // Starting from a Scala Ordering
* implicit val scalaOrderingForPerson: Ordering[Person] =
* new Ordering[Person] {
* def compare(x: Person, y: Person): Int =
* x.age.compareTo(y.age) match {
* case 0 => x.name.compareTo(y.name)
* case o => o
* }
* }
*
* // Building a cats.Order from it
* implicit val catsOrderForPerson: Order[Person] =
* Order.fromOrdering
* }}}
*
* You can also do that in reverse, so you can prefer `cats.Order`
* (due to Cats also exposing laws and tests for free) and build a
* Scala `Ordering` when needed:
* {{{
* val scalaOrdering = catsOrderForPerson.toOrdering
* }}}
*
* @define catsEqInterop ==Cats Eq and Scala Interop==
*
* Monix prefers to work with [[cats.Eq]] for assessing the equality
* of elements, instead of
* [[scala.math.Equiv]].
*
* We do this because Scala's `Equiv` has a default instance defined
* that's based on universal equality and that's a big problem, because
* when using the `Eq` type class, it is universal equality that we
* want to avoid and there have been countless bugs in the ecosystem
* related to both universal equality and `Equiv`. Thankfully people
* are working to fix it.
*
* We also do this for consistency, as Monix is now building on top of
* Cats. This may change in the future, depending on what happens with
* [[https://github.com/typelevel/cats/issues/2455 typelevel/cats#2455]].
*
* Defining `Eq` instance is easy and we can use universal equality
* in our definitions as well:
* {{{
* import cats.Eq
*
* case class Address(host: String, port: Int)
*
* implicit val eqForAddress: Eq[Address] =
* Eq.fromUniversalEquals
* }}}
*/
abstract class Observable[+A] extends Serializable { self =>
// -----------------------------------------------------------------------
// Impure operations (that break referential transparency) ...
/** Characteristic function for an `Observable` instance, that creates
* the subscription and that eventually starts the streaming of
* events to the given [[Observer]], to be provided by observable
* implementations.
*
* This is the abstract member that concrete observables implement; the
* other subscription helpers of this class end up delegating to it.
*
* $unsafeSubscribe
*
* $unsafeBecauseImpure
*
* @param subscriber is the `Subscriber` that will receive the events
* of this stream
* @return a `Cancelable` for the created subscription
*/
@UnsafeProtocol
@UnsafeBecauseImpure
def unsafeSubscribeFn(subscriber: Subscriber[A]): Cancelable
/** Given an [[monix.reactive.Observer observer]] and a
* [[monix.execution.Scheduler scheduler]] for managing async
* boundaries, subscribes to this observable for events.
*
* Helper for calling the
* [[Observable.unsafeSubscribeFn(subscriber* abstract method]].
*
* $unsafeSubscribe
*
* $unsafeBecauseImpure
*/
@UnsafeProtocol
@UnsafeBecauseImpure
final def unsafeSubscribeFn(observer: Observer[A])(implicit s: Scheduler): Cancelable = {
  // Pair the plain observer with the scheduler, then hand the resulting
  // subscriber to the abstract `Subscriber`-based overload.
  val subscriber: Subscriber[A] = Subscriber(observer, s)
  unsafeSubscribeFn(subscriber)
}
/** Subscribes to the stream.
*
* $unsafeBecauseImpure
*
* @return a subscription that can be used to cancel the streaming.
* @see [[consumeWith]] for another way of consuming observables
*/
@UnsafeBecauseImpure
final def subscribe(observer: Observer[A])(implicit s: Scheduler): Cancelable = {
  // Build a subscriber out of the observer + scheduler pair and defer
  // to the `Subscriber`-based `subscribe` overload.
  val subscriber: Subscriber[A] = Subscriber(observer, s)
  subscribe(subscriber)
}
/** Subscribes to the stream.
*
* $unsafeBecauseImpure
*
* @return a subscription that can be used to cancel the streaming.
* @see [[consumeWith]] for another way of consuming observables
*/
@UnsafeBecauseImpure
final def subscribe(subscriber: Subscriber[A]): Cancelable = {
  // The given subscriber is wrapped in a `SafeSubscriber` before the
  // unsafe subscription is performed; this wrapping is the difference
  // between `subscribe` and a direct `unsafeSubscribeFn` call.
  val safe = SafeSubscriber[A](subscriber)
  unsafeSubscribeFn(safe)
}
/** Subscribes to the stream.
*
* $unsafeBecauseImpure
*
* @return a subscription that can be used to cancel the streaming.
* @see [[consumeWith]] for another way of consuming observables
*/
@UnsafeBecauseImpure
final def subscribe(nextFn: A => Future[Ack], errorFn: Throwable => Unit)(implicit s: Scheduler): Cancelable = {
  // No completion handler was supplied, so a no-op one is plugged in.
  val ignoreCompletion: () => Unit = () => ()
  subscribe(nextFn, errorFn, ignoreCompletion)
}
/** Subscribes to the stream.
*
* $unsafeBecauseImpure
*
* @return a subscription that can be used to cancel the streaming.
* @see [[consumeWith]] for another way of consuming observables
*/
@UnsafeBecauseImpure
final def subscribe()(implicit s: Scheduler): Cancelable = {
  // Every element is acknowledged with `Continue` and otherwise ignored.
  val alwaysContinue: A => Future[Ack] = _ => Continue
  subscribe(alwaysContinue)
}
/** Subscribes to the stream.
*
* $unsafeBecauseImpure
*
* @return a subscription that can be used to cancel the streaming.
* @see [[consumeWith]] for another way of consuming observables
*/
@UnsafeBecauseImpure
final def subscribe(nextFn: A => Future[Ack])(implicit s: Scheduler): Cancelable = {
  // Errors are reported through the scheduler; completion is a no-op.
  val reportToScheduler: Throwable => Unit = ex => s.reportFailure(ex)
  val ignoreCompletion: () => Unit = () => ()
  subscribe(nextFn, reportToScheduler, ignoreCompletion)
}
/** Subscribes to the stream.
*
* $unsafeBecauseImpure
*
* @return a subscription that can be used to cancel the streaming.
* @see [[consumeWith]] for another way of consuming observables
*/
@UnsafeBecauseImpure
final def subscribe(nextFn: A => Future[Ack], errorFn: Throwable => Unit, completedFn: () => Unit)
  (implicit s: Scheduler): Cancelable = {
  // Adapts the three plain callbacks into a `Subscriber` and goes through
  // the `Subscriber`-based `subscribe` overload.
  subscribe(new Subscriber[A] {
    // Implicit definitions get an explicit type: relying on inference for
    // them is fragile in Scala 2 and rejected by Scala 3.
    implicit val scheduler: Scheduler = s

    // Each observer event is forwarded to the matching callback.
    def onNext(elem: A): Future[Ack] = nextFn(elem)
    def onComplete(): Unit = completedFn()
    def onError(ex: Throwable): Unit = errorFn(ex)
  })
}
/** Converts this observable into a multicast observable, useful for
* turning a cold observable into a hot one (i.e. whose source is
* shared by all observers).
*
* $unsafeBecauseImpure
*/
@UnsafeBecauseImpure
final def multicast[B >: A, R](pipe: Pipe[B, R])(implicit s: Scheduler): ConnectableObservable[R] = {
  // View this observable at the pipe's input type, then let the
  // ConnectableObservable builder wire the two together.
  val source: Observable[B] = self
  ConnectableObservable.multicast(source, pipe)
}
/** Returns a new Observable that multi-casts (shares) the original Observable
* between multiple consumers.
*
* $unsafeBecauseImpure
*/
@UnsafeBecauseImpure
final def share(implicit s: Scheduler): Observable[A] = {
  // `publish` yields the connectable observable, which is then
  // exposed through its `refCount` view.
  val connectable: ConnectableObservable[A] = publish
  connectable.refCount
}
/** Converts this observable into a multicast observable, useful for
* turning a cold observable into a hot one (i.e. whose source is
* shared by all observers). The underlying subject used is a
* [[monix.reactive.subjects.PublishSubject PublishSubject]].
*
* $unsafeBecauseImpure
*/
@UnsafeBecauseImpure
final def publish(implicit s: Scheduler): ConnectableObservable[A] = {
  // A fresh PublishSubject is the processor shared between observers.
  val subject = PublishSubject[A]()
  unsafeMulticast(subject)
}
/** Caches the emissions from the source Observable and replays them
* in order to any subsequent Subscribers. This operator has
* similar behavior to [[Observable!.replay(implicit* replay]]
* except that this auto-subscribes to the source Observable rather
* than returning a
* [[monix.reactive.observables.ConnectableObservable ConnectableObservable]]
* for which you must call
* [[monix.reactive.observables.ConnectableObservable.connect connect]]
* to activate the subscription.
*
* When you call cache, it does not yet subscribe to the source
* Observable and so does not yet begin caching items. This only
* happens when the first Subscriber calls the resulting
* Observable's `subscribe` method.
*
* Note: You sacrifice the ability to cancel the origin when you
* use the cache operator so be careful not to use this on
* Observables that emit an infinite or very large number of items
* that will use up memory.
*
* $unsafeBecauseImpure
*
* @return an Observable that, when first subscribed to, caches all of its
* items and notifications for the benefit of subsequent subscribers
*/
@UnsafeBecauseImpure
final def cache: Observable[A] = {
  // Delegates to the CachedObservable builder, with no capacity given.
  CachedObservable.create(this)
}
/** Caches the emissions from the source Observable and replays them
* in order to any subsequent Subscribers. This operator has
* similar behavior to [[Observable!.replay(implicit* replay]]
* except that this auto-subscribes to the source Observable rather
* than returning a
* [[monix.reactive.observables.ConnectableObservable ConnectableObservable]]
* for which you must call
* [[monix.reactive.observables.ConnectableObservable.connect connect]]
* to activate the subscription.
*
* When you call cache, it does not yet subscribe to the source
* Observable and so does not yet begin caching items. This only
* happens when the first Subscriber calls the resulting
* Observable's `subscribe` method.
*
* $unsafeBecauseImpure
*
* @param maxCapacity is the maximum buffer size after which old events
* start being dropped (according to what happens when using
* [[monix.reactive.subjects.ReplaySubject.createLimited[A](capacity:Int,initial* ReplaySubject.createLimited]])
*
* @return an Observable that, when first subscribed to, caches all of its
* items and notifications for the benefit of subsequent subscribers
*/
@UnsafeBecauseImpure
final def cache(maxCapacity: Int): Observable[A] = {
  // Same as the parameterless `cache`, but passing a capacity along.
  CachedObservable.create(this, maxCapacity)
}
/** Converts this observable into a multicast observable, useful for
* turning a cold observable into a hot one (i.e. whose source is
* shared by all observers). The underlying subject used is a
* [[monix.reactive.subjects.BehaviorSubject BehaviorSubject]].
*
* $unsafeBecauseImpure
*/
@UnsafeBecauseImpure
final def behavior[B >: A](initialValue: B)(implicit s: Scheduler): ConnectableObservable[B] = {
  // The BehaviorSubject is created with the caller's initial value.
  val subject = BehaviorSubject[B](initialValue)
  unsafeMulticast(subject)
}
/** Converts this observable into a multicast observable, useful for
* turning a cold observable into a hot one (i.e. whose source is
* shared by all observers). The underlying subject used is a
* [[monix.reactive.subjects.ReplaySubject ReplaySubject]].
*
* $unsafeBecauseImpure
*/
@UnsafeBecauseImpure
final def replay(implicit s: Scheduler): ConnectableObservable[A] = {
  // A ReplaySubject built without a size limit argument.
  val subject = ReplaySubject[A]()
  unsafeMulticast(subject)
}
/** Converts this observable into a multicast observable, useful for
* turning a cold observable into a hot one (i.e. whose source is
* shared by all observers). The underlying subject used is a
* [[monix.reactive.subjects.ReplaySubject ReplaySubject]].
*
* $unsafeBecauseImpure
*
* @param bufferSize is the size of the buffer limiting the number
* of items that can be replayed (on overflow the head
* starts being dropped)
*/
@UnsafeBecauseImpure
final def replay(bufferSize: Int)(implicit s: Scheduler): ConnectableObservable[A] = {
  // A ReplaySubject whose replay buffer is limited to `bufferSize`.
  val subject = ReplaySubject.createLimited[A](bufferSize)
  unsafeMulticast(subject)
}
/** Converts this observable into a multicast observable, useful for
* turning a cold observable into a hot one (i.e. whose source is
* shared by all observers).
*
* '''UNSAFE PROTOCOL''': This operator is unsafe because `Subject`
* objects are stateful and have to obey the `Observer` contract,
* meaning that they shouldn't be subscribed multiple times, so
* they are error prone. Only use if you know what you're doing,
* otherwise prefer the safe [[Observable!.multicast multicast]]
* operator.
*
* $unsafeBecauseImpure
*/
@UnsafeProtocol
@UnsafeBecauseImpure
final def unsafeMulticast[B >: A, R](processor: Subject[B, R])(implicit s: Scheduler): ConnectableObservable[R] = {
  // View this observable at the subject's input type, then delegate to
  // the ConnectableObservable builder.
  val source: Observable[B] = self
  ConnectableObservable.unsafeMulticast(source, processor)
}
/** Converts this observable into a multicast observable, useful for
* turning a cold observable into a hot one (i.e. whose source is
* shared by all observers). The underlying subject used is a
* [[monix.reactive.subjects.AsyncSubject AsyncSubject]].
*
* $unsafeBecauseImpure
*/
@UnsafeBecauseImpure
final def publishLast(implicit s: Scheduler): ConnectableObservable[A] = {
  // A fresh AsyncSubject is the processor shared between observers.
  val subject = AsyncSubject[A]()
  unsafeMulticast(subject)
}
/** Creates a new [[monix.execution.CancelableFuture CancelableFuture]]
* that upon execution will signal the first generated element of the
* source observable. Returns an `Option` because the source can be empty.
*
* $unsafeBecauseImpure
*/
@UnsafeBecauseImpure
final def runAsyncGetFirst(implicit s: Scheduler, opts: Task.Options = defaultOptions): CancelableFuture[Option[A]] = {
  // Describe "take the first element, if any" as a task, then run it
  // with the implicit scheduler and options.
  val firstElement = firstOptionL
  firstElement.runToFutureOpt
}
/** Creates a new [[monix.execution.CancelableFuture CancelableFuture]]
* that upon execution will signal the last generated element of the
* source observable. Returns an `Option` because the source can be empty.
*
* $unsafeBecauseImpure
*/
@UnsafeBecauseImpure
final def runAsyncGetLast(implicit s: Scheduler, opts: Task.Options = defaultOptions): CancelableFuture[Option[A]] = {
  // Describe "take the last element, if any" as a task, then run it
  // with the implicit scheduler and options.
  val lastElement = lastOptionL
  lastElement.runToFutureOpt
}
/** Subscribes to the source `Observable` and runs the given callback
* for each element that the source emits.
*
* $unsafeBecauseImpure
*
* @param cb is the callback to execute for each emitted element
* @return a [[monix.execution.CancelableFuture CancelableFuture]] built
* from the promise that is handed to the internal subscriber as
* its finish callback, paired with the subscription's cancelable
*/
@UnsafeBecauseImpure
final def foreach(cb: A => Unit)(implicit s: Scheduler): CancelableFuture[Unit] = {
  // The promise backs the returned future; the subscriber receives it
  // in the form of a callback.
  val completion = Promise[Unit]()
  val subscriber = new ForeachSubscriber[A](cb, Callback.fromPromise(completion), s)
  // Subscribing directly via the unsafe function, as the original does.
  val subscription = unsafeSubscribeFn(subscriber)
  CancelableFuture(completion.future, subscription)
}
// -----------------------------------------------------------------------
// Pure operations ...
/** Transforms the source using the given operator. */
final def liftByOperator[B](operator: Operator[A, B]): Observable[B] = {
  // Wraps this observable together with the operator into a new observable.
  val lifted: Observable[B] = new LiftByOperatorObservable(this, operator)
  lifted
}
/** On execution, consumes the source observable
* with the given [[Consumer]], effectively transforming the
* source observable into a [[monix.eval.Task Task]].
*/
final def consumeWith[R](f: Consumer[A, R]): Task[R] = {
  // The consumer is applied to this observable, producing the task.
  f.apply(this)
}
/** Alias for [[prepend]]. */
final def +:[B >: A](elem: B): Observable[B] = {
  // Symbolic alias: all the work happens in `prepend`.
  self.prepend[B](elem)
}
/** Creates a new Observable that emits the given element and then it
* also emits the events of the source (prepend operation).
*/
final def prepend[B >: A](elem: B): Observable[B] = {
  // `elem` becomes the head, this observable the tail.
  Observable.cons(elem, this)
}
/** Alias for [[append]]. */
final def :+[B >: A](elem: B): Observable[B] = {
  // Symbolic alias: all the work happens in `append`.
  self.append[B](elem)
}
/** Creates a new Observable that emits the events of the source and
* then it also emits the given element (appended to the stream).
*/
final def append[B >: A](elem: B): Observable[B] = {
  // Lift the element into an observable, then append that observable
  // to the source.
  val trailing: Observable[B] = Observable.now(elem)
  self.appendAll(trailing)
}
/** Given the source observable and another `Observable`, emits all of
* the items from the first of these Observables to emit an item
* and cancel the other.
*/
final def ambWith[B >: A](other: Observable[B]): Observable[B] = {
  // Race between this observable and `other`, in that argument order.
  Observable.firstStartedOf(this, other)
}
/** Periodically gather items emitted by an observable into bundles
* and emit these bundles rather than emitting the items one at a
* time. This version of `buffer` is emitting items once the
* internal buffer has reached the given count.
*
* If the source observable completes, then the current buffer gets
* signaled downstream. If the source triggers an error then the
* current buffer is being dropped and the error gets propagated
* immediately.
*
* @param count the maximum size of each buffer before it should
* be emitted
*/
final def bufferTumbling(count: Int): Observable[Seq[A]] = {
  // Expressed as a sliding buffer whose `skip` equals its `count`.
  self.bufferSliding(count = count, skip = count)
}
/** Returns an observable that emits buffers of items it collects from
* the source observable. The resulting observable emits buffers
* every `skip` items, each containing `count` items.
*
* If the source observable completes, then the current buffer gets
* signaled downstream. If the source triggers an error then the
* current buffer is being dropped and the error gets propagated
* immediately.
*
* For `count` and `skip` there are 3 possibilities:
*
* 1. in case `skip == count`, then there are no items dropped and
* no overlap, the call being equivalent to `bufferTumbling(count)`
* 1. in case `skip < count`, then overlap between buffers
* happens, with the number of elements being repeated being
* `count - skip`
* 1. in case `skip > count`, then `skip - count` elements start
* getting dropped between windows
*
* @param count the maximum size of each buffer before it should
* be emitted
* @param skip how many items emitted by the source observable should
* be skipped before starting a new buffer. Note that when
* skip and count are equal, this is the same operation as
* `bufferTumbling(count)`
*/
final def bufferSliding(count: Int, skip: Int): Observable[Seq[A]] = {
  // The buffering logic lives in BufferSlidingOperator; here it is only
  // lifted onto this observable.
  self.liftByOperator(new BufferSlidingOperator(count, skip))
}
/** Periodically gather items emitted by an observable into bundles
* and emit these bundles rather than emitting the items one at a
* time.
*
* This version of `buffer` emits a new bundle of items
* periodically, every timespan amount of time, containing all
* items emitted by the source Observable since the previous bundle
* emission.
*
* If the source observable completes, then the current buffer gets
* signaled downstream. If the source triggers an error then the
* current buffer is being dropped and the error gets propagated
* immediately.
*
* @param timespan the interval of time at which it should emit
* the buffered bundle
*/
final def bufferTimed(timespan: FiniteDuration): Observable[Seq[A]] = {
  // NOTE(review): 0 is passed as `maxCount`; presumably it means "no size
  // limit" - confirm against BufferTimedObservable.
  self.bufferTimedAndCounted(timespan, maxCount = 0)
}
/** Periodically gather items emitted by an observable into bundles
* and emit these bundles rather than emitting the items one at a
* time.
*
* The resulting observable emits connected, non-overlapping
* buffers, each of a fixed duration specified by the `timespan`
* argument or a maximum size specified by the `maxCount` argument
* (whichever is reached first).
*
* If the source observable completes, then the current buffer gets
* signaled downstream. If the source triggers an error then the
* current buffer is being dropped and the error gets propagated
* immediately.
*
* @param timespan the interval of time at which it should emit
* the buffered bundle
* @param maxCount is the maximum bundle size, after which the
* buffered bundle gets forcefully emitted
*/
final def bufferTimedAndCounted(timespan: FiniteDuration, maxCount: Int): Observable[Seq[A]] = {
  // Thin wrapper: the buffering itself is done by BufferTimedObservable.
  new BufferTimedObservable[A](this, timespan, maxCount)
}
/** Periodically gather items emitted by an observable into bundles
* and emit these bundles rather than emitting the items one at a
* time. Back-pressure the source when the buffer is full.
*
* The resulting observable emits connected, non-overlapping
* buffers, each of a fixed duration specified by the `period`
* argument.
*
* The bundles are emitted at a fixed rate. If the source is
* silent, then the resulting observable will start emitting empty
* sequences.
*
* If the source observable completes, then the current buffer gets
* signaled downstream. If the source triggers an error then the
* current buffer is being dropped and the error gets propagated
* immediately.
*
* A `maxSize` argument is specified as the capacity of the
* bundle. In case the source is too fast and `maxSize` is reached,
* then the source will be back-pressured.
*
* A `sizeOf` argument is specified as the weight each element
* represents in the bundle. Defaults to count each element as
* weighting 1.
*
* The difference with [[bufferTimedAndCounted]] is that
* [[bufferTimedWithPressure]] applies back-pressure from the time
* when the buffer is full until the buffer is emitted, whereas
* [[bufferTimedAndCounted]] will forcefully emit the buffer when
* it's full.
*
* @param period the interval of time at which it should emit
* the buffered bundle
* @param maxSize is the maximum buffer size, after which the
* source starts being back-pressured
* @param sizeOf is the function to compute the weight of each
* element in the buffer
*/
final def bufferTimedWithPressure(period: FiniteDuration, maxSize: Int, sizeOf: A => Int = _ => 1): Observable[Seq[A]] = {
  // The selector is a fixed-rate ticker; `period` is passed for both of
  // its arguments.
  val ticker = Observable.intervalAtFixedRate(period, period)
  new BufferWithSelectorObservable(this, ticker, maxSize, sizeOf)
}
/** $bufferWithSelectorDesc
*
* @param selector is the observable that triggers the
* signaling of the current buffer
*/
final def bufferWithSelector[S](selector: Observable[S]): Observable[Seq[A]] = {
  // Every element weighs 1.
  val unitWeight: A => Int = _ => 1
  // NOTE(review): 0 is passed as the max size; presumably it means
  // "unbounded" - confirm against BufferWithSelectorObservable.
  new BufferWithSelectorObservable[A, S](this, selector, 0, unitWeight)
}
/** $bufferWithSelectorDesc
*
* A `maxSize` argument is specified as the capacity of the
* bundle. In case the source is too fast and `maxSize` is reached,
* then the source will be back-pressured.
*
* @param selector is the observable that triggers the signaling of the
* current buffer
* @param maxSize is the maximum bundle size, after which the
* source starts being back-pressured
*/
final def bufferWithSelector[S](selector: Observable[S], maxSize: Int): Observable[Seq[A]] = {
  // Every element weighs 1, so `maxSize` is a plain element count.
  val unitWeight: A => Int = _ => 1
  new BufferWithSelectorObservable(this, selector, maxSize, unitWeight)
}
/** Buffers signals while busy, after which it emits the
* buffered events as a single bundle.
*
* This operator starts applying back-pressure when the
* underlying buffer's size is exceeded.
*/
final def bufferIntrospective(maxSize: Int): Observable[List[A]] = {
  // Thin wrapper: the logic lives in BufferIntrospectiveObservable.
  new BufferIntrospectiveObservable[A](this, maxSize)
}
/** Implementation of `bracket` from `cats.effect.Bracket`.
*
* See [[https://typelevel.org/cats-effect/typeclasses/bracket.html documentation]].
*/
final def bracket[B](use: A => Observable[B])(release: A => Task[Unit]): Observable[B] = {
  // Same as `bracketCase`, with the exit case ignored by the finalizer.
  bracketCase(use) { (resource, _) =>
    release(resource)
  }
}
/** Version of [[bracket]] that can work with generic
* `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
* conversions.
*
* So in `release` you can work among others with:
*
* - `cats.effect.IO`
* - `monix.eval.Coeval`
* - `scala.concurrent.Future`
* - ...
*/
final def bracketF[F[_], B](use: A => Observable[B])(release: A => F[Unit])
  (implicit F: TaskLike[F]): Observable[B] = {
  // The generic `F[Unit]` finalizer is converted into a `Task[Unit]`,
  // after which the regular `bracket` takes over.
  bracket(use)(resource => F.toTask(release(resource)))
}
/** Implementation of `bracketCase` from `cats.effect.Bracket`.
*
* See [[https://typelevel.org/cats-effect/typeclasses/bracket.html documentation]].
*/
final def bracketCase[B](use: A => Observable[B])(release: (A, ExitCase[Throwable]) => Task[Unit]): Observable[B] = {
  // The source is taken through `uncancelable` first; `use` and `release`
  // are then handed to ConcatMapObservable, with errors not delayed.
  val source: Observable[A] = uncancelable
  new ConcatMapObservable(source, use, release, delayErrors = false)
}
/** Version of [[bracketCase]] that can work with generic
* `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
* conversions.
*
* So in `release` you can work among others with:
*
* - `cats.effect.IO`
* - `monix.eval.Coeval`
* - `scala.concurrent.Future`
* - ...
*/
final def bracketCaseF[F[_], B](use: A => Observable[B])(release: (A, ExitCase[Throwable]) => F[Unit])
  (implicit F: TaskLike[F]): Observable[B] = {
  // The generic `F[Unit]` finalizer is converted into a `Task[Unit]`,
  // after which the regular `bracketCase` takes over.
  bracketCase(use) { (resource, exitCase) =>
    F.toTask(release(resource, exitCase))
  }
}
/** Applies the given partial function to the source
* for each element for which the given partial function is defined.
*
* @param pf the function that filters and maps the source
* @return an observable that emits the transformed items by the
* given partial function
*/
final def collect[B](pf: PartialFunction[A, B]): Observable[B] = {
  // The partial function is wrapped in a CollectOperator, which is then
  // lifted onto this observable.
  liftByOperator(new CollectOperator(pf))
}
/** Creates a new observable from the source and another given
* observable, by emitting elements combined in pairs.
*
* It emits an item whenever any of the source Observables emits an
* item (so long as each of the source Observables has emitted at
* least one item).
*
* == Visual Example ==
*
* <pre>
* stream1: 1 - - 2 - - 3 - 4 - -
* stream2: 1 - - 2 - 3 - - - - 4
*
* result: (1, 1), (2, 2), (2, 3), (3, 3), (4, 3), (4, 4)
* </pre>
*
* See [[zip]] for an alternative that pairs the items in strict sequence.
*
* @param other is an observable that gets paired with the source
*/
final def combineLatest[B](other: Observable[B]): Observable[(A, B)] = {
  // Pairing function: simply tuples the latest value of each side.
  val pair: (A, B) => (A, B) = (left, right) => (left, right)
  new CombineLatest2Observable[A, B, (A, B)](self, other)(pair)
}
/** Creates a new observable from the source and another given
* observable, by emitting elements combined in pairs.
*
* It emits an item whenever any of the source Observables emits an
* item (so long as each of the source Observables has emitted at
* least one item).
*
* == Visual Example ==
*
* <pre>
* stream1: 1 - - 2 - - 3 - 4 - -
* stream2: 1 - - 2 - 3 - - - - 4
*
* result: (1, 1), (2, 2), (2, 3), (3, 3), (4, 3), (4, 4)
* </pre>
*
* See [[zipMap]] for an alternative that pairs the items
* in strict sequence.
*
* @param other is an observable that gets paired with the source
* @param f is a mapping function over the generated pairs
*/
final def combineLatestMap[B, R](other: Observable[B])(f: (A, B) => R): Observable[R] = {
  val combined: Observable[R] = new CombineLatest2Observable[A, B, R](self, other)(f)
  combined
}
/** Ignores all items emitted by the source Observable and only calls
* `onComplete` or `onError`.
*
* @return an empty Observable that only calls `onComplete` or `onError`,
* based on which one is called by the source Observable
*/
final def completed: Observable[Nothing] = {
  val discardElements = CompletedOperator
  self.liftByOperator(discardElements)
}
/** Doesn't emit anything until a `timeout` period passes without the
* source emitting anything. When that timeout happens, we
* subscribe to the observable generated by the given function, an
* observable that will keep emitting until the source will break
* the silence by emitting another event.
*
* Note: If the source observable keeps emitting items more
* frequently than the length of the time window, then no items
* will be emitted by the resulting Observable.
*
* @param f is a function that receives the last element generated
* by the source, generating an observable to be subscribed
* when the source is timing out
* @param timeout the length of the window of time that must pass after
* the emission of an item from the source Observable in
* which that Observable emits no items in order for the
* item to be emitted by the resulting Observable
*/
final def debounceTo[B](timeout: FiniteDuration, f: A => Observable[B]): Observable[B] = {
  // Each new source element switches to a freshly generated stream whose
  // subscription is postponed by `timeout`.
  self.switchMap { elem =>
    val generated = f(elem)
    generated.delayExecution(timeout)
  }
}
/** Hold an Observer's subscription request for a specified amount of
* time before passing it on to the source Observable.
*
* @param timespan is the time to wait before the subscription
* is being initiated.
*/
final def delayExecution(timespan: FiniteDuration): Observable[A] = {
  val delayed: Observable[A] = new DelayExecutionByTimespanObservable(self, timespan)
  delayed
}
/** Convert an observable that emits observables into a single
* observable that emits the items emitted by the
* most-recently-emitted of those observables.
*
* Similar with [[concatMap]], however the source isn't
* back-pressured when emitting new events. Instead new events
* being emitted are cancelling the active child observables.
*
* ==Example==
*
* The `switchMap` can express a lot of cool, time-based operations.
* For example we can express [[debounce]] in terms of `switchMap`:
* {{{
* import scala.concurrent.duration._
*
* def debounce[A](stream: Observable[A], d: FiniteDuration): Observable[A] =
* stream.switchMap { x =>
* Observable.now(x).delayExecution(d)
* }
* }}}
*
* @param f is a generator for the streams that are being merged
*/
final def switchMap[B](f: A => Observable[B]): Observable[B] = {
  val switched: Observable[B] = new SwitchMapObservable[A, B](self, f)
  switched
}
/** Emits the last item from the source Observable if a particular
* timespan has passed without it emitting another item, and keeps
* emitting that item at regular intervals until the source breaks
* the silence.
*
* So compared to regular [[debounceTo]] this version
* keeps emitting the last item of the source.
*
* Note: If the source Observable keeps emitting items more
* frequently than the length of the time window then no items will
* be emitted by the resulting Observable.
*
* @param period the length of the window of time that must pass after
* the emission of an item from the source Observable in
* which that Observable emits no items in order for the
* item to be emitted by the resulting Observable at regular
* intervals, also determined by period
* @see [[echoRepeated]] for a similar operator that also mirrors
* the source observable
*/
final def debounceRepeated(period: FiniteDuration): Observable[A] = {
  // `repeat = true` selects the repeating variant of the debounce implementation.
  val keepRepeating = true
  new DebounceObservable(self, period, repeat = keepRepeating)
}
/** Emit items from the source, or emit a default item if
* the source completes after emitting no items.
*/
final def defaultIfEmpty[B >: A](default: => B): Observable[B] = {
  // Wrap the by-name argument in a thunk so that it stays unevaluated here.
  val fallback: () => B = () => default
  self.liftByOperator(new DefaultIfEmptyOperator[B](fallback))
}
/** Delays emitting the final `onComplete` event by the specified amount. */
final def delayOnComplete(delay: FiniteDuration): Observable[A] = {
  val delayed: Observable[A] = new DelayOnCompleteObservable(self, delay)
  delayed
}
/** Returns an Observable that emits the items emitted by the source
* Observable shifted forward in time by a specified delay.
*
* Each time the source Observable emits an item, delay starts a
* timer, and when that timer reaches the given duration, the
* Observable returned from delay emits the same item.
*
* NOTE: this delay refers strictly to the time between the
* `onNext` event coming from our source and the time it takes the
* downstream observer to get this event. On the other hand the
* operator is also applying back-pressure, so on slow observers
* the actual time passing between two successive events may be
* higher than the specified `duration`.
*
* @param duration - the delay to shift the source by
* @return the source Observable shifted in time by the specified delay
*/
final def delayOnNext(duration: FiniteDuration): Observable[A] = {
  val shifted: Observable[A] = new DelayByTimespanObservable[A](self, duration)
  shifted
}
/** Returns an Observable that emits the items emitted by the source
* Observable shifted forward in time.
*
* This variant of `delay` sets its delay duration on a per-item
* basis by passing each item from the source Observable into a
* function that returns an Observable and then monitoring those
* Observables. When any such Observable emits an item or
* completes, the Observable returned by delay emits the associated
* item.
*
* @param selector is a function that returns an Observable for
* each item emitted by the source Observable, which is then
* used to delay the emission of that item by the resulting
* Observable until the Observable returned from `selector`
* emits an item
* @return the source Observable shifted in time by
* the specified delay
*/
final def delayOnNextBySelector[B](selector: A => Observable[B]): Observable[A] = {
  val shifted: Observable[A] = new DelayBySelectorObservable[A, B](self, selector)
  shifted
}
/** Hold an Observer's subscription request until the given `trigger`
* observable either emits an item or completes, before passing it
* on to the source Observable.
*
* If the given `trigger` completes in error, then the subscription is
* terminated with `onError`.
*
* @param trigger the observable that must either emit an item or
* complete in order for the source to be subscribed.
*/
final def delayExecutionWith(trigger: Observable[_]): Observable[A] = {
  val gated: Observable[A] = new DelayExecutionWithTriggerObservable(self, trigger)
  gated
}
/** Version of [[delayExecutionWith]] that can work with generic `F[_]`
* tasks, anything that's supported via [[ObservableLike]] conversions.
*
* So you can work among others with:
*
* - `cats.effect.IO`
* - `monix.eval.Coeval`
* - `scala.concurrent.Future`
* - ...
*/
final def delayExecutionWithF[F[_]](trigger: F[_])(implicit F: ObservableLike[F]): Observable[A] = {
  // Convert the generic trigger into an observable, then reuse `delayExecutionWith`.
  val triggerObservable: Observable[_] = F.toObservable(trigger)
  delayExecutionWith(triggerObservable)
}
/** Converts the source Observable that emits `Notification[A]` (the
* result of [[materialize]]) back to an Observable that emits `A`.
*/
final def dematerialize[B](implicit ev: A <:< Notification[B]): Observable[B] = {
  // The evidence `ev` witnesses that every `A` is a `Notification[B]`,
  // which is what this cast relies on.
  val notifications = self.asInstanceOf[Observable[Notification[B]]]
  notifications.liftByOperator(new DematerializeOperator[B])
}
/** Suppress duplicate consecutive items emitted by the source.
*
* Example:
* {{{
* // Needed to bring standard Eq instances in scope:
* import cats.implicits._
*
* // Yields 1, 2, 1, 3, 2, 4
* val stream = Observable(1, 1, 1, 2, 2, 1, 1, 3, 3, 3, 2, 2, 4, 4, 4)
* .distinctUntilChanged
* }}}
*
* Duplication is detected by using the equality relationship
* provided by the [[cats.Eq]] type class. This allows one to
* override the equality operation being used (e.g. maybe the
* default `.equals` is badly defined, or maybe you want reference
* equality, so depending on use case).
*
* $catsEqInterop
*
* @param A is the [[cats.Eq]] instance that defines equality
* for the elements emitted by the source
*/
final def distinctUntilChanged[AA >: A](implicit A: Eq[AA]): Observable[AA] = {
  // The `Eq` instance is handed over explicitly to the operator.
  val deduplicated: Observable[AA] = self.liftByOperator(new DistinctUntilChangedOperator()(A))
  deduplicated
}
/** Given a function that returns a key for each element emitted by
* the source, suppress consecutive duplicate items.
*
* Example:
* {{{
* // Needed to bring standard instances in scope:
* import cats.implicits._
*
* // Yields 1, 2, 3, 4
* val stream = Observable(1, 3, 2, 4, 2, 3, 5, 7, 4)
* .distinctUntilChangedByKey(_ % 2)
* }}}
*
* Duplication is detected by using the equality relationship
* provided by the [[cats.Eq]] type class. This allows one to
* override the equality operation being used (e.g. maybe the
* default `.equals` is badly defined, or maybe you want reference
* equality, so depending on use case).
*
* $catsEqInterop
*
* @param key is a function that returns a `K` key for each element,
* a value that's then used to do the deduplication
*
* @param K is the [[cats.Eq]] instance that defines equality for
* the key type `K`
*/
final def distinctUntilChangedByKey[K](key: A => K)(implicit K: Eq[K]): Observable[A] = {
  // The `Eq` instance for the key type is handed over explicitly to the operator.
  val deduplicated: Observable[A] = self.liftByOperator(new DistinctUntilChangedByKeyOperator(key)(K))
  deduplicated
}
/** Executes the given task when the streaming is stopped
* due to a downstream [[monix.execution.Ack.Stop Stop]] signal
* returned by [[monix.reactive.Observer.onNext onNext]].
*
* The given `task` gets evaluated *before* the upstream
* receives the `Stop` event (is back-pressured).
*
* Example:
* {{{
* import monix.eval.Task
*
* val stream = Observable.range(0, Int.MaxValue)
* .doOnEarlyStop(Task(println("Stopped early!")))
* .take(100)
* }}}
*
* NOTE: in most cases what you want is [[guaranteeCase]]
* or [[bracketCase]]. This operator is available for
* fine-grained control.
*/
final def doOnEarlyStop(task: Task[Unit]): Observable[A] = {
  val operator = new DoOnEarlyStopOperator[A](task)
  self.liftByOperator(operator)
}
/** Version of [[doOnEarlyStop]] that can work with generic
* `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
* conversions.
*
* So you can work among others with:
*
* - `cats.effect.IO`
* - `monix.eval.Coeval`
* - `scala.concurrent.Future`
* - ...
*
* Example:
* {{{
* import cats.effect.IO
*
* val stream = Observable.range(0, Int.MaxValue)
* .doOnEarlyStopF(IO(println("Stopped early!")))
* .take(100)
* }}}
*
* NOTE: in most cases what you want is [[guaranteeCase]]
* or [[bracketCase]]. This operator is available for
* fine-grained control.
*/
final def doOnEarlyStopF[F[_]](task: F[Unit])(implicit F: TaskLike[F]): Observable[A] = {
  val asTask = F.toTask(task)
  doOnEarlyStop(asTask)
}
/**
* Executes the given callback when the connection is being cancelled,
* via the [[monix.execution.Cancelable Cancelable]] reference returned
* on subscribing to the created observable.
*
* Example:
* {{{
* import monix.eval.Task
*
* Observable.range(0, Int.MaxValue)
* .doOnSubscriptionCancel(Task(println("Cancelled!")))
* .take(100)
* }}}
*
* NOTE: in most cases what you want is [[guaranteeCase]]
* or [[bracketCase]]. This operator is available for
* fine-grained control.
*/
final def doOnSubscriptionCancel(task: Task[Unit]): Observable[A] = {
  val withCancelHook: Observable[A] = new DoOnSubscriptionCancelObservable[A](self, task)
  withCancelHook
}
/** Version of [[doOnSubscriptionCancel]] that can work with generic
* `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
* conversions.
*
* So you can work among others with:
*
* - `cats.effect.IO`
* - `monix.eval.Coeval`
* - `scala.concurrent.Future`
* - ...
*
* Example:
* {{{
* import cats.effect.IO
*
* Observable
* .range(0, Int.MaxValue)
* .doOnSubscriptionCancelF(IO(println("Cancelled!")))
* .take(100)
* }}}
*
* NOTE: in most cases what you want is [[guaranteeCase]]
* or [[bracketCase]]. This operator is available for
* fine-grained control.
*/
final def doOnSubscriptionCancelF[F[_]](task: F[Unit])(implicit F: TaskLike[F]): Observable[A] = {
  val asTask = F.toTask(task)
  doOnSubscriptionCancel(asTask)
}
/** Evaluates the given task when the stream has ended with an
* `onComplete` event, but before the complete event is emitted.
*
* The task gets evaluated and is finished *before* the `onComplete`
* signal gets sent downstream.
*
* {{{
* import monix.eval.Task
*
* Observable.range(0, 10)
* .doOnComplete(Task(println("Completed!")))
* }}}
*
* NOTE: in most cases what you want is [[guaranteeCase]]
* or [[bracketCase]]. This operator is available for
* fine-grained control.
*
* @param task the task to execute when the `onComplete`
* event gets emitted
*/
final def doOnComplete(task: Task[Unit]): Observable[A] = {
  val operator = new DoOnCompleteOperator[A](task)
  self.liftByOperator(operator)
}
/** Version of [[doOnComplete]] that can work with generic
* `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
* conversions.
*
* So you can work among others with:
*
* - `cats.effect.IO`
* - `monix.eval.Coeval`
* - `scala.concurrent.Future`
* - ...
*
* {{{
* import cats.effect.IO
*
* Observable.range(0, 10)
* .doOnCompleteF(IO(println("Completed!")))
* }}}
*/
final def doOnCompleteF[F[_]](task: F[Unit])(implicit F: TaskLike[F]): Observable[A] = {
  val asTask = F.toTask(task)
  doOnComplete(asTask)
}
/** Executes the given task when the stream is interrupted with an
* error, before the `onError` event is emitted downstream.
*
* Example:
* {{{
* import monix.eval.Task
*
* val dummy = new RuntimeException("dummy")
*
* (Observable.range(0, 10) ++ Observable.raiseError(dummy))
* .doOnError { e =>
* Task(println(s"Triggered error: $$e"))
* }
* }}}
*
* NOTE: should protect the code in this callback, because if it
* throws an exception the `onError` event will prefer signaling
* the original exception and otherwise the behavior is undefined.
*
* NOTE: in most cases what you want is [[guaranteeCase]]
* or [[bracketCase]]. This operator is available for
* fine-grained control.
*/
final def doOnError(cb: Throwable => Task[Unit]): Observable[A] = {
  val operator = new DoOnErrorOperator[A](cb)
  self.liftByOperator(operator)
}
/** Version of [[doOnError]] that can work with generic
* `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
* conversions.
*
* So you can work among others with:
*
* - `cats.effect.IO`
* - `monix.eval.Coeval`
* - `scala.concurrent.Future`
* - ...
*
* {{{
* import cats.effect.IO
*
* val dummy = new RuntimeException("dummy")
*
* (Observable.range(0, 10) ++ Observable.raiseError(dummy))
* .doOnErrorF { e =>
* IO(println(s"Triggered error: $$e"))
* }
* }}}
*/
final def doOnErrorF[F[_]](cb: Throwable => F[Unit])(implicit F: TaskLike[F]): Observable[A] = {
  doOnError { error =>
    val effect = cb(error)
    F.toTask(effect)
  }
}
/** Evaluates the given callback for each element generated by the
* source Observable, useful for triggering async side-effects.
*
* @return a new Observable that executes the specified
* callback for each element
*
* @see [[doOnNextF]] for a version that can work with generic
* `F[_]` tasks via [[monix.eval.TaskLike]].
*/
final def doOnNext(cb: A => Task[Unit]): Observable[A] = {
  // Run the callback's task, then re-emit the untouched element.
  self.mapEval { elem =>
    for (_ <- cb(elem)) yield elem
  }
}
/** Version of [[doOnNext]] that can work with generic
* `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
* conversions.
*
* So you can work among others with:
*
* - `cats.effect.IO`
* - `monix.eval.Coeval`
* - `scala.concurrent.Future`
* - ...
*
* @return a new Observable that executes the specified
* callback for each element
*/
final def doOnNextF[F[_]](cb: A => F[Unit])(implicit F: TaskLike[F]): Observable[A] = {
  self.doOnNext { elem =>
    val effect = cb(elem)
    F.toTask(effect)
  }
}
/** Executes the given callback on each acknowledgement received from
* the downstream subscriber, executing a generated
* [[monix.eval.Task Task]] and back-pressuring until the task
* is done.
*
* This method helps in executing logic after messages get
* processed, for example when messages are polled from
* some distributed message queue and an acknowledgement
* needs to be sent after each message in order to mark it
* as processed.
*
* @see [[doOnNextAckF]] for a version that can do evaluation with
* any data type via [[monix.eval.TaskLike]]
*/
final def doOnNextAck(cb: (A, Ack) => Task[Unit]): Observable[A] = {
  val operator = new DoOnNextAckOperator[A](cb)
  self.liftByOperator(operator)
}
/** Version of [[doOnNextAck]] that can work with generic
* `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
* conversions.
*
* So you can work among others with:
*
* - `cats.effect.IO`
* - `monix.eval.Coeval`
* - `scala.concurrent.Future`
* - ...
*/
final def doOnNextAckF[F[_]](cb: (A, Ack) => F[Unit])(implicit F: TaskLike[F]): Observable[A] = {
  doOnNextAck { (elem, ack) =>
    val effect = cb(elem, ack)
    Task.from(effect)(F)
  }
}
/** Executes the given callback only for the first element generated
* by the source Observable, useful for doing a piece of
* computation only when the stream starts.
*
* For example this observable will have a "delayed execution"
* of 1 second, plus a delayed first element of another 1 second,
* therefore it will take a total of 2 seconds for the first
* element to be emitted:
*
* {{{
* import monix.eval._
* import scala.concurrent.duration._
*
* Observable.range(0, 100)
* .delayExecution(1.second)
* .doOnStart { a =>
* for {
* _ <- Task.sleep(1.second)
* _ <- Task(println(s"Started with: $$a"))
* } yield ()
* }
* }}}
*
* @return a new Observable that executes the specified task
* only for the first element
*/
final def doOnStart(cb: A => Task[Unit]): Observable[A] = {
  val operator = new DoOnStartOperator[A](cb)
  self.liftByOperator(operator)
}
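/** Version of [[doOnStart]] that can work with generic
* `F[_]` tasks, anything for which a `cats.effect.Effect`
* instance is available.
*
* Note that, unlike most other `F`-suffixed operators, this one
* requires `cats.effect.Effect` and not [[monix.eval.TaskLike]],
* so types such as `scala.concurrent.Future` are not supported.
*
* So you can work among others with:
*
* - `cats.effect.IO`
* - ...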
*
* {{{
* import cats.implicits._
* import cats.effect._
* import scala.concurrent.duration._
* import monix.execution.Scheduler.Implicits.global
* // Needed for IO.sleep
* implicit val timer = global.timerLiftIO[IO]
*
* Observable.range(0, 100)
* .delayExecution(1.second)
* .doOnStartF { a =>
* for {
* _ <- IO.sleep(1.second)
* _ <- IO(println(s"Started with: $$a"))
* } yield ()
* }
* }}}
*/
final def doOnStartF[F[_]](cb: A => F[Unit])(implicit F: Effect[F]): Observable[A] = {
  doOnStart { first =>
    val effect = cb(first)
    Task.fromEffect(effect)(F)
  }
}
/** Executes the given callback just _before_ the subscription
* to the source happens.
*
* For example this is equivalent with [[delayExecution]]:
*
* {{{
* import monix.eval.Task
* import scala.concurrent.duration._
*
* Observable.range(0, 10)
* .doOnSubscribe(Task.sleep(1.second))
* }}}
*
* @see [[doAfterSubscribe]] for executing a callback just after
* a subscription happens.
*/
final def doOnSubscribe(task: Task[Unit]): Observable[A] = {
  val hooked: Observable[A] = new DoOnSubscribeObservable.Before[A](self, task)
  hooked
}
/** Version of [[doOnSubscribe]] that can work with generic
* `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
* conversions.
*
* So you can work among others with:
*
* - `cats.effect.IO`
* - `monix.eval.Coeval`
* - `scala.concurrent.Future`
* - ...
*
* For example this is equivalent with [[delayExecution]]:
*
* {{{
* import cats.effect._
* import scala.concurrent.duration._
* import monix.execution.Scheduler.Implicits.global
* // Needed for IO.sleep
* implicit val timer = global.timerLiftIO[IO]
*
* Observable.range(0, 10)
* .doOnSubscribeF(IO.sleep(1.second))
* }}}
*/
final def doOnSubscribeF[F[_]](task: F[Unit])(implicit F: TaskLike[F]): Observable[A] = {
  val asTask = F.toTask(task)
  doOnSubscribe(asTask)
}
/** Executes the given callback just _after_ the subscription
* happens.
*
* The executed `Task` executes after the subscription happens
* and it will delay the first event being emitted. For example
* this would delay the emitting of the first event by 1 second:
*
* {{{
* import monix.eval.Task
* import scala.concurrent.duration._
*
* Observable.range(0, 100)
* .doAfterSubscribe(Task.sleep(1.second))
* }}}
*
* @see [[doOnSubscribe]] for executing a callback just before
* a subscription happens.
*/
final def doAfterSubscribe(task: Task[Unit]): Observable[A] = {
  val hooked: Observable[A] = new DoOnSubscribeObservable.After[A](self, task)
  hooked
}
/** Version of [[doAfterSubscribe]] that can work with generic
* `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
* conversions.
*
* So you can work among others with:
*
* - `cats.effect.IO`
* - `monix.eval.Coeval`
* - `scala.concurrent.Future`
* - ...
*
* {{{
* import cats.effect._
* import scala.concurrent.duration._
* import monix.execution.Scheduler.Implicits.global
* // Needed for IO.sleep
* implicit val timer = global.timerLiftIO[IO]
*
* Observable.range(0, 100)
* .doAfterSubscribeF(IO.sleep(1.second))
* }}}
*/
final def doAfterSubscribeF[F[_]](task: F[Unit])(implicit F: TaskLike[F]): Observable[A] = {
  val asTask = F.toTask(task)
  doAfterSubscribe(asTask)
}
/** Creates a new observable that drops the events of the source, only
* for the specified `timespan` window.
*
* @param timespan the window of time during which the new observable
* must drop events emitted by the source
*/
final def dropByTimespan(timespan: FiniteDuration): Observable[A] = {
  val remaining: Observable[A] = new DropByTimespanObservable(self, timespan)
  remaining
}
/** Drops the last `n` elements (from the end).
*
* @param n the number of elements to drop
* @return a new Observable that drops the last ''n'' elements
* emitted by the source
*/
final def dropLast(n: Int): Observable[A] = {
  val operator = new DropLastOperator[A](n)
  self.liftByOperator(operator)
}
/** Discard items emitted by the source until a second
* observable emits an item or completes.
*
* If the `trigger` observable completes in error, then the
* resulting observable will also end in error when it notices
* it (next time an element is emitted by the source).
*
* @param trigger the observable that has to emit an item before the
* source begin to be mirrored by the resulting observable
*/
final def dropUntil(trigger: Observable[Any]): Observable[A] = {
  val gated: Observable[A] = new DropUntilObservable(self, trigger)
  gated
}
/** Drops the longest prefix of elements that satisfy the given
* predicate and returns a new observable that emits the rest.
*/
final def dropWhile(p: A => Boolean): Observable[A] = {
  val remaining: Observable[A] = self.liftByOperator(new DropByPredicateOperator(p))
  remaining
}
/** Drops the longest prefix of elements that satisfy the given
* function and returns a new observable that emits the rest. In
* comparison with [[dropWhile]], this version accepts a function
* that takes an additional parameter: the zero-based index of the
* element.
*/
final def dropWhileWithIndex(p: (A, Int) => Boolean): Observable[A] = {
  val remaining: Observable[A] = self.liftByOperator(new DropByPredicateWithIndexOperator(p))
  remaining
}
/** Utility that can be used for debugging purposes.
*/
final def dump(prefix: String, out: PrintStream = System.out): Observable[A] = {
  val traced: Observable[A] = new DumpObservable[A](self, prefix, out)
  traced
}
/** Mirror the source observable as long as the source keeps emitting
* items, otherwise if `timeout` passes without the source emitting
* anything new then the observable will emit the last item.
*
* Note: If the source Observable keeps emitting items more
* frequently than the length of the time window then the resulting
* observable will mirror the source exactly.
*
* @param timeout the window of silence that must pass in order for the
* observable to echo the last item
*/
final def echoOnce(timeout: FiniteDuration): Observable[A] = {
  // Same implementation as `echoRepeated`, but flagged to echo a single time.
  val singleEcho = true
  new EchoObservable(self, timeout, onlyOnce = singleEcho)
}
/** Mirror the source observable as long as the source keeps emitting
* items, otherwise if `timeout` passes without the source emitting
* anything new then the observable will start emitting the last
* item repeatedly.
*
* Note: If the source Observable keeps emitting items more
* frequently than the length of the time window then the resulting
* observable will mirror the source exactly.
*
* @param timeout the window of silence that must pass in order for the
* observable to start echoing the last item
*/
final def echoRepeated(timeout: FiniteDuration): Observable[A] = {
  // Same implementation as `echoOnce`, but flagged to keep echoing.
  val singleEcho = false
  new EchoObservable(self, timeout, onlyOnce = singleEcho)
}
/** Creates a new Observable that emits the events of the source and
* then it also emits the given elements (appended to the stream).
*/
final def endWith[B >: A](elems: Seq[B]): Observable[B] = {
  val suffix = Observable.fromIterable(elems)
  self.appendAll(suffix)
}
/** Concatenates the source with another observable.
*
* Ordering of subscription is preserved, so the second observable
* starts only after the source observable is completed
* successfully with an `onComplete`. On the other hand, the second
* observable is never subscribed if the source completes with an
* error.
*
* == Visual Example ==
*
* <pre>
* streamA: a1 -- -- a2 -- -- a3 -- a4 -- --
* streamB: b1 -- -- b2 -- b3 -- -- -- -- b4
*
* result: a1, a2, a3, a4, b1, b2, b3, b4
* </pre>
*
*/
final def ++[B >: A](other: => Observable[B]): Observable[B] = {
  // `other` is by-name: wrapping it in `defer` keeps it unevaluated at this point.
  val deferred = Observable.defer(other)
  appendAll(deferred)
}
/**
* A strict variant of [[++]].
*/
final def appendAll[B >: A](other: Observable[B]): Observable[B] = {
  val concatenated: Observable[B] = new ConcatObservable[B](self, other)
  concatenated
}
/** Emits the given exception instead of `onComplete`.
*
* @param error the exception to emit onComplete
* @return a new Observable that emits an exception onComplete
*/
final def endWithError(error: Throwable): Observable[A] = {
  val operator = new EndWithErrorOperator[A](error)
  self.liftByOperator(operator)
}
/** Returns an observable that emits a single Throwable, in case an
* error was thrown by the source, otherwise it isn't going to emit
* anything.
*/
final def failed: Observable[Throwable] = {
  val errorsOnly: Observable[Throwable] = self.liftByOperator(FailedOperator)
  errorsOnly
}
/** Alias for [[headOrElse]]. */
final def firstOrElse[B >: A](default: => B): Observable[B] = {
  // Pure alias; `default` is forwarded by-name, so it is not evaluated here.
  headOrElse[B](default)
}
/** Emits the first element emitted by the source, or otherwise if the
* source is completed without emitting anything, then the
* `default` is emitted.
*/
final def headOrElse[B >: A](default: => B): Observable[B] = {
  // Fold the (at most one) head element into an Option, so that an empty
  // source is observable as `None`.
  val firstOrNone = head.foldLeft(Option.empty[B])((_, elem) => Some(elem))
  // `default` is only evaluated when no element was seen.
  firstOrNone.map(_.getOrElse(default))
}
/** Returns a new observable that applies the given function
* to each item emitted by the source and emits the result.
*/
final def map[B](f: A => B): Observable[B] = {
  val mapped: Observable[B] = self.liftByOperator(new MapOperator(f))
  mapped
}
/** Alias for [[concatMap]].
*
* NOTE: one primary difference between Monix and other Rx /
* ReactiveX implementations is that in Monix `flatMap` is an alias
* for `concatMap` and NOT `mergeMap`.
*/
final def flatMap[B](f: A => Observable[B]): Observable[B] = {
  // Pure alias: `flatMap` concatenates, it does not merge.
  self.concatMap[B](f)
}
/** Applies a function that you supply to each item emitted by the
* source observable, where that function returns observables, and
* then concatenating those resulting sequences and emitting the
* results of this concatenation.
*
* This implements the lawful "monadic bind", the `flatMap`
* operation of [[cats.Monad]].
*
* ==Example==
* {{{
* Observable(1, 2, 3).concatMap { x =>
* for {
* _ <- Observable.eval(println(s"Processing $$x"))
* x <- Observable(x, x)
* } yield x
* }
* }}}
*
* $concatMergeDifference
*
* @param f is a generator for the streams being concatenated
* @return $concatReturn
*/
final def concatMap[B](f: A => Observable[B]): Observable[B] = {
  // No release function is involved in a plain concatenation, hence the `null`
  // in the slot that `bracketCase` fills with its finalizer.
  val concatenated: Observable[B] = new ConcatMapObservable[A, B](self, f, null, delayErrors = false)
  concatenated
}
/** Alias of [[concatMapDelayErrors]]. */
final def flatMapDelayErrors[B](f: A => Observable[B]): Observable[B] = {
  // Pure alias.
  concatMapDelayErrors[B](f)
}
/** Alias of [[switchMap]]. */
final def flatMapLatest[B](f: A => Observable[B]): Observable[B] = {
  // Pure alias.
  self.switchMap[B](f)
}
/** Applies a binary operator to a start value and to elements
* produced by the source observable, going from left to right,
* producing and concatenating observables along the way.
*
* It's the combination between [[scan]] and [[flatMap]].
*
* @see [[flatScan0]] for the version that emits seed element at the beginning
*/
final def flatScan[R](seed: => R)(op: (R, A) => Observable[R]): Observable[R] = {
  // Wrap the by-name seed in a thunk so that it stays unevaluated here.
  val initial: () => R = () => seed
  new FlatScanObservable[A, R](self, initial, op, delayErrors = false)
}
/** Applies a binary operator to a start value and to elements
* produced by the source observable, going from left to right,
* producing and concatenating observables along the way.
*
* It's the combination between [[scan0]] and [[flatMap]].
*/
final def flatScan0[R](seed: => R)(op: (R, A) => Observable[R]): Observable[R] = {
  // Evaluate the seed once, emit it first, then continue with the regular scan.
  Observable.eval(seed).flatMap { initial =>
    val rest = flatScan(initial)(op)
    initial +: rest
  }
}
/** Version of [[flatScan]] that delays the errors from the emitted
* streams until the source completes.
*
* $delayErrorsDescription
*
* @see [[flatScan]]
*/
final def flatScanDelayErrors[R](seed: => R)(op: (R, A) => Observable[R]): Observable[R] = {
  // Wrap the by-name seed in a thunk so that it stays unevaluated here.
  val initial: () => R = () => seed
  new FlatScanObservable[A, R](self, initial, op, delayErrors = true)
}
/** Version of [[flatScan0]] that delays the errors from the emitted
* streams until the source completes.
*
* $delayErrorsDescription
*
* @see [[flatScan0]]
*/
final def flatScan0DelayErrors[R](seed: => R)(op: (R, A) => Observable[R]): Observable[R] = {
  // Evaluate the seed once, emit it first, then continue with the error-delaying scan.
  Observable.eval(seed).flatMap { initial =>
    val rest = flatScanDelayErrors(initial)(op)
    initial +: rest
  }
}
/** $concatDescription
*
* Alias for [[Observable!.concat concat]].
*
* @return $concatReturn
*/
final def flatten[B](implicit ev: A <:< Observable[B]): Observable[B] = {
  // Pure alias; the evidence is forwarded explicitly.
  concat[B](ev)
}
/** $concatDescription
*
* ==Equivalence with concatMap==
*
* The `concat` operation is basically `concatMap` with the
* identity function, as you can count on this equivalence:
*
* `stream.concat <-> stream.concatMap(x => x)`
*
* == Visual Example ==
*
* <pre>
* streamA: a1 -- -- a2 -- -- a3 -- a4 -- --
* streamB: b1 -- -- b2 -- b3 -- -- -- -- b4
*
* result: a1, a2, a3, a4, b1, b2, b3, b4
* </pre>
* @return $concatReturn
*/
final def concat[B](implicit ev: A <:< Observable[B]): Observable[B] = {
  // `ev` turns each emitted `A` into the `Observable[B]` it is known to be.
  concatMap[B](inner => ev(inner))
}
/** Alias for [[concatDelayErrors]]. */
final def flattenDelayErrors[B](implicit ev: A <:< Observable[B]): Observable[B] = {
  // Pure alias; the evidence is forwarded explicitly.
  concatDelayErrors[B](ev)
}
/** Version of [[Observable!.concat concat]] that delays errors emitted by child
* observables until the stream completes.
*
* $delayErrorsDescription
*
* ==Example==
*
* {{{
* val dummy1 = new RuntimeException("dummy1")
* val dummy2 = new RuntimeException("dummy2")
*
* val stream = Observable(
* Observable(1).endWithError(dummy1),
* Observable.raiseError(dummy2),
* Observable(2, 3)
* )
*
* val concatenated =
* stream.concatDelayErrors
* }}}
*
* The resulting stream in this example emits `1, 2, 3` in order
* and then completes with a `CompositeException` of both `dummy1`
* and `dummy2`.
*
* @return $concatReturn
*/
final def concatDelayErrors[B](implicit ev: A <:< Observable[B]): Observable[B] = {
  // `ev` turns each emitted `A` into the `Observable[B]` it is known to be.
  concatMapDelayErrors[B](inner => ev(inner))
}
/** Applies a function that you supply to each item emitted by the
* source observable, where that function returns sequences
* and then concatenating those resulting sequences and emitting the
* results of this concatenation.
*
* $delayErrorsDescription
*
* ==Example==
*
* {{{
* val dummy1 = new RuntimeException("dummy1")
* val dummy2 = new RuntimeException("dummy2")
*
* Observable(1, 2, 3).concatMapDelayErrors {
* case 1 => Observable(1).endWithError(dummy1)
* case 2 => Observable.raiseError(dummy2)
* case x => Observable(x, x)
* }
* }}}
*
* The resulting stream in this example emits `1, 3, 3` in order
* and then completes with a `CompositeException` of both `dummy1`
* and `dummy2`.
*
* @param f is a generator for the streams being concatenated
* @return $concatReturn
*/
final def concatMapDelayErrors[B](f: A => Observable[B]): Observable[B] =
// NOTE(review): the third constructor argument is passed as `null`; its
// meaning is defined by `ConcatMapObservable` and is not visible here —
// confirm before changing. `delayErrors = true` selects the error-delaying mode.
new ConcatMapObservable[A, B](self, f, null, delayErrors = true)
/** Alias for [[switch]]. */
final def flattenLatest[B](implicit ev: A <:< Observable[B]): Observable[B] = {
  // Pure alias: hands the same evidence over to `switch`.
  self.switch[B](ev)
}
/** Convert an observable that emits observables into a single
* observable that emits the items emitted by the
* most-recently-emitted of those observables.
*
* Similar with [[flatten]], however the source isn't
* back-pressured when emitting new events. Instead new events
* being emitted are cancelling the active child observables.
*
* ==Equivalence with switchMap==
*
* The `switch` operation can be expressed in terms of [[switchMap]],
* as we have this equivalence:
*
* `stream.switch <-> stream.switchMap(x => x)`
*
* @see the description of [[switchMap]] for an example.
*/
final def switch[B](implicit ev: A <:< Observable[B]): Observable[B] = {
  // `switch` is `switchMap` applied with the identity function.
  self.switchMap[B](child => ev(child))
}
/** Returns an Observable that emits a single boolean, either true, in
* case the given predicate holds for all the items emitted by the
* source, or false in case at least one item is not verifying the
* given predicate.
*
* @param p is a function that evaluates the items emitted by the source
* Observable, returning `true` if they pass the filter
* @return an Observable that emits only true or false in case the given
* predicate holds or not for all the items
*/
final def forall(p: A => Boolean): Observable[Boolean] = {
  // The predicate holds for all items iff no item violates it.
  val hasCounterexample = exists(item => !p(item))
  hasCounterexample.map(found => !found)
}
/** Returns an Observable which emits a single value, either true, in
* case the given predicate holds for at least one item, or false
* otherwise.
*
* @param p is a function that evaluates the items emitted by the
* source Observable, returning `true` if they pass the
* filter
* @return an Observable that emits only true or false in case
* the given predicate holds or not for at least one item
*/
final def exists(p: A => Boolean): Observable[Boolean] = {
  // Start from `false` and flip to `true` as soon as `find` emits anything.
  val firstMatch = find(p)
  firstMatch.foldLeft(false) { (_, _) => true }
}
/** Groups the items emitted by an Observable according to a specified
* criterion, and emits these grouped items as GroupedObservables,
* one GroupedObservable per group.
*
* Note: A [[monix.reactive.observables.GroupedObservable GroupedObservable]]
* will cache the items it is to emit until such time as it is
* subscribed to. For this reason, in order to avoid memory leaks,
* you should not simply ignore those GroupedObservables that do
* not concern you. Instead, you can signal to them that they may
* discard their buffers by doing something like `source.take(0)`.
*
* @param keySelector a function that extracts the key for each item
*/
final def groupBy[K](keySelector: A => K)(
  implicit os: Synchronous[Nothing] = OverflowStrategy.Unbounded): Observable[GroupedObservable[K, A]] = {
  // Build the grouping operator first, then lift it over the source.
  val grouping = new GroupByOperator[A, K](os, keySelector)
  self.liftByOperator(grouping)
}
/** Given a routine make sure to execute it whenever the current
* stream reaches the end, successfully, in error, or canceled.
*
* Implements `cats.effect.Bracket.guarantee`.
*
* Example: {{{
* import monix.eval.Task
*
* Observable.suspend(???).guarantee(Task.eval {
* println("Releasing resources!")
* })
* }}}
*
* @param f is the task to execute when the stream terminates, whether
* by successful completion, error or cancellation
*/
final def guarantee(f: Task[Unit]): Observable[A] = {
  // The same finalizer is used for every exit case, so the case is ignored.
  self.guaranteeCase { _ => f }
}
/** Version of [[guarantee]] that can work with generic
* `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
* conversions.
*
* So you can work among others with:
*
* - `cats.effect.IO`
* - `monix.eval.Coeval`
* - `scala.concurrent.Future`
* - ...
*/
final def guaranteeF[F[_]](f: F[Unit])(implicit F: TaskLike[F]): Observable[A] = {
  // Convert the generic effect into a `Task`, then reuse `guarantee`.
  val finalizer = F.toTask(f)
  guarantee(finalizer)
}
/** Returns a new `Observable` in which `f` is scheduled to be executed
* when the source is completed, in success, error or when cancelled.
*
* Implements `cats.effect.Bracket.guaranteeCase`.
*
* This would typically be used to ensure that a finalizer
* will run at the end of the stream.
*
* Example: {{{
* import cats.effect.ExitCase
* import monix.eval.Task
*
* val stream = Observable.suspend(???).guaranteeCase(err => Task {
* err match {
* case ExitCase.Completed =>
* println("Completed successfully!")
* case ExitCase.Error(e) =>
* e.printStackTrace()
* case ExitCase.Canceled =>
* println("Was stopped early!")
* }
* })
* }}}
*
* NOTE this is using `cats.effect.ExitCase` to signal the termination
* condition, like this:
*
* - if completed via `onComplete` or via `Stop` signalled by the
* consumer, then the function receives `ExitCase.Completed`
* - if completed via `onError` or in certain cases in which errors
* are detected (e.g. the consumer returns an error), then the function
* receives `ExitCase.Error(e)`
* - if the subscription was cancelled, then the function receives
* `ExitCase.Canceled`
*
* In other words `Completed` is for normal termination conditions,
* `Error` is for exceptions being detected and `Canceled` is for
* when the subscription gets canceled.
*
* @param f is the finalizer to execute when streaming is terminated, by
* successful completion, error or cancellation; for specifying the
* side effects to use
*/
final def guaranteeCase(f: ExitCase[Throwable] => Task[Unit]): Observable[A] = {
  // Wraps the current observable together with the finalizer function.
  new GuaranteeCaseObservable[A](self, f)
}
/** Version of [[guaranteeCase]] that can work with generic
* `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
* conversions.
*
* So you can work among others with:
*
* - `cats.effect.IO`
* - `monix.eval.Coeval`
* - `scala.concurrent.Future`
* - ...
*/
final def guaranteeCaseF[F[_]](f: ExitCase[Throwable] => F[Unit])(
  implicit F: TaskLike[F]): Observable[A] = {
  // Convert the generic finalizer to a `Task` per exit case, then reuse `guaranteeCase`.
  guaranteeCase { exitCase =>
    F.toTask(f(exitCase))
  }
}
/** Alias for [[completed]]. Ignores all items emitted by
* the source and only calls `onComplete` or `onError`.
*
* @return an empty sequence that only calls `onComplete` or `onError`,
* based on which one is called by the source Observable
*/
final def ignoreElements: Observable[Nothing] = {
  // Lifts the shared `CompletedOperator` over the source.
  self.liftByOperator(CompletedOperator)
}
/** Creates a new observable from this observable and another given
* observable by interleaving their items into a strictly
* alternating sequence.
*
* So the first item emitted by the new observable will be the item
* emitted by `self`, the second item will be emitted by the other
* observable, and so forth; when either `self` or `other` calls
* `onComplete`, the items will then be directly coming from the
* observable that has not completed; when `onError` is called by
* either `self` or `other`, the new observable will call `onError`
* and halt.
*
* See [[Observable!.merge merge]] for a more relaxed alternative that doesn't emit
* items in strict alternating sequence.
*
* @param other is an observable that interleaves with the source
* @return a new observable sequence that alternates emission of
* the items from both child streams
*/
final def interleave[B >: A](other: Observable[B]): Observable[B] = {
  // The source is passed first and `other` second.
  new Interleave2Observable(self, other)
}
/** Only emits the last element emitted by the source observable,
* after which it's completed immediately.
*/
final def last: Observable[A] = {
  // The last element is the trailing window of size one.
  self.takeLast(1)
}
/** Creates a new observable that only emits the last `n` elements
* emitted by the source.
*
* In case the source triggers an error, then the underlying
* buffer gets dropped and the error gets emitted immediately.
*/
final def takeLast(n: Int): Observable[A] = {
  // A non-positive count yields an empty stream without lifting any operator.
  if (n > 0) self.liftByOperator(new TakeLastOperator(n))
  else Observable.empty
}
/** Maps elements from the source using a function that can do
* asynchronous processing by means of [[monix.eval.Task Task]].
*
* Example:
* {{{
* import monix.eval.Task
* import scala.concurrent.duration._
*
* Observable.range(0, 100)
* .mapEval(x => Task(x).delayExecution(1.second))
* }}}
*
* @see [[mapEvalF]] for a version that works with a generic
* `F[_]` (e.g. `cats.effect.IO`, Scala's `Future`),
* powered by [[monix.eval.TaskLike]]
*/
final def mapEval[B](f: A => Task[B]): Observable[B] = {
  // Wraps the source together with the task-producing function.
  new MapTaskObservable[A, B](self, f)
}
/** Version of [[mapEval]] that can work with generic
* `F[_]` tasks, anything that's supported via [[monix.eval.TaskLike]]
* conversions.
*
* So you can work among others with:
*
* - `cats.effect.IO`
* - `monix.eval.Coeval`
* - `scala.concurrent.Future`
* - ...
*
* Example:
* {{{
* import cats.implicits._
* import cats.effect.IO
* import scala.concurrent.duration._
* import monix.execution.Scheduler.Implicits.global
* // Needed for IO.sleep
* implicit val timer = global.timerLiftIO[IO]
*
* Observable.range(0, 100).mapEvalF { x =>
* IO.sleep(1.second) *> IO(x)
* }
* }}}
*
* @see [[mapEval]] for a version specialized for
* [[monix.eval.Task Task]]
*/
final def mapEvalF[F[_], B](f: A => F[B])(implicit F: TaskLike[F]): Observable[B] = {
  // Each produced `F[B]` is converted to a `Task[B]`, then handled by `mapEval`.
  mapEval { elem =>
    Task.from(f(elem))(F)
  }
}
/** Given a mapping function that maps events to [[monix.eval.Task tasks]],
* applies it in parallel on the source, but with a specified
* `parallelism`, which indicates the maximum number of tasks that
* can be executed in parallel, emitting their results in the original order.
*
* Similar in spirit with
* [[monix.reactive.Consumer.loadBalance[A,R](parallelism* Consumer.loadBalance]],
* but expressed as an operator that executes [[monix.eval.Task Task]]
* instances in parallel.
*
* Note that when the specified `parallelism` is 1, it has the same
* behavior as [[mapEval]].
*
* @param parallelism is the maximum number of tasks that can be executed
* in parallel, over which the source starts being
* back-pressured
*
* @param f is the mapping function that produces tasks to execute
* in parallel, which will eventually produce events for the
* resulting observable stream
*
* @see [[mapParallelUnordered]] for a variant that does not preserve order
* which may lead to faster execution times
* @see [[mapEval]] for serial execution
*/
final def mapParallelOrdered[B](parallelism: Int)(f: A => Task[B])(
  implicit os: OverflowStrategy[B] = OverflowStrategy.Default): Observable[B] = {
  // Source, parallelism, mapping function and overflow strategy are passed through as given.
  new MapParallelOrderedObservable[A, B](self, parallelism, f, os)
}
/** Given a mapping function that maps events to [[monix.eval.Task tasks]],
* applies it in parallel on the source, but with a specified
* `parallelism`, which indicates the maximum number of tasks that
* can be executed in parallel.
*
* Similar in spirit with
* [[monix.reactive.Consumer.loadBalance[A,R](parallelism* Consumer.loadBalance]],
* but expressed as an operator that executes [[monix.eval.Task Task]]
* instances in parallel.
*
* Note that when the specified `parallelism` is 1, it has the same
* behavior as [[mapEval]].
*
* @param parallelism is the maximum number of tasks that can be executed
* in parallel, over which the source starts being
* back-pressured
*
* @param f is the mapping function that produces tasks to execute
* in parallel, which will eventually produce events for the
* resulting observable stream
*
* @see [[mapParallelOrdered]] for a variant that does preserve order
* @see [[mapEval]] for serial execution
*/
final def mapParallelUnordered[B](parallelism: Int)(f: A => Task[B])(
  implicit os: OverflowStrategy[B] = OverflowStrategy.Default): Observable[B] = {
  // Source, parallelism, mapping function and overflow strategy are passed through as given.
  new MapParallelUnorderedObservable[A, B](self, parallelism, f, os)
}
/** Converts the source Observable that emits `A` into an Observable
* that emits `Notification[A]`.
*/
final def materialize: Observable[Notification[A]] = {
  // Build the operator first, then lift it over the source.
  val materializer = new MaterializeOperator[A]
  self.liftByOperator(materializer)
}
/** Concurrently merges the observables emitted by the source, into
* a single observable.
*
* ==Equivalence with mergeMap==
*
* The `merge` operation is [[mergeMap]] with the identity
* function:
*
* `stream.merge <-> stream.mergeMap(x => x)`
*
* $concatMergeDifference
*
* == Visual Example ==
*
* <pre>
* streamA: a1 -- -- a2 -- -- a3 -- a4 -- --
* streamB: b1 -- -- b2 -- b3 -- -- -- -- b4
*
* result: a1, b1, a2, b2, b3, a3, a4, b4
* </pre>
*
* @note $defaultOverflowStrategy
* @return $mergeReturn
*/
final def merge[B](
  implicit ev: A <:< Observable[B],
  os: OverflowStrategy[B] = OverflowStrategy.Default): Observable[B] = {
  // `merge` is `mergeMap` with the identity function, using the same overflow strategy.
  self.mergeMap[B](child => ev(child))(os)
}
/** Concurrently merges the observables emitted by the source with
* the given generator function into a single observable.
*
* $concatMergeDifference
*
* ==Example==
* {{{
* Observable(1, 2, 3).mergeMap { x =>
* Observable.eval(println(s"Processing $$x"))
* .executeAsync
* .flatMap(_ => Observable(x, x))
* }
* }}}
*
* In this example the source will yield 3 streams and those 3
* streams are being subscribed immediately, therefore the order of
* the events will be non-deterministic, as the streams will be
* evaluated concurrently.
*
* == Visual Example ==
*
* <pre>
* streamA: a1 -- -- a2 -- -- a3 -- a4 -- --
* streamB: b1 -- -- b2 -- b3 -- -- -- -- b4
*
* result: a1, b1, a2, b2, b3, a3, a4, b4
* </pre>
* @param f is a generator for the streams that will get merged
* @return $mergeMapReturn
*/
final def mergeMap[B](f: A => Observable[B])(
  implicit os: OverflowStrategy[B] = OverflowStrategy.Default): Observable[B] = {
  // `delayErrors = false` is what distinguishes this from `mergeMapDelayErrors`.
  new MergeMapObservable[A, B](self, f, os, delayErrors = false)
}
/** $mergeDescription
*
* $delayErrorsDescription
*
* @note $defaultOverflowStrategy
* @return $mergeReturn
*/
final def mergeDelayErrors[B](implicit ev: A <:< Observable[B],
  os: OverflowStrategy[B] = OverflowStrategy.Default): Observable[B] = {
  // Must delegate to `mergeMapDelayErrors`, not `mergeMap`: the latter builds
  // the operator with `delayErrors = false`, which would make this method
  // behave exactly like `merge` instead of delaying child errors.
  self.mergeMapDelayErrors(x => x)(os)
}
/** $mergeMapDescription
*
* $delayErrorsDescription
*
* @param f is a generator for the streams that will get merged
* @return $mergeMapReturn
*/
final def mergeMapDelayErrors[B](f: A => Observable[B])(
  implicit os: OverflowStrategy[B] = OverflowStrategy.Default): Observable[B] = {
  // `delayErrors = true` is what distinguishes this from `mergeMap`.
  new MergeMapObservable[A, B](self, f, os, delayErrors = true)
}
/** Overrides the default [[monix.execution.Scheduler Scheduler]],
* possibly forcing an asynchronous boundary on subscription
* (if `forceAsync` is set to `true`, the default).
*
* When an `Observable` is subscribed with
* [[Observable.subscribe(subscriber* subscribe]],
* it needs a `Scheduler`, which is going to be injected in the
* processing pipeline, to be used for managing asynchronous
* boundaries, scheduling execution with delay, etc.
*
* Normally the [[monix.execution.Scheduler Scheduler]] gets injected
* implicitly when doing `subscribe`, but this operator overrides
* the injected scheduler for the given source. And if the source is
* normally using that injected scheduler (given by `subscribe`),
* then the effect will be that all processing will now happen
* on the override.
*
* To put it in other words, in Monix it's usually the consumer and
* not the producer that specifies the scheduler and this operator
* allows for a different behavior.
*
* This operator also subsumes the effects of [[subscribeOn]],
* meaning that the subscription logic itself will start on
* the provided scheduler if `forceAsync = true` (the default).
*
* @see [[observeOn(s:monix\.execution\.Scheduler)* observeOn]]
* and [[subscribeOn]].
*
* @param s is the [[monix.execution.Scheduler Scheduler]] to use
* for overriding the default scheduler and for forcing
* an asynchronous boundary if `forceAsync` is `true`
*
* @param forceAsync indicates whether an asynchronous boundary
* should be forced right before the subscription of the
* source `Observable`, managed by the provided `Scheduler`
*
* @return a new `Observable` that mirrors the source on subscription,
* but that uses the provided scheduler for overriding
* the default and possibly force an extra asynchronous
* boundary on execution
*/
final def executeOn(s: Scheduler, forceAsync: Boolean = true): Observable[A] = {
  // Source, scheduler override and the async-boundary flag are passed through as given.
  new ExecuteOnObservable[A](self, s, forceAsync)
}
/** Mirrors the source observable, but upon subscription ensure
* that the evaluation forks into a separate (logical) thread.
*
* The execution is managed by the injected
* [[monix.execution.Scheduler scheduler]] in `subscribe()`.
*/
final def executeAsync: Observable[A] = {
  // Wraps the source; no other parameter is involved.
  new ExecuteAsyncObservable(self)
}
/** Returns a new observable that will execute the source with a different
* [[monix.execution.ExecutionModel ExecutionModel]].
*
* This allows fine-tuning the options injected by the scheduler
* locally. Example:
*
* {{{
* import monix.execution.ExecutionModel.AlwaysAsyncExecution
*
* val stream = Observable(1, 2, 3)
* .executeWithModel(AlwaysAsyncExecution)
* }}}
*
* @param em is the
* [[monix.execution.ExecutionModel ExecutionModel]]
* that will be used when evaluating the source.
*/
final def executeWithModel(em: ExecutionModel): Observable[A] = {
  // Wraps the source together with the execution model to use.
  new ExecuteWithModelObservable[A](self, em)
}
/** Operator that specifies a different
* [[monix.execution.Scheduler Scheduler]], on which subscribers
* will observe events, instead of the default one.
*
* An `Observable` with an applied `observeOn` call will forward
* events into a buffer that uses the specified `Scheduler`
* reference to cycle through events and to make `onNext` calls to
* downstream listeners.
*
* Example:
* {{{
* import monix.execution.Scheduler
* import monix.execution.Scheduler.Implicits.global
* val io = Scheduler.io("my-io")
*
* Observable(1, 2, 3).map(_ + 1)
* .observeOn(io)
* .foreach(x => println(x))
* }}}
*
* In the above example the first `map` (whatever comes before the
* `observeOn` call) gets executed using the default `Scheduler`
* (might execute on the current thread even), however the
* `foreach` that's specified after `observeOn` will get executed
* on the indicated `Scheduler`.
*
* NOTE: this operator does not guarantee that downstream listeners
* will actually use the specified `Scheduler` to process events,
* because this depends on the rest of the pipeline. E.g. this will
* not work OK:
*
* {{{
* import monix.reactive.OverflowStrategy.Unbounded
*
* Observable.suspend(???)
* .observeOn(io).asyncBoundary(Unbounded)
* }}}
*
* This sample might not do what a user of `observeOn` would
* want. Indeed the implementation will use the provided `io`
* reference for calling `onNext` / `onComplete` / `onError`
* events, however because of the following asynchronous boundary
* created the actual listeners will probably end up being executed
* on a different `Scheduler`.
*
* The underlying implementation uses
* [[monix.reactive.observers.BufferedSubscriber a buffer]]
* to forward events. The
* [[monix.reactive.OverflowStrategy OverflowStrategy]]
* being applied is the
* [[monix.reactive.OverflowStrategy.Default default one]].
*
* @see [[observeOn[B>:A](s:monix\.execution\.Scheduler,os:monix\.reactive\.OverflowStrategy[B]* observeOn(Scheduler, OverflowStrategy)]]
* for the version that allows customizing the
* [[monix.reactive.OverflowStrategy OverflowStrategy]]
* being used by the underlying buffer.
*
* @param s is the alternative `Scheduler` reference to use
* for observing events
*/
final def observeOn(s: Scheduler): Observable[A] = {
  // Forwards to the two-argument overload with the default overflow strategy.
  self.observeOn(s, OverflowStrategy.Default)
}
/** Operator that specifies a different
* [[monix.execution.Scheduler Scheduler]], on which subscribers
* will observe events, instead of the default one.
*
* This overloaded version of `observeOn` takes an extra
* [[monix.reactive.OverflowStrategy OverflowStrategy]]
* parameter specifying the behavior of the underlying buffer.
*
* @see [[observeOn(s:monix\.execution\.Scheduler)* observeOn(Scheduler)]] for
* the version that does not take an `OverflowStrategy` parameter.
*
* @param s is the alternative `Scheduler` reference to use
* for observing events
* @param os is the [[monix.reactive.OverflowStrategy OverflowStrategy]]
* to apply to the underlying buffer
*/
final def observeOn[B >: A](s: Scheduler, os: OverflowStrategy[B]): Observable[B] = {
  // Source, target scheduler and buffer strategy are passed through as given.
  new ObserveOnObservable[B](self, s, os)
}
/** If the connection is [[monix.execution.Cancelable.cancel cancelled]]
* then trigger a `CancellationException`.
*
* A connection can be cancelled with the help of the
* [[monix.execution.Cancelable Cancelable]]
* returned on [[Observable.subscribe(subscriber* subscribe]].
*
* Because the cancellation is effectively concurrent with the
* signals the [[monix.reactive.Observer Observer]] receives and because
* we need to uphold the contract, this operator will effectively
* synchronize access to [[monix.reactive.Observer.onNext onNext]],
* [[monix.reactive.Observer.onComplete onComplete]] and
* [[monix.reactive.Observer.onError onError]]. It will also watch
* out for asynchronous [[monix.execution.Ack.Stop Stop]] events.
*
* In other words, this operator does heavy synchronization, can
* prove to be inefficient and you should avoid using it because
* the signaled error can interfere with functionality from other
* operators that use cancellation internally and cancellation in
* general is a side-effecting operation that should be avoided,
* unless it's necessary.
*/
final def onCancelTriggerError: Observable[A] = {
  // Wraps the source; no other parameter is involved.
  new OnCancelTriggerErrorObservable[A](self)
}
/** Returns an Observable that mirrors the behavior of the source,
* unless the source is terminated with an `onError`, in which case
* the streaming of events continues with the specified backup
* sequence.
*
* The created Observable mirrors the behavior of the source in
* case the source does not end with an error.
*
* NOTE that compared with `onErrorResumeNext` from Rx.NET, the
* streaming is not resumed in case the source is terminated
* normally with an `onComplete`.
*
* @param that is a backup sequence that's being subscribed
* in case the source terminates with an error.
*/
final def onErrorFallbackTo[B >: A](that: Observable[B]): Observable[B] = {
  // The error itself is ignored: any failure switches to the same backup stream.
  onErrorHandleWith { _ => that }
}
/** Returns an observable that mirrors the behavior of the source,
* unless the source is terminated with an `onError`, in which
* case the streaming of events fallbacks to an observable
* emitting a single element generated by the backup function.
*
* See [[onErrorRecover]] for the version that takes a
* partial function as a parameter.
*
* @param f - a function that matches errors with a
* backup element that is emitted when the source
* throws an error.
*/
final def onErrorHandle[B >: A](f: Throwable => B): Observable[B] = {
  // Map the error to a single backup element and emit it as a one-item stream.
  onErrorHandleWith(error => Observable.now(f(error)))
}
/** Returns an observable that mirrors the behavior of the source,
* unless the source is terminated with an `onError`, in which
* case the streaming of events fallbacks to an observable
* emitting a single element generated by the backup function.
*
* The created Observable mirrors the behavior of the source
* in case the source does not end with an error or if the
* thrown `Throwable` is not matched.
*
* See [[onErrorHandle]] for the version that takes a
* total function as a parameter.
*
* @param pf is a function that matches errors with a
* backup element that is emitted when the source
* throws an error.
*/
final def onErrorRecover[B >: A](pf: PartialFunction[Throwable, B]): Observable[B] = {
  onErrorHandleWith { error =>
    // Matched errors become a one-element stream; unmatched ones are re-raised.
    val recovery = pf.andThen(Observable.now)
    recovery.applyOrElse(error, Observable.raiseError)
  }
}
/** Returns an Observable that mirrors the behavior of the source,
* unless the source is terminated with an `onError`, in which case
* the streaming of events continues with the specified backup
* sequence generated by the given function.
*
* The created Observable mirrors the behavior of the source in
* case the source does not end with an error or if the thrown
* `Throwable` is not matched.
*
* See [[onErrorHandleWith]] for the version that takes a
* total function as a parameter.
*
* @param pf is a function that matches errors with a
* backup observable that is subscribed when the source
* throws an error.
*/
final def onErrorRecoverWith[B >: A](pf: PartialFunction[Throwable, Observable[B]]): Observable[B] = {
  // Errors not matched by `pf` are re-raised via `Observable.raiseError`.
  onErrorHandleWith { error =>
    pf.applyOrElse(error, Observable.raiseError)
  }
}
/** Returns an Observable that mirrors the behavior of the source,
* unless the source is terminated with an `onError`, in which case
* the streaming of events continues with the specified backup
* sequence generated by the given function.
*
* See [[onErrorRecoverWith]] for the version that takes a
* partial function as a parameter.
*
* @param f is a function that matches errors with a
* backup observable that is subscribed when the source
* throws an error.
*/
final def onErrorHandleWith[B >: A](f: Throwable => Observable[B]): Observable[B] = {
  // Wraps the source together with the recovery function.
  new OnErrorRecoverWithObservable(self, f)
}
/** Returns an Observable that mirrors the behavior of the source,
* unless the source is terminated with an `onError`, in which case
* it tries subscribing to the source again in the hope that it
* will complete without an error.
*
* The number of retries is limited by the specified `maxRetries`
* parameter, so for an Observable that always ends in error the
* total number of subscriptions that will eventually happen is
* `maxRetries + 1`.
*/
final def onErrorRestart(maxRetries: Long): Observable[A] = {
  // Zero passes this check, so the requirement is "non-negative",
  // not "positive" as the previous message claimed.
  require(maxRetries >= 0, "maxRetries should be non-negative")
  new OnErrorRetryCountedObservable(self, maxRetries)
}
/** Returns an Observable that mirrors the behavior of the source,
* unless the source is terminated with an `onError`, in which case
* it tries subscribing to the source again in the hope that it
* will complete without an error.
*
* The given predicate establishes if the subscription should be
* retried or not.
*/
final def onErrorRestartIf(p: Throwable => Boolean): Observable[A] = {
  // Wraps the source together with the retry predicate.
  new OnErrorRetryIfObservable[A](self, p)
}
/** Returns an Observable that mirrors the behavior of the source,
* unless the source is terminated with an `onError`, in which case
* it tries subscribing to the source again in the hope that it
* will complete without an error.
*
* NOTE: The number of retries is unlimited, so something like
* `Observable.error(new RuntimeException).onErrorRestartUnlimited`
* will loop forever.
*/
final def onErrorRestartUnlimited: Observable[A] = {
  // `-1` is the retry count passed for the unlimited case.
  new OnErrorRetryCountedObservable(self, -1)
}
/** Given a [[monix.reactive.Pipe Pipe]], transform
* the source observable with it.
*/
final def pipeThrough[I >: A, B](pipe: Pipe[I, B]): Observable[B] = {
  // Wraps the source together with the pipe that transforms it.
  new PipeThroughObservable(self, pipe)
}
/** Returns an observable that emits the results of invoking a
* specified selector on items emitted by a
* [[monix.reactive.observables.ConnectableObservable ConnectableObservable]],
* which shares a single subscription to the underlying sequence.
*
* @param f is a selector function that can use the multicasted source sequence
* as many times as needed, without causing multiple subscriptions
* to the source sequence. Observers to the given source will
* receive all notifications of the source from the time of the
* subscription forward.
*/
final def publishSelector[R](f: Observable[A] => Observable[R]): Observable[R] = {
  // Specialization of `pipeThroughSelector` that uses `Pipe.publish`.
  val publishPipe = Pipe.publish[A]
  pipeThroughSelector(publishPipe, f)
}
/** Returns an observable that emits the results of invoking a
* specified selector on items emitted by a
* [[monix.reactive.observables.ConnectableObservable ConnectableObservable]],
* which shares a single subscription to the underlying sequence.
*
* @param pipe is the [[Pipe]] used to transform the source into a multicast
* (hot) observable that can be shared in the selector function
*
* @param f is a selector function that can use the multicasted source sequence
* as many times as needed, without causing multiple subscriptions
* to the source sequence. Observers to the given source will
* receive all notifications of the source from the time of the
* subscription forward.
*/
final def pipeThroughSelector[S >: A, B, R](
  pipe: Pipe[S, B],
  f: Observable[B] => Observable[R]): Observable[R] = {
  // Source, pipe and selector function are passed through as given.
  new PipeThroughSelectorObservable[S, B, R](self, pipe, f)
}
/** Applies a binary operator to all elements of this Observable,
* going left to right, and returns a new Observable that emits
* only one item before `onComplete`.
*/
final def reduce[B >: A](op: (B, B) => B): Observable[B] = {
  // Build the reducing operator first, then lift it over the source.
  val reducer = new ReduceOperator[B](op)
  self.liftByOperator(reducer)
}
/** Repeats the items emitted by the source continuously. It
* caches the generated items until `onComplete` and repeats them
* forever.
*
* It terminates either on error or if the source is empty.
*/
final def repeat: Observable[A] = {
  // Wraps the source; no other parameter is involved.
  new RepeatSourceObservable[A](self)
}
/** Keeps restarting / resubscribing the source until the predicate
* returns `true` for the first emitted element, after which
* it starts mirroring the source.
*/
final def restartUntil(p: A => Boolean): Observable[A] = {
  // Wraps the source together with the predicate.
  new RestartUntilObservable[A](self, p)
}
/** Emit the most recent items emitted by an observable within
* periodic time intervals. If no new value has been emitted since
* the last time it was sampled, it signals the last emitted value
* anyway.
*
* @see [[sample]] for a variant that doesn't repeat the last value on silence
* @see [[sampleRepeatedBy]] for fine control
* @param period the timespan at which sampling occurs
*/
final def sampleRepeated(period: FiniteDuration): Observable[A] = {
  // The sampler is a fixed-rate interval whose initial delay equals the period.
  val ticks = Observable.intervalAtFixedRate(period, period)
  self.sampleRepeatedBy(ticks)
}
/** Returns an observable that, when the specified sampler observable
* emits an item or completes, emits the most recently emitted item
* (if any) emitted by the source Observable since the previous
* emission from the sampler observable. If no new value has been
* emitted since the last time it was sampled, it signals the last
* emitted value anyway.
*
* @see [[sampleBy]] for a variant that doesn't repeat the last value on silence
* @see [[sampleRepeated]] for a periodic sampling
* @param sampler - the Observable to use for sampling the source Observable
*/
final def sampleRepeatedBy[B](sampler: Observable[B]): Observable[A] = {
  // `shouldRepeatOnSilence = true` selects the repeat-on-silence mode.
  new ThrottleLastObservable[A, B](self, sampler, shouldRepeatOnSilence = true)
}
/** Applies a binary operator to a start value and all elements of
* this Observable, going left to right and returns a new
* Observable that emits on each step the result of the applied
* function.
*
* Similar to [[foldLeft]], but emits the state on each
* step. Useful for modeling finite state machines.
*
* @see [[scan0]] for the version that emits seed element at the beginning
*/
final def scan[S](seed: => S)(op: (S, A) => S): Observable[S] = {
  // The by-name seed is passed along as a thunk rather than as a computed value.
  new ScanObservable[A, S](self, () => seed, op)
}
/** Applies a binary operator to a start value and all elements of
* this Observable, going left to right and returns a new
* Observable that emits on each step the result of the applied
* function.
*
* This is a version of [[scan]] that emits seed element at the beginning,
* similar to `scanLeft` on Scala collections
*/
final def scan0[S](seed: => S)(op: (S, A) => S): Observable[S] = {
  // The seed is evaluated via `Observable.eval`, then prepended to a `scan`
  // that starts from that same value.
  Observable.eval(seed).flatMap { initial =>
    initial +: scan(initial)(op)
  }
}
/** Applies a binary operator to a start value and all elements of
* this stream, going left to right and returns a new stream that
* emits on each step the result of the applied function.
*
* Similar with [[scan]], but this can suspend and evaluate
* side effects with any `F[_]` data type that is convertible to
* [[monix.eval.Task Task]] via [[monix.eval.TaskLike]], thus allowing
* for lazy or asynchronous data processing.
*
* Similar to [[foldLeft]] and [[foldWhileLeft]], but emits the
* state on each step. Useful for modeling finite state machines.
*
* Example showing how state can be evolved and acted upon:
*
* {{{
* // Using cats.effect.IO for evaluating our side effects
* import cats.effect.IO
*
* sealed trait State[+A] { def count: Int }
* case object Init extends State[Nothing] { def count = 0 }
* case class Current[A](current: Option[A], count: Int)
* extends State[A]
*
* case class Person(id: Int, name: String)
*
* // TODO: to implement!
* def requestPersonDetails(id: Int): IO[Option[Person]] =
* IO.raiseError(new NotImplementedError)
*
* // TODO: to implement
* val source: Observable[Int] =
* Observable.raiseError(new NotImplementedError)
*
* // Initial state
* val seed = IO.pure(Init : State[Person])
*
* val scanned = source.scanEvalF(seed) { (state, id) =>
* requestPersonDetails(id).map { person =>
* state match {
* case Init =>
* Current(person, 1)
* case Current(_, count) =>
* Current(person, count + 1)
* }
* }
* }
*
* val filtered = scanned
* .takeWhile(_.count < 10)
* .collect { case Current(a, _) => a }
* }}}
*
* @see [[scanEval0F]] for the version that emits seed element at the beginning
*
* @see [[scan]] for the synchronous, non-lazy version, or
* [[scanEval]] for the [[monix.eval.Task Task]]-specialized
* version.
*
* @param seed is the initial state
* @param op is the function that evolves the current state
*
* @param F is the [[monix.eval.TaskLike]] type class implementation
* for type `F`, which is used to convert `F[S]` values into
* [[monix.eval.Task Task]]. `F` can be a data type such as
* [[monix.eval.Task]] or `cats.effect.IO`.
*
* @return a new observable that emits all intermediate states being
* resulted from applying the given function
*/
final def scanEvalF[F[_], S](seed: F[S])(op: (S, A) => F[S])
  (implicit F: TaskLike[F]): Observable[S] = {
  // Convert the seed and every step result into `Task`, then delegate to `scanEval`.
  val initial: Task[S] = Task.from(seed)(F)
  scanEval(initial) { (state, elem) =>
    Task.from(op(state, elem))(F)
  }
}
/** Applies a binary operator to a start value and all elements of
* this stream, going left to right and returns a new stream that
* emits on each step the result of the applied function.
*
* This is a version of [[scanEvalF]] that emits seed element at the beginning,
* similar to `scanLeft` on Scala collections
*/
final def scanEval0F[F[_], S](seed: F[S])(op: (S, A) => F[S])
  (implicit F: TaskLike[F], A: Applicative[F]): Observable[S] =
  Observable.fromTaskLike(seed).flatMap { initial =>
    // Lift the already evaluated seed back into `F` for `scanEvalF`,
    // and emit it ahead of the scanned states.
    val states = scanEvalF(A.pure(initial))(op)
    initial +: states
  }
/** Applies a binary operator to a start value and all elements of
* this stream, going left to right and returns a new stream that
* emits on each step the result of the applied function.
*
* Similar with [[scan]], but this can suspend and evaluate
* side effects with [[monix.eval.Task Task]], thus allowing for
* asynchronous data processing.
*
* Similar to [[foldLeft]] and [[foldWhileLeft]], but emits the
* state on each step. Useful for modeling finite state machines.
*
* Example showing how state can be evolved and acted upon:
*
* {{{
* import monix.eval.Task
*
* sealed trait State[+A] { def count: Int }
* case object Init extends State[Nothing] { def count = 0 }
* case class Current[A](current: Option[A], count: Int)
* extends State[A]
*
* case class Person(id: Int, name: String)
*
* // TODO: to implement!
* def requestPersonDetails(id: Int): Task[Option[Person]] =
* Task.raiseError(new NotImplementedError)
*
* // TODO: to implement
* val source: Observable[Int] =
* Observable.raiseError(new NotImplementedError)
*
* // Initial state
* val seed = Task.pure(Init : State[Person])
*
* val scanned = source.scanEval(seed) { (state, id) =>
* requestPersonDetails(id).map { person =>
* state match {
* case Init =>
* Current(person, 1)
* case Current(_, count) =>
* Current(person, count + 1)
* }
* }
* }
*
* val filtered = scanned
* .takeWhile(_.count < 10)
* .collect { case Current(a, _) => a }
* }}}
*
* @see [[scanEval0]] for the version that emits seed element at the beginning
* @see [[scan]] for the version that does not require using `Task`
* in the provided operator
*
* @param seed is the initial state
* @param op is the function that evolves the current state
*
* @return a new observable that emits all intermediate states being
* resulted from applying the given function
*/
final def scanEval[S](seed: Task[S])(op: (S, A) => Task[S]): Observable[S] = {
  // The whole logic lives in `ScanTaskObservable`; this is only the entry point.
  new ScanTaskObservable(this, seed, op)
}
/** Applies a binary operator to a start value and all elements of
* this stream, going left to right and returns a new stream that
* emits on each step the result of the applied function.
*
* This is a version of [[scanEval]] that emits seed element at the beginning.
*/
final def scanEval0[S](seed: Task[S])(op: (S, A) => Task[S]): Observable[S] =
  Observable.fromTask(seed).flatMap { initial =>
    // The seed was evaluated above, so it is passed on as a pure `Task`.
    val states = scanEval(Task.pure(initial))(op)
    initial +: states
  }
/** Given a mapping function that returns a `B` type for which we have
* a [[cats.Monoid]] instance, returns a new stream that folds the incoming
* elements of the sources using the provided `Monoid[B].combine`, with the
* initial seed being the `Monoid[B].empty` value, emitting the generated values
* at each step.
*
* Equivalent with [[scan]] applied with the given [[cats.Monoid]], so given
* our `f` mapping function returns a `B`, this law holds:
*
* <pre>
* val B = implicitly[Monoid[B]]
*
* stream.scanMap(f) <-> stream.map(f).scan(B.empty)(B.combine)
* </pre>
*
* Example:
* {{{
* import cats.implicits._
*
* // Yields 2, 6, 12, 20, 30, 42
* val stream = Observable(1, 2, 3, 4, 5, 6).scanMap(x => x * 2)
* }}}
*
* @param f is the mapping function applied to every incoming element of this `Observable`
* before folding using `Monoid[B].combine`
*
* @return a new `Observable` that emits all intermediate states being
* resulted from applying `Monoid[B].combine` function
*/
final def scanMap[B](f: A => B)(implicit B: Monoid[B]): Observable[B] =
  scan(B.empty) { (total, elem) =>
    // Map the incoming element, then combine it with the running total.
    val mapped = f(elem)
    B.combine(total, mapped)
  }
/** Given a mapping function that returns a `B` type for which we have
* a [[cats.Monoid]] instance, returns a new stream that folds the incoming
* elements of the sources using the provided `Monoid[B].combine`, with the
* initial seed being the `Monoid[B].empty` value, emitting the generated values
* at each step.
*
* This is a version of [[scanMap]] that emits seed element at the beginning.
*/
final def scanMap0[B](f: A => B)(implicit B: Monoid[B]): Observable[B] = {
  // Prepend the monoid's empty value to the scanned stream.
  val zero = B.empty
  val states = scanMap(f)
  zero +: states
}
/** Creates a new Observable that emits the given elements and then
* it also emits the events of the source (prepend operation).
*/
final def startWith[B >: A](elems: Seq[B]): Observable[B] = {
  // Stream the given elements first, then continue with this observable.
  val prefix: Observable[B] = Observable.fromIterable(elems)
  prefix.appendAll(this)
}
/** Returns a new Observable that uses the specified `Scheduler` for
* initiating the subscription.
*
* This is different from [[executeOn]] because the given `scheduler`
* is only used to start the subscription, but does not override the
* default [[monix.execution.Scheduler Scheduler]].
*/
final def subscribeOn(scheduler: Scheduler): Observable[A] = {
  // Delegates to `SubscribeOnObservable`, handing it this source and the scheduler.
  new SubscribeOnObservable[A](this, scheduler)
}
/** In case the source is empty, switch to the given backup. */
final def switchIfEmpty[B >: A](backup: Observable[B]): Observable[B] = {
  // Delegates to `SwitchIfEmptyObservable` with this source and the backup.
  new SwitchIfEmptyObservable[B](this, backup)
}
/** Drops the first element of the source observable,
* emitting the rest.
*/
final def tail: Observable[A] =
  self.drop(n = 1) // the tail is everything after the first element
/** Drops the first `n` elements (from the start).
*
* @param n the number of elements to drop
* @return a new Observable that drops the first ''n'' elements
* emitted by the source
*/
final def drop(n: Int): Observable[A] = {
  // Element type is unchanged, so the operator is lifted as `A => A`.
  liftByOperator[A](new DropFirstOperator(n))
}
/** Creates a new Observable that emits the events of the source, only
* for the specified `timespan`, after which it completes.
*
* @param timespan the window of time during which the new Observable
* is allowed to emit the events of the source
*/
final def takeByTimespan(timespan: FiniteDuration): Observable[A] = {
  // Delegates to `TakeLeftByTimespanObservable` with this source and the time window.
  new TakeLeftByTimespanObservable(this, timespan)
}
/** Creates a new Observable that emits every n-th event from the source,
* dropping intermediary events.
*/
final def takeEveryNth(n: Int): Observable[A] = {
  // Element type is unchanged, so the operator is lifted as `A => A`.
  liftByOperator[A](new TakeEveryNthOperator(n))
}
/** Creates a new observable that mirrors the source until
* the given `trigger` emits either an element or `onComplete`,
* after which it is completed.
*
* The resulting observable is completed as soon as `trigger`
* emits either an `onNext` or `onComplete`. If `trigger`
* emits an `onError`, then the resulting observable is also
* completed with error.
*
* @param trigger is an observable that will cancel the
* streaming as soon as it emits an event
*/
final def takeUntil(trigger: Observable[Any]): Observable[A] = {
  // Delegates to `TakeUntilObservable` with this source and the trigger.
  new TakeUntilObservable[A](this, trigger)
}
/** Takes longest prefix of elements that satisfy the given predicate
* and returns a new Observable that emits those elements.
*/
final def takeWhile(p: A => Boolean): Observable[A] = {
  // Element type is unchanged, so the operator is lifted as `A => A`.
  liftByOperator[A](new TakeByPredicateOperator(p))
}
/** Takes longest prefix of elements while given [[monix.execution.cancelables.BooleanCancelable BooleanCancelable]]
* is not canceled and returns a new Observable that emits those elements.
*/
final def takeWhileNotCanceled(c: BooleanCancelable): Observable[A] = {
  // Element type is unchanged, so the operator is lifted as `A => A`.
  liftByOperator[A](new TakeWhileNotCanceledOperator(c))
}
/** Returns an Observable that emits only the first item emitted by
* the source Observable during sequential time windows of a
* specified duration.
*
* This differs from [[Observable!.throttleLast]] in that this only
* tracks passage of time whereas `throttleLast` ticks at scheduled
* intervals.
*
* @param interval time to wait before emitting another item after
* emitting the last item
*/
final def throttleFirst(interval: FiniteDuration): Observable[A] = {
  val throttle = new ThrottleFirstOperator[A](interval)
  liftByOperator[A](throttle)
}
/** Emit the most recent items emitted by the source within
* periodic time intervals.
*
* Alias for [[sample]].
*
* @param period duration of windows within which the last item
* emitted by the source Observable will be emitted
*/
final def throttleLast(period: FiniteDuration): Observable[A] =
  self.sample(period = period) // plain alias, see `sample`
/** Emit the most recent items emitted by the source within
* periodic time intervals.
*
* Use the `sample` operator to periodically look at an observable
* to see what item it has most recently emitted since the previous
* sampling. Note that if the source observable has emitted no
* items since the last time it was sampled, the observable that
* results from the `sample` operator will emit no item for that
* sampling period.
*
* @see [[sampleBy]] for fine control
* @see [[sampleRepeated]] for repeating the last value on silence
* @param period the timespan at which sampling occurs
*/
final def sample(period: FiniteDuration): Observable[A] = {
  // The sampler comes from `Observable.intervalAtFixedRate`, with `period`
  // passed for both of its arguments.
  val ticks = Observable.intervalAtFixedRate(period, period)
  sampleBy(ticks)
}
/** Returns an observable that, when the specified sampler
* emits an item or completes, emits the most recently emitted item
* (if any) emitted by the source since the previous
* emission from the sampler.
*
* Use the `sampleBy` operator to periodically look at an observable
* to see what item it has most recently emitted since the previous
* sampling. Note that if the source observable has emitted no
* items since the last time it was sampled, the observable that
* results from the `sampleBy` operator will emit no item.
*
* @see [[sample]] for periodic sampling
* @see [[sampleRepeatedBy]] for repeating the last value on silence
* @param sampler - the observable to use for sampling the source
*/
final def sampleBy[B](sampler: Observable[B]): Observable[A] = {
  // Shares `ThrottleLastObservable` with the repeating variant; repetition is off here.
  val repeatOnSilence = false
  new ThrottleLastObservable[A, B](this, sampler, shouldRepeatOnSilence = repeatOnSilence)
}
/** Only emit an item from an observable if a particular timespan has
* passed without it emitting another item.
*
* Note: If the source observable keeps emitting items more
* frequently than the length of the time window, then no items will
* be emitted by the resulting observable.
*
* Alias for [[debounce]].
*
* @param timeout the length of the window of time that must pass after
* the emission of an item from the source observable in
* which that observable emits no items in order for the
* item to be emitted by the resulting observable
* @see [[echoOnce]] for a similar operator that also mirrors
* the source observable
*/
final def throttleWithTimeout(timeout: FiniteDuration): Observable[A] =
  self.debounce(timeout = timeout) // plain alias, see `debounce`
/** Only emit an item from an observable if a particular timespan has
* passed without it emitting another item.
*
* Note: If the source observable keeps emitting items more
* frequently than the length of the time window, then no items will
* be emitted by the resulting observable.
*
* @param timeout the length of the window of time that must pass after
* the emission of an item from the source observable in
* which that observable emits no items in order for the
* item to be emitted by the resulting observable
* @see [[echoOnce]] for a similar operator that also mirrors
* the source observable
*/
final def debounce(timeout: FiniteDuration): Observable[A] = {
  // Delegates to `DebounceObservable`, with its `repeat` flag switched off.
  new DebounceObservable(this, timeout, repeat = false)
}
/** Returns an observable that mirrors the source but that will trigger a
* [[monix.execution.exceptions.DownstreamTimeoutException DownstreamTimeoutException]]
* in case the downstream subscriber takes more than the given timespan
* to process an `onNext` message.
*
* Note that this ignores the time it takes for the upstream to send
* `onNext` messages. For detecting slow producers see [[timeoutOnSlowUpstream]].
*
* @param timeout maximum duration for `onNext`.
*/
final def timeoutOnSlowDownstream(timeout: FiniteDuration): Observable[A] = {
  // Delegates to `DownstreamTimeoutObservable` with this source and the timeout.
  new DownstreamTimeoutObservable[A](this, timeout)
}
/** Returns an observable that mirrors the source but applies a timeout
* for each emitted item by the upstream. If the next item isn't
* emitted within the specified timeout duration starting from its
* predecessor, the source is terminated and the downstream gets
* subscribed to the given backup.
*
* Note that this ignores the time it takes to process `onNext`.
* If dealing with a slow consumer, see [[timeoutOnSlowDownstream]].
*
* @param timeout maximum duration between emitted items before
* a timeout occurs (ignoring the time it takes to process `onNext`)
* @param backup is the alternative data source to subscribe to on timeout
*/
final def timeoutOnSlowUpstreamTo[B >: A](timeout: FiniteDuration, backup: Observable[B]): Observable[B] = {
  val guarded = timeoutOnSlowUpstream(timeout)
  guarded.onErrorHandleWith {
    // Only a timeout carrying exactly our own duration switches to the backup;
    // any other failure is re-raised untouched.
    case UpstreamTimeoutException(duration) if timeout == duration =>
      backup
    case failure =>
      Observable.raiseError(failure)
  }
}
/** Returns an observable that mirrors the source but applies a timeout
* for each emitted item by the upstream. If the next item isn't
* emitted within the specified timeout duration starting from its
* predecessor, the resulting Observable terminates and notifies
* observers of an [[monix.execution.exceptions.UpstreamTimeoutException UpstreamTimeoutException]].
*
* Note that this ignores the time it takes to process `onNext`.
* If dealing with a slow consumer, see [[timeoutOnSlowDownstream]].
*
* @param timeout maximum duration between emitted items before
* a timeout occurs (ignoring the time it takes to process `onNext`)
*/
final def timeoutOnSlowUpstream(timeout: FiniteDuration): Observable[A] = {
  // Delegates to `UpstreamTimeoutObservable` with this source and the timeout.
  new UpstreamTimeoutObservable[A](this, timeout)
}
/** While the destination observer is busy, buffers events, applying
* the given overflowStrategy.
*
* @param overflowStrategy - $overflowStrategyParam
*/
final def whileBusyBuffer[B >: A](overflowStrategy: OverflowStrategy.Synchronous[B]): Observable[B] =
  self.asyncBoundary[B](overflowStrategy) // same as `asyncBoundary`, restricted to synchronous strategies
/** $asyncBoundaryDescription
*
* @param overflowStrategy - $overflowStrategyParam
*/
final def asyncBoundary[B >: A](overflowStrategy: OverflowStrategy[B]): Observable[B] = {
  val boundary = new AsyncBoundaryOperator[B](overflowStrategy)
  self.liftByOperator[B](boundary)
}
/** While the destination observer is busy, drop the incoming events.
*/
final def whileBusyDropEvents: Observable[A] = {
  val dropping = new WhileBusyDropEventsOperator[A]
  liftByOperator[A](dropping)
}
/** While the destination observer is busy, drop the incoming events.
* When the downstream recovers, we can signal a special event
* meant to inform the downstream observer how many events were
* dropped.
*
* @param onOverflow - $onOverflowParam
*/
final def whileBusyDropEventsAndSignal[B >: A](onOverflow: Long => B): Observable[B] = {
  val dropping = new WhileBusyDropEventsAndSignalOperator[B](onOverflow)
  liftByOperator[B](dropping)
}
/** Combines the elements emitted by the source with the latest element
* emitted by another observable.
*
* Similar with `combineLatest`, but only emits items when the single source
* emits an item (not when any of the Observables that are passed to the operator
* do, as combineLatest does).
*
* == Visual Example ==
*
* <pre>
* stream1: 1 - - 2 - - 3 - 4 - -
* stream2: 1 - - 2 - 3 - - - - 4
*
* result: (1, 1), (2, 2), (3, 3), (4, 3)
* </pre>
*
* @param other is an observable that gets paired with the source
* @param f is a mapping function over the generated pairs
*/
final def withLatestFrom[B, R](other: Observable[B])(f: (A, B) => R): Observable[R] = {
  // Delegates to `WithLatestFromObservable` with both streams and the mapping function.
  new WithLatestFromObservable[A, B, R](this, other, f)
}
/** Combines the elements emitted by the source with the latest elements
* emitted by two observables.
*
* Similar with `combineLatest`, but only emits items when the single source
* emits an item (not when any of the Observables that are passed to the operator
* do, as combineLatest does).
*
* @param o1 is the first observable that gets paired with the source
* @param o2 is the second observable that gets paired with the source
* @param f is a mapping function over the generated pairs
*/
final def withLatestFrom2[B1, B2, R](o1: Observable[B1], o2: Observable[B2])(f: (A, B1, B2) => R): Observable[R] = {
  // Combine the companions into one stream of pairs, then unpack each pair for `f`.
  val latest = Observable.combineLatest2(o1, o2)
  withLatestFrom(latest)((elem, t) => f(elem, t._1, t._2))
}
/** Combines the elements emitted by the source with the latest elements
* emitted by three observables.
*
* Similar with `combineLatest`, but only emits items when the single source
* emits an item (not when any of the Observables that are passed to the operator
* do, as combineLatest does).
*
* @param o1 is the first observable that gets paired with the source
* @param o2 is the second observable that gets paired with the source
* @param o3 is the third observable that gets paired with the source
* @param f is a mapping function over the generated pairs
*/
final def withLatestFrom3[B1, B2, B3, R](o1: Observable[B1], o2: Observable[B2], o3: Observable[B3])
  (f: (A, B1, B2, B3) => R): Observable[R] = {
  // Combine the companions into one stream of triples, then unpack each triple for `f`.
  val latest = Observable.combineLatest3(o1, o2, o3)
  withLatestFrom(latest)((elem, t) => f(elem, t._1, t._2, t._3))
}
/** Combines the elements emitted by the source with the latest elements
* emitted by four observables.
*
* Similar with `combineLatest`, but only emits items when the single source
* emits an item (not when any of the Observables that are passed to the operator
* do, as combineLatest does).
*
* @param o1 is the first observable that gets paired with the source
* @param o2 is the second observable that gets paired with the source
* @param o3 is the third observable that gets paired with the source
* @param o4 is the fourth observable that gets paired with the source
* @param f is a mapping function over the generated pairs
*/
final def withLatestFrom4[B1, B2, B3, B4, R](
  o1: Observable[B1], o2: Observable[B2], o3: Observable[B3], o4: Observable[B4])
  (f: (A, B1, B2, B3, B4) => R): Observable[R] = {
  // Combine the companions into one stream of 4-tuples, then unpack each tuple for `f`.
  val latest = Observable.combineLatest4(o1, o2, o3, o4)
  withLatestFrom(latest)((elem, t) => f(elem, t._1, t._2, t._3, t._4))
}
/** Combines the elements emitted by the source with the latest elements
* emitted by five observables.
*
* Similar with `combineLatest`, but only emits items when the single source
* emits an item (not when any of the Observables that are passed to the operator
* do, as combineLatest does).
*
* @param o1 is the first observable that gets paired with the source
* @param o2 is the second observable that gets paired with the source
* @param o3 is the third observable that gets paired with the source
* @param o4 is the fourth observable that gets paired with the source
* @param o5 is the fifth observable that gets paired with the source
* @param f is a mapping function over the generated pairs
*/
final def withLatestFrom5[B1, B2, B3, B4, B5, R](
  o1: Observable[B1], o2: Observable[B2], o3: Observable[B3],
  o4: Observable[B4], o5: Observable[B5])
  (f: (A, B1, B2, B3, B4, B5) => R): Observable[R] = {
  // Combine the companions into one stream of 5-tuples, then unpack each tuple for `f`.
  val latest = Observable.combineLatest5(o1, o2, o3, o4, o5)
  withLatestFrom(latest)((elem, t) => f(elem, t._1, t._2, t._3, t._4, t._5))
}
/** Combines the elements emitted by the source with the latest elements
* emitted by six observables.
*
* Similar with `combineLatest`, but only emits items when the single source
* emits an item (not when any of the Observables that are passed to the operator
* do, as combineLatest does).
*
* @param o1 is the first observable that gets paired with the source
* @param o2 is the second observable that gets paired with the source
* @param o3 is the third observable that gets paired with the source
* @param o4 is the fourth observable that gets paired with the source
* @param o5 is the fifth observable that gets paired with the source
* @param o6 is the sixth observable that gets paired with the source
* @param f is a mapping function over the generated pairs
*/
final def withLatestFrom6[B1, B2, B3, B4, B5, B6, R](
  o1: Observable[B1], o2: Observable[B2], o3: Observable[B3],
  o4: Observable[B4], o5: Observable[B5], o6: Observable[B6])
  (f: (A, B1, B2, B3, B4, B5, B6) => R): Observable[R] = {
  // Combine the companions into one stream of 6-tuples, then unpack each tuple for `f`.
  val latest = Observable.combineLatest6(o1, o2, o3, o4, o5, o6)
  withLatestFrom(latest)((elem, t) => f(elem, t._1, t._2, t._3, t._4, t._5, t._6))
}
/** Creates a new observable from this observable and another given
* observable by combining their items in pairs in a strict sequence.
*
* So the first item emitted by the new observable will be the tuple of the
* first items emitted by each of the source observables; the second item
* emitted by the new observable will be a tuple with the second items
* emitted by each of those observables; and so forth.
*
* == Visual Example ==
*
* <pre>
* stream1: 1 - - 2 - - 3 - 4 - -
* stream2: 1 - - 2 - 3 - - - - 4
*
* result: (1, 1), (2, 2), (3, 3), (4, 4)
* </pre>
*
* See [[combineLatest]] for a more relaxed alternative that doesn't
* combine items in strict sequence.
*
* @param other is an observable that gets paired with the source
* @return a new observable sequence that emits the paired items
* of the source observables
*/
final def zip[B](other: Observable[B]): Observable[(A, B)] = {
  // Same machinery as `zipMap`, with plain tupling as the combining function.
  new Zip2Observable[A, B, (A, B)](this, other)((left, right) => Tuple2(left, right))
}
/** Creates a new observable from this observable and another given
* observable by combining their items in pairs in a strict sequence.
*
* So the first item emitted by the new observable will be the result
* of the function applied to the first item emitted by each of
* the source observables; the second item emitted by the new observable
* will be the result of the function applied to the second item
* emitted by each of those observables; and so forth.
*
* == Visual Example ==
*
* <pre>
* stream1: 1 - - 2 - - 3 - 4 - -
* stream2: 1 - - 2 - 3 - - - - 4
*
* result: (1, 1), (2, 2), (3, 3), (4, 4)
* </pre>
*
* See [[combineLatestMap]] for a more relaxed alternative that doesn't
* combine items in strict sequence.
*
* @param other is an observable that gets paired with the source
* @param f is a mapping function over the generated pairs
*/
final def zipMap[B, R](other: Observable[B])(f: (A, B) => R): Observable[R] = {
  // Delegates to `Zip2Observable`, with `f` as the combining function.
  new Zip2Observable[A, B, R](this, other)(f)
}
/** Zips the emitted elements of the source with their indices. */
final def zipWithIndex: Observable[(A, Long)] = {
  val indexing = new ZipWithIndexOperator[A]
  liftByOperator[(A, Long)](indexing)
}
/** Creates a new observable from this observable that will emit a specific `separator`
* between every pair of elements.
*
* Usage sample:
*
* {{{
* // Yields "a : b : c : d"
* Observable("a", "b", "c", "d")
* .intersperse(" : ")
* .foldLeftL("")(_ ++ _)
* }}}
*
* @param separator is the separator
*/
final def intersperse[B >: A](separator: B): Observable[B] = {
  // No leading or trailing element in this overload, only the separator.
  val absent = Option.empty[B]
  new IntersperseObservable(this, absent, separator, absent)
}
/** Creates a new observable from this observable that will emit the `start` element
* followed by the upstream elements paired with the `separator`, and lastly the `end` element.
*
* Usage sample:
*
* {{{
* // Yields "begin a : b : c : d end"
* Observable("a", "b", "c", "d")
* .intersperse("begin ", " : ", " end")
* .foldLeftL("")(_ ++ _)
* }}}
*
* @param start is the first element emitted
* @param separator is the separator
* @param end the last element emitted
*/
final def intersperse[B >: A](start: B, separator: B, end: B): Observable[B] = {
  // `start` and `end` are passed to `IntersperseObservable` as defined options.
  val first = Some(start)
  val last = Some(end)
  new IntersperseObservable(this, first, separator, last)
}
/** Converts this `Observable` into an `org.reactivestreams.Publisher`.
*
* Meant for interoperability with other Reactive Streams
* implementations.
*
* Usage sample:
*
* {{{
* import monix.eval.Task
* import monix.execution.rstreams.SingleAssignSubscription
* import org.reactivestreams.{Publisher, Subscriber, Subscription}
*
* def sum(source: Publisher[Int], requestSize: Int): Task[Long] =
* Task.create { (_, cb) =>
* val sub = SingleAssignSubscription()
*
* source.subscribe(new Subscriber[Int] {
* private[this] var requested = 0L
* private[this] var sum = 0L
*
* def onSubscribe(s: Subscription): Unit = {
* sub := s
* requested = requestSize
* s.request(requestSize)
* }
*
* def onNext(t: Int): Unit = {
* sum += t
* if (requestSize != Long.MaxValue) requested -= 1
*
* if (requested <= 0) {
* requested = requestSize
* sub.request(requestSize)
* }
* }
*
* def onError(t: Throwable): Unit =
* cb.onError(t)
* def onComplete(): Unit =
* cb.onSuccess(sum)
* })
*
* // Cancelable that can be used by Task
* sub
* }
*
* import monix.execution.Scheduler.Implicits.global
* val pub = Observable(1, 2, 3, 4).toReactivePublisher
*
* // Yields 10
* sum(pub, requestSize = 128)
* }}}
*
* See the [[http://www.reactive-streams.org/ Reactive Streams]]
* protocol for details.
*/
final def toReactivePublisher[B >: A](implicit s: Scheduler): RPublisher[B] =
  new RPublisher[B] {
    def subscribe(subscriber: RSubscriber[_ >: B]): Unit = {
      // Created before subscribing: the adapter built below takes a reference
      // to it, and it is assigned the result of the actual subscription.
      val cancelable = SingleAssignCancelable()
      cancelable := self.unsafeSubscribeFn(
        SafeSubscriber(
          Subscriber.fromReactiveSubscriber(subscriber, cancelable)
        )
      )
    }
  }
/** Returns a [[monix.eval.Task Task]] that upon execution
* will signal the last generated element of the source observable.
*
* Returns an `Option` because the source can be empty.
*/
final def lastOptionL: Task[Option[A]] = {
  // Wrap every element in `Some`, so `None` can be the fallback for an empty source.
  val wrapped = map(elem => Some(elem))
  wrapped.lastOrElseL(None)
}
/** Creates a new [[monix.eval.Task Task]] that upon execution
* will signal the last generated element of the source observable.
*
* In case the stream was empty, then the given default gets
* evaluated and emitted.
*/
final def lastOrElseL[B >: A](default: => B): Task[B] =
  Task.create { (sc, callback) =>
    val subscriber = new Subscriber.Sync[A] {
      implicit val scheduler: Scheduler = sc

      // Most recent element received; only meaningful once `hasValue` is set.
      private[this] var latest: A = _
      private[this] var hasValue = false

      def onNext(elem: A): Ack = {
        latest = elem
        hasValue = true
        Continue
      }

      def onError(ex: Throwable): Unit =
        callback.onError(ex)

      def onComplete(): Unit =
        if (hasValue) callback.onSuccess(latest)
        else callback(Try(default)) // `default` is by-name; `Try` captures a failure while evaluating it
    }

    unsafeSubscribeFn(subscriber)
  }
/** Creates a new Observable that emits the total number of `onNext`
* events that were emitted by the source.
*
* Note that this Observable emits only one item after the source
* is complete. And in case the source emits an error, then only
* that error will be emitted.
*/
final def count: Observable[Long] = {
  // `CountOperator` is a shared instance, lifted here to produce `Long`.
  liftByOperator[Long](CountOperator)
}
/** Creates a task that emits the total number of `onNext`
* events that were emitted by the source.
*/
final def countL: Task[Long] = {
  // Take the head of the `count` observable as a task.
  val total = count
  total.headL
}
/** Returns an Observable which only emits the first item for which
* the predicate holds.
*
* @param p is a function that evaluates the items emitted by the
* source Observable, returning `true` if they pass the filter
* @return an Observable that emits only the first item in the original
* Observable for which the filter evaluates as `true`
*/
final def find(p: A => Boolean): Observable[A] = {
  // Keep only the matching elements, then take the first of them.
  val matching = filter(p)
  matching.head
}
/** Returns a task which emits the first item for which
* the predicate holds.
*
* @param p is a function that evaluates the items emitted by the
* source observable, returning `true` if they pass the filter
*
* @return a task that emits the first item in the source
* observable for which the filter evaluates as `true`
*/
final def findL(p: A => Boolean): Task[Option[A]] = {
  // Optional head of `find(p)`, as a task.
  val firstMatch = find(p)
  firstMatch.headOptionL
}
/** Given evidence that type `A` has a `cats.Monoid` implementation,
* folds the stream with the provided monoid definition.
*
* For streams emitting numbers, this effectively sums them up.
* For strings, this concatenates them.
*
* Example:
*
* {{{
* import cats.implicits._
*
* // Yields 10
* val stream1 = Observable(1, 2, 3, 4).fold
*
* // Yields "1234"
* val stream2 = Observable("1", "2", "3", "4").fold
* }}}
*
* Note, in case you don't have a `Monoid` instance in scope,
* but you feel like you should, try this import:
*
* {{{
* import cats.instances.all._
* }}}
*
* @see [[Observable.foldL foldL]] for the version that returns a
* task instead of an observable.
*
* @param A is the `cats.Monoid` type class instance that's needed
* in scope for folding the source
*
* @return the result of combining all elements of the source,
* or the defined `Monoid.empty` element in case the
* stream is empty
*/
final def fold[AA >: A](implicit A: Monoid[AA]): Observable[AA] =
  foldLeft(A.empty) { (acc, elem) =>
    // Here `A` is the implicit monoid instance, not the element type.
    A.combine(acc, elem)
  }
/** Given evidence that type `A` has a `cats.Monoid` implementation,
* folds the stream with the provided monoid definition.
*
* For streams emitting numbers, this effectively sums them up.
* For strings, this concatenates them.
*
* Example:
*
* {{{
* import cats.implicits._
*
* // Yields 10
* val stream1 = Observable(1, 2, 3, 4).foldL
*
* // Yields "1234"
* val stream2 = Observable("1", "2", "3", "4").foldL
* }}}
*
* @see [[fold]] for the version that returns an observable
* instead of a task.
*
* @param A is the `cats.Monoid` type class instance that's needed
* in scope for folding the source
*
* @return the result of combining all elements of the source,
* or the defined `Monoid.empty` element in case the
* stream is empty
*/
final def foldL[AA >: A](implicit A: Monoid[AA]): Task[AA] = {
  // Pass the monoid instance explicitly to `fold`, then take the head as a task.
  val folded = fold[AA](A)
  folded.headL
}
/** Folds the source observable, from start to finish, until the
* source completes, or until the operator short-circuits the
* process by returning a `Right`.
*
* Note that a call to [[foldLeft]] is equivalent to this function
* being called with an operator that always returns a `Left`
* wrapping the evolved state.
*
* Example: {{{
* // Sums first 10 items
* val stream1 = Observable.range(0, 1000).foldWhileLeft((0L, 0)) {
* case ((sum, count), e) =>
* val next = (sum + e, count + 1)
* if (count + 1 < 10) Left(next) else Right(next)
* }
*
* // Implements exists(predicate)
* val stream2 = Observable(1, 2, 3, 4, 5).foldWhileLeft(false) {
* (default, e) =>
* if (e == 3) Right(true) else Left(default)
* }
*
* // Implements forall(predicate)
* val stream3 = Observable(1, 2, 3, 4, 5).foldWhileLeft(true) {
* (default, e) =>
* if (e != 3) Right(false) else Left(default)
* }
* }}}
*
* @see [[Observable.foldWhileLeftL foldWhileLeftL]] for a version
* that returns a task instead of an observable.
*
* @param seed is the initial state, specified as a possibly lazy value;
* it gets evaluated when the subscription happens and if it
* triggers an error then the subscriber will get immediately
* terminated with an error
*
* @param op is the binary operator returning either `Left`,
* signaling that the state should be evolved or a `Right`,
* signaling that the process can be short-circuited and
* the result returned immediately
*
* @return the result of inserting `op` between consecutive
* elements of this observable, going from left to right with
* the `seed` as the start value, or `seed` if the observable
* is empty
*/
final def foldWhileLeft[S](seed: => S)(op: (S, A) => Either[S, S]): Observable[S] = {
  // Hand the by-name seed over as a thunk, still unevaluated at this point.
  val lazySeed: () => S = () => seed
  new FoldWhileLeftObservable[A, S](this, lazySeed, op)
}
/** Folds the source observable, from start to finish, until the
* source completes, or until the operator short-circuits the
* process by returning `false`.
*