Re-usable operators and utilities for RxJS
npm i @rxtk/utils
yarn add @rxtk/utils
Add a debugger to an RXJS pipeline.
import {from} from 'rxjs';
import {map} from 'rxjs/operators';
import {debug,toConsole} from '@rxtk/utils';
const input$ = from([2, 3]);
const output$ = input$.pipe(
toConsole('input'),
debug(), // inserts breakpoint here
map(n => n * n),
toConsole('squared'),
debug() // inserts a second breakpoint here
);
output$.subscribe();
// input 2
// debugger will pause here!
// squared 4
// debugger will pause here!
// input 3
// debugger will pause here!
// squared 9
// debugger will pause here!
Delays emissions from a source observable until another observable emits.
import {from,timer} from 'rxjs';
import {map} from 'rxjs/operators';
import {tap} from 'rxjs/operators';
import {delayUntil} from '@rxtk/utils';
// This code delays emitting items from the source observable until 5 seconds
// have passed.
const string$ = from(['foo', 'bar']);
const start$ = timer(5000).pipe(tap(console.log));
const output$ = string$.pipe(delayUntil(start$));
output$.subscribe(console.log);
// Output:
// 0 (the value emitted by timer(5000) after 5 seconds, logged by tap)
// foo
// bar
Gets the item from a single-item array and throws if the array is empty or has multiple items.
import {of} from 'rxjs';
import {listToSingleResult} from '@rxtk/utils';
// This code unwraps the only item of a single-item array; it throws if the
// array is empty or has multiple items.
const input$ = of([{'foo': 'bar'}]);
const output$ = input$.pipe(listToSingleResult());
output$.subscribe(console.log);
// Output:
// {foo: 'bar'}
Concatenates items into a string.
import {from} from 'rxjs';
import {takeLast} from 'rxjs/operators';
import {reduceToString} from '@rxtk/utils';
// This code concatenates the emitted strings and keeps only the final
// emission.
const input$ = from(['always ', 'money ', 'in ', 'the ', 'banana ', 'stand']);
const output$ = input$.pipe(
reduceToString(),
takeLast(1)
);
output$.subscribe(console.log);
// Output:
// "always money in the banana stand"
Map JSON strings to JavaScript objects.
import {from} from 'rxjs';
import {toConsole, parseJSON} from '@rxtk/utils';
const input$ = from(['{"foo": "bar"}']);
const output$ = input$.pipe(
parseJSON(),
toConsole()
);
output$.subscribe();
// {foo: 'bar'}
Log an RxJS pipeline's data to the console.
import {from} from 'rxjs';
import {map} from 'rxjs/operators';
import {toConsole} from '@rxtk/utils';
const input$ = from([1, 2, 3]);
const output$ = input$.pipe(
toConsole(),
map(n => n * n),
toConsole('squared')
);
output$.subscribe();
// 1
// squared 1
// 2
// squared 4
// 3
// squared 9
Map each item to a JSON string.
import {from} from 'rxjs';
import {toConsole, toJSON} from '@rxtk/utils';
const input$ = from([{foo: 'bar'}]);
const output$ = input$.pipe(
toJSON(),
toConsole()
);
output$.subscribe();
// '{"foo": "bar"}'
Add an index to data in the stream.
import {from} from 'rxjs';
import {withIndex} from '@rxtk/utils';
const input$ = from(['there\'s', 'no', 'place', 'like', 'home']);
const output$ = input$.pipe(
withIndex()
);
output$.subscribe(console.log);
// ["there's", 0]
// ["no", 1]
// ["place", 2]
// ["like", 3]
// ["home", 4]