From 6d4b89071db404356d26c3e06be7fd73051bb587 Mon Sep 17 00:00:00 2001 From: Andrew Fiddian-Green Date: Fri, 23 Apr 2021 18:46:43 +0100 Subject: [PATCH 1/5] [mqqt] do not allow unsubscribe unless already subscribed Signed-off-by: Andrew Fiddian-Green --- .../mqtt/discovery/AbstractMQTTDiscovery.java | 31 ++++++++++++++++--- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/discovery/AbstractMQTTDiscovery.java b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/discovery/AbstractMQTTDiscovery.java index 5b83320b1de9e..c528ba6b99afe 100644 --- a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/discovery/AbstractMQTTDiscovery.java +++ b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/discovery/AbstractMQTTDiscovery.java @@ -48,11 +48,34 @@ public abstract class AbstractMQTTDiscovery extends AbstractDiscoveryService imp private @Nullable ScheduledFuture scheduledStop; + private boolean isSubscribed; + public AbstractMQTTDiscovery(@Nullable Set supportedThingTypes, int timeout, boolean backgroundDiscoveryEnabledByDefault, String baseTopic) { super(supportedThingTypes, 0, backgroundDiscoveryEnabledByDefault); this.subscribeTopic = baseTopic; this.timeout = timeout; + isSubscribed = false; + } + + /** + * Only subscribe if we were not already subscribed + */ + private void subscribe() { + if (!isSubscribed) { + getDiscoveryService().subscribe(this, subscribeTopic); + isSubscribed = true; + } + } + + /** + * Only unsubscribe if we were already subscribed + */ + private void unSubscribe() { + if (isSubscribed) { + getDiscoveryService().unsubscribe(this); + isSubscribed = false; + } } /** @@ -94,7 +117,7 @@ protected void startScan() { return; } resetTimeout(); - getDiscoveryService().subscribe(this, subscribeTopic); + subscribe(); } @Override @@ -104,7 +127,7 @@ protected synchronized void stopScan() { return; } stopTimeout(); - getDiscoveryService().unsubscribe(this); + unSubscribe(); super.stopScan(); } @@ -118,11 +141,11 @@ public synchronized void abortScan() { protected void startBackgroundDiscovery() { // Remove results that are restored after a restart removeOlderResults(new Date().getTime()); - getDiscoveryService().subscribe(this, subscribeTopic); + subscribe(); } @Override protected void stopBackgroundDiscovery() { - getDiscoveryService().unsubscribe(this); + unSubscribe(); } } From c0d00c9be0eca1ce989bdf35cc0b539e9786a968 Mon Sep 17 00:00:00 2001 From: Andrew Fiddian-Green Date: Sun, 25 Apr 2021 14:56:34 +0100 Subject: [PATCH 2/5] [mqtt] avoid duplicates in listenerList; only (un)register topicd if listener in listenerList Signed-off-by: Andrew Fiddian-Green --- .../internal/MqttBrokerHandlerFactory.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java index 5a8eec82b3ef1..d983d4b9245b2 100644 --- a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java +++ b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java @@ -12,10 +12,9 @@ */ package org.openhab.binding.mqtt.internal; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.List; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.WeakHashMap; @@ -58,7 +57,7 @@ public class MqttBrokerHandlerFactory extends BaseThingHandlerFactory implements .of(MqttBindingConstants.BRIDGE_TYPE_SYSTEMBROKER, MqttBindingConstants.BRIDGE_TYPE_BROKER) .collect(Collectors.toSet()); private final Logger logger = LoggerFactory.getLogger(MqttBrokerHandlerFactory.class); - protected final Map> discoveryTopics = new HashMap<>(); + protected final Map> discoveryTopics = new HashMap<>(); protected final Set handlers = Collections .synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>())); @@ -113,11 +112,12 @@ protected void createdHandler(AbstractBrokerHandler handler) { * a MQTT topic that is registered on all available broker connections. */ @Override + @SuppressWarnings("null") public void subscribe(MQTTTopicDiscoveryParticipant listener, String topic) { - List listenerList = discoveryTopics.computeIfAbsent(topic, - t -> new ArrayList<>()); - listenerList.add(listener); - handlers.forEach(broker -> broker.registerDiscoveryListener(listener, topic)); + Set listenerList = discoveryTopics.computeIfAbsent(topic, t -> new HashSet<>()); + if (listenerList.add(listener)) { + handlers.forEach(broker -> broker.registerDiscoveryListener(listener, topic)); + } } /** @@ -127,8 +127,9 @@ public void subscribe(MQTTTopicDiscoveryParticipant listener, String topic) { @SuppressWarnings("null") public void unsubscribe(MQTTTopicDiscoveryParticipant listener) { discoveryTopics.forEach((topic, listenerList) -> { - listenerList.remove(listener); - handlers.forEach(broker -> broker.unregisterDiscoveryListener(listener, topic)); + if (listenerList.remove(listener)) { + handlers.forEach(broker -> broker.unregisterDiscoveryListener(listener, topic)); + } }); } From 1abcecac7ea211c8d26c4ae23329df01a8516cf8 Mon Sep 17 00:00:00 2001 From: Andrew Fiddian-Green Date: Sun, 25 Apr 2021 16:56:22 +0100 Subject: [PATCH 3/5] [mqtt] rename "listenerList" local variables to "listeners" Signed-off-by: Andrew Fiddian-Green --- .../mqtt/internal/MqttBrokerHandlerFactory.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java index d983d4b9245b2..ffed4ddbaf4c6 100644 --- a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java +++ b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java @@ -78,8 +78,8 @@ public boolean supportsThingType(ThingTypeUID thingTypeUID) { */ protected void createdHandler(AbstractBrokerHandler handler) { handlers.add(handler); - discoveryTopics.forEach((topic, listenerList) -> { - listenerList.forEach(listener -> { + discoveryTopics.forEach((topic, listeners) -> { + listeners.forEach(listener -> { handler.registerDiscoveryListener(listener, topic); }); }); @@ -114,8 +114,8 @@ protected void createdHandler(AbstractBrokerHandler handler) { @Override @SuppressWarnings("null") public void subscribe(MQTTTopicDiscoveryParticipant listener, String topic) { - Set listenerList = discoveryTopics.computeIfAbsent(topic, t -> new HashSet<>()); - if (listenerList.add(listener)) { + Set listeners = discoveryTopics.computeIfAbsent(topic, t -> new HashSet<>()); + if (listeners.add(listener)) { handlers.forEach(broker -> broker.registerDiscoveryListener(listener, topic)); } } @@ -126,8 +126,8 @@ public void subscribe(MQTTTopicDiscoveryParticipant listener, String topic) { @Override @SuppressWarnings("null") public void unsubscribe(MQTTTopicDiscoveryParticipant listener) { - discoveryTopics.forEach((topic, listenerList) -> { - if (listenerList.remove(listener)) { + discoveryTopics.forEach((topic, listeners) -> { + if (listeners.remove(listener)) { handlers.forEach(broker -> broker.unregisterDiscoveryListener(listener, topic)); } }); From 1aaa0bcf02e84d28b7a04e010d42b0728825cdfe Mon Sep 17 00:00:00 2001 From: Andrew Fiddian-Green Date: Wed, 28 Apr 2021 13:09:44 +0100 Subject: [PATCH 4/5] [mqtt] added java docs Signed-off-by: Andrew Fiddian-Green --- .../internal/MqttBrokerHandlerFactory.java | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java index ffed4ddbaf4c6..d0e71187c834e 100644 --- a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java +++ b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java @@ -53,11 +53,22 @@ @Component(service = { ThingHandlerFactory.class, MQTTTopicDiscoveryService.class }, configurationPid = "MqttBrokerHandlerFactory") public class MqttBrokerHandlerFactory extends BaseThingHandlerFactory implements MQTTTopicDiscoveryService { + private static final Set SUPPORTED_THING_TYPES_UIDS = Stream .of(MqttBindingConstants.BRIDGE_TYPE_SYSTEMBROKER, MqttBindingConstants.BRIDGE_TYPE_BROKER) .collect(Collectors.toSet()); + private final Logger logger = LoggerFactory.getLogger(MqttBrokerHandlerFactory.class); + + /** + * This Map provides a lookup between a Topic string (key) and a Set of MQTTTopicDiscoveryParticipants (value), + * where the Set itself is a list of participants which are subscribed to the respective Topic. + */ protected final Map> discoveryTopics = new HashMap<>(); + + /** + * This Set contains a list of all the Broker handlers that have been created by this factory + */ protected final Set handlers = Collections .synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>())); @@ -74,7 +85,8 @@ public boolean supportsThingType(ThingTypeUID thingTypeUID) { } /** - * Add the given broker connection to all listeners. + * Add the given broker handler to the list of known handlers. And then iterate over all topics and their respective + * list of listeners, and register the respective new listener and topic with the given new broker handler. */ protected void createdHandler(AbstractBrokerHandler handler) { handlers.add(handler); @@ -110,6 +122,9 @@ protected void createdHandler(AbstractBrokerHandler handler) { /** * This factory also implements {@link MQTTTopicDiscoveryService} so consumers can subscribe to * a MQTT topic that is registered on all available broker connections. + * + * Checks each topic, and if the listener is not already in the listener list for that topic, adds itself from that + * list, and registers itself and the respective topic with all the known brokers. */ @Override @SuppressWarnings("null") @@ -121,10 +136,13 @@ public void subscribe(MQTTTopicDiscoveryParticipant listener, String topic) { } /** - * Unsubscribe a listener from all available broker connections. + * This factory also implements {@link MQTTTopicDiscoveryService} so consumers can unsubscribe from + * a MQTT topic that is registered on all available broker connections. + * + * Checks each topic, and if the listener is in the listener list for that topic, removes itself from that list, and + * unregisters itself and the respective topic from all the known brokers. */ @Override - @SuppressWarnings("null") public void unsubscribe(MQTTTopicDiscoveryParticipant listener) { discoveryTopics.forEach((topic, listeners) -> { if (listeners.remove(listener)) { From d394f2f7bf8d82239edc865f897e875cd4621b6e Mon Sep 17 00:00:00 2001 From: Andrew Fiddian-Green Date: Sun, 2 May 2021 19:56:58 +0100 Subject: [PATCH 5/5] [mqtt] reviewer requested fixes to concurrency issues Signed-off-by: Andrew Fiddian-Green --- .../binding/mqtt/discovery/AbstractMQTTDiscovery.java | 11 +++++------ .../mqtt/internal/MqttBrokerHandlerFactory.java | 8 ++++---- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/discovery/AbstractMQTTDiscovery.java b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/discovery/AbstractMQTTDiscovery.java index c528ba6b99afe..0732387a221ff 100644 --- a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/discovery/AbstractMQTTDiscovery.java +++ b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/discovery/AbstractMQTTDiscovery.java @@ -16,6 +16,7 @@ import java.util.Set; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; @@ -48,23 +49,22 @@ public abstract class AbstractMQTTDiscovery extends AbstractDiscoveryService imp private @Nullable ScheduledFuture scheduledStop; - private boolean isSubscribed; + private AtomicBoolean isSubscribed; public AbstractMQTTDiscovery(@Nullable Set supportedThingTypes, int timeout, boolean backgroundDiscoveryEnabledByDefault, String baseTopic) { super(supportedThingTypes, 0, backgroundDiscoveryEnabledByDefault); this.subscribeTopic = baseTopic; this.timeout = timeout; - isSubscribed = false; + isSubscribed = new AtomicBoolean(false); } /** * Only subscribe if we were not already subscribed */ private void subscribe() { - if (!isSubscribed) { + if (!isSubscribed.getAndSet(true)) { getDiscoveryService().subscribe(this, subscribeTopic); - isSubscribed = true; } } @@ -72,9 +72,8 @@ private void subscribe() { * Only unsubscribe if we were already subscribed */ private void unSubscribe() { - if (isSubscribed) { + if (isSubscribed.getAndSet(false)) { getDiscoveryService().unsubscribe(this); - isSubscribed = false; } } diff --git a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java index d0e71187c834e..491f6e1b12ce6 100644 --- a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java +++ b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java @@ -13,11 +13,10 @@ package org.openhab.binding.mqtt.internal; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -64,7 +63,7 @@ public class MqttBrokerHandlerFactory extends BaseThingHandlerFactory implements * This Map provides a lookup between a Topic string (key) and a Set of MQTTTopicDiscoveryParticipants (value), * where the Set itself is a list of participants which are subscribed to the respective Topic. */ - protected final Map> discoveryTopics = new HashMap<>(); + protected final Map> discoveryTopics = new ConcurrentHashMap<>(); /** * This Set contains a list of all the Broker handlers that have been created by this factory @@ -129,7 +128,8 @@ protected void createdHandler(AbstractBrokerHandler handler) { @Override @SuppressWarnings("null") public void subscribe(MQTTTopicDiscoveryParticipant listener, String topic) { - Set listeners = discoveryTopics.computeIfAbsent(topic, t -> new HashSet<>()); + Set listeners = discoveryTopics.computeIfAbsent(topic, + t -> ConcurrentHashMap.newKeySet()); if (listeners.add(listener)) { handlers.forEach(broker -> broker.registerDiscoveryListener(listener, topic)); }