Skip to content

Commit

Permalink
抽取 connection server
Browse files Browse the repository at this point in the history
  • Loading branch information
黄志磊 committed Jan 15, 2016
1 parent e1b129b commit d5825b6
Show file tree
Hide file tree
Showing 32 changed files with 1,202 additions and 998 deletions.
2 changes: 2 additions & 0 deletions mpush-api/src/main/java/com/shinemo/mpush/api/Server.java
Expand Up @@ -8,6 +8,8 @@ public interface Server {
void start(Listener listener); void start(Listener listener);


void stop(Listener listener); void stop(Listener listener);

public void init();


boolean isRunning(); boolean isRunning();


Expand Down
Expand Up @@ -11,7 +11,6 @@
import com.shinemo.mpush.tools.zk.ZKPath; import com.shinemo.mpush.tools.zk.ZKPath;
import com.shinemo.mpush.tools.zk.ServerApp; import com.shinemo.mpush.tools.zk.ServerApp;
import com.shinemo.mpush.tools.zk.ZkRegister; import com.shinemo.mpush.tools.zk.ZkRegister;
import com.shinemo.mpush.tools.zk.listener.impl.RedisPathListener;


import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
Expand Down Expand Up @@ -76,9 +75,9 @@ public void send(String content, Collection<String> userIds, Callback callback)
} }


public void initRedisClient() { public void initRedisClient() {
RedisPathListener listener = new RedisPathListener(); // RedisPathListener listener = new RedisPathListener();
zkRegister.getCache().getListenable().addListener(listener); // zkRegister.getCache().getListenable().addListener(listener);
listener.initData(null); // listener.initData(null);
} }


private class GatewayServerZKListener implements TreeCacheListener { private class GatewayServerZKListener implements TreeCacheListener {
Expand Down
@@ -1,50 +1,50 @@
package com.shinemo.mpush.client; //package com.shinemo.mpush.client;

//
import com.shinemo.mpush.api.PushSender; //import com.shinemo.mpush.api.PushSender;
import org.junit.Test; //import org.junit.Test;

//
import java.util.Arrays; //import java.util.Arrays;
import java.util.concurrent.locks.LockSupport; //import java.util.concurrent.locks.LockSupport;

//
import static org.junit.Assert.*; //import static org.junit.Assert.*;

//
/** ///**
* Created by ohun on 2016/1/7. // * Created by ohun on 2016/1/7.
*/ // */
public class PushClientTest { //public class PushClientTest {

//
@Test // @Test
public void testSend() throws Exception { // public void testSend() throws Exception {

//
} // }

//
public static void main(String[] args) throws Exception { // public static void main(String[] args) throws Exception {
PushClient client = new PushClient(); // PushClient client = new PushClient();
client.init(); // client.init();
Thread.sleep(1000); // Thread.sleep(1000);
client.send("this a first push", Arrays.asList("user-0", "user-1", "user-2", "user-3", "user-4"), // client.send("this a first push", Arrays.asList("user-0", "user-1", "user-2", "user-3", "user-4"),
new PushSender.Callback() { // new PushSender.Callback() {
@Override // @Override
public void onSuccess(String userId) { // public void onSuccess(String userId) {
System.err.println("push onSuccess userId=" + userId); // System.err.println("push onSuccess userId=" + userId);
} // }

//
@Override // @Override
public void onFailure(String userId) { // public void onFailure(String userId) {
System.err.println("push onFailure userId=" + userId); // System.err.println("push onFailure userId=" + userId);
} // }

//
@Override // @Override
public void onOffline(String userId) { // public void onOffline(String userId) {
System.err.println("push onOffline userId=" + userId); // System.err.println("push onOffline userId=" + userId);
} // }

//
@Override // @Override
public void onTimeout(String userId) { // public void onTimeout(String userId) {
System.err.println("push onTimeout userId=" + userId); // System.err.println("push onTimeout userId=" + userId);
} // }
} // }
); // );
LockSupport.park(); // LockSupport.park();
} // }
} //}
128 changes: 128 additions & 0 deletions mpush-core/src/main/java/com/shinemo/mpush/core/AbstractServer.java
@@ -0,0 +1,128 @@
package com.shinemo.mpush.core;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.shinemo.mpush.api.Server;
import com.shinemo.mpush.core.server.ConnectionServer;
import com.shinemo.mpush.tools.GenericsUtil;
import com.shinemo.mpush.tools.Jsons;
import com.shinemo.mpush.tools.MPushUtil;
import com.shinemo.mpush.tools.config.ConfigCenter;
import com.shinemo.mpush.tools.redis.RedisGroup;
import com.shinemo.mpush.tools.spi.ServiceContainer;
import com.shinemo.mpush.tools.thread.ThreadPoolUtil;
import com.shinemo.mpush.tools.zk.ServerApp;
import com.shinemo.mpush.tools.zk.ZKPath;
import com.shinemo.mpush.tools.zk.ZkRegister;
import com.shinemo.mpush.tools.zk.listener.DataChangeListener;

public abstract class AbstractServer<T extends Application> {

private static final Logger log = LoggerFactory.getLogger(AbstractServer.class);

protected Application application;

protected List<DataChangeListener> dataChangeListeners = Lists.newArrayList();

protected ZkRegister zkRegister = ServiceContainer.getInstance(ZkRegister.class);

protected Server server;

public AbstractServer() {
this.application = getApplication();
}

@SuppressWarnings("unchecked")
private Application getApplication() {
try {
return ((Class<Application>) GenericsUtil.getSuperClassGenericType(this.getClass(), 0)).newInstance();
} catch (Exception e) {
log.warn("exception:",e);
throw new RuntimeException(e);
}
}

public abstract Server getServer();

public void registerListener(DataChangeListener listener){
dataChangeListeners.add(listener);
}

//step1 启动 zk
public void initZK(){
zkRegister = ServiceContainer.getInstance(ZkRegister.class);
zkRegister.init();
}

//step2 获取redis
public void initRedis(){
boolean exist = zkRegister.isExisted(ZKPath.REDIS_SERVER.getPath());
if (!exist) {
List<RedisGroup> groupList = ConfigCenter.holder.redisGroups();
zkRegister.registerPersist(ZKPath.REDIS_SERVER.getPath(), Jsons.toJson(groupList));
}
}

//step3 初始化 listener data
public void initListenerData(){
for(DataChangeListener listener:dataChangeListeners){
listener.initData();
}
}

//step4 初始化server
public void initServer(){
server = getServer();
}

//step4 启动 netty server
public void startServer(){
ThreadPoolUtil.newThread(new Runnable() {
@Override
public void run() {
final int port = ConfigCenter.holder.connectionServerPort();
ConnectionServer server = new ConnectionServer(port);
server.init();
server.start(new Server.Listener() {
@Override
public void onSuccess() {
log.error("mpush app start connection server success....");
}

@Override
public void onFailure(String message) {
log.error("mpush app start connection server failure, jvm exit with code -1");
System.exit(-1);
}
});
}
}, "conn-server", false).start();

}

//step5 注册应用到zk
public void registerServerToZk(){
ServerApp app = new ServerApp(MPushUtil.getLocalIp(), application.getPort());
zkRegister.registerEphemeralSequential(application.getServerRegisterZkPath(), Jsons.toJson(app));
}

public void init(){
initZK();
initRedis();
initListenerData();
startServer();
registerServerToZk();
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
if (server != null) {
server.stop(null);
}
}
});
}

}

0 comments on commit d5825b6

Please sign in to comment.