Skip to content

Commit

Permalink
Issue 45: Add conditional/versioned delete.
Browse files Browse the repository at this point in the history
  • Loading branch information
fpj authored and jzillmann committed Mar 14, 2016
1 parent da86574 commit 8154536
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 11 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ CHANGELOG

ZkClient 0.8 (???)
---------------

- #45: Support for conditional deletes


ZkClient 0.7 (Nov 2015)
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/I0Itec/zkclient/IZkConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public interface IZkConnection {

public void delete(String path) throws InterruptedException, KeeperException;

public void delete(String path, int version) throws InterruptedException, KeeperException;

boolean exists(final String path, final boolean watch) throws KeeperException, InterruptedException;

List<String> getChildren(final String path, final boolean watch) throws KeeperException, InterruptedException;
Expand Down
22 changes: 20 additions & 2 deletions src/main/java/org/I0Itec/zkclient/InMemoryConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
*/
package org.I0Itec.zkclient;

import java.util.*;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -201,13 +207,25 @@ private String getParentPath(String path) {

@Override
public void delete(String path) throws InterruptedException, KeeperException {
this.delete(path, -1);
}

@Override
public void delete(String path, int version) throws InterruptedException, KeeperException {
_lock.lock();
try {
if (!exists(path, false)) {
throw new KeeperException.NoNodeException();
}
String parentPath = getParentPath(path);
checkACL(parentPath, ZooDefs.Perms.DELETE);
// If version isn't -1, check that it mateches
if (version != -1) {
DataAndVersion item = _data.get(path);
if (item._version != version) {
throw KeeperException.create(Code.BADVERSION);
}
}
_data.remove(path);
_creationTime.remove(path);
checkWatch(_nodeWatches, path, EventType.NodeDeleted);
Expand Down Expand Up @@ -384,7 +402,7 @@ public void setAcl(String path, List<ACL> acl, int version) throws KeeperExcepti
}

DataAndVersion dataAndVersion = _data.get(path);
if(version != dataAndVersion._version) {
if (version != dataAndVersion._version) {
throw new KeeperException.BadVersionException();
}

Expand Down
20 changes: 12 additions & 8 deletions src/main/java/org/I0Itec/zkclient/ZkClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class ZkClient implements Watcher {
protected static final String ZK_LOGIN_CONTEXT_NAME_KEY = "zookeeper.sasl.clientconfig";

protected final IZkConnection _connection;
protected final long operationRetryTimeoutInMillis;
protected final long _operationRetryTimeoutInMillis;
private final Map<String, Set<IZkChildListener>> _childListener = new ConcurrentHashMap<String, Set<IZkChildListener>>();
private final ConcurrentHashMap<String, Set<IZkDataListener>> _dataListener = new ConcurrentHashMap<String, Set<IZkDataListener>>();
private final Set<IZkStateListener> _stateListener = new CopyOnWriteArraySet<IZkStateListener>();
Expand Down Expand Up @@ -150,7 +150,7 @@ public ZkClient(final IZkConnection zkConnection, final int connectionTimeout, f
}
_connection = zkConnection;
_zkSerializer = zkSerializer;
this.operationRetryTimeoutInMillis = operationRetryTimeout;
_operationRetryTimeoutInMillis = operationRetryTimeout;
_isZkSaslEnabled = isZkSaslEnabled();
connect(connectionTimeout, this);
}
Expand Down Expand Up @@ -999,18 +999,18 @@ public <T> T retryUntilConnected(Callable<T> callable) throws ZkInterruptedExcep
throw ExceptionUtil.convertToRuntimeException(e);
}
// before attempting a retry, check whether retry timeout has elapsed
if (this.operationRetryTimeoutInMillis > -1 && (System.currentTimeMillis() - operationStartTime) >= this.operationRetryTimeoutInMillis) {
throw new ZkTimeoutException("Operation cannot be retried because of retry timeout (" + this.operationRetryTimeoutInMillis + " milli seconds)");
if (_operationRetryTimeoutInMillis > -1 && (System.currentTimeMillis() - operationStartTime) >= _operationRetryTimeoutInMillis) {
throw new ZkTimeoutException("Operation cannot be retried because of retry timeout (" + _operationRetryTimeoutInMillis + " milli seconds)");
}
}
}

private void waitForRetry() {
if (this.operationRetryTimeoutInMillis < 0) {
this.waitUntilConnected();
if (_operationRetryTimeoutInMillis < 0) {
waitUntilConnected();
return;
}
this.waitUntilConnected(this.operationRetryTimeoutInMillis, TimeUnit.MILLISECONDS);
waitUntilConnected(_operationRetryTimeoutInMillis, TimeUnit.MILLISECONDS);
}

public void setCurrentState(KeeperState currentState) {
Expand All @@ -1034,12 +1034,16 @@ public ZkLock getEventLock() {
}

public boolean delete(final String path) {
return delete(path, -1);
}

public boolean delete(final String path, final int version) {
try {
retryUntilConnected(new Callable<Object>() {

@Override
public Object call() throws Exception {
_connection.delete(path);
_connection.delete(path, version);
return null;
}
});
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/org/I0Itec/zkclient/ZkConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ public void delete(String path) throws InterruptedException, KeeperException {
_zk.delete(path, -1);
}

@Override
public void delete(String path, int version) throws InterruptedException, KeeperException {
_zk.delete(path, version);
}

@Override
public boolean exists(String path, boolean watch) throws KeeperException, InterruptedException {
return _zk.exists(path, watch) != null;
Expand Down
22 changes: 22 additions & 0 deletions src/test/java/org/I0Itec/zkclient/AbstractBaseZkClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.I0Itec.zkclient.exception.ZkBadVersionException;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.apache.log4j.Logger;
import org.junit.After;
Expand Down Expand Up @@ -79,6 +81,26 @@ public void testDelete() throws Exception {
assertFalse(_client.delete(path));
}

@Test
public void testDeleteWithVersion() throws Exception {
LOG.info("--- testDelete");
String path = "/a";
assertFalse(_client.delete(path, 0));
_client.createPersistent(path, null);
assertTrue(_client.delete(path, 0));
_client.createPersistent(path, null);
_client.writeData(path, new byte[0]);
assertTrue(_client.delete(path, 1));
_client.createPersistent(path, null);
try {
_client.delete(path, 1);
fail("Bad version excpetion expected.");
} catch (ZkBadVersionException e) {
// expected
}
assertTrue(_client.delete(path, 0));
}

@Test
public void testDeleteRecursive() throws Exception {
LOG.info("--- testDeleteRecursive");
Expand Down
5 changes: 5 additions & 0 deletions src/test/java/org/I0Itec/zkclient/ZkStateChangeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ public String create(String path, byte[] data, List<ACL> acl, CreateMode mode) t
public void delete(String path) throws InterruptedException, KeeperException {
throw new RuntimeException("not implemented");
}

@Override
public void delete(String path, int version) throws InterruptedException, KeeperException {
throw new RuntimeException("not implemented");
}

@Override
public boolean exists(final String path, final boolean watch) throws KeeperException, InterruptedException {
Expand Down

0 comments on commit 8154536

Please sign in to comment.