Skip to content

Commit

Permalink
代码,整理
Browse files Browse the repository at this point in the history
  • Loading branch information
闫逍旭 committed Mar 9, 2016
1 parent 3e58ff1 commit addf519
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 117 deletions.
Expand Up @@ -10,40 +10,41 @@
import com.shinemo.mpush.tools.redis.manage.RedisManage; import com.shinemo.mpush.tools.redis.manage.RedisManage;


//查询使用 //查询使用
public class UserManager { public final class UserManager {

public static final UserManager INSTANCE = new UserManager();
private static final String EXTRANET_ADDRESS = MPushUtil.getExtranetAddress(); private final String ONLINE_KEY = RedisKey.getUserOnlineKey(MPushUtil.getExtranetAddress());


private static final String ONLINE_KEY = RedisKey.getUserOnlineKey(EXTRANET_ADDRESS); private static final Logger log = LoggerFactory.getLogger(UserManager.class);


private static final Logger log = LoggerFactory.getLogger(UserManager.class); public UserManager() {

//重置在线数
public void init(){ // RedisManage.set(RedisKey.getConnNum(MPushUtil.getExtranetAddress()), 0);
RedisManage.del(ONLINE_KEY); //删除已经存在的数据
log.info("init redis key:{}"+ONLINE_KEY); RedisManage.del(ONLINE_KEY);
} log.info("init redis key:{}" + ONLINE_KEY);

}
public void userOnline(String userId) {
RedisManage.zAdd(ONLINE_KEY, userId); public void userOnline(String userId) {
log.info("user online {}",userId); RedisManage.zAdd(ONLINE_KEY, userId);
log.info("user online {}", userId);
} }


public void userOffline(String userId) { public void userOffline(String userId) {
RedisManage.zRem(ONLINE_KEY, userId); RedisManage.zRem(ONLINE_KEY, userId);
log.info("user offline {}",userId); log.info("user offline {}", userId);
} }

//在线用户 //在线用户
public long onlineUserNum(){ public long onlineUserNum() {
return RedisManage.zCard(ONLINE_KEY); return RedisManage.zCard(ONLINE_KEY);
} }

//在线用户列表 //在线用户列表
public List<String> onlineUserList(int start,int size){ public List<String> onlineUserList(int start, int size) {
if(size<10){ if (size < 10) {
size = 10; size = 10;
} }
return RedisManage.zrange(ONLINE_KEY, start, size-1, String.class); return RedisManage.zrange(ONLINE_KEY, start, size - 1, String.class);
} }

} }
Expand Up @@ -22,8 +22,8 @@ public final class RouterCenter {
private final LocalRouterManager localRouterManager = new LocalRouterManager(); private final LocalRouterManager localRouterManager = new LocalRouterManager();
private final RemoteRouterManager remoteRouterManager = new RemoteRouterManager(); private final RemoteRouterManager remoteRouterManager = new RemoteRouterManager();
private final RouterChangeListener routerChangeListener = new RouterChangeListener(); private final RouterChangeListener routerChangeListener = new RouterChangeListener();
private final UserManager userManager = new UserManager(); private final UserOnlineOfflineListener userOnlineOfflineListener = new UserOnlineOfflineListener();



/** /**
* 注册用户和链接 * 注册用户和链接
Expand All @@ -45,7 +45,7 @@ public boolean register(String userId, Connection connection) {
try { try {
oldLocalRouter = localRouterManager.register(userId, localRouter); oldLocalRouter = localRouterManager.register(userId, localRouter);
oldRemoteRouter = remoteRouterManager.register(userId, remoteRouter); oldRemoteRouter = remoteRouterManager.register(userId, remoteRouter);

} catch (Exception e) { } catch (Exception e) {
LOGGER.error("register router ex, userId={}, connection={}", userId, connection, e); LOGGER.error("register router ex, userId={}, connection={}", userId, connection, e);
} }
Expand Down Expand Up @@ -83,12 +83,4 @@ public LocalRouterManager getLocalRouterManager() {
public RemoteRouterManager getRemoteRouterManager() { public RemoteRouterManager getRemoteRouterManager() {
return remoteRouterManager; return remoteRouterManager;
} }

public RouterChangeListener getRouterChangeListener() {
return routerChangeListener;
}

public UserManager getUserManager(){
return userManager;
}
} }
Expand Up @@ -29,28 +29,26 @@ public final class RouterChangeListener extends AbstractEventContainer implement
private final String kick_channel = KICK_CHANNEL_ + MPushUtil.getLocalIp(); private final String kick_channel = KICK_CHANNEL_ + MPushUtil.getLocalIp();


public RouterChangeListener() { public RouterChangeListener() {
ListenerDispatcher.INSTANCE.subscribe(getKickChannel(), this); ListenerDispatcher.INSTANCE.subscribe(getKickChannel(), this);
} }


public String getKickChannel() { public String getKickChannel() {
return kick_channel; return kick_channel;
} }

public String getKickChannel(String remoteIp){ public String getKickChannel(String remoteIp) {
return KICK_CHANNEL_ + remoteIp; return KICK_CHANNEL_ + remoteIp;
} }


@Subscribe @Subscribe
void onRouteChangeEvent(RouterChangeEvent event) { void onRouteChange(RouterChangeEvent event) {
String userId = event.userId; String userId = event.userId;
Router<?> r = event.router; Router<?> r = event.router;
if (r.getRouteType().equals(Router.RouterType.LOCAL)) { if (r.getRouteType().equals(Router.RouterType.LOCAL)) {
kickLocal(userId, (LocalRouter) r); kickLocal(userId, (LocalRouter) r);
} else { } else {
kickRemote(userId, (RemoteRouter) r); kickRemote(userId, (RemoteRouter) r);
} }

// TODO: 2016/1/10 publish remoter change event to redis
} }


/** /**
Expand All @@ -68,11 +66,10 @@ public void kickLocal(final String userId, final LocalRouter router) {
message.send(new ChannelFutureListener() { message.send(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
//future.channel().close();
if (future.isSuccess()) { if (future.isSuccess()) {
LoggerManage.info(LogType.CONNECTION, "kick local connection success, userId={}, router={}", userId, router); LoggerManage.info(LogType.CONNECTION, "kick local connection success, userId={}, router={}", userId, router);
} else { } else {
LoggerManage.info(LogType.CONNECTION, "kick local connection failure, userId={}, router={}", userId, router); LoggerManage.info(LogType.CONNECTION, "kick local connection failure, userId={}, router={}", userId, router);
} }
} }
}); });
Expand All @@ -91,7 +88,7 @@ public void kickRemote(String userId, RemoteRouter router) {
ClientLocation location = router.getRouteValue(); ClientLocation location = router.getRouteValue();
//1.如果目标机器是当前机器,就不要再发送广播了,直接忽略 //1.如果目标机器是当前机器,就不要再发送广播了,直接忽略
if (location.getHost().equals(MPushUtil.getLocalIp())) { if (location.getHost().equals(MPushUtil.getLocalIp())) {
LoggerManage.info(LogType.CONNECTION, "kick remote user but router in local, userId={}", userId); LoggerManage.info(LogType.CONNECTION, "kick remote user but router in local, userId={}", userId);
return; return;
} }


Expand All @@ -115,7 +112,7 @@ public void kickRemote(String userId, RemoteRouter router) {
public void onReceiveKickRemoteMsg(KickRemoteMsg msg) { public void onReceiveKickRemoteMsg(KickRemoteMsg msg) {
//1.如果当前机器不是目标机器,直接忽略 //1.如果当前机器不是目标机器,直接忽略
if (!msg.targetServer.equals(MPushUtil.getLocalIp())) { if (!msg.targetServer.equals(MPushUtil.getLocalIp())) {
LoggerManage.info(LogType.CONNECTION, "receive kick remote msg, target server error, localIp={}, msg={}", MPushUtil.getLocalIp(), msg); LoggerManage.info(LogType.CONNECTION, "receive kick remote msg, target server error, localIp={}, msg={}", MPushUtil.getLocalIp(), msg);
return; return;
} }


Expand All @@ -124,14 +121,14 @@ public void onReceiveKickRemoteMsg(KickRemoteMsg msg) {
LocalRouterManager routerManager = RouterCenter.INSTANCE.getLocalRouterManager(); LocalRouterManager routerManager = RouterCenter.INSTANCE.getLocalRouterManager();
LocalRouter router = routerManager.lookup(userId); LocalRouter router = routerManager.lookup(userId);
if (router != null) { if (router != null) {
LoggerManage.info(LogType.CONNECTION, "receive kick remote msg, msg={}", msg); LoggerManage.info(LogType.CONNECTION, "receive kick remote msg, msg={}", msg);
//2.1删除本地路由信息 //2.1删除本地路由信息
routerManager.unRegister(userId); routerManager.unRegister(userId);
//2.2发送踢人消息到客户端 //2.2发送踢人消息到客户端
kickLocal(userId, router); kickLocal(userId, router);
remStatUser(userId); remStatUser(userId);
} else { } else {
LoggerManage.info(LogType.CONNECTION, "no local router find, kick failure, msg={}", msg); LoggerManage.info(LogType.CONNECTION, "no local router find, kick failure, msg={}", msg);
} }
} }


Expand All @@ -142,14 +139,14 @@ public void onMessage(String channel, String message) {
if (msg != null) { if (msg != null) {
onReceiveKickRemoteMsg(msg); onReceiveKickRemoteMsg(msg);
} else { } else {
LoggerManage.info(LogType.CONNECTION, "receive an error kick message={}", message); LoggerManage.info(LogType.CONNECTION, "receive an error kick message={}", message);
} }
} else { } else {
LoggerManage.info(LogType.CONNECTION, "receive an error redis channel={}",channel); LoggerManage.info(LogType.CONNECTION, "receive an error redis channel={}", channel);
} }
} }

private void remStatUser(String userId){ private void remStatUser(String userId) {
RedisManage.zRem(RedisKey.getUserOnlineKey(MPushUtil.getExtranetIp()), userId); RedisManage.zRem(RedisKey.getUserOnlineKey(MPushUtil.getExtranetIp()), userId);
} }
} }

This file was deleted.

@@ -0,0 +1,37 @@
package com.shinemo.mpush.core.router;

import com.google.common.eventbus.Subscribe;
import com.shinemo.mpush.api.event.UserOfflineEvent;
import com.shinemo.mpush.api.event.UserOnlineEvent;
import com.shinemo.mpush.common.EventBus;
import com.shinemo.mpush.common.manage.user.UserManager;
import com.shinemo.mpush.tools.redis.manage.RedisManage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created by ohun on 2015/12/23.
*/
public final class UserOnlineOfflineListener {
public static final Logger LOGGER = LoggerFactory.getLogger(UserOnlineOfflineListener.class);

public static final String ONLINE_CHANNEL = "/mpush/online/";

public static final String OFFLINE_CHANNEL = "/mpush/offline/";

public UserOnlineOfflineListener() {
EventBus.INSTANCE.register(this);
}

@Subscribe
void onUserOnline(UserOnlineEvent event) {
UserManager.INSTANCE.userOnline(event.getUserId());
RedisManage.publish(ONLINE_CHANNEL, event.getUserId());
}

@Subscribe
void onUserOffline(UserOfflineEvent event) {
UserManager.INSTANCE.userOffline(event.getUserId());
RedisManage.publish(OFFLINE_CHANNEL, event.getUserId());
}
}
@@ -1,7 +1,6 @@
package com.shinemo.mpush.core.server; package com.shinemo.mpush.core.server;




import com.shinemo.mpush.api.RedisKey;
import com.shinemo.mpush.api.connection.ConnectionManager; import com.shinemo.mpush.api.connection.ConnectionManager;
import com.shinemo.mpush.api.protocol.Command; import com.shinemo.mpush.api.protocol.Command;
import com.shinemo.mpush.common.MessageDispatcher; import com.shinemo.mpush.common.MessageDispatcher;
Expand All @@ -10,9 +9,6 @@
import com.shinemo.mpush.netty.client.NettyHttpClient; import com.shinemo.mpush.netty.client.NettyHttpClient;
import com.shinemo.mpush.netty.connection.NettyConnectionManager; import com.shinemo.mpush.netty.connection.NettyConnectionManager;
import com.shinemo.mpush.netty.server.NettyServer; import com.shinemo.mpush.netty.server.NettyServer;
import com.shinemo.mpush.tools.MPushUtil;
import com.shinemo.mpush.tools.redis.manage.RedisManage;

import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
Expand All @@ -34,11 +30,6 @@ public ConnectionServer(int port) {
public void init() { public void init() {
super.init(); super.init();
connectionManager.init(); connectionManager.init();
//重置在线数
// RedisManage.set(RedisKey.getConnNum(MPushUtil.getExtranetAddress()), 0);
//删除已经存在的数据
RedisManage.del(RedisKey.getUserOnlineKey(MPushUtil.getExtranetAddress()));

MessageDispatcher receiver = new MessageDispatcher(); MessageDispatcher receiver = new MessageDispatcher();
receiver.register(Command.HEARTBEAT, new HeartBeatHandler()); receiver.register(Command.HEARTBEAT, new HeartBeatHandler());
receiver.register(Command.HANDSHAKE, new HandshakeHandler()); receiver.register(Command.HANDSHAKE, new HandshakeHandler());
Expand Down

0 comments on commit addf519

Please sign in to comment.