Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Some bugfixes #3

Merged
merged 1 commit into from

1 participant

@moteus
Owner

Fix. Set correct status to peer on HELLO.
Fix. Check status on LEAVE/JOIN.
Fix. Proceed content on WHISPER.
Fix. Check error creating outbox socket.
Fix. Stop interval event on node when stop.
Fix. Send LEAVE event.

@moteus moteus Fix. Check error creating peer in beacon.
Fix. Set correct status to peer on HELLO.
Fix. Check status on LEAVE/JOIN.
Fix. Proceed content on WHISPER.
Fix. Check error creating outbox socket.
Fix. Stop interval event on node when stop.
Fix. Send LEAVE event.
ad5d7d5
@moteus moteus merged commit 7f294f0 into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jul 22, 2014
  1. @moteus

    Fix. Check error creating peer in beacon.

    moteus authored
    Fix. Set correct status to peer on HELLO.
    Fix. Check status on LEAVE/JOIN.
    Fix. Proceed content on WHISPER.
    Fix. Check error creating outbox socket.
    Fix. Stop interval event on node when stop.
    Fix. Send LEAVE event.
This page is out of date. Refresh to see the latest.
Showing with 37 additions and 15 deletions.
  1. +37 −15 src/lyre/impl/node.lua
View
52 src/lyre/impl/node.lua
@@ -164,10 +164,15 @@ local NODE_MESSAGE = {} do
NODE_MESSAGE[ "beacon" ] = function(node, version, uuid, host, port)
local log = node:logger()
+
+ log.trace("BEACON: ", UUID.to_string(uuid), " tcp://", host, ":", port)
+
if port > 0 then
- local endpoint = "tcp://" .. host .. ":" .. port
- local peer = node:require_peer(uuid, endpoint)
- log.trace("BEACON: ", peer:uuid(true), peer:endpoint())
+ local endpoint = "tcp://" .. host .. ":" .. port
+ local peer, err = node:require_peer(uuid, endpoint)
+ if not peer then
+ log.alert("Can not create peer:", UUID.to_string(uuid), " ", endpoint)
+ end
return
end
@@ -199,10 +204,6 @@ NODE_MESSAGE[ "HELLO" ] = function(node, version, uuid, sequence, endpoint, gr
return
end
- peer
- :set_status(status)
- :set_name(name)
-
for key, val in pairs(headers) do
peer:set_header(key, val)
end
@@ -219,7 +220,10 @@ NODE_MESSAGE[ "HELLO" ] = function(node, version, uuid, sequence, endpoint, gr
log.info("New peer ready ", peer:uuid(true), " ", name, " ", endpoint)
- peer:set_ready(true)
+ peer
+ :set_status(status)
+ :set_name(name)
+ :set_ready(true)
end
NODE_MESSAGE[ "PING" ] = function(node, version, uuid, sequence)
@@ -239,6 +243,11 @@ NODE_MESSAGE[ "JOIN" ] = function(node, version, uuid, sequence, group, statu
if not peer then return end
node:join_peer_group(peer, group)
+
+ if peer:status() ~= status then
+ log.alert("Get JOIN with status: ",status, " from ", peer:uuid(true), " but expected ", peer:status())
+ node:remove_peer(peer):disconnect()
+ end
end
NODE_MESSAGE[ "LEAVE" ] = function(node, version, uuid, sequence, group, status)
@@ -246,6 +255,11 @@ NODE_MESSAGE[ "LEAVE" ] = function(node, version, uuid, sequence, group, statu
if not peer then return end
node:leave_peer_group(peer, group)
+
+ if peer:status() ~= status then
+ log.alert("Get LEAVE with status: ",status, " from ", peer:uuid(true), " but expected ", peer:status())
+ node:remove_peer(peer):disconnect()
+ end
end
NODE_MESSAGE[ "SHOUT" ] = function(node, version, uuid, sequence, group, content)
@@ -255,7 +269,7 @@ NODE_MESSAGE[ "SHOUT" ] = function(node, version, uuid, sequence, group, conte
node:send("SHOUT", peer:uuid(true), peer:name(), group, unpack(content))
end
-NODE_MESSAGE[ "WHISPER" ] = function(node, version, uuid, sequence, group, content)
+NODE_MESSAGE[ "WHISPER" ] = function(node, version, uuid, sequence, content)
local peer = node:check_peer(version, uuid, sequence)
if not peer then return end
@@ -314,7 +328,11 @@ end
function Node:new(pipe, outbox)
local ctx = zmq.assert(zthreads.context())
- outbox = zmq.assert(ctx:socket{zmq.PAIR, connect = outbox})
+ local err
+ outbox, err = ctx:socket{zmq.PAIR, connect = outbox}
+ if not outbox then
+ return nil, err
+ end
local uuid = UUID.new()
@@ -343,7 +361,7 @@ function Node:new(pipe, outbox)
port = ZRE.DISCOVERY_PORT; -- beacon port
logger = LogLib.new('none',
function(...) return o._private.log_writer(...) end,
- o:_formatter(require "log.formatter.concat".new())
+ o:_formatter(require "log.formatter.concat".new(''))
);
log_writer = require "log.writer.stdout".new();
}
@@ -428,7 +446,7 @@ function Node:start()
p.loop:add_socket(p.inbox, function(s) Node_on_inbox(self, s) end)
- p.loop:add_interval(REAP_INTERVAL, function() Node_on_interval(self) end)
+ p.time_event = p.loop:add_interval(REAP_INTERVAL, function() Node_on_interval(self) end)
return self
end
@@ -443,6 +461,10 @@ function Node:stop()
if p.loop then
if p.inbox then p.loop:remove_socket(p.inbox) end
if p.beacon then p.loop:remove_socket(p.beacon) end
+ if p.time_event then
+ p.time_event:reset()
+ p.time_event = nil
+ end
end
for _, peer in pairs(self._private.peers) do
@@ -553,7 +575,7 @@ function Node:leave_peer_group(peer, name)
if not group then return true end
group:leave(peer)
- self:send("JOIN", peer:uuid(true), peer:name(), name)
+ self:send("LEAVE", peer:uuid(true), peer:name(), name)
return true
end
@@ -564,7 +586,7 @@ function Node:require_peer(uuid, endpoint)
local peer = p.peers[uuid]
if not peer then
- log.info("New peer detected: ", UUID.to_string(uuid), endpoint)
+ log.info("New peer detected: ", UUID.to_string(uuid), " ", endpoint)
for u, pp in pairs(p.peers) do
if pp:endpoint() == endpoint then
log.warning('Found peer with same endpoint:', pp:uuid(true), ". Remove it")
@@ -576,7 +598,7 @@ function Node:require_peer(uuid, endpoint)
peer, err = Peer.new(self, uuid)
if not peer then
- print("Error:", err)
+ -- @todo log alert here
return nil, err
end
Something went wrong with that request. Please try again.