101 Reactive Gems (working title) #21

Open
akarnokd opened this Issue Apr 14, 2016 · 22 comments

akarnokd commented Apr 14, 2016

In this issue, we should collect tips and tricks with reactive systems and dataflows.

These are not particularly advanced topics but the markdown support on GitHub makes it easier to write them up.

Once we run out of ideas, we may tidy it up and release it together (maybe a free ebook?).

Please post only gems here and open discussion about them in separate issues. Thanks.

akarnokd Apr 14, 2016

Collaborator

1) The merge/flatMap identity

The operators merge and flatMap are closely related and you can implement one with the other:

Given a nested Publisher, merging its inner sequences is equivalent to applying flatMap with an identity function:

Publisher<Publisher<T>> sources = ...

merge(sources) == sources.flatMap(source -> source);

Given a Publisher and a mapping function T -> Publisher<R>, you can map the source with the function and merge the resulting nested sequence:

Publisher<T> source = ...
Function<T, Publisher<R>> mapper = ...

source.flatMap(mapper) == merge(source.map(mapper));

This identity is also true for other mapping operators:

  • concat and concatMap
  • concatEager and concatMapEager
  • switchOnNext and switchMap

This is a general property of the operation and works for non-reactive but functional APIs as well. It can come in handy if a library offers only one of the two methods.
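
Because the property is not specific to reactive types, it can be checked with plain java.util.stream code. The following is only a sketch by analogy: Lists stand in for Publishers, the class and method names (FlattenIdentity, flatten, flatMapDirect, flatMapViaFlatten) are made up for illustration, and since streams are ordered, the behavior is closer to concat/concatMap than to merge/flatMap.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

final class FlattenIdentity {

    // Plays the role of merge(sources): flattening is flatMap with an identity-like function.
    static <T> List<T> flatten(List<List<T>> nested) {
        return nested.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    // Plays the role of source.flatMap(mapper).
    static <T, R> List<R> flatMapDirect(List<T> source, Function<T, List<R>> mapper) {
        return source.stream()
                .flatMap(v -> mapper.apply(v).stream())
                .collect(Collectors.toList());
    }

    // Plays the role of merge(source.map(mapper)): map first, flatten afterwards.
    static <T, R> List<R> flatMapViaFlatten(List<T> source, Function<T, List<R>> mapper) {
        List<List<R>> mapped = source.stream()
                .map(mapper)
                .collect(Collectors.toList());
        return flatten(mapped);
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3);
        Function<Integer, List<Integer>> mapper = v -> Arrays.asList(v, v * 10);
        // Both lines print the same list.
        System.out.println(flatMapDirect(source, mapper));
        System.out.println(flatMapViaFlatten(source, mapper));
    }
}
```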

akarnokd Apr 14, 2016

Collaborator

2) Compose at subscription time

Modern reactive libraries offer fluent conversion operators: extend, to, as or compose. With them, you can apply your own transformer function, which runs at assembly time and lets you customize a sequence with preset operators:

Function<Publisher<T>, Publisher<T>> addSchedulers = o -> 
    o.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

range(1, 10).compose(addSchedulers).subscribe(System.out::println);

The function addSchedulers is executed when range(1, 10).compose(...) is called, that is, at assembly time.

However, sometimes you want stateful operators in a sequence, for example, a counting map that updates some "global" state. Clearly, such an operation doesn't work properly when the sequence has multiple concurrent Subscribers, because they would all share that state.

Luckily, you can use the composition operators above in conjunction with defer to shift the execution of the function to subscription time, which allows per-Subscriber state in the applied operators:

range(1, 10).compose(o -> defer(() -> {
    int[] counter = new int[] { 0 };
    return o.map(v -> ++counter[0]);
})).subscribe(System.out::println);
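
The difference between state created at assembly time and state created at subscription time can be illustrated without a reactive library. In this rough model, which is an assumption made purely for illustration, a Supplier stands in for the sequence and each call to get() stands in for one subscription; the names DeferredState, assemblyTimeState and subscriptionTimeState are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

final class DeferredState {

    // Counter created once, when the pipeline is assembled: every run shares it.
    static Supplier<List<Integer>> assemblyTimeState(List<Integer> source) {
        int[] counter = { 0 };
        return () -> source.stream()
                .map(v -> ++counter[0])
                .collect(Collectors.toList());
    }

    // Counter created inside the Supplier, mirroring defer(): every run gets its own.
    static Supplier<List<Integer>> subscriptionTimeState(List<Integer> source) {
        return () -> {
            int[] counter = { 0 };
            return source.stream()
                    .map(v -> ++counter[0])
                    .collect(Collectors.toList());
        };
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(10, 20, 30);

        Supplier<List<Integer>> shared = assemblyTimeState(source);
        // The second run continues counting where the first one stopped.
        System.out.println(shared.get() + " then " + shared.get());

        Supplier<List<Integer>> fresh = subscriptionTimeState(source);
        // Each run starts counting from 1 again.
        System.out.println(fresh.get() + " then " + fresh.get());
    }
}
```

The second variant corresponds to wrapping the operator chain in defer: the state is allocated only when a consumer actually shows up.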

akarnokd Apr 14, 2016

Collaborator

3) FlatMap as map or filter

The operator flatMap is very versatile and comes up frequently when one wants to join different mapped sequences.

The mapper function lets you return any Publisher instance, including an empty one (empty) or a single-element one (just).

We can utilize this property and implement map with flatMap:

Function<T, R> mapper = ...

map(mapper) == flatMap(v -> just(mapper.apply(v)));

Given the current value from source, we apply the mapper function and return a constant just Publisher.

We can apply the same methodology with filter: return empty if the predicate doesn't hold and return just if it does hold:

Predicate<T> predicate = ...

filter(predicate) == flatMap(v -> predicate.test(v) ? just(v) : empty());
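
Both identities can be tried out with java.util.stream, where Stream.of plays the role of just and Stream.empty the role of empty. This is a sketch by analogy rather than reactive code; the class and method names (FlatMapAsMapOrFilter, mapViaFlatMap, filterViaFlatMap) are invented for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FlatMapAsMapOrFilter {

    // map(mapper) expressed as flatMap to a single-element stream (the "just" case).
    static <T, R> List<R> mapViaFlatMap(List<T> source, Function<T, R> mapper) {
        return source.stream()
                .flatMap(v -> Stream.of(mapper.apply(v)))
                .collect(Collectors.toList());
    }

    // filter(predicate) expressed as flatMap to one element or none (the "empty" case).
    static <T> List<T> filterViaFlatMap(List<T> source, Predicate<T> predicate) {
        return source.stream()
                .flatMap(v -> predicate.test(v) ? Stream.of(v) : Stream.<T>empty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4);
        System.out.println(mapViaFlatMap(source, v -> v * 2));
        System.out.println(filterViaFlatMap(source, v -> v % 2 == 0));
    }
}
```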

Note, however, that using flatMap this way incurs more overhead than a direct map or filter. Modern reactive libraries still optimize for these cases because flatMap lets you short-circuit a sequence by returning an error Publisher:

source.flatMap(v -> {
    try {
        performIO(v);
        return just(v);
    } catch (IOException ex) {
        return error(ex);
    }
})

akarnokd Apr 14, 2016

Collaborator

4) Continuations with concat

Sometimes, you want to run through a sequence of values and then resume with another sequence of a completely different type once the first has completed. In this case, you'd usually also ignore the first sequence's values.

Some libraries already offer operators for this (after, then, andThen, etc.), but if none is available, you can use concat instead:

Publisher<Integer> a = ...
Publisher<String> b = ...

Publisher<String> c = (Publisher<String>)concat(a.ignoreElements(), b);

Unfortunately, you'll need some explicit casts and @SuppressWarnings for this; Java's type system doesn't seem to be powerful enough to express it otherwise.
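
The shape of the continuation can be sketched with java.util.stream, again only as an analogy. The helper name andThen and the class Continuation are hypothetical. In this stream version, flat-mapping every value of the first sequence to an empty stream both discards the values and changes the element type, so no cast is needed; whether the same trick works out in a given reactive library is an assumption that has not been verified here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class Continuation {

    // Consumes every element of first, drops it, then continues with second.
    static <T, R> Stream<R> andThen(Stream<T> first, Stream<R> second) {
        // Mapping each value to an empty Stream<R> discards it and switches the element type.
        Stream<R> drained = first.flatMap(v -> Stream.<R>empty());
        return Stream.concat(drained, second);
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        List<String> result = andThen(
                Stream.of(1, 2, 3).peek(seen::add), // record that the first sequence really ran
                Stream.of("a", "b"))
                .collect(Collectors.toList());
        System.out.println(seen + " " + result);
    }
}
```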

akarnokd Apr 14, 2016

Collaborator

5) When subscribeOn and observeOn/publishOn sometimes act the same

One common source of confusion with reactive operators is the difference between subscribeOn and observeOn (publishOn). In short, subscribeOn acts during subscription time on subscribe() calls and its effects travel upstream, whereas observeOn acts during runtime on onNext|onError|onComplete calls and its effects travel downstream.

However, with many sequential data sources, such as just, empty, error, fromArray, fromIterable and range, applying either subscribeOn or observeOn yields the same output: events signalled on the specified Scheduler's thread.

just(1).subscribeOn(scheduler) == just(1).observeOn(scheduler)

range(1, 5).subscribeOn(scheduler) == range(1, 5).publishOn(scheduler)

fromArray("a", "b", "c").subscribeOn(scheduler) == fromArray("a", "b", "c").observeOn(scheduler)

// etc.

The reason for this equivalence is that these constant sources don't really have subscription side-effects and requests to them will trigger emissions on the Scheduler's thread anyway.

Most modern libraries already exploit this with respect to just; both patterns are transformed into a custom scheduled Publisher instance.

In addition, using observeOn may give better performance with the multi-valued sources due to micro-operator-fusion in its front. However, we don't fully see the ramifications of an automatic subscribeOn -> observeOn swap when a sequence is assembled due to the so-called strong pipelining effect when both operators are applied to the sequence:

range(1, 10).subscribeOn(schedulerA).observeOn(schedulerB);

akarnokd Apr 14, 2016

Collaborator

6) FlatMap as concatMap

The operator flatMap lets you run multiple sources at once while concatMap runs one sequence at a time.

However, flatMap usually offers a parameter that lets you limit its concurrency level. The smallest allowed concurrency level is 1, which functionally acts the same as concatMap:

flatMap(mapper, 1) == concatMap(mapper)

This identity helped many times to rule out possible bugs in either flatMap's or concatMap's implementation as a cause of some weird behavior or exception.

Note that in many implementations of concatMap, the operator prefetches its source values (2 elements in RxJava, 32 in Reactor) whereas a concurrency-constrained flatMap will prefetch exactly one source value and thus trigger sequence dependent side-effects at different times.

akarnokd Apr 14, 2016

Collaborator

7) Parallel processing

Reactive datasources are sequential in nature; they form a processing pipeline that runs in FIFO order, even if source values are available while processing earlier values.

Still, sometimes you want to go parallel, for example, execute many service calls at once or distribute some processing over the available CPUs.

There are some experiments with such reactive stream parallelization, but the effect itself can be achieved today with common operators.

Using flatMap

One of the ways is to use flatMap for the job:

range(1, 1_000)
.flatMap(v -> just(v).subscribeOn(computation).map(Object::hashCode))

If there are a lot of source values, scheduling them one-by-one adds quite an overhead. Instead, you can buffer some of them and act on them in a batched fashion:

range(1, 1_000_000)
.buffer(256)
.flatMap(list -> 
    just(list).subscribeOn(computation)
    .map(v -> 
        v.stream().map(Object::hashCode).collect(toList())
    )
)
.flatMapIterable(c -> c)

If the final order still matters, you can use concatMapEager instead of flatMap.
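
The batching idea can also be sketched with plain JDK executors, under the assumption that a fixed thread pool is an acceptable stand-in for the computation scheduler. Splitting the list corresponds to buffer, submitting one task per batch corresponds to the scheduled inner sequence, and reading the futures in submission order gives the order-preserving behavior described for concatMapEager. The names BatchedParallel and hashInBatches are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class BatchedParallel {

    static List<Integer> hashInBatches(List<Integer> source, int batchSize, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Split into batches (the buffer step) and schedule each batch as one task.
            List<Future<List<Integer>>> pending = new ArrayList<>();
            for (int i = 0; i < source.size(); i += batchSize) {
                List<Integer> batch = source.subList(i, Math.min(i + batchSize, source.size()));
                Callable<List<Integer>> task = () ->
                        batch.stream().map(Object::hashCode).collect(Collectors.toList());
                pending.add(pool.submit(task));
            }
            // Batches run concurrently; reading the futures in submission order keeps the source order.
            List<Integer> result = new ArrayList<>(source.size());
            for (Future<List<Integer>> future : pending) {
                result.addAll(future.get());
            }
            return result;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> source = IntStream.rangeClosed(1, 1_000).boxed().collect(Collectors.toList());
        System.out.println(hashInBatches(source, 256, 4).size());
    }
}
```

Scheduling one task per batch instead of one per element is what keeps the per-item scheduling overhead down.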

Using window

The operator window creates a nested Publisher<Publisher<T>> sequence, which batches values similarly to buffer, but the values themselves become immediately available in the inner Publisher.

We can use this operator with some mapping and flattening to get the parallel effect:

range(1, 1_000_000)
.window(256)
.map(w -> 
    w.observeOn(computation)
    .map(Object::hashCode)
)
.concatMapEager(w -> w);

Note the use of observeOn here: the source inner window is no longer constant like above but emits onNext events as they arrive from the source. Applying subscribeOn would be useless.

Using groupBy

The operator groupBy also creates nested Publisher instances based on some key selector to determine which value goes into which group; i.e., routing or dispatching values to various inner windows.

One way is to use the current value's hashCode modulo some integer to select a "bucket" for it:

range(1, 1_000_000)
.groupBy(v -> v.hashCode() % 8)
.flatMap(g -> g.observeOn(computation).map(Object::hashCode))

Given a well distributed hashCode, this should yield a fairly balanced parallel processing of values.

Alternatively, you can do round-robin dispatching by using a counter instead:

long[] counter = { 0 };

range(1, 1_000_000)
.groupBy(v -> (counter[0]++) & 7)
.concatMapEager(g -> g.observeOn(computation).map(Object::hashCode))

akarnokd Apr 15, 2016

Collaborator

8) Executing an action if the source is empty

Sometimes you want to execute some action, such as logging, when it turns out the source Publisher is empty.

You should be familiar with the switchIfEmpty operator which switches to a new Publisher if the main Publisher is empty. We can then switch to an empty publisher and use doOnComplete with the desired action:

Publisher<T> source = ...

source.switchIfEmpty(empty().doOnComplete(() -> System.out.println("Empty source!")));

akarnokd Apr 15, 2016

Collaborator

9) Processing elements pairwise

Sometimes you want to process subsequent elements of a Publisher pairwise (or in triplets, etc.). A possible way of doing this is to use buffer with the skip option:

range(1, 9)
.buffer(2, 1)
.filter(b -> b.size() == 2)
.map(b -> b.get(0) + b.get(1))
.subscribe(System.out::println);

Since overlapping buffers typically end with a partial buffer (with buffer(2, 1), the last buffer holds only the final source item), you have to filter out partial buffers; otherwise, the map will fail with an IndexOutOfBoundsException.

An alternative way is to use the publish overload taking a Function<Publisher<T>, Publisher<R>>. This overload, unlike the regular publish, returns a Publisher and is in fact a cold operator. For the duration of a Subscriber, it turns the source into a hot Publisher to which you can attach as many operators as you like via the Function callback; thus, the source events are shared among different "paths" without subscribing to the source multiple times.

We can use this publish with skip and zip to get pairs:

range(1, 9)
.publish(o -> zip(o, o.skip(1), (a, b) -> a + b))
.subscribe(System.out::println);

There is no need for filtering as there is no list anymore and we get the pairs nicely via lambda parameters. Naturally, it works with triplets as well:

range(1, 9)
.publish(o -> zip(o, o.skip(1), o.skip(2), (a, b, c) -> a + b + c))
.subscribe(System.out::println);
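
To see what the pairwise variants are expected to compute, here is a small list-based sketch in plain Java. It does not use any reactive library; Pairwise, pairwiseSums and slidingWindows are hypothetical names. The slidingWindows helper models an overlapping buffer that also emits its trailing partial window, which is an assumption about buffer(2, 1) and is exactly the case a size filter guards against.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class Pairwise {

    // Index-based equivalent of zipping a sequence with itself shifted by one element.
    static List<Integer> pairwiseSums(List<Integer> values) {
        return IntStream.range(0, Math.max(0, values.size() - 1))
                .mapToObj(i -> values.get(i) + values.get(i + 1))
                .collect(Collectors.toList());
    }

    // Sliding windows of size 2 advancing by 1, including the trailing partial window.
    static List<List<Integer>> slidingWindows(List<Integer> values) {
        return IntStream.range(0, values.size())
                .mapToObj(i -> values.subList(i, Math.min(i + 2, values.size())))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        System.out.println(pairwiseSums(source));

        // Same result via windows, once the partial window is filtered out.
        System.out.println(slidingWindows(source).stream()
                .filter(w -> w.size() == 2)
                .map(w -> w.get(0) + w.get(1))
                .collect(Collectors.toList()));
    }
}
```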

akarnokd Apr 15, 2016

Collaborator

10) Executing a function asynchronously

You may often want to execute some Supplier or Runnable/Action0 asynchronously. Maybe it's something blocking or just long running and you don't want to block or hold up your current thread.

Many modern libraries offer the fromCallable(Callable<T>) source factory where you can use a regular old Callable to execute some action. With Java 8 lambdas, it's easy to convert your callbacks into Callables:

Supplier<Integer> supplier = () -> 1;
fromCallable(supplier::get).subscribeOn(computation);

Runnable run = () -> System.out.println("Hello!");
fromCallable(() -> { run.run(); return null; }).subscribeOn(computation);

If for some reason fromCallable is not available, you can use a combination of just and map to get the same effect:

just("whatever")
.subscribeOn(computation)
.map(ignored -> supplier.get());

Of course, you can swap the order of subscribeOn and map here; they are functionally equivalent. However, as we saw with Gem #5, just().subscribeOn() is usually optimized in modern libraries, and an intermediate just().map().subscribeOn() would most likely prevent some of those optimizations.
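
For comparison, the JDK itself offers a non-reactive way to run a Supplier or Runnable asynchronously via CompletableFuture. The sketch below is not a replacement for fromCallable: a CompletableFuture starts running as soon as it is created, whereas a fromCallable source does nothing until it is subscribed to. The wrapper names AsyncCall, callAsync and runAsync are only illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class AsyncCall {

    // Runs the supplier on the default asynchronous executor and exposes the result as a future.
    static <T> CompletableFuture<T> callAsync(Supplier<T> supplier) {
        return CompletableFuture.supplyAsync(supplier);
    }

    // Runs a value-less action asynchronously; the future completes with null.
    static CompletableFuture<Void> runAsync(Runnable action) {
        return CompletableFuture.runAsync(action);
    }

    public static void main(String[] args) {
        Supplier<Integer> supplier = () -> 1;
        // join() blocks the caller until the asynchronous result is available.
        System.out.println(callAsync(supplier).join());

        Runnable run = () -> System.out.println("Hello!");
        runAsync(run).join();
    }
}
```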

akarnokd Apr 15, 2016

Collaborator

11) Caching the last element and clearing it on demand

A possible use for BehaviorSubject/BehaviorProcessor (EmitterProcessor#replayLast in Reactor), which remembers the last value and emits it to new Subscribers at the beginning of a sequence, is to use it as a single element cache - also known as a "reactive property".

Sometimes, the contents of this cache can become outdated and shouldn't be emitted to new Subscribers until a proper fresh value is generated. Unfortunately, most implementations don't offer a clear() method, so the old value stays until a new one is assigned via onNext.

In fact, we will use this very onNext to clear out the current value. To do this, we have to establish a protocol where a special value from the Subject's value type indicates emptiness and instructs Subscribers to ignore it if encountered:

Integer CLEAR = new Integer(0);

BehaviorProcessor<Integer> cache = new BehaviorProcessor<>(CLEAR);

Publisher<Integer> front = cache.filter(v -> v != CLEAR);

front.subscribe(System.out::println);

cache.onNext(10);
cache.onNext(CLEAR);
cache.onNext(20);

front.subscribe(System.out::println);

In the example, we create a new (!) Integer instance and use it for reference comparison to determine if the cache is "empty".

Sometimes, however, you can't just create such an "empty" instance of the type you are working with. In this case, you have to fall back to the lowest common denominator type, Object, and cast everything that isn't the indicator back to your type:

Object CLEAR = new Object();

BehaviorProcessor<Object> cache = new BehaviorProcessor<>();

Publisher<String> front = cache.filter(v -> v != CLEAR).cast(String.class);

cache.onNext("abc");
cache.onNext(CLEAR);
cache.onNext("def");

front.subscribe(System.out::println);

Of course, directly exposing a BehaviorProcessor<Object> invites values of all kinds of types. You can re-establish type safety by wrapping the code into a class that allows only type-correct onNext calls:

public final class Cache<T> implements Observer<T>, Publisher<T> {
    static final Object CLEAR = new Object();
    final BehaviorProcessor<Object> cache = new BehaviorProcessor<>(CLEAR);

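    @Override
    @SuppressWarnings("unchecked")
    public void subscribe(Subscriber<? super T> subscriber) {
        // Only T values and CLEAR ever enter the cache, so after the filter the unchecked cast is safe
        ((Publisher<T>)(Publisher<?>)cache.filter(v -> v != CLEAR)).subscribe(subscriber);
    }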

    @Override
    public void onNext(T t) {
        cache.onNext(t);
    }

    @Override
    public void onError(Throwable t) {
        cache.onError(t);
    }

    @Override
    public void onComplete() {
        cache.onComplete();
    }

    public void clear() {
        cache.onNext(CLEAR);
    }
}

Remember, Processors (and Subjects) require non-concurrent calls to their onXXX methods, so you should only call them in a serialized fashion (such as from a GUI thread or a single-threaded event loop).
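
The sentinel protocol itself can be tried out without a reactive library. The following is a deliberately minimal, single-threaded sketch: ClearableCache is a hypothetical class, plain Consumer callbacks stand in for Subscribers, and terminal events and backpressure are left out entirely. Like the Processor-based version, it assumes serialized access.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ClearableCache<T> {

    // Sentinel marking the cache as empty; compared by reference only.
    private static final Object CLEAR = new Object();

    private Object current = CLEAR;
    private final List<Consumer<? super T>> listeners = new ArrayList<>();

    // New listeners get the cached value right away unless the cache is cleared.
    void subscribe(Consumer<? super T> listener) {
        listeners.add(listener);
        if (current != CLEAR) {
            listener.accept(cast(current));
        }
    }

    // Stores the value and forwards it to every current listener.
    void onNext(T value) {
        current = value;
        for (Consumer<? super T> listener : listeners) {
            listener.accept(value);
        }
    }

    // Forgets the cached value; listeners are not notified.
    void clear() {
        current = CLEAR;
    }

    @SuppressWarnings("unchecked")
    private T cast(Object value) {
        // Safe because only T values or CLEAR are ever stored, and CLEAR is checked first.
        return (T) value;
    }

    public static void main(String[] args) {
        ClearableCache<Integer> cache = new ClearableCache<>();
        cache.subscribe(v -> System.out.println("first: " + v));
        cache.onNext(10);
        cache.clear();
        // Receives nothing on subscription because the cache was cleared.
        cache.subscribe(v -> System.out.println("second: " + v));
        cache.onNext(20);
    }
}
```

Keeping the sentinel private to the class is what restores type safety: callers can only ever pass T values to onNext.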

akarnokd Apr 15, 2016

Collaborator

12) Processing elements of list-based values

Sometimes, an API gives you a Publisher<List<T>> where the signaled values are Lists and you want to process the elements in the list while holding the list together to be processed as a whole downstream later on.

Let's start with a common source of Lists:

Publisher<List<Integer>> source = range(1, 1_000_000).buffer(256);

In-place processing

You can simply run a for loop and manipulate the list in-place if the list is mutable (buffer emits such mutable lists):

source.map(list -> {
    for (int i = 0; i < list.size(); i++) {
        list.set(i, list.get(i) + 2_000_000);
    }
    return list;
});

Or as a new list:

source.map(list -> {
    List<Integer> newList = new ArrayList<>(list.size());
    for (Integer v : list) {
        newList.add(v + 2_000_000);
    }
    return newList;
});

Stream processing

You can combine the worlds of reactive and the interactive Java 8 Stream processing:

source.map(list -> 
    list.stream()
    .map(v -> v + 2_000_000)
    .collect(toList())
);

ConcatMap

If you can't mutate the list, are stuck on Java 7 or earlier, or just don't want to look "non functional", you can use concatMap (not flatMap, in order to keep the original list order), unpack each list, process its elements and then recollect the result into another list:

source.concatMap(list ->
    fromIterable(list)
    .map(v -> v + 2_000_000)
    .toList()
);
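To check that the per-element processing really keeps the list boundaries intact, here is a small sketch that uses only the JDK. ListChunks, buffered and addToEach are hypothetical names; buffered merely stands in for range(...).buffer(...) and is not a library operator:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper names, for illustration only.
final class ListChunks {
    // Stand-in for range(start, count).buffer(size): consecutive integers in chunks.
    static List<List<Integer>> buffered(int start, int count, int size) {
        List<List<Integer>> chunks = new ArrayList<>();
        for (int from = 0; from < count; from += size) {
            int to = Math.min(from + size, count);
            chunks.add(IntStream.range(start + from, start + to)
                    .boxed()
                    .collect(Collectors.toCollection(ArrayList::new)));
        }
        return chunks;
    }

    // Transforms every element but leaves the chunk boundaries untouched.
    static List<List<Integer>> addToEach(List<List<Integer>> chunks, int delta) {
        return chunks.stream()
                .map(chunk -> chunk.stream()
                        .map(v -> v + delta)
                        .collect(Collectors.toList()))
                .collect(Collectors.toList());
    }
}
```

For example, buffered(1, 5, 2) gives three chunks of sizes 2, 2 and 1, and addToEach returns the same shape with every element shifted.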

akarnokd Apr 15, 2016

Collaborator

13) Awaiting completion of many sources

Sometimes you have a bunch of Publishers doing various things and you want to wait until they all complete, while the values they emit are irrelevant to you.

You may think of zip or combineLatest for this, along with ignoreElements; however, zip and combineLatest won't work as you'd expect with most source combinations. These operators terminate eagerly: if one of the sources completes without elements, they complete immediately and cancel the rest of the outstanding source sequences. In addition, if zip encounters a shorter sequence, it will also terminate eagerly.

You can simply use flatMap in conjunction with ignoreElements() to join the sources' termination:

Publisher<String> source1 = just("Hello");
Publisher<Integer> source2 = range(1, 3);
Publisher<Long> source3 = timer(1, TimeUnit.MILLISECONDS);

just(source1, source2, source3)
.flatMap(o -> o.ignoreElements())
.doOnComplete(() -> System.out.println("Done"))
.subscribe();

Some libraries, such as Reactor, also offer Mono#when(Mono<T>....) to coordinate the completion and the data of many sources.
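The same "join on completion, ignore the values" idea can be tried without a reactive library. The sketch below swaps in the JDK's CompletableFuture.allOf as a rough analogue of flatMap plus ignoreElements; AwaitAll, whenAllDone and demo are hypothetical names:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical names; CompletableFuture stands in for the Publishers of the gem.
final class AwaitAll {
    // Completes when every source has completed; their results are discarded.
    static CompletableFuture<Void> whenAllDone(CompletableFuture<?>... sources) {
        return CompletableFuture.allOf(sources);
    }

    // Runs three sources of different types and reports how many had finished
    // by the time the joined completion signal arrived.
    static int demo() {
        AtomicInteger finished = new AtomicInteger();
        CompletableFuture<String> source1 = CompletableFuture.supplyAsync(() -> {
            finished.incrementAndGet();
            return "Hello";
        });
        CompletableFuture<Integer> source2 = CompletableFuture.supplyAsync(() -> {
            finished.incrementAndGet();
            return 3;
        });
        CompletableFuture<Void> source3 = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            finished.incrementAndGet();
        });
        whenAllDone(source1, source2, source3).join();
        return finished.get();
    }
}
```

Unlike zip, the joined signal doesn't care whether a source produced a value (source3 produces none); it only waits for all of them to finish.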

akarnokd Apr 15, 2016

Collaborator

14) Errors as non-terminal events

By default, reactive protocols treat onError events as fatal and terminal events, tearing down the whole chain you delicately assembled.

Many times, you'd want to treat an error like any other value and keep the sequences running. For this, you have to hide the error from the library: instead of calling onError, create a holder class for both normal and error values and emit instances of it. Libraries often support this via classes such as Notification<T> or Signal<T>, which are very similar to Java 8's Optional<T> but cover values, errors and emptiness:

EmitterProcessor<Signal<Integer>> bus = EmitterProcessor.create();

bus.dematerialize().consume(System.out::println, Throwable::printStackTrace);
bus.subscribe(System.out::println, Throwable::printStackTrace);

bus.onNext(Signal.next(1));
bus.onNext(Signal.error(new RuntimeException()));
bus.onNext(Signal.next(2));
bus.onNext(Signal.next(3));

You can use dematerialize to turn those notifications back into regular calls to the onXXX methods of your Subscriber, yet keep the original source active.
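If your library has no such holder class, writing one takes only a few lines. The following is a minimal sketch with hypothetical names (Outcome, OutcomeDemo); real Notification/Signal types are richer and also model completion. A plain List plays the role of the bus, to show that an error item doesn't stop the processing:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal holder for either a value or an error.
final class Outcome<T> {
    final T value;          // set for a normal value
    final Throwable error;  // set for an error

    private Outcome(T value, Throwable error) {
        this.value = value;
        this.error = error;
    }

    static <T> Outcome<T> next(T value) {
        return new Outcome<>(value, null);
    }

    static <T> Outcome<T> error(Throwable error) {
        return new Outcome<>(null, error);
    }

    boolean isError() {
        return error != null;
    }
}

final class OutcomeDemo {
    static List<String> run() {
        List<Outcome<Integer>> bus = new ArrayList<>();
        bus.add(Outcome.next(1));
        bus.add(Outcome.error(new RuntimeException("boom")));
        bus.add(Outcome.next(2));
        bus.add(Outcome.next(3));

        List<String> log = new ArrayList<>();
        for (Outcome<Integer> o : bus) {
            // The error is just another item: it gets logged and the loop carries on.
            log.add(o.isError() ? "error: " + o.error.getMessage() : "value: " + o.value);
        }
        return log;
    }
}
```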

akarnokd Apr 15, 2016

Collaborator

15) defer expressed as flatMap

The operator defer lets you return a custom Publisher instance for each of the Subscribers. The effect can be simulated by using just with flatMap:

Func0<Observable<Integer>> supplier = () -> range((int)(System.currentTimeMillis() & 1023), 5);

defer(supplier) == just("whatever").flatMap(v -> supplier.call());

In fact you could write it with concatMap as well:

defer(supplier) == just("whatever").concatMap(v -> supplier.call());

Even though using flatMap and concatMap this way looks like too much overhead, most modern libraries optimize away just with a special defer-like operator that behaves like this:

Func1<Integer, Observable<Integer>> function = v -> range(v, 2);

just(-10).flatMap(function) == defer(() -> function.call(-10));

by extracting the constant value from just and building a Func0 supplier that calls the original function with this constant and uses its returned Observable.

For this reason, using nest() with concatMap/flatMap is practically a no-op:

range(1, 5).nest().concatMap(o -> o.take(3)) == range(1, 5).take(3)
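As mentioned in Gem 1, such identities also hold for non-reactive functional APIs. Here is a sketch of the same trick with java.util.stream.Stream, whose flatMap calls its mapper only once a terminal operation runs. DeferSketch is a hypothetical name, and keep in mind that a Stream can be consumed only once, unlike a Publisher that builds a fresh sequence for each Subscriber:

```java
import java.util.function.Supplier;
import java.util.stream.Stream;

// Hypothetical illustration of defer(supplier) == just(x).flatMap(v -> supplier.get()).
final class DeferSketch {
    static <T> Stream<T> defer(Supplier<Stream<T>> supplier) {
        // The constant is irrelevant; it only triggers the mapper, which is
        // invoked lazily, when the returned stream is actually consumed.
        return Stream.of("whatever").flatMap(unused -> supplier.get());
    }
}
```

Calling defer(...) alone does not invoke the supplier; collecting the returned stream invokes it exactly once.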

akarnokd Apr 15, 2016

Collaborator

16) Emit elements of a list periodically

The source operator interval lets you create a periodic sequence of ever-increasing Long values. Sometimes, we don't care about the values themselves and just want to act at the right time.

Then comes a requirement of emitting items from a list periodically, that is, with some fixed delay between elements. We can map the Long values to the elements of the list, but we should also make sure we don't run out of indexes and get an IndexOutOfBoundsException:

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

interval(1, TimeUnit.SECONDS)
.takeUntil(t -> t == list.size() - 1)
.map(t -> list.get(t.intValue()))
.subscribe(System.out::println);

Given the interval, we take its elements until the current running value is exactly the list's size minus one. The operator takeUntil executes the predicate after the value itself has been emitted downstream, so the last index still gets through. In the map, we then simply call List.get() with the timer's value narrowed to an int, which gives us an item from the list.
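The "emit first, test afterwards" behavior of takeUntil is the crucial detail here. The sketch below models it with plain JDK iterators and no real timing; TakeUntilSketch and its methods are hypothetical names, and the endless counter merely stands in for interval:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.LongStream;

// Hypothetical model of the gem; there is no scheduling involved.
final class TakeUntilSketch {
    // Emits each value first and tests the stop predicate afterwards, so the
    // value that satisfies the predicate is still part of the output.
    static <T> List<T> takeUntil(Iterator<T> source, Predicate<? super T> stop) {
        List<T> out = new ArrayList<>();
        while (source.hasNext()) {
            T v = source.next();
            out.add(v);
            if (stop.test(v)) {
                break;
            }
        }
        return out;
    }

    static <T> List<T> itemsByTick(List<T> list) {
        List<T> out = new ArrayList<>();
        if (list.isEmpty()) {
            return out; // size() - 1 would never match a tick, so stop right away
        }
        // Endless 0, 1, 2, ... stands in for the values of interval().
        Iterator<Long> ticks = LongStream.iterate(0, t -> t + 1).boxed().iterator();
        for (Long tick : takeUntil(ticks, v -> v == list.size() - 1)) {
            out.add(list.get(tick.intValue()));
        }
        return out;
    }
}
```

Note the guard for the empty list: the reactive version would need a similar precaution, for example returning empty() up front.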

smaldini Apr 15, 2016

Member

17) Add a timed gap between elements

ConcatMap has the interesting property of preserving the FIFO order of the sequence. You can take advantage of this to shift the items of a sequence by a specific time or by any arbitrary Publisher, such as interval. Since the concurrent execution of the mapped timer or delay is limited to one, the shift is applied equally to all elements:

range(1, 1_000)
.concatMap(t -> Mono.delay(1000).map( v -> t ))
.consume(System.out::println) 
// prints 1, ..... 2, ...... 3 ..... N  with ...... =1 second

==

interval(1000)
.map(i -> i + 1) // interval counts from 0, range(1, ...) from 1
.take(1_000)
.consume(System.out::println)

You can also use FlatMap instead, which lets many shifts run concurrently and moves the whole sequence forward in groups of items:

range(1, 1_000)
.flatMap(t -> Mono.delay(1000).map( v -> t ))
.consume(System.out::println) 
// prints .... 1,  2,  3 , [..] C ..... N  with ...... = 1 second and C = max concurrency 
// (shifting groups of items)

akarnokd Apr 15, 2016

Collaborator

18) Inner-join with flatMap

In SQL, we are used to writing inner joins: the ones that pit one table against another and form all sorts of pairs. We can do such joins with flatMap:

Publisher<Integer> main = range(1, 5);
Publisher<String> slave = fromArray("a", "bb", "ccc");

main.flatMap(len -> slave.filter(s -> s.length() == len));

For each main element, this streams through the slave Publisher and keeps only those elements of slave whose length equals the current value from main.

Some libraries offer an overload that takes a two-argument function, which receives these pairs and should produce some value out of them:

main.flatMap(v -> slave, (m, s) -> m == s.length());

Of course, one can return a Publisher from the second function and apply flatMap again to flatten out the resulting inner sequences, allowing us to "drop" irrelevant values from the final sequence:

main.flatMap(v -> slave, (m, s) -> m == s.length() ? just(s) : empty())
.flatMap(v -> v);
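Since this is again a general property of flatMap, the join can be tried out with Java 8 Streams without any reactive library. InnerJoinSketch and join are hypothetical names for this sketch:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of an inner join expressed with flatMap.
final class InnerJoinSketch {
    // For each length, streams through all words and keeps the matching ones.
    static List<String> join(List<Integer> lengths, List<String> words) {
        return lengths.stream()
                .flatMap(len -> words.stream().filter(w -> w.length() == len))
                .collect(Collectors.toList());
    }
}
```

A sequential Stream visits the main elements one after the other, so the output order is fixed here; a reactive flatMap may interleave the inner sequences if they are asynchronous.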

akarnokd Apr 23, 2016

Collaborator

19) Eager concatenation

Sometimes, one would like to merge concurrently running sources and keep their order at the same time. The usual flatMap doesn't keep the order, and the usual concatMap doesn't start a source until the previous one has completed.

Many reactive libraries have an operator for this: concatMapEager (and concatEager), which gives the middle ground. In case your library doesn't support this operator, you can achieve a similar effect by concatenating pre-started sources with the help of a UnicastProcessor:

range(1, 10)
.map(v -> {
    Flux<T> o = getSource(v);
    UnicastProcessor<T> up = UnicastProcessor.create();
    o.subscribe(up);
    return up;
})
.concatMap(v -> v)
.subscribe(System.out::println, Throwable::printStackTrace);

Here, values 1..10 are mapped to a source which is then started by subscribing a UnicastProcessor to it. UnicastProcessor caches values (unbounded) until a subscriber, such as the one inside concatMap, subscribes to it, and then dutifully replays/relays the items.

In case you don't have a UnicastProcessor available, there are alternatives to it:

  1. By using ReplayProcessor instead, but note that it will retain all values until the whole sequence gets GC'd.
.map(v -> {
    Flux<T> o = getSource(v);
    ReplayProcessor<T> rp = ReplayProcessor.create();
    o.subscribe(rp);
    return rp;
})
  2. By using replay() and connect()
.map(v -> {
    Flux<T> f = getSource(v);
    ConnectableFlux<T> cf = f.replay();
    cf.subscribe(e -> { }, Throwable::printStackTrace);
    cf.connect();
    return cf;
})

That extra subscribe is necessary because replay may not actually start running its source unless there are Subscriber(s) already waiting for the items.

  3. By using replay() and autoConnect() or cache()
.map(v -> {
    Flux<T> f = getSource(v);
    Flux<T> cf = f.cache();
    cf.subscribe(e -> { }, Throwable::printStackTrace);
    return cf;
})

By default, cancelling the end Subscriber won't cancel the pre-started sources. You have to manually wire up the end Subscriber with all participants:

List<Cancellation> cancellations = ...

// ...
.map(v -> {
    Flux<T> f = getSource(v);
    UnicastProcessor<T> up = UnicastProcessor.create();
    cancellations.add(f.subscribe(up));
    return up;
})

// ...
.map(v -> {
    Flux<T> f = getSource(v);
    ConnectableFlux<T> cf = f.replay();
    cf.subscribe(e -> { }, Throwable::printStackTrace);
    cancellations.add(cf.connect());
    return cf;
})

// ...
.map(v -> {
    Flux<T> f = getSource(v);
    Flux<T> cf = f.replay().autoConnect(1, cancellations::add);
    cf.subscribe(e -> { }, Throwable::printStackTrace);
    return cf;
})
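The "start everything up front, consume in order" pattern can be tried out with plain JDK types as well. In the sketch below, CompletableFuture stands in for a pre-started, single-valued source; EagerConcatSketch and its concatMapEager method are hypothetical names and not a library operator:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical illustration; each "source" produces exactly one value here.
final class EagerConcatSketch {
    static <T, R> List<R> concatMapEager(List<T> inputs, Function<T, R> work) {
        // Start all the work immediately (the "eager" part) ...
        List<CompletableFuture<R>> started = new ArrayList<>();
        for (T input : inputs) {
            started.add(CompletableFuture.supplyAsync(() -> work.apply(input)));
        }
        // ... then consume the results in the original order (the "concat" part),
        // regardless of which computation finished first.
        List<R> results = new ArrayList<>();
        for (CompletableFuture<R> f : started) {
            results.add(f.join());
        }
        return results;
    }
}
```

Just like with the pre-started Processors, nothing cancels the running computations in this sketch if the consumer gives up early.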

akarnokd Apr 24, 2016

Collaborator

20) defer via using

The operator defer lets you generate a source Observable/Publisher for each Subscriber, whereas using lets you generate a resource per Subscriber and then use it to generate an Observable/Publisher.

Therefore, you can imitate defer via a using setup where you create and ignore a resource and just create the source:

Supplier<Publisher<Integer>> s = () -> range(1, 10);

defer(s) == using(() -> "whatever", v -> s.get(), v -> { });

However, you could also create the source itself as the resource and use identity-mapping on it:

using(s, v -> v, v -> { }) 

using(s::get, v -> v, v -> { })

The first, shorter form is applicable if your library's using takes a Supplier, and the second if it takes some other () -> T generator function.

The reverse direction, namely expressing using via defer, is a bit more involved:

Callable<R> resource = ...
Function<R, Publisher<T>> source = ...
Consumer<R> disposer = ...

defer(() -> 
    fromCallable(resource)
    .flatMap(r -> {
        try { 
            return source.apply(r)
            .doOnTerminate(() -> disposer.accept(r))
            .doOnUnsubscribe(() -> disposer.accept(r));
        } catch (Throwable ex) {
            disposer.accept(r);
            return error(ex);
        }
     })
);

In RxJava, a terminal event is followed by an unsubscribe call from most end-consumers (due to SafeSubscriber). In case the resource or the disposer is not idempotent, we have to make sure the dispose happens exactly once:

defer(() -> 
    fromCallable(resource)
    .flatMap(r -> {
        AtomicBoolean once = new AtomicBoolean();
        try { 
            return source.apply(r)
            .doOnTerminate(() -> disposeOnce(once, disposer, r))
            .doOnUnsubscribe(() -> disposeOnce(once, disposer, r));
        } catch (Throwable ex) {
            disposeOnce(once, disposer, r);
            return error(ex);
        }
     })
);

<R> void disposeOnce(AtomicBoolean once, Consumer<R> disposer, R resource) {
    if (once.compareAndSet(false, true)) {
        disposer.accept(resource);
    }
}
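The same construction can be tried out with Java 8 Streams, where flatMap closes each inner stream once its contents have been consumed and onClose plays the role of the termination hook. This is only an analogue under those assumptions, with hypothetical names (UsingSketch, using); a Stream has no separate cancellation path, so there is no double-dispose problem to guard against here:

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Hypothetical illustration of using(...) built from a deferred flatMap.
final class UsingSketch {
    static <R, T> Stream<T> using(Supplier<R> resource,
                                  Function<R, Stream<T>> source,
                                  Consumer<R> disposer) {
        Function<String, Stream<T>> mapper = unused -> {
            // Created lazily, when the outer stream is actually consumed.
            R r = resource.get();
            try {
                // flatMap closes the inner stream after consuming it, which
                // runs the onClose handler and thus the disposer.
                return source.apply(r).onClose(() -> disposer.accept(r));
            } catch (RuntimeException ex) {
                // The source could not be created: release the resource right away.
                disposer.accept(r);
                throw ex;
            }
        };
        return Stream.of("whatever").flatMap(mapper);
    }
}
```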

akarnokd May 4, 2016

Collaborator

21) Caching and clearing

If we want to execute some code once and then hand out the generated values, such as login tokens or results of a network call, we can usually go for cache(), replay(), AsyncProcessor, etc.

However, sometimes the data gets outdated and there is no way of clearing the structures above. But we can restart the whole process and make sure new subscribers get the fresh data if we cache the cache itself and use defer to get the current caching source:

final AtomicReference<Mono<Long>> cache = new AtomicReference<>(getSource());

public Mono<Long> getSource() {
    return Mono.fromCallable(System::currentTimeMillis).cache();
}

public Mono<Long> get() {
    return Mono.defer(() -> cache.get());
}

public void reset() {
    cache.set(getSource());
}
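The "cache the cache" idea works without Reactor too. Below is a minimal sketch with JDK types only, where a small memoizing holder stands in for Mono.cache(); ResettableCache and Memo are hypothetical names:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical illustration; Memo stands in for a cached Mono.
final class ResettableCache<T> {
    // Runs the computation on the first get() and replays the result afterwards.
    private static final class Memo<T> {
        private final Supplier<T> computation;
        private boolean done;
        private T value;

        Memo(Supplier<T> computation) {
            this.computation = computation;
        }

        synchronized T get() {
            if (!done) {
                value = computation.get();
                done = true;
            }
            return value;
        }
    }

    private final Supplier<T> computation;
    // The "cache of the cache": swapping this reference discards the old result.
    private final AtomicReference<Memo<T>> cache;

    ResettableCache(Supplier<T> computation) {
        this.computation = computation;
        this.cache = new AtomicReference<>(new Memo<>(computation));
    }

    // Looks up the current Memo at call time, like defer(() -> cache.get()).
    T get() {
        return cache.get().get();
    }

    void reset() {
        cache.set(new Memo<>(computation));
    }
}
```

Callers that obtained a value before reset() keep that value; only later calls to get() trigger the fresh computation.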

akarnokd May 27, 2016

Collaborator

22) Compute a single value only when requested

By design, fromCallable executes the Callable immediately upon subscription and doesn't wait for a downstream request to appear.

In case you want to compute only when requested, you can use the same jump-start trick with just().map(), similar to Gem 10):

just("irrelevant")
.map(unused -> {
    try {
        return callable.call();
    } catch (Exception ex) {
        throw Exceptions.bubble(ex); // or Exceptions.propagate(ex)
    }
})
.subscribe(System.out::println, Throwable::printStackTrace);

Since subscribe() requests immediately, the difference from fromCallable is not obvious at first. To see the difference, we need to manually request after the subscribe() has returned. One way for demonstrating this is via TestSubscriber:

TestSubscriber<T> ts = new TestSubscriber<>(0L);

just("irrelevant")
.map(unused -> {
    System.out.println("callable.call()");
    try {
        return callable.call();
    } catch (Exception ex) {
        throw Exceptions.bubble(ex); // or Exceptions.propagate(ex)
    }
})
.subscribe(ts);

System.out.println("subscribe()");
ts.requestMore(1); // or ts.request(1); in Rsc

Now the console will first print subscribe() followed by callable.call().
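To see what "compute only when requested" means at the protocol level, here is a simplified sketch built on the JDK's java.util.concurrent.Flow interfaces (Java 9+). LazyCallable, fromCallableLazy and demo are hypothetical names; the sketch assumes single-threaded use and a non-null result, and it is not a fully specification-compliant Publisher:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Flow;

// Hypothetical, simplified illustration; not thread-safe.
final class LazyCallable {
    // Emits a single value and runs the Callable only on the first request(n).
    static <T> Flow.Publisher<T> fromCallableLazy(Callable<T> callable) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                done = true;
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("n > 0 required"));
                    return;
                }
                T value;
                try {
                    value = callable.call(); // the computation happens here, not in subscribe()
                } catch (Exception ex) {
                    subscriber.onError(ex);
                    return;
                }
                subscriber.onNext(value);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes without requesting, then requests manually, and returns the event log.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Flow.Subscription[] held = new Flow.Subscription[1];
        Flow.Publisher<Integer> source = fromCallableLazy(() -> {
            log.add("callable.call()");
            return 42;
        });
        source.subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                held[0] = s; // deliberately no request yet
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete()");
            }
        });
        log.add("subscribe()");
        held[0].request(1);
        return log;
    }
}
```

The log returned by demo() starts with subscribe() and only then shows callable.call(), matching the console output described above.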

@akarnokd akarnokd added discussion and removed discussion labels May 27, 2016
