Skip to content

Commit

Permalink
心跳修改
Browse files Browse the repository at this point in the history
  • Loading branch information
ohun committed Jan 4, 2016
1 parent 3274e88 commit 5674bc5
Show file tree
Hide file tree
Showing 12 changed files with 133 additions and 83 deletions.
Expand Up @@ -27,5 +27,5 @@ public interface Constants {

int MIN_WORK_POOL_SIZE = 10;
int MAX_WORK_POOL_SIZE = 250;
int HEARTBEAT_TIME = 1000 * 60 * 5;//5min
int HEARTBEAT_TIME = 1000 * 60 * 4;//5min
}
Expand Up @@ -10,6 +10,10 @@
* Created by ohun on 2015/12/22.
*/
public interface Connection {
int STATUS_NEW = 0;
int STATUS_CONNECTED = 1;
int STATUS_DISCONNECTED = 2;
int STATUS_TIMEOUT = 3;

void init(Channel channel, boolean security);

Expand All @@ -19,7 +23,7 @@ public interface Connection {

ChannelFuture send(Packet packet);

void send(Packet packet, ChannelFutureListener listener);
ChannelFuture send(Packet packet, ChannelFutureListener listener);

Channel channel();

Expand All @@ -28,4 +32,10 @@ public interface Connection {
void close();

boolean isConnected();

boolean heartbeatTimeout();

void setLastReadTime();


}
Expand Up @@ -10,6 +10,7 @@ public final class SessionContext {
public String osVersion;
public String clientVersion;
public String deviceId;
public int heartbeat;
public Cipher cipher;

public void changeCipher(Cipher cipher) {
Expand All @@ -36,6 +37,10 @@ public SessionContext setDeviceId(String deviceId) {
return this;
}

public void setHeartbeat(int heartbeat) {
this.heartbeat = heartbeat;
}

public boolean handshakeOk() {
return !Strings.isNullOrEmpty(deviceId);
}
Expand Down
Expand Up @@ -15,6 +15,8 @@ public final class HandshakeMessage extends ByteBufMessage {
public String clientVersion;
public byte[] iv;
public byte[] clientKey;
public int minHeartbeat;
public int maxHeartbeat;
public long timestamp;

public HandshakeMessage(Connection connection) {
Expand All @@ -33,6 +35,8 @@ public void decode(ByteBuf body) {
clientVersion = decodeString(body);
iv = decodeBytes(body);
clientKey = decodeBytes(body);
minHeartbeat = decodeInt(body);
maxHeartbeat = decodeInt(body);
timestamp = decodeLong(body);
}

Expand All @@ -43,6 +47,8 @@ public void encode(ByteBuf body) {
encodeString(body, clientVersion);
encodeBytes(body, iv);
encodeBytes(body, clientKey);
encodeInt(body, minHeartbeat);
encodeInt(body, maxHeartbeat);
encodeLong(body, timestamp);
}
}
Expand Up @@ -59,27 +59,31 @@ public void handle(HandshakeMessage message) {
ReusableSession session = ReusableSessionManager.INSTANCE.genSession(context);
ReusableSessionManager.INSTANCE.cacheSession(session);

//5.响应握手成功消息
//5.计算心跳时间
int heartbeat = MPushUtil.getHeartbeat(message.minHeartbeat, message.maxHeartbeat);

//6.响应握手成功消息
HandshakeOkMessage
.from(message)
.setServerKey(serverKey)
.setServerHost(MPushUtil.getLocalIp())
.setServerTime(System.currentTimeMillis())
.setHeartbeat(Constants.HEARTBEAT_TIME)
.setHeartbeat(heartbeat)
.setSessionId(session.sessionId)
.setExpireTime(session.expireTime)
.send();

//6.更换会话密钥AES(clientKey)=>AES(sessionKey)
//7.更换会话密钥AES(clientKey)=>AES(sessionKey)
context.changeCipher(new AesCipher(sessionKey, iv));

//7.保存client信息
//8.保存client信息
context.setOsName(message.osName)
.setOsVersion(message.osVersion)
.setClientVersion(message.clientVersion)
.setDeviceId(message.deviceId);
.setDeviceId(message.deviceId)
.setHeartbeat(heartbeat);

//8.触发握手成功事件
//9.触发握手成功事件
EventBus.INSTANCE.post(new HandshakeEvent(message.getConnection(), Constants.HEARTBEAT_TIME));
LOGGER.info("会话密钥:{},clientKey={}, serverKey={}", sessionKey, clientKey, serverKey);
}
Expand Down
Expand Up @@ -10,8 +10,6 @@
public final class HeartBeatHandler implements MessageHandler {
@Override
public void handle(Packet packet, Connection connection) {
System.err.println("receive client heartbeat, time="
+ System.currentTimeMillis());

connection.send(packet);
}
}
Expand Up @@ -36,6 +36,7 @@ public ServerChannelHandler(ConnectionManager connectionManager, PacketReceiver
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Connection connection = connectionManager.get(ctx.channel());
receiver.onReceive((Packet) msg, connection);
connection.setLastReadTime();
}

@Override
Expand All @@ -57,31 +58,4 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOGGER.warn(ctx.channel().remoteAddress() + ", channelInactive");
connectionManager.remove(ctx.channel());
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent stateEvent = (IdleStateEvent) evt;
switch (stateEvent.state()) {
case READER_IDLE:
connectionManager.remove(ctx.channel());
ctx.close();
LOGGER.warn("heartbeat read timeout, chanel closed!");
break;
case WRITER_IDLE:
ctx.writeAndFlush(Packet.getHBPacket());
LOGGER.warn("heartbeat write timeout, do write an EOL.");
break;
case ALL_IDLE:
}
} else {
LOGGER.warn("One user event Triggered. evt=" + evt);
super.userEventTriggered(ctx, evt);
}
}

public ServerChannelHandler setSecurity(boolean security) {
this.security = security;
return this;
}
}
@@ -1,29 +1,27 @@
package com.shinemo.mpush.core.task;

import com.shinemo.mpush.api.Constants;
import com.shinemo.mpush.api.connection.Connection;
import com.shinemo.mpush.netty.util.NettySharedHolder;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.shinemo.mpush.api.connection.Connection;
import com.shinemo.mpush.api.Constants;
import com.shinemo.mpush.netty.util.NettySharedHolder;

/**
* 定时全量扫描connection
*/
public class ScanAllClientConnection implements TimerTask {
public class ConnectionScanner implements TimerTask {

private static final Logger log = LoggerFactory.getLogger(ScanAllClientConnection.class);
private static final Logger log = LoggerFactory.getLogger(ConnectionScanner.class);

private final List<ScanTask> taskList = new ArrayList<ScanTask>();

public ScanAllClientConnection(final ScanTask... scanTasks) {
public ConnectionScanner(final ScanTask... scanTasks) {
if (scanTasks != null) {
for (final ScanTask task : scanTasks) {
this.taskList.add(task);
Expand All @@ -44,7 +42,7 @@ public void run(Timeout timeout) throws Exception {
}
}
} catch (Exception e) {
log.error("", "exception on scan", e);
log.error("exception on scan", e);
} finally {
NettySharedHolder.HASHED_WHEEL_TIMER.newTimeout(this, Constants.TIME_DELAY, TimeUnit.SECONDS);
}
Expand Down
Expand Up @@ -13,13 +13,15 @@
/**
* Created by ohun on 2015/12/22.
*/
public final class NettyConnection implements Connection {
public final class NettyConnection implements Connection, ChannelFutureListener {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyConnection.class);

private SessionContext context;
private Channel channel;
private boolean security;
private volatile int status = 0;
private volatile int status = STATUS_NEW;
private long lastReadTime;
private long lastWriteTime;

@Override
public void init(Channel channel, boolean security) {
Expand All @@ -29,7 +31,8 @@ public void init(Channel channel, boolean security) {
if (security) {
this.context.changeCipher(CipherBox.INSTANCE.getRsaCipher());
}
this.status = 1;
this.lastReadTime = System.currentTimeMillis();
this.status = STATUS_CONNECTED;
}

@Override
Expand All @@ -48,38 +51,52 @@ public String getId() {
}

@Override
public ChannelFuture send(final Packet packet) {
return channel.writeAndFlush(packet);
public ChannelFuture send(Packet packet) {
return send(packet, null);
}

@Override
public void send(Packet packet, ChannelFutureListener listener) {
if (listener == null) channel.writeAndFlush(packet);
else channel.writeAndFlush(packet).addListener(listener);
public ChannelFuture send(Packet packet, ChannelFutureListener listener) {
if (listener != null) {
return channel.writeAndFlush(packet).addListener(listener).addListener(this);
} else {
return channel.writeAndFlush(packet).addListener(this);
}
}

@Override
public Channel channel() {
return channel;
}

public Channel getChannel() {
return channel;
@Override
public void close() {
this.status = STATUS_DISCONNECTED;
this.channel.close();
}

public void setChannel(Channel channel) {
this.channel = channel;
@Override
public boolean isConnected() {
return status == STATUS_CONNECTED || channel.isActive();
}

@Override
public void close() {
this.status = 0;
this.channel.close();
public boolean heartbeatTimeout() {
return context.heartbeat > 0 && System.currentTimeMillis() - lastReadTime > context.heartbeat;
}

@Override
public boolean isConnected() {
return status == 0 || channel.isActive();
public void setLastReadTime() {
lastReadTime = System.currentTimeMillis();
}

@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
lastWriteTime = System.currentTimeMillis();
} else {
LOGGER.error("send msg error");
}
}

@Override
Expand Down

0 comments on commit 5674bc5

Please sign in to comment.