From fdc5a673dcc06cae87ea4698844628c322ed3324 Mon Sep 17 00:00:00 2001 From: shawkins Date: Mon, 17 Jul 2017 19:36:51 -0400 Subject: [PATCH] TEIID-4993 further refining transaction semantics --- .../optimizer/SourceTriggerActionPlanner.java | 1 + .../query/optimizer/TriggerActionPlanner.java | 3 + .../query/processor/proc/ForEachRowPlan.java | 58 ++++++++++++++++- .../query/processor/proc/ProcedurePlan.java | 2 +- .../teiid/query/processor/proc/Program.java | 6 +- .../processor/relational/RelationalPlan.java | 36 +++-------- .../query/processor/TestTriggerActions.java | 64 +++++++++++++++++++ 7 files changed, 137 insertions(+), 33 deletions(-) diff --git a/engine/src/main/java/org/teiid/query/optimizer/SourceTriggerActionPlanner.java b/engine/src/main/java/org/teiid/query/optimizer/SourceTriggerActionPlanner.java index 4cd93abad9..cc4184c6dd 100644 --- a/engine/src/main/java/org/teiid/query/optimizer/SourceTriggerActionPlanner.java +++ b/engine/src/main/java/org/teiid/query/optimizer/SourceTriggerActionPlanner.java @@ -309,6 +309,7 @@ public ProcessorPlan optimize(Command command, IDGenerator idGenerator, } //create plan ForEachRowPlan result = new ForEachRowPlan(); + result.setSingleRow(true); result.setParams(params); TriggerAction parseProcedure; GroupSymbol gs = new GroupSymbol(sec.table.getFullName()); diff --git a/engine/src/main/java/org/teiid/query/optimizer/TriggerActionPlanner.java b/engine/src/main/java/org/teiid/query/optimizer/TriggerActionPlanner.java index f12b54051e..8aa5792f25 100644 --- a/engine/src/main/java/org/teiid/query/optimizer/TriggerActionPlanner.java +++ b/engine/src/main/java/org/teiid/query/optimizer/TriggerActionPlanner.java @@ -78,11 +78,13 @@ public ProcessorPlan optimize(ProcedureContainer userCommand, TriggerAction ta, for (Map.Entry entry : mapping.entrySet()) { entry.setValue(QueryRewriter.rewriteExpression(entry.getValue(), context, metadata)); } + boolean singleRow = false; if (userCommand instanceof Insert) { Insert insert = (Insert)userCommand; if (insert.getQueryExpression() != null) { query = insert.getQueryExpression(); } else { + singleRow = true; query = new Query(); ((Query)query).setSelect(new Select(RuleChooseJoinStrategy.createExpressionSymbols(insert.getValues()))); } @@ -106,6 +108,7 @@ public ProcessorPlan optimize(ProcedureContainer userCommand, TriggerAction ta, } } ForEachRowPlan result = new ForEachRowPlan(); + result.setSingleRow(singleRow); result.setParams(params); ProcessorPlan queryPlan = QueryOptimizer.optimizePlan(query, metadata, idGenerator, capFinder, analysisRecord, context); result.setQueryPlan(queryPlan); diff --git a/engine/src/main/java/org/teiid/query/processor/proc/ForEachRowPlan.java b/engine/src/main/java/org/teiid/query/processor/proc/ForEachRowPlan.java index c27a42be70..a7a2867902 100644 --- a/engine/src/main/java/org/teiid/query/processor/proc/ForEachRowPlan.java +++ b/engine/src/main/java/org/teiid/query/processor/proc/ForEachRowPlan.java @@ -53,6 +53,7 @@ public class ForEachRowPlan extends ProcessorPlan { private ProcedurePlan rowProcedure; private Map params; private Map lookupMap; + private boolean singleRow; private ProcessorDataManager dataMgr; private BufferManager bufferMgr; @@ -64,6 +65,9 @@ public class ForEachRowPlan extends ProcessorPlan { private long updateCount; private TransactionContext planContext; + private List nextTuple; + private boolean first = true; + private boolean nextNull = false; @Override public ProcessorPlan clone() { @@ -117,7 +121,11 @@ public TupleBatch nextBatch() throws BlockedException, try { while (true) { if (currentTuple == null) { - currentTuple = tupleSource.nextTuple(); + if (nextTuple != null) { + currentTuple = nextTuple; + } else if (!nextNull) { + currentTuple = tupleSource.nextTuple(); + } if (currentTuple == null) { if (this.planContext != null) { TransactionService ts = this.getContext().getTransactionServer(); @@ -129,6 +137,28 @@ public TupleBatch nextBatch() throws BlockedException, return result; } } + if (first) { + TransactionContext tc = this.getContext().getTransactionContext(); + if (this.planContext == null && tc != null && tc.getTransactionType() == Scope.NONE) { + Boolean txnRequired = rowProcedure.requiresTransaction(false); + boolean start = false; + if (txnRequired == null) { + nextTuple = tupleSource.nextTuple(); + if (nextTuple != null) { + start = true; + } else { + nextNull = true; + } + } else if (Boolean.TRUE.equals(txnRequired)) { + start = true; + } + if (start) { + this.getContext().getTransactionServer().begin(tc); + this.planContext = tc; + } + } + first = false; + } if (this.rowProcessor == null) { rowProcedure.reset(); CommandContext context = getContext().clone(); @@ -160,7 +190,8 @@ public TupleBatch nextBatch() throws BlockedException, @Override public void open() throws TeiidComponentException, TeiidProcessingException { TransactionContext tc = this.getContext().getTransactionContext(); - if (tc != null && tc.getTransactionType() == Scope.NONE) { + if (tc != null && tc.getTransactionType() == Scope.NONE && queryPlan != null + && !Boolean.FALSE.equals(queryPlan.requiresTransaction(false))) { //start a transaction - if not each of the row plans will //be executed in it's own transaction, which is bad for performance @@ -206,11 +237,28 @@ public void reset() { this.queryProcessor = null; this.tupleSource = null; this.planContext = null; + this.first = true; + this.nextTuple = null; + this.nextNull = false; } @Override public Boolean requiresTransaction(boolean transactionalReads) { - return true; + Boolean requiresTxn = queryPlan.requiresTransaction(transactionalReads); + if (!Boolean.FALSE.equals(requiresTxn)) { + return true; + } + Boolean forEach = rowProcedure.requiresTransaction(transactionalReads); + if (Boolean.TRUE.equals(forEach)) { + return true; + } + if (forEach == null) { + if (!singleRow) { + return true; + } + return null; + } + return false; } @Override @@ -226,4 +274,8 @@ public void setTupleSource(TupleSource tupleSource) { this.tupleSource = tupleSource; } + public void setSingleRow(boolean singleRow) { + this.singleRow = singleRow; + } + } diff --git a/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java b/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java index f82737e098..c6277a28c0 100644 --- a/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java +++ b/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java @@ -742,7 +742,7 @@ public void push(Program program) throws XATransactionException { if (program.isAtomic() && this.blockContext == null) { TransactionContext tc = this.getContext().getTransactionContext(); - if (tc != null && tc.getTransactionType() == Scope.NONE && program.instructionsRequireTransaction(false) != Boolean.FALSE) { + if (tc != null && tc.getTransactionType() == Scope.NONE && Boolean.TRUE.equals(program.instructionsRequireTransaction(false))) { //start a transaction this.getContext().getTransactionServer().begin(tc); this.blockContext = tc; diff --git a/engine/src/main/java/org/teiid/query/processor/proc/Program.java b/engine/src/main/java/org/teiid/query/processor/proc/Program.java index 27c1a1bf9d..771010ef0f 100644 --- a/engine/src/main/java/org/teiid/query/processor/proc/Program.java +++ b/engine/src/main/java/org/teiid/query/processor/proc/Program.java @@ -295,15 +295,15 @@ public void setTrappingExceptions(boolean trappingExceptions) { } public Boolean requiresTransaction(boolean transactionalReads) { - if (!transactionalReads && this.isAtomic()) { - return false; - } return instructionsRequireTransaction(transactionalReads); } public Boolean instructionsRequireTransaction(boolean transactionalReads) { boolean possiblyRequired = false; boolean last = false; + if (this.programInstructions == null) { + return false; + } for (ProgramInstruction instruction : this.programInstructions) { Boolean instructionRequires = instruction.requiresTransaction(transactionalReads); if (instructionRequires == null) { diff --git a/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java b/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java index f7c396a551..cc842ef9c3 100644 --- a/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java +++ b/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java @@ -236,47 +236,24 @@ public void setOutputElements(List outputCols) { @Override public Boolean requiresTransaction(boolean transactionalReads) { - boolean txnWtih = false; if (this.with != null) { for (WithQueryCommand withCommand : this.with) { if (withCommand.isRecursive()) { SetQuery setQuery = (SetQuery)withCommand.getCommand(); Boolean leftRequires = setQuery.getLeftQuery().getProcessorPlan().requiresTransaction(transactionalReads); Boolean rightRequires = setQuery.getLeftQuery().getProcessorPlan().requiresTransaction(transactionalReads); - if (Boolean.TRUE.equals(leftRequires) || Boolean.TRUE.equals(rightRequires)) { + if (!Boolean.FALSE.equals(leftRequires) || !Boolean.FALSE.equals(rightRequires)) { return true; } - if (leftRequires == null || rightRequires == null) { - if (txnWtih) { - return true; - } - txnWtih = true; - } } else { Boolean requires = withCommand.getCommand().getProcessorPlan().requiresTransaction(transactionalReads); - if (Boolean.TRUE.equals(requires)) { + if (!Boolean.FALSE.equals(requires)) { return true; } - if (requires == null) { - if (txnWtih) { - return true; - } - txnWtih = true; - } } } } - Boolean requires = requiresTransaction(transactionalReads, root); - if (Boolean.TRUE.equals(requires)) { - return true; - } - if (requires == null && txnWtih) { - return true; - } - if (txnWtih || requires == null) { - return null; - } - return false; + return requiresTransaction(transactionalReads, root); } static Boolean requiresTransaction(boolean transactionalReads, RelationalNode node) { @@ -284,6 +261,7 @@ static Boolean requiresTransaction(boolean transactionalReads, RelationalNode no if (Boolean.TRUE.equals(requiresTxn)) { return true; } + boolean last = true; for (RelationalNode child : node.getChildren()) { if (child == null) { continue; @@ -297,8 +275,14 @@ static Boolean requiresTransaction(boolean transactionalReads, RelationalNode no return true; } requiresTxn = null; + last = true; + } else { + last = false; } } + if (requiresTxn == null && !last) { + return true; + } return requiresTxn; } diff --git a/engine/src/test/java/org/teiid/query/processor/TestTriggerActions.java b/engine/src/test/java/org/teiid/query/processor/TestTriggerActions.java index ffa6fdf88d..5b0e42062e 100644 --- a/engine/src/test/java/org/teiid/query/processor/TestTriggerActions.java +++ b/engine/src/test/java/org/teiid/query/processor/TestTriggerActions.java @@ -29,16 +29,24 @@ import java.util.List; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.teiid.api.exception.query.QueryProcessingException; +import org.teiid.dqp.service.TransactionContext; +import org.teiid.dqp.service.TransactionContext.Scope; +import org.teiid.dqp.service.TransactionService; import org.teiid.metadata.Table; import org.teiid.query.metadata.TransformationMetadata; import org.teiid.query.optimizer.TestOptimizer; import org.teiid.query.optimizer.capabilities.BasicSourceCapabilities; import org.teiid.query.optimizer.capabilities.DefaultCapabilitiesFinder; +import org.teiid.query.optimizer.capabilities.SourceCapabilities.Capability; import org.teiid.query.resolver.TestResolver; import org.teiid.query.unittest.RealMetadataFactory; import org.teiid.query.util.CommandContext; import org.teiid.query.validator.TestUpdateValidator; +import org.teiid.translator.ExecutionFactory.TransactionSupport; @SuppressWarnings("nls") public class TestTriggerActions { @@ -331,5 +339,61 @@ public class TestTriggerActions { List[] expected = new List[] {Arrays.asList(1)}; helpProcess(plan, context, dm, expected); } + + @Test public void testTransactions() throws Exception { + TransformationMetadata metadata = RealMetadataFactory.fromDDL("x", + new RealMetadataFactory.DDLHolder("virt", "CREATE VIEW users options (updatable true) as select id, dummy_data, updated_at from db1.user1 union all select id, dummy_data, updated_at from db2.user2; " + + "CREATE TRIGGER ON users INSTEAD OF DELETE AS FOR EACH ROW IF (OLD.id < (SELECT MAX(id) FROM db1.user1)) BEGIN ATOMIC DELETE FROM db1.user1 WHERE id = OLD.id; END ELSE IF (OLD.id = (SELECT MAX(id) FROM db1.user1)) BEGIN ATOMIC UPDATE db1.user1 SET dummy_data = '', updated_at = NOW() WHERE id = OLD.id; END ELSE IF (OLD.id > (SELECT MAX(id) FROM db1.user1)) BEGIN ATOMIC DELETE FROM db2.user2 WHERE id = OLD.id; END; " + + "CREATE TRIGGER ON users INSTEAD OF INSERT AS FOR EACH ROW begin atomic insert into db1.user1 (id) values (new.id); insert into db2.user2 (id) values (new.id); end; " + + "CREATE TRIGGER ON users INSTEAD OF update AS FOR EACH ROW begin atomic end; "), + new RealMetadataFactory.DDLHolder("db1", "create foreign table user1 (id integer, dummy_data string, updated_at timestamp) options (updatable true);"), + new RealMetadataFactory.DDLHolder("db2", "create foreign table user2 (id integer, dummy_data string, updated_at timestamp) options (updatable true);")); + + String sql = "delete from users where id = 203"; + HardcodedDataManager dm = new HardcodedDataManager(); + dm.addData("SELECT g_0.id, g_0.dummy_data, g_0.updated_at FROM db1.user1 AS g_0 WHERE g_0.id = 203", Arrays.asList(203, "", null)); + dm.addData("SELECT g_0.id, g_0.dummy_data, g_0.updated_at FROM db2.user2 AS g_0 WHERE g_0.id = 203"); + dm.addData("SELECT g_0.id FROM db1.user1 AS g_0", Arrays.asList(203), Arrays.asList(204)); + dm.addData("DELETE FROM db1.user1 WHERE id = 203", Arrays.asList(1)); + dm.addData("INSERT INTO db1.user1 (id) VALUES (205)", Arrays.asList(1)); + dm.addData("INSERT INTO db2.user2 (id) VALUES (205)", Arrays.asList(1)); + + CommandContext context = createCommandContext(); + BasicSourceCapabilities caps = TestOptimizer.getTypicalCapabilities(); + caps.setSourceProperty(Capability.TRANSACTION_SUPPORT, TransactionSupport.XA); + ProcessorPlan plan = TestProcessor.helpGetPlan(TestResolver.helpResolve(sql, metadata), metadata, new DefaultCapabilitiesFinder(caps), context); + //assumed required, but won't start a txn + assertTrue(plan.requiresTransaction(false)); + + TransactionContext tc = new TransactionContext(); + TransactionService ts = Mockito.mock(TransactionService.class); + Mockito.doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + ((TransactionContext)invocation.getArguments()[0]).setTransactionType(Scope.REQUEST); + return null; + } + }).when(ts).begin(tc); + + context.setTransactionService(ts); + context.setTransactionContext(tc); + + List[] expected = new List[] {Arrays.asList(1)}; + helpProcess(plan, context, dm, expected); + Mockito.verify(ts, Mockito.never()).begin(tc); + + //required, and will start a txn + sql = "insert into users (id) values (205)"; + plan = TestProcessor.helpGetPlan(TestResolver.helpResolve(sql, metadata), metadata, new DefaultCapabilitiesFinder(caps), context); + assertTrue(plan.requiresTransaction(false)); + + helpProcess(plan, context, dm, expected); + Mockito.verify(ts, Mockito.times(1)).begin(tc); + + //does nothing + sql = "update users set id = 205"; + plan = TestProcessor.helpGetPlan(TestResolver.helpResolve(sql, metadata), metadata, new DefaultCapabilitiesFinder(caps), context); + assertFalse(plan.requiresTransaction(false)); + } }