Skip to content

Commit

Permalink
增加拦截器功能
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed Jun 4, 2019
1 parent ccb2976 commit 732c1a6
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 3 deletions.
@@ -1,9 +1,15 @@
package org.jetlinks.core.message.codec;

import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.interceptor.DecodeDeviceMessageInterceptor;
import org.jetlinks.core.message.interceptor.DeviceMessageInterceptor;
import org.jetlinks.core.message.interceptor.EncodeDeviceMessageInterceptor;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* @author bsetfeng
Expand All @@ -14,17 +20,47 @@ public class DefaultDeviceMessageCodec implements DeviceMessageCodec {

private Map<Transport, TransportDeviceMessageCodec> messageCodec = new HashMap<>();

private List<DecodeDeviceMessageInterceptor> decodeDeviceMessageInterceptors = new CopyOnWriteArrayList<>();

private List<EncodeDeviceMessageInterceptor> encodeDeviceMessageInterceptors = new CopyOnWriteArrayList<>();


public void register(TransportDeviceMessageCodec codec) {
messageCodec.put(codec.getSupportTransport(), codec);
}

public void register(DeviceMessageInterceptor interceptor) {
if (interceptor instanceof DecodeDeviceMessageInterceptor) {
decodeDeviceMessageInterceptors.add(((DecodeDeviceMessageInterceptor) interceptor));
}
if (interceptor instanceof EncodeDeviceMessageInterceptor) {
encodeDeviceMessageInterceptors.add(((EncodeDeviceMessageInterceptor) interceptor));
}
}

@Override
public EncodedMessage encode(Transport transport, MessageEncodeContext context) {
return messageCodec.get(transport).encode(context);
for (EncodeDeviceMessageInterceptor interceptor : encodeDeviceMessageInterceptors) {
interceptor.preEncode(context);
}
EncodedMessage message = Objects.requireNonNull(messageCodec.get(transport), "unsupported transport:" + transport).encode(context);

for (EncodeDeviceMessageInterceptor interceptor : encodeDeviceMessageInterceptors) {
message = interceptor.postEncode(context, message);
}
return message;
}

@Override
public DeviceMessage decode(Transport transport, MessageDecodeContext message) {
return messageCodec.get(transport).decode(message);
public DeviceMessage decode(Transport transport, MessageDecodeContext context) {
for (DecodeDeviceMessageInterceptor interceptor : decodeDeviceMessageInterceptors) {
interceptor.preDecode(context);
}
DeviceMessage message = Objects.requireNonNull(messageCodec.get(transport), "unsupported transport:" + transport).decode(context);

for (DecodeDeviceMessageInterceptor interceptor : decodeDeviceMessageInterceptors) {
message = interceptor.postDecode(context, message);
}
return message;
}
}
@@ -0,0 +1,12 @@
package org.jetlinks.core.message.interceptor;

import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.MessageDecodeContext;

public interface DecodeDeviceMessageInterceptor extends DeviceMessageInterceptor {

void preDecode(MessageDecodeContext message);

DeviceMessage postDecode(MessageDecodeContext message, DeviceMessage deviceMessage);

}
@@ -0,0 +1,4 @@
package org.jetlinks.core.message.interceptor;

public interface DeviceMessageInterceptor {
}
@@ -0,0 +1,12 @@
package org.jetlinks.core.message.interceptor;

import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.MessageEncodeContext;

public interface EncodeDeviceMessageInterceptor extends DeviceMessageInterceptor {

void preEncode(MessageEncodeContext context);

EncodedMessage postEncode(MessageEncodeContext context, EncodedMessage message);

}

0 comments on commit 732c1a6

Please sign in to comment.