Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retained messages not recieved upon subscribing #22

Closed
sadilekivan opened this issue Mar 30, 2022 · 27 comments
Closed

Retained messages not recieved upon subscribing #22

sadilekivan opened this issue Mar 30, 2022 · 27 comments

Comments

@sadilekivan
Copy link

As far as I tried I cannot seem to get a message that was published before with a retain bit set.

Running dart client in Linux and Mosquitto broker with Rpi4. I tested my setup with a python paho client and I recieve retained messages fine. Anything I could be missing?

@shamblett
Copy link
Owner

I'm going to need a log if you want me to look at this.

@sadilekivan
Copy link
Author

sadilekivan commented Mar 30, 2022

Sorry for being hastily, should have been my first option. Cant see the problem though, other than MqttMessageType.subscribe Retain = false? Please have a look.

The topic of my interest is tinydb

broker Keep Alive = 0
Response Information = null
broker Reference = null
Authentication Method = null
Properties = Identifier : topicAliasMaximum, value : 10
Identifier : receiveMaximum, value : 20
flutter: 2022-03-30 12:01:11.227969 -- MqttServerConnection::_onData - message available event fired
flutter: 2022-03-30 12:01:11.228030 -- MqttServerConnection::_onData - Message Received Ended <<<
flutter: 2022-03-30 12:01:11.229283 -- MqttConnectionHandlerBase::_connectAckProcessor
flutter: 2022-03-30 12:01:11.229444 -- MqttConnectionHandlerBase::_connectAckProcessor - state = connected
flutter: 2022-03-30 12:01:11.229786 -- MqttConnectionHandlerBase:: cancelling connect timer
flutter: 2022-03-30 12:01:11.230007 -- MqttSynchronousServerConnectionHandler::internalConnect - post sleep, state = Connection status is connected with return code of success and a disconnection origin of none
flutter: 2022-03-30 12:01:11.230057 -- MqttSynchronousServerConnectionHandler::internalConnect exited with state Connection status is connected with return code of success and a disconnection origin of none
flutter: 2022-03-30 12:01:11.233228 -- MqttConnectionHandlerBase::sendMessage - sending message started >>> -> MQTTMessage of type MqttMessageType.subscribe
MessageType = MqttMessageType.subscribe Duplicate = false Retain = false Qos = atLeastOnce Size = 0
Message Identifier = 1
Subscription identifier = 0
Properties = No properties set
Topic = tinydb, Option = Maximum Qos = exactlyOnce
No Local = false
Retain As Published = true
Retain Handling = sendRetained
flutter: 2022-03-30 12:01:11.234686 -- MqttConnectionHandlerBase::sendMessage - sending message ended >>>
flutter: 2022-03-30 12:01:11.268378 -- MqttServerConnection::_onData - Message Received Started <<<
flutter: 2022-03-30 12:01:11.268437 -- MqttServerConnection::_ondata - adding incoming data, data length is 28, message stream length is 0, message stream position is 0
flutter: 2022-03-30 12:01:11.269693 -- MqttServerConnection::_onData - MESSAGE RECEIVED -> MQTTMessage of type MqttMessageType.subscribeAck
MessageType = MqttMessageType.subscribeAck Duplicate = false Retain = false Qos = atMostOnce Size = 4
Message Identifier = 1
Reason String = null
Properties = No properties set
Reason Code = grantedQos2
flutter: 2022-03-30 12:01:11.270216 -- MqttServerConnection::_onData - message available event fired
flutter: 2022-03-30 12:01:11.270261 -- MqttServerConnection::_onData - Message Received Ended <<<
flutter: 2022-03-30 12:01:11.270573 -- MqttConnectionHandlerBase::messageAvailable - message type is MqttMessageType.subscribeAck

@sadilekivan
Copy link
Author

I noticed that if I specifically set retainHandling to doNotSendRetained or any other enum value, the log still prints with sendRetained.

void main() async {
  initializeJsonMapper();
  mqtt.client.logging(on: true);
  var option = MqttSubscriptionOption();
  option.retainHandling = MqttRetainHandling.doNotSendRetained;
  var sub = MqttSubscription(MqttSubscriptionTopic("tinydb"), option);
  print(sub.option!.retainHandling);

  await mqtt.client.connect();
  //mqtt.client.subscribe('tinydb', MqttQos.exactlyOnce);
  mqtt.client.subscribeWithSubscription(sub);
  //tinydbTopic.subscribe(MqttQos.exactlyOnce).listen((event) {
  //  print(event);
  //});
  runApp(const MyApp());
}
Restarted application in 165ms.
flutter: MqttRetainHandling.doNotSendRetained
...connecting...
flutter: 2022-03-30 13:02:06.545086 -- MqttConnectionHandlerBase::sendMessage - sending message started >>> -> MQTTMessage of type MqttMessageType.subscribe
MessageType = MqttMessageType.subscribe Duplicate = false Retain = false Qos = atLeastOnce Size = 0
Message Identifier = 1
Subscription identifier = 0
Properties = No properties set
Topic = tinydb, Option = Maximum Qos = atMostOnce
No Local = false
Retain As Published = true
Retain Handling = sendRetained

