Skip to content

Commit

Permalink
Refactor serialiation code to return result objects instead of using …
Browse files Browse the repository at this point in the history
…spurious callbacks for synchronous code. Also, refactor test routines to be more DRY.
  • Loading branch information
wanderview committed Jan 11, 2013
1 parent 62c639d commit d1ba70b
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 264 deletions.
20 changes: 7 additions & 13 deletions lib/message.js
Expand Up @@ -59,20 +59,14 @@ function NetbiosMessage(opts) {
return self;
}

NetbiosMessage.fromBuffer = function(buf, callback) {
var rtn = null;
NetbiosMessage.fromBuffer = function(buf) {

nbunpack(buf, function(error, len, raw) {
if (!error && raw) {
rtn = new NetbiosMessage(raw);
}

if (typeof callback === 'function') {
callback(error, len, rtn);
}
});
var res = nbunpack(buf);
if (res.error) {
return {error: res.error};
}

return rtn;
return {bytesRead: res.bytesRead, message: new NetbiosMessage(res.message)};
}

NetbiosMessage.fromRaw = function(raw) {
Expand Down Expand Up @@ -145,7 +139,7 @@ NetbiosMessage.prototype.toRaw = function() {
};

NetbiosMessage.prototype.pack = function(buf, callback) {
nbpack(buf, this.toRaw(), callback);
return nbpack(buf, this.toRaw());
};

function _recordFromRaw(rr) {
Expand Down
98 changes: 48 additions & 50 deletions lib/pack.js
Expand Up @@ -31,7 +31,7 @@ var ip = require('ip');

// TODO: properly handle truncation and truncate flag due to buffer limits

function pack(buf, message, callback) {
function pack(buf, message) {
var bytes = 0;

// - 16-bit transaction id
Expand All @@ -47,8 +47,9 @@ function pack(buf, message, callback) {
// - 4-bit opcode
var opcodeMask = con.OPCODE_FROM_STRING[message.op];
if (opcodeMask === undefined) {
callback(new Error('Illegal NetBIOS op value of [' + message.op + ']'));
return;
return {
error: new Error('Illegal NetBIOS op value of [' + message.op + ']')
};
}
opcodeMask <<= 11;

Expand Down Expand Up @@ -76,8 +77,9 @@ function pack(buf, message, callback) {
if (message.error !== undefined) {
var rcodeMask = con.RCODE_FROM_STRING[message.error];
if (rcodeMask === undefined) {
callback(new Error('Illegal NetBIOS rcode error [' + message.error + ']'));
return;
return {
error: new Error('Illegal NetBIOS rcode error [' + message.error + ']')
};
}
}

Expand Down Expand Up @@ -122,38 +124,36 @@ function pack(buf, message, callback) {
var arr = toPack[p].arr;
if (arr) {
for (var a = 0, m = arr.length; a < m && !gError; ++a) {
toPack[p].func(buf, bytes, nameMap, arr[a], function(error, rLen) {
if (error) {
gError = error;
return;
}

bytes += rLen;
});
var res = toPack[p].func(buf, bytes, nameMap, arr[a]);
if (res.error) {
return {error: res.error};
}

bytes += res.bytesWritten;
}
}
}

callback(gError, bytes);
return {bytesWritten: bytes};
}

function packQuestion(buf, offset, nameMap, question, callback) {
function packQuestion(buf, offset, nameMap, question) {
var bytes = 0;

var res = question.nbname.write(buf, offset, nameMap);
if (res.error) {
callback(res.error);
return;
return {error: res.error};
}

bytes += res.bytesWritten;

// 16-bit question type
var type = con.QUESTION_TYPE_FROM_STRING[question.type];
if (type === undefined) {
callback(new Error('Illegal question type [' + question.type +
'] for name [' + question.nbname + ']'));
return;
return {
error: new Error('Illegal question type [' + question.type +
'] for name [' + question.nbname + ']')
};
}
buf.writeUInt16BE(type, offset + bytes);
bytes += 2;
Expand All @@ -162,7 +162,7 @@ function packQuestion(buf, offset, nameMap, question, callback) {
buf.writeUInt16BE(con.CLASS_IN, offset + bytes);
bytes += 2;

callback(null, bytes);
return {bytesWritten: bytes};
}

var RR_TYPE_STRING_TO_WRITER = {
Expand All @@ -173,13 +173,12 @@ var RR_TYPE_STRING_TO_WRITER = {
'nbstat': nbstatRDataWriter
};

function packResourceRecord(buf, offset, nameMap, record, callback) {
function packResourceRecord(buf, offset, nameMap, record) {
var bytes = 0;

var res = record.nbname.write(buf, offset, nameMap);
if (res.error) {
callback(res.error);
return;
return {error: res.error};
}

bytes += res.bytesWritten;
Expand All @@ -188,9 +187,10 @@ function packResourceRecord(buf, offset, nameMap, record, callback) {
var type = con.RR_TYPE_FROM_STRING[record.type];
var writer = RR_TYPE_STRING_TO_WRITER[record.type];
if (type === undefined || writer === undefined) {
callback(new Error('Illegal NetBIOS resource record type [' +
record.type + '] for name [' + record.nbname + ']'));
return;
return {
error: new Error('Illegal NetBIOS resource record type [' +
record.type + '] for name [' + record.nbname + ']')
};
}
buf.writeUInt16BE(type, offset + bytes);
bytes += 2;
Expand All @@ -203,29 +203,27 @@ function packResourceRecord(buf, offset, nameMap, record, callback) {
buf.writeUInt32BE(record.ttl, offset + bytes);
bytes += 4;

writer(buf, offset + bytes, nameMap, record, function(error, rLen) {
if (error) {
callback(error);
return;
}
var res = writer(buf, offset + bytes, nameMap, record);
if (res.error) {
return {error: res.error};
}

bytes += rLen;
bytes += res.bytesWritten;

callback(null, bytes);
});
return {bytesWritten: bytes};
}

function nullRDataWriter(buf, offset, nameMap, record, callback) {
function nullRDataWriter(buf, offset, nameMap, record) {
var bytes = 0;

// NULL type always has an rdata length of zero
buf.writeUInt16BE(0, offset + bytes);
bytes += 2;

callback(null, bytes);
return {bytesWritten: bytes};
}

function aRDataWriter(buf, offset, nameMap, record, callback) {
function aRDataWriter(buf, offset, nameMap, record) {
var bytes = 0;

// A type always has an rdata length of 4
Expand All @@ -237,10 +235,10 @@ function aRDataWriter(buf, offset, nameMap, record, callback) {
inet.copy(buf, offset + bytes);
bytes += 4;

callback(null, bytes);
return {bytesWritten: bytes};
}

function nsRDataWriter(buf, offset, nameMap, record, callback) {
function nsRDataWriter(buf, offset, nameMap, record) {
var bytes = 0;

// We don't know the length of the rdata section yet because it contains a
Expand All @@ -249,8 +247,7 @@ function nsRDataWriter(buf, offset, nameMap, record, callback) {
var skip = 2;
var res = record.ns.nbname.write(buf, offset + skip, nameMap);
if (res.error) {
callback(res.error);
return;
return {error: res.error};
}

// Now go back and write the length of the name
Expand All @@ -260,10 +257,10 @@ function nsRDataWriter(buf, offset, nameMap, record, callback) {
// finally, acbytes for the length of the name in our acbytesing
bytes += nLen;

callback(null, bytes);
return {bytesWritten: bytes};
}

function nbRDataWriter(buf, offset, nameMap, record, callback) {
function nbRDataWriter(buf, offset, nameMap, record) {
var bytes = 0;

var length = record.nb.entries.length * 6;
Expand All @@ -286,10 +283,10 @@ function nbRDataWriter(buf, offset, nameMap, record, callback) {
bytes += 4;
});

callback(null, bytes);
return {bytesWritten: bytes};
}

function nbstatRDataWriter(buf, offset, nameMap, record, callback) {
function nbstatRDataWriter(buf, offset, nameMap, record) {
var bytes = 0;

// 16-bit rdata length - skip until we write rest
Expand Down Expand Up @@ -336,9 +333,10 @@ function nbstatRDataWriter(buf, offset, nameMap, record, callback) {
for (var i = 0; i < 5; ++i) {
colonIndex = u.indexOf(':', lastIndex);
if (colonIndex <= lastIndex) {
callback(new Error('Invalid unit ID [' + u +
']; should follow pattern [##:##:##:##:##:##]'));
return;
return {
error: new Error('Invalid unit ID [' + u +
']; should follow pattern [##:##:##:##:##:##]')
};
}

var tmpStr = u.substr(lastIndex, (colonIndex - lastIndex));
Expand Down Expand Up @@ -370,5 +368,5 @@ function nbstatRDataWriter(buf, offset, nameMap, record, callback) {
var rdataLen = bytes - 2;
buf.writeUInt16BE(rdataLen, offset);

callback(null, bytes);
return {bytesWritten: bytes};
}
50 changes: 23 additions & 27 deletions lib/stream.js
Expand Up @@ -70,29 +70,24 @@ NetbiosNameServiceStream.prototype.write = function(netbiosMsg, callback) {
// - 65535 bytes repreenting the maximum allowed message length
var buf = new Buffer(2 + 65535);

var result = false;

// pack the netbios message into the buffer skipping the first two
// bytes to leave room for the length
pack(buf.slice(2), netbiosMsg, function(error, len) {
if (error) {
if (typeof callback === 'function') {
callback(error);
}
self.emit('error', error);
return;
var res = pack(buf.slice(2), netbiosMsg);
if (res.error) {
if (typeof callback === 'function') {
callback(res.error);
}
self.emit('error', res.error);
return;
}

// pack 16-bit length field now that we know the message size
buf.writeUInt16BE(len, 0);

// Return the write result back to communicate back pressure to our
// caller. The socket 'drain' events are already forwarded to our emitted
// 'drain' event in the constructor.
result = self._socket.write(buf.slice(0, 2 + len), null, callback);
});
// pack 16-bit length field now that we know the message size
buf.writeUInt16BE(res.bytesWritten, 0);

return result;
// Return the write result back to communicate back pressure to our
// caller. The socket 'drain' events are already forwarded to our emitted
// 'drain' event in the constructor.
return self._socket.write(buf.slice(0, 2 + res.bytesWritten), null, callback);
};

NetbiosNameServiceStream.prototype._read = function() {
Expand Down Expand Up @@ -122,17 +117,18 @@ NetbiosNameServiceStream.prototype._readMessage = function() {
self._length = null;
self._readFunc = self._readLength.bind(self);

unpack(buf, function(error, len, msg) {
if (error) {
// TODO: consider using a 'warning' event instead; avoid stopping service
// when a remote host sends us a malformed packet.
//self.emit('error', error);
return;
}
var res = unpack(buf);
if (res.error) {
// TODO: consider using a 'warning' event instead; avoid stopping service
// when a remote host sends us a malformed packet.
//self.emit('error', error);
return;
}

self.emit('message', msg);
});
self.emit('message', res.message);

return true;
}

return false;
};

0 comments on commit d1ba70b

Please sign in to comment.