diff --git a/api/pom.xml b/api/pom.xml index dc44e8030c..fd00e49ee2 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -31,6 +31,11 @@ test-jar test + + + org.jboss.spec.javax.transaction + jboss-transaction-api_1.2_spec + diff --git a/api/src/main/java/org/teiid/resource/api/XAImporter.java b/api/src/main/java/org/teiid/resource/api/XAImporter.java new file mode 100644 index 0000000000..23eef062b6 --- /dev/null +++ b/api/src/main/java/org/teiid/resource/api/XAImporter.java @@ -0,0 +1,132 @@ +/* + * Copyright Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags and + * the COPYRIGHT.txt file distributed with this work. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.teiid.resource.api; + +import javax.transaction.Transaction; +import javax.transaction.TransactionManager; +import javax.transaction.xa.XAException; +import javax.transaction.xa.Xid; + +/** + * Provides the functionality of an XATerminator and the ability to import the relevant + * transaction. + */ +public interface XAImporter { + + /** + * Provide the {@link Transaction} for the given {@link Xid} + * @param transactionManager + * @param xid + * @param transactionTimeout + * @return + * @throws XAException + */ + Transaction importTransaction(TransactionManager transactionManager, Xid xid, int transactionTimeout) throws XAException; + + /** + * Commits the global transaction specified by xid. + * + * @param xid A global transaction identifier + * + * @param onePhase If true, the resource manager should use a one-phase + * commit protocol to commit the work done on behalf of xid. + * + * @exception XAException An error has occurred. Possible XAExceptions + * are XA_HEURHAZ, XA_HEURCOM, XA_HEURRB, XA_HEURMIX, XAER_RMERR, + * XAER_RMFAIL, XAER_NOTA, XAER_INVAL, or XAER_PROTO. + * + *

If the resource manager did not commit the transaction and the + * parameter onePhase is set to true, the resource manager may throw + * one of the XA_RB* exceptions. Upon return, the resource manager has + * rolled back the branch's work and has released all held resources. + */ + void commit(Xid xid, boolean onePhase) throws XAException; + + /** + * Tells the resource manager to forget about a heuristically + * completed transaction branch. + * + * @param xid A global transaction identifier. + * + * @exception XAException An error has occurred. Possible exception + * values are XAER_RMERR, XAER_RMFAIL, XAER_NOTA, XAER_INVAL, or + * XAER_PROTO. + */ + void forget(Xid xid) throws XAException; + + /** + * Ask the resource manager to prepare for a transaction commit + * of the transaction specified in xid. + * + * @param xid A global transaction identifier. + * + * @exception XAException An error has occurred. Possible exception + * values are: XA_RB*, XAER_RMERR, XAER_RMFAIL, XAER_NOTA, XAER_INVAL, + * or XAER_PROTO. + * + * @return A value indicating the resource manager's vote on the + * outcome of the transaction. The possible values are: XA_RDONLY + * or XA_OK. These constants are defined in + * javax.transaction.xa.XAResource interface. + * If the resource manager wants to roll back the + * transaction, it should do so by raising an appropriate XAException + * in the prepare method. + */ + int prepare(Xid xid) throws XAException; + + /** + * Obtains a list of prepared transaction branches from a resource + * manager. The transaction manager calls this method during recovery + * to obtain the list of transaction branches that are currently in + * prepared or heuristically completed states. + * + * @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS. TMNOFLAGS + * must be used when no other flags are set in the parameter. These + * constants are defined in javax.transaction.xa.XAResource + * interface. + * + * @exception XAException An error has occurred. Possible values are + * XAER_RMERR, XAER_RMFAIL, XAER_INVAL, and XAER_PROTO. + * + * @return The resource manager returns zero or more XIDs of the + * transaction branches that are currently in a prepared or + * heuristically completed state. If an error occurs during the + * operation, the resource manager should throw the appropriate + * XAException. + */ + Xid[] recover(int flag) throws XAException; + + /** + * Informs the resource manager to roll back work done on behalf + * of a transaction branch. + * + * @param xid A global transaction identifier. + * + * @exception XAException An error has occurred. Possible XAExceptions are + * XA_HEURHAZ, XA_HEURCOM, XA_HEURRB, XA_HEURMIX, XAER_RMERR, XAER_RMFAIL, + * XAER_NOTA, XAER_INVAL, or XAER_PROTO. + * + *

If the transaction branch is already marked rollback-only the + * resource manager may throw one of the XA_RB* exceptions. Upon return, + * the resource manager has rolled back the branch's work and has released + * all held resources. + */ + void rollback(Xid xid) throws XAException; + +} diff --git a/connectors/webservice/translator-ws/pom.xml b/connectors/webservice/translator-ws/pom.xml index f082591235..ecc70eef2e 100644 --- a/connectors/webservice/translator-ws/pom.xml +++ b/connectors/webservice/translator-ws/pom.xml @@ -18,6 +18,10 @@ org.teiid teiid-common-core + + + org.jboss.spec.javax.xml.ws + jboss-jaxws-api_2.3_spec com.googlecode.json-simple diff --git a/engine/pom.xml b/engine/pom.xml index 4535459b5a..b063a59806 100644 --- a/engine/pom.xml +++ b/engine/pom.xml @@ -86,11 +86,6 @@ test - - org.jboss.spec.javax.resource - jboss-connector-api_1.7_spec - - org.jboss.spec.javax.transaction jboss-transaction-api_1.2_spec diff --git a/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java b/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java index b9c1e706a3..bb2c884fe6 100644 --- a/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java +++ b/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java @@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.activation.DataSource; -import javax.resource.ResourceException; import javax.xml.stream.XMLStreamException; import javax.xml.transform.Source; import javax.xml.transform.stax.StAXSource; @@ -362,7 +361,7 @@ public synchronized void execute() throws TranslatorException { if (connection instanceof WrappedConnection) { try { unwrapped = ((WrappedConnection)connection).unwrap(); - } catch (ResourceException e) { + } catch (Exception e) { throw new TranslatorException(QueryPlugin.Event.TEIID30477, QueryPlugin.Util.gs(QueryPlugin.Event.TEIID30477, this.manager.getConnectionName())); } } diff --git a/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java b/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java index d088a699e9..1abcd72349 100644 --- a/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java +++ b/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java @@ -18,10 +18,6 @@ package org.teiid.dqp.internal.process; -import javax.resource.spi.work.Work; -import javax.resource.spi.work.WorkEvent; -import javax.resource.spi.work.WorkListener; - import org.teiid.logging.LogManager; @@ -30,7 +26,7 @@ * Represents a task that performs work that may take more than one processing pass to complete. * During processing the WorkItem may receive events asynchronously through the moreWork method. */ -public abstract class AbstractWorkItem implements Work, WorkListener { +public abstract class AbstractWorkItem implements Runnable { enum ThreadState { MORE_WORK, WORKING, IDLE, DONE @@ -134,24 +130,4 @@ protected void pauseProcessing() { public abstract String toString(); - @Override - public void release() { - - } - - @Override - public void workAccepted(WorkEvent arg0) { - } - - @Override - public void workCompleted(WorkEvent arg0) { - } - - @Override - public void workRejected(WorkEvent event) { - } - - @Override - public void workStarted(WorkEvent arg0) { - } } diff --git a/engine/src/main/java/org/teiid/dqp/internal/process/FutureWork.java b/engine/src/main/java/org/teiid/dqp/internal/process/FutureWork.java index 11f7b0d01e..76135e20cb 100644 --- a/engine/src/main/java/org/teiid/dqp/internal/process/FutureWork.java +++ b/engine/src/main/java/org/teiid/dqp/internal/process/FutureWork.java @@ -23,14 +23,12 @@ import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; -import javax.resource.spi.work.Work; - import org.teiid.dqp.internal.process.DQPCore.CompletionListener; import org.teiid.dqp.internal.process.ThreadReuseExecutor.PrioritizedRunnable; import org.teiid.logging.LogConstants; import org.teiid.logging.LogManager; -public final class FutureWork extends FutureTask implements PrioritizedRunnable, Work { +public final class FutureWork extends FutureTask implements PrioritizedRunnable { private int priority; private long creationTime = System.currentTimeMillis(); private DQPWorkContext workContext = DQPWorkContext.getWorkContext(); @@ -79,11 +77,6 @@ public DQPWorkContext getDqpWorkContext() { return workContext; } - @Override - public void release() { - - } - synchronized void addCompletionListener(CompletionListener completionListener) { if (this.isDone()) { completionListener.onCompletion(this); diff --git a/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java b/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java index 957e231c66..35d1d94ea7 100644 --- a/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java +++ b/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java @@ -22,8 +22,6 @@ import java.nio.charset.Charset; import java.sql.SQLException; -import javax.resource.spi.work.Work; - import org.teiid.client.lob.LobChunk; import org.teiid.client.util.ResultsReceiver; import org.teiid.core.TeiidComponentException; @@ -38,7 +36,7 @@ import org.teiid.query.QueryPlugin; -public class LobWorkItem implements Work { +public class LobWorkItem implements Runnable { private RequestWorkItem parent; private int chunkSize; @@ -135,8 +133,4 @@ synchronized void setResultsReceiver(ResultsReceiver resultsReceiver) this.resultsReceiver = resultsReceiver; } - @Override - public void release() { - - } } diff --git a/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java b/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java index a3afb2a338..81eaf86daa 100644 --- a/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java +++ b/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java @@ -1334,15 +1334,6 @@ public long getProcessingTimestamp() { return processingTimestamp; } - @Override - public void release() { - try { - requestCancel("WorkManager requested release"); //$NON-NLS-1$ - } catch (TeiidComponentException e) { - LogManager.logWarning(LogConstants.CTX_DQP, e, QueryPlugin.Util.gs(QueryPlugin.Event.TEIID30026,requestID)); - } - } - private void done() { doneProducingBatches(); addToCache(); diff --git a/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java b/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java index ab7201be54..b4fbf91f03 100644 --- a/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java +++ b/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java @@ -32,8 +32,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import javax.resource.spi.work.Work; - import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata; import org.teiid.core.util.NamedThreadFactory; import org.teiid.logging.LogConstants; @@ -77,7 +75,7 @@ public interface PrioritizedRunnable extends Runnable { } - public static class RunnableWrapper implements PrioritizedRunnable, Work { + public static class RunnableWrapper implements PrioritizedRunnable { Runnable r; DQPWorkContext workContext = DQPWorkContext.getWorkContext(); long creationTime; @@ -119,11 +117,6 @@ public DQPWorkContext getDqpWorkContext() { return workContext; } - @Override - public void release() { - - } - } private final ThreadPoolExecutor tpe; diff --git a/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java b/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java index ab8faad861..5533ebb26c 100644 --- a/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java +++ b/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java @@ -25,13 +25,7 @@ import java.util.IdentityHashMap; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import javax.resource.NotSupportedException; -import javax.resource.spi.XATerminator; -import javax.resource.spi.work.WorkException; -import javax.resource.spi.work.WorkManager; import javax.transaction.HeuristicMixedException; import javax.transaction.HeuristicRollbackException; import javax.transaction.InvalidTransactionException; @@ -54,6 +48,7 @@ import org.teiid.dqp.service.TransactionContext.Scope; import org.teiid.dqp.service.TransactionService; import org.teiid.query.QueryPlugin; +import org.teiid.resource.api.XAImporter; /** * Note that the begin methods do not leave the transaction associated with the @@ -113,10 +108,9 @@ public synchronized void addTransactionContext(TransactionContext tc) { protected TransactionMapping transactions = new TransactionMapping(); - private XATerminator xaTerminator; protected TransactionManager transactionManager; - private WorkManager workManager; private boolean detectTransactions; + private XAImporter xaImporter; public void setDetectTransactions(boolean detectTransactions) { this.detectTransactions = detectTransactions; @@ -125,19 +119,15 @@ public void setDetectTransactions(boolean detectTransactions) { public boolean isDetectTransactions() { return detectTransactions; } - - public void setXaTerminator(XATerminator xaTerminator) { - this.xaTerminator = xaTerminator; - } + public void setXaImporter(XAImporter xaImporter) { + this.xaImporter = xaImporter; + } + public void setTransactionManager(TransactionManager transactionManager) { this.transactionManager = transactionManager; } - public void setWorkManager(WorkManager workManager) { - this.workManager = workManager; - } - /** * Global Transaction */ @@ -153,7 +143,7 @@ public int prepare(final String threadId, XidImpl xid, boolean singleTM) throws } try { - return this.xaTerminator.prepare(tc.getXid()); + return this.xaImporter.prepare(tc.getXid()); } catch (XAException e) { throw new XATransactionException(QueryPlugin.Event.TEIID30506, e); } @@ -169,7 +159,7 @@ public void commit(final String threadId, XidImpl xid, boolean onePhase, boolean return; //nothing to do } //TODO: we have no way of knowing for sure if we can safely use the onephase optimization - this.xaTerminator.commit(tc.getXid(), false); + this.xaImporter.commit(tc.getXid(), false); } catch (XAException e) { throw new XATransactionException(QueryPlugin.Event.TEIID30507, e); } finally { @@ -185,7 +175,7 @@ public void rollback(final String threadId, XidImpl xid, boolean singleTM) throw try { // In the case of single TM, the container directly roll backs the sources. if (!singleTM) { - this.xaTerminator.rollback(tc.getXid()); + this.xaImporter.rollback(tc.getXid()); } } catch (XAException e) { throw new XATransactionException(QueryPlugin.Event.TEIID30508, e); @@ -204,7 +194,7 @@ public Xid[] recover(int flag, boolean singleTM) throws XATransactionException { } try { - return this.xaTerminator.recover(flag); + return this.xaImporter.recover(flag); } catch (XAException e) { throw new XATransactionException(QueryPlugin.Event.TEIID30509, e); } @@ -219,7 +209,7 @@ public void forget(final String threadId, XidImpl xid, boolean singleTM) throws if (singleTM) { return; } - this.xaTerminator.forget(xid); + this.xaImporter.forget(xid); } catch (XAException err) { throw new XATransactionException(QueryPlugin.Event.TEIID30510, err); } finally { @@ -242,7 +232,6 @@ public void start(final String threadId, final XidImpl xid, int flags, int timeo if (tc.getTransactionType() != TransactionContext.Scope.NONE) { throw new XATransactionException(QueryPlugin.Event.TEIID30517, XAException.XAER_PROTO, QueryPlugin.Util.gs(QueryPlugin.Event.TEIID30517)); } - tc.setTransactionTimeout(timeout); tc.setXid(xid); tc.setTransactionType(TransactionContext.Scope.GLOBAL); if (singleTM) { @@ -254,22 +243,10 @@ public void start(final String threadId, final XidImpl xid, int flags, int timeo throw new XATransactionException(QueryPlugin.Event.TEIID30590, XAException.XAER_INVAL, QueryPlugin.Util.gs(QueryPlugin.Event.TEIID30590)); } } else { - FutureWork work = new FutureWork(new Callable() { - @Override - public Transaction call() throws Exception { - return transactionManager.getTransaction(); - } - }, 0); - workManager.doWork(work, WorkManager.INDEFINITE, tc, null); - tc.setTransaction(work.get()); + Transaction txn = xaImporter.importTransaction(transactionManager, xid, timeout); + tc.setTransaction(txn); } - } catch (NotSupportedException e) { - throw new XATransactionException(QueryPlugin.Event.TEIID30512, XAException.XAER_INVAL, e); - } catch (WorkException e) { - throw new XATransactionException(QueryPlugin.Event.TEIID30512, XAException.XAER_INVAL, e); - } catch (InterruptedException e) { - throw new XATransactionException(QueryPlugin.Event.TEIID30512, XAException.XAER_INVAL, e); - } catch (ExecutionException e) { + } catch (XAException e) { throw new XATransactionException(QueryPlugin.Event.TEIID30512, XAException.XAER_INVAL, e); } catch (SystemException e) { throw new XATransactionException(QueryPlugin.Event.TEIID30512, XAException.XAER_INVAL, e); diff --git a/engine/src/main/java/org/teiid/dqp/service/TransactionContext.java b/engine/src/main/java/org/teiid/dqp/service/TransactionContext.java index ca94c8a21f..699e0fa073 100644 --- a/engine/src/main/java/org/teiid/dqp/service/TransactionContext.java +++ b/engine/src/main/java/org/teiid/dqp/service/TransactionContext.java @@ -23,12 +23,12 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import javax.resource.spi.work.ExecutionContext; import javax.transaction.Transaction; +import javax.transaction.xa.Xid; import org.teiid.core.TeiidRuntimeException; -public class TransactionContext extends ExecutionContext implements Serializable, Cloneable{ +public class TransactionContext implements Serializable, Cloneable{ private static final long serialVersionUID = -8689401273499649058L; @@ -46,6 +46,7 @@ public enum Scope { private Transaction transaction; private Set suspendedBy = Collections.newSetFromMap(new ConcurrentHashMap()); private int isolationLevel; + private Xid xid; public int getIsolationLevel() { return isolationLevel; @@ -113,4 +114,23 @@ public TransactionContext clone() { } } + /** + * set a transaction context. + * + * @param xid transaction context. + */ + public void setXid(Xid xid) + { + this.xid = xid; + } + + /** + * @return an Xid object carrying a transaction context, + * if any. + */ + public Xid getXid() + { + return this.xid; + } + } \ No newline at end of file diff --git a/engine/src/test/java/org/teiid/common/queue/FakeWorkItem.java b/engine/src/test/java/org/teiid/common/queue/FakeWorkItem.java index 44ac9c20bd..58b7b4d4fc 100644 --- a/engine/src/test/java/org/teiid/common/queue/FakeWorkItem.java +++ b/engine/src/test/java/org/teiid/common/queue/FakeWorkItem.java @@ -20,11 +20,9 @@ import java.sql.Timestamp; -import javax.resource.spi.work.Work; - /** */ -public class FakeWorkItem implements Work { +public class FakeWorkItem implements Runnable { private static boolean DEBUG = false; @@ -66,9 +64,4 @@ private void log(String msg) { } } - @Override - public void release() { - - } - } diff --git a/engine/src/test/java/org/teiid/common/queue/FakeWorkManager.java b/engine/src/test/java/org/teiid/common/queue/FakeWorkManager.java deleted file mode 100644 index 569f0ad337..0000000000 --- a/engine/src/test/java/org/teiid/common/queue/FakeWorkManager.java +++ /dev/null @@ -1,73 +0,0 @@ -package org.teiid.common.queue; - -import javax.resource.spi.work.ExecutionContext; -import javax.resource.spi.work.Work; -import javax.resource.spi.work.WorkEvent; -import javax.resource.spi.work.WorkException; -import javax.resource.spi.work.WorkListener; -import javax.resource.spi.work.WorkManager; - -import org.mockito.Mockito; - -public class FakeWorkManager implements WorkManager { - private Thread t; - - @Override - public void doWork(Work arg0) throws WorkException { - execute(arg0, null, true); - } - - @Override - public void doWork(Work arg0, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException { - execute(arg0, arg3, true); - } - - @Override - public void scheduleWork(Work arg0) throws WorkException { - execute(arg0, null, false); - } - - @Override - public void scheduleWork(Work arg0, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException { - execute(arg0, arg3, false); - } - - @Override - public long startWork(Work arg0) throws WorkException { - execute(arg0, null, false); - return 0; - } - - @Override - public long startWork(Work arg0, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException { - execute(arg0, arg3, false); - return 0; - } - - void execute(final Work arg0, final WorkListener arg3, boolean join) throws WorkException { - if (arg3 != null) { - arg3.workAccepted(Mockito.mock(WorkEvent.class)); - arg3.workStarted(Mockito.mock(WorkEvent.class)); - } - - t = new Thread(new Runnable() { - - @Override - public void run() { - arg0.run(); - if (arg3 != null) { - arg3.workCompleted(Mockito.mock(WorkEvent.class)); - } - } - }); - t.start(); - if (join) { - try { - t.join(); - } catch (InterruptedException e) { - throw new WorkException(e); - } - } - } - -} diff --git a/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java b/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java index 74fb8fe2f1..7d257266fe 100644 --- a/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java +++ b/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java @@ -26,8 +26,6 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import javax.resource.spi.work.Work; - import org.junit.After; import org.junit.Test; import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata; @@ -97,17 +95,13 @@ public class TestThreadReuseExecutor { @Test public void testFailingWork() throws Exception { pool = new ThreadReuseExecutor("test", 5); //$NON-NLS-1$ final Semaphore signal = new Semaphore(1); - pool.execute(new Work() { + pool.execute(new Runnable() { @Override public void run() { signal.release(); throw new RuntimeException(); } - @Override - public void release() { - - } }); assertTrue(signal.tryAcquire(2, TimeUnit.SECONDS)); } diff --git a/engine/src/test/java/org/teiid/dqp/internal/datamgr/FakeTransactionService.java b/engine/src/test/java/org/teiid/dqp/internal/datamgr/FakeTransactionService.java index c1b00caa42..785e74ab6b 100644 --- a/engine/src/test/java/org/teiid/dqp/internal/datamgr/FakeTransactionService.java +++ b/engine/src/test/java/org/teiid/dqp/internal/datamgr/FakeTransactionService.java @@ -18,20 +18,18 @@ package org.teiid.dqp.internal.datamgr; -import javax.resource.spi.XATerminator; import javax.transaction.TransactionManager; -import org.teiid.common.queue.FakeWorkManager; import org.teiid.core.util.SimpleMock; import org.teiid.dqp.internal.process.TransactionServerImpl; +import org.teiid.resource.api.XAImporter; public class FakeTransactionService extends TransactionServerImpl { public FakeTransactionService() { this.setTransactionManager(SimpleMock.createSimpleMock(TransactionManager.class)); - this.setXaTerminator(SimpleMock.createSimpleMock(XATerminator.class)); - this.setWorkManager(new FakeWorkManager()); + this.setXaImporter(SimpleMock.createSimpleMock(XAImporter.class)); } } diff --git a/engine/src/test/java/org/teiid/dqp/internal/process/TestTransactionServer.java b/engine/src/test/java/org/teiid/dqp/internal/process/TestTransactionServer.java index 149436ecd2..2b200dba7e 100644 --- a/engine/src/test/java/org/teiid/dqp/internal/process/TestTransactionServer.java +++ b/engine/src/test/java/org/teiid/dqp/internal/process/TestTransactionServer.java @@ -20,7 +20,6 @@ import static org.junit.Assert.*; -import javax.resource.spi.XATerminator; import javax.transaction.TransactionManager; import javax.transaction.xa.XAResource; @@ -30,13 +29,13 @@ import org.teiid.adminapi.Transaction; import org.teiid.client.xa.XATransactionException; import org.teiid.client.xa.XidImpl; -import org.teiid.common.queue.FakeWorkManager; import org.teiid.dqp.service.TransactionContext; +import org.teiid.resource.api.XAImporter; public class TestTransactionServer { private TransactionServerImpl server; - private XATerminator xaTerminator; + private XAImporter xaImporter; private TransactionManager tm; private javax.transaction.Transaction txn; @@ -49,17 +48,19 @@ public class TestTransactionServer { private static final XidImpl XID2 = new XidImpl(0, new byte[] { 2 }, new byte[0]); + + static int TIMEOUT = 100; @Before public void setUp() throws Exception { server = new TransactionServerImpl(); - xaTerminator = Mockito.mock(XATerminator.class); tm = Mockito.mock(TransactionManager.class); txn = Mockito.mock(javax.transaction.Transaction.class); Mockito.stub(tm.getTransaction()).toReturn(txn); Mockito.stub(tm.suspend()).toReturn(txn); - server.setXaTerminator(xaTerminator); + xaImporter = Mockito.mock(XAImporter.class); + Mockito.stub(xaImporter.importTransaction(Mockito.any(), Mockito.any(), Mockito.eq(TIMEOUT))).toReturn(txn); + server.setXaImporter(xaImporter); server.setTransactionManager(tm); - server.setWorkManager(new FakeWorkManager()); } /** @@ -171,7 +172,7 @@ public class TestTransactionServer { server.end(THREAD1, XID1, XAResource.TMSUCCESS, false); server.commit(THREAD1, XID1, false, false); - Mockito.verify(xaTerminator).commit(XID1, false); + Mockito.verify(xaImporter).commit(XID1, false); } @Test public void testLocalRollback() throws Exception { @@ -279,13 +280,13 @@ public class TestTransactionServer { server.prepare(THREAD1, XID1, false); - Mockito.verify(xaTerminator).prepare(tc.getXid()); + Mockito.verify(xaImporter).prepare(tc.getXid()); server.commit(THREAD1, XID1, true, false); } @Test public void testGlobalPrepareFail() throws Exception { - server.start(THREAD1, XID1, XAResource.TMNOFLAGS, 100,false); + server.start(THREAD1, XID1, XAResource.TMNOFLAGS, TIMEOUT,false); server.end(THREAD1, XID1, XAResource.TMFAIL, false); Mockito.verify(txn).setRollbackOnly(); } @@ -300,7 +301,7 @@ public class TestTransactionServer { server.commit(THREAD1, XID1, true, false); - Mockito.verify(xaTerminator).commit(tc.getXid(), false); + Mockito.verify(xaImporter).commit(tc.getXid(), false); } @Test public void testGlobalOnePhaseCommit_force_prepare_through() throws Exception { @@ -312,8 +313,8 @@ public class TestTransactionServer { server.commit(THREAD1, XID1, true, false); - Mockito.verify(xaTerminator).prepare(tc.getXid()); - Mockito.verify(xaTerminator).commit(tc.getXid(), false); + Mockito.verify(xaImporter).prepare(tc.getXid()); + Mockito.verify(xaImporter).commit(tc.getXid(), false); } @Test public void testGlobalOnePhaseCommit_force_prepare() throws Exception { @@ -326,8 +327,8 @@ public class TestTransactionServer { server.commit(THREAD1, XID1, true, false); // since there are two sources the commit is not single phase - Mockito.verify(xaTerminator).prepare(tc.getXid()); - Mockito.verify(xaTerminator).commit(tc.getXid(), false); + Mockito.verify(xaImporter).prepare(tc.getXid()); + Mockito.verify(xaImporter).commit(tc.getXid(), false); } @@ -343,7 +344,7 @@ public class TestTransactionServer { server.commit(THREAD1, XID1, true, false); // since there are two sources the commit is not single phase - Mockito.verify(xaTerminator).commit(tc.getXid(), false); + Mockito.verify(xaImporter).commit(tc.getXid(), false); } @Test public void testGlobalOnePhaseRoolback() throws Exception { @@ -358,7 +359,7 @@ public class TestTransactionServer { server.rollback(THREAD1, XID1, false); // since there are two sources the commit is not single phase - Mockito.verify(xaTerminator).rollback(tc.getXid()); + Mockito.verify(xaImporter).rollback(tc.getXid()); } @Test public void testRequestCommit() throws Exception{ diff --git a/jboss-integration/pom.xml b/jboss-integration/pom.xml index b4b454d21e..b54a3bff4d 100644 --- a/jboss-integration/pom.xml +++ b/jboss-integration/pom.xml @@ -25,6 +25,11 @@ org.teiid teiid-api + + + org.teiid + teiid-resource-spi + org.teiid diff --git a/jboss-integration/src/main/java/org/teiid/jboss/DQPCoreService.java b/jboss-integration/src/main/java/org/teiid/jboss/DQPCoreService.java index 70eab2f9ad..57c380646c 100644 --- a/jboss-integration/src/main/java/org/teiid/jboss/DQPCoreService.java +++ b/jboss-integration/src/main/java/org/teiid/jboss/DQPCoreService.java @@ -48,6 +48,7 @@ import org.teiid.logging.LogConstants; import org.teiid.logging.LogManager; import org.teiid.logging.MessageLevel; +import org.teiid.resource.spi.XAImporterImpl; import org.teiid.runtime.jmx.JMXService; import org.teiid.services.InternalEventDistributorFactory; @@ -73,8 +74,7 @@ public class DQPCoreService extends DQPConfiguration implements Serializable, Se @Override public void start(final StartContext context) { - this.transactionServerImpl.setWorkManager(getWorkManagerInjector().getValue()); - this.transactionServerImpl.setXaTerminator(getXaTerminatorInjector().getValue()); + this.transactionServerImpl.setXaImporter(new XAImporterImpl(getXaTerminatorInjector().getValue(), getWorkManagerInjector().getValue())); this.transactionServerImpl.setTransactionManager(getTxnManagerInjector().getValue()); this.transactionServerImpl.setDetectTransactions(true); setPreParser(preParserInjector.getValue()); diff --git a/resource-spi/pom.xml b/resource-spi/pom.xml index ca753f248d..43dfc12cf2 100644 --- a/resource-spi/pom.xml +++ b/resource-spi/pom.xml @@ -26,5 +26,10 @@ jboss-connector-api_1.7_spec + + org.jboss.spec.javax.transaction + jboss-transaction-api_1.2_spec + + diff --git a/resource-spi/src/main/java/org/teiid/resource/spi/XAImporterImpl.java b/resource-spi/src/main/java/org/teiid/resource/spi/XAImporterImpl.java new file mode 100644 index 0000000000..d8b5f3a43c --- /dev/null +++ b/resource-spi/src/main/java/org/teiid/resource/spi/XAImporterImpl.java @@ -0,0 +1,110 @@ +/* + * Copyright Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags and + * the COPYRIGHT.txt file distributed with this work. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.teiid.resource.spi; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; + +import javax.resource.NotSupportedException; +import javax.resource.spi.XATerminator; +import javax.resource.spi.work.ExecutionContext; +import javax.resource.spi.work.Work; +import javax.resource.spi.work.WorkException; +import javax.resource.spi.work.WorkManager; +import javax.transaction.Transaction; +import javax.transaction.TransactionManager; +import javax.transaction.xa.XAException; +import javax.transaction.xa.Xid; + +import org.teiid.resource.api.XAImporter; + +/** + * Simple {@link XAImporter} implementations based upon an {@link XATerminator} and {@link WorkManager} + */ +public class XAImporterImpl implements XAImporter { + + private static class FutureWork extends FutureTask implements Work { + public FutureWork(Callable callable) { + super(callable); + } + + @Override + public void release() { + + } + } + + private XATerminator xaTerminator; + private WorkManager workManager; + + public XAImporterImpl(XATerminator xaTerminator, WorkManager workManager) { + this.xaTerminator = xaTerminator; + this.workManager = workManager; + } + + @Override + public Transaction importTransaction(TransactionManager transactionManager, + Xid xid, int transactionTimeout) throws XAException { + ExecutionContext ec = new ExecutionContext(); + ec.setXid(xid); + try { + ec.setTransactionTimeout(transactionTimeout); + + FutureWork work = new FutureWork<>(new Callable() { + @Override + public Transaction call() throws Exception { + return transactionManager.getTransaction(); + } + }); + workManager.doWork(work, WorkManager.INDEFINITE, ec, null); + return work.get(); + } catch (ExecutionException | NotSupportedException | WorkException | InterruptedException e) { + XAException xaException = new XAException(); + xaException.initCause(e); + throw xaException; + } + } + + @Override + public void commit(Xid xid, boolean onePhase) throws XAException { + xaTerminator.commit(xid, onePhase); + } + + @Override + public void forget(Xid xid) throws XAException { + xaTerminator.forget(xid); + } + + @Override + public int prepare(Xid xid) throws XAException { + return xaTerminator.prepare(xid); + } + + @Override + public Xid[] recover(int flag) throws XAException { + return xaTerminator.recover(flag); + } + + @Override + public void rollback(Xid xid) throws XAException { + xaTerminator.rollback(xid); + } + +} diff --git a/runtime/pom.xml b/runtime/pom.xml index 9174b8e592..d16c7c07d0 100644 --- a/runtime/pom.xml +++ b/runtime/pom.xml @@ -76,10 +76,6 @@ org.jboss.spec.javax.transaction jboss-transaction-api_1.2_spec - - org.jboss.spec.javax.resource - jboss-connector-api_1.7_spec - org.infinispan infinispan-core diff --git a/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java b/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java index 77f45ea365..ce3fe60ad6 100644 --- a/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java +++ b/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.List; -import javax.resource.spi.work.WorkManager; import javax.transaction.TransactionManager; import org.infinispan.manager.DefaultCacheManager; @@ -47,7 +46,6 @@ public class EmbeddedConfiguration extends DQPConfiguration { private String securityDomain; private TransactionManager transactionManager; private ObjectReplicator objectReplicator; - private WorkManager workManager; private boolean useDisk = true; private String bufferDirectory; private CacheFactory cacheFactory; @@ -97,7 +95,7 @@ public SecurityHelper getSecurityHelper() { /** * Set the {@link SecurityHelper} that can associate the appropriate SecurityContext * with threads executing Teiid tasks. Will also set the appropriate user/subject information - * on the Teiid contexts. Not required if a {@link WorkManager} is set. + * on the Teiid contexts. * * @param securityHelper */ @@ -125,30 +123,6 @@ public void setObjectReplicator(ObjectReplicator objectReplicator) { this.objectReplicator = objectReplicator; } - /** - * Sets the {@link WorkManager} to be used instead of a {@link ThreadReuseExecutor}. - * This means that Teiid will not own the processing threads and will not necessarily be - * responsible for security context propagation. - * @param workManager - */ - public void setWorkManager(WorkManager workManager) { - this.workManager = workManager; - } - public WorkManager getWorkManager() { - return workManager; - } - - @Override - public TeiidExecutor getTeiidExecutor() { - if (workManager == null) { - return super.getTeiidExecutor(); - } - //TODO: if concurrency is 1, then use a direct executor - //the only scheduled task right now just restarts a workitem, - //so that can be done in the scheduler thread - return new WorkManagerTeiidExecutor(workManager); - } - public boolean isUseDisk() { return useDisk; } diff --git a/runtime/src/main/java/org/teiid/runtime/WorkManagerTeiidExecutor.java b/runtime/src/main/java/org/teiid/runtime/WorkManagerTeiidExecutor.java deleted file mode 100644 index 974f9f9286..0000000000 --- a/runtime/src/main/java/org/teiid/runtime/WorkManagerTeiidExecutor.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright Red Hat, Inc. and/or its affiliates - * and other contributors as indicated by the @author tags and - * the COPYRIGHT.txt file distributed with this work. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.teiid.runtime; - -import java.util.Collections; -import java.util.List; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; - -import javax.resource.spi.work.WorkException; -import javax.resource.spi.work.WorkManager; - -import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata; -import org.teiid.dqp.internal.process.TeiidExecutor; -import org.teiid.dqp.internal.process.ThreadReuseExecutor; -import org.teiid.security.SecurityHelper; - -/** - * A {@link TeiidExecutor} that allows for a dummy {@link SecurityHelper} to be used - * and prevents Teiid from owning processing threads. - */ -final class WorkManagerTeiidExecutor implements TeiidExecutor { - WorkManager workManager; - - WorkManagerTeiidExecutor(WorkManager workManager) { - this.workManager = workManager; - } - - @Override - public List shutdownNow() { - workManager = null; - return Collections.emptyList(); - } - - @Override - public WorkerPoolStatisticsMetadata getStats() { - return null; - } - - @Override - public void execute(Runnable command) { - final ThreadReuseExecutor.RunnableWrapper wrapper = new ThreadReuseExecutor.RunnableWrapper(command); - executeDirect(wrapper); - } - - private void executeDirect( - final ThreadReuseExecutor.RunnableWrapper wrapper) { - try { - workManager.scheduleWork(wrapper); - } catch (WorkException e) { - throw new RejectedExecutionException(e); - } - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) - throws InterruptedException { - return false; - } -} \ No newline at end of file diff --git a/runtime/src/test/java/org/teiid/runtime/TestEmbeddedServer.java b/runtime/src/test/java/org/teiid/runtime/TestEmbeddedServer.java index 21daf46ef3..dfdee61d7d 100644 --- a/runtime/src/test/java/org/teiid/runtime/TestEmbeddedServer.java +++ b/runtime/src/test/java/org/teiid/runtime/TestEmbeddedServer.java @@ -45,7 +45,6 @@ import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; -import javax.resource.spi.XATerminator; import javax.transaction.*; import org.infinispan.transaction.tm.DummyTransactionManager; @@ -64,7 +63,6 @@ import org.teiid.common.buffer.impl.BufferFrontedFileStoreCache; import org.teiid.common.buffer.impl.FileStorageManager; import org.teiid.common.buffer.impl.SplittableStorageManager; -import org.teiid.common.queue.FakeWorkManager; import org.teiid.core.TeiidRuntimeException; import org.teiid.core.types.DataTypeManager; import org.teiid.core.types.InputStreamFactory; @@ -91,6 +89,7 @@ import org.teiid.metadata.RuntimeMetadata; import org.teiid.metadata.Table; import org.teiid.query.sql.symbol.Reference; +import org.teiid.resource.api.XAImporter; import org.teiid.runtime.EmbeddedServer.ConnectionFactoryProvider; import org.teiid.translator.DataNotAvailableException; import org.teiid.translator.ExecutionContext; @@ -1394,8 +1393,7 @@ public void testExternalMaterializationManagement() throws Exception { EmbeddedConfiguration ec = new EmbeddedConfiguration(); ec.setUseDisk(false); ec.setTransactionManager(SimpleMock.createSimpleMock(TransactionManager.class)); - es.transactionService.setXaTerminator(SimpleMock.createSimpleMock(XATerminator.class)); - es.transactionService.setWorkManager(new FakeWorkManager()); + es.transactionService.setXaImporter(SimpleMock.createSimpleMock(XAImporter.class)); es.start(ec); es.transactionService.setDetectTransactions(false); diff --git a/teiid-feature-pack/client-feature-pack/src/main/resources/modules/system/layers/dv/org/jboss/teiid/api/main/module.xml b/teiid-feature-pack/client-feature-pack/src/main/resources/modules/system/layers/dv/org/jboss/teiid/api/main/module.xml index b894c7a979..6542d08258 100644 --- a/teiid-feature-pack/client-feature-pack/src/main/resources/modules/system/layers/dv/org/jboss/teiid/api/main/module.xml +++ b/teiid-feature-pack/client-feature-pack/src/main/resources/modules/system/layers/dv/org/jboss/teiid/api/main/module.xml @@ -12,7 +12,6 @@ - diff --git a/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java b/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java index 1f0de9e57d..0c410a8aa2 100644 --- a/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java +++ b/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.Properties; -import javax.resource.spi.XATerminator; import javax.transaction.TransactionManager; import javax.xml.stream.XMLStreamException; @@ -36,7 +35,6 @@ import org.teiid.adminapi.impl.VDBImportMetadata; import org.teiid.adminapi.impl.VDBMetaData; import org.teiid.adminapi.impl.VDBMetadataParser; -import org.teiid.common.queue.FakeWorkManager; import org.teiid.core.TeiidRuntimeException; import org.teiid.core.util.SimpleMock; import org.teiid.core.util.UnitTestUtil; @@ -59,6 +57,7 @@ import org.teiid.query.metadata.VDBResources.Resource; import org.teiid.query.optimizer.capabilities.BasicSourceCapabilities; import org.teiid.query.optimizer.capabilities.SourceCapabilities; +import org.teiid.resource.api.XAImporter; import org.teiid.runtime.EmbeddedConfiguration; import org.teiid.runtime.EmbeddedServer; import org.teiid.services.SessionServiceImpl; @@ -115,8 +114,7 @@ public void start(EmbeddedConfiguration config, boolean realBufferMangaer) { boolean detectTxn = true; if (config.getTransactionManager() == null) { config.setTransactionManager(SimpleMock.createSimpleMock(TransactionManager.class)); - this.transactionService.setXaTerminator(SimpleMock.createSimpleMock(XATerminator.class)); - this.transactionService.setWorkManager(new FakeWorkManager()); + this.transactionService.setXaImporter(SimpleMock.createSimpleMock(XAImporter.class)); detectTxn = false; } this.realBufferManager = realBufferMangaer;