Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stale inflight messages #86

Closed
bkupidura opened this issue Jul 3, 2022 · 13 comments
Closed

Stale inflight messages #86

bkupidura opened this issue Jul 3, 2022 · 13 comments
Labels
bug Something isn't working

Comments

@bkupidura
Copy link
Collaborator

bkupidura commented Jul 3, 2022

I start observing increased number of inflight messages which are almost never going down. In same time i didnt notice any disconnects from clients - so looks like clients just didnt received message and they will never send ACK (network issue on client side?)

I see that there is ResendClientInflight (https://github.com/mochi-co/mqtt/blob/master/server/server.go#L878), but looks like its executed only on new client connection (send messages from persistence store to reconnecting clients?).

Is there any internal mechanism which will try to deliver inflight messages or mochi-co/mqtt user should implement this on his own?

@mochi-co mochi-co added the bug Something isn't working label Jul 3, 2022
@mochi-co
Copy link
Collaborator

mochi-co commented Jul 3, 2022

Are you using persistence? This sounds similar to #76 - typically inflights shouldn't really occur in normal use case. They certainly shouldn't be increasing indefinitely in this way.

Currently inflight messages are only resent when the client reconnects. It would be nice to have a periodic retry, however it needs a bit of thought to avoid creating unnecessary overhead.

@bkupidura
Copy link
Collaborator Author

Im not using any persistence storage. Any my clients wasnt disconnected, so those inflight messages will stay there forever ;)

For now i will implement periodic ResendClientInflight on my end.

@mochi-co
Copy link
Collaborator

mochi-co commented Jul 3, 2022

That's very unusual... Can you provide some more information about your client library, use (qos, etc)? It would be good to determine the issue in case it's something which can be fixed.

@bkupidura
Copy link
Collaborator Author

I didnt do much debuging yet, and not sure which messages are inflight. Will investigate further.

And to be precise, i observe couple (4-10) of inflights per week or two - so im almost sure that this is issue with consumers/networking and not mochi-co/mqtt.

@bkupidura
Copy link
Collaborator Author

bkupidura commented Jul 6, 2022

So i have some funny findings regarding that one. As a PoC i added:

func handleInflight(ctx context.Context, mqttServer *mqtt.Server) {
	for {
		for _, cl := range mqttServer.Clients.GetByListener("t1") {
			log.Printf("inflights messages for %s (%s) %d", cl.ID, string(cl.Username), cl.Inflight.Len())
			for _, tk := range cl.Inflight.GetAll() {
				log.Printf("topic %s (%v)", tk.Packet.TopicName, tk.Packet.Topics)
			}
			mqttServer.ResendClientInflight(cl, false)
		}
		time.Sleep(300 * time.Second)
	}
}

Just to check that running ResendClientInflight every N seconds will solve my problem. And it did.

But today, Prometheus start complaining that i have some in-flight messages.

I build prometheus metric by reading mqttServer.System.Inflight.
I have only one listener called "t1".

And from handleInflight output, looks i dont have any inflights on any client, but mqttServer.System.Inflight is still showing some inflights.

I see that potentialy it can happend somewhere here https://github.com/mochi-co/mqtt/blob/27f3c484ad65bb34cd4c1de12cbb91b8be16dabd/server/server.go#L885 when packet is not Publish packet.

In my case publishDropped is not increased (still showing 0).

Any idea what im missing?

My code: https://github.com/bkupidura/broker-ha/blob/debug-inflight/server/server.go

@bkupidura
Copy link
Collaborator Author

bkupidura commented Aug 7, 2022

@mochi-co possibly i found root cause for those in-flight messages.

When new client subscribes with QoS 2, looks like its remembered by mochi-co/mqtt forever and every message published to subscribed topics will be stored to deliver them in future.
s.Topics.Unsubscribe is called only on unsubscribe packet and from inheritClientSession. Its is not called when client is disconnected.

This will not work, as clients are identified by ClientID which can be random.

Moreover in my case, client can reconnect to different mochi-co/mqtt instance, so first instance will just store messages which will never be delivered.

For debugging i enabled bolt store to check what is there.

mosquitto_sub -h '<broker_ip>' -u <username> -P '<password>' -t 'inflight_test' -i inflight_test -q 2
<close mosquitto_sub process>
mosquitto_pub -h '<broker_ip>' -u <username> -P '<password>' -m '123' -t 'inflight_test' -q 2

Message in store

2022/08/07 19:34:52 server.go:120: inflight from store: {Payload:[49 50 51] FixedHeader:{Remaining:0 Type:3 Qos:2 Dup:false Retain:false} T:ifm ID:if_inflight_test_1 Client: TopicName:inflight_test Sent:1659893659 Resends:0 PacketID:0}

I believe that when client is disconnected, we should have possibility to remove his subscriptions - even if QoS 2 was set, as this client can never reconnect.

@mochi-co
Copy link
Collaborator

@bkupidura This looks very interestin and I suspect you are correct. Apologies I have been very busy lately. I will do my best to look into this as a priority in depth soon 👍🏻

@mochi-co
Copy link
Collaborator

Fixed in v1.3.0 with new TTL/period resend support for inflight messages.

@bkupidura Please give it a test and close the issue if it solves the problem :)

@bkupidura
Copy link
Collaborator Author

@mochi-co thanks! I just deployed new version and looks good. Inflights are indeed cleared.

There is just small thing, after clearExpiredInflights and clearAbandonedInflights we should update s.System.Inflight. Otherwise metric is not synced with real state.

@mochi-co
Copy link
Collaborator

@bkupidura Ahhh yes I knew I'd forgotten something. I'll add it to the list!

@bkupidura
Copy link
Collaborator Author

@mochi-co if you dont mind, i can prepare PR for that.

My idea is that func (i *Inflight) ClearExpired should return number of cleared inflights, and just handle that in mqtt/server.go

@mochi-co
Copy link
Collaborator

@bkupidura That would be wonderful! :) I think that's the most effective approach too.

@bkupidura
Copy link
Collaborator Author

#92

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants