Skip to content

Commit

Permalink
Fix. Send multipart messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
moteus committed Jul 23, 2014
1 parent 63e79a0 commit 3e3da59
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 35 deletions.
42 changes: 24 additions & 18 deletions src/lyre/impl/message.lua
Expand Up @@ -12,11 +12,10 @@ local UUID = require "lyre.impl.uuid"
local utils = require "lyre.impl.utils"
local ZRE = require "lyre.zre"

local bit = utils.bit
local Iter = utils.Iter
local Buffer = utils.Buffer
local count = utils.count
local unpack = utils.unpack
local bit = utils.bit
local Iter = utils.Iter
local Buffer = utils.Buffer
local count = utils.count

local STRUCT, BIG_ENDIAN, BYTES = utils.STRUCT, utils.BIG_ENDIAN, utils.BYTES
local UINT8, UINT16, UINT32 = utils.UINT8, utils.UINT16, utils.UINT32
Expand Down Expand Up @@ -47,7 +46,10 @@ local LEAVE_HEADER = STRUCT{BIG_ENDIAN, STRING, UINT8}
local Message = {} do
Message.__index = Message

function Message:new(id, header, content)
function Message:new(id, header, ...)
local content = ...
if select("#", ...) > 1 then content = {...} end

return setmetatable({
_id = id,
_header = header,
Expand All @@ -56,7 +58,7 @@ function Message:new(id, header, content)
end

function Message:send(peer, s)
local header = Buffer():write(">c0BBI2",
local header = Buffer():write(MESSAGE_HEADER,
ZRE.SIGNATURE, self._id,
peer:version(),
peer:next_sent_sequence()
Expand Down Expand Up @@ -130,15 +132,15 @@ function MessageEncoder.LEAVE(node, group)
return Message:new(ZRE.COMMANDS.LEAVE, buf:data())
end

function MessageEncoder.SHOUT(node, group, content)
function MessageEncoder.SHOUT(node, group, ...)
local buf = Buffer()
:write_string(group)

return Message:new(ZRE.COMMANDS.SHOUT, buf:data(), content)
return Message:new(ZRE.COMMANDS.SHOUT, buf:data(), ...)
end

function MessageEncoder.WHISPER(node, content)
return Message:new(ZRE.COMMANDS.WHISPER, nil, content)
function MessageEncoder.WHISPER(node, ...)
return Message:new(ZRE.COMMANDS.WHISPER, nil, ...)
end

end
Expand All @@ -147,10 +149,14 @@ end
---------------------------------------------------------------------
local MessageDecoder = {} do

function MessageDecoder.dispatch(node, routing_id, msg, content)
function MessageDecoder.dispatch(node, routing_id, msg, ...)
local log = node:logger()

if #routing_id ~= UUID.LEN + 1 then return end
if not routing_id then return end
if not msg then return end
if #routing_id ~= UUID.LEN + 1 then return end
if routing_id:sub(1,1) ~= '\001' then return end

local uuid = routing_id:sub(2)

local iter = Iter(msg)
Expand All @@ -169,7 +175,7 @@ function MessageDecoder.dispatch(node, routing_id, msg, content)
log.notice("INBOX : ", UUID.to_string(uuid), name, "#", sequence)

local fn = MessageDecoder[name]
if fn then fn(node, version, uuid, sequence, iter, content) end
if fn then fn(node, version, uuid, sequence, iter, ...) end
end

function MessageDecoder.beacon(node, host, ann)
Expand Down Expand Up @@ -226,14 +232,14 @@ function MessageDecoder.LEAVE(node, version, uuid, sequence, iter)
return node:on_message("LEAVE", version, uuid, sequence, group, status)
end

function MessageDecoder.SHOUT(node, version, uuid, sequence, iter, content)
function MessageDecoder.SHOUT(node, version, uuid, sequence, iter, ...)
local group = iter:next_string() if not group then return end

return node:on_message("SHOUT", version, uuid, sequence, group, content)
return node:on_message("SHOUT", version, uuid, sequence, group, ...)
end

function MessageDecoder.WHISPER(node, version, uuid, sequence, iter, content)
return node:on_message("WHISPER", version, uuid, sequence, content)
function MessageDecoder.WHISPER(node, version, uuid, sequence, iter, ...)
return node:on_message("WHISPER", version, uuid, sequence, ...)
end

end
Expand Down
26 changes: 9 additions & 17 deletions src/lyre/impl/node.lua
Expand Up @@ -257,18 +257,18 @@ NODE_MESSAGE[ "LEAVE" ] = function(node, version, uuid, sequence, group, statu
end
end

NODE_MESSAGE[ "SHOUT" ] = function(node, version, uuid, sequence, group, content)
NODE_MESSAGE[ "SHOUT" ] = function(node, version, uuid, sequence, group, ...)
local peer = node:check_peer(version, uuid, sequence)
if not peer then return end

node:send("SHOUT", peer:uuid(true), peer:name(), group, unpack(content))
node:send("SHOUT", peer:uuid(true), peer:name(), group, ...)
end

NODE_MESSAGE[ "WHISPER" ] = function(node, version, uuid, sequence, content)
NODE_MESSAGE[ "WHISPER" ] = function(node, version, uuid, sequence, ...)
local peer = node:check_peer(version, uuid, sequence)
if not peer then return end

node:send("WHISPER", peer:uuid(true), peer:name(), unpack(content))
node:send("WHISPER", peer:uuid(true), peer:name(), ...)
end

end
Expand All @@ -284,16 +284,8 @@ local function Node_on_beacon(self, beacon)
return MessageDecoder.beacon(self, host, ann)
end

local function wrap_msg(a, b, ...)
return a, b, {...}
end

local function Node_on_inbox(self, inbox)
local routing_id, msg, content = wrap_msg(inbox:recvx())
if not routing_id then return end
if not msg then return end

return MessageDecoder.dispatch(self, routing_id, msg, content)
return MessageDecoder.dispatch(self, inbox:recvx())
end

local function Node_on_interval(self)
Expand Down Expand Up @@ -708,19 +700,19 @@ function Node:send(...)
return self._private.outbox:sendx(...)
end

function Node:shout(name, content)
function Node:shout(name, ...)
local p = self._private
local group = p.peer_groups[name]
if not group then return true end
local msg = MessageEncoder.SHOUT(self, group:name(), content)
local msg = MessageEncoder.SHOUT(self, group:name(), ...)
return group:send(msg)
end

function Node:whisper(uuid, content)
function Node:whisper(uuid, ...)
local p = self._private
local peer = p.peers[uuid]
if not peer then return true end
local msg = MessageEncoder.WHISPER(self, content)
local msg = MessageEncoder.WHISPER(self, ...)
return peer:send(msg)
end

Expand Down

0 comments on commit 3e3da59

Please sign in to comment.