-
Notifications
You must be signed in to change notification settings - Fork 17
/
StreamIterator.mjs
100 lines (74 loc) · 2.08 KB
/
StreamIterator.mjs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import nextTick from "./nextTick"
/**
 * StreamIterator exposes a Readable stream through the async iterator
 * protocol: every call to next() resolves with the next chunk returned by
 * stream.read(), rejects with the error emitted by the stream, or reports
 * completion once the stream has ended (or has been closed).
 *
 * @api private
 */
class StreamIterator {
  /**
   * @param {stream.Readable} stream
   */
  constructor(stream) {
    this.__stream = stream

    this.__states = {
      pending: Symbol("pending"),
      read: Symbol("read"),
      error: Symbol("error")
    }

    this.__state = this.__states.pending
    this.__error = null
    this.__closed = false

    // These listeners are attached before any waiter created by
    // __ensureRead, so the error/closed bookkeeping is already up to date
    // by the time a waiter wakes the next() loop.
    this.__stream.on("error", this.__onError)
    this.__stream.once("close", this.__onClose)
  }

  /**
   * @return {StreamIterator} the iterator itself
   */
  [Symbol.asyncIterator]() {
    return this
  }

  __setState = state => void (this.__state = state)

  __isState = state => this.__state === state

  __isPendingState = () => this.__isState(this.__states.pending)

  __isErrorState = () => this.__isState(this.__states.error)

  /**
   * Tells whether the stream has ended AND has already emitted "end".
   *
   * @return {boolean}
   */
  __isEndState = () => {
    // Temporary hack while waiting for new public APIs of streams.
    // See for more info: https://github.com/nodejs/node/issues/445
    // eslint-disable-next-line
    const state = this.__stream._readableState

    return state.ended && state.endEmitted
  }

  /**
   * Remembers the error emitted by the stream; next() rethrows it.
   *
   * @param {Error} err
   */
  __onError = err => {
    this.__error = err
    this.__setState(this.__states.error)
  }

  /**
   * Remembers that the stream emitted "close", so that next() stops
   * waiting for events that can no longer arrive.
   */
  __onClose = () => {
    this.__closed = true
  }

  /**
   * Waits until the stream signals that something happened: data became
   * readable, or the stream ended, failed or was closed. Waiting for
   * "readable" alone would leave next() pending forever when the stream
   * errors or is destroyed in the meantime.
   *
   * @return {Promise<void>}
   */
  __ensureRead = () => new Promise(resolve => {
    const events = ["readable", "end", "error", "close"]

    const wake = () => {
      // Drop the listeners of the events that did not fire
      events.forEach(name => this.__stream.removeListener(name, wake))

      // Never overwrite the "error" state recorded by __onError
      if (this.__isPendingState()) {
        this.__setState(this.__states.read)
      }

      resolve()
    }

    events.forEach(name => this.__stream.once(name, wake))
  })

  /**
   * Resolves with the next chunk of the stream.
   *
   * @return {Promise<{value: any, done: boolean}>}
   *
   * @throws the error emitted by the underlying stream
   */
  next = async () => {
    while (true) {
      await nextTick()

      if (this.__isErrorState()) {
        throw this.__error
      }

      // Ensure of a Readable ending
      if (this.__isEndState()) {
        return {value: undefined, done: true}
      }

      // Wait for the stream to signal new data (or its termination).
      // A closed stream emits nothing more, so skip waiting and just
      // drain whatever read() still returns.
      if (this.__isPendingState() && !this.__closed) {
        await this.__ensureRead()

        continue
      }

      const value = this.__stream.read()

      if (value != null) {
        return {value, done: false}
      }

      // Nothing left to read and nothing more will come
      if (this.__closed) {
        return {value: undefined, done: true}
      }

      // Back to the "pending" state if given value is nullish
      this.__setState(this.__states.pending)
    }
  }
}
export default StreamIterator