Skip to content

Commit

Permalink
添加注释
Browse files Browse the repository at this point in the history
  • Loading branch information
ohun committed Jan 8, 2016
1 parent b125ab1 commit fdbeaa9
Show file tree
Hide file tree
Showing 28 changed files with 229 additions and 197 deletions.
Expand Up @@ -21,28 +21,32 @@ public ClientLocation setDeviceId(String deviceId) {
return this; return this;
} }



public String getHost() {
return host;
}

public ClientLocation setHost(String host) {
this.host = host;
return this;
}

public String getOsName() { public String getOsName() {
return osName; return osName;
} }


public void setOsName(String osName) { public ClientLocation setOsName(String osName) {
this.osName = osName; this.osName = osName;
return this;
} }


public String getClientVersion() { public String getClientVersion() {
return clientVersion; return clientVersion;
} }


public void setClientVersion(String clientVersion) { public ClientLocation setClientVersion(String clientVersion) {
this.clientVersion = clientVersion; this.clientVersion = clientVersion;
} return this;

public String getHost() {
return host;
}

public void setHost(String host) {
this.host = host;
} }


public static ClientLocation from(SessionContext context) { public static ClientLocation from(SessionContext context) {
Expand Down
Expand Up @@ -14,7 +14,7 @@
/** /**
* Created by ohun on 2015/12/22. * Created by ohun on 2015/12/22.
*/ */
public class MessageDispatcher implements PacketReceiver { public final class MessageDispatcher implements PacketReceiver {
public static final Logger LOGGER = LoggerFactory.getLogger(MessageDispatcher.class); public static final Logger LOGGER = LoggerFactory.getLogger(MessageDispatcher.class);
private final Map<Byte, MessageHandler> handlers = new HashMap<>(); private final Map<Byte, MessageHandler> handlers = new HashMap<>();


Expand Down
Expand Up @@ -8,7 +8,7 @@
/** /**
* Created by ohun on 2015/12/30. * Created by ohun on 2015/12/30.
*/ */
public class PushMessage extends BaseMessage { public final class PushMessage extends BaseMessage {


public String content; public String content;


Expand Down
Expand Up @@ -8,7 +8,7 @@
/** /**
* Created by ohun on 2016/1/4. * Created by ohun on 2016/1/4.
*/ */
public class ConnectionRouterManager extends RemoteRouterManager { public final class ConnectionRouterManager extends RemoteRouterManager {
public static final ConnectionRouterManager INSTANCE = new ConnectionRouterManager(); public static final ConnectionRouterManager INSTANCE = new ConnectionRouterManager();
// TODO: 2015/12/30 可以增加一层本地缓存,防止疯狂查询redis, 但是要注意失效问题及数据不一致问题 // TODO: 2015/12/30 可以增加一层本地缓存,防止疯狂查询redis, 但是要注意失效问题及数据不一致问题
private final Cache<String, RemoteRouter> cache = CacheBuilder private final Cache<String, RemoteRouter> cache = CacheBuilder
Expand Down
Expand Up @@ -6,7 +6,7 @@
/** /**
* Created by ohun on 2015/12/23. * Created by ohun on 2015/12/23.
*/ */
public class RemoteRouter implements Router<ClientLocation> { public final class RemoteRouter implements Router<ClientLocation> {
private final ClientLocation clientLocation; private final ClientLocation clientLocation;


public RemoteRouter(ClientLocation clientLocation) { public RemoteRouter(ClientLocation clientLocation) {
Expand Down
4 changes: 2 additions & 2 deletions mpush-core/src/main/java/com/shinemo/mpush/core/App.java
Expand Up @@ -5,7 +5,7 @@
import com.shinemo.mpush.core.server.ConnectionServer; import com.shinemo.mpush.core.server.ConnectionServer;
import com.shinemo.mpush.core.server.GatewayServer; import com.shinemo.mpush.core.server.GatewayServer;
import com.shinemo.mpush.tools.ConfigCenter; import com.shinemo.mpush.tools.ConfigCenter;
import com.shinemo.mpush.tools.InetAddressUtil; import com.shinemo.mpush.tools.MPushUtil;
import com.shinemo.mpush.tools.Jsons; import com.shinemo.mpush.tools.Jsons;
import com.shinemo.mpush.tools.redis.RedisGroup; import com.shinemo.mpush.tools.redis.RedisGroup;
import com.shinemo.mpush.tools.redis.RedisNode; import com.shinemo.mpush.tools.redis.RedisNode;
Expand Down Expand Up @@ -105,7 +105,7 @@ public void onFailure(String message) {
} }


private void registerServerToZK(int port, ZKPath path) { private void registerServerToZK(int port, ZKPath path) {
ServerApp app = new ServerApp(InetAddressUtil.getInetAddress(), port); ServerApp app = new ServerApp(MPushUtil.getLocalIp(), port);
ZkUtil.instance.registerEphemeralSequential(path.getWatchPath(), Jsons.toJson(app)); ZkUtil.instance.registerEphemeralSequential(path.getWatchPath(), Jsons.toJson(app));
LOGGER.error("mpush app register server:{} to zk success", port); LOGGER.error("mpush app register server:{} to zk success", port);
} }
Expand Down
Expand Up @@ -30,15 +30,18 @@ public void handle(BindUserMessage message) {
LOGGER.error("bind user failure invalid param, session={}", message.getConnection().getSessionContext()); LOGGER.error("bind user failure invalid param, session={}", message.getConnection().getSessionContext());
return; return;
} }
//1.绑定用户时先看下是否握手成功
SessionContext context = message.getConnection().getSessionContext(); SessionContext context = message.getConnection().getSessionContext();
if (context.handshakeOk()) { if (context.handshakeOk()) {
//2.如果握手成功,就把用户链接信息注册到路由中心,本地和远程各一份
boolean success = RouterCenter.INSTANCE.register(message.userId, message.getConnection()); boolean success = RouterCenter.INSTANCE.register(message.userId, message.getConnection());
if (success) { if (success) {
OkMessage.from(message).setData("bind success").send(); OkMessage.from(message).setData("bind success").send();
LOGGER.warn("bind user success, userId={}, session={}", message.userId, context); LOGGER.warn("bind user success, userId={}, session={}", message.userId, context);
} else { } else {
ErrorMessage.from(message).setReason("bind failed").close(); //3.注册失败再处理下,防止本地注册成功,远程注册失败的情况,只有都成功了才叫成功
RouterCenter.INSTANCE.unRegister(message.userId); RouterCenter.INSTANCE.unRegister(message.userId);
ErrorMessage.from(message).setReason("bind failed").close();
LOGGER.error("bind user failure, register router failure, userId={}, session={}", message.userId, context); LOGGER.error("bind user failure, register router failure, userId={}, session={}", message.userId, context);
} }
} else { } else {
Expand Down
Expand Up @@ -2,21 +2,23 @@


import com.shinemo.mpush.api.connection.Connection; import com.shinemo.mpush.api.connection.Connection;
import com.shinemo.mpush.api.protocol.Packet; import com.shinemo.mpush.api.protocol.Packet;
import com.shinemo.mpush.api.router.ClientLocation;
import com.shinemo.mpush.api.router.Router; import com.shinemo.mpush.api.router.Router;
import com.shinemo.mpush.common.ErrorCode;
import com.shinemo.mpush.common.handler.BaseMessageHandler; import com.shinemo.mpush.common.handler.BaseMessageHandler;
import com.shinemo.mpush.common.message.ErrorMessage; import com.shinemo.mpush.common.message.ErrorMessage;
import com.shinemo.mpush.common.message.OkMessage; import com.shinemo.mpush.common.message.OkMessage;
import com.shinemo.mpush.common.message.PushMessage; import com.shinemo.mpush.common.message.PushMessage;
import com.shinemo.mpush.common.message.gateway.GatewayPushMessage; import com.shinemo.mpush.common.message.gateway.GatewayPushMessage;
import com.shinemo.mpush.common.router.RemoteRouter;
import com.shinemo.mpush.core.router.RouterCenter; import com.shinemo.mpush.core.router.RouterCenter;
import com.shinemo.mpush.tools.MPushUtil; import com.shinemo.mpush.tools.MPushUtil;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import static com.shinemo.mpush.api.router.Router.RouterType.LOCAL;
import static com.shinemo.mpush.common.ErrorCode.*;

/** /**
* Created by ohun on 2015/12/30. * Created by ohun on 2015/12/30.
*/ */
Expand All @@ -30,59 +32,72 @@ public GatewayPushMessage decode(Packet packet, Connection connection) {


@Override @Override
public void handle(final GatewayPushMessage message) { public void handle(final GatewayPushMessage message) {
//查询路由信息,先查本地,本地不存在,查远程,有可能远程也是本机(在本地被删除的情况下)
Router<?> router = RouterCenter.INSTANCE.lookup(message.userId); Router<?> router = RouterCenter.INSTANCE.lookup(message.userId);

if (router == null) { if (router == null) {

//1.路由信息不存在说明用户此时不在线 //1.路由信息不存在说明用户此时不在线
ErrorMessage ErrorMessage.from(message).setErrorCode(OFFLINE).send();
.from(message)
.setErrorCode(ErrorCode.OFFLINE)
.send();
LOGGER.warn("gateway push, router not exists user offline userId={}, content={}", message.userId, message.content); LOGGER.warn("gateway push, router not exists user offline userId={}, content={}", message.userId, message.content);
} else if (router.getRouteType() == Router.RouterType.LOCAL) { } else if (router.getRouteType() == LOCAL) {
//2.如果是本地路由信息,说明用户链接在当前机器,直接把消息下发到客户端
//2.如果是本地路由信息,说明用户链接在当前机器,如果链接可用,直接把消息下发到客户端
Connection connection = (Connection) router.getRouteValue(); Connection connection = (Connection) router.getRouteValue();

if (!connection.isConnected()) { if (!connection.isConnected()) {

//2.1如果链接失效,先删除本地失效的路由,再查下远程路由,看用户是否登陆到其他机器
LOGGER.info("gateway push, router in local but disconnect, userId={}, connection={}", message.userId, connection); LOGGER.info("gateway push, router in local but disconnect, userId={}, connection={}", message.userId, connection);

//2.2删除已经失效的本地路由,防止递归死循环
RouterCenter.INSTANCE.getLocalRouterManager().unRegister(message.userId); RouterCenter.INSTANCE.getLocalRouterManager().unRegister(message.userId);
handle(message);//递归在试一次,看用户是否登陆在远程
//2.3递归再试一次,看用户是否登陆在远程其他机器
this.handle(message);

return; return;
} }

//2.4下发消息到手机客户端
PushMessage pushMessage = new PushMessage(message.content, connection); PushMessage pushMessage = new PushMessage(message.content, connection);

pushMessage.send(new ChannelFutureListener() { pushMessage.send(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {//推送成功 if (future.isSuccess()) {
OkMessage //推送成功
.from(message) OkMessage.from(message).setData(message.userId).send();
.setData(message.userId)
.send();
LOGGER.info("gateway push message to client success userId={}, content={}", message.userId, message.content); LOGGER.info("gateway push message to client success userId={}, content={}", message.userId, message.content);
} else {//推送失败 } else {
ErrorMessage //推送失败
.from(message) ErrorMessage.from(message).setErrorCode(PUSH_CLIENT_FAILURE).send();
.setErrorCode(ErrorCode.PUSH_CLIENT_FAILURE)
.send();
LOGGER.error("gateway push message to client failure userId={}, content={}", message.userId, message.content); LOGGER.error("gateway push message to client failure userId={}, content={}", message.userId, message.content);
} }
} }
}); });

LOGGER.info("gateway push, router in local userId={}, connection={}", message.userId, connection); LOGGER.info("gateway push, router in local userId={}, connection={}", message.userId, connection);
} else { } else {
RemoteRouter r = (RemoteRouter) router;
if (r.getRouteValue().getHost().equals(MPushUtil.getLocalIp())) { //3.如果是远程路由,说明此时用户已经跑到另一台机器上了
ErrorMessage ClientLocation location = (ClientLocation) router.getRouteValue();
.from(message)
.setErrorCode(ErrorCode.OFFLINE) if (MPushUtil.getLocalIp().equals(location.getHost())) {
.send(); //3.1如果查出的远程机器是当前机器,说明本机路由已经失效,此时说明用户已经不在线
ErrorMessage.from(message).setErrorCode(OFFLINE).send();

LOGGER.error("gateway push error remote is local, userId={}, router={}", message.userId, router); LOGGER.error("gateway push error remote is local, userId={}, router={}", message.userId, router);

return; return;
} }
//3.如果是远程路由,说明此时用户已经跑到另一台机器上了
// 需要通过GatewayClient或ZK把消息推送到另外一台机器上 //3.2返回给推送服务,路由信息发生更改
ErrorMessage ErrorMessage.from(message).setErrorCode(ROUTER_CHANGE).send();
.from(message)
.setErrorCode(ErrorCode.ROUTER_CHANGE)
.send();
LOGGER.info("gateway push, router in remote userId={}, router={}", message.userId, router); LOGGER.info("gateway push, router in remote userId={}, router={}", message.userId, router);
} }
} }
Expand Down
Expand Up @@ -74,12 +74,14 @@ public void handle(HandshakeMessage message) {
//7.更换会话密钥AES(clientKey)=>AES(sessionKey) //7.更换会话密钥AES(clientKey)=>AES(sessionKey)
context.changeCipher(new AesCipher(sessionKey, iv)); context.changeCipher(new AesCipher(sessionKey, iv));


//8.保存client信息 //8.保存client信息到当前连接
context.setOsName(message.osName) context.setOsName(message.osName)
.setOsVersion(message.osVersion) .setOsVersion(message.osVersion)
.setClientVersion(message.clientVersion) .setClientVersion(message.clientVersion)
.setDeviceId(message.deviceId) .setDeviceId(message.deviceId)
.setHeartbeat(heartbeat); .setHeartbeat(heartbeat);

//9.保存可复用session到Redis, 用于快速重连
ReusableSessionManager.INSTANCE.cacheSession(session); ReusableSessionManager.INSTANCE.cacheSession(session);


//9.触发握手成功事件 //9.触发握手成功事件
Expand Down
Expand Up @@ -3,7 +3,7 @@
/** /**
* Created by ohun on 2016/1/4. * Created by ohun on 2016/1/4.
*/ */
public class KickRemoteMsg { public final class KickRemoteMsg {
public String userId; public String userId;
public String deviceId; public String deviceId;
public String targetServer; public String targetServer;
Expand Down
Expand Up @@ -6,7 +6,7 @@
/** /**
* Created by ohun on 2015/12/23. * Created by ohun on 2015/12/23.
*/ */
public class LocalRouter implements Router<Connection> { public final class LocalRouter implements Router<Connection> {
private final Connection connection; private final Connection connection;


public LocalRouter(Connection connection) { public LocalRouter(Connection connection) {
Expand Down
Expand Up @@ -10,7 +10,7 @@
/** /**
* Created by ohun on 2015/12/23. * Created by ohun on 2015/12/23.
*/ */
public class LocalRouterManager implements RouterManager<LocalRouter> { public final class LocalRouterManager implements RouterManager<LocalRouter> {
public static final Logger LOGGER = LoggerFactory.getLogger(LocalRouterManager.class); public static final Logger LOGGER = LoggerFactory.getLogger(LocalRouterManager.class);
private final Map<String, LocalRouter> routerMap = new ConcurrentHashMap<>(); private final Map<String, LocalRouter> routerMap = new ConcurrentHashMap<>();


Expand Down
Expand Up @@ -14,7 +14,7 @@
/** /**
* Created by ohun on 2015/12/23. * Created by ohun on 2015/12/23.
*/ */
public class RouterCenter { public final class RouterCenter {
public static final Logger LOGGER = LoggerFactory.getLogger(RouterCenter.class); public static final Logger LOGGER = LoggerFactory.getLogger(RouterCenter.class);
public static final RouterCenter INSTANCE = new RouterCenter(); public static final RouterCenter INSTANCE = new RouterCenter();


Expand All @@ -30,10 +30,13 @@ public class RouterCenter {
* @return * @return
*/ */
public boolean register(String userId, Connection connection) { public boolean register(String userId, Connection connection) {
ClientLocation connConfig = ClientLocation.from(connection.getSessionContext()); ClientLocation location = ClientLocation
connConfig.setHost(MPushUtil.getLocalIp()); .from(connection.getSessionContext())
.setHost(MPushUtil.getLocalIp());

LocalRouter localRouter = new LocalRouter(connection); LocalRouter localRouter = new LocalRouter(connection);
RemoteRouter remoteRouter = new RemoteRouter(connConfig); RemoteRouter remoteRouter = new RemoteRouter(location);

LocalRouter oldLocalRouter = null; LocalRouter oldLocalRouter = null;
RemoteRouter oldRemoteRouter = null; RemoteRouter oldRemoteRouter = null;
try { try {
Expand Down
Expand Up @@ -21,7 +21,7 @@
/** /**
* Created by ohun on 2016/1/4. * Created by ohun on 2016/1/4.
*/ */
public class RouterChangeListener implements MessageListener { public final class RouterChangeListener implements MessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(RouterChangeListener.class); private static final Logger LOGGER = LoggerFactory.getLogger(RouterChangeListener.class);
public static final String KICK_CHANNEL = "__kick__"; public static final String KICK_CHANNEL = "__kick__";


Expand Down
@@ -0,0 +1,24 @@
package com.shinemo.mpush.core.server;

import com.shinemo.mpush.api.Server;

/**
* Created by ohun on 2016/1/8.
*/
public final class AdminServer implements Server {

@Override
public void start(Listener listener) {

}

@Override
public void stop(Listener listener) {

}

@Override
public boolean isRunning() {
return false;
}
}
Expand Up @@ -18,7 +18,7 @@
* Created by ohun on 2015/12/19. * Created by ohun on 2015/12/19.
*/ */
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class ServerChannelHandler extends ChannelHandlerAdapter { public final class ServerChannelHandler extends ChannelHandlerAdapter {


private static final Logger LOGGER = LoggerFactory.getLogger(ServerChannelHandler.class); private static final Logger LOGGER = LoggerFactory.getLogger(ServerChannelHandler.class);


Expand Down
@@ -1,22 +1,20 @@
package com.shinemo.mpush.core.session; package com.shinemo.mpush.core.session;


import com.shinemo.mpush.api.connection.SessionContext; import com.shinemo.mpush.api.connection.SessionContext;
import com.shinemo.mpush.tools.ConfigCenter;
import com.shinemo.mpush.tools.Strings; import com.shinemo.mpush.tools.Strings;
import com.shinemo.mpush.tools.crypto.MD5Utils; import com.shinemo.mpush.tools.crypto.MD5Utils;
import com.shinemo.mpush.tools.redis.manage.RedisManage; import com.shinemo.mpush.tools.redis.manage.RedisManage;
import io.netty.util.internal.chmv8.ConcurrentHashMapV8;

import java.util.Map;


/** /**
* Created by ohun on 2015/12/25. * Created by ohun on 2015/12/25.
*/ */
public final class ReusableSessionManager { public final class ReusableSessionManager {
public static final ReusableSessionManager INSTANCE = new ReusableSessionManager(); public static final ReusableSessionManager INSTANCE = new ReusableSessionManager();
private static final int EXPIRE_TIME = 86400; private int expiredTime = ConfigCenter.INSTANCE.getSessionExpiredTime();


public boolean cacheSession(ReusableSession session) { public boolean cacheSession(ReusableSession session) {
RedisManage.set(session.sessionId, session.encode(), EXPIRE_TIME); RedisManage.set(session.sessionId, session.encode(), expiredTime);
return true; return true;
} }


Expand All @@ -37,8 +35,7 @@ public ReusableSession genSession(SessionContext context) {
ReusableSession session = new ReusableSession(); ReusableSession session = new ReusableSession();
session.context = context; session.context = context;
session.sessionId = MD5Utils.encrypt(context.deviceId + now); session.sessionId = MD5Utils.encrypt(context.deviceId + now);
session.expireTime = now + EXPIRE_TIME * 1000; session.expireTime = now + expiredTime * 1000;
return session; return session;
} }

} }

0 comments on commit fdbeaa9

Please sign in to comment.