Skip to content

Commit

Permalink
TEIID-2369 adding plan validation prior to reexecution
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed May 7, 2013
1 parent 45476d6 commit 925fc58
Show file tree
Hide file tree
Showing 6 changed files with 297 additions and 159 deletions.
Expand Up @@ -122,5 +122,9 @@ public boolean prepare(TupleBufferCache bufferManager) {
public boolean restore(TupleBufferCache bufferManager) {
return true; //no remotable actions
}

public boolean validate() {
return this.accessInfo.validate(false, 0);
}

}
Expand Up @@ -69,15 +69,25 @@ public QueryProcessorFactoryImpl(BufferManager bufferMgr,

@Override
public QueryProcessor createQueryProcessor(String query, String recursionGroup, CommandContext commandContext, Object... params) throws TeiidProcessingException, TeiidComponentException {
PreparedPlan pp = commandContext.getPlan(query);
CommandContext copy = commandContext.clone();
if (recursionGroup != null) {
copy.pushCall(recursionGroup);
}
CommandContext copy = commandContext.clone();
QueryMetadataInterface metadata = commandContext.getMetadata();
if (metadata == null) {
metadata = defaultMetadata;
}
PreparedPlan pp = getPreparedPlan(query, recursionGroup, copy, metadata);
copy.pushVariableContext(new VariableContext());
PreparedStatementRequest.resolveParameterValues(pp.getReferences(), Arrays.asList(params), copy, metadata);
return new QueryProcessor(pp.getPlan().clone(), copy, bufferMgr, dataMgr);
}

@Override
public PreparedPlan getPreparedPlan(String query, String recursionGroup,
CommandContext commandContext, QueryMetadataInterface metadata) throws
TeiidComponentException, TeiidProcessingException {
if (recursionGroup != null) {
commandContext.pushCall(recursionGroup);
}
PreparedPlan pp = commandContext.getPlan(query);
if (pp == null) {
ParseInfo parseInfo = new ParseInfo();
Command newCommand = QueryParser.getQueryParser().parseCommand(query, parseInfo);
Expand All @@ -87,23 +97,22 @@ public QueryProcessor createQueryProcessor(String query, String recursionGroup,

AbstractValidationVisitor visitor = new ValidationVisitor();
Request.validateWithVisitor(visitor, metadata, newCommand);
Determinism determinismLevel = copy.resetDeterminismLevel();
Determinism determinismLevel = commandContext.resetDeterminismLevel();
try {
newCommand = QueryRewriter.rewrite(newCommand, metadata, copy);
newCommand = QueryRewriter.rewrite(newCommand, metadata, commandContext);
AnalysisRecord record = new AnalysisRecord(false, false);
ProcessorPlan plan = QueryOptimizer.optimizePlan(newCommand, metadata, idGenerator, finder, record, copy);
ProcessorPlan plan = QueryOptimizer.optimizePlan(newCommand, metadata, idGenerator, finder, record, commandContext);
pp = new PreparedPlan();
pp.setPlan(plan, copy);
pp.setPlan(plan, commandContext);
pp.setReferences(references);
pp.setAnalysisRecord(record);
pp.setCommand(newCommand);
commandContext.putPlan(query, pp, copy.getDeterminismLevel());
commandContext.putPlan(query, pp, commandContext.getDeterminismLevel());
} finally {
copy.setDeterminismLevel(determinismLevel);
commandContext.setDeterminismLevel(determinismLevel);
}
}
copy.pushVariableContext(new VariableContext());
PreparedStatementRequest.resolveParameterValues(pp.getReferences(), Arrays.asList(params), copy, metadata);
return new QueryProcessor(pp.getPlan().clone(), copy, bufferMgr, dataMgr);
return pp;
}

}
Expand Up @@ -309,7 +309,7 @@ public static void validateWithVisitor(
private void createProcessor() throws TeiidComponentException {
this.context.setTransactionContext(getTransactionContext(true));
this.processor = new QueryProcessor(processPlan, context, bufferManager, processorDataManager);
this.processor.setContinuous(this.requestMsg.getRequestOptions().isContinuous());
this.processor.setContinuous(this.requestMsg.getRequestOptions().isContinuous(), this.requestMsg.getCommandString());
}

TransactionContext getTransactionContext(boolean startAutoWrap) throws TeiidComponentException {
Expand Down
37 changes: 29 additions & 8 deletions engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
Expand Up @@ -34,12 +34,14 @@
import org.teiid.core.TeiidException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.util.Assertion;
import org.teiid.dqp.internal.process.PreparedPlan;
import org.teiid.dqp.internal.process.RequestWorkItem;
import org.teiid.dqp.internal.process.TupleSourceCache;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
import org.teiid.query.QueryPlugin;
import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.processor.BatchCollector.BatchProducer;
import org.teiid.query.util.CommandContext;

Expand All @@ -56,6 +58,9 @@ public static class ExpiredTimeSliceException extends BlockedException {

public interface ProcessorFactory {
QueryProcessor createQueryProcessor(String query, String recursionGroup, CommandContext commandContext, Object... params) throws TeiidProcessingException, TeiidComponentException;
PreparedPlan getPreparedPlan(String query, String recursionGroup,
CommandContext commandContext, QueryMetadataInterface metadata)
throws TeiidProcessingException, TeiidComponentException;
}

private CommandContext context;
Expand All @@ -71,6 +76,8 @@ public interface ProcessorFactory {
private boolean processorClosed;

private boolean continuous;
private String query; //used only in continuous mode
private PreparedPlan plan;
private int rowOffset = 1;

/**
Expand Down Expand Up @@ -198,14 +205,27 @@ private TupleBatch nextBatchDirect()
}

public void init() throws TeiidComponentException, TeiidProcessingException {
// initialize if necessary
if(!initialized) {
reserved = this.bufferMgr.reserveBuffers(this.bufferMgr.getSchemaSize(this.getOutputElements()), BufferReserveMode.FORCE);
this.processPlan.initialize(context, dataMgr, bufferMgr);
initialized = true;
}

if (!open) {
if (continuous && context.getReuseCount() > 0) {
//validate the plan prior to the next run
//ideally we would reuse the prepared plan from initial planning in the case of a prepared statement
if (this.plan != null && !this.plan.validate()) {
this.plan = null;
}
if (this.plan == null) {
this.plan = context.getQueryProcessorFactory().getPreparedPlan(query, null, context, context.getMetadata());
this.processPlan = this.plan.getPlan().clone();
this.processPlan.initialize(context, dataMgr, bufferMgr);
}
}

// initialize if necessary
if(!initialized) {
reserved = this.bufferMgr.reserveBuffers(this.bufferMgr.getSchemaSize(this.getOutputElements()), BufferReserveMode.FORCE);
this.processPlan.initialize(context, dataMgr, bufferMgr);
initialized = true;
}

// Open the top node for reading
processPlan.open();
open = true;
Expand Down Expand Up @@ -297,9 +317,10 @@ public BufferManager getBufferManager() {
return bufferMgr;
}

public void setContinuous(boolean continuous) {
public void setContinuous(boolean continuous, String sql) {
this.continuous = continuous;
if (this.continuous) {
this.query = sql;
this.context.setContinuous();
}
}
Expand Down

0 comments on commit 925fc58

Please sign in to comment.