Skip to content

Commit

Permalink
Implement hub with pubsub.
Browse files Browse the repository at this point in the history
  • Loading branch information
lsm committed Apr 5, 2016
1 parent c9ede27 commit 801de1f
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 1 deletion.
7 changes: 6 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
var Socket = require('./lib/socket')
var Socket = require('./lib/socket/index')
var SocketHub = require('./lib/socket/hub')

var socketmq = module.exports = function() {
return new Socket()
Expand All @@ -17,3 +18,7 @@ socketmq.connect = function(uri, options, callback) {
smq.connect(uri, options, callback)
return smq
}

socketmq.hub = function() {
return new SocketHub()
}
67 changes: 67 additions & 0 deletions lib/queue/hub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
var util = require('util')
var type = require('../message/type')
var wire = require('../message/wire')
var Queue = require('./index')

var PUB = type.PUB
var REQ = type.REQ
var REP = type.REP


var Hub = module.exports = function QueueHub(socket) {
Queue.call(this, socket)
}

util.inherits(Hub, Queue)

Hub.prototype.pub = function(streams, buf, excludedStream) {
var len = streams.length
if (0 === len || (1 === len && excludedStream === streams[0]))
// No stream or only has one stream which is the excluded one.
return this._pendings.push(['pub', streams, buf, excludedStream])

var send = this.send
while (len-- > 0) {
var stream = streams[len]
if (stream !== excludedStream)
send(stream, buf)
}
}


Hub.prototype.dispatch = function(buf, stream) {
var pack = wire.decode(buf)
var msgId = pack.msgId
var msg = pack.msg

switch (pack.type) {
case PUB:
this.pub(this.socket.streams, buf, stream)
break
case REQ:
// Make reply function
var send = this.send
var encode = this.encode
var reply = function(repMsg) {
var len = arguments.length
if (len > 1) {
len--
var i = 0
repMsg = [repMsg]
while (i++ < len) {
repMsg[i] = arguments[i]
}
} else if ('string' !== typeof repMsg && !Buffer.isBuffer(repMsg)) {
repMsg = [repMsg]
}
var buf = encode(REP, event, repMsg, msgId)
send(stream, buf)
}
msg.push(reply)
this.onReq(event, msg)
break
case REP:
this.onRep(event, msg, msgId)
break
}
}
2 changes: 2 additions & 0 deletions lib/queue/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ Queue.prototype._flush = function() {
var pendings = this._pendings
var len = pendings.length
if (0 < len) {
// Reset pending array
this._pendings = []
var i = 0
while (i < len) {
var _p = pendings[i++]
Expand Down
13 changes: 13 additions & 0 deletions lib/socket/hub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
var util = require('util')
var Socket = require('./index')
var QueueHub = require('../queue/hub')


var Hub = module.exports = function SocketHub() {
Socket.call(this)

this.queue = new QueueHub(this)
this.onMessage = this.queue.dispatch.bind(this.queue)
}

util.inherits(Hub, Socket)
61 changes: 61 additions & 0 deletions test/hub.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
var fs = require('fs')
var path = require('path')
var test = require('tape')
var socketmq = require('../')

var certPath = path.join(__dirname, '../benchmark/certs')

module.exports = function() {
var smqHub = socketmq.hub()
var eioClient
var tcpClient
var tlsClient

test('hub: eio + tcp + tls', function(t) {
t.plan(1)

// Endpoints
var eioEndpoint = 'eio://127.0.0.1:8081'
var tcpEndpoint = 'tcp://127.0.0.1:6364'
var tlsEndpoint = 'tls://localhost:46364'

// Servers
smqHub.bind(eioEndpoint)
smqHub.bind(tcpEndpoint, function(stream) {
t.ok(stream)
})
smqHub.bind(tlsEndpoint, {
key: fs.readFileSync(certPath + '/server-key.pem'),
cert: fs.readFileSync(certPath + '/server-cert.pem'),
ca: [fs.readFileSync(certPath + '/client-cert.pem')]
})

// Clients
var tlsClientOptions = {
key: fs.readFileSync(certPath + '/client-key.pem'),
cert: fs.readFileSync(certPath + '/client-cert.pem'),
ca: [fs.readFileSync(certPath + '/server-cert.pem')]
}

eioClient = socketmq.connect(eioEndpoint)
tcpClient = socketmq.connect(tcpEndpoint)
tlsClient = socketmq.connect(tlsEndpoint, tlsClientOptions)
})

test('hub: pub/sub', function(t) {
t.plan(2)
var msg = 'hub pub sub'

eioClient.sub('pub sub', function(arg1) {
t.equal(arg1, msg)
})
tcpClient.sub('pub sub', function(arg1) {
t.equal(arg1, msg)
})
tlsClient.sub('pub sub', function() {
t.notOk(true)
})

tlsClient.pub('pub sub', msg)
})
}
2 changes: 2 additions & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ var test = require('tape')
var testTCP = require('./tcp.test.js')
var testTLS = require('./tls.test.js')
var testEIO = require('./eio.test.js')
var testHub = require('./hub.test.js')

testTCP()
testTLS()
testEIO()
testHub()

test.onFinish(function() {
setImmediate(function() {
Expand Down

0 comments on commit 801de1f

Please sign in to comment.