Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Possible issue with Will retain flag #153

Closed
dgduncan opened this issue Jan 28, 2023 · 14 comments
Closed

Possible issue with Will retain flag #153

dgduncan opened this issue Jan 28, 2023 · 14 comments
Assignees
Labels
bug Something isn't working

Comments

@dgduncan
Copy link
Collaborator

When testing, I discovered that there may be an issue with the retain flag when Will is published. When setting the Will to retain true it doesn't seem to actually set the retain flag to true when it is published. I can see the message when subscribed to the topic when it is published; however, the message will not be there when subscribed again.

@mochi-co mochi-co self-assigned this Jan 28, 2023
@mochi-co mochi-co added bug Something isn't working needs investigation Identify cause / reproduce issue labels Jan 28, 2023
@mochi-co
Copy link
Collaborator

Thanks @dgduncan - I will try to look into this as soon as I can. What MQTT protocol/client are you using?

@dgduncan
Copy link
Collaborator Author

@mochi-co paho.mqtt.golang 3.1.1
I will attempt to do some investigation myself as it was late when I posted this.

@dgduncan
Copy link
Collaborator Author

dgduncan commented Jan 28, 2023

I am beginning to wonder if this isn't an LWT problem but perhaps a retain flag problem. Upon further testing the Will message seems to be publishing normally; however, multiple messages on the topic are being retained in a seemingly random order. I have run a few tests where I publish multiple message from the same client to a topic:

token2 = mqttclient.Publish("LWT/admin", 1, true, "connected")
fmt.Println(token2.WaitTimeout(60 * time.Second))

token2 = mqttclient.Publish("LWT/admin", 1, true, "connected")
fmt.Println(token2.WaitTimeout(60 * time.Second))

And when I subscribe to that topic:

[DEBUG] [client]   Connect()
[DEBUG] [store]    memorystore initialized
[DEBUG] [client]   about to write new connect msg
[DEBUG] [client]   socket connected to broker
[DEBUG] [client]   Using MQTT 3.1.1 protocol
[DEBUG] [net]      connect started
[DEBUG] [net]      received connack
[DEBUG] [client]   startCommsWorkers called
[DEBUG] [client]   client is connected/reconnected
[DEBUG] [net]      incoming started
[DEBUG] [net]      startIncomingComms started
[DEBUG] [net]      outgoing started
[DEBUG] [net]      startComms started
[DEBUG] [pinger]   keepalive starting
[DEBUG] [client]   startCommsWorkers done
[DEBUG] [store]    enter Resume
[DEBUG] [store]    exit resume
[DEBUG] [client]   exit startClient
[DEBUG] [client]   enter Subscribe
[DEBUG] [client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 1 topics: [LWT/admin]
[DEBUG] [client]   sending subscribe message, topic: LWT/admin
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncomingComms: inboundFromStore complete
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[DEBUG] [net]      startIncomingComms: received publish, msgId: 1
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncomingComms: got msg on ibound
[DEBUG] [net]      startIncomingComms: received publish, msgId: 1
[DEBUG] [net]      obound priority msg to write, type *packets.SubscribePacket
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [client]   exit Subscribe
[DEBUG] [net]      logic waiting for msg on ibound
connected
[DEBUG] [net]      putting puback msg on obound
[DEBUG] [store]    memorystore del: message 1 was deleted
[DEBUG] [net]      done putting puback msg on obound
connected
[DEBUG] [net]      putting puback msg on obound
[WARN]  [store]    memorystore del: message 1 not found
[DEBUG] [net]      done putting puback msg on obound
[DEBUG] [net]      obound priority msg to write, type *packets.PubackPacket
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      obound priority msg to write, type *packets.PubackPacket
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[WARN]  [store]    memorystore del: message 1 not found
[DEBUG] [net]      startIncomingComms: received suback, id: 1
[DEBUG] [net]      startIncomingComms: granted qoss [145]
[DEBUG] [net]      logic waiting for msg on ibound

I received the message twice. I have also done this with one publish to the LWT topic and another publish using the client will and when resubscribing to the same topic multiple messages will be received in a seemingly random order.

@mochi-co
Copy link
Collaborator

mochi-co commented Feb 1, 2023

Thanks @dgduncan - I will look at this as soon as I can 👍🏻

@mochi-co
Copy link
Collaborator

mochi-co commented Feb 4, 2023

@dgduncan I've had a small look into this issue.

Retained messages seem to be working normally, so that's good news.

However, I noticed you are using Qos 1 for the publish and subscribe. MQTT implements offline message queuing for messages published with Qos 1 and 2, so if you disconnect and reconnect without setting the clean session (v3) or clean start (v5) flag, the client will inherit the queued Qos messages, which will be delivered after a successful connack.

There is a known issue with message ordering which may also be affecting this (#150), and I'm hoping to resolve that in the near future once I decide on the correct solution.

Does this sound like it might explain the behaviour you are experiencing?

@dgduncan
Copy link
Collaborator Author

dgduncan commented Feb 4, 2023

Hey @mochi-co thank you for looking into this. I am currently traveling and have not been able to test this to the best of my abilities in a known environment.

My understanding of MQTT retain is that when publishing QOS 1 or 2 with retain true the broker will store the latest message on that topic and publish that to subscribers of that topic regardless of clean session state. Please correct me if I am wrong in this.

In my testing I found that I would receive two retained messages. In my experience with other brokers I never found that to be the case. I am still relatively new to MQTT and could be misunderstanding interactions between retain and clean session.

@mochi-co
Copy link
Collaborator

mochi-co commented Feb 4, 2023

@dgduncan I think this is very possible and would explain what you're experiencing, it's not something I'd thought of. When a client reconnects, publishing pending inflights and publishing retained messages are two separate calls. Potentially, if a Client A subscribes to Topic and then disconnects, and Client B publishes a retained message Hello with QOS 1/2 to Topic, then when Client A reconnected it would receive first the pending inflight for Hello, and then it would receive the retained message Hello when the subscription is transferred.

I'll have to look into this, but it might be a week or so depending on how busy I get. Probably the simplest solution will be to add some kind of message merge and send both the inflights and retained out at the same time, but I will need to investigate more thoroughly to ensure nothing more broad-reaching is happening.

Let me know if you discover anything else which may confirm or rule out anything. Thank you for raising it, either way! I will try to get it fixed.

@dgduncan
Copy link
Collaborator Author

dgduncan commented Feb 4, 2023

@mochi-co thank you for looking into this. If I have some time I will look into this as well to the best of my ability.

I appreciate you and everyone else working so hard on this project. I have been writing some really cool hooks for this project to integrate some cloud auth and messaging patterns where MQTT isn’t really viable for a personal project of mine and I think this project has a lot of potential :)

@mochi-co
Copy link
Collaborator

mochi-co commented Feb 9, 2023

I've investigated this and it occurs because we resend retained messages when a client inherits an existing session, but per https://groups.google.com/g/mqtt/c/ZUpadJna_tA this is a misreading of the specification. As a result, we end up with the same packet duplicated, because the retained message has a packet id of 0 while the QoS message retains it's original outgoing packet id (e.g. 7) - as such they can't be reconciled into a single message.

As retained messages should only be sent when a client sends a SUBSCRIBE packet, and not on reconnect, the solution is to remove the retained message re-issuance when a session takeover occurs. I will patch this momentarily.

@mochi-co
Copy link
Collaborator

This should now be resolved in https://github.com/mochi-co/mqtt/releases/tag/v2.2.0 - let me know! 👍🏻

@dgduncan
Copy link
Collaborator Author

@mochi-co I will check this out and update with the results. Thank you for looking into this!

@dgduncan
Copy link
Collaborator Author

Just for an update I have not tested the changes yet but will this week and either keep the issue open or close it.

@mochi-co mochi-co added pending ok Released, awaiting confirmation of resolution and removed needs investigation Identify cause / reproduce issue labels Feb 25, 2023
@mochi-co
Copy link
Collaborator

mochi-co commented Mar 3, 2023

@dgduncan I'll go ahead and close this for now, but if you find it is still a problem, comment and I'll reopen :)

