Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

JSHint + validate

  • Loading branch information...
commit a32e78a3cb0b4a26ece6e341ebfd3b24f90502ae 1 parent c53edfc
@jfd authored
Showing with 7,960 additions and 902 deletions.
  1. +6 −1 Makefile
  2. +99 −98 lib/channel.js
  3. +3 −1 lib/index.js
  4. +61 −58 lib/memsock.js
  5. +75 −76 lib/message.js
  6. +154 −153 lib/pub.js
  7. +99 −100 lib/req.js
  8. +42 −40 lib/resp.js
  9. +155 −148 lib/socket.js
  10. +80 −80 lib/sub.js
  11. +72 −67 lib/util.js
  12. +83 −80 lib/worker.js
  13. +33 −0 tools/jshint/CHANGELOG
  14. +62 −0 tools/jshint/README.markdown
  15. +49 −0 tools/jshint/env/rhino.js
  16. +5,577 −0 tools/jshint/jshint.js
  17. +109 −0 tools/jshint/tests/core.js
  18. +284 −0 tools/jshint/tests/envs.js
  19. +12 −0 tools/jshint/tests/fixtures/boss.js
  20. +8 −0 tools/jshint/tests/fixtures/curly.js
  21. +11 −0 tools/jshint/tests/fixtures/curly2.js
  22. +10 −0 tools/jshint/tests/fixtures/eqeqeq.js
  23. +14 −0 tools/jshint/tests/fixtures/es5.js
  24. +9 −0 tools/jshint/tests/fixtures/forin.js
  25. +11 −0 tools/jshint/tests/fixtures/immed.js
  26. +11 −0 tools/jshint/tests/fixtures/latedef.js
  27. +19 −0 tools/jshint/tests/fixtures/laxbreak.js
  28. +11 −0 tools/jshint/tests/fixtures/loopfunc.js
  29. +5 −0 tools/jshint/tests/fixtures/newcap.js
  30. +7 −0 tools/jshint/tests/fixtures/noarg.js
  31. +11 −0 tools/jshint/tests/fixtures/onevar.js
  32. +11 −0 tools/jshint/tests/fixtures/redef.js
  33. +30 −0 tools/jshint/tests/fixtures/switchFallThrough.js
  34. +10 −0 tools/jshint/tests/fixtures/undef.js
  35. +9 −0 tools/jshint/tests/fixtures/white.js
  36. +563 −0 tools/jshint/tests/options.js
  37. +155 −0 tools/validate.js
