Skip to content

Commit

Permalink
HHH-11304 Replace PutFromLoadValidator properly in CollectionRegionAc…
Browse files Browse the repository at this point in the history
…cessStrategyTest
  • Loading branch information
rvansa authored and galderz committed Jul 5, 2017
1 parent 5214dee commit 3114aac
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 127 deletions.
Expand Up @@ -105,7 +105,7 @@ public class PutFromLoadValidator {
/**
* Injected interceptor
*/
private final NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor;
private NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor;

/**
* The time of the last call to {@link #endInvalidatingRegion()}. Puts from transactions started after
Expand Down Expand Up @@ -161,54 +161,61 @@ public PutFromLoadValidator(AdvancedCache cache, InfinispanRegionFactory regionF
if (!cacheMode.isInvalidation()) {
throw new IllegalArgumentException("PutFromLoadValidator in clustered caches requires invalidation mode.");
}
List<CommandInterceptor> interceptorChain = cache.getInterceptorChain();
log.debug("Interceptor chain was: " + interceptorChain);
int position = 0;
// add interceptor before uses exact match, not instanceof match
int invalidationPosition = 0;
int entryWrappingPosition = 0;
for (CommandInterceptor ci : interceptorChain) {
if (ci instanceof InvalidationInterceptor) {
invalidationPosition = position;
}
if (ci instanceof EntryWrappingInterceptor) {
entryWrappingPosition = position;
}
position++;
}
boolean transactional = cache.getCacheConfiguration().transaction().transactionMode().isTransactional();
if (transactional) {
cache.removeInterceptor(invalidationPosition);
TxInvalidationInterceptor txInvalidationInterceptor = new TxInvalidationInterceptor();
cache.getComponentRegistry().registerComponent(txInvalidationInterceptor, TxInvalidationInterceptor.class);
cache.addInterceptor(txInvalidationInterceptor, invalidationPosition);

// Note that invalidation does *NOT* acquire locks; therefore, we have to start invalidating before
// wrapping the entry, since if putFromLoad was invoked between wrap and beginInvalidatingKey, the invalidation
// would not commit the entry removal (as during wrap the entry was not in cache)
TxPutFromLoadInterceptor txPutFromLoadInterceptor = new TxPutFromLoadInterceptor(this, cache.getName());
cache.getComponentRegistry().registerComponent(txPutFromLoadInterceptor, TxPutFromLoadInterceptor.class);
cache.addInterceptor(txPutFromLoadInterceptor, entryWrappingPosition);
}
else {
cache.removeInterceptor(invalidationPosition);
NonTxInvalidationInterceptor nonTxInvalidationInterceptor = new NonTxInvalidationInterceptor(this);
cache.getComponentRegistry().registerComponent(nonTxInvalidationInterceptor, NonTxInvalidationInterceptor.class);
cache.addInterceptor(nonTxInvalidationInterceptor, invalidationPosition);

nonTxPutFromLoadInterceptor = new NonTxPutFromLoadInterceptor(this, cache.getName());
cache.getComponentRegistry().registerComponent(nonTxPutFromLoadInterceptor, NonTxPutFromLoadInterceptor.class);
cache.addInterceptor(nonTxPutFromLoadInterceptor, entryWrappingPosition);
}
log.debug("New interceptor chain is: " + cache.getInterceptorChain());

CacheCommandInitializer cacheCommandInitializer = cache.getComponentRegistry().getComponent(CacheCommandInitializer.class);
cacheCommandInitializer.addPutFromLoadValidator(cache.getName(), this);
addToCache(cache, this);
}

this.cache = cache;
this.pendingPuts = cacheManager.getCache(pendingPutsName);
this.nonTxPutFromLoadInterceptor = nonTxPutFromLoadInterceptor;
}

/**
* Besides the call from constructor, this should be called only from tests when mocking the validator.
*/
public static void addToCache(AdvancedCache cache, PutFromLoadValidator validator) {
List<CommandInterceptor> interceptorChain = cache.getInterceptorChain();
log.debug("Interceptor chain was: " + interceptorChain);
int position = 0;
// add interceptor before uses exact match, not instanceof match
int invalidationPosition = 0;
int entryWrappingPosition = 0;
for (CommandInterceptor ci : interceptorChain) {
if (ci instanceof InvalidationInterceptor) {
invalidationPosition = position;
}
if (ci instanceof EntryWrappingInterceptor) {
entryWrappingPosition = position;
}
position++;
}
boolean transactional = cache.getCacheConfiguration().transaction().transactionMode().isTransactional();
if (transactional) {
cache.removeInterceptor(invalidationPosition);
TxInvalidationInterceptor txInvalidationInterceptor = new TxInvalidationInterceptor();
cache.getComponentRegistry().registerComponent(txInvalidationInterceptor, TxInvalidationInterceptor.class);
cache.addInterceptor(txInvalidationInterceptor, invalidationPosition);

// Note that invalidation does *NOT* acquire locks; therefore, we have to start invalidating before
// wrapping the entry, since if putFromLoad was invoked between wrap and beginInvalidatingKey, the invalidation
// would not commit the entry removal (as during wrap the entry was not in cache)
TxPutFromLoadInterceptor txPutFromLoadInterceptor = new TxPutFromLoadInterceptor(validator, cache.getName());
cache.getComponentRegistry().registerComponent(txPutFromLoadInterceptor, TxPutFromLoadInterceptor.class);
cache.addInterceptor(txPutFromLoadInterceptor, entryWrappingPosition);
}
else {
cache.removeInterceptor(invalidationPosition);
NonTxInvalidationInterceptor nonTxInvalidationInterceptor = new NonTxInvalidationInterceptor(validator);
cache.getComponentRegistry().registerComponent(nonTxInvalidationInterceptor, NonTxInvalidationInterceptor.class);
cache.addInterceptor(nonTxInvalidationInterceptor, invalidationPosition);

NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor = new NonTxPutFromLoadInterceptor(validator, cache.getName());
cache.getComponentRegistry().registerComponent(nonTxPutFromLoadInterceptor, NonTxPutFromLoadInterceptor.class);
cache.addInterceptor(nonTxPutFromLoadInterceptor, entryWrappingPosition);
validator.nonTxPutFromLoadInterceptor = nonTxPutFromLoadInterceptor;
}
log.debug("New interceptor chain is: " + cache.getInterceptorChain());

CacheCommandInitializer cacheCommandInitializer = cache.getComponentRegistry().getComponent(CacheCommandInitializer.class);
cacheCommandInitializer.addPutFromLoadValidator(cache.getName(), validator);
}

/**
Expand All @@ -217,7 +224,7 @@ public PutFromLoadValidator(AdvancedCache cache, InfinispanRegionFactory regionF
*
* @param cache
*/
public static void removeFromCache(AdvancedCache cache) {
public static PutFromLoadValidator removeFromCache(AdvancedCache cache) {
cache.removeInterceptor(TxPutFromLoadInterceptor.class);
cache.removeInterceptor(NonTxPutFromLoadInterceptor.class);
for (Object i : cache.getInterceptorChain()) {
Expand All @@ -237,7 +244,7 @@ else if (i instanceof TxInvalidationInterceptor) {
}
}
CacheCommandInitializer cci = cache.getComponentRegistry().getComponent(CacheCommandInitializer.class);
cci.removePutFromLoadValidator(cache.getName());
return cci.removePutFromLoadValidator(cache.getName());
}

public void setCurrentSession(SharedSessionContractImplementor session) {
Expand Down
Expand Up @@ -6,15 +6,13 @@
*/
package org.hibernate.test.cache.infinispan.collection;

import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.access.AccessDelegate;
import org.hibernate.cache.infinispan.access.NonTxInvalidationCacheAccessDelegate;
import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
Expand All @@ -25,21 +23,18 @@

import org.hibernate.test.cache.infinispan.AbstractRegionAccessStrategyTest;
import org.hibernate.test.cache.infinispan.NodeEnvironment;
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
import org.hibernate.test.cache.infinispan.util.TestingKeyFactory;
import org.junit.Test;
import junit.framework.AssertionFailedError;

import org.infinispan.AdvancedCache;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.CacheManagerCallable;
import org.infinispan.test.fwk.TestCacheManagerFactory;

import static org.infinispan.test.TestingUtil.withCacheManager;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

/**
* Base class for tests of CollectionRegionAccessStrategy impls.
Expand Down Expand Up @@ -73,79 +68,18 @@ public void testGetRegion() {

@Test
public void testPutFromLoadRemoveDoesNotProduceStaleData() throws Exception {
if (cacheMode.isInvalidation()) {
doPutFromLoadRemoveDoesNotProduceStaleDataInvalidation();
if (!cacheMode.isInvalidation()) {
return;
}
}

public void doPutFromLoadRemoveDoesNotProduceStaleDataInvalidation() {
final CountDownLatch pferLatch = new CountDownLatch( 1 );
final CountDownLatch removeLatch = new CountDownLatch( 1 );
withCacheManager(new CacheManagerCallable(createCacheManager(localRegion.getRegionFactory())) {
@Override
public void call() {
PutFromLoadValidator validator = getPutFromLoadValidator(remoteRegion.getCache(), cm, removeLatch, pferLatch);

final AccessDelegate delegate = localRegion.getCache().getCacheConfiguration().transaction().transactionMode().isTransactional() ?
new TxInvalidationCacheAccessDelegate(localRegion, validator) :
new NonTxInvalidationCacheAccessDelegate(localRegion, validator);

Callable<Void> pferCallable = new Callable<Void>() {
public Void call() throws Exception {
SharedSessionContractImplementor session = mockedSession();
delegate.putFromLoad(session, "k1", "v1", session.getTimestamp(), null );
return null;
}
};

Callable<Void> removeCallable = new Callable<Void>() {
public Void call() throws Exception {
removeLatch.await();
SharedSessionContractImplementor session = mockedSession();
withTx(localEnvironment, session, new Callable<Void>() {
@Override
public Void call() throws Exception {
delegate.remove(session, "k1");
return null;
}
});
pferLatch.countDown();
return null;
}
};

ExecutorService executorService = Executors.newCachedThreadPool();
Future<Void> pferFuture = executorService.submit( pferCallable );
Future<Void> removeFuture = executorService.submit( removeCallable );

try {
pferFuture.get();
removeFuture.get();
} catch (Exception e) {
throw new RuntimeException(e);
}

assertFalse(localRegion.getCache().containsKey("k1"));
}
});
}

private static EmbeddedCacheManager createCacheManager(InfinispanRegionFactory regionFactory) {
EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createCacheManager(false);
return cacheManager;
}

protected PutFromLoadValidator getPutFromLoadValidator(AdvancedCache cache, EmbeddedCacheManager cm,
CountDownLatch removeLatch, CountDownLatch pferLatch) {
// remove the interceptor inserted by default PutFromLoadValidator, we're using different one
PutFromLoadValidator.removeFromCache(cache);
InfinispanRegionFactory regionFactory = new InfinispanRegionFactory();
regionFactory.setCacheManager(cm);
regionFactory.start(CacheTestUtil.sfOptionsForStart(), new Properties());
return new PutFromLoadValidator(cache, regionFactory, cm) {
@Override
public Lock acquirePutFromLoadLock(SharedSessionContractImplementor session, Object key, long txTimestamp) {
Lock lock = super.acquirePutFromLoadLock(session, key, txTimestamp);
PutFromLoadValidator originalValidator = PutFromLoadValidator.removeFromCache(localRegion.getCache());
PutFromLoadValidator mockValidator = spy(originalValidator);
doAnswer(invocation -> {
try {
return invocation.callRealMethod();
} finally {
try {
removeLatch.countDown();
// the remove should be blocked because the putFromLoad has been acquired
Expand All @@ -160,9 +94,44 @@ public Lock acquirePutFromLoadLock(SharedSessionContractImplementor session, Obj
log.error( "Error", e );
throw new RuntimeException( "Error", e );
}
return lock;
}
};
}).when(mockValidator).acquirePutFromLoadLock(any(), any(), anyLong());
PutFromLoadValidator.addToCache(localRegion.getCache(), mockValidator);

try {
final AccessDelegate delegate = localRegion.getCache().getCacheConfiguration().transaction().transactionMode().isTransactional() ?
new TxInvalidationCacheAccessDelegate(localRegion, mockValidator) :
new NonTxInvalidationCacheAccessDelegate(localRegion, mockValidator);

ExecutorService executorService = Executors.newCachedThreadPool();

final String KEY = "k1";
Future<Void> pferFuture = executorService.submit(() -> {
SharedSessionContractImplementor session = mockedSession();
delegate.putFromLoad(session, KEY, "v1", session.getTimestamp(), null);
return null;
});

Future<Void> removeFuture = executorService.submit(() -> {
removeLatch.await();
SharedSessionContractImplementor session = mockedSession();
withTx(localEnvironment, session, () -> {
delegate.remove(session, KEY);
return null;
});
pferLatch.countDown();
return null;
});

pferFuture.get();
removeFuture.get();

assertFalse(localRegion.getCache().containsKey(KEY));
assertFalse(remoteRegion.getCache().containsKey(KEY));
} finally {
PutFromLoadValidator.removeFromCache(localRegion.getCache());
PutFromLoadValidator.addToCache(localRegion.getCache(), originalValidator);
}
}

@Test
Expand Down

0 comments on commit 3114aac

Please sign in to comment.