Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spring Integration hits a bug in Paho client with $share/ MQTT subscriptions #8879

Closed
vhbcm opened this issue Feb 1, 2024 · 4 comments
Closed

Comments

@vhbcm
Copy link

vhbcm commented Feb 1, 2024

Spring Integration hits a bug in Paho client with $share/ MQTT subscriptions

In what version(s) of Spring Integration are you seeing this issue?

For example:

6.1.5

Describe the bug

Spring Integration hits a bug (eclipse/paho.mqtt.java/issues/827) in Paho MQTT client with $share/ MQTT subscriptions. This bug can be worked around by using subscription ID by providing it in MqttProperties when calling, e.g., org.eclipse.paho.mqttv5.client.IMqttAsyncClient#subscribe(org.eclipse.paho.mqttv5.common.MqttSubscription[], java.lang.Object, org.eclipse.paho.mqttv5.client.MqttActionListener, org.eclipse.paho.mqttv5.client.IMqttMessageListener, org.eclipse.paho.mqttv5.common.packet.MqttProperties). Unfortunately, Spring Integration currently calls only org.eclipse.paho.mqttv5.client.IMqttAsyncClient#subscribe(org.eclipse.paho.mqttv5.common.MqttSubscription, org.eclipse.paho.mqttv5.client.IMqttMessageListener) from org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter#addTopic and does not provide subscription ID.

To Reproduce

Subscribe on $share/sharegroup/topic using Mqttv5PahoMessageDrivenChannelAdapter and send message to topic. The message will be received but it will be rejected by Paho itself because it is comparing the filter and the message topic textually without removing the $share/sharegroup/ prefix.

Expected behavior

Mqttv5PahoMessageDrivenChannelAdapter should be able to subscribe to shared MQTT topics.

Sample

N/A

@vhbcm vhbcm added status: waiting-for-triage The issue need to be evaluated and its future decided type: bug labels Feb 1, 2024
@artembilan
Copy link
Member

