Skip to content

Commit

Permalink
DATACOUCH-224 - Improve concurrent save/insert, optimistic locking
Browse files Browse the repository at this point in the history
  • Loading branch information
Anastasiia Smirnova authored and simonbasle committed May 10, 2016
1 parent 117f0d0 commit 811c024
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 14 deletions.
@@ -0,0 +1,20 @@
package org.springframework.data.couchbase.core;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;

public class AsyncUtils {

public static void executeConcurrently(int numThreads, Callable<Void> task) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(numThreads);

Collection<Callable<Void>> tasks = Collections.nCopies(numThreads, task);

List<Future<Void>> futures = pool.invokeAll(tasks);
for (Future future : futures) {
future.get(numThreads, TimeUnit.SECONDS);
}
}
}
Expand Up @@ -26,12 +26,20 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.document.RawJsonDocument;
Expand All @@ -40,12 +48,13 @@
import com.couchbase.client.java.query.N1qlQuery;
import com.couchbase.client.java.query.N1qlQueryResult;
import com.couchbase.client.java.query.consistency.ScanConsistency;
import com.couchbase.client.java.query.dsl.Expression;
import com.couchbase.client.java.view.Stale;
import com.couchbase.client.java.view.ViewQuery;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -69,6 +78,9 @@
@TestExecutionListeners(CouchbaseTemplateViewListener.class)
public class CouchbaseTemplateTests {

@Rule
public TestName testName = new TestName();

@Autowired
private Bucket client;

Expand Down Expand Up @@ -132,7 +144,11 @@ public void insertDoesNotOverride() throws Exception {
assertEquals("Mr. A", resultConv.get("name"));

doc = new SimplePerson(id, "Mr. B");
template.insert(doc);
try {
template.insert(doc);
} catch (OptimisticLockingFailureException e) {
//ignore, since this insert should fail
}

resultDoc = client.get(id, RawJsonDocument.class);
assertNotNull(resultDoc);
Expand Down Expand Up @@ -315,7 +331,11 @@ public void versionShouldNotUpdateOnSecondInsert() throws Exception {
VersionedClass versionedClass = new VersionedClass("versionedClass:2", "foobar");
template.insert(versionedClass);
long version1 = versionedClass.getVersion();
template.insert(versionedClass);
try {
template.insert(versionedClass);
} catch (OptimisticLockingFailureException e) {
//ignore, since this insert should fail
}
long version2 = versionedClass.getVersion();

assertTrue(version1 > 0);
Expand Down Expand Up @@ -402,6 +422,67 @@ public void shouldLoadVersionPropertyOnFind() throws Exception {
assertEquals(versionedClass.getVersion(), foundClass.getVersion());
}

@Test
public void shouldUpdateAlreadyExistingDocument() throws Exception {
final String key = testName.getMethodName();
removeIfExist(key);

final AtomicLong counter = new AtomicLong();

VersionedClass initial = new VersionedClass(key, "value-0");
template.save(initial);

AsyncUtils.executeConcurrently(3, new Callable<Void>() {
@Override
public Void call() throws Exception {
boolean saved = false;
while(!saved) {
long counterValue = counter.incrementAndGet();
VersionedClass messageData = template.findById(key, VersionedClass.class);
messageData.field = "value-" + counterValue;
try {
template.save(messageData);
saved = true;
} catch (OptimisticLockingFailureException e) {
}
}
return null;
}
});

VersionedClass actual = template.findById(key, VersionedClass.class);

assertNotEquals(initial.field, actual.field);
assertNotEquals(initial.version, actual.version);
}

@Test
public void shouldInsertOnlyFirstDocumentAndNextAttemptsShouldFailWithOptimisticLockingException() throws Exception {
final String key = testName.getMethodName();
removeIfExist(key);

final AtomicLong counter = new AtomicLong();
final AtomicLong optimisticLockCounter = new AtomicLong();
AsyncUtils.executeConcurrently(5, new Callable<Void>() {
@Override
public Void call() throws Exception {
long counterValue = counter.incrementAndGet();
String data = "value-" + counterValue;
VersionedClass messageData = new VersionedClass(key, data);
try {
template.insert(messageData);
} catch (OptimisticLockingFailureException e) {
optimisticLockCounter.incrementAndGet();
}
//should save operation throw OptimisticLockingFailureException on next attempts to save?
return null;
}
});


assertEquals(4, optimisticLockCounter.intValue());
}

/**
* @see DATACOUCH-59
*/
Expand Down Expand Up @@ -647,6 +728,15 @@ public String getField() {
public void setField(String field) {
this.field = field;
}

@Override
public String toString() {
return "VersionedClass{" +
"id='" + id + '\'' +
", version=" + version +
", field='" + field + '\'' +
'}';
}
}

@Document
Expand Down
Expand Up @@ -16,27 +16,26 @@

package org.springframework.data.couchbase.repository;

import static org.junit.Assert.*;

import java.util.Arrays;
import java.util.List;

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.error.CASMismatchException;
import com.couchbase.client.java.error.DocumentDoesNotExistException;
import com.couchbase.client.java.view.Stale;
import com.couchbase.client.java.view.ViewQuery;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Version;
import org.springframework.data.couchbase.IntegrationTestApplicationConfig;
import org.springframework.data.couchbase.core.AsyncUtils;
import org.springframework.data.couchbase.core.CouchbaseQueryExecutionException;
import org.springframework.data.couchbase.core.CouchbaseTemplateTests;
import org.springframework.data.couchbase.core.mapping.Document;
import org.springframework.data.couchbase.repository.config.RepositoryOperationsMapping;
import org.springframework.data.couchbase.repository.support.CouchbaseRepositoryFactory;
Expand All @@ -46,6 +45,15 @@
import org.springframework.test.context.TestExecutionListeners;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.Assert.*;

/**
* @author Michael Nitschinger
*/
Expand All @@ -54,6 +62,9 @@
@TestExecutionListeners(SimpleCouchbaseRepositoryListener.class)
public class SimpleCouchbaseRepositoryTests {

@Rule
public TestName testName = new TestName();

@Autowired
private Bucket client;

Expand All @@ -73,6 +84,13 @@ public void setup() throws Exception {
versionedDataRepository = factory.getRepository(VersionedDataRepository.class);
}

private void remove(String key) {
try {
client.remove(key);
} catch (DocumentDoesNotExistException e) {
}
}

@Test
public void simpleCrud() {
String key = "my_unique_user_key";
Expand Down Expand Up @@ -220,6 +238,69 @@ public void shouldTakeVersionIntoAccountWhenDoingMultipleUpdates() {
}
}

@Test
public void shouldUpdateDocumentConcurrently() throws Exception {
final String key = testName.getMethodName();
remove(key);

final AtomicLong counter = new AtomicLong();
final AtomicLong updatedCounter = new AtomicLong();
VersionedData initial = new VersionedData(key, "value-initial");
versionedDataRepository.save(initial);
assertNotEquals(0L, initial.version);

Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
boolean updated = false;
while(!updated) {
long counterValue = counter.incrementAndGet();
VersionedData messageData = versionedDataRepository.findOne(key);
messageData.data = "value-" + counterValue;
try {
versionedDataRepository.save(messageData);
updated = true;
updatedCounter.incrementAndGet();
} catch (OptimisticLockingFailureException e) {
}
}
return null;
}
};
AsyncUtils.executeConcurrently(5, task);

assertNotEquals(initial.data, versionedDataRepository.findOne(key).data);
assertEquals(5, updatedCounter.intValue());
}

@Test
public void shouldFailOnMultipleConcurrentSaves() throws Exception {
final String key = testName.getMethodName();
remove(key);

final AtomicLong counter = new AtomicLong();
final AtomicLong optimisticLockCounter = new AtomicLong();

Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
long counterValue = counter.incrementAndGet();
VersionedData messageData = new VersionedData(key, "value-" + counterValue);
try {
versionedDataRepository.save(messageData);
} catch (OptimisticLockingFailureException e) {
optimisticLockCounter.incrementAndGet();
}
return null;
}
};

AsyncUtils.executeConcurrently(5, task);

assertEquals(4, optimisticLockCounter.intValue());
}


