diff --git a/mqtt/connector/base/non_buffered_base.lua b/mqtt/connector/base/non_buffered_base.lua index eb9e0f8..617813c 100644 --- a/mqtt/connector/base/non_buffered_base.lua +++ b/mqtt/connector/base/non_buffered_base.lua @@ -26,7 +26,7 @@ non_buffered.signal_closed = {} -- remote closed the connection --- Validate connection options. function non_buffered:shutdown() -- luacheck: ignore - error("method 'validate' on connector wasn't implemented") + error("method 'validate' on connector wasn't implemented") --TODO: comments and text doesn't match name end --- Clears consumed bytes. diff --git a/mqtt/connector/copas.lua b/mqtt/connector/copas.lua index 2c787d8..e7114a0 100644 --- a/mqtt/connector/copas.lua +++ b/mqtt/connector/copas.lua @@ -48,6 +48,8 @@ end function connector:connect() self:validate() local sock = copas.wrap(socket.tcp(), self.secure_params) + copas.setsocketname("mqtt@"..self.host..":"..self.port, sock) + sock:settimeouts(self.timeout, self.timeout, -1) -- no timout on reading local ok, err = sock:connect(self.host, self.port) diff --git a/mqtt/loop/copas.lua b/mqtt/loop/copas.lua index d42216d..32fc84e 100644 --- a/mqtt/loop/copas.lua +++ b/mqtt/loop/copas.lua @@ -25,23 +25,24 @@ function _M.add(cl) do -- make mqtt device async for incoming packets local handle_received_packet = cl.handle_received_packet - + local count = 0 -- replace packet handler; create a new thread for each packet received cl.handle_received_packet = function(mqttdevice, packet) - copas.addthread(handle_received_packet, mqttdevice, packet) + count = count + 1 + copas.addnamedthread(handle_received_packet, cl.opts.id..":receive_"..count, mqttdevice, packet) return true end end -- add keep-alive timer - local timer = copas.addthread(function() + local timer = copas.addnamedthread(function() while client_registry[cl] do copas.sleep(cl:check_keep_alive()) end - end) + end, cl.opts.id .. ":keep_alive") -- add client to connect and listen - copas.addthread(function() + copas.addnamedthread(function() while client_registry[cl] do local timeout = cl:step() if not timeout then @@ -54,7 +55,7 @@ function _M.add(cl) end end end - end) + end, cl.opts.id .. ":listener") return true end