Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add quorum queues support for MQTT #4254

Merged
merged 2 commits into from
Mar 31, 2022
Merged

Add quorum queues support for MQTT #4254

merged 2 commits into from
Mar 31, 2022

Conversation

Gsantomaggio
Copy link
Member

@Gsantomaggio Gsantomaggio commented Mar 10, 2022

Enable the quorum queue for MQTT only if CleanSession is False.
QQs don't support auto-delete flag so in case Clean session is True
the queue will be a classic queue.

Add another group test non_parallel_tests_quorum.
For Mixed test the quorum_queue feature flag must be enabled.

How to test:

make run-broker RABBITMQ_CONFIG_FILE=myfile.conf PLUGINS='rabbitmq_mqtt rabbitmq_management'

where myfile.conf is:

mqtt.queue_type = quorum

python client recv:

import paho.mqtt.client as mqtt


def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    client.subscribe("paho/temperature", qos=1)


def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))


def on_disconnect(client, userdata, rc):
    print("Disconnected: " + str(rc))


client = mqtt.Client(client_id="myclient", clean_session=False)
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect

client.connect("localhost", 1883, 60)
client.loop_forever()

send:

import datetime
import paho.mqtt.client as mqtt
import time
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

client = mqtt.Client()
client.on_connect = on_connect

client.connect("localhost", 1883, 60)
for i in range(110):
    client.publish("paho/temperature", "temperature {}".format(datetime.datetime.now()), qos=1)
    time.sleep(1)

client.loop_forever()

Important:
Read this PR #4185 for all the story.
I had to create another because of a wrong rebase.

@pjk25 I created different tests.

@Gsantomaggio
Copy link
Member Author

@binarin @michaelklishin WDYT?

@lukebakken
Copy link
Collaborator

@Gsantomaggio @kjnilsson -

QQs don't support auto-delete flag

Just curious, is it possible to purge a quorum queue?

@michaelklishin
Copy link
Member

It is possible to purge a quorum queue, yes.

@lukebakken
Copy link
Collaborator

I'm planning to test and review this week!

Copy link
Collaborator

@lukebakken lukebakken left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested out combinations of quorum and classic queues for MQTT with CleanSession True / False

I just have the one comment, otherwise 👍

@pjk25
Copy link
Contributor

pjk25 commented Mar 30, 2022

We want to backport this into 3.10.0, correct?

@pjk25 pjk25 removed the request for review from binarin March 30, 2022 06:27
@michaelklishin
Copy link
Member

We do but I haven't tested it yet.

@pjk25
Copy link
Contributor

pjk25 commented Mar 30, 2022

We do but I haven't tested it yet.

I have applied the label. Do we want to merge this into master and let the backport stay open until we are done testing?

@michaelklishin
Copy link
Member

I should get to it within the next 24 hours.

Enable the quorum queue for MQTT only if CleanSession is False.
QQs don't support auto-delete flag so in case Clean session is True
the queue will be a classic queue.

Add another group test non_parallel_tests_quorum.
For Mixed test the quorum_queue feature flag must be enabled.

Add log message
@michaelklishin
Copy link
Member

If a client uses a durable topic without mqtt.queue_type = quorum and then switches to use it, it will run into an exception because queue type won't match. This is not a novel problem with this PR but it doesn't do anything to work around it.
It's not clear whether it realistically can use passive declares in every single case.

So the only upgrade strategy is then to drain all topics with durable data and delete all durable queues that begin with ^mqtt. This should be clearly documented.

Alternatively we can change the naming strategy used by this plugin. Then existing data will be "missing" for such clients.
Not a significant improvement if you ask me.

@lukebakken
Copy link
Collaborator

Alternatively we can change the naming strategy used by this plugin

Seems reasonable to me. The user could then shovel messages to the new queue.

@michaelklishin
Copy link
Member

It can be quite a pain with thousands of queues and diverse client IDs (used in queue names). So I think this is a feature that only new installations realistically can adopt.

@michaelklishin
Copy link
Member

Since this feature is opt-in, we see the above limitation as acceptable.

@michaelklishin
Copy link
Member

I will rename the rabbitmq.conf setting as this only changes the type of durable queues (those used by QoS 1 clients).

@michaelklishin michaelklishin merged commit d01d02c into master Mar 31, 2022
@michaelklishin michaelklishin deleted the mqtt_quorum branch March 31, 2022 16:18
michaelklishin added a commit that referenced this pull request Mar 31, 2022
Add quorum queues support for MQTT

(cherry picked from commit d01d02c)
michaelklishin added a commit that referenced this pull request Mar 31, 2022
Add quorum queues support for MQTT (backport #4254)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants