Skip to content

Commit

Permalink
feat(ioloop) add incremental back-off for idle clients
Browse files Browse the repository at this point in the history
  • Loading branch information
Tieske committed Nov 1, 2021
1 parent 1c23598 commit 3ddc6ca
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 26 deletions.
55 changes: 32 additions & 23 deletions mqtt/ioloop.lua
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,21 @@ ioloop_mt.__index = ioloop_mt

--- Initialize ioloop instance
-- @tparam table opts ioloop creation options table
-- @tparam[opt=0.005] number opts.timeout network operations timeout in seconds
-- @tparam[opt=0] number opts.sleep sleep interval after each iteration
-- @tparam[opt=0] number opts.sleep_min min sleep interval after each iteration
-- @tparam[opt=0.002] number opts.sleep_step increase in sleep after every idle iteration
-- @tparam[opt=0.030] number opts.sleep_max max sleep interval after each iteration
-- @tparam[opt] function opts.sleep_function custom sleep function to call after each iteration
-- @treturn ioloop_mt ioloop instance
function ioloop_mt:__init(opts)
log:debug("initializing ioloop instance '%s'", tostring(self))
opts = opts or {}
opts.timeout = opts.timeout or 0.005
opts.sleep = opts.sleep or 0
opts.sleep_min = opts.sleep_min or 0
opts.sleep_step = opts.sleep_step or 0.002
opts.sleep_max = opts.sleep_max or 0.030
opts.sleep_function = opts.sleep_function or require("socket").sleep
self.opts = opts
self.clients = {}
self.timeouts = setmetatable({}, { __mode = "v" })
self.running = false --ioloop running flag, used by MQTT clients which are adding after this ioloop started to run
end

Expand All @@ -81,6 +84,7 @@ function ioloop_mt:add(client)
end
clients[#clients + 1] = client
clients[client] = true
self.timeouts[client] = self.opts.sleep_min

if type(client) == "table" then
log:info("adding client '%s' to ioloop '%s'", client.opts.id, tostring(self))
Expand Down Expand Up @@ -136,36 +140,41 @@ function ioloop_mt:remove(client)
end

--- Perform one ioloop iteration.
-- TODO: make this smarter do not wake-up functions or clients returned a longer
-- sleep delay. Currently it's pretty much a busy loop.
-- TODO: make this smarter do not wake-up functions or clients returning a longer
-- sleep delay. Currently they will be tried earlier if another returns a smaller delay.
function ioloop_mt:iteration()
local opts = self.opts
local sleep = opts.sleep
local sleep = opts.sleep_max

for _, client in ipairs(self.clients) do
local t, err
-- read data and handle events
if type(client) ~= "function" then
t, err = client:step()
if t == -1 then
-- no data read, client is idle
t = nil
elseif not t then
if not client.opts.reconnect then
-- error and not reconnecting, remove the client
log:error("client '%s' failed with '%s', will not re-connect", client.opts.id, err)
self:remove(client)
t = nil
else
-- error, but will reconnect
log:error("client '%s' failed with '%s', will try re-connecting", client.opts.id, err)
t = 0 -- try immediately
end
else
t = client() or opts.sleep_max
end
if t == -1 then
-- no data read, client is idle, step up timeout
t = math_min(self.timeouts[client] + opts.sleep_step, opts.sleep_max)
self.timeouts[client] = t
elseif not t then
-- an error from a client was returned
if not client.opts.reconnect then
-- error and not reconnecting, remove the client
log:fatal("client '%s' failed with '%s', will not re-connect", client.opts.id, err)
self:remove(client)
t = opts.sleep_max
else
-- error, but will reconnect
log:error("client '%s' failed with '%s', will try re-connecting", client.opts.id, err)
t = opts.sleep_min -- try asap
end
else
t = client()
-- a number of seconds was returned
t = math_min(t, opts.sleep_max)
self.timeouts[client] = opts.sleep_min
end
t = t or opts.sleep
sleep = math_min(sleep, t)
end
-- sleep a bit
Expand Down
4 changes: 2 additions & 2 deletions mqtt/luasocket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ end
function luasocket:plain_receive(size)
local sock = self.sock

sock:settimeout(0.010) -- TODO: setting to 0 fails??? it shouldn't
local data, err = sock:receive(size)
sock:settimeout(0)

local data, err = sock:receive(size)
if data then
return data
end
Expand Down
1 change: 0 additions & 1 deletion tests/spec/01-module-basics_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ describe("MQTT lua library component test:", function()
assert.are.equal(2097152, protocol.parse_var_length_nonzero(make_read_func("80808001")))
assert.are.equal(268435455, protocol.parse_var_length_nonzero(make_read_func("FFFFFF7F")))
assert.is_false(protocol.parse_var_length_nonzero(make_read_func("FFFFFFFF")))

end)

it("protocol.next_packet_id", function()
Expand Down

0 comments on commit 3ddc6ca

Please sign in to comment.