This repository has been archived by the owner on Jan 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 206
/
curator.clj
156 lines (141 loc) · 5.55 KB
/
curator.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
(ns onyx.log.curator
(:require [taoensso.timbre :refer [fatal warn trace info]]
[onyx.static.default-vals :refer [default-vals]])
(:import [org.apache.zookeeper CreateMode]
[org.apache.zookeeper
KeeperException
KeeperException$NoNodeException
KeeperException$NodeExistsException
KeeperException$Code
Watcher]
[org.apache.curator.test TestingServer]
[org.apache.zookeeper.data Stat]
[org.apache.curator.framework CuratorFrameworkFactory CuratorFramework]
[org.apache.curator.framework.api CuratorWatcher PathAndBytesable Versionable GetDataBuilder
SetDataBuilder DeleteBuilder ExistsBuilder GetChildrenBuilder Pathable Watchable]
[org.apache.curator.framework.state ConnectionStateListener ConnectionState]
[org.apache.curator.framework.imps CuratorFrameworkState]
[org.apache.curator RetryPolicy]
[org.apache.curator.retry BoundedExponentialBackoffRetry]))
;; Thanks to zookeeper-clj
(defn stat-to-map
([^org.apache.zookeeper.data.Stat stat]
(when stat
{:czxid (.getCzxid stat) ;; long
:mzxid (.getMzxid stat) ;; long
:ctime (.getCtime stat) ;; long
:mtime (.getMtime stat) ;; long
:version (.getVersion stat) ;; int
:cversion (.getCversion stat) ;; int
:aversion (.getAversion stat) ;; long
:ephemeralOwner (.getEphemeralOwner stat) ;;int
:dataLength (.getDataLength stat) ;; int
:numChildren (.getNumChildren stat) ;; int
:pzxid (.getPzxid stat)}))) ;; long
(defn event-to-map
([^org.apache.zookeeper.WatchedEvent event]
(when event
{:event-type (keyword (.name (.getType event)))
:keeper-state (keyword (.name (.getState event)))
:path (.getPath event)})))
;; Watcher
(defn make-watcher
([handler]
(reify Watcher
(process [this event]
(handler (event-to-map event))))))
(defn ^CuratorFramework connect
([connection-string]
(connect connection-string ""))
([connection-string ns]
(connect connection-string ns
(BoundedExponentialBackoffRetry.
(:onyx.zookeeper/backoff-base-sleep-time-ms default-vals)
(:onyx.zookeeper/backoff-max-sleep-time-ms default-vals)
(:onyx.zookeeper/backoff-max-retries default-vals))))
([connection-string ns ^RetryPolicy retry-policy]
(doto
(.. (CuratorFrameworkFactory/builder)
(namespace ns)
(connectString connection-string)
(retryPolicy retry-policy)
(build))
.start)))
(defn close
"Closes the connection to the ZooKeeper server."
[^CuratorFramework client]
(.close client))
(defn create-mode [opts]
(cond (and (:persistent? opts) (:sequential? opts))
CreateMode/PERSISTENT_SEQUENTIAL
(:persistent? opts)
CreateMode/PERSISTENT
(:sequential? opts)
CreateMode/EPHEMERAL_SEQUENTIAL
:else
CreateMode/EPHEMERAL))
(defn create
[^CuratorFramework client path & {:keys [data] :as opts}]
(try
(let [cr ^SetDataBuilder (.. client
create
(withMode (create-mode opts)))]
(if data
(.forPath ^SetDataBuilder cr path data)
(.forPath ^SetDataBuilder cr path)))
(catch org.apache.zookeeper.KeeperException$NodeExistsException e
false)))
(defn create-all
[^CuratorFramework client path & {:keys [data] :as opts}]
(try
(let [cr (.. client
create
creatingParentsIfNeeded
(withMode (create-mode opts)))]
(if data
(.forPath ^SetDataBuilder cr path data)
(.forPath ^SetDataBuilder cr path)))
(catch org.apache.zookeeper.KeeperException$NodeExistsException e
false)))
(defn delete
"Deletes the given node if it exists. Otherwise returns false."
[^CuratorFramework client path]
(try
(.forPath ^DeleteBuilder (.delete client) path)
true
(catch KeeperException$NoNodeException e false)))
(defn delete-with-children
"Deletes the given node if it exists. Otherwise returns false."
[^CuratorFramework client path]
(try (.forPath ^DeleteBuilder (.deletingChildrenIfNeeded (.delete client)) path)
true
(catch KeeperException$NoNodeException e false)))
(defn children
([^CuratorFramework client path & {:keys [watcher]}]
(let [children-builder ^GetChildrenBuilder (.getChildren client)]
(if watcher
(.forPath ^GetChildrenBuilder (.usingWatcher children-builder
^Watcher (make-watcher watcher))
path)
(.forPath ^GetChildrenBuilder children-builder path)))))
(defn data [^CuratorFramework client path]
(let [stat ^Stat (Stat.)
data (.forPath ^GetDataBuilder (.storingStatIn ^GetDataBuilder
(.getData client) stat)
path)]
{:data data
:stat (stat-to-map stat)}))
(defn set-data [^CuratorFramework client path data version]
(-> (.forPath ^SetDataBuilder (.withVersion ^SetDataBuilder (.setData client)
version)
path
data)
stat-to-map))
(defn exists [^CuratorFramework client path & {:keys [watcher]}]
(stat-to-map
(let [builder ^ExistsBuilder (.. client checkExists)]
(if watcher
(.forPath ^ExistsBuilder (.usingWatcher builder
^Watcher (make-watcher watcher))
path)
(.forPath builder path)))))