@shamblett
Copy link
Owner

Yes there seems to be a bug with setting the option in the subscription, I'll look at this, however the default is to sendRetain so it should still work. What are you setting in your connection message?

@sadilekivan
Copy link
Author

Hey, I use the default connection message (just the connect() method with no arguments). Also you're right, I do want the sendRetain option, just thought Id mention what I found while trying to figure it out.

My initial setup just uses the Constructor with an client_id, then connect method and then I subscribe and listen to any messages in updates.

When I subscribe the log prints a Retain = false as seen above, could that be it? Do I need to confirm that my subscription wants retained messages?

Just to be sure I wiped my pub-cache, but to no avail.

@sadilekivan
Copy link
Author

Just had a hunch, I used your package with flutter build for Linux desktop. Tested it under Windows and no problem. Retained message received and everything seems to work as intended. I'm not sure if this could be even a bug with your library then, maybe something with how flutter builds for Linux.

Also just noting that the sendRetain option in the log still seems to not react to changes from the subscription options on Windows.

The project I'm working on is supposed to run on android so I just should get used to using an emulator, or the real device. But in the case you would pursue this issue I'll leave it open.

@sadilekivan sadilekivan changed the title Retained messages not recieved upon subscribing [Linux] Retained messages not recieved upon subscribing Mar 30, 2022
@shamblett
Copy link
Owner

Fixed the subscription options bug, package re published at version 3.2.1

@sadilekivan
Copy link
Author

sadilekivan commented May 20, 2022

Hi, back at it again.

So I was mistaken, it's not a OS based issue, after testing on both linux and windows I have a pretty reproducible bug. I decided to do a better job of showing you with a test file.

Please have a look at my fork and see if you are able to reproduce same results on this test, and if it's making sense to you.

There are two test cases, one does its job without waiting, that works. The other waits a little bit to simulate missing a published message.

@sadilekivan sadilekivan changed the title [Linux] Retained messages not recieved upon subscribing Retained messages not recieved upon subscribing May 20, 2022
@Simonovsky
Copy link

Hi, back at it again.

So I was mistaken, it's not a OS based issue, after testing on both linux and windows I have a pretty reproducible bug. I decided to do a better job of showing you with a test file.

Please have a look at my fork and see if you are able to reproduce same results on this test, and if it's making sense to you.

There are two test cases, one does its job without waiting, that works. The other waits a little bit to simulate missing a published message.

So are you still unable to receive retained messages on subscribtion? Because I'm unable to make it work as well for some reason and I wonder if we have the same problem. I'm testing it on Android tho.

@sadilekivan
Copy link
Author

Hi, back at it again.
So I was mistaken, it's not a OS based issue, after testing on both linux and windows I have a pretty reproducible bug. I decided to do a better job of showing you with a test file.
Please have a look at my fork and see if you are able to reproduce same results on this test, and if it's making sense to you.
There are two test cases, one does its job without waiting, that works. The other waits a little bit to simulate missing a published message.

So are you still unable to receive retained messages on subscribtion? Because I'm unable to make it work as well for some reason and I wonder if we have the same problem. I'm testing it on Android tho.

Yes, I have the same problem on android too. I don't know how to run the dart test straight on android, but the same behavior occurs in my flutter app. Mainly the reason I returned to this issue, because I plan to support the android version.

@Simonovsky
Copy link

Simonovsky commented May 20, 2022

Hi, back at it again.
So I was mistaken, it's not a OS based issue, after testing on both linux and windows I have a pretty reproducible bug. I decided to do a better job of showing you with a test file.
Please have a look at my fork and see if you are able to reproduce same results on this test, and if it's making sense to you.
There are two test cases, one does its job without waiting, that works. The other waits a little bit to simulate missing a published message.

So are you still unable to receive retained messages on subscribtion? Because I'm unable to make it work as well for some reason and I wonder if we have the same problem. I'm testing it on Android tho.

