Skip to content

Commit

Permalink
添加注释
Browse files Browse the repository at this point in the history
  • Loading branch information
闫逍旭 committed Jan 9, 2016
1 parent 0be5a70 commit fc8b6d2
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 20 deletions.
Expand Up @@ -8,11 +8,18 @@
import com.shinemo.mpush.netty.connection.NettyConnection;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.shinemo.mpush.common.ErrorCode.OFFLINE;
import static com.shinemo.mpush.common.ErrorCode.PUSH_CLIENT_FAILURE;
import static com.shinemo.mpush.common.ErrorCode.ROUTER_CHANGE;

/**
* Created by ohun on 2015/12/30.
*/
public class PushClientChannelHandler extends ChannelHandlerAdapter {
public class GatewayClientChannelHandler extends ChannelHandlerAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(GatewayClientChannelHandler.class);
private final Connection connection = new NettyConnection();

@Override
Expand All @@ -31,23 +38,22 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
Packet packet = ((Packet) msg);
PushRequest request = PushRequestBus.INSTANCE.remove(packet.sessionId);
if (request == null) {
LOGGER.warn("receive a gateway response, but request timeout. packet={}", packet);
return;
}

if (packet.cmd == Command.OK.cmd) {
request.success();
} else if (packet.cmd == Command.ERROR.cmd) {
} else {
ErrorMessage message = new ErrorMessage(packet, connection);
byte errorCode = message.code;
if (errorCode == ErrorCode.OFFLINE.errorCode) {
if (message.code == OFFLINE.errorCode) {
request.offline();
} else if (errorCode == ErrorCode.PUSH_CLIENT_FAILURE.errorCode) {
} else if (message.code == PUSH_CLIENT_FAILURE.errorCode) {
request.failure();
} else if (errorCode == ErrorCode.ROUTER_CHANGE.errorCode) {
} else if (message.code == ROUTER_CHANGE.errorCode) {
request.redirect();
}
} else {

LOGGER.warn("receive an error gateway response, message={}", message);
}
}
}
Expand Down
Expand Up @@ -5,7 +5,6 @@
import com.shinemo.mpush.api.PushSender;
import com.shinemo.mpush.api.connection.Connection;
import com.shinemo.mpush.netty.client.NettyClient;
import com.shinemo.mpush.netty.client.NettyClientFactory;
import com.shinemo.mpush.tools.ConfigCenter;
import com.shinemo.mpush.tools.Jsons;
import com.shinemo.mpush.tools.thread.ThreadPoolUtil;
Expand All @@ -26,7 +25,6 @@
public class PushClient implements PushSender {

private int defaultTimeout = 3000;
private int port = 4000;
private final Map<String, Client> clientMap = new ConcurrentHashMap<>();
private final Map<String, ServerApp> servers = new ConcurrentHashMap<>();

Expand All @@ -44,7 +42,7 @@ public void init() throws Exception {
private void createClient(final String ip, int port) {
Client client = clientMap.get(ip);
if (client == null) {
final Client cli = new NettyClient(ip, port, new PushClientChannelHandler());
final Client cli = new NettyClient(ip, port, new GatewayClientChannelHandler());
ThreadPoolUtil.newThread(new Runnable() {
@Override
public void run() {
Expand All @@ -59,7 +57,7 @@ public void run() {
public Connection getConnection(String ip) {
Client client = clientMap.get(ip);
if (client == null) return null;
return ((PushClientChannelHandler) client.getHandler()).getConnection();
return ((GatewayClientChannelHandler) client.getHandler()).getConnection();
}

@Override
Expand Down
Expand Up @@ -6,7 +6,8 @@
public enum ErrorCode {
OFFLINE(1, "user offline"),
PUSH_CLIENT_FAILURE(2, "push to client failure"),
ROUTER_CHANGE(3, "router change");
ROUTER_CHANGE(3, "router change"),
UNKNOWN(-1, "unknown");

ErrorCode(int code, String errorMsg) {
this.errorMsg = errorMsg;
Expand All @@ -15,4 +16,13 @@ public enum ErrorCode {

public final byte errorCode;
public final String errorMsg;

public static ErrorCode toEnum(int code) {
for (ErrorCode errorCode : values()) {
if (errorCode.errorCode == code) {
return errorCode;
}
}
return UNKNOWN;
}
}
Expand Up @@ -6,7 +6,6 @@
import com.shinemo.mpush.common.message.ErrorMessage;
import com.shinemo.mpush.common.message.FastConnectMessage;
import com.shinemo.mpush.common.message.FastConnectOkMessage;
import com.shinemo.mpush.core.router.RouterCenter;
import com.shinemo.mpush.core.session.ReusableSession;
import com.shinemo.mpush.core.session.ReusableSessionManager;
import com.shinemo.mpush.tools.MPushUtil;
Expand All @@ -26,22 +25,27 @@ public FastConnectMessage decode(Packet packet, Connection connection) {

@Override
public void handle(FastConnectMessage message) {
ReusableSession session = ReusableSessionManager.INSTANCE.getSession(message.sessionId);
//从缓存中心查询session
ReusableSession session = ReusableSessionManager.INSTANCE.querySession(message.sessionId);

if (session == null) {

//1.没查到说明session已经失效了
ErrorMessage.from(message).setReason("session expired").send();

LOGGER.warn("fast connect failure, session is expired, sessionId={}, deviceId={}", message.sessionId, message.deviceId);

} else if (!session.context.deviceId.equals(message.deviceId)) {

//2.非法的设备, 当前设备不是上次生成session时的设备
ErrorMessage.from(message).setReason("invalid device").send();

LOGGER.warn("fast connect failure, not the same device, deviceId={}, session={}", message.deviceId, session.context);

} else {

//3.校验成功,重新计算心跳,完成快速重连

int heartbeat = MPushUtil.getHeartbeat(message.minHeartbeat, message.maxHeartbeat);

session.context.setHeartbeat(heartbeat);
Expand All @@ -54,7 +58,7 @@ public void handle(FastConnectMessage message) {
.setHeartbeat(heartbeat)
.send();

LOGGER.warn("fast connect success, session={}", message.deviceId, session.context);
LOGGER.warn("fast connect success, session={}", session.context);
}
}
}
Expand Up @@ -23,7 +23,7 @@
*/
public final class RouterChangeListener implements MessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(RouterChangeListener.class);
public static final String KICK_CHANNEL = "__kick__";
public static final String KICK_CHANNEL = "/mpush/kick";

public RouterChangeListener() {
EventBus.INSTANCE.register(this);
Expand All @@ -41,6 +41,12 @@ void onRouteChangeEvent(RouterChangeEvent event) {
}
}

/**
* 发送踢人消息到客户端
*
* @param userId
* @param router
*/
public void kickLocal(final String userId, final LocalRouter router) {
Connection connection = router.getRouteValue();
SessionContext context = connection.getSessionContext();
Expand All @@ -59,32 +65,55 @@ public void operationComplete(ChannelFuture future) throws Exception {
});
}

/**
* 广播踢人消息到消息中心(redis).
* <p>
* 有可能目标机器是当前机器,所以要做一次过滤
* 如果client连续2次链接到同一台机器上就有会出现这中情况
*
* @param userId
* @param router
*/
public void kickRemote(String userId, RemoteRouter router) {
ClientLocation location = router.getRouteValue();
//如果是本机直接忽略
//1.如果目标机器是当前机器,就不要再发送广播了,直接忽略
if (location.getHost().equals(MPushUtil.getLocalIp())) {
LOGGER.error("kick remote user but router in local, userId={}", userId);
return;
}

//2.发送广播
KickRemoteMsg msg = new KickRemoteMsg();
msg.deviceId = location.getDeviceId();
msg.targetServer = location.getHost();
msg.userId = userId;
RedisManage.publish(KICK_CHANNEL, msg);
}

/**
* 处理远程机器发送的踢人广播.
* <p>
* 一台机器发送广播所有的机器都能收到,
* 包括发送广播的机器,所有要做一次过滤
*
* @param msg
*/
public void onReceiveKickRemoteMsg(KickRemoteMsg msg) {
//如果目标不是本机,直接忽略
//1.如果当前机器不是目标机器,直接忽略
if (!msg.targetServer.equals(MPushUtil.getLocalIp())) {
LOGGER.error("receive kick remote msg, target server error, localIp={}, msg={}", MPushUtil.getLocalIp(), msg);
return;
}

//2.查询本地路由,找到要被踢下线的链接,并删除该本地路由
String userId = msg.userId;
LocalRouterManager routerManager = RouterCenter.INSTANCE.getLocalRouterManager();
LocalRouter router = routerManager.lookup(userId);
if (router != null) {
LOGGER.info("receive kick remote msg, msg={}", msg);
//2.1删除本地路由信息
routerManager.unRegister(userId);
//2.2发送踢人消息到客户端
kickLocal(userId, router);
} else {
LOGGER.warn("no local router find, kick failure, msg={}", msg);
Expand Down
Expand Up @@ -18,7 +18,7 @@ public boolean cacheSession(ReusableSession session) {
return true;
}

public ReusableSession getSession(String sessionId) {
public ReusableSession querySession(String sessionId) {
String value = RedisManage.get(sessionId, String.class);
if (Strings.isBlank(value)) return null;
return ReusableSession.decode(value);
Expand Down

0 comments on commit fc8b6d2

Please sign in to comment.