T
- the value type pushed@FunctionalInterface -public interface FlowableOnSubscribe<T>-
subscribe()
method that receives
- an instance of a FlowableEmitter
instance that allows pushing
- events in a backpressure-safe and cancellation-safe manner.Modifier and Type | -Method and Description | -
---|---|
void |
-subscribe(@NonNull FlowableEmitter<T> emitter)
-Called for each
-Subscriber that subscribes. |
-
void subscribe(@NonNull - @NonNull FlowableEmitter<T> emitter) - throws Throwable-
Subscriber
that subscribes.emitter
- the safe emitter instance, never null
Throwable
- on errorT
- the type of item the Observer expects to observepublic interface Observer<T>
-
- When an Observer
is subscribed to an ObservableSource
through the ObservableSource.subscribe(Observer)
method,
- the ObservableSource
calls onSubscribe(Disposable)
with a Disposable
that allows
- disposing the sequence at any time, then the
- ObservableSource
may call the Observer's onNext(T)
method any number of times
- to provide notifications. A well-behaved
- ObservableSource
will call an Observer
's onComplete()
method exactly once or the Observer
's
- onError(java.lang.Throwable)
method exactly once.
-
- Calling the Observer
's method must happen in a serialized fashion, that is, they must not
- be invoked concurrently by multiple threads in an overlapping fashion and the invocation pattern must
- adhere to the following protocol:
-
onSubscribe onNext* (onError | onComplete)?
-
- Subscribing an Observer
to multiple ObservableSource
s is not recommended. If such reuse
- happens, it is the duty of the Observer
implementation to be ready to receive multiple calls to
- its methods and ensure proper concurrent behavior of its business logic.
-
- Calling onSubscribe(Disposable)
, onNext(Object)
or onError(Throwable)
with a
- null
argument is forbidden.
-
- The implementations of the onXXX
methods should avoid throwing runtime exceptions other than the following cases
- (see Rule 2.13 of the Reactive Streams specification):
-
null
, the methods can throw a NullPointerException
.
- Note though that RxJava prevents null
s to enter into the flow and thus there is generally no
- need to check for nulls in flows assembled from standard sources and intermediate operators.
- VirtualMachineError
).- Violating Rule 2.13 results in undefined flow behavior. Generally, the following can happen: -
onError(java.lang.Throwable)
call.ObservableSource.subscribe(Observer)
throws instead of returning normally.Scheduler
or Executor
)
- providing the asynchronous boundary the code is running and either routes the exception to the global
- RxJavaPlugins.onError(Throwable)
handler or the current thread's
- Thread.UncaughtExceptionHandler.uncaughtException(Thread, Throwable)
handler.Observable
's perspective, an Observer
is the end consumer thus it is the Observer
's
- responsibility to handle the error case and signal it "further down". This means unreliable code in the onXXX
- methods should be wrapped into `try-catch`es, specifically in onError(Throwable)
or onComplete()
, and handled there
- (for example, by logging it or presenting the user with an error dialog). However, if the error would be thrown from
- onNext(Object)
, Rule 2.13 mandates
- the implementation calls Disposable.dispose()
and signals the exception in a way that is adequate to the target context,
- for example, by calling onError(Throwable)
on the same Observer
instance.
-
- If, for some reason, the Observer
won't follow Rule 2.13, the Observable.safeSubscribe(Observer)
can wrap it
- with the necessary safeguards and route exceptions thrown from onNext
into onError
and route exceptions thrown
- from onError
and onComplete
into the global error handler via RxJavaPlugins.onError(Throwable)
.
Modifier and Type | -Method and Description | -
---|---|
void |
-onComplete()
-Notifies the
-Observer that the Observable has finished sending push-based notifications. |
-
void |
-onError(@NonNull Throwable e)
-Notifies the
-Observer that the Observable has experienced an error condition. |
-
void |
-onNext(T t)
-Provides the
-Observer with a new item to observe. |
-
void |
-onSubscribe(@NonNull Disposable d)
-Provides the
-Observer with the means of cancelling (disposing) the
- connection (channel) with the Observable in both
- synchronous (from within onNext(Object) ) and asynchronous manner. |
-
void onSubscribe(@NonNull - @NonNull Disposable d)-
Observer
with the means of cancelling (disposing) the
- connection (channel) with the Observable
in both
- synchronous (from within onNext(Object)
) and asynchronous manner.d
- the Disposable
instance whose Disposable.dispose()
can
- be called anytime to cancel the connectionvoid onNext(@NonNull - T t)-
Observer
with a new item to observe.
-
- The Observable
may call this method 0 or more times.
-
- The Observable
will not call this method again after it calls either onComplete()
or
- onError(java.lang.Throwable)
.
t
- the item emitted by the Observablevoid onError(@NonNull - @NonNull Throwable e)-
Observer
that the Observable
has experienced an error condition.
-
- If the Observable
calls this method, it will not thereafter call onNext(T)
or
- onComplete()
.
e
- the exception encountered by the Observablevoid onComplete()-
Observer
that the Observable
has finished sending push-based notifications.
-
- The Observable
will not call this method if it calls onError(java.lang.Throwable)
.
public abstract class Scheduler -extends Object-
Scheduler
is an object that specifies an API for scheduling
- units of work provided in the form of Runnable
s to be
- executed without delay (effectively as soon as possible), after a specified time delay or periodically
- and represents an abstraction over an asynchronous boundary that ensures
- these units of work get executed by some underlying task-execution scheme
- (such as custom Threads, event loop, Executor
or Actor system)
- with some uniform properties and guarantees regardless of the particular underlying
- scheme.
-
- You can get various standard, RxJava-specific instances of this class via
- the static methods of the Schedulers
utility class.
-
- The so-called Scheduler.Worker
s of a Scheduler
can be created via the createWorker()
method which allow the scheduling
- of multiple Runnable
tasks in an isolated manner. Runnable
tasks scheduled on a Worker
are guaranteed to be
- executed sequentially and in a non-overlapping fashion. Non-delayed Runnable
tasks are guaranteed to execute in a
- First-In-First-Out order but their execution may be interleaved with delayed tasks.
- In addition, outstanding or running tasks can be cancelled together via
- Disposable.dispose()
without affecting any other Worker
instances of the same Scheduler
.
-
- Implementations of the scheduleDirect(java.lang.Runnable)
and Scheduler.Worker.schedule(java.lang.Runnable)
methods are encouraged to call the RxJavaPlugins.onSchedule(Runnable)
- method to allow a scheduler hook to manipulate (wrap or replace) the original Runnable
task before it is submitted to the
- underlying task-execution scheme.
-
- The default implementations of the scheduleDirect
methods provided by this abstract class
- delegate to the respective schedule
methods in the Scheduler.Worker
instance created via createWorker()
- for each individual Runnable
task submitted. Implementors of this class are encouraged to provide
- a more efficient direct scheduling implementation to avoid the time and memory overhead of creating such Worker
s
- for every task.
- This delegation is done via special wrapper instances around the original Runnable
before calling the respective
- Worker.schedule
method. Note that this can lead to multiple RxJavaPlugins.onSchedule
calls and potentially
- multiple hooks applied. Therefore, the default implementations of scheduleDirect
(and the Scheduler.Worker.schedulePeriodically(Runnable, long, long, TimeUnit)
)
- wrap the incoming Runnable
into a class that implements the SchedulerRunnableIntrospection
- interface which can grant access to the original or hooked Runnable
, thus, a repeated RxJavaPlugins.onSchedule
- can detect the earlier hook and not apply a new one over again.
-
- The default implementation of now(TimeUnit)
and Scheduler.Worker.now(TimeUnit)
methods to return current
- System.currentTimeMillis()
value in the desired time unit. Custom Scheduler
implementations can override this
- to provide specialized time accounting (such as virtual time to be advanced programmatically).
- Note that operators requiring a Scheduler
may rely on either of the now()
calls provided by
- Scheduler
or Worker
respectively, therefore, it is recommended they represent a logically
- consistent source of the current time.
-
- The default implementation of the Scheduler.Worker.schedulePeriodically(Runnable, long, long, TimeUnit)
method uses
- the Scheduler.Worker.schedule(Runnable, long, TimeUnit)
for scheduling the Runnable
task periodically.
- The algorithm calculates the next absolute time when the task should run again and schedules this execution
- based on the relative time between it and Scheduler.Worker.now(TimeUnit)
. However, drifts or changes in the
- system clock could affect this calculation either by scheduling subsequent runs too frequently or too far apart.
- Therefore, the default implementation uses the clockDriftTolerance()
value (set via
- rx3.scheduler.drift-tolerance
in minutes) to detect a drift in Scheduler.Worker.now(TimeUnit)
and
- re-adjust the absolute/relative time calculation accordingly.
-
- The default implementations of start()
and shutdown()
do nothing and should be overridden if the
- underlying task-execution scheme supports stopping and restarting itself.
-
- If the Scheduler
is shut down or a Worker
is disposed, the schedule
methods
- should return the Disposable.disposed()
singleton instance indicating the shut down/disposed
- state to the caller. Since the shutdown or dispose can happen from any thread, the schedule
implementations
- should make best effort to cancel tasks immediately after those tasks have been submitted to the
- underlying task-execution scheme if the shutdown/dispose was detected after this submission.
-
- All methods on the Scheduler
and Worker
classes should be thread safe.
Modifier and Type | -Class and Description | -
---|---|
static class |
-Scheduler.Worker
-Represents an isolated, sequential worker of a parent Scheduler for executing
-Runnable tasks on
- an underlying task-execution scheme (such as custom Threads, event loop, Executor or Actor system). |
-
Modifier and Type | -Method and Description | -
---|---|
static long |
-clockDriftTolerance()
-Returns the clock drift tolerance in nanoseconds.
- |
-
abstract @NonNull Scheduler.Worker |
-createWorker()
-Retrieves or creates a new
-Scheduler.Worker that represents sequential execution of actions. |
-
long |
-now(@NonNull TimeUnit unit)
-Returns the 'current time' of the Scheduler in the specified time unit.
- |
-
@NonNull Disposable |
-scheduleDirect(@NonNull Runnable run)
-Schedules the given task on this Scheduler without any time delay.
- |
-
@NonNull Disposable |
-scheduleDirect(@NonNull Runnable run,
- long delay,
- @NonNull TimeUnit unit)
-Schedules the execution of the given task with the given time delay.
- |
-
@NonNull Disposable |
-schedulePeriodicallyDirect(@NonNull Runnable run,
- long initialDelay,
- long period,
- @NonNull TimeUnit unit)
-Schedules a periodic execution of the given task with the given initial time delay and repeat period.
- |
-
void |
-shutdown()
-Instructs the Scheduler instance to stop threads,
- stop accepting tasks on any outstanding
-Scheduler.Worker instances
- and clean up any associated resources with this Scheduler. |
-
void |
-start()
-Allows the Scheduler instance to start threads
- and accept tasks on them.
- |
-
<S extends Scheduler & Disposable> |
-when(@NonNull Function<Flowable<Flowable<Completable>>,Completable> combine)
-Allows the use of operators for controlling the timing around when
- actions scheduled on workers are actually done.
- |
-
public static long clockDriftTolerance()-
Related system property: rx3.scheduler.drift-tolerance
in minutes.
@NonNull -public abstract @NonNull Scheduler.Worker createWorker()-
Scheduler.Worker
that represents sequential execution of actions.
-
- When work is completed, the Worker
instance should be released
- by calling Disposable.dispose()
to avoid potential resource leaks in the
- underlying task-execution scheme.
-
- Work on a Scheduler.Worker
is guaranteed to be sequential and non-overlapping.
public long now(@NonNull - @NonNull TimeUnit unit)-
unit
- the time unitNullPointerException
- if unit
is null
public void start()-
- Implementations should make sure the call is idempotent, thread-safe and
- should not throw any RuntimeException
if it doesn't support this
- functionality.
public void shutdown()-
Scheduler.Worker
instances
- and clean up any associated resources with this Scheduler.
-
- Implementations should make sure the call is idempotent, thread-safe and
- should not throw any RuntimeException
if it doesn't support this
- functionality.
@NonNull -public @NonNull Disposable scheduleDirect(@NonNull - @NonNull Runnable run)-
- This method is safe to be called from multiple threads but there are no - ordering or non-overlapping guarantees between tasks.
run
- the task to executeNullPointerException
- if run
is null
@NonNull -public @NonNull Disposable scheduleDirect(@NonNull - @NonNull Runnable run, - long delay, - @NonNull - @NonNull TimeUnit unit)-
- This method is safe to be called from multiple threads but there are no - ordering guarantees between tasks.
run
- the task to scheduledelay
- the delay amount, non-positive values indicate non-delayed schedulingunit
- the unit of measure of the delay amountNullPointerException
- if run
or unit
is null
@NonNull -public @NonNull Disposable schedulePeriodicallyDirect(@NonNull - @NonNull Runnable run, - long initialDelay, - long period, - @NonNull - @NonNull TimeUnit unit)-
- This method is safe to be called from multiple threads but there are no - ordering guarantees between tasks. - -
- The periodic execution is at a fixed rate, that is, the first execution will be after the
- initialDelay
, the second after initialDelay + period
, the third after
- initialDelay + 2 * period
, and so on.
run
- the task to scheduleinitialDelay
- the initial delay amount, non-positive values indicate non-delayed schedulingperiod
- the period at which the task should be re-executedunit
- the unit of measure of the delay amountNullPointerException
- if run
or unit
is null
@NonNull -public <S extends Scheduler & Disposable> S when(@NonNull - @NonNull Function<Flowable<Flowable<Completable>>,Completable> combine)-
Scheduler
. The only parameter
- is a function that flattens an Flowable
of Flowable
- of Completable
s into just one Completable
. There must be
- a chain of operators connecting the returned value to the source
- Flowable
otherwise any work scheduled on the returned
- Scheduler
will not be executed.
-
- When createWorker()
is invoked a Flowable
of
- Completable
s is onNext'd to the combinator to be flattened. If
- the inner Flowable
is not immediately subscribed to an calls to
- Scheduler.Worker.schedule(java.lang.Runnable)
are buffered. Once the Flowable
is
- subscribed to actions are then onNext'd as Completable
s.
-
- Finally the actions scheduled on the parent Scheduler
when the
- inner most Completable
s are subscribed to.
-
- When the Scheduler.Worker
is unsubscribed the Completable
emits an
- onComplete and triggers any behavior in the flattening operator. The
- Flowable
and all Completable
s give to the flattening
- function never onError.
-
- Limit the amount concurrency two at a time without creating a new fix - size thread pool: - -
- Scheduler limitScheduler = Schedulers.computation().when(workers -> { - // use merge max concurrent to limit the number of concurrent - // callbacks two at a time - return Completable.merge(Flowable.merge(workers), 2); - }); --
- This is a slightly different way to limit the concurrency but it has some
- interesting benefits and drawbacks to the method above. It works by
- limited the number of concurrent Scheduler.Worker
s rather than individual
- actions. Generally each Flowable
uses its own Scheduler.Worker
.
- This means that this will essentially limit the number of concurrent
- subscribes. The danger comes from using operators like
- Flowable.zip(org.reactivestreams.Publisher, org.reactivestreams.Publisher, io.reactivex.rxjava3.functions.BiFunction)
where
- subscribing to the first Flowable
could deadlock the
- subscription to the second.
-
-
- Scheduler limitScheduler = Schedulers.computation().when(workers -> { - // use merge max concurrent to limit the number of concurrent - // Flowables two at a time - return Completable.merge(Flowable.merge(workers, 2)); - }); -- - Slowing down the rate to no more than than 1 a second. This suffers from - the same problem as the one above I could find an
Flowable
- operator that limits the rate without dropping the values (aka leaky
- bucket algorithm).
-
- - Scheduler slowScheduler = Schedulers.computation().when(workers -> { - // use concatenate to make each worker happen one at a time. - return Completable.concat(workers.map(actions -> { - // delay the starting of the next worker by 1 second. - return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS)); - })); - }); -- -
History: 2.0.1 - experimental
S
- a Scheduler and a Subscriptioncombine
- the function that takes a two-level nested Flowable sequence of a Completable and returns
- the Completable that will be subscribed to and should trigger the execution of the scheduled Actions.NullPointerException
- if combine
is null
T1
- the first value typeT2
- the second value typeT3
- the third value typeR
- the result type@FunctionalInterface -public interface Function3<T1,T2,T3,R>-
Modifier and Type | -Method and Description | -
---|---|
R |
-apply(T1 t1,
- T2 t2,
- T3 t3)
-Calculate a value based on the input values.
- |
-
R apply(T1 t1, - T2 t2, - T3 t3) - throws Throwable-
t1
- the first valuet2
- the second valuet3
- the third valueThrowable
- if the implementation wishes to throw any type of exception@FunctionalInterface -public interface LongConsumer-
Modifier and Type | -Method and Description | -
---|---|
void |
-accept(long t)
-Consume a primitive long input.
- |
-