Skip to content

Commit

Permalink
perf(mqtt): reduce memory usage
Browse files Browse the repository at this point in the history
Signed-off-by: Jianhui Zhao <zhaojh329@gmail.com>
  • Loading branch information
zhaojh329 committed Oct 8, 2023
1 parent 55c3b35 commit 4378fbb
Showing 1 changed file with 43 additions and 56 deletions.
99 changes: 43 additions & 56 deletions mqtt.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ local dns = require 'eco.dns'

local M = {}

local function mqtt_io_loop(mt)
local done = mt.done
local con = mt.con
local w = mt.iow
local function mqtt_io_loop(con)
local done = con.done
local __con = con.con
local w = con.iow

while not done.v do
local ev = eco.READ

if con:want_write() then
if __con:want_write() then
ev = ev | eco.WRITE
end

Expand All @@ -32,13 +32,13 @@ local function mqtt_io_loop(mt)
end

if ev & eco.READ > 0 then
if not con:loop_read(1) then
if not __con:loop_read(1) then
break
end
end

if ev & eco.WRITE > 0 then
if not con:loop_write(1) then
if not __con:loop_write(1) then
break
end
end
Expand All @@ -47,12 +47,12 @@ local function mqtt_io_loop(mt)
done.v = true
end

local function check_keepalive_loop(mt)
local done = mt.done
local con = mt.con
local function check_keepalive_loop(con)
local done = con.done
local __con = con.con

while not done.v do
if not con:loop_misc() then
if not __con:loop_misc() then
break
end
time.sleep(1)
Expand All @@ -64,95 +64,79 @@ end
local methods = {}

function methods:destroy()
local mt = getmetatable(self)
mt.done.v = true
self.done.v = true

if mt.ior then
mt.ior:cancel()
if self.ior then
self.ior:cancel()
end

if mt.iow then
mt.iow:cancel()
if self.iow then
self.iow:cancel()
end

return mt.con:destroy()
return self.con:destroy()
end

function methods:disconnect()
local mt = getmetatable(self)
return mt.con:disconnect()
return self.con:disconnect()
end

function methods:reinitialise(id, clean_session)
local mt = getmetatable(self)
return mt.con:reinitialise(id, clean_session)
return self.con:reinitialise(id, clean_session)
end

function methods:will_set(topic, payload, qos, retain)
local mt = getmetatable(self)
return mt.con:will_set(topic, payload, qos, retain)
return self.con:will_set(topic, payload, qos, retain)
end

function methods:will_clear()
local mt = getmetatable(self)
return mt.con:will_clear()
return self.con:will_clear()
end

function methods:login_set(username, password)
local mt = getmetatable(self)
return mt.con:login_set(username, password)
return self.con:login_set(username, password)
end

function methods:tls_insecure_set(insecure)
local mt = getmetatable(self)
return mt.con:tls_insecure_set(insecure)
return self.con:tls_insecure_set(insecure)
end

function methods:tls_set(cafile, capath, certfile, keyfile)
local mt = getmetatable(self)
return mt.con:tls_set(cafile, capath, certfile, keyfile)
return self.con:tls_set(cafile, capath, certfile, keyfile)
end

function methods:tls_psk_set(psk, identity, ciphers)
local mt = getmetatable(self)
return mt.con:tls_psk_set(psk, identity, ciphers)
return self.con:tls_psk_set(psk, identity, ciphers)
end

function methods:tls_opts_set(cert_required, tls_version, ciphers)
local mt = getmetatable(self)
return mt.con:tls_psk_set(cert_required, tls_version, ciphers)
return self.con:tls_psk_set(cert_required, tls_version, ciphers)
end

function methods:option(option, value)
local mt = getmetatable(self)
return mt.con:option(option, value)
return self.con:option(option, value)
end

function methods:publish(topic, payload, qos, retain)
local mt = getmetatable(self)
return mt.con:publish(topic, payload, qos, retain)
return self.con:publish(topic, payload, qos, retain)
end

function methods:subscribe(topic, qos)
local mt = getmetatable(self)
return mt.con:subscribe(topic, qos)
return self.con:subscribe(topic, qos)
end

function methods:unsubscribe(topic)
local mt = getmetatable(self)
return mt.con:unsubscribe(topic)
return self.con:unsubscribe(topic)
end

function methods:set_callback(typ, func)
local mt = getmetatable(self)
return mt.con:callback_set(typ, function(...)
return self.con:callback_set(typ, function(...)
eco.run(func, ...)
end)
end

local function try_connect(con, address, port, keepalive)
local mt = getmetatable(con)
local __con = mt.con
local __con = con.con

local s, err = socket.connect_tcp(address, port)
if not s then
Expand All @@ -167,11 +151,11 @@ local function try_connect(con, address, port, keepalive)
return false, err
end

mt.iow = eco.watcher(eco.IO, __con:socket(), eco.WRITE)
mt.done.v = false
con.iow = eco.watcher(eco.IO, __con:socket(), eco.WRITE)
con.done.v = false

eco.run(mqtt_io_loop, mt)
eco.run(check_keepalive_loop, mt)
eco.run(mqtt_io_loop, con)
eco.run(check_keepalive_loop, con)

return true
end
Expand All @@ -196,15 +180,18 @@ function methods:connect(host, port, keepalive)
return false, err
end

local metatable = {
__index = methods,
__gc = methods.destroy
}

function M.new(id, clean_session)
local con = mqtt.new(eco.context(), id, clean_session)

return setmetatable({}, {
__index = methods,
__gc = methods.destroy,
return setmetatable({
con = con,
done = { v = true }
})
}, metatable)
end

return setmetatable(M, { __index = mqtt })

0 comments on commit 4378fbb

Please sign in to comment.