// linestream.js
// Event emitter base; StreamingBuffer is wired up to inherit from it below.
var EventEmitter = require('events').EventEmitter;
// The constructor itself is the module's export. The function declaration
// appears further down in the file but is hoisted, so it is defined here.
exports = module.exports = StreamingBuffer;
// Shared do-nothing function: used as the default for omitted callbacks and
// as the implementation of stream methods that are deliberately stubbed out.
function noop() {}
// Make StreamingBuffer instances inherit from EventEmitter.
require('util').inherits(StreamingBuffer, EventEmitter);
/**
 * Buffers incoming data and serves queued read requests (whole lines or
 * fixed byte counts) from it.
 *
 * May be called with or without `new`.
 *
 * Instance state:
 *   buffers - data received through write() that has not been consumed yet.
 *   queue   - pending read requests, oldest first.
 *   process - the prototype's process() bound to this instance, so that it
 *             can be handed to process.nextTick() as a bare function.
 */
function StreamingBuffer() {
    var calledAsConstructor = this instanceof StreamingBuffer;
    if (!calledAsConstructor) {
        // Plain function call: hand back a properly constructed instance.
        return new StreamingBuffer();
    }

    EventEmitter.call(this);

    this.buffers = [];
    this.queue = [];
    this.process = this.process.bind(this);
}
// Emulate the stream interface so that a stream can be piped into a
// StreamingBuffer. Both flags default to true on the prototype; end() sets
// them to false on the individual instance.
StreamingBuffer.prototype.readable = true;
StreamingBuffer.prototype.writable = true;
/**
 * Accepts a chunk of incoming data.
 *
 * The chunk is appended to the list of unconsumed buffers and a processing
 * pass is scheduled for the next tick, so pending read requests are served
 * asynchronously rather than from inside this call.
 *
 * @param buffer
 *   The chunk to append.
 */
StreamingBuffer.prototype.write = function(buffer) {
    var pending = this.buffers;
    pending.push(buffer);

    // this.process is bound to the instance in the constructor.
    process.nextTick(this.process);
};
/**
 * Marks the stream as finished by clearing the readable and writable flags
 * on this instance.
 *
 * Data that is already buffered and requests that are already queued are
 * left untouched.
 *
 * @param buffer
 *   Optional final chunk. When given, it is passed to write() before the
 *   flags are cleared, so that data handed to end() is not lost. A function
 *   argument (e.g. a completion callback) is ignored, as it always was.
 */
StreamingBuffer.prototype.end = function(buffer) {
    var hasFinalChunk = buffer !== undefined && buffer !== null &&
        typeof buffer !== 'function';
    if (hasFinalChunk) {
        this.write(buffer);
    }
    this.readable = false;
    this.writable = false;
};
/**
 * Discards all state and closes the stream.
 *
 * Unconsumed data and pending read requests are dropped (the callbacks of
 * dropped requests are never invoked), then end() is called.
 */
StreamingBuffer.prototype.destroy = function() {
    var stream = this;

    // Install fresh arrays instead of truncating the old ones.
    stream.buffers = [];
    stream.queue = [];

    stream.end();
};
// destroySoon() is an alias for destroy(): buffered data and pending
// requests are discarded right away.
StreamingBuffer.prototype.destroySoon = StreamingBuffer.prototype.destroy;
// pause(), resume() and pipe() exist only so that the object has these
// stream methods; they do nothing and return undefined.
StreamingBuffer.prototype.pause = noop;
StreamingBuffer.prototype.resume = noop;
StreamingBuffer.prototype.pipe = noop;
/**
 * Serves the queued read requests, oldest first, from the buffered data.
 *
 * A 'line' request is completed with the result of _nextLine(). Any other
 * request is treated as a byte request: its chunk callback receives the data
 * piece by piece (one call per buffer it draws from) and its complete
 * callback fires once task.bytes bytes have been delivered.
 *
 * Processing stops as soon as the request at the head of the queue cannot be
 * satisfied; write(), getLine() and getBytes() schedule another pass.
 *
 * A request is removed from the queue before its final callbacks run, so a
 * callback may safely call destroy() or queue further requests.
 */
