-
Notifications
You must be signed in to change notification settings - Fork 10
/
buffered-stream.js
209 lines (176 loc) · 5 KB
/
buffered-stream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
var util = require('util');
var Stream = require('stream');
module.exports = BufferedStream;
/**
 * A readable/writable Stream subclass that queues written chunks and emits
 * them on a later tick.
 *
 * May be called with or without `new`.
 *
 * @param {number} [maxSize] Soft limit, in bytes, after which `write` starts
 *   returning false. It never causes data to be dropped: chunks are always
 *   appended to the buffer. When the first argument is not a number the
 *   limit defaults to Infinity and the arguments are read as
 *   (source, sourceEncoding).
 * @param {Stream|string|Buffer} [source] Optional initial content. A Stream
 *   is piped into this stream; any other defined value is handed to `end`
 *   and becomes the entire contents of this stream.
 * @param {string} [sourceEncoding] Encoding forwarded to `end` together with
 *   a non-stream source.
 */
function BufferedStream(maxSize, source, sourceEncoding) {
  if (!(this instanceof BufferedStream)) {
    return new BufferedStream(maxSize, source, sourceEncoding);
  }

  Stream.call(this);

  // Resolve the optional leading maxSize without reassigning the parameters.
  var hasLimit = typeof maxSize === 'number';
  var limit = hasLimit ? maxSize : Infinity;
  var input = hasLimit ? source : maxSize;
  var inputEncoding = hasLimit ? sourceEncoding : source;

  // Public interface.
  this.maxSize = limit;
  this.size = 0;
  this.encoding = null;
  this.paused = false;
  this.ended = false;
  this.readable = true;
  this.writable = true;

  // Internal state.
  this._buffer = [];
  this._flushing = false;
  this._wasFull = false;

  if (typeof input === 'undefined') {
    return;
  }

  if (input instanceof Stream) {
    input.pipe(this);
  } else {
    this.end(input, inputEncoding);
  }
}

util.inherits(BufferedStream, Stream);
/**
 * A read-only property that is true when this stream has no data left to
 * emit: either the internal buffer holds no chunks, or it has been released
 * (set to null) after the stream ended.
 *
 * Defined with Object.defineProperty rather than the deprecated
 * __defineGetter__; enumerable/configurable mirror what __defineGetter__
 * would have produced.
 */
Object.defineProperty(BufferedStream.prototype, 'empty', {
  configurable: true,
  enumerable: true,
  get: function () {
    return this._buffer == null || this._buffer.length === 0;
  }
});
/**
 * A read-only property that is true when this stream's buffer is full, i.e.
 * the number of buffered bytes (`size`) is strictly greater than `maxSize`.
 *
 * Defined with Object.defineProperty rather than the deprecated
 * __defineGetter__; enumerable/configurable mirror what __defineGetter__
 * would have produced.
 */
Object.defineProperty(BufferedStream.prototype, 'full', {
  configurable: true,
  enumerable: true,
  get: function () {
    return this.maxSize < this.size;
  }
});
/**
 * Chooses how "data" payloads are delivered. With an encoding set, each
 * chunk is converted to a string in that encoding before it is emitted;
 * with no encoding (the default, null) the raw Buffer objects are emitted.
 *
 * @param {string} encoding Encoding name stored on `this.encoding`.
 */
BufferedStream.prototype.setEncoding = function (encoding) {
  this.encoding = encoding;
};
/**
 * Stops this stream from emitting "data" events until `resume` is called and
 * emits "pause". Writes are still accepted while paused. Calling it on a
 * stream that is already paused does nothing.
 */
BufferedStream.prototype.pause = function () {
  if (this.paused) {
    return;
  }

  this.paused = true;
  this.emit('pause');
};
/**
 * Lifts a previous `pause`: emits "resume" and, if chunks are still
 * buffered, schedules a flush for the next tick. Calling it on a stream that
 * is not paused does nothing.
 */