@mochi-co mochi-co closed this as completed Mar 3, 2023
@mochi-co mochi-co removed the pending ok Released, awaiting confirmation of resolution label Mar 3, 2023
@dadebue
Copy link
Contributor

dadebue commented May 10, 2023

Hey, unfortunately this problem still exists.

My test case

  • Client A publishes its status on a clientA/status topic
  • When client A is connected it publishes its status online every 10 secs
  • Client A connects with a retained LWT message (changing the clientA/status topic to offline) and a timeout of 30 secs
  • Client B subscribes to the clientA/status topic

Findings

  • When the client A goes offline and its timeout is reached the broker correctly delivers the LWT message to client B
    • Done in func (s *Server) sendLWT(cl *Client)
  • When I disconnect client B and establish a new connection (clean session) with client B it receives the incorrect online status message of client A
  • I found that the s.Topics retained list still only contains the online status message

Potential problem / solution

Could it be that the problem occurs because the s.sendLWT() is missing a call to s.retainMessage() if the LWT message is retained? So something like this:

func (s *Server) sendLWT(cl *Client) {

	...

	if pk.FixedHeader.Retain { // [MQTT-3.3.1-5] ![MQTT-3.3.1-8]
		s.retainMessage(cl, pk)
	}

	s.publishToSubscribers(pk)                      // [MQTT-3.1.2-8]
	atomic.StoreUint32(&cl.Properties.Will.Flag, 0) // [MQTT-3.1.2-10]
	s.hooks.OnWillSent(cl, pk)
}

instead of this:

func (s *Server) sendLWT(cl *Client) {

	...

	s.publishToSubscribers(pk)                      // [MQTT-3.1.2-8]
	atomic.StoreUint32(&cl.Properties.Will.Flag, 0) // [MQTT-3.1.2-10]
	s.hooks.OnWillSent(cl, pk)
}

Thanks and best regards
Marco

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

3 participants