-
Notifications
You must be signed in to change notification settings - Fork 26k
/
readable-stream.ts
81 lines (73 loc) · 1.93 KB
/
readable-stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class ReadableStream<T> {
constructor(opts: UnderlyingSource = {}) {
let closed = false
let pullPromise: any
let transformController: TransformStreamDefaultController
const { readable, writable } = new TransformStream(
{
start: (controller: TransformStreamDefaultController) => {
transformController = controller
},
},
undefined,
{
highWaterMark: 1,
}
)
const writer = writable.getWriter()
const encoder = new TextEncoder()
const controller: ReadableStreamController<T> = {
get desiredSize() {
return transformController.desiredSize
},
close: () => {
if (!closed) {
closed = true
writer.close()
}
},
enqueue: (chunk: T) => {
writer.write(typeof chunk === 'string' ? encoder.encode(chunk) : chunk)
pull()
},
error: (reason: any) => {
transformController.error(reason)
},
}
const pull = () => {
if (opts.pull) {
const shouldPull =
controller.desiredSize !== null && controller.desiredSize > 0
if (!pullPromise && shouldPull) {
pullPromise = Promise.resolve().then(() => {
pullPromise = 0
opts.pull!(controller)
})
return pullPromise
}
}
return Promise.resolve()
}
if (opts.cancel) {
readable.cancel = (reason: any) => {
opts.cancel!(reason)
return readable.cancel(reason)
}
}
function registerPull() {
const getReader = readable.getReader.bind(readable)
readable.getReader = () => {
pull()
return getReader()
}
}
const started = opts.start && opts.start(controller)
if (started && typeof started.then === 'function') {
started.then(() => registerPull())
} else {
registerPull()
}
return readable
}
}
export { ReadableStream }