/
mult.ts
133 lines (124 loc) · 3.32 KB
/
mult.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import type { Fn, Fn0, Maybe } from "@thi.ng/api";
import { illegalState } from "@thi.ng/errors/illegal-state";
/**
* Creates a new {@link Mult} instance which allows splitting a single `src`
* async iterable into multiple parallel subscribers. Iteration only starts when
* the first subscriber is attached (via {@link Mult.subscribe}) and back
* pressure is handled by waiting for **all** child subscribers to deliver their
* values before the next value from `src` is consumed. `Mult` allows dynamic
* subscriptions and unsubscriptions and will stop consuming from `src` when no
* further subscribers are attached.
*
* @example
* ```ts tangle:../export/mult.ts
* import { map, mult, run, wait } from "@thi.ng/transducers-async";
*
* const root = mult(
* (async function* () {
* yield "hello";
* await wait(1000);
* yield "world";
* await wait(1000);
* yield "good bye";
* })()
* );
*
* // 1st subscriber (vanilla JS)
* (async () => {
* for await (let x of root.subscribe()) console.log("vanilla:", x);
* })();
*
* // 2nd subscriber (transducer), attached with delay
* setTimeout(
* () =>
* run(
* map(async (x) => {
* console.log("tx", x);
* await wait(1500);
* }),
* root.subscribe()
* ),
* 900
* );
*
* // vanilla: hello
* // vanilla: world
* // tx world
* // vanilla: good bye
* // tx good bye
* ```
*
* @param src
*/
export const mult = <T>(src: AsyncIterable<T>) => new Mult<T>(src);
export class Mult<T> {
protected subs: MSub<T>[] = [];
protected isActive = false;
constructor(public src: AsyncIterable<T>) {}
/**
* Creates a new subscription (aka custom `AsyncIterable`) which will
* receive any future values from `src`. The returned subscription can be
* removed again via {@link Mult.unsubscribe}.
*/
subscribe(): AsyncIterable<T> {
const sub = new MSub<T>();
this.subs.push(sub);
if (!this.isActive) {
this.isActive = true;
(async () => {
for await (let val of this.src) {
for (let s of this.subs) s.resolve(val);
if (val === undefined) this.subs.length = 0;
if (!this.subs.length) break;
await Promise.all(this.subs.map((x) => x.notifyP));
}
for (let s of this.subs) s.resolve(undefined);
this.subs.length = 0;
this.isActive = false;
})();
}
return sub;
}
/**
* Attempts to remove given child subscription (presumably created via
* {@link Mult.subscribe}). Returns true if removal was successful.
*
* @param sub
*/
unsubscribe(sub: AsyncIterable<T>) {
const idx = this.subs.findIndex((x) => x === sub);
if (idx >= 0) {
this.subs.splice(idx, 1);
(<MSub<T>>sub).resolve(undefined);
return true;
}
return false;
}
}
/** @internal */
export class MSub<T> {
valueP!: Promise<Maybe<T>>;
notifyP!: Promise<void>;
resolve!: Fn<Maybe<T>, void>;
notify!: Fn0<void>;
active = false;
constructor() {
this.$await();
}
async *[Symbol.asyncIterator]() {
if (this.active) illegalState("multiple consumers unsupported");
this.active = true;
while (true) {
const res = await this.valueP;
if (res === undefined) break;
yield res;
this.notify();
this.$await();
}
this.active = false;
}
protected $await() {
this.notifyP = new Promise((res) => (this.notify = res));
this.valueP = new Promise((res) => (this.resolve = res));
}
}