Skip to content

Commit

Permalink
继续重构zk的代码
Browse files Browse the repository at this point in the history
  • Loading branch information
黄志磊 committed Dec 31, 2015
1 parent dcb83c0 commit 23f42ce
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 35 deletions.
Expand Up @@ -6,19 +6,19 @@ public class ServerApp implements Serializable{


private static final long serialVersionUID = 5495972321679092837L; private static final long serialVersionUID = 5495972321679092837L;


private String ip; private final String ip;
private String port; private final String port;

public ServerApp(String ip, String port) {
this.ip = ip;
this.port = port;
}

public String getIp() { public String getIp() {
return ip; return ip;
} }
public void setIp(String ip) {
this.ip = ip;
}
public String getPort() { public String getPort() {
return port; return port;
} }
public void setPort(String port) {
this.port = port;
}


} }
12 changes: 12 additions & 0 deletions mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/ZkUtil.java
Expand Up @@ -227,6 +227,18 @@ public void registerEphemeral(final String key, final String value) {
} }
} }


/**
* 注册临时顺序数据
* @param key
*/
public void registerEphemeralSequential(final String key,final String value) {
try {
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key, value.getBytes());
} catch (final Exception ex) {
log.error("persistEphemeralSequential" + key,ex);
}
}

/** /**
* 注册临时顺序数据 * 注册临时顺序数据
* @param key * @param key
Expand Down
Expand Up @@ -9,8 +9,8 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.shinemo.mpush.tools.InetAddressUtil;
import com.shinemo.mpush.tools.zk.PathEnum; import com.shinemo.mpush.tools.zk.PathEnum;
import com.shinemo.mpush.tools.zk.ServerApp;
import com.shinemo.mpush.tools.zk.listener.impl.ConnectionPathListener; import com.shinemo.mpush.tools.zk.listener.impl.ConnectionPathListener;
import com.shinemo.mpush.tools.zk.listener.impl.KickPathListener; import com.shinemo.mpush.tools.zk.listener.impl.KickPathListener;
import com.shinemo.mpush.tools.zk.manage.ServerManage; import com.shinemo.mpush.tools.zk.manage.ServerManage;
Expand All @@ -21,11 +21,9 @@ public class ListenerDispatcher implements CallBack {


private Map<String, CallBack> holder = Maps.newTreeMap(); private Map<String, CallBack> holder = Maps.newTreeMap();


public static ListenerDispatcher instance = new ListenerDispatcher(); public ListenerDispatcher(ServerApp app) {

holder.put(PathEnum.CONNECTION_SERVER_ALL_HOST.getPathByIp(app.getIp()), new ConnectionPathListener());
private ListenerDispatcher() { holder.put(PathEnum.CONNECTION_SERVER_KICK.getPathByIp(app.getIp()), new KickPathListener());
holder.put(PathEnum.CONNECTION_SERVER_ALL_HOST.getPathByIp(InetAddressUtil.getInetAddress()), new ConnectionPathListener());
holder.put(PathEnum.CONNECTION_SERVER_KICK.getPathByIp(InetAddressUtil.getInetAddress()), new KickPathListener());
} }


@Override @Override
Expand Down
Expand Up @@ -11,8 +11,9 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import com.shinemo.mpush.tools.InetAddressUtil; import com.shinemo.mpush.tools.Jsons;
import com.shinemo.mpush.tools.zk.PathEnum; import com.shinemo.mpush.tools.zk.PathEnum;
import com.shinemo.mpush.tools.zk.ServerApp;
import com.shinemo.mpush.tools.zk.ZkUtil; import com.shinemo.mpush.tools.zk.ZkUtil;
import com.shinemo.mpush.tools.zk.listener.CallBack; import com.shinemo.mpush.tools.zk.listener.CallBack;
import com.shinemo.mpush.tools.zk.listener.ListenerDispatcher; import com.shinemo.mpush.tools.zk.listener.ListenerDispatcher;
Expand All @@ -25,41 +26,48 @@ public class ServerManage {


private static final AtomicBoolean startFlag = new AtomicBoolean(false); private static final AtomicBoolean startFlag = new AtomicBoolean(false);


public static ServerManage instance = new ServerManage(); private final ServerApp app;

public ServerManage(ServerApp app){
this.app = app;
}


private ServerManage(){}

public void start() { public void start() {


if (!startFlag.compareAndSet(false, true)) { if (!startFlag.compareAndSet(false, true)) {
return; return;
} }

ListenerDispatcher dispatcher = new ListenerDispatcher(app);


//注册机器到zk中 //注册机器到zk中
registerApp(); registerApp(app);

// 注册连接状态监听器 // 注册连接状态监听器
registerConnectionLostListener(); registerConnectionLostListener(app);

// 注册节点数据变化 // 注册节点数据变化
registerDataChange(ListenerDispatcher.instance); registerDataChange(dispatcher);

//获取应用起来的时候的初始化数据 //获取应用起来的时候的初始化数据
initAppData(ListenerDispatcher.instance); initAppData(dispatcher);


} }


private void registerApp(){ private void registerApp(ServerApp app){
zkUtil.registerEphemeralSequential(PathEnum.CONNECTION_SERVER_ALL_HOST.getPath()); zkUtil.registerEphemeralSequential(PathEnum.CONNECTION_SERVER_ALL_HOST.getPathByIp(app.getIp()),Jsons.toJson(app));
} }


// 注册连接状态监听器 // 注册连接状态监听器
private void registerConnectionLostListener() { private void registerConnectionLostListener(final ServerApp app) {
zkUtil.getClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { zkUtil.getClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {


@Override @Override
public void stateChanged(final CuratorFramework client, final ConnectionState newState) { public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
if (ConnectionState.LOST == newState) { if (ConnectionState.LOST == newState) {
log.warn(InetAddressUtil.getInetAddress() + ", lost connection"); log.warn(app.getIp() + ", lost connection");
} else if (ConnectionState.RECONNECTED == newState) { } else if (ConnectionState.RECONNECTED == newState) {
log.warn(InetAddressUtil.getInetAddress() + ", reconnected"); log.warn(app.getIp() + ", reconnected");
} }
} }
}); });
Expand Down
Expand Up @@ -16,7 +16,8 @@


public class DistributedQueueConsumerTest { public class DistributedQueueConsumerTest {


private ServerManage manage = ServerManage.instance; private ServerApp app = new ServerApp("127.0.0.1","3000");
private ServerManage manage = new ServerManage(app);


@Before @Before
public void setup(){ public void setup(){
Expand Down
Expand Up @@ -18,7 +18,8 @@


public class DistributedQueueProviderTest { public class DistributedQueueProviderTest {


private ServerManage manage = ServerManage.instance; private ServerApp app = new ServerApp("127.0.0.1","3000");
private ServerManage manage = new ServerManage(app);


@Before @Before
public void setup(){ public void setup(){
Expand Down
Expand Up @@ -14,6 +14,10 @@ public class ServerManageTest {


private static Executor executor = Executors.newCachedThreadPool(); private static Executor executor = Executors.newCachedThreadPool();


private ServerApp app = new ServerApp("127.0.0.1","3000");

private ServerManage manage = new ServerManage(app);

@Test @Test
public void testMulThread() throws InterruptedException{ public void testMulThread() throws InterruptedException{
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -28,14 +32,13 @@ public void testMulThread() throws InterruptedException{


@Test @Test
public void testUpdate(){ public void testUpdate(){
ServerManage manage = ServerManage.instance;
manage.start(); manage.start();


} }


@Test @Test
public void testServerManageStart(){ public void testServerManageStart(){
ServerManage manage = ServerManage.instance;
manage.start(); manage.start();
} }


Expand All @@ -60,7 +63,8 @@ public void run() {
e.printStackTrace(); e.printStackTrace();
} }
log.warn("start init "+ip); log.warn("start init "+ip);
ServerManage manage = ServerManage.instance; ServerApp app = new ServerApp(ip,"3000");
ServerManage manage = new ServerManage(app);
manage.start(); manage.start();
log.warn("end init "+ip); log.warn("end init "+ip);
} }
Expand Down
Expand Up @@ -86,9 +86,7 @@ public void testLocalIp(){
@Test @Test
public void testRegisterIp(){ public void testRegisterIp(){
String localIp = InetAddressUtil.getInetAddress(); String localIp = InetAddressUtil.getInetAddress();
ServerApp app = new ServerApp(); ServerApp app = new ServerApp(localIp,"3000");
app.setIp(localIp);
app.setPort("3000");
zkUtil.registerPersist("/"+localIp, Jsons.toJson(app)); zkUtil.registerPersist("/"+localIp, Jsons.toJson(app));
String value = zkUtil.get("/"+localIp); String value = zkUtil.get("/"+localIp);
System.out.println(value); System.out.println(value);
Expand Down

0 comments on commit 23f42ce

Please sign in to comment.