/
async-prioritized-queue.ts
executable file
·139 lines (113 loc) · 4.25 KB
/
async-prioritized-queue.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import { Observable, ReplaySubject } from "rxjs";
export type Worker<T> = () => Promise<T>;
/**
* An asynchronous, prioritized, observable and removable queue of worker functions.
*
* Async - Workers are async functions. Next worker will be started when promise returned by previous worker resolves or rejects.
*
* Prioritized - Each worker has a priority. 0 is highest, higher numbers means lower priority.
* Worker with higher priority will be executed first.
*
* Observable - Calling the add method returns an observable. The worker will be queued on subscribing this observable.
* Multiple subscriptions lead to multiple executions of the worker.
* This observable will emit the result of the finished worker and then will complete immediately.
* It will send an error if the worker throws an error or rejects the returned promise.
*
* Removable - The worker will be removed from the queue if the subscriber unsubscribes early, before the worker is started.
* If the subscriber unsubscribes late (the worker was already called but hasn't finished yet), it will not be cancelled.
* The caller will just miss the result.
*
* Unhandled Errors - An error thrown by a worker will be passed through to the observable subscriber.
* If the subscriber unsubscribes during the execution of the worker, and the worker throws an error the error will be passed the the
* unhandledErrors observable (together with the worker function as a hint who throws that error).
*
*/
export class AsyncPrioritizedQueue {
private readonly priorities = new Set<number>();
private readonly workers = new Map<number, Worker<any>[]>();
private isRunning: boolean = false;
private readonly unhandledErrors$ = new ReplaySubject<{ error: any, worker: Worker<any> }>(1);
get unhandledErrors(): Observable<{ error: any, worker: Worker<any> }> {
return this.unhandledErrors$;
}
/**
* Creates an Observable for queuing the given worker with the given priority.
* Subscribe to add the worker to the queue. Multiple subscriptions will add the worker multiple times.
* Unsubscribe before completion to remove the worker from the queue.
*
* Observable emits the result of the worker and then completes immediately.
* Observable mirrors any error thrown by the worker.
*
* @param worker
* @param priority
*/
add<T>(worker: Worker<T>, priority: number): Observable<T> {
if (priority < 0) {
throw new Error("priority should not be smaller than 0");
}
return new Observable<T>(subscriber => {
this.priorities.add(priority);
if (!this.workers.has(priority)) {
this.workers.set(priority, []);
}
const workers = this.workers.get(priority);
if (workers === undefined) {
throw new Error("bug in implementation of this method, workers should not be undefined");
}
let unsubscribed = false;
const workerWrapper = async () => {
try {
if (!unsubscribed) {
const result = await worker();
subscriber.next(result);
}
subscriber.complete();
} catch (e) {
if (unsubscribed) {
this.unhandledErrors$.next({
error: e,
worker: worker
});
} else {
subscriber.error(e);
}
}
this.start();
};
workers.push(workerWrapper);
this.start();
return () => {
unsubscribed = true;
};
});
}
private async start(): Promise<void> {
if (!this.isRunning) {
const worker = this.getNextWorker();
if (worker === undefined) {
return;
}
this.isRunning = true;
await worker();
this.isRunning = false;
this.start();
}
}
private getNextWorker(): Worker<any> | undefined {
const priorities = Array.from(this.priorities.values()).sort();
if (priorities.length === 0) {
return undefined;
}
const nextPriority = priorities[0];
const workers = this.workers.get(nextPriority);
if (workers === undefined || workers.length === 0) {
return undefined;
}
const worker = workers.shift();
if (workers.length === 0) {
this.priorities.delete(nextPriority);
this.workers.delete(nextPriority);
}
return worker;
}
}