Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

upgrade to curator 1.0.1, halt process when there's an unrecoverable …

…error
  • Loading branch information...
commit 7cce193313f89b9afa7f57e288993235e9f8c165 1 parent 0eb696b
@nathanmarz nathanmarz authored
View
2  project.clj
@@ -13,7 +13,7 @@
[storm/libthrift7 "0.7.0"]
[clj-time "0.3.0"]
[log4j/log4j "1.2.16"]
- [com.netflix.curator/curator-framework "0.6.4"]
+ [com.netflix.curator/curator-framework "1.0.1"]
[backtype/jzmq "2.1.0"]
[com.googlecode.json-simple/json-simple "1.1"]
[com.googlecode/kryo "1.04"]
View
1  src/clj/backtype/storm/cluster.clj
@@ -180,6 +180,7 @@
(defstruct TaskError :error :time-secs)
+;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
(defn mk-storm-cluster-state [cluster-state-spec]
(let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
[false cluster-state-spec]
View
42 src/clj/backtype/storm/zookeeper.clj
@@ -1,6 +1,6 @@
(ns backtype.storm.zookeeper
(:import [com.netflix.curator.retry RetryNTimes])
- (:import [com.netflix.curator.framework.api CuratorEvent CuratorListener])
+ (:import [com.netflix.curator.framework.api CuratorEvent CuratorEventType CuratorListener UnhandledErrorListener])
(:import [com.netflix.curator.framework CuratorFramework CuratorFrameworkFactory])
(:import [org.apache.zookeeper ZooKeeper Watcher KeeperException$NoNodeException
ZooDefs ZooDefs$Ids CreateMode WatchedEvent Watcher$Event Watcher$Event$KeeperState
@@ -27,32 +27,42 @@
})
-;; TODO: make this block until session is established (wait until a flag is triggered by watcher)
(defn mk-client
([conn-str session-timeout watcher]
- (let [fk (CuratorFrameworkFactory/newClient
+ (let [fk (CuratorFrameworkFactory/newClient
conn-str
session-timeout
15000
- ;;TODO: make retry times could be configured.
+ ;;TODO: consider making this configurable
(RetryNTimes. 5 1000))]
- (.. fk (getCuratorListenable) (addListener (reify CuratorListener
- (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
- (let [^WatchedEvent event (.getWatchedEvent e)]
- (watcher (zk-keeper-states (.getState event))
- (zk-event-types (.getType event))
- (.getPath event)))))))
- (.start fk)
- fk))
+ (.. fk
+ (getCuratorListenable)
+ (addListener
+ (reify CuratorListener
+ (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
+ (when (= (.getType e) CuratorEventType/WATCHED)
+ (let [^WatchedEvent event (.getWatchedEvent e)]
+ (watcher (zk-keeper-states (.getState event))
+ (zk-event-types (.getType event))
+ (.getPath event))))))))
+ (.. fk
+ (getUnhandledErrorListenable)
+ (addListener
+ (reify UnhandledErrorListener
+ (unhandledError [this msg error]
+ (log-error error "Unrecoverable Zookeeper error, halting process: " msg)
+ (halt-process! 1 "Unrecoverable Zookeeper error")))))
+ (.start fk)
+ fk))
([conn-str watcher]
- (mk-client conn-str 10000 watcher))
+ (mk-client conn-str 10000 watcher))
([conn-str]
- ;; this constructor is intended for debugging
- (mk-client
+ ;; this constructor is intended for debugging
+ (mk-client
conn-str
(fn [state type path]
(log-message "Zookeeper state update: " state type path)))
- ))
+ ))
(def zk-create-modes
{:ephemeral CreateMode/EPHEMERAL
Please sign in to comment.
Something went wrong with that request. Please try again.