BufferedStream.prototype.resume = function () {
  if (!this.paused) {
    return;
  }

  this.paused = false;
  this.emit('resume');

  // Checked after the "resume" event so handlers see the un-paused state.
  var hasPendingData = !this.empty;
  if (hasPendingData) {
    flushOnNextTick(this);
  }
};
/**
 * Appends the given chunk to this stream's buffer and schedules a flush on
 * the next tick.
 *
 * The chunk is always buffered, even when the stream is already over its
 * soft limit, so no data is lost.
 *
 * @param {string|Buffer} chunk Data to write. Strings are converted to a
 *   Buffer using `encoding`.
 * @param {string} [encoding] Encoding used when `chunk` is a string.
 * @returns {boolean} false if the stream is full after this write and should
 *   not be written to further until it drains, true otherwise.
 * @throws {Error} If the stream is not writable.
 */
BufferedStream.prototype.write = function (chunk, encoding) {
  if (!this.writable) {
    throw new Error('Stream is not writable');
  }

  // Buffer.from replaces the deprecated `new Buffer(string, encoding)`
  // constructor.
  if (typeof chunk === 'string') {
    chunk = Buffer.from(chunk, encoding);
  }

  this._buffer.push(chunk);
  this.size += chunk.length;
  flushOnNextTick(this);

  if (this.full) {
    // Remembered so that a "drain" event is emitted once space frees up.
    this._wasFull = true;
    return false;
  }

  return true;
};
/**
 * Optionally writes a final chunk to this stream and marks it as ended.
 *
 * The "end" event fires immediately only when no flush is scheduled and
 * nothing is left in the buffer. Otherwise it fires after the buffered data
 * has been flushed, so chunks written while the stream was paused are never
 * discarded by ending it.
 *
 * @param {string|Buffer} [chunk] Final data to write. A null or undefined
 *   chunk is ignored.
 * @param {string} [encoding] Encoding used when `chunk` is a string.
 * @throws {Error} If the stream has already been ended.
 */
BufferedStream.prototype.end = function (chunk, encoding) {
  if (this.ended) {
    throw new Error('Stream is already ended');
  }

  // Test the value rather than arguments.length so that end(undefined) and
  // end(null) do not push a non-chunk into the buffer.
  if (chunk != null) {
    this.write(chunk, encoding);
  }

  this.ended = true;

  // With no flush scheduled and nothing buffered it is safe to end now. If a
  // flush is pending, or data is still buffered (the stream is paused), the
  // flush that follows will emit "end" once the buffer has drained.
  if (!this._flushing && this.empty) {
    end(this);
  }
};
/**
 * Schedules a flush of the given stream on the next tick, unless one is
 * already scheduled (tracked by `stream._flushing`).
 *
 * The scheduled callback keeps re-queuing itself on following ticks while
 * the stream still has buffered data. It stops, clearing `_flushing`, as
 * soon as it finds the stream paused or finds it empty after a flush.
 */
function flushOnNextTick(stream) {
  if (stream._flushing) {
    return;
  }

  var runCycle = function () {
    if (!stream.paused) {
      flush(stream);

      if (!stream.empty) {
        // More data remains: keep the cycle going.
        process.nextTick(runCycle);
        return;
      }
    }

    // Paused, or fully drained: the flush cycle is over.
    stream._flushing = false;
  };

  process.nextTick(runCycle);
  stream._flushing = true;
}
/**
 * Emits the stream's buffered chunks as "data" events, oldest first.
 *
 * Each chunk is emitted as a string when `stream.encoding` is set, and as-is
 * otherwise. Emission stops early if a "data" or "drain" handler pauses the
 * stream. Once the buffer is completely drained and the stream has been
 * ended, the stream is finished via `end`.
 */
function flush(stream) {
  var chunk;

  while (stream._buffer.length) {
    chunk = stream._buffer.shift();
    stream.size -= chunk.length;

    if (stream.encoding) {
      stream.emit('data', chunk.toString(stream.encoding));
    } else {
      stream.emit('data', chunk);
    }

    // If the stream was full at one point but isn't now, emit "drain".
    if (stream._wasFull && !stream.full) {
      stream._wasFull = false;
      stream.emit('drain');
    }

    // If the stream was paused in some event handler, stop emitting.
    if (stream.paused) {
      break;
    }
  }

  // Only finish when nothing is left to deliver. If a pause interrupted the
  // loop with chunks still queued, ending here would null the buffer and
  // silently drop them; the flush after resume ends the stream instead.
  if (stream.ended && stream._buffer.length === 0) {
    end(stream);
  }
}

/**
 * Finishes the stream: emits "end" and then releases the internal buffer by
 * setting it to null.
 */
function end(stream) {
  stream.emit('end');
  stream._buffer = null;
}