
Cannot get retained message from other node in cluster mode #507

Closed
edwinAtWiz opened this issue Sep 14, 2017 · 25 comments

@edwinAtWiz

Environment

  • VerneMQ Version:
  • OS: Ubuntu 16.04
  • Erlang/OTP version (if building from source): 20

Expected behavior

All retained messages should be stored on all nodes within the cluster

Actual behaviour

I have set up a cluster with 2 nodes on AWS EC2 instances; each node implements a webhook plugin in Node.js that retains the message and republishes it to another topic, like:

original topic: test/1, test/2 ... test/1000
retained to: save/1, save/2 ... save/1000

Separately, I have set up two nodes, C and D, which publish and subscribe using MQTT.js. The flow is:

  • VerneMQ cluster with Node A and B
  • Node C and D publish and subscribe to Node A respectively
  • Node C publishes 1000 messages and Node D receives them properly
  • Node D subscribes again to Node A with save/#; 1000 messages are received
  • Node D subscribes to Node B; part of the messages are lost (not consistent, sometimes > 50%, sometimes < 10%)

Note: when I publish the messages to Node A using mosquitto_pub at a slow insertion rate (~ 2/3 req. per second), far fewer retained messages are lost on Node B (tested 2 times, publishing 2000 messages: < 10 messages lost, or none lost).
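For reference, a minimal MQTT.js subscriber along the lines of what Node D does against Node B, counting the retained messages it actually receives; the hostname and port here are placeholders, not the real setup:

const mqtt = require('mqtt');

// Connect to Node B and count the retained messages delivered for save/#.
const client = mqtt.connect('mqtt://node-b.example.com:1883');
let received = 0;

client.on('connect', () => client.subscribe('save/#'));

client.on('message', (topic, payload, packet) => {
  // Retained deliveries carry the retain flag on the incoming packet.
  if (packet.retain) {
    received++;
    console.log(received + ' retained messages so far (last: ' + topic + ')');
  }
});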

@ioolkos
Contributor

ioolkos commented Sep 14, 2017

@edwinAtWiz thanks for reporting!

So the situation is this: We have a 2-node VerneMQ cluster (A, B).
1000 clients send to 1000 topics (save/1 etc). The publishers are always connected to node A.
Now:

  • if the consumers are connected to node A too, they all get their corresponding retained message.
  • if the consumers are connected to node B, they do not all get their corresponding retained message.

Yes, this seems weird. What could possibly interfere here is queue migration happening, i.e. node A moving 1000 live queues to node B at the same moment.
@dergraf @larshesel opinions on that?

@edwinAtWiz
Author

edwinAtWiz commented Sep 14, 2017

@ioolkos Kind of. On Node C, I just wrote a simple for-loop to publish 1000 messages (test/1, test/2 ... test/1000) to Node A. On Node A, the webhook plugin also uses the MQTT.js module to publish each message locally again as retained (save/1, save/2 ... save/1000).

On Node A, all retained messages can be received by subscribing to the save/# topic, but I cannot get all of them from Node B.

I have also tested publishing the retained messages directly from Node C to Node A on topic save/1, etc., but the result is the same as in the test mentioned above.

The message loss can be mitigated if I limit the request rate (e.g. lower the req./s).
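(For illustration, a minimal sketch of such a webhook handler: an Express endpoint registered for the on_publish hook that republishes the incoming message locally as retained on save/<n>. The JSON field names, the base64 payload encoding, the response format and the local broker address are assumptions about the webhook configuration, not the exact plugin code in use.)

const express = require('express');
const mqtt = require('mqtt');

const app = express();
app.use(express.json());

// Local broker on the same node (address and port assumed).
const local = mqtt.connect('mqtt://127.0.0.1:1883');

app.post('/on_publish', (req, res) => {
  const { topic, payload } = req.body;           // e.g. topic = 'test/42' (field names assumed)
  const id = topic.split('/')[1];
  const body = Buffer.from(payload, 'base64');   // payload assumed to be base64-encoded
  local.publish('save/' + id, body, { retain: true });
  res.json({ result: 'ok' });                    // accept the original publish (response format assumed)
});

app.listen(3000);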

@ioolkos
Contributor

ioolkos commented Sep 14, 2017

OK, thanks. So this rules out the webhook plugin's re-publishing of the topic as the cause of this problem.

@ioolkos
Contributor

ioolkos commented Sep 15, 2017

I'm going to see if I can find time to reproduce/test this today. Anything of interest in the logs btw?

@edwinAtWiz
Author

@ioolkos Thanks for your help!

I can confirm that there are no errors or crashes before/after the data insertion, and nothing is appended to the console log after the insertion. The following file contains all the info from when the cluster was started and connected.

consoleDump.txt

@ioolkos
Contributor

ioolkos commented Sep 15, 2017

One additional question: what is the client ID structure? Are the client IDs always the same, or random? (Are they the same for the subscribers on node A and node B?)

@dergraf
Contributor

dergraf commented Sep 17, 2017

Maybe related... we use a write cache for retained messages. If you publish a retained message, it would take at least 1 second until this retained message is committed to our clustered metadata storage. An unfortunate side effect of this load-prevention mechanism is that, within "this" second, a new subscriber on a different node will miss the retained message.
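(To illustrate the window described above, a small MQTT.js sketch with assumed hostnames: a subscriber that connects to node B immediately after the publish can race the write cache, while delaying the subscription beyond the commit interval sidesteps it.)

const mqtt = require('mqtt');

const pub = mqtt.connect('mqtt://node-a.example.com');   // publisher on node A (address assumed)
const sub = mqtt.connect('mqtt://node-b.example.com');   // subscriber on node B (address assumed)

pub.on('connect', () => {
  pub.publish('save/1', 'hello', { retain: true });
  // Subscribing right away from node B may fall inside the write-cache window;
  // waiting a couple of seconds before subscribing is a crude way to avoid the race.
  setTimeout(() => sub.subscribe('save/#'), 2000);
});

sub.on('message', (topic, payload) => {
  console.log('retained via node B:', topic, payload.toString());
});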

@edwinAtWiz
Author

@ioolkos The client_id changes every time I restart the Node.js script on Node C and D. On Node A and B, the client_id set stays the same.

@dergraf Is there anything I can do to tackle this? Many thanks for the help.

@ioolkos
Contributor

ioolkos commented Sep 18, 2017

I have not yet found the reason for this, but I guess I'm not testing exactly the same scenario you are.
(I haven't reproduced the malfunction so far, in other words.)
I'll look further into this. I think it must have something to do with the save/# subscription, that is, the synchronisation of those subscriptions between the 2 nodes, possibly in combination with what @dergraf said.

@edwinAtWiz
Author

@ioolkos You mean your test setup could not reproduce the issue? May I know what test setup you used for this?

@ioolkos
Contributor

ioolkos commented Sep 18, 2017

I tested with singled out subscriptions (to 'save/clientid'), not to 'save/#' as you have. But I'll test that next.

@ioolkos
Contributor

ioolkos commented Sep 18, 2017

I did a couple of very basic tests, publishing from 1000 clients and subscribing from 1 consumer to 'testtopic/#' with mosquitto_sub (then counting the number of lines of its output). I get the same number of retained messages on both nodes A and B.
This doesn't invalidate your findings, of course. My test is on my laptop with a local cluster, that is, without any firewall/network layer in between.
@edwinAtWiz Could you look into what vmq-admin metrics show tells you on both nodes? Do the numbers make sense? Do you see dropped messages there?

@edwinAtWiz
Author

Before data insertion:

Node B:
gauge.queue_processes = 2
gauge.retain_memory = 21184
gauge.retain_messages = 0
gauge.router_memory = 14064
gauge.router_subscriptions = 1
gauge.system_utilization_scheduler_2 = 0
gauge.system_utilization_scheduler_1 = 100
gauge.system_utilization = 50
gauge.vm_memory_ets = 4231552
gauge.vm_memory_code = 11413095
gauge.vm_memory_binary = 3409424
gauge.vm_memory_atom_used = 485945
gauge.vm_memory_atom = 504433
gauge.vm_memory_system = 27420624
gauge.vm_memory_processes_used = 7938232
gauge.vm_memory_processes = 7940400
gauge.vm_memory_total = 35361024
counter.system_wallclock = 143637
counter.system_runtime = 1680
gauge.system_run_queue = 0
counter.system_reductions = 24099100
counter.system_io_out = 959465
counter.system_io_in = 9014841
counter.system_words_reclaimed_by_gc = 33113290
counter.system_gc_count = 5212
counter.system_exact_reductions = 23804772
counter.system_context_switches = 47729
counter.socket_open = 2
counter.socket_close = 0
counter.socket_error = 0
counter.bytes_received = 107
counter.bytes_sent = 13
counter.mqtt_connect_received = 2
counter.mqtt_publish_received = 0
counter.mqtt_puback_received = 0
counter.mqtt_pubrec_received = 0
counter.mqtt_pubrel_received = 0
counter.mqtt_pubcomp_received = 0
counter.mqtt_subscribe_received = 1
counter.mqtt_unsubscribe_received = 0
counter.mqtt_pingreq_received = 0
counter.mqtt_disconnect_received = 0
counter.mqtt_connack_accepted_sent = 2
counter.mqtt_connack_unacceptable_protocol_sent = 0
counter.mqtt_connack_identifier_rejected_sent = 0
counter.mqtt_connack_server_unavailable_sent = 0
counter.mqtt_connack_bad_credentials_sent = 0
counter.mqtt_connack_not_authorized_sent = 0
counter.mqtt_publish_sent = 0
counter.mqtt_puback_sent = 0
counter.mqtt_pubrec_sent = 0
counter.mqtt_pubrel_sent = 0
counter.mqtt_pubcomp_sent = 0
counter.mqtt_suback_sent = 1
counter.mqtt_unsuback_sent = 0
counter.mqtt_pingresp_sent = 0
counter.mqtt_publish_auth_error = 0
counter.mqtt_subscribe_auth_error = 0
counter.mqtt_invalid_msg_size_error = 0
counter.mqtt_puback_invalid_error = 0
counter.mqtt_pubrec_invalid_error = 0
counter.mqtt_pubcomp_invalid_error = 0
counter.mqtt_publish_error = 0
counter.mqtt_subscribe_error = 0
counter.mqtt_unsubscribe_error = 0
counter.queue_setup = 2
counter.queue_teardown = 0
counter.queue_message_drop = 0
counter.queue_message_unhandled = 0
counter.queue_message_in = 0
counter.queue_message_out = 0
counter.client_expired = 0
counter.cluster_bytes_received = 0
counter.cluster_bytes_sent = 0
counter.cluster_bytes_dropped = 0

Node A will be more or less the same but gauge.router_subscriptions is 0

After the insertion:
Node B:
gauge.queue_processes = 2
gauge.retain_memory = 189456
gauge.retain_messages = 604
gauge.router_memory = 14064
gauge.router_subscriptions = 1
gauge.system_utilization_scheduler_2 = 0
gauge.system_utilization_scheduler_1 = 100
gauge.system_utilization = 50
gauge.vm_memory_ets = 4405800
gauge.vm_memory_code = 11413095
gauge.vm_memory_binary = 3605080
gauge.vm_memory_atom_used = 485982
gauge.vm_memory_atom = 504433
gauge.vm_memory_system = 27789232
gauge.vm_memory_processes_used = 8650528
gauge.vm_memory_processes = 8650608
gauge.vm_memory_total = 36439840
counter.system_wallclock = 807813
counter.system_runtime = 3640
gauge.system_run_queue = 0
counter.system_reductions = 28093605
counter.system_io_out = 1433070
counter.system_io_in = 9721934
counter.system_words_reclaimed_by_gc = 42809391
counter.system_gc_count = 20044
counter.system_exact_reductions = 27799277
counter.system_context_switches = 80411
counter.socket_open = 3
counter.socket_close = 1
counter.socket_error = 0
counter.bytes_received = 211
counter.bytes_sent = 383200
counter.mqtt_connect_received = 3
counter.mqtt_publish_received = 0
counter.mqtt_puback_received = 0
counter.mqtt_pubrec_received = 0
counter.mqtt_pubrel_received = 0
counter.mqtt_pubcomp_received = 0
counter.mqtt_subscribe_received = 2
counter.mqtt_unsubscribe_received = 0
counter.mqtt_pingreq_received = 21
counter.mqtt_disconnect_received = 0
counter.mqtt_connack_accepted_sent = 3
counter.mqtt_connack_unacceptable_protocol_sent = 0
counter.mqtt_connack_identifier_rejected_sent = 0
counter.mqtt_connack_server_unavailable_sent = 0
counter.mqtt_connack_bad_credentials_sent = 0
counter.mqtt_connack_not_authorized_sent = 0
counter.mqtt_publish_sent = 1604
counter.mqtt_puback_sent = 0
counter.mqtt_pubrec_sent = 0
counter.mqtt_pubrel_sent = 0
counter.mqtt_pubcomp_sent = 0
counter.mqtt_suback_sent = 2
counter.mqtt_unsuback_sent = 0
counter.mqtt_pingresp_sent = 21
counter.mqtt_publish_auth_error = 0
counter.mqtt_subscribe_auth_error = 0
counter.mqtt_invalid_msg_size_error = 0
counter.mqtt_puback_invalid_error = 0
counter.mqtt_pubrec_invalid_error = 0
counter.mqtt_pubcomp_invalid_error = 0
counter.mqtt_publish_error = 0
counter.mqtt_subscribe_error = 0
counter.mqtt_unsubscribe_error = 0
counter.queue_setup = 3
counter.queue_teardown = 1
counter.queue_message_drop = 0
counter.queue_message_unhandled = 0
counter.queue_message_in = 1604
counter.queue_message_out = 1604
counter.client_expired = 0
counter.cluster_bytes_received = 334932
counter.cluster_bytes_sent = 0
counter.cluster_bytes_dropped = 0

Node A:
gauge.queue_processes = 2
gauge.retain_memory = 338664
gauge.retain_messages = 1000
gauge.router_memory = 14024
gauge.router_subscriptions = 0
gauge.system_utilization_scheduler_2 = 0
gauge.system_utilization_scheduler_1 = 100
gauge.system_utilization = 50
gauge.vm_memory_ets = 4597872
gauge.vm_memory_code = 11916083
gauge.vm_memory_binary = 4123040
gauge.vm_memory_atom_used = 501938
gauge.vm_memory_atom = 512625
gauge.vm_memory_system = 29078480
gauge.vm_memory_processes_used = 8068672
gauge.vm_memory_processes = 8070992
gauge.vm_memory_total = 37149472
counter.system_wallclock = 837882
counter.system_runtime = 4610
gauge.system_run_queue = 0
counter.system_reductions = 44242897
counter.system_io_out = 2849640
counter.system_io_in = 10384940
counter.system_words_reclaimed_by_gc = 64816627
counter.system_gc_count = 33596
counter.system_exact_reductions = 43952193
counter.system_context_switches = 110849
counter.socket_open = 3
counter.socket_close = 1
counter.socket_error = 0
counter.bytes_received = 477954
counter.bytes_sent = 54
counter.mqtt_connect_received = 3
counter.mqtt_publish_received = 2000
counter.mqtt_puback_received = 0
counter.mqtt_pubrec_received = 0
counter.mqtt_pubrel_received = 0
counter.mqtt_pubcomp_received = 0
counter.mqtt_subscribe_received = 0
counter.mqtt_unsubscribe_received = 0
counter.mqtt_pingreq_received = 21
counter.mqtt_disconnect_received = 0
counter.mqtt_connack_accepted_sent = 3
counter.mqtt_connack_unacceptable_protocol_sent = 0
counter.mqtt_connack_identifier_rejected_sent = 0
counter.mqtt_connack_server_unavailable_sent = 0
counter.mqtt_connack_bad_credentials_sent = 0
counter.mqtt_connack_not_authorized_sent = 0
counter.mqtt_publish_sent = 0
counter.mqtt_puback_sent = 0
counter.mqtt_pubrec_sent = 0
counter.mqtt_pubrel_sent = 0
counter.mqtt_pubcomp_sent = 0
counter.mqtt_suback_sent = 0
counter.mqtt_unsuback_sent = 0
counter.mqtt_pingresp_sent = 21
counter.mqtt_publish_auth_error = 0
counter.mqtt_subscribe_auth_error = 0
counter.mqtt_invalid_msg_size_error = 0
counter.mqtt_puback_invalid_error = 0
counter.mqtt_pubrec_invalid_error = 0
counter.mqtt_pubcomp_invalid_error = 0
counter.mqtt_publish_error = 0
counter.mqtt_subscribe_error = 0
counter.mqtt_unsubscribe_error = 0
counter.queue_setup = 3
counter.queue_teardown = 1
counter.queue_message_drop = 0
counter.queue_message_unhandled = 0
counter.queue_message_in = 0
counter.queue_message_out = 0
counter.client_expired = 0
counter.cluster_bytes_received = 0
counter.cluster_bytes_sent = 323499
counter.cluster_bytes_dropped = 0

@edwinAtWiz
Author

edwinAtWiz commented Sep 18, 2017

@ioolkos The above trial is publishing 1000 messages to Node A; only 604 retained messages arrive on Node B.
Also, I used only 1 publisher, publishing the messages with a for-loop like this:

var mqtt = require('mqtt');
var mqttCli = mqtt.connect('mqtt://<node-a-host>'); // Node A address (placeholder)

for (var i = 0; i < 1000; i++) {
  var msg = '......';
  mqttCli.publish('test/' + i, msg);
}

@ioolkos
Contributor

ioolkos commented Sep 18, 2017

Hm, thanks!
I see gauge.retain_messages = 604 on node B. It's a gauge, not a counter, so it shows the current value. The corresponding gauge.retain_memory = 189456 makes sense proportionally to node A. This seems to indicate that the retained messages are actually not yet fully replicated to node B.

On the other hand, I wonder why node B thinks it has sent 1604 messages (counter.mqtt_publish_sent = 1604).
Is this 1000 + 604...?

@edwinAtWiz
Author

@ioolkos I had the same thought, that 1604 = 1000 + 604, but I have no idea what those metrics actually represent.

I tried it again: after the messages were published, counter.queue_message_in and counter.queue_message_out on Node B were 1000, but once I subscribed to save/#, the value changed from 1000 to 621, then 2242, and so on.

@ioolkos
Contributor

ioolkos commented Sep 18, 2017

Just to make sure. What happens when you use a completely fresh topic? (instead of 'save/#')

@edwinAtWiz
Author

edwinAtWiz commented Sep 19, 2017

@ioolkos What do you mean by a fresh topic? Do you mean other keywords?

@ioolkos
Contributor

ioolkos commented Sep 19, 2017

@edwinAtWiz Yes, I meant topics you've not used before, so that they get the retained message for the first time.

@edwinAtWiz
Author

@ioolkos I changed the topics from save/1, save/2 ... to msg/stor/1, msg/stor/2 ... etc. The result is just the same.

@larshesel
Contributor

I can reproduce this with two cleanly built and clustered VerneMQ instances. The only thing I did was set allow_anonymous=on and then publish to node A:

for f in `seq 1 1000` ; do mosquitto_pub -i "pubber" -p 10053 -t "root/$f" -q 0 -r -m "hello$f"; done

I then checked the retain count on node A:

$ vmq-admin metrics show | grep retain
gauge.retain_memory = 295448
gauge.retain_messages = 1000

And some seconds later on node B:

$ vmq-admin metrics show | grep retain
gauge.retain_memory = 218696
gauge.retain_messages = 973

I'll look into it.

@larshesel
Contributor

@edwinAtWiz I've created a fix for this issue, see #509. It would be much appreciated if you could test it out and see if this fixes the issue you're seeing.

@larshesel
Contributor

The fix was merged to master. Could you test if it works for you?

@edwinAtWiz
Author

@larshesel I am still testing with that; currently it works as expected! Many thanks for your help.

@ioolkos
Contributor

ioolkos commented Sep 21, 2017

Good to hear! Thanks for finding this, @edwinAtWiz and @larshesel!
