Permalink
Browse files

fixed up observeWithStale test

  • Loading branch information...
1 parent 66b8f32 commit 3d788ab9d3a88c1dc20717c4dd110e3a8bb5f5bc @mnunberg committed Nov 10, 2012
View
2 src/test/java/com/couchbase/client/CbTestConfig.java
@@ -29,7 +29,7 @@
public static final String CLUSTER_PASS_PROP = "cluster.password";
public static final String CLUSTER_ADMINNAME_PROP = "cluster.adminname";
public static final String CLUSTER_PASS =
- System.getProperty(CLUSTER_PASS_PROP, "password");
+ System.getProperty(CLUSTER_PASS_PROP, "123456");
public static final String CLUSTER_ADMINNAME =
System.getProperty(CLUSTER_ADMINNAME_PROP, "Administrator");
View
82 src/test/java/com/couchbase/client/ViewTest.java
@@ -56,6 +56,7 @@
import net.spy.memcached.PersistTo;
import net.spy.memcached.ReplicateTo;
import net.spy.memcached.TestConfig;
+import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.ops.OperationStatus;
import org.apache.http.HttpResponse;
@@ -118,7 +119,7 @@ protected static void initClient() throws Exception {
public static void before() throws Exception {
BucketTool bucketTool = new BucketTool();
bucketTool.deleteAllBuckets();
- bucketTool.createDefaultBucket(BucketType.COUCHBASE, 256, 0);
+ bucketTool.createDefaultBucket(BucketType.COUCHBASE, 256, 1);
BucketTool.FunctionCallback callback = new FunctionCallback() {
@Override
@@ -179,7 +180,7 @@ public String success(long elapsedTime) {
client.shutdown();
System.out.println("Setup of design docs complete, "
+ "sleeping until they propogate.");
- Thread.sleep(5000);
+ Thread.sleep(10000);
}
@Before
@@ -750,12 +751,76 @@ public void testInvalidDesignDocHandling() {
@Test
public void testObserveWithStaleFalse()
throws InterruptedException, ExecutionException {
- int docAmount = 500;
- for (int i = 1; i <= docAmount; i++) {
- String value = "{\"type\":\"observetest\",\"value\":"+i+"}";
- client.set("observetest"+i, 0, value, PersistTo.MASTER, ReplicateTo.ONE);
+ int expectAmount = 0;
+
+ class OpCounter {
+ public int numRemaining = 0;
+ public int numSuccess = 0;
+
+ public OpCounter(int initVal) {
+ numRemaining = initVal;
+ }
+
+ public synchronized void submitResult(OperationStatus status) {
+ if (status.isSuccess()) {
+ numSuccess++;
+ return;
+ }
+ System.err.printf("Couldn't submit op: %s\n", status.getMessage());
+ }
+
+ public synchronized int getKeySuffix() {
+ numRemaining--;
+ return numRemaining;
+ }
+
+ };
+
+ class KeySetter extends Thread {
+ private final OpCounter opCounter;
+ public KeySetter(OpCounter oc) {
+ opCounter = oc;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+
+ int id = opCounter.getKeySuffix();
+ if (id < 0) {
+ System.out.println("Done enqueuing operations");
+ break;
+ }
+
+ String key = "observetest" + id;
+ String value = "{\"type\":\"observetest\",\"value\":"+id+"}";
+ OperationFuture<Boolean> ft = client.set(
+ key, 0, value, PersistTo.MASTER, ReplicateTo.ONE);
+ opCounter.submitResult(ft.getStatus());
+ }
+ }
}
-
+
+ int workerCount = 1;
+ OpCounter globOc = new OpCounter(500);
+ KeySetter[] setters = new KeySetter[workerCount];
+
+ for (int i = 0; i < workerCount; i++) {
+ setters[i] = new KeySetter(globOc);
+ setters[i].start();
+ }
+
+ System.out.printf("Waiting for %d threads to finish..\n", workerCount);
+
+ // Now wait for them to complete..
+ for (int i = 0; i < workerCount; i++) {
+ setters[i].join();
+ }
+ expectAmount = globOc.numSuccess;
+
+ assert expectAmount > 0 : "Not enough successes..";
+ System.out.printf("Have %d keys to wait for..\n", expectAmount);
+
Query query = new Query().setStale(Stale.FALSE);
View view = client.getView(DESIGN_DOC_OBSERVE, VIEW_NAME_OBSERVE);
@@ -771,7 +836,8 @@ public void testObserveWithStaleFalse()
returnedRows.add(row);
}
- assertEquals(docAmount, returnedRows.size());
+ assertEquals(expectAmount, returnedRows.size());
+ System.out.println("Done!\n");
}
/**

0 comments on commit 3d788ab

Please sign in to comment.