Skip to content

Commit

Permalink
add Admin command
Browse files Browse the repository at this point in the history
  • Loading branch information
闫逍旭 committed May 24, 2016
1 parent 84a5181 commit 717dba1
Show file tree
Hide file tree
Showing 24 changed files with 201 additions and 382 deletions.
1 change: 1 addition & 0 deletions conf/conf-dev.properties
@@ -0,0 +1 @@
log.level=debug
1 change: 1 addition & 0 deletions conf/conf-pub.properties
@@ -0,0 +1 @@
log.level=warn
2 changes: 1 addition & 1 deletion mpush-boot/assembly.xml
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<assembly> <assembly>
<id>release-${mpush.version}</id> <id>release-${mpush.version}</id>
<baseDirectory>mpush</baseDirectory> <baseDirectory>mpush-${mpush.version}</baseDirectory>
<includeBaseDirectory>true</includeBaseDirectory> <includeBaseDirectory>true</includeBaseDirectory>
<formats> <formats>
<format>tar.gz</format> <format>tar.gz</format>
Expand Down
8 changes: 4 additions & 4 deletions mpush-boot/src/main/java/com/mpush/bootstrap/Main.java
Expand Up @@ -25,17 +25,17 @@ public class Main {


public static void main(String[] args) { public static void main(String[] args) {
Logs.init(); Logs.init();
Logs.Console.info("launch app..."); Logs.Console.error("launch app...");
ServerLauncher launcher = new ServerLauncher(); ServerLauncher launcher = new ServerLauncher();
launcher.start(); launcher.start();
addHook(launcher); addHook(launcher);
} }


private static void addHook(final ServerLauncher serverBoot) { private static void addHook(final ServerLauncher launcher) {
Runtime.getRuntime().addShutdownHook(new Thread() { Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() { public void run() {
serverBoot.stop(); launcher.stop();
Logs.Console.info("jvm exit all server stopped..."); Logs.Console.error("jvm exit, all server stopped...");
} }
}); });
} }
Expand Down
Expand Up @@ -38,11 +38,11 @@ public class ServerLauncher {


private final ZKServerNode gsNode = ZKServerNode.gsNode(); private final ZKServerNode gsNode = ZKServerNode.gsNode();


private final Server connectServer = new ConnectionServer(csNode.getPort()); private final ConnectionServer connectServer = new ConnectionServer(csNode.getPort());


private final Server gatewayServer = new GatewayServer(gsNode.getPort()); private final GatewayServer gatewayServer = new GatewayServer(gsNode.getPort());


private final Server adminServer = new AdminServer(CC.mp.net.admin_server_port); private final AdminServer adminServer = new AdminServer(CC.mp.net.admin_server_port, connectServer, gatewayServer);




public void start() { public void start() {
Expand All @@ -60,7 +60,7 @@ public void start() {
} }


public void stop() { public void stop() {
stopServer(gatewayServer); stopServer(connectServer);
stopServer(gatewayServer); stopServer(gatewayServer);
stopServer(adminServer); stopServer(adminServer);
} }
Expand Down
Expand Up @@ -41,7 +41,7 @@ private BootJob first() {
return new BootJob() { return new BootJob() {
@Override @Override
public void run() { public void run() {
Logs.Console.info("begin run bootstrap chain..."); Logs.Console.error("begin run bootstrap chain...");
next(); next();
} }
}; };
Expand Down
Expand Up @@ -33,7 +33,7 @@ public abstract class BootJob {


public void next() { public void next() {
if (next != null) { if (next != null) {
Logs.Console.info("run next bootstrap job [" + next.getClass().getSimpleName() + "]"); Logs.Console.error("run next bootstrap job [" + next.getClass().getSimpleName() + "]");
next.run(); next.run();
} }
} }
Expand Down
Expand Up @@ -31,7 +31,7 @@ public class LastBoot extends BootJob {
@Override @Override
public void run() { public void run() {
UserManager.INSTANCE.clearUserOnlineData(); UserManager.INSTANCE.clearUserOnlineData();
Logs.Console.info("end run bootstrap chain..."); Logs.Console.error("end run bootstrap chain...");
Logs.Console.info("app start success..."); Logs.Console.error("app start success...");
} }
} }
Expand Up @@ -50,7 +50,7 @@ public void run() {
server.start(new Server.Listener() { server.start(new Server.Listener() {
@Override @Override
public void onSuccess(Object... args) { public void onSuccess(Object... args) {
Logs.Console.info("start " + serverName + " success listen:" + args[0]); Logs.Console.error("start " + serverName + " success listen:" + args[0]);
if (node != null) { if (node != null) {
registerServerToZk(node.getZkPath(), Jsons.toJson(node)); registerServerToZk(node.getZkPath(), Jsons.toJson(node));
} }
Expand All @@ -70,6 +70,6 @@ public void onFailure(Throwable cause) {
//step7 注册应用到zk //step7 注册应用到zk
public void registerServerToZk(String path, String value) { public void registerServerToZk(String path, String value) {
ZKClient.I.registerEphemeralSequential(path, value); ZKClient.I.registerEphemeralSequential(path, value);
Logs.Console.info("register server node=" + value + " to zk name=" + path); Logs.Console.error("register server node=" + value + " to zk name=" + path);
} }
} }
6 changes: 4 additions & 2 deletions mpush-boot/src/main/resources/mpush.conf
@@ -1,2 +1,4 @@
mp.log.level=debug mp.log.level=${log.level}
mp.zk.namespace=mpush mp.security.private-key="MIIBNgIBADANBgkqhkiG9w0BAQEFAASCASAwggEcAgEAAoGBAKCE8JYKhsbydMPbiO7BJVq1pbuJWJHFxOR7L8Hv3ZVkSG4eNC8DdwAmDHYu/wadfw0ihKFm2gKDcLHp5yz5UQ8PZ8FyDYvgkrvGV0ak4nc40QDJWws621dm01e/INlGKOIStAAsxOityCLv0zm5Vf3+My/YaBvZcB5mGUsPbx8fAgEAAoGAAy0+WanRqwRHXUzt89OsupPXuNNqBlCEqgTqGAt4Nimq6Ur9u2R1KXKXUotxjp71Ubw6JbuUWvJg+5Rmd9RjT0HOUEQF3rvzEepKtaraPhV5ejEIrB+nJWNfGye4yzLdfEXJBGUQzrG+wNe13izfRNXI4dN/6Q5npzqaqv0E1CkCAQACAQACAQACAQACAQA="
mp.security.public-key="MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCghPCWCobG8nTD24juwSVataW7iViRxcTkey/B792VZEhuHjQvA3cAJgx2Lv8GnX8NIoShZtoCg3Cx6ecs+VEPD2fBcg2L4JK7xldGpOJ3ONEAyVsLOttXZtNXvyDZRijiErQALMTorcgi79M5uVX9/jMv2Ggb2XAeZhlLD28fHwIDAQAB"
mp.zk.namespace=mpush
Expand Up @@ -50,7 +50,7 @@ private ZKRedisClusterManager() {
*/ */
@Override @Override
public void init() { public void init() {
Logs.Console.info("begin init redis cluster"); Logs.Console.error("begin init redis cluster");
List<com.mpush.tools.config.data.RedisGroup> groupList = CC.mp.redis.cluster_group; List<com.mpush.tools.config.data.RedisGroup> groupList = CC.mp.redis.cluster_group;
if (groupList.size() > 0) { if (groupList.size() > 0) {
if (CC.mp.redis.write_to_zk) { if (CC.mp.redis.write_to_zk) {
Expand All @@ -73,7 +73,7 @@ public void init() {
groups.add(RedisGroup.from(node)); groups.add(RedisGroup.from(node));
} }
if (groups.isEmpty()) throw new RuntimeException("init redis sever fail groupList is null"); if (groups.isEmpty()) throw new RuntimeException("init redis sever fail groupList is null");
Logs.Console.info("init redis cluster success..."); Logs.Console.error("init redis cluster success...");
} }


@Override @Override
Expand Down Expand Up @@ -119,7 +119,7 @@ public List<RedisServer> hashSet(String key) {
private void register(List<com.mpush.tools.config.data.RedisGroup> groupList) { private void register(List<com.mpush.tools.config.data.RedisGroup> groupList) {
String data = Jsons.toJson(groupList); String data = Jsons.toJson(groupList);
ZKClient.I.registerPersist(REDIS_SERVER.getRootPath(), data); ZKClient.I.registerPersist(REDIS_SERVER.getRootPath(), data);
Logs.Console.info("register redis server group success, group=" + data); Logs.Console.error("register redis server group success, group=" + data);
} }


public void addGroup(RedisGroup group) { public void addGroup(RedisGroup group) {
Expand Down
183 changes: 132 additions & 51 deletions mpush-core/src/main/java/com/mpush/core/handler/AdminHandler.java
Expand Up @@ -19,19 +19,26 @@


package com.mpush.core.handler; package com.mpush.core.handler;


import com.google.common.base.Strings;
import com.mpush.api.Service;
import com.mpush.cache.redis.RedisKey; import com.mpush.cache.redis.RedisKey;
import com.mpush.cache.redis.manager.RedisManager; import com.mpush.cache.redis.manager.RedisManager;
import com.mpush.tools.config.ConfigManager; import com.mpush.common.router.RemoteRouter;
import com.mpush.core.router.RouterCenter;
import com.mpush.core.server.AdminServer;
import com.mpush.tools.Jsons; import com.mpush.tools.Jsons;
import com.mpush.tools.MPushUtil; import com.mpush.tools.MPushUtil;
import com.mpush.tools.config.CC;
import com.mpush.tools.config.ConfigManager;
import com.mpush.zk.ZKClient; import com.mpush.zk.ZKClient;
import com.mpush.zk.ZKPath; import com.mpush.zk.ZKPath;
import com.mpush.zk.node.ZKServerNode; import com.mpush.zk.node.ZKServerNode;
import com.typesafe.config.ConfigRenderOptions;
import io.netty.channel.*; import io.netty.channel.*;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.io.Serializable;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;


Expand All @@ -40,24 +47,37 @@ public final class AdminHandler extends SimpleChannelInboundHandler<String> {


private static final Logger LOGGER = LoggerFactory.getLogger(AdminHandler.class); private static final Logger LOGGER = LoggerFactory.getLogger(AdminHandler.class);


private static final String DOUBLE_END = "\r\n\r\n";


private static final String EOL = "\r\n"; private static final String EOL = "\r\n";


private static AdminServer adminServer;

public AdminHandler(AdminServer adminServer) {
this.adminServer = adminServer;
}

@Override @Override
protected void messageReceived(ChannelHandlerContext ctx, String request) throws Exception { protected void messageReceived(ChannelHandlerContext ctx, String request) throws Exception {
Command command = Command.getCommand(request); Command command = Command.help;
ChannelFuture future = ctx.write(command.handler(request) + DOUBLE_END); String args = null;
if (command.equals(Command.QUIT)) { if (request != null) {
String[] cmd_args = request.split(" ");
command = Command.toCmd(cmd_args[0].trim());
if (cmd_args.length == 2) {
args = cmd_args[1];
}
}
Object result = command.handler(ctx, args);
ChannelFuture future = ctx.writeAndFlush(result + EOL + EOL);
if (command == Command.quit) {
future.addListener(ChannelFutureListener.CLOSE); future.addListener(ChannelFutureListener.CLOSE);
} }

} }


@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.write("welcome to " + MPushUtil.getInetAddress() + "!" + EOL); ctx.write("welcome to " + MPushUtil.getInetAddress() + "!" + EOL);
ctx.write("It is " + new Date() + " now." + DOUBLE_END); ctx.write("It is " + new Date() + " now." + EOL + EOL);
ctx.flush(); ctx.flush();
} }


Expand All @@ -67,38 +87,118 @@ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
} }


public enum Command { public enum Command {
HELP("help") { help {
@Override @Override
public String handler(String request) { public String handler(ChannelHandlerContext ctx, String args) {
StringBuilder buf = new StringBuilder(); StringBuilder buf = new StringBuilder();
buf.append("Command:" + EOL); buf.append("Option Description" + EOL);
buf.append("help:display all command." + EOL); buf.append("------ -----------" + EOL);
buf.append("quit:exit checkHealth." + EOL); buf.append("help show help" + EOL);
buf.append("scn:statistics connect num." + EOL); buf.append("quit exit console mode" + EOL);
buf.append("rcs:remove current server zk info." + EOL); buf.append("shutdown stop mpush server" + EOL);
buf.append("scs:stop connection server."); buf.append("restart restart mpush server" + EOL);
buf.append("zk:<redis, cs ,gs> query zk node" + EOL);
buf.append("count:<conn, online> count conn num or online user count" + EOL);
buf.append("route:<uid> show user route info" + EOL);
buf.append("conf:[key] show config info" + EOL);
return buf.toString(); return buf.toString();
} }
}, },
QUIT("quit") { quit {
@Override @Override
public String handler(String request) { public String handler(ChannelHandlerContext ctx, String args) {
return "have a good day!"; return "have a good day!";
} }
}, },
SCN("scn") { shutdown {
@Override
public String handler(ChannelHandlerContext ctx, String args) {
ctx.writeAndFlush("try close connect server...");
adminServer.getConnectionServer().stop(new Service.Listener() {
@Override
public void onSuccess(Object... args) {
ctx.writeAndFlush("connect server close success" + EOL);
adminServer.stop(null);//这个一定要在System.exit之前调用,不然jvm 会卡死 @see com.mpush.bootstrap.Main#addHook
System.exit(0);
}

@Override
public void onFailure(Throwable cause) {
ctx.writeAndFlush("connect server close failure, msg=" + cause.getLocalizedMessage());
}
});
return null;
}
},
restart {
@Override
public String handler(ChannelHandlerContext ctx, String args) {
return "unsupported";
}
},
zk {
@Override
public String handler(ChannelHandlerContext ctx, String args) {
switch (args) {
case "redis":
return ZKClient.I.get(ZKPath.REDIS_SERVER.getRootPath());
case "cs":
return getNodeData(ZKPath.CONNECT_SERVER);
case "gs":
return getNodeData(ZKPath.GATEWAY_SERVER);

}
return "[" + args + "] unsupported, try help.";
}

private String getNodeData(ZKPath path) {
List<String> rawData = ZKClient.I.getChildrenKeys(path.getRootPath());
StringBuilder sb = new StringBuilder();
for (String raw : rawData) {
sb.append(ZKClient.I.get(path.getFullPath(raw))).append('\n');
}
return sb.toString();
}
},
count {
@Override @Override
public String handler(String request) { public Serializable handler(ChannelHandlerContext ctx, String args) {
Long value = RedisManager.I.zCard(RedisKey.getUserOnlineKey(MPushUtil.getExtranetAddress())); switch (args) {
if (value == null) { case "conn":
value = 0L; return adminServer.getConnectionServer().getConnectionManager().getConnections().size();
case "online": {
Long value = RedisManager.I.zCard(RedisKey.getUserOnlineKey(MPushUtil.getExtranetAddress()));
return value == null ? 0 : value;
}

} }
return value.toString() + "."; return "[" + args + "] unsupported, try help.";
}
},
route {
@Override
public String handler(ChannelHandlerContext ctx, String args) {
if (Strings.isNullOrEmpty(args)) return "please input userId";
RemoteRouter router = RouterCenter.INSTANCE.getRemoteRouterManager().lookup(args);
if (router == null) return "user [" + args + "] offline now.";
return router.getRouteValue().toString();
} }
}, },
RCS("rcs") { conf {
@Override @Override
public String handler(String request) { public String handler(ChannelHandlerContext ctx, String args) {
if (Strings.isNullOrEmpty(args)) {
return CC.cfg.root().render(ConfigRenderOptions.concise().setFormatted(true));
}
if (CC.cfg.hasPath(args)) {
return CC.cfg.getAnyRef(args).toString();
}
return "key [" + args + "] not find in config";
}
},
rcs {
@Override
public String handler(ChannelHandlerContext ctx, String args) {


List<String> rawData = ZKClient.I.getChildrenKeys(ZKPath.CONNECT_SERVER.getRootPath()); List<String> rawData = ZKClient.I.getChildrenKeys(ZKPath.CONNECT_SERVER.getRootPath());
boolean removeSuccess = false; boolean removeSuccess = false;
Expand All @@ -121,35 +221,16 @@ public String handler(String request) {
return "remove false."; return "remove false.";
} }
} }
},
SCS("scs") {
@Override
public String handler(String request) {
return "not support now.";
}
}; };
private final String cmd;

public abstract String handler(String request);


private Command(String cmd) { public abstract Object handler(ChannelHandlerContext ctx, String args);
this.cmd = cmd;
}


public String getCmd() { public static Command toCmd(String cmd) {
return cmd; try {
} return Command.valueOf(cmd);

} catch (Exception e) {
public static Command getCommand(String request) {
if (StringUtils.isNoneEmpty(request)) {
for (Command command : Command.values()) {
if (command.getCmd().equals(request)) {
return command;
}
}
} }
return HELP; return help;
} }
} }

} }

0 comments on commit 717dba1

Please sign in to comment.