Skip to content

Commit

Permalink
Add support for synchronous set via Observe operation.
Browse files Browse the repository at this point in the history
This change introduces a simple synchronous set which
uses the underlying observe command introduced in
spymemcached 2.8.3.  This command is a binary command only
and is in Couchbase Server 2.0 (approx. build 1495) only.

Conflicts:
	src/test/java/com/couchbase/client/CouchbaseClientTest.java

Change-Id: I6a1e8af54ef13d4a40a5dcc21bb7a939fb63499f
Reviewed-on: http://review.couchbase.org/19068
Reviewed-by: Matt Ingenthron <matt@couchbase.com>
Tested-by: Matt Ingenthron <matt@couchbase.com>
  • Loading branch information
ragsns authored and ingenthr committed Aug 19, 2012
1 parent 93e59f0 commit 4460d26
Show file tree
Hide file tree
Showing 4 changed files with 319 additions and 2 deletions.
285 changes: 283 additions & 2 deletions src/main/java/com/couchbase/client/CouchbaseClient.java
Expand Up @@ -39,35 +39,46 @@
import com.couchbase.client.protocol.views.ViewsFetcherOperation;
import com.couchbase.client.protocol.views.ViewsFetcherOperationImpl;
import com.couchbase.client.vbucket.Reconfigurable;
import com.couchbase.client.vbucket.VBucketNodeLocator;
import com.couchbase.client.vbucket.config.Bucket;
import com.couchbase.client.vbucket.config.VBucket;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

