Skip to content

Commit

Permalink
ISPN-7963 Make cache provider work w/ Infinispan 9
Browse files Browse the repository at this point in the history
  • Loading branch information
galderz committed Jul 5, 2017
1 parent 859e3a1 commit dee15db
Show file tree
Hide file tree
Showing 38 changed files with 644 additions and 384 deletions.
17 changes: 17 additions & 0 deletions hibernate-cache/pom.xml
Expand Up @@ -26,5 +26,22 @@
<groupId>org.hibernate</groupId> <groupId>org.hibernate</groupId>
<artifactId>hibernate-core</artifactId> <artifactId>hibernate-core</artifactId>
</dependency> </dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>infinispan-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>
Expand Up @@ -12,7 +12,7 @@
import org.infinispan.context.Flag; import org.infinispan.context.Flag;
import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start; import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.base.BaseRpcInterceptor; import org.infinispan.interceptors.impl.BaseRpcInterceptor;
import org.infinispan.jmx.JmxStatisticsExposer; import org.infinispan.jmx.JmxStatisticsExposer;
import org.infinispan.jmx.annotations.DataType; import org.infinispan.jmx.annotations.DataType;
import org.infinispan.jmx.annotations.ManagedAttribute; import org.infinispan.jmx.annotations.ManagedAttribute;
Expand All @@ -24,6 +24,7 @@
import org.infinispan.remoting.rpc.RpcOptions; import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address; import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateTransferManager; import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.util.ByteString;


import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -32,7 +33,7 @@ public abstract class BaseInvalidationInterceptor extends BaseRpcInterceptor imp
private final AtomicLong invalidations = new AtomicLong(0); private final AtomicLong invalidations = new AtomicLong(0);
protected CommandsFactory commandsFactory; protected CommandsFactory commandsFactory;
protected StateTransferManager stateTransferManager; protected StateTransferManager stateTransferManager;
protected String cacheName; protected ByteString cacheName;
protected boolean statisticsEnabled; protected boolean statisticsEnabled;
protected RpcOptions syncRpcOptions; protected RpcOptions syncRpcOptions;
protected RpcOptions asyncRpcOptions; protected RpcOptions asyncRpcOptions;
Expand All @@ -41,7 +42,7 @@ public abstract class BaseInvalidationInterceptor extends BaseRpcInterceptor imp
public void injectDependencies(CommandsFactory commandsFactory, StateTransferManager stateTransferManager, Cache cache) { public void injectDependencies(CommandsFactory commandsFactory, StateTransferManager stateTransferManager, Cache cache) {
this.commandsFactory = commandsFactory; this.commandsFactory = commandsFactory;
this.stateTransferManager = stateTransferManager; this.stateTransferManager = stateTransferManager;
this.cacheName = cache.getName(); this.cacheName = ByteString.fromString(cache.getName());
} }


@Start @Start
Expand Down
Expand Up @@ -6,11 +6,13 @@
*/ */
package org.hibernate.cache.infinispan.access; package org.hibernate.cache.infinispan.access;


import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.write.DataWriteCommand; import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext; import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.Ownership;
import org.infinispan.interceptors.InvocationFinallyAction;
import org.infinispan.interceptors.locking.NonTransactionalLockingInterceptor; import org.infinispan.interceptors.locking.NonTransactionalLockingInterceptor;
import org.infinispan.util.concurrent.TimeoutException; import org.infinispan.util.concurrent.locks.LockUtil;
import org.infinispan.util.logging.Log; import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory; import org.infinispan.util.logging.LogFactory;


