Skip to content
This repository has been archived by the owner on Dec 13, 2019. It is now read-only.

Commit

Permalink
Merge pull request #177 from foursquare/fs-zk-20130819
Browse files Browse the repository at this point in the history
Fixed ZooKeeperNode to prevent duplicate WatcherS
  • Loading branch information
jsirois committed Aug 23, 2013
2 parents e83df50 + 975abdc commit 2f8f699
Showing 1 changed file with 39 additions and 30 deletions.
69 changes: 39 additions & 30 deletions src/java/com/twitter/common/zookeeper/ZooKeeperNode.java
Expand Up @@ -14,6 +14,7 @@
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;

Expand Down Expand Up @@ -60,6 +61,14 @@ public class ZooKeeperNode<T> implements Supplier<T> {
@Nullable private volatile T nodeData;
private final Closure<T> dataUpdateListener;

/**
* When a call to ZooKeeper.getData is made, the Watcher is added to a Set before the the network request is made
* and if the request fails, the Watcher remains. There's a problem where WatcherS can accumulate when there are
* failed requests, so they are set to instance fields and reused.
*/
private final Watcher nodeWatcher;
private final Watcher existenceWatcher;

/**
* Returns an initialized ZooKeeperNode. The given node must exist at the time of object
* creation or a {@link KeeperException} will be thrown.
Expand Down Expand Up @@ -156,6 +165,35 @@ public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodeP
safeToRewatchLock = new Object();
safeToRewatch = false;
nodeData = NO_DATA;

nodeWatcher = new Watcher() {
@Override public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
try {
tryWatchDataNode();
} catch (InterruptedException e) {
LOG.log(Level.WARNING, "Interrupted while trying to watch a data node.", e);
Thread.currentThread().interrupt();
}
} else {
LOG.info("Ignoring watcher event " + event);
}
}
};

existenceWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Watcher.Event.EventType.NodeCreated) {
try {
tryWatchDataNode();
} catch (InterruptedException e) {
LOG.log(Level.WARNING, "Interrupted while trying to watch a data node.", e);
Thread.currentThread().interrupt();
}
}
}
};
}

/**
Expand Down Expand Up @@ -241,21 +279,6 @@ private void tryWatchDataNode() throws InterruptedException {

private void watchDataNode() throws InterruptedException, KeeperException,
ZooKeeperConnectionException {
final Watcher nodeWatcher = new Watcher() {
@Override public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
try {
tryWatchDataNode();
} catch (InterruptedException e) {
LOG.log(Level.WARNING, "Interrupted while trying to watch a data node.", e);
Thread.currentThread().interrupt();
}
} else {
LOG.info("Ignoring watcher event " + event);
}
}
};

try {
Stat stat = new Stat();
byte[] rawData = zkClient.get().getData(nodePath, nodeWatcher, stat);
Expand All @@ -273,27 +296,13 @@ private void watchDataNode() throws InterruptedException, KeeperException,

private void watchForExistence() throws InterruptedException, KeeperException,
ZooKeeperConnectionException {
final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Watcher.Event.EventType.NodeCreated) {
try {
tryWatchDataNode();
} catch (InterruptedException e) {
LOG.log(Level.WARNING, "Interrupted while trying to watch a data node.", e);
Thread.currentThread().interrupt();
}
}
}
};

/*
* If the node was created between the getData call and this call, just try watching it.
* We'll have an extra exists watch on it that goes off on its next deletion, which will
* be a no-op.
* Otherwise, just let the exists watch wait for its creation.
*/
if (zkClient.get().exists(nodePath, watcher) != null) {
if (zkClient.get().exists(nodePath, existenceWatcher) != null) {
tryWatchDataNode();
}
}
Expand Down

0 comments on commit 2f8f699

Please sign in to comment.