
Native MQTT #5895

Merged: 118 commits into main on Jan 26, 2023

Conversation

@ansd (Member) commented Sep 28, 2022

Concept

Until today, MQTT (like STOMP and AMQP 1.0) has been proxied via AMQP 0.9.1:

[Diagram: MQTT traffic proxied through AMQP 0.9.1 processes]

Pros:

  • neat mechanism to easily extend RabbitMQ’s traditional AMQP 0.9.1 core model with support for more protocols

Cons:

  • in the case of MQTT: >20 Erlang processes per incoming MQTT connection
  • poor scaling with many MQTT connections

This PR implements Native MQTT. By "native" we mean that MQTT becomes a first-class protocol in RabbitMQ that is not proxied via AMQP 0.9.1:

[Diagram: Native MQTT - the MQTT connection process talks to queues directly]

Pros:

  • Only 1 Erlang process per incoming MQTT connection
  • Lower memory usage => better scalability => can handle more MQTT connections

Native MQTT is possible thanks to the queue_type interface introduced by @kjnilsson 2 years ago. The queue_type interface decouples the AMQP 0.9.1 channel from queues (classic queues, quorum queues, or streams), making it possible for the MQTT connection process to publish to (and receive from) queue processes directly, without proxying via an AMQP 0.9.1 channel.
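
To make the decoupling concrete, here is a deliberately trimmed-down sketch of the shape of such a queue-type behaviour; the callback names and signatures are simplified assumptions for illustration and do not match the real rabbit_queue_type module:

%% Illustrative only: a trimmed-down queue-type behaviour. The real interface
%% is rabbit_queue_type in deps/rabbit; callback names and signatures below
%% are simplified assumptions, not the actual API.
-module(queue_type_sketch).

%% Client-side, per-queue state held by whichever process talks to queues -
%% an AMQP 0.9.1 channel, or (with this PR) an MQTT connection process.
-type queue_state() :: term().
-export_type([queue_state/0]).

%% Initialise client state for one target queue.
-callback init(QueueName :: term()) -> queue_state().

%% Send a message towards the queue; returns the updated client state plus
%% actions (e.g. publisher confirms) for the caller to apply.
-callback deliver(Msg :: term(), queue_state()) ->
    {queue_state(), Actions :: [term()]}.

%% Handle an asynchronous event the queue sends back (e.g. settlements).
-callback handle_event(Event :: term(), queue_state()) ->
    {ok, queue_state(), Actions :: [term()]}.

Because the interface is implemented by classic queues, quorum queues, and streams alike, any process that speaks it - not just an AMQP 0.9.1 channel - can act as a queue client.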

Result

The result of this PR (Native MQTT) is that resource usage drops drastically and that RabbitMQ is able to accept millions of MQTT connections.

Scaling tests can be found in https://github.com/rabbitmq/mqtt-testing (private repo).

Connecting a total of 1 million "background" MQTT connections (i.e. neither publishing nor consuming, only sending PING packets) to a 3-node RabbitMQ cluster with small buffer sizes and management metrics collection disabled:

mqtt.tcp_listen_options.sndbuf  = 2048
mqtt.tcp_listen_options.recbuf  = 2048
management_agent.disable_metrics_collector = true

requires 97 GB of memory per node on RabbitMQ 3.11 versus 7 GB of memory per node with this PR. That is a permanent memory saving of 90 GB × 3 = 270 GB for the cluster, a 13x memory reduction. (For QoS 0 subscribers using the new queue type - more details below - the reduction factor is much higher still.)

93% of the memory required on main (or 3.11) is process memory, i.e. memory used by Erlang processes.

Further scaling tests have shown that with this PR,

  • we can connect more than 9.3 million MQTT connections across a 3-node RabbitMQ cluster, requiring ~50 GB of memory per node
  • a total of 3 million MQTT connections across a 3-node cluster works well, where 1.5 million publish and 1.5 million receive in a 1:1 topology (i.e. each publisher sends to exactly one consumer) one 64-byte message every 3 minutes with QoS 1. This requires ~49 GB of memory and ~19 CPUs per node. In this scenario, 1.5 million classic queues v2 are created, where Reduce memory by multiple GBs with many classic queues #6684 saves multiple GBs of memory.

Implementation details

  • Message routing and authentication / authorisation checks now happen in the MQTT connection process instead of the channel process
  • The queue_type interface is used directly by the MQTT connection process
  • There is a single Erlang process per incoming MQTT connection. Most Erlang processes were removed automatically once the proxying via AMQP 0.9.1 was omitted. However, more work was needed to get rid of the rabbit_heartbeat processes, the heartbeat supervisor, and the rabbit_mqtt_connection_sup process. The same is done for Web MQTT: there is now a single Erlang process per incoming Web MQTT connection.
  • Heartbeats / MQTT keepalives are now implemented in place in the single MQTT connection process using Erlang timers (see the keepalive sketch after this list).
  • The MQTT parser was optimised to avoid converting binaries to lists and back.
  • This PR contains many bug fixes for bugs that still exist on 3.11.x (see commit messages for details)
  • This PR contains MQTT 3.1.1 features that were previously not implemented (see commit messages for details)
  • A new queue type has been introduced. In this PR it's called rabbit_mqtt_qos0_queue. (We can rename it if we think we can re-use it somewhere else.) The idea is that this queue type is a "pseudo queue" or "virtual queue", as experimented with for direct-reply-to in https://github.com/rabbitmq/rabbitmq-server/tree/virtual-queue-type. The queue process is the receiving MQTT connection process; the queue is essentially the Erlang mailbox of that process. It is used for MQTT connections that connect with a clean session and subscribe with QoS 0. There is no point in forwarding a message to a real queue first. Instead, MQTT publishing connections send a message directly to the new queue type - that is, to the receiving MQTT connection process.
  • The new queue type is also interoperable with other protocols, e.g. you can send via AMQP 0.9.1 to the new queue type. (You can even send a message from the Management UI directly to the receiving MQTT connection process.)
  • The new queue type lives in rabbitmq_mqtt. Note that the rabbit app is not aware that this queue type exists.
  • The benefits of the new queue type are: (1) one fewer message hop, (2) a huge reduction in memory usage because we omit 2 Erlang processes (1 for the classic queue, 1 for its supervisor), and (3) its client does not maintain any state when sending to the destination, which simplifies large fanouts. Sending also uses delegate, as done in classic queues. The new queue type is gated behind the feature flag rabbit_mqtt_qos0_queue.
  • The new queue type does not have any form of flow control because no state can be maintained between the sending and receiving processes. It therefore drops messages when it becomes overloaded: specifically, when its Erlang mailbox holds more than 1k messages (configurable via mqtt_mailbox_soft_limit) AND the network to the MQTT client is congested, meaning the MQTT client app cannot consume messages fast enough (see the overload sketch after this list).
  • The new queue type can be listed via CLI or Management UI, but client apps are not aware that it exists.
  • The Ra cluster mqtt_node is deleted (hidden behind feature flag delete_ra_cluster_mqtt_node). Instead, MQTT client ID to pid tracking is done via a local-only process group using module pg (thanks to @lhoguin for coming up with this idea; see the pg sketch after this list). The Ra cluster's processes grew huge with many MQTT connections and became a bottleneck when mass-disconnecting clients. Local-only pg scales better. For the purpose of MQTT client ID tracking (i.e. disconnecting the old client if a new client with the same ID connects), we don't need the strong Raft consistency guarantees. This does mean, however, that - in the presence of lost messages or network partitions - 2 clients can end up connected with the same ID. That shouldn't be a huge problem, but it does temporarily violate the protocol spec when such misconfigured clients meet network issues within the broker.
  • The code has much higher test coverage compared to v3.11.x. There is a new test suite called shared_SUITE that is shared between MQTT and Web MQTT. This introduces only a test dependency from rabbitmq_mqtt to rabbitmq_web_mqtt.
  • The code is much clearer now. The code on 3.11.x is hard to understand.
  • Hundreds of lines of old copied emqttc library code are deleted from the Web MQTT tests. Instead, the emqtt web socket option is used.
  • Apart from reducing Erlang processes, there are numerous memory optimisations done within the single MQTT connection process itself (see commit messages)
  • (Only) existing global counters are implemented. (Note that we plan to introduce MQTT specific Prometheus metrics when we implement support for MQTT 5.0.)
  • rabbit_mqtt_reader got converted from gen_server2 to gen_server (thanks @lhoguin)
  • Feature flags stream_queue and classic_queue_type_delivery_support (the latter introduced in v3.10.9 and v3.11.1) are required. (That's not a problem because Feature flags: Make feature flags v2 required #6810, present in 3.12, will already require upgrading to 3.11.x first.)
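
To illustrate the in-process keepalive handling mentioned above, here is a minimal sketch assuming a gen_server-based connection process; the module, record, and function names are made up for the example, not taken from the PR:

%% Minimal keepalive sketch (assumed names; not the PR's actual code).
%% These functions would live inside the gen_server-based connection process.
-module(keepalive_sketch).
-export([arm_keepalive/1, on_packet_received/1, handle_info/2]).

-record(state, {keepalive_ms :: pos_integer(),
                timer :: reference() | undefined,
                rx_activity = false :: boolean()}).

%% Arm (or re-arm) the timer in the connection process itself; MQTT mandates
%% closing the connection after 1.5x the negotiated keepalive interval.
arm_keepalive(State = #state{keepalive_ms = Ms}) ->
    TRef = erlang:start_timer(Ms * 3 div 2, self(), keepalive),
    State#state{timer = TRef, rx_activity = false}.

%% Called for every MQTT packet received from the client (incl. PINGREQ).
on_packet_received(State) ->
    State#state{rx_activity = true}.

%% gen_server callback: the timer fired. If the client was silent for the
%% whole interval, stop the connection; otherwise re-arm and carry on.
handle_info({timeout, TRef, keepalive}, State = #state{timer = TRef}) ->
    case State#state.rx_activity of
        true  -> {noreply, arm_keepalive(State)};
        false -> {stop, {shutdown, keepalive_timeout}, State}
    end.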
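
The overload protection of the new QoS 0 queue type could look roughly like this; everything here (module, function names, the congestion check) is an illustrative assumption, with only erlang:process_info/2 and erlang:port_info/2 being real APIs:

%% Illustrative overload check for the QoS 0 pseudo queue; all names here
%% are assumptions, not the PR's actual implementation. The "queue" is the
%% Erlang mailbox of the receiving connection process, so there is no flow
%% control - the only safety valve is dropping.
-module(qos0_overload_sketch).
-export([should_drop/1]).

-define(SOFT_LIMIT, 1000). %% default; configurable via mqtt_mailbox_soft_limit

%% Drop only if the mailbox is over the soft limit AND the socket towards
%% the MQTT client is congested (client app consumes too slowly).
should_drop(Socket) ->
    {message_queue_len, Len} =
        erlang:process_info(self(), message_queue_len),
    Len > ?SOFT_LIMIT andalso socket_congested(Socket).

%% True if bytes are stuck in the port's driver queue, i.e. the TCP send
%% buffer towards the client is full.
socket_congested(Socket) ->
    case erlang:port_info(Socket, queue_size) of
        {queue_size, Bytes} -> Bytes > 0;
        undefined           -> false
    end.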
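
And a rough sketch of the pg-based client ID tracking; the scope and message names are assumptions, while pg:join/3 and pg:get_members/2 are the actual OTP pg API (the scope must have been started with pg:start_link/1):

%% Sketch of client-ID tracking with OTP's local-only pg (assumed names).
-module(clientid_tracking_sketch).
-export([register_client_id/1]).

-define(SCOPE, mqtt_clientid_scope).

register_client_id(ClientId) ->
    %% MQTT requires a new connection with the same client ID to displace
    %% the old one, so ask any previously registered process to close...
    [Old ! {close_connection, duplicate_client_id}
     || Old <- pg:get_members(?SCOPE, ClientId), Old =/= self()],
    %% ...then register this connection process under that client ID.
    ok = pg:join(?SCOPE, ClientId, self()).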

Limitations

The following limitations already exist today in v3.11, but are especially important to mention now that RabbitMQ can handle many more MQTT connections:

  • Mass client disconnection and node draining still need to be optimised. For example, there are some bottlenecks such as rabbit_alarm and our old custom pg_local (which could be replaced by pg?). Nodes might even crash.
  • Turning on management metrics collection with management_agent.disable_metrics_collector = false (the default) will double or triple memory usage. The management plugin is not designed to handle such a huge number of stats-emitting objects (connections / queues). Management metric collection should therefore be turned off and Prometheus should be used (see the example after this list). We don't plan to improve management agent metrics in the future; Prometheus is the way to go.
  • Huge fanouts are still not optimised and require lots of memory (although the new queue type helps somewhat for QoS 0 subscribers). Large fanouts should be done rarely. If sending to target classic or quorum queues, they should additionally originate from only a few publishers (because every publisher maintains state for every destination queue).
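
In practice the metrics recommendation boils down to two standard settings (the plugin and port below are stock RabbitMQ; adjust the node address for your deployment). In rabbitmq.conf, keep the management metric collector disabled:

management_agent.disable_metrics_collector = true

and enable the built-in Prometheus endpoint instead:

rabbitmq-plugins enable rabbitmq_prometheus

after which metrics can be scraped from http://<node>:15692/metrics.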

Newly introduced limitations with this PR:

  • As observed by @mkuratczyk: channel-specific fine-grained metrics are no longer emitted for MQTT connections. This means, for example, that users won't see in the Management UI how many messages were sent to the topic exchange. However, Prometheus global counters are supported.

Future Work

(Prioritisation not yet decided.)

  • Support for MQTT 5.0 (this PR is a prerequisite).
  • Support huge fanouts: for example by sending to a local stream that asynchronously dispatches to local queues. Routing also needs to be optimised, ideally streamed, so that not all million destination queue names are fetched into memory and sent to at once.
  • more performant topic routing (prototype already implemented by @dcorbacho and @the-mikedavis for Khepri)
  • handle mass client disconnection and node drain with millions of MQTT connections

Thanks to @ChunyiLyu, who contributed equally to this PR!

@ChunyiLyu (Contributor) left a comment

🚀

@michaelklishin merged commit 85e38dc into main Jan 26, 2023
michaelklishin added a commit that referenced this pull request Jan 30, 2023
Previously the helper was not setting the virtual host limit
correctly.
michaelklishin added a commit that referenced this pull request Jan 30, 2023
…set-vhost-limits

Backport a CT broker helper fix from #5895
mergify bot pushed a commit that referenced this pull request Jan 30, 2023
Previously the helper was not setting the virtual host limit
correctly.

(cherry picked from commit ca1c5ac)
michaelklishin added a commit that referenced this pull request Jan 30, 2023
Backport a CT broker helper fix from #5895  (backport #7108)
pjk25 added a commit that referenced this pull request Feb 17, 2023
Leaving MQTT alone, as this branch does not contain
#5895, which fixed a
great many dialyzer warnings.
mergify bot pushed a commit that referenced this pull request Feb 17, 2023
Leaving MQTT alone, as this branch does not contain
#5895, which fixed a
great many dialyzer warnings.

(cherry picked from commit 3f9e6f9)
pjk25 added a commit that referenced this pull request Feb 17, 2023
… (backport #7295) (#7349)

* Fix all dependencies for the dialyzer

This is the latest commit in the series; it fixes (almost) all the
problems with missing and circular dependencies for typing.

The only unsolved problems are:

- `lg` dependency for `rabbit` - the problem is that it's the only
  dependency that contains a NIF, and there is no way to make dialyzer
  ignore it - the unknown-function check does not appear to be suppressible
  by dialyzer directives. In the future, making `lg` a proper dependency
  could be a good thing anyway.

- some missing Elixir functions in `rabbitmq_cli` (CSV, JSON and
  logging related).

- `eetcd` dependency for `rabbitmq_peer_discovery_etcd` - this one
  uses sub-directories in `src/`, which confuses dialyzer (or our bazel
  machinery is not able to handle it properly). I've tried the latest
  rules_erlang, which flattens the directory for .beam files, but that
  wasn't enough for dialyzer - it wasn't able to find the core Erlang
  files. This is a niche plugin and an unusual dependency, so probably
  not worth investigating further.

(cherry picked from commit 949b535)
(cherry picked from commit 3a3ff30)

# Conflicts:
#	deps/rabbit/BUILD.bazel
#	deps/rabbit/src/rabbit_access_control.erl
#	deps/rabbit/src/rabbit_exchange.erl
#	deps/rabbit_common/src/rabbit_misc.erl
#	deps/rabbitmq_consistent_hash_exchange/BUILD.bazel
#	deps/rabbitmq_mqtt/BUILD.bazel
(cherry picked from commit 2ae27f2)

# Conflicts:
#	deps/rabbit_common/src/rabbit_misc.erl

* Resolve conflicts

(cherry picked from commit b205ac9)

# Conflicts:
#	deps/rabbit_common/src/rabbit_misc.erl

* Avoid using a type from rabbit in rabbit_common to avoid a dep cycle

(cherry picked from commit bca40c6)

* Resolved additional errors from merge

Leaving MQTT alone, as this branch does not contain
#5895, which fixed a
great many dialyzer warnings.

(cherry picked from commit 3f9e6f9)

* fixup merge artifacts

* Avoid referencing unexported types from jsx

* Additional dialyzer fixes

---------

Co-authored-by: Alexey Lebedeff <binarin@binarin.ru>
Co-authored-by: Michael Klishin <klishinm@vmware.com>
Co-authored-by: Rin Kuryloski <kuryloskip@vmware.com>
@janiu-001 commented

@ChunyiLyu would you help provide a doc on how to get the MQTT connect/disconnect notifications? And when will you push the official Docker image to Docker Hub?

@ansd (Member, Author) commented Mar 7, 2023

Hi @janiu-001

would you help to provide a doc on how to get the mqtt connect/disconnect notification?

There is a doc at https://www.rabbitmq.com/event-exchange.html. For each MQTT CONNECT / DISCONNECT, a connection.created / connection.closed event gets emitted; a small consumer sketch follows.
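
For example, a consumer sketch using the amqp_client library (it assumes the rabbitmq_event_exchange plugin is enabled, which provides the amq.rabbitmq.event topic exchange; connection parameters are deployment-specific):

%% Sketch: consume connection.created / connection.closed events.
-module(mqtt_event_listener).
-export([listen_for_connection_events/0]).

-include_lib("amqp_client/include/amqp_client.hrl").

listen_for_connection_events() ->
    {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
    {ok, Ch} = amqp_connection:open_channel(Conn),
    %% Private queue bound to the event exchange for all connection events.
    #'queue.declare_ok'{queue = Q} =
        amqp_channel:call(Ch, #'queue.declare'{exclusive = true}),
    #'queue.bind_ok'{} =
        amqp_channel:call(Ch, #'queue.bind'{queue       = Q,
                                            exchange    = <<"amq.rabbitmq.event">>,
                                            routing_key = <<"connection.#">>}),
    %% Events arrive in this process as {#'basic.deliver'{}, #amqp_msg{}};
    %% the event name is the routing key, details are in the headers.
    amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, self()).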

And when will you push the offical docker image to docker hub?

Yesterday, 3.12.0-beta.1 was released. You can try out Native MQTT by using Docker image pivotalrabbitmq/rabbitmq:v3.12.0-beta.1-otp-max-bazel. The official Docker image rabbitmq:3.12.0-rc.1-management will be available once 3.12 RC 1 is released (probably end of March / beginning of April).

We are looking forward to your feedback. Thanks!

@janiu-001 commented Mar 7, 2023

@ansd Thanks very much for your response. I tried the image pivotalrabbitmq/rabbitmq:v3.12.0-beta.1-otp-max-bazel, and the connect/disconnect events appear to work.

Connect: [screenshot]

Disconnect: [screenshot]

But it failed to load the definitions.json which creates the queues and bindings! It does work with rabbitmq:3.11-management (I can see all the queues defined in definitions.json on the dashboard).

@ansd (Member, Author) commented Mar 7, 2023

Thanks @janiu-001.

But it failed to load the definitions.json which create the queue and bindings!

What is the error message? Are you trying to export queues created by the MQTT plugin in 3.11 and re-import those in 3.12? If you could provide step by step instructions that reproduce this issue and open a separate GitHub issue if you think something doesn't work as expected, that would be great!

@janiu-001 commented
Thanks @janiu-001.

But it failed to load the definitions.json which create the queue and bindings!

What is the error message? Are you trying to export queues created by the MQTT plugin in 3.11 and re-import those in 3.12? If you could provide step by step instructions that reproduce this issue and open a separate GitHub issue if you think something doesn't work as expected, that would be great!

  1. The definitions.json: [screenshot]

  2. Load the definitions.json file: [screenshot]

  3. Mount the definitions.json file: [screenshot]

  4. Check the queues on the dashboard:

With the 3.12 image: [screenshot]

With the rabbitmq:3.11-management image, we can see the queues defined in definitions.json: [screenshot]

@lukebakken (Collaborator) commented Mar 7, 2023

@janiu-001 the best and fastest way to assist us is to attach your complete definitions.json file to your next response. Feel free to edit the file to change sensitive data, but keep it as original as possible.

Right now you're asking us to guess how to reproduce it - we know nothing about your queue definitions and other RabbitMQ metadata.

@janiu-001 commented

Sorry for the trouble. Since GitHub does not support uploading .json files, I changed the suffix from json to txt.

definitions.txt

@michaelklishin (Member) commented

This belongs to a discussion.

@lukebakken (Collaborator) commented

@michaelklishin that would be ideal, but @janiu-001 responded to a closed PR 🤷‍♂️. I'm going to check this out today.

@lukebakken (Collaborator) commented

Yep, something is up with definitions import - #7532

@lukebakken (Collaborator) commented

@janiu-001 thanks for reporting the issue. In the future, please either start a discussion or file an issue rather than replying to a closed PR. That's not just Team RabbitMQ's preference but is pretty standard practice. Thanks!

@janiu-001 commented

@lukebakken yes, understood. Will follow your suggestion.

rabbitmq locked as resolved and limited conversation to collaborators Mar 8, 2023