Expand All @@ -32,43 +34,41 @@
*/ */
public class LockingInterceptor extends NonTransactionalLockingInterceptor { public class LockingInterceptor extends NonTransactionalLockingInterceptor {
private static final Log log = LogFactory.getLog(LockingInterceptor.class); private static final Log log = LogFactory.getLog(LockingInterceptor.class);
private static final boolean trace = log.isTraceEnabled();


@Override protected final InvocationFinallyAction unlockAllReturnCheckCompletableFutureHandler = new InvocationFinallyAction() {
protected Object visitDataWriteCommand(InvocationContext ctx, DataWriteCommand command) throws Throwable { @Override
Object returnValue = null; public void accept(InvocationContext rCtx, VisitableCommand rCommand, Object rv, Throwable throwable) throws Throwable {
try { lockManager.unlockAll(rCtx);
// Clear any metadata; we'll set them as appropriate in TombstoneCallInterceptor if (rv instanceof CompletableFuture) {
command.setMetadata(null); try {
((CompletableFuture) rv).join();
}
catch (CompletionException e) {
throw e.getCause();
}
}
}
};


lockAndRecord(ctx, command.getKey(), getLockTimeoutMillis(command)); @Override
protected Object visitDataWriteCommand(InvocationContext ctx, DataWriteCommand command) throws Throwable {
try {
if (log.isTraceEnabled()) {
Ownership ownership = LockUtil.getLockOwnership( command.getKey(), cdl );
log.tracef( "Am I owner for key=%s ? %s", command.getKey(), ownership);
}

if (ctx.getLockOwner() == null) {
ctx.setLockOwner( command.getCommandInvocationId() );
}

lockAndRecord(ctx, command.getKey(), getLockTimeoutMillis(command));
}
catch (Throwable t) {
lockManager.unlockAll(ctx);
throw t;
}
return invokeNextAndFinally(ctx, command, unlockAllReturnCheckCompletableFutureHandler);
}


returnValue = invokeNextInterceptor(ctx, command);
return returnValue;
}
catch (TimeoutException e) {
if (!ctx.isOriginLocal() && command.hasFlag(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT)) {
// FAIL_SILENTLY flag is not replicated to remote nodes and zero acquisition timeouts cause
// very noisy logs.
if (trace) {
log.tracef("Silently ignoring exception", e);
}
return null;
}
else {
throw e;
}
}
finally {
lockManager.unlockAll(ctx);
if (returnValue instanceof CompletableFuture) {
try {
((CompletableFuture) returnValue).join();
}
catch (CompletionException e) {
throw e.getCause();
}
}
}
}
} }
Expand Up @@ -8,19 +8,24 @@


import org.hibernate.cache.infinispan.util.CacheCommandInitializer; import org.hibernate.cache.infinispan.util.CacheCommandInitializer;
import org.hibernate.cache.infinispan.util.InfinispanMessageLogger; import org.hibernate.cache.infinispan.util.InfinispanMessageLogger;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.write.ClearCommand; import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.InvalidateCommand; import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.PutKeyValueCommand; import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand; import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand; import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand; import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.WriteCommand; import org.infinispan.commands.write.WriteCommand;
import org.infinispan.commons.util.EnumUtil;
import org.infinispan.context.Flag; import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext; import org.infinispan.context.InvocationContext;
import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.InvalidationInterceptor; import org.infinispan.interceptors.InvocationFinallyFunction;
import org.infinispan.interceptors.impl.InvalidationInterceptor;
import org.infinispan.jmx.annotations.MBean; import org.infinispan.jmx.annotations.MBean;
import org.infinispan.util.concurrent.locks.RemoteLockCommand; import org.infinispan.util.concurrent.locks.RemoteLockCommand;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;


import java.util.Collections; import java.util.Collections;


