-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.mjs
78 lines (78 loc) · 1.61 KB
/
index.mjs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
export default function makeDefer() {
let resolve = undefined;
let reject = undefined;
const promise = new Promise((rs, rj) => {
resolve = rs;
reject = rj;
});
return {
resolve,
reject,
promise
};
}
export function makeBroadcastStream() {
const listeners = [];
let done = false;
let defer = makeDefer();
return {
[Symbol.asyncIterator]() {
return {
async next() {
const value = await defer.promise;
defer = makeDefer();
return value;
},
async return(value) {
return { value: undefined, done: true };
},
async throw(e) {
return { value: undefined, done: true };
}
};
},
listen(onNext, { onError, onDone } = {}) {
if (done)
throw new Error('Cannot listen after done');
const listener = { onNext, onError, onDone };
listeners.push(listener);
return function removeListener() {
const idx = listeners.lastIndexOf(listener);
if (idx >= 0)
listeners.splice(idx, 1);
};
},
next(value) {
if (done)
throw new Error('Cannot next after done');
defer.resolve({ value, done: false });
for (const { onNext } of listeners)
try {
onNext(value);
}
catch { }
},
throw(error) {
if (done)
throw new Error('Cannot throw after done');
done = true;
defer.reject(error);
for (const { onError } of listeners)
try {
onError?.(error);
}
catch { }
},
done() {
if (done)
throw new Error('Cannot done after done');
done = true;
defer.resolve({ value: undefined, done: true });
for (const { onDone } of listeners)
try {
onDone?.();
}
catch { }
},
};
}