
Puzzle: Sometimes Retained messages seem to be skipped #1633

Open
ioolkos opened this issue Sep 16, 2020 · 55 comments

@ioolkos
Contributor

ioolkos commented Sep 16, 2020

A user has sometimes seen Retained messages "skipped". The cluster will deliver the message on the normal subscription path, but for that topic, the cluster will still show the second-last retained message.

Looking for a hypothesis/code path for this "puzzle". Since the Retained message is missing on all cluster nodes, this doesn't seem to be related to synchronization (though that's not excluded).

Look into: Retain Cache, Retain SRV, Overload, State Synchronisation (SWC)

@ioolkos
Contributor Author

ioolkos commented Sep 23, 2020

We need to check the message_queue_len of the Retain SRV process mailbox, and if the mailbox is in fact growing, explore options to introduce concurrency.
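
For reference, the mailbox length can be checked from an attached console; a minimal sketch, assuming the retain server is registered locally under its module name:

```
%% Run inside `vernemq attach`. Assumes the retain server process is
%% registered locally as vmq_retain_srv (an assumption for this sketch).
case whereis(vmq_retain_srv) of
    undefined ->
        not_running;
    Pid ->
        %% a message_queue_len that stays well above 0 indicates a backlog
        erlang:process_info(Pid, [message_queue_len, memory, current_function])
end.
```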

@ioolkos
Contributor Author

ioolkos commented Sep 25, 2020

Ok, the above (mailbox overload) is confirmed, while disk latency seems okay.
It looks like a retain_srv_sup_sup -> retain_srv_sup -> multiple supervised retain_srv processes layout might give us concurrency (of course, we'd need a way to distribute the incoming messages over those processes; see the sketch below).
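
A rough sketch of that supervision layout, assuming the worker exposes a start_link/1 taking its index (module and child names are placeholders, not the actual VerneMQ code):

```
%% Hypothetical retain_srv_sup: supervises N retain_srv workers so that
%% incoming retain updates can be spread over several mailboxes.
-module(retain_srv_sup).
-behaviour(supervisor).
-export([start_link/1, init/1]).

start_link(NumWorkers) ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, NumWorkers).

init(NumWorkers) ->
    Children =
        [#{id => {retain_srv, I},
           %% assumes the worker exposes start_link/1 taking its index
           start => {vmq_retain_srv, start_link, [I]},
           restart => permanent,
           type => worker}
         || I <- lists:seq(0, NumWorkers - 1)],
    {ok, {#{strategy => one_for_one, intensity => 5, period => 10}, Children}}.
```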

@ioolkos
Contributor Author

ioolkos commented Sep 26, 2020

Suppose we phash2 the topic name to map it to a certain retain_srv (sketch below).
The open question is what the common overload case is: a) retains on a lot of different topics, or b) high-speed updates to the same topic.
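
A minimal sketch of the dispatch idea (the worker naming scheme is an assumption):

```
%% Map a topic to one of N retain_srv workers via erlang:phash2/2, so all
%% updates for a given topic are serialized through the same process.
-define(NUM_RETAIN_SRVS, 8).

worker_for_topic(Topic) ->
    Idx = erlang:phash2(Topic, ?NUM_RETAIN_SRVS),
    %% assumes workers are registered as retain_srv_0 .. retain_srv_7
    list_to_atom("retain_srv_" ++ integer_to_list(Idx)).
```

Note that this spreads load nicely for case a), but in case b) all updates for the hot topic still land in a single worker's mailbox.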

@elduds
Contributor

elduds commented Sep 29, 2020

I have seen this retain issue on a topic that is rarely updated (2-3 times per day), though the broker cluster is busy processing 10-15kpps. Difficult to detect on topics that are updated frequently so it may be happening on others.

@coder4803

coder4803 commented Oct 14, 2020

We have been noticing these "skips" during message bursts. The last example was that three messages (QoS 1) were published by the same client to the same topic within a very short time window (a few milliseconds). The messages did go through in the order they were published, but when the topic was subscribed to afterwards, we got the second/middle one (not the third/last as expected).

I am guessing that the root cause could be that multiple threads can handle messages published by a single client (to a single topic), and retaining those messages may finish in a different (unexpected) order.

We also have a lot of retained topics/messages (~100k). Some change all the time (many times per second) and others only once per day or so.

@ioolkos
Contributor Author

ioolkos commented Oct 14, 2020

@coder4803
thanks, yep. On the cause: I think this is an overload of the Retain server on the incoming side, basically filling up the backlog (mailbox) of that process. The retained messages are then stored via the metadata system, which also distributes them to all the other cluster nodes. (This replication is eventually consistent, in any case.)

@elduds
Contributor

elduds commented Dec 2, 2020

Regardless of the fix, some additional Prometheus metrics around what's happening with the retain server would be useful. We are already graphing message counts and bytes; however, this doesn't give any insight into an overload situation.

It would be great to see CRUD metrics as well as the size of the backlog for the retain server to aid troubleshooting/alerting.

cheers

@ioolkos
Contributor Author

ioolkos commented Dec 3, 2020

@elduds @coder4803 there are metrics (timings) for the metadata layer, in case you use SWC. Those only mirror the storage-level performance and cover all metadata in general, so they are not really useful here: the backlog happens in the process mailbox, before those messages even hit the disk.

I think the long-term solution to this is integration with OpenTelemetry (already working on it). This will improve the observability of the Retain SRV. It obviously won't resolve the overload issue itself; we still need to find a solution/refactoring for that.

@ioolkos
Contributor Author

ioolkos commented Dec 7, 2020

I tested the Retain server a little over the weekend, with only a couple hundred topics but a high update frequency.
I can trigger overloads starting from roughly 200k retain updates per second; depending on what is considered sane CPU load, maybe even lower.
But the point is: it seems CPU bound, which is kind of obvious. On the other hand, the use cases reported above don't seem to have seen messages skipped just due to a plain CPU max-out.
@elduds @coder4803 maybe you can comment on this aspect?

@ico77

ico77 commented Dec 7, 2020

Hi everybody,
I also observed skipped retained messages when the update frequency for topics is high. Basically, I was sending messages with the retain flag to multiple topics and then, shortly afterwards, empty messages to those same topics (which should delete the retained messages). After a while I noticed that not all retained messages were deleted, meaning some of the zero-payload messages were not processed.

I discovered that the reason for this is a race condition in vmq_retain_srv. What basically happens is this:

  1. The function vmq_retain_srv:insert receives the first retained message and stores it in an ets table.
  2. The periodic persist job in vmq_retain_srv picks up that message and hands it over to the metadata plugin for further processing.
  3. The second message (the one with no payload) is received by vmq_retain_srv:delete and also stored in the ets table.
  4. Now the callback for the first retained message gets invoked by the metadata plugin; the callback function is handle_info({updated, ...}, State). This function inserts the first retained message into the ets table again and OVERWRITES the second retained message before the periodic persist job has a chance to process it (see the toy sketch below).
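
A self-contained toy model of that interleaving (simplified shapes, not the real vmq_retain_srv code):

```
-module(retain_race_demo).
-export([run/0]).

%% Models steps 1-4 above: the async 'updated' callback re-inserts the old
%% value and clobbers the pending delete before it is ever persisted.
run() ->
    T = ets:new(retain_update, [public, set]),
    Topic = <<"some/topic">>,
    %% 1. insert records the retained payload in the update table
    ets:insert(T, {Topic, {insert, <<"payload">>}}),
    %% 2. the periodic persist job hands it to the metadata layer (async
    %%    callback) and clears the entry from the update table
    persist_async(self(), Topic, {insert, <<"payload">>}),
    ets:delete(T, Topic),
    %% 3. delete records the empty-payload message next
    ets:insert(T, {Topic, delete}),
    %% 4. the 'updated' callback fires and overwrites the pending delete
    receive {updated, Topic, Val} -> ets:insert(T, {Topic, Val}) end,
    %% -> [{<<"some/topic">>,{insert,<<"payload">>}}]: the delete is lost
    ets:lookup(T, Topic).

persist_async(Parent, Topic, Val) ->
    spawn(fun() -> Parent ! {updated, Topic, Val} end).
```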

Hope this helps.

@ioolkos
Contributor Author

ioolkos commented Dec 7, 2020

@ico77 thanks for your analysis. I've seen a couple of race conditions between memory and stores, so I'm certainly going to check your hypothesis. I was under the impression that the 1-second persister process was explicitly introduced to address issues like that, but we'll see.
On top of that, this could explain why it happens at higher insert frequencies but not necessarily as a consequence of a maxed-out CPU.
I guess you haven't verified/falsified this by touching the code?

EDIT: another question is whether this is an insert/delete race only, or also an insert/insert race.

@ico77

ico77 commented Dec 7, 2020

No, sadly I haven't proved it by modifying code. I came to the conclusion described above by activating tracing on vmq_retain_srv:insert, vmq_retain_srv:delete and vmq_retain_srv:handle_info function calls. The traces revealed that some retained messages were processed twice and some were never processed.

@ioolkos
Contributor Author

ioolkos commented Dec 7, 2020

@ico77 okay, well that seems to validate your analysis.

@ioolkos
Contributor Author

ioolkos commented Dec 7, 2020

@ico77 one additional point: did you see the race condition for single node, or cluster only, or both cases?

@ico77

ico77 commented Dec 7, 2020

I was using a single node while tracing, didn't try it with a cluster.

@ico77

ico77 commented Dec 7, 2020

I managed to find the file where I saved an excerpt from the trace output. This trace was made during high load (probably around 10K msg/s). For each randomly generated retain message a random topic was generated, and an empty-payload message was sent to the same topic 1 second later.

There is a chance that the same topic was generated for multiple retained messages, in which case the trace output cannot, with 100% certainty, lead to the conclusion I described. But the chance of that happening was ~1:60 000 000 during the test, so I guess the trace can be used to point us in the right direction.

I saw a lot of output like this in the trace: 1 insert, 1 delete and then the 2 callbacks. The 2 callbacks are interesting: they show that handle_info(updated) was called twice, and in the second call the retained message was updated with the same value, which should be impossible with the test data I generated. The second callback should have been a handle_info(deleted).

```
vmq0_1  | (<0.1009.0>) call vmq_retain_srv:insert([],[<<"0">>,<<"1">>,<<"1">>,<<"1">>,<<"3">>,<<"3">>,<<"1">>,<<"0">>,<<"3">>,<<"2">>,<<"1">>,<<"3">>,<<"3">>,
vmq0_1  |  <<"testMqttProducerId-c26996a5-ae7d-4af5-b28b-92e03ccf0a25">>],{retain_msg,<<1,1,0,0>>,#{},undefined})

vmq0_1  | (<0.1009.0>) call vmq_retain_srv:delete([],[<<"0">>,<<"1">>,<<"1">>,<<"1">>,<<"3">>,<<"3">>,<<"1">>,<<"0">>,<<"3">>,<<"2">>,<<"1">>,<<"3">>,<<"3">>,
vmq0_1  |  <<"testMqttProducerId-c26996a5-ae7d-4af5-b28b-92e03ccf0a25">>])

vmq0_1  | (<0.392.0>) call vmq_retain_srv:handle_info({updated,{vmq,retain},
vmq0_1  |          {[],
vmq0_1  |           [<<"0">>,<<"1">>,<<"1">>,<<"1">>,<<"3">>,<<"3">>,<<"1">>,<<"0">>,<<"3">>,<<"2">>,<<"1">>,<<"3">>,<<"3">>,
vmq0_1  |            <<"testMqttProducerId-c26996a5-ae7d-4af5-b28b-92e03ccf0a25">>]},
vmq0_1  |          undefined,
vmq0_1  |          {retain_msg,<<1,1,0,0>>,#{},undefined}},{state})

vmq0_1  | (<0.392.0>) call vmq_retain_srv:handle_info({updated,{vmq,retain},
vmq0_1  |          {[],
vmq0_1  |           [<<"0">>,<<"1">>,<<"1">>,<<"1">>,<<"3">>,<<"3">>,<<"1">>,<<"0">>,<<"3">>,<<"2">>,<<"1">>,<<"3">>,<<"3">>,
vmq0_1  |            <<"testMqttProducerId-c26996a5-ae7d-4af5-b28b-92e03ccf0a25">>]},
vmq0_1  |          {retain_msg,<<1,1,0,0>>,#{},undefined},
vmq0_1  |          {retain_msg,<<1,1,0,0>>,#{},undefined}},{state})
```

@elduds
Contributor

elduds commented Dec 8, 2020

> The traces revealed that some retained messages were processed twice and some were never processed.

This would exactly explain the behaviour I have seen. It is not as if there is some giant backlog where retained messages eventually update to the correct value, albeit minutes or hours late; I am seeing that some topics are never updated with the most recent value.

This is especially noticeable on one topic whose publish pattern is:

  1. Updates payload to { v: 1 }
  2. Wait 2 seconds
  3. Updates payload to { v: 0 }
  4. Wait 8 hours
  5. Repeat

Very often, the retained message on that topic has a payload of v:1, which should only ever be the case for 2 seconds every 8 hours or so; it never updates until the next cycle, and it's a crapshoot whether the next cycle will reset it back to 0 or not.

Would it be useful for me to get some trace output similar to @ico77 ? How might I do this?

thanks

@ico77

ico77 commented Dec 8, 2020

@elduds you can enable tracing like this:

  1. Attach to the vernemq console: sudo vernemq attach
  2. Then in the shell, call the following functions:
dbg:tracer().
dbg:tp(vmq_retain_srv, insert, '_', []).
dbg:tp(vmq_retain_srv, delete, '_', []).
dbg:tp(vmq_retain_srv, handle_info, '_', []).
dbg:p(all, c).

The trace will be visible in the erlang.log files.

When you are finished, type:
dbg:stop_clear().
To exit the shell, press Ctrl + g, then q

@ioolkos
Contributor Author

ioolkos commented Dec 8, 2020

@ico77 @elduds
Note that Verne has this built in: https://ferd.github.io/recon/recon_trace.html
I'd be cautious about starting unlimited tracing with dbg on a production system under high load... although, well, here it's only 3 functions and it seems to have worked for @ico77 (probably not under high load, though?).

Something like recon_trace:calls({vmq_retain_srv, '_', fun(_) -> return_trace() end}, 50). might be a start for experimentation. It will trace all the functions in vmq_retain_srv and trip after 50 calls, limiting the tracer impact.

I doubt you'll directly see the issue happening. You'd have to trace at the point where the second retain gets set.

@elduds
Contributor

elduds commented Dec 8, 2020

OK, these brokers are doing c. 7-10kpps, so it is unlikely I will catch this. I may do some experimentation in development to see if it can be triggered without high load.

@ioolkos
Contributor Author

ioolkos commented Dec 8, 2020

When this was originally implemented, we seem to have been aware of the racing and accepted double inserts (of the same message) as a tradeoff: fe493dd

The thing is, we need to be subscribed to events from the RETAIN_DB, as those events could come from other nodes (and then they need to be added to the cache too).
Let's see if a quick fix is possible, or whether we have to re-architect this beast entirely.

@ioolkos
Contributor Author

ioolkos commented Dec 8, 2020

@elduds do you ever do explicit retain 'deletes', that is sending a retain message with payload <<>> (empty payload)?
My guess is no, so this is an insert/insert race too, not only insert/delete.

@elduds
Contributor

elduds commented Dec 8, 2020

> @elduds do you ever do explicit retain 'deletes', that is sending a retain message with payload <<>> (empty payload)?

No we are never deleting retained messages in this way

@ioolkos
Contributor Author

ioolkos commented Dec 8, 2020

@elduds thanks for the confirmation. I'm currently reviewing better approaches. Maybe we don't need the cache for local reads at all.

@ioolkos
Contributor Author

ioolkos commented Dec 10, 2020

This fold is suspicious:

ets:foldl(fun persist/2, ignore, ?RETAIN_UPDATE),

Because even if ets:update_counter is atomic, the foldl iterates over global state, which can interleave with concurrent inserts and deletes.

In other words, this comment in the code would then be wrong:

%% If a concurrent insert happened during the fold then the
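
A toy illustration of why the fold is not a point-in-time snapshot (table name and record shapes are made up, not VerneMQ code):

```
-module(fold_race_demo).
-export([run/0]).

%% ets:foldl/3 walks a live table: a writer running concurrently with the
%% fold can change rows the fold has not reached yet, so the fold observes
%% an interleaving rather than a consistent snapshot.
run() ->
    T = ets:new(updates, [public, set]),
    [ets:insert(T, {N, old}) || N <- lists:seq(1, 100000)],
    spawn(fun() -> [ets:insert(T, {N, new}) || N <- lists:seq(1, 100000)] end),
    Seen = ets:foldl(fun({_, V}, Acc) -> [V | Acc] end, [], T),
    %% typically {true, true}: the fold saw a mix of old and new rows
    {lists:member(old, Seen), lists:member(new, Seen)}.
```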

@ioolkos
Contributor Author

ioolkos commented Dec 11, 2020

The RETAIN_CACHE component is a full RAM based copy of the data. It is not a partial cache, and it is interesting to ask how a cache would work for something like retained messages (where the recently set topics are not necessarily the most accessed).

Ideally, I'd like to have this configurable (number of retained messages to keep indexed in RAM).

Within the retain server, we still need to subscribe to the on_disk store (RETAIN_DB) because that's how events from other cluster nodes come in. We can only change this if we make reads hit the disk store too (currently they don't, as we have a full copy in RAM).
Interesting stuff :)

@ioolkos
Contributor Author

ioolkos commented Dec 15, 2020

Currently looking into a couple of options and also different Erlang cache implementations. The tricky thing is to come up with an alternative that doesn't replace the existing approach with some other evil :) Most likely, anything with 'read+update' operations on the cache path is not concurrency-safe with ETS.

@coder4803

coder4803 commented Jun 10, 2021

Have you made any progress? A fixes tag was added in January, but the ticket is still open. We will soon run into a situation where we cannot tolerate these retained-message skips and have to find some solution.

@ioolkos
Contributor Author

ioolkos commented Jun 10, 2021

@coder4803 are you with the same use case as @elduds and/or @ico77 ?

@coder4803

coder4803 commented Jun 21, 2021

Hello @ioolkos ,

I think my first message describes our problem well (and yes, it is quite similar to the cases described by @elduds and @ico77 although we basically never delete a retained message).

@coder4803 wrote:
"We have been noticing these "skips" during message bursts. Last example was that there were three messages (qos 1) published by the same client to the same topic (in very short time window, like few milliseconds). Messages did go through in the order they were published but when the topic was subscribed afterwards, we got the second/middle one (not the third/last as expected)."

So if the subscription is done before publish, all messages go through as expected. But if the topic is subscribed afterwards, we may get an older message (not the latest). This seems to happen at least with message bursts to the same topic.

@MarcVanOevelen mentioned this issue Aug 11, 2021
@nikolchina

We faced a similar issue and can reproduce it with a simple setup. Details are as follows.

Environment

  • Erlang/OTP: 24.2
  • VerneMQ: 1.12.4
  • Docker: 20.10.17_ce
  • Standalone and cluster
  • Go based clients use Go 1.19 and paho.mqtt.golang 1.4.1

Test

There are two clients: client A publishes retained messages; client B subscribes to the target topic and receives the corresponding messages.

Client A: Publish message [2,2,2,2,2,2]
1:
Client B: Connect to the broker and subscribe
Client B: Get the retained message and verify that it is [2,2,2,2,2,2]; stop execution on an error.
Client A: Publish message [1,1,1,1,1,1]
Client B: Receive message [1,1,1,1,1,1]
Client A: Publish message [2,2,2,2,2,2]
Client B: Receive message [2,2,2,2,2,2]
Client B: Disconnect
Goto 1

Actual behavior

The test program fails after some time. The number of iterations before the failure depends on the following points.

  • VerneMQ configuration (a cluster or a single node): in the cluster case, when clients are connected to different nodes, it fails faster.
  • The delay between the "Client B: Receive message" and "Client B: Disconnect" events: smaller delays lead to faster failures.

Expected behavior

The program does not fail after several days of execution even if the delay between "Client B: Receive message" and "Client B: Disconnect" events is 100ms or less.

Notes

Sometimes client B cannot receive the first message, so the program may need to be restarted.

The test program is attached.
vmqtest.go.txt

@ioolkos
Contributor Author

ioolkos commented Sep 21, 2022

@nikolchina Thanks, what do you mean by "fails after some time"? How exactly? (The original issue was fixed, so this must be something different.)



@gaxx

gaxx commented Sep 21, 2022

I am afraid that the fix did not fully solve the issue. We have been using 1.12.5 and it still seems to occur from time to time.

If multiple messages (usually two in our case) are published to the same topic in a short time window, the latest message is not always the one retained on the broker (but one of the earlier messages is). Messages are otherwise delivered correctly (online clients receive them as expected).

I did not run @nikolchina's test application, but the described sequence is exactly what should eventually trigger the problem. If you keep the test application running for a while, the problem should occur. If not, I believe adding some background load on the broker would help (like 10k/sec retained publishes to different topics).

@mkostyuk

I was involved in this investigation a bit, so I can comment.

> what do you mean by "fails after some time". How exactly?

We actually see that an unexpected retained message is read by a client on reconnect. This happens after several or many iterations of the test. The exact test is described in @nikolchina's comment.

The number of iterations before the failure depends on the broker configuration and the test program.

If there is a cluster with two nodes and the involved clients are connected to different nodes, the failure can be observed within 20 seconds or so. If a single node is used instead, it can take minutes or hours before the next occurrence.

The timing is important in this case, so it is better to start VerneMQ and the test program on the same machine.

@mths1
Contributor

mths1 commented May 27, 2023

I had a quick look into this, as it might hit us as well:

I was not able to reproduce the "local" case, not even with some load. I had the example code running on a VM for over two weeks without any issues.

The distributed case is easily explained. The "publish" subsystem and the "retain" subsystem are independent and currently do not "share" any information. The retain system is eventually consistent, so timing issues are possible. In a two-node scenario with messages A and B, the following happens:

Client publishes to Node 1; Subscriber listens to Node 2.

  1. Retained message "A" is published to Node 1.
  2. Message "A" is forwarded to Node 2 and marked in the retain subsystem (on Node 1!) for distribution.
  3. Subscriber receives message "A". Node 2 does not alter the retain subsystem (that is Node 1's responsibility).
  4. Retained message "B" is published to Node 1.
  5. (Timing issue here) Information about retained message "A" arrives in the retain subsystem of Node 2.
  6. Message "B" is forwarded to Node 2.
  7. Message "B" is received by the subscriber (note: this information is not used by the retain subsystem, which still has message "A" as retained).
  8. Subscriber unsubscribes and re-subscribes before the retain subsystem delivers the information that message "B" is to be retained.
  9. Subscriber receives message "A".
  10. A new subscriber that subscribes just a moment later will get "B", because the retain subsystem lags a little bit behind.

This can only be observed in scenarios where a retained message is published at some point in time and the subscriber subscribes right after that moment. This is a general problem of lock-free distributed systems; in an n-node scenario one could construct even crazier scenarios. What makes it look strange is that the publish system is faster than the retain system.

So what could be done in this particular scenario:

  • Delay subscribers by one more round of the retain server. This would ensure they get the latest information.
  • The retain server could be coupled more tightly to the publish subsystem. The publish path could locally set the retain information. For this we would need some "ordering" (as in, the retain server would need to know whether it may overwrite the retained message or not).

I tried to play with option 2; it seems possible but would have some drawbacks:

  • More information needs to be synced between nodes.
  • The msg_ref (or something similar) needs to carry some ordering information (see the sketch below).
  • Even then, scenarios that look "strange" can be created if the retain subsystem does not have enough time to distribute the information.
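
For the ordering point, a hypothetical last-write-wins comparison; the tag shape {Timestamp, Node} is an assumption, not what VerneMQ does today:

```
-module(retain_lww).
-export([maybe_overwrite/2]).

%% Tag every retained write with {erlang:system_time(microsecond), node()}
%% and only let a replica overwrite its stored value when the incoming tag
%% is newer. A real solution would need a logical clock rather than
%% wall-clock time, but this shows the shape of the ordering check.
maybe_overwrite({NewTag, _Msg} = New, {OldTag, _OldMsg}) when NewTag > OldTag ->
    {replace, New};
maybe_overwrite(_New, Old) ->
    {keep, Old}.
```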

@ioolkos
Contributor Author

ioolkos commented May 27, 2023

@mths1 as you correctly state, Retained messages are part of the metadata layer in VerneMQ, which is eventually consistent. We deemed this acceptable for the reason that with high-frequency publishes, the "value" of a Retained message for a new subscriber is very short-lived: it will immediately receive normal messages, that is, "updates" to the original Retained message.

We could, of course, explore fully consistent alternatives. Ra and Khepri (https://github.com/rabbitmq/khepri) come to mind.

@revikk

revikk commented Jun 30, 2023

Hello @ioolkos,

Do I understand correctly that 'eventual consistency' is a design feature of the Retained messages layer? The existing plugins vmq_plumtree and vmq_swc do not change this behavior; they just make it more stable and less buggy. Can the backend DB in use solve or minimize this issue?
Regarding your suggestion of alternatives with 'full consistency' support: are there any plans to add them to VerneMQ?
Also, you mentioned high-frequency publishes, but in our case publishing is very low-frequency.
We have 2 brokers; the producer publishes a value to Broker 1, the consumer connects to Broker 2 and subscribes at a time when the value from Broker 1 has not yet been synced to the Retain storage on Broker 2, so it receives an 'outdated' value. Is there an easy way to resend the 'updated' value from the broker to the consumer? Do you see any drawbacks to such a solution?

@ioolkos
Contributor Author

ioolkos commented Jul 6, 2023

@revikk yes, all synchronisation of metadata state (including Retained messages) is eventually consistent. This does not mean "slow" synchronisation, but it does mean it can be beaten when observing the wall clock, that is, when considering all your components under linear time.

Consider the following observation fallacy, where a publisher is on node A, and a subscriber on node B:

  • case 1: Subscriber subscribes 1ms before the Publisher sends the new value -> you happily accept the old Retain value.
  • case 2: Publisher sends the message 1ms before the Subscriber subscribes -> you are unhappy about the Subscriber receiving the old Retain value

Considerations about the decoupled nature of Pub/Sub led us to reason that eventual consistency is acceptable for retained messages.

A consistent implementation would be possible with the usual protocols (Raft, etc.)



@aoiremax

@ioolkos I disagree with your definition of acceptable: does it mean that VerneMQ is an acceptable MQTT broker implementation only for high-frequency publishes? And that it is therefore not suitable for infrequent-publish use cases, such as @revikk's and many others?

By the way, in the case of high-frequency publishes you could avoid using retained messages at all: just wait a while and the subscriber will receive live messages. So it could probably be acceptable not to implement support for retained messages at all, couldn't it?

The decoupled nature of Pub/Sub has nothing to do with the fact that a subscriber turns out to be misaligned with the state of a topic that holds a retained message. In case 2, there is no reason to be unhappy about the subscriber receiving the old Retain value, as long as it then also receives the message (as a live one) published 1 ms before its subscription. But if it receives only the old Retain value, while another subscriber subscribing a few seconds later receives the latest (of course), that is not good.

Just to summarize, (if I understood correctly) your conclusion is: a subscriber should not rely on a message received as retained, as it may not be the latest one published on the subscribed topic (and it may never receive that latest message).

Another point that is not clear to me: does this issue affect only multi-node scenarios, or single-node as well?

@ioolkos
Contributor Author

ioolkos commented Jul 20, 2023

@aoiremax Thanks, disagreeing is perfectly fine; the question is what to do with it. Should you want to work on concrete experiments or an implementation, let me know.



@mths1
Contributor

mths1 commented Jul 28, 2023

@aoiremax It affects only multi-node scenarios. I was unable to reproduce this in a single-node environment (and a code review didn't show anything different).

I have played with some mitigation ideas, and the one that worked best without huge changes in the code base is what I would call "delayed registration". It waits n times the cycle time for new retained messages and only then continues with adding the subscriber. Messages that arrive in between are still part of the online queue and will be delivered (as the delay happens after the queue has already been initialized). I did not fully test it and there are some downsides in the current implementation (such as the retained message possibly not being the first message the subscriber gets), but if people are interested I could continue and prepare a pull request.
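
A rough sketch of the idea under stated assumptions (the sync interval, the retain_lookup/1 helper and the delivery message are placeholders; the actual patch lives in the branch referenced below):

```
-module(delayed_retain_sketch).
-export([deliver_retained_delayed/3]).

-define(RETAIN_SYNC_INTERVAL_MS, 1000).  %% assumed sync cycle time

%% "Delayed registration" sketch: after the subscriber's queue is set up,
%% wait N retain sync cycles before reading and delivering the retained
%% message, so a value written on another node has had time to arrive
%% locally. Messages arriving in between stay in the online queue.
deliver_retained_delayed(SubscriberQueuePid, Topic, N) ->
    timer:sleep(N * ?RETAIN_SYNC_INTERVAL_MS),
    case retain_lookup(Topic) of
        {ok, RetainMsg} ->
            SubscriberQueuePid ! {deliver_retained, Topic, RetainMsg};
        not_found ->
            ok
    end.

%% placeholder for a local read from the retain store
retain_lookup(_Topic) ->
    not_found.
```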

@mths1
Contributor

mths1 commented Jul 28, 2023

Here is the branch in case someone wants to give it a try: https://github.com/mths1/vernemq/tree/feature/delayed_retained (poorly tested). It will "ensure" (based on time) that the latest retained message is delivered and that no message loss occurs. You might receive the retained message twice (once as retained and once as the actual message that arrived before the retained one did).

@ioolkos
Contributor Author

ioolkos commented Jul 29, 2023

@mths1 thanks, will take a look at it!



@rafaelnobrekz

I think we are having issues with this as well. A case where this is specifically harming us is when we subscribe very close in time to the very first retained message sent to a topic. In some cases we simply never get anything at all and have to work around it by resubscribing after a set amount of time without getting a message.

@nikolchina

As I understand it, the VerneMQ retain system works with some delay. The message itself can be delivered instantly to a subscriber, but it does not appear in the retain system right away. Thus there is a situation where, after reconnecting, the subscriber reads the message stored in the persistence system, but it is not necessarily the latest message that reached the broker. To illustrate this case I'm attaching a sequence diagram.
[sequence diagram: vernemq-retained]

@ioolkos
Contributor Author

ioolkos commented Apr 2, 2024

@nikolchina
Thanks! Yes, message_1 and message_2 will be immediately delivered to online clients; for this delivery their retain flag is set to 0 (from the original incoming retain flag 1).
Since both messages are marked to be retained, the cluster will also add them to the distributed retain store, in an eventually consistent fashion, which is the cause of the effect seen in your diagram.

If a new subscriber connects to a specific cluster node and issues a subscription to the topic, the retain store will do a local read and deliver the retained message with the retain flag set to 1.



@mths1
Contributor

mths1 commented Apr 2, 2024

@nikolchina :
Hello! One or two small additions: your picture is correct if publisher and subscriber are on different nodes. What you call "delay" is the synchronization of the distributed retain store, which happens periodically; that is, the remote node learns (in wall-clock terms) too late that there is a new retained message. My patch (above) tries to mitigate this by waiting for the synchronisation to complete and only then forwarding the actual retained message.

@nikolchina

Hi @mths1 ,

  1. You're right, the issue can only be reproduced when publisher and subscriber are connected to different nodes (tested on version 1.13.0 and the branch "feature/delayed_retained" from your repo https://github.com/mths1/vernemq.git). In version 1.12.4, though, the issue could be reproduced even when publisher and subscriber were connected to the same node.
  2. I've built the Docker image from your branch (git clone --depth 1 https://github.com/mths1/vernemq.git -b "feature/delayed_retained" vernemq) and ran my test against it; the test does not show any improvement compared to 1.13.0. I'm attaching the YAML file for docker compose and the test program in Go.
    vernemq-compose.yaml.txt
    vernemq-test.go.txt

@mths1
Contributor

mths1 commented Apr 10, 2024

Hi @nikolchina ,
I rebased my branch onto master, just to be sure. Please retest with the latest version. Would you please also verify that you have set

subscriber_retain_mode syncwait

within the VerneMQ config of the node that you use as the subscriber? Thanks! Your demo code seems to work (for me :-) ) with such a setup.

@nikolchina

Hi @mths1 ,
I've created the Docker image, but it fails to start:

-------------------------------------------------------------------------------
Starting VerneMQ...
-------------------------------------------------------------------------------

MASTER = ''
vmq_cluster_node_sup
vernemq failed to start within 60 seconds,
see the output of 'vernemq console' for more information.
If you want to wait longer, set the environment variable
WAIT_FOR_ERLANG to the number of seconds to wait.

@mths1
Contributor

mths1 commented Apr 13, 2024

@nikolchina : This typically happens when the vernemq config is not correct. Running 'vernemq console' should give more information. Maybe you can also share your config file.

@nikolchina

@mths1, thanks, there were some parameters in the old config which are now outdated.
I managed to start it and run some tests. With this build I observe a delay (about 2 seconds) between the subscriber connecting and receiving the retained message; no messages are lost. And as you mentioned, the retained message can be received twice.

If the publisher sends some messages while the subscriber is connecting, one of the messages will be delivered as retained and all of them will be delivered subsequently after the retained message, as represented in the diagram.
[diagram: multiple_messages_during_connecting]

@mths1
Contributor

mths1 commented Apr 18, 2024

@nikolchina : Great that it works for you! By the way, your code is of course somewhat artificial, as your publisher publishes very frequently; thus you see m4 (retained), m3 (because m4 came within the sync period as a new retained message), m4 (again from the publisher) and then m5.

The typical use case I aimed for is a sensor that sends data very infrequently, like once a day (and you do not want or cannot have persistent sessions). In mid- to high-frequency use cases you typically won't use retained messages (or the very slim chance of getting an old retained message does not matter), and if every message counts then persistent sessions are typically better suited. So, typically you would just get m4, or in some special situations m4, m4 (the retain server currently does not have a message id for deduplication, so we cannot tell if the second m4 is actually the same message or another message with the same content).

@nikolchina

@mths1 ,
You've mentioned that the parameter subscriber_retain_mode should be set to syncwait on the subscriber node. I wonder whether it is a strict requirement to use it only on the subscriber node, and whether there are any drawbacks if all nodes have this parameter set to syncwait.
Thank you.

@mths1
Contributor

mths1 commented May 19, 2024

Hello @nikolchina : You can set the parameter on all nodes to syncwait. There is no strong requirement to do this only on subscriber nodes.
