Skip to content

Commit

Permalink
ISPN-7224 Synchronous get
Browse files Browse the repository at this point in the history
  • Loading branch information
Sebastian Laskawiec authored and tristantarrant committed Feb 16, 2018
1 parent 384d708 commit 7e9af44
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package org.infinispan.spring.provider;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import org.infinispan.commons.api.BasicCache;
import org.springframework.cache.Cache;
import org.springframework.cache.support.SimpleValueWrapper;
import org.springframework.util.Assert;
Expand All @@ -16,6 +20,8 @@
*/
class CacheDelegate implements Cache {

private final Map<Object, ReentrantLock> synchronousGetLocks = new ConcurrentHashMap<>();

//Implemented as a static holder class for backwards compatibility.
//Imagine a situation where a client has new integration module and old Spring version. In that case
//this exception does not exist. However we can bypass this by using separate class file (which is loaded
Expand All @@ -31,7 +37,7 @@ static RuntimeException throwValueRetrievalException(Object key, Callable<?> loa
/**
* @param nativeCache underlying cache
*/
public CacheDelegate(final org.infinispan.commons.api.BasicCache<Object, Object> nativeCache) {
public CacheDelegate(final BasicCache<Object, Object> nativeCache) {
Assert.notNull(nativeCache, "A non-null Infinispan cache implementation is required");
this.nativeCache = nativeCache;
}
Expand Down Expand Up @@ -71,13 +77,31 @@ public <T> T get(Object key, Class<T> type) {

@Override
public <T> T get(Object key, Callable<T> valueLoader) {
return (T) nativeCache.computeIfAbsent(key, keyToBeInserted -> {
ReentrantLock lock = null;
T value = (T) nativeCache.get(key);
if (value == null) {
lock = synchronousGetLocks.computeIfAbsent(key, k -> new ReentrantLock());
lock.lock();
try {
return valueLoader.call();
} catch (Exception e) {
throw ValueRetrievalExceptionResolver.throwValueRetrievalException(key, valueLoader, e);
if ((value = (T) nativeCache.get(key)) == null) {
try {
T newValue = valueLoader.call();
// we can't use computeIfAbsent here since in distributed embedded scenario we would
// send a lambda to other nodes. This is the behavior we want to avoid.
value = (T) nativeCache.putIfAbsent(key, newValue);
if (value == null) {
value = newValue;
}
} catch (Exception e) {
throw ValueRetrievalExceptionResolver.throwValueRetrievalException(key, valueLoader, e);
}
}
} finally {
lock.unlock();
synchronousGetLocks.remove(key);
}
});
}
return value;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
package org.infinispan.spring.provider;

import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertNull;
import static org.testng.AssertJUnit.assertSame;
import static org.testng.AssertJUnit.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.spring.support.embedded.InfinispanNamedEmbeddedCacheFactoryBean;
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.springframework.cache.Cache;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand All @@ -30,8 +34,8 @@
* @author Marius Bogoevici
*
*/
@Test(testName = "spring.provider.SpringCacheCacheTest", groups = {"unit", "smoke"})
public class SpringCacheCacheTest extends SingleCacheManagerTest {
@Test(testName = "spring.provider.SpringCacheTest", groups = {"unit", "smoke"})
public class SpringCacheTest extends SingleCacheManagerTest {

protected final static String CACHE_NAME = "testCache";

Expand All @@ -55,12 +59,6 @@ public void setUp() throws Exception {
this.cache = createCache(this.nativeCache);
}

@AfterMethod
public void tearDown() throws Exception {
this.nativeCache = null;
this.cache = null;
}

@Test
public void testCacheName() throws Exception {
assertEquals(CACHE_NAME, this.cache.getName());
Expand Down Expand Up @@ -212,6 +210,43 @@ public void testValueLoaderWithNoPreviousValue() {
assertEquals("test", valueFromCache.get());
}

/*
* In this test Thread 1 should exclusively block Cache#get method so that Thread 2 won't be able to
* insert "thread2" string into the cache.
*
* The test check this part of the Spring spec:
* Return the value to which this cache maps the specified key, obtaining that value from valueLoader if necessary.
* This method provides a simple substitute for the conventional "if cached, return; otherwise create, cache and return" pattern.
* @see http://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/cache/Cache.html#get-java.lang.Object-java.util.concurrent.Callable-
*/
@Test
public void testValueLoaderWithLocking() throws Exception {
//given
CountDownLatch waitUntilThread1LocksValueGetter = new CountDownLatch(1);

//when
Future<String> thread1 = fork(() -> cache.get("test", () -> {
waitUntilThread1LocksValueGetter.countDown();
return "thread1";
}));

Future<String> thread2 = fork(() -> {
waitUntilThread1LocksValueGetter.await(30, TimeUnit.SECONDS);
return cache.get("test", () -> "thread2");
});

String valueObtainedByThread1 = thread1.get();
String valueObtainedByThread2 = thread2.get();

Cache.ValueWrapper valueAfterGetterIsDone = cache.get("test");

//then
assertNotNull(valueAfterGetterIsDone);
assertEquals("thread1", valueAfterGetterIsDone.get());
assertEquals("thread1", valueObtainedByThread1);
assertEquals("thread1", valueObtainedByThread2);
}

@Test
public void testValueLoaderWithPreviousValue() {
//given
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.infinispan.spring.provider;

import static org.infinispan.server.hotrod.test.HotRodTestingUtil.hotRodCacheConfiguration;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;

import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.springframework.cache.Cache;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(testName = "spring.provider.SpringRemoteCacheTest", groups = "functional")
public class SpringRemoteCacheTest extends SingleCacheManagerTest {

private static final String TEST_CACHE_NAME = "spring.remote.cache.Test";

private RemoteCacheManager remoteCacheManager;
private HotRodServer hotrodServer;

@Override
protected EmbeddedCacheManager createCacheManager() throws Exception {
cacheManager = TestCacheManagerFactory.createCacheManager(hotRodCacheConfiguration());
cacheManager.defineConfiguration(TEST_CACHE_NAME, cacheManager.getDefaultCacheConfiguration());
cache = cacheManager.getCache(TEST_CACHE_NAME);

return cacheManager;
}

@BeforeClass
public void setupRemoteCacheFactory() {
hotrodServer = HotRodTestingUtil.startHotRodServer(cacheManager, 0);
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.addServer().host("localhost").port(hotrodServer.getPort());
remoteCacheManager = new RemoteCacheManager(builder.build());
}

@AfterClass
public void destroyRemoteCacheFactory() {
remoteCacheManager.stop();
hotrodServer.stop();
}

/*
* In this test Thread 1 should exclusively block Cache#get method so that Thread 2 won't be able to
* insert "thread2" string into the cache.
*
* The test check this part of the Spring spec:
* Return the value to which this cache maps the specified key, obtaining that value from valueLoader if necessary.
* This method provides a simple substitute for the conventional "if cached, return; otherwise create, cache and return" pattern.
* @see http://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/cache/Cache.html#get-java.lang.Object-java.util.concurrent.Callable-
*/
@Test(timeOut = 30_000)
public void testValueLoaderWithLocking() throws Exception {
//given
final SpringRemoteCacheManager springRemoteCacheManager = new SpringRemoteCacheManager(remoteCacheManager);
final SpringCache cache = springRemoteCacheManager.getCache(TEST_CACHE_NAME);

CountDownLatch waitUntilThread1LocksValueGetter = new CountDownLatch(1);

//when
Future<String> thread1 = fork(() -> cache.get("test", () -> {
waitUntilThread1LocksValueGetter.countDown();
// /TimeUnit.MILLISECONDS.sleep(10);
return "thread1";
}));

Future<String> thread2 = fork(() -> {
waitUntilThread1LocksValueGetter.await();
return cache.get("test", () -> "thread2");
});

String valueObtainedByThread1 = thread1.get();
String valueObtainedByThread2 = thread2.get();

Cache.ValueWrapper valueAfterGetterIsDone = cache.get("test");

//then
assertNotNull(valueAfterGetterIsDone);
assertEquals("thread1", valueAfterGetterIsDone.get());
assertEquals("thread1", valueObtainedByThread1);
assertEquals("thread1", valueObtainedByThread2);
}
}

0 comments on commit 7e9af44

Please sign in to comment.