Skip to content

Commit

Permalink
https://github.com/javers/javers/issues/186
Browse files Browse the repository at this point in the history
fixed concurrency issue in CommitSeqGenerator
  • Loading branch information
bartoszwalacik committed Aug 8, 2015
1 parent 619ba40 commit 9ccc152
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 43 deletions.
1 change: 0 additions & 1 deletion build.gradle
Expand Up @@ -37,7 +37,6 @@ subprojects {

dependencies {
testCompile 'junit:junit:4.11'
//testCompile 'org.codehaus.groovy:groovy-all:2.3.3'
testCompile 'org.spockframework:spock-core:1.0-groovy-2.4'
testCompile 'org.apache.commons:commons-lang3:3.3.2'
testCompile 'ch.qos.logback:logback-classic:1.1.2'
Expand Down
Expand Up @@ -71,7 +71,7 @@ public enum JaversExceptionCode {
"it doesn't exists in JaversRepository"),

CANT_FIND_COMMIT_HEAD_ID(JaversException.RUNTIME_ERROR+"can't find commit head id in JaversRepository"),
CANT_SAVE_ALREADY_PERSISTED_COMMIT(JaversException.RUNTIME_ERROR+"can't save already persisted commit"),
CANT_SAVE_ALREADY_PERSISTED_COMMIT(JaversException.RUNTIME_ERROR+"can't save already persisted commit '%s'"),

SQL_EXCEPTION(JaversException.RUNTIME_ERROR+"SqlException: %s"),

Expand Down
Expand Up @@ -7,22 +7,23 @@
* @author bartosz walacik
*/
class CommitSeqGenerator {
private HandedOutIds handedOut = new HandedOutIds();

private int seq;
private CommitId lastReturned;
public synchronized CommitId nextId(CommitId head)
{
Long major = getHeadMajorId(head) + 1;

public synchronized CommitId nextId(CommitId head) {
long major = getHeadMajorId(head) + 1;
CommitId lastReturned = handedOut.get(major);

if (lastReturned!= null && major == lastReturned.getMajorId()){
seq++;
CommitId result;
if (lastReturned == null){
result = new CommitId(major,0);
}
else{
seq = 0;
else {
result = new CommitId(major, lastReturned.getMinorId() + 1);
}

CommitId result = new CommitId(major, seq);
lastReturned = result;
handedOut.put(result);
return result;
}

Expand Down
57 changes: 57 additions & 0 deletions javers-core/src/main/java/org/javers/core/commit/HandedOutIds.java
@@ -0,0 +1,57 @@
package org.javers.core.commit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

/**
* @author bartosz.walacik
*/
class HandedOutIds {
private static final Logger logger = LoggerFactory.getLogger(HandedOutIds.class);

private int limit = 5;

private List<CommitId> handedOutList = new ArrayList<>();

void put (CommitId handedOut) {
if (handedOutList.size() == limit) {
handedOutList.remove(limit - 1);
}

int found = findIndex(handedOut.getMajorId());

if (found < 0){
handedOutList.add(0, handedOut);
} else {
handedOutList.remove(found);
handedOutList.add(found, handedOut);
}
}

private int findIndex(Long majorId){
for (int i=0; i<handedOutList.size(); i++){
CommitId c = handedOutList.get(i);
if (c.getMajorId() == majorId){
return i;
}

if (c.getMajorId() < majorId){
return -1;
}
}
return -1;
}

CommitId get(Long majorId) {
for (CommitId id : handedOutList){
if (id.getMajorId() == majorId){
return id;
}
}
return null;
}

}
Expand Up @@ -16,6 +16,7 @@
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import static java.util.Collections.unmodifiableList;

Expand All @@ -27,7 +28,7 @@
class InMemoryRepository implements JaversRepository {
private static final Logger logger = LoggerFactory.getLogger(InMemoryRepository.class);

private Map<GlobalId, LinkedList<CdoSnapshot>> snapshots = new HashMap<>();
private Map<GlobalId, LinkedList<CdoSnapshot>> snapshots = new ConcurrentHashMap<>();

private CommitId head;

Expand Down Expand Up @@ -161,7 +162,7 @@ public int compare(CdoSnapshot o1, CdoSnapshot o2) {
return all;
}

private void persist(CdoSnapshot snapshot) {
private synchronized void persist(CdoSnapshot snapshot) {
LinkedList<CdoSnapshot> states = snapshots.get(snapshot.getGlobalId());
if (states == null){
states = new LinkedList<>();
Expand Down
Expand Up @@ -9,6 +9,10 @@ import org.joda.time.LocalDate
import spock.lang.Specification
import spock.lang.Unroll

import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

import static org.javers.core.JaversBuilder.javers
import static org.javers.repository.jql.InstanceIdDTO.instanceId
import static org.javers.repository.jql.UnboundedValueObjectIdDTO.unboundedValueObjectId
Expand All @@ -24,6 +28,41 @@ class JaversRepositoryE2ETest extends Specification {
javers = javers().build()
}

def setupSpec(){
System.setProperty("isDatabaseNotMultithreaded","true")
}

def "should allow concurrent writes"(){
given:
def executor = Executors.newFixedThreadPool(20)
def futures = new ArrayList()
def cnt = new AtomicInteger()
def sId = 222
def threads = 20
//initial commit
javers.commit("author", new SnapshotEntity(id: sId, intProperty: cnt.incrementAndGet()))

when:
(1..threads).each{
futures << executor.submit({
try {
javers.commit("author", new SnapshotEntity(id: sId, intProperty: cnt.incrementAndGet()))
} catch (Exception e){
println "Exception: "+ e
}
} as Callable)
}

while( futures.count { it.done } < threads){
println "waiting for all threads, " + futures.count { it.done } + " threads have finished ..."
Thread.currentThread().sleep(10)
}
println futures.count { it.done } + " threads have finished ..."

then:
javers.findSnapshots(QueryBuilder.byInstanceId(sId, SnapshotEntity).build()).size() == threads + 1
}

def "should query for ValueObject changes by owning Entity class"() {
given:
def data = [ new DummyUserDetails(id:1, dummyAddress: new DummyAddress(city:"London")),
Expand Down
Expand Up @@ -36,18 +36,13 @@ class CommitSeqGeneratorTest extends Specification {
def "should inc minor when the same head"() {
given:
def commitSeqGenerator = new CommitSeqGenerator()
def head = commitSeqGenerator.nextId(null)

when:
def gen1 = commitSeqGenerator.nextId(head)
def gen2 = commitSeqGenerator.nextId(head)
def gen3 = commitSeqGenerator.nextId(head)
def gen4 = commitSeqGenerator.nextId(gen2)

then:
gen1.value() == "2.0"
gen2.value() == "2.1"
gen3.value() == "2.2"
gen4.value() == "3.0"
def commit1 = commitSeqGenerator.nextId(null) //1.0
def commit2 = commitSeqGenerator.nextId(commit1) //2.0

expect:
commitSeqGenerator.nextId(commit1) == new CommitId(2,1)
commitSeqGenerator.nextId(commit2) == new CommitId(3,0)
commitSeqGenerator.nextId(commit1) == new CommitId(2,2)
commitSeqGenerator.nextId(commit2) == new CommitId(3,1)
}
}
Expand Up @@ -26,12 +26,13 @@ class MongoRepositoryFongoIntTest extends Specification {
given:
def javersTestBuilder = JaversTestBuilder.javersTestAssembly()
def mongoRepository = new MongoRepository(mongoDb, javersTestBuilder.jsonConverter)
def commitFactory = javersTestBuilder.commitFactory

def kazikV1 = dummyUser("Kazik").withAge(1).build()
def kazikV2 = dummyUser("Kazik").withAge(2).build()

def commit1 = javersTestBuilder.commitFactory.create("author", kazikV1)
def commit2 = javersTestBuilder.commitFactory.create("author", kazikV2)
def commit1 = commitFactory.create("author", kazikV1)
def commit2 = commitFactory.create("author", kazikV2)

when:
mongoRepository.persist(commit1)
Expand Down
Expand Up @@ -45,7 +45,7 @@ public void persist(Commit commit) {
Optional<Long> primaryKey = commitRepository.getCommitPrimaryKey(commit);

if (primaryKey.isPresent()) {
throw new JaversException(JaversExceptionCode.CANT_SAVE_ALREADY_PERSISTED_COMMIT);
throw new JaversException(JaversExceptionCode.CANT_SAVE_ALREADY_PERSISTED_COMMIT, commit.getId());
}

long commitPk = commitRepository.save(commit.getAuthor(), commit.getCommitDate(), commit.getId());
Expand Down
@@ -1,10 +1,11 @@
package org.javers.repository.sql

import org.h2.tools.Server
import org.javers.core.JaversRepositoryE2ETest
import org.javers.core.model.SnapshotEntity
import org.javers.repository.jql.QueryBuilder
import org.javers.repository.sql.reposiotries.PersistentGlobalId
import spock.lang.Ignore
import spock.lang.Shared

import java.sql.Connection
import java.sql.DriverManager
Expand All @@ -14,22 +15,23 @@ import static org.javers.core.JaversBuilder.javers
class JaversSqlRepositoryE2ETest extends JaversRepositoryE2ETest {

protected Connection getConnection() {
DriverManager.getConnection("jdbc:h2:tcp://localhost:9092/mem:test;")//TRACE_LEVEL_SYSTEM_OUT=2")
DriverManager.getConnection("jdbc:h2:mem:")//TRACE_LEVEL_SYSTEM_OUT=2")
}

protected DialectName getDialect() {
DialectName.H2
}

@Shared
Connection dbConnection

@Override
def setup() {
Server.createTcpServer().start()
def setupSpec(){
dbConnection = getConnection()

dbConnection.setAutoCommit(false)
}

@Override
def setup() {
def connectionProvider = { dbConnection } as ConnectionProvider

def sqlRepository = SqlRepositoryBuilder
Expand All @@ -49,6 +51,14 @@ class JaversSqlRepositoryE2ETest extends JaversRepositoryE2ETest {
execute("delete from jv_cdo_class")
}

def cleanup() {
dbConnection.rollback()
}

def cleanupSpec() {
dbConnection.close()
}

def execute(String sql) {
def stmt = dbConnection.createStatement()
stmt.executeUpdate(sql)
Expand All @@ -58,7 +68,7 @@ class JaversSqlRepositoryE2ETest extends JaversRepositoryE2ETest {

def "should not interfere with user transactions"() {
given:
def anEntity = new SnapshotEntity(id:1)
def anEntity = new SnapshotEntity(id: 1)

when:
javers.commit("author", anEntity)
Expand All @@ -79,7 +89,7 @@ class JaversSqlRepositoryE2ETest extends JaversRepositoryE2ETest {

def "should preserve globalId.pk as PersistentGlobalId to minimize number of queries"() {
given:
def anEntity = new SnapshotEntity(id:1)
def anEntity = new SnapshotEntity(id: 1)
javers.commit("author", anEntity)

when:
Expand All @@ -90,9 +100,4 @@ class JaversSqlRepositoryE2ETest extends JaversRepositoryE2ETest {
commit.snapshots.get(0).globalId instanceof PersistentGlobalId
commit.snapshots.get(0).globalId.primaryKey > 0
}

def cleanup() {
dbConnection.rollback()
dbConnection.close()
}
}

0 comments on commit 9ccc152

Please sign in to comment.