
This repo consists of various RxJS Operators, Observables and Subjects.


iswarya15/RXJS


RXJS

This project was generated with Angular CLI version 13.3.0.

Rx stands for Reactive Extensions, a family of libraries for reactive programming, that is, programming with asynchronous data streams.

What is a data stream?

A stream is data that arrives over a period of time. A stream can carry anything: variables, user inputs, cache, data structures, etc.

At any point, a data stream may emit any of the following three events:

  • Value : The next value in the stream.
  • Error : An error has occurred and stopped the stream.
  • Complete : The stream has ended; no more values will arrive.

Reactive Programming

Reactive programming is all about creating streams, emitting values, error or complete notifications, and manipulating and transforming the data.

RxJS

RxJS (Reactive Extensions Library for JavaScript) is a JavaScript library that allows us to work with asynchronous data streams.

RxJS in Angular

Angular uses the RxJS library heavily in its framework to implement reactive programming. For example,

  • Reacting to an HTTP Request in Angular, i.e. by subscribing.
  • Value Changes / Status Changes in Reactive forms.
  • Custom events that send Observable output data from child component to a parent component

Observables and Observers

Observables: An Observable wraps a function that emits a single value or multiple values over time, either synchronously or asynchronously.

Observers: An Observable on its own is useless unless someone consumes the values it emits. We call these consumers Observers or Subscribers.

Observable Creation

  • The simplest way to create an Observable is the Observable constructor. The constructor takes a callback function that receives a subscriber argument. This callback runs each time the Observable's subscribe method is called. The subscriber exposes the three methods below.

  • next() : The Observable invokes next() whenever a value arrives in the stream, passing the value as the argument.

  • error() : Sends a JS Error / Exception as the argument. The stream stops and no further values are emitted.

  • complete() : The Observable invokes this when the stream completes. After complete(), no further values are emitted to the subscriber.

Syntax : let obs = new Observable(subscriber => { console.log('Start emitting'); subscriber.next('Hi') });

There are easier ways to create Observables using the RxJS creation operators.

Subscribing to Observables

Observers communicate with the Observable using callbacks. When subscribing, we can pass three optional callbacks (next, error and complete), wrapped in an object, as the argument to the subscribe() method. If we only care about the values emitted by the Observable, we can pass the next callback directly, without the object syntax.

Check app.component.ts for implementation of subscribing to Observables.
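As a minimal sketch of creation and subscription together (the names greeting$ and received are made up for illustration and are not taken from app.component.ts), the constructor callback and both subscribe styles might look like this:

```typescript
import { Observable } from 'rxjs';

// Collects everything the observers receive (hypothetical helper array).
const received: string[] = [];

// The constructor callback runs once for every subscribe() call.
const greeting$ = new Observable<string>(subscriber => {
  subscriber.next('Hi');
  subscriber.next('Hello');
  subscriber.complete();
  subscriber.next('ignored'); // not delivered: the stream has already completed
});

// Observer object carrying all three optional callbacks.
greeting$.subscribe({
  next: value => received.push(`next: ${value}`),
  error: err => received.push(`error: ${err}`),
  complete: () => received.push('complete'),
});

// Shorthand: pass only the next callback.
greeting$.subscribe(value => received.push(`short: ${value}`));

console.log(received);
// ['next: Hi', 'next: Hello', 'complete', 'short: Hi', 'short: Hello']
```

Each subscribe() call re-runs the constructor callback, which is why both observers receive the full sequence.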

Observable Operators

Operators are functions that operate on an Observable and return a new Observable. We can transform the incoming Observable, filter it, merge it with another Observable or subscribe to another Observable.

We can also chain operators one after another using pipe. Each operator in the chain receives the Observable returned by the previous operator, and returns a new Observable that becomes the input of the next operator.

The following table lists some of the commonly used Operators.

Operation | Operators
--- | ---
Combination | combineLatest, concat, merge, startWith, withLatestFrom, zip
Filtering | debounceTime, distinctUntilChanged, filter, take, takeUntil, takeWhile, takeLast, first, last, single, skip, skipUntil, skipWhile, skipLast
Transformation | bufferTime, concatMap, map, mergeMap, scan, switchMap, exhaustMap, reduce
Utility | tap, delay, delayWhen
Error Handling | throwError, catchError, retry, retryWhen
Multicasting | share

Creation Operators

  • Observable.create() -> Calls the Observable constructor behind the scenes. create is a static method of Observable, so nothing extra has to be imported. This method is deprecated; use the constructor instead.

  • of creates an Observable from the arguments we pass into it. We can pass any number of arguments to of. Each argument is emitted one after the other, followed by the complete signal.

  • from takes a single argument that can be iterated and converts it into an Observable. It sends the complete signal at the end.

Example Array: from([a,b,c]) => a->b->c->complete
Example String: from('Hello') => 'H'->'e'->'l'->'l'->'o'->complete

  • Observables from collections : Anything that can be iterated can be converted into an Observable using from.
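The difference between of and from can be sketched as follows; the array names are illustrative only:

```typescript
import { from, of } from 'rxjs';

const numbers: number[] = [];
const items: string[] = [];
const letters: string[] = [];

// of: every argument becomes one emission.
of(1, 2, 3).subscribe(v => numbers.push(v));

// from: a single iterable argument is unpacked into emissions.
from(['a', 'b', 'c']).subscribe(v => items.push(v));

// A string is iterable too, so it is emitted character by character.
from('Hello').subscribe(v => letters.push(v));

console.log(numbers); // [1, 2, 3]
console.log(items);   // ['a', 'b', 'c']
console.log(letters); // ['H', 'e', 'l', 'l', 'o']
```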

Observables from Event

  • The fromEvent method allows us to create an Observable from DOM events directly.

  • Arguments : the event target element first, the event name second.

Syntax :

fromEvent(this.button.nativeElement, 'click').subscribe({next: () => {}, complete: () => {}})

How does it work?

When we subscribe to an Observable created with fromEvent, it registers an event handler on the DOM element using addEventListener. Unsubscribing removes the handler again with removeEventListener.

Pipe method

The pipe method of an Observable is used to chain multiple operators together. RxJS operators are functions that take an Observable as input, transform it and return a new Observable.

The operators passed to pipe are separated by commas. Their order matters: when an Observer subscribes, values flow through the operators in the order in which they were added.

There are two ways to use pipe: as an instance method of an Observable, or as a standalone function.

Pipe as an instance method

We chain the operators op1, op2 etc that are passed as argument to the pipe method. The output of op1 becomes the Input of op2.

obs.pipe(
  op1(),
  op2(),
  op3(),
)

Note: If the source emits multiple values, each value goes through the entire chain (for synchronous operators) and is delivered to the subscriber before the next value is streamed.

Refer pipeOperatorsUsingFilterMap() method.
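A small sketch of an instance-method pipe with filter and map is shown here. It is not the repository's pipeOperatorsUsingFilterMap() implementation; the tap call and the log array are added only to make the order of processing visible.

```typescript
import { of } from 'rxjs';
import { filter, map, tap } from 'rxjs/operators';

const log: string[] = [];

of(1, 2, 3)
  .pipe(
    tap(v => log.push(`in ${v}`)), // record each value entering the chain
    filter(v => v % 2 === 1),      // keep odd numbers only
    map(v => v * 10),              // transform what is left
  )
  .subscribe(v => log.push(`out ${v}`));

console.log(log);
// ['in 1', 'out 10', 'in 2', 'in 3', 'out 30']
```

The log shows that 1 reaches the subscriber before 2 even enters the chain, and that 2 is dropped by filter.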

Pipe as standalone method

We can also use pipe as a standalone function and reuse the resulting operator chain in other places. We need to import pipe from rxjs. Check the reusablePipe method for custom pipe creation.
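One way a reusable chain could look is sketched below. The name evenTimesTen is hypothetical and the repository's reusablePipe method may be written differently.

```typescript
import { of, pipe, OperatorFunction } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// A custom operator built once with the standalone pipe function.
const evenTimesTen: OperatorFunction<number, number> = pipe(
  filter((n: number) => n % 2 === 0),
  map((n: number) => n * 10),
);

const firstResult: number[] = [];
const secondResult: number[] = [];

// The same chain is reused on two different sources.
of(1, 2, 3, 4).pipe(evenTimesTen).subscribe(v => firstResult.push(v));
of(5, 6).pipe(evenTimesTen).subscribe(v => secondResult.push(v));

console.log(firstResult);  // [20, 40]
console.log(secondResult); // [60]
```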

tap Operator

tap : The tap operator returns a new Observable that mirrors the source Observable. It is mostly used for side effects such as debugging, and does not modify the stream in any way.

Example: Logging the values of Observables. Refer tapObservables() method
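As an illustration (not the repository's tapObservables() code), tap can observe values without changing them; seen and doubled are made-up names:

```typescript
import { of } from 'rxjs';
import { map, tap } from 'rxjs/operators';

const seen: number[] = [];
const doubled: number[] = [];

of(1, 2, 3)
  .pipe(
    tap(v => seen.push(v)), // side effect only; the value passes through unchanged
    map(v => v * 2),
  )
  .subscribe(v => doubled.push(v));

console.log(seen);    // [1, 2, 3]
console.log(doubled); // [2, 4, 6]
```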

map Operator

map : transforms each value emitted by the source using a projection function. It can be used with HTTP requests, DOM events, reshaping input data, etc.

  • Arguments : map((value, index) => ...). value is the value emitted by the Observable; index (optional) is 0 for the first value emitted and is incremented by one for every subsequent value.

Note : The keyvalue pipe (KeyValuePipe) from @angular/common can transform an Object into an Array of key-value pairs

const obj = {person1: 'jon',person2: 'hopper',person3: 'mona'}
const transformObj = this.keyValuePipe.transform(obj);

Result : [ { "key": "person1", "value": "jon" }, { "key": "person2", "value": "hopper" }, { "key": "person3", "value": "mona" } ]

  • We can also use multiple maps within same pipe.
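A short sketch of the optional index argument and of two map calls in the same pipe (labels is an illustrative name):

```typescript
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

const labels: string[] = [];

of('a', 'b', 'c')
  .pipe(
    map((value, index) => `${index}:${value}`), // index starts at 0
    map(label => label.toUpperCase()),          // second map works on the first map's output
  )
  .subscribe(label => labels.push(label));

console.log(labels); // ['0:A', '1:B', '2:C']
```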

Filter operator

filter is one of the most widely used operators. It emits only the items that satisfy a given condition (predicate).

Transformational Operator

SwitchMap Operator

The switchMap operator maps each value from the source Observable to an inner Observable, subscribes to that inner Observable and emits the values from it.

The function passed to switchMap must return an Observable.

Map vs SwitchMap

map emits whatever its function returns as a plain value, so returning an Observable from map yields a stream of Observables. switchMap subscribes to the returned inner Observable and emits the values from it.

someStream$.pipe(
    switchMap(args => makeApiCall(args)), // must return a stream
    map(response => process(response)) // returns a value of any shape, usually an object or a primitive
).subscribe(doSomethingWithResults);

Example use case: This works perfectly for scenarios like form Input/search Input where you are no longer concerned with the response of the previous request when a new input arrives.

The main difference between switchMap and other flattening operators is the cancelling effect. On each emission the previous inner observable (the result of the function you supplied) is cancelled and the new observable is subscribed. You can remember this by the phrase switch to a new observable.
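The cancelling effect can be made visible with a sketch that uses Subjects as stand-ins for two in-flight requests. The names query$, responseA$ and responseB$ are hypothetical; a real application would typically return an HTTP call from the projection function.

```typescript
import { Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const query$ = new Subject<string>();
const responseA$ = new Subject<string>(); // stands in for the request triggered by 'a'
const responseB$ = new Subject<string>(); // stands in for the request triggered by 'b'
const results: string[] = [];

query$
  .pipe(switchMap(q => (q === 'a' ? responseA$ : responseB$)))
  .subscribe(r => results.push(r));

query$.next('a');
responseA$.next('a1'); // delivered: responseA$ is the active inner Observable
query$.next('b');      // switch: the subscription to responseA$ is cancelled
responseA$.next('a2'); // dropped
responseB$.next('b1'); // delivered

console.log(results); // ['a1', 'b1']
```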

MergeMap Operator

This operator is best used when you wish to flatten an inner observable but want to manually control the number of inner subscriptions.

In contrast to switchMap, mergeMap allows multiple inner subscriptions to be active at a time. If the order of emission and subscription of inner Observables is important, try concatMap. mergeMap never cancels an inner Observable.

Memory Leaks : Because mergeMap keeps multiple inner subscriptions alive, long-lived inner Observables can lead to memory leaks, so make sure to bound them with operators like take or takeUntil.
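For comparison, here is a sketch of mergeMap with two inner Observables that stay active side by side. The Subjects and their names are hypothetical stand-ins for long-running inner streams.

```typescript
import { Subject } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const query$ = new Subject<string>();
const responseA$ = new Subject<string>();
const responseB$ = new Subject<string>();
const results: string[] = [];

query$
  .pipe(mergeMap(q => (q === 'a' ? responseA$ : responseB$)))
  .subscribe(r => results.push(r));

query$.next('a');
responseA$.next('a1');
query$.next('b');      // a second inner subscription is added; the first stays active
responseA$.next('a2'); // still delivered
responseB$.next('b1');

console.log(results); // ['a1', 'a2', 'b1']
```

Neither inner subscription ends on its own here, which is the situation the memory-leak note warns about.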

Filtering Operators

take Operator

Why use take?

  • When you are interested in only the first emission, you want to use take. Maybe you want to see what the user first clicked on when they entered the page, or you would want to subscribe to the click event and just take the first emission.
  • Another use-case is when you need to take a snapshot of data at a particular point in time but do not require further emissions. For example, a stream of user token updates, or a route guard based on a stream in an Angular application.

    💡 If you want to take a number of values based on some logic, or another observable, you can use takeUntil or takeWhile!

    💡 take is the opposite of skip: take emits the first n emissions, while skip skips the first n emissions.

 obs.pipe(take(2)).subscribe()

TakeUntil Operator

The takeUntil operator returns an Observable that emits values from the source Observable until the notifier Observable emits a value.

takeUntil<T>(notifier: ObservableInput<any>): MonoTypeOperatorFunction<T>

We must pass a notifier Observable as the argument to takeUntil.

  • takeUntil emits the values from the source Observable as long as the notifier has not emitted a value.

  • When the notifier emits a value, takeUntil completes the resulting Observable and unsubscribes from the source.

Check sample code in transform.component.ts
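A minimal sketch of the notifier pattern follows; it is not the code from transform.component.ts, and source$ and stop$ are illustrative names.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const source$ = new Subject<number>();
const stop$ = new Subject<void>(); // the notifier
const events: Array<number | string> = [];

source$.pipe(takeUntil(stop$)).subscribe({
  next: v => events.push(v),
  complete: () => events.push('complete'),
});

source$.next(1);
source$.next(2);
stop$.next();    // the notifier emits: the output completes
source$.next(3); // no longer delivered

console.log(events); // [1, 2, 'complete']
```

In an Angular component the notifier is often a Subject that emits in ngOnDestroy, though that wiring is left out of this sketch.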

TakeWhile vs Filter

The takeWhile operator keeps emitting values from the source Observable as long as they satisfy the given condition (predicate). When it receives a value that does not satisfy the condition, it completes the Observable.

The difference is that takeWhile discards the rest of the stream when it receives the first value that does not satisfy the condition, whereas filter never stops the Observable; it only drops the values that fail the condition.
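Running both operators with the same predicate over the same source makes the contrast concrete (array names are illustrative):

```typescript
import { from } from 'rxjs';
import { filter, takeWhile } from 'rxjs/operators';

const source$ = from([1, 2, 3, 1, 2]);
const taken: number[] = [];
const filtered: number[] = [];

// Stops at the first value that fails the predicate (3) and completes.
source$.pipe(takeWhile(n => n < 3)).subscribe(n => taken.push(n));

// Checks every value and keeps all that pass.
source$.pipe(filter(n => n < 3)).subscribe(n => filtered.push(n));

console.log(taken);    // [1, 2]
console.log(filtered); // [1, 2, 1, 2]
```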

TakeLast

The takeLast operator emits the last n values from the source Observable once the source completes.

First / Last

The first/last operator emits the first/last value that satisfies the condition, if a condition is given. If no condition is given, it emits the first/last value it receives.

An error notification (EmptyError) is sent if the source completes without emitting a matching value and no default value was provided.
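The following sketch exercises first and last with and without a condition, and shows the error sent for an empty source. The variable names are made up for the example.

```typescript
import { EMPTY, EmptyError, of } from 'rxjs';
import { first, last } from 'rxjs/operators';

const picked: number[] = [];
const source$ = of(1, 2, 3, 4);

source$.pipe(first()).subscribe(v => picked.push(v));           // first value
source$.pipe(first(n => n > 2)).subscribe(v => picked.push(v)); // first value greater than 2
source$.pipe(last()).subscribe(v => picked.push(v));            // last value
source$.pipe(last(n => n < 3)).subscribe(v => picked.push(v));  // last value smaller than 3

// An empty source has no first value, so an error notification is sent.
let emptyResult = 'no notification';
EMPTY.pipe(first()).subscribe({
  error: err => {
    emptyResult = err instanceof EmptyError ? 'EmptyError' : 'other error';
  },
});

console.log(picked);      // [1, 3, 4, 2]
console.log(emptyResult); // 'EmptyError'
```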

Skip Operators

The skip operators skip values from the source Observable based on a count or condition. skip, skipUntil and skipWhile skip values from the start of the source. skipLast skips values from the end of the source.

Filter vs SkipWhile

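  • filter emits a value if the predicate (condition) is true, and checks every value.
  • skipWhile skips values while the predicate (condition) is true. Once the predicate returns false, it emits that value and all subsequent values without checking again.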
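The same predicate applied through both operators, as a small sketch with illustrative names:

```typescript
import { from } from 'rxjs';
import { filter, skipWhile } from 'rxjs/operators';

const source$ = from([1, 2, 3, 1, 2]);
const afterSkip: number[] = [];
const afterFilter: number[] = [];

// Skips 1 and 2, then lets everything through once 3 fails the predicate.
source$.pipe(skipWhile(n => n < 3)).subscribe(n => afterSkip.push(n));

// Evaluates the predicate for every value.
source$.pipe(filter(n => n < 3)).subscribe(n => afterFilter.push(n));

console.log(afterSkip);   // [3, 1, 2]
console.log(afterFilter); // [1, 2, 1, 2]
```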

Subjects

Subjects are a special kind of Observable that acts as both Observer and Observable. They allow us to emit new values into the Observable stream using the next method.

  • All subscribers of a Subject share the same execution of the Subject and hence receive the same values.
  • A Subject is a special type of Observable that allows values to be multicast to many Observers.

How do Subjects work?

A Subject implements both the subscribe method and the next, error and complete methods.

Creating Subject

subject$ = new Subject();

Subscribing & Emitting: Subject

ngOnInit() {
  // log every value the subject emits
  this.subject$.subscribe(val => console.log(val))

  this.subject$.next(1);
  this.subject$.next(2);
  this.subject$.complete();
}

Subject - Hot Observable

Observables are classified into two groups.

  • Cold Observable
  • Hot Observable

Cold Observable

The cold observable does not activate the producer until there is a subscriber. The producer emits the value only when a subscriber subscribes to it.

Hot Observable

The Hot observable does not wait for a subscriber to emit the data. It can start emitting the values right away.

subject$ = new Subject();

ngOnInit() {
  this.subject$.next(1);
  this.subject$.next(2);
  this.subject$.complete();
}

In the above example, since there were no subscribers, no one receives the data, but that did not stop the Subject from emitting it.

Now consider the following example. Here the values 1 & 2 are lost because the subscription happens after the Subject emits them.

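ngOnInit() {
  this.subject$.next(1); // lost: nobody has subscribed yet
  this.subject$.next(2); // lost: nobody has subscribed yet

  this.subject$.subscribe(val => console.log(val));

  this.subject$.next(3);
  this.subject$.next(4);
  this.subject$.complete();
}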

Every Subject is an Observer

An object needs to implement the next, error and complete callbacks (all optional) to act as an Observer.

let obs$ = new Observable(observer => {
  observer.next(1);
  observer.error('error');
})

this.subject$.subscribe({
  next: val => console.log(val),
  error: err => console.log(err), // handles the error forwarded by the subject
});

obs$.subscribe(this.subject$);

Since subject$ implements the next, error and complete methods, it receives notifications from the Observable and forwards them to its own subscribers. So we can subscribe to an Observable and use subject$ as the Observer.

Subjects are MultiCast

Another important distinction between an Observable and a Subject is that Subjects are multicast.

  • More than one subscriber can subscribe to a Subject. They share the same execution, so all subscribers receive the same event when the Subject emits it.
  • A plain Observable is unicast: each subscriber triggers a separate, independent execution of the Observable.

MultiCast vs UniCast

Check uniCastVsMultiCast method in subject.component.ts.
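The distinction can be sketched with an execution counter. This is an illustration under assumed names (executions, cold$, subject$), not the uniCastVsMultiCast method itself.

```typescript
import { Observable, Subject } from 'rxjs';

// Unicast: the constructor callback runs once per subscriber.
let executions = 0;
const cold$ = new Observable<number>(subscriber => {
  executions += 1;
  subscriber.next(executions);
});

const coldValues: number[] = [];
cold$.subscribe(v => coldValues.push(v));
cold$.subscribe(v => coldValues.push(v));

// Multicast: one next() call reaches every subscriber.
const subject$ = new Subject<number>();
const first: number[] = [];
const second: number[] = [];
subject$.subscribe(v => first.push(v));
subject$.subscribe(v => second.push(v));
subject$.next(42);

console.log(coldValues);    // [1, 2] : two separate executions
console.log(first, second); // [42] [42] : one shared emission
```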

Subject maintains a list of Subscribers

Whenever a subscriber subscribes to a Subject, the Subject adds it to an internal list of subscribers. This way the Subject keeps track of all its subscribers and emits each event to all of them.

Types of Subject

  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

BehaviorSubject

BehaviorSubject requires an initial value. It stores the current value and emits it to new subscribers as soon as they subscribe.

subject$ = new BehaviorSubject(0);

subject$.subscribe(val => console.log(val)); //0

subject$.next(1);

BehaviorSubject always remembers the last emitted value and shares it with new subscribers.

ReplaySubject

ReplaySubject replays old values to new Subscribers when they first subscribe.

  • The ReplaySubject stores every value it emits in a buffer. We can configure the buffer using the bufferSize and windowTime arguments.

bufferSize : Number of items that ReplaySubject keeps in its buffer. It defaults to Infinity.

windowTime : The amount of time (in milliseconds) to keep a value in the buffer.

Even when the subscription happens after the values are emitted, the subscriber still receives them, because ReplaySubject stores the values in its buffer.
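A brief sketch with a bufferSize of 2 (the names replay$ and lateSubscriber are illustrative):

```typescript
import { ReplaySubject } from 'rxjs';

// Keep only the two most recent values in the buffer.
const replay$ = new ReplaySubject<number>(2);
const lateSubscriber: number[] = [];

replay$.next(1);
replay$.next(2);
replay$.next(3);

// Subscribing after the emissions still replays the buffered values 2 and 3.
replay$.subscribe(v => lateSubscriber.push(v));

replay$.next(4); // live values keep arriving as usual

console.log(lateSubscriber); // [2, 3, 4]
```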

AsyncSubject

AsyncSubject emits only its latest value, and only when it completes. If it errors out, it emits the error and no values.

Check asyncSubjectDemo method in subject.component.ts
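As a sketch of this behaviour (not the asyncSubjectDemo method; result$ and events are assumed names):

```typescript
import { AsyncSubject } from 'rxjs';

const result$ = new AsyncSubject<number>();
const events: Array<number | string> = [];

result$.subscribe({
  next: v => events.push(v),
  complete: () => events.push('complete'),
});

result$.next(1);
result$.next(2);
result$.next(3);
const countBeforeComplete = events.length; // nothing has been delivered yet

result$.complete(); // only now is the latest value emitted

console.log(countBeforeComplete); // 0
console.log(events);              // [3, 'complete']
```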

Scan & Reduce

The scan & reduce operators apply an accumulator function to the values of the source Observable. scan emits every intermediate result of the accumulation, while reduce emits only the final result when the source completes. Both accept an optional seed value as the initial value.
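A running sum is a simple way to see the difference; the array names below are illustrative:

```typescript
import { of } from 'rxjs';
import { reduce, scan } from 'rxjs/operators';

const source$ = of(1, 2, 3, 4);
const runningTotals: number[] = [];
const finalTotal: number[] = [];

// scan emits the accumulator after every source value (seed is 0).
source$.pipe(scan((acc, v) => acc + v, 0)).subscribe(t => runningTotals.push(t));

// reduce emits once, when the source completes.
source$.pipe(reduce((acc, v) => acc + v, 0)).subscribe(t => finalTotal.push(t));

console.log(runningTotals); // [1, 3, 6, 10]
console.log(finalTotal);    // [10]
```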

DebounceTime & Debounce

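Both emit a value from the source Observable only after a certain amount of time has elapsed without another source emission. Both emit only the latest value and discard any intermediate values. debounceTime uses a fixed duration, while debounce takes a function that returns an Observable marking the end of the wait.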

UseCase of DebounceTime & Debounce

The typeahead/autocomplete fields are one of the most common use cases for Debounce Operators.

  • As the user types in the typeahead field, we need to listen to it and send an HTTP request to the back end to get a list of possible values. If we send HTTP requests for every keystroke, we end up making numerous calls to the server.

  • By using the Debounce operators, we wait until the user pauses typing before sending an HTTP request. This eliminates unnecessary HTTP requests.
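The typeahead idea can be sketched without a real input field or HTTP call. The example below is an assumption-laden illustration: keystrokes are simulated with a Subject named typed$, and a VirtualTimeScheduler is passed to debounceTime so the timing runs instantly and deterministically. In an application the scheduler argument would normally be omitted.

```typescript
import { Subject, VirtualTimeScheduler } from 'rxjs';
import { debounceTime } from 'rxjs/operators';

// Virtual clock: lets the sketch run instantly instead of waiting in real time.
const scheduler = new VirtualTimeScheduler();

const typed$ = new Subject<string>(); // simulated typeahead input
const requests: string[] = [];        // what would have been sent to the server

typed$
  .pipe(debounceTime(300, scheduler)) // wait for a 300 ms pause in typing
  .subscribe(term => requests.push(term));

// Simulated keystrokes at 0, 100 and 200 ms, then a pause, then one more at 1000 ms.
scheduler.schedule(() => typed$.next('r'), 0);
scheduler.schedule(() => typed$.next('rx'), 100);
scheduler.schedule(() => typed$.next('rxj'), 200);
scheduler.schedule(() => typed$.next('rxjs'), 1000);

scheduler.flush(); // run the virtual clock to the end

console.log(requests); // ['rxj', 'rxjs']
```

Four keystrokes lead to only two simulated requests, because the first two values were replaced before a 300 ms pause occurred.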