Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added mvn exec to launch an improved EmbeddedLauncher #746

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions embedding_moquette/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
<artifactId>moquette-broker</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<!-- This is used to use log4j12 implementation -->
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>

<build>
Expand All @@ -30,6 +37,15 @@
<skip>true</skip>
</configuration>
</plugin>

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<mainClass>io.moquette.testembedded.EmbeddedLauncher</mainClass>
</configuration>
</plugin>
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-compiler-plugin</artifactId>-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.moquette.testembedded;

import io.moquette.broker.ClientDescriptor;
import io.moquette.broker.Server;
import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.InterceptHandler;
Expand All @@ -29,6 +30,7 @@
import io.netty.handler.codec.mqtt.MqttQoS;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

Expand All @@ -39,7 +41,7 @@
* */
public final class EmbeddedLauncher {

static class PublisherListener extends AbstractInterceptHandler {
private class PublisherListener extends AbstractInterceptHandler {

@Override
public String getID() {
Expand All @@ -50,6 +52,17 @@ public String getID() {
public void onPublish(InterceptPublishMessage msg) {
final String decodedPayload = msg.getPayload().toString(UTF_8);
System.out.println("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload);
if ("/command".equals(msg.getTopicName())) {
switch (decodedPayload) {
case "exit":
System.out.println("EXITING broker by /command exit");
shutdown();
return;
case "list_clients":
listClients();
return;
}
}
}

@Override
Expand All @@ -59,36 +72,63 @@ public void onSessionLoopError(Throwable error) {
}

public static void main(String[] args) throws InterruptedException, IOException {
final EmbeddedLauncher launcher = new EmbeddedLauncher();
launcher.start();
}

private Server mqttBroker;

private EmbeddedLauncher() {
}

private void start() throws IOException, InterruptedException {
IResourceLoader classpathLoader = new ClasspathResourceLoader();
final IConfig classPathConfig = new ResourceLoaderConfig(classpathLoader);

final Server mqttBroker = new Server();
mqttBroker = new Server();
List<? extends InterceptHandler> userHandlers = Collections.singletonList(new PublisherListener());
mqttBroker.startServer(classPathConfig, userHandlers);

System.out.println("Broker started press [CTRL+C] to stop");
//Bind a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Stopping broker");
mqttBroker.stopServer();
System.out.println("Broker stopped");
}));

//Bind a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));

Thread.sleep(20000);
internalPublish("Hello World!!");
}

private void internalPublish(String messageText) {
System.out.println("Before self publish");
MqttPublishMessage message = MqttMessageBuilders.publish()
.topicName("/exit")
.retained(true)
// qos(MqttQoS.AT_MOST_ONCE);
// qQos(MqttQoS.AT_LEAST_ONCE);
.qos(MqttQoS.EXACTLY_ONCE)
.payload(Unpooled.copiedBuffer("Hello World!!".getBytes(UTF_8)))
.payload(Unpooled.copiedBuffer(messageText.getBytes(UTF_8)))
.build();

mqttBroker.internalPublish(message, "INTRLPUB");
System.out.println("After self publish");
}

private EmbeddedLauncher() {
private void shutdown() {
listClients();

System.out.println("Stopping broker");
mqttBroker.stopServer();
System.out.println("Broker stopped");
}

private void listClients() {
final Collection<ClientDescriptor> connectedClients = mqttBroker.listConnectedClients();
if (connectedClients.isEmpty()) {
System.out.println("No connected clients");
}
for (ClientDescriptor client : connectedClients) {
System.out.println(client);
}
}

}
34 changes: 34 additions & 0 deletions embedding_moquette/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
log4j.rootLogger=ERROR, stdout, file, messagelog

log4j.logger.io.moquette=WARN

# stdout appender is set to be a ConsoleAppender.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Threshold=TRACE
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
#log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c{1} %x - %m%n
log4j.appender.stdout.layout.ConversionPattern=%d{dd/MM/yyyy HH:mm:ss,SSS} [%t]%X{channel}%X{client.id}%X{msg.type} %-5p %c{1} %M %L %x - %m%n

#file appender
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.Threshold=INFO
log4j.appender.file.File=moquette.log
log4j.appender.file.MaxFileSize=100MB
log4j.appender.file.MaxBackupIndex=1
log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%-4r [%t] %-5p %c{1} %x - %m%n
log4j.appender.file.layout.ConversionPattern=%d{dd/MM/yyyy HH:mm:ss,SSS} [%t]%X{channel}%X{client.id}%X{msg.type} %-5p %c{1} %M %L %x - %m%n


####################################
# Message Logger Configuration #
#####################################
log4j.appender.messagelog=org.apache.log4j.RollingFileAppender
log4j.appender.messagelog.Threshold=DEBUG
log4j.appender.messagelog.File=moquette_messages.log
log4j.appender.messagelog.MaxFileSize=100MB
log4j.appender.messagelog.MaxBackupIndex=1
log4j.appender.messagelog.layout=org.apache.log4j.PatternLayout
log4j.appender.messagelog.layout.ConversionPattern=%d{dd/MM/yyyy HH:mm:ss,SSS} [%t] %-5p %c{1} %L %x - %m%n

log4j.category.io.moquette.broker.metrics.MQTTMessageLogger=DEBUG, messagelog