Expand All @@ -40,6 +45,7 @@ public class NonTxInvalidationInterceptor extends BaseInvalidationInterceptor {
private CacheCommandInitializer commandInitializer; private CacheCommandInitializer commandInitializer;


private static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(InvalidationInterceptor.class); private static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(InvalidationInterceptor.class);
private static final Log ispnLog = LogFactory.getLog(NonTxInvalidationInterceptor.class);


public NonTxInvalidationInterceptor(PutFromLoadValidator putFromLoadValidator) { public NonTxInvalidationInterceptor(PutFromLoadValidator putFromLoadValidator) {
this.putFromLoadValidator = putFromLoadValidator; this.putFromLoadValidator = putFromLoadValidator;
Expand All @@ -53,7 +59,7 @@ public void injectDependencies(CacheCommandInitializer commandInitializer) {
@Override @Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable { public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
if (command.hasFlag(Flag.PUT_FOR_EXTERNAL_READ)) { if (command.hasFlag(Flag.PUT_FOR_EXTERNAL_READ)) {
return invokeNextInterceptor(ctx, command); return invokeNext(ctx, command);
} }
else { else {
boolean isTransactional = putFromLoadValidator.registerRemoteInvalidation(command.getKey(), command.getKeyLockOwner()); boolean isTransactional = putFromLoadValidator.registerRemoteInvalidation(command.getKey(), command.getKeyLockOwner());
Expand All @@ -63,12 +69,8 @@ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
if (!putFromLoadValidator.beginInvalidatingWithPFER(command.getKeyLockOwner(), command.getKey(), command.getValue())) { if (!putFromLoadValidator.beginInvalidatingWithPFER(command.getKeyLockOwner(), command.getKey(), command.getValue())) {
log.failedInvalidatePendingPut(command.getKey(), cacheName); log.failedInvalidatePendingPut(command.getKey(), cacheName);
} }
RemoveCommand removeCommand = commandsFactory.buildRemoveCommand(command.getKey(), null, command.getFlags()); RemoveCommand removeCommand = commandsFactory.buildRemoveCommand(command.getKey(), null, command.getFlagsBitSet());
Object retval = invokeNextInterceptor(ctx, removeCommand); return invokeNextAndHandle( ctx, removeCommand, new InvalidateAndReturnFunction(isTransactional, command.getKeyLockOwner()) );
if (command.isSuccessful()) {
invalidateAcrossCluster(command, isTransactional, command.getKey());
}
return retval;
} }
} }


Expand All @@ -88,16 +90,12 @@ public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) t
else { else {
log.trace("This is an eviction, not invalidating anything"); log.trace("This is an eviction, not invalidating anything");
} }
Object retval = invokeNextInterceptor(ctx, command); return invokeNextAndHandle( ctx, command, new InvalidateAndReturnFunction(isTransactional, command.getKeyLockOwner()) );
if (command.isSuccessful()) {
invalidateAcrossCluster(command, isTransactional, command.getKey());
}
return retval;
} }


