Skip to content

Commit

Permalink
Added numberOfListeners() method.
Browse files Browse the repository at this point in the history
  • Loading branch information
pvoss committed Sep 17, 2009
1 parent 68ed1c1 commit 5269f8a
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 8 deletions.
33 changes: 25 additions & 8 deletions src/main/java/org/I0Itec/zkclient/ZkClient.java
Expand Up @@ -41,7 +41,7 @@ public class ZkClient implements Watcher {

private final static Logger LOG = Logger.getLogger(ZkClient.class);

private final IZkConnection _connection;
private IZkConnection _connection;
private final Map<String, Set<IZkChildListener>> _childListener = new ConcurrentHashMap<String, Set<IZkChildListener>>();
private final ConcurrentHashMap<String, Set<IZkDataListener>> _dataListener = new ConcurrentHashMap<String, Set<IZkDataListener>>();
private final Set<IZkStateListener> _stateListener = new CopyOnWriteArraySet<IZkStateListener>();
Expand Down Expand Up @@ -105,7 +105,7 @@ public void subscribeDataChanges(String path, IZkDataListener listener) {
listeners.add(listener);
}
watchForData(path);
LOG.info("Subscribed data changes for " + path);
LOG.debug("Subscribed data changes for " + path);
}

public void unsubscribeDataChanges(String path, IZkDataListener dataListener) {
Expand Down Expand Up @@ -291,7 +291,7 @@ public boolean exists(final String path) {
}

private void processStateChanged(WatchedEvent event) {
LOG.warn("zookeeper state changed (" + event.getState() + ")");
LOG.info("zookeeper state changed (" + event.getState() + ")");
setCurrentState(event.getState());
if (getShutdownTrigger()) {
return;
Expand Down Expand Up @@ -425,7 +425,7 @@ public void run() throws Exception {

public boolean waitUntilExists(String path, TimeUnit timeUnit, long time) {
Date timeout = new Date(System.currentTimeMillis() + timeUnit.toMillis(time));
LOG.info("Waiting until znode '" + path + "' becomes available.");
LOG.debug("Waiting until znode '" + path + "' becomes available.");
if (exists(path)) {
return true;
}
Expand Down Expand Up @@ -499,7 +499,7 @@ public boolean waitForKeeperState(KeeperState keeperState, long time, TimeUnit t
}
Date timeout = new Date(System.currentTimeMillis() + timeUnit.toMillis(time));

LOG.info("Waiting for keeper state " + keeperState);
LOG.debug("Waiting for keeper state " + keeperState);
acquireEventLock();
try {
boolean stillWaiting = true;
Expand All @@ -509,7 +509,7 @@ public boolean waitForKeeperState(KeeperState keeperState, long time, TimeUnit t
}
stillWaiting = getEventLock().getStateChangedCondition().awaitUntil(timeout);
}
LOG.info("State is " + _currentState);
LOG.debug("State is " + _currentState);
return true;
} catch (InterruptedException e) {
throw new ZkInterruptedException(e);
Expand Down Expand Up @@ -708,19 +708,23 @@ public long getCreationTime(String path) {
}

public void close() {
LOG.info("Closing ZkClient...");
if (_connection == null) {
return;
}
LOG.debug("Closing ZkClient...");
getEventLock().lock();
try {
setShutdownTrigger(true);
_eventThread.interrupt();
_eventThread.join(2000);
_connection.close();
_connection = null;
} catch (InterruptedException e) {
throw new ZkInterruptedException(e);
} finally {
getEventLock().unlock();
}
LOG.info("Closing ZkClient...done");
LOG.debug("Closing ZkClient...done");
}

private void reconnect() {
Expand All @@ -742,4 +746,17 @@ public void setShutdownTrigger(boolean triggerState) {
public boolean getShutdownTrigger() {
return _shutdownTriggered;
}

public int numberOfListeners() {
int listeners = 0;
for (Set<IZkChildListener> childListeners : _childListener.values()) {
listeners += childListeners.size();
}
for (Set<IZkDataListener> dataListeners : _dataListener.values()) {
listeners += dataListeners.size();
}
listeners += _stateListener.size();

return listeners;
}
}
32 changes: 32 additions & 0 deletions src/test/java/org/I0Itec/zkclient/AbstractBaseZkClientTest.java
Expand Up @@ -2,6 +2,8 @@

import static org.junit.Assert.*;

import static org.mockito.Mockito.*;

import java.io.Serializable;
import java.util.List;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -337,4 +339,34 @@ public void testGetCreationTime() throws Exception {
long creationTime = _client.getCreationTime(path);
assertTrue(start < creationTime && end > creationTime);
}

@Test
public void testNumberOfListeners() {
IZkChildListener zkChildListener = mock(IZkChildListener.class);
_client.subscribeChildChanges("/", zkChildListener);
assertEquals(1, _client.numberOfListeners());

IZkDataListener zkDataListener = mock(IZkDataListener.class);
_client.subscribeDataChanges("/a", zkDataListener);
assertEquals(2, _client.numberOfListeners());

_client.subscribeDataChanges("/b", zkDataListener);
assertEquals(3, _client.numberOfListeners());

IZkStateListener zkStateListener = mock(IZkStateListener.class);
_client.subscribeStateChanges(zkStateListener);
assertEquals(4, _client.numberOfListeners());

_client.unsubscribeChildChanges("/", zkChildListener);
assertEquals(3, _client.numberOfListeners());

_client.unsubscribeDataChanges("/b", zkDataListener);
assertEquals(2, _client.numberOfListeners());

_client.unsubscribeDataChanges("/a", zkDataListener);
assertEquals(1, _client.numberOfListeners());

_client.unsubscribeStateChanges(zkStateListener);
assertEquals(0, _client.numberOfListeners());
}
}
18 changes: 18 additions & 0 deletions src/test/java/org/I0Itec/zkclient/InMemoryConnectionTest.java
@@ -0,0 +1,18 @@
package org.I0Itec.zkclient;

import static org.junit.Assert.*;

import java.util.List;

import org.apache.zookeeper.KeeperException;
import org.junit.Test;

public class InMemoryConnectionTest {

@Test
public void testGetChildren_OnEmptyFileSystem() throws KeeperException, InterruptedException {
InMemoryConnection connection = new InMemoryConnection();
List<String> children = connection.getChildren("/", false);
assertEquals(0, children.size());
}
}

0 comments on commit 5269f8a

Please sign in to comment.