Yes, I have the same problem on android too. I don't know how to run the dart test straight on android, but the same behavior occurs in my flutter app. Mainly the reason I returned to this issue, because I plan to support the android version.

Damn that makes my job a lot harder now 🥲. If I make it work I'll let you know. Thanks for the quick response!

Edit: I've just tested the same setup on 3.1/3.1.1 mqtt_client (https://github.com/shamblett/mqtt_client) lib and it works as expected. So for you, if you don't need MQTT5 specifically I would recommend using the old client.

@sadilekivan
Copy link
Author

Hi, back at it again.
So I was mistaken, it's not a OS based issue, after testing on both linux and windows I have a pretty reproducible bug. I decided to do a better job of showing you with a test file.
Please have a look at my fork and see if you are able to reproduce same results on this test, and if it's making sense to you.
There are two test cases, one does its job without waiting, that works. The other waits a little bit to simulate missing a published message.

So are you still unable to receive retained messages on subscribtion? Because I'm unable to make it work as well for some reason and I wonder if we have the same problem. I'm testing it on Android tho.

Yes, I have the same problem on android too. I don't know how to run the dart test straight on android, but the same behavior occurs in my flutter app. Mainly the reason I returned to this issue, because I plan to support the android version.

Damn that makes my job a lot harder now smiling_face_with_tear. If I make it work I'll let you know. Thanks for the quick response!

Edit: I've just tested the same setup on 3.1/3.1.1 mqtt_client (https://github.com/shamblett/mqtt_client) lib and it works as expected. So for you, if you don't need MQTT5 specifically I would recommend using the old client.

I did that, and remained on v311 till now. But now I'm hoping to use version 5 features such as the response topics in Properties. It's not a big deal to put a response topic into the payload itself, but I'm hoping on this being resolved at some point for the rest of the features.

@shamblett shamblett reopened this May 21, 2022
@Simonovsky
Copy link

I've played around with the Subscription options and found out that if you specify all the options like this:

    MqttSubscriptionOption mqttSubscriptionOption = MqttSubscriptionOption();
    mqttSubscriptionOption.maximumQos = MqttQos.exactlyOnce;
    mqttSubscriptionOption.retainAsPublished = true;
    mqttSubscriptionOption.noLocal = false;
    mqttSubscriptionOption.retainHandling = MqttRetainHandling.sendRetained;

    MqttSubscriptionTopic mqttSubscriptionTopic = MqttSubscriptionTopic(topic);
    MqttSubscription mqttSubscription = MqttSubscription(mqttSubscriptionTopic, mqttSubscriptionOption);
    
    await _client!.subscribeWithSubscription(mqttSubscription);

it will eventually come but it takes like 20 seconds for some reason...

@shamblett
Copy link
Owner

Try your received_retained_message test code against Hives broker at broker.hivemq.com, I'm finding the tests reverse i.e. with waiting is OK now and without waiting fails, although I have changed your test code slightly.

@sadilekivan
Copy link
Author

sadilekivan commented May 23, 2022

Try your received_retained_message test code against Hives broker at broker.hivemq.com, I'm finding the tests reverse i.e. with waiting is OK now and without waiting fails, although I have changed your test code slightly.

I can't get consistent results. It seems that one second is a low timeout for the broker. I set the broker sleep to 5 seconds and can confirm both tests working. I'll rewrite the test for a future so its faster if the broker responds sooner.

Edit: broker.hivemq.com seems to work on both tests now

@Simonovsky
Copy link

Simonovsky commented May 24, 2022

For me, both the tests and my implementation as well are unreliable once it passes /retained message comes, once it doesn't.
Is this an issue of the broker where the response takes too long?
Why it doesn't take so long on other clients (Mqtt box / HiveMq online client)?
Is Mqtt5 production ready or not ? I have truble finding fully implemented clients/brokers working with it...
I'm not trying to be offensive I just need some answers so that I can decide if I can use it in production or use 3.1.1 for now and wait for the Mqtt 5 and then make a switch to it...

@sadilekivan
Copy link
Author

For me, both the tests and my implementation as well are unreliable once it passes /retained message comes, once it doesn't. Is this an issue of the broker where the response takes too long? Why it doesn't take so long on other clients (Mqtt box / HiveMq online client)? Is Mqtt5 production ready or not ? I have truble finding fully implemented clients/brokers working with it... I'm not trying to be offensive I just need some answers so that I can decide if I can use it in production or use 3.1.1 for now and wait for the Mqtt 5 and then make a switch to it...

Same exact feeling. Did you check the new tests I made? There's 4 now and seem to be showing reproducible results now that I added a longer 5 second timeout.

