Skip to content

Commit

Permalink
TEIID-4993 further refining transaction semantics
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Jul 17, 2017
1 parent 0f97a00 commit fdc5a67
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 33 deletions.
Expand Up @@ -309,6 +309,7 @@ public ProcessorPlan optimize(Command command, IDGenerator idGenerator,
}
//create plan
ForEachRowPlan result = new ForEachRowPlan();
result.setSingleRow(true);
result.setParams(params);
TriggerAction parseProcedure;
GroupSymbol gs = new GroupSymbol(sec.table.getFullName());
Expand Down
Expand Up @@ -78,11 +78,13 @@ public ProcessorPlan optimize(ProcedureContainer userCommand, TriggerAction ta,
for (Map.Entry<ElementSymbol, Expression> entry : mapping.entrySet()) {
entry.setValue(QueryRewriter.rewriteExpression(entry.getValue(), context, metadata));
}
boolean singleRow = false;
if (userCommand instanceof Insert) {
Insert insert = (Insert)userCommand;
if (insert.getQueryExpression() != null) {
query = insert.getQueryExpression();
} else {
singleRow = true;
query = new Query();
((Query)query).setSelect(new Select(RuleChooseJoinStrategy.createExpressionSymbols(insert.getValues())));
}
Expand All @@ -106,6 +108,7 @@ public ProcessorPlan optimize(ProcedureContainer userCommand, TriggerAction ta,
}
}
ForEachRowPlan result = new ForEachRowPlan();
result.setSingleRow(singleRow);
result.setParams(params);
ProcessorPlan queryPlan = QueryOptimizer.optimizePlan(query, metadata, idGenerator, capFinder, analysisRecord, context);
result.setQueryPlan(queryPlan);
Expand Down
Expand Up @@ -53,6 +53,7 @@ public class ForEachRowPlan extends ProcessorPlan {
private ProcedurePlan rowProcedure;
private Map<ElementSymbol, Expression> params;
private Map<Expression, Integer> lookupMap;
private boolean singleRow;

private ProcessorDataManager dataMgr;
private BufferManager bufferMgr;
Expand All @@ -64,6 +65,9 @@ public class ForEachRowPlan extends ProcessorPlan {
private long updateCount;

private TransactionContext planContext;
private List<?> nextTuple;
private boolean first = true;
private boolean nextNull = false;

@Override
public ProcessorPlan clone() {
Expand Down Expand Up @@ -117,7 +121,11 @@ public TupleBatch nextBatch() throws BlockedException,
try {
while (true) {
if (currentTuple == null) {
currentTuple = tupleSource.nextTuple();
if (nextTuple != null) {
currentTuple = nextTuple;
} else if (!nextNull) {
currentTuple = tupleSource.nextTuple();
}
if (currentTuple == null) {
if (this.planContext != null) {
TransactionService ts = this.getContext().getTransactionServer();
Expand All @@ -129,6 +137,28 @@ public TupleBatch nextBatch() throws BlockedException,
return result;
}
}
if (first) {
TransactionContext tc = this.getContext().getTransactionContext();
if (this.planContext == null && tc != null && tc.getTransactionType() == Scope.NONE) {
Boolean txnRequired = rowProcedure.requiresTransaction(false);
boolean start = false;
if (txnRequired == null) {
nextTuple = tupleSource.nextTuple();
if (nextTuple != null) {
start = true;
} else {
nextNull = true;
}
} else if (Boolean.TRUE.equals(txnRequired)) {
start = true;
}
if (start) {
this.getContext().getTransactionServer().begin(tc);
this.planContext = tc;
}
}
first = false;
}
if (this.rowProcessor == null) {
rowProcedure.reset();
CommandContext context = getContext().clone();
Expand Down Expand Up @@ -160,7 +190,8 @@ public TupleBatch nextBatch() throws BlockedException,
@Override
public void open() throws TeiidComponentException, TeiidProcessingException {
TransactionContext tc = this.getContext().getTransactionContext();
if (tc != null && tc.getTransactionType() == Scope.NONE) {
if (tc != null && tc.getTransactionType() == Scope.NONE && queryPlan != null
&& !Boolean.FALSE.equals(queryPlan.requiresTransaction(false))) {
//start a transaction - if not each of the row plans will
//be executed in it's own transaction, which is bad for performance

Expand Down Expand Up @@ -206,11 +237,28 @@ public void reset() {
this.queryProcessor = null;
this.tupleSource = null;
this.planContext = null;
this.first = true;
this.nextTuple = null;
this.nextNull = false;
}

@Override
public Boolean requiresTransaction(boolean transactionalReads) {
return true;
Boolean requiresTxn = queryPlan.requiresTransaction(transactionalReads);
if (!Boolean.FALSE.equals(requiresTxn)) {
return true;
}
Boolean forEach = rowProcedure.requiresTransaction(transactionalReads);
if (Boolean.TRUE.equals(forEach)) {
return true;
}
if (forEach == null) {
if (!singleRow) {
return true;
}
return null;
}
return false;
}

@Override
Expand All @@ -226,4 +274,8 @@ public void setTupleSource(TupleSource tupleSource) {
this.tupleSource = tupleSource;
}

public void setSingleRow(boolean singleRow) {
this.singleRow = singleRow;
}

}
Expand Up @@ -742,7 +742,7 @@ public void push(Program program) throws XATransactionException {

if (program.isAtomic() && this.blockContext == null) {
TransactionContext tc = this.getContext().getTransactionContext();
if (tc != null && tc.getTransactionType() == Scope.NONE && program.instructionsRequireTransaction(false) != Boolean.FALSE) {
if (tc != null && tc.getTransactionType() == Scope.NONE && Boolean.TRUE.equals(program.instructionsRequireTransaction(false))) {
//start a transaction
this.getContext().getTransactionServer().begin(tc);
this.blockContext = tc;
Expand Down
Expand Up @@ -295,15 +295,15 @@ public void setTrappingExceptions(boolean trappingExceptions) {
}

public Boolean requiresTransaction(boolean transactionalReads) {
if (!transactionalReads && this.isAtomic()) {
return false;
}
return instructionsRequireTransaction(transactionalReads);
}

public Boolean instructionsRequireTransaction(boolean transactionalReads) {
boolean possiblyRequired = false;
boolean last = false;
if (this.programInstructions == null) {
return false;
}
for (ProgramInstruction instruction : this.programInstructions) {
Boolean instructionRequires = instruction.requiresTransaction(transactionalReads);
if (instructionRequires == null) {
Expand Down
Expand Up @@ -236,54 +236,32 @@ public void setOutputElements(List<? extends Expression> outputCols) {

@Override
public Boolean requiresTransaction(boolean transactionalReads) {
boolean txnWtih = false;
if (this.with != null) {
for (WithQueryCommand withCommand : this.with) {
if (withCommand.isRecursive()) {
SetQuery setQuery = (SetQuery)withCommand.getCommand();
Boolean leftRequires = setQuery.getLeftQuery().getProcessorPlan().requiresTransaction(transactionalReads);
Boolean rightRequires = setQuery.getLeftQuery().getProcessorPlan().requiresTransaction(transactionalReads);
if (Boolean.TRUE.equals(leftRequires) || Boolean.TRUE.equals(rightRequires)) {
if (!Boolean.FALSE.equals(leftRequires) || !Boolean.FALSE.equals(rightRequires)) {
return true;
}
if (leftRequires == null || rightRequires == null) {
if (txnWtih) {
return true;
}
txnWtih = true;
}
} else {
Boolean requires = withCommand.getCommand().getProcessorPlan().requiresTransaction(transactionalReads);
if (Boolean.TRUE.equals(requires)) {
if (!Boolean.FALSE.equals(requires)) {
return true;
}
if (requires == null) {
if (txnWtih) {
return true;
}
txnWtih = true;
}
}
}
}
Boolean requires = requiresTransaction(transactionalReads, root);
if (Boolean.TRUE.equals(requires)) {
return true;
}
if (requires == null && txnWtih) {
return true;
}
if (txnWtih || requires == null) {
return null;
}
return false;
return requiresTransaction(transactionalReads, root);
}

static Boolean requiresTransaction(boolean transactionalReads, RelationalNode node) {
Boolean requiresTxn = node.requiresTransaction(transactionalReads);
if (Boolean.TRUE.equals(requiresTxn)) {
return true;
}
boolean last = true;
for (RelationalNode child : node.getChildren()) {
if (child == null) {
continue;
Expand All @@ -297,8 +275,14 @@ static Boolean requiresTransaction(boolean transactionalReads, RelationalNode no
return true;
}
requiresTxn = null;
last = true;
} else {
last = false;
}
}
if (requiresTxn == null && !last) {
return true;
}
return requiresTxn;
}

Expand Down
Expand Up @@ -29,16 +29,24 @@
import java.util.List;

import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.teiid.api.exception.query.QueryProcessingException;
import org.teiid.dqp.service.TransactionContext;
import org.teiid.dqp.service.TransactionContext.Scope;
import org.teiid.dqp.service.TransactionService;
import org.teiid.metadata.Table;
import org.teiid.query.metadata.TransformationMetadata;
import org.teiid.query.optimizer.TestOptimizer;
import org.teiid.query.optimizer.capabilities.BasicSourceCapabilities;
import org.teiid.query.optimizer.capabilities.DefaultCapabilitiesFinder;
import org.teiid.query.optimizer.capabilities.SourceCapabilities.Capability;
import org.teiid.query.resolver.TestResolver;
import org.teiid.query.unittest.RealMetadataFactory;
import org.teiid.query.util.CommandContext;
import org.teiid.query.validator.TestUpdateValidator;
import org.teiid.translator.ExecutionFactory.TransactionSupport;

@SuppressWarnings("nls")
public class TestTriggerActions {
Expand Down Expand Up @@ -331,5 +339,61 @@ public class TestTriggerActions {
List<?>[] expected = new List[] {Arrays.asList(1)};
helpProcess(plan, context, dm, expected);
}

@Test public void testTransactions() throws Exception {
TransformationMetadata metadata = RealMetadataFactory.fromDDL("x",
new RealMetadataFactory.DDLHolder("virt", "CREATE VIEW users options (updatable true) as select id, dummy_data, updated_at from db1.user1 union all select id, dummy_data, updated_at from db2.user2; "
+ "CREATE TRIGGER ON users INSTEAD OF DELETE AS FOR EACH ROW IF (OLD.id < (SELECT MAX(id) FROM db1.user1)) BEGIN ATOMIC DELETE FROM db1.user1 WHERE id = OLD.id; END ELSE IF (OLD.id = (SELECT MAX(id) FROM db1.user1)) BEGIN ATOMIC UPDATE db1.user1 SET dummy_data = '', updated_at = NOW() WHERE id = OLD.id; END ELSE IF (OLD.id > (SELECT MAX(id) FROM db1.user1)) BEGIN ATOMIC DELETE FROM db2.user2 WHERE id = OLD.id; END; "
+ "CREATE TRIGGER ON users INSTEAD OF INSERT AS FOR EACH ROW begin atomic insert into db1.user1 (id) values (new.id); insert into db2.user2 (id) values (new.id); end; "
+ "CREATE TRIGGER ON users INSTEAD OF update AS FOR EACH ROW begin atomic end; "),
new RealMetadataFactory.DDLHolder("db1", "create foreign table user1 (id integer, dummy_data string, updated_at timestamp) options (updatable true);"),
new RealMetadataFactory.DDLHolder("db2", "create foreign table user2 (id integer, dummy_data string, updated_at timestamp) options (updatable true);"));

String sql = "delete from users where id = 203";
HardcodedDataManager dm = new HardcodedDataManager();
dm.addData("SELECT g_0.id, g_0.dummy_data, g_0.updated_at FROM db1.user1 AS g_0 WHERE g_0.id = 203", Arrays.asList(203, "", null));
dm.addData("SELECT g_0.id, g_0.dummy_data, g_0.updated_at FROM db2.user2 AS g_0 WHERE g_0.id = 203");
dm.addData("SELECT g_0.id FROM db1.user1 AS g_0", Arrays.asList(203), Arrays.asList(204));
dm.addData("DELETE FROM db1.user1 WHERE id = 203", Arrays.asList(1));
dm.addData("INSERT INTO db1.user1 (id) VALUES (205)", Arrays.asList(1));
dm.addData("INSERT INTO db2.user2 (id) VALUES (205)", Arrays.asList(1));

CommandContext context = createCommandContext();
BasicSourceCapabilities caps = TestOptimizer.getTypicalCapabilities();
caps.setSourceProperty(Capability.TRANSACTION_SUPPORT, TransactionSupport.XA);
ProcessorPlan plan = TestProcessor.helpGetPlan(TestResolver.helpResolve(sql, metadata), metadata, new DefaultCapabilitiesFinder(caps), context);
//assumed required, but won't start a txn
assertTrue(plan.requiresTransaction(false));

TransactionContext tc = new TransactionContext();
TransactionService ts = Mockito.mock(TransactionService.class);
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
((TransactionContext)invocation.getArguments()[0]).setTransactionType(Scope.REQUEST);
return null;
}
}).when(ts).begin(tc);

context.setTransactionService(ts);
context.setTransactionContext(tc);

List<?>[] expected = new List[] {Arrays.asList(1)};
helpProcess(plan, context, dm, expected);
Mockito.verify(ts, Mockito.never()).begin(tc);

//required, and will start a txn
sql = "insert into users (id) values (205)";
plan = TestProcessor.helpGetPlan(TestResolver.helpResolve(sql, metadata), metadata, new DefaultCapabilitiesFinder(caps), context);
assertTrue(plan.requiresTransaction(false));

helpProcess(plan, context, dm, expected);
Mockito.verify(ts, Mockito.times(1)).begin(tc);

//does nothing
sql = "update users set id = 205";
plan = TestProcessor.helpGetPlan(TestResolver.helpResolve(sql, metadata), metadata, new DefaultCapabilitiesFinder(caps), context);
assertFalse(plan.requiresTransaction(false));
}

}

0 comments on commit fdc5a67

Please sign in to comment.