Skip to content
This repository has been archived by the owner on Feb 17, 2020. It is now read-only.

Commit

Permalink
优化协议转换
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed Mar 13, 2019
1 parent c51cf62 commit bfb066e
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 15 deletions.
Expand Up @@ -27,9 +27,4 @@ public DeviceClient unregister(String clientId) {
return repository.remove(clientId);
}

@Override
public DeviceSession getSession(String sessionId) {

return null;
}
}
3 changes: 3 additions & 0 deletions src/main/java/org/jetlinks/gateway/session/DeviceClient.java
@@ -1,6 +1,7 @@
package org.jetlinks.gateway.session;

import org.jetlinks.protocol.message.codec.EncodedMessage;
import org.jetlinks.protocol.message.codec.Transport;

/**
* @author zhouhao
Expand All @@ -17,6 +18,8 @@ public interface DeviceClient {

void send(EncodedMessage encodedMessage);

Transport getTransport();

void close();

void ping();
Expand Down
Expand Up @@ -12,6 +12,4 @@ public interface DeviceSessionManager {

DeviceClient unregister(String clientId);

DeviceSession getSession(String sessionId);

}
Expand Up @@ -7,6 +7,7 @@
import org.jetlinks.gateway.session.DeviceClient;
import org.jetlinks.protocol.message.codec.EncodedMessage;
import org.jetlinks.protocol.message.codec.MqttMessage;
import org.jetlinks.protocol.message.codec.Transport;

import java.nio.charset.StandardCharsets;

Expand Down Expand Up @@ -57,6 +58,11 @@ public void close() {
}
}

@Override
public Transport getTransport() {
return Transport.MQTT;
}

@Override
public void send(EncodedMessage encodedMessage) {
if (encodedMessage instanceof MqttMessage) {
Expand All @@ -68,7 +74,7 @@ public void send(EncodedMessage encodedMessage) {
}
endpoint.publish(message.getTopic(), buffer, MqttQoS.AT_MOST_ONCE, false, false);
} else {
log.error("不支持发送消息{}到mqtt:", encodedMessage);
log.error("不支持发送消息{}到MQTT:", encodedMessage);
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/jetlinks/gateway/vertx/mqtt/MqttServer.java
Expand Up @@ -16,6 +16,7 @@
import org.jetlinks.protocol.ProtocolSupports;
import org.jetlinks.protocol.message.codec.EncodedMessage;
import org.jetlinks.protocol.message.DeviceMessage;
import org.jetlinks.protocol.message.codec.Transport;
import org.jetlinks.registry.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -174,8 +175,8 @@ protected void accept(MqttEndpoint endpoint) {
ProtocolSupport protocolSupport = getProtocol(protocol);
//转换消息
DeviceMessage deviceMessage = protocolSupport
.getMessageConverter()
.convert(EncodedMessage.mqtt(clientId, topicName, buffer.getByteBuf()));
.getMessageCodec()
.decode(Transport.MQTT, EncodedMessage.mqtt(clientId, topicName, buffer.getByteBuf()));
if (messageConsumer != null) {
messageConsumer.accept(deviceMessage);
}
Expand Down
Expand Up @@ -9,8 +9,9 @@
import org.jetlinks.protocol.message.DeviceMessage;
import org.jetlinks.protocol.message.codec.DeviceMessageCodec;
import org.jetlinks.protocol.message.codec.EncodedMessage;
import org.jetlinks.protocol.message.codec.Transport;
import org.jetlinks.protocol.message.property.ReadPropertyMessageReply;
import org.jetlinks.protocol.metadata.DeviceMetadataParser;
import org.jetlinks.protocol.metadata.DeviceMetadataCodec;

import java.nio.charset.StandardCharsets;

Expand All @@ -23,17 +24,17 @@ public class MockProtocolSupports implements ProtocolSupports {
public ProtocolSupport getProtocol(String protocol) {
return new ProtocolSupport() {
@Override
public DeviceMessageCodec getMessageConverter() {
public DeviceMessageCodec getMessageCodec() {

return new DeviceMessageCodec() {
@Override
public EncodedMessage convert(DeviceMessage deviceMessage) {
public EncodedMessage encode(Transport transport, DeviceMessage deviceMessage) {
return EncodedMessage.mqtt(deviceMessage.getDeviceId(), "command",
Unpooled.copiedBuffer(deviceMessage.toJson().toJSONString().getBytes()));
}

@Override
public DeviceMessage convert(EncodedMessage message) {
public DeviceMessage decode(Transport transport, EncodedMessage message) {
JSONObject jsonObject = JSON.parseObject(message.getByteBuf().toString(StandardCharsets.UTF_8));
if ("read-property".equals(jsonObject.get("type"))) {
return jsonObject.toJavaObject(ReadPropertyMessageReply.class);
Expand All @@ -44,7 +45,7 @@ public DeviceMessage convert(EncodedMessage message) {
}

@Override
public DeviceMetadataParser getMetadataParser() {
public DeviceMetadataCodec getMetadataCodec() {
throw new UnsupportedOperationException();
}
};
Expand Down

0 comments on commit bfb066e

Please sign in to comment.