Skip to content

Commit

Permalink
fix(tests) fix more tests, ioloop related
Browse files Browse the repository at this point in the history
  • Loading branch information
Tieske committed Nov 4, 2021
1 parent 2723220 commit 7bd0a86
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 20 deletions.
17 changes: 8 additions & 9 deletions mqtt/client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ function client_mt:subscribe(opts)
local packet_id = pargs.packet_id
local subscribe = self._make_packet(pargs)

log:info("subscribing client '%s' to topic '%s' (packet: %d)", self.opts.id, opts.topic, packet_id or -1)
log:info("subscribing client '%s' to topic '%s' (packet: %s)", self.opts.id, opts.topic, packet_id or "n.a.")

-- send SUBSCRIBE packet
local ok, err = self:_send_packet(subscribe)
Expand Down Expand Up @@ -409,7 +409,7 @@ function client_mt:unsubscribe(opts)
local packet_id = pargs.packet_id
local unsubscribe = self._make_packet(pargs)

log:info("unsubscribing client '%s' from topic '%s' (packet: %d)", self.opts.id, opts.topic, packet_id or -1)
log:info("unsubscribing client '%s' from topic '%s' (packet: %s)", self.opts.id, opts.topic, packet_id or "n.a.")

-- send UNSUBSCRIBE packet
local ok, err = self:_send_packet(unsubscribe)
Expand Down Expand Up @@ -475,8 +475,7 @@ function client_mt:publish(opts)
local packet_id = opts.packet_id
local publish = self._make_packet(opts)

log:debug("client '%s' publishing to topic '%s' (packet: %d)", self.opts.id, opts.topic, packet_id or -1)
--log:debug("client '%s' publishing to topic '%s' (value '%d')", self.opts.id, opts.topic, opts.payload)
log:debug("client '%s' publishing to topic '%s' (packet: %s)", self.opts.id, opts.topic, packet_id or "n.a.")

-- send PUBLISH packet
local ok, err = self:_send_packet(publish)
Expand Down Expand Up @@ -537,7 +536,7 @@ function client_mt:acknowledge(msg, rc, properties, user_properties)
return true
end

log:debug("client '%s' acknowledging packet %d", self.opts.id, packet_id or -1)
log:debug("client '%s' acknowledging packet %s", self.opts.id, packet_id or "n.a.")

if msg.qos == 1 then
-- PUBACK should be sent
Expand Down Expand Up @@ -886,7 +885,7 @@ function client_mt:acknowledge_pubrel(packet_id)
-- create PUBREL packet
local pubrel = self._make_packet{type=packet_type.PUBREL, packet_id=packet_id, rc=0}

log:debug("client '%s' sending PUBREL (packet: %d)", self.opts.id, packet_id or -1)
log:debug("client '%s' sending PUBREL (packet: %s)", self.opts.id, packet_id or "n.a.")

-- send PUBREL packet
local ok, err = self:_send_packet(pubrel)
Expand All @@ -912,7 +911,7 @@ function client_mt:acknowledge_pubcomp(packet_id)
-- create PUBCOMP packet
local pubcomp = self._make_packet{type=packet_type.PUBCOMP, packet_id=packet_id, rc=0}

log:debug("client '%s' sending PUBCOMP (packet: %d)", self.opts.id, packet_id or -1)
log:debug("client '%s' sending PUBCOMP (packet: %s)", self.opts.id, packet_id or "n.a.")

-- send PUBCOMP packet
local ok, err = self:_send_packet(pubcomp)
Expand Down Expand Up @@ -966,7 +965,7 @@ function client_mt:handle_received_packet(packet)
local conn = self.connection
local err

log:debug("client '%s' received '%s' (packet: %s)", self.opts.id, packet.type, tostring(packet.packet_id or "n.a."))
log:debug("client '%s' received '%s' (packet: %s)", self.opts.id, packet_type[packet.type], packet.packet_id or "n.a.")

if not conn.connack then
-- expecting only CONNACK packet here
Expand All @@ -993,7 +992,7 @@ function client_mt:handle_received_packet(packet)
return false, err
end

log:info("client '%s' connected successfully to '%s'", self.opts.id, conn.uri)
log:info("client '%s' connected successfully to '%s:%s'", self.opts.id, conn.host, conn.port)

-- fire connect event
self:handle("connect", packet, self)
Expand Down
63 changes: 52 additions & 11 deletions tests/spec/06-mqtt-client_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
-- DOC v5.0: http://docs.oasis-open.org/mqtt/mqtt/v5.0/cos01/mqtt-v5.0-cos01.html

local log = require("logging").defaultLogger()
local socket = require("socket")

