Skip to content

Commit

Permalink
增加ssl功能
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed Apr 13, 2019
1 parent 46df5ca commit eee5b6d
Showing 1 changed file with 61 additions and 0 deletions.
61 changes: 61 additions & 0 deletions src/main/java/org/jetlinks/simulator/mqtt/MQTTSimulator.java
Expand Up @@ -14,20 +14,30 @@
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import org.apache.commons.codec.digest.DigestUtils;
import org.hswebframework.expands.script.engine.DynamicScriptEngine;
import org.hswebframework.expands.script.engine.DynamicScriptEngineFactory;
import org.hswebframework.utils.StringUtils;
import org.jetlinks.mqtt.client.*;
import org.slf4j.LoggerFactory;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -99,6 +109,23 @@ public class MQTTSimulator {
private int bindPortStart = 10000;


@Getter
@Setter
private boolean ssl = false;

@Getter
@Setter
private String p12Path;

@Getter
@Setter
private String p12Password;

@Getter
@Setter
private String cerPath;


Map<String, ClientSession> clientMap;

Map<String, MessageHandler> messageHandlerMap = new HashMap<>();
Expand Down Expand Up @@ -196,6 +223,33 @@ public void bindChildHandler(String topic, MessageHandler handler) {
childMessageHandler.put(topic, handler);
}


private SslContext sslContext;

@SneakyThrows
public SslContext getSSLContext() {
if (ssl && sslContext == null) {
Objects.requireNonNull(p12Path, "p12Path不能为空");
Objects.requireNonNull(cerPath, "cerPath不能为空");
KeyStore keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(new FileInputStream(p12Path), p12Password.toCharArray());
KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
CertificateFactory cAf = CertificateFactory.getInstance("X.509");
FileInputStream caIn = new FileInputStream(cerPath);
X509Certificate ca = (X509Certificate) cAf.generateCertificate(caIn);
KeyStore caKs = KeyStore.getInstance("JKS");
caKs.load(null, null);
caKs.setCertificateEntry("ca-certificate", ca);
TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX");
tmf.init(caKs);
keyManagerFactory.init(keyStore, p12Password.toCharArray());
sslContext = SslContextBuilder.forServer(keyManagerFactory)
.trustManager(tmf)
.build();
}
return sslContext;
}

public void createMqttClient(MQTTAuth auth, InetSocketAddress bind) throws Exception {
MqttClientConfig clientConfig = new MqttClientConfig();
MqttClient mqttClient = MqttClient.create(clientConfig, (topic, payload) -> {
Expand All @@ -216,6 +270,12 @@ public void createMqttClient(MQTTAuth auth, InetSocketAddress bind) throws Excep
}
});

if (ssl) {
//开启双向认证
mqttClient.getClientConfig().setSslEngineConsumer(engine -> {
engine.setUseClientMode(true);
});
}
mqttClient.getClientConfig().setBindAddress(bind);
mqttClient.setEventLoop(eventLoopGroup);
mqttClient.getClientConfig().setChannelClass(channelClass);
Expand All @@ -224,6 +284,7 @@ public void createMqttClient(MQTTAuth auth, InetSocketAddress bind) throws Excep
mqttClient.getClientConfig().setPassword(auth.getPassword());
mqttClient.getClientConfig().setProtocolVersion(MqttVersion.MQTT_3_1_1);
mqttClient.getClientConfig().setReconnect(true);
mqttClient.getClientConfig().setSslContext(getSSLContext());
AtomicLong errorCounter = new AtomicLong();

mqttClient.setCallback(new MqttClientCallback() {
Expand Down

0 comments on commit eee5b6d

Please sign in to comment.