Skip to content

Commit

Permalink
继续重构zk的代码
Browse files Browse the repository at this point in the history
  • Loading branch information
黄志磊 committed Dec 31, 2015
1 parent 8dc4357 commit d70deb6
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 35 deletions.
Expand Up @@ -6,19 +6,19 @@ public class ServerApp implements Serializable{

private static final long serialVersionUID = 5495972321679092837L;

private String ip;
private String port;
private final String ip;
private final String port;

public ServerApp(String ip, String port) {
this.ip = ip;
this.port = port;
}

public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getPort() {
return port;
}
public void setPort(String port) {
this.port = port;
}

}
12 changes: 12 additions & 0 deletions mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/ZkUtil.java
Expand Up @@ -227,6 +227,18 @@ public void registerEphemeral(final String key, final String value) {
}
}

/**
* 注册临时顺序数据
* @param key
*/
public void registerEphemeralSequential(final String key,final String value) {
try {
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key, value.getBytes());
} catch (final Exception ex) {
log.error("persistEphemeralSequential" + key,ex);
}
}

/**
* 注册临时顺序数据
* @param key
Expand Down
Expand Up @@ -9,8 +9,8 @@
import org.slf4j.LoggerFactory;

import com.google.common.collect.Maps;
import com.shinemo.mpush.tools.InetAddressUtil;
import com.shinemo.mpush.tools.zk.PathEnum;
import com.shinemo.mpush.tools.zk.ServerApp;
import com.shinemo.mpush.tools.zk.listener.impl.ConnectionPathListener;
import com.shinemo.mpush.tools.zk.listener.impl.KickPathListener;
import com.shinemo.mpush.tools.zk.manage.ServerManage;
Expand All @@ -21,11 +21,9 @@ public class ListenerDispatcher implements CallBack {

private Map<String, CallBack> holder = Maps.newTreeMap();

public static ListenerDispatcher instance = new ListenerDispatcher();

private ListenerDispatcher() {
holder.put(PathEnum.CONNECTION_SERVER_ALL_HOST.getPathByIp(InetAddressUtil.getInetAddress()), new ConnectionPathListener());
holder.put(PathEnum.CONNECTION_SERVER_KICK.getPathByIp(InetAddressUtil.getInetAddress()), new KickPathListener());
public ListenerDispatcher(ServerApp app) {
holder.put(PathEnum.CONNECTION_SERVER_ALL_HOST.getPathByIp(app.getIp()), new ConnectionPathListener());
holder.put(PathEnum.CONNECTION_SERVER_KICK.getPathByIp(app.getIp()), new KickPathListener());
}

@Override
Expand Down
Expand Up @@ -11,8 +11,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.shinemo.mpush.tools.InetAddressUtil;
import com.shinemo.mpush.tools.Jsons;
import com.shinemo.mpush.tools.zk.PathEnum;
import com.shinemo.mpush.tools.zk.ServerApp;
import com.shinemo.mpush.tools.zk.ZkUtil;
import com.shinemo.mpush.tools.zk.listener.CallBack;
import com.shinemo.mpush.tools.zk.listener.ListenerDispatcher;
Expand All @@ -25,41 +26,48 @@ public class ServerManage {

private static final AtomicBoolean startFlag = new AtomicBoolean(false);

public static ServerManage instance = new ServerManage();
private final ServerApp app;

public ServerManage(ServerApp app){
this.app = app;
}

private ServerManage(){}

public void start() {

if (!startFlag.compareAndSet(false, true)) {
return;
}

ListenerDispatcher dispatcher = new ListenerDispatcher(app);

//注册机器到zk中
registerApp();
registerApp(app);

// 注册连接状态监听器
registerConnectionLostListener();
registerConnectionLostListener(app);

// 注册节点数据变化
registerDataChange(ListenerDispatcher.instance);
registerDataChange(dispatcher);

//获取应用起来的时候的初始化数据
initAppData(ListenerDispatcher.instance);
initAppData(dispatcher);

}

private void registerApp(){
zkUtil.registerEphemeralSequential(PathEnum.CONNECTION_SERVER_ALL_HOST.getPath());
private void registerApp(ServerApp app){
zkUtil.registerEphemeralSequential(PathEnum.CONNECTION_SERVER_ALL_HOST.getPathByIp(app.getIp()),Jsons.toJson(app));
}

// 注册连接状态监听器
private void registerConnectionLostListener() {
private void registerConnectionLostListener(final ServerApp app) {
zkUtil.getClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {

@Override
public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
if (ConnectionState.LOST == newState) {
log.warn(InetAddressUtil.getInetAddress() + ", lost connection");
log.warn(app.getIp() + ", lost connection");
} else if (ConnectionState.RECONNECTED == newState) {
log.warn(InetAddressUtil.getInetAddress() + ", reconnected");
log.warn(app.getIp() + ", reconnected");
}
}
});
Expand Down
Expand Up @@ -16,7 +16,8 @@

public class DistributedQueueConsumerTest {

private ServerManage manage = ServerManage.instance;
private ServerApp app = new ServerApp("127.0.0.1","3000");
private ServerManage manage = new ServerManage(app);

@Before
public void setup(){
Expand Down
Expand Up @@ -18,7 +18,8 @@

public class DistributedQueueProviderTest {

private ServerManage manage = ServerManage.instance;
private ServerApp app = new ServerApp("127.0.0.1","3000");
private ServerManage manage = new ServerManage(app);

@Before
public void setup(){
Expand Down
Expand Up @@ -14,6 +14,10 @@ public class ServerManageTest {

private static Executor executor = Executors.newCachedThreadPool();

private ServerApp app = new ServerApp("127.0.0.1","3000");

private ServerManage manage = new ServerManage(app);

@Test
public void testMulThread() throws InterruptedException{
CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -28,14 +32,13 @@ public void testMulThread() throws InterruptedException{

@Test
public void testUpdate(){
ServerManage manage = ServerManage.instance;

manage.start();

}

@Test
public void testServerManageStart(){
ServerManage manage = ServerManage.instance;
manage.start();
}

Expand All @@ -60,7 +63,8 @@ public void run() {
e.printStackTrace();
}
log.warn("start init "+ip);
ServerManage manage = ServerManage.instance;
ServerApp app = new ServerApp(ip,"3000");
ServerManage manage = new ServerManage(app);
manage.start();
log.warn("end init "+ip);
}
Expand Down
Expand Up @@ -86,9 +86,7 @@ public void testLocalIp(){
@Test
public void testRegisterIp(){
String localIp = InetAddressUtil.getInetAddress();
ServerApp app = new ServerApp();
app.setIp(localIp);
app.setPort("3000");
ServerApp app = new ServerApp(localIp,"3000");
zkUtil.registerPersist("/"+localIp, Jsons.toJson(app));
String value = zkUtil.get("/"+localIp);
System.out.println(value);
Expand Down

0 comments on commit d70deb6

Please sign in to comment.