Skip to content

Commit

Permalink
心跳修改
Browse files Browse the repository at this point in the history
  • Loading branch information
闫逍旭 committed Jan 5, 2016
1 parent 2167c06 commit 9924ef2
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 73 deletions.
6 changes: 5 additions & 1 deletion mpush-api/src/main/java/com/shinemo/mpush/api/Constants.java
Expand Up @@ -27,5 +27,9 @@ public interface Constants {

int MIN_WORK_POOL_SIZE = 10;
int MAX_WORK_POOL_SIZE = 250;
int HEARTBEAT_TIME = 1000 * 60 * 4;//5min
int HEARTBEAT_TIME = 1000 * 60 * 1;//5min
/**
* 最大心跳超时次数,大于该次数要断开连接
*/
int MAX_HB_TIMEOUT_TIMES = 2;
}
Expand Up @@ -29,13 +29,11 @@ public interface Connection {

String getId();

void close();
ChannelFuture close();

boolean isConnected();

boolean heartbeatTimeout();

void setLastReadTime();


void updateLastReadTime();
}
Expand Up @@ -6,6 +6,7 @@
* Created by ohun on 2015/12/30.
*/
public interface ConnectionManager {

Connection get(Channel channel);

void remove(Channel channel);
Expand Down
Expand Up @@ -67,9 +67,11 @@ public void kickRemote(String userId, RemoteRouter router) {
// TODO: 2016/1/4 receive msg from redis
public void onReceiveKickRemoteMsg(KickRemoteMsg msg) {
String userId = msg.userId;
LocalRouter router = RouterCenter.INSTANCE.getLocalRouterManager().lookup(userId);
LocalRouterManager routerManager = RouterCenter.INSTANCE.getLocalRouterManager();
LocalRouter router = routerManager.lookup(userId);
if (router != null) {
LOGGER.info("receive kick remote msg, msg={}", msg);
routerManager.unRegister(userId);
kickLocal(userId, router);
} else {
LOGGER.warn("no local router find, kick failure, msg={}", msg);
Expand Down
Expand Up @@ -11,7 +11,6 @@
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -35,27 +34,27 @@ public ServerChannelHandler(ConnectionManager connectionManager, PacketReceiver
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Connection connection = connectionManager.get(ctx.channel());
connection.updateLastReadTime();
receiver.onReceive((Packet) msg, connection);
connection.setLastReadTime();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
connectionManager.remove(ctx.channel());
LOGGER.error(ctx.channel().remoteAddress() + ", exceptionCaught", cause);
LOGGER.error("caught an ex, client={}", ctx.channel(), cause);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LOGGER.warn(ctx.channel().remoteAddress() + ", channelActive");
LOGGER.warn("a client connect client={}", ctx.channel());
Connection connection = new NettyConnection();
connection.init(ctx.channel(), security);
connectionManager.add(connection);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOGGER.warn(ctx.channel().remoteAddress() + ", channelInactive");
LOGGER.warn("a client disconnect client={}", ctx.channel());
connectionManager.remove(ctx.channel());
}
}
Expand Up @@ -2,6 +2,7 @@

import com.shinemo.mpush.api.connection.SessionContext;
import com.shinemo.mpush.tools.crypto.MD5Utils;
import io.netty.util.internal.chmv8.ConcurrentHashMapV8;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -12,7 +13,7 @@
public final class ReusableSessionManager {
public static final ReusableSessionManager INSTANCE = new ReusableSessionManager();
private static final int EXPIRE_TIME = 24 * 60 * 60 * 1000;
private final Map<String, ReusableSession> sessionCache = new ConcurrentHashMap<String, ReusableSession>();
private final Map<String, ReusableSession> sessionCache = new ConcurrentHashMapV8<>();

public boolean cacheSession(ReusableSession session) {
sessionCache.put(session.sessionId, session);
Expand All @@ -23,30 +24,13 @@ public ReusableSession getSession(String sessionId) {
return sessionCache.get(sessionId);
}

public ReusableSession genSession(SessionContext info) {
/**
* 先生成key,需要保证半个周期内同一个设备生成的key是相同的
*/
long partition = System.currentTimeMillis() / (EXPIRE_TIME / 2);//把当前时间按照半个周期划分出一个当前所属于的区域
StringBuilder sb = new StringBuilder();
sb.append(info.deviceId).append(partition);
ReusableSession v = new ReusableSession();
v.sessionContext = info;
v.sessionId = MD5Utils.encrypt(sb.toString());
/**
* 计算失效时间
*/
long nowTime = System.currentTimeMillis();
long willExpire = (nowTime / EXPIRE_TIME + 1) * EXPIRE_TIME;//预计的到下个周期的失效时间

//有可能到绝对周期的时间已经非常短了,如果已经非常短的话,再补充一个周期
int exp;
if (willExpire - nowTime > EXPIRE_TIME / 2) {
exp = (int) (willExpire - nowTime);
} else {
exp = (int) (willExpire - nowTime) + EXPIRE_TIME;
}
v.expireTime = System.currentTimeMillis() + exp;//存储绝对过期时间
return v;
public ReusableSession genSession(SessionContext context) {
long now = System.currentTimeMillis();
ReusableSession session = new ReusableSession();
session.sessionContext = context;
session.sessionId = MD5Utils.encrypt(context.deviceId + now);
session.expireTime = now + EXPIRE_TIME;
return session;
}

}
Expand Up @@ -56,11 +56,15 @@ public ChannelFuture send(Packet packet) {
}

@Override
public ChannelFuture send(Packet packet, ChannelFutureListener listener) {
if (listener != null) {
return channel.writeAndFlush(packet).addListener(listener).addListener(this);
public ChannelFuture send(Packet packet, final ChannelFutureListener listener) {
if (channel.isActive() && channel.isWritable()) {
if (listener != null) {
return channel.writeAndFlush(packet).addListener(listener).addListener(this);
} else {
return channel.writeAndFlush(packet).addListener(this);
}
} else {
return channel.writeAndFlush(packet).addListener(this);
return this.close();
}
}

Expand All @@ -70,9 +74,9 @@ public Channel channel() {
}

@Override
public void close() {
public ChannelFuture close() {
this.status = STATUS_DISCONNECTED;
this.channel.close();
return this.channel.close();
}

@Override
Expand All @@ -86,7 +90,7 @@ public boolean heartbeatTimeout() {
}

@Override
public void setLastReadTime() {
public void updateLastReadTime() {
lastReadTime = System.currentTimeMillis();
}

Expand Down
Expand Up @@ -27,23 +27,19 @@
*/
public final class NettyConnectionManager implements ConnectionManager {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyConnectionManager.class);
private Timer wheelTimer;
//可能会有20w的链接数
private final ConcurrentMap<String, Connection> connections = new ConcurrentHashMapV8<>();

private Timer wheelTimer;

public void init() {
//每秒钟走一步,一个心跳周期内走一圈
long tickDuration = 1000;//1s
int ticksPerWheel = (int) (Constants.HEARTBEAT_TIME / tickDuration);
this.wheelTimer = new HashedWheelTimer(tickDuration, TimeUnit.MILLISECONDS, ticksPerWheel);
EventBus.INSTANCE.register(this);
}

//可能会有20w的链接数
private final ConcurrentMap<String, Connection> connections = new ConcurrentHashMapV8<>();

public Connection get(final String channelId) throws ExecutionException {
return connections.get(channelId);
}

public Connection get(final Channel channel) {
return connections.get(channel.id().asLongText());
}
Expand All @@ -52,26 +48,11 @@ public void add(Connection connection) {
connections.putIfAbsent(connection.getId(), connection);
}

public void add(Channel channel) {
Connection connection = new NettyConnection();
connection.init(channel, true);
connections.putIfAbsent(connection.getId(), connection);
}

public void remove(Connection connection) {
connections.remove(connection.getId());
}

public void remove(Channel channel) {
connections.remove(channel.id().asLongText());
}

public List<String> getConnectionIds() {
return new ArrayList<>(connections.keySet());
}

public List<Connection> getConnections() {
return new ArrayList<>(connections.values());
Connection connection = connections.remove(channel.id().asLongText());
if (connection != null) {
connection.close();
}
}

@Subscribe
Expand All @@ -96,13 +77,18 @@ public void startTimeout() {

@Override
public void run(Timeout timeout) throws Exception {
if (!connection.isConnected()) return;
if (connection.heartbeatTimeout()) {
if (++expiredTimes > 5) {
if (++expiredTimes > Constants.MAX_HB_TIMEOUT_TIMES) {
connection.close();
LOGGER.error("connection heartbeat timeout, connection has bean closed");
return;
} else {
LOGGER.error("connection heartbeat timeout, expiredTimes=" + expiredTimes);
}
} else {
expiredTimes = 0;
LOGGER.info("check heartbeat timeout");
}
startTimeout();
}
Expand Down
Expand Up @@ -7,7 +7,7 @@
*/
public interface Constants {
Charset UTF_8 = Charset.forName("UTF-8");
int HEARTBEAT_TIME = 1000 * 60 * 5;//5min
int HEARTBEAT_TIME = 1000 * 60 * 1;//5min
byte[] EMPTY_BYTES = new byte[0];

String JVM_LOG_PATH = "/opt/";
Expand Down

0 comments on commit 9924ef2

Please sign in to comment.