describe("MQTT lua library", function()
-- load MQTT lua library
Expand Down Expand Up @@ -314,12 +315,8 @@ describe("MQTT client", function()
end,
}

-- and wait for connection to broker is closed
if case.sync then
mqtt.run_sync(client)
else
mqtt.run_ioloop(client)
end
-- and wait for connection to broker to be closed
mqtt.run_ioloop(client)

assert.are.same({}, errors)
assert.is_true(acknowledge)
Expand Down Expand Up @@ -352,7 +349,7 @@ describe("last will message", function()
username = "stPwSVV73Eqw5LSv0iMXbc4EguS7JyuZR9lxU5uLxI5tiNM8ToTVqNpu85pFtJv9",
}

local client1_ready, client2_ready
local client1_ready, client2_ready, clients_done

local function send_self_destroy()
if not client1_ready or not client2_ready then
Expand Down Expand Up @@ -381,6 +378,7 @@ describe("last will message", function()
-- break connection with broker on any message
log:warn("client1 received a message and is now closing its connection")
client1:close_connection("self-destructed")
clients_done = (clients_done or 0)+1
end,
}

Expand All @@ -401,17 +399,28 @@ describe("last will message", function()
will_received = msg.topic == will_topic
log:warn("client2 received a message, topic is: '%s', client 2 is now closing its connection",tostring(msg.topic))
client2:disconnect()
clients_done = (clients_done or 0)+1
end,
}

mqtt.run_ioloop(client1, client2)
local timer do
local timeout = socket.gettime() + 30
function timer()
if clients_done == 2 then
require("mqtt.ioloop").get():remove(timer)
end
assert(socket.gettime() < timeout, "test failed due to timeout")
end
end

mqtt.run_ioloop(client1, client2, timer)

assert.is_true(will_received)
end)
end)


describe("no_local flag for subscription: ", function()
describe("no_local flag for subscription:", function()
local mqtt = require("mqtt")
local prefix = "luamqtt/" .. tostring(math.floor(math.random()*1e13))
local no_local_topic = prefix .. "/no_local_test"
Expand All @@ -426,7 +435,7 @@ describe("no_local flag for subscription: ", function()
version = mqtt.v50
}

it("msg should not be received", function()
it("msg should not be received #only", function()
local c1 = mqtt.client(conn_args)
local c2 = mqtt.client(conn_args)

Expand All @@ -449,68 +458,90 @@ describe("no_local flag for subscription: ", function()

local function send()
if not s1.subscribed or not s2.subscribed then
log:warn("not sending because clients are not both subscribed")
return
end
log:warn("both clients are subscribed, now sending...")
socket.sleep(0.2) -- shouldn't be necessary, but test is flaky otherwise
log:warn("client1: publishing 'message' to topic '.../no_local_test'")
assert(c1:publish{
topic = no_local_topic,
payload = "message",
callback = function()
s1.published = s1.published + 1
log:warn("client1: publishing to topic '.../no_local_test' confirmed, count: %d", s1.published)
end
})
end

c1:on{
connect = function()
log:warn("client1: is now connected")
s1.connected = true
send()
log:warn("client1: subscribing to topic '.../#', with 'no_local'")
assert(c1:subscribe{
topic = prefix .. "/#",
no_local = true,
callback = function()
s1.subscribed = true
log:warn("client1: subscription to topic '.../#' with 'no_local' confirmed")
send()
end
})
end,
message = function(msg)
s1.messages[#s1.messages + 1] = msg.payload
if msg.payload == "stop" then
log:warn("client1: received message, with payload 'stop'. Will now disconnect.")
assert(c1:disconnect())
else
log:warn("client1: received message, with payload '%s' (but waiting for 'stop')", msg.payload)
end
end,
error = function(err)
s1.errors[#s1.errors + 1] = err
log:warn("client1: received error: '%s'", tostring(err))
end,
close = function(conn)
log:warn("client1: closed, reason: %s", conn.close_reason)
s1.close_reason = conn.close_reason
end
}

c2:on{
connect = function()
s2.connected = true
log:warn("client2: is now connected")
log:warn("client2: subscribing to topic '.../#', without 'no_local'")
assert(c2:subscribe{
topic = no_local_topic,
no_local = false,
callback = function()
s2.subscribed = true
log:warn("client2: subscription to topic '.../#' without 'no_local' confirmed")
send()
end
})
end,
message = function(msg)
s2.messages[#s2.messages + 1] = msg.payload
if msg.payload == "message" then
log:warn("client2: received message, with payload 'message'")
log:warn("client2: publishing to topic '.../no_local_test'', with payload 'stop'")
assert(c2:publish{
topic = no_local_topic,
payload = "stop",
callback = function()
s2.published = s2.published + 1
log:warn("client2: publishing to topic '.../no_local_test' confirmed, count: %d", s2.published)
end
})
elseif msg.payload == "stop" then
log:warn("client2: received message, with payload 'stop'. Will now disconnect.")
assert(c2:disconnect())
else
log:warn("client2: received message, with payload '%s' (but waiting for 'stop' or 'message')", msg.payload)
end
end,
error = function(err)
Expand All @@ -521,7 +552,17 @@ describe("no_local flag for subscription: ", function()
end
}

mqtt.run_ioloop(c1, c2)
local timer do
local timeout = socket.gettime() + 30
function timer()
if s1.close_reason and s2.close_reason then
require("mqtt.ioloop").get():remove(timer)
end
assert(socket.gettime() < timeout, "test failed due to timeout")
end
end

mqtt.run_ioloop(c1, c2, timer)

assert.is_true(s1.connected, "client 1 is not connected")
assert.is_true(s2.connected, "client 2 is not connected")
Expand Down

0 comments on commit 7bd0a86

Please sign in to comment.