/
execute-latest-on-idle.ts
executable file
·145 lines (124 loc) · 4.77 KB
/
execute-latest-on-idle.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import { merge, Observable, Operator, Subject, Subscriber, TeardownLogic, BehaviorSubject } from 'rxjs';
import { tap } from 'rxjs/operators';
export type ExecuteLatestOnIdleEvent<T, R = void> = StartedEvent<T> | SkippedEvent<T> | FinishedEvent<T, R> | ErrorEvent<T>;
export class StartedEvent<T> { constructor(public readonly input: T) {} }
export class SkippedEvent<T> { constructor(public readonly input: T) {} }
export class FinishedEvent<T, R> { constructor(public readonly input: T, public readonly output: R) {} }
export class ErrorEvent<T> { constructor(public readonly input: T, public readonly error: any) {} }
export type OperateFunction<T, R> = {(obs: Observable<T>): Observable<ExecuteLatestOnIdleEvent<T, R>>, readonly idle: Observable<boolean> };
/**
* Calls the given execute function with the latest value from the source observable as soon as a running call of executeFun finished.
*
* Example:
*
* - source emitted 1 -> emit StartedEvent 1 to subscriber, call executeFun with 1
*
* - executeFun finished 1 -> emit FinishedEvent 1 to subscriber
*
* - source emitted 2 -> emit StartedEvent 2 to subscriber, call executeFun with 2
*
* - source emitted 3 -> cache 3 because executeFun didn't finished for 2 yet
*
* - executeFun finished 2 -> emit FinishedEvent 2 to subscriber, emit StartedEvent 3 to subscriber, call executeFun with 3
*
* - source emitted 4 -> cache 4 because executeFun didn't finished for 3 yet
*
* - source emitted 5 -> emit SkippedEvent for 4 and cache 5 because executeFun didn't finished for 3 yet
*
* - executeFun finished 3 -> emit FinishedEvent 3 to subscriber, emit StartedEvent 5 to subscriber, call executeFun with 5
*
* - source completed -> executeFun didn't finished for 5 yet, delay completion
*
* - executeFun finished 5 -> emit FinishedEvent 6 to subscriber, complete subscriber
*
* Error Handling:
*
* - For errors caused by the execute function, an ErrorEvent will be emitted to the subscriber.
*
* - Errors of the source observable will be passed through.
*
* @param executeFun Function to execute with latest value from source observable.
* Can return something synchronously or asynchronously. Returned value will be included in FinishedEvent.
*/
export function executeLatestOnIdle<T, R = void>(executeFun: (t: T) => Promise<R> | R): OperateFunction<T, R> {
const operator = new ExecuteLatestOnIdleOperator(executeFun);
function operateFunction(obs: Observable<T>): Observable<ExecuteLatestOnIdleEvent<T, R>> {
return obs.lift(operator);
}
Object.defineProperty(operateFunction, 'idle', {
get(): Observable<boolean> {
return operator.idle;
}
});
return operateFunction as OperateFunction<T, R>;
}
class ExecuteLatestOnIdleOperator<T, R> implements Operator<T, ExecuteLatestOnIdleEvent<T, R>> {
public idle = new BehaviorSubject<boolean>(true);
constructor(private fun: (t: T) => Promise<R> | R) {}
call(subscriber: Subscriber<ExecuteLatestOnIdleEvent<T, R>>, source: Observable<T>): TeardownLogic {
const delayed$ = new Subject<T>();
let hasLatestEvent = false;
let latestEvent: T;
let shouldComplete = false;
const onMergedNext = async (val: T) => {
if (this.idle.value) {
this.idle.next(false);
subscriber.next(new StartedEvent(val));
try {
const result = await this.fun(val);
subscriber.next(new FinishedEvent(val, result));
} catch (e) {
subscriber.next(new ErrorEvent(val, e));
} finally {
this.idle.next(true);
}
if (hasLatestEvent) {
hasLatestEvent = false;
delayed$.next(latestEvent);
}
else if (shouldComplete) {
this.idle.complete();
delayed$.complete();
}
}
else {
if (hasLatestEvent) {
subscriber.next(new SkippedEvent(latestEvent));
}
latestEvent = val;
hasLatestEvent = true;
}
};
const onMergedError = (e: any) => {
subscriber.error(e);
this.idle.complete();
delayed$.complete();
};
const onMergedComplete = () => {
subscriber.complete();
this.idle.complete();
};
// eslint-disable-next-line @typescript-eslint/no-empty-function
const noop = () => {};
const onSourceCompleted = () => {
if (this.idle.value) {
// complete synchronous
delayed$.complete();
this.idle.complete();
}
else {
shouldComplete = true;
}
};
const internalSubscription = merge(
source.pipe(tap(noop, noop, onSourceCompleted)),
delayed$
).subscribe(onMergedNext, onMergedError, onMergedComplete);
const tearDown = () => {
internalSubscription.unsubscribe();
delayed$.complete();
this.idle.complete();
};
return tearDown;
}
}