# RxJava
## Intro - 
- RxJava is a specific implementation of reactive programming for Java and Android that is influenced by functional programming.
- Favors functional composition, avoidance of global state and side-effects and using streams to program asynchronously and event based
- **Reactive programming** is related to reacting to changes and can be done imperatively or functionally like callbacks
- RxJava is not functional reactive programming
- All programs ultimately end up being reactive when it hits harware
- Recommended books [Java Concurrency in Practice](https://www.amazon.com/Java-Concurrency-Practice-Brian-Goetz/dp/0321349601), [Concurrent Programming](https://www.amazon.com/Concurrent-Programming-Java%C2%99-Principles-Pattern/dp/0201310090) and **Mechanical sympathy
- Reactive programming is needed when - 
 - Responding to user events
 - Responding to IO events
 - Handling events or data coming from a producer one does not have control over
 
## How it works - 
### Push vs Pull - 
- Being reactive means supporting push so that `Observable` and `Observer` support the events being pushed at it
- The `Observable` and `Observer` are connected via subscription
- The `Observable` represents a stream of data and can be subscribed to by the `Observer`    
- Once `Observer` subscribes, three types of events can be pushed to it
 - Data via `onNext()` function
 - Error view `onError`
 - Stream completion via `onComplete()`    


`interface Observer<T> { 
    void onNext(T t);
    void onError(Throwable t); 
    void onComplete();
}`

- The `onNext()` method can be called as many time as required
- `onError()` and `onComplete()` are called on termination
- The `Observable` stream is finished with a terminal call and no further events can be sent over it

- To permit interactive pull there is another signature  

`interface Producer {
    void request(long n);
}`

- This is used by a more advanced `Observer` called `Subscribe`  

`interface Subscriber<T> implements Observer<T>, Subscription { 
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
    void unsubscribe();
    void setProducer(Producer p);
}`

- `unsubscribe()` is to allow a subscriber to unsubscribe when required
- `setProducer` forms a bidirectional communication channel between producer and the consumer
### Async vs Sync - 
- RxJava never adds concurrency unless asked to
- A synchronous `Observable` would be subscribed to, emit data on the subscriber thread and completes

In [1]:
// Adding the jars required to obtain the rxJava dependencies
%classpath add jar ./rxjava-2.2.0-SNAPSHOT.jar
%classpath add jar ./reactive-streams-1.0.2.jar

In [15]:
import io.reactivex.*;

Observable.create(s -> {
    s.onNext("Hello world!");
    s.onComplete();
}).subscribe(hello -> System.out.println(hello));

Hello world!


null

- This is a completely synchronous example of `Observable.create()`. The I/O is blocking the asynchronicity of this `Observable`. The I/O operations should be done in a separate thread.
- Here the asynchronous scheduling is actually making the whole 'Hello World' program slower.
- The RxJava `Observable` is async vs sync agnostic so as to let it be incumbent upon the impementation to determine the source of concurrency and can be decided whether it will be useful.
- There are also some applications where synchronous behaviour is better than asynchronous behaviour. For example - 
 - In-memory data access is very fast and should not be bogged down by adding the scheduling cost to it. The data fetch from in memory can be done synchronously. If disk access then that can be done concurrently.
 - Another case is stream composition and transformation via operators like `map()`, `filter()` etc

In [16]:
import io.reactivex.Observable;

// If map was asynchronous, there would be separate threads for string concatenation and that would be wasteful and the behaviour would become very non-deterministic
Observable<Integer> o = Observable.create(s -> {
    s.onNext(1);
    s.onNext(2);
    s.onNext(3);
});
o.map(i -> "Number " + i)
    .subscribe(s -> System.out.println(s));

Number 1
Number 2
Number 3


null

### Concurrency and Parellelism - 
- Parallelism is simultaneous execution of tasks on differenct CPUs or machines.
- Concurrency is composition of interleaving of multiple tasks. Multiple thread execution on a single CPU is excuting concurrently but not parallely
- Parallel execution is always concurrent. Parallelism is a specific form of concurrency. 
- The RxJava streams like `onNext()`, `onComplete()` etc are never concurrent but are always serialized and thread safe
- The emission of events can happen on different threads but the emission must happen serially.

`// DO NOT DO THIS
Observable.create(s -> {
// Thread A
new Thread(() -> {
s.onNext("one");
s.onNext("two");
}).start();
// Thread B
new Thread(() -> {
s.onNext("three");
s.onNext("four");
}).start();
// ignoring need to emit s.onCompleted() due to race of threads
});
// DO NOT DO THIS`

- `onNext()` is not allowed to be invoked concurrently because - 
 - `onNext()` is supposed to be used by the programmer. If it is invoked concurrently then every invocation needs to handle the concurrent case
 - Some operations like `reduce()` and `scan()` are not possible concurrently
 - Performance overhead because all observers and operators have to be thread safe
 - Fine grained parallelism is undesirable and slower. It is much faster to execute serially on a single CPU to gain advantage of the CPU optimizations already in place.

### Lazy vs Eager - 
- The `Observer` type is lazy and does nothing until it is subscribed to as opposed to `Future` which is eager and starts executing as soon as it is created.
- This also means that the observable instances can be reused

## Duality - 
- `Observable` is the dual of `Iterable`. That means all iterable properties are there using observable just with reverse flow of data. It is push instead of pull.
- So the data instead of being pulled out using `next()` is pushed by the producer using `onNext()`
- This just means that the same programming model can be applied to both

## Cardinality - 
- `Observable` supports asynchronously pushing multiple values.

## Reactive Extensions - 
### rx.Observable - 
- Represents a flowing sequence of values
- It is a stream of events because the events appear over a wide time range
- `Observable` is inherently push based and decides when it is going to push values to it's subscriber by itself
- This is similar to the publish-subscribe pattern
- `Observable<T>` can produce three types of events - 
 - Values of type T
 - Completion event
 - Error event
- The Subscriber structure is expressed as - `onNext * (onComplete | onError)?`

### Subscribing to notifications from observables - 
- The `Observable` does not emit anything until it is subscribed to
- Use `subscribe()` family of methods to subscribe to this observable
- Events can be emitted from multiple threads but the callback will always be invoked on the same thread
- The second argument is the callback invoked when an error is thrown and it is guaranteed that no further events will be produced
- The third argument is the callback invoked when the stream is finally completed

`tweets.subscribe( 
    (Tweet tweet) -> { System.out.println(tweet); }, 
    (Throwable t) -> { t.printStackTrace(); }, 
    () -> {this.noMore();} 
);`

### Observer - 
- This is a container of all the three arguments of subscribe
- This is the core abstraction of RxJava. Subscriber can be used for even greater control

### Subscription and Subscriber - 
- `Subscription subscription = tweets.subscribe(System.out::println); subscription.unsubscribe();`
- `Subscription` is the handle that lets the client control the subscription actions
- The unsubscription can also be performed conditionally by putting a condition with the onNext() statement

In [4]:
import io.reactivex.*;

// If map was asynchronous, there would be separate threads for string concatenation and that would be wasteful and the behaviour would become very non-deterministic
Observable<Integer> o = Observable.create(s -> {
    s.onNext(1);
    s.onNext(2);
    s.onNext(3);
});
Subscription subscription = o.subscribe(s -> System.out.println(s));

cannot find symbol: cannot find symbol

In [8]:
import io.reactivex.Observable;

// If map was asynchronous, there would be separate threads for string concatenation and that would be wasteful and the behaviour would become very non-deterministic
Observable<Integer> o = Observable.create(s -> {
    s.onNext(1);
    s.onNext(2);
    s.onNext(3);
});
o.map(i -> "Number " + i)
    .subscribe(s -> System.out.println(s));

Number 1
Number 2
Number 3


null

### Observable creation - 
- `Observable.just()` - Emits the same value for every subscription. Can take upto nine values for emmission
- `Observable.from(values)` - Accepts iterators as well and can directly take a number of values to be emmitted
- `Observable.range(from, n)` - Produces `n` integers from `from`
- There are others as well but better to directly look up documentation for that.
- `Observable.create()` - The most versatile and the underlying one beneath all the other factory methods
- All this creation and execution is done on the main thread always. RxJava does not choose another thread for the process
- The subscription handler inside `create()` is invoked separately for each of the `subscribe()` invocations so all the different subscribers see different invocations being made
- If multiple subscribers need to share the same invocation then they must use `cache()`
- `cache()` is an operator. An operator wraps existing observables enhancing them
- cache basically stands at the subscription. When a subscriber comes in it hands over the subscription to the underlying observable but keeps a cache of the values being pushed. On any subsequent subscription the values are pushed directly from the cache

## Infinite streams - 
- Infinite streams are enabled by the ability to produce and consume events on the fly

In [17]:
import io.reactivex.Observable;
import java.util.*;
import java.math.BigInteger;
import static java.math.BigInteger.*;

Observable<BigInteger> naturalNumbers = Observable.create(
    subscriber -> {
        Runnable r = () -> {
        BigInteger i = BigInteger.ZERO;
        while (!subscriber.isDisposed()) {
            subscriber.onNext(i);
            i = i.add(ONE);
        }
    };
    new Thread(r).start();
});

null

- Infinite streams are complicated and full of pitfalls. Be careful
- Explicit threads should not be used inside `create()` as it breaks the contract of serial execution of onNext()
- To propogate errors down to all the subscribers wrap the code inside a `try-catch` and call `subscriber.onError(e)` explicitly
### `timer` and `interval` - 
- timer basically emits for a given amount of time and then exits. It can act as an asynchronous sleep

In [3]:
import io.reactivex.Observable;
import java.util.*;

Observable
    .timer(1, TimeUnit.SECONDS)
    .subscribe((Long zero) -> log(zero));

cannot find symbol: cannot find symbol

- `interval()` is to generate a sequence of long numbers starting with zero. The generation is delayed by a fixed value
### Hot and Cold observable - 
- Cold observable is completely lazy and is only activated only when someone subscribes to it. It's just a static data structure and there is no caching involved
- Hot observable is when the observable keeps on emitting events even if noone is listening to it. It's when we don't have any control over the source of data.
- Cold observables always ensure that the subscriber is receiving the complete set of events while the hot observable has to depend on operators like `cache()` to achieve the same.
- In order to implement the scenario of multiplexing a stream to multiple subscribers, we can either handle the subscription and unsubscription of observables manually by maintaining a list or through `Subject` or `ConnectableObservable`

### Subject - 
- It extends `Observer` and implements `Observable`.
- Basically it acts as a sink to the observables where they can publish their events to and a source for the observers that fetch their events by subscribing to a particular subject


In [None]:
class TwitterSubject {
    private final PublishSubject<Status> subject = PublishSubject.create();
    public TwitterSubject() {
        TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
        twitterStream.addListener(new StatusListener() {
            @Override
            public void onStatus(Status status) {
                subject.onNext(status);
            }
            @Override
            public void onException(Exception ex) {
                subject.onError(ex);
            }
            //other callbacks
        });
        twitterStream.sample();
    }
    public Observable<Status> observe() {
        return subject;
    }
}

- `PublishSubject` eagerly starts listening for events and pushes them to all the subscribers 
- `ConnectableObservable`is a way of coordinating multiple subscribers and using the same underlying subscription
- It ensures the existance of at least one subscriber at all times
- It shields the Observable from the subscribers. No matter how many subscribers subscribe there will only be a single underlying subscription
### Single subscription with `publish.refCount()` - 
- It maintains a count of the subscribers available. 
- It only subscribes once when the count reaches one and then reuses the subscription after that.