Skip to content

Commit

Permalink
Push notifications over socket using a queue
Browse files Browse the repository at this point in the history
Notifications are retrieved from alarms state and converted to text
messages.  Text messages are enqueued into each peer's notification
queue.  Later the peers queues is iterated and messages are transmitted
over each peer's socket.  Each peer keeps track of the latest position of
the last message transmitted.
  • Loading branch information
dpino committed May 4, 2018
1 parent f4e9a7e commit 65df2ca
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 32 deletions.
76 changes: 50 additions & 26 deletions src/lib/ptree/ptree.lua
Expand Up @@ -497,27 +497,17 @@ function Manager:rpc_attach_listener (args)
if success then return response else return {status=1, error=response} end
end

function Manager:add_notification_peer (peer)
function Manager:rpc_attach_notification_listener ()
local i, peers = 1, self.peers
while i <= #peers do
if peers[i] == peer then break end
if peers[i] == self.rpc_peer then break end
i = i + 1
end
if i <= #peers then
table.insert(self.notification_peers, peer)
table.insert(self.notification_peers, self.rpc_peer)
table.remove(self.peers, i)
end
end

function Manager:rpc_attach_notification_listener (args)
local function attacher()
if self.listen_peer ~= nil then error('Listener already attached') end
self.listen_peer = self.rpc_peer
self:add_notification_peer(self.listen_peer)
return {}
end
local success, response = pcall(attacher)
if success then return response else return {status=1, error=response} end
return {}
end

function Manager:rpc_get_state (args)
Expand Down Expand Up @@ -558,24 +548,58 @@ end

local dummy_unix_sockaddr = S.t.sockaddr_un()

local function send_message(socket, msg_str)
socket:write(tostring(#msg_str)..'\n'..msg_str)
end

function Manager:push_notifications_to_peers()
local notifications = alarms.notifications()
if #notifications > 0 then
-- Build notifications message.
if #notifications == 0 then return end
local function head (queue)
local msg = assert(queue[1])
local len = #msg
return ffi.cast('uint8_t*', msg), len
end
local function tojson (output, str)
json_lib.write_json_object(output, str)
local msg = output:flush()
return tostring(#msg)..'\n'..msg
end
-- Enqueue notifications into each peer queue.
local peers = self.notification_peers
for _,peer in ipairs(peers) do
local output = json_lib.buffered_output()
peer.queue = peer.queue or {}
for _,each in ipairs(notifications) do
json_lib.write_json_object(output, each)
table.insert(peer.queue, tojson(output, each))
end
local msg = output:flush()
-- Broadcast to notification peers.
for _,peer in ipairs(self.notification_peers) do
send_message(peer.fd, msg)
end
-- Iterate peers and send enqueued messages.
for i,peer in ipairs(peers) do
local queue = peer.queue
while #queue > 0 do
local buf, len = head(peer.queue)
peer.pos = peer.pos or 0
local count, err = peer.fd:write(buf + peer.pos,
len - peer.pos)
if not count then
if err.AGAIN then break end
peer.state = 'error'
peer.msg = tostring(err)
elseif count == 0 then
peer.state = 'error'
peer.msg = 'short write'
else
peer.pos = peer.pos + count
assert(peer.pos <= len)
if peer.pos == len then
peer.pos = 0
table.remove(peer.queue, 1)
end
end

if peer.state == 'error' then
if peer.state == 'error' then self:warn('%s', peer.msg) end
peer.fd:close()
table.remove(peers, i)
end
end
alarms.clear_notifications()
end
end

Expand Down
13 changes: 7 additions & 6 deletions src/lib/yang/alarms.lua
Expand Up @@ -33,6 +33,12 @@ local state = {
}
}

local function clear_notifications ()
state.notifications.alarm = {}
state.notifications.alarm_inventory_changed = {}
state.notifications.operator_action = {}
end

function notifications ()
local ret = {}
local notifications = state.notifications
Expand All @@ -45,15 +51,10 @@ function notifications ()
for k,v in pairs(notifications.operator_action) do
table.insert(ret, v)
end
clear_notifications()
return ret
end

function clear_notifications ()
state.notifications.alarm = {}
state.notifications.alarm_inventory_changed = {}
state.notifications.operator_action = {}
end

local function table_size (t)
local size = 0
for _ in pairs(t) do size = size + 1 end
Expand Down

0 comments on commit 65df2ca

Please sign in to comment.