Permalink
Browse files

Initial commit

  • Loading branch information...
0 parents commit 5c2217fdbeeadb70c5f4895175faec0f01a32abb @tj committed Jul 12, 2012
2 .gitignore
@@ -0,0 +1,2 @@
+node_modules
+testing
7 .npmignore
@@ -0,0 +1,7 @@
+examples
+<<<<<<< HEAD
+=======
+Readme.md
+test
+testing
+>>>>>>> 9288590b9bb16130c35d64447747328678b0da2b
7 Makefile
@@ -0,0 +1,7 @@
+
+TESTS = $(wildcard test/test.*.js)
+
+test:
+ @./test/run $(TESTS)
+
+.PHONY: test
176 Readme.md
@@ -0,0 +1,176 @@
+
+# Super Sockets
+
+ Super Sockets is a message-oriented socket library for node.js heavily inspired by zeromq.
+
+## Features
+
+ - message oriented
+ - automated reconnection
+ - light-weight wire protocol
+ - light-weight implementation (~300 SLOC)
+ - supports arbitrary binary message (msgpack, json, BLOBS, etc)
+ - supports JSON messages
+ - push / pull pattern
+ - pub / sub pattern
+ - event emitter pattern
+
+## Push / Pull
+
+`PushSocket`s distribute messages round-robin.
+
+```js
+var ss = require('super-sockets');
+var sock = ss.socket('push');
+
+sock.bind(3000);
+console.log('push server started');
+
+setInterval(function(){
+ process.stdout.write('.');
+ sock.send('hello');
+}, 150);
+```
+
+Receiver of `PushSocket` messages.
+
+```js
+var ss = require('../../')
+ , sock = ss.socket('push');
+
+sock.bind(3000);
+console.log('push server started');
+
+setInterval(function(){
+ process.stdout.write('.');
+ sock.send('hello');
+}, 150);
+```
+
+## Pub / Sub
+
+`PubSocket`s send messages to all subscribers without queueing.
+
+_pub.js_:
+
+```js
+var ss = require('super-sockets');
+var sock = ss.socket('pub');
+
+sock.bind(3000);
+console.log('pub server started');
+
+setInterval(function(){
+ console.log('sending');
+ sock.send('hello');
+}, 500);
+```
+
+`SubSocket` provides selective reception of messages from a `PubSocket`.
+
+_sub.js_:
+
+```js
+var ss = require('super-sockets');
+var sock = ss.socket('sub');
+
+sock.connect(3000);
+
+sock.on('message', function(msg){
+ console.log(msg.toString());
+});
+```
+
+## EmitterSocket
+
+`EmitterSocket`'s send and receive messages behaving like regular node `EventEmitter`s.
+This is achieved by using pub / sub sockets behind the scenes. Currently we simply choose
+define the `EmitterSocket` as a `PubSocket` if you `.bind()`, and `SubSocket` if you `.connect()`,
+providing the natural API you're used to:
+
+server.js:
+
+```js
+var ss = require('super-sockets');
+var sock = ss.socket('emitter');
+
+sock.bind(3000);
+console.log('pub server started');
+
+setInterval(function(){
+ sock.emit('login', { name: 'tobi' });
+}, 500);
+```
+
+client.js:
+
+```js
+var ss = require('super-sockets');
+var sock = ss.socket('emitter');
+
+sock.connect(3000);
+console.log('sub client connected');
+
+sock.on('login', function(user){
+ console.log('%s signed in', user.name);
+});
+```
+
+## Protocol
+
+ The wire protocol is simple and very much zeromq-like, where `<length>` is
+ a 24 bit unsigned integer, representing a maximum length of 2^24, roughly ~16mb,
+ so you should be good :p
+
+```
+ octet: 0 1 2 3 <length>
+ +------+------+------+------+------------------...
+ | meta | <length> | data ...
+ +------+------+------+------+------------------...
+```
+
+## Running tests
+
+```
+$ npm install
+$ make test
+```
+
+## Todo
+
+ - more tests
+ - code cov
+ - acks
+ - emitter style on top of pubsub with multipart
+ - pluggable codecs
+ - weighted fair queuing
+ - use mocha for tests
+ - multipart
+ - subscriptions
+ - ...
+
+## License
+
+(The MIT License)
+
+Copyright (c) 2012 TJ Holowaychuk &lt;tj@vision-media.ca&gt;
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+'Software'), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
10 benchmark/pub.js
@@ -0,0 +1,10 @@
+
+var zmq = require('..')
+ , pub = zmq.socket('pub');
+
+var n = 1000;
+
+pub.bind(3000, function(){
+ console.log('bound to :3000');
+ while (n--) pub.send('foo');
+});
26 benchmark/sub.js
@@ -0,0 +1,26 @@
+
+var zmq = require('..')
+ , sub = zmq.socket('sub');
+
+var start = new Date
+ , n = 1000
+ , ops = n;
+
+sub.on('message', function(msg){
+ console.error(msg);
+ --n || (function(){
+ var duration = new Date - start;
+ pub.close();
+ sub.close();
+ console.log();
+ console.log(' pub/sub:');
+ console.log(' \033[36m%d\033[m msgs', ops);
+ console.log(' \033[36m%d\033[m ops/s', ops / (duration / 1000) | 0);
+ console.log(' \033[36m%d\033[m ms', duration);
+ console.log();
+ })();
+});
+
+sub.connect(3000, function(){
+ console.log('connected to :3000');
+});
14 examples/emitter/client.js
@@ -0,0 +1,14 @@
+
+var ss = require('../..')
+ , sock = ss.socket('emitter');
+
+sock.connect(3000);
+console.log('emitter client connected');
+
+sock.on('login', function(user){
+ console.log('%s signed in', user.name);
+});
+
+sock.on('logout', function(user){
+ console.log('%s signed out', user.name);
+});
14 examples/emitter/server.js
@@ -0,0 +1,14 @@
+
+var ss = require('../..')
+ , sock = ss.socket('emitter');
+
+sock.bind(3000);
+console.log('emitter server started');
+
+setInterval(function(){
+ sock.emit('login', { name: 'tobi' });
+}, 300);
+
+setInterval(function(){
+ sock.emit('logout', { name: 'tobi' });
+}, 1500);
11 examples/pubsub/pub.js
@@ -0,0 +1,11 @@
+
+var ss = require('../..')
+ , sock = ss.socket('pub');
+
+sock.bind(3000);
+console.log('pub server started');
+
+setInterval(function(){
+ console.log('sending');
+ sock.send('hello');
+}, 500);
9 examples/pubsub/sub.js
@@ -0,0 +1,9 @@
+
+var ss = require('../..')
+ , sock = ss.socket('sub');
+
+sock.connect(3000);
+
+sock.on('message', function(msg){
+ console.log(msg.toString());
+});
9 examples/pushpull/pull.js
@@ -0,0 +1,9 @@
+
+var ss = require('../..')
+ , sock = ss.socket('pull');
+
+sock.connect(3000);
+
+sock.on('message', function(msg){
+ console.log(msg.toString());
+});
11 examples/pushpull/push.js
@@ -0,0 +1,11 @@
+
+var ss = require('../..')
+ , sock = ss.socket('push');
+
+sock.bind(3000);
+console.log('push server started');
+
+setInterval(function(){
+ process.stdout.write('.');
+ sock.send('hello');
+}, 150);
2 index.js
@@ -0,0 +1,2 @@
+
+module.exports = require('./lib');
18 lib/codecs.js
@@ -0,0 +1,18 @@
+
+/**
+ * Binary.
+ */
+
+exports.none = {
+ encode: function(msg){ return msg },
+ decode: function(msg){ return msg }
+};
+
+/**
+ * JSON.
+ */
+
+exports.json = {
+ encode: JSON.stringify,
+ decode: JSON.parse
+};
78 lib/emitter.js
@@ -0,0 +1,78 @@
+
+/**
+ * Module dependencies.
+ */
+
+var PubSocket = require('./pub')
+ , SubSocket = require('./sub')
+ , Emitter = require('events').EventEmitter;
+
+/**
+ * Expose `EmitterSocket`.
+ */
+
+module.exports = EmitterSocket;
+
+/**
+ * Initialzie a new `EmitterSocket`.
+ *
+ * @api private
+ */
+
+function EmitterSocket(){}
+
+/**
+ * Inherits from `Emitter.prototype`.
+ */
+
+EmitterSocket.prototype.__proto__ = Emitter.prototype;
+
+/**
+ * Bind as a `PubSocket`.
+ *
+ * @api public
+ */
+
+EmitterSocket.prototype.bind = function(){
+ this.pub = new PubSocket;
+ this.pub.format('json');
+ this.emit = emit;
+ return this.pub.bind.apply(this.pub, arguments);
+};
+
+/**
+ * Connect as a `SubSocket`.
+ *
+ * @api public
+ */
+
+EmitterSocket.prototype.connect = function(){
+ var self = this;
+ this.sub = new SubSocket;
+ this.sub.on('message', function(args){
+ self.emit.apply(self, args);
+ });
+ return this.sub.connect.apply(this.sub, arguments);
+};
+
+/**
+ * Close the pub or sub socket.
+ *
+ * @api public
+ */
+
+EmitterSocket.prototype.close = function(){
+ return (this.sub || this.pub).close();
+};
+
+/**
+ * Emit `event` and the given args to all established peers.
+ *
+ * @param {String} event
+ * @api public
+ */
+
+function emit(event){
+ var args = [].slice.apply(arguments);
+ this.pub.send(args);
+};
53 lib/index.js
@@ -0,0 +1,53 @@
+
+/**
+ * Library version.
+ */
+
+exports.version = '0.0.1';
+
+/**
+ * Constructors.
+ */
+
+exports.Socket = require('./sock');
+exports.Queue = require('./queue');
+exports.PubSocket = require('./pub');
+exports.SubSocket = require('./sub');
+exports.PushSocket = require('./push');
+exports.PullSocket = require('./pull');
+exports.EmitterSocket = require('./emitter');
+
+/**
+ * Socket types.
+ */
+
+exports.types = {
+ stream: exports.Socket,
+ queue: exports.Queue,
+ pub: exports.PubSocket,
+ sub: exports.SubSocket,
+ push: exports.PushSocket,
+ pull: exports.PullSocket,
+ emitter: exports.EmitterSocket
+};
+
+/**
+ * Codecs.
+ */
+
+exports.codecs = require('./codecs');
+
+/**
+ * Return a new socket of the given `type`.
+ *
+ * @param {String} type
+ * @param {Object} options
+ * @return {Socket}
+ * @api public
+ */
+
+exports.socket = function(type, options){
+ var fn = exports.types[type];
+ if (!fn) throw new Error('invalid socket type "' + type + '"');
+ return new fn(options);
+};
49 lib/pub.js
@@ -0,0 +1,49 @@
+
+/**
+ * Module dependencies.
+ */
+
+var Queue = require('./queue');
+
+/**
+ * Expose `PubSocket`.
+ */
+
+module.exports = PubSocket;
+
+/**
+ * Initialzie a new `PubSocket`.
+ *
+ * @api private
+ */
+
+function PubSocket() {
+ Queue.call(this);
+ this.filters = [];
+}
+
+/**
+ * Inherits from `Queue.prototype`.
+ */
+
+PubSocket.prototype.__proto__ = Queue.prototype;
+
+/**
+ * Send `msg` to all established peers.
+ *
+ * @param {Mixed} msg
+ * @api public
+ */
+
+PubSocket.prototype.send = function(msg){
+ var socks = this.socks
+ , fmt = this._format
+ , msg = this.pack(this.encode(msg, fmt), fmt)
+ , len = socks.length
+ , sock;
+
+ for (var i = 0; i < len; ++i) {
+ sock = socks[i];
+ sock.write(msg);
+ }
+};
29 lib/pull.js
@@ -0,0 +1,29 @@
+
+/**
+ * Module dependencies.
+ */
+
+var Queue = require('./queue');
+
+/**
+ * Expose `PullSocket`.
+ */
+
+module.exports = PullSocket;
+
+/**
+ * Initialize a new `PullSocket`.
+ *
+ * @api private
+ */
+
+function PullSocket() {
+ Queue.call(this);
+ // TODO: selective reception
+}
+
+/**
+ * Inherits from `Queue.prototype`.
+ */
+
+PullSocket.prototype.__proto__ = Queue.prototype;
50 lib/push.js
@@ -0,0 +1,50 @@
+
+/**
+ * Module dependencies.
+ */
+
+var Queue = require('./queue');
+
+/**
+ * Expose `PushSocket`.
+ */
+
+module.exports = PushSocket;
+
+/**
+ * Initialzie a new `PushSocket`.
+ *
+ * @api private
+ */
+
+function PushSocket() {
+ Queue.call(this);
+ this.n = 0;
+ this.on('connect', this.flush.bind(this));
+}
+
+/**
+ * Inherits from `Queue.prototype`.
+ */
+
+PushSocket.prototype.__proto__ = Queue.prototype;
+
+/**
+ * Send `msg` round-robin to established peers.
+ *
+ * @param {Mixed} msg
+ * @api public
+ */
+
+PushSocket.prototype.send = function(msg){
+ var socks = this.socks
+ , len = socks.length
+ , sock = socks[this.n++ % len]
+ , fmt = this._format;
+
+ if (sock) {
+ sock.write(this.pack(this.encode(msg, fmt), fmt));
+ } else {
+ this.buf.push(msg);
+ }
+};
271 lib/queue.js
@@ -0,0 +1,271 @@
+
+/**
+ * Module dependencies.
+ */
+
+var Socket = require('./sock')
+ , codecs = require('./codecs')
+ , net = require('net');
+
+/**
+ * Expose `Queue`.
+ */
+
+exports = module.exports = Queue;
+
+/**
+ * Format map.
+ */
+
+var format = { ids: {}, names: {} };
+
+/**
+ * Build the map.
+ */
+
+Object.keys(codecs).forEach(function(name, i){
+ format.ids[name] = i;
+ format.names[i] = name;
+});
+
+/**
+ * Initialize a new `Queue`.
+ *
+ * The "Queue" encapsulates message packing & framing,
+ * and applying of codecs for each message received.
+ *
+ * @api private
+ */
+
+function Queue() {
+ Socket.call(this);
+ var self = this;
+ var sock = this.sock;
+ this.socks = [];
+ this.buf = [];
+ this.state = 'meta';
+ this.format('none');
+ sock.setNoDelay();
+ sock.on('data', this.frame.bind(this));
+ sock.on('connect', this.flush.bind(this));
+}
+
+/**
+ * Inherit from `Socket.prototype`.
+ */
+
+Queue.prototype.__proto__ = Socket.prototype;
+
+/**
+ * Set format to `type`.
+ *
+ * @param {String} type
+ * @return {Queue}
+ * @api public
+ */
+
+Queue.prototype.format = function(type){
+ var id = format.ids[type];
+ if (null == id) throw new Error('unknown format "' + type + '"');
+ this._format = id;
+ return this;
+};
+
+/**
+ * Frame the given `chunk`.
+ *
+ * @param {Buffer} chunk
+ * @api private
+ */
+
+Queue.prototype.frame = function(chunk){
+ var i = 0
+ , len = chunk.length;
+
+ while (i < len) {
+ switch (this.state) {
+ case 'meta':
+ this.meta = this.unpack(chunk, i);
+ this.msg = new Buffer(this.meta.length);
+ this.state = 'message';
+ this.offset = 0;
+ i += 4;
+ break;
+ case 'message':
+ var needed = this.meta.length
+ , left = len - i
+ , n = needed > left
+ ? left
+ : needed;
+
+ chunk.copy(this.msg, this.offset, i, i + n);
+ this.offset += n;
+ i += n;
+ if (this.offset == needed) {
+ this.onmessage(this.msg, this.meta);
+ this.state = 'meta';
+ }
+ break;
+ }
+ }
+};
+
+/**
+ * Decode `msg` as `fmt`.
+ *
+ * @param {Buffer} msg
+ * @param {String} fmt
+ * @return {Mixed} decoded message
+ * @api private
+ */
+
+Queue.prototype.decode = function(msg, fmt){
+ var decode = codecs[format.names[fmt]].decode;
+ return decode(msg);
+};
+
+/**
+ * Encode `msg` as `fmt`.
+ *
+ * @param {Buffer} msg
+ * @param {String} fmt
+ * @return {Mixed} encoded message
+ * @api private
+ */
+
+Queue.prototype.encode = function(msg, fmt){
+ var encode = codecs[format.names[fmt]].encode;
+ return encode(msg);
+};
+
+/**
+ * Handle message decoding and emit "message".
+ *
+ * @param {Buffer} msg
+ * @param {Object} meta
+ * @api public
+ */
+
+Queue.prototype.onmessage = function(msg, meta){
+ this.emit('message', this.decode(msg, meta.format));
+};
+
+/**
+ * Pack `msg` as `format`.
+ *
+ * @param {String|Buffer} msg
+ * @param {String} format
+ * @return {Buffer}
+ * @api private
+ */
+
+Queue.prototype.pack = function(msg, format){
+ // TODO: zero-copy
+ if ('string' == typeof msg) msg = new Buffer(msg);
+ var len = msg.length
+ , buf = new Buffer(len + 4);
+
+ // length
+ buf.writeUInt32BE(len, 0);
+
+ // format
+ buf[0] = format;
+
+ // data
+ msg.copy(buf, 4);
+ return buf;
+};
+
+/**
+ * Unpack `msg` at `offset`.
+ *
+ * @param {String|Buffer} msg
+ * @param {Number} offset
+ * @return {Object}
+ * @api private
+ */
+
+Queue.prototype.unpack = function(buf, offset){
+ // format
+ var format = buf[offset];
+
+ // length
+ buf[offset] = 0;
+ var len = buf.readUInt32BE(offset);
+
+ return {
+ length: len,
+ format: format
+ };
+};
+
+/**
+ * Flush queued messages.
+ *
+ * @api private
+ */
+
+Queue.prototype.flush = function(){
+ var buf = this.buf
+ , len = buf.length;
+ this.buf = [];
+ for (var i = 0; i < len; ++i) {
+ this.send(buf[i]);
+ }
+};
+
+// TODO: refactor this stuff...
+
+Queue.prototype.close = function(){
+ this.server && this.server.close();
+ return Socket.prototype.close.call(this);
+};
+
+/**
+ * Bind to `port` and invoke `fn()`.
+ *
+ * Emits:
+ *
+ * - `connect` when a connection is accepted
+ * - `bind` when bound and listening
+ *
+ * TODO: host
+ *
+ * @param {Number} port
+ * @param {Function} fn
+ * @api public
+ */
+
+Queue.prototype.bind = function(port, fn){
+ var self = this;
+
+ this.server = net.createServer(function(sock){
+ self.socks.push(sock);
+
+ self.emit('connect', sock);
+
+ sock.on('close', function(){
+ self.socks.forEach(function(s, i){
+ if (s == sock) self.socks.splice(i, 1);
+ })
+ });
+
+ sock.on('data', function(chunk){
+ for (var i = o = 0, len = chunk.length; i < len; ++i) {
+ if (0 == chunk[i]) {
+ // TODO: this could be half a message...
+ // TODO: dont append nul... use lengths
+ // TODO: add Buffer support
+ self.onmessage(chunk.slice(o, i));
+ o = i + 1;
+ }
+ }
+ });
+ });
+
+ this.server.on('listening', function(){
+ self.emit('bind');
+ });
+
+ this.server.listen(port, fn);
+};
95 lib/sock.js
@@ -0,0 +1,95 @@
+
+/**
+ * Module dependencies.
+ */
+
+var net = require('net')
+ , Emitter = require('events').EventEmitter;
+
+/**
+ * Expose `Socket`.
+ */
+
+module.exports = Socket;
+
+/**
+ * Initialize a new `Socket`.
+ *
+ * A super socket "Socket" encapsulates the
+ * reconnection logic with exponential backoff,
+ * serving as a base for the `Queue`.
+ *
+ * @api private
+ */
+
+function Socket() {
+ var self = this;
+ var sock = this.sock = new net.Socket;
+
+ this.retryTimeout = this.retry = 100;
+ this.retryMaxTimeout = 2000;
+
+ sock.on('error', function(err){
+ if ('ECONNREFUSED' != err.code) {
+ self.emit('error', err);
+ }
+ });
+
+ sock.on('data', function(chunk){
+ self.emit('data', chunk);
+ });
+
+ sock.on('close', function(){
+ self.connected = false;
+ if (self.closing) return self.emit('close');
+ setTimeout(function(){
+ self.emit('reconnect attempt');
+ sock.destroy();
+ self.connect(self.port);
+ self.retry = Math.min(self.retryMaxTimeout, self.retry * 1.5);
+ }, self.retry);
+ });
+
+ sock.on('connect', function(){
+ self.connected = true;
+ self.retry = self.retryTimeout;
+ self.emit('connect'); // TODO: dont emit each time... will invoke callback too many times
+ self.callback && self.callback();
+ });
+}
+
+/**
+ * Inherit from `Emitter.prototype`.
+ */
+
+Socket.prototype.__proto__ = Emitter.prototype;
+
+/**
+ * Connect to `port` and invoke `fn()`.
+ *
+ * TODO: host
+ *
+ * @param {Number} port
+ * @param {Function} fn
+ * @api public
+ */
+
+Socket.prototype.connect = function(port, fn){
+ this.port = port;
+ this.sock.connect(port, '127.0.0.1');
+ this.callback = fn;
+ return this;
+};
+
+/**
+ * Close the socket.
+ *
+ * @api public
+ */
+
+Socket.prototype.close = function(){
+ this.closing = true;
+ // TODO: end()?
+ this.sock.destroy();
+ return this;
+};
29 lib/sub.js
@@ -0,0 +1,29 @@
+
+/**
+ * Module dependencies.
+ */
+
+var Queue = require('./queue');
+
+/**
+ * Expose `SubSocket`.
+ */
+
+module.exports = SubSocket;
+
+/**
+ * Initialize a new `SubSocket`.
+ *
+ * @api private
+ */
+
+function SubSocket() {
+ Queue.call(this);
+ this.subscriptions = [];
+}
+
+/**
+ * Inherits from `Queue.prototype`.
+ */
+
+SubSocket.prototype.__proto__ = Queue.prototype;
11 package.json
@@ -0,0 +1,11 @@
+{
+ "name": "super-sockets",
+ "description": "High-level messaging & socket patterns implemented in pure js",
+ "version": "0.0.1",
+ "author": "TJ Holowaychuk <tj@vision-media.ca>",
+ "devDependencies": {
+ "should": "*",
+ "mocha": "*"
+ },
+ "keywords": ["zmq", "zeromq", "pubsub", "socket"]
+}
9 test/run
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+
+echo
+for file in $@; do
+ printf "\033[90m ${file#test/}\033[0m "
+ node $file && printf "\033[36m✓\033[0m\n"
+ test $? -eq 0 || exit $?
+done
+echo
34 test/test.emitter.js
@@ -0,0 +1,34 @@
+
+var ss = require('../')
+ , should = require('should');
+
+var pub = ss.socket('emitter')
+ , sub = ss.socket('emitter');
+
+var msgs = [];
+
+// test basic 1-1 pub/sub emitter style
+
+pub.bind(3000, function(){
+ sub.connect(3000, function(){
+ sub.on('foo', function(){
+ msgs.push(['foo']);
+ });
+
+ sub.on('bar', function(a, b, c){
+ msgs.push(['bar', a, b, c]);
+ });
+
+ sub.on('baz', function(a){
+ msgs.push(['baz', a]);
+ pub.close();
+ sub.close();
+ });
+
+ setTimeout(function(){
+ pub.emit('foo');
+ pub.emit('bar', 1, 2, 3);
+ pub.emit('baz', { name: 'tobi' });
+ }, 20);
+ });
+});
45 test/test.pubsub.js
@@ -0,0 +1,45 @@
+
+var ss = require('../')
+ , should = require('should');
+
+var pub = ss.socket('pub')
+ , sub = ss.socket('sub');
+
+var n = 0
+ , closed;
+
+// test basic 1-1 pub/sub
+
+pub.bind(3000, function(){
+ sub.connect(3000, function(){
+ sub.on('message', function(msg){
+ msg.should.be.an.instanceof(Buffer);
+ msg.should.have.length(3);
+ msg = msg.toString();
+ switch (n++) {
+ case 0:
+ msg.should.equal('foo');
+ break;
+ case 1:
+ msg.should.equal('bar');
+ break;
+ case 2:
+ msg.should.equal('baz');
+ pub.close();
+ sub.close();
+ closed = true;
+ break;
+ }
+ });
+
+ setTimeout(function(){
+ pub.send('foo');
+ pub.send('bar');
+ pub.send('baz');
+ }, 20);
+ });
+});
+
+process.on('exit', function(){
+ should.equal(true, closed);
+});
38 test/test.pubsub.missed-messages.js
@@ -0,0 +1,38 @@
+
+var ss = require('../')
+ , should = require('should');
+
+var pub = ss.socket('pub')
+ , sub = ss.socket('sub');
+
+var n = 0;
+
+// test basic 1-1 pub/sub with missed messages
+
+pub.bind(3000, function(){
+ pub.send('foo');
+ pub.send('bar');
+ sub.connect(3000, function(){
+ sub.on('message', function(msg){
+ msg.should.be.an.instanceof(Buffer);
+ msg.should.have.length(3);
+ msg = msg.toString();
+ switch (n++) {
+ case 0:
+ msg.should.equal('baz');
+ break;
+ case 1:
+ msg.should.equal('raz');
+ pub.close();
+ sub.close();
+ break;
+ }
+ });
+
+ setTimeout(function(){
+ pub.send('baz');
+ pub.send('raz');
+ }, 20);
+ });
+});
+
57 test/test.pubsub.multiple-subscribers.js
@@ -0,0 +1,57 @@
+
+var ss = require('../')
+ , should = require('should');
+
+var pub = ss.socket('pub')
+ , a = ss.socket('sub')
+ , b = ss.socket('sub')
+ , c = ss.socket('sub');
+
+var n = 9;
+
+var messages = {
+ a: []
+ , b: []
+ , c: []
+};
+
+// test basic 1-M pub/sub
+
+pub.bind(3000, function(){
+ a.connect(3000, function(){
+ b.connect(3000, function(){
+ c.connect(3000, function(){
+ setTimeout(function(){
+ pub.send('foo');
+ pub.send('bar');
+ pub.send('baz');
+ }, 20);
+ });
+ });
+ });
+});
+
+a.on('message', function(msg){
+ messages.a.push(msg.toString());
+ --n || done();
+});
+
+b.on('message', function(msg){
+ messages.b.push(msg.toString());
+ --n || done();
+});
+
+c.on('message', function(msg){
+ messages.c.push(msg.toString());
+ --n || done();
+});
+
+function done() {
+ messages.a.should.eql(['foo', 'bar', 'baz']);
+ messages.b.should.eql(['foo', 'bar', 'baz']);
+ messages.c.should.eql(['foo', 'bar', 'baz']);
+ pub.close();
+ a.close();
+ b.close();
+ c.close();
+}
44 test/test.pushpull.js
@@ -0,0 +1,44 @@
+
+var ss = require('../')
+ , should = require('should');
+
+var push = ss.socket('push')
+ , pull = ss.socket('pull');
+
+// basic 1-1 push/pull
+
+var n = 0
+ , closed;
+
+push.bind(3000);
+push.send('foo');
+push.send('bar');
+
+pull.connect(3000);
+pull.on('message', function(msg){
+ msg.should.be.an.instanceof(Buffer);
+ msg.should.have.length(3);
+ msg = msg.toString();
+ switch (n++) {
+ case 0:
+ msg.should.equal('foo');
+ break;
+ case 1:
+ msg.should.equal('bar');
+ break;
+ case 2:
+ msg.should.equal('baz');
+ pull.close();
+ push.close();
+ closed = true;
+ break;
+ }
+});
+
+pull.on('connect', function(){
+ push.send('baz');
+});
+
+process.on('exit', function(){
+ should.equal(true, closed);
+});
28 test/test.pushpull.json.js
@@ -0,0 +1,28 @@
+
+var ss = require('../')
+ , should = require('should');
+
+var push = ss.socket('push')
+ , pull = ss.socket('pull');
+
+// basic 1-1 push/pull
+
+var n = 0;
+
+push.bind(3000);
+
+push.format('json');
+push.send({ path: '/tmp/foo.png' });
+push.send({ path: '/tmp/bar.png' });
+push.send({ path: '/tmp/baz.png' });
+
+var strs = ['foo', 'bar', 'baz'];
+
+pull.connect(3000);
+pull.on('message', function(msg){
+ msg.should.have.property('path', '/tmp/' + strs[n++] + '.png');
+ if (n == 3) {
+ push.close();
+ pull.close();
+ }
+});
20 test/test.socket.js
@@ -0,0 +1,20 @@
+
+/**
+ * Module dependencies.
+ */
+
+var ss = require('../')
+ , should = require('should');
+
+// version
+
+ss.version.should.match(/^\d+\.\d+\.\d+$/)
+
+// socket types
+
+ss.socket('stream').should.be.an.instanceof(ss.Socket);
+ss.socket('pub').should.be.an.instanceof(ss.PubSocket);
+ss.socket('sub').should.be.an.instanceof(ss.SubSocket);
+ss.socket('push').should.be.an.instanceof(ss.PushSocket);
+ss.socket('pull').should.be.an.instanceof(ss.PullSocket);
+
40 test/test.socket.reconnection.js
@@ -0,0 +1,40 @@
+
+var ss = require('../')
+ , should = require('should');
+
+var push = ss.socket('push')
+ , pull = ss.socket('pull');
+
+// basic 1-1 push/pull
+
+var msgs = []
+ , n = 0;
+
+push.bind(3000);
+pull.connect(3000);
+
+var id = setInterval(function(){
+ push.send(String(n++));
+}, 1);
+
+pull.on('message', function(msg){
+ msgs.push(msg.toString());
+
+ switch (msgs.length) {
+ case 10:
+ pull.close();
+ pull.on('close', function(){
+ if (100 == msgs.length) return;
+ pull.connect(3000);
+ });
+ break;
+ case 100:
+ for (var i = 0; i < 99; ++i) {
+ msgs[i].should.equal(i.toString());
+ }
+ clearInterval(id);
+ push.close();
+ pull.close();
+ break;
+ }
+});

0 comments on commit 5c2217f

Please sign in to comment.