Skip to content

Commit

Permalink
ISPN-15895 Cross-site: 2PC transaction does not acquire locks
Browse files Browse the repository at this point in the history
  • Loading branch information
pruivo authored and ryanemerson committed Apr 9, 2024
1 parent 6242241 commit b57b079
Show file tree
Hide file tree
Showing 18 changed files with 764 additions and 384 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,22 @@ public final boolean equals(Object obj) {
return this == obj;
}

public synchronized void transactionFailed(Throwable throwable) {
if (isDone()) {
throw new IllegalStateException("Transaction is done. Cannot change status");
}
if (status == Status.STATUS_MARKED_ROLLBACK) {
return;
}
status = Status.STATUS_MARKED_ROLLBACK;
if (firstRollbackException == null) {
firstRollbackException = new RollbackException("Exception caught while running transaction");
firstRollbackException.addSuppressed(throwable);
} else if (!(throwable instanceof RollbackException)) {
firstRollbackException.addSuppressed(throwable);
}
}

private RollbackException hasRollbackException(boolean forceRollback) {
if (firstRollbackException != null) {
if (forceRollback && FORCE_ROLLBACK_MESSAGE.equals(firstRollbackException.getMessage())) {
Expand Down Expand Up @@ -686,6 +702,10 @@ private boolean isDone() {
return false;
}

public RollbackException getRollbackException() {
return firstRollbackException;
}

private static class XaResourceData {
final XAResource xaResource;
volatile int status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public Object construct(String componentName) {
NoOpIracVersionGenerator.getInstance();
} else if (componentName.equals(BackupReceiver.class.getName())) {
return configuration.clustering().cacheMode().isClustered() ?
new ClusteredCacheBackupReceiver(componentRegistry.getCacheName()) :
new ClusteredCacheBackupReceiver() :
null;
} else if (componentName.equals(StorageConfigurationManager.class.getName())) {
return new StorageConfigurationManager();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package org.infinispan.interceptors.xsite;

import java.util.function.Predicate;

import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.configuration.cache.BackupConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.context.impl.LocalTxInvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.InvocationStage;

/**
Expand All @@ -18,6 +23,13 @@
*/
public class OptimisticBackupInterceptor extends BaseBackupInterceptor {

private boolean hasOnePhaseCommitBackups;

@Inject
public void checkTwoPhaseCommit(Configuration configuration) {
hasOnePhaseCommitBackups = configuration.sites().syncBackupsStream().anyMatch(Predicate.not(BackupConfiguration::isTwoPhaseCommit));
}

@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) {
if (skipXSiteBackup(command) || !command.hasAnyFlag(FlagBitSets.PUT_FOR_EXTERNAL_READ)) {
Expand Down Expand Up @@ -53,9 +65,21 @@ public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command)
stage = InvocationStage.completedNullStage();
}

if (hasOnePhaseCommitBackups) {
// One or more backup site(s) using one phase commit.
// Wait for backup site acknowledge, so we have time to rollback and keep the data consistent.
return makeStage(asyncInvokeNext(ctx, command, stage))
.thenApply(ctx, command, (rCtx, rCommand, rv) -> {
//for async, all nodes need to keep track of the updates keys after it is applied locally.
trackKeysForAsyncBackups(rCommand);
return rv;
});
}

// The backup site(s) is using 2PC, we can send the commit in parallel for both local and remote site.
return invokeNextThenApply(ctx, command, (rCtx, rCommand, rv) -> {
//we need to track the keys only after it is applied in the local node!
keysFromMods(getModificationsFrom(rCommand)).forEach(key -> iracManager.trackUpdatedKey(key.getSegment(), key.getKey(), rCommand.getGlobalTransaction()));
trackKeysForAsyncBackups(rCommand);
return stage.thenReturn(rCtx, rCommand, rv);
});
}
Expand Down Expand Up @@ -84,4 +108,10 @@ private boolean shouldRollbackRemoteTxCommand(TxInvocationContext<?> ctx) {
private boolean hasBeenPrepared(LocalTxInvocationContext ctx) {
return !ctx.getRemoteLocksAcquired().isEmpty();
}

private void trackKeysForAsyncBackups(CommitCommand command) {
var gtx = command.getGlobalTransaction();
keysFromMods(getModificationsFrom(command))
.forEach(key -> iracManager.trackUpdatedKey(key.getSegment(), key.getKey(), gtx));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,15 @@ public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand comman
stage = InvocationStage.completedNullStage();
}

return invokeNextThenApply(ctx, command, (rCtx, rCommand, rv) -> {
//for async, all nodes need to keep track of the updates keys after it is applied locally.
keysFromMods(rCommand.getModifications().stream())
.forEach(key -> iracManager.trackUpdatedKey(key.getSegment(), key.getKey(), rCommand.getGlobalTransaction()));
return stage.thenReturn(rCtx, rCommand, rv);
});
// We need to way for the other site commit.
// In the local site, we have locks locked so no other transaction will commit.
// If the other site fails to commit, we abort it here.
return makeStage(asyncInvokeNext(ctx, command, stage))
.thenApply(ctx, command, (rCtx, rCommand, rv) -> {
//for async, all nodes need to keep track of the updates keys after it is applied locally.
keysFromMods(rCommand.getModifications().stream())
.forEach(key -> iracManager.trackUpdatedKey(key.getSegment(), key.getKey(), rCommand.getGlobalTransaction()));
return rv;
});
}
}
7 changes: 5 additions & 2 deletions core/src/main/java/org/infinispan/xsite/BackupSenderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ public InvocationStage backupRollback(RollbackCommand command, Transaction trans
return backupCommand(command, command, xSiteBackups, transaction);
}

public CustomFailurePolicy<Object, Object> getCustomFailurePolicy(String site) {
return siteFailurePolicy.get(site);
}

private InvocationStage backupCommand(VisitableCommand command, VisitableCommand originalCommand,
List<XSiteBackup> xSiteBackups, Transaction transaction) {
XSiteCacheRequest<Object> xsiteCommand = commandsFactory.buildSingleXSiteRpcCommand(command);
Expand Down Expand Up @@ -333,9 +337,8 @@ void handleException(String siteName, Throwable throwable) {
addException(siteName, throwable);
break;
case CUSTOM:
CustomFailurePolicy<Object,Object> failurePolicy = siteFailurePolicy.get(siteName);
try {
command.acceptVisitor(null, new CustomBackupPolicyInvoker(siteName, failurePolicy, transaction));
command.acceptVisitor(null, new CustomBackupPolicyInvoker(siteName, getCustomFailurePolicy(siteName), transaction));
} catch (Throwable t) {
addException(siteName, t);
}
Expand Down

0 comments on commit b57b079

Please sign in to comment.