StreamingBuffer.prototype.process = function() {
    while (this.queue.length) {
        var task = this.queue[0];

        if (task.type === 'line') {
            var line = this._nextLine();
            // No complete line yet; .process() is invoked again when more
            // data arrives.
            if (line === undefined) return;
            this.queue.shift();
            task.complete(line);
            continue;
        }

        // Drop exhausted buffers so a byte request never receives a
        // zero-length chunk.
        while (this.buffers.length && this.buffers[0].length === 0) {
            this.buffers.shift();
        }
        // Out of data; .process() is invoked again when more data arrives.
        if (this.buffers.length === 0) return;

        var buffer = this.buffers.shift();
        if (buffer.length > task.bytes) {
            // The buffer holds more than requested: keep the surplus at the
            // head of the list for the next request.
            this.buffers.unshift(buffer.slice(task.bytes, buffer.length));
            buffer = buffer.slice(0, task.bytes);
        }
        task.bytes -= buffer.length;

        if (task.bytes > 0) {
            // Request only partially satisfied; it stays at the head of the
            // queue and continues with the next buffer.
            task.chunk(buffer);
            continue;
        }

        // Request fully satisfied. Dequeue before invoking the callbacks so
        // that they observe (and may modify) a consistent queue.
        this.queue.shift();
        task.chunk(buffer);
        task.complete();
    }
};
/**
 * Gets the next line, up to and including its terminating CRLF.
 *
 * The buffered data is scanned from the start for the byte pair \r\n, which
 * may be split across buffers. On a match, everything up to and including
 * the pair is removed from this.buffers and returned as an ASCII-decoded
 * string; any surplus of the last buffer touched stays buffered. Without a
 * match nothing is consumed.
 *
 * @return
 *   The line (including the trailing CRLF) when there is one, undefined
 *   otherwise.
 */
StreamingBuffer.prototype._nextLine = function() {
    var buffers = this.buffers;
    // Whether the byte immediately before the scan position was \r. It is
    // carried across buffer boundaries, so a CRLF split over two buffers,
    // even with empty buffers in between, is still recognised.
    var afterCR = false;

    for (var bid = 0; bid < buffers.length; bid++) {
        var buffer = buffers[bid];

        for (var cursor = 0; cursor < buffer.length; cursor++) {
            var code = buffer[cursor];

            if (afterCR && code === 10 /* \n */) {
                // We found a line ending; the line stops just after it.
                var end = cursor + 1;

                // Concat the buffers we consumed to the end and remove them
                // from the list.
                var line = '';
                for (var i = 0; i < bid; i++) {
                    line += buffers.shift().toString('ascii');
                }
                line += buffer.toString('ascii', 0, end);

                // `buffer` is now at the head of the list. Keep only its
                // unread surplus; drop it entirely when nothing is left, so
                // no zero-length buffer lingers.
                if (end < buffer.length) {
                    buffers[0] = buffer.slice(end, buffer.length);
                }
                else {
                    buffers.shift();
                }
                return line;
            }

            afterCR = code === 13 /* \r */;
        }
    }

    // No complete line buffered yet.
    return undefined;
};
/**
 * Requests the next line from the stream.
 *
 * The request is appended to the queue and a processing pass is scheduled
 * for the next tick; the callback is never invoked synchronously.
 *
 * @param complete
 *   Optional callback, invoked with the line once the request is served.
 */
StreamingBuffer.prototype.getLine = function(complete) {
    var request = {
        type: 'line',
        complete: complete ? complete : noop
    };

    this.queue.push(request);
    process.nextTick(this.process);
};
/**
 * Requests a fixed number of bytes from the stream.
 *
 * A request for a falsy byte count (0, undefined, ...) is not queued: its
 * complete callback is invoked immediately and synchronously. Any other
 * request is appended to the queue and a processing pass is scheduled for
 * the next tick.
 *
 * @param bytes
 *   Number of bytes to read.
 * @param chunk
 *   Optional callback, invoked with each piece of data as it is delivered.
 * @param complete
 *   Optional callback, invoked without arguments once all requested bytes
 *   have been delivered.
 */
StreamingBuffer.prototype.getBytes = function(bytes, chunk, complete) {
    var done = complete || noop;

    if (!bytes) {
        // Filter out requests for 0 bytes.
        done();
        return;
    }

    this.queue.push({
        type: 'bytes',
        bytes: bytes,
        chunk: chunk || noop,
        complete: done
    });
    process.nextTick(this.process);
};