Skip to content

Commit

Permalink
Merge pull request #1060 from tomjenkinson/JBTM-2701-new
Browse files Browse the repository at this point in the history
JBTM-2701 Updated to allow JCA recover call to refresh the list of XI…
  • Loading branch information
tomjenkinson committed Aug 30, 2016
2 parents 065834a + 53ccee5 commit f68e5bb
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 22 deletions.
Expand Up @@ -53,6 +53,7 @@
import com.arjuna.ats.arjuna.objectstore.RecoveryStore;
import com.arjuna.ats.arjuna.objectstore.StateStatus;
import com.arjuna.ats.arjuna.objectstore.StoreManager;
import com.arjuna.ats.arjuna.recovery.RecoveryManager;
import com.arjuna.ats.arjuna.recovery.RecoveryModule;
import com.arjuna.ats.arjuna.state.InputObjectState;
import com.arjuna.ats.internal.arjuna.common.UidHelper;
Expand Down Expand Up @@ -137,13 +138,15 @@ public List<SerializableXAResourceDeserializer> getSeriablizableXAResourceDeseri

public synchronized void periodicWorkFirstPass()
{
// JBTM-1354 allow a second thread to execute the first pass but make sure it is only done once per scan (TMSTART/ENDSCAN)
synchronized (scanState) {
if (!getScanState().equals(ScanStates.IDLE))
return;
// JBTM-1354 JCA needs to be able to recover XAResources associated with a subordinate transaction so we have to do at least
// the start scan to make sure that we have loaded all the XAResources we possibly can to assist subordinate transactions recovering
// the reason we can't do bottom up recovery is if this server has an XAResource which tries to recover a remote server (e.g. distributed JTA)
// then we get deadlock on the secondpass
if (getScanState() == ScanStates.BETWEEN_PASSES) {
periodicWorkSecondPass();
}

setScanState(ScanStates.FIRST_PASS); // synchronized uses a reentrant lock
}
setScanState(ScanStates.FIRST_PASS); // synchronized uses a reentrant lock

if(jtaLogger.logger.isDebugEnabled()) {
jtaLogger.logger.debugv("{0} - first pass", _logName);
Expand Down Expand Up @@ -192,7 +195,7 @@ public synchronized void periodicWorkFirstPass()
setScanState(ScanStates.BETWEEN_PASSES);
}

public void periodicWorkSecondPass()
public synchronized void periodicWorkSecondPass()
{
setScanState(ScanStates.SECOND_PASS);

Expand Down Expand Up @@ -229,6 +232,27 @@ public void periodicWorkSecondPass()
setScanState(ScanStates.IDLE);
}

public static XARecoveryModule getRegisteredXARecoveryModule () {
if (registeredXARecoveryModule == null) {
RecoveryManager recMan = RecoveryManager.manager();
Vector recoveryModules = recMan.getModules();

if (recoveryModules != null) {
Enumeration modules = recoveryModules.elements();

while (modules.hasMoreElements()) {
RecoveryModule m = (RecoveryModule) modules.nextElement();

if (m instanceof XARecoveryModule) {
registeredXARecoveryModule = (XARecoveryModule) m;
break;
}
}
}
}
return registeredXARecoveryModule;
}

public String id()
{
return "XARecoveryModule:" + _recoveryManagerClass;
Expand All @@ -242,13 +266,18 @@ public String id()
*/
private XAResource getNewXAResource(Xid xid)
{
// JBTM-1354 JCA needs to be able to recover XAResources associated with a subordinate transaction so we have to do at least
// the start scan to make sure that we have loaded all the XAResources we possibly can to assist subordinate transactions recovering
// the reason we can't do bottom up recovery is if this server has an XAResource which tries to recover a remote server (e.g. distributed JTA)
// then we get deadlock on the secondpass
periodicWorkFirstPass();
XAResource toReturn = getTheKey(xid);

if (toReturn == null) {
periodicWorkFirstPass();
toReturn = getTheKey(xid);
}

if (_xidScans != null)
return toReturn;
}

private XAResource getTheKey(Xid xid) {
if (_xidScans != null)
{
Enumeration<XAResource> keys = _xidScans.keys();

Expand All @@ -265,9 +294,8 @@ private XAResource getNewXAResource(Xid xid)
}
}
}

return null;
}
}

