diff --git a/src/java/com/twitter/common/zookeeper/ZooKeeperMap.java b/src/java/com/twitter/common/zookeeper/ZooKeeperMap.java index 478f1404f..dc97080b6 100644 --- a/src/java/com/twitter/common/zookeeper/ZooKeeperMap.java +++ b/src/java/com/twitter/common/zookeeper/ZooKeeperMap.java @@ -65,6 +65,29 @@ */ public class ZooKeeperMap extends ForwardingMap { + /** + * An optional listener which can be supplied and triggered when entries in a ZooKeeperMap + * are changed or removed. For a ZooKeeperMap of type , the listener will fire a "nodeChanged" + * event with the name of the ZNode that changed, and its resulting value as interpreted by the + * provided deserializer. Removal of child nodes triggers the "nodeRemoved" method indicating the + * name of the ZNode which is no longer present in the map. + */ + public interface ZKMapListener { + + /** + * Fired when a node is added to the ZooKeeperMap or changed. + * @param nodeName indicates the name of the ZNode that was added or changed. + * @param value is the new value of the node after passing through your supplied deserializer. + */ + public void nodeChanged(String nodeName, V value); + + /** + * Fired when a node is removed from the ZooKeeperMap. + * @param nodeName indicates the name of the ZNode that was removed from the ZooKeeperMap. + */ + public void nodeRemoved(String nodeName); + } + /** * Default deserializer for the constructor if you want to simply store the zookeeper byte[] data * in this map. @@ -80,6 +103,8 @@ public class ZooKeeperMap extends ForwardingMap { private final ConcurrentMap localMap; private final BackoffHelper backoffHelper; + private final ZKMapListener mapListener; + // Whether it's safe to re-establish watches if our zookeeper session has expired. private final Object safeToRewatchLock; private volatile boolean safeToRewatch; @@ -92,6 +117,7 @@ public class ZooKeeperMap extends ForwardingMap { * @param nodePath path to a node whose data will be watched * @param deserializer a function that converts byte[] data from a zk node to this map's * value type V + * @param listener is a ZKMapListener which fires when values are added, changed, or removed. * * @throws InterruptedException if the underlying zookeeper server transaction is interrupted * @throws KeeperException.NoNodeException if the given nodePath doesn't exist @@ -100,13 +126,34 @@ public class ZooKeeperMap extends ForwardingMap { * cluster */ public static ZooKeeperMap create(ZooKeeperClient zkClient, String nodePath, - Function deserializer) throws InterruptedException, KeeperException, + Function deserializer, ZKMapListener listener) throws InterruptedException, KeeperException, ZooKeeperConnectionException { - ZooKeeperMap zkMap = new ZooKeeperMap(zkClient, nodePath, deserializer); + ZooKeeperMap zkMap = new ZooKeeperMap(zkClient, nodePath, deserializer, listener); zkMap.init(); return zkMap; } + /** + * Returns an initialized ZooKeeperMap. The given path must exist at the time of + * creation or a {@link KeeperException} will be thrown. + * + * @param zkClient a zookeeper client + * @param nodePath path to a node whose data will be watched + * @param deserializer a function that converts byte[] data from a zk node to this map's + * value type V + * + * @throws InterruptedException if the underlying zookeeper server transaction is interrupted + * @throws KeeperException.NoNodeException if the given nodePath doesn't exist + * @throws KeeperException if the server signals an error + * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper + * cluster + */ + public static ZooKeeperMap create(ZooKeeperClient zkClient, String nodePath, + Function deserializer) throws InterruptedException, KeeperException, + ZooKeeperConnectionException { + return ZooKeeperMap.create(zkClient, nodePath, deserializer, null); + } + /** * Initializes a ZooKeeperMap. The given path must exist at the time of object creation or * a {@link KeeperException} will be thrown. @@ -119,6 +166,7 @@ public static ZooKeeperMap create(ZooKeeperClient zkClient, String nodePa * @param nodePath top-level node path under which the map data lives * @param deserializer a function that converts byte[] data from a zk node to this map's * value type V + * @param listener is a ZKMapListener which fires when values are added, changed, or removed. * * @throws InterruptedException if the underlying zookeeper server transaction is interrupted * @throws KeeperException.NoNodeException if the given nodePath doesn't exist @@ -128,9 +176,10 @@ public static ZooKeeperMap create(ZooKeeperClient zkClient, String nodePa */ @VisibleForTesting ZooKeeperMap(ZooKeeperClient zkClient, String nodePath, - Function deserializer) throws InterruptedException, KeeperException, + Function deserializer, ZKMapListener mapListener) throws InterruptedException, KeeperException, ZooKeeperConnectionException { super(); + this.mapListener = mapListener; this.zkClient = Preconditions.checkNotNull(zkClient); this.nodePath = MorePreconditions.checkNotBlank(nodePath); this.deserializer = Preconditions.checkNotNull(deserializer); @@ -145,6 +194,32 @@ public static ZooKeeperMap create(ZooKeeperClient zkClient, String nodePa } } + /** + * Initializes a ZooKeeperMap. The given path must exist at the time of object creation or + * a {@link KeeperException} will be thrown. + * + * Please note that this object will not track any remote zookeeper data until {@link #init()} + * is successfully called. After construction and before that call, this {@link Map} will + * be empty. + * + * @param zkClient a zookeeper client + * @param nodePath top-level node path under which the map data lives + * @param deserializer a function that converts byte[] data from a zk node to this map's + * value type V + * + * @throws InterruptedException if the underlying zookeeper server transaction is interrupted + * @throws KeeperException.NoNodeException if the given nodePath doesn't exist + * @throws KeeperException if the server signals an error + * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper + * cluster + */ + @VisibleForTesting + ZooKeeperMap(ZooKeeperClient zkClient, String nodePath, + Function deserializer) throws InterruptedException, KeeperException, + ZooKeeperConnectionException { + this(zkClient, nodePath, deserializer, null); + } + /** * Initialize zookeeper tracking for this {@link Map}. Once this call returns, this object * will be tracking data in zookeeper. @@ -331,11 +406,13 @@ public void process(WatchedEvent event) { @VisibleForTesting void removeEntry(String key) { localMap.remove(key); + if (mapListener != null) mapListener.nodeRemoved(key); } @VisibleForTesting void putEntry(String key, V value) { localMap.put(key, value); + if (mapListener != null) mapListener.nodeChanged(key, value); } private void rewatchDataNodes() throws InterruptedException {