Skip to content

Commit

Permalink
ISPN-5718 Random failures in
Browse files Browse the repository at this point in the history
ClusterListenerReplTxInitialStateTest.testAllExpired

* Fixed issue where PrepareCommand was causing additional expirations
* Cause of failure was due to not locking key with optimistic prepare
  • Loading branch information
wburns authored and galderz committed Sep 3, 2015
1 parent 2ddd584 commit 40eed13
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.RemoveExpiredCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.commons.util.InfinispanCollections;
Expand Down Expand Up @@ -127,6 +128,7 @@ public Collection<Object> getKeysToLock() {
switch (writeCommand.getCommandId()) {
case PutKeyValueCommand.COMMAND_ID:
case RemoveCommand.COMMAND_ID:
case RemoveExpiredCommand.COMMAND_ID:
case ReplaceCommand.COMMAND_ID:
set.add(((DataWriteCommand) writeCommand).getKey());
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* overwritten.
*/
public class ExpirationInterceptor<K, V> extends CommandInterceptor {
private ExpirationManager<K, V> expirationManager;
protected ExpirationManager<K, V> expirationManager;

@Inject
public void inject(ExpirationManager<K, V> expirationManager) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.infinispan.expiration.impl;

import org.infinispan.commands.AbstractVisitor;
import org.infinispan.commands.DataCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;

import java.util.Set;

/**
* Interceptor to be used with optimistic transactions to make sure prepare doesn't cause unneeded expirations
*/
public class OptimisticTxExpirationInterceptor<K, V> extends ExpirationInterceptor<K, V> {
private final RegisterInterceptor registerInterceptor = new RegisterInterceptor();
private final UnregisterInterceptor unregisterInterceptor = new UnregisterInterceptor();

@Override
public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
// We only need to keep register for prepare as commit command doesn't read the entry
for (WriteCommand cmd : command.getModifications()) {
cmd.acceptVisitor(ctx, registerInterceptor);
}
try {
return super.visitPrepareCommand(ctx, command);
} finally {
for (WriteCommand cmd : command.getModifications()) {
cmd.acceptVisitor(ctx, unregisterInterceptor);
}
}
}

private abstract class ExpirationVisitor extends AbstractVisitor {
@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
return handleSingleKey((K) command.getKey());
}

@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
return handleSingleKey((K) command.getKey());
}

@Override
public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
((Set<K>) command.getAffectedKeys()).forEach(this::handleSingleKey);
return null;
}

protected abstract Void handleSingleKey(K key);
}

private class RegisterInterceptor extends ExpirationVisitor {
protected Void handleSingleKey(K key) {
expirationManager.registerWriteIncoming(key);
return null;
}
}

