
MQTT: replicated retained message store #8096

Open
galah92 opened this issue May 3, 2023 · 20 comments

Comments

@galah92

galah92 commented May 3, 2023

Is your feature request related to a problem? Please describe.

I'm using RabbitMQ as MQTT Broker with the rabbitmq_mqtt plugin. I'm deploying RabbitMQ over Kubernetes with the Kubernetes Operator. According to the docs:

messages retained on one broker node are not replicated to other nodes in the cluster

I'd like to ask to support it over multiple replicas/nodes/pods.

The incentive is that if a client is connected to a specific pod, disconnects, and then reconnects to a different pod, I'd like this client to be able to subscribe to the same MQTT topic and get the same data.

Describe the solution you'd like

I'd like to be able to restore retained messages on MQTT topics from clients connected on multiple nodes/replicas/pods.

Describe alternatives you've considered

It is probably possible to emulate that with a backend that listens to client subscriptions on "retained" topics and a corresponding DB/cache (Redis-like) to store retained message state.
But that doesn't sound right.

Additional context

No response

@michaelklishin
Member

michaelklishin commented May 3, 2023

I'm not sure if it was done as part of #5895 #2554 #7263. @ansd @ChunyiLyu would know.

@michaelklishin michaelklishin changed the title Support MQTT message retention over a cluster MQTT: replicated message retention May 3, 2023
@ansd
Member

ansd commented May 4, 2023

@michaelklishin No, it was not done. Neither the Native MQTT work nor the new feature requirements for MQTT 5.0 are related to replication of retained messages.

As documented in https://www.rabbitmq.com/mqtt.html#retained, support for retained messages has been very limited in RabbitMQ because the feature behaves incorrectly when the RabbitMQ cluster consists of multiple nodes. More specifically, the current implementation stores retained messages only locally on each node. This means that, depending on which node a client connects to and subscribes on, it may receive no retained message, an outdated one, or the latest one for a given topic. Furthermore, wildcards in topic filters are not supported for retained messages.

Given these limitations it makes sense to implement a new rabbit_mqtt_retained_msg_store.

Some of the requirements could be:

  • Support wildcards in topic filters: "When a new subscription is established, the last retained message, if any, on each matching topic name MUST be sent to the subscriber [MQTT-3.3.1-6]." Should we limit the maximum number of retained messages being sent to a subscriber?
  • Ensure that independently of which node a client subscribes on, it gets the last retained message for the matching topic.
  • Retained messages are replicated for data safety (on all nodes if the cluster consists of e.g. 7 nodes?)
  • Writing of retained messages must be fast. It should not become a bottleneck if many messages have the Retain flag set.
  • Reading of retained messages is ideally also fast, but is probably allowed to be slower than writing (given that subscriptions are probably created more rarely than publishing a message to the server with the Retain flag set).

Some limitations for the implementation are:

  • Mnesia must not be used because our plan is to replace Mnesia with Khepri.
  • No external data store (e.g. Cassandra) should be used, as we want to provide a solution without a dependency on external databases.

Before thinking about an implementation, the very first question is which of the above requirements do we need (all, or just a subset)?

@galah92
Author

galah92 commented May 4, 2023

I'll elaborate more regarding my use-case.
I'm trying to replicate the behavior of Azure IoT Hub & AWS IoT Core by having a Device Twin (Azure terminology) or Device Shadow (AWS terminology). Each device has a config which it can get by subscribing to a concrete topic involving its client_id as part of the topic name.
To that end, I'm creating a backend service that should be able to mutate device configs by retaining messages on the corresponding topics.
Note that in this case, the topics on which I'm retaining messages (devices/{client_id}/config) are used only for retained messages. No other messages are sent to these topics.

In terms of requirements:

  • Support wildcards in topic filters: I don't think it's required, at least not for my specific case, since devices are subscribed to concrete topics (devices/{client_id}/config).
  • Ensure that independently of which node a client subscribes on, it gets the last retained message for the matching topic: that's the most important requirement.
  • Retained messages are replicated for data safety (on all nodes if the cluster consists of e.g. 7 nodes?): sounds like implementation details.
  • Writing of retained messages must be fast. It should not become a bottleneck if many messages have the Retain flag set: not a requirement for my use case. Device configuration is rare compared to other data coming in/out from the devices.
  • Reading of retained messages is ideally also fast, but is probably allowed to be slower than writing: makes total sense.

Also, this is for MQTT 3.1.1 as I am not using MQTT 5.

@ansd
Member

ansd commented May 4, 2023

Thank you for explaining your use case @galah92.

The solution will be much simpler if wildcards are not supported.
In that case a distributed key value store could be used.
If that key value store is built on top of https://github.com/rabbitmq/ra we need to think about snapshotting including the message payload.
Another key value store solution could use consistent hashing.
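To make the consistent-hashing idea concrete, here is a minimal sketch of a hash ring mapping retained-message topics to an owning cluster node. This is purely illustrative: the node names, virtual-node count, and hash choice are assumptions, not anything RabbitMQ provides.

```python
import bisect
import hashlib

class ConsistentHashRing:
    """Map retained-message topics to an owning node via consistent hashing."""

    def __init__(self, nodes, vnodes=64):
        # Each physical node is placed on the ring multiple times
        # (virtual nodes) to even out the distribution.
        self._ring = []  # sorted list of (hash, node)
        for node in nodes:
            for i in range(vnodes):
                self._ring.append((self._hash(f"{node}:{i}"), node))
        self._ring.sort()

    @staticmethod
    def _hash(key):
        return int(hashlib.sha1(key.encode()).hexdigest(), 16)

    def node_for(self, topic):
        # The owner is the first ring entry clockwise from the topic's hash.
        idx = bisect.bisect(self._ring, (self._hash(topic), ""))
        return self._ring[idx % len(self._ring)][1]

ring = ConsistentHashRing(["rabbit@node1", "rabbit@node2", "rabbit@node3"])
owner = ring.node_for("devices/42/config")
```

With such a scheme, each topic's retained message has a deterministic owner node, so any node can forward reads and writes there; replication for data safety would still need to be layered on top.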

We are a small team, and while we are investing in MQTT, our current focus has been improving scalability in #5895 and adding support for MQTT 5.0 in #7263.
Although important in the long term, implementing a replicated retained message store (such as a distributed key-value store) is a non-trivial task and not a priority for us right now; it will therefore not be implemented in the near future.
@galah92 if this is an important feature for you, feel free to provide a contribution or sponsor the development.

@galah92
Author

galah92 commented May 4, 2023

Thank you for the explanation.

As learning the RabbitMQ internals & Erlang sounds out of scope for me for now, I will have to go with a different solution, i.e. an external key-value store (Redis) and a backend service that watches for device subscriptions to queues and publishes messages accordingly.

For that I would need a way to track the currently subscribed topics. As these MQTT topics correspond to AMQP queues, I think this can be done by listening for queue.created & queue.deleted events.
I would also need to disable the current retention mechanism so that it won't conflict with my external one. I think this can be done with rabbit_mqtt_retained_msg_store_noop - can you confirm?

In general, does that solution make sense?

@michaelklishin
Member

Yes, rabbit_mqtt_retained_msg_store_noop is the implementation you need. It won't retain anything (by design). I kind of forgot that we had it :)
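For reference, selecting the store is a configuration change; a sketch assuming the modern rabbitmq.conf (sysctl) format — verify the exact key name against the MQTT plugin docs for your RabbitMQ version:

```ini
# rabbitmq.conf: use the no-op retained message store,
# delegating message retention to an external system
mqtt.retained_message_store = rabbit_mqtt_retained_msg_store_noop
```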

@ansd
Member

ansd commented May 4, 2023

Dumping here one more good idea, suggested by @kjnilsson.
A relatively simple solution that could work well enough is:

  • Store retained messages in a RabbitMQ Stream
  • The stream is replicated across all nodes
  • Each node consumes locally from the stream and writes the retained message into an ETS (or local Mnesia) table.
  • Optionally, wildcards could be supported by translating them into Match Specifications to be queried within the database.
  • If a retained message is to be deleted, a deletion marker is written to the stream. Each local stream consumer then deletes the message from its local ETS store.
  • The stream would set x-max-length-bytes (instead of x-max-age). Before a segment is deleted, any messages in it that are still current get rewritten to the stream.
  • If a node gets upgraded, it recovers by re-consuming from the beginning of the stream.

This should provide very fast writes and reads as well as message replication while working with the current primitives available in RabbitMQ.
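As a toy model of that design, the sketch below replays an append-only log into a local lookup table, with `None` payloads acting as deletion markers. All names are illustrative; the real implementation would consume a RabbitMQ stream into ETS rather than a Python list into a dict.

```python
class RetainedStoreReplica:
    """Each node replays the replicated stream into a local lookup table."""

    def __init__(self):
        self._table = {}   # local ETS analogue: topic -> payload
        self._offset = 0   # position in the stream consumed so far

    def consume(self, stream):
        # Apply any records appended since the last call.
        for topic, payload in stream[self._offset:]:
            if payload is None:              # deletion marker
                self._table.pop(topic, None)
            else:
                self._table[topic] = payload
        self._offset = len(stream)

    def lookup(self, topic):
        return self._table.get(topic)


stream = []  # stands in for the replicated RabbitMQ stream
replica = RetainedStoreReplica()

stream.append(("devices/42/config", b'{"led": "on"}'))
stream.append(("devices/42/config", b'{"led": "off"}'))
replica.consume(stream)
latest = replica.lookup("devices/42/config")   # newest retained payload wins

stream.append(("devices/42/config", None))     # deletion marker
replica.consume(stream)
deleted = replica.lookup("devices/42/config")  # None after deletion
```

Because every node replays the same log, all replicas converge on the same table, and recovery after an upgrade is just replaying from offset zero, as described above.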

@galah92 Yes, currently I see 2 possible workarounds for you:

  1. The one you mentioned. But don't listen only for queue_created and queue_deleted; more importantly, also listen for binding_created and binding_deleted. Each binding from an MQTT queue to the topic exchange corresponds to one MQTT subscription. The binding key corresponds to the MQTT subscription topic filter (although some characters are converted, e.g. / in an MQTT topic filter gets converted to . in the AMQP 0.9.1 binding, and likewise + to *, see
    %% amqp mqtt descr
    %% * + match one topic level
    %% # # match multiple topic levels
    %% . / topic level separator
    )
  2. You implement the rabbit_mqtt_retained_msg_store behaviour. This does require a bit of Erlang coding, but you just need an Erlang Redis client that inserts, looks up, and deletes entries in Redis (shouldn't require more than a few lines of Erlang code).
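The character mapping in the comment table above can be expressed as a small helper. A sketch (the function name is mine; the mapping itself is the one documented in the plugin source comment, ignoring edge cases such as literal `.` or `*` characters inside MQTT topic levels):

```python
def mqtt_topic_filter_to_binding_key(topic_filter: str) -> str:
    """Convert an MQTT topic filter to an AMQP 0.9.1 binding key:
    '/' -> '.', '+' -> '*', '#' stays '#'."""
    return ".".join(
        "*" if level == "+" else level
        for level in topic_filter.split("/")
    )

key = mqtt_topic_filter_to_binding_key("devices/+/config")
# key == "devices.*.config"
```

A workaround service watching binding_created events would apply this mapping in reverse to recover the original MQTT topic filter from the binding key.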

@kjnilsson
Contributor

kjnilsson commented May 4, 2023 via email

@galah92
Author

galah92 commented May 8, 2023

I was able to create a POC based on Redis and using rabbit_mqtt_retained_msg_store_noop. So far it looks good. Adding Redis as a dependency to the system is worse than having a distributed key-value store in RabbitMQ, but at least it's a valid solution that works.
Thanks everyone.

@michaelklishin
Member

michaelklishin commented May 9, 2023

The stream-based option seems better to me than a new embedded distributed key-value store. It will not be perfectly consistent compared to what streams themselves offer, due to the step that concurrently copies inbound data to an ETS table. Otherwise it can be perfectly sufficient.

@ansd ansd mentioned this issue Jun 13, 2023
23 tasks
@galah92
Author

galah92 commented Jun 27, 2023

Hi, just saw 3.13.0-beta.1 was released. The release notes didn't mention anything related to message retention, but was it implemented by any chance? For MQTT 3 & 5.

@lukebakken
Collaborator

Hi, just saw 3.13.0-beta.1 was released. The release notes didn't mention anything related to message retention, but was it implemented by any chance? For MQTT 3 & 5

This issue is still open, so no, it has not been implemented.

@gery0815

gery0815 commented Aug 2, 2023

Hi all, we created a plugin to store the retained messages on a Redis cluster. We are hosting our 3-node cluster on Kubernetes. Maybe there is some interest in our implementation. It would be a pleasure to share it.

@michaelklishin
Member

@gery0815 you are welcome to leave a link to the plugin here but for any further discussions of it, use GitHub Discussions.

@jdsdc

jdsdc commented Sep 14, 2023

Hi all, we created a plugin to store the retained messages on a Redis cluster. We are hosting our 3-node cluster on Kubernetes. Maybe there is some interest in our implementation. It would be a pleasure to share it.

@gery0815, would it be possible for you to share more information about the plugin you wrote for redis?

@gery0815

@jdsdc @michaelklishin I started a discussion. There I also posted the link to my repo. Feel free to use it!

#9431
https://github.com/gery0815/rabbitmq-mqtt-retained-msg-redis

@rabbitmq rabbitmq deleted a comment from amills-vibeirl Jan 21, 2024
@ansd ansd changed the title MQTT: replicated message retention MQTT: replicated retained message store Feb 28, 2024
@robertsLando

robertsLando commented Mar 5, 2024

Seems the Redis plugin is not working with the latest RabbitMQ version. Could I ask the RabbitMQ maintainers if there is a WIP or an ETA for supporting a cross-node/replica/pod retained message store? I'm wondering if I'm the only one who considers this a big limitation. I use retained messages a lot to keep state in sync, but if I have multiple RabbitMQ nodes, right now I cannot be sure my clients will receive all retained messages when they subscribe, as messages are fetched only from the node they are connected to. I also think this is not how users would expect it to work.

@getlarge

getlarge commented Mar 5, 2024

@ansd @kjnilsson I am very interested to learn more about the solution based on RabbitMQ streams that you presented here.
Since I have no experience with streams yet, my questions might appear naïve, but how would you implement such a solution?

  • By modifying the rabbit_mqtt plugin?
  • As a separate rabbit_mqtt_retained_msg_store plugin?
  • Using a custom policy?
  • In a separate application?
  • Something totally different?

@michaelklishin
Member

The solution based on streams, or rather certain internal stream API elements, is still very much a hypothesis to prove at this point. So are other options.

There are no plans to introduce more plugins.

@robertsLando

We created our own implementation using Redis as a fork of @gery0815's: https://github.com/innovation-system/rabbitmq-mqtt-retained-msg-redis

This is something we really miss from RabbitMQ, hope one day there will be some work for supporting this 🙏🏼


9 participants