Skip to content

Commit

Permalink
zk multhread test update
Browse files Browse the repository at this point in the history
  • Loading branch information
黄志磊 committed Dec 31, 2015
1 parent d70deb6 commit 8a3e784
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 28 deletions.
12 changes: 12 additions & 0 deletions mpush-tools/src/main/java/com/shinemo/mpush/tools/zk/PathEnum.java
Expand Up @@ -8,12 +8,20 @@ public enum PathEnum {
public String getPathByIp(String ip) { public String getPathByIp(String ip) {
return getPath()+"/machine"; return getPath()+"/machine";
} }
@Override
public String getPathByName(String name) {
return getPath()+"/"+name;
}
}, },
CONNECTION_SERVER_KICK("/cs/%s/kick/con","连接服务器踢人的路径"){ CONNECTION_SERVER_KICK("/cs/%s/kick/con","连接服务器踢人的路径"){
@Override @Override
public String getPathByIp(String ip) { public String getPathByIp(String ip) {
return String.format(getPath(), ip); return String.format(getPath(), ip);
} }
@Override
public String getPathByName(String name) {
return getPath()+"/"+name;
}
}; };


PathEnum(String path, String desc) { PathEnum(String path, String desc) {
Expand All @@ -32,7 +40,11 @@ public String getDesc() {
return desc; return desc;
} }


//不同的机器,注册到不同的路径
public abstract String getPathByIp(String ip); public abstract String getPathByIp(String ip);

//根据从zk中获取的app的值,拼装全路径
public abstract String getPathByName(String name);


public static void main(String[] args) { public static void main(String[] args) {
String test = "/cs/%s/kick"; String test = "/cs/%s/kick";
Expand Down
Expand Up @@ -35,7 +35,7 @@ public void handler(CuratorFramework client, TreeCacheEvent event, String path)
if (path.startsWith(entry.getKey())) { if (path.startsWith(entry.getKey())) {
entry.getValue().handler(client, event, path); entry.getValue().handler(client, event, path);
} else { // 其他路径的事件,暂时不关心 } else { // 其他路径的事件,暂时不关心
log.warn("path:" + path + "," + event.getType().name()); log.warn("ListenerDispatcher other path:" + path + "," + event.getType().name());
} }
} }


Expand Down
Expand Up @@ -3,6 +3,8 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;


import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type; import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
Expand All @@ -12,6 +14,7 @@
import com.shinemo.mpush.tools.Jsons; import com.shinemo.mpush.tools.Jsons;
import com.shinemo.mpush.tools.zk.PathEnum; import com.shinemo.mpush.tools.zk.PathEnum;
import com.shinemo.mpush.tools.zk.ServerApp; import com.shinemo.mpush.tools.zk.ServerApp;
import com.shinemo.mpush.tools.zk.ZkUtil;
import com.shinemo.mpush.tools.zk.listener.CallBack; import com.shinemo.mpush.tools.zk.listener.CallBack;
import com.shinemo.mpush.tools.zk.manage.ServerManage; import com.shinemo.mpush.tools.zk.manage.ServerManage;


Expand All @@ -27,32 +30,45 @@ public class ConnectionPathListener implements CallBack{


@Override @Override
public void handler(CuratorFramework client, TreeCacheEvent event, String path) { public void handler(CuratorFramework client, TreeCacheEvent event, String path) {
String data = new String(event.getData().getData());
if (Type.NODE_ADDED == event.getType()) { if (Type.NODE_ADDED == event.getType()) {
log.warn("path:" + path + ", node Add"); log.warn("ConnectionPathListener path:" + path + ", node Add"+","+data);
} else if (Type.NODE_REMOVED == event.getType()) { } else if (Type.NODE_REMOVED == event.getType()) {
log.warn("path:" + path + ", node Remove"); log.warn("ConnectionPathListener path:" + path + ", node Remove"+","+data);
} else if (Type.NODE_UPDATED == event.getType()) { } else if (Type.NODE_UPDATED == event.getType()) {
log.warn("path:" + path + "," + "node update"); log.warn("ConnectionPathListener path:" + path + "," + "node update"+","+data);
} else { } else {
log.warn("path:" + path + "," + event.getType().name()); log.warn("ConnectionPathListener other path:" + path + "," + event.getType().name()+","+data);
} }
} }


@Override @Override
public void initData(ServerManage manage) { public void initData(ServerManage manage) {
log.warn("start init data"); log.warn("start init app data");
List<String> rawData = manage.getZkUtil().getChildrenKeys(PathEnum.CONNECTION_SERVER_ALL_HOST.getPath()); getData();
printAppList();
log.warn("end init app data");
}

private void getData(){
//获取机器列表
List<String> rawData = ZkUtil.instance.getChildrenKeys(PathEnum.CONNECTION_SERVER_ALL_HOST.getPath());
List<ServerApp> newAppList = new ArrayList<ServerApp>(); List<ServerApp> newAppList = new ArrayList<ServerApp>();
for(String raw:rawData){ for(String raw:rawData){
ServerApp app = Jsons.fromJson(raw, ServerApp.class); String rawApp = ZkUtil.instance.get(PathEnum.CONNECTION_SERVER_ALL_HOST.getPathByName(raw));
ServerApp app = Jsons.fromJson(rawApp, ServerApp.class);
newAppList.add(app); newAppList.add(app);
} }
appList = newAppList; appList = newAppList;
log.warn("end init data");
} }


public List<ServerApp> getAppList(){ public List<ServerApp> getAppList(){
return appList; return appList;
} }


private void printAppList(){
for(ServerApp app:appList){
log.warn(ToStringBuilder.reflectionToString(app, ToStringStyle.DEFAULT_STYLE));
}
}
} }
Expand Up @@ -19,14 +19,15 @@ public class KickPathListener implements CallBack{


@Override @Override
public void handler(CuratorFramework client, TreeCacheEvent event, String path) { public void handler(CuratorFramework client, TreeCacheEvent event, String path) {
String data = new String(event.getData().getData());
if (Type.NODE_ADDED == event.getType()) { if (Type.NODE_ADDED == event.getType()) {
log.warn("path:" + path + ", node Add" +","+event.getData().getData()); log.warn("path:" + path + ", node Add"+","+data);
} else if (Type.NODE_REMOVED == event.getType()) { } else if (Type.NODE_REMOVED == event.getType()) {
log.warn("path:" + path + ", node Remove"+","+event.getData().getData()); log.warn("path:" + path + ", node Remove"+","+data);
} else if (Type.NODE_UPDATED == event.getType()) { } else if (Type.NODE_UPDATED == event.getType()) {
log.warn("path:" + path + "," + "node update"+","+event.getData().getData()); log.warn("path:" + path + "," + "node update"+","+data);
} else { } else {
log.warn("path:" + path + "," + event.getType().name()+","+event.getData().getData()); log.warn("other path:" + path + "," + event.getType().name()+","+data);
} }
} }


Expand Down
Expand Up @@ -24,7 +24,7 @@ public class ServerManage {


private static ZkUtil zkUtil = ZkUtil.instance; private static ZkUtil zkUtil = ZkUtil.instance;


private static final AtomicBoolean startFlag = new AtomicBoolean(false); private final AtomicBoolean startFlag = new AtomicBoolean(false);


private final ServerApp app; private final ServerApp app;


Expand All @@ -41,10 +41,10 @@ public void start() {
ListenerDispatcher dispatcher = new ListenerDispatcher(app); ListenerDispatcher dispatcher = new ListenerDispatcher(app);


//注册机器到zk中 //注册机器到zk中
registerApp(app); registerApp();


// 注册连接状态监听器 // 注册连接状态监听器
registerConnectionLostListener(app); registerConnectionLostListener();


// 注册节点数据变化 // 注册节点数据变化
registerDataChange(dispatcher); registerDataChange(dispatcher);
Expand All @@ -54,12 +54,16 @@ public void start() {


} }


private void registerApp(ServerApp app){ private void registerApp(){
zkUtil.registerEphemeralSequential(PathEnum.CONNECTION_SERVER_ALL_HOST.getPathByIp(app.getIp()),Jsons.toJson(app)); zkUtil.registerEphemeralSequential(PathEnum.CONNECTION_SERVER_ALL_HOST.getPathByIp(app.getIp()),Jsons.toJson(app));
} }

public void unregisterApp(){
zkUtil.remove(PathEnum.CONNECTION_SERVER_ALL_HOST.getPathByIp(app.getIp()));
}


// 注册连接状态监听器 // 注册连接状态监听器
private void registerConnectionLostListener(final ServerApp app) { private void registerConnectionLostListener() {
zkUtil.getClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { zkUtil.getClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {


@Override @Override
Expand All @@ -81,6 +85,7 @@ private void registerDataChange(final CallBack callBack) {
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
String path = null == event.getData() ? "" : event.getData().getPath(); String path = null == event.getData() ? "" : event.getData().getPath();
if (path.isEmpty()) { if (path.isEmpty()) {
log.warn("registerDataChange empty path:" + path + "," + event.getType().name());
return; return;
} }
callBack.handler(client, event, path); callBack.handler(client, event, path);
Expand Down
Expand Up @@ -14,14 +14,14 @@ public class ServerManageTest {


private static Executor executor = Executors.newCachedThreadPool(); private static Executor executor = Executors.newCachedThreadPool();


private ServerApp app = new ServerApp("127.0.0.1","3000"); private ServerApp app = new ServerApp("10.1.10.65","3000");


private ServerManage manage = new ServerManage(app); private ServerManage manage = new ServerManage(app);


@Test @Test
public void testMulThread() throws InterruptedException{ public void testMulThreadRegisterApp() throws InterruptedException{
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
for(int i = 1;i<=10;i++){ for(int i = 1;i<=2;i++){
executor.execute(new Worker("192.168.1."+i, latch)); executor.execute(new Worker("192.168.1."+i, latch));
} }
latch.countDown(); latch.countDown();
Expand All @@ -30,13 +30,6 @@ public void testMulThread() throws InterruptedException{
} }




@Test
public void testUpdate(){

manage.start();

}

@Test @Test
public void testServerManageStart(){ public void testServerManageStart(){
manage.start(); manage.start();
Expand Down Expand Up @@ -66,6 +59,15 @@ public void run() {
ServerApp app = new ServerApp(ip,"3000"); ServerApp app = new ServerApp(ip,"3000");
ServerManage manage = new ServerManage(app); ServerManage manage = new ServerManage(app);
manage.start(); manage.start();

try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}

manage.close();

log.warn("end init "+ip); log.warn("end init "+ip);
} }


Expand Down

0 comments on commit 8a3e784

Please sign in to comment.