-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
132 lines (116 loc) · 3.6 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
module.exports = function(stream) {
var self = this;
self.stream = stream;
self.on = function(cb) {
self.onData = cb;
}
var state = 'readTotal';
var totalBytes = -1;
var pos = 0;
self.newData = function(buffer) {
var extracted = self.extract(buffer);
self.onData(extracted.meta, extracted.buffer);
}
function outstart() {
str = '';
for (var i=0; i<20 && i<fullBuffer.length; i++) {
str += '['+fullBuffer.readUInt8(i).toString()+']';
}
console.log(str);
}
self.read = function(buffer) {
switch (state) {
case 'readTotal':
if (buffer.length - pos >= 4) {
totalBytes = buffer.readUInt32LE(pos);
pos += 4;
bytesLeft = totalBytes-4;
fullBuffer = new Buffer(4);
fullBuffer.writeUInt32LE(totalBytes,0);
state = 'readData';
self.read(buffer);
} else {
totalBuffer = new Buffer(buffer.length-pos);
buffer.copy(totalBuffer, 0, pos, 4);
state = 'continueTotal';
pos = 0;
totalPos = buffer.length-pos;
}
break;
case 'continueTotal':
totalBuffer = Buffer.concat([totalBuffer, buffer.slice(0, totalPos)]);
totalBytes = totalBuffer.readUInt32LE(0);
pos += totalPos;
bytesLeft = totalBytes - 4;
fullBuffer = new Buffer(4);
fullBuffer.writeUInt32LE(totalBytes);
state = 'readData';
self.read(buffer);
break;
case 'readData':
if (bytesLeft + pos <= buffer.length) {
// - the rest of data is fully contained in buffer
try {
var piece = new Buffer(bytesLeft);
buffer.copy(piece, 0, pos, pos+bytesLeft);
fullBuffer = Buffer.concat([fullBuffer, piece]);
pos += bytesLeft;
bytesLeft = 0;
self.newData(fullBuffer);
} catch (ee) {
console.log('newData prob?');
console.log(ee);
}
state = 'readTotal';
if (pos < buffer.length) {
// and another thing starts after
return self.read(buffer);
} else {
pos = 0;
}
} else {
// - the data continues after end of buffer
var piece = new Buffer(buffer.length - pos);
buffer.copy(piece, 0, pos, buffer.length);
fullBuffer = Buffer.concat([fullBuffer, piece]);
pos = 0;
bytesLeft -= piece.length;
}
break;
}
}
self.extract = function(buffer) {
var totalLength = buffer.readUInt32LE(0);
var jsonLength = buffer.readUInt16LE(4);
var json = buffer.toString('utf8', 6, jsonLength+6);
try {
var meta = JSON.parse(json);
} catch (e) {
console.log('Problem parsing meta:', e);
var meta = {};
}
return { meta: meta, buffer: buffer.slice(6+jsonLength) };
}
self.stream.on('data', function(data) {
self.read(data);
});
this.addMeta = function(buffer, meta) {
var json = JSON.stringify(meta);
var jsonBuffer = new Buffer(json);
var totalLength = jsonBuffer.length + 4 + 2 + buffer.length;
var outBuffer = new Buffer(totalLength);
outBuffer.writeUInt32LE(totalLength,0);
outBuffer.writeUInt16LE(jsonBuffer.length, 4);
outBuffer.write(json, 6);
buffer.copy(outBuffer, 4 + 2 + jsonBuffer.length);
return outBuffer;
}
this.write = function(meta, buffer) {
if (buffer === null || buffer === undefined) {
buffer = new Buffer(' ');
}
var newBuffer = self.addMeta(buffer, meta);
self.stream.write(newBuffer);
}
return self;
}