Skip to content

Commit

Permalink
ISPN-15895 Cross-site: 2PC transaction does not acquire locks
Browse files Browse the repository at this point in the history
  • Loading branch information
pruivo committed Apr 2, 2024
1 parent a241ce0 commit dc6da5e
Show file tree
Hide file tree
Showing 18 changed files with 764 additions and 384 deletions.
Expand Up @@ -438,6 +438,22 @@ public final boolean equals(Object obj) {
return this == obj;
}

public synchronized void transactionFailed(Throwable throwable) {
if (isDone()) {
throw new IllegalStateException("Transaction is done. Cannot change status");
}
if (status == Status.STATUS_MARKED_ROLLBACK) {
return;
}
status = Status.STATUS_MARKED_ROLLBACK;
if (firstRollbackException == null) {
firstRollbackException = new RollbackException("Exception caught while running transaction");
firstRollbackException.addSuppressed(throwable);
} else if (!(throwable instanceof RollbackException)) {
firstRollbackException.addSuppressed(throwable);
}
}

private RollbackException hasRollbackException(boolean forceRollback) {
if (firstRollbackException != null) {
if (forceRollback && FORCE_ROLLBACK_MESSAGE.equals(firstRollbackException.getMessage())) {
Expand Down Expand Up @@ -686,6 +702,10 @@ private boolean isDone() {
return false;
}

public RollbackException getRollbackException() {
return firstRollbackException;
}

private static class XaResourceData {
final XAResource xaResource;
volatile int status;
Expand Down
Expand Up @@ -202,7 +202,7 @@ public Object construct(String componentName) {
NoOpIracVersionGenerator.getInstance();
} else if (componentName.equals(BackupReceiver.class.getName())) {
return configuration.clustering().cacheMode().isClustered() ?
new ClusteredCacheBackupReceiver(componentRegistry.getCacheName()) :
new ClusteredCacheBackupReceiver() :
null;
} else if (componentName.equals(StorageConfigurationManager.class.getName())) {
return new StorageConfigurationManager();
Expand Down
@@ -1,13 +1,18 @@
package org.infinispan.interceptors.xsite;

import java.util.function.Predicate;

import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.configuration.cache.BackupConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.context.impl.LocalTxInvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.InvocationStage;

/**
Expand All @@ -18,6 +23,13 @@
*/
public class OptimisticBackupInterceptor extends BaseBackupInterceptor {

private boolean hasOnePhaseCommitBackups;

@Inject
public void checkTwoPhaseCommit(Configuration configuration) {
hasOnePhaseCommitBackups = configuration.sites().syncBackupsStream().anyMatch(Predicate.not(BackupConfiguration::isTwoPhaseCommit));
}

@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) {
if (skipXSiteBackup(command) || !command.hasAnyFlag(FlagBitSets.PUT_FOR_EXTERNAL_READ)) {
Expand Down Expand Up @@ -53,9 +65,21 @@ public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command)
stage = InvocationStage.completedNullStage();
}

if (hasOnePhaseCommitBackups) {
// One or more backup site(s) using one phase commit.
// Wait for backup site acknowledge, so we have time to rollback and keep the data consistent.
return makeStage(asyncInvokeNext(ctx, command, stage))
.thenApply(ctx, command, (rCtx, rCommand, rv) -> {
//for async, all nodes need to keep track of the updates keys after it is applied locally.
trackKeysForAsyncBackups(rCommand);
return rv;
});
}

// The backup site(s) is using 2PC, we can send the commit in parallel for both local and remote site.
return invokeNextThenApply(ctx, command, (rCtx, rCommand, rv) -> {
//we need to track the keys only after it is applied in the local node!
keysFromMods(getModificationsFrom(rCommand)).forEach(key -> iracManager.trackUpdatedKey(key.getSegment(), key.getKey(), rCommand.getGlobalTransaction()));
trackKeysForAsyncBackups(rCommand);
return stage.thenReturn(rCtx, rCommand, rv);
});
}
Expand Down Expand Up @@ -84,4 +108,10 @@ private boolean shouldRollbackRemoteTxCommand(TxInvocationContext<?> ctx) {
private boolean hasBeenPrepared(LocalTxInvocationContext ctx) {
return !ctx.getRemoteLocksAcquired().isEmpty();
}

private void trackKeysForAsyncBackups(CommitCommand command) {
var gtx = command.getGlobalTransaction();
keysFromMods(getModificationsFrom(command))
.forEach(key -> iracManager.trackUpdatedKey(key.getSegment(), key.getKey(), gtx));
}
}
Expand Up @@ -47,11 +47,15 @@ public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand comman
stage = InvocationStage.completedNullStage();
}

return invokeNextThenApply(ctx, command, (rCtx, rCommand, rv) -> {
//for async, all nodes need to keep track of the updates keys after it is applied locally.
keysFromMods(rCommand.getModifications().stream())
.forEach(key -> iracManager.trackUpdatedKey(key.getSegment(), key.getKey(), rCommand.getGlobalTransaction()));
return stage.thenReturn(rCtx, rCommand, rv);
});
// We need to way for the other site commit.
// In the local site, we have locks locked so no other transaction will commit.
// If the other site fails to commit, we abort it here.
return makeStage(asyncInvokeNext(ctx, command, stage))
.thenApply(ctx, command, (rCtx, rCommand, rv) -> {
//for async, all nodes need to keep track of the updates keys after it is applied locally.
keysFromMods(rCommand.getModifications().stream())
.forEach(key -> iracManager.trackUpdatedKey(key.getSegment(), key.getKey(), rCommand.getGlobalTransaction()));
return rv;
});
}
}
7 changes: 5 additions & 2 deletions core/src/main/java/org/infinispan/xsite/BackupSenderImpl.java
Expand Up @@ -168,6 +168,10 @@ public InvocationStage backupRollback(RollbackCommand command, Transaction trans
return backupCommand(command, command, xSiteBackups, transaction);
}

public CustomFailurePolicy<Object, Object> getCustomFailurePolicy(String site) {
return siteFailurePolicy.get(site);
}

private InvocationStage backupCommand(VisitableCommand command, VisitableCommand originalCommand,
List<XSiteBackup> xSiteBackups, Transaction transaction) {
XSiteCacheRequest<Object> xsiteCommand = commandsFactory.buildSingleXSiteRpcCommand(command);
Expand Down Expand Up @@ -333,9 +337,8 @@ void handleException(String siteName, Throwable throwable) {
addException(siteName, throwable);
break;
case CUSTOM:
CustomFailurePolicy<Object,Object> failurePolicy = siteFailurePolicy.get(siteName);
try {
command.acceptVisitor(null, new CustomBackupPolicyInvoker(siteName, failurePolicy, transaction));
command.acceptVisitor(null, new CustomBackupPolicyInvoker(siteName, getCustomFailurePolicy(siteName), transaction));
} catch (Throwable t) {
addException(siteName, t);
}
Expand Down

0 comments on commit dc6da5e

Please sign in to comment.