@shamblett
Copy link
Owner

In theory shouldn't you wait until you have at least received the subscribe ack before you expect the retained message? Until you have received that you don't know if the subscription has been successful, from the Hive MQ docs -

A retained message is a normal MQTT message with the retained flag set to true. The broker stores the last retained message and the corresponding QoS for that topic. Each client that subscribes to a topic pattern that matches the topic of the retained message receives the retained message immediately after they subscribe. The broker stores only one retained message per topic.

So you should be able to connect a client, publish a retained message to a topic, disconnect the client, connect again and subscribe to the topic, when the subscribe ack is received you should then get the retained message. Bit more from the spec -

Retained messages help newly-subscribed clients get a status update immediately after they subscribe to a topic. The retained message eliminates the wait for the publishing clients to send the next update.

In other words, a retained message on a topic is the last known good value. The retained message doesn’t have to be the last value, but it must be the last message with the retained flag set to true.

Are we sure the tests are actually doing this?

@Simonovsky
Copy link

For me, both the tests and my implementation as well are unreliable once it passes /retained message comes, once it doesn't. Is this an issue of the broker where the response takes too long? Why it doesn't take so long on other clients (Mqtt box / HiveMq online client)? Is Mqtt5 production ready or not ? I have truble finding fully implemented clients/brokers working with it... I'm not trying to be offensive I just need some answers so that I can decide if I can use it in production or use 3.1.1 for now and wait for the Mqtt 5 and then make a switch to it...

Same exact feeling. Did you check the new tests I made? There's 4 now and seem to be showing reproducible results now that I added a longer 5 second timeout.

Yes I did check the test and played with timeouts and delay but I wasn't able to get it to consistently pass all tests.

@sadilekivan
Copy link
Author

In theory shouldn't you wait until you have at least received the subscribe ack before you expect the retained message? Until you have received that you don't know if the subscription has been successful, from the Hive MQ docs -

A retained message is a normal MQTT message with the retained flag set to true. The broker stores the last retained message and the corresponding QoS for that topic. Each client that subscribes to a topic pattern that matches the topic of the retained message receives the retained message immediately after they subscribe. The broker stores only one retained message per topic.

So you should be able to connect a client, publish a retained message to a topic, disconnect the client, connect again and subscribe to the topic, when the subscribe ack is received you should then get the retained message. Bit more from the spec -

Retained messages help newly-subscribed clients get a status update immediately after they subscribe to a topic. The retained message eliminates the wait for the publishing clients to send the next update.

In other words, a retained message on a topic is the last known good value. The retained message doesn’t have to be the last value, but it must be the last message with the retained flag set to true.

Are we sure the tests are actually doing this?

I assumed the client handles ack on subscribe, I did wonder how it could do so synchronously. If not then I agree. This is surely something I've got to cover in the test.

@Simonovsky
Copy link

Hi, thank you for responding and trying to resolve this with us :)

In theory shouldn't you wait until you have at least received the subscribe ack before you expect the retained message? Until you have received that you don't know if the subscription has been successful, from the Hive MQ docs -

Due to my use-case, I'm just listening for all messages coming to the topic at all times. But yes it I get subscribe ack message but the retained message is not coming on hivemq broker.
On empx broker the retained message will come but it will come 30 seconds after subscribe ack which is too much for me and I don't really understand the delay.
On my local broker which I just tested it works just fine and returns the retained message in a few milliseconds. I wonder if it may be a broker setup problem (?) .

So you should be able to connect a client, publish a retained message to a topic, disconnect the client, connect again and subscribe to the topic, when the subscribe ack is received you should then get the retained message.

Yes I think I should be able to get the retained message after suback but on this client it simply wont come on hivemq broker or comes 30 sec after suback on empx broker. I expect it to behave more or less the same as on 3.1.1 is there something incorrect in this expectation? I've tried the setup on 3.1.1 by just switching the client to 3.1.1 and it worked flawlessly so I think something fishy is going on 😄

@sadilekivan
Copy link
Author

sadilekivan commented May 24, 2022

On my local broker which I just tested it works just fine and returns the retained message in a few milliseconds. I wonder if it may be a broker setup problem (?) .

What broker are you using? I'm pretty much interested in only hosting my own on a raspberry pi, but using mosquitto has not worked at all for me (on version 5).

@Simonovsky
Copy link

Simonovsky commented May 24, 2022

On my local broker which I just tested it works just fine and returns the retained message in a few milliseconds. I wonder if it may be a broker setup problem (?) .

