diff --git a/mpush-api/src/main/java/com/shinemo/mpush/api/Server.java b/mpush-api/src/main/java/com/shinemo/mpush/api/Server.java index a86cbeba..70d731fe 100644 --- a/mpush-api/src/main/java/com/shinemo/mpush/api/Server.java +++ b/mpush-api/src/main/java/com/shinemo/mpush/api/Server.java @@ -8,6 +8,8 @@ public interface Server { void start(Listener listener); void stop(Listener listener); + + public void init(); boolean isRunning(); diff --git a/mpush-client/src/main/java/com/shinemo/mpush/client/PushClient.java b/mpush-client/src/main/java/com/shinemo/mpush/client/PushClient.java index a1a3b1a9..ab25c23e 100644 --- a/mpush-client/src/main/java/com/shinemo/mpush/client/PushClient.java +++ b/mpush-client/src/main/java/com/shinemo/mpush/client/PushClient.java @@ -11,7 +11,6 @@ import com.shinemo.mpush.tools.zk.ZKPath; import com.shinemo.mpush.tools.zk.ServerApp; import com.shinemo.mpush.tools.zk.ZkRegister; -import com.shinemo.mpush.tools.zk.listener.impl.RedisPathListener; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; @@ -76,9 +75,9 @@ public void send(String content, Collection userIds, Callback callback) } public void initRedisClient() { - RedisPathListener listener = new RedisPathListener(); - zkRegister.getCache().getListenable().addListener(listener); - listener.initData(null); +// RedisPathListener listener = new RedisPathListener(); +// zkRegister.getCache().getListenable().addListener(listener); +// listener.initData(null); } private class GatewayServerZKListener implements TreeCacheListener { diff --git a/mpush-client/src/test/java/com/shinemo/mpush/client/PushClientTest.java b/mpush-client/src/test/java/com/shinemo/mpush/client/PushClientTest.java index 2fc93266..743bfbce 100644 --- a/mpush-client/src/test/java/com/shinemo/mpush/client/PushClientTest.java +++ b/mpush-client/src/test/java/com/shinemo/mpush/client/PushClientTest.java @@ -1,50 +1,50 @@ -package com.shinemo.mpush.client; - -import com.shinemo.mpush.api.PushSender; -import org.junit.Test; - -import java.util.Arrays; -import java.util.concurrent.locks.LockSupport; - -import static org.junit.Assert.*; - -/** - * Created by ohun on 2016/1/7. - */ -public class PushClientTest { - - @Test - public void testSend() throws Exception { - - } - - public static void main(String[] args) throws Exception { - PushClient client = new PushClient(); - client.init(); - Thread.sleep(1000); - client.send("this a first push", Arrays.asList("user-0", "user-1", "user-2", "user-3", "user-4"), - new PushSender.Callback() { - @Override - public void onSuccess(String userId) { - System.err.println("push onSuccess userId=" + userId); - } - - @Override - public void onFailure(String userId) { - System.err.println("push onFailure userId=" + userId); - } - - @Override - public void onOffline(String userId) { - System.err.println("push onOffline userId=" + userId); - } - - @Override - public void onTimeout(String userId) { - System.err.println("push onTimeout userId=" + userId); - } - } - ); - LockSupport.park(); - } -} \ No newline at end of file +//package com.shinemo.mpush.client; +// +//import com.shinemo.mpush.api.PushSender; +//import org.junit.Test; +// +//import java.util.Arrays; +//import java.util.concurrent.locks.LockSupport; +// +//import static org.junit.Assert.*; +// +///** +// * Created by ohun on 2016/1/7. +// */ +//public class PushClientTest { +// +// @Test +// public void testSend() throws Exception { +// +// } +// +// public static void main(String[] args) throws Exception { +// PushClient client = new PushClient(); +// client.init(); +// Thread.sleep(1000); +// client.send("this a first push", Arrays.asList("user-0", "user-1", "user-2", "user-3", "user-4"), +// new PushSender.Callback() { +// @Override +// public void onSuccess(String userId) { +// System.err.println("push onSuccess userId=" + userId); +// } +// +// @Override +// public void onFailure(String userId) { +// System.err.println("push onFailure userId=" + userId); +// } +// +// @Override +// public void onOffline(String userId) { +// System.err.println("push onOffline userId=" + userId); +// } +// +// @Override +// public void onTimeout(String userId) { +// System.err.println("push onTimeout userId=" + userId); +// } +// } +// ); +// LockSupport.park(); +// } +//} \ No newline at end of file diff --git a/mpush-core/src/main/java/com/shinemo/mpush/core/AbstractServer.java b/mpush-core/src/main/java/com/shinemo/mpush/core/AbstractServer.java new file mode 100644 index 00000000..575940be --- /dev/null +++ b/mpush-core/src/main/java/com/shinemo/mpush/core/AbstractServer.java @@ -0,0 +1,128 @@ +package com.shinemo.mpush.core; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.shinemo.mpush.api.Server; +import com.shinemo.mpush.core.server.ConnectionServer; +import com.shinemo.mpush.tools.GenericsUtil; +import com.shinemo.mpush.tools.Jsons; +import com.shinemo.mpush.tools.MPushUtil; +import com.shinemo.mpush.tools.config.ConfigCenter; +import com.shinemo.mpush.tools.redis.RedisGroup; +import com.shinemo.mpush.tools.spi.ServiceContainer; +import com.shinemo.mpush.tools.thread.ThreadPoolUtil; +import com.shinemo.mpush.tools.zk.ServerApp; +import com.shinemo.mpush.tools.zk.ZKPath; +import com.shinemo.mpush.tools.zk.ZkRegister; +import com.shinemo.mpush.tools.zk.listener.DataChangeListener; + +public abstract class AbstractServer { + + private static final Logger log = LoggerFactory.getLogger(AbstractServer.class); + + protected Application application; + + protected List dataChangeListeners = Lists.newArrayList(); + + protected ZkRegister zkRegister = ServiceContainer.getInstance(ZkRegister.class); + + protected Server server; + + public AbstractServer() { + this.application = getApplication(); + } + + @SuppressWarnings("unchecked") + private Application getApplication() { + try { + return ((Class) GenericsUtil.getSuperClassGenericType(this.getClass(), 0)).newInstance(); + } catch (Exception e) { + log.warn("exception:",e); + throw new RuntimeException(e); + } + } + + public abstract Server getServer(); + + public void registerListener(DataChangeListener listener){ + dataChangeListeners.add(listener); + } + + //step1 启动 zk + public void initZK(){ + zkRegister = ServiceContainer.getInstance(ZkRegister.class); + zkRegister.init(); + } + + //step2 获取redis + public void initRedis(){ + boolean exist = zkRegister.isExisted(ZKPath.REDIS_SERVER.getPath()); + if (!exist) { + List groupList = ConfigCenter.holder.redisGroups(); + zkRegister.registerPersist(ZKPath.REDIS_SERVER.getPath(), Jsons.toJson(groupList)); + } + } + + //step3 初始化 listener data + public void initListenerData(){ + for(DataChangeListener listener:dataChangeListeners){ + listener.initData(); + } + } + + //step4 初始化server + public void initServer(){ + server = getServer(); + } + + //step4 启动 netty server + public void startServer(){ + ThreadPoolUtil.newThread(new Runnable() { + @Override + public void run() { + final int port = ConfigCenter.holder.connectionServerPort(); + ConnectionServer server = new ConnectionServer(port); + server.init(); + server.start(new Server.Listener() { + @Override + public void onSuccess() { + log.error("mpush app start connection server success...."); + } + + @Override + public void onFailure(String message) { + log.error("mpush app start connection server failure, jvm exit with code -1"); + System.exit(-1); + } + }); + } + }, "conn-server", false).start(); + + } + + //step5 注册应用到zk + public void registerServerToZk(){ + ServerApp app = new ServerApp(MPushUtil.getLocalIp(), application.getPort()); + zkRegister.registerEphemeralSequential(application.getServerRegisterZkPath(), Jsons.toJson(app)); + } + + public void init(){ + initZK(); + initRedis(); + initListenerData(); + startServer(); + registerServerToZk(); + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + if (server != null) { + server.stop(null); + } + } + }); + } + +} diff --git a/mpush-core/src/main/java/com/shinemo/mpush/core/App.java b/mpush-core/src/main/java/com/shinemo/mpush/core/App.java index 76d515d1..67672d94 100644 --- a/mpush-core/src/main/java/com/shinemo/mpush/core/App.java +++ b/mpush-core/src/main/java/com/shinemo/mpush/core/App.java @@ -1,130 +1,130 @@ -package com.shinemo.mpush.core; - -import com.shinemo.mpush.api.Server; -import com.shinemo.mpush.core.server.ConnectionServer; -import com.shinemo.mpush.core.server.GatewayServer; -import com.shinemo.mpush.tools.MPushUtil; -import com.shinemo.mpush.tools.Jsons; -import com.shinemo.mpush.tools.config.ConfigCenter; -import com.shinemo.mpush.tools.redis.RedisGroup; -import com.shinemo.mpush.tools.spi.ServiceContainer; -import com.shinemo.mpush.tools.thread.ThreadPoolUtil; -import com.shinemo.mpush.tools.zk.ZKPath; -import com.shinemo.mpush.tools.zk.ServerApp; -import com.shinemo.mpush.tools.zk.ZkRegister; -import com.shinemo.mpush.tools.zk.listener.impl.RedisPathListener; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; - -/** - * Created by ohun on 2016/1/5. - */ -public final class App { - private static final Logger LOGGER = LoggerFactory.getLogger(App.class); - private static final App APP = new App(); - private ConnectionServer connectionServer; - private GatewayServer gatewayServer; - - private ZkRegister zkRegister = null; - - public static void main(String[] args) throws Exception { - LOGGER.error("mpush app start begin...."); - APP.init(); - APP.initZkRegister(); - APP.initRedisClient(); - APP.startConnectionServer(); - APP.startGatewayServer(); - Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { - if (APP.connectionServer != null) { - APP.connectionServer.stop(null); - } - if (APP.gatewayServer != null) { - APP.gatewayServer.stop(null); - } - } - }); - LOGGER.error("mpush app start end...."); - } - - private void init() throws IOException { - LOGGER.error("mpush app config center init success...."); - } - - public void startConnectionServer() { - ThreadPoolUtil.newThread(new Runnable() { - @Override - public void run() { - final int port = ConfigCenter.holder.connectionServerPort(); - ConnectionServer server = new ConnectionServer(port); - server.init(); - server.start(new Server.Listener() { - @Override - public void onSuccess() { - registerServerToZK(port, ZKPath.CONNECTION_SERVER); - LOGGER.error("mpush app start connection server success...."); - } - - @Override - public void onFailure(String message) { - LOGGER.error("mpush app start connection server failure, jvm exit with code -1"); - System.exit(-1); - } - }); - APP.connectionServer = server; - } - }, "conn-server", false).start(); - } - - public void startGatewayServer() { - ThreadPoolUtil.newThread(new Runnable() { - @Override - public void run() { - final int port = ConfigCenter.holder.gatewayServerPort(); - GatewayServer server = new GatewayServer(port); - server.init(); - server.start(new Server.Listener() { - @Override - public void onSuccess() { - registerServerToZK(port, ZKPath.GATEWAY_SERVER); - LOGGER.error("mpush app start gateway server success...."); - } - - @Override - public void onFailure(String message) { - System.exit(-2); - LOGGER.error("mpush app start gateway server failure, jvm exit with code -2"); - } - }); - APP.gatewayServer = server; - } - }, "gateway-server", false).start(); - } - - private void registerServerToZK(int port, ZKPath path) { - ServerApp app = new ServerApp(MPushUtil.getLocalIp(), port); - zkRegister.registerEphemeralSequential(path.getWatchPath(), Jsons.toJson(app)); - LOGGER.error("mpush app register server:{} to zk success", port); - } - - public void initZkRegister(){ - zkRegister = ServiceContainer.getInstance(ZkRegister.class); - zkRegister.init(); - } - - public void initRedisClient() throws Exception { - - boolean exist = zkRegister.isExisted(ZKPath.REDIS_SERVER.getPath()); - if (!exist) { - List groupList = ConfigCenter.holder.redisGroups(); - zkRegister.registerPersist(ZKPath.REDIS_SERVER.getPath(), Jsons.toJson(groupList)); - } - RedisPathListener listener = new RedisPathListener(); - zkRegister.getCache().getListenable().addListener(listener); - listener.initData(null); - } -} +//package com.shinemo.mpush.core; +// +//import com.shinemo.mpush.api.Server; +//import com.shinemo.mpush.core.server.ConnectionServer; +//import com.shinemo.mpush.core.server.GatewayServer; +//import com.shinemo.mpush.tools.MPushUtil; +//import com.shinemo.mpush.tools.Jsons; +//import com.shinemo.mpush.tools.config.ConfigCenter; +//import com.shinemo.mpush.tools.redis.RedisGroup; +//import com.shinemo.mpush.tools.spi.ServiceContainer; +//import com.shinemo.mpush.tools.thread.ThreadPoolUtil; +//import com.shinemo.mpush.tools.zk.ZKPath; +//import com.shinemo.mpush.tools.zk.ServerApp; +//import com.shinemo.mpush.tools.zk.ZkRegister; +//import com.shinemo.mpush.tools.zk.listener.impl.RedisPathListener; +// +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import java.io.IOException; +//import java.util.List; +// +///** +// * Created by ohun on 2016/1/5. +// */ +//public final class App { +// private static final Logger LOGGER = LoggerFactory.getLogger(App.class); +// private static final App APP = new App(); +// private ConnectionServer connectionServer; +// private GatewayServer gatewayServer; +// +// private ZkRegister zkRegister = null; +// +// public static void main(String[] args) throws Exception { +// LOGGER.error("mpush app start begin...."); +// APP.init(); +// APP.initZkRegister(); +// APP.initRedisClient(); +// APP.startConnectionServer(); +// APP.startGatewayServer(); +// Runtime.getRuntime().addShutdownHook(new Thread() { +// public void run() { +// if (APP.connectionServer != null) { +// APP.connectionServer.stop(null); +// } +// if (APP.gatewayServer != null) { +// APP.gatewayServer.stop(null); +// } +// } +// }); +// LOGGER.error("mpush app start end...."); +// } +// +// private void init() throws IOException { +// LOGGER.error("mpush app config center init success...."); +// } +// +// public void startConnectionServer() { +// ThreadPoolUtil.newThread(new Runnable() { +// @Override +// public void run() { +// final int port = ConfigCenter.holder.connectionServerPort(); +// ConnectionServer server = new ConnectionServer(port); +// server.init(); +// server.start(new Server.Listener() { +// @Override +// public void onSuccess() { +// registerServerToZK(port, ZKPath.CONNECTION_SERVER); +// LOGGER.error("mpush app start connection server success...."); +// } +// +// @Override +// public void onFailure(String message) { +// LOGGER.error("mpush app start connection server failure, jvm exit with code -1"); +// System.exit(-1); +// } +// }); +// APP.connectionServer = server; +// } +// }, "conn-server", false).start(); +// } +// +// public void startGatewayServer() { +// ThreadPoolUtil.newThread(new Runnable() { +// @Override +// public void run() { +// final int port = ConfigCenter.holder.gatewayServerPort(); +// GatewayServer server = new GatewayServer(port); +// server.init(); +// server.start(new Server.Listener() { +// @Override +// public void onSuccess() { +// registerServerToZK(port, ZKPath.GATEWAY_SERVER); +// LOGGER.error("mpush app start gateway server success...."); +// } +// +// @Override +// public void onFailure(String message) { +// System.exit(-2); +// LOGGER.error("mpush app start gateway server failure, jvm exit with code -2"); +// } +// }); +// APP.gatewayServer = server; +// } +// }, "gateway-server", false).start(); +// } +// +// private void registerServerToZK(int port, ZKPath path) { +// ServerApp app = new ServerApp(MPushUtil.getLocalIp(), port); +// zkRegister.registerEphemeralSequential(path.getWatchPath(), Jsons.toJson(app)); +// LOGGER.error("mpush app register server:{} to zk success", port); +// } +// +// public void initZkRegister(){ +// zkRegister = ServiceContainer.getInstance(ZkRegister.class); +// zkRegister.init(); +// } +// +// public void initRedisClient() throws Exception { +// +// boolean exist = zkRegister.isExisted(ZKPath.REDIS_SERVER.getPath()); +// if (!exist) { +// List groupList = ConfigCenter.holder.redisGroups(); +// zkRegister.registerPersist(ZKPath.REDIS_SERVER.getPath(), Jsons.toJson(groupList)); +// } +// RedisPathListener listener = new RedisPathListener(); +// zkRegister.getCache().getListenable().addListener(listener); +// listener.initData(); +// } +//} diff --git a/mpush-core/src/main/java/com/shinemo/mpush/core/Application.java b/mpush-core/src/main/java/com/shinemo/mpush/core/Application.java new file mode 100644 index 00000000..b21837df --- /dev/null +++ b/mpush-core/src/main/java/com/shinemo/mpush/core/Application.java @@ -0,0 +1,30 @@ +package com.shinemo.mpush.core; + + +/** + * 系统配置 + * + */ +public abstract class Application { + + private int port; + + private String serverRegisterZkPath; + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getServerRegisterZkPath() { + return serverRegisterZkPath; + } + + public void setServerRegisterZkPath(String serverRegisterZkPath) { + this.serverRegisterZkPath = serverRegisterZkPath; + } + +} diff --git a/mpush-core/src/main/java/com/shinemo/mpush/core/server/AdminServer.java b/mpush-core/src/main/java/com/shinemo/mpush/core/server/AdminServer.java index 3e08f30d..bc511ddd 100644 --- a/mpush-core/src/main/java/com/shinemo/mpush/core/server/AdminServer.java +++ b/mpush-core/src/main/java/com/shinemo/mpush/core/server/AdminServer.java @@ -21,4 +21,10 @@ public void stop(Listener listener) { public boolean isRunning() { return false; } + + @Override + public void init() { + // TODO Auto-generated method stub + + } } diff --git a/mpush-core/src/test/java/com/shinemo/mpush/core/zk/ServerManageTest.java b/mpush-core/src/test/java/com/shinemo/mpush/core/zk/ServerManageTest.java index db104531..a8e86100 100644 --- a/mpush-core/src/test/java/com/shinemo/mpush/core/zk/ServerManageTest.java +++ b/mpush-core/src/test/java/com/shinemo/mpush/core/zk/ServerManageTest.java @@ -1,80 +1,80 @@ -package com.shinemo.mpush.core.zk; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.shinemo.mpush.tools.MPushUtil; -import com.shinemo.mpush.tools.zk.ServerApp; -import com.shinemo.mpush.tools.zk.ZKPath; -import com.shinemo.mpush.tools.zk.manage.ServerManage; - -public class ServerManageTest { - - private static Executor executor = Executors.newCachedThreadPool(); - - private ServerApp app = new ServerApp(MPushUtil.getLocalIp(), 3000); - - private ServerManage manage = new ServerManage(app, ZKPath.CONNECTION_SERVER); - - @Test - public void testMulThreadRegisterApp() throws InterruptedException { - CountDownLatch latch = new CountDownLatch(1); - for (int i = 1; i <= 2; i++) { - executor.execute(new Worker("192.168.1." + i, latch)); - } - latch.countDown(); - - Thread.sleep(Integer.MAX_VALUE); - } - - - @Test - public void testServerManageStart() throws InterruptedException { - manage.start(); - Thread.sleep(Integer.MAX_VALUE); - } - - - private static class Worker implements Runnable { - - private static final Logger log = LoggerFactory.getLogger(Worker.class); - - private final String ip; - private final CountDownLatch latch; - - public Worker(String ip, CountDownLatch latch) { - this.ip = ip; - this.latch = latch; - } - - @Override - public void run() { - try { - latch.await(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - log.warn("start init " + ip); - ServerApp app = new ServerApp(ip, 3000); - ServerManage manage = new ServerManage(app, ZKPath.CONNECTION_SERVER); - manage.start(); - - try { - Thread.sleep(20000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - manage.close(); - - log.warn("end init " + ip); - } - - } - -} +//package com.shinemo.mpush.core.zk; +// +//import java.util.concurrent.CountDownLatch; +//import java.util.concurrent.Executor; +//import java.util.concurrent.Executors; +// +//import org.junit.Test; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import com.shinemo.mpush.tools.MPushUtil; +//import com.shinemo.mpush.tools.zk.ServerApp; +//import com.shinemo.mpush.tools.zk.ZKPath; +//import com.shinemo.mpush.tools.zk.manage.ServerManage; +// +//public class ServerManageTest { +// +// private static Executor executor = Executors.newCachedThreadPool(); +// +// private ServerApp app = new ServerApp(MPushUtil.getLocalIp(), 3000); +// +// private ServerManage manage = new ServerManage(app, ZKPath.CONNECTION_SERVER); +// +// @Test +// public void testMulThreadRegisterApp() throws InterruptedException { +// CountDownLatch latch = new CountDownLatch(1); +// for (int i = 1; i <= 2; i++) { +// executor.execute(new Worker("192.168.1." + i, latch)); +// } +// latch.countDown(); +// +// Thread.sleep(Integer.MAX_VALUE); +// } +// +// +// @Test +// public void testServerManageStart() throws InterruptedException { +// manage.start(); +// Thread.sleep(Integer.MAX_VALUE); +// } +// +// +// private static class Worker implements Runnable { +// +// private static final Logger log = LoggerFactory.getLogger(Worker.class); +// +// private final String ip; +// private final CountDownLatch latch; +// +// public Worker(String ip, CountDownLatch latch) { +// this.ip = ip; +// this.latch = latch; +// } +// +// @Override +// public void run() { +// try { +// latch.await(); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// log.warn("start init " + ip); +// ServerApp app = new ServerApp(ip, 3000); +// ServerManage manage = new ServerManage(app, ZKPath.CONNECTION_SERVER); +// manage.start(); +// +// try { +// Thread.sleep(20000); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// +// manage.close(); +// +// log.warn("end init " + ip); +// } +// +// } +// +//} diff --git a/mpush-cs/pom.xml b/mpush-cs/pom.xml index 73220dce..4c252db7 100644 --- a/mpush-cs/pom.xml +++ b/mpush-cs/pom.xml @@ -11,7 +11,6 @@ jar mpush-cs - http://maven.apache.org UTF-8 @@ -19,10 +18,8 @@ - junit - junit - 3.8.1 - test + com.shinemo.mpush + mpush-core diff --git a/mpush-cs/src/main/java/com/shinemo/mpush/cs/ConnectionServerApplication.java b/mpush-cs/src/main/java/com/shinemo/mpush/cs/ConnectionServerApplication.java new file mode 100644 index 00000000..8f2ebba4 --- /dev/null +++ b/mpush-cs/src/main/java/com/shinemo/mpush/cs/ConnectionServerApplication.java @@ -0,0 +1,15 @@ +package com.shinemo.mpush.cs; + +import com.shinemo.mpush.core.Application; +import com.shinemo.mpush.tools.config.ConfigCenter; +import com.shinemo.mpush.tools.zk.ZKPath; + +public class ConnectionServerApplication extends Application{ + + + public ConnectionServerApplication() { + setPort(ConfigCenter.holder.connectionServerPort()); + setServerRegisterZkPath(ZKPath.CONNECTION_SERVER.getWatchPath()); + } + +} diff --git a/mpush-cs/src/main/java/com/shinemo/mpush/cs/ConnectionServerMain.java b/mpush-cs/src/main/java/com/shinemo/mpush/cs/ConnectionServerMain.java new file mode 100644 index 00000000..4823ecc2 --- /dev/null +++ b/mpush-cs/src/main/java/com/shinemo/mpush/cs/ConnectionServerMain.java @@ -0,0 +1,31 @@ +package com.shinemo.mpush.cs; + + + +import com.shinemo.mpush.api.Server; +import com.shinemo.mpush.core.AbstractServer; +import com.shinemo.mpush.core.server.ConnectionServer; +import com.shinemo.mpush.cs.zk.listener.impl.ConnectionServerPathListener; +import com.shinemo.mpush.cs.zk.listener.impl.PushServerPathListener; +import com.shinemo.mpush.cs.zk.listener.impl.RedisPathListener; +import com.shinemo.mpush.tools.config.ConfigCenter; + +public class ConnectionServerMain extends AbstractServer{ + + public ConnectionServerMain(){ + + registerListener(new RedisPathListener()); + registerListener(new PushServerPathListener()); + registerListener(new ConnectionServerPathListener()); + + } + + @Override + public Server getServer() { + final int port = ConfigCenter.holder.connectionServerPort(); + ConnectionServer connectionServer = new ConnectionServer(port); + return connectionServer; + } + + +} diff --git a/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/manage/ServerAppManage.java b/mpush-cs/src/main/java/com/shinemo/mpush/cs/manage/ConnectionServerManage.java similarity index 74% rename from mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/manage/ServerAppManage.java rename to mpush-cs/src/main/java/com/shinemo/mpush/cs/manage/ConnectionServerManage.java index 5a64b337..1d4a4027 100644 --- a/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/manage/ServerAppManage.java +++ b/mpush-cs/src/main/java/com/shinemo/mpush/cs/manage/ConnectionServerManage.java @@ -1,4 +1,4 @@ -package com.shinemo.mpush.tools.zk.manage; +package com.shinemo.mpush.cs.manage; import java.util.Collection; import java.util.Collections; @@ -12,19 +12,15 @@ import com.google.common.collect.Maps; import com.shinemo.mpush.tools.zk.ServerApp; -/** - * 系统中当前可用的app列表 - * - */ -public class ServerAppManage { - - private static final Logger log = LoggerFactory.getLogger(ServerAppManage.class); +public class ConnectionServerManage { + + private static final Logger log = LoggerFactory.getLogger(ConnectionServerManage.class); private static Map holder = Maps.newConcurrentMap(); - public static final ServerAppManage instance = new ServerAppManage(); + public static final ConnectionServerManage instance = new ConnectionServerManage(); - private ServerAppManage() { + private ConnectionServerManage() { } public void addOrUpdate(String fullPath,ServerApp app){ @@ -46,4 +42,5 @@ private void printAppList(){ log.warn(ToStringBuilder.reflectionToString(app, ToStringStyle.DEFAULT_STYLE)); } } + } diff --git a/mpush-cs/src/main/java/com/shinemo/mpush/cs/manage/PushServerManage.java b/mpush-cs/src/main/java/com/shinemo/mpush/cs/manage/PushServerManage.java new file mode 100644 index 00000000..bf560861 --- /dev/null +++ b/mpush-cs/src/main/java/com/shinemo/mpush/cs/manage/PushServerManage.java @@ -0,0 +1,7 @@ +package com.shinemo.mpush.cs.manage; + +public class PushServerManage { + + + +} diff --git a/mpush-cs/src/main/java/com/shinemo/mpush/cs/manage/RedisManage.java b/mpush-cs/src/main/java/com/shinemo/mpush/cs/manage/RedisManage.java new file mode 100644 index 00000000..68f2a025 --- /dev/null +++ b/mpush-cs/src/main/java/com/shinemo/mpush/cs/manage/RedisManage.java @@ -0,0 +1,7 @@ +package com.shinemo.mpush.cs.manage; + +public class RedisManage { + + + +} diff --git a/mpush-cs/src/main/java/com/shinemo/mpush/cs/zk/listener/impl/ConnectionServerPathListener.java b/mpush-cs/src/main/java/com/shinemo/mpush/cs/zk/listener/impl/ConnectionServerPathListener.java new file mode 100644 index 00000000..1221333b --- /dev/null +++ b/mpush-cs/src/main/java/com/shinemo/mpush/cs/zk/listener/impl/ConnectionServerPathListener.java @@ -0,0 +1,90 @@ +package com.shinemo.mpush.cs.zk.listener.impl; + +import java.util.List; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.shinemo.mpush.cs.manage.ConnectionServerManage; +import com.shinemo.mpush.tools.Jsons; +import com.shinemo.mpush.tools.spi.ServiceContainer; +import com.shinemo.mpush.tools.zk.ServerApp; +import com.shinemo.mpush.tools.zk.ZKPath; +import com.shinemo.mpush.tools.zk.ZkRegister; +import com.shinemo.mpush.tools.zk.listener.DataChangeListener; + +/** + * connection server 应用 监控 + * + */ +public class ConnectionServerPathListener extends DataChangeListener{ + + private static final Logger log = LoggerFactory.getLogger(ConnectionServerPathListener.class); + + private static ZkRegister zkRegister = ServiceContainer.getInstance(ZkRegister.class); + + @Override + public void initData() { + log.warn("start init app data"); + _initData(); + log.warn("end init app data"); + } + + private void _initData() { + // 获取机器列表 + List rawData = zkRegister.getChildrenKeys(ZKPath.CONNECTION_SERVER.getPath()); + for (String raw : rawData) { + String fullPath = ZKPath.CONNECTION_SERVER.getFullPath(raw); + ServerApp app = getServerApp(fullPath); + ConnectionServerManage.instance.addOrUpdate(fullPath, app); + } + } + + private void dataRemove(ChildData data) { + String path = data.getPath(); + ConnectionServerManage.instance.remove(path); + } + + private void dataAddOrUpdate(ChildData data) { + String path = data.getPath(); + byte[] rawData = data.getData(); + ServerApp serverApp = Jsons.fromJson(rawData, ServerApp.class); + ConnectionServerManage.instance.addOrUpdate(path, serverApp); + } + + private ServerApp getServerApp(String fullPath) { + String rawApp = zkRegister.get(fullPath); + ServerApp app = Jsons.fromJson(rawApp, ServerApp.class); + return app; + } + + @Override + public void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) throws Exception { + String data = ""; + if (event.getData() != null) { + data = ToStringBuilder.reflectionToString(event.getData(), ToStringStyle.MULTI_LINE_STYLE); + } + if (Type.NODE_ADDED == event.getType()) { + dataAddOrUpdate(event.getData()); + } else if (Type.NODE_REMOVED == event.getType()) { + dataRemove(event.getData()); + } else if (Type.NODE_UPDATED == event.getType()) { + dataAddOrUpdate(event.getData()); + } else { + log.warn("ConnPathListener other path:" + path + "," + event.getType().name() + "," + data); + } + + } + + @Override + public String listenerPath() { + return null; + } + +} diff --git a/mpush-cs/src/main/java/com/shinemo/mpush/cs/zk/listener/impl/PushServerPathListener.java b/mpush-cs/src/main/java/com/shinemo/mpush/cs/zk/listener/impl/PushServerPathListener.java new file mode 100644 index 00000000..04e90702 --- /dev/null +++ b/mpush-cs/src/main/java/com/shinemo/mpush/cs/zk/listener/impl/PushServerPathListener.java @@ -0,0 +1,34 @@ +package com.shinemo.mpush.cs.zk.listener.impl; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.shinemo.mpush.tools.zk.listener.DataChangeListener; + +/** + * push server 路径监控 + * + */ +public class PushServerPathListener extends DataChangeListener{ + + private static final Logger log = LoggerFactory.getLogger(PushServerPathListener.class); + + @Override + public void initData() { + + } + + @Override + public void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) throws Exception { + + } + + @Override + public String listenerPath() { + return null; + } + + +} diff --git a/mpush-cs/src/main/java/com/shinemo/mpush/cs/zk/listener/impl/RedisPathListener.java b/mpush-cs/src/main/java/com/shinemo/mpush/cs/zk/listener/impl/RedisPathListener.java new file mode 100644 index 00000000..0217e12d --- /dev/null +++ b/mpush-cs/src/main/java/com/shinemo/mpush/cs/zk/listener/impl/RedisPathListener.java @@ -0,0 +1,90 @@ +package com.shinemo.mpush.cs.zk.listener.impl; + +import java.util.Collections; +import java.util.List; + +import com.google.common.base.Strings; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.shinemo.mpush.tools.Jsons; +import com.shinemo.mpush.tools.redis.RedisGroup; +import com.shinemo.mpush.tools.redis.RedisRegister; +import com.shinemo.mpush.tools.spi.ServiceContainer; +import com.shinemo.mpush.tools.zk.ZKPath; +import com.shinemo.mpush.tools.zk.ZkRegister; +import com.shinemo.mpush.tools.zk.listener.DataChangeListener; + +/** + * redis 监控 + */ +public class RedisPathListener extends DataChangeListener { + private static final Logger log = LoggerFactory.getLogger(RedisPathListener.class); + + private static final ZkRegister zkRegister = ServiceContainer.getInstance(ZkRegister.class); + + private static final RedisRegister redisRegister = ServiceContainer.getInstance(RedisRegister.class); + + // 获取redis列表 + private void _initData() { + log.warn("start init redis data"); + List group = getRedisGroup(ZKPath.REDIS_SERVER.getPath()); + redisRegister.init(group); + log.warn("end init redis data"); + } + + private void dataRemove(ChildData data) { + _initData(); + } + + private void dataAddOrUpdate(ChildData data) { + _initData(); + } + + @SuppressWarnings("unchecked") + private List getRedisGroup(String fullPath) { + String rawGroup = zkRegister.get(fullPath); + if (Strings.isNullOrEmpty(rawGroup)) + return Collections.EMPTY_LIST; + List group = Jsons.fromJsonToList(rawGroup, RedisGroup[].class); + if (group == null) + return Collections.EMPTY_LIST; + return group; + } + + @Override + public void initData() { + _initData(); + } + + @Override + public void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) throws Exception { + + String data = ""; + if (event.getData() != null) { + data = ToStringBuilder.reflectionToString(event.getData(), ToStringStyle.MULTI_LINE_STYLE); + } + if (Type.NODE_ADDED == event.getType()) { + dataAddOrUpdate(event.getData()); + } else if (Type.NODE_REMOVED == event.getType()) { + dataRemove(event.getData()); + } else if (Type.NODE_UPDATED == event.getType()) { + dataAddOrUpdate(event.getData()); + } else { + log.warn("RedisPathListener other path:" + data + "," + event.getType().name() + "," + data); + } + + } + + @Override + public String listenerPath() { + return ZKPath.REDIS_SERVER.getPath(); + } +} diff --git a/mpush-ps/pom.xml b/mpush-ps/pom.xml index 8614904e..6c8f2d5e 100644 --- a/mpush-ps/pom.xml +++ b/mpush-ps/pom.xml @@ -18,11 +18,9 @@ - - junit - junit - 3.8.1 - test - + + com.shinemo.mpush + mpush-core + diff --git a/mpush-tools/src/main/java/com/shinemo/mpush/tools/GenericsUtil.java b/mpush-tools/src/main/java/com/shinemo/mpush/tools/GenericsUtil.java new file mode 100644 index 00000000..e2913435 --- /dev/null +++ b/mpush-tools/src/main/java/com/shinemo/mpush/tools/GenericsUtil.java @@ -0,0 +1,139 @@ +package com.shinemo.mpush.tools; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; + + +public class GenericsUtil { + + /** + * 通过反射,获得指定类的父类的泛型参数的实际类型. 如BuyerServiceBean extends DaoSupport + * + * @param clazz clazz 需要反射的类,该类必须继承范型父类 + * @param index 泛型参数所在索引,从0开始. + * @return 范型参数的实际类型, 如果没有实现ParameterizedType接口,即不支持泛型,所以直接返回Object.class + */ + public static Class getSuperClassGenericType(Class clazz, int index) { + Type genType = clazz.getGenericSuperclass();//得到泛型父类 + //如果没有实现ParameterizedType接口,即不支持泛型,直接返回Object.class + + if (!(genType instanceof ParameterizedType)) { + return Object.class; + } + //返回表示此类型实际类型参数的Type对象的数组,数组里放的都是对应类型的Class, 如BuyerServiceBean extends DaoSupport就返回Buyer和Contact类型 + Type[] params = ((ParameterizedType) genType).getActualTypeArguments(); + if (index >= params.length || index < 0) { + throw new RuntimeException("你输入的索引"+ (index<0 ? "不能小于0" : "超出了参数的总数")); + } + if (!(params[index] instanceof Class)) { + return Object.class; + } + return (Class) params[index]; + } + /** + * 通过反射,获得指定类的父类的第一个泛型参数的实际类型. 如BuyerServiceBean extends DaoSupport + * + * @param clazz clazz 需要反射的类,该类必须继承泛型父类 + * @return 泛型参数的实际类型, 如果没有实现ParameterizedType接口,即不支持泛型,所以直接返回Object.class + */ + public static Class getSuperClassGenericType(Class clazz) { + return getSuperClassGenericType(clazz, 0); + } + /** + * 通过反射,获得方法返回值泛型参数的实际类型. 如: public Map getNames(){} + * + * @param method 方法 + * @param index 泛型参数所在索引,从0开始. + * @return 泛型参数的实际类型, 如果没有实现ParameterizedType接口,即不支持泛型,所以直接返回Object.class + */ + public static Class getMethodGenericReturnType(Method method, int index) { + Type returnType = method.getGenericReturnType(); + if(returnType instanceof ParameterizedType){ + ParameterizedType type = (ParameterizedType) returnType; + Type[] typeArguments = type.getActualTypeArguments(); + if (index >= typeArguments.length || index < 0) { + throw new RuntimeException("你输入的索引"+ (index<0 ? "不能小于0" : "超出了参数的总数")); + } + return (Class)typeArguments[index]; + } + return Object.class; + } + /** + * 通过反射,获得方法返回值第一个泛型参数的实际类型. 如: public Map getNames(){} + * + * @param method 方法 + * @return 泛型参数的实际类型, 如果没有实现ParameterizedType接口,即不支持泛型,所以直接返回Object.class + */ + public static Class getMethodGenericReturnType(Method method) { + return getMethodGenericReturnType(method, 0); + } + + /** + * 通过反射,获得方法输入参数第index个输入参数的所有泛型参数的实际类型. 如: public void add(Map maps, List names){} + * + * @param method 方法 + * @param index 第几个输入参数 + * @return 输入参数的泛型参数的实际类型集合, 如果没有实现ParameterizedType接口,即不支持泛型,所以直接返回空集合 + */ + public static List> getMethodGenericParameterTypes(Method method, int index) { + List> results = new ArrayList>(); + Type[] genericParameterTypes = method.getGenericParameterTypes(); + if (index >= genericParameterTypes.length ||index < 0) { + throw new RuntimeException("你输入的索引"+ (index<0 ? "不能小于0" : "超出了参数的总数")); + } + Type genericParameterType = genericParameterTypes[index]; + if(genericParameterType instanceof ParameterizedType){ + ParameterizedType aType = (ParameterizedType) genericParameterType; + Type[] parameterArgTypes = aType.getActualTypeArguments(); + for(Type parameterArgType : parameterArgTypes){ + Class parameterArgClass = (Class) parameterArgType; + results.add(parameterArgClass); + } + return results; + } + return results; + } + /** + * 通过反射,获得方法输入参数第一个输入参数的所有泛型参数的实际类型. 如: public void add(Map maps, List names){} + * + * @param method 方法 + * @return 输入参数的泛型参数的实际类型集合, 如果没有实现ParameterizedType接口,即不支持泛型,所以直接返回空集合 + */ + public static List> getMethodGenericParameterTypes(Method method) { + return getMethodGenericParameterTypes(method, 0); + } + /** + * 通过反射,获得Field泛型参数的实际类型. 如: public Map names; + * + * @param field 字段 + * @param index 泛型参数所在索引,从0开始. + * @return 泛型参数的实际类型, 如果没有实现ParameterizedType接口,即不支持泛型,所以直接返回Object.class + */ + public static Class getFieldGenericType(Field field, int index) { + Type genericFieldType = field.getGenericType(); + + if(genericFieldType instanceof ParameterizedType){ + ParameterizedType aType = (ParameterizedType) genericFieldType; + Type[] fieldArgTypes = aType.getActualTypeArguments(); + if (index >= fieldArgTypes.length || index < 0) { + throw new RuntimeException("你输入的索引"+ (index<0 ? "不能小于0" : "超出了参数的总数")); + } + return (Class)fieldArgTypes[index]; + } + return Object.class; + } + /** + * 通过反射,获得Field泛型参数的实际类型. 如: public Map names; + * + * @param field 字段 + * @return 泛型参数的实际类型, 如果没有实现ParameterizedType接口,即不支持泛型,所以直接返回Object.class + */ + public static Class getFieldGenericType(Field field) { + return getFieldGenericType(field, 0); + } + +} diff --git a/mpush-tools/src/main/java/com/shinemo/mpush/tools/redis/RedisUtil.java b/mpush-tools/src/main/java/com/shinemo/mpush/tools/redis/RedisUtil.java index 99bd59f3..8d6033bb 100644 --- a/mpush-tools/src/main/java/com/shinemo/mpush/tools/redis/RedisUtil.java +++ b/mpush-tools/src/main/java/com/shinemo/mpush/tools/redis/RedisUtil.java @@ -14,7 +14,6 @@ import com.google.common.collect.Maps; import com.shinemo.mpush.tools.Constants; import com.shinemo.mpush.tools.Jsons; -import com.shinemo.mpush.tools.zk.manage.ServerAppManage; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; @@ -22,7 +21,7 @@ public class RedisUtil { - private static final Logger log = LoggerFactory.getLogger(ServerAppManage.class); + private static final Logger log = LoggerFactory.getLogger(RedisUtil.class); private static Map holder = Maps.newConcurrentMap(); diff --git a/mpush-tools/src/main/java/com/shinemo/mpush/tools/redis/jedis/services/JedisRegisterManager.java b/mpush-tools/src/main/java/com/shinemo/mpush/tools/redis/jedis/services/JedisRegisterManager.java index df435663..b20e6ebd 100644 --- a/mpush-tools/src/main/java/com/shinemo/mpush/tools/redis/jedis/services/JedisRegisterManager.java +++ b/mpush-tools/src/main/java/com/shinemo/mpush/tools/redis/jedis/services/JedisRegisterManager.java @@ -12,11 +12,10 @@ import com.shinemo.mpush.tools.redis.RedisGroup; import com.shinemo.mpush.tools.redis.RedisNode; import com.shinemo.mpush.tools.redis.RedisRegister; -import com.shinemo.mpush.tools.zk.manage.ServerAppManage; public class JedisRegisterManager implements RedisRegister{ - private static final Logger LOGGER = LoggerFactory.getLogger(ServerAppManage.class); + private static final Logger LOGGER = LoggerFactory.getLogger(JedisRegisterManager.class); private static List groups = Lists.newArrayList(); diff --git a/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/ZkRegister.java b/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/ZkRegister.java index 23c1bedc..c335076f 100644 --- a/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/ZkRegister.java +++ b/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/ZkRegister.java @@ -4,8 +4,8 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.TreeCache; - import com.shinemo.mpush.tools.spi.SPI; +import com.shinemo.mpush.tools.zk.listener.DataChangeListener; @SPI("zkRegister") @@ -37,6 +37,8 @@ public interface ZkRegister { public ZkConfig getZkConfig(); - TreeCache getCache(); + public TreeCache getCache(); + + public void registerListener(DataChangeListener listener); } diff --git a/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/curator/services/ZkRegisterManager.java b/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/curator/services/ZkRegisterManager.java index 13ec2daa..95097964 100644 --- a/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/curator/services/ZkRegisterManager.java +++ b/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/curator/services/ZkRegisterManager.java @@ -14,6 +14,8 @@ import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.CreateMode; @@ -22,9 +24,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.shinemo.mpush.tools.MPushUtil; import com.shinemo.mpush.tools.config.ConfigCenter; import com.shinemo.mpush.tools.zk.ZkConfig; import com.shinemo.mpush.tools.zk.ZkRegister; +import com.shinemo.mpush.tools.zk.listener.DataChangeListener; public class ZkRegisterManager implements ZkRegister { @@ -79,11 +83,27 @@ public List getAclForPath(final String path) { try { client.blockUntilConnected(); cacheData(); + registerConnectionLostListener(); } catch (final Exception ex) { LOGGER.error("zk connection error" + ToStringBuilder.reflectionToString(zkConfig, ToStringStyle.DEFAULT_STYLE)); } } + + // 注册连接状态监听器 + private void registerConnectionLostListener() { + client.getConnectionStateListenable().addListener(new ConnectionStateListener() { + + @Override + public void stateChanged(final CuratorFramework client, final ConnectionState newState) { + if (ConnectionState.LOST == newState) { + LOGGER.warn(MPushUtil.getInetAddress() + ", lost connection"); + } else if (ConnectionState.RECONNECTED == newState) { + LOGGER.warn(MPushUtil.getInetAddress() + ", reconnected"); + } + } + }); + } // 本地缓存 private void cacheData() throws Exception { @@ -277,6 +297,11 @@ public void remove(final String key) { LOGGER.error("remove" + key, ex); } } + + @Override + public void registerListener(DataChangeListener listener){ + cache.getListenable().addListener(listener); + } @Override public TreeCache getCache() { diff --git a/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/listener/CallBack.java b/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/listener/CallBack.java deleted file mode 100644 index 74d8c8b6..00000000 --- a/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/listener/CallBack.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.shinemo.mpush.tools.zk.listener; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; - -import com.shinemo.mpush.tools.zk.manage.ServerManage; - -public interface CallBack { - - /** - * 处理目录发生变化的事件 - * @param client - * @param event - * @param path - */ - public void handler(CuratorFramework client, TreeCacheEvent event,String path); - - /** - * 应用起来的时候初始化数据 - * @param manage - */ - public void initData(ServerManage manage); - -} diff --git a/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/listener/DataChangeListener.java b/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/listener/DataChangeListener.java new file mode 100644 index 00000000..f8eec80f --- /dev/null +++ b/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/listener/DataChangeListener.java @@ -0,0 +1,26 @@ +package com.shinemo.mpush.tools.zk.listener; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; + +public abstract class DataChangeListener implements TreeCacheListener{ + + @Override + public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { + String path = null == event.getData() ? "" : event.getData().getPath(); + if (path.isEmpty()) { + return; + } + + if(listenerPath().equals(path)){ + dataChanged(client, event, path); + } + } + + public abstract void initData(); + + public abstract void dataChanged(CuratorFramework client, TreeCacheEvent event,String path) throws Exception; + + public abstract String listenerPath(); +} diff --git a/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/listener/ListenerDispatcher.java b/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/listener/ListenerDispatcher.java deleted file mode 100644 index 123ff97d..00000000 --- a/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/listener/ListenerDispatcher.java +++ /dev/null @@ -1,58 +0,0 @@ -package com.shinemo.mpush.tools.zk.listener; - -import com.google.common.collect.Maps; -import com.shinemo.mpush.tools.zk.ServerApp; -import com.shinemo.mpush.tools.zk.manage.ServerManage; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Iterator; -import java.util.Map; - -public class ListenerDispatcher implements CallBack { - - private static final Logger log = LoggerFactory.getLogger(ListenerDispatcher.class); - - private Map holder = Maps.newTreeMap(); - - public ListenerDispatcher(ServerApp app) { - //所有connection server - //holder.put(ZKPath.CONNECTION_SERVER.getPathByIp(app.getIp()), new ConnPathListener()); - //所有redis - //holder.put(ZKPath.REDIS_SERVER.getPathByIp(app.getIp()), new RedisPathListener()); - //踢人的目录已经交给队列处理了,这里不需要重复处理 - //holder.put(ZKPath.GATEWAY_SERVER.getPathByIp(app.getIp()), new GatewayPathListener()); - } - - @Override - public void handler(CuratorFramework client, TreeCacheEvent event, String path) { - - Iterator> it = holder.entrySet().iterator(); - boolean hasHandler = false; - while (it.hasNext()) { - Map.Entry entry = it.next(); - if (path.startsWith(entry.getKey())) { - hasHandler = true; - entry.getValue().handler(client, event, path); - } - } - - if (!hasHandler) { - log.warn("ListenerDispatcher other path:" + path + "," + event.getType().name()); - } - - } - - @Override - public void initData(ServerManage manage) { - - Iterator> it = holder.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry entry = it.next(); - entry.getValue().initData(manage); - } - - } -} diff --git a/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/listener/impl/ConnPathListener.java b/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/listener/impl/ConnPathListener.java deleted file mode 100644 index 7e66ce2a..00000000 --- a/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/listener/impl/ConnPathListener.java +++ /dev/null @@ -1,86 +0,0 @@ -package com.shinemo.mpush.tools.zk.listener.impl; - -import java.util.List; - -import com.shinemo.mpush.tools.spi.ServiceContainer; -import com.shinemo.mpush.tools.zk.ZKPath; -import com.shinemo.mpush.tools.zk.ZkRegister; - -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.shinemo.mpush.tools.Jsons; -import com.shinemo.mpush.tools.zk.ServerApp; -import com.shinemo.mpush.tools.zk.listener.CallBack; -import com.shinemo.mpush.tools.zk.manage.ServerAppManage; -import com.shinemo.mpush.tools.zk.manage.ServerManage; - -/** - * 注册的应用的发生变化 - */ -public class ConnPathListener implements CallBack { - - private static final Logger log = LoggerFactory.getLogger(ConnPathListener.class); - - private static ZkRegister zkRegister = ServiceContainer.getInstance(ZkRegister.class); - - @Override - public void handler(CuratorFramework client, TreeCacheEvent event, String path) { - String data = ""; - if (event.getData() != null) { - data = ToStringBuilder.reflectionToString(event.getData(), ToStringStyle.MULTI_LINE_STYLE); - } - if (Type.NODE_ADDED == event.getType()) { - dataAddOrUpdate(event.getData()); - } else if (Type.NODE_REMOVED == event.getType()) { - dataRemove(event.getData()); - } else if (Type.NODE_UPDATED == event.getType()) { - dataAddOrUpdate(event.getData()); - } else { - log.warn("ConnPathListener other path:" + path + "," + event.getType().name() + "," + data); - } - } - - @Override - public void initData(ServerManage manage) { - log.warn("start init app data"); - _initData(); - log.warn("end init app data"); - } - - private void _initData() { - //获取机器列表 - List rawData = zkRegister.getChildrenKeys(ZKPath.CONNECTION_SERVER.getPath()); - for (String raw : rawData) { - String fullPath = ZKPath.CONNECTION_SERVER.getFullPath(raw); - ServerApp app = getServerApp(fullPath); - ServerAppManage.instance.addOrUpdate(fullPath, app); - } - } - - private void dataRemove(ChildData data) { - String path = data.getPath(); - ServerAppManage.instance.remove(path); - } - - private void dataAddOrUpdate(ChildData data) { - String path = data.getPath(); - byte[] rawData = data.getData(); - ServerApp serverApp = Jsons.fromJson(rawData, ServerApp.class); - ServerAppManage.instance.addOrUpdate(path, serverApp); - } - - private ServerApp getServerApp(String fullPath) { - String rawApp = zkRegister.get(fullPath); - ServerApp app = Jsons.fromJson(rawApp, ServerApp.class); - return app; - } - - -} diff --git a/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/listener/impl/GatewayPathListener.java b/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/listener/impl/GatewayPathListener.java deleted file mode 100644 index 9da911d2..00000000 --- a/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/listener/impl/GatewayPathListener.java +++ /dev/null @@ -1,44 +0,0 @@ -package com.shinemo.mpush.tools.zk.listener.impl; - -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.shinemo.mpush.tools.zk.listener.CallBack; -import com.shinemo.mpush.tools.zk.manage.ServerManage; - -/** - * 当前应用下踢人的目录发生变化 - * - */ -public class GatewayPathListener implements CallBack{ - - private static final Logger log = LoggerFactory.getLogger(GatewayPathListener.class); - - @Override - public void handler(CuratorFramework client, TreeCacheEvent event, String path) { - String data = ""; - if(event.getData()!=null){ - data = ToStringBuilder.reflectionToString(event.getData(), ToStringStyle.MULTI_LINE_STYLE); - } - if (Type.NODE_ADDED == event.getType()) { - log.warn("path:" + path + ", node Add"+","+data); - } else if (Type.NODE_REMOVED == event.getType()) { - log.warn("path:" + path + ", node Remove"+","+data); - } else if (Type.NODE_UPDATED == event.getType()) { - log.warn("path:" + path + "," + "node update"+","+data); - } else { - log.warn("other path:" + path + "," + event.getType().name()+","+data); - } - } - - @Override - public void initData(ServerManage manage) { - - } - -} diff --git a/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/listener/impl/RedisPathListener.java b/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/listener/impl/RedisPathListener.java deleted file mode 100644 index bd9c2506..00000000 --- a/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/listener/impl/RedisPathListener.java +++ /dev/null @@ -1,80 +0,0 @@ -package com.shinemo.mpush.tools.zk.listener.impl; - -import java.util.Collections; -import java.util.List; - -import com.google.common.base.Strings; - -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type; -import org.apache.curator.framework.recipes.cache.TreeCacheListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.shinemo.mpush.tools.Jsons; -import com.shinemo.mpush.tools.redis.RedisGroup; -import com.shinemo.mpush.tools.redis.RedisRegister; -import com.shinemo.mpush.tools.spi.ServiceContainer; -import com.shinemo.mpush.tools.zk.ZKPath; -import com.shinemo.mpush.tools.zk.ZkRegister; -import com.shinemo.mpush.tools.zk.manage.ServerManage; - -/** - * 注册的应用的发生变化 - */ -public class RedisPathListener implements TreeCacheListener { - private static final Logger log = LoggerFactory.getLogger(RedisPathListener.class); - - private static final ZkRegister zkRegister = ServiceContainer.getInstance(ZkRegister.class); - - private static final RedisRegister redisRegister = ServiceContainer.getInstance(RedisRegister.class); - - @Override - public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event) throws Exception { - String data = ""; - if (event.getData() != null) { - data = ToStringBuilder.reflectionToString(event.getData(), ToStringStyle.MULTI_LINE_STYLE); - } - if (Type.NODE_ADDED == event.getType()) { - dataAddOrUpdate(event.getData()); - } else if (Type.NODE_REMOVED == event.getType()) { - dataRemove(event.getData()); - } else if (Type.NODE_UPDATED == event.getType()) { - dataAddOrUpdate(event.getData()); - } else { - log.warn("ConnPathListener other path:" + data + "," + event.getType().name() + "," + data); - } - } - - public void initData(ServerManage manage) { - log.warn("start init redis data"); - _initData(); - log.warn("end init redis data"); - } - - private void _initData() { - //获取redis列表 - List group = getRedisGroup(ZKPath.REDIS_SERVER.getPath()); - redisRegister.init(group); - } - - private void dataRemove(ChildData data) { - _initData(); - } - - private void dataAddOrUpdate(ChildData data) { - _initData(); - } - - private List getRedisGroup(String fullPath) { - String rawGroup = zkRegister.get(fullPath); - if (Strings.isNullOrEmpty(rawGroup)) return Collections.EMPTY_LIST; - List group = Jsons.fromJsonToList(rawGroup, RedisGroup[].class); - if (group == null) return Collections.EMPTY_LIST; - return group; - } -} diff --git a/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/manage/ServerManage.java b/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/manage/ServerManage.java deleted file mode 100644 index 3af22e6a..00000000 --- a/mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/manage/ServerManage.java +++ /dev/null @@ -1,125 +0,0 @@ -package com.shinemo.mpush.tools.zk.manage; - -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.TreeCache; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.apache.curator.framework.recipes.cache.TreeCacheListener; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.shinemo.mpush.tools.Jsons; -import com.shinemo.mpush.tools.spi.ServiceContainer; -import com.shinemo.mpush.tools.zk.ZKPath; -import com.shinemo.mpush.tools.zk.ServerApp; -import com.shinemo.mpush.tools.zk.ZkRegister; -import com.shinemo.mpush.tools.zk.listener.CallBack; -import com.shinemo.mpush.tools.zk.listener.ListenerDispatcher; - -public class ServerManage { - - private static final Logger log = LoggerFactory.getLogger(ServerManage.class); - - private ZkRegister zkUtil = ServiceContainer.getInstance(ZkRegister.class); - - private final AtomicBoolean startFlag = new AtomicBoolean(false); - - private final ServerApp app; - private final ZKPath path; - - private ListenerDispatcher dispatcher; - - public ServerManage(ServerApp app, ZKPath path) { - this.app = app; - this.path = path; - } - - public void start() { - - if (!startFlag.compareAndSet(false, true)) { - return; - } - - dispatcher = new ListenerDispatcher(app); - - //注册机器到zk中 - registerApp(); - - // 注册连接状态监听器 - registerConnectionLostListener(); - - // 注册节点数据变化 - registerDataChange(dispatcher); - - //获取应用起来的时候的初始化数据 - initAppData(dispatcher); - - } - - private void registerApp() { - zkUtil.registerEphemeralSequential(path.getPath(), Jsons.toJson(app)); - } - - public void unregisterApp() { - zkUtil.remove(path.getPath()); - } - - // 注册连接状态监听器 - private void registerConnectionLostListener() { - zkUtil.getClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { - - @Override - public void stateChanged(final CuratorFramework client, final ConnectionState newState) { - if (ConnectionState.LOST == newState) { - log.warn(app.getIp() + ", lost connection"); - } else if (ConnectionState.RECONNECTED == newState) { - log.warn(app.getIp() + ", reconnected"); - } - } - }); - } - - // 注册节点数据变化 - private void registerDataChange(final CallBack callBack) { - zkUtil.getCache().getListenable().addListener(new TreeCacheListener() { - - @Override - public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { - String path = null == event.getData() ? "" : event.getData().getPath(); - if (path.isEmpty()) { - log.warn("registerDataChange empty path:" + path + "," + event.getType().name()); - return; - } - callBack.handler(client, event, path); - } - }); - } - - private void initAppData(final CallBack callBack) { - callBack.initData(this); - } - - public CuratorFramework getClient() { - return zkUtil.getClient(); - } - - public TreeCache getCache() { - return zkUtil.getCache(); - } - - public void close() { - zkUtil.close(); - } - - public ZkRegister getZkUtil() { - return zkUtil; - } - - public ServerApp getServerApp() { - return app; - } - -} diff --git a/mpush-tools/src/test/java/com/shinemo/mpush/tools/redis/RedisGroupManageTest.java b/mpush-tools/src/test/java/com/shinemo/mpush/tools/redis/RedisGroupManageTest.java index a3ab0037..60da5d22 100644 --- a/mpush-tools/src/test/java/com/shinemo/mpush/tools/redis/RedisGroupManageTest.java +++ b/mpush-tools/src/test/java/com/shinemo/mpush/tools/redis/RedisGroupManageTest.java @@ -1,113 +1,113 @@ -package com.shinemo.mpush.tools.redis; - -import java.util.Date; -import java.util.List; - -import com.shinemo.mpush.tools.redis.listener.MessageListener; -import com.shinemo.mpush.tools.zk.ZKPath; - -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; -import org.junit.Before; -import org.junit.Test; - -import com.shinemo.mpush.tools.MPushUtil; -import com.shinemo.mpush.tools.redis.manage.RedisManage; -import com.shinemo.mpush.tools.redis.pubsub.Subscriber; -import com.shinemo.mpush.tools.spi.ServiceContainer; -import com.shinemo.mpush.tools.zk.ServerApp; -import com.shinemo.mpush.tools.zk.manage.ServerManage; - -public class RedisGroupManageTest { - - ServerApp app = new ServerApp(MPushUtil.getLocalIp(), 3000); - ServerManage manage = new ServerManage(app, ZKPath.REDIS_SERVER); - List groupList = null; - - RedisNode node = new RedisNode("127.0.0.1", 6379, "ShineMoIpo"); - RedisNode node2 = new RedisNode("127.0.0.1", 6380, "ShineMoIpo"); - - RedisRegister redisRegister = ServiceContainer.getInstance(RedisRegister.class); - - @Before - public void init() { - manage.start(); - groupList = redisRegister.getGroupList(); - } - - @Test - public void testGetRedisGroup() { - for (RedisGroup group : groupList) { - for (RedisNode node : group.getRedisNodeList()) { - System.out.println(group + ToStringBuilder.reflectionToString(node, ToStringStyle.MULTI_LINE_STYLE)); - } - - } - } - - @Test - public void testAdd() { - User user = RedisManage.get("huang2", User.class); - if (user == null) { - user = new User("hi", 10, new Date()); - RedisManage.set("huang2", user); - user = RedisManage.get("huang2", User.class); - } - System.out.println(ToStringBuilder.reflectionToString(user, ToStringStyle.MULTI_LINE_STYLE)); - - User nowUser = RedisUtil.get(node, "huang2", User.class); - System.out.println("node1:" + ToStringBuilder.reflectionToString(nowUser)); - - nowUser = RedisUtil.get(node2, "huang2", User.class); - System.out.println("node2:" + ToStringBuilder.reflectionToString(nowUser)); - - RedisManage.del("huang2"); - - nowUser = RedisUtil.get(node2, "huang2", User.class); - if (nowUser == null) { - System.out.println("node2 nowUser is null"); - } else { - System.out.println("node2:" + ToStringBuilder.reflectionToString(nowUser)); - } - - nowUser = RedisUtil.get(node, "huang2", User.class); - System.out.println("node:" + ToStringBuilder.reflectionToString(nowUser)); - - } - - @Test - public void testPub() { - for (int i = 0; i < 20; i++) { - User user = new User("pub" + i, 10, new Date()); - RedisManage.publish("channel1", user); - RedisManage.publish("channel2", user); - } - } - - @Test - public void testSub() { - RedisManage.subscribe(new MessageListener() { - @Override - public void onMessage(String channel, String message) { - System.out.printf("on message channel=%s, message=%s%n", channel, message); - } - }, "channel1", "channel2"); - try { - Thread.sleep(Integer.MAX_VALUE); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - @Test - public void testSub2() { - RedisManage.subscribe(new Subscriber(), "channel1", "channel2"); - try { - Thread.sleep(Integer.MAX_VALUE); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - -} +//package com.shinemo.mpush.tools.redis; +// +//import java.util.Date; +//import java.util.List; +// +//import com.shinemo.mpush.tools.redis.listener.MessageListener; +//import com.shinemo.mpush.tools.zk.ZKPath; +// +//import org.apache.commons.lang3.builder.ToStringBuilder; +//import org.apache.commons.lang3.builder.ToStringStyle; +//import org.junit.Before; +//import org.junit.Test; +// +//import com.shinemo.mpush.tools.MPushUtil; +//import com.shinemo.mpush.tools.redis.manage.RedisManage; +//import com.shinemo.mpush.tools.redis.pubsub.Subscriber; +//import com.shinemo.mpush.tools.spi.ServiceContainer; +//import com.shinemo.mpush.tools.zk.ServerApp; +//import com.shinemo.mpush.tools.zk.manage.ServerManage; +// +//public class RedisGroupManageTest { +// +// ServerApp app = new ServerApp(MPushUtil.getLocalIp(), 3000); +// ServerManage manage = new ServerManage(app, ZKPath.REDIS_SERVER); +// List groupList = null; +// +// RedisNode node = new RedisNode("127.0.0.1", 6379, "ShineMoIpo"); +// RedisNode node2 = new RedisNode("127.0.0.1", 6380, "ShineMoIpo"); +// +// RedisRegister redisRegister = ServiceContainer.getInstance(RedisRegister.class); +// +// @Before +// public void init() { +// manage.start(); +// groupList = redisRegister.getGroupList(); +// } +// +// @Test +// public void testGetRedisGroup() { +// for (RedisGroup group : groupList) { +// for (RedisNode node : group.getRedisNodeList()) { +// System.out.println(group + ToStringBuilder.reflectionToString(node, ToStringStyle.MULTI_LINE_STYLE)); +// } +// +// } +// } +// +// @Test +// public void testAdd() { +// User user = RedisManage.get("huang2", User.class); +// if (user == null) { +// user = new User("hi", 10, new Date()); +// RedisManage.set("huang2", user); +// user = RedisManage.get("huang2", User.class); +// } +// System.out.println(ToStringBuilder.reflectionToString(user, ToStringStyle.MULTI_LINE_STYLE)); +// +// User nowUser = RedisUtil.get(node, "huang2", User.class); +// System.out.println("node1:" + ToStringBuilder.reflectionToString(nowUser)); +// +// nowUser = RedisUtil.get(node2, "huang2", User.class); +// System.out.println("node2:" + ToStringBuilder.reflectionToString(nowUser)); +// +// RedisManage.del("huang2"); +// +// nowUser = RedisUtil.get(node2, "huang2", User.class); +// if (nowUser == null) { +// System.out.println("node2 nowUser is null"); +// } else { +// System.out.println("node2:" + ToStringBuilder.reflectionToString(nowUser)); +// } +// +// nowUser = RedisUtil.get(node, "huang2", User.class); +// System.out.println("node:" + ToStringBuilder.reflectionToString(nowUser)); +// +// } +// +// @Test +// public void testPub() { +// for (int i = 0; i < 20; i++) { +// User user = new User("pub" + i, 10, new Date()); +// RedisManage.publish("channel1", user); +// RedisManage.publish("channel2", user); +// } +// } +// +// @Test +// public void testSub() { +// RedisManage.subscribe(new MessageListener() { +// @Override +// public void onMessage(String channel, String message) { +// System.out.printf("on message channel=%s, message=%s%n", channel, message); +// } +// }, "channel1", "channel2"); +// try { +// Thread.sleep(Integer.MAX_VALUE); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// } +// +// @Test +// public void testSub2() { +// RedisManage.subscribe(new Subscriber(), "channel1", "channel2"); +// try { +// Thread.sleep(Integer.MAX_VALUE); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// } +// +// +//} diff --git a/mpush-tools/src/test/java/com/shinemo/mpush/tools/redis/RedisUtilTest.java b/mpush-tools/src/test/java/com/shinemo/mpush/tools/redis/RedisUtilTest.java index 5bfc1443..07367fd8 100644 --- a/mpush-tools/src/test/java/com/shinemo/mpush/tools/redis/RedisUtilTest.java +++ b/mpush-tools/src/test/java/com/shinemo/mpush/tools/redis/RedisUtilTest.java @@ -1,177 +1,177 @@ -package com.shinemo.mpush.tools.redis; - -import java.util.Date; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.junit.Test; - -import com.google.common.collect.Lists; -import com.shinemo.mpush.tools.zk.manage.ServerAppManage; - -import redis.clients.jedis.Jedis; - -public class RedisUtilTest { - - - - RedisNode node = new RedisNode("127.0.0.1", 6379, "ShineMoIpo"); - RedisNode node2 = new RedisNode("127.0.0.1", 6380, "ShineMoIpo"); - - List nodeList = Lists.newArrayList(node, node2); - - @Test - public void testAddAndGetAndDelete() { - Jedis jedis = RedisUtil.getClient(node2); - jedis.set("hi", "huang"); - - String ret = jedis.get("hi"); - System.out.println(ret); - - jedis.del("hi"); - ret = jedis.get("hi"); - if (ret == null) { - System.out.println("ret is null"); - } else { - System.out.println("ret is not null:" + ret); - } - - } - - @Test - public void testJedisPool() { - // 最大连接数是8,因此,获取10个链接会抛错误 - List jedisList = Lists.newArrayList(); - for (int i = 0; i < 10; i++) { - Jedis jedis = RedisUtil.getClient(node); - jedisList.add(jedis); - } - } - - @Test - public void testJedisPool2() { - // 最大连接数是8,因此,获取10个链接会抛错误 - List jedisList = Lists.newArrayList(); - for (int i = 1; i <= 8; i++) { - Jedis jedis = RedisUtil.getClient(node); - jedisList.add(jedis); - } - - System.out.println(jedisList.size()); - - try { - Jedis jedis = RedisUtil.getClient(node); - jedisList.add(jedis); - System.out.println("first get jedis success"); - } catch (Exception e) { - System.out.println(e); - } - - // 关闭一个链接 - RedisUtil.close(jedisList.get(0)); - - try { - Jedis jedis = RedisUtil.getClient(node); - jedisList.add(jedis); - System.out.println("second get jedis success"); - } catch (Exception e) { - System.out.println(e); - } - - System.out.println(jedisList.size()); - } - - @Test - public void testKV() { - User user = new User("huang", 18, new Date()); - RedisUtil.set(nodeList, "test", user); - - User nowUser = RedisUtil.get(node, "test", User.class); - System.out.println("node1:" + ToStringBuilder.reflectionToString(nowUser)); - - nowUser = RedisUtil.get(node2, "test", User.class); - System.out.println("node2:" + ToStringBuilder.reflectionToString(nowUser)); - - RedisUtil.del(nodeList, "test"); - - nowUser = RedisUtil.get(node2, "test", User.class); - if (nowUser == null) { - System.out.println("node2 nowUser is null"); - } else { - System.out.println("node2:" + ToStringBuilder.reflectionToString(nowUser)); - } - - nowUser = RedisUtil.get(node, "test", User.class); - if (nowUser == null) { - System.out.println("node nowUser is null"); - } else { - System.out.println("node:" + ToStringBuilder.reflectionToString(nowUser)); - } - - RedisUtil.set(nodeList, "test", user, 10); - - try { - Thread.sleep(12000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - nowUser = RedisUtil.get(node2, "test", User.class); - if (nowUser == null) { - System.out.println("node2 nowUser is null"); - } else { - System.out.println("node2:" + ToStringBuilder.reflectionToString(nowUser)); - } - - nowUser = RedisUtil.get(node, "test", User.class); - if (nowUser == null) { - System.out.println("node nowUser is null"); - } else { - System.out.println("node:" + ToStringBuilder.reflectionToString(nowUser)); - } - - } - - @Test - public void hashTest(){ - - User user = new User("huang", 18, new Date()); - - RedisUtil.hset(nodeList, "hashhuang", "hi", user); - - User nowUser = RedisUtil.hget(node, "hashhuang", "hi", User.class); - System.out.println("node1:"+ToStringBuilder.reflectionToString(nowUser)); - - nowUser = RedisUtil.hget(node2, "hashhuang", "hi", User.class); - System.out.println("node2:"+ToStringBuilder.reflectionToString(nowUser)); - - Map ret = RedisUtil.hgetAll(node, "hashhuang",User.class); - Iterator> iterator = ret.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - String key = entry.getKey(); - User val = entry.getValue(); - System.out.println("all:"+key+","+ToStringBuilder.reflectionToString(val)); - } - - RedisUtil.hdel(nodeList, "hashhuang", "hi"); - - nowUser = RedisUtil.hget(node2, "hashhuang", "hi", User.class); - if(nowUser==null){ - System.out.println("node2 nowUser is null"); - }else{ - System.out.println("node2:"+ToStringBuilder.reflectionToString(nowUser)); - } - - nowUser = RedisUtil.hget(node, "hashhuang", "hi", User.class); - if(nowUser==null){ - System.out.println("node nowUser is null"); - }else{ - System.out.println("node:"+ToStringBuilder.reflectionToString(nowUser)); - } - - } - -} +//package com.shinemo.mpush.tools.redis; +// +//import java.util.Date; +//import java.util.Iterator; +//import java.util.List; +//import java.util.Map; +// +//import org.apache.commons.lang3.builder.ToStringBuilder; +//import org.junit.Test; +// +//import com.google.common.collect.Lists; +//import com.shinemo.mpush.tools.zk.manage.ServerAppManage; +// +//import redis.clients.jedis.Jedis; +// +//public class RedisUtilTest { +// +// +// +// RedisNode node = new RedisNode("127.0.0.1", 6379, "ShineMoIpo"); +// RedisNode node2 = new RedisNode("127.0.0.1", 6380, "ShineMoIpo"); +// +// List nodeList = Lists.newArrayList(node, node2); +// +// @Test +// public void testAddAndGetAndDelete() { +// Jedis jedis = RedisUtil.getClient(node2); +// jedis.set("hi", "huang"); +// +// String ret = jedis.get("hi"); +// System.out.println(ret); +// +// jedis.del("hi"); +// ret = jedis.get("hi"); +// if (ret == null) { +// System.out.println("ret is null"); +// } else { +// System.out.println("ret is not null:" + ret); +// } +// +// } +// +// @Test +// public void testJedisPool() { +// // 最大连接数是8,因此,获取10个链接会抛错误 +// List jedisList = Lists.newArrayList(); +// for (int i = 0; i < 10; i++) { +// Jedis jedis = RedisUtil.getClient(node); +// jedisList.add(jedis); +// } +// } +// +// @Test +// public void testJedisPool2() { +// // 最大连接数是8,因此,获取10个链接会抛错误 +// List jedisList = Lists.newArrayList(); +// for (int i = 1; i <= 8; i++) { +// Jedis jedis = RedisUtil.getClient(node); +// jedisList.add(jedis); +// } +// +// System.out.println(jedisList.size()); +// +// try { +// Jedis jedis = RedisUtil.getClient(node); +// jedisList.add(jedis); +// System.out.println("first get jedis success"); +// } catch (Exception e) { +// System.out.println(e); +// } +// +// // 关闭一个链接 +// RedisUtil.close(jedisList.get(0)); +// +// try { +// Jedis jedis = RedisUtil.getClient(node); +// jedisList.add(jedis); +// System.out.println("second get jedis success"); +// } catch (Exception e) { +// System.out.println(e); +// } +// +// System.out.println(jedisList.size()); +// } +// +// @Test +// public void testKV() { +// User user = new User("huang", 18, new Date()); +// RedisUtil.set(nodeList, "test", user); +// +// User nowUser = RedisUtil.get(node, "test", User.class); +// System.out.println("node1:" + ToStringBuilder.reflectionToString(nowUser)); +// +// nowUser = RedisUtil.get(node2, "test", User.class); +// System.out.println("node2:" + ToStringBuilder.reflectionToString(nowUser)); +// +// RedisUtil.del(nodeList, "test"); +// +// nowUser = RedisUtil.get(node2, "test", User.class); +// if (nowUser == null) { +// System.out.println("node2 nowUser is null"); +// } else { +// System.out.println("node2:" + ToStringBuilder.reflectionToString(nowUser)); +// } +// +// nowUser = RedisUtil.get(node, "test", User.class); +// if (nowUser == null) { +// System.out.println("node nowUser is null"); +// } else { +// System.out.println("node:" + ToStringBuilder.reflectionToString(nowUser)); +// } +// +// RedisUtil.set(nodeList, "test", user, 10); +// +// try { +// Thread.sleep(12000); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// +// nowUser = RedisUtil.get(node2, "test", User.class); +// if (nowUser == null) { +// System.out.println("node2 nowUser is null"); +// } else { +// System.out.println("node2:" + ToStringBuilder.reflectionToString(nowUser)); +// } +// +// nowUser = RedisUtil.get(node, "test", User.class); +// if (nowUser == null) { +// System.out.println("node nowUser is null"); +// } else { +// System.out.println("node:" + ToStringBuilder.reflectionToString(nowUser)); +// } +// +// } +// +// @Test +// public void hashTest(){ +// +// User user = new User("huang", 18, new Date()); +// +// RedisUtil.hset(nodeList, "hashhuang", "hi", user); +// +// User nowUser = RedisUtil.hget(node, "hashhuang", "hi", User.class); +// System.out.println("node1:"+ToStringBuilder.reflectionToString(nowUser)); +// +// nowUser = RedisUtil.hget(node2, "hashhuang", "hi", User.class); +// System.out.println("node2:"+ToStringBuilder.reflectionToString(nowUser)); +// +// Map ret = RedisUtil.hgetAll(node, "hashhuang",User.class); +// Iterator> iterator = ret.entrySet().iterator(); +// while (iterator.hasNext()) { +// Map.Entry entry = iterator.next(); +// String key = entry.getKey(); +// User val = entry.getValue(); +// System.out.println("all:"+key+","+ToStringBuilder.reflectionToString(val)); +// } +// +// RedisUtil.hdel(nodeList, "hashhuang", "hi"); +// +// nowUser = RedisUtil.hget(node2, "hashhuang", "hi", User.class); +// if(nowUser==null){ +// System.out.println("node2 nowUser is null"); +// }else{ +// System.out.println("node2:"+ToStringBuilder.reflectionToString(nowUser)); +// } +// +// nowUser = RedisUtil.hget(node, "hashhuang", "hi", User.class); +// if(nowUser==null){ +// System.out.println("node nowUser is null"); +// }else{ +// System.out.println("node:"+ToStringBuilder.reflectionToString(nowUser)); +// } +// +// } +// +//}