public interface VersionedDataRepository extends CouchbaseRepository<VersionedData, String> { }

@Document
Expand Down
Expand Up @@ -35,7 +35,6 @@
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.error.CASMismatchException;
import com.couchbase.client.java.error.DocumentAlreadyExistsException;
import com.couchbase.client.java.error.DocumentDoesNotExistException;
import com.couchbase.client.java.error.TranscodingException;
import com.couchbase.client.java.query.N1qlQuery;
import com.couchbase.client.java.query.N1qlQueryResult;
Expand All @@ -49,7 +48,6 @@
import com.couchbase.client.java.view.ViewRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.dao.OptimisticLockingFailureException;
Expand Down Expand Up @@ -529,14 +527,22 @@ private void doPersist(Object objectToPersist, final PersistTo persistTo, final
public Boolean doInBucket() throws InterruptedException, ExecutionException {
Document<String> doc = encodeAndWrap(converted, version);
Document<String> storedDoc;
boolean checkVersion = version != null && version > 0L;
//We will check version only if required
boolean versionPresent = versionProperty != null;
//If version is not set - assumption that document is new, otherwise updating
boolean existingDocument = version != null && version > 0L;
try {
switch (persistType) {
case SAVE:
if (checkVersion) {
if (!versionPresent) {

This comment has been minimized.

Copy link
@chaeyk

chaeyk Aug 26, 2016

Contributor

Why changed like this?
There is no way to upsert document when model has version property.
I think if there is no version property or version value is not set, upsert() should be executed.

//No version field - no cas
storedDoc = client.upsert(doc, persistTo, replicateTo);
} else if (existingDocument) {
//Updating existing document with cas
storedDoc = client.replace(doc, persistTo, replicateTo);
} else {
storedDoc = client.upsert(doc, persistTo, replicateTo);
//Creating new document
storedDoc = client.insert(doc, persistTo, replicateTo);
}
break;
case UPDATE:
Expand All @@ -554,6 +560,9 @@ public Boolean doInBucket() throws InterruptedException, ExecutionException {
return true;
}
return false;
} catch (DocumentAlreadyExistsException e) {
throw new OptimisticLockingFailureException(persistType.getSpringDataOperationName() +
" document with version value failed: " + version, e);
} catch (CASMismatchException e) {
throw new OptimisticLockingFailureException(persistType.getSpringDataOperationName() +
" document with version value failed: " + version, e);
Expand Down

0 comments on commit 811c024

Please sign in to comment.