View
7 Makefile
@@ -1,2 +1,7 @@
+NODE=$(shell which node)
+
test-all:
- node tools/node-test/lib/test.js -r test
+ ${NODE} tools/node-test/lib/test.js -r test
+
+validate:
+ ${NODE} tools/validate.js $(CURDIR)/lib -r
View
197 lib/channel.js
@@ -1,67 +1,67 @@
-//
+/*jshint loopfunc: true, laxbreak: true, expr: true */
+
+//
// Copyright 2010 Johan Dahlberg. All rights reserved.
//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions
// are met:
//
// 1. Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
-// 2. Redistributions in binary form must reproduce the above copyright
-// notice, this list of conditions and the following disclaimer in the
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
-// THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
-// AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
-// THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-// TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+// THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+// THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+// TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
// EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
-const Buffer = require("buffer").Buffer
- , EventEmitter = require("events").EventEmitter
- , inherits = require("util").inherits
- , timers = require('timers').active
- , notEqual = require("assert").notEqual
+var Buffer = require("buffer").Buffer
+ , EventEmitter = require("events").EventEmitter
+ , inherits = require("util").inherits
+ , timers = require('timers').active
+ , notEqual = require("assert").notEqual;
-const recvMsg = process.binding("net").recvMsg
- , socket = process.binding("net").socket
- , bind = process.binding("net").bind
- , connect = process.binding("net").connect
- , listen = process.binding("net").listen
- , accept = process.binding("net").accept
- , close = process.binding("net").close;
+var recvMsg = process.binding("net").recvMsg
+ , socket = process.binding("net").socket
+ , bind = process.binding("net").bind
+ , connect = process.binding("net").connect
+ , listen = process.binding("net").listen
+ , accept = process.binding("net").accept
+ , close = process.binding("net").close;
-const IOWatcher = process.binding('io_watcher').IOWatcher;
+var IOWatcher = process.binding('io_watcher').IOWatcher;
-const createProcSockAlias = require("./util").createProcSockAlias
- , parseUrl = require("./util").parseUrl
+var createProcSockAlias = require("./util").createProcSockAlias
+ , parseUrl = require("./util").parseUrl;
+
+var ENOENT = process.binding('constants').ENOENT
+ , EMFILE = process.binding('constants').EMFILE;
-const ENOENT = process.binding('constants').ENOENT
- , EMFILE = process.binding('constants').EMFILE
-
-//
function Channel() {
var self = this;
- var nextfd;
+ var nextfd;
this._closing = false;
this._closed = false;
this._sockets = [];
-
+
// TODO: Temporary solution in order to disconnect a socket
// that is connected, if this channel is the owner.
this._closingAll = false;
this._watchers = [];
-
}
exports.Channel = Channel;
@@ -102,9 +102,9 @@ Channel.prototype.connect = function(url, callback) {
if (callback) {
// TODO: Add callback handler to Error
}
-
+
this._attachSocket(sock);
-}
+};
Channel.prototype.listen = function(url, callback) {
var self = this;
@@ -113,33 +113,34 @@ Channel.prototype.listen = function(url, callback) {
var type;
var port;
-
if (this.closing || this.closed) {
throw new Error("Channel is closing/closed");
}
-
+
parsedurl = parseUrl(url, true);
-
- if (parsedurl == null) {
+
+ if (!parsedurl) {
throw new Error("Invalid URL `" + url + "`");
}
-
+
switch (parsedurl.protocol) {
-
+
case "proc":
+ type = "unix";
resource = createProcSockAlias(parsedurl.resource);
+ break;
case "sock":
type = "unix";
resource = resource || parsedurl.resource;
break;
-
+
case "tcp":
type = "tcp";
resource = parsedurl.resource;
port = parsedurl.port;
break;
-
+
case "mem":
type = "mem";
resource = parsedurl.resource;
@@ -162,16 +163,16 @@ Channel.prototype._doListen = function(fd, type, host, port, callback) {
var self = this;
if (type === "mem") {
-
+
process.nextTick(function() {
var fd;
-
+
if (self.closing || self.closed) {
return;
- }
+ }
fd = require("./memsock").socket();
-
+
try {
require("./memsock").bind(fd, host);
} catch (err) {
@@ -179,7 +180,7 @@ Channel.prototype._doListen = function(fd, type, host, port, callback) {
callback(err);
return;
}
-
+
self._attachWatcher(fd, "mem", host);
callback(null);
});
@@ -189,11 +190,11 @@ Channel.prototype._doListen = function(fd, type, host, port, callback) {
// Ensure we have a dummy fd for EMFILE conditions.
getDummyFD();
-
+
if (type === "tcp") {
require('dns').lookup(host, function(err, ip, addressType) {
var type;
-
+
if (err) {
callback(err);
} else {
@@ -239,7 +240,7 @@ Channel.prototype._doListen = function(fd, type, host, port, callback) {
// See test/simple/test-net-eaddrinuse.js
return;
- }
+ }
try {
listen(fd, 128);
@@ -249,10 +250,10 @@ Channel.prototype._doListen = function(fd, type, host, port, callback) {
}
self._attachWatcher(fd, type, host);
-
+
callback(null);
});
-}
+};
// close([all=true]) - Close all sockets even if parent dosent match.
@@ -261,31 +262,31 @@ Channel.prototype.close = function(all) {
var sockets = this._sockets;
var sock;
var index;
-
+
if (this._closing || this._closed) {
return;
}
-
+
// TODO: do a more correct solution for this.
this._closeWaitCount = 0;
this._closing = true;
-
+
index = sockets.length;
-
+
while (index--) {
sock = sockets[index];
if (sock._connecting) {
- // We need to wait for the socket to connect before the
- // channel can be closed.
- //
+ // We need to wait for the socket to connect before the
+ // channel can be closed.
+ //
// TODO: Optimization, remove from connect-queue as well.
self._closeWaitCount++;
} else if (sock.channel == this || all === true) {
- // The socket was created by this
- // channel (or `all` was set). Wait for
+ // The socket was created by this
+ // channel (or `all` was set). Wait for
// it to die before raising `close`.
self._closeWaitCount++;
@@ -305,47 +306,47 @@ Channel.prototype.close = function(all) {
}
if (watcher.type === 'unix') {
-
+
self._closeWaitCount++;
-
+
require('fs').unlink(watcher.path, function(err) {
self._closeWaitCount--;
-
- if (self._closeWaitCount == 0) {
+
+ if (self._closeWaitCount === 0) {
self._closing = false;
self._closed = true;
self.emit("close");
}
});
- }
+ }
});
this.onclosing && this.onclosing();
- this.emit("closing");
-
+ this.emit("closing");
+
if (this._closeWaitCount) {
- // We are not waiting for any sockets to die, so we
+ // We are not waiting for any sockets to die, so we
// can emit the `close` event immediately.
-
+
this._closed = true;
this.emit("close");
- }
-}
+ }
+};
// Attach a client or server socket.
Channel.prototype._attachSocket = function(sock) {
var self = this;
var sockets = this._sockets;
var connectemitted = false;
-
+
sockets.push(sock);
-
+
if (socket.channel) {
// TODO: Add support for multiple channels.
throw new Error("Socket is already attached to a Channel.");
}
-
+
sock.channel = this;
if (sock._connecting) {
@@ -354,7 +355,7 @@ Channel.prototype._attachSocket = function(sock) {
sock.once("connect", function() {
var index;
-
+
if (self._closing) {
if (this.channel == self || self._closingAll) {
// This instance is the owner of the socket. Close
@@ -363,36 +364,36 @@ Channel.prototype._attachSocket = function(sock) {
this.end();
} else {
// Just remove it from sockets and raise
- // the event `close`, if this istance was the
+ // the event `close`, if this istance was the
// last socket.
index = sockets.indexOf(this);
notEqual(index, -1);
-
+
sockets.splice(index, 1);
self._closeWaitCount--;
-
- if (self._closeWaitCount == 0) {
+
+ if (self._closeWaitCount === 0) {
self._closing = false;
self._closed = true;
self.emit("close");
}
-
+
}
-
+
} else {
connectemitted = true;
self.emit("connect", this);
}
});
} else {
- // Socket connection is already estabilished,
+ // Socket connection is already estabilished,
// raise `connect` event.
connectemitted = true;
self.emit("connect", sock);
}
-
+
// Remove socket from `_sockets` when socket
// is destroyed. Raise ´close´ if channel is closing,
// and all sockets are disconnected.
@@ -400,19 +401,19 @@ Channel.prototype._attachSocket = function(sock) {
var index = sockets.indexOf(this);
notEqual(index, -1);
sockets.splice(index, 1);
-
+
if (connectemitted) {
- // Do not emit `disconnect` if `connect` wasn't
+ // Do not emit `disconnect` if `connect` wasn't
// emitted
self.emit("disconnect", sock);
}
-
+
if (self._closing) {
-
+
self._closeWaitCount--;
-
- if (self._closeWaitCount == 0) {
+
+ if (self._closeWaitCount === 0) {
self._closing = false;
self._closed = true;
self.emit("close");
@@ -426,7 +427,7 @@ Channel.prototype._attachWatcher = function(fd, type, path) {
var Socket = this.SocketClass;
var doaccept;
var watcher;
-
+
if (type == "mem") {
doaccept = require("./memsock").accept;
watcher = new (require("./memsock")).IOWatcher();
@@ -434,7 +435,7 @@ Channel.prototype._attachWatcher = function(fd, type, path) {
doaccept = accept;
watcher = new IOWatcher();
}
-
+
watcher.host = this;
watcher.type = type;
watcher.fd = fd;
@@ -459,7 +460,7 @@ Channel.prototype._attachWatcher = function(fd, type, path) {
info = doaccept(this.fd);
} catch (e) {
- if (e.errno != EMFILE) {
+ if (e.errno != EMFILE) {
throw e;
}
@@ -488,8 +489,8 @@ Channel.prototype._attachWatcher = function(fd, type, path) {
watcher.set(fd, true, false);
watcher.start();
-
- this._watchers.push(watcher);
+
+ this._watchers.push(watcher);
};
View
4 lib/index.js
@@ -1,3 +1,5 @@
+/*jshint loopfunc: true, laxbreak: true, expr: true */
+
//
// Copyright 2010 Johan Dahlberg. All rights reserved.
//
@@ -24,7 +26,7 @@
// EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
-const slice = Array.prototype.slice;
+var slice = Array.prototype.slice;
// Messaging functions
exports.createChannel = require("./channel").createChannel;
View
119 lib/memsock.js
@@ -1,59 +1,62 @@
-//
+/*jshint loopfunc: true, laxbreak: true, expr: true */
+
+//
// Copyright 2010 Johan Dahlberg. All rights reserved.
//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions
// are met:
//
// 1. Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
-// 2. Redistributions in binary form must reproduce the above copyright
-// notice, this list of conditions and the following disclaimer in the
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
-// THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
-// AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
-// THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-// TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+// THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+// THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+// TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
// EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
-const RANGE_START = 0xFF000000;
-const RANGE_END = 0xFFFFFFFF;
+var RANGE_START = 0xFF000000;
+var RANGE_END = 0xFFFFFFFF;
var proxies = {};
var fds = {};
function IOWatcher() {
- this._notifying = false
-};
+ this._notifying = false;
+}
+
exports.IOWatcher = IOWatcher;
IOWatcher.prototype.start = function() {
var self = this;
-
+
if (this.fd && this.prop) {
fds[this.fd][this.prop] = this;
}
-
+
if (this.incomming && this.incomming.length) {
self.notify();
}
};
IOWatcher.prototype.stop = function() {
-
+
if (fds[this.fd] && fds[this.fd][this.prop] == this) {
fds[this.fd][this.prop] = null;
}
-
+
this.fd = null;
this.prop = null;
};
@@ -71,13 +74,13 @@ IOWatcher.prototype.set = function(fd, readable, writable) {
IOWatcher.prototype.notify = function() {
var self = this;
-
+
if (this._notifying) {
return;
}
-
+
this._notifying = true;
-
+
process.nextTick(function() {
self._notifying = false;
self.callback && self.callback();
@@ -86,17 +89,17 @@ IOWatcher.prototype.notify = function() {
self.notify();
}
});
-
+
};
exports.write = function(fd, buf, off, len) {
- var host
+ var host;
var conn;
-
+
if (!(host = fds[fd])) {
throw new Error("Invalid fd");
}
-
+
if (!(conn = fds[host.remotefd])) {
return 0;
}
@@ -107,14 +110,14 @@ exports.write = function(fd, buf, off, len) {
conn.incomming = [buf.slice(off, len)];
}
- if (conn["readwatcher"]) {
- conn["readwatcher"].notify();
+ if (conn.readwatcher) {
+ conn.readwatcher.notify();
return len;
} else {
conn.rcallback = function() {
this.rcallback = null;
host.notify();
- }
+ };
return 0;
}
@@ -126,31 +129,31 @@ exports.read = function(fd, buf, off, len) {
var data;
var pos;
var end;
-
+
if (!(host = fds[fd])) {
throw new Error("Invalid fd");
}
-
+
if (!host.incomming || !host.incomming.length) {
throw new Error("Invalid callback");
}
-
+
while (len > 0 && (data = host.incomming.shift())) {
end = Math.min(len, data.length);
data.copy(buf, off, 0, end);
-
+
count += end;
len -= end;
off += end;
-
+
if (data.length > end) {
host.incomming.unshift(data.slice(end));
}
-
+
}
-
+
if (host.incomming.length) {
- host["readwatcher"] && host["readwatcher"].notify();
+ host.readwatcher && host.readwatcher.notify();
}
return count;
@@ -158,9 +161,9 @@ exports.read = function(fd, buf, off, len) {
exports.socket = function() {
var keys = Object.keys(fds);
-
+
for (var i = RANGE_START; i < RANGE_END; i++) {
- if (fds[i] == null) {
+ if (!fds[i]) {
fds[i] = { fd: i
, remotefd: null
, readwatcher: null
@@ -170,7 +173,7 @@ exports.socket = function() {
return i;
}
}
-
+
throw new Error("No free Memsock slots");
};
@@ -178,7 +181,7 @@ exports.connect = function(fd, name) {
var conn;
var host;
var proxy;
-
+
if (!(conn = fds[fd])) {
throw new Error("Invalid fd");
}
@@ -186,7 +189,7 @@ exports.connect = function(fd, name) {
if (!(proxy = proxies[name])) {
throw new Error("Proxy not found");
}
-
+
if (!(host = fds[proxy.fd])) {
throw new Error("Proxy host error");
}
@@ -196,9 +199,9 @@ exports.connect = function(fd, name) {
} else {
host.incomming = [conn];
}
-
+
process.nextTick(function() {
- host.readwatcher && host.readwatcher.notify();
+ host.readwatcher && host.readwatcher.notify();
});
};
@@ -207,7 +210,7 @@ exports.close = function(fd) {
var host;
var conn;
var proxy;
-
+
if (!(host = fds[fd])) {
throw new Error("Invalid fd");
}
@@ -215,7 +218,7 @@ exports.close = function(fd) {
if (proxies[host.name].fd == fd) {
proxies[host.name] = null;
}
-
+
if ((conn = fds[host.remotefd])) {
if (conn.incomming) {
conn.incomming.push(new Buffer(0));
@@ -224,7 +227,7 @@ exports.close = function(fd) {
}
conn.readwatcher && conn.readwatcher.notify();
}
-
+
conn.name = null;
conn.remotefd = null;
};
@@ -234,7 +237,7 @@ exports.accept = function(fd) {
var conn;
var proxy;
var newfd;
-
+
if (!(host = fds[fd])) {
throw new Error("Invalid fd");
}
@@ -242,12 +245,12 @@ exports.accept = function(fd) {
if (host.incomming && (conn = host.incomming.pop())) {
newfd = exports.socket();
conn.name = host.name;
- conn.remotefd = newfd
+ conn.remotefd = newfd;
fds[newfd].remotefd = conn.fd;
fds[newfd].name = host.name;
process.nextTick(function() {
- conn["writewatcher"] && conn["writewatcher"].notify();
+ conn.writewatcher && conn.writewatcher.notify();
});
return fds[newfd];
}
@@ -255,20 +258,20 @@ exports.accept = function(fd) {
exports.bind = function(fd, name) {
var host;
-
+
if (!(host = fds[fd])) {
throw new Error("Ivanlid fd");
}
-
+
if (host.name) {
throw new Error("Memsocket already bound");
}
-
+
if (proxies[name]) {
throw new Error("Address already in use");
}
-
+
host.name = name;
-
+
proxies[name] = { fd: fd };
-};
+};
View
151 lib/message.js
@@ -1,69 +1,68 @@
-//
+/*jshint loopfunc: true, laxbreak: true, expr: true */
+
+//
// Copyright 2010 Johan Dahlberg. All rights reserved.
//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions
// are met:
//
// 1. Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
-// 2. Redistributions in binary form must reproduce the above copyright
-// notice, this list of conditions and the following disclaimer in the
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
-// THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
-// AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
-// THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-// TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+// THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+// THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+// TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
// EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
-const Stream = require("stream").Stream
- , inherits = require("util").inherits
+var Stream = require("stream").Stream
+ , inherits = require("util").inherits;
+
+var LENGTH_OFFSET = exports.LENGTH_OFFSET = 0x00
+ , LENGTH_SIZE = exports.LENGTH_SIZE = 0x02
+ , FLAG_OFFSET = exports.FLAG_OFFSET = 0x02
+ , FLAG_SIZE = exports.FLAG_SIZE = 0x01
+ , ACK_OFFSET = exports.ACK_OFFSET = 0x03
+ , ACK_SIZE = exports.ACK_SIZE = 0x01;
+
+var HEADER_SIZE = exports.HEADER_SIZE = LENGTH_SIZE
+ + FLAG_SIZE
+ + ACK_SIZE;
+var PAYLOAD_OFFSET = exports.PAYLOAD_OFFSET = FLAG_OFFSET
+ + FLAG_SIZE
+ + ACK_SIZE;
-// Message Contants
-const LENGTH_OFFSET = exports.LENGTH_OFFSET = 0x00
- , LENGTH_SIZE = exports.LENGTH_SIZE = 0x02
- , FLAG_OFFSET = exports.FLAG_OFFSET = 0x02
- , FLAG_SIZE = exports.FLAG_SIZE = 0x01
- , ACK_OFFSET = exports.ACK_OFFSET = 0x03
- , ACK_SIZE = exports.ACK_SIZE = 0x01;
-
-const HEADER_SIZE = exports.HEADER_SIZE = LENGTH_SIZE
- + FLAG_SIZE
- + ACK_SIZE;
-
-const PAYLOAD_OFFSET = exports.PAYLOAD_OFFSET = FLAG_OFFSET
- + FLAG_SIZE
- + ACK_SIZE;
-
-const PAYLOAD_MAX = exports.PAYLOAD_MAX = 0xFFFF - HEADER_SIZE;
+var PAYLOAD_MAX = exports.PAYLOAD_MAX = 0xFFFF - HEADER_SIZE;
-const OPTIONVAL_OFFSET = exports.OPTIONVAL_OFFSET = PAYLOAD_OFFSET + 1;
+var OPTIONVAL_OFFSET = exports.OPTIONVAL_OFFSET= PAYLOAD_OFFSET + 1;
-// Message Flags
-const OPTION = exports.OPTION = 0x01
- , REJECT = exports.REJECT = 0x02
- , MULTIPART = exports.MULTIPART = 0x04
- , MULTIPART_LAST = exports.MULTIPART_LAST = 0x08
- , COMPLEX = exports.COMPLEX = 0x10
+var OPTION = exports.OPTION = 0x01
+ , REJECT = exports.REJECT = 0x02
+ , MULTIPART = exports.MULTIPART = 0x04
+ , MULTIPART_LAST = exports.MULTIPART_LAST = 0x08
+ , COMPLEX = exports.COMPLEX = 0x10;
-const FD_HASH = {__fdhash__: 22 };
+var FD_HASH = {__fdhash__: 22 };
var createOptionMessage =
exports.createOptionMessage = function(payload, ack) {
return createMessage(payload, OPTION, ack);
-}
-
-var createComplexMessage =
+};
+
+var createComplexMessage =
exports.createComplexMessage = function (arr, flags, ack) {
var payload;
var index;
@@ -72,18 +71,18 @@ exports.createComplexMessage = function (arr, flags, ack) {
var fd;
index = arr.length;
-
- // Need better implmentation of this later on.
+
+ // Need better implmentation of this later on.
while (index--) {
- if ((obj = arr[index]) &&
- obj instanceof Stream &&
+ if ((obj = arr[index]) &&
+ obj instanceof Stream &&
obj.fd &&
obj.pause) {
-
+
if (fd) {
throw new Error("Only one FD per message is allowed");
}
-
+
obj.pause();
fd = obj.fd;
arr[index] = FD_HASH;
@@ -91,15 +90,15 @@ exports.createComplexMessage = function (arr, flags, ack) {
}
payload = new Buffer(JSON.stringify(arr), "utf8");
-
+
msg = createMessage(payload, flags | COMPLEX, ack);
-
+
msg._fd = fd;
return msg;
-};
+};
-var createMessage =
+var createMessage =
exports.createMessage = function (payload, flags, ack) {
var plength = (payload && payload.length) || 0;
var poffset;
@@ -109,8 +108,8 @@ exports.createMessage = function (payload, flags, ack) {
var msg;
var psize;
var pflags;
- var msize
-
+ var msize;
+
if (plength > PAYLOAD_MAX) {
parts = Math.ceil(plength / PAYLOAD_MAX);
length = plength + (HEADER_SIZE * parts);
@@ -125,10 +124,10 @@ exports.createMessage = function (payload, flags, ack) {
msg[offset + LENGTH_OFFSET ] = Math.floor(msize / 256) & 0xff;
msg[offset + LENGTH_OFFSET + 1] = msize % 256;
msg[offset + FLAG_OFFSET ] = pflags;
- msg[offset + ACK_OFFSET ] = ack;
+ msg[offset + ACK_OFFSET ] = ack;
payload.copy(msg, offset + PAYLOAD_OFFSET, poffset, poffset + psize);
- offset += msize;
- poffset += psize;
+ offset += msize;
+ poffset += psize;
}
} else {
length = plength + HEADER_SIZE;
@@ -137,7 +136,7 @@ exports.createMessage = function (payload, flags, ack) {
msg[LENGTH_OFFSET ] = Math.floor(length / 256) & 0xff;
msg[LENGTH_OFFSET + 1] = length % 256;
msg[FLAG_OFFSET ] = flags;
- msg[ACK_OFFSET ] = ack;
+ msg[ACK_OFFSET ] = ack;
payload && payload.copy(msg, PAYLOAD_OFFSET, 0, payload.length);
}
@@ -145,15 +144,15 @@ exports.createMessage = function (payload, flags, ack) {
payload && (msg._fd = payload._fd);
msg._off = 0;
-
- return msg;
+
+ return msg;
};
exports.sendImpl = function(graph, callback) {
var msg;
- if (graph.length == 0) {
+ if (graph.length === 0) {
return;
}
@@ -167,11 +166,11 @@ exports.sendImpl = function(graph, callback) {
msg = createComplexMessage(graph, 0, 0);
}
-
+
msg._callback = callback;
return this._sendmsg(msg, true);
-}
+};
// `instance` is used for multipart parsning.
exports.parseMessage = function(buffer, instance) {
@@ -199,9 +198,9 @@ exports.parseMessage = function(buffer, instance) {
if ((flag & MULTIPART_LAST) == MULTIPART_LAST) {
// The message is last message in a multipart sequence. Construct
- // a new message from all messages in multipart payload cache. Combind
+ // a new message from all messages in multipart payload cache. Combind
// all message parts into one unified message.
-
+
msg = new Message(HEADER_SIZE + cache.size, instance);
mpartpos = PAYLOAD_OFFSET;
first = cache[0];
@@ -215,38 +214,38 @@ exports.parseMessage = function(buffer, instance) {
msg[FLAG_OFFSET] = first[FLAG_OFFSET];
msg[ACK_OFFSET] = first[ACK_OFFSET];
- instance._multipartcache = void(0);
+ instance._multipartcache = null;
} else {
// Wait for more parts
-
+
return;
}
} else {
msg = buffer;
}
-
+
if ((flag & COMPLEX) == COMPLEX) {
graph = JSON.parse(msg.toString("utf8", PAYLOAD_OFFSET));
-
+
index = graph.length;
- // Need better implmentation of this later on.
+ // Need better implmentation of this later on.
while (index--) {
- if ((obj = graph[index]) &&
- obj["__fdhash__"] == 22) {
+ if ((obj = graph[index]) &&
+ obj.__fdhash__ == 22) {
graph[index] = instance._pendingfd;
}
}
-
+
msg.graph = graph;
} else {
msg.graph = msg.slice(PAYLOAD_OFFSET);
}
-
+
return msg;
-}
+};
function Message(size, origin) {
Buffer.call(this, size);
View
307 lib/pub.js
@@ -1,46 +1,48 @@
-//
+/*jshint loopfunc: true, laxbreak: true, expr: true */
+
+//
// Copyright 2010 Johan Dahlberg. All rights reserved.
//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions
// are met:
//
// 1. Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
-// 2. Redistributions in binary form must reproduce the above copyright
-// notice, this list of conditions and the following disclaimer in the
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
-// THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
-// AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
-// THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-// TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+// THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+// THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+// TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
// EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
-const inherits = require("util").inherits
- , equal = require("assert").equal
- , notEqual = require("assert").notEqual
+var inherits = require("util").inherits
+ , equal = require("assert").equal
+ , notEqual = require("assert").notEqual;
+
+var Socket = require("./socket").Socket
+ , Channel = require("./channel").Channel
+ , createMessage = require("./message").createMessage;
-const Socket = require("./socket").Socket
- , Channel = require("./channel").Channel
- , createMessage = require("./message").createMessage;
-
-const PAYLOAD_OFFSET = require("./message").PAYLOAD_OFFSET
-
-const findEventListener = require("./util").findEventListener;
+var PAYLOAD_OFFSET = require("./message").PAYLOAD_OFFSET;
+
+var findEventListener = require("./util").findEventListener;
// Option Types
-const SUBSCRIBE = 0x01
- , UNSUBSCRIBE = 0x02
- , INCLUDE = 0x03
- , EXCLUDE = 0x04;
+var SUBSCRIBE = 0x01
+ , UNSUBSCRIBE = 0x02
+ , INCLUDE = 0x03
+ , EXCLUDE = 0x04;
function sendImpl(buffer) {
@@ -60,7 +62,7 @@ function sendImpl(buffer) {
function PubSocket(options) {
var opts = options || {};
Socket.call(this, opts);
-
+
this._rawsubscriptions = null;
this._rawpatternvariants = null;
this._rawpatternlongest = -1;
@@ -77,45 +79,44 @@ PubSocket.prototype._processMessage = function(msg) {
var exclusions;
var pattern;
var subs;
- var index;
var length;
switch (msg.option) {
-
+
case INCLUDE:
exclusions = this._exclusions;
pattern = msg.graph.toString("ascii");
-
+
if (exclusions && exclusions[pattern]) {
delete exclusions[pattern];
-
- if (Object.keys(exclusions) == 0) {
+
+ if (Object.keys(exclusions) === 0) {
this._exclusions = null;
}
}
-
+
break;
-
+
case EXCLUDE:
exclusions = this._exclusions;
pattern = msg.graph.toString("ascii");
-
+
if (!exclusions) {
exclusions = this._exclusions = {};
exclusions[pattern] = true;
} else {
exclusions[pattern] = true;
}
-
+
break;
-
+
case SUBSCRIBE:
if (msg.complex) {
// Complex patterns is currently disabled
-
+
this.destroy(new Error("Complex subscription's is currently disabled"));
} else {
@@ -127,59 +128,59 @@ PubSocket.prototype._processMessage = function(msg) {
} else if (subs.indexOf(pattern) == -1) {
subs.push(pattern);
} else {
- // Socket is already listen for this pattern,
+ // Socket is already listen for this pattern,
// ignore the request.
-
+
return;
}
-
+
length = pattern.length;
- if (length == 0) {
- // Length zero indicates that socket want to subscribe to
+ if (length === 0) {
+ // Length zero indicates that socket want to subscribe to
// all possible messages.
-
+
this._nullsubscriptions = true;
-
+
if (this._rawpatternlongest !== length) {
this._rawpatternvariants = true;
}
-
+
} else if (length > this._rawpatternlongest) {
// Check if this subscription is the longest one. If
- // so, check if we have variable pattern lenghts. This
+ // so, check if we have variable pattern lenghts. This
// is an optimization that let us skip recalcLongestKey
// when remvoing subscriptions.
if (this._rawpatternvariants === null) {
// Initialize the `rawpatternvariants` variable
-
+
this._rawpatternvariants = false;
- } else if (!this._rawpatternvariants &&
+ } else if (!this._rawpatternvariants &&
length != this._rawpatternlongest){
-
+
this._rawpatternvariants = true;
}
}
-
+
if (length > this._rawpatternlongest) {
- // This pattern is the longest.
-
+ // This pattern is the longest.
+
this._rawpatternlongest = length;
}
-
- events && events["subscribe"] && this.emit("subscribe", pattern);
+
+ events && events.subscribe && this.emit("subscribe", pattern);
}
-
-
+
+
break;
-
+
case UNSUBSCRIBE:
-
+
if (msg.complex) {
// Complex patterns is currently disabled
-
+
this.destroy(new Error("Complex subscription's is currently disabled"));
} else {
@@ -191,18 +192,18 @@ PubSocket.prototype._processMessage = function(msg) {
}
subs.splice(index, 1);
-
+
length = pattern.length;
- if (length == this._rawpatternlongest &&
+ if (length == this._rawpatternlongest &&
this._rawpatternvariants) {
// Need to recalculate the longest pattern again
-
+
index = subs.length;
this._rawpatternvariants = false;
this._rawpatternlongest = -1;
-
+
while (index--) {
length = subs[index].length;
if (length > this._rawpatternlongest) {
@@ -216,24 +217,24 @@ PubSocket.prototype._processMessage = function(msg) {
}
}
- if (length == 0) {
+ if (length === 0) {
this._nullsubscriptions = false;
- }
+ }
+
+ events && events.unsubscribe && this.emit("unsubscribe", pattern);
- events && events["unsubscribe"] && this.emit("unsubscribe", pattern);
-
}
-
-
+
+
break;
-
+
default:
- // Ignore all message's that isn't flag with OPTION.
+ // Ignore all message's that isn't flag with OPTION.
// PubSocket's cannot receive messages.
break;
}
-}
+};
PubSocket.prototype.send = sendImpl;
@@ -260,14 +261,14 @@ PubChannel.prototype._onconnectPubChannel = function(sock) {
var pipedchannels = this._pipedChannels;
var self = this;
var index;
-
+
function onsubscribe(pattern) {
var sockets = rawsubscriptions[pattern];
var index;
if (!sockets) {
sockets = rawsubscriptions[pattern] = [sock];
-
+
// Update subscrptions model for piped channels
if ((index = pipedchannels.length)) {
while (index--) {
@@ -283,26 +284,26 @@ PubChannel.prototype._onconnectPubChannel = function(sock) {
sockets.push(sock);
}
-
+
}
-
+
function onunsubscribe(pattern) {
var sockets;
var index;
-
+
sockets = rawsubscriptions[pattern];
- notEqual(sockets, void(0));
-
+ notEqual(sockets, undefined);
+
index = sockets.indexOf(sock);
notEqual(index, -1);
-
+
sockets.splice(index, 1);
-
- if (sockets.length == 0) {
- // Delete subscriptions for pattern IF this was the
- // last subscription and update subscrptions model
+
+ if (sockets.length === 0) {
+ // Delete subscriptions for pattern IF this was the
+ // last subscription and update subscrptions model
// for piped channels.
-
+
delete rawsubscriptions[pattern];
if ((index = pipedchannels.length)) {
@@ -310,45 +311,45 @@ PubChannel.prototype._onconnectPubChannel = function(sock) {
pipedchannels[index].unsubscribe(pattern);
}
}
-
+
self.emit("unsubscribe", pattern);
}
-
+
}
-
+
onsubscribe.owner = this;
onunsubscribe.owner = this;
-
+
if (sock._subscriptions && sock._rawsubscriptions.length) {
// Process sockets current subscriptions, if exists.
-
+
for (var i = 0, l = sock._rawsubscriptions.length; i < l; i++) {
onsubscribe(sock._rawsubscriptions[i]);
}
-
+
}
sock.on("subscribe", onsubscribe);
sock.on("unsubscribe", onunsubscribe);
-}
+};
PubChannel.prototype._ondisconnectPubChannel = function(sock) {
var onsubscribe;
var onunsubscribe;
var index;
var all;
-
+
// Find our subscribe handler
onsubscribe = findEventListener(sock, "subscribe", this);
- notEqual(onsubscribe, void(0));
+ notEqual(onsubscribe, undefined);
sock.removeListener("subscribe", onsubscribe);
// Find our unsubscribe handler
onunsubscribe = findEventListener(sock, "unsubscribe", this);
- notEqual(onunsubscribe, void(0));
+ notEqual(onunsubscribe, undefined);
sock.removeListener("unsubscribe", onunsubscribe);
-
-}
+
+};
PubChannel.prototype._PublisherImpl_throttleStart = function() {
var channels = this._pipedChannels;
@@ -356,15 +357,15 @@ PubChannel.prototype._PublisherImpl_throttleStart = function() {
while (index--) {
channels[index].pause();
}
-}
+};
PubChannel.prototype._PublisherImpl_throttleStop = function() {
var channels = this._pipedChannels;
var index = channels.length;
while (index--) {
channels[index].resume();
- }
-}
+ }
+};
PubChannel.prototype.send = sendImpl;
@@ -378,13 +379,13 @@ PubChannel.prototype._sendmsg = function(msg) {
queue = this._sendqueue;
queue.push(msg);
- queue.size += msg.length
+ queue.size += msg.length;
// Start send queue processing if not started.
if (!queue.running) {
this._startSendQueueJob();
- }
-}
+ }
+};
PubChannel.prototype._startSendQueueJob = function() {
var self = this;
@@ -393,12 +394,12 @@ PubChannel.prototype._startSendQueueJob = function() {
var filter = this._broadcastEndpointFilter;
var opts = this._broadcastEndpointFilterOptions;
var throttle = false;
-
- // Return if we send queue already is being processed
+
+ // Return if we send queue already is being processed
if (queue.running) {
return;
}
-
+
// Discard all messages if we missing sockets
if (!sockets.length) {
this._sendqueue = [];
@@ -406,37 +407,37 @@ PubChannel.prototype._startSendQueueJob = function() {
this._sendqueue.running = false;
return;
}
-
+
queue.running = true;
-
+
function run() {
var handle;
-
+
if (throttle) {
throttle = false;
self.emit("throttleStop");
}
handle = processSendQueueJob(queue, sockets);
-
+
if (handle) {
self.emit("throttleStart");
handle.ondrain = function() {
throttle = true;
- process.nextTick(run);
- }
-
+ process.nextTick(run);
+ };
+
} else {
queue.running = false;
}
}
-
+
process.nextTick(run);
-}
+};
/**
- * Pipes a subscriber message to this publisher. All messages is forwards
+ * Pipes a subscriber message to this publisher. All messages is forwards
* that is received on this channel.
*
* @param {Channel} channel The subscriber channel instance to receive from.
@@ -450,38 +451,38 @@ PubChannel.prototype.pipe = function(channel) {
if (channel instanceof SubscriberChannel) {
throw new Error("Expected a subscriber channel");
}
-
+
if (channels.indexOf(channel) != -1) {
throw new Error("Channel already in pipe-line");
}
-
- if (channels.length == 0) {
+
+ if (channels.length === 0) {
this.on("throttleStart", this._PublisherImpl_throttleStart);
this.on("throttleStop", this._PublisherImpl_throttleStop);
}
-
+
// Pipe all messages in channel to publishers
// send queue.
channel.on("rawmessage", this._sendMsg.bind(this));
channel.on("close", function( ) {
var index = channels.indexOf(this);
-
+
if (index != -1) {
channels.splice(index, 1);
}
-
- if (channels.length == 0) {
+
+ if (channels.length === 0) {
self.removeListener("throttleStart", this._PublisherImpl_throttleStart);
self.removeListener("throttleStop", this._PublisherImpl_throttleStop);
}
});
-
+
while (index--) {
channel.subscribe(keys[index]);
}
-
+
channels.push(channel);
-}
+};
function processSendQueueJob(queue, sockets) {
var drainWaitHandle;
@@ -498,10 +499,10 @@ function processSendQueueJob(queue, sockets) {
while (index--) {
sock = filtered[index];
- if (sock._writemsg(msg) == false) {
- // The message was not completely written. Create a
+ if (sock._writemsg(msg) === false) {
+ // The message was not completely written. Create a
// drain-wait handle and wait for the socket to flush.
-
+
if (!drainWaitHandle) {
drainWaitHandle = new DrainWaitHandle();
}
@@ -509,15 +510,15 @@ function processSendQueueJob(queue, sockets) {
drainWaitHandle.push(sock);
}
}
-
+
queue.size -= msg.length;
}
-
+
// Return no of sent messages.
return drainWaitHandle;
}
-// Returns a list of sockets that has a
+// Returns a list of sockets that has a
// matching pattern for the specified msg.
function rawPatternFilter(sockets, msg) {
var origin = msg.origin;
@@ -532,55 +533,55 @@ function rawPatternFilter(sockets, msg) {
while (index--) {
sock = sockets[index];
-
+
if (!sock.fd || !sock.writable) {
- // Ignore invalid sockets.
-
+ // Ignore invalid sockets.
+
continue;
}
if (sock._exclusions && origin && sock._exclusions[origin.id]) {
- // Ignore socket if it matches exclusion.
-
- continue;
+ // Ignore socket if it matches exclusion.
+
+ continue;
}
if (sock._nullsubscriptions) {
- // Add socket to result list if he subscribes to
+ // Add socket to result list if he subscribes to
// ALL messages
result.push(sock);
continue;
-
+
} else {
// try to filter against socket subscriptions.
subs = sock._rawsubscriptions;
- if (!subs || subs.length == 0) {
+ if (!subs || subs.length === 0) {
// Ignore socket if no subscriptions.
-
+
continue;
}
if (!pattern || sock._rawpatternlongest > pattern.length) {
- // Slice message for pattern if not defined. Re-slice
+ // Slice message for pattern if not defined. Re-slice
// pattern if socket has a longer pattern in list.
-
- longest = Math.min(sock._rawpatternlongest,
+
+ longest = Math.min(sock._rawpatternlongest,
msg.length - PAYLOAD_OFFSET);
-
- pattern = msg.toString("binary", PAYLOAD_OFFSET,
+
+ pattern = msg.toString("binary", PAYLOAD_OFFSET,
PAYLOAD_OFFSET + longest);
}
-
+
patternindex = subs.length;
while (patternindex--) {
sockpattern = subs[patternindex];
max = sockpattern.length;
- if (sockpattern.length == pattern.length &&
+ if (sockpattern.length == pattern.length &&
sockpattern == pattern) {
result.push(sock);
} else if (sockpattern.length > pattern.length &&
@@ -616,6 +617,6 @@ DrainWaitHandle.prototype.push = function(socket) {
socket.ondrain = free;
socket.on("close", free);
-
+
this._count++;
-}
+};
View
199 lib/req.js
@@ -1,52 +1,54 @@
-//
+/*jshint loopfunc: true, laxbreak: true, expr: true */
+
+//
// Copyright 2010 Johan Dahlberg. All rights reserved.
//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions
// are met:
//
// 1. Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
-// 2. Redistributions in binary form must reproduce the above copyright
-// notice, this list of conditions and the following disclaimer in the
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
-// THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
-// AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
-// THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-// TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+// THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+// THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+// TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
// EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
-const inherits = require("util").inherits
- , notEqual = require("assert").notEqual;
+var inherits = require("util").inherits
+ , notEqual = require("assert").notEqual;
+
+var slice = Array.prototype.slice;
-const slice = Array.prototype.slice;
+var Socket = require("./socket").Socket
+ , Channel = require("./channel").Channel;
-const Socket = require("./socket").Socket
- , Channel = require("./channel").Channel;
+var createMessage = require("./message").createMessage
+ , createComplexMessage = require("./message").createComplexMessage;
-const createMessage = require("./message").createMessage
- , createComplexMessage = require("./message").createComplexMessage;
-
-const defineDispatcher = require("./util").defineDispatcher;
+var defineDispatcher = require("./util").defineDispatcher;
-const ACK_OFFSET = require("./message").ACK_OFFSET
+var ACK_OFFSET = require("./message").ACK_OFFSET;
-const SQ_FLUSH_INTERVAL = 300;
-const MAX_OUTGOING_REQUESTS = 255;
-const MAX_REJECTS = 6;
+var SQ_FLUSH_INTERVAL = 300;
+var MAX_OUTGOING_REQUESTS = 255;
+var MAX_REJECTS = 6;
function ReqSocket(options) {
var opts = options || {};
Socket.call(this, opts);
-
+
this._outgoingmax = opts.maxOutgoingRequests || MAX_OUTGOING_REQUESTS;
this._outgoingcount = 0;
this._ackwaitpool = {};
@@ -80,7 +82,7 @@ ReqSocket.prototype.send = function() {
}
} else {
graph = msg.graph;
-
+
if (graph instanceof Buffer) {
if (this.__recv__.call(this, "", msg, graph) == -1) {
throw new Error("No dispatch route for `/2`");
@@ -90,7 +92,7 @@ ReqSocket.prototype.send = function() {
graph.unshift(msg);
graph.unshift(name);
if (this.__recv__.apply(this, graph) == -1) {
- throw new Error("No dispatch route for `" +
+ throw new Error("No dispatch route for `" +
name + "/" + (graph.length - 1) + "`");
}
}
@@ -99,9 +101,9 @@ ReqSocket.prototype.send = function() {
self.destroy(dispatchException);
}
};
-
+
this._sendmsg(req, true);
-
+
return req;
};
@@ -121,7 +123,7 @@ function createRequest(graph) {
}
defineDispatcher(req, "recv");
-
+
return req;
}
@@ -130,10 +132,10 @@ ReqSocket.prototype._writemsg = function(msg) {
var bytes;
var ack;
var newmsg;
-
+
ack = msg[ACK_OFFSET];
- if (ack == 0) {
+ if (ack === 0) {
// Find free ack no for this msg
ack = this._outgoingmax + 1;
@@ -143,13 +145,13 @@ ReqSocket.prototype._writemsg = function(msg) {
// Note: This should never happen, because we remove the
// socket from readyRemoteEndpoints.
- if (ack == 0) {
+ if (ack === 0) {
return false;
}
// Set message ack to the one generated.
msg[ACK_OFFSET] = ack;
-
+
} else {
ack = 0;
}
@@ -164,7 +166,7 @@ ReqSocket.prototype._writemsg = function(msg) {
this.destroy(e);
return false;
}
-
+
if (ack) {
// Only add new mesages to waitpool.
@@ -176,16 +178,16 @@ ReqSocket.prototype._writemsg = function(msg) {
if (bytes + msg._off == msg.length) {
return true;
- }
+ }
msg._off += bytes;
- // Partly written messages MUST be
+ // Partly written messages MUST be
// queued again.
this._sendqueue.unshift(msg);
this._writeWatcher.start();
-
+
return false;
};
@@ -198,33 +200,33 @@ ReqSocket.prototype._processMessage = function(msg) {
// Ignore message if we currently doesn't have a
// corresponding wait handle or messages that is flaged as
- // OPTION.
+ // OPTION.
if (!req || msg.option) {
return;
}
-
+
// Free ack handle in socket wait pool
- waitpool[ack] = void(0);
+ waitpool[ack] = null;
ackfree = (this._outgoingcount-- == this._outgoingmax);
-
+
if (ackfree && !this._waitingforflush) {
- this._events && this._events['drain'] && this.emit('drain');
+ this._events && this._events.drain && this.emit("drain");
this.ondrain && this.ondrain();
this.__destroyOnDrain && this.destroy();
}
req.__handler__(null, msg);
-}
+};
// ReqSocket needs it's own implementation of _onWritable
-// because of the fact that the socket shouldn't be `drain`-ed
+// because of the fact that the socket shouldn't be `drain`-ed
// if there is out of acks.
ReqSocket.prototype._onWritable = function() {
this._waitingforflush = false;
if (this.flush()) {
if (this._outgoingcount < this._outgoingmax) {
- this._events && this._events['drain'] && this.emit('drain');
+ this._events && this._events.drain && this.emit("drain");
this.ondrain && this.ondrain();
this.__destroyOnDrain && this.destroy();
}
@@ -238,9 +240,9 @@ ReqSocket.prototype.destroy = function(exception) {
var req;
var keys;
var index;
-
+
Socket.prototype.destroy.call(this, exception);
-
+
if (!this.channel && waitpool) {
keys = Object.keys(waitpool);
index = keys.length;
@@ -262,10 +264,10 @@ function ReqChannel(options) {
this._requestQueue = [];
this._readySockets = [];
this._requestQueueJobRunning = false;
-
+
this._sidequeue = [];
this._sidequeue.running = false;
-
+
this._rejectflushinterval = options.rejectFlushInterval || SQ_FLUSH_INTERVAL;
this._maxrejects = options.maxRejects || MAX_REJECTS;
@@ -300,9 +302,9 @@ ReqChannel.prototype.RequesterImpl_disconnect = function(sock) {
}
index = keys.length;
-
+
while (index--) {
- var key = keys[index];
+ key = keys[index];
if ((msg = waitpool[key])) {
queue.push(msg);
}
@@ -310,7 +312,7 @@ ReqChannel.prototype.RequesterImpl_disconnect = function(sock) {
if (queue.length && !this._requestQueueJobRunning) {
this._startRequestQueueJob();
- }
+ }
};
ReqChannel.prototype.send = function() {
@@ -320,7 +322,7 @@ ReqChannel.prototype.send = function() {
var graph;
var name;
var req;
-
+
req = createRequest(args);