-
Notifications
You must be signed in to change notification settings - Fork 0
/
ZookeeperEventContainer.java
154 lines (129 loc) · 4.99 KB
/
ZookeeperEventContainer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package com.lastww.study.zookeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Created by liuweiwei on 14-11-27.
*/
public class ZookeeperEventContainer implements Watcher {
private static Logger log = LoggerFactory.getLogger(ZookeeperEventContainer.class);
public static final int SESSION_TIMEOUT = 10000;
private ZooKeeper zooKeeper;
private CountDownLatch countDownLatch;
private static ZookeeperEventContainer container = new ZookeeperEventContainer();
public static ZookeeperEventContainer getInstance() {
return container;
}
public ZookeeperEventContainer() {
try {
this.zooKeeper = createZooKeeper();
this.monitor();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public ZooKeeper createZooKeeper() throws IOException, InterruptedException {
log.debug("about to create zookeeper");
ZooKeeper newZooKeeper = new ZooKeeper("127.0.0.1:2181", SESSION_TIMEOUT, this);
countDownLatch = new CountDownLatch(1);
countDownLatch.await(SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
log.debug("connection establed, session id:" + newZooKeeper.getSessionId());
return newZooKeeper;
}
public void monitor() {
//更新数据并注册watcher
try {
// 监听根节点数据变更、创建、删除
this.zooKeeper.getData("/root", true, null);
// 监听子节点数据更新、创建、删除
List<String> children = this.zooKeeper.getChildren("/root", true);
for (String child : children) {
this.zooKeeper.getData("/root/" + child, true, null);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public ZooKeeper getZooKeeper() {
return zooKeeper;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
@Override
public void process(WatchedEvent event) {
log.debug("received event:" + event.toString());
switch (event.getType()) {
case None:
switch (event.getState()) {
case Expired:
log.debug("expired:" + this.zooKeeper.getSessionId());
try {
this.zooKeeper.close();
this.zooKeeper = createZooKeeper();
this.monitor();
} catch (Exception e) {
throw new RuntimeException(e);
}
break;
case Disconnected:
log.debug("disconnected:" + this.zooKeeper.getSessionId());
try {
this.zooKeeper.close();
this.zooKeeper = createZooKeeper();
this.monitor();
} catch (Exception e) {
throw new RuntimeException(e);
}
break;
case SyncConnected:
countDownLatch.countDown();
break;
}
break;
case NodeChildrenChanged:
//节点删除、创建同时会促发上层节点NodeChildrenChanged事件,因此可忽略
//case NodeDeleted:
//case NodeCreated:
case NodeDataChanged:
if (!this.zooKeeper.getState().isAlive()) {
try {
this.zooKeeper = createZooKeeper();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
this.monitor();
break;
}
}
public static void main(String[] args) {
ZookeeperEventContainer container = ZookeeperEventContainer.getInstance();
log.debug("container start");
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 关闭连接、测试session disconnected
try {
ZooKeeper oldZooKeeper = new ZooKeeper("127.0.0.1:2181", 10000, null, container.getZooKeeper().getSessionId(), null);
log.debug("close old zookeeper:" + oldZooKeeper.getSessionId());
oldZooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
while (true);
}
}