From 3e3da598c258b86b9cb600d3a5705555bfbb0fd8 Mon Sep 17 00:00:00 2001 From: Alexey Melnichuk Date: Wed, 23 Jul 2014 12:35:28 +0500 Subject: [PATCH] Fix. Send multipart messages. --- src/lyre/impl/message.lua | 42 ++++++++++++++++++++++----------------- src/lyre/impl/node.lua | 26 +++++++++--------------- 2 files changed, 33 insertions(+), 35 deletions(-) diff --git a/src/lyre/impl/message.lua b/src/lyre/impl/message.lua index 4d30879..e82cc4e 100644 --- a/src/lyre/impl/message.lua +++ b/src/lyre/impl/message.lua @@ -12,11 +12,10 @@ local UUID = require "lyre.impl.uuid" local utils = require "lyre.impl.utils" local ZRE = require "lyre.zre" -local bit = utils.bit -local Iter = utils.Iter -local Buffer = utils.Buffer -local count = utils.count -local unpack = utils.unpack +local bit = utils.bit +local Iter = utils.Iter +local Buffer = utils.Buffer +local count = utils.count local STRUCT, BIG_ENDIAN, BYTES = utils.STRUCT, utils.BIG_ENDIAN, utils.BYTES local UINT8, UINT16, UINT32 = utils.UINT8, utils.UINT16, utils.UINT32 @@ -47,7 +46,10 @@ local LEAVE_HEADER = STRUCT{BIG_ENDIAN, STRING, UINT8} local Message = {} do Message.__index = Message -function Message:new(id, header, content) +function Message:new(id, header, ...) + local content = ... + if select("#", ...) > 1 then content = {...} end + return setmetatable({ _id = id, _header = header, @@ -56,7 +58,7 @@ function Message:new(id, header, content) end function Message:send(peer, s) - local header = Buffer():write(">c0BBI2", + local header = Buffer():write(MESSAGE_HEADER, ZRE.SIGNATURE, self._id, peer:version(), peer:next_sent_sequence() @@ -130,15 +132,15 @@ function MessageEncoder.LEAVE(node, group) return Message:new(ZRE.COMMANDS.LEAVE, buf:data()) end -function MessageEncoder.SHOUT(node, group, content) +function MessageEncoder.SHOUT(node, group, ...) local buf = Buffer() :write_string(group) - return Message:new(ZRE.COMMANDS.SHOUT, buf:data(), content) + return Message:new(ZRE.COMMANDS.SHOUT, buf:data(), ...) end -function MessageEncoder.WHISPER(node, content) - return Message:new(ZRE.COMMANDS.WHISPER, nil, content) +function MessageEncoder.WHISPER(node, ...) + return Message:new(ZRE.COMMANDS.WHISPER, nil, ...) end end @@ -147,10 +149,14 @@ end --------------------------------------------------------------------- local MessageDecoder = {} do -function MessageDecoder.dispatch(node, routing_id, msg, content) +function MessageDecoder.dispatch(node, routing_id, msg, ...) local log = node:logger() - if #routing_id ~= UUID.LEN + 1 then return end + if not routing_id then return end + if not msg then return end + if #routing_id ~= UUID.LEN + 1 then return end + if routing_id:sub(1,1) ~= '\001' then return end + local uuid = routing_id:sub(2) local iter = Iter(msg) @@ -169,7 +175,7 @@ function MessageDecoder.dispatch(node, routing_id, msg, content) log.notice("INBOX : ", UUID.to_string(uuid), name, "#", sequence) local fn = MessageDecoder[name] - if fn then fn(node, version, uuid, sequence, iter, content) end + if fn then fn(node, version, uuid, sequence, iter, ...) end end function MessageDecoder.beacon(node, host, ann) @@ -226,14 +232,14 @@ function MessageDecoder.LEAVE(node, version, uuid, sequence, iter) return node:on_message("LEAVE", version, uuid, sequence, group, status) end -function MessageDecoder.SHOUT(node, version, uuid, sequence, iter, content) +function MessageDecoder.SHOUT(node, version, uuid, sequence, iter, ...) local group = iter:next_string() if not group then return end - return node:on_message("SHOUT", version, uuid, sequence, group, content) + return node:on_message("SHOUT", version, uuid, sequence, group, ...) end -function MessageDecoder.WHISPER(node, version, uuid, sequence, iter, content) - return node:on_message("WHISPER", version, uuid, sequence, content) +function MessageDecoder.WHISPER(node, version, uuid, sequence, iter, ...) + return node:on_message("WHISPER", version, uuid, sequence, ...) end end diff --git a/src/lyre/impl/node.lua b/src/lyre/impl/node.lua index 4811626..d527b80 100644 --- a/src/lyre/impl/node.lua +++ b/src/lyre/impl/node.lua @@ -257,18 +257,18 @@ NODE_MESSAGE[ "LEAVE" ] = function(node, version, uuid, sequence, group, statu end end -NODE_MESSAGE[ "SHOUT" ] = function(node, version, uuid, sequence, group, content) +NODE_MESSAGE[ "SHOUT" ] = function(node, version, uuid, sequence, group, ...) local peer = node:check_peer(version, uuid, sequence) if not peer then return end - node:send("SHOUT", peer:uuid(true), peer:name(), group, unpack(content)) + node:send("SHOUT", peer:uuid(true), peer:name(), group, ...) end -NODE_MESSAGE[ "WHISPER" ] = function(node, version, uuid, sequence, content) +NODE_MESSAGE[ "WHISPER" ] = function(node, version, uuid, sequence, ...) local peer = node:check_peer(version, uuid, sequence) if not peer then return end - node:send("WHISPER", peer:uuid(true), peer:name(), unpack(content)) + node:send("WHISPER", peer:uuid(true), peer:name(), ...) end end @@ -284,16 +284,8 @@ local function Node_on_beacon(self, beacon) return MessageDecoder.beacon(self, host, ann) end -local function wrap_msg(a, b, ...) - return a, b, {...} -end - local function Node_on_inbox(self, inbox) - local routing_id, msg, content = wrap_msg(inbox:recvx()) - if not routing_id then return end - if not msg then return end - - return MessageDecoder.dispatch(self, routing_id, msg, content) + return MessageDecoder.dispatch(self, inbox:recvx()) end local function Node_on_interval(self) @@ -708,19 +700,19 @@ function Node:send(...) return self._private.outbox:sendx(...) end -function Node:shout(name, content) +function Node:shout(name, ...) local p = self._private local group = p.peer_groups[name] if not group then return true end - local msg = MessageEncoder.SHOUT(self, group:name(), content) + local msg = MessageEncoder.SHOUT(self, group:name(), ...) return group:send(msg) end -function Node:whisper(uuid, content) +function Node:whisper(uuid, ...) local p = self._private local peer = p.peers[uuid] if not peer then return true end - local msg = MessageEncoder.WHISPER(self, content) + local msg = MessageEncoder.WHISPER(self, ...) return peer:send(msg) end