/**
* @param xaResourceRecord The record to reassociate.
Expand Down Expand Up @@ -1054,4 +1082,5 @@ private enum ScanStates {

private Set<String> contactedJndiNames = new HashSet<String>();

private static XARecoveryModule registeredXARecoveryModule;
}
Expand Up @@ -33,7 +33,9 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Stack;
import java.util.Vector;

import javax.transaction.HeuristicCommitException;
import javax.transaction.HeuristicMixedException;
Expand All @@ -49,8 +51,11 @@
import com.arjuna.ats.arjuna.coordinator.TxControl;
import com.arjuna.ats.arjuna.objectstore.RecoveryStore;
import com.arjuna.ats.arjuna.objectstore.StoreManager;
import com.arjuna.ats.arjuna.recovery.RecoveryManager;
import com.arjuna.ats.arjuna.recovery.RecoveryModule;
import com.arjuna.ats.arjuna.state.InputObjectState;
import com.arjuna.ats.internal.arjuna.common.UidHelper;
import com.arjuna.ats.internal.jta.recovery.arjunacore.XARecoveryModule;
import com.arjuna.ats.internal.jta.resources.spi.XATerminatorExtensions;
import com.arjuna.ats.internal.jta.transaction.arjunacore.subordinate.jca.SubordinateAtomicAction;
import com.arjuna.ats.internal.jta.transaction.arjunacore.subordinate.jca.TransactionImple;
Expand Down Expand Up @@ -319,12 +324,20 @@ public Xid[] recover (int flag) throws XAException
case XAResource.TMSTARTRSCAN: // check the object store
if (_recoveryStarted)
throw new XAException(XAException.XAER_PROTO);
else
else {
_recoveryStarted = true;
if (XARecoveryModule.getRegisteredXARecoveryModule() != null) {
XARecoveryModule.getRegisteredXARecoveryModule().periodicWorkFirstPass();
}
}
break;
case XAResource.TMENDRSCAN: // null op for us
if (_recoveryStarted)
if (_recoveryStarted) {
_recoveryStarted = false;
if (XARecoveryModule.getRegisteredXARecoveryModule() != null) {
XARecoveryModule.getRegisteredXARecoveryModule().periodicWorkSecondPass();
}
}
else
throw new XAException(XAException.XAER_PROTO);
return null;
Expand Down
Expand Up @@ -154,7 +154,6 @@ public void run() {

// Run the scan to clear the content
manager.scan();
manager.scan();

assertTrue(xaResource.wasCommitted());
assertFalse(xaResource.wasRolledback());
Expand Down
Expand Up @@ -30,21 +30,35 @@
public class TestXAResourceRecovery implements XAResourceRecovery {

private static Stack<XAResource> resources = new Stack<XAResource>();
private static int count = 0;

public XAResource getXAResource() throws SQLException {
return resources.pop();
// This method was changed because now periodicWorkFirstPass can be called to retry loading Xids from XAR
// During getNewXAResource. If we don't return an XAR then getContactedJndiNames (JBTM-860) fails and the TX
// cannot be cleaned up
count--;
XAResource toReturn = resources.remove(0);
resources.push(toReturn);
return toReturn;
}

public boolean initialise(String p) throws SQLException {
return true;
}

public boolean hasMoreResources() {
return resources.size() > 0;
if (count == 0) {
count = 2;
return false;
} else {
return true;
}
}

public static void setResources(XAResource resource, XAResource resource2) {
resources.clear();
resources.push(resource);
resources.push(resource2);
count = 2;
}
}
Expand Up @@ -43,6 +43,7 @@
import java.util.ArrayList;
import java.util.List;

import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

Expand Down Expand Up @@ -289,6 +290,90 @@ public void testXAResourceOrphanFilter () throws Exception
m.invoke(xarm, parameters);
}


@Test
public void testCanRepeatFirstPass () throws Exception
{

XARecoveryModule xarm = new XARecoveryModule();


xarm.addXAResourceRecoveryHelper(new XAResourceRecoveryHelper() {
@Override
public boolean initialise(String p) throws Exception {
return false;
}

@Override
public XAResource[] getXAResources() throws Exception {
return new XAResource[]{new XAResource() {

@Override
public void commit(Xid xid, boolean b) throws XAException {

}

@Override
public void end(Xid xid, int i) throws XAException {

}

@Override
public void forget(Xid xid) throws XAException {

}

@Override
public int getTransactionTimeout() throws XAException {
return 0;
}

@Override
public boolean isSameRM(XAResource xaResource) throws XAException {
return false;
}

@Override
public int prepare(Xid xid) throws XAException {
return 0;
}

@Override
public Xid[] recover(int i) throws XAException {
recoverCalled++;
return new Xid[0];
}

@Override
public void rollback(Xid xid) throws XAException {

}

@Override
public boolean setTransactionTimeout(int i) throws XAException {
return false;
}

@Override
public void start(Xid xid, int i) throws XAException {

}
}};
}
});

xarm.periodicWorkFirstPass();
assertEquals(recoverCalled, 1);
xarm.periodicWorkSecondPass();
assertEquals(recoverCalled, 2);
xarm.periodicWorkFirstPass();
assertEquals(recoverCalled, 3);
xarm.periodicWorkFirstPass();
assertEquals(recoverCalled, 5);
xarm.periodicWorkSecondPass();
assertEquals(recoverCalled, 6);
}

class DummyXAResourceOrphanFilter implements XAResourceOrphanFilter
{
public DummyXAResourceOrphanFilter ()
Expand All @@ -309,4 +394,7 @@ public Vote checkXid(Xid xid)

private Vote _vote;
}

private int recoverCalled;

}
Expand Up @@ -50,6 +50,7 @@
import com.arjuna.ats.arjuna.objectstore.StoreManager;
import com.arjuna.ats.arjuna.state.InputObjectState;
import com.arjuna.ats.internal.arjuna.common.UidHelper;
import com.arjuna.ats.internal.jta.recovery.arjunacore.XARecoveryModule;
import com.arjuna.ats.internal.jta.resources.spi.XATerminatorExtensions;
import com.arjuna.ats.internal.jta.transaction.arjunacore.jca.SubordinateTransaction;
import com.arjuna.ats.internal.jta.transaction.arjunacore.jca.SubordinationManager;
Expand Down Expand Up @@ -238,12 +239,20 @@ public Xid[] recover (int flag) throws XAException
case XAResource.TMSTARTRSCAN: // check the object store
if (_recoveryStarted)
throw new XAException(XAException.XAER_PROTO);
else
else {
_recoveryStarted = true;
if (XARecoveryModule.getRegisteredXARecoveryModule() != null) {
XARecoveryModule.getRegisteredXARecoveryModule().periodicWorkFirstPass();
}
}
break;
case XAResource.TMENDRSCAN: // null op for us
if (_recoveryStarted)
if (_recoveryStarted) {
_recoveryStarted = false;
if (XARecoveryModule.getRegisteredXARecoveryModule() != null) {
XARecoveryModule.getRegisteredXARecoveryModule().periodicWorkSecondPass();
}
}
else
throw new XAException(XAException.XAER_PROTO);
return null;
Expand Down

0 comments on commit f68e5bb

Please sign in to comment.