From afcb86e3b46fc9cb1f7a15959097566e9e065389 Mon Sep 17 00:00:00 2001 From: gustavonalle Date: Fri, 26 Aug 2016 12:50:40 +0100 Subject: [PATCH] ISPN-6978 AffinityIndexManager does not work on transactional caches --- .../query/affinity/AffinityIndexManager.java | 24 ++++++-- .../query/backend/QueryKnownClasses.java | 35 ++---------- .../query/backend/SearchFactoryHandler.java | 20 +++---- .../query/backend/TransactionHelper.java | 48 +++++++--------- .../IndexManagerBasedLockController.java | 55 +++++-------------- .../org/infinispan/query/logging/Log.java | 9 +++ ...dCacheWithAffinityIndexManagerTxTest.java} | 4 +- 7 files changed, 81 insertions(+), 114 deletions(-) rename query/src/test/java/org/infinispan/query/blackbox/{ClusteredCacheWithAffinityIndexManagerTx.java => ClusteredCacheWithAffinityIndexManagerTxTest.java} (62%) diff --git a/query/src/main/java/org/infinispan/query/affinity/AffinityIndexManager.java b/query/src/main/java/org/infinispan/query/affinity/AffinityIndexManager.java index c7a3b19d66c..a74beea917a 100644 --- a/query/src/main/java/org/infinispan/query/affinity/AffinityIndexManager.java +++ b/query/src/main/java/org/infinispan/query/affinity/AffinityIndexManager.java @@ -14,6 +14,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; +import javax.transaction.Transaction; +import javax.transaction.TransactionManager; + import org.apache.lucene.index.IndexWriter; import org.apache.lucene.search.similarities.Similarity; import org.apache.lucene.store.Directory; @@ -35,6 +38,7 @@ import org.infinispan.query.backend.ComponentRegistryService; import org.infinispan.query.backend.KeyTransformationHandler; import org.infinispan.query.backend.QueryInterceptor; +import org.infinispan.query.backend.TransactionHelper; import org.infinispan.query.logging.Log; import org.infinispan.remoting.responses.Response; import org.infinispan.remoting.rpc.RpcManager; @@ -65,6 +69,7 @@ public class AffinityIndexManager extends DirectoryBasedIndexManager implements private final Lock writeLock = flushLock.writeLock(); private final Lock readLock = flushLock.readLock(); private ExecutorService asyncExecutor; + private TransactionHelper transactionHelper; @Override @@ -104,10 +109,16 @@ private void clearIfNeeded(Address lockHolder) { @Override public void initialize(String indexName, Properties properties, Similarity similarity, WorkerBuildContext buildContext) { - super.initialize(indexName, properties, similarity, buildContext); ServiceManager serviceManager = buildContext.getServiceManager(); ComponentRegistryService componentRegistryService = serviceManager.requestService(ComponentRegistryService.class); ComponentRegistry componentRegistry = componentRegistryService.getComponentRegistry(); + transactionHelper = new TransactionHelper(componentRegistry.getComponent(TransactionManager.class)); + Transaction tx = transactionHelper.suspendTxIfExists(); + try { + super.initialize(indexName, properties, similarity, buildContext); + } finally { + transactionHelper.resume(tx); + } asyncExecutor = componentRegistry.getComponent(ExecutorService.class, ASYNC_OPERATIONS_EXECUTOR); distributionManager = componentRegistry.getComponent(DistributionManager.class); rpcManager = componentRegistry.getComponent(RpcManager.class); @@ -175,8 +186,13 @@ private int getSegment(Object key) { } private Address getLockHolder(String indexName) { - InfinispanDirectoryProvider directoryProvider = (InfinispanDirectoryProvider) getDirectoryProvider(); - return directoryProvider.getLockOwner(indexName, IndexWriter.WRITE_LOCK_NAME); + Transaction tx = transactionHelper.suspendTxIfExists(); + try { + InfinispanDirectoryProvider directoryProvider = (InfinispanDirectoryProvider) getDirectoryProvider(); + return directoryProvider.getLockOwner(indexName, IndexWriter.WRITE_LOCK_NAME); + } finally { + transactionHelper.resume(tx); + } } private Address getLocation(LuceneWork work) { @@ -242,7 +258,7 @@ private void sendWork(List works, Address destination) { List
dest = Collections.singletonList(destination); log.debugf("Sending works %s to %s", works, dest); CompletableFuture> result - = rpcManager.invokeRemotelyAsync(dest, indexUpdateCommand, rpcManager.getDefaultRpcOptions(false)); + = rpcManager.invokeRemotelyAsync(dest, indexUpdateCommand, rpcManager.getDefaultRpcOptions(false)); result.whenComplete((responses, error) -> { if (error != null) { log.error("Error forwarding index job", error); diff --git a/query/src/main/java/org/infinispan/query/backend/QueryKnownClasses.java b/query/src/main/java/org/infinispan/query/backend/QueryKnownClasses.java index 25cb666e3e7..205abd33d75 100644 --- a/query/src/main/java/org/infinispan/query/backend/QueryKnownClasses.java +++ b/query/src/main/java/org/infinispan/query/backend/QueryKnownClasses.java @@ -69,7 +69,7 @@ public final class QueryKnownClasses { */ private volatile AdvancedCache>, Boolean> knownClassesCache; - private volatile TransactionManager transactionManager; + private volatile TransactionHelper transactionHelper; /** * A second level cache. Not using a ConcurrentHashMap as this will degenerate into a read-only Map at runtime; @@ -140,7 +140,7 @@ Set> keys() { startInternalCache(); Set> result = new HashSet<>(); - Transaction tx = suspendTx(); + Transaction tx = transactionHelper.suspendTxIfExists(); try { for (KeyValuePair> key : knownClassesCache.keySet()) { if (key.getKey().equals(cacheName)) { @@ -149,7 +149,7 @@ Set> keys() { } return result; } finally { - resumeTx(tx); + transactionHelper.resume(tx); } } @@ -176,11 +176,11 @@ void put(final Class clazz, final Boolean value) { throw new IllegalArgumentException("Null values are not allowed"); } startInternalCache(); - Transaction tx = suspendTx(); + Transaction tx = transactionHelper.suspendTxIfExists(); try { runCommand(() -> knownClassesCache.put(new KeyValuePair<>(cacheName, clazz), value)); } finally { - resumeTx(tx); + transactionHelper.resume(tx); } localCacheInsert(clazz, value); @@ -211,7 +211,7 @@ private void startInternalCache() { internalCacheRegistry.registerInternalCache(QUERY_KNOWN_CLASSES_CACHE_NAME, getInternalCacheConfig()); Cache>, Boolean> knownClassesCache = SecurityActions.getCache(cacheManager, QUERY_KNOWN_CLASSES_CACHE_NAME); this.knownClassesCache = knownClassesCache.getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES); - transactionManager = this.knownClassesCache.getTransactionManager(); + transactionHelper = new TransactionHelper(this.knownClassesCache.getTransactionManager()); } } } @@ -252,27 +252,4 @@ private void runCommand(Runnable runnable) { } } - /** - * Suspend any ongoing transaction, so that the internal cache writes are committed immediately. - */ - private Transaction suspendTx() { - if (transactionManager == null) { - return null; - } - try { - return transactionManager.suspend(); - } catch (SystemException e) { - throw new CacheException("Unable to suspend ongoing transaction", e); - } - } - - private void resumeTx(Transaction tx) { - if (tx != null) { - try { - transactionManager.resume(tx); - } catch (InvalidTransactionException | SystemException e) { - throw new CacheException("Unable to resume ongoing transaction", e); - } - } - } } diff --git a/query/src/main/java/org/infinispan/query/backend/SearchFactoryHandler.java b/query/src/main/java/org/infinispan/query/backend/SearchFactoryHandler.java index d023a0ca0f4..1e28a407dc7 100644 --- a/query/src/main/java/org/infinispan/query/backend/SearchFactoryHandler.java +++ b/query/src/main/java/org/infinispan/query/backend/SearchFactoryHandler.java @@ -1,11 +1,11 @@ package org.infinispan.query.backend; -import static org.infinispan.query.backend.TransactionHelper.Operation; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.locks.ReentrantLock; +import javax.transaction.Transaction; + import org.hibernate.search.spi.SearchIntegrator; import org.infinispan.notifications.Listener; import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated; @@ -47,8 +47,7 @@ boolean updateKnownTypesIfNeeded(final Object value) { final Boolean existingBoolean = queryKnownClasses.get(potentialNewType); if (existingBoolean != null) { return existingBoolean; - } - else { + } else { handleOnDemandRegistration(false, potentialNewType); Boolean isIndexable = queryKnownClasses.get(potentialNewType); return isIndexable != null ? isIndexable : false; @@ -92,12 +91,13 @@ private void updateSearchFactory(final Class... classes) { return; } final Class[] newtypes = reducedSet.toArray(new Class[reducedSet.size()]); - transactionHelper.runSuspendingTx(new Operation() { - @Override - public void execute() { - searchFactory.addClasses(newtypes); - } - }); + + Transaction tx = transactionHelper.suspendTxIfExists(); + try { + searchFactory.addClasses(newtypes); + } finally { + transactionHelper.resume(tx); + } for (Class type : newtypes) { if (hasIndex(type)) { log.detectedUnknownIndexedEntity(queryKnownClasses.getCacheName(), type.getName()); diff --git a/query/src/main/java/org/infinispan/query/backend/TransactionHelper.java b/query/src/main/java/org/infinispan/query/backend/TransactionHelper.java index adb23f9b361..8ec8db0f0f1 100644 --- a/query/src/main/java/org/infinispan/query/backend/TransactionHelper.java +++ b/query/src/main/java/org/infinispan/query/backend/TransactionHelper.java @@ -3,54 +3,48 @@ import javax.transaction.Transaction; import javax.transaction.TransactionManager; +import org.infinispan.query.logging.Log; +import org.infinispan.util.logging.LogFactory; + /** * Transaction related helper * * @author gustavonalle * @since 7.0 */ -class TransactionHelper { +public final class TransactionHelper { + + private static final Log LOGGER = LogFactory.getLog(TransactionHelper.class, Log.class); private final TransactionManager transactionManager; - TransactionHelper(final TransactionManager transactionManager) { + public TransactionHelper(final TransactionManager transactionManager) { this.transactionManager = transactionManager; } - void runSuspendingTx(Operation op) { - final Transaction transaction = suspend(); - try { - op.execute(); - } finally { - resume(transaction); + public void resume(final Transaction transaction) { + if (transaction != null) { + try { + transactionManager.resume(transaction); + } catch (Exception e) { + throw LOGGER.unableToResumeSuspendedTx(transaction, e); + } } } - Transaction suspend() { + public Transaction suspendTxIfExists() { if (transactionManager == null) { return null; } try { - return transactionManager.suspend(); + Transaction tx; + if ((tx = transactionManager.getTransaction()) != null) { + transactionManager.suspend(); + } + return tx; } catch (Exception e) { - //ignored - } - return null; - } - - void resume(Transaction transaction) { - if (transaction == null || transactionManager == null) { - return; + throw LOGGER.unableToSuspendTx(e); } - try { - transactionManager.resume(transaction); - } catch (Exception e) { - //ignored; - } - } - - interface Operation { - void execute(); } } diff --git a/query/src/main/java/org/infinispan/query/indexmanager/IndexManagerBasedLockController.java b/query/src/main/java/org/infinispan/query/indexmanager/IndexManagerBasedLockController.java index 032966acc24..58fe1dfaf3a 100644 --- a/query/src/main/java/org/infinispan/query/indexmanager/IndexManagerBasedLockController.java +++ b/query/src/main/java/org/infinispan/query/indexmanager/IndexManagerBasedLockController.java @@ -10,14 +10,14 @@ import org.apache.lucene.store.Lock; import org.apache.lucene.store.LockObtainFailedException; import org.hibernate.search.indexes.spi.DirectoryBasedIndexManager; -import org.infinispan.commons.CacheException; import org.infinispan.lucene.impl.DirectoryExtensions; +import org.infinispan.query.backend.TransactionHelper; import org.infinispan.query.logging.Log; import org.infinispan.util.logging.LogFactory; /** * Used to control and override the ownership of the Lucene index lock. - * + *

* Rather than wrapping the Directory or the LockManager directly, we need to wrap the IndexManager * as the Directory initialization is deferred. * @@ -29,28 +29,28 @@ final class IndexManagerBasedLockController implements IndexLockController { private static final Log log = LogFactory.getLog(IndexManagerBasedLockController.class, Log.class); private final DirectoryBasedIndexManager indexManager; - private final TransactionManager tm; + private final TransactionHelper transactionHelper; public IndexManagerBasedLockController(DirectoryBasedIndexManager indexManager, TransactionManager tm) { this.indexManager = indexManager; - this.tm = tm; + this.transactionHelper = new TransactionHelper(tm); } @Override public boolean waitForAvailability() { - final Transaction tx = suspendTxIfExists(); + final Transaction tx = transactionHelper.suspendTxIfExists(); try { boolean waitForAvailabilityInternal = waitForAvailabilityInternal(); log.waitingForLockAcquired(waitForAvailabilityInternal); return waitForAvailabilityInternal; - } - finally { - resumeTx(tx); + } finally { + transactionHelper.resume(tx); } } /** * This is returning as soon as the lock is available, or after 10 seconds. + * * @return true if the lock is free at the time of returning. */ private boolean waitForAvailabilityInternal() { @@ -59,50 +59,21 @@ private boolean waitForAvailabilityInternal() { Lock lock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME); lock.close(); return true; - } - catch (LockObtainFailedException lofe) { + } catch (LockObtainFailedException lofe) { return false; - } - catch (IOException e) { + } catch (IOException e) { log.error(e); return false; } } - private void resumeTx(final Transaction tx) { - if (tx != null) { - try { - tm.resume(tx); - } catch (Exception e) { - throw new CacheException("Unable to resume suspended transaction " + tx, e); - } - } - } - - private Transaction suspendTxIfExists() { - if (tm==null) { - return null; - } - try { - Transaction tx; - if ((tx = tm.getTransaction()) != null) { - tm.suspend(); - } - return tx; - } - catch (Exception e) { - throw new CacheException(e); - } - } - @Override public void forceLockClear() { - final Transaction tx = suspendTxIfExists(); + final Transaction tx = transactionHelper.suspendTxIfExists(); try { forceLockClearInternal(); - } - finally { - resumeTx(tx); + } finally { + transactionHelper.resume(tx); } } diff --git a/query/src/main/java/org/infinispan/query/logging/Log.java b/query/src/main/java/org/infinispan/query/logging/Log.java index c9343381471..ec64375fcc9 100644 --- a/query/src/main/java/org/infinispan/query/logging/Log.java +++ b/query/src/main/java/org/infinispan/query/logging/Log.java @@ -9,6 +9,8 @@ import java.io.IOException; import java.util.List; +import javax.transaction.Transaction; + import org.hibernate.hql.ParsingException; import org.hibernate.search.backend.LuceneWork; import org.infinispan.commons.CacheException; @@ -143,4 +145,11 @@ public interface Log extends org.infinispan.util.logging.Log { @Message(value = "Could not locate error handler class %s", id = 14032) IllegalArgumentException unsupportedErrorHandlerConfigurationValueType(Class type); + + @Message(value = "Unable to resume suspended transaction %s", id = 14033) + CacheException unableToResumeSuspendedTx(Transaction transaction, @Cause Throwable cause); + + @Message(value = "Unable to suspend transaction", id = 14034) + CacheException unableToSuspendTx(@Cause Throwable cause); + } diff --git a/query/src/test/java/org/infinispan/query/blackbox/ClusteredCacheWithAffinityIndexManagerTx.java b/query/src/test/java/org/infinispan/query/blackbox/ClusteredCacheWithAffinityIndexManagerTxTest.java similarity index 62% rename from query/src/test/java/org/infinispan/query/blackbox/ClusteredCacheWithAffinityIndexManagerTx.java rename to query/src/test/java/org/infinispan/query/blackbox/ClusteredCacheWithAffinityIndexManagerTxTest.java index 948c4634beb..82964dcb49d 100644 --- a/query/src/test/java/org/infinispan/query/blackbox/ClusteredCacheWithAffinityIndexManagerTx.java +++ b/query/src/test/java/org/infinispan/query/blackbox/ClusteredCacheWithAffinityIndexManagerTxTest.java @@ -5,8 +5,8 @@ /** * @since 9.0 */ -@Test(groups = "functional", testName = "query.blackbox.ClusteredCacheWithAffinityIndexManagerTx") -public class ClusteredCacheWithAffinityIndexManagerTx extends ClusteredCacheWithAffinityIndexManagerTest { +@Test(groups = "functional", testName = "query.blackbox.ClusteredCacheWithAffinityIndexManagerTxTest") +public class ClusteredCacheWithAffinityIndexManagerTxTest extends ClusteredCacheWithAffinityIndexManagerTest { @Override protected boolean transactionsEnabled() {