Skip to content

Commit

Permalink
代码优化
Browse files Browse the repository at this point in the history
  • Loading branch information
ohun committed Feb 25, 2016
1 parent aed42d7 commit 878cfe9
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 58 deletions.
3 changes: 0 additions & 3 deletions mpush-api/src/main/java/com/shinemo/mpush/api/Constants.java
Expand Up @@ -7,10 +7,7 @@
*/
public interface Constants {
Charset UTF_8 = Charset.forName("UTF-8");
int HEADER_LEN = 13;

byte CRYPTO_FLAG = 0x01;
byte COMPRESS_FLAG = 0x02;
String HTTP_HEAD_READ_TIMEOUT = "readTimeout";

}
57 changes: 44 additions & 13 deletions mpush-api/src/main/java/com/shinemo/mpush/api/protocol/Packet.java
Expand Up @@ -4,22 +4,24 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.io.Serializable;

import java.util.Arrays;

/**
* Created by ohun on 2015/12/19.
* length(4)+cmd(1)+cc(2)+flags(1)+sessionId(4)+lrc(1)+body(n)
*/
public final class Packet implements Serializable {
public static final byte HB_PACKET = '\n';
private static final long serialVersionUID = -2725825199998223372L;
public final class Packet {
public static final int HEADER_LEN = 13;
public static final byte FLAG_CRYPTO = 0x01;
public static final byte FLAG_COMPRESS = 0x02;

public static final byte HB_PACKET_BYTE = '\n';
public static final byte[] HB_PACKET_BYTES = new byte[]{HB_PACKET_BYTE};
public static final Packet HB_PACKET = new Packet(Command.HEARTBEAT);

public byte cmd; //命令
public short cc; //校验码 暂时没有用到
public byte flags; //特性,如是否加密,是否压缩等
public int sessionId; // 会话id。客户端生成。
public byte lrc; // 校验,纵向冗余校验。只校验body
public byte lrc; // 校验,纵向冗余校验。只校验head
public byte[] body;

public Packet(byte cmd) {
Expand All @@ -44,10 +46,6 @@ public int getBodyLength() {
return body == null ? 0 : body.length;
}

public String getStringBody() {
return body == null ? "" : new String(body, Constants.UTF_8);
}

public void setFlag(byte flag) {
this.flags |= flag;
}
Expand All @@ -56,6 +54,39 @@ public boolean hasFlag(byte flag) {
return (flags & flag) != 0;
}

public short calcCheckCode() {
short checkCode = 0;
if (body != null) {
for (int i = 0; i < body.length; i++) {
checkCode += (body[i] & 0x0ff);
}
}
return checkCode;
}

public byte calcLrc() {
byte[] data = Unpooled.buffer(HEADER_LEN - 1)
.writeInt(getBodyLength())
.writeByte(cmd)
.writeShort(cc)
.writeByte(flags)
.writeInt(sessionId)
.array();
byte lrc = 0;
for (int i = 0; i < data.length; i++) {
lrc ^= data[i];
}
return lrc;
}

public boolean vaildCheckCode() {
return calcCheckCode() == cc;
}

public boolean validLrc() {
return (lrc ^ calcLrc()) == 0;
}

@Override
public String toString() {
return "Packet{" +
Expand All @@ -69,6 +100,6 @@ public String toString() {
}

public static ByteBuf getHBPacket() {
return Unpooled.buffer(1).writeByte(HB_PACKET);
return Unpooled.wrappedBuffer(HB_PACKET_BYTES);
}
}
@@ -1,6 +1,5 @@
package com.shinemo.mpush.common.message;

import com.shinemo.mpush.api.Constants;
import com.shinemo.mpush.api.Message;
import com.shinemo.mpush.api.connection.Connection;
import com.shinemo.mpush.api.connection.SessionContext;
Expand Down Expand Up @@ -30,15 +29,15 @@ protected void decodeBody() {
if (packet.body != null && packet.body.length > 0) {
//1.解密
byte[] tmp = packet.body;
if (packet.hasFlag(Constants.CRYPTO_FLAG)) {
if (packet.hasFlag(Packet.FLAG_CRYPTO)) {
SessionContext info = connection.getSessionContext();
if (info.cipher != null) {
tmp = info.cipher.decrypt(tmp);
}
}

//2.解压
if (packet.hasFlag(Constants.COMPRESS_FLAG)) {
if (packet.hasFlag(Packet.FLAG_COMPRESS)) {
byte[] result = IOUtils.uncompress(tmp);
if (result.length > 0) {
tmp = result;
Expand All @@ -60,7 +59,7 @@ protected void encodeBody() {
byte[] result = IOUtils.compress(tmp);
if (result.length > 0) {
tmp = result;
packet.setFlag(Constants.COMPRESS_FLAG);
packet.setFlag(Packet.FLAG_COMPRESS);
}
}

Expand All @@ -70,7 +69,7 @@ protected void encodeBody() {
byte[] result = context.cipher.encrypt(tmp);
if (result.length > 0) {
tmp = result;
packet.setFlag(Constants.CRYPTO_FLAG);
packet.setFlag(Packet.FLAG_CRYPTO);
}
}
packet.body = tmp;
Expand Down
Expand Up @@ -12,8 +12,9 @@
public final class HeartBeatHandler implements MessageHandler {
@Override
public void handle(Packet packet, Connection connection) {
connection.send(packet);//ping -> pong
LoggerManage.log(LogType.HEARTBEAT, "response client heartbeat:{},{}", connection.getChannel(),connection.getSessionContext().deviceId);
connection.send(packet);//ping -> pong
LoggerManage.log(LogType.HEARTBEAT, "response client heartbeat:{}, {}",
connection.getChannel(), connection.getSessionContext().deviceId);
}

}
@@ -1,9 +1,8 @@
package com.shinemo.mpush.core.server;

import com.shinemo.mpush.api.Server;
import com.shinemo.mpush.api.protocol.Command;
import com.shinemo.mpush.common.MessageDispatcher;
import com.shinemo.mpush.core.handler.*;
import com.shinemo.mpush.core.handler.GatewayPushHandler;
import com.shinemo.mpush.netty.connection.NettyConnectionManager;
import com.shinemo.mpush.netty.server.NettyServer;
import io.netty.channel.ChannelHandler;
Expand Down
@@ -1,11 +1,9 @@
package com.shinemo.mpush.netty.codec;

import com.shinemo.mpush.api.Constants;
import com.shinemo.mpush.api.exception.DecodeException;
import com.shinemo.mpush.api.protocol.Command;
import com.shinemo.mpush.api.protocol.Packet;
import com.shinemo.mpush.tools.config.ConfigCenter;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
Expand All @@ -26,7 +24,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t

private void decodeHeartbeat(ByteBuf in, List<Object> out) {
while (in.isReadable()) {
if (in.readByte() == Packet.HB_PACKET) {
if (in.readByte() == Packet.HB_PACKET_BYTE) {
out.add(new Packet(Command.HEARTBEAT));
} else {
in.readerIndex(in.readerIndex() - 1);
Expand All @@ -37,7 +35,7 @@ private void decodeHeartbeat(ByteBuf in, List<Object> out) {

private void decodeFrames(ByteBuf in, List<Object> out) throws Exception {
try {
while (in.readableBytes() >= Constants.HEADER_LEN) {
while (in.readableBytes() >= Packet.HEADER_LEN) {
//1.记录当前读取位置位置.如果读取到非完整的frame,要恢复到该位置,便于下次读取
in.markReaderIndex();
out.add(decodeFrame(in));
Expand All @@ -51,7 +49,7 @@ private void decodeFrames(ByteBuf in, List<Object> out) throws Exception {
private Packet decodeFrame(ByteBuf in) throws Exception {
int bufferSize = in.readableBytes();
int bodyLength = in.readInt();
if (bufferSize < (bodyLength + Constants.HEADER_LEN)) {
if (bufferSize < (bodyLength + Packet.HEADER_LEN)) {
throw new DecodeException("invalid frame");
}
return readPacket(in, bodyLength);
Expand Down
Expand Up @@ -18,7 +18,7 @@ public final class PacketEncoder extends MessageToByteEncoder<Packet> {
@Override
protected void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf out) throws Exception {
if (packet.cmd == Command.HEARTBEAT.cmd) {
out.writeByte(Packet.HB_PACKET);
out.writeByte(Packet.HB_PACKET_BYTE);
} else {
out.writeInt(packet.getBodyLength());
out.writeByte(packet.cmd);
Expand Down
Expand Up @@ -11,14 +11,14 @@
import com.shinemo.mpush.log.LogType;
import com.shinemo.mpush.log.LoggerManage;
import com.shinemo.mpush.tools.config.ConfigCenter;

import io.netty.channel.Channel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import io.netty.util.internal.chmv8.ConcurrentHashMapV8;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -76,52 +76,47 @@ public List<Connection> getConnections() {

@Subscribe
void onHandshakeOk(HandshakeEvent event) {
HeartbeatCheckTask task = new HeartbeatCheckTask(event.heartbeat, event.connection);
HeartbeatCheckTask task = new HeartbeatCheckTask(event.connection);
task.startTimeout();
}

private class HeartbeatCheckTask implements TimerTask {

private int expiredTimes = 0;
private final int heartbeat;
private final Connection connection;

public HeartbeatCheckTask(int heartbeat, Connection connection) {
this.heartbeat = heartbeat;
public HeartbeatCheckTask(Connection connection) {
this.connection = connection;
}

public void startTimeout() {
timer.newTimeout(this, heartbeat, TimeUnit.MILLISECONDS);
int timeout = connection.getSessionContext().heartbeat;
timer.newTimeout(this, timeout > 0 ? timeout : ConfigCenter.holder.minHeartbeat(), TimeUnit.MILLISECONDS);
}

@Override
public void run(Timeout timeout) throws Exception {
try {
if (!connection.isConnected()) {
LoggerManage.info(LogType.HEARTBEAT, "connection is not connected:{},{}", expiredTimes, connection.getChannel(), connection.getSessionContext().deviceId);
if (!connection.isConnected()) {
LoggerManage.info(LogType.HEARTBEAT, "connection is not connected:{},{}", expiredTimes, connection.getChannel(), connection.getSessionContext().deviceId);
return;
}
if (connection.heartbeatTimeout()) {
if (++expiredTimes > ConfigCenter.holder.maxHBTimeoutTimes()) {

EventBus.INSTANCE.post(new UserOfflineEvent(connection));

connection.close();
LoggerManage.info(LogType.HEARTBEAT, "connection heartbeat timeout, connection has bean closed:{},{}", connection.getChannel(), connection.getSessionContext().deviceId);
return;
}
if (connection.heartbeatTimeout()) {
if (++expiredTimes > ConfigCenter.holder.maxHBTimeoutTimes()) {

EventBus.INSTANCE.post(new UserOfflineEvent(connection));

connection.close();
LoggerManage.info(LogType.HEARTBEAT, "connection heartbeat timeout, connection has bean closed:{},{}", connection.getChannel(), connection.getSessionContext().deviceId);
return;
} else {
LoggerManage.info(LogType.HEARTBEAT, "connection heartbeat timeout, expiredTimes:{},{},{}", expiredTimes, connection.getChannel(), connection.getSessionContext().deviceId);
}
} else {
expiredTimes = 0;
LoggerManage.info(LogType.HEARTBEAT, "connection heartbeat reset, expiredTimes:{},{},{}", expiredTimes, connection.getChannel(), connection.getSessionContext().deviceId);
LoggerManage.info(LogType.HEARTBEAT, "connection heartbeat timeout, expiredTimes:{},{},{}", expiredTimes, connection.getChannel(), connection.getSessionContext().deviceId);
}
} catch (Throwable e) {
LoggerManage.execption(LogType.DEFAULT, e, "HeartbeatCheckTask error");
} else {
expiredTimes = 0;
LoggerManage.info(LogType.HEARTBEAT, "connection heartbeat reset, expiredTimes:{},{},{}", expiredTimes, connection.getChannel(), connection.getSessionContext().deviceId);
}

startTimeout();
}
}

}

0 comments on commit 878cfe9

Please sign in to comment.