-
Notifications
You must be signed in to change notification settings - Fork 0
/
DistributeClientLock.java
114 lines (96 loc) · 3.13 KB
/
DistributeClientLock.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package com.lunarku.bigdata.zklock;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* zookeeper 实现分布式锁
*/
public class DistributeClientLock {
// 设置连接超时时间
private static final int SESSION_TIMEOUT = 5000;
// zookeeper 集群地址
private String hosts =
"192.168.25.141:2181,192.168.25.142:2181,192.168.25.143:2181";
private String groupNode = "locks";
private String subNode = "sub";
private boolean haveLock = false;
private ZooKeeper zk;
// 记录自己创建的子节点路径
private String thisPath;
/**
* 判断 group 路径是否存在, 若不存在则创建
* @throws Exception
*/
public void createGroupNode() throws Exception {
Stat exists = zk.exists("/" + groupNode, false);
if(exists == null) {
zk.create("/" + groupNode, "parent Node".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
/**
* 连接zookeeper
* @throws Exception
*/
public void connectZookeeper() throws Exception {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
// 判断事件类型, 此处只处理子节点变化事件
if(event.getType() == EventType.NodeChildrenChanged
&& event.getPath().equals("/" + groupNode)) {
// 获取子节点, 并对父节点进行监听
List<String> children = zk.getChildren("/" + groupNode, true);
String thisNode = thisPath.substring(("/" + groupNode + "/").length());
Collections.sort(children);
if(children.indexOf(thisNode) == 0) {
doSomething();
thisPath = zk.create("/" + groupNode + "/" + subNode, null,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
this.createGroupNode();
// 初始化 thisPath
thisPath = zk.create("/" + groupNode + "/" + subNode, null,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
Thread.sleep(new Random().nextInt(1000));
// 获取group下子节点, 并设置对group的监听
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
if(childrenNodes.size() == 1) {
doSomething();
thisPath = zk.create("/" + groupNode + "/" + subNode, null,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
/**
* 处理业务逻辑, 处理完成后将thispath删除
* @throws Exception
*/
private void doSomething() throws Exception {
try {
//System.out.println("gain lock: " + thisPath);
Thread.sleep(2000);
}finally {
//System.out.println("finished: " + thisPath);
//
zk.delete(this.thisPath, -1);
}
}
public static void main(String[] args) throws Exception {
DistributeClientLock dl = new DistributeClientLock();
dl.connectZookeeper();
Thread.sleep(Long.MAX_VALUE);
}
}