Skip to content

Commit

Permalink
Dispose of all previously created subscriptions (#3).
Browse files Browse the repository at this point in the history
  • Loading branch information
Heiko Bornholdt committed Aug 26, 2020
1 parent a393b5b commit 6ff9eac
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 10 deletions.
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
<maven.compiler.release>11</maven.compiler.release>
<akka.version>2.6.8</akka.version>
<netty.version>4.1.51.Final</netty.version>
<guava.version>29.0-jre</guava.version>
</properties>

<build>
Expand Down Expand Up @@ -239,6 +240,13 @@
<artifactId>awaitility</artifactId>
<version>4.0.3</version>
</dependency>

<!-- Guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
5 changes: 5 additions & 0 deletions wot-servient-binding-mqtt/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,10 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
import city.sane.wot.thing.form.Form;
import city.sane.wot.thing.form.Operation;
import city.sane.wot.thing.property.ExposedThingProperty;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.typesafe.config.Config;
import io.reactivex.rxjava3.disposables.Disposable;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -36,18 +40,25 @@ public class MqttProtocolServer implements ProtocolServer {
private static final Logger log = LoggerFactory.getLogger(MqttProtocolServer.class);
private final Map<String, ExposedThing> things = new HashMap<>();
private final RefCountResource<Pair<MqttProtocolSettings, MqttClient>> mqttClientProvider;
private final Multimap<String, Disposable> subscriptions;
private Pair<MqttProtocolSettings, MqttClient> settingsClientPair;

public MqttProtocolServer(Config config) {
mqttClientProvider = SharedMqttClientProvider.singleton(config);
}

MqttProtocolServer(RefCountResource<Pair<MqttProtocolSettings, MqttClient>> mqttClientProvider,
Multimap<String, Disposable> subscriptions,
Pair<MqttProtocolSettings, MqttClient> settingsClientPair) {
this.mqttClientProvider = mqttClientProvider;
this.subscriptions = subscriptions;
this.settingsClientPair = settingsClientPair;
}

public MqttProtocolServer(Config config) {
this(
SharedMqttClientProvider.singleton(config),
HashMultimap.create(),
null
);
}

@Override
public CompletableFuture<Void> start(Servient servient) {
log.info("Start MqttServer");
Expand Down Expand Up @@ -109,6 +120,13 @@ public CompletableFuture<Void> expose(ExposedThing thing) {
@Override
public CompletableFuture<Void> destroy(ExposedThing thing) {
log.info("MqttServer stop exposing '{}' as unique '/{}/*'", thing.getId(), thing.getId());

// dispose all created subscriptions
Collection<Disposable> thingSubscriptions = subscriptions.removeAll(thing.getId());
for (Disposable subscription: thingSubscriptions) {
subscription.dispose();
}

things.remove(thing.getId());

return completedFuture(null);
Expand All @@ -127,7 +145,7 @@ private void exposeProperties(ExposedThing thing, String baseUrl) {
properties.forEach((name, property) -> {
String topic = thing.getId() + "/properties/" + name;

property.observer()
Disposable subscription = property.observer()
.map(optional -> ContentManager.valueToContent(optional.orElse(null)))
.map(content -> new MqttMessage(content.getBody()))
.subscribe(
Expand All @@ -136,6 +154,7 @@ private void exposeProperties(ExposedThing thing, String baseUrl) {
() -> {
}
);
subscriptions.put(thing.getId(), subscription);

String href = baseUrl + topic;
Form form = new Form.Builder()
Expand Down Expand Up @@ -178,7 +197,7 @@ private void exposeEvents(ExposedThing thing, String baseUrl) {
events.forEach((name, event) -> {
String topic = thing.getId() + "/events/" + name;

event.observer()
Disposable subscription = event.observer()
.map(optional -> ContentManager.valueToContent(optional.orElse(null)))
.map(content -> new MqttMessage(content.getBody()))
.subscribe(
Expand All @@ -187,6 +206,7 @@ private void exposeEvents(ExposedThing thing, String baseUrl) {
() -> {
}
);
subscriptions.put(thing.getId(), subscription);

String href = baseUrl + topic;
Form form = new Form.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import city.sane.wot.thing.action.ExposedThingAction;
import city.sane.wot.thing.event.ExposedThingEvent;
import city.sane.wot.thing.property.ExposedThingProperty;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.PublishSubject;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
Expand All @@ -22,6 +25,7 @@ public class MqttProtocolServerTest {
private MqttProtocolSettings settings;
private MqttClient mqttClient;
private Pair<MqttProtocolSettings, MqttClient> settingsClientPair;
private Multimap<String, Disposable> subcriptions;
private ExposedThing thing;
private ExposedThingProperty<Object> property;
private ExposedThingAction action;
Expand All @@ -33,6 +37,7 @@ public void setUp() {
mqttClient = mock(MqttClient.class);
refCountResource = mock(RefCountResource.class);
settingsClientPair = mock(Pair.class);
subcriptions = HashMultimap.create();
thing = mock(ExposedThing.class);
property = mock(ExposedThingProperty.class);
action = mock(ExposedThingAction.class);
Expand All @@ -45,7 +50,7 @@ public void exposeShouldPublishThingDescription() throws MqttException {
when(settingsClientPair.first()).thenReturn(settings);
when(settingsClientPair.second()).thenReturn(mqttClient);
when(settings.getBroker()).thenReturn("tcp://dummy-broker");
server = new MqttProtocolServer(refCountResource, settingsClientPair);
server = new MqttProtocolServer(refCountResource, subcriptions, settingsClientPair);
server.expose(thing);

verify(mqttClient).publish(eq("counter"), any());
Expand All @@ -60,7 +65,7 @@ public void exposeShouldExposeProperties() {
when(settingsClientPair.second()).thenReturn(mqttClient);
when(settings.getBroker()).thenReturn("tcp://dummy-broker");

server = new MqttProtocolServer(refCountResource, settingsClientPair);
server = new MqttProtocolServer(refCountResource, subcriptions, settingsClientPair);
server.expose(thing);

verify(property).addForm(any());
Expand All @@ -74,7 +79,7 @@ public void exposeShouldExposeActions() {
when(settingsClientPair.second()).thenReturn(mqttClient);
when(settings.getBroker()).thenReturn("tcp://dummy-broker");

server = new MqttProtocolServer(refCountResource, settingsClientPair);
server = new MqttProtocolServer(refCountResource, subcriptions, settingsClientPair);
server.expose(thing);

verify(action).addForm(any());
Expand All @@ -89,7 +94,7 @@ public void exposeShouldExposeEvents() {
when(settingsClientPair.second()).thenReturn(mqttClient);
when(settings.getBroker()).thenReturn("tcp://dummy-broker");

server = new MqttProtocolServer(refCountResource, settingsClientPair);
server = new MqttProtocolServer(refCountResource, subcriptions, settingsClientPair);
server.expose(thing);

verify(event).addForm(any());
Expand Down

0 comments on commit 6ff9eac

Please sign in to comment.