private class UnregisterInterceptor extends ExpirationVisitor {
protected Void handleSingleKey(K key) {
expirationManager.unregisterWrite(key);
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.infinispan.configuration.cache.InterceptorConfiguration;
import org.infinispan.configuration.cache.StoreConfiguration;
import org.infinispan.expiration.impl.ExpirationInterceptor;
import org.infinispan.expiration.impl.OptimisticTxExpirationInterceptor;
import org.infinispan.factories.annotations.DefaultFactoryFor;
import org.infinispan.interceptors.ActivationInterceptor;
import org.infinispan.interceptors.BatchingInterceptor;
Expand Down Expand Up @@ -104,8 +105,9 @@ private boolean isUsingMarshalledValues(Configuration c) {
}

public InterceptorChain buildInterceptorChain() {
boolean needsVersionAwareComponents = configuration.transaction().transactionMode().isTransactional() &&
Configurations.isVersioningEnabled(configuration);
TransactionMode transactionMode = configuration.transaction().transactionMode();
boolean needsVersionAwareComponents = transactionMode.isTransactional() &&
Configurations.isVersioningEnabled(configuration);

InterceptorChain interceptorChain = new InterceptorChain(componentRegistry.getComponentMetadataRepo());
// add the interceptor chain to the registry first, since some interceptors may ask for it.
Expand Down Expand Up @@ -156,7 +158,7 @@ public InterceptorChain buildInterceptorChain() {
} else {
interceptorChain.appendInterceptor(createInterceptor(new StateTransferInterceptor(), StateTransferInterceptor.class), false);
}
if (configuration.transaction().transactionMode().isTransactional()) {
if (transactionMode.isTransactional()) {
interceptorChain.appendInterceptor(createInterceptor(new TransactionSynchronizerInterceptor(), TransactionSynchronizerInterceptor.class), false);
}
}
Expand All @@ -174,7 +176,7 @@ public InterceptorChain buildInterceptorChain() {
}

// load the tx interceptor
if (configuration.transaction().transactionMode().isTransactional())
if (transactionMode.isTransactional())
interceptorChain.appendInterceptor(createInterceptor(new TxInterceptor(), TxInterceptor.class), false);

if (isUsingMarshalledValues(configuration)) {
Expand All @@ -186,7 +188,7 @@ public InterceptorChain buildInterceptorChain() {
}

// NotificationInterceptor is used only for Prepare/Commit/Rollback notifications
if (configuration.transaction().transactionMode().isTransactional() && configuration.transaction().notifications()) {
if (transactionMode.isTransactional() && configuration.transaction().notifications()) {
interceptorChain.appendInterceptor(createInterceptor(new NotificationInterceptor(), NotificationInterceptor.class), false);
}

Expand All @@ -196,7 +198,7 @@ public InterceptorChain buildInterceptorChain() {

//the total order protocol doesn't need locks
if (!isTotalOrder) {
if (configuration.transaction().transactionMode().isTransactional()) {
if (transactionMode.isTransactional()) {
if (configuration.transaction().lockingMode() == LockingMode.PESSIMISTIC) {
interceptorChain.appendInterceptor(createInterceptor(new PessimisticLockingInterceptor(), PessimisticLockingInterceptor.class), false);
} else {
Expand All @@ -208,7 +210,7 @@ public InterceptorChain buildInterceptorChain() {
}

if (configuration.sites().hasEnabledBackups() && !configuration.sites().disableBackups()) {
if ((configuration.transaction().transactionMode() == TransactionMode.TRANSACTIONAL)) {
if (transactionMode == TransactionMode.TRANSACTIONAL) {
if (configuration.transaction().lockingMode() == LockingMode.OPTIMISTIC) {
interceptorChain.appendInterceptor(createInterceptor(new OptimisticBackupInterceptor(), OptimisticBackupInterceptor.class), false);
} else {
Expand All @@ -230,7 +232,13 @@ public InterceptorChain buildInterceptorChain() {

// The expiration interceptor must come before the entry wrapping interceptor as it can stop data container
// access from expiring things.
interceptorChain.appendInterceptor(createInterceptor(new ExpirationInterceptor(), ExpirationInterceptor.class), false);
if (transactionMode.isTransactional() && configuration.transaction().lockingMode() == LockingMode.OPTIMISTIC) {
interceptorChain.appendInterceptor(createInterceptor(new OptimisticTxExpirationInterceptor<>(),
ExpirationInterceptor.class), false);
} else {
interceptorChain.appendInterceptor(createInterceptor(new ExpirationInterceptor<>(),
ExpirationInterceptor.class), false);
}

if (needsVersionAwareComponents && cacheMode.isClustered()) {
if (isTotalOrder) {
Expand Down Expand Up @@ -272,7 +280,7 @@ public InterceptorChain buildInterceptorChain() {
}

if (configuration.clustering().l1().enabled()) {
if (configuration.transaction().transactionMode().isTransactional()) {
if (transactionMode.isTransactional()) {
interceptorChain.appendInterceptor(createInterceptor(new L1TxInterceptor(), L1TxInterceptor.class), false);
}
else {
Expand All @@ -298,7 +306,7 @@ public InterceptorChain buildInterceptorChain() {
}
case DIST_ASYNC:
case REPL_ASYNC:
if (configuration.transaction().transactionMode().isTransactional()) {
if (transactionMode.isTransactional()) {
if (isTotalOrder) {
interceptorChain.appendInterceptor(createInterceptor(new TotalOrderDistributionInterceptor(), TotalOrderDistributionInterceptor.class), false);
} else {
Expand Down

0 comments on commit 40eed13

Please sign in to comment.