@Override @Override
public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable { public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable {
Object retval = invokeNextInterceptor(ctx, command); Object retval = invokeNext(ctx, command);
if (!isLocalModeForced(command)) { if (!isLocalModeForced(command)) {
// just broadcast the clear command - this is simplest! // just broadcast the clear command - this is simplest!
if (ctx.isOriginLocal()) { if (ctx.isOriginLocal()) {
Expand All @@ -112,17 +110,18 @@ public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) t
throw new UnsupportedOperationException("Unexpected putAll"); throw new UnsupportedOperationException("Unexpected putAll");
} }


private <T extends WriteCommand & RemoteLockCommand> void invalidateAcrossCluster(T command, boolean isTransactional, Object key) throws Throwable { private <T extends WriteCommand & RemoteLockCommand> void invalidateAcrossCluster(
T command, boolean isTransactional, Object key, Object keyLockOwner) throws Throwable {
// increment invalidations counter if statistics maintained // increment invalidations counter if statistics maintained
incrementInvalidations(); incrementInvalidations();
InvalidateCommand invalidateCommand; InvalidateCommand invalidateCommand;
if (!isLocalModeForced(command)) { if (!isLocalModeForced(command)) {
if (isTransactional) { if (isTransactional) {
invalidateCommand = commandInitializer.buildBeginInvalidationCommand( invalidateCommand = commandInitializer.buildBeginInvalidationCommand(
Collections.emptySet(), new Object[] { key }, command.getKeyLockOwner()); EnumUtil.EMPTY_BIT_SET, new Object[] { key }, keyLockOwner);
} }
else { else {
invalidateCommand = commandsFactory.buildInvalidateCommand(Collections.emptySet(), new Object[] { key }); invalidateCommand = commandsFactory.buildInvalidateCommand(EnumUtil.EMPTY_BIT_SET, new Object[] {key });
} }
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand); log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand);
Expand All @@ -132,4 +131,31 @@ private <T extends WriteCommand & RemoteLockCommand> void invalidateAcrossCluste
} }
} }


@Override
protected Log getLog() {
return ispnLog;
}

class InvalidateAndReturnFunction implements InvocationFinallyFunction {

final boolean isTransactional;
final Object keyLockOwner;

InvalidateAndReturnFunction(boolean isTransactional, Object keyLockOwner) {
this.isTransactional = isTransactional;
this.keyLockOwner = keyLockOwner;
}

@Override
public Object apply(InvocationContext rCtx, VisitableCommand rCommand, Object rv, Throwable throwable)
throws Throwable {
RemoveCommand removeCmd = (RemoveCommand) rCommand;
if ( removeCmd.isSuccessful()) {
invalidateAcrossCluster(removeCmd, isTransactional, removeCmd.getKey(), keyLockOwner);
}
return rv;
}

}

} }
Expand Up @@ -19,13 +19,15 @@
import org.infinispan.context.InvocationContext; import org.infinispan.context.InvocationContext;
import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start; import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.BaseCustomAsyncInterceptor;
import org.infinispan.interceptors.base.BaseCustomInterceptor; import org.infinispan.interceptors.base.BaseCustomInterceptor;
import org.infinispan.remoting.inboundhandler.DeliverOrder; import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.ResponseMode; import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager; import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions; import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address; import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateTransferManager; import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.util.ByteString;


import java.util.List; import java.util.List;


Expand All @@ -37,16 +39,16 @@
* *
* @author Radim Vansa &lt;rvansa@redhat.com&gt; * @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/ */
public class NonTxPutFromLoadInterceptor extends BaseCustomInterceptor { public class NonTxPutFromLoadInterceptor extends BaseCustomAsyncInterceptor {
private final static InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(NonTxPutFromLoadInterceptor.class); private final static InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(NonTxPutFromLoadInterceptor.class);
private final String cacheName; private final ByteString cacheName;
private final PutFromLoadValidator putFromLoadValidator; private final PutFromLoadValidator putFromLoadValidator;
private CacheCommandInitializer commandInitializer; private CacheCommandInitializer commandInitializer;
private RpcManager rpcManager; private RpcManager rpcManager;
private StateTransferManager stateTransferManager; private StateTransferManager stateTransferManager;
private RpcOptions asyncUnordered; private RpcOptions asyncUnordered;


public NonTxPutFromLoadInterceptor(PutFromLoadValidator putFromLoadValidator, String cacheName) { public NonTxPutFromLoadInterceptor(PutFromLoadValidator putFromLoadValidator, ByteString cacheName) {
this.putFromLoadValidator = putFromLoadValidator; this.putFromLoadValidator = putFromLoadValidator;
this.cacheName = cacheName; this.cacheName = cacheName;
} }
Expand All @@ -70,7 +72,7 @@ public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand co
putFromLoadValidator.beginInvalidatingKey(((BeginInvalidationCommand) command).getLockOwner(), key); putFromLoadValidator.beginInvalidatingKey(((BeginInvalidationCommand) command).getLockOwner(), key);
} }
} }
return invokeNextInterceptor(ctx, command); return invokeNext(ctx, command);
} }


public void endInvalidating(Object key, Object lockOwner, boolean successful) { public void endInvalidating(Object key, Object lockOwner, boolean successful) {
Expand Down

0 comments on commit dee15db

Please sign in to comment.