Skip to content

Commit

Permalink
Merge pull request #8 from Red1L/plugin_update
Browse files Browse the repository at this point in the history
Mqtt client information available through a plugin Facet
  • Loading branch information
adrienlauer committed Nov 21, 2016
2 parents 4be98ad + 3d4c302 commit 1e0b164
Show file tree
Hide file tree
Showing 10 changed files with 335 additions and 28 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Version 1.2.1
* [new] Mqtt client information accessible through a plugin Facet

# Version 1.2.0 (2016-09-30)
* [new] Support of policies for handling rejected tasks (received messages): multiple behaviors can be specified, CALLER_RUN(default), ABORT, DISCARD and DISCARD_OLDEST
* [chg] The MqttClient reconnection feature is also supported at kernel startup
Expand Down
16 changes: 16 additions & 0 deletions src/it/java/org/seedstack/mqtt/MqttIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.junit.runner.RunWith;
import org.seedstack.mqtt.fixtures.BrokerFixture;
import org.seedstack.mqtt.fixtures.TestMqttListener;
import org.seedstack.mqtt.spi.MqttClientInfo;
import org.seedstack.mqtt.spi.MqttInfo;
import org.seedstack.seed.it.AfterKernel;
import org.seedstack.seed.it.BeforeKernel;
import org.seedstack.seed.it.SeedITRunner;
Expand All @@ -43,6 +45,8 @@ public class MqttIT {
@Named("org.seedstack.mqtt.fixtures.TestMqttListener")
MqttCallback mqttCallback;

@Inject
MqttInfo mqttInfo;

@BeforeKernel
public static void startBroker() throws Exception {
Expand Down Expand Up @@ -78,4 +82,16 @@ public String call() throws Exception {
String result = task.get(2, TimeUnit.SECONDS);
Assertions.assertThat(result).isEqualTo(expected);
}

@Test
public void mqttInfoFacetTest() {
Assertions.assertThat(mqttInfo).isNotNull();
Assertions.assertThat(mqttInfo.getClientNames()).hasSize(5);
for (String clientId : mqttInfo.getClientNames()) {
MqttClientInfo mqttClientInfo = mqttInfo.getClientInfo(clientId);
Assertions.assertThat(mqttClientInfo).isNotNull();
Assertions.assertThat(mqttClientInfo.getClientId()).isNotNull();
Assertions.assertThat(mqttInfo.getClientStats(clientId)).isNull();
}
}
}
49 changes: 49 additions & 0 deletions src/it/java/org/seedstack/mqtt/fixtures/DummyPlugin.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Copyright (c) 2013-2016, The SeedStack authors <http://seedstack.org>
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package org.seedstack.mqtt.fixtures;

import com.google.common.collect.Lists;
import com.google.inject.AbstractModule;
import io.nuun.kernel.api.plugin.InitState;
import io.nuun.kernel.api.plugin.context.InitContext;
import io.nuun.kernel.core.AbstractPlugin;
import org.seedstack.mqtt.internal.MqttPlugin;
import org.seedstack.mqtt.spi.MqttInfo;

import java.util.Collection;

public class DummyPlugin extends AbstractPlugin {

private MqttInfo mqttInfo;

@Override
public String name() {
return "DummyMqttPlugin";
}

@Override
public InitState init(InitContext initContext) {
mqttInfo = initContext.dependency(MqttInfo.class);
return InitState.INITIALIZED;
}

@Override
public Collection<Class<?>> requiredPlugins() {
return Lists.<Class<?>>newArrayList(MqttInfo.class);
}

@Override
public Object nativeUnitModule() {
return new AbstractModule() {
@Override
protected void configure() {
bind(MqttInfo.class).toInstance(mqttInfo);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.seedstack.mqtt.fixtures.DummyPlugin
68 changes: 46 additions & 22 deletions src/main/java/org/seedstack/mqtt/internal/MqttPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,46 +10,39 @@
*/
package org.seedstack.mqtt.internal;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.collect.Lists;
import io.nuun.kernel.api.plugin.InitState;
import io.nuun.kernel.api.plugin.context.Context;
import io.nuun.kernel.api.plugin.context.InitContext;
import io.nuun.kernel.api.plugin.request.ClasspathScanRequest;
import io.nuun.kernel.core.AbstractPlugin;
import org.apache.commons.configuration.Configuration;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.*;
import org.kametic.specifications.Specification;
import org.seedstack.mqtt.MqttListener;
import org.seedstack.mqtt.MqttPublishHandler;
import org.seedstack.mqtt.MqttRejectHandler;
import org.seedstack.mqtt.MqttRejectedExecutionHandler;
import org.seedstack.mqtt.spi.MqttClientInfo;
import org.seedstack.mqtt.spi.MqttClientStats;
import org.seedstack.mqtt.spi.MqttInfo;
import org.seedstack.seed.Application;
import org.seedstack.seed.SeedException;
import org.seedstack.seed.core.internal.application.ApplicationPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

import io.nuun.kernel.api.plugin.InitState;
import io.nuun.kernel.api.plugin.context.Context;
import io.nuun.kernel.api.plugin.context.InitContext;
import io.nuun.kernel.api.plugin.request.ClasspathScanRequest;
import io.nuun.kernel.core.AbstractPlugin;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

/**
* This plugin provides MQTT support through a plain configuration. It uses Paho
* library.
*
* @author thierry.bouvet@mpsa.com
*/
public class MqttPlugin extends AbstractPlugin {
public class MqttPlugin extends AbstractPlugin implements MqttInfo {

private static final String CONNECTION_CLIENT = "client";
private static final String CONNECTION_CLIENTS = "clients";
Expand Down Expand Up @@ -81,7 +74,6 @@ public class MqttPlugin extends AbstractPlugin {
private ConcurrentHashMap<String, MqttCallbackAdapter> mqttCallbackAdapters = new ConcurrentHashMap<String, MqttCallbackAdapter>();



@Override
public String name() {
return "mqtt";
Expand Down Expand Up @@ -320,4 +312,36 @@ public void start(Context context) {
entry.getValue().start();
}
}


@Override
public Set<String> getClientNames() {
return Collections.unmodifiableSet(new HashSet<String>(Collections.list(mqttClientDefinitions.keys())));
}

@Override
public MqttClientInfo getClientInfo(String id) {
MqttClientDefinition mqttClientDefinition = mqttClientDefinitions.get(id);
MqttConnectOptions mqttConnectOptions = mqttClientDefinition.getConnectOptionsDefinition().getMqttConnectOptions();
MqttClientInfo mqttClientInfo = new MqttClientInfo();
mqttClientInfo.setClientId(mqttClientDefinition.getClientId());
mqttClientInfo.setMqttPoolConfiguration(mqttClientDefinition.getPoolDefinition().getMqttPoolConfiguration());
mqttClientInfo.setUri(mqttClientDefinition.getUri());
mqttClientInfo.setReconnectionInterval(mqttClientDefinition.getReconnectionInterval());
if (mqttClientDefinition.getListenerDefinition() != null) {
mqttClientInfo.setTopicFilters(mqttClientDefinition.getListenerDefinition().getTopicFilter());
}
if (mqttConnectOptions != null) {
mqttClientInfo.setKeepAliveInterval(mqttConnectOptions.getKeepAliveInterval());
mqttClientInfo.setCleanSession(mqttConnectOptions.isCleanSession());
mqttClientInfo.setConnectionTimeout(mqttConnectOptions.getConnectionTimeout());
mqttClientInfo.setMqttVersion(mqttConnectOptions.getMqttVersion());
}
return mqttClientInfo;
}

@Override
public MqttClientStats getClientStats(String id) {
return null;
}
}
18 changes: 12 additions & 6 deletions src/main/java/org/seedstack/mqtt/internal/MqttPoolDefinition.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.commons.configuration.Configuration;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.seedstack.mqtt.MqttRejectedExecutionHandler;
import org.seedstack.mqtt.spi.MqttPoolConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -45,19 +46,21 @@ class MqttPoolDefinition {
private Class<? extends MqttRejectedExecutionHandler> rejectHandlerClass;
private Boolean available = Boolean.TRUE;
private ThreadPoolExecutor threadPoolExecutor;
private MqttPoolConfiguration mqttPoolConfiguration;
private static final Logger LOGGER = LoggerFactory.getLogger(MqttPoolDefinition.class);


public MqttPoolDefinition(Configuration configuration) {
this.available = configuration.getBoolean(POOL_ENABLED, Boolean.FALSE);
if (this.available) {
int coreSize = configuration.getInt(POOL_CORE_SIZE, DEFAULT_CORE_SIZE);
int maxSize = configuration.getInt(POOL_MAX_SIZE, DEFAULT_MAX_SIZE);
int queueSize = configuration.getInt(POOL_QUEUE_SIZE, DEFAULT_QUEUE_SIZE);
int keepAlive = configuration.getInt(POOL_KEEP_ALIVE, DEFAULT_KEEP_ALIVE);
mqttPoolConfiguration = new MqttPoolConfiguration();
mqttPoolConfiguration.setCoreSize(configuration.getInt(POOL_CORE_SIZE, DEFAULT_CORE_SIZE));
mqttPoolConfiguration.setMaxSize(configuration.getInt(POOL_MAX_SIZE, DEFAULT_MAX_SIZE));
mqttPoolConfiguration.setQueueSize(configuration.getInt(POOL_QUEUE_SIZE, DEFAULT_QUEUE_SIZE));
mqttPoolConfiguration.setKeepAlive(configuration.getInt(POOL_KEEP_ALIVE, DEFAULT_KEEP_ALIVE));
rejectedExecutionPolicy = getRejectedExecutionPolicy(configuration);
this.threadPoolExecutor = new ThreadPoolExecutor(coreSize, maxSize, keepAlive, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize));
this.threadPoolExecutor = new ThreadPoolExecutor(mqttPoolConfiguration.getCoreSize(), mqttPoolConfiguration.getMaxSize(), mqttPoolConfiguration.getKeepAlive(), TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(mqttPoolConfiguration.getQueueSize()));
threadPoolExecutor.setRejectedExecutionHandler(getRejectedExecutionHandler());
}
}
Expand Down Expand Up @@ -110,4 +113,7 @@ public ThreadPoolExecutor getThreadPoolExecutor() {
return threadPoolExecutor;
}

public MqttPoolConfiguration getMqttPoolConfiguration() {
return mqttPoolConfiguration;
}
}
114 changes: 114 additions & 0 deletions src/main/java/org/seedstack/mqtt/spi/MqttClientInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* Copyright (c) 2013-2016, The SeedStack authors <http://seedstack.org>
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package org.seedstack.mqtt.spi;

/**
* Information related to an Mqtt client. Can be used for monitoring purpose.
*/
public class MqttClientInfo {

private String clientId;

private String[] topicFilters;

private String mqttReconnectionMode;

private MqttPoolConfiguration mqttPoolConfiguration;

private int reconnectionInterval;

public String uri;

private int keepAliveInterval;

private boolean cleanSession;

private int mqttVersion;

private int connectionTimeout;

public String getClientId() {
return clientId;
}

public void setClientId(String clientId) {
this.clientId = clientId;
}

public String[] getTopicFilters() {
return topicFilters;
}

public void setTopicFilters(String[] topicFilters) {
this.topicFilters = topicFilters;
}

public String getMqttReconnectionMode() {
return mqttReconnectionMode;
}

public void setMqttReconnectionMode(String mqttReconnectionMode) {
this.mqttReconnectionMode = mqttReconnectionMode;
}

public MqttPoolConfiguration getMqttPoolConfiguration() {
return mqttPoolConfiguration;
}

public void setMqttPoolConfiguration(MqttPoolConfiguration mqttPoolConfiguration) {
this.mqttPoolConfiguration = mqttPoolConfiguration;
}

public int getReconnectionInterval() {
return reconnectionInterval;
}

public void setReconnectionInterval(int reconnectionInterval) {
this.reconnectionInterval = reconnectionInterval;
}

public void setUri(String uri) {
this.uri = uri;
}

public String getUri() {
return uri;
}

public int getKeepAliveInterval() {
return keepAliveInterval;
}

public void setKeepAliveInterval(int keepAliveInterval) {
this.keepAliveInterval = keepAliveInterval;
}

public boolean isCleanSession() {
return cleanSession;
}

public void setCleanSession(boolean cleanSession) {
this.cleanSession = cleanSession;
}

public int getMqttVersion() {
return mqttVersion;
}

public void setMqttVersion(int mqttVersion) {
this.mqttVersion = mqttVersion;
}

public int getConnectionTimeout() {
return connectionTimeout;
}

public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}
}
11 changes: 11 additions & 0 deletions src/main/java/org/seedstack/mqtt/spi/MqttClientStats.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
* Copyright (c) 2013-2016, The SeedStack authors <http://seedstack.org>
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package org.seedstack.mqtt.spi;

public class MqttClientStats {
}

0 comments on commit 1e0b164

Please sign in to comment.