import net.spy.memcached.AddrUtil;
import net.spy.memcached.BroadcastOpFactory;
import net.spy.memcached.CASValue;
import net.spy.memcached.CachedData;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.ObserveResponse;
import net.spy.memcached.OperationTimeoutException;
import net.spy.memcached.PersistTo;
import net.spy.memcached.ReplicateTo;
import net.spy.memcached.compat.CloseUtil;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.ops.GetlOperation;
import net.spy.memcached.ops.ObserveOperation;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationStatus;
Expand Down Expand Up @@ -208,7 +219,6 @@ public CouchbaseClient(final List<URI> baseList, final String bucketName,
* want to start the changes feed later.
*
* @param cf the ConnectionFactory to use to create connections
* @param subscribe whether or not to subscribe to config changes
* @throws IOException if connections could not be made
* @throws ConfigurationException if the configuration provided by the server
* has issues or is not compatible
Expand Down Expand Up @@ -795,6 +805,188 @@ public Boolean unlock(final String key,
return unlock(key, casId, transcoder);
}

/**
* Set a value and Observe.
*
* @param key the key to set
* @param exp the Expiry value
* @param value the Key value
* @param req the Persistence to Master value
* @param rep the Persistence to Replicas
* @return whether or not the operation was performed
*
*/
public OperationFuture<Boolean> set(String key, int exp,
String value, PersistTo req, ReplicateTo rep) {
OperationFuture<Boolean> setOp = set(key, exp, value);
try {
if (setOp.get()) {
observePoll(key, setOp.getCas(), req, rep);
}
} catch (InterruptedException e) {
setOp.set(false, setOp.getStatus());
} catch (ExecutionException e) {
setOp.set(false, setOp.getStatus());
} catch (TimeoutException e) {
setOp.set(false, setOp.getStatus());
} catch (IllegalArgumentException e) {
setOp.set(false, setOp.getStatus());
} catch (RuntimeException e) {
setOp.set(false, setOp.getStatus());
}
return (setOp);
}
/**
* Set a value with Observe.
*
* @param key the key to set
* @param exp the Expiry value
* @param value the Key value
* @param req the Persistence to Master value
* @return whether or not the operation was performed
*
*/
public OperationFuture<Boolean> set(String key, int exp,
String value, PersistTo req) {
return set(key, exp, value, req, ReplicateTo.ZERO);
}
/**
* Observe a key with a CAS.
*
* @param key the Key
* @param cas the CAS of the key
* @return ObserveReponse the Response on master and replicas
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public ObserveResponse[] observe(final String key, final long cas) {
final ObserveResponse[] observeResponse = new
ObserveResponse[VBucket.MAX_REPLICAS];
for (int i=0; i < VBucket.MAX_REPLICAS; i++) {
observeResponse[i] = ObserveResponse.UNINITIALIZED;
}
final CountDownLatch latch = new CountDownLatch(1);
final OperationFuture<ObserveResponse> rv =
new OperationFuture<ObserveResponse>(key, latch, operationTimeout);
final AtomicReference<ObserveResponse> observeResult =
new AtomicReference<ObserveResponse>(null);
final ConcurrentLinkedQueue<Operation> ops =
new ConcurrentLinkedQueue<Operation>();
final int vb = ((VBucketNodeLocator)
((CouchbaseConnection) mconn).getLocator()).getVBucketIndex(key);

final int master, replicas;

master = ((CouchbaseConnectionFactory)
connFactory).getVBucketConfig().getMaster(vb);
replicas = ((CouchbaseConnectionFactory)
connFactory).getVBucketConfig().getReplicasCount();

Collection allNodes = (mconn.getLocator()).getAll();

List<MemcachedNode> masterList = new
ArrayList<MemcachedNode>(1);
List<MemcachedNode> replicaList = new
ArrayList<MemcachedNode>(replicas);
List<MemcachedNode> allList = new ArrayList(allNodes);

VBucketNodeLocator vbNodeLocator =
((VBucketNodeLocator)
((CouchbaseConnection) mconn).getLocator());

masterList.add((MemcachedNode)
vbNodeLocator.getServerByIndex(master));

for (int i=0; i < replicas; i++) {
int replica = ((CouchbaseConnectionFactory)
connFactory).getVBucketConfig().getReplica(vb, i);
if (replica >= 0) { // Replica count is updated, not enough servers
replicaList.add(vbNodeLocator.getServerByIndex(replica));
}

}

// Issue a Broadcast Op on the Master
CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() {
public Operation newOp(final MemcachedNode n,
final CountDownLatch latch) {
final SocketAddress sa = n.getSocketAddress();
// rv.put(sa, new HashMap<String, String>());
return opFact.observe(key, cas, vb, new ObserveOperation.Callback() {
private ObserveResponse r = null;

public void receivedStatus(OperationStatus s) {
}

public void gotData(String key, long retCas,
ObserveResponse or) {
observeResponse[0] = or;
if (((or == ObserveResponse.FOUND_PERSISTED)
|| (or == ObserveResponse.FOUND_NOT_PERSISTED))
&& retCas != cas) {
observeResponse[0] = ObserveResponse.MODIFIED;
}
r = observeResponse[0];
}
public void complete() {
latch.countDown();
}
});
}
}, masterList);
try {
blatch.await(operationTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted waiting for value", e);
}

// Now issue a broadcast on the Replicas

blatch = broadcastOp(new BroadcastOpFactory() {

public Operation newOp(final MemcachedNode n,
final CountDownLatch latch) {
final SocketAddress sa = n.getSocketAddress();
return opFact.observe(key, cas, vb, new ObserveOperation.Callback() {

private ObserveResponse r = null;

public void receivedStatus(OperationStatus s) {
}

public void gotData(String key, long retCas,
ObserveResponse or) {
r = or;
for (int i = 1; i <= replicas; i++) {
if (observeResponse[i] == ObserveResponse.UNINITIALIZED) {
r = observeResponse[i] = or;
if (((or == ObserveResponse.FOUND_PERSISTED)
|| (or == ObserveResponse.FOUND_NOT_PERSISTED))
&& retCas != cas) {
observeResponse[i] = ObserveResponse.MODIFIED;
}
break;
}
}

}

public void complete() {
latch.countDown();
}
});
}
}, replicaList);
try {
blatch.await(operationTimeout, TimeUnit.MILLISECONDS);
// return rv.get(operationTimeout,
// TimeUnit.MILLISECONDS);
return observeResponse;
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted waiting for value", e);
}
}

/**
* Gets the number of vBuckets that are contained in the cluster. This
* function is for internal use only and should rarely be since there
Expand Down Expand Up @@ -823,4 +1015,93 @@ public boolean shutdown(long timeout, TimeUnit unit) {
return shutdownResult;
}

private void observePoll(String key, long cas,
PersistTo persist, ReplicateTo replicate) throws TimeoutException {
int persists, replicates;
int totPersists, totReplicas;
boolean masterPersisted;
int loop = 0;

int replicas = ((CouchbaseConnectionFactory)
connFactory).getVBucketConfig().getReplicasCount();

// Other than Master
switch (persist) {
case MASTER:
persists=0;
break;
case TWO:
persists=1;
break;
case THREE:
persists=2;
break;
case FOUR:
default:
persists=3;
}
switch (replicate) {
case ZERO:
replicates=0;
break;
case ONE:
replicates=1;
break;
case TWO:
replicates=2;
break;
case THREE:
default:
replicates=3;
}

if (replicates > replicas
|| persists > replicas) {
throw new IllegalArgumentException("Requested Persists and "
+ "Requested Replicates exceed number of replicas = "
+ replicas);
}

ObserveResponse[] or;

do {
masterPersisted = false;
totPersists = totReplicas = 0;

or = observe(key, cas);
// Assume Persisted for all cases unless modified
if ((or[0] != ObserveResponse.FOUND_NOT_PERSISTED)
&& (or[0] != ObserveResponse.NOT_FOUND_NOT_PERSISTED)) {
masterPersisted = true;
}
if ((or[0]) == ObserveResponse.MODIFIED) { // Master
throw new RuntimeException("Observe - the key was modified");
}

for (int i=1; i < or.length; i++) {
if (or[i] == ObserveResponse.UNINITIALIZED) {
continue; // Skip to the next entry
}
if ((or[i] != ObserveResponse.FOUND_NOT_PERSISTED)
&& or[i] != ObserveResponse.NOT_FOUND_NOT_PERSISTED) {
totPersists++;
totReplicas++;
}
}
// Completed the durability requirement
if (masterPersisted && (totPersists >= persists)
&& (totReplicas >= replicates)) {
return;
}
try {
Thread.sleep(400);
} catch (InterruptedException e) {
getLogger().error("Interrupted while in observe loop.", e);
}
} while (loop++ < 10);

throw new TimeoutException("Observe - Polled for Maximum retries of "
+ loop);
}

}
10 changes: 10 additions & 0 deletions src/main/java/com/couchbase/client/CouchbaseClientIF.java
Expand Up @@ -26,10 +26,14 @@

import net.spy.memcached.CASValue;
import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.ObserveResponse;
import net.spy.memcached.PersistTo;
import net.spy.memcached.ReplicateTo;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.transcoders.Transcoder;



/**
* This interface is provided as a helper for testing clients of the
* CouchbaseClient.
Expand All @@ -44,6 +48,7 @@ <T> Future<CASValue<T>> asyncGetAndLock(final String key, int exp,
<T> CASValue<T> getAndLock(String key, int exp, Transcoder<T> tc);

CASValue<Object> getAndLock(String key, int exp);

<T> OperationFuture<Boolean> asyncUnlock(final String key,
long casId, final Transcoder<T> tc);

Expand All @@ -56,6 +61,11 @@ <T> Boolean unlock(final String key,
Boolean unlock(final String key,
long casId);

ObserveResponse[] observe(final String key, long cas);

OperationFuture<Boolean> set(String key, int exp,
String value, PersistTo persist);
OperationFuture<Boolean> set(String key, int exp,
String value, PersistTo persist, ReplicateTo replicate);
int getNumVBuckets();
}
Expand Up @@ -95,6 +95,15 @@ public MemcachedNode getPrimary(String k) {
return pNode;
}

public MemcachedNode getServerByIndex(int k) {
TotalConfig totConfig = fullConfig.get();
Config config = totConfig.getConfig();
Map<String, MemcachedNode> nodesMap = totConfig.getNodesMap();

String server = config.getServer(k);
// choose appropriate MemcachedNode according to config data
return nodesMap.get(server);
}
/**
* {@inheritDoc}
*/
Expand Down

0 comments on commit 4460d26

Please sign in to comment.