Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mqtt] Discovery services shall not unsubscribe unless they have already subscribed #10566

Merged
merged 5 commits into from
Aug 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
Expand Down Expand Up @@ -48,11 +49,32 @@ public abstract class AbstractMQTTDiscovery extends AbstractDiscoveryService imp

private @Nullable ScheduledFuture<?> scheduledStop;

private AtomicBoolean isSubscribed;

public AbstractMQTTDiscovery(@Nullable Set<ThingTypeUID> supportedThingTypes, int timeout,
boolean backgroundDiscoveryEnabledByDefault, String baseTopic) {
super(supportedThingTypes, 0, backgroundDiscoveryEnabledByDefault);
this.subscribeTopic = baseTopic;
this.timeout = timeout;
isSubscribed = new AtomicBoolean(false);
}

/**
* Only subscribe if we were not already subscribed
*/
private void subscribe() {
if (!isSubscribed.getAndSet(true)) {
getDiscoveryService().subscribe(this, subscribeTopic);
}
}

/**
* Only unsubscribe if we were already subscribed
*/
private void unSubscribe() {
if (isSubscribed.getAndSet(false)) {
getDiscoveryService().unsubscribe(this);
}
}

/**
Expand Down Expand Up @@ -94,7 +116,7 @@ protected void startScan() {
return;
}
resetTimeout();
getDiscoveryService().subscribe(this, subscribeTopic);
subscribe();
}

@Override
Expand All @@ -104,7 +126,7 @@ protected synchronized void stopScan() {
return;
}
stopTimeout();
getDiscoveryService().unsubscribe(this);
unSubscribe();
super.stopScan();
}

Expand All @@ -118,11 +140,11 @@ public synchronized void abortScan() {
protected void startBackgroundDiscovery() {
// Remove results that are restored after a restart
removeOlderResults(new Date().getTime());
getDiscoveryService().subscribe(this, subscribeTopic);
subscribe();
}

@Override
protected void stopBackgroundDiscovery() {
getDiscoveryService().unsubscribe(this);
unSubscribe();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@
*/
package org.openhab.binding.mqtt.internal;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -54,11 +52,22 @@
@Component(service = { ThingHandlerFactory.class,
MQTTTopicDiscoveryService.class }, configurationPid = "MqttBrokerHandlerFactory")
public class MqttBrokerHandlerFactory extends BaseThingHandlerFactory implements MQTTTopicDiscoveryService {

private static final Set<ThingTypeUID> SUPPORTED_THING_TYPES_UIDS = Stream
.of(MqttBindingConstants.BRIDGE_TYPE_SYSTEMBROKER, MqttBindingConstants.BRIDGE_TYPE_BROKER)
.collect(Collectors.toSet());

private final Logger logger = LoggerFactory.getLogger(MqttBrokerHandlerFactory.class);
protected final Map<String, List<MQTTTopicDiscoveryParticipant>> discoveryTopics = new HashMap<>();

/**
* This Map provides a lookup between a Topic string (key) and a Set of MQTTTopicDiscoveryParticipants (value),
* where the Set itself is a list of participants which are subscribed to the respective Topic.
*/
protected final Map<String, Set<MQTTTopicDiscoveryParticipant>> discoveryTopics = new ConcurrentHashMap<>();

/**
* This Set contains a list of all the Broker handlers that have been created by this factory
*/
protected final Set<AbstractBrokerHandler> handlers = Collections
.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));

Expand All @@ -75,12 +84,13 @@ public boolean supportsThingType(ThingTypeUID thingTypeUID) {
}

/**
* Add the given broker connection to all listeners.
* Add the given broker handler to the list of known handlers. And then iterate over all topics and their respective
* list of listeners, and register the respective new listener and topic with the given new broker handler.
*/
protected void createdHandler(AbstractBrokerHandler handler) {
handlers.add(handler);
discoveryTopics.forEach((topic, listenerList) -> {
listenerList.forEach(listener -> {
discoveryTopics.forEach((topic, listeners) -> {
listeners.forEach(listener -> {
handler.registerDiscoveryListener(listener, topic);
});
});
Expand Down Expand Up @@ -111,24 +121,33 @@ protected void createdHandler(AbstractBrokerHandler handler) {
/**
* This factory also implements {@link MQTTTopicDiscoveryService} so consumers can subscribe to
* a MQTT topic that is registered on all available broker connections.
*
* Checks each topic, and if the listener is not already in the listener list for that topic, adds itself from that
* list, and registers itself and the respective topic with all the known brokers.
*/
@Override
@SuppressWarnings("null")
public void subscribe(MQTTTopicDiscoveryParticipant listener, String topic) {
List<MQTTTopicDiscoveryParticipant> listenerList = discoveryTopics.computeIfAbsent(topic,
t -> new ArrayList<>());
listenerList.add(listener);
handlers.forEach(broker -> broker.registerDiscoveryListener(listener, topic));
Set<MQTTTopicDiscoveryParticipant> listeners = discoveryTopics.computeIfAbsent(topic,
t -> ConcurrentHashMap.newKeySet());
if (listeners.add(listener)) {
handlers.forEach(broker -> broker.registerDiscoveryListener(listener, topic));
}
}

/**
* Unsubscribe a listener from all available broker connections.
* This factory also implements {@link MQTTTopicDiscoveryService} so consumers can unsubscribe from
* a MQTT topic that is registered on all available broker connections.
*
* Checks each topic, and if the listener is in the listener list for that topic, removes itself from that list, and
* unregisters itself and the respective topic from all the known brokers.
*/
@Override
@SuppressWarnings("null")
public void unsubscribe(MQTTTopicDiscoveryParticipant listener) {
discoveryTopics.forEach((topic, listenerList) -> {
listenerList.remove(listener);
handlers.forEach(broker -> broker.unregisterDiscoveryListener(listener, topic));
discoveryTopics.forEach((topic, listeners) -> {
if (listeners.remove(listener)) {
handlers.forEach(broker -> broker.unregisterDiscoveryListener(listener, topic));
}
});
}

Expand Down