Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISPN-15895 Cross-site: 2PC transaction does not acquire locks #12217

Merged
merged 1 commit into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -438,6 +438,22 @@ public final boolean equals(Object obj) {
return this == obj;
}

public synchronized void transactionFailed(Throwable throwable) {
if (isDone()) {
throw new IllegalStateException("Transaction is done. Cannot change status");
}
if (status == Status.STATUS_MARKED_ROLLBACK) {
return;
}
status = Status.STATUS_MARKED_ROLLBACK;
if (firstRollbackException == null) {
firstRollbackException = new RollbackException("Exception caught while running transaction");
firstRollbackException.addSuppressed(throwable);
} else if (!(throwable instanceof RollbackException)) {
firstRollbackException.addSuppressed(throwable);
}
}

private RollbackException hasRollbackException(boolean forceRollback) {
if (firstRollbackException != null) {
if (forceRollback && FORCE_ROLLBACK_MESSAGE.equals(firstRollbackException.getMessage())) {
Expand Down Expand Up @@ -686,6 +702,10 @@ private boolean isDone() {
return false;
}

public RollbackException getRollbackException() {
return firstRollbackException;
}

private static class XaResourceData {
final XAResource xaResource;
volatile int status;
Expand Down
Expand Up @@ -202,7 +202,7 @@ public Object construct(String componentName) {
NoOpIracVersionGenerator.getInstance();
} else if (componentName.equals(BackupReceiver.class.getName())) {
return configuration.clustering().cacheMode().isClustered() ?
new ClusteredCacheBackupReceiver(componentRegistry.getCacheName()) :
new ClusteredCacheBackupReceiver() :
null;
} else if (componentName.equals(StorageConfigurationManager.class.getName())) {
return new StorageConfigurationManager();
Expand Down
@@ -1,13 +1,18 @@
package org.infinispan.interceptors.xsite;

import java.util.function.Predicate;

import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.configuration.cache.BackupConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.context.impl.LocalTxInvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.InvocationStage;

/**
Expand All @@ -18,6 +23,13 @@
*/
public class OptimisticBackupInterceptor extends BaseBackupInterceptor {

private boolean hasOnePhaseCommitBackups;

@Inject
public void checkTwoPhaseCommit(Configuration configuration) {
hasOnePhaseCommitBackups = configuration.sites().syncBackupsStream().anyMatch(Predicate.not(BackupConfiguration::isTwoPhaseCommit));
}

@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) {
if (skipXSiteBackup(command) || !command.hasAnyFlag(FlagBitSets.PUT_FOR_EXTERNAL_READ)) {
Expand Down Expand Up @@ -53,9 +65,21 @@ public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command)
stage = InvocationStage.completedNullStage();
}

if (hasOnePhaseCommitBackups) {
// One or more backup site(s) using one phase commit.
// Wait for backup site acknowledge, so we have time to rollback and keep the data consistent.
return makeStage(asyncInvokeNext(ctx, command, stage))
.thenApply(ctx, command, (rCtx, rCommand, rv) -> {
//for async, all nodes need to keep track of the updates keys after it is applied locally.
trackKeysForAsyncBackups(rCommand);
return rv;
});
}

// The backup site(s) is using 2PC, we can send the commit in parallel for both local and remote site.
return invokeNextThenApply(ctx, command, (rCtx, rCommand, rv) -> {
//we need to track the keys only after it is applied in the local node!
keysFromMods(getModificationsFrom(rCommand)).forEach(key -> iracManager.trackUpdatedKey(key.getSegment(), key.getKey(), rCommand.getGlobalTransaction()));
trackKeysForAsyncBackups(rCommand);
return stage.thenReturn(rCtx, rCommand, rv);
});
}
Expand Down Expand Up @@ -84,4 +108,10 @@ private boolean shouldRollbackRemoteTxCommand(TxInvocationContext<?> ctx) {
private boolean hasBeenPrepared(LocalTxInvocationContext ctx) {
return !ctx.getRemoteLocksAcquired().isEmpty();
}

private void trackKeysForAsyncBackups(CommitCommand command) {
var gtx = command.getGlobalTransaction();
keysFromMods(getModificationsFrom(command))
.forEach(key -> iracManager.trackUpdatedKey(key.getSegment(), key.getKey(), gtx));
}
}
Expand Up @@ -47,11 +47,15 @@ public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand comman
stage = InvocationStage.completedNullStage();
}

return invokeNextThenApply(ctx, command, (rCtx, rCommand, rv) -> {
//for async, all nodes need to keep track of the updates keys after it is applied locally.
keysFromMods(rCommand.getModifications().stream())
.forEach(key -> iracManager.trackUpdatedKey(key.getSegment(), key.getKey(), rCommand.getGlobalTransaction()));
return stage.thenReturn(rCtx, rCommand, rv);
});
// We need to way for the other site commit.
// In the local site, we have locks locked so no other transaction will commit.
// If the other site fails to commit, we abort it here.
return makeStage(asyncInvokeNext(ctx, command, stage))
.thenApply(ctx, command, (rCtx, rCommand, rv) -> {
//for async, all nodes need to keep track of the updates keys after it is applied locally.
keysFromMods(rCommand.getModifications().stream())
.forEach(key -> iracManager.trackUpdatedKey(key.getSegment(), key.getKey(), rCommand.getGlobalTransaction()));
return rv;
});
}
}
7 changes: 5 additions & 2 deletions core/src/main/java/org/infinispan/xsite/BackupSenderImpl.java
Expand Up @@ -168,6 +168,10 @@ public InvocationStage backupRollback(RollbackCommand command, Transaction trans
return backupCommand(command, command, xSiteBackups, transaction);
}

public CustomFailurePolicy<Object, Object> getCustomFailurePolicy(String site) {
return siteFailurePolicy.get(site);
}

private InvocationStage backupCommand(VisitableCommand command, VisitableCommand originalCommand,
List<XSiteBackup> xSiteBackups, Transaction transaction) {
XSiteCacheRequest<Object> xsiteCommand = commandsFactory.buildSingleXSiteRpcCommand(command);
Expand Down Expand Up @@ -333,9 +337,8 @@ void handleException(String siteName, Throwable throwable) {
addException(siteName, throwable);
break;
case CUSTOM:
CustomFailurePolicy<Object,Object> failurePolicy = siteFailurePolicy.get(siteName);
try {
command.acceptVisitor(null, new CustomBackupPolicyInvoker(siteName, failurePolicy, transaction));
command.acceptVisitor(null, new CustomBackupPolicyInvoker(siteName, getCustomFailurePolicy(siteName), transaction));
} catch (Throwable t) {
addException(siteName, t);
}
Expand Down