-
Notifications
You must be signed in to change notification settings - Fork 2.2k
/
stream.ts
251 lines (230 loc) Β· 7.36 KB
/
stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
// Make this a type to override ReadableStream's async iterator type in case
// the popular web-streams-polyfill is imported - the supplied types
// in this case don't quite match.
export type IterableReadableStreamInterface<T> = ReadableStream<T> &
AsyncIterable<T>;
/*
* Support async iterator syntax for ReadableStreams in all environments.
* Source: https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490
*/
export class IterableReadableStream<T>
extends ReadableStream<T>
implements IterableReadableStreamInterface<T>
{
public reader: ReadableStreamDefaultReader<T>;
ensureReader() {
if (!this.reader) {
this.reader = this.getReader();
}
}
async next(): Promise<IteratorResult<T>> {
this.ensureReader();
try {
const result = await this.reader.read();
if (result.done) {
this.reader.releaseLock(); // release lock when stream becomes closed
return {
done: true,
value: undefined,
};
} else {
return {
done: false,
value: result.value,
};
}
} catch (e) {
this.reader.releaseLock(); // release lock when stream becomes errored
throw e;
}
}
async return(): Promise<IteratorResult<T>> {
this.ensureReader();
// If wrapped in a Node stream, cancel is already called.
if (this.locked) {
const cancelPromise = this.reader.cancel(); // cancel first, but don't await yet
this.reader.releaseLock(); // release lock first
await cancelPromise; // now await it
}
return { done: true, value: undefined };
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
async throw(e: any): Promise<IteratorResult<T>> {
this.ensureReader();
if (this.locked) {
const cancelPromise = this.reader.cancel(); // cancel first, but don't await yet
this.reader.releaseLock(); // release lock first
await cancelPromise; // now await it
}
throw e;
}
[Symbol.asyncIterator]() {
return this;
}
static fromReadableStream<T>(stream: ReadableStream<T>) {
// From https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams#reading_the_stream
const reader = stream.getReader();
return new IterableReadableStream<T>({
start(controller) {
return pump();
function pump(): Promise<T | undefined> {
return reader.read().then(({ done, value }) => {
// When no more data needs to be consumed, close the stream
if (done) {
controller.close();
return;
}
// Enqueue the next data chunk into our target stream
controller.enqueue(value);
return pump();
});
}
},
cancel() {
reader.releaseLock();
},
});
}
static fromAsyncGenerator<T>(generator: AsyncGenerator<T>) {
return new IterableReadableStream<T>({
async pull(controller) {
const { value, done } = await generator.next();
// When no more data needs to be consumed, close the stream
if (done) {
controller.close();
}
// Fix: `else if (value)` will hang the streaming when nullish value (e.g. empty string) is pulled
controller.enqueue(value);
},
async cancel(reason) {
await generator.return(reason);
},
});
}
}
export function atee<T>(
iter: AsyncGenerator<T>,
length = 2
): AsyncGenerator<T>[] {
const buffers = Array.from(
{ length },
() => [] as Array<IteratorResult<T> | IteratorReturnResult<T>>
);
return buffers.map(async function* makeIter(buffer) {
while (true) {
if (buffer.length === 0) {
const result = await iter.next();
for (const buffer of buffers) {
buffer.push(result);
}
} else if (buffer[0].done) {
return;
} else {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
yield buffer.shift()!.value;
}
}
});
}
export function concat<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
T extends Array<any> | string | number | Record<string, any> | any
>(first: T, second: T): T {
if (Array.isArray(first) && Array.isArray(second)) {
return first.concat(second) as T;
} else if (typeof first === "string" && typeof second === "string") {
return (first + second) as T;
} else if (typeof first === "number" && typeof second === "number") {
return (first + second) as T;
} else if (
// eslint-disable-next-line @typescript-eslint/no-explicit-any
"concat" in (first as any) &&
// eslint-disable-next-line @typescript-eslint/no-explicit-any
typeof (first as any).concat === "function"
) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return (first as any).concat(second) as T;
} else if (typeof first === "object" && typeof second === "object") {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const chunk = { ...first } as Record<string, any>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
for (const [key, value] of Object.entries(second as Record<string, any>)) {
if (key in chunk && !Array.isArray(chunk[key])) {
chunk[key] = concat(chunk[key], value);
} else {
chunk[key] = value;
}
}
return chunk as T;
} else {
throw new Error(`Cannot concat ${typeof first} and ${typeof second}`);
}
}
export class AsyncGeneratorWithSetup<
S = unknown,
T = unknown,
TReturn = unknown,
TNext = unknown
> implements AsyncGenerator<T, TReturn, TNext>
{
private generator: AsyncGenerator<T>;
public setup: Promise<S>;
private firstResult: Promise<IteratorResult<T>>;
private firstResultUsed = false;
constructor(generator: AsyncGenerator<T>, startSetup?: () => Promise<S>) {
this.generator = generator;
// setup is a promise that resolves only after the first iterator value
// is available. this is useful when setup of several piped generators
// needs to happen in logical order, ie. in the order in which input to
// to each generator is available.
this.setup = new Promise((resolve, reject) => {
this.firstResult = generator.next();
if (startSetup) {
this.firstResult.then(startSetup).then(resolve, reject);
} else {
this.firstResult.then((_result) => resolve(undefined as S), reject);
}
});
}
async next(...args: [] | [TNext]): Promise<IteratorResult<T>> {
if (!this.firstResultUsed) {
this.firstResultUsed = true;
return this.firstResult;
}
return this.generator.next(...args);
}
async return(
value: TReturn | PromiseLike<TReturn>
): Promise<IteratorResult<T>> {
return this.generator.return(value);
}
async throw(e: Error): Promise<IteratorResult<T>> {
return this.generator.throw(e);
}
[Symbol.asyncIterator]() {
return this;
}
}
export async function pipeGeneratorWithSetup<
S,
A extends unknown[],
T,
TReturn,
TNext,
U,
UReturn,
UNext
>(
to: (
g: AsyncGenerator<T, TReturn, TNext>,
s: S,
...args: A
) => AsyncGenerator<U, UReturn, UNext>,
generator: AsyncGenerator<T, TReturn, TNext>,
startSetup: () => Promise<S>,
...args: A
) {
const gen = new AsyncGeneratorWithSetup(generator, startSetup);
const setup = await gen.setup;
return { output: to(gen, setup, ...args), setup };
}