Are you referring to this property:

	/**
	 * Subscription Identifier. (Subscribe Only) See
	 * {@link MqttProperties#getSubscriptionIdentifier()}
	 * 
	 * @param subscriptionIdentifier
	 *            The Subscription Identifier.
	 */
	public void setSubscriptionIdentifier(Integer subscriptionIdentifier) {

?

I think there might be some other props which could be set for this or that use-case.
So, I guess something like Mqttv5PahoMessageDrivenChannelAdapter.setSubscriptionMqttProperties(MqttProperties) would be sufficient.
Although I see on this sub-set of valid props:

	private static final Byte[] validProperties = { MqttProperties.SUBSCRIPTION_IDENTIFIER,
			MqttProperties.SUBSCRIPTION_IDENTIFIER_SINGLE,
			MqttProperties.USER_DEFINED_PAIR_IDENTIFIER };

So, I'm not sure if we have to expose the whole MqttProperties or just subscriptionIdentifier on the Mqttv5PahoMessageDrivenChannelAdapter.
Although with the former we would nail any future potential requests for those properties.

If that would work as a workaround, then I'm more than happy to accept such a contribution.

Thanks

@artembilan artembilan added status: waiting-for-reporter Needs a feedback from the reporter in: mqtt and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Feb 1, 2024
@wilx
Copy link
Contributor

wilx commented Feb 1, 2024

Are you referring to this property:

	/**
	 * Subscription Identifier. (Subscribe Only) See
	 * {@link MqttProperties#getSubscriptionIdentifier()}
	 * 
	 * @param subscriptionIdentifier
	 *            The Subscription Identifier.
	 */
	public void setSubscriptionIdentifier(Integer subscriptionIdentifier) {

?

Yes.

I think there might be some other props which could be set for this or that use-case. So, I guess something like Mqttv5PahoMessageDrivenChannelAdapter.setSubscriptionMqttProperties(MqttProperties) would be sufficient. Although I see on this sub-set of valid props:

	private static final Byte[] validProperties = { MqttProperties.SUBSCRIPTION_IDENTIFIER,
			MqttProperties.SUBSCRIPTION_IDENTIFIER_SINGLE,
			MqttProperties.USER_DEFINED_PAIR_IDENTIFIER };

So, I'm not sure if we have to expose the whole MqttProperties or just subscriptionIdentifier on the Mqttv5PahoMessageDrivenChannelAdapter. Although with the former we would nail any future potential requests for those properties.

I am not sure if it needs to be exposed at all. The subscription identifier can be an internal detail. It is only useful for the client's internal mapping, I think. At least in my use case.

If that would work as a workaround, then I'm more than happy to accept such a contribution.

Thanks

@artembilan
Copy link
Member

Well, that would be easier, but then we need to understand what is that number supposed to be.
Some hash from the getClientId()? Some random value we chose on start?
Well, we still may have it to be set externally if anyway has some use-case where they need to be able to identify those subscription by some other meaning.
And have it defaulted to something random or that hash.

The point is that we might need to back-port this to previous version to provide the support for lost functionality.

Here is some discussion as well: https://stackoverflow.com/questions/75950944/shared-subscription-with-spring-integration-mqttv5-not-receiving-messages

@wilx
Copy link
Contributor

wilx commented Feb 1, 2024

The subscription identifier is an integer:

3.8.2.1.2 Subscription Identifier

11 (0x0B) Byte, Identifier of the Subscription Identifier.

Followed by a Variable Byte Integer representing the identifier of the subscription. The Subscription Identifier can have the value of 1 to 268,435,455. It is a Protocol Error if the Subscription Identifier has a value of 0. It is a Protocol Error to include the Subscription Identifier more than once.

wilx added a commit to wilx/spring-integration that referenced this issue Feb 1, 2024
wilx added a commit to wilx/spring-integration that referenced this issue Feb 1, 2024
@artembilan artembilan added this to the 6.3.0-M1 milestone Feb 2, 2024
artembilan pushed a commit that referenced this issue Feb 2, 2024
Fixes: #8879

To work around the problem with `$share/` subscriptions the `Mqttv5PahoMessageDrivenChannelAdapter` must provide a `subscriptionIdentifier` into `MqttProperties` on `subscribe()`

* Introduce a `Mqttv5PahoMessageDrivenChannelAdapter.subscriptionIdentifierCounter` according to the MQTT specification:
> 3.8.2.1.2 Subscription Identifier: [..]The Subscription Identifier is associated with any subscription created or modified as the result of this SUBSCRIBE packet. If there is a Subscription Identifier, it is stored with the subscription.

This one is associated with the MQTT session for the current subscriber and does not interfere into other sessions even if identifier is same from the counter.
It works because the Subscription identifier is per session and because you cannot have multiple connection with the same client ID.

**Cherry-pick to `6.2.x` & `6.1.x`**
# Conflicts:
#	spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java
artembilan pushed a commit that referenced this issue Feb 2, 2024
Fixes: #8879

To work around the problem with `$share/` subscriptions the `Mqttv5PahoMessageDrivenChannelAdapter` must provide a `subscriptionIdentifier` into `MqttProperties` on `subscribe()`

* Introduce a `Mqttv5PahoMessageDrivenChannelAdapter.subscriptionIdentifierCounter` according to the MQTT specification:
> 3.8.2.1.2 Subscription Identifier: [..]The Subscription Identifier is associated with any subscription created or modified as the result of this SUBSCRIBE packet. If there is a Subscription Identifier, it is stored with the subscription.

This one is associated with the MQTT session for the current subscriber and does not interfere into other sessions even if identifier is same from the counter.
It works because the Subscription identifier is per session and because you cannot have multiple connection with the same client ID.

**Cherry-pick to `6.2.x` & `6.1.x`**
# Conflicts:
#	spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants