Skip to content

Commit

Permalink
Allow sending req messages with only callback or with undefined value.
Browse files Browse the repository at this point in the history
Tests updated.  Normalize arguments and packs  based on length of the arguments instead of value.
  • Loading branch information
lsm committed Feb 1, 2017
1 parent fc8f21e commit 215f61f
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 46 deletions.
4 changes: 3 additions & 1 deletion lib/message/index.js
Expand Up @@ -25,7 +25,9 @@ exports.encode = function(pack, encodeMsg) {
pack.msg = Buffer(msg)
} else if (isBuffer(msg)) {
pack.formatId = type.BUFFER
} else if (msg) {
} else if (pack.hasOwnProperty('msg')) {
if (undefined === msg)
msg = null
pack.msg = encodeMsg(msg)
}

Expand Down
19 changes: 9 additions & 10 deletions lib/message/wire.js
Expand Up @@ -28,20 +28,19 @@ exports.decode = function(buf) {
// message type
type: args[0].toString(),
// event name
event: args[1].toString(),
// message format id
formatId: args[2],
// message data
msg: args[3]
event: args[1].toString()
}
// message meta data
var metaIdx = 4

// message format id in string
if (pack.msg) {
var metaIdx
if (args.length > 3) {
metaIdx = 4
// message format id
pack.formatId = args[2]
pack.fid = pack.formatId.toString()
// message data
pack.msg = args[3]
pack.hasMsg = true
} else {
pack.formatId = undefined
metaIdx = 2
}

Expand Down
3 changes: 3 additions & 0 deletions lib/queue/channel.js
Expand Up @@ -105,6 +105,9 @@ QueueChannel.prototype.beforeDispatch = function(pack, stream, dispatch) {
pack.msg.unshift(chn)
else
pack.msg = [chn]

// Set hasMsg to true so the dispatch function will use `msg` as arguments for callbacks.
pack.hasMsg = true
}

var self = this
Expand Down
6 changes: 5 additions & 1 deletion lib/queue/index.js
Expand Up @@ -148,7 +148,11 @@ Queue.prototype.dispatch = function(data, stream) {

var queue = this
this.beforeDispatch(pack, stream, function(pack, stream) {
var msg = pack.msg
var msg
if (true === pack.hasMsg)
msg = pack.msg || [pack.msg]
else
msg = []
var meta = pack.meta
var event = pack.event

Expand Down
32 changes: 19 additions & 13 deletions lib/socket/channel.js
Expand Up @@ -29,28 +29,34 @@ var SocketChannel = module.exports = function SocketChannel(socket, ns, chn) {
inherits(SocketChannel, Socket)

SocketChannel.prototype.reqChn = function(chn, event, msg, callback) {
var args = que.getMsgAndCallback(arguments, 2)
this.queue.req(this.streams, {
var pack = {
chn: chn,
event: event,
msg: args.msg
}, args.callback)
event: event
}
var args = que.getMsgAndCallback(arguments, 2)
if (args.hasMsg)
pack.msg = args.msg
this.queue.req(this.streams, pack, args.callback)
}

SocketChannel.prototype.pubChn = function(chn, event, msg) {
this.queue.pub(this.streams, {
var pack = {
chn: chn,
event: event,
msg: que.getMsg(arguments, 2)
})
event: event
}
if (arguments.length > 2)
pack.msg = que.getMsg(arguments, 2)
this.queue.pub(this.streams, pack)
}

SocketChannel.prototype.pubSid = function(sid, event, msg) {
this.queue.pub(this.streams, {
var pack = {
sid: sid,
event: event,
msg: que.getMsg(arguments, 2)
})
event: event
}
if (arguments.length > 2)
pack.msg = que.getMsg(arguments, 2)
this.queue.pub(this.streams, pack)
}

SocketChannel.prototype.allow = function(allowFn) {
Expand Down
53 changes: 33 additions & 20 deletions lib/socket/que.js
Expand Up @@ -8,20 +8,23 @@ var isBuffer = Buffer.isBuffer
// pub/sub pair

exports.pub = function(event, msg) {
this.queue.pub(this.streams, {
event: event,
msg: getMsg(arguments, 1)
})
var pack = {
event: event
}
if (arguments.length > 1)
pack.msg = getMsg(arguments, 1)
this.queue.pub(this.streams, pack)
}

exports.pubTag = function(tag, event, msg) {
msg = getMsg(arguments, 2)
var pack = {
event: event
}
if (arguments.length > 2)
pack.msg = getMsg(arguments, 2)
var streams = this.getStreamsByTag(tag)
if (streams.length > 0) {
this.queue.pub(streams, {
event: event,
msg: msg
})
this.queue.pub(streams, pack)
} else {
this.emit('error', {
tag: tag,
Expand All @@ -40,20 +43,24 @@ exports.sub = function(event, callback) {

exports.req = function(event, msg, callback) {
var args = getMsgAndCallback(arguments, 1)
this.queue.req(this.streams, {
event: event,
msg: args.msg
}, args.callback)
var pack = {
event: event
}
if (args.hasMsg)
pack.msg = args.msg
this.queue.req(this.streams, pack, args.callback)
}

exports.reqTag = function(tag, event, msg, callback) {
var pack = {
event: event
}
var args = getMsgAndCallback(arguments, 2)
if (args.hasMsg)
pack.msg = args.msg
var streams = this.getStreamsByTag(tag)
if (streams.length > 0) {
this.queue.req(streams, {
event: event,
msg: args.msg
}, args.callback)
this.queue.req(streams, pack, args.callback)
} else {
this.emit('error', {
tag: tag,
Expand All @@ -78,10 +85,16 @@ var getMsgAndCallback = exports.getMsgAndCallback = function(args, start) {
else
callback = undefined

return {
msg: getMsg(args, start, len),
var result = {
callback: callback
}

if (len > start) {
result.msg = getMsg(args, start, len)
result.hasMsg = true
}

return result
}

var getMsg = exports.getMsg = function(args, start, end) {
Expand All @@ -98,7 +111,7 @@ var getMsg = exports.getMsg = function(args, start, end) {
i++
}
} else {
// only one or no rgument is msg
// only one or no argument is msg
msg = args[start]
if (!msg)
return msg
Expand Down
23 changes: 22 additions & 1 deletion test/default.js
Expand Up @@ -119,7 +119,7 @@ module.exports = function(name, T, smqServer, smqClient1, smqClient2, endpoint,
})

test(name + ': req/rep', function(t) {
t.plan(15)
t.plan(20)

// Single argument with callback
smqServer.rep('test reply object', function(arg1, reply) {
Expand Down Expand Up @@ -164,6 +164,27 @@ module.exports = function(name, T, smqServer, smqClient1, smqClient2, endpoint,
t.equal(arg3, undefined, 'no arguments arg3')
})
smqClient2.req('without arguments')

// With only callback
var onlyCallbackMsg = 'onlyCallbackMsg'
smqServer.rep('with only callback', function(reply) {
t.equal(typeof reply, 'function', 'got reply function')
reply(onlyCallbackMsg)
})
smqClient2.req('with only callback', function(msg) {
t.equal(msg, onlyCallbackMsg, 'replied message match')
})

// With undefined message and callback
var undefinedCallbackMsg = 'undefinedCallbackMsg'
smqServer.rep('with undefined & callback', function(msg, reply) {
t.equal(msg, null, 'got msg value undefined')
t.equal(typeof reply, 'function', 'got reply function')
reply(undefinedCallbackMsg)
})
smqClient2.req('with undefined & callback', undefined, function(msg) {
t.equal(msg, undefinedCallbackMsg, 'replied message match')
})
})

test(name + ': tag clients', function(t) {
Expand Down

0 comments on commit 215f61f

Please sign in to comment.