diff --git a/hibernate-cache/pom.xml b/hibernate-cache/pom.xml index 784c8721654f..8fcb8894bb53 100644 --- a/hibernate-cache/pom.xml +++ b/hibernate-cache/pom.xml @@ -26,5 +26,22 @@ org.hibernate hibernate-core + + + ${project.groupId} + infinispan-core + test-jar + test + + + org.hibernate + hibernate-testing + test + + + org.mockito + mockito-core + test + diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/BaseInvalidationInterceptor.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/BaseInvalidationInterceptor.java index a805f0b30c57..329a45149998 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/BaseInvalidationInterceptor.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/BaseInvalidationInterceptor.java @@ -12,7 +12,7 @@ import org.infinispan.context.Flag; import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Start; -import org.infinispan.interceptors.base.BaseRpcInterceptor; +import org.infinispan.interceptors.impl.BaseRpcInterceptor; import org.infinispan.jmx.JmxStatisticsExposer; import org.infinispan.jmx.annotations.DataType; import org.infinispan.jmx.annotations.ManagedAttribute; @@ -24,6 +24,7 @@ import org.infinispan.remoting.rpc.RpcOptions; import org.infinispan.remoting.transport.Address; import org.infinispan.statetransfer.StateTransferManager; +import org.infinispan.util.ByteString; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -32,7 +33,7 @@ public abstract class BaseInvalidationInterceptor extends BaseRpcInterceptor imp private final AtomicLong invalidations = new AtomicLong(0); protected CommandsFactory commandsFactory; protected StateTransferManager stateTransferManager; - protected String cacheName; + protected ByteString cacheName; protected boolean statisticsEnabled; protected RpcOptions syncRpcOptions; protected RpcOptions asyncRpcOptions; @@ -41,7 +42,7 @@ public abstract class BaseInvalidationInterceptor extends BaseRpcInterceptor imp public void injectDependencies(CommandsFactory commandsFactory, StateTransferManager stateTransferManager, Cache cache) { this.commandsFactory = commandsFactory; this.stateTransferManager = stateTransferManager; - this.cacheName = cache.getName(); + this.cacheName = ByteString.fromString(cache.getName()); } @Start diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/LockingInterceptor.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/LockingInterceptor.java index 7e6fd4a5b838..790a9b8da747 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/LockingInterceptor.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/LockingInterceptor.java @@ -6,11 +6,13 @@ */ package org.hibernate.cache.infinispan.access; +import org.infinispan.commands.VisitableCommand; import org.infinispan.commands.write.DataWriteCommand; -import org.infinispan.context.Flag; import org.infinispan.context.InvocationContext; +import org.infinispan.distribution.Ownership; +import org.infinispan.interceptors.InvocationFinallyAction; import org.infinispan.interceptors.locking.NonTransactionalLockingInterceptor; -import org.infinispan.util.concurrent.TimeoutException; +import org.infinispan.util.concurrent.locks.LockUtil; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; @@ -32,43 +34,41 @@ */ public class LockingInterceptor extends NonTransactionalLockingInterceptor { private static final Log log = LogFactory.getLog(LockingInterceptor.class); - private static final boolean trace = log.isTraceEnabled(); - @Override - protected Object visitDataWriteCommand(InvocationContext ctx, DataWriteCommand command) throws Throwable { - Object returnValue = null; - try { - // Clear any metadata; we'll set them as appropriate in TombstoneCallInterceptor - command.setMetadata(null); + protected final InvocationFinallyAction unlockAllReturnCheckCompletableFutureHandler = new InvocationFinallyAction() { + @Override + public void accept(InvocationContext rCtx, VisitableCommand rCommand, Object rv, Throwable throwable) throws Throwable { + lockManager.unlockAll(rCtx); + if (rv instanceof CompletableFuture) { + try { + ((CompletableFuture) rv).join(); + } + catch (CompletionException e) { + throw e.getCause(); + } + } + } + }; - lockAndRecord(ctx, command.getKey(), getLockTimeoutMillis(command)); + @Override + protected Object visitDataWriteCommand(InvocationContext ctx, DataWriteCommand command) throws Throwable { + try { + if (log.isTraceEnabled()) { + Ownership ownership = LockUtil.getLockOwnership( command.getKey(), cdl ); + log.tracef( "Am I owner for key=%s ? %s", command.getKey(), ownership); + } + + if (ctx.getLockOwner() == null) { + ctx.setLockOwner( command.getCommandInvocationId() ); + } + + lockAndRecord(ctx, command.getKey(), getLockTimeoutMillis(command)); + } + catch (Throwable t) { + lockManager.unlockAll(ctx); + throw t; + } + return invokeNextAndFinally(ctx, command, unlockAllReturnCheckCompletableFutureHandler); + } - returnValue = invokeNextInterceptor(ctx, command); - return returnValue; - } - catch (TimeoutException e) { - if (!ctx.isOriginLocal() && command.hasFlag(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT)) { - // FAIL_SILENTLY flag is not replicated to remote nodes and zero acquisition timeouts cause - // very noisy logs. - if (trace) { - log.tracef("Silently ignoring exception", e); - } - return null; - } - else { - throw e; - } - } - finally { - lockManager.unlockAll(ctx); - if (returnValue instanceof CompletableFuture) { - try { - ((CompletableFuture) returnValue).join(); - } - catch (CompletionException e) { - throw e.getCause(); - } - } - } - } } diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationInterceptor.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationInterceptor.java index 1306c440208e..8bff0cec1859 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationInterceptor.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationInterceptor.java @@ -8,6 +8,7 @@ import org.hibernate.cache.infinispan.util.CacheCommandInitializer; import org.hibernate.cache.infinispan.util.InfinispanMessageLogger; +import org.infinispan.commands.VisitableCommand; import org.infinispan.commands.write.ClearCommand; import org.infinispan.commands.write.InvalidateCommand; import org.infinispan.commands.write.PutKeyValueCommand; @@ -15,12 +16,16 @@ import org.infinispan.commands.write.RemoveCommand; import org.infinispan.commands.write.ReplaceCommand; import org.infinispan.commands.write.WriteCommand; +import org.infinispan.commons.util.EnumUtil; import org.infinispan.context.Flag; import org.infinispan.context.InvocationContext; import org.infinispan.factories.annotations.Inject; -import org.infinispan.interceptors.InvalidationInterceptor; +import org.infinispan.interceptors.InvocationFinallyFunction; +import org.infinispan.interceptors.impl.InvalidationInterceptor; import org.infinispan.jmx.annotations.MBean; import org.infinispan.util.concurrent.locks.RemoteLockCommand; +import org.infinispan.util.logging.Log; +import org.infinispan.util.logging.LogFactory; import java.util.Collections; @@ -40,6 +45,7 @@ public class NonTxInvalidationInterceptor extends BaseInvalidationInterceptor { private CacheCommandInitializer commandInitializer; private static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(InvalidationInterceptor.class); + private static final Log ispnLog = LogFactory.getLog(NonTxInvalidationInterceptor.class); public NonTxInvalidationInterceptor(PutFromLoadValidator putFromLoadValidator) { this.putFromLoadValidator = putFromLoadValidator; @@ -53,7 +59,7 @@ public void injectDependencies(CacheCommandInitializer commandInitializer) { @Override public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable { if (command.hasFlag(Flag.PUT_FOR_EXTERNAL_READ)) { - return invokeNextInterceptor(ctx, command); + return invokeNext(ctx, command); } else { boolean isTransactional = putFromLoadValidator.registerRemoteInvalidation(command.getKey(), command.getKeyLockOwner()); @@ -63,12 +69,8 @@ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand if (!putFromLoadValidator.beginInvalidatingWithPFER(command.getKeyLockOwner(), command.getKey(), command.getValue())) { log.failedInvalidatePendingPut(command.getKey(), cacheName); } - RemoveCommand removeCommand = commandsFactory.buildRemoveCommand(command.getKey(), null, command.getFlags()); - Object retval = invokeNextInterceptor(ctx, removeCommand); - if (command.isSuccessful()) { - invalidateAcrossCluster(command, isTransactional, command.getKey()); - } - return retval; + RemoveCommand removeCommand = commandsFactory.buildRemoveCommand(command.getKey(), null, command.getFlagsBitSet()); + return invokeNextAndHandle( ctx, removeCommand, new InvalidateAndReturnFunction(isTransactional, command.getKeyLockOwner()) ); } } @@ -88,16 +90,12 @@ public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) t else { log.trace("This is an eviction, not invalidating anything"); } - Object retval = invokeNextInterceptor(ctx, command); - if (command.isSuccessful()) { - invalidateAcrossCluster(command, isTransactional, command.getKey()); - } - return retval; + return invokeNextAndHandle( ctx, command, new InvalidateAndReturnFunction(isTransactional, command.getKeyLockOwner()) ); } @Override public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable { - Object retval = invokeNextInterceptor(ctx, command); + Object retval = invokeNext(ctx, command); if (!isLocalModeForced(command)) { // just broadcast the clear command - this is simplest! if (ctx.isOriginLocal()) { @@ -112,17 +110,18 @@ public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) t throw new UnsupportedOperationException("Unexpected putAll"); } - private void invalidateAcrossCluster(T command, boolean isTransactional, Object key) throws Throwable { + private void invalidateAcrossCluster( + T command, boolean isTransactional, Object key, Object keyLockOwner) throws Throwable { // increment invalidations counter if statistics maintained incrementInvalidations(); InvalidateCommand invalidateCommand; if (!isLocalModeForced(command)) { if (isTransactional) { invalidateCommand = commandInitializer.buildBeginInvalidationCommand( - Collections.emptySet(), new Object[] { key }, command.getKeyLockOwner()); + EnumUtil.EMPTY_BIT_SET, new Object[] { key }, keyLockOwner); } else { - invalidateCommand = commandsFactory.buildInvalidateCommand(Collections.emptySet(), new Object[] { key }); + invalidateCommand = commandsFactory.buildInvalidateCommand(EnumUtil.EMPTY_BIT_SET, new Object[] {key }); } if (log.isDebugEnabled()) { log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand); @@ -132,4 +131,31 @@ private void invalidateAcrossCluste } } + @Override + protected Log getLog() { + return ispnLog; + } + + class InvalidateAndReturnFunction implements InvocationFinallyFunction { + + final boolean isTransactional; + final Object keyLockOwner; + + InvalidateAndReturnFunction(boolean isTransactional, Object keyLockOwner) { + this.isTransactional = isTransactional; + this.keyLockOwner = keyLockOwner; + } + + @Override + public Object apply(InvocationContext rCtx, VisitableCommand rCommand, Object rv, Throwable throwable) + throws Throwable { + RemoveCommand removeCmd = (RemoveCommand) rCommand; + if ( removeCmd.isSuccessful()) { + invalidateAcrossCluster(removeCmd, isTransactional, removeCmd.getKey(), keyLockOwner); + } + return rv; + } + + } + } diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/NonTxPutFromLoadInterceptor.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/NonTxPutFromLoadInterceptor.java index 114cdbb9a99c..72642e1c2562 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/NonTxPutFromLoadInterceptor.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/NonTxPutFromLoadInterceptor.java @@ -19,6 +19,7 @@ import org.infinispan.context.InvocationContext; import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Start; +import org.infinispan.interceptors.BaseCustomAsyncInterceptor; import org.infinispan.interceptors.base.BaseCustomInterceptor; import org.infinispan.remoting.inboundhandler.DeliverOrder; import org.infinispan.remoting.rpc.ResponseMode; @@ -26,6 +27,7 @@ import org.infinispan.remoting.rpc.RpcOptions; import org.infinispan.remoting.transport.Address; import org.infinispan.statetransfer.StateTransferManager; +import org.infinispan.util.ByteString; import java.util.List; @@ -37,16 +39,16 @@ * * @author Radim Vansa <rvansa@redhat.com> */ -public class NonTxPutFromLoadInterceptor extends BaseCustomInterceptor { +public class NonTxPutFromLoadInterceptor extends BaseCustomAsyncInterceptor { private final static InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(NonTxPutFromLoadInterceptor.class); - private final String cacheName; + private final ByteString cacheName; private final PutFromLoadValidator putFromLoadValidator; private CacheCommandInitializer commandInitializer; private RpcManager rpcManager; private StateTransferManager stateTransferManager; private RpcOptions asyncUnordered; - public NonTxPutFromLoadInterceptor(PutFromLoadValidator putFromLoadValidator, String cacheName) { + public NonTxPutFromLoadInterceptor(PutFromLoadValidator putFromLoadValidator, ByteString cacheName) { this.putFromLoadValidator = putFromLoadValidator; this.cacheName = cacheName; } @@ -70,7 +72,7 @@ public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand co putFromLoadValidator.beginInvalidatingKey(((BeginInvalidationCommand) command).getLockOwner(), key); } } - return invokeNextInterceptor(ctx, command); + return invokeNext(ctx, command); } public void endInvalidating(Object key, Object lockOwner, boolean successful) { diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java index 99adda22b861..1f773b925efd 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java @@ -25,13 +25,17 @@ import org.hibernate.resource.transaction.spi.TransactionCoordinator; import org.infinispan.AdvancedCache; +import org.infinispan.Cache; import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.Configuration; import org.infinispan.configuration.cache.ConfigurationBuilder; -import org.infinispan.interceptors.EntryWrappingInterceptor; -import org.infinispan.interceptors.InvalidationInterceptor; +import org.infinispan.interceptors.AsyncInterceptor; +import org.infinispan.interceptors.AsyncInterceptorChain; import org.infinispan.interceptors.base.CommandInterceptor; +import org.infinispan.interceptors.impl.EntryWrappingInterceptor; +import org.infinispan.interceptors.impl.InvalidationInterceptor; import org.infinispan.manager.EmbeddedCacheManager; +import org.infinispan.util.ByteString; /** * Encapsulates logic to allow a {@link InvalidationCacheAccessDelegate} to determine @@ -95,7 +99,7 @@ public class PutFromLoadValidator { * Registry of expected, future, isPutValid calls. If a key+owner is registered in this map, it * is not a "naked put" and is allowed to proceed. */ - private final ConcurrentMap pendingPuts; + private final Cache pendingPuts; /** * Main cache where the entities/collections are stored. This is not modified from within this class. @@ -147,7 +151,7 @@ public PutFromLoadValidator(AdvancedCache cache, InfinispanRegionFactory regionF ConfigurationBuilder configurationBuilder = new ConfigurationBuilder(); configurationBuilder.read(pendingPutsConfiguration); configurationBuilder.dataContainer().keyEquivalence(cacheConfiguration.dataContainer().keyEquivalence()); - String pendingPutsName = cache.getName() + "-" + InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE; + String pendingPutsName = getPendingPutsName( cache ); cacheManager.defineConfiguration(pendingPutsName, configurationBuilder.build()); if (pendingPutsConfiguration.expiration() != null && pendingPutsConfiguration.expiration().maxIdle() > 0) { @@ -169,19 +173,26 @@ public PutFromLoadValidator(AdvancedCache cache, InfinispanRegionFactory regionF this.cache = cache; this.pendingPuts = cacheManager.getCache(pendingPutsName); + // The session factory might have been closed but it uses a pre-existing cache manager, so start just in case + this.pendingPuts.start(); } + private String getPendingPutsName(AdvancedCache cache) { + return cache.getName() + "-" + InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE; + } + /** * Besides the call from constructor, this should be called only from tests when mocking the validator. */ public static void addToCache(AdvancedCache cache, PutFromLoadValidator validator) { - List interceptorChain = cache.getInterceptorChain(); - log.debug("Interceptor chain was: " + interceptorChain); - int position = 0; + AsyncInterceptorChain chain = cache.getAsyncInterceptorChain(); + List interceptors = chain.getInterceptors(); + log.debug("Interceptor chain was: " + interceptors); + int position = 0; // add interceptor before uses exact match, not instanceof match int invalidationPosition = 0; int entryWrappingPosition = 0; - for (CommandInterceptor ci : interceptorChain) { + for (AsyncInterceptor ci : interceptors) { if (ci instanceof InvalidationInterceptor) { invalidationPosition = position; } @@ -195,24 +206,24 @@ public static void addToCache(AdvancedCache cache, PutFromLoadValidator validato cache.removeInterceptor(invalidationPosition); TxInvalidationInterceptor txInvalidationInterceptor = new TxInvalidationInterceptor(); cache.getComponentRegistry().registerComponent(txInvalidationInterceptor, TxInvalidationInterceptor.class); - cache.addInterceptor(txInvalidationInterceptor, invalidationPosition); + chain.addInterceptor(txInvalidationInterceptor, invalidationPosition); // Note that invalidation does *NOT* acquire locks; therefore, we have to start invalidating before // wrapping the entry, since if putFromLoad was invoked between wrap and beginInvalidatingKey, the invalidation // would not commit the entry removal (as during wrap the entry was not in cache) - TxPutFromLoadInterceptor txPutFromLoadInterceptor = new TxPutFromLoadInterceptor(validator, cache.getName()); - cache.getComponentRegistry().registerComponent(txPutFromLoadInterceptor, TxPutFromLoadInterceptor.class); - cache.addInterceptor(txPutFromLoadInterceptor, entryWrappingPosition); + TxPutFromLoadInterceptor txPutFromLoadInterceptor = new TxPutFromLoadInterceptor(validator, ByteString.fromString(cache.getName())); + cache.getComponentRegistry().registerComponent(txPutFromLoadInterceptor, TxPutFromLoadInterceptor.class); + chain.addInterceptor(txPutFromLoadInterceptor, entryWrappingPosition); } else { cache.removeInterceptor(invalidationPosition); NonTxInvalidationInterceptor nonTxInvalidationInterceptor = new NonTxInvalidationInterceptor(validator); cache.getComponentRegistry().registerComponent(nonTxInvalidationInterceptor, NonTxInvalidationInterceptor.class); - cache.addInterceptor(nonTxInvalidationInterceptor, invalidationPosition); + chain.addInterceptor(nonTxInvalidationInterceptor, invalidationPosition); - NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor = new NonTxPutFromLoadInterceptor(validator, cache.getName()); + NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor = new NonTxPutFromLoadInterceptor(validator, ByteString.fromString(cache.getName())); cache.getComponentRegistry().registerComponent(nonTxPutFromLoadInterceptor, NonTxPutFromLoadInterceptor.class); - cache.addInterceptor(nonTxPutFromLoadInterceptor, entryWrappingPosition); + chain.addInterceptor(nonTxPutFromLoadInterceptor, entryWrappingPosition); validator.nonTxPutFromLoadInterceptor = nonTxPutFromLoadInterceptor; } log.debug("New interceptor chain is: " + cache.getInterceptorChain()); @@ -230,18 +241,19 @@ public static void addToCache(AdvancedCache cache, PutFromLoadValidator validato public static PutFromLoadValidator removeFromCache(AdvancedCache cache) { cache.removeInterceptor(TxPutFromLoadInterceptor.class); cache.removeInterceptor(NonTxPutFromLoadInterceptor.class); - for (Object i : cache.getInterceptorChain()) { + AsyncInterceptorChain chain = cache.getAsyncInterceptorChain(); + for (Object i : chain.getInterceptors()) { if (i instanceof NonTxInvalidationInterceptor) { InvalidationInterceptor invalidationInterceptor = new InvalidationInterceptor(); cache.getComponentRegistry().registerComponent(invalidationInterceptor, InvalidationInterceptor.class); - cache.addInterceptorBefore(invalidationInterceptor, NonTxInvalidationInterceptor.class); + chain.addInterceptorBefore(invalidationInterceptor, NonTxInvalidationInterceptor.class); cache.removeInterceptor(NonTxInvalidationInterceptor.class); break; } else if (i instanceof TxInvalidationInterceptor) { InvalidationInterceptor invalidationInterceptor = new InvalidationInterceptor(); cache.getComponentRegistry().registerComponent(invalidationInterceptor, InvalidationInterceptor.class); - cache.addInterceptorBefore(invalidationInterceptor, TxInvalidationInterceptor.class); + chain.addInterceptorBefore(invalidationInterceptor, TxInvalidationInterceptor.class); cache.removeInterceptor(TxInvalidationInterceptor.class); break; } @@ -258,6 +270,11 @@ public void resetCurrentSession() { currentSession.remove(); } + public void destroy() { + pendingPuts.stop(); + pendingPuts.getCacheManager().undefineConfiguration( pendingPuts.getName() ); + } + /** * Marker for lock acquired in {@link #acquirePutFromLoadLock(SharedSessionContractImplementor, Object, long)} */ @@ -654,6 +671,12 @@ private static String lockOwnerToString(Object lockOwner) { return lockOwner instanceof SharedSessionContractImplementor ? "Session#" + lockOwner.hashCode() : lockOwner.toString(); } + public void remotePendingPutsCache() { + String pendingPutsName = getPendingPutsName( cache ); + EmbeddedCacheManager cm = cache.getCacheManager(); + cm.removeCache( pendingPutsName ); + } + /** * Lazy-initialization map for PendingPut. Optimized for the expected usual case where only a * single put is pending for a given key. diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/TombstoneCallInterceptor.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/TombstoneCallInterceptor.java index 8aadf4d757cc..bfa4f5b7709e 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/TombstoneCallInterceptor.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/TombstoneCallInterceptor.java @@ -17,14 +17,16 @@ import org.infinispan.commons.logging.Log; import org.infinispan.commons.logging.LogFactory; import org.infinispan.commons.util.CloseableIterable; +import org.infinispan.commons.util.CloseableIterator; +import org.infinispan.commons.util.Closeables; import org.infinispan.container.entries.CacheEntry; import org.infinispan.container.entries.MVCCEntry; import org.infinispan.context.Flag; import org.infinispan.context.InvocationContext; import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Start; -import org.infinispan.filter.NullValueConverter; -import org.infinispan.interceptors.CallInterceptor; +import org.infinispan.filter.CacheFilters; +import org.infinispan.interceptors.DDAsyncInterceptor; import org.infinispan.metadata.EmbeddedMetadata; import org.infinispan.metadata.Metadata; @@ -41,7 +43,7 @@ * * @author Radim Vansa <rvansa@redhat.com> */ -public class TombstoneCallInterceptor extends CallInterceptor { +public class TombstoneCallInterceptor extends DDAsyncInterceptor { private static final Log log = LogFactory.getLog(TombstoneCallInterceptor.class); private static final UUID ZERO = new UUID(0, 0); @@ -70,20 +72,17 @@ public void start() { @Override public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable { MVCCEntry e = (MVCCEntry) ctx.lookupEntry(command.getKey()); - if (e == null) { - return null; - } log.tracef("In cache %s(%d) applying update %s to %s", cache.getName(), region.getLastRegionInvalidation(), command.getValue(), e.getValue()); try { Object value = command.getValue(); if (value instanceof TombstoneUpdate) { - return handleTombstoneUpdate(e, (TombstoneUpdate) value, command); + return handleTombstoneUpdate(ctx, e, (TombstoneUpdate) value, command); } else if (value instanceof Tombstone) { - return handleTombstone(e, (Tombstone) value); + return handleTombstone(e, (Tombstone) value, command); } else if (value instanceof FutureUpdate) { - return handleFutureUpdate(e, (FutureUpdate) value, command); + return handleFutureUpdate(ctx, e, (FutureUpdate) value, command); } else { return super.visitPutKeyValueCommand(ctx, command); @@ -94,13 +93,13 @@ else if (value instanceof FutureUpdate) { } } - private Object handleFutureUpdate(MVCCEntry e, FutureUpdate futureUpdate, PutKeyValueCommand command) { + private Object handleFutureUpdate(InvocationContext ctx, MVCCEntry e, FutureUpdate futureUpdate, PutKeyValueCommand command) { Object storedValue = e.getValue(); if (storedValue instanceof Tombstone) { // Note that the update has to keep tombstone even if the transaction was unsuccessful; // before write we have removed the value and we have to protect the entry against stale putFromLoads Tombstone tombstone = (Tombstone) storedValue; - setValue(e, tombstone.applyUpdate(futureUpdate.getUuid(), futureUpdate.getTimestamp(), futureUpdate.getValue())); + setValue(e, tombstone.applyUpdate(futureUpdate.getUuid(), futureUpdate.getTimestamp(), futureUpdate.getValue()), command); } else { @@ -108,48 +107,48 @@ private Object handleFutureUpdate(MVCCEntry e, FutureUpdate futureUpdate, PutKey // We need to first execute the async update and then local one, because if we're on the primary // owner the local future update would fail the async one. // TODO: There is some discrepancy with TombstoneUpdate handling which does not fail the update - setFailed(command); + return setFailed(ctx, command); } return null; } - private Object handleTombstone(MVCCEntry e, Tombstone tombstone) { + private Object handleTombstone(MVCCEntry e, Tombstone tombstone, PutKeyValueCommand command) { // Tombstones always come with lifespan in metadata Object storedValue = e.getValue(); if (storedValue instanceof Tombstone) { - setValue(e, ((Tombstone) storedValue).merge(tombstone)); + setValue(e, ((Tombstone) storedValue).merge(tombstone), command); } else { - setValue(e, tombstone); + setValue(e, tombstone, command); } return null; } - protected Object handleTombstoneUpdate(MVCCEntry e, TombstoneUpdate tombstoneUpdate, PutKeyValueCommand command) { + protected Object handleTombstoneUpdate(InvocationContext ctx, MVCCEntry e, TombstoneUpdate tombstoneUpdate, PutKeyValueCommand command) { Object storedValue = e.getValue(); Object value = tombstoneUpdate.getValue(); if (value == null) { // eviction if (storedValue == null || storedValue instanceof Tombstone) { - setFailed(command); + return setFailed(ctx, command); } else { // We have to keep Tombstone, because otherwise putFromLoad could insert a stale entry // (after it has been already updated and *then* evicted) - setValue(e, new Tombstone(ZERO, tombstoneUpdate.getTimestamp())); + setValue(e, new Tombstone(ZERO, tombstoneUpdate.getTimestamp()), command); } } else if (storedValue instanceof Tombstone) { Tombstone tombstone = (Tombstone) storedValue; if (tombstone.getLastTimestamp() < tombstoneUpdate.getTimestamp()) { - setValue(e, value); + setValue(e, value, command); } } else if (storedValue == null) { // async putFromLoads shouldn't cross the invalidation timestamp if (region.getLastRegionInvalidation() < tombstoneUpdate.getTimestamp()) { - setValue(e, value); + setValue(e, value, command); } } else { @@ -159,7 +158,7 @@ else if (storedValue == null) { return null; } - private Object setValue(MVCCEntry e, Object value) { + private Object setValue(MVCCEntry e, Object value, PutKeyValueCommand command) { if (e.isRemoved()) { e.setRemoved(false); e.setCreated(true); @@ -169,22 +168,23 @@ private Object setValue(MVCCEntry e, Object value) { e.setChanged(true); } if (value instanceof Tombstone) { + command.setMetadata( expiringMetadata ); e.setMetadata(expiringMetadata); } else { + command.setMetadata( defaultMetadata ); e.setMetadata(defaultMetadata); } return e.setValue(value); } - private void setFailed(PutKeyValueCommand command) { + private Object setFailed(InvocationContext ctx, PutKeyValueCommand command) { // This sets command to be unsuccessful, since we don't want to replicate it to backup owner command.setValueMatcher(ValueMatcher.MATCH_NEVER); - try { - command.perform(null); - } - catch (Throwable ignored) { - } + return invokeNextAndExceptionally( ctx, command, (rCtx, rCommand, throwable) -> { + // Ignore + return null; + } ); } @Override @@ -196,17 +196,18 @@ public Object visitSizeCommand(InvocationContext ctx, SizeCommand command) throw decoratedCache = decoratedCache.withFlags(flags.toArray(new Flag[flags.size()])); } // In non-transactional caches we don't care about context - CloseableIterable> iterable = decoratedCache - .filterEntries(Tombstone.EXCLUDE_TOMBSTONES).converter(NullValueConverter.getInstance()); + CloseableIterator> it = Closeables.iterator(decoratedCache.cacheEntrySet().stream() + .filter(CacheFilters.predicate(Tombstone.EXCLUDE_TOMBSTONES))); try { - for (CacheEntry entry : iterable) { + while (it.hasNext()) { + CacheEntry entry = it.next(); if (size++ == Integer.MAX_VALUE) { return Integer.MAX_VALUE; } } } finally { - iterable.close(); + it.close(); } return size; } diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/TxInvalidationInterceptor.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/TxInvalidationInterceptor.java index 92c6284afcf3..ab86c83cd73e 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/TxInvalidationInterceptor.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/TxInvalidationInterceptor.java @@ -26,13 +26,15 @@ import org.infinispan.commands.write.RemoveCommand; import org.infinispan.commands.write.ReplaceCommand; import org.infinispan.commands.write.WriteCommand; -import org.infinispan.commons.util.InfinispanCollections; +import org.infinispan.commons.util.EnumUtil; import org.infinispan.context.Flag; import org.infinispan.context.InvocationContext; import org.infinispan.context.impl.LocalTxInvocationContext; import org.infinispan.context.impl.TxInvocationContext; import org.infinispan.jmx.annotations.MBean; import org.infinispan.remoting.transport.Address; +import org.infinispan.util.logging.Log; +import org.infinispan.util.logging.LogFactory; /** * This interceptor acts as a replacement to the replication interceptor when the CacheImpl is configured with @@ -50,13 +52,14 @@ @MBean(objectName = "Invalidation", description = "Component responsible for invalidating entries on remote caches when entries are written to locally.") public class TxInvalidationInterceptor extends BaseInvalidationInterceptor { private static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog( TxInvalidationInterceptor.class ); + private static final Log ispnLog = LogFactory.getLog(TxInvalidationInterceptor.class); @Override public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable { if ( !isPutForExternalRead( command ) ) { return handleInvalidate( ctx, command, command.getKey() ); } - return invokeNextInterceptor( ctx, command ); + return invokeNext( ctx, command ); } @Override @@ -71,14 +74,16 @@ public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) t @Override public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable { - Object retval = invokeNextInterceptor( ctx, command ); - if ( !isLocalModeForced( command ) ) { - // just broadcast the clear command - this is simplest! - if ( ctx.isOriginLocal() ) { - rpcManager.invokeRemotely( getMembers(), command, isSynchronous(command) ? syncRpcOptions : asyncRpcOptions ); - } - } - return retval; + return invokeNextAndHandle( ctx, command, (rCtx, rCommand, rv, throwable) -> { + FlagAffectedCommand flagCmd = (FlagAffectedCommand) rCommand; + if ( !isLocalModeForced( flagCmd ) ) { + // just broadcast the clear command - this is simplest! + if ( rCtx.isOriginLocal() ) { + rpcManager.invokeRemotely( getMembers(), rCommand, isSynchronous(flagCmd) ? syncRpcOptions : asyncRpcOptions ); + } + } + return rv; + } ); } @Override @@ -88,26 +93,29 @@ public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) t @Override public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable { - Object retval = invokeNextInterceptor( ctx, command ); - log.tracef( "Entering InvalidationInterceptor's prepare phase. Ctx flags are empty" ); - // fetch the modifications before the transaction is committed (and thus removed from the txTable) - if ( shouldInvokeRemoteTxCommand( ctx ) ) { - if ( ctx.getTransaction() == null ) { - throw new IllegalStateException( "We must have an associated transaction" ); - } - - List mods = Arrays.asList( command.getModifications() ); - broadcastInvalidateForPrepare( mods, ctx ); - } - else { - log.tracef( "Nothing to invalidate - no modifications in the transaction." ); - } - return retval; + return invokeNextAndHandle( ctx, command, (rCtx, rCommand, rv, throwable) -> { + log.tracef( "Entering InvalidationInterceptor's prepare phase. Ctx flags are empty" ); + // fetch the modifications before the transaction is committed (and thus removed from the txTable) + TxInvocationContext txCtx = (TxInvocationContext) rCtx; + if ( shouldInvokeRemoteTxCommand( txCtx ) ) { + if ( txCtx.getTransaction() == null ) { + throw new IllegalStateException( "We must have an associated transaction" ); + } + + PrepareCommand prepareCmd = (PrepareCommand) rCommand; + List mods = Arrays.asList( prepareCmd.getModifications() ); + broadcastInvalidateForPrepare( mods, txCtx ); + } + else { + log.tracef( "Nothing to invalidate - no modifications in the transaction." ); + } + return rv; + } ); } @Override public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command) throws Throwable { - Object retVal = invokeNextInterceptor( ctx, command ); + Object retVal = invokeNext( ctx, command ); if ( ctx.isOriginLocal() ) { //unlock will happen async as it is a best effort boolean sync = !command.isUnlock(); @@ -119,15 +127,17 @@ public Object visitLockControlCommand(TxInvocationContext ctx, LockControlComman } private Object handleInvalidate(InvocationContext ctx, WriteCommand command, Object... keys) throws Throwable { - Object retval = invokeNextInterceptor( ctx, command ); - if ( command.isSuccessful() && !ctx.isInTxScope() ) { - if ( keys != null && keys.length != 0 ) { - if ( !isLocalModeForced( command ) ) { - invalidateAcrossCluster( isSynchronous( command ), keys, ctx ); - } - } - } - return retval; + return invokeNextAndHandle( ctx, command, (rCtx, rCommand, rv, throwable) -> { + WriteCommand writeCmd = (WriteCommand) rCommand; + if ( writeCmd.isSuccessful() && !rCtx.isInTxScope() ) { + if ( keys != null && keys.length != 0 ) { + if ( !isLocalModeForced( writeCmd ) ) { + invalidateAcrossCluster( isSynchronous( writeCmd ), keys, rCtx ); + } + } + } + return rv; + } ); } private void broadcastInvalidateForPrepare(List modifications, InvocationContext ctx) throws Throwable { @@ -163,7 +173,12 @@ else if ( filterVisitor.containsLocalModeFlag ) { } } - public static class InvalidationFilterVisitor extends AbstractVisitor { + @Override + protected Log getLog() { + return ispnLog; + } + + public static class InvalidationFilterVisitor extends AbstractVisitor { Set result; public boolean containsPutForExternalRead = false; @@ -204,7 +219,7 @@ public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) t private void invalidateAcrossCluster(boolean synchronous, Object[] keys, InvocationContext ctx) throws Throwable { // increment invalidations counter if statistics maintained incrementInvalidations(); - final InvalidateCommand invalidateCommand = commandsFactory.buildInvalidateCommand( InfinispanCollections.emptySet(), keys ); + final InvalidateCommand invalidateCommand = commandsFactory.buildInvalidateCommand( EnumUtil.EMPTY_BIT_SET, keys ); if ( log.isDebugEnabled() ) { log.debug( "Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand ); } diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/TxPutFromLoadInterceptor.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/TxPutFromLoadInterceptor.java index 9effae61055e..50ec0f4f03fe 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/TxPutFromLoadInterceptor.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/TxPutFromLoadInterceptor.java @@ -7,6 +7,7 @@ package org.hibernate.cache.infinispan.access; import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -28,7 +29,7 @@ import org.infinispan.context.impl.TxInvocationContext; import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Start; -import org.infinispan.interceptors.base.BaseRpcInterceptor; +import org.infinispan.interceptors.impl.BaseRpcInterceptor; import org.infinispan.remoting.inboundhandler.DeliverOrder; import org.infinispan.remoting.rpc.ResponseMode; import org.infinispan.remoting.rpc.RpcManager; @@ -36,6 +37,9 @@ import org.infinispan.remoting.transport.Address; import org.infinispan.statetransfer.StateTransferManager; import org.infinispan.transaction.xa.GlobalTransaction; +import org.infinispan.util.ByteString; +import org.infinispan.util.logging.Log; +import org.infinispan.util.logging.LogFactory; /** * Intercepts transactions in Infinispan, calling {@link PutFromLoadValidator#beginInvalidatingKey(Object, Object)} @@ -47,15 +51,16 @@ */ class TxPutFromLoadInterceptor extends BaseRpcInterceptor { private final static InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(TxPutFromLoadInterceptor.class); + private static final Log ispnLog = LogFactory.getLog(TxPutFromLoadInterceptor.class); private PutFromLoadValidator putFromLoadValidator; - private final String cacheName; + private final ByteString cacheName; private RpcManager rpcManager; private CacheCommandInitializer cacheCommandInitializer; private DataContainer dataContainer; private StateTransferManager stateTransferManager; private RpcOptions asyncUnordered; - public TxPutFromLoadInterceptor(PutFromLoadValidator putFromLoadValidator, String cacheName) { + public TxPutFromLoadInterceptor(PutFromLoadValidator putFromLoadValidator, ByteString cacheName) { this.putFromLoadValidator = putFromLoadValidator; this.cacheName = cacheName; } @@ -89,13 +94,13 @@ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand if (!command.hasFlag(Flag.PUT_FOR_EXTERNAL_READ)) { beginInvalidating(ctx, command.getKey()); } - return invokeNextInterceptor(ctx, command); + return invokeNext(ctx, command); } @Override public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable { beginInvalidating(ctx, command.getKey()); - return invokeNextInterceptor(ctx, command); + return invokeNext(ctx, command); } // We need to intercept PrepareCommand, not InvalidateCommand since the interception takes @@ -118,7 +123,7 @@ public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand comman } else { for (WriteCommand wc : command.getModifications()) { - Set keys = wc.getAffectedKeys(); + Collection keys = wc.getAffectedKeys(); if (log.isTraceEnabled()) { log.tracef("Invalidating keys %s with lock owner %s", keys, ctx.getLockOwner()); } @@ -127,7 +132,7 @@ public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand comman } } } - return invokeNextInterceptor(ctx, command); + return invokeNext(ctx, command); } @Override @@ -177,7 +182,12 @@ protected Object endInvalidationAndInvokeNextInterceptor(TxInvocationContext } } finally { - return invokeNextInterceptor(ctx, command); + return invokeNext(ctx, command); } } + + @Override + protected Log getLog() { + return ispnLog; + } } diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/UnorderedDistributionInterceptor.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/UnorderedDistributionInterceptor.java index 40965b34cb9e..80a68dd4d47b 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/UnorderedDistributionInterceptor.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/UnorderedDistributionInterceptor.java @@ -7,6 +7,7 @@ package org.hibernate.cache.infinispan.access; import org.infinispan.commands.write.PutKeyValueCommand; +import org.infinispan.commands.write.WriteCommand; import org.infinispan.context.Flag; import org.infinispan.context.InvocationContext; import org.infinispan.distribution.DistributionManager; @@ -54,7 +55,7 @@ public void start() { public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable { if (command.hasFlag(Flag.CACHE_MODE_LOCAL)) { // for state-transfer related writes - return invokeNextInterceptor(ctx, command); + return invokeNext(ctx, command); } int commandTopologyId = command.getTopologyId(); int currentTopologyId = stateTransferManager.getCacheTopology().getTopologyId(); @@ -67,24 +68,46 @@ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand List
owners = null; if (writeCH.isReplicated()) { // local result is always ignored - invokeNextInterceptor(ctx, command); + List
finalOwners = owners; + return invokeNextAndHandle( ctx, command, (rCtx, rCommand, rv, throwable) -> { + WriteCommand writeCmd = (WriteCommand) rCommand; + if (rCtx.isOriginLocal() && writeCmd.isSuccessful()) { + // This is called with the entry locked. In order to avoid deadlocks we must not wait for RPC while + // holding the lock, therefore we'll return a future and wait for it in LockingInterceptor after + // unlocking (and committing) the entry. + return rpcManager.invokeRemotelyAsync( + finalOwners, writeCmd, isSynchronous( writeCmd ) ? syncRpcOptions : asyncRpcOptions); + } + return null; + } ); } else { owners = writeCH.locateOwners(command.getKey()); if (owners.contains(rpcManager.getAddress())) { - invokeNextInterceptor(ctx, command); + List
finalOwners = owners; + return invokeNextAndHandle( ctx, command, (rCtx, rCommand, rv, throwable) -> { + WriteCommand writeCmd = (WriteCommand) rCommand; + if (rCtx.isOriginLocal() && writeCmd.isSuccessful()) { + // This is called with the entry locked. In order to avoid deadlocks we must not wait for RPC while + // holding the lock, therefore we'll return a future and wait for it in LockingInterceptor after + // unlocking (and committing) the entry. + return rpcManager.invokeRemotelyAsync( + finalOwners, writeCmd, isSynchronous( writeCmd ) ? syncRpcOptions : asyncRpcOptions); + } + return null; + } ); } else { log.tracef("Not invoking %s on %s since it is not an owner", command, rpcManager.getAddress()); + if (ctx.isOriginLocal() && command.isSuccessful()) { + // This is called with the entry locked. In order to avoid deadlocks we must not wait for RPC while + // holding the lock, therefore we'll return a future and wait for it in LockingInterceptor after + // unlocking (and committing) the entry. + return rpcManager.invokeRemotelyAsync(owners, command, isSynchronous(command) ? syncRpcOptions : asyncRpcOptions); + } + return null; } } - if (ctx.isOriginLocal() && command.isSuccessful()) { - // This is called with the entry locked. In order to avoid deadlocks we must not wait for RPC while - // holding the lock, therefore we'll return a future and wait for it in LockingInterceptor after - // unlocking (and committing) the entry. - return rpcManager.invokeRemotelyAsync(owners, command, isSynchronous(command) ? syncRpcOptions : asyncRpcOptions); - } - return null; } } diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/UnorderedReplicationLogic.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/UnorderedReplicationLogic.java new file mode 100644 index 000000000000..39f2f8776a16 --- /dev/null +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/UnorderedReplicationLogic.java @@ -0,0 +1,16 @@ +package org.hibernate.cache.infinispan.access; + +import org.infinispan.commands.FlagAffectedCommand; +import org.infinispan.context.InvocationContext; +import org.infinispan.interceptors.locking.ClusteringDependentLogic; + +public class UnorderedReplicationLogic extends ClusteringDependentLogic.ReplicationLogic { + + @Override + public Commit commitType( + FlagAffectedCommand command, InvocationContext ctx, Object key, boolean removed) { + Commit commit = super.commitType( command, ctx, key, removed ); + return commit == Commit.NO_COMMIT ? Commit.COMMIT_LOCAL : commit; + } + +} diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/VersionedCallInterceptor.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/VersionedCallInterceptor.java index 91c654c999eb..163310ac876d 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/VersionedCallInterceptor.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/access/VersionedCallInterceptor.java @@ -7,19 +7,18 @@ package org.hibernate.cache.infinispan.access; import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion; +import org.hibernate.cache.infinispan.util.FilterNullValueConverter; import org.hibernate.cache.infinispan.util.VersionedEntry; import org.infinispan.AdvancedCache; import org.infinispan.commands.read.SizeCommand; import org.infinispan.commands.write.PutKeyValueCommand; -import org.infinispan.commons.util.CloseableIterable; -import org.infinispan.container.entries.CacheEntry; import org.infinispan.container.entries.MVCCEntry; import org.infinispan.context.Flag; import org.infinispan.context.InvocationContext; import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Start; -import org.infinispan.filter.NullValueConverter; -import org.infinispan.interceptors.CallInterceptor; +import org.infinispan.filter.CacheFilters; +import org.infinispan.interceptors.DDAsyncInterceptor; import org.infinispan.metadata.EmbeddedMetadata; import org.infinispan.metadata.Metadata; @@ -35,7 +34,7 @@ * * @author Radim Vansa <rvansa@redhat.com> */ -public class VersionedCallInterceptor extends CallInterceptor { +public class VersionedCallInterceptor extends DDAsyncInterceptor { private final Comparator versionComparator; private final Metadata expiringMetadata; private AdvancedCache cache; @@ -99,7 +98,7 @@ else if (ve.getValue() instanceof org.hibernate.cache.spi.entry.CacheEntry) { if (newVersion == null) { // eviction or post-commit removal: we'll store it with given timestamp - setValue(e, newValue, expiringMetadata); + setValue(e, newValue, expiringMetadata, command); return null; } if (oldVersion == null) { @@ -111,21 +110,21 @@ else if (ve.getValue() instanceof org.hibernate.cache.spi.entry.CacheEntry) { assert oldValue == null; } else { - setValue(e, actualNewValue, defaultMetadata); + setValue(e, actualNewValue, defaultMetadata, command); } return null; } int compareResult = versionComparator.compare(newVersion, oldVersion); if (isRemoval && compareResult >= 0) { - setValue(e, actualNewValue, expiringMetadata); + setValue(e, actualNewValue, expiringMetadata, command); } else if (compareResult > 0) { - setValue(e, actualNewValue, defaultMetadata); + setValue(e, actualNewValue, defaultMetadata, command); } return null; } - private Object setValue(MVCCEntry e, Object value, Metadata metadata) { + private Object setValue(MVCCEntry e, Object value, Metadata metadata, PutKeyValueCommand command) { if (e.isRemoved()) { e.setRemoved(false); e.setCreated(true); @@ -134,6 +133,7 @@ private Object setValue(MVCCEntry e, Object value, Metadata metadata) { else { e.setChanged(true); } + command.setMetadata( metadata ); e.setMetadata(metadata); return e.setValue(value); } @@ -147,18 +147,8 @@ public Object visitSizeCommand(InvocationContext ctx, SizeCommand command) throw decoratedCache = decoratedCache.withFlags(flags.toArray(new Flag[flags.size()])); } // In non-transactional caches we don't care about context - CloseableIterable> iterable = decoratedCache - .filterEntries(VersionedEntry.EXCLUDE_EMPTY_EXTRACT_VALUE).converter(NullValueConverter.getInstance()); - try { - for (CacheEntry entry : iterable) { - if (size++ == Integer.MAX_VALUE) { - return Integer.MAX_VALUE; - } - } - } - finally { - iterable.close(); - } - return size; + return (int) CacheFilters.filterAndConvert(decoratedCache.entrySet().stream(), + new FilterNullValueConverter(VersionedEntry.EXCLUDE_EMPTY_EXTRACT_VALUE)) + .count(); } } diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/impl/BaseTransactionalDataRegion.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/impl/BaseTransactionalDataRegion.java index 570620d998d0..3e3248c5c556 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/impl/BaseTransactionalDataRegion.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/impl/BaseTransactionalDataRegion.java @@ -16,6 +16,7 @@ import org.hibernate.cache.infinispan.access.TombstoneCallInterceptor; import org.hibernate.cache.infinispan.access.TxInvalidationCacheAccessDelegate; import org.hibernate.cache.infinispan.access.UnorderedDistributionInterceptor; +import org.hibernate.cache.infinispan.access.UnorderedReplicationLogic; import org.hibernate.cache.infinispan.access.VersionedCallInterceptor; import org.hibernate.cache.infinispan.util.Caches; import org.hibernate.cache.infinispan.util.FutureUpdate; @@ -28,6 +29,7 @@ import org.hibernate.cache.spi.access.AccessType; import org.infinispan.AdvancedCache; +import org.infinispan.commons.CacheException; import org.infinispan.commons.util.CloseableIterator; import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.Configuration; @@ -35,11 +37,14 @@ import org.infinispan.expiration.ExpirationManager; import org.infinispan.expiration.impl.ClusterExpirationManager; import org.infinispan.expiration.impl.ExpirationManagerImpl; +import org.infinispan.factories.ComponentRegistry; import org.infinispan.filter.KeyValueFilter; -import org.infinispan.interceptors.CallInterceptor; -import org.infinispan.interceptors.EntryWrappingInterceptor; +import org.infinispan.interceptors.AsyncInterceptorChain; import org.infinispan.interceptors.base.CommandInterceptor; import org.infinispan.interceptors.distribution.NonTxDistributionInterceptor; +import org.infinispan.interceptors.distribution.TriangleDistributionInterceptor; +import org.infinispan.interceptors.impl.CallInterceptor; +import org.infinispan.interceptors.locking.ClusteringDependentLogic; import org.infinispan.interceptors.locking.NonTransactionalLockingInterceptor; import javax.transaction.TransactionManager; @@ -125,7 +130,7 @@ protected synchronized AccessDelegate createAccessDelegate(AccessType accessType CacheMode cacheMode = cache.getCacheConfiguration().clustering().cacheMode(); if (accessType == AccessType.NONSTRICT_READ_WRITE) { - prepareForVersionedEntries(); + prepareForVersionedEntries(cacheMode); return new NonStrictAccessDelegate(this); } if (cacheMode.isDistributed() || cacheMode.isReplicated()) { @@ -152,7 +157,7 @@ protected void prepareForValidation() { strategy = Strategy.VALIDATION; } - protected void prepareForVersionedEntries() { + protected void prepareForVersionedEntries(CacheMode cacheMode) { if (strategy != null) { assert strategy == Strategy.VERSIONED_ENTRIES; return; @@ -161,11 +166,17 @@ protected void prepareForVersionedEntries() { replaceCommonInterceptors(); replaceExpirationManager(); - cache.removeInterceptor(CallInterceptor.class); - VersionedCallInterceptor tombstoneCallInterceptor = new VersionedCallInterceptor(this, metadata.getVersionComparator()); - cache.getComponentRegistry().registerComponent(tombstoneCallInterceptor, VersionedCallInterceptor.class); - List interceptorChain = cache.getInterceptorChain(); - cache.addInterceptor(tombstoneCallInterceptor, interceptorChain.size()); + VersionedCallInterceptor versionedCallInterceptor = new VersionedCallInterceptor(this, metadata.getVersionComparator()); + ComponentRegistry compReg = cache.getComponentRegistry(); + compReg.registerComponent(versionedCallInterceptor, VersionedCallInterceptor.class); + AsyncInterceptorChain interceptorChain = cache.getAsyncInterceptorChain(); + interceptorChain.addInterceptorBefore(versionedCallInterceptor, CallInterceptor.class); + + if (cacheMode.isClustered()) { + UnorderedReplicationLogic replLogic = new UnorderedReplicationLogic(); + compReg.registerComponent( replLogic, ClusteringDependentLogic.class ); + compReg.rewire(); + } strategy = Strategy.VERSIONED_ENTRIES; } @@ -183,11 +194,15 @@ private void prepareForTombstones() { replaceCommonInterceptors(); replaceExpirationManager(); - cache.removeInterceptor(CallInterceptor.class); TombstoneCallInterceptor tombstoneCallInterceptor = new TombstoneCallInterceptor(this); - cache.getComponentRegistry().registerComponent(tombstoneCallInterceptor, TombstoneCallInterceptor.class); - List interceptorChain = cache.getInterceptorChain(); - cache.addInterceptor(tombstoneCallInterceptor, interceptorChain.size()); + ComponentRegistry compReg = cache.getComponentRegistry(); + compReg.registerComponent( tombstoneCallInterceptor, TombstoneCallInterceptor.class); + AsyncInterceptorChain interceptorChain = cache.getAsyncInterceptorChain(); + interceptorChain.addInterceptorBefore(tombstoneCallInterceptor, CallInterceptor.class); + + UnorderedReplicationLogic replLogic = new UnorderedReplicationLogic(); + compReg.registerComponent( replLogic, ClusteringDependentLogic.class ); + compReg.rewire(); strategy = Strategy.TOMBSTONES; } @@ -198,29 +213,23 @@ private void replaceCommonInterceptors() { return; } + AsyncInterceptorChain chain = cache.getAsyncInterceptorChain(); + LockingInterceptor lockingInterceptor = new LockingInterceptor(); cache.getComponentRegistry().registerComponent(lockingInterceptor, LockingInterceptor.class); - if (!cache.addInterceptorBefore(lockingInterceptor, NonTransactionalLockingInterceptor.class)) { - throw new IllegalStateException("Misconfigured cache, interceptor chain is " + cache.getInterceptorChain()); + if (!chain.addInterceptorBefore(lockingInterceptor, NonTransactionalLockingInterceptor.class)) { + throw new IllegalStateException("Misconfigured cache, interceptor chain is " + chain); } cache.removeInterceptor(NonTransactionalLockingInterceptor.class); UnorderedDistributionInterceptor distributionInterceptor = new UnorderedDistributionInterceptor(); cache.getComponentRegistry().registerComponent(distributionInterceptor, UnorderedDistributionInterceptor.class); - if (!cache.addInterceptorBefore(distributionInterceptor, NonTxDistributionInterceptor.class)) { - throw new IllegalStateException("Misconfigured cache, interceptor chain is " + cache.getInterceptorChain()); - } - cache.removeInterceptor(NonTxDistributionInterceptor.class); + if (!chain.addInterceptorBefore(distributionInterceptor, NonTxDistributionInterceptor.class) && + !chain.addInterceptorBefore( distributionInterceptor, TriangleDistributionInterceptor.class)) { + throw new IllegalStateException("Misconfigured cache, interceptor chain is " + chain); + } - EntryWrappingInterceptor ewi = cache.getComponentRegistry().getComponent(EntryWrappingInterceptor.class); - try { - Field isUsingLockDelegation = EntryWrappingInterceptor.class.getDeclaredField("isUsingLockDelegation"); - isUsingLockDelegation.setAccessible(true); - isUsingLockDelegation.set(ewi, false); - } - catch (NoSuchFieldException | IllegalAccessException e) { - throw new IllegalStateException(e); - } + cache.removeInterceptor(TriangleDistributionInterceptor.class); } private void replaceExpirationManager() { @@ -352,4 +361,13 @@ public boolean contains(Object key) { } return value != null; } + + @Override + public void destroy() throws CacheException { + super.destroy(); + if (validator != null) { + validator.destroy(); + } + } + } diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/BeginInvalidationCommand.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/BeginInvalidationCommand.java index 700871052e34..fb8262827aa0 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/BeginInvalidationCommand.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/BeginInvalidationCommand.java @@ -27,8 +27,8 @@ public class BeginInvalidationCommand extends InvalidateCommand { public BeginInvalidationCommand() { } - public BeginInvalidationCommand(CacheNotifier notifier, Set flags, CommandInvocationId commandInvocationId, Object[] keys, Object lockOwner) { - super(notifier, flags, commandInvocationId, keys); + public BeginInvalidationCommand(CacheNotifier notifier, long flagsBitSet, CommandInvocationId commandInvocationId, Object[] keys, Object lockOwner) { + super(notifier, flagsBitSet, commandInvocationId, keys); this.lockOwner = lockOwner; } @@ -39,13 +39,13 @@ public Object getLockOwner() { @Override public void writeTo(ObjectOutput output) throws IOException { super.writeTo(output); - output.writeObject(lockOwner); + LockOwner.writeTo( output, lockOwner ); } @Override public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException { super.readFrom(input); - lockOwner = input.readObject(); + lockOwner = LockOwner.readFrom( input ); } @Override @@ -74,7 +74,7 @@ public int hashCode() { @Override public String toString() { - return "BeginInvalidateCommand{keys=" + Arrays.toString(keys) + + return "BeginInvalidationCommand{keys=" + Arrays.toString(keys) + ", sessionTransactionId=" + lockOwner + '}'; } } diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandExtensions.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandExtensions.java index 4687684000ec..eb72abc5a0d0 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandExtensions.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandExtensions.java @@ -6,8 +6,8 @@ */ package org.hibernate.cache.infinispan.util; -import org.infinispan.commands.module.ExtendedModuleCommandFactory; import org.infinispan.commands.module.ModuleCommandExtensions; +import org.infinispan.commands.module.ModuleCommandFactory; import org.infinispan.commands.module.ModuleCommandInitializer; /** @@ -21,7 +21,7 @@ public class CacheCommandExtensions implements ModuleCommandExtensions { final CacheCommandInitializer cacheCommandInitializer = new CacheCommandInitializer(); @Override - public ExtendedModuleCommandFactory getModuleCommandFactory() { + public ModuleCommandFactory getModuleCommandFactory() { return cacheCommandFactory; } diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandFactory.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandFactory.java index eb3f10f34623..d1df393fd999 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandFactory.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandFactory.java @@ -15,8 +15,9 @@ import org.hibernate.cache.infinispan.impl.BaseRegion; import org.infinispan.commands.ReplicableCommand; -import org.infinispan.commands.module.ExtendedModuleCommandFactory; +import org.infinispan.commands.module.ModuleCommandFactory; import org.infinispan.commands.remote.CacheRpcCommand; +import org.infinispan.util.ByteString; /** * Command factory @@ -24,7 +25,7 @@ * @author Galder ZamarreƱo * @since 4.0 */ -public class CacheCommandFactory implements ExtendedModuleCommandFactory { +public class CacheCommandFactory implements ModuleCommandFactory { /** * Keeps track of regions to which second-level cache specific @@ -61,11 +62,11 @@ public Map> getModuleCommands() { } @Override - public CacheRpcCommand fromStream(byte commandId, Object[] args, String cacheName) { + public CacheRpcCommand fromStream(byte commandId, ByteString cacheName) { CacheRpcCommand c; switch ( commandId ) { case CacheCommandIds.EVICT_ALL: - c = new EvictAllCommand( cacheName, allRegions.get( cacheName ) ); + c = new EvictAllCommand( cacheName, allRegions.get( cacheName.toString() ) ); break; case CacheCommandIds.END_INVALIDATION: c = new EndInvalidationCommand(cacheName); @@ -73,12 +74,11 @@ public CacheRpcCommand fromStream(byte commandId, Object[] args, String cacheNam default: throw new IllegalArgumentException( "Not registered to handle command id " + commandId ); } - c.setParameters( commandId, args ); return c; } @Override - public ReplicableCommand fromStream(byte commandId, Object[] args) { + public ReplicableCommand fromStream(byte commandId) { ReplicableCommand c; switch ( commandId ) { case CacheCommandIds.BEGIN_INVALIDATION: @@ -87,7 +87,6 @@ public ReplicableCommand fromStream(byte commandId, Object[] args) { default: throw new IllegalArgumentException( "Not registered to handle command id " + commandId ); } - c.setParameters( commandId, args ); return c; } diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandInitializer.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandInitializer.java index a1fd0d218d31..aa7b2a11afa8 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandInitializer.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandInitializer.java @@ -15,6 +15,7 @@ import org.infinispan.factories.annotations.Inject; import org.infinispan.interceptors.locking.ClusteringDependentLogic; import org.infinispan.notifications.cachelistener.CacheNotifier; +import org.infinispan.util.ByteString; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -58,7 +59,7 @@ public PutFromLoadValidator removePutFromLoadValidator(String cacheName) { * @param regionName name of region for {@link EvictAllCommand} * @return a new instance of {@link EvictAllCommand} */ - public EvictAllCommand buildEvictAllCommand(String regionName) { + public EvictAllCommand buildEvictAllCommand(ByteString regionName) { // No need to pass region factory because no information on that object // is sent around the cluster. However, when the command factory builds // and evict all command remotely, it does need to initialize it with @@ -66,11 +67,11 @@ public EvictAllCommand buildEvictAllCommand(String regionName) { return new EvictAllCommand( regionName ); } - public BeginInvalidationCommand buildBeginInvalidationCommand(Set flags, Object[] keys, Object lockOwner) { - return new BeginInvalidationCommand(notifier, flags, CommandInvocationId.generateId(clusteringDependentLogic.getAddress()), keys, lockOwner); + public BeginInvalidationCommand buildBeginInvalidationCommand(long flagsBitSet, Object[] keys, Object lockOwner) { + return new BeginInvalidationCommand(notifier, flagsBitSet, CommandInvocationId.generateId(clusteringDependentLogic.getAddress()), keys, lockOwner); } - public EndInvalidationCommand buildEndInvalidationCommand(String cacheName, Object[] keys, Object lockOwner) { + public EndInvalidationCommand buildEndInvalidationCommand(ByteString cacheName, Object[] keys, Object lockOwner) { return new EndInvalidationCommand( cacheName, keys, lockOwner ); } @@ -79,11 +80,11 @@ public void initializeReplicableCommand(ReplicableCommand c, boolean isRemote) { switch (c.getCommandId()) { case CacheCommandIds.END_INVALIDATION: EndInvalidationCommand endInvalidationCommand = (EndInvalidationCommand) c; - endInvalidationCommand.setPutFromLoadValidator(putFromLoadValidators.get(endInvalidationCommand.getCacheName())); + endInvalidationCommand.setPutFromLoadValidator(putFromLoadValidators.get(endInvalidationCommand.getCacheName().toString())); break; case CacheCommandIds.BEGIN_INVALIDATION: BeginInvalidationCommand beginInvalidationCommand = (BeginInvalidationCommand) c; - beginInvalidationCommand.init(notifier, configuration); + beginInvalidationCommand.init(notifier); break; } } diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/Caches.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/Caches.java index 85c82f039232..1de8ccb8914d 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/Caches.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/Caches.java @@ -17,14 +17,16 @@ import org.infinispan.AdvancedCache; import org.infinispan.commons.util.CloseableIterable; import org.infinispan.commons.util.CloseableIterator; +import org.infinispan.commons.util.Closeables; import org.infinispan.container.entries.CacheEntry; import org.infinispan.context.Flag; import org.infinispan.filter.AcceptAllKeyValueFilter; +import org.infinispan.filter.CacheFilters; import org.infinispan.filter.Converter; import org.infinispan.filter.KeyValueFilter; -import org.infinispan.filter.NullValueConverter; import org.infinispan.remoting.rpc.RpcManager; import org.infinispan.remoting.rpc.RpcOptions; +import org.infinispan.util.ByteString; /** * Helper for dealing with Infinispan cache instances. @@ -226,7 +228,7 @@ public static void broadcastEvictAll(AdvancedCache cache) { .getComponent( CacheCommandInitializer.class ); final boolean isSync = isSynchronousCache( cache ); - final EvictAllCommand cmd = factory.buildEvictAllCommand( cache.getName() ); + final EvictAllCommand cmd = factory.buildEvictAllCommand(ByteString.fromString(cache.getName())); final RpcOptions options = rpcManager.getDefaultRpcOptions( isSync ); rpcManager.invokeRemotely( null, cmd, options ); } @@ -301,34 +303,6 @@ public interface MapCollectableCloseableIterable extends CloseableIterable Map toMap(); } - public static CollectableCloseableIterable keys(AdvancedCache cache, KeyValueFilter filter) { - // HHH-10023: we can't use keySet() - final CloseableIterable> entryIterable = cache - .filterEntries( filter ) - .converter( NullValueConverter.getInstance() ); - return new CollectableCloseableIterableImpl(entryIterable, Selector.KEY); - } - - public static CollectableCloseableIterable values(AdvancedCache cache, KeyValueFilter filter) { - if (cache.getCacheConfiguration().transaction().transactionMode().isTransactional()) { - // Dummy read to enlist the LocalTransaction as workaround for ISPN-5676 - cache.containsKey(false); - } - // HHH-10023: we can't use values() - final CloseableIterable> entryIterable = cache.filterEntries(filter); - return new CollectableCloseableIterableImpl(entryIterable, Selector.VALUE); - } - - public static CollectableCloseableIterable values(AdvancedCache cache, KeyValueFilter filter, Converter converter) { - if (cache.getCacheConfiguration().transaction().transactionMode().isTransactional()) { - // Dummy read to enlist the LocalTransaction as workaround for ISPN-5676 - cache.containsKey(false); - } - // HHH-10023: we can't use values() - final CloseableIterable> entryIterable = cache.filterEntries(filter).converter(converter); - return new CollectableCloseableIterableImpl(entryIterable, Selector.VALUE); - } - public static MapCollectableCloseableIterable entrySet(AdvancedCache cache) { return entrySet(cache, (KeyValueFilter) AcceptAllKeyValueFilter.getInstance()); } @@ -339,8 +313,9 @@ public static MapCollectableCloseableIterable entrySet(AdvancedCach cache.containsKey(false); } // HHH-10023: we can't use values() - final CloseableIterable> entryIterable = cache.filterEntries(filter); - return new MapCollectableCloseableIterableImpl(entryIterable); + CloseableIterator> iterator = Closeables.iterator( + cache.cacheEntrySet().stream().filter(CacheFilters.predicate(filter))); + return new MapCollectableCloseableIterableImpl(iterator); } public static MapCollectableCloseableIterable entrySet(AdvancedCache cache, KeyValueFilter filter, Converter converter) { @@ -349,8 +324,10 @@ public static MapCollectableCloseableIterable entrySet(AdvancedC cache.containsKey(false); } // HHH-10023: we can't use values() - final CloseableIterable> entryIterable = cache.filterEntries(filter).converter(converter); - return new MapCollectableCloseableIterableImpl(entryIterable); + CloseableIterator> it = Closeables.iterator(cache.cacheEntrySet().stream() + .filter(CacheFilters.predicate(filter)) + .map(CacheFilters.function(converter))); + return new MapCollectableCloseableIterableImpl(it); } /* Function, T> */ @@ -453,16 +430,15 @@ public Set toSet() { } private static class MapCollectableCloseableIterableImpl implements MapCollectableCloseableIterable { - private final CloseableIterable> entryIterable; + private final CloseableIterator> it; - public MapCollectableCloseableIterableImpl(CloseableIterable> entryIterable) { - this.entryIterable = entryIterable; + public MapCollectableCloseableIterableImpl(CloseableIterator> it) { + this.it = it; } @Override public Map toMap() { Map map = new HashMap(); - CloseableIterator> it = entryIterable.iterator(); try { while (it.hasNext()) { CacheEntry entry = it.next(); @@ -480,7 +456,6 @@ public Map toMap() { @Override public String toString() { - CloseableIterator> it = entryIterable.iterator(); try { if (!it.hasNext()) { return "{}"; @@ -504,12 +479,12 @@ public String toString() { @Override public void close() { - entryIterable.close(); + it.close(); } @Override public CloseableIterator> iterator() { - return entryIterable.iterator(); + return it; } } } diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/EndInvalidationCommand.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/EndInvalidationCommand.java index 69c4a96e86ee..0250899cf284 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/EndInvalidationCommand.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/EndInvalidationCommand.java @@ -16,6 +16,7 @@ import org.infinispan.commands.remote.BaseRpcCommand; import org.infinispan.commons.marshall.MarshallUtil; import org.infinispan.context.InvocationContext; +import org.infinispan.util.ByteString; /** * Sent in commit phase (after DB commit) to remote nodes in order to stop invalidating @@ -28,14 +29,14 @@ public class EndInvalidationCommand extends BaseRpcCommand { private Object lockOwner; private PutFromLoadValidator putFromLoadValidator; - public EndInvalidationCommand(String cacheName) { + public EndInvalidationCommand(ByteString cacheName) { this(cacheName, null, null); } /** * @param cacheName name of the cache to evict */ - public EndInvalidationCommand(String cacheName, Object[] keys, Object lockOwner) { + public EndInvalidationCommand(ByteString cacheName, Object[] keys, Object lockOwner) { super(cacheName); this.keys = keys; this.lockOwner = lockOwner; @@ -57,13 +58,13 @@ public byte getCommandId() { @Override public void writeTo(ObjectOutput output) throws IOException { MarshallUtil.marshallArray(keys, output); - output.writeObject(lockOwner); + LockOwner.writeTo( output, lockOwner ); } @Override public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException { keys = MarshallUtil.unmarshallArray(input, Object[]::new); - lockOwner = input.readObject(); + lockOwner = LockOwner.readFrom( input ); } @Override diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/EvictAllCommand.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/EvictAllCommand.java index b22085369d97..06b32e3612f1 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/EvictAllCommand.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/EvictAllCommand.java @@ -6,10 +6,14 @@ */ package org.hibernate.cache.infinispan.util; +import java.io.ObjectInput; +import java.io.ObjectOutput; + import org.hibernate.cache.infinispan.impl.BaseRegion; import org.infinispan.commands.remote.BaseRpcCommand; import org.infinispan.context.InvocationContext; +import org.infinispan.util.ByteString; /** * Evict all command @@ -27,7 +31,7 @@ public class EvictAllCommand extends BaseRpcCommand { * @param regionName name of the region to evict * @param region to evict */ - public EvictAllCommand(String regionName, BaseRegion region) { + public EvictAllCommand(ByteString regionName, BaseRegion region) { // region name and cache names are the same... super( regionName ); this.region = region; @@ -38,7 +42,7 @@ public EvictAllCommand(String regionName, BaseRegion region) { * * @param regionName name of the region to evict */ - public EvictAllCommand(String regionName) { + public EvictAllCommand(ByteString regionName) { this( regionName, null ); } @@ -58,12 +62,7 @@ public byte getCommandId() { } @Override - public Object[] getParameters() { - return new Object[0]; - } - - @Override - public void setParameters(int commandId, Object[] parameters) { + public void writeTo(ObjectOutput output) { // No-op } @@ -72,4 +71,9 @@ public boolean isReturnValueExpected() { return false; } + @Override + public void readFrom(ObjectInput input) { + // No-op + } + } diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/Externalizers.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/Externalizers.java index 2202e31e2bf8..073816508f2e 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/Externalizers.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/Externalizers.java @@ -28,6 +28,8 @@ public class Externalizers { public final static int VALUE_EXTRACTOR = 1205; public final static int VERSIONED_ENTRY = 1206; public final static int EXCLUDE_EMPTY_EXTRACT_VALUE = 1207; + public final static int FILTER_NULL_VALUE_CONVERTER = 1208; + public final static int NULL_VALUE = 1209; public static class UUIDExternalizer implements AdvancedExternalizer { diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/FilterNullValueConverter.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/FilterNullValueConverter.java new file mode 100644 index 000000000000..5842a8fce477 --- /dev/null +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/FilterNullValueConverter.java @@ -0,0 +1,95 @@ +/* + * Hibernate, Relational Persistence for Idiomatic Java + * + * License: GNU Lesser General Public License (LGPL), version 2.1 or later. + * See the lgpl.txt file in the root directory or . + */ +package org.hibernate.cache.infinispan.util; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Collections; +import java.util.Set; + +import org.infinispan.commons.marshall.AbstractExternalizer; +import org.infinispan.filter.AbstractKeyValueFilterConverter; +import org.infinispan.filter.KeyValueFilter; +import org.infinispan.metadata.Metadata; + +public final class FilterNullValueConverter extends AbstractKeyValueFilterConverter { + + private final KeyValueFilter filter; + + public FilterNullValueConverter(KeyValueFilter filter) { + this.filter = filter; + } + + @Override + public Object filterAndConvert(K key, V value, Metadata metadata) { + if ( filter.accept( key, value, metadata ) ) { + return NullValue.getInstance(); + } + + return null; + } + + public static final class Externalizer extends AbstractExternalizer { + + @Override + public Set> getTypeClasses() { + return Collections.singleton( FilterNullValueConverter.class ); + } + + @Override + public void writeObject(ObjectOutput output, FilterNullValueConverter object) throws IOException { + output.writeObject( object.filter ); + } + + @Override + public FilterNullValueConverter readObject(ObjectInput input) throws IOException, ClassNotFoundException { + KeyValueFilter filter = (KeyValueFilter) input.readObject(); + return new FilterNullValueConverter( filter ); + } + + @Override + public Integer getId() { + return Externalizers.FILTER_NULL_VALUE_CONVERTER; + } + } + + public static final class NullValue { + + private static final NullValue INSTANCE = new NullValue(); + + private NullValue() { + } + + public static NullValue getInstance() { + return NullValue.INSTANCE; + } + + public static final class Externalizer extends AbstractExternalizer { + + @Override + public Set> getTypeClasses() { + return Collections.singleton( NullValue.class ); + } + + @Override + public void writeObject(ObjectOutput output, NullValue object) { + } + + @Override + public NullValue readObject(ObjectInput input) { + return NullValue.getInstance(); + } + + @Override + public Integer getId() { + return Externalizers.NULL_VALUE; + } + } + } + +} diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/InfinispanMessageLogger.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/InfinispanMessageLogger.java index c698453e1d9f..e492dd7b86d9 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/InfinispanMessageLogger.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/InfinispanMessageLogger.java @@ -9,6 +9,7 @@ import org.hibernate.cache.CacheException; import org.hibernate.cache.infinispan.InfinispanRegionFactory; import org.hibernate.cache.infinispan.JndiInfinispanRegionFactory; +import org.infinispan.util.ByteString; import org.jboss.logging.BasicLogger; import org.jboss.logging.Logger; import org.jboss.logging.annotations.Cause; @@ -91,7 +92,7 @@ public static InfinispanMessageLogger getLog(Class clazz) { @LogMessage(level = ERROR) @Message(value = "Failed to end invalidating pending putFromLoad calls for key %s from region %s; the key won't be cached until invalidation expires.", id = 25016) - void failedEndInvalidating(Object key, String name); + void failedEndInvalidating(Object key, ByteString name); @Message(value = "Unable to retrieve CacheManager from JNDI [%s]", id = 25017) CacheException unableToRetrieveCmFromJndi(String jndiNamespace); @@ -118,7 +119,7 @@ public static InfinispanMessageLogger getLog(Class clazz) { CacheException cannotGetCurrentTx(@Cause SystemException e); @Message(value = "Failed to invalidate pending putFromLoad calls for key %s from region %s", id = 25024) - CacheException failedInvalidatePendingPut(Object key, String regionName); + CacheException failedInvalidatePendingPut(Object key, ByteString regionName); @LogMessage(level = ERROR) @Message(value = "Failed to invalidate pending putFromLoad calls for region %s", id = 25025) diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/LifecycleCallbacks.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/LifecycleCallbacks.java index 98849f19513f..1abf8de1ff25 100644 --- a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/LifecycleCallbacks.java +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/LifecycleCallbacks.java @@ -25,6 +25,7 @@ public void cacheManagerStarting(GlobalComponentRegistry gcr, GlobalConfiguratio externalizerMap.put( Externalizers.FUTURE_UPDATE, new FutureUpdate.Externalizer() ); externalizerMap.put( Externalizers.VERSIONED_ENTRY, new VersionedEntry.Externalizer() ); externalizerMap.put( Externalizers.EXCLUDE_EMPTY_EXTRACT_VALUE, new VersionedEntry.ExcludeEmptyExtractValueExternalizer() ); + externalizerMap.put( Externalizers.FILTER_NULL_VALUE_CONVERTER, new FilterNullValueConverter.Externalizer() ); } } diff --git a/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/LockOwner.java b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/LockOwner.java new file mode 100644 index 000000000000..0f9be8f4495d --- /dev/null +++ b/hibernate-cache/src/main/java/org/hibernate/cache/infinispan/util/LockOwner.java @@ -0,0 +1,43 @@ +/* + * Hibernate, Relational Persistence for Idiomatic Java + * + * License: GNU Lesser General Public License (LGPL), version 2.1 or later. + * See the lgpl.txt file in the root directory or . + */ +package org.hibernate.cache.infinispan.util; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +import org.infinispan.commands.CommandInvocationId; + +final class LockOwner { + + private LockOwner() { + } + + static void writeTo(ObjectOutput out, Object lockOwner) throws IOException { + if (lockOwner instanceof CommandInvocationId) { + out.writeByte( 0 ); + CommandInvocationId.writeTo( out, (CommandInvocationId) lockOwner ); + } + else { + out.writeByte( 1 ); + out.writeObject(lockOwner); + } + } + + static Object readFrom(ObjectInput in) throws IOException, ClassNotFoundException { + byte lockOwnerType = in.readByte(); + switch ( lockOwnerType ) { + case 0: + return CommandInvocationId.readFrom( in ); + case 1: + return in.readObject(); + default: + throw new IllegalStateException( "Unknown lock owner type" + lockOwnerType ); + } + } + +} diff --git a/hibernate-cache/src/main/resources/org/hibernate/cache/infinispan/builder/infinispan-configs-local.xml b/hibernate-cache/src/main/resources/org/hibernate/cache/infinispan/builder/infinispan-configs-local.xml index f1124475c098..0eeb4db831fb 100644 --- a/hibernate-cache/src/main/resources/org/hibernate/cache/infinispan/builder/infinispan-configs-local.xml +++ b/hibernate-cache/src/main/resources/org/hibernate/cache/infinispan/builder/infinispan-configs-local.xml @@ -6,8 +6,8 @@ ~ See the lgpl.txt file in the root directory or . --> + xmlns="urn:infinispan:config:9.1" + xsi:schemaLocation="urn:infinispan:config:9.1 http://www.infinispan.org/schemas/infinispan-config-9.1.xsd"> @@ -17,6 +17,7 @@ + @@ -24,13 +25,14 @@ + - + @@ -41,6 +43,7 @@ + diff --git a/hibernate-cache/src/main/resources/org/hibernate/cache/infinispan/builder/infinispan-configs.xml b/hibernate-cache/src/main/resources/org/hibernate/cache/infinispan/builder/infinispan-configs.xml index 56c2f22c082d..30f20edbcaae 100644 --- a/hibernate-cache/src/main/resources/org/hibernate/cache/infinispan/builder/infinispan-configs.xml +++ b/hibernate-cache/src/main/resources/org/hibernate/cache/infinispan/builder/infinispan-configs.xml @@ -6,8 +6,8 @@ ~ See the lgpl.txt file in the root directory or . --> + xmlns="urn:infinispan:config:9.1" + xsi:schemaLocation="urn:infinispan:config:9.1 http://www.infinispan.org/schemas/infinispan-config-9.1.xsd"> @@ -21,21 +21,21 @@ - + - + - + @@ -43,7 +43,7 @@ - + @@ -53,7 +53,7 @@ is required if query caching is used, even if the query cache itself is configured with CacheMode=LOCAL. --> - + @@ -64,6 +64,7 @@ + diff --git a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/InfinispanRegionFactoryTestCase.java b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/InfinispanRegionFactoryTestCase.java index 48b3f0974472..a62da0cb7f7d 100644 --- a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/InfinispanRegionFactoryTestCase.java +++ b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/InfinispanRegionFactoryTestCase.java @@ -312,10 +312,9 @@ public void testTimestampValidation() { Properties p = createProperties(); InputStream configStream = FileLookupFactory.newInstance().lookupFile(InfinispanRegionFactory.DEF_INFINISPAN_CONFIG_RESOURCE, getClass().getClassLoader()); ConfigurationBuilderHolder cbh = new ParserRegistry().parse(configStream); - DefaultCacheManager manager = new DefaultCacheManager(cbh, true); - ConfigurationBuilder builder = new ConfigurationBuilder(); + ConfigurationBuilder builder = cbh.getNamedConfigurationBuilders().get( DEF_TIMESTAMPS_RESOURCE ); builder.clustering().cacheMode(CacheMode.INVALIDATION_SYNC); - manager.defineConfiguration( DEF_TIMESTAMPS_RESOURCE, builder.build() ); + DefaultCacheManager manager = new DefaultCacheManager(cbh, true); try { InfinispanRegionFactory factory = createRegionFactory( manager, p, null ); factory.start( CacheTestUtil.sfOptionsForStart(), p ); @@ -502,9 +501,6 @@ public void testEnableStatistics() { assertTrue(cache.getCacheConfiguration().jmxStatistics().enabled()); final String timestamps = "org.hibernate.cache.spi.UpdateTimestampsCache"; - ConfigurationBuilder builder = new ConfigurationBuilder(); - builder.clustering().stateTransfer().fetchInMemoryState(true); - manager.defineConfiguration("timestamps", builder.build()); TimestampsRegionImpl timestampsRegion = (TimestampsRegionImpl) factory.buildTimestampsRegion(timestamps, p); cache = timestampsRegion.getCache(); @@ -545,9 +541,6 @@ public void testDisableStatistics() { assertFalse( cache.getCacheConfiguration().jmxStatistics().enabled() ); final String timestamps = "org.hibernate.cache.spi.UpdateTimestampsCache"; - ConfigurationBuilder builder = new ConfigurationBuilder(); - builder.clustering().stateTransfer().fetchInMemoryState(true); - factory.getCacheManager().defineConfiguration( "timestamps", builder.build() ); TimestampsRegionImpl timestampsRegion = (TimestampsRegionImpl) factory.buildTimestampsRegion(timestamps, p); cache = timestampsRegion.getCache(); diff --git a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTest.java b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTest.java index 089fb7afdea5..6df45495e706 100644 --- a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTest.java +++ b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTest.java @@ -76,6 +76,7 @@ public class PutFromLoadValidatorUnitTest { private EmbeddedCacheManager cm; private AdvancedCache cache; private List cleanup = new ArrayList<>(); + private PutFromLoadValidator testee; @BeforeClassOnce public void setUp() throws Exception { @@ -104,6 +105,8 @@ public void tearDown() throws Exception { } cache.clear(); cm.getCache(cache.getName() + "-" + InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE).clear(); + + testee.remotePendingPutsCache(); } private static InfinispanRegionFactory regionFactory(EmbeddedCacheManager cm) { @@ -125,7 +128,7 @@ public void testNakedPutTransactional() throws Exception { } private void nakedPutTest(final boolean transactional) throws Exception { - PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm)); + testee = new PutFromLoadValidator(cache, regionFactory(cm)); exec(transactional, new NakedPut(testee, true)); } @@ -139,7 +142,7 @@ public void testRegisteredPutTransactional() throws Exception { } private void registeredPutTest(final boolean transactional) throws Exception { - PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm)); + testee = new PutFromLoadValidator(cache, regionFactory(cm)); exec(transactional, new RegularPut(testee)); } @@ -162,7 +165,7 @@ public void testNakedPutAfterRegionRemovalTransactional() throws Exception { private void nakedPutAfterRemovalTest(final boolean transactional, final boolean removeRegion) throws Exception { - PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm)); + testee = new PutFromLoadValidator(cache, regionFactory(cm)); Invalidation invalidation = new Invalidation(testee, removeRegion); // the naked put can succeed because it has txTimestamp after invalidation NakedPut nakedPut = new NakedPut(testee, true); @@ -188,7 +191,7 @@ public void testRegisteredPutAfterRegionRemovalTransactional() throws Exception private void registeredPutAfterRemovalTest(final boolean transactional, final boolean removeRegion) throws Exception { - PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm)); + testee = new PutFromLoadValidator(cache, regionFactory(cm)); Invalidation invalidation = new Invalidation(testee, removeRegion); RegularPut regularPut = new RegularPut(testee); exec(transactional, invalidation, regularPut); @@ -213,7 +216,7 @@ public void testRegisteredPutWithInterveningRegionRemovalTransactional() throws private void registeredPutWithInterveningRemovalTest( final boolean transactional, final boolean removeRegion) throws Exception { - PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm)); + testee = new PutFromLoadValidator(cache, regionFactory(cm)); try { long txTimestamp = TIME_SERVICE.wallClockTime(); if (transactional) { @@ -258,7 +261,7 @@ public void testMultipleRegistrationsTransactional() throws Exception { } private void multipleRegistrationtest(final boolean transactional) throws Exception { - final PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm)); + testee = new PutFromLoadValidator(cache, regionFactory(cm)); final CountDownLatch registeredLatch = new CountDownLatch(3); final CountDownLatch finishedLatch = new CountDownLatch(3); @@ -322,7 +325,7 @@ public void testInvalidateRegionBlocksForInProgressPut() throws Exception { } private void invalidationBlocksForInProgressPutTest(final boolean keyOnly) throws Exception { - final PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm)); + testee = new PutFromLoadValidator(cache, regionFactory(cm)); final CountDownLatch removeLatch = new CountDownLatch(1); final CountDownLatch pferLatch = new CountDownLatch(1); final AtomicReference cache = new AtomicReference<>("INITIAL"); @@ -494,7 +497,7 @@ public void testGetForNullReleasePuts() { doReturn(ppCfg).when(regionFactory).getPendingPutsCacheConfiguration(); doAnswer(invocation -> TIME_SERVICE.wallClockTime()).when(regionFactory).nextTimestamp(); - PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory, cm); + testee = new PutFromLoadValidator(cache, regionFactory, cm); for (int i = 0; i < 100; ++i) { try { diff --git a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/functional/TombstoneTest.java b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/functional/TombstoneTest.java index 31548f1f7153..d11203be7010 100644 --- a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/functional/TombstoneTest.java +++ b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/functional/TombstoneTest.java @@ -321,7 +321,7 @@ public void testEvictPutFromLoadDuringUpdate() throws Exception { private Future blockedPutFromLoad(CyclicBarrier putFromLoadBarrier) throws InterruptedException, BrokenBarrierException, TimeoutException { BlockingInterceptor blockingInterceptor = new BlockingInterceptor(putFromLoadBarrier, PutKeyValueCommand.class, false, true); - entityCache.addInterceptor(blockingInterceptor, 0); + entityCache.getAsyncInterceptorChain().addInterceptor(blockingInterceptor, 0); cleanup.add(() -> entityCache.removeInterceptor(BlockingInterceptor.class)); // the putFromLoad should be blocked in the interceptor Future putFromLoad = executor.submit(() -> withTxSessionApply(s -> { diff --git a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/functional/VersionedTest.java b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/functional/VersionedTest.java index 21c8e949e2b9..1ecb6c0f9b1d 100644 --- a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/functional/VersionedTest.java +++ b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/functional/VersionedTest.java @@ -17,6 +17,7 @@ import org.hibernate.PessimisticLockException; import org.hibernate.Session; import org.hibernate.StaleStateException; +import org.hibernate.cache.infinispan.access.VersionedCallInterceptor; import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion; import org.hibernate.cache.infinispan.util.Caches; import org.hibernate.cache.infinispan.util.VersionedEntry; @@ -24,6 +25,8 @@ import org.hibernate.test.cache.infinispan.functional.entities.Item; import org.hibernate.test.cache.infinispan.functional.entities.OtherItem; +import org.infinispan.interceptors.AsyncInterceptorChain; +import org.infinispan.interceptors.DDAsyncInterceptor; import org.junit.Test; import org.infinispan.AdvancedCache; @@ -274,8 +277,9 @@ public void testCollectionUpdate() throws Exception { AtomicBoolean committing = new AtomicBoolean(false); CollectionUpdateTestInterceptor collectionUpdateTestInterceptor = new CollectionUpdateTestInterceptor(putFromLoadLatch); AnotherCollectionUpdateTestInterceptor anotherInterceptor = new AnotherCollectionUpdateTestInterceptor(putFromLoadLatch, committing); - collectionCache.addInterceptor(collectionUpdateTestInterceptor, collectionCache.getInterceptorChain().size() - 1); - collectionCache.addInterceptor(anotherInterceptor, 0); + AsyncInterceptorChain interceptorChain = collectionCache.getAsyncInterceptorChain(); + interceptorChain.addInterceptorBefore( collectionUpdateTestInterceptor, VersionedCallInterceptor.class ); + interceptorChain.addInterceptor(anotherInterceptor, 0); TIME_SERVICE.advance(1); Future addFuture = executor.submit(() -> withTxSessionApply(s -> { @@ -297,13 +301,13 @@ public void testCollectionUpdate() throws Exception { addFuture.get(); readFuture.get(); - collectionCache.removeInterceptor(CollectionUpdateTestInterceptor.class); - collectionCache.removeInterceptor(AnotherCollectionUpdateTestInterceptor.class); + interceptorChain.removeInterceptor(CollectionUpdateTestInterceptor.class); + interceptorChain.removeInterceptor(AnotherCollectionUpdateTestInterceptor.class); withTxSession(s -> assertFalse(s.load(Item.class, itemId).getOtherItems().isEmpty())); } - private class CollectionUpdateTestInterceptor extends BaseCustomInterceptor { + private class CollectionUpdateTestInterceptor extends DDAsyncInterceptor { final AtomicBoolean firstPutFromLoad = new AtomicBoolean(true); final CountDownLatch putFromLoadLatch; final CountDownLatch updateLatch = new CountDownLatch(1); @@ -324,7 +328,7 @@ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand } } - private class AnotherCollectionUpdateTestInterceptor extends BaseCustomInterceptor { + private class AnotherCollectionUpdateTestInterceptor extends DDAsyncInterceptor { final CountDownLatch putFromLoadLatch; final AtomicBoolean committing; diff --git a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeTest.java b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeTest.java index e33c4ace0ea6..01f052179506 100644 --- a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeTest.java +++ b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeTest.java @@ -24,6 +24,7 @@ import org.hibernate.test.cache.infinispan.functional.AbstractFunctionalTest; import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup; import org.hibernate.test.cache.infinispan.util.TxUtil; +import org.infinispan.marshall.core.ExternallyMarshallable; import org.junit.ClassRule; /** @@ -86,6 +87,7 @@ protected void cleanupTransactionManagement() { @Override public void startUp() { super.startUp(); + ExternallyMarshallable.addToWhiteList( "org.hibernate.cache" ); // In some cases tests are multi-threaded, so they have to join the group infinispanTestIdentifier.joinContext(); secondNodeEnvironment = new SecondNodeEnvironment(); diff --git a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/stress/SecondLevelCacheStressTestCase.java b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/stress/SecondLevelCacheStressTestCase.java index 7a45b8fef70b..c72bb9efdb72 100644 --- a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/stress/SecondLevelCacheStressTestCase.java +++ b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/stress/SecondLevelCacheStressTestCase.java @@ -45,7 +45,7 @@ import org.junit.Ignore; import org.junit.Test; -import org.infinispan.util.concurrent.ConcurrentHashSet; +import org.infinispan.commons.util.concurrent.ConcurrentHashSet; import static org.infinispan.test.TestingUtil.withTx; import static org.junit.Assert.assertEquals; diff --git a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/util/BatchModeTransactionCoordinator.java b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/util/BatchModeTransactionCoordinator.java index 39ce9ddd5167..77fc1f5b52cb 100644 --- a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/util/BatchModeTransactionCoordinator.java +++ b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/util/BatchModeTransactionCoordinator.java @@ -21,7 +21,7 @@ import org.hibernate.resource.transaction.spi.TransactionStatus; import org.infinispan.transaction.tm.BatchModeTransactionManager; -import org.infinispan.transaction.tm.DummyTransaction; +import org.infinispan.transaction.tm.EmbeddedTransaction; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -65,7 +65,7 @@ public void rollback() { @Override public TransactionStatus getStatus() { try { - DummyTransaction transaction = tm.getTransaction(); + EmbeddedTransaction transaction = tm.getTransaction(); return transaction == null ? TransactionStatus.NOT_ACTIVE : StatusTranslator.translate(transaction.getStatus()); } catch (SystemException e) { throw new RuntimeException(e); diff --git a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/util/ExpectingInterceptor.java b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/util/ExpectingInterceptor.java index d4a5b79cafab..bd0b7fd309b4 100644 --- a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/util/ExpectingInterceptor.java +++ b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/util/ExpectingInterceptor.java @@ -3,8 +3,9 @@ import org.infinispan.AdvancedCache; import org.infinispan.commands.VisitableCommand; import org.infinispan.context.InvocationContext; -import org.infinispan.interceptors.InvocationContextInterceptor; -import org.infinispan.interceptors.base.BaseCustomInterceptor; +import org.infinispan.interceptors.BaseCustomAsyncInterceptor; +import org.infinispan.interceptors.InvocationFinallyAction; +import org.infinispan.interceptors.impl.InvocationContextInterceptor; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; @@ -18,23 +19,23 @@ import java.util.function.BooleanSupplier; -public class ExpectingInterceptor extends BaseCustomInterceptor { +public class ExpectingInterceptor extends BaseCustomAsyncInterceptor { private final static Log log = LogFactory.getLog(ExpectingInterceptor.class); private final List conditions = new LinkedList<>(); public static ExpectingInterceptor get(AdvancedCache cache) { - Optional self = cache.getInterceptorChain().stream().filter(ExpectingInterceptor.class::isInstance).findFirst(); - if (self.isPresent()) { - return self.get(); + ExpectingInterceptor self = cache.getAsyncInterceptorChain().findInterceptorWithClass(ExpectingInterceptor.class); + if (self != null) { + return self; } ExpectingInterceptor ei = new ExpectingInterceptor(); // We are adding this after ICI because we want to handle silent failures, too - cache.addInterceptorAfter(ei, InvocationContextInterceptor.class); + cache.getAsyncInterceptorChain().addInterceptorAfter(ei, InvocationContextInterceptor.class); return ei; } public static void cleanup(AdvancedCache... caches) { - for (AdvancedCache c : caches) c.removeInterceptor(ExpectingInterceptor.class); + for (AdvancedCache c : caches) c.getAsyncInterceptorChain().removeInterceptor(ExpectingInterceptor.class); } public synchronized Condition when(BiPredicate predicate) { @@ -57,39 +58,36 @@ private static String source() { @Override protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable { - boolean succeeded = false; - try { - log.tracef("Before command %s", command); - Object retval = super.handleDefault(ctx, command); - succeeded = true; - return retval; - } finally { - log.tracef("After command(successful=%s) %s", succeeded, command); - List toExecute = new ArrayList<>(); - synchronized (this) { - for (Iterator iterator = conditions.iterator(); iterator.hasNext(); ) { - Condition condition = iterator.next(); - log.tracef("Testing condition %s", condition); - if (condition.success != null && condition.success != succeeded) { - log.trace("Condition test failed, succeeded: " + succeeded); - } else if (condition.predicate.test(ctx, command)) { - assert condition.action != null; - log.trace("Condition succeeded"); - toExecute.add(condition.action); - if (condition.removeCheck == null || condition.removeCheck.getAsBoolean()) { - iterator.remove(); + return invokeNextAndFinally( ctx, command, new InvocationFinallyAction() { + @Override + public void accept(InvocationContext rCtx, VisitableCommand rCommand, Object rv, Throwable throwable) + throws Throwable { + boolean succeeded = throwable == null; + log.tracef("After command(successful=%s) %s", succeeded, rCommand); + List toExecute = new ArrayList<>(); + synchronized (ExpectingInterceptor.this) { + for (Iterator iterator = conditions.iterator(); iterator.hasNext(); ) { + Condition condition = iterator.next(); + log.tracef("Testing condition %s", condition); + if ((condition.success == null || condition.success == succeeded) && condition.predicate.test(rCtx, rCommand)) { + assert condition.action != null; + log.trace("Condition succeeded"); + toExecute.add(condition.action); + if (condition.removeCheck == null || condition.removeCheck.getAsBoolean()) { + iterator.remove(); + } + } else { + log.trace("Condition test failed"); } - } else { - log.trace("Condition test failed"); } } + // execute without holding the lock + for (Runnable runnable : toExecute) { + log.tracef("Executing %s", runnable); + runnable.run(); + } } - // execute without holding the lock - for (Runnable runnable : toExecute) { - log.tracef("Executing %s", runnable); - runnable.run(); - } - } + } ); } public class Condition { diff --git a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/util/TestDisconnectHandler.java b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/util/TestDisconnectHandler.java index 6704a5f6fa9f..c3a7f04ab03b 100644 --- a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/util/TestDisconnectHandler.java +++ b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/util/TestDisconnectHandler.java @@ -1,6 +1,6 @@ package org.hibernate.test.cache.infinispan.util; -import org.infinispan.util.concurrent.ConcurrentHashSet; +import org.infinispan.commons.util.concurrent.ConcurrentHashSet; import org.jgroups.Address; import org.jgroups.Event; import org.jgroups.protocols.FD_ALL; diff --git a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/util/TestingKeyFactory.java b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/util/TestingKeyFactory.java index ac8d739b897e..6fc3046c7b43 100644 --- a/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/util/TestingKeyFactory.java +++ b/hibernate-cache/src/test/java/org/hibernate/test/cache/infinispan/util/TestingKeyFactory.java @@ -8,6 +8,8 @@ import java.io.Serializable; +import org.infinispan.marshall.core.ExternalPojo; + public class TestingKeyFactory { private TestingKeyFactory() { @@ -23,7 +25,7 @@ public static Object generateCollectionCacheKey(String id) { } //For convenience implement both interfaces. - private static class TestingEntityCacheKey implements Serializable { + private static class TestingEntityCacheKey implements Serializable, ExternalPojo { private final String id; diff --git a/hibernate-cache/src/test/resources/2lc-test-tcp.xml b/hibernate-cache/src/test/resources/2lc-test-tcp.xml index a6e0055bfb3a..1fdc3a08d65c 100644 --- a/hibernate-cache/src/test/resources/2lc-test-tcp.xml +++ b/hibernate-cache/src/test/resources/2lc-test-tcp.xml @@ -6,25 +6,18 @@ --> + xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-4.0.xsd"> + thread_pool.min_threads="${jgroups.thread_pool.min_threads:0}" + thread_pool.max_threads="${jgroups.thread_pool.max_threads:200}" + thread_pool.keep_alive_time="60000" + /> @@ -42,15 +35,12 @@ xmit_table_num_rows="50" xmit_table_msgs_per_row="1024" xmit_table_max_compaction_time="30000" - max_msg_batch_size="100" resend_last_seqno="true" - become_server_queue_size="0" />