Skip to content

Commit

Permalink
ISPN-6978 AffinityIndexManager does not work on transactional caches
Browse files Browse the repository at this point in the history
  • Loading branch information
gustavonalle committed Aug 26, 2016
1 parent 0e670cd commit afcb86e
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 114 deletions.
Expand Up @@ -14,6 +14,9 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import javax.transaction.Transaction;
import javax.transaction.TransactionManager;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.Directory;
Expand All @@ -35,6 +38,7 @@
import org.infinispan.query.backend.ComponentRegistryService;
import org.infinispan.query.backend.KeyTransformationHandler;
import org.infinispan.query.backend.QueryInterceptor;
import org.infinispan.query.backend.TransactionHelper;
import org.infinispan.query.logging.Log;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.rpc.RpcManager;
Expand Down Expand Up @@ -65,6 +69,7 @@ public class AffinityIndexManager extends DirectoryBasedIndexManager implements
private final Lock writeLock = flushLock.writeLock();
private final Lock readLock = flushLock.readLock();
private ExecutorService asyncExecutor;
private TransactionHelper transactionHelper;


@Override
Expand Down Expand Up @@ -104,10 +109,16 @@ private void clearIfNeeded(Address lockHolder) {

@Override
public void initialize(String indexName, Properties properties, Similarity similarity, WorkerBuildContext buildContext) {
super.initialize(indexName, properties, similarity, buildContext);
ServiceManager serviceManager = buildContext.getServiceManager();
ComponentRegistryService componentRegistryService = serviceManager.requestService(ComponentRegistryService.class);
ComponentRegistry componentRegistry = componentRegistryService.getComponentRegistry();
transactionHelper = new TransactionHelper(componentRegistry.getComponent(TransactionManager.class));
Transaction tx = transactionHelper.suspendTxIfExists();
try {
super.initialize(indexName, properties, similarity, buildContext);
} finally {
transactionHelper.resume(tx);
}
asyncExecutor = componentRegistry.getComponent(ExecutorService.class, ASYNC_OPERATIONS_EXECUTOR);
distributionManager = componentRegistry.getComponent(DistributionManager.class);
rpcManager = componentRegistry.getComponent(RpcManager.class);
Expand Down Expand Up @@ -175,8 +186,13 @@ private int getSegment(Object key) {
}

private Address getLockHolder(String indexName) {
InfinispanDirectoryProvider directoryProvider = (InfinispanDirectoryProvider) getDirectoryProvider();
return directoryProvider.getLockOwner(indexName, IndexWriter.WRITE_LOCK_NAME);
Transaction tx = transactionHelper.suspendTxIfExists();
try {
InfinispanDirectoryProvider directoryProvider = (InfinispanDirectoryProvider) getDirectoryProvider();
return directoryProvider.getLockOwner(indexName, IndexWriter.WRITE_LOCK_NAME);
} finally {
transactionHelper.resume(tx);
}
}

private Address getLocation(LuceneWork work) {
Expand Down Expand Up @@ -242,7 +258,7 @@ private void sendWork(List<LuceneWork> works, Address destination) {
List<Address> dest = Collections.singletonList(destination);
log.debugf("Sending works %s to %s", works, dest);
CompletableFuture<Map<Address, Response>> result
= rpcManager.invokeRemotelyAsync(dest, indexUpdateCommand, rpcManager.getDefaultRpcOptions(false));
= rpcManager.invokeRemotelyAsync(dest, indexUpdateCommand, rpcManager.getDefaultRpcOptions(false));
result.whenComplete((responses, error) -> {
if (error != null) {
log.error("Error forwarding index job", error);
Expand Down
Expand Up @@ -69,7 +69,7 @@ public final class QueryKnownClasses {
*/
private volatile AdvancedCache<KeyValuePair<String, Class<?>>, Boolean> knownClassesCache;

private volatile TransactionManager transactionManager;
private volatile TransactionHelper transactionHelper;

/**
* A second level cache. Not using a ConcurrentHashMap as this will degenerate into a read-only Map at runtime;
Expand Down Expand Up @@ -140,7 +140,7 @@ Set<Class<?>> keys() {

startInternalCache();
Set<Class<?>> result = new HashSet<>();
Transaction tx = suspendTx();
Transaction tx = transactionHelper.suspendTxIfExists();
try {
for (KeyValuePair<String, Class<?>> key : knownClassesCache.keySet()) {
if (key.getKey().equals(cacheName)) {
Expand All @@ -149,7 +149,7 @@ Set<Class<?>> keys() {
}
return result;
} finally {
resumeTx(tx);
transactionHelper.resume(tx);
}
}

Expand All @@ -176,11 +176,11 @@ void put(final Class<?> clazz, final Boolean value) {
throw new IllegalArgumentException("Null values are not allowed");
}
startInternalCache();
Transaction tx = suspendTx();
Transaction tx = transactionHelper.suspendTxIfExists();
try {
runCommand(() -> knownClassesCache.put(new KeyValuePair<>(cacheName, clazz), value));
} finally {
resumeTx(tx);
transactionHelper.resume(tx);
}

localCacheInsert(clazz, value);
Expand Down Expand Up @@ -211,7 +211,7 @@ private void startInternalCache() {
internalCacheRegistry.registerInternalCache(QUERY_KNOWN_CLASSES_CACHE_NAME, getInternalCacheConfig());
Cache<KeyValuePair<String, Class<?>>, Boolean> knownClassesCache = SecurityActions.getCache(cacheManager, QUERY_KNOWN_CLASSES_CACHE_NAME);
this.knownClassesCache = knownClassesCache.getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES);
transactionManager = this.knownClassesCache.getTransactionManager();
transactionHelper = new TransactionHelper(this.knownClassesCache.getTransactionManager());
}
}
}
Expand Down Expand Up @@ -252,27 +252,4 @@ private void runCommand(Runnable runnable) {
}
}

/**
* Suspend any ongoing transaction, so that the internal cache writes are committed immediately.
*/
private Transaction suspendTx() {
if (transactionManager == null) {
return null;
}
try {
return transactionManager.suspend();
} catch (SystemException e) {
throw new CacheException("Unable to suspend ongoing transaction", e);
}
}

private void resumeTx(Transaction tx) {
if (tx != null) {
try {
transactionManager.resume(tx);
} catch (InvalidTransactionException | SystemException e) {
throw new CacheException("Unable to resume ongoing transaction", e);
}
}
}
}
@@ -1,11 +1,11 @@
package org.infinispan.query.backend;

import static org.infinispan.query.backend.TransactionHelper.Operation;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

import javax.transaction.Transaction;

import org.hibernate.search.spi.SearchIntegrator;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
Expand Down Expand Up @@ -47,8 +47,7 @@ boolean updateKnownTypesIfNeeded(final Object value) {
final Boolean existingBoolean = queryKnownClasses.get(potentialNewType);
if (existingBoolean != null) {
return existingBoolean;
}
else {
} else {
handleOnDemandRegistration(false, potentialNewType);
Boolean isIndexable = queryKnownClasses.get(potentialNewType);
return isIndexable != null ? isIndexable : false;
Expand Down Expand Up @@ -92,12 +91,13 @@ private void updateSearchFactory(final Class<?>... classes) {
return;
}
final Class<?>[] newtypes = reducedSet.toArray(new Class<?>[reducedSet.size()]);
transactionHelper.runSuspendingTx(new Operation() {
@Override
public void execute() {
searchFactory.addClasses(newtypes);
}
});

Transaction tx = transactionHelper.suspendTxIfExists();
try {
searchFactory.addClasses(newtypes);
} finally {
transactionHelper.resume(tx);
}
for (Class<?> type : newtypes) {
if (hasIndex(type)) {
log.detectedUnknownIndexedEntity(queryKnownClasses.getCacheName(), type.getName());
Expand Down
Expand Up @@ -3,54 +3,48 @@
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;

import org.infinispan.query.logging.Log;
import org.infinispan.util.logging.LogFactory;

/**
* Transaction related helper
*
* @author gustavonalle
* @since 7.0
*/
class TransactionHelper {
public final class TransactionHelper {

private static final Log LOGGER = LogFactory.getLog(TransactionHelper.class, Log.class);

private final TransactionManager transactionManager;

TransactionHelper(final TransactionManager transactionManager) {
public TransactionHelper(final TransactionManager transactionManager) {
this.transactionManager = transactionManager;
}

void runSuspendingTx(Operation op) {
final Transaction transaction = suspend();
try {
op.execute();
} finally {
resume(transaction);
public void resume(final Transaction transaction) {
if (transaction != null) {
try {
transactionManager.resume(transaction);
} catch (Exception e) {
throw LOGGER.unableToResumeSuspendedTx(transaction, e);
}
}
}

Transaction suspend() {
public Transaction suspendTxIfExists() {
if (transactionManager == null) {
return null;
}
try {
return transactionManager.suspend();
Transaction tx;
if ((tx = transactionManager.getTransaction()) != null) {
transactionManager.suspend();
}
return tx;
} catch (Exception e) {
//ignored
}
return null;
}

void resume(Transaction transaction) {
if (transaction == null || transactionManager == null) {
return;
throw LOGGER.unableToSuspendTx(e);
}
try {
transactionManager.resume(transaction);
} catch (Exception e) {
//ignored;
}
}

interface Operation {
void execute();
}

}
Expand Up @@ -10,14 +10,14 @@
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException;
import org.hibernate.search.indexes.spi.DirectoryBasedIndexManager;
import org.infinispan.commons.CacheException;
import org.infinispan.lucene.impl.DirectoryExtensions;
import org.infinispan.query.backend.TransactionHelper;
import org.infinispan.query.logging.Log;
import org.infinispan.util.logging.LogFactory;

/**
* Used to control and override the ownership of the Lucene index lock.
*
* <p>
* Rather than wrapping the Directory or the LockManager directly, we need to wrap the IndexManager
* as the Directory initialization is deferred.
*
Expand All @@ -29,28 +29,28 @@ final class IndexManagerBasedLockController implements IndexLockController {
private static final Log log = LogFactory.getLog(IndexManagerBasedLockController.class, Log.class);

private final DirectoryBasedIndexManager indexManager;
private final TransactionManager tm;
private final TransactionHelper transactionHelper;

public IndexManagerBasedLockController(DirectoryBasedIndexManager indexManager, TransactionManager tm) {
this.indexManager = indexManager;
this.tm = tm;
this.transactionHelper = new TransactionHelper(tm);
}

@Override
public boolean waitForAvailability() {
final Transaction tx = suspendTxIfExists();
final Transaction tx = transactionHelper.suspendTxIfExists();
try {
boolean waitForAvailabilityInternal = waitForAvailabilityInternal();
log.waitingForLockAcquired(waitForAvailabilityInternal);
return waitForAvailabilityInternal;
}
finally {
resumeTx(tx);
} finally {
transactionHelper.resume(tx);
}
}

/**
* This is returning as soon as the lock is available, or after 10 seconds.
*
* @return true if the lock is free at the time of returning.
*/
private boolean waitForAvailabilityInternal() {
Expand All @@ -59,50 +59,21 @@ private boolean waitForAvailabilityInternal() {
Lock lock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME);
lock.close();
return true;
}
catch (LockObtainFailedException lofe) {
} catch (LockObtainFailedException lofe) {
return false;
}
catch (IOException e) {
} catch (IOException e) {
log.error(e);
return false;
}
}

private void resumeTx(final Transaction tx) {
if (tx != null) {
try {
tm.resume(tx);
} catch (Exception e) {
throw new CacheException("Unable to resume suspended transaction " + tx, e);
}
}
}

private Transaction suspendTxIfExists() {
if (tm==null) {
return null;
}
try {
Transaction tx;
if ((tx = tm.getTransaction()) != null) {
tm.suspend();
}
return tx;
}
catch (Exception e) {
throw new CacheException(e);
}
}

@Override
public void forceLockClear() {
final Transaction tx = suspendTxIfExists();
final Transaction tx = transactionHelper.suspendTxIfExists();
try {
forceLockClearInternal();
}
finally {
resumeTx(tx);
} finally {
transactionHelper.resume(tx);
}
}

Expand Down
9 changes: 9 additions & 0 deletions query/src/main/java/org/infinispan/query/logging/Log.java
Expand Up @@ -9,6 +9,8 @@
import java.io.IOException;
import java.util.List;

import javax.transaction.Transaction;

import org.hibernate.hql.ParsingException;
import org.hibernate.search.backend.LuceneWork;
import org.infinispan.commons.CacheException;
Expand Down Expand Up @@ -143,4 +145,11 @@ public interface Log extends org.infinispan.util.logging.Log {

@Message(value = "Could not locate error handler class %s", id = 14032)
IllegalArgumentException unsupportedErrorHandlerConfigurationValueType(Class<?> type);

@Message(value = "Unable to resume suspended transaction %s", id = 14033)
CacheException unableToResumeSuspendedTx(Transaction transaction, @Cause Throwable cause);

@Message(value = "Unable to suspend transaction", id = 14034)
CacheException unableToSuspendTx(@Cause Throwable cause);

}

0 comments on commit afcb86e

Please sign in to comment.