Skip to content

Commit

Permalink
Concurent Cache Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
vzakharchenko committed Mar 19, 2020
1 parent 01109fe commit b72ca9c
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public String getUuid() {

public boolean valid(QueryStatistic queryStatistic, TransactionalCache transactionCache) {
return queryStatistic.getTables().stream().allMatch(qTable -> {
String key = StringUtils.upperCase(qTable.getTableName());
StatisticCacheKey key = new StatisticCacheKey(qTable.getTableName());
String id = transactionCache.getFromCache(
key, String.class);
if (id == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.github.vzakharchenko.dynamic.orm.core.query.cache;

import org.apache.commons.lang3.StringUtils;

import java.io.Serializable;
import java.util.Objects;

public class StatisticCacheKey implements Serializable {
private final String name;

public StatisticCacheKey(String name) {
this.name = StringUtils.upperCase(name);
}

public String getName() {
return name;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StatisticCacheKey that = (StatisticCacheKey) o;
return name.equals(that.name);
}

@Override
public int hashCode() {
return Objects.hash(name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.github.vzakharchenko.dynamic.orm.core.helper.CacheHelper;
import com.github.vzakharchenko.dynamic.orm.core.helper.CompositeKey;
import com.github.vzakharchenko.dynamic.orm.core.query.QueryContextImpl;
import com.github.vzakharchenko.dynamic.orm.core.query.cache.StatisticCacheKey;
import com.github.vzakharchenko.dynamic.orm.core.transaction.cache.TransactionalCache;
import com.querydsl.core.types.Path;
import com.querydsl.sql.RelationalPath;
Expand Down Expand Up @@ -133,7 +134,8 @@ public void afterUpdate(Map<CompositeKey, DiffColumnModel> diffColumnModelMap) {
@Override
public void cleanQueryCache() {
TransactionalCache transactionCache = queryContext.getTransactionCache();
transactionCache.cacheEvict(StringUtils.upperCase(qTable.getTableName()));
transactionCache.cacheEvict(
new StatisticCacheKey(StringUtils.upperCase(qTable.getTableName())));
}

private Map<CompositeKey, DiffColumnModel> softDeletedMap(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.github.vzakharchenko.dynamic.orm.core.statistic;

import com.github.vzakharchenko.dynamic.orm.core.query.cache.StatisticCacheHolder;
import com.github.vzakharchenko.dynamic.orm.core.query.cache.StatisticCacheKey;
import com.github.vzakharchenko.dynamic.orm.core.transaction.cache.TransactionalCache;
import com.google.common.collect.ImmutableList;
import com.querydsl.sql.RelationalPath;
import org.apache.commons.lang3.StringUtils;

import java.io.Serializable;
import java.util.*;
Expand Down Expand Up @@ -36,7 +36,7 @@ public StatisticCacheHolder get(TransactionalCache transactionalCache,
List<? extends Serializable> primaryKeys) {
String uuid = UUID.randomUUID().toString();
getTables().forEach(qTable -> transactionalCache
.putToCache(StringUtils.upperCase(qTable.getTableName()), uuid));
.putToCache(new StatisticCacheKey(qTable.getTableName()), uuid));
StatisticCacheHolder statisticCacheHolder = new StatisticCacheHolder(primaryKeys, uuid);
transactionalCache.putToCache(sql, statisticCacheHolder);
return statisticCacheHolder;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.github.vzakharchenko.dynamic.orm.core.transaction.cache;

import com.github.vzakharchenko.dynamic.orm.core.helper.CompositeKey;
import com.github.vzakharchenko.dynamic.orm.core.query.cache.StatisticCacheKey;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -10,7 +11,8 @@
import org.springframework.transaction.support.TransactionSynchronizationManager;

import java.io.Serializable;
import java.util.function.BiConsumer;
import java.util.Objects;
import java.util.UUID;

public class OrmTransactionSynchronizationAdapter extends TransactionSynchronizationAdapter {

Expand Down Expand Up @@ -42,17 +44,29 @@ private void evict(String message, Serializable evictKey) {
evict(evictKey);
}


private void evict(String cacheValue, StatisticCacheKey key, Serializable value) {
if (!Objects.equals(cacheValue, value)) {
targetCache.evict(key);
}
}

private void afterCompletion(StatisticCacheKey key, Serializable value) {
String s = targetCache.get(key, String.class);
if (s != null) {
evict(s, key, value);
} else {
targetCache.put(key, value);
}
}

private void afterCompletion(TransactionalCache transactionalCache) {
if (transactionalCache != null) {
transactionalCache.getInternalCache().forEach(new BiConsumer<Serializable, Serializable>() {
@Override
public void accept(Serializable key, Serializable value) {
Cache.ValueWrapper valueWrapper = targetCache.get(key);
if (valueWrapper != null && valueWrapper.get() != null) {
targetCache.evict(key);
} else {
targetCache.put(key, value);
}
transactionalCache.getInternalCache().forEach((key, value) -> {
if (key instanceof StatisticCacheKey) {
afterCompletion((StatisticCacheKey) key, value);
} else {
targetCache.put(key, value);
}
});
transactionalCache.getEvictObjects().forEach(evictKey ->
Expand All @@ -69,6 +83,7 @@ public void accept(Serializable key, Serializable value) {
private void evict(CompositeKey compositeKey) {
String key = StringUtils.upperCase(compositeKey.getTable().getTableName());
targetCache.evict(key);
targetCache.put(new StatisticCacheKey(key), UUID.randomUUID().toString());
}

private void evict(Serializable evictKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,6 @@ public boolean putModelToTargetCache(Serializable key, Serializable value) {
if (transactionCache.containsKey(key)) {
return false;
}

targetCache.put(key, value);
transactionCache.put(key, value);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.github.vzakharchenko.dynamic.orm.core.dynamic.QDynamicTable;
import com.github.vzakharchenko.dynamic.orm.core.dynamic.dml.DynamicTableModel;
import com.github.vzakharchenko.dynamic.orm.core.pk.PrimaryKeyGenerators;
import org.apache.commons.collections4.ListUtils;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;
Expand All @@ -23,13 +24,26 @@ public class ConcurrentSelectCacheTest extends DebugAnnotationTestQueryOrm {

public void createSchema() {
ormQueryFactory.transactionManager().startTransactionIfNeeded();
qDynamicTableFactory.buildTables("DynamicTable")
qDynamicTableFactory

.buildTables("DynamicTable")
.columns().addStringColumn("Id").size(255).useAsPrimaryKey().create()
.addDateTimeColumn("modificationTime").notNull().create()
.addStringColumn("TestColumn").size(255).create()
.finish()
.addPrimaryKey().addPrimaryKeyGenerator(PrimaryKeyGenerators.UUID.getPkGenerator()).finish()
.addVersionColumn("modificationTime").finish().buildSchema();
.addVersionColumn("modificationTime")

.buildNextTable("DynamicTable2")
.columns()
.addStringColumn("Id1").size(255).useAsPrimaryKey().create()
.addStringColumn("Id2").size(255).useAsPrimaryKey().create()
.addStringColumn("TestColumn").size(255).create()
.addDateTimeColumn("modificationTime").notNull().create()
.finish()
.addVersionColumn("modificationTime")

.finish().buildSchema();
ormQueryFactory.transactionManager().commit();
}

Expand All @@ -43,6 +57,18 @@ public void insertThread(String value) {
ormQueryFactory.transactionManager().commit();
}

public void insertThread2(String value) {
TransactionSynchronizationManager.clear();
QDynamicTable dynamicTable = qDynamicTableFactory.getQDynamicTableByName("DynamicTable2");
DynamicTableModel dynamicTableModel = new DynamicTableModel(dynamicTable);
dynamicTableModel.addColumnValue("Id1", value);
dynamicTableModel.addColumnValue("Id2", value);
dynamicTableModel.addColumnValue("TestColumn", value);
ormQueryFactory.transactionManager().startTransactionIfNeeded();
ormQueryFactory.insert(dynamicTableModel);
ormQueryFactory.transactionManager().commit();
}


public void selectThread() throws InterruptedException {
ormQueryFactory.transactionManager().startTransactionIfNeeded();
Expand All @@ -55,13 +81,39 @@ public void selectThread() throws InterruptedException {
}


public void selectThread12() throws InterruptedException {
QDynamicTable dynamicTable1 = qDynamicTableFactory.getQDynamicTableByName("DynamicTable");
QDynamicTable dynamicTable2 = qDynamicTableFactory.getQDynamicTableByName("DynamicTable2");
for (int i = 0; i < 400; i++) {
ormQueryFactory.transactionManager().startTransactionIfNeeded();
List<RawModel> rawModels = ormQueryFactory.selectCache().rawSelect(
ormQueryFactory.buildQuery().from(dynamicTable1).innerJoin(dynamicTable2).on(
dynamicTable2.getStringColumnByName("TestColumn").eq(
dynamicTable1.getStringColumnByName("TestColumn")
))).findAll(ListUtils.union(dynamicTable1.getColumns(), dynamicTable2.getColumns()));
ormQueryFactory.transactionManager().commit();
}
Thread.sleep(1000);
for (int i = 0; i < 2; i++) {
ormQueryFactory.transactionManager().startTransactionIfNeeded();
List<RawModel> rawModels = ormQueryFactory.selectCache().rawSelect(
ormQueryFactory.buildQuery().from(dynamicTable1).innerJoin(dynamicTable2).on(
dynamicTable2.getStringColumnByName("TestColumn").eq(
dynamicTable1.getStringColumnByName("TestColumn")
))).findAll(ListUtils.union(dynamicTable1.getColumns(), dynamicTable2.getColumns()));
ormQueryFactory.transactionManager().commit();

}
}


@Test
@Transactional(propagation = Propagation.NOT_SUPPORTED)
public void testConcurrents() throws InterruptedException {
for (int i = 0; i < 5; i++) {
try {
testConcurrent();
qDynamicTableFactory.dropTableOrView("DynamicTable").buildSchema();
qDynamicTableFactory.dropTableOrView("DynamicTable","DynamicTable2").buildSchema();
} catch (AssertionError e) {
System.err.println("Iteration : " + i);
throw e;
Expand Down Expand Up @@ -102,4 +154,74 @@ public void testConcurrent() throws InterruptedException {
assertEquals(tableModels.size(), tableModels2.size());
assertEquals(tableModels.size(), 100);
}

@Test
@Transactional(propagation = Propagation.NOT_SUPPORTED)
public void testConcurrents2() throws InterruptedException {
for (int i = 0; i < 5; i++) {
try {
testConcurrent2Tables();
qDynamicTableFactory.dropTableOrView("DynamicTable",
"DynamicTable2").buildSchema();
} catch (AssertionError e) {
System.err.println("Iteration : " + i);
throw e;
}
}
}

@Test
@Transactional(propagation = Propagation.NOT_SUPPORTED)
public void testConcurrent2Tables() throws InterruptedException {
createSchema();
ExecutorService executorServiceSelect = Executors.newFixedThreadPool(1);
Future<?> future1 = executorServiceSelect.submit(() -> {
try {
selectThread12();
} catch (InterruptedException e) {
throw new IllegalStateException();
}
});
executorServiceSelect.shutdown();
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Future> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
int finalI = i;
futures.add(executorService.submit(() -> insertThread(String.valueOf(finalI))));
}
for (int i = 0; i < 150; i++) {
int finalI = i;
futures.add(executorService.submit(() -> insertThread2(String.valueOf(finalI))));
}
executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.MINUTES);
executorServiceSelect.awaitTermination(90, TimeUnit.MINUTES);
assertEquals(futures.size(), 250);
for (Future f : futures) {
assertTrue(f.isDone());
assertFalse(f.isCancelled());
}
QDynamicTable dynamicTable1 = qDynamicTableFactory.getQDynamicTableByName("DynamicTable");
QDynamicTable dynamicTable2 = qDynamicTableFactory.getQDynamicTableByName("DynamicTable2");

List<RawModel> rawModels1 = ormQueryFactory.selectCache().rawSelect(
ormQueryFactory.buildQuery().from(dynamicTable1).innerJoin(dynamicTable2).on(
dynamicTable2.getStringColumnByName("TestColumn").eq(
dynamicTable1.getStringColumnByName("TestColumn")
))).findAll(ListUtils.union(dynamicTable1.getColumns(), dynamicTable2.getColumns()));

List<RawModel> rawModels2 = ormQueryFactory.selectCache().rawSelect(
ormQueryFactory.buildQuery().from(dynamicTable1).innerJoin(dynamicTable2).on(
dynamicTable2.getStringColumnByName("TestColumn").eq(
dynamicTable1.getStringColumnByName("TestColumn")
))).findAll(ListUtils.union(dynamicTable1.getColumns(), dynamicTable2.getColumns()));
rawModels2 = ormQueryFactory.selectCache().rawSelect(
ormQueryFactory.buildQuery().from(dynamicTable1).innerJoin(dynamicTable2).on(
dynamicTable2.getStringColumnByName("TestColumn").eq(
dynamicTable1.getStringColumnByName("TestColumn")
))).findAll(ListUtils.union(dynamicTable1.getColumns(), dynamicTable2.getColumns()));

assertEquals(rawModels1.size(), rawModels2.size());
assertEquals(rawModels2.size(), 100);
}
}

0 comments on commit b72ca9c

Please sign in to comment.