-
Notifications
You must be signed in to change notification settings - Fork 2
/
ZkClientSample.java
237 lines (195 loc) · 7.88 KB
/
ZkClientSample.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
package org.lam.zookeeper.zkclient;
import java.io.UnsupportedEncodingException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import com.google.gson.Gson;
import lam.log.Console;
/**
* <p>
* TODO
* </p>
* @author linanmiao
* @date 2017年3月5日
* @versio 1.0
*/
public class ZkClientSample {
static Gson gson = new Gson();
public static void main(String[] args) throws Exception {
int connectionTimeOut = Integer.MAX_VALUE;//time out for connecting to server
ZkClient zkClient = new ZkClient(buildDefaultZkConnection(), connectionTimeOut, buildDefaultZkSerializer());
zkClient.subscribeStateChanges(new IZkStateListener(){
@Override
public void handleNewSession() throws Exception {
}
@Override
public void handleSessionEstablishmentError(Throwable arg0) throws Exception {
}
@Override
public void handleStateChanged(KeeperState arg0) throws Exception {
}
});
String path = "/test1";
String rootPath = "/";
//subscribe for node children list change
zkClient.subscribeChildChanges(rootPath, new IZkChildListener(){
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
lam.log.Console.println("%s child changed, child list:%s", parentPath, currentChilds);
}
});
LamZkData data = new LamZkData();
data.setServiceIp("192.168.20.100");
data.setServiceName("user-service");
data.setWeight(2);
//create node
String reply = zkClient.create(path, data, CreateMode.EPHEMERAL);
//subscribe for node data change
zkClient.subscribeDataChanges(path, new IZkDataListener(){
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
lam.log.Console.println("path:%s, data changed:%s", dataPath, gson.toJson(data));
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
lam.log.Console.println("path:%s, data deleted", dataPath);
}});
//read/get data from node
boolean returnNullIfPathNotExists = true;
LamZkData d = zkClient.readData(path, returnNullIfPathNotExists);
lam.log.Console.println("path:%s, data:%s", path, gson.toJson(d));
//get children list of node
List<String> children = zkClient.getChildren(rootPath);
lam.log.Console.println("path:%s, children list:%s", path, children);
//write node data
data.setWeight(3);
zkClient.writeData(path, data);
//delete node
zkClient.delete(path);
//权限控制=====================================
String digest = "digest"; //schema
String superAuth = "superUser:111111";
String createAuth = "createUser:111111"; //CREATE(c): 创建权限,可以创建当前node的子节点
String deleteAuth = "deleteUser:111111"; //DELETE(d): 删除权限,可以删除当前node的子节点
String readAuth = "readUser:111111"; //READ(r): 读权限,可以获取当前node的数据,可以list当前node所有的child nodes
String writeAuth = "writeUser:111111"; //WRITE(w): 写权限,可以向当前node写数据
String adminAuth = "adminUser:111111"; //ADMIN(a): 管理权限,可以设置当前node的permission
/**
public interface Perms {
int READ = 1 << 0; //00001
int WRITE = 1 << 1; //00010
int CREATE = 1 << 2; //00100
int DELETE = 1 << 3; //01000
int ADMIN = 1 << 4; //10000
int ALL = READ | WRITE | CREATE | DELETE | ADMIN; //11111
}
*/
//schema:权限控制模式(world,auth,digest,ip)
//auth:foo:true,类似:username:password
//先添加一个认证用户
zkClient.addAuthInfo(digest, superAuth.getBytes());
List<ACL> acls = new ArrayList<ACL>();
acls.add(new ACL(ZooDefs.Perms.ALL, new Id(digest, DigestAuthenticationProvider.generateDigest(superAuth))));
acls.add(new ACL(ZooDefs.Perms.CREATE, new Id(digest, DigestAuthenticationProvider.generateDigest(createAuth))));
acls.add(new ACL(ZooDefs.Perms.DELETE, new Id(digest, DigestAuthenticationProvider.generateDigest(deleteAuth))));
acls.add(new ACL(ZooDefs.Perms.READ, new Id(digest, DigestAuthenticationProvider.generateDigest(readAuth))));
acls.add(new ACL(ZooDefs.Perms.WRITE, new Id(digest, DigestAuthenticationProvider.generateDigest(writeAuth))));
acls.add(new ACL(ZooDefs.Perms.ADMIN, new Id(digest, DigestAuthenticationProvider.generateDigest(adminAuth))));
path = "/test";
zkClient.createPersistent(path, data, acls);
/**
[zk: 192.168.20.111:2181(CONNECTED) 8] getAcl /test
'digest,'superUser:vLmzknrXH9dGsV4jNWsPmK2gqUU=
: cdrwa
'digest,'createUser:vqAQivdWW2drkS8FT+LjmAbUY4k=
: c
'digest,'deleteUser:mHEOW8g3eARHH2FcfvbTIB7uAYo=
: d
'digest,'readUser:EVdGv4F9QkpvcZYCa99HF110xGQ=
: r
'digest,'writeUser:E1L5Eqbt3mwODQm4Rhaa+Qzyxb8=
: w
'digest,'adminUser:iioWqkg9vM4tfkUhuAgLJt9Wjgs=
: a
*/
//Console.println("AuthNode data:" + gson.toJson(zkClient.readData(path)));
waitSelf();
}
private static void waitSelf(){
try {
synchronized (ZkClientSample.class) {
ZkClientSample.class.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void sleep(long timeOut){
try {
TimeUnit.MILLISECONDS.sleep(timeOut);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* ZkConnection 实现 IZkConnection接口<br/>
* IZkConnection接口定义了客户端 对zookeeper服务器的一系列操作,如:<br/>
* 连接,断开,创建/删除节点,往节点写/删数据,获取子节点列表等
*/
private static ZkConnection buildDefaultZkConnection(){
String zkServers = "192.168.20.111:2181";
int sessionTimeOut = 30000;//half of a minute;
ZkConnection zkConnection = new ZkConnection(zkServers, sessionTimeOut);
return zkConnection;
}
/**
* ZkSerializer 定义 的客户端 发送数据给 服务器时,对发送的数据进行序列化,接收数据时反序列化
* SerializableSerializer 是 ZkSerializer接口的实现类,序列化策略是JDK自带的序列化
*/
private static ZkSerializer buildDefaultZkSerializer(){
SerializableSerializer zkSerializer = new SerializableSerializer();
return zkSerializer;
}
private static ZkSerializer buildJsonZkSerializer(){
ZkSerializer zkSerializer = new ZkSerializer() {
static final String DEFAULT_CHARSET_NAME = "utf-8";
Class<?> classType = LamZkData.class;
Gson gson = new Gson();
@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
String obj = gson.toJson(data, classType);
try {
return obj.getBytes(DEFAULT_CHARSET_NAME);
} catch (UnsupportedEncodingException e) {
return null;
}
}
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
String obj = new String(bytes, DEFAULT_CHARSET_NAME);
return gson.fromJson(obj, classType);
} catch (UnsupportedEncodingException e) {
return null;
}
}
};
return zkSerializer;
}
}