diff --git a/src/main/java/com/couchbase/client/CouchbaseClient.java b/src/main/java/com/couchbase/client/CouchbaseClient.java index dddcfcc9..5661f273 100644 --- a/src/main/java/com/couchbase/client/CouchbaseClient.java +++ b/src/main/java/com/couchbase/client/CouchbaseClient.java @@ -39,35 +39,46 @@ import com.couchbase.client.protocol.views.ViewsFetcherOperation; import com.couchbase.client.protocol.views.ViewsFetcherOperationImpl; import com.couchbase.client.vbucket.Reconfigurable; +import com.couchbase.client.vbucket.VBucketNodeLocator; import com.couchbase.client.vbucket.config.Bucket; +import com.couchbase.client.vbucket.config.VBucket; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.URI; import java.net.URL; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Properties; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; - +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; import net.spy.memcached.AddrUtil; +import net.spy.memcached.BroadcastOpFactory; import net.spy.memcached.CASValue; import net.spy.memcached.CachedData; import net.spy.memcached.MemcachedClient; +import net.spy.memcached.MemcachedNode; +import net.spy.memcached.ObserveResponse; import net.spy.memcached.OperationTimeoutException; +import net.spy.memcached.PersistTo; +import net.spy.memcached.ReplicateTo; import net.spy.memcached.compat.CloseUtil; import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.ops.GetlOperation; +import net.spy.memcached.ops.ObserveOperation; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationCallback; import net.spy.memcached.ops.OperationStatus; @@ -208,7 +219,6 @@ public CouchbaseClient(final List baseList, final String bucketName, * want to start the changes feed later. * * @param cf the ConnectionFactory to use to create connections - * @param subscribe whether or not to subscribe to config changes * @throws IOException if connections could not be made * @throws ConfigurationException if the configuration provided by the server * has issues or is not compatible @@ -795,6 +805,188 @@ public Boolean unlock(final String key, return unlock(key, casId, transcoder); } + /** + * Set a value and Observe. + * + * @param key the key to set + * @param exp the Expiry value + * @param value the Key value + * @param req the Persistence to Master value + * @param rep the Persistence to Replicas + * @return whether or not the operation was performed + * + */ + public OperationFuture set(String key, int exp, + String value, PersistTo req, ReplicateTo rep) { + OperationFuture setOp = set(key, exp, value); + try { + if (setOp.get()) { + observePoll(key, setOp.getCas(), req, rep); + } + } catch (InterruptedException e) { + setOp.set(false, setOp.getStatus()); + } catch (ExecutionException e) { + setOp.set(false, setOp.getStatus()); + } catch (TimeoutException e) { + setOp.set(false, setOp.getStatus()); + } catch (IllegalArgumentException e) { + setOp.set(false, setOp.getStatus()); + } catch (RuntimeException e) { + setOp.set(false, setOp.getStatus()); + } + return (setOp); + } +/** + * Set a value with Observe. + * + * @param key the key to set + * @param exp the Expiry value + * @param value the Key value + * @param req the Persistence to Master value + * @return whether or not the operation was performed + * + */ + public OperationFuture set(String key, int exp, + String value, PersistTo req) { + return set(key, exp, value, req, ReplicateTo.ZERO); + } + /** + * Observe a key with a CAS. + * + * @param key the Key + * @param cas the CAS of the key + * @return ObserveReponse the Response on master and replicas + * @throws IllegalStateException in the rare circumstance where queue is too + * full to accept any more requests + */ + public ObserveResponse[] observe(final String key, final long cas) { + final ObserveResponse[] observeResponse = new + ObserveResponse[VBucket.MAX_REPLICAS]; + for (int i=0; i < VBucket.MAX_REPLICAS; i++) { + observeResponse[i] = ObserveResponse.UNINITIALIZED; + } + final CountDownLatch latch = new CountDownLatch(1); + final OperationFuture rv = + new OperationFuture(key, latch, operationTimeout); + final AtomicReference observeResult = + new AtomicReference(null); + final ConcurrentLinkedQueue ops = + new ConcurrentLinkedQueue(); + final int vb = ((VBucketNodeLocator) + ((CouchbaseConnection) mconn).getLocator()).getVBucketIndex(key); + + final int master, replicas; + + master = ((CouchbaseConnectionFactory) + connFactory).getVBucketConfig().getMaster(vb); + replicas = ((CouchbaseConnectionFactory) + connFactory).getVBucketConfig().getReplicasCount(); + + Collection allNodes = (mconn.getLocator()).getAll(); + + List masterList = new + ArrayList(1); + List replicaList = new + ArrayList(replicas); + List allList = new ArrayList(allNodes); + + VBucketNodeLocator vbNodeLocator = + ((VBucketNodeLocator) + ((CouchbaseConnection) mconn).getLocator()); + + masterList.add((MemcachedNode) + vbNodeLocator.getServerByIndex(master)); + + for (int i=0; i < replicas; i++) { + int replica = ((CouchbaseConnectionFactory) + connFactory).getVBucketConfig().getReplica(vb, i); + if (replica >= 0) { // Replica count is updated, not enough servers + replicaList.add(vbNodeLocator.getServerByIndex(replica)); + } + + } + + // Issue a Broadcast Op on the Master + CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() { + public Operation newOp(final MemcachedNode n, + final CountDownLatch latch) { + final SocketAddress sa = n.getSocketAddress(); + // rv.put(sa, new HashMap()); + return opFact.observe(key, cas, vb, new ObserveOperation.Callback() { + private ObserveResponse r = null; + + public void receivedStatus(OperationStatus s) { + } + + public void gotData(String key, long retCas, + ObserveResponse or) { + observeResponse[0] = or; + if (((or == ObserveResponse.FOUND_PERSISTED) + || (or == ObserveResponse.FOUND_NOT_PERSISTED)) + && retCas != cas) { + observeResponse[0] = ObserveResponse.MODIFIED; + } + r = observeResponse[0]; + } + public void complete() { + latch.countDown(); + } + }); + } + }, masterList); + try { + blatch.await(operationTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted waiting for value", e); + } + + // Now issue a broadcast on the Replicas + + blatch = broadcastOp(new BroadcastOpFactory() { + + public Operation newOp(final MemcachedNode n, + final CountDownLatch latch) { + final SocketAddress sa = n.getSocketAddress(); + return opFact.observe(key, cas, vb, new ObserveOperation.Callback() { + + private ObserveResponse r = null; + + public void receivedStatus(OperationStatus s) { + } + + public void gotData(String key, long retCas, + ObserveResponse or) { + r = or; + for (int i = 1; i <= replicas; i++) { + if (observeResponse[i] == ObserveResponse.UNINITIALIZED) { + r = observeResponse[i] = or; + if (((or == ObserveResponse.FOUND_PERSISTED) + || (or == ObserveResponse.FOUND_NOT_PERSISTED)) + && retCas != cas) { + observeResponse[i] = ObserveResponse.MODIFIED; + } + break; + } + } + + } + + public void complete() { + latch.countDown(); + } + }); + } + }, replicaList); + try { + blatch.await(operationTimeout, TimeUnit.MILLISECONDS); + // return rv.get(operationTimeout, + // TimeUnit.MILLISECONDS); + return observeResponse; + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted waiting for value", e); + } + } + /** * Gets the number of vBuckets that are contained in the cluster. This * function is for internal use only and should rarely be since there @@ -823,4 +1015,93 @@ public boolean shutdown(long timeout, TimeUnit unit) { return shutdownResult; } + private void observePoll(String key, long cas, + PersistTo persist, ReplicateTo replicate) throws TimeoutException { + int persists, replicates; + int totPersists, totReplicas; + boolean masterPersisted; + int loop = 0; + + int replicas = ((CouchbaseConnectionFactory) + connFactory).getVBucketConfig().getReplicasCount(); + + // Other than Master + switch (persist) { + case MASTER: + persists=0; + break; + case TWO: + persists=1; + break; + case THREE: + persists=2; + break; + case FOUR: + default: + persists=3; + } + switch (replicate) { + case ZERO: + replicates=0; + break; + case ONE: + replicates=1; + break; + case TWO: + replicates=2; + break; + case THREE: + default: + replicates=3; + } + + if (replicates > replicas + || persists > replicas) { + throw new IllegalArgumentException("Requested Persists and " + + "Requested Replicates exceed number of replicas = " + + replicas); + } + + ObserveResponse[] or; + + do { + masterPersisted = false; + totPersists = totReplicas = 0; + + or = observe(key, cas); + // Assume Persisted for all cases unless modified + if ((or[0] != ObserveResponse.FOUND_NOT_PERSISTED) + && (or[0] != ObserveResponse.NOT_FOUND_NOT_PERSISTED)) { + masterPersisted = true; + } + if ((or[0]) == ObserveResponse.MODIFIED) { // Master + throw new RuntimeException("Observe - the key was modified"); + } + + for (int i=1; i < or.length; i++) { + if (or[i] == ObserveResponse.UNINITIALIZED) { + continue; // Skip to the next entry + } + if ((or[i] != ObserveResponse.FOUND_NOT_PERSISTED) + && or[i] != ObserveResponse.NOT_FOUND_NOT_PERSISTED) { + totPersists++; + totReplicas++; + } + } + // Completed the durability requirement + if (masterPersisted && (totPersists >= persists) + && (totReplicas >= replicates)) { + return; + } + try { + Thread.sleep(400); + } catch (InterruptedException e) { + getLogger().error("Interrupted while in observe loop.", e); + } + } while (loop++ < 10); + + throw new TimeoutException("Observe - Polled for Maximum retries of " + + loop); + } + } diff --git a/src/main/java/com/couchbase/client/CouchbaseClientIF.java b/src/main/java/com/couchbase/client/CouchbaseClientIF.java index 63a5e7e9..765b0197 100644 --- a/src/main/java/com/couchbase/client/CouchbaseClientIF.java +++ b/src/main/java/com/couchbase/client/CouchbaseClientIF.java @@ -26,10 +26,14 @@ import net.spy.memcached.CASValue; import net.spy.memcached.MemcachedClientIF; +import net.spy.memcached.ObserveResponse; +import net.spy.memcached.PersistTo; +import net.spy.memcached.ReplicateTo; import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.transcoders.Transcoder; + /** * This interface is provided as a helper for testing clients of the * CouchbaseClient. @@ -44,6 +48,7 @@ Future> asyncGetAndLock(final String key, int exp, CASValue getAndLock(String key, int exp, Transcoder tc); CASValue getAndLock(String key, int exp); + OperationFuture asyncUnlock(final String key, long casId, final Transcoder tc); @@ -56,6 +61,11 @@ Boolean unlock(final String key, Boolean unlock(final String key, long casId); + ObserveResponse[] observe(final String key, long cas); + OperationFuture set(String key, int exp, + String value, PersistTo persist); + OperationFuture set(String key, int exp, + String value, PersistTo persist, ReplicateTo replicate); int getNumVBuckets(); } diff --git a/src/main/java/com/couchbase/client/vbucket/VBucketNodeLocator.java b/src/main/java/com/couchbase/client/vbucket/VBucketNodeLocator.java index f3a53f3c..6e993f52 100644 --- a/src/main/java/com/couchbase/client/vbucket/VBucketNodeLocator.java +++ b/src/main/java/com/couchbase/client/vbucket/VBucketNodeLocator.java @@ -95,6 +95,15 @@ public MemcachedNode getPrimary(String k) { return pNode; } + public MemcachedNode getServerByIndex(int k) { + TotalConfig totConfig = fullConfig.get(); + Config config = totConfig.getConfig(); + Map nodesMap = totConfig.getNodesMap(); + + String server = config.getServer(k); + // choose appropriate MemcachedNode according to config data + return nodesMap.get(server); + } /** * {@inheritDoc} */ diff --git a/src/test/java/com/couchbase/client/CouchbaseClientTest.java b/src/test/java/com/couchbase/client/CouchbaseClientTest.java index 602f4c39..eb61f377 100644 --- a/src/test/java/com/couchbase/client/CouchbaseClientTest.java +++ b/src/test/java/com/couchbase/client/CouchbaseClientTest.java @@ -34,7 +34,10 @@ import net.spy.memcached.BinaryClientTest; import net.spy.memcached.CASValue; import net.spy.memcached.ConnectionFactory; +import net.spy.memcached.PersistTo; +import net.spy.memcached.ReplicateTo; import net.spy.memcached.TestConfig; +import net.spy.memcached.internal.OperationFuture; import org.junit.Ignore; /** @@ -163,6 +166,20 @@ public void testSimpleUnlock() throws Exception { assert client.set("getunltest", 1, "newvalue").get().booleanValue() : "Key was locked for too long"; } + + public void testObserve() throws Exception { + assertNull(client.get("observetest")); + OperationFuture setOp = + (((CouchbaseClient)client).set("observetest", 0, "value", + PersistTo.MASTER)); + assert setOp.get().booleanValue() + : "Key was not persisted to master"; + setOp = (((CouchbaseClient)client).set("observetest", 0, "value", + PersistTo.FOUR, ReplicateTo.THREE)); + assert !setOp.get().booleanValue() + : "Was there really 4 servers with 3 replicas" + + "for a testing system?"; + } public void testGetStatsSlabs() throws Exception { // Empty }