-
Notifications
You must be signed in to change notification settings - Fork 2
/
round-robin.ts
38 lines (36 loc) · 2.21 KB
/
round-robin.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import {Observable} from 'rxjs';
import {filter, map, scan} from 'rxjs/operators';
import {mergeTrim} from './merge-trim';
/* tslint:disable:max-line-length */
export function roundRobin<O1>(o1: Observable<O1>): Observable<O1>;
export function roundRobin<O1, O2>(o1: Observable<O1>, o2: Observable<O2>): Observable<O1 | O2>;
export function roundRobin<O1, O2, O3>(o1: Observable<O1>, o2: Observable<O2>, o3: Observable<O3>): Observable<O1 | O2 | O3>;
export function roundRobin<O1, O2, O3, O4>(o1: Observable<O1>, o2: Observable<O2>, o3: Observable<O3>, o4: Observable<O4>): Observable<O1 | O2 | O3 | O4>;
export function roundRobin<O1, O2, O3, O4, O5>(o1: Observable<O1>, o2: Observable<O2>, o3: Observable<O3>, o4: Observable<O4>, o5: Observable<O5>): Observable<O1 | O2 | O3 | O4 | O5>;
export function roundRobin<O1, O2, O3, O4, O5, O6>(o1: Observable<O1>, o2: Observable<O2>, o3: Observable<O3>, o4: Observable<O4>, o5: Observable<O5>, o6: Observable<O6>): Observable<O1 | O2 | O3 | O4 | O5 | O6>;
export function roundRobin<T>(...observables: Observable<T>[]): Observable<T>;
/* tslint:enable:max-line-length */
/**
* Creates an output observable which emits values from each observable in a round robin sequence. Where the first observable must emit
* a value, before the next observable emits a value and starts over after all observables have emitted a value.
*/
export function roundRobin<T>(...observables: Observable<T>[]): Observable<T> {
const SKIP_VALUE: T = Object.freeze({}) as any as T;
const scanFunc = (acc: [number, T], next: [number, T]): [number, T] => {
const [accIndex] = acc;
const [nextIndex, nextValue] = next;
if (accIndex === nextIndex) {
const index = nextIndex === observables.length - 1 ? 0 : nextIndex + 1;
return [index, nextValue];
}
return [accIndex, SKIP_VALUE];
};
const ignoredValues = ([, value]: [number, T]) => value !== SKIP_VALUE;
const unwrapValue = ([, value]: [number, T]) => value;
const indexed$ = observables.map((o, indx) => o.pipe(map<T, [number, T]>(v => [indx, v])));
return mergeTrim(...indexed$).pipe(
scan(scanFunc, [0, SKIP_VALUE]),
filter(ignoredValues),
map(unwrapValue)
);
}