Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

RIFTSAW-505 Add support of web service transactions #2

Merged
merged 1 commit into from

2 participants

@ibek

No description provided.

@objectiser objectiser merged commit d1dc22d into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Aug 22, 2012
  1. @ibek
This page is out of date. Refresh to see the latest.
View
6 bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java
@@ -109,6 +109,8 @@
public static final String PROP_CACHE_PROVIDER = "cache.provider";
public static final String PROP_MIGRATION_TRANSACTION_TIMEOUT = "migration.transaction.timeout";
+
+ public static final String PROP_XTS_ENABLE = "xts.enable";
public static final String DEFAULT_TX_FACTORY_CLASS_NAME = "org.apache.ode.il.EmbeddedGeronimoFactory";
@@ -392,4 +394,8 @@ public int getMigrationTransactionTimeout() {
return Integer.valueOf(getProperty(PROP_MIGRATION_TRANSACTION_TIMEOUT, String.valueOf(0)));
}
+ public boolean isXTSEnable() {
+ return Boolean.valueOf(getProperty(OdeConfigProperties.PROP_XTS_ENABLE, "false"));
+ }
+
}
View
5 bpel-runtime/pom.xml
@@ -116,6 +116,11 @@
<groupId>jaxen</groupId>
<artifactId>jaxen</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.jboss.narayana.xts</groupId>
+ <artifactId>jbossxts</artifactId>
+ <version>${narayana.version}</version>
+ </dependency>
</dependencies>
View
10 bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
@@ -690,6 +690,16 @@ public void setProcessSize(QName processId, boolean hydratedOnce) {
}
}
+ private boolean _xtsEnable = false;
+
+ public void setXTSEnable(boolean xtsEnable) {
+ this._xtsEnable = xtsEnable;
+ }
+
+ public boolean isXTSEnable() {
+ return _xtsEnable;
+ }
+
/**
* Returns true if the last used process was dehydrated because it was not in-use.
*/
View
20 bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
@@ -69,6 +69,7 @@
import org.apache.ode.bpel.runtime.PROCESS;
import org.apache.ode.bpel.runtime.PropertyAliasEvaluationContext;
import org.apache.ode.bpel.runtime.channels.FaultData;
+import org.apache.ode.bpel.wstx.WebServiceTransaction;
import org.apache.ode.dao.bpel.BpelDAOConnection;
import org.apache.ode.dao.bpel.DeferredProcessInstanceCleanable;
import org.apache.ode.dao.bpel.ProcessDAO;
@@ -150,6 +151,9 @@
private boolean _checkGuid=true;
+ /** Web service transaction of process instance */
+ private Map<Long, WebServiceTransaction> _wstMap;
+
public BpelProcess(ProcessConf conf) {
_pid = conf.getProcessId();
_pconf = conf;
@@ -1173,4 +1177,20 @@ public long getTimeout(OPartnerLink partnerLink, boolean p2p) {
public long getVersion() {
return Long.parseLong(_pid.getLocalPart().substring(_pid.getLocalPart().lastIndexOf('-') + 1));
}
+
+ public WebServiceTransaction getWebServiceTransaction(Long instanceId) {
+ return (_wstMap != null)?_wstMap.get(instanceId):null;
+ }
+
+ public void setWebServiceTransaction(Long instanceId, WebServiceTransaction wst) {
+ if (_wstMap == null) {
+ _wstMap = new HashMap<Long, WebServiceTransaction>();
+ }
+ _wstMap.put(instanceId, wst);
+ }
+
+ public void removeWebServiceTransaction(Long instanceId) {
+ _wstMap.remove(instanceId);
+ }
+
}
View
146 bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
@@ -25,10 +25,15 @@
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
+import java.util.Iterator;
import java.util.List;
+import javax.wsdl.Binding;
+import javax.wsdl.BindingOperation;
import javax.wsdl.Fault;
import javax.wsdl.Operation;
+import javax.wsdl.extensions.ExtensibilityElement;
+import javax.wsdl.extensions.UnknownExtensibilityElement;
import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
@@ -85,6 +90,9 @@
import org.apache.ode.bpel.runtime.channels.InvokeResponseChannel;
import org.apache.ode.bpel.runtime.channels.PickResponseChannel;
import org.apache.ode.bpel.runtime.channels.TimerResponseChannel;
+import org.apache.ode.bpel.wstx.WebServiceTransaction;
+import org.apache.ode.bpel.wstx.WebServiceTransactionFactory;
+import org.apache.ode.bpel.wstx.WebServiceTransactionType;
import org.apache.ode.dao.bpel.CorrelationSetDAO;
import org.apache.ode.dao.bpel.CorrelatorDAO;
import org.apache.ode.dao.bpel.MessageDAO;
@@ -136,10 +144,13 @@
/** Five second maximum for continous execution. */
private long _maxReductionTimeMs = 2000000;
+
+ private WebServiceTransaction _wst;
public BpelRuntimeContextImpl(BpelProcess bpelProcess, ProcessInstanceDAO dao, PROCESS PROCESS,
MyRoleMessageExchangeImpl instantiatingMessageExchange) {
_bpelProcess = bpelProcess;
+ _wst = bpelProcess.getWebServiceTransaction(dao.getInstanceId());
_dao = dao;
_iid = dao.getInstanceId();
_instantiatingMessageExchange = instantiatingMessageExchange;
@@ -243,7 +254,16 @@ public void completedFault(FaultData faultData) {
_bpelProcess._engine._contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
public void afterCompletion(boolean success) {
}
- public void beforeCompletion() {
+ public void beforeCompletion() {
+ if (_wst != null && _wst.isActive() && !_wst.isSubordinate()) {
+ try {
+ _wst.rollback();
+ } catch (Exception e) {
+ __log.warn("Web service transaction wasn't properly aborted or it is already rolled back.");
+ } finally {
+ _bpelProcess.removeWebServiceTransaction(_dao.getInstanceId());
+ }
+ }
_dao.delete(_bpelProcess.getCleanupCategories(false), false);
}
});
@@ -272,7 +292,16 @@ public void completedOk() {
_bpelProcess._engine._contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
public void afterCompletion(boolean success) {
}
- public void beforeCompletion() {
+ public void beforeCompletion() {
+ if (_wst != null && _wst.isActive() && !_wst.isSubordinate()) {
+ try {
+ _wst.commit();
+ } catch (Exception e) {
+ __log.warn("Web service transaction wasn't commited or it is already commited.");
+ } finally {
+ _bpelProcess.removeWebServiceTransaction(_dao.getInstanceId());
+ }
+ }
_dao.delete(_bpelProcess.getCleanupCategories(true), false);
}
});
@@ -706,6 +735,15 @@ public void terminate() {
public void afterCompletion(boolean success) {
}
public void beforeCompletion() {
+ if (_wst != null && _wst.isActive() && !_wst.isSubordinate()) {
+ try {
+ _wst.rollback();
+ } catch (Exception e) {
+ __log.warn("Web service transaction wasn't properly aborted or it is already rolled back.");
+ } finally {
+ _bpelProcess.removeWebServiceTransaction(_dao.getInstanceId());
+ }
+ }
_dao.delete(_bpelProcess.getCleanupCategories(false), false);
}
});
@@ -741,13 +779,13 @@ private void scheduleCorrelatorMatcher(String correlatorId, CorrelationKeySet ke
public void checkInvokeExternalPermission() {}
/**
- * Called back when the process executes an invokation.
+ * Called back when the process executes an invocation.
*
- * @param activityId The activity id in the process definition (id of OInvoke)
- * @param partnerLinkInstance The partner link variable instance
+ * @param aid The activity id in the process definition (id of OInvoke)
+ * @param partnerLink The partner link variable instance
* @param operation The wsdl operation.
- * @param outboundMsg The message sent outside as a DOM
- * @param invokeResponseChannel Object called back when the response is received.
+ * @param outgoingMessage The message sent outside as a DOM
+ * @param channel Object called back when the response is received.
* @return The instance id of the message exchange.
* @throws FaultException When the response is a fault or when the invoke could not be executed
* in which case it is one of the bpel standard fault.
@@ -775,7 +813,7 @@ public String invoke(int aid, PartnerLinkInstance partnerLink, Operation operati
BpelProcess.__log.debug("INVOKING PARTNER: partnerLink=" + partnerLink +
", op=" + operation.getName() + " channel=" + channel + ")");
}
-
+
// prepare event
ProcessMessageExchangeEvent evt = new ProcessMessageExchangeEvent();
evt.setOperation(operation.getName());
@@ -828,6 +866,45 @@ public String invoke(int aid, PartnerLinkInstance partnerLink, Operation operati
if (getConfigForPartnerLink(partnerLink.partnerLink).usePeer2Peer && partnerEndpoint != null)
p2pProcesses = _bpelProcess.getEngine().route(partnerEndpoint.serviceName, mex.getRequest());
+ if (_bpelProcess._engine.isXTSEnable()) {
+
+ WebServiceTransactionType wstType = getTypeOfWSTAssertion(operation, _bpelProcess.getConf().getDefinitionForService(partnerEndpoint.serviceName).getBindings().values());
+
+ if ((_wst == null || !_wst.isActive()) && wstType != WebServiceTransactionType.NOT_DETERMINED) {
+ // Creating a distributed transaction if the operation has an AtomicTransaction or BusinessActivity assertion
+ if (BpelProcess.__log.isDebugEnabled()) {
+ __log.debug("Creating distributed transaction ...");
+ }
+ _wst = WebServiceTransactionFactory.instance(wstType);
+ if (_wst != null) {
+ try {
+ _wst.begin(_instantiatingMessageExchange.getRequest());
+ _bpelProcess.setWebServiceTransaction(_dao.getInstanceId(), _wst);
+ if (BpelProcess.__log.isDebugEnabled()) {
+ __log.debug("Distributed transaction has been created with id = " + _wst.getTransactionIdentifier());
+ }
+ } catch (Exception e) {
+ throw new FaultException(_bpelProcess.getOProcess().constants.qnUnknownFault, "Web Service Transaction error while creating the transaction. "+e.getMessage(), e);
+ }
+ }
+ }
+
+ if (_wst != null && _wst.isActive() && wstType != WebServiceTransactionType.NOT_DETERMINED) {
+ if (wstType != _wst.getType()) {
+ __log.warn("The invocation requires another type of web service transaction. The coordination context won't be sent.");
+ } else {
+ try {
+ Element headerElement = message.getHeader();
+ headerElement = _wst.putCoordinationContext(headerElement);
+ message.setHeader(headerElement);
+ } catch (Exception e) {
+ throw new FaultException(_bpelProcess.getOProcess().constants.qnUnknownFault, "Cannot put transaction context into message header. "+e.getMessage(), e);
+ }
+ }
+ }
+
+ }
+
if (p2pProcesses != null && !p2pProcesses.isEmpty()) {
// Creating a my mex using the same message id as partner mex to "pipe" them
MyRoleMessageExchange myRoleMex = _bpelProcess.getEngine().createMessageExchange(
@@ -870,6 +947,7 @@ public String invoke(int aid, PartnerLinkInstance partnerLink, Operation operati
mex.setStatus(MessageExchange.Status.REQUEST);
// Assuming an unreliable protocol, we schedule a task to check if recovery mode will be needed
scheduleInvokeCheck(mex, partnerLink.partnerLink, false);
+
_bpelProcess._engine._contexts.mexContext.invokePartner(mex);
} else {
__log.error("Couldn't find endpoint for partner EPR " + DOMUtils.domToString(partnerEPR));
@@ -908,6 +986,49 @@ public String invoke(int aid, PartnerLinkInstance partnerLink, Operation operati
return mexDao.getMessageExchangeId();
}
+ @SuppressWarnings("unchecked")
+ private WebServiceTransactionType getTypeOfWSTAssertion(Operation operation, Collection<Binding> bindings){
+ Iterator<Binding> i = bindings.iterator();
+ while(i.hasNext()){
+ Binding b = i.next();
+ List<BindingOperation> bolist = b.getBindingOperations();
+ for(BindingOperation bo : bolist){
+ if(bo.getOperation().getName().compareTo(operation.getName()) != 0)
+ continue;
+ List<ExtensibilityElement> eelist = bo.getExtensibilityElements();
+ for(ExtensibilityElement ee : eelist){
+ if (! (ee instanceof UnknownExtensibilityElement))
+ continue;
+ UnknownExtensibilityElement uee = (UnknownExtensibilityElement) ee;
+ if(uee.getElementType().getLocalPart().equals("PolicyReference")){
+ String uri = uee.getElement().getAttribute("URI").substring(1);
+ NodeList policyNodeList = uee.getElement().getOwnerDocument().getElementsByTagNameNS("*", "Policy");
+ for (int j = 0; j < policyNodeList.getLength(); j++) {
+ Element element = (Element) policyNodeList.item(j);
+ String refUri = element.getAttributeNS("http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd", "Id");
+ if (refUri != null && refUri.equals(uri)) {
+ NodeList nlist = element.getElementsByTagNameNS("http://docs.oasis-open.org/ws-tx/wsat/2006/06","ATAssertion");
+ if(nlist!= null && nlist.getLength() == 1){
+ return WebServiceTransactionType.ATOMIC_TRANSACTION;
+ }
+ nlist = element.getElementsByTagNameNS("http://docs.oasis-open.org/ws-tx/wsba/2006/06","BAAtomicOutcomeAssertion");
+ if(nlist!= null && nlist.getLength() == 1){
+ return WebServiceTransactionType.BUSINESS_ACTIVITY_ATOMIC_OUTCOME;
+ }
+ nlist = element.getElementsByTagNameNS("http://docs.oasis-open.org/ws-tx/wsba/2006/06","BAMixedOutcomeAssertion");
+ if(nlist!= null && nlist.getLength() == 1){
+ return WebServiceTransactionType.BUSINESS_ACTIVITY_MIXED_OUTCOME;
+ }
+ return WebServiceTransactionType.NOT_DETERMINED;
+ }
+ }
+ }
+ }
+ }
+ }
+ return WebServiceTransactionType.NOT_DETERMINED;
+ }
+
// enable extensibility
protected PartnerRoleMessageExchangeImpl createPartnerRoleMessageExchangeImpl(MessageExchangeDAO mexDao,
PartnerLinkInstance partnerLink, Operation operation, EndpointReference partnerEpr,
@@ -1004,6 +1125,15 @@ public void execute() {
throw new BpelEngineException(e);
}
}
+
+ if (_wst != null && _wst.isActive()) {
+ try {
+ _wst.complete();
+ } catch (Exception e) {
+ __log.warn("BusinessActivity partially completion failed.", e);
+ }
+ }
+
}
}
View
4 bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
@@ -576,6 +576,10 @@ public void setInstanceThrottledMaximumCount(
_engine.setInstanceThrottledMaximumCount(instanceThrottledMaximumCount);
}
+ public void setXTSEnable(boolean xtsEnable) {
+ _engine.setXTSEnable(xtsEnable);
+ }
+
/**
* A polled runnable instance that implements this interface will be set
* with the contexts before the run() method is called.
View
163 bpel-runtime/src/main/java/org/apache/ode/bpel/wstx/AtomicTransaction.java
@@ -0,0 +1,163 @@
+package org.apache.ode.bpel.wstx;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.engine.MessageImpl;
+import org.apache.ode.bpel.iapi.Message;
+import org.oasis_open.docs.ws_tx.wscoor._2006._06.CoordinationContextType;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+import com.arjuna.mw.wst.TxContext;
+import com.arjuna.mw.wst11.TransactionManager;
+import com.arjuna.mw.wst11.UserTransaction;
+import com.arjuna.mw.wst11.common.CoordinationContextHelper;
+import com.arjuna.mwlabs.wst11.at.context.TxContextImple;
+import com.arjuna.webservices11.wscoor.CoordinationConstants;
+import com.arjuna.wst.SystemException;
+import com.arjuna.wst.TransactionRolledBackException;
+import com.arjuna.wst.UnknownTransactionException;
+import com.arjuna.wst.WrongStateException;
+
+/**
+ *
+ * @author ibek
+ *
+ */
+public class AtomicTransaction implements WebServiceTransaction {
+
+ private static final Log __log = LogFactory.getLog(AtomicTransaction.class);
+
+ protected UserTransaction _tx;
+ protected TxContext _txcontext;
+ protected boolean _active;
+ protected boolean _subordinate;
+
+ public AtomicTransaction() {
+ _active = false;
+ _subordinate = false;
+ }
+
+ /**
+ * Begin of transaction must be performed with mutual exclusion in one thread
+ * because the registration service cannot begin more transactions concurrently.
+ */
+ private static synchronized void begin(UserTransaction tx) throws WrongStateException, SystemException{
+ tx.begin();
+ }
+
+ public void begin(Message bpelRequest) throws WrongStateException, SystemException {
+ MessageImpl req = (MessageImpl)bpelRequest;
+ _subordinate = false;
+ if(req._dao.getHeader() != null){
+ try {
+ NodeList cc = req._dao.getHeader().getElementsByTagNameNS(CoordinationConstants.WSCOOR_NAMESPACE,
+ CoordinationConstants.WSCOOR_ELEMENT_COORDINATION_CONTEXT);
+ if (cc.getLength() > 0){
+ CoordinationContextType cct = CoordinationContextHelper.deserialise((Element)cc.item(0));
+ if (cct != null) {
+ TxContext ctx = new TxContextImple(cct);
+ TransactionManager.getTransactionManager().resume(ctx);
+ _subordinate = true;
+ }
+ }
+ } catch (Exception e) {
+ __log.error("Wrong coordination context. The transaction won't be subordinated.");
+ }
+ }
+
+ _tx = UserTransaction.getUserTransaction();
+ if (_subordinate && _tx != null) {
+ _tx = UserTransaction.getUserTransaction().getUserSubordinateTransaction();
+ }
+
+ if (_tx == null)
+ throw new SystemException(
+ "Distributed transaction has not been created. Check that JBoss XTS is runnning.");
+ try {
+ begin(_tx);
+ } catch (WrongStateException wse) {
+ TransactionManager.getTransactionManager().suspend(); // previous transaction will be resumed by another instance
+ _tx = UserTransaction.getUserTransaction();
+ begin(_tx); // we try again to create new transaction
+ }
+ _txcontext = TransactionManager.getTransactionManager().currentTransaction();
+ _active = true;
+ }
+
+ public void commit() throws SecurityException, UnknownTransactionException, SystemException,
+ WrongStateException {
+ _active = false;
+ try {
+ resume();
+ _tx.commit();
+ } catch (TransactionRolledBackException e) {
+ __log.debug("Web service transaction was aborted.");
+ } finally {
+ _tx = null;
+ _txcontext = null;
+ }
+ }
+
+ public void complete() throws UnknownTransactionException, SystemException, WrongStateException {
+ // this is atomic transaction, we cannot partially complete the transaction as BusinessActivity can
+ }
+
+ public boolean isActive() {
+ return _tx != null && _active;
+ }
+
+ public boolean isSubordinate() {
+ return _subordinate;
+ }
+
+ public String getTransactionIdentifier() {
+ return _tx.transactionIdentifier();
+ }
+
+ public WebServiceTransactionType getType() {
+ return WebServiceTransactionType.ATOMIC_TRANSACTION;
+ }
+
+ public void rollback() throws SecurityException, UnknownTransactionException, SystemException,
+ WrongStateException {
+ _active = false;
+ try{
+ resume();
+ _tx.rollback();
+ } finally {
+ _tx = null;
+ _txcontext = null;
+ }
+ }
+
+ public void resume() throws UnknownTransactionException, SystemException {
+ if (!_txcontext.equals(TransactionManager.getTransactionManager().currentTransaction())) {
+ TransactionManager.getTransactionManager().resume(_txcontext);
+ _tx = UserTransaction.getUserTransaction();
+ }
+ }
+
+ public void suspend() throws SystemException {
+ _txcontext = TransactionManager.getTransactionManager().suspend();
+ }
+
+ public Element putCoordinationContext(Element headerElement) throws UnknownTransactionException, SystemException {
+ resume();
+ final TxContextImple txContext = (TxContextImple) _txcontext;
+ CoordinationContextType ctx = txContext.context().getCoordinationContext();
+ try {
+ Document doc = headerElement.getOwnerDocument();
+ Element coord = doc.createElementNS(CoordinationConstants.WSCOOR_NAMESPACE,
+ CoordinationConstants.WSCOOR_ELEMENT_COORDINATION_CONTEXT);
+ headerElement.appendChild(coord);
+ CoordinationContextHelper.serialise(ctx, headerElement);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new SystemException("Coordination context has not been added to the header.");
+ }
+ return headerElement;
+ }
+
+}
View
170 bpel-runtime/src/main/java/org/apache/ode/bpel/wstx/BusinessActivity.java
@@ -0,0 +1,170 @@
+package org.apache.ode.bpel.wstx;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.engine.MessageImpl;
+import org.apache.ode.bpel.iapi.Message;
+import org.oasis_open.docs.ws_tx.wscoor._2006._06.CoordinationContextType;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+import com.arjuna.mw.wst.TxContext;
+import com.arjuna.mw.wst11.BusinessActivityManager;
+import com.arjuna.mw.wst11.TransactionManager;
+import com.arjuna.mw.wst11.UserBusinessActivity;
+import com.arjuna.mw.wst11.UserTransaction;
+import com.arjuna.mw.wst11.common.CoordinationContextHelper;
+import com.arjuna.mwlabs.wst11.ba.context.TxContextImple;
+import com.arjuna.webservices11.wscoor.CoordinationConstants;
+import com.arjuna.wst.SystemException;
+import com.arjuna.wst.TransactionRolledBackException;
+import com.arjuna.wst.UnknownTransactionException;
+import com.arjuna.wst.WrongStateException;
+
+/**
+ *
+ * TODO: consider AtomicOutcome and MixedOutcome
+ *
+ * @author ibek
+ *
+ */
+public class BusinessActivity implements WebServiceTransaction {
+
+ private static final Log __log = LogFactory.getLog(BusinessActivity.class);
+
+ protected UserBusinessActivity _uba;
+ protected TxContext _txcontext;
+ protected boolean _active;
+ protected WebServiceTransactionType _type;
+ protected boolean _subordinate;
+
+ public BusinessActivity(WebServiceTransactionType type) {
+ _type = type;
+ _active = false;
+ _subordinate = false;
+ }
+
+ /**
+ * Begin of transaction must be performed with mutual exclusion in one thread
+ * because the registration service cannot begin more transactions concurrently.
+ */
+ private static synchronized void begin(UserBusinessActivity uba) throws WrongStateException, SystemException{
+ uba.begin();
+ }
+
+ public void begin(Message bpelRequest) throws WrongStateException, SystemException {
+ MessageImpl req = (MessageImpl)bpelRequest;
+ _subordinate = false;
+ if(req._dao.getHeader() != null){
+ try {
+ NodeList cc = req._dao.getHeader().getElementsByTagNameNS(CoordinationConstants.WSCOOR_NAMESPACE,
+ CoordinationConstants.WSCOOR_ELEMENT_COORDINATION_CONTEXT);
+ if (cc.getLength() > 0){
+ CoordinationContextType cct = CoordinationContextHelper.deserialise((Element)cc.item(0));
+ if (cct != null) {
+ TxContext ctx = new TxContextImple(cct);
+ BusinessActivityManager.getBusinessActivityManager().resume(ctx);
+ _subordinate = true;
+ }
+ }
+ } catch (Exception e) {
+ __log.warn("Wrong coordination context. The transaction won't be subordinated.");
+ }
+ }
+
+ _uba = UserBusinessActivity.getUserBusinessActivity();
+ if (_subordinate && _uba != null) {
+ _uba = UserBusinessActivity.getUserBusinessActivity().getUserSubordinateBusinessActivity();
+ }
+
+ if (_uba == null)
+ throw new SystemException(
+ "Distributed transaction has not been created. Check that JBoss XTS is runnning.");
+ try {
+ begin(_uba);
+ } catch (WrongStateException wse) {
+ BusinessActivityManager.getBusinessActivityManager().suspend(); // previous transaction will be resumed by another instance
+ _uba = UserBusinessActivity.getUserBusinessActivity();
+ begin(_uba); // we try again to create new transaction
+ }
+ _txcontext = BusinessActivityManager.getBusinessActivityManager().currentTransaction();
+ _active = true;
+ }
+
+ public void commit() throws SecurityException, UnknownTransactionException, SystemException,
+ WrongStateException {
+ _active = false;
+ try {
+ resume();
+ _uba.close();
+ } catch (TransactionRolledBackException e) {
+ __log.info("Web service transaction was aborted");
+ } finally {
+ _uba = null;
+ _txcontext = null;
+ }
+ }
+
+ public void complete() throws UnknownTransactionException, SystemException, WrongStateException {
+ _uba.complete();
+ }
+
+ public boolean isActive() {
+ return _uba != null && _active;
+ }
+
+ public boolean isSubordinate() {
+ return _subordinate;
+ }
+
+ public void rollback() throws SecurityException, UnknownTransactionException, SystemException,
+ WrongStateException {
+ _active = false;
+ try{
+ resume();
+ _uba.cancel();
+ } finally {
+ _uba = null;
+ _txcontext = null;
+ }
+ }
+
+ public String getTransactionIdentifier() {
+ return _uba.transactionIdentifier();
+ }
+
+ public WebServiceTransactionType getType() {
+ return _type;
+ }
+
+ public void resume() throws UnknownTransactionException, SystemException {
+ if (!_txcontext.equals(BusinessActivityManager.getBusinessActivityManager().currentTransaction())) {
+ BusinessActivityManager.getBusinessActivityManager().resume(_txcontext);
+ _uba = UserBusinessActivity.getUserBusinessActivity();
+ }
+ }
+
+ public void suspend() throws SystemException {
+ _txcontext = BusinessActivityManager.getBusinessActivityManager().suspend();
+ }
+
+ public Element putCoordinationContext(Element headerElement)
+ throws UnknownTransactionException, SystemException {
+ resume();
+ final TxContextImple txContext = (TxContextImple) BusinessActivityManager.getBusinessActivityManager().currentTransaction();
+ CoordinationContextType ctx = txContext.context().getCoordinationContext();
+ try {
+ Document doc = headerElement.getOwnerDocument();
+ Element coord = doc.createElementNS(CoordinationConstants.WSCOOR_NAMESPACE,
+ CoordinationConstants.WSCOOR_ELEMENT_COORDINATION_CONTEXT);
+ headerElement.appendChild(coord);
+ CoordinationContextHelper.serialise(ctx, headerElement);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new SystemException("Coordination context has not been added to the header.");
+ }
+ return headerElement;
+ }
+
+}
View
48 bpel-runtime/src/main/java/org/apache/ode/bpel/wstx/WebServiceTransaction.java
@@ -0,0 +1,48 @@
+package org.apache.ode.bpel.wstx;
+
+import org.apache.ode.bpel.iapi.Message;
+import org.w3c.dom.Element;
+
+import com.arjuna.wst.SystemException;
+import com.arjuna.wst.UnknownTransactionException;
+import com.arjuna.wst.WrongStateException;
+
+/**
+ *
+ * @author ibek
+ *
+ */
+public interface WebServiceTransaction {
+
+ /**
+ * TODO: begin transaction should have specified timeout
+ *
+ * @throws WrongStateException
+ * @throws SystemException
+ */
+ public void begin(Message bpelRequest) throws WrongStateException, SystemException;
+
+ public void commit() throws SecurityException, UnknownTransactionException, SystemException,
+ WrongStateException;
+
+ public void complete() throws UnknownTransactionException, SystemException, WrongStateException;
+
+ public boolean isActive();
+
+ public boolean isSubordinate();
+
+ public void rollback() throws SecurityException, UnknownTransactionException, SystemException,
+ WrongStateException;
+
+ public String getTransactionIdentifier();
+
+ public WebServiceTransactionType getType();
+
+ public void resume() throws UnknownTransactionException, SystemException;
+
+ public void suspend() throws SystemException;
+
+ public Element putCoordinationContext(Element headerElement)
+ throws UnknownTransactionException, SystemException;
+
+}
View
22 bpel-runtime/src/main/java/org/apache/ode/bpel/wstx/WebServiceTransactionFactory.java
@@ -0,0 +1,22 @@
+package org.apache.ode.bpel.wstx;
+
+/**
+ *
+ * @author ibek
+ *
+ */
+public class WebServiceTransactionFactory {
+
+ public static WebServiceTransaction instance(WebServiceTransactionType type) {
+ switch(type){
+ case ATOMIC_TRANSACTION:
+ return new AtomicTransaction();
+ case BUSINESS_ACTIVITY_ATOMIC_OUTCOME:
+ case BUSINESS_ACTIVITY_MIXED_OUTCOME:
+ return new BusinessActivity(type);
+ default:
+ return null;
+ }
+ }
+
+}
View
10 bpel-runtime/src/main/java/org/apache/ode/bpel/wstx/WebServiceTransactionType.java
@@ -0,0 +1,10 @@
+package org.apache.ode.bpel.wstx;
+
+public enum WebServiceTransactionType {
+
+ NOT_DETERMINED,
+ ATOMIC_TRANSACTION,
+ BUSINESS_ACTIVITY_ATOMIC_OUTCOME,
+ BUSINESS_ACTIVITY_MIXED_OUTCOME
+
+}
View
1  pom.xml
@@ -115,6 +115,7 @@
<h2.version>1.2.131</h2.version>
<activity.monitor.model.version>1.2.1.Final</activity.monitor.model.version>
<jmock.version>1.2.0</jmock.version>
+ <narayana.version>5.0.0.M1</narayana.version>
</properties>
Something went wrong with that request. Please try again.