What broker are you using? I'm pretty much interested in only hosting my own on a raspberry pi, but using mosquitto has not worked at all for me (on version 5).

I use mosquitto integration in my homeassistant instance (running on intel NUC) mainly for zigbee2mqtt but it proven to be very usefull as a test tool for my job as well. In the past I ran mosquitto on RPI and it worked just fine although you know, I wouldn't recommend opening it to the outside of the local network.

@Simonovsky
Copy link

Simonovsky commented May 24, 2022

@shamblett I've just noticed another weird thing. The time to receive any message seems to be inconsistent as well. It takes from less than a second to around 10 seconds even on my local broker. It wasn't like that in Mqtt 3.1.1 . Just wanted to share this as it might help you debug...

@Simonovsky
Copy link

Simonovsky commented May 24, 2022

Sorry for the spam but I think I've found a usable solution for now.

This seems to work flawlessly as well for some reason it all works over WS. I still don't understand why this could happen ...

This is also why it worked fine on my locally hosted broker when I tested it I've used WebSockets when I've realized this i tried using mqtt/tcp and it doesn't reliably work on my local broker .

@sadilekivan I've edited your tests like this to support WebSockets could you try it if it works for you as well?

import 'package:mqtt5_client/mqtt5_client.dart';
import 'package:mqtt5_client/mqtt5_server_client.dart';
import 'package:typed_data/typed_buffers.dart';
import 'package:test/test.dart';

Future<int> main() async {
  recieveRetainedMessageTest(String broker_url, int port, bool useWs, int retainDelay) async {
    final client = MqttServerClient(broker_url, "");
    client.useWebSocket = useWs;
    client.port = port;

    String topic = "iWantToBeRetained";
    var sendData = Uint8Buffer();
    sendData.add(255); //Super unique data
    Uint8Buffer? recvData;

    await client.connect();
    print("Publishing Message: $sendData at $topic");
    client.publishMessage(topic, MqttQos.exactlyOnce, sendData, retain: true);

    print("Waiting for $retainDelay seconds before subscribing");
    await MqttUtilities.asyncSleep(retainDelay);

    client.subscribe(topic, MqttQos.exactlyOnce);

    Future recieveRetained() {
      var completer = Completer();
      client.updates.listen((List<MqttReceivedMessage<MqttMessage>> c) {
        final data = (c[0].payload as MqttPublishMessage).payload.message!;
        print("Recieved Message: ${data} at ${c[0].topic}");
        if (c[0].topic == topic) {
          completer.complete(data);
        }
      });
      return completer.future;
    }

    recvData = await recieveRetained().timeout(Duration(seconds: 5), onTimeout: () => print("Broker timeout, nothing was recieved!"));

    expect(recvData, sendData);

    client.unsubscribeStringTopic(topic);
    client.disconnect();
  }
 
  //WS
  test("[broker.emqx.io] websocket right after its published", () => recieveRetainedMessageTest("ws://broker.emqx.io/mqtt", 8083, true, 0));

  test("[broker.emqx.io] websocket after a delay", () => recieveRetainedMessageTest("ws://broker.emqx.io/mqtt", 8083, true, 5));

  test("[broker.hivemq.com] websocket right after its published", () => recieveRetainedMessageTest("ws://broker.hivemq.com/mqtt", 8000, true, 0));

  test("[broker.hivemq.com] websocket after a delay", () => recieveRetainedMessageTest("ws://broker.hivemq.com/mqtt", 8000, true, 5));

  //TCP

  test("[broker.emqx.io] tcp right after its published", () => recieveRetainedMessageTest("broker.emqx.io", 1883, false, 0));

  test("[broker.emqx.io] tcp after a delay", () => recieveRetainedMessageTest("broker.emqx.io", 1883, false, 5));

  test("[broker.hivemq.com] tcp right after its published", () => recieveRetainedMessageTest("broker.hivemq.com", 1883, false, 0));

  test("[broker.hivemq.com] tcp after a delay", () => recieveRetainedMessageTest("broker.hivemq.com", 1883, false, 5));


  return 0;
}

@sadilekivan
Copy link
Author

sadilekivan commented May 24, 2022

@Simonovsky
Screenshot from 2022-05-24 16-43-07
Not sure what to make of the different broker, different result.

Edit: Almost already forgot, nothing yet until the test confirms the sub ack.

@Simonovsky
Copy link

@Simonovsky Screenshot from 2022-05-24 16-43-07 Not sure what to make of the different broker, different result.

Thank you! Yes this means that all WebSockets worked. For me, it behaves differently each time I run it tcp connection seems to be super inconsitent.

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants