Clustering MQTT
PadoGrid clusters Mosquitto brokers like any other clustering product. The same cluster lifecycle management commands apply to Mosquitto brokers. For example, the following commands create and run a new default cluster named mymosquitto consisting of three (3) Mosquitto brokers.
create_cluster -product mosquitto
switch_cluster mymosquitto
start_cluster
PadoGrid includes two (2) MQTT client commands for publishing and subscribing to virtual clusters: vc_publish and vc_subscribe. They work much like Mosquitto's mosquitto_pub and mosquitto_sub, except that they can publish and subscribe to one or more brokers via a virtual cluster. They are also fully integrated with PadoGrid's MQTT clusters such that they automatically create a virtual cluster over a physical cluster. This feature is shown in the following examples.
Terminal 1
# Virtual cluster subscription
vc_subscribe -t test/#
# Mosquitto equivalent of vc_subscribe. mosquitto_sub can only subscribe to a single broker.
mosquitto_sub -p 1883 -t test/#
mosquitto_sub -p 1884 -t test/#
mosquitto_sub -p 1885 -t test/#
Terminal 2
# Virtual cluster publication
vc_publish -t test/topic1 -m "hello, world"
# Mosquitto equivalent of vc_publish. mosquitto_pub cannot achieve this except to conditionally
# target each endpoint.
for port in 1883 1884 1885; do
  # Stop at the first broker that accepts the message (no error output)
  if [ -z "$(mosquitto_pub -p "$port" -t test/topic1 -m "hello, world" 2>&1 > /dev/null)" ]; then
    break
  fi
done
The above commands publish and subscribe to the mymosquitto cluster started in the previous section. Both commands have options for explicitly specifying the cluster and endpoints, as shown in the examples below.
# Messaging by cluster name
vc_subscribe -cluster mymosquitto -qos 2 -t test/#
vc_publish -cluster mymosquitto -qos 2 -t test/topic1
# Messaging by endpoints
vc_subscribe -endpoints tcp://localhost:1883-1885 -qos 2 -t test/#
vc_publish -endpoints tcp://localhost:1883-1885 -qos 2 -t test/topic1
# Messaging by configuration file
vc_subscribe -config mqtt5-client.yaml -qos 2 -t test/#
vc_publish -config mqtt5-client.yaml -qos 2 -t test/topic1
PadoGrid also provides the perf_test app for measuring MQTT broker performance metrics. As with other products, you create and run an instance of the perf_test app as follows.
Open two terminals and execute the following.
Terminal 1
create_app -product mosquitto -app perf_test -name perf_test_mosquitto
cd_app perf_test_mosquitto/bin_sh
./subscribe_topic test/#
Terminal 2
cd_app perf_test_mosquitto/bin_sh
./test_group -run
By default, the test_group script runs the etc/group.properties file, which is configured to concurrently publish data to two topics, test/topic1 and test/topic2. The first topic publishes 1 KiB payloads, and the second topic publishes 10 KiB payloads. You can view the throughput and latency metrics recorded in the results directory. You should see the subscriber (Terminal 1) receiving messages from the two topics.
To provide a clustering service for Mosquitto, PadoGrid provides the HaMqttClient API, which client applications can use to create virtual clusters. HaMqttClient is driven by Paho, allowing a client application to cluster any Paho-supported MQTT broker, such as Mosquitto.
Creating a virtual cluster is typically done in a configuration file. For example, the perf_test app is configured as follows.
persistence:
  className: MqttDefaultFilePersistence
  properties:
    - key: path
      value: ${persistence.dir}
clusters:
  - name: ${cluster.name}
    connections:
      - connection:
          serverURIs: [tcp://localhost:1883-1885]
See Default HaMqttClient Configuration File for details.
The persistence element defines Paho's persistence settings. HaMqttClient supports Paho's MemoryPersistence and MqttDefaultFilePersistence along with application custom classes. The properties element defines the arguments to the persistence class. In the example, it sets the path property to the system property persistence.dir, which perf_test supplies with the JVM argument -Dpersistence.dir.
✏️ You can pass in system properties using the ${} notation and environment variables using the ${env:} notation in the configuration file.
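The substitution described in the note can be pictured with a small Java sketch. It is only an illustration of the ${} and ${env:} notations, not HaMqttClient's own parser: the class name PlaceholderSketch and the choice to leave undefined placeholders untouched are assumptions made for this example.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of PadoGrid.
class PlaceholderSketch {
    // Matches ${name} and ${env:NAME}
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{(env:)?([^}]+)\\}");

    // Replaces ${name} from props and ${env:NAME} from env.
    // Assumption: a placeholder with no value is left as-is.
    static String resolve(String text, Map<String, String> props, Map<String, String> env) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            Map<String, String> source = (m.group(1) != null) ? env : props;
            String value = source.get(m.group(2));
            m.appendReplacement(out, Matcher.quoteReplacement(value != null ? value : m.group()));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of("persistence.dir", "/tmp/persist");
        System.out.println(resolve("value: ${persistence.dir}", props, System.getenv()));
    }
}
```

In a real application the props map would come from the JVM system properties (for example, those set with -Dpersistence.dir) rather than a literal map.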
The clusters element defines the virtual cluster name, which must be unique within the configuration file.
The connection element is equivalent to Paho's MqttConnectionOptions. You can configure any options supported by MqttConnectionOptions using this element.
The most important connection option is serverURIs. It defines a list of endpoints that form a virtual cluster. By default, HaMqttClient sets this option to [tcp://localhost:1883-1885] as shown above. This effectively clusters three (3) brokers running on localhost with port numbers 1883, 1884, and 1885.
The serverURIs option supports the range wildcard, -, for listing ports and the last octet of IPv4 addresses.
tcp|ssl|ws|wss://<host-name>[:<port-range>]
tcp|ssl|ws|wss://<first-three-IPv4-octets>.<octet-range>[:<port-range>]
The following shows the use of the range wildcard.
Supported
# defaults to port 1883
tcp://10.1.2.1-10
# ports specified
tcp://10.1.2.1-10:1883
tcp://10.1.2.1-10:1883-1885
tcp://test.mosquitto.org:1883-1884
Not Supported
tcp://10.1.2-5.1
tcp://10.1-2.2.1
tcp://10-12.1.2.1
# IPv6 is not supported
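To make the range wildcard rules above concrete, the following Java sketch expands a single serverURIs entry into individual endpoints. It is not HaMqttClient's implementation. The class name ServerUriRangeSketch is hypothetical, and applying the default port 1883 to every protocol is an assumption drawn from the tcp example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of PadoGrid.
class ServerUriRangeSketch {
    // protocol://host[:port[-port]]
    private static final Pattern URI =
        Pattern.compile("^(tcp|ssl|ws|wss)://([^:/]+)(?::(\\d+)(?:-(\\d+))?)?$");
    // IPv4 address with a range in the last octet only, e.g. 10.1.2.1-10
    private static final Pattern LAST_OCTET_RANGE =
        Pattern.compile("^(\\d+\\.\\d+\\.\\d+)\\.(\\d+)-(\\d+)$");

    static List<String> expand(String uri) {
        Matcher m = URI.matcher(uri);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported URI: " + uri);
        }
        String host = m.group(2);
        // Assumption: port 1883 when no port is given
        int firstPort = (m.group(3) == null) ? 1883 : Integer.parseInt(m.group(3));
        int lastPort = (m.group(4) == null) ? firstPort : Integer.parseInt(m.group(4));

        List<String> hosts = new ArrayList<>();
        Matcher ip = LAST_OCTET_RANGE.matcher(host);
        if (ip.matches()) {
            int first = Integer.parseInt(ip.group(2));
            int last = Integer.parseInt(ip.group(3));
            for (int octet = first; octet <= last; octet++) {
                hosts.add(ip.group(1) + "." + octet);
            }
        } else if (host.matches("[0-9.\\-]+") && host.indexOf('-') >= 0) {
            // Numeric host with a range outside the last octet
            throw new IllegalArgumentException("Range allowed only in the last octet: " + uri);
        } else {
            hosts.add(host);
        }

        List<String> endpoints = new ArrayList<>();
        for (String h : hosts) {
            for (int port = firstPort; port <= lastPort; port++) {
                endpoints.add(m.group(1) + "://" + h + ":" + port);
            }
        }
        return endpoints;
    }

    public static void main(String[] args) {
        expand("tcp://localhost:1883-1885").forEach(System.out::println);
    }
}
```

Under these assumptions, tcp://10.1.2.1-10:1883-1885 yields 30 endpoints (10 addresses times 3 ports), while an entry such as tcp://10.1.2-5.1 is rejected.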
HaMqttClient supports the same four protocols as Paho: tcp, ssl, ws, and wss.
serverURIs can mix different protocols. For example, the following creates a virtual cluster with tcp and websockets.
clusters:
  - name: ${cluster.name}
    connections:
      - connection:
          # Free Internet brokers
          serverURIs: [tcp://test.mosquitto.org:1883, ws://test.mosquitto.org:8080, tcp://broker.emqx.io:1883, ws://broker.emqx.io:8083, tcp://broker.hivemq.com:1883, ws://broker.hivemq.com:8000]
You can configure any number of virtual clusters limited only by your machine's available system resources. By default, each cluster attempts to make connections to all brokers. If you have too many clusters and brokers, then you could potentially run out of system resources such as file descriptors, CPUs, and memory.
HaMqttClient is built on top of Paho's MqttClient, which spawns four (4) threads per endpoint connection. Having too many endpoints in a virtual cluster can therefore lead to a large number of threads. For a large number of endpoints, it is recommended that you split them into multiple virtual clusters and aggregate them via bridges (see Bridging Clusters).
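As a rough way to size a configuration, the thread cost can be estimated from the figure of four threads per endpoint connection quoted above. The Java sketch below is only a back-of-the-envelope calculation. It assumes every endpoint in every cluster is connected and counts Paho client threads only. The class and method names are hypothetical.

```java
// Hypothetical helper, not part of PadoGrid.
class ThreadEstimateSketch {
    // Figure quoted in the text; the actual number may differ by Paho version.
    static final int THREADS_PER_ENDPOINT = 4;

    // Takes the number of endpoints in each virtual cluster.
    static int estimatePahoThreads(int... endpointsPerCluster) {
        int endpoints = 0;
        for (int count : endpointsPerCluster) {
            endpoints += count;
        }
        return endpoints * THREADS_PER_ENDPOINT;
    }

    public static void main(String[] args) {
        // Three clusters with 6, 3, and 5 endpoints: (6 + 3 + 5) * 4 = 56
        System.out.println(estimatePahoThreads(6, 3, 5));
    }
}
```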
The clusters element takes an array of clusters. Each cluster must have a unique name; otherwise, one will overwrite the other. The following shows an example.
clusters:
  - name: mqtt-test
    connections:
      - connection:
          serverURIs:
            - tcp://test.mosquitto.org:1883
            - ws://test.mosquitto.org:8080
            - tcp://broker.emqx.io:1883
            - ws://broker.emqx.io:8083
            - tcp://broker.hivemq.com:1883
            - ws://broker.hivemq.com:8000
  - name: mylocal
    connections:
      - connection:
          serverURIs: [tcp://localhost:1883-1885]
  - name: enterprise
    connections:
      - connection:
          serverURIs: [tcp://10.1.2.1-5]
The example above defines three (3) clusters. The mqtt-test cluster clusters brokers available on the Internet. These brokers are free to use and maintained by the respective companies. The mylocal cluster clusters a set of localhost brokers. The enterprise cluster clusters a range of IP addresses with the default port, 1883.
Using the HaMqttClient API, you obtain each instance by name as follows.
HaMqttClient mqttTestCluster = HaCluster.getHaMqttClient("mqtt-test");
HaMqttClient mylocalCluster = HaCluster.getHaMqttClient("mylocal");
HaMqttClient enterpriseCluster = HaCluster.getHaMqttClient("enterprise");
Once you have an HaMqttClient instance, you can use the same API as Paho's MqttClient to access the cluster. There is no difference in API between MqttClient and HaMqttClient, as shown in the example below. The primary benefit of HaMqttClient is that it provides a single entry point to a cluster consisting of many brokers.
mqttTestCluster.subscribe(new MqttCallback() {...});
mqttTestCluster.publish("test/topic1", "hello, world".getBytes(), 2, false);
mqttTestCluster.disconnect();
mqttTestCluster.close();
Mosquitto supports broker bridges for forwarding messages from broker to broker. Bridging is important for applications that do not have direct access to edge brokers. By bridging a central broker and edge brokers, applications can send or receive data to or from the edge brokers via the central broker.
HaMqttClient also supports bridging, similar to Mosquitto, except that it is done from the client side. With client-side bridging, you can:
- Bridge clusters as opposed to brokers
- Filter topics to control both incoming and outgoing messages
- Override QoS per topic filter
The subsequent sections show how applications can access edge clusters via an enterprise (central) cluster.
This example shows how to configure the application to receive filtered messages from the edge cluster and forward them to the enterprise cluster. It does not override the edge cluster's QoS. The default QoS value of -1 passes on the subscriber's QoS to the publisher that forwards the messages.
clusters:
  - name: edge
    connections:
      - connection:
          serverURIs: [tcp://localhost:31001-31020]
    bridges:
      # 'in' forwards incoming messages to the target cluster, 'enterprise'
      in:
        - cluster: enterprise
          topicFilters: [test/#]
          qos: -1
  - name: enterprise
    connections:
      - connection:
          serverURIs: [tcp://localhost:32001-32010]
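The topicFilters entry in the bridge configuration above uses MQTT topic filter syntax. As an illustration of which topics a filter such as test/# would select for forwarding, here is a minimal Java sketch of standard MQTT wildcard matching. It follows the MQTT specification's rules for + and #. How HaMqttClient evaluates filters internally is not shown in this document, and the class name TopicFilterSketch is hypothetical.

```java
// Hypothetical helper, not part of PadoGrid.
class TopicFilterSketch {
    // Returns true if the topic name matches the MQTT topic filter.
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        // Topics beginning with '$' are not matched by a leading wildcard
        if (topic.startsWith("$") && (f[0].equals("#") || f[0].equals("+"))) {
            return false;
        }
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // '#' matches the parent level and everything below it; it must be last
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false;
            }
            // '+' matches exactly one level
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false;
            }
        }
        return f.length == t.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("test/#", "test/topic1"));  // true
        System.out.println(matches("test/#", "other/topic1")); // false
    }
}
```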
A common use case for bridging incoming messages is to place the bridge app in a DMZ between firewalls so that enterprise applications can access the edge devices, as shown in the diagram below. However, this way of bridging does not provide the HA service since the bridge app itself cannot be clustered. Data loss may occur if the bridge app fails.
This example shows how to configure the application to send the messages to the edge cluster which in turn sends filtered messages directly to the enterprise cluster. It overrides the edge cluster's QoS with 2.
clusters:
  - name: edge
    connections:
      - connection:
          serverURIs: [tcp://localhost:31001-31020]
    bridges:
      # 'out' sends outgoing messages to the target cluster, 'enterprise'
      out:
        - cluster: enterprise
          topicFilters: [test/#]
          qos: 2
  - name: enterprise
    connections:
      - connection:
          serverURIs: [tcp://localhost:32001-32010]
With outgoing messages, the edge devices themselves bridge directly to the enterprise cluster, as shown in the diagram below. This means that, unlike incoming messages, outgoing messages fully utilize HA. Data loss will not occur until edge devices themselves fail.
Because the virtual clusters are created and maintained by each application, the HA support is tightly coupled with applications.
For incoming bridges, if the application goes down, then the enterprise application connected to the enterprise cluster stops receiving data. The edge applications connected to the edge cluster may continue to send data, but it never reaches the enterprise application since the bridge that the application hosts has been taken out. This leads to data loss. This also means incoming bridges do not support HA.
For outgoing bridges, since the application itself is publishing data, if the application goes down, no additional data will be sent. This means there is no data loss since data is no longer published. Hence, outgoing bridges fully support HA.
In addition to bridged clusters, we can also leverage bridged brokers to provide HA. This is achieved by creating sticky clusters. Before we dive into sticky clusters, we need to define FoS levels.
FoS (Failover Service) provides a failover service over MQTT brokers. FoS has four (4) levels of service: 0, 1, 2, and 3. FoS is primarily used for creating sticky clusters to provide HA services over bridged brokers. Bridged brokers forward messages from broker to broker whereas bridged clusters forward messages from cluster to cluster.
❗ If FoS is not set or set to an invalid value, then it defaults to 0. With FoS 0, the virtual cluster must not contain bridged brokers. Otherwise, the application will receive duplicate messages since it subscribes to all the brokers in the cluster.
FoS 0 is the default level. It makes subscriptions and connections to all endpoints. Unless you want to create a sticky cluster, this level should be used.
FoS 1 opens connections to two endpoints but sticks topic subscriptions to only one of them. No subscriptions are made to the other endpoint. Only when the sticky endpoint fails are subscriptions made to the other endpoint. Failover is quick, but there is a subscription delay that could lead to a brief period of data loss.
Like FoS 1, FoS 2 opens two (2) connections, but it increases the number of endpoints that make subscriptions to two (2). That means, with FoS 2, HaMqttClient receives duplicate messages, one from each endpoint. HaMqttClient, however, sticks to one of the endpoints and delivers data only from that endpoint to the application. It discards the data from the other endpoint. The other endpoint is a standby and replaces the sticky endpoint upon failure. Failover is immediate with no data loss.
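The FoS 2 behavior described above, where both endpoints are subscribed but only the sticky endpoint's messages reach the application, can be pictured with the following Java sketch. It is a conceptual model only, not HaMqttClient's code. The class StickyDeliverySketch and its methods are hypothetical, and endpoints are represented by plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of sticky delivery, not part of PadoGrid.
class StickyDeliverySketch {
    private final List<String> liveEndpoints = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();
    private String sticky;

    StickyDeliverySketch(String primary, String standby) {
        liveEndpoints.add(primary);
        liveEndpoints.add(standby);
        sticky = primary;
    }

    // Called for every message received from any subscribed endpoint.
    // Returns true if the message is passed on to the application.
    boolean onMessage(String endpoint, String payload) {
        if (!endpoint.equals(sticky)) {
            return false; // duplicate from the standby endpoint is discarded
        }
        delivered.add(payload);
        return true;
    }

    // The standby is already subscribed, so it takes over immediately.
    void onEndpointFailed(String endpoint) {
        liveEndpoints.remove(endpoint);
        if (endpoint.equals(sticky)) {
            sticky = liveEndpoints.isEmpty() ? null : liveEndpoints.get(0);
        }
    }

    List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        StickyDeliverySketch cluster =
            new StickyDeliverySketch("tcp://localhost:32001", "tcp://localhost:32002");
        cluster.onMessage("tcp://localhost:32001", "m1"); // delivered
        cluster.onMessage("tcp://localhost:32002", "m1"); // discarded duplicate
        cluster.onEndpointFailed("tcp://localhost:32001");
        cluster.onMessage("tcp://localhost:32002", "m2"); // delivered after failover
        System.out.println(cluster.delivered());
    }
}
```

Under FoS 1, by contrast, the standby would have no subscription at the moment of failure, which is where the brief gap in data described earlier comes from.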
FoS 3 allows configuration of both subscriberCount and liveEndpointCount. By default, FoS 3 makes subscriptions to all of the endpoints. This is rather expensive and wasteful if there are many endpoints. The higher the counts, the higher the degree of HA, but at the expense of increased duplicate data and network traffic. Hence, at this level, it is important that you adjust subscriberCount and liveEndpointCount to minimize the impact on application performance.
FoS simplifies the subscription configuration by overriding the following parameters.
Parameter | Default Value | Description |
---|---|---|
fos | 0 | Failover Service level. Valid values are 0, 1, 2, 3. |
subscriberCount | -1 | Maximum number of clients allowed to make subscriptions. -1 for all clients. |
liveEndpointCount | -1 | Maximum number of client connections allowed. -1 for all clients. |
FoS overrides the default values of the above parameters as shown in the table below. For example, fos: 1 sets subscriberCount: 1 and liveEndpointCount: 2. These parameters are hardwired and not configurable for FoS 0, 1, and 2. To configure them, select FoS 3. If FoS is not set or invalid, then HaMqttClient defaults to FoS 0.
fos | subscriberCount | liveEndpointCount | Parameters Configurable? |
---|---|---|---|
0 | **-1** | **-1** | No |
1 | **1** | **2** | No |
2 | **2** | **2** | No |
3 | -1 | -1 | Yes |
The above table shows the fixed values that cannot be configured in bold font.
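The override rules in the table above can be expressed as a small lookup. The Java sketch below mirrors the table and the stated fallback to FoS 0 for unset or invalid values. It is not HaMqttClient's code, and the class FosSettingsSketch and its resolve method are hypothetical names.

```java
// Hypothetical helper, not part of PadoGrid.
final class FosSettingsSketch {
    final int fos;
    final int subscriberCount;
    final int liveEndpointCount;

    private FosSettingsSketch(int fos, int subscriberCount, int liveEndpointCount) {
        this.fos = fos;
        this.subscriberCount = subscriberCount;
        this.liveEndpointCount = liveEndpointCount;
    }

    // fos may be null (not set). The two counts are honored only for FoS 3.
    static FosSettingsSketch resolve(Integer fos, int subscriberCount, int liveEndpointCount) {
        int level = (fos == null || fos < 0 || fos > 3) ? 0 : fos;
        switch (level) {
            case 1:
                return new FosSettingsSketch(1, 1, 2);
            case 2:
                return new FosSettingsSketch(2, 2, 2);
            case 3:
                return new FosSettingsSketch(3, subscriberCount, liveEndpointCount);
            default:
                return new FosSettingsSketch(0, -1, -1);
        }
    }

    public static void main(String[] args) {
        FosSettingsSketch s = resolve(1, 5, 5);
        System.out.println(s.fos + " " + s.subscriberCount + " " + s.liveEndpointCount);
    }
}
```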
A sticky cluster is a special cluster that sticks to a single broker for all subscriptions. By sticking to one broker, it blocks the delivery of data from other brokers in the cluster. This pattern is particularly useful for bridging brokers with HA intact.
To create a sticky cluster, we also need help from the MQTT brokers. The brokers must be capable of providing the following bridging services.
- Ability to bridge multiple brokers.
- Ability to prevent cyclic (looping) messages. The bridged brokers must not send messages back to the originating broker.
Mosquitto supports both services but it can only bridge two (2) brokers at a time. Unfortunately, this limits the maximum cluster size to two (2) endpoints.
With bridged brokers and FoS set to 1, 2, or 3, we can create a virtual cluster with HA fully intact. These FoS levels stick to a single broker guaranteeing the delivery of non-duplicate data to the application.
A sticky cluster is created by configuring fos and serverURIs. For example, the following creates a sticky cluster named enterprise with FoS 2.
clusters:
  - name: enterprise
    fos: 2
    connections:
      - connection:
          serverURIs: [tcp://localhost:32001-32002]
To complete the sticky cluster, the brokers themselves must be bridged. The following Mosquitto example configures the tcp://localhost:32001 broker to bridge tcp://localhost:32002.
The first broker's mosquitto.conf file:
connection bridged-01
address localhost:32002
remote_clientid bridged-01
cleansession false
notifications false
start_type automatic
topic # both 0
bridge_protocol_version mqttv50
try_private true
❗ The second broker must not be configured with a bridge; otherwise, cyclic (looping) messages will occur.
It is important that you select the right FoS level for your application based on the following guidelines.
- For non-bridged brokers, always select FoS 0 (default).
- For bridged brokers, select FoS 2 to avoid data loss during failover.
- For bridged brokers, select FoS 3 and apply Archetype 7 to attain a higher level of availability and scalability than FoS 2.
- For bridged brokers, select FoS 1 if your application has resource constraints and can tolerate a brief period of data loss during failover.