Skip to content

Commit

Permalink
TEIID-2878 pushing limit/order by over union
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Mar 4, 2014
1 parent d3ac501 commit ff4071e
Show file tree
Hide file tree
Showing 2 changed files with 273 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.teiid.query.sql.lang.CompareCriteria;
import org.teiid.query.sql.lang.Criteria;
import org.teiid.query.sql.lang.JoinType;
import org.teiid.query.sql.lang.OrderBy;
import org.teiid.query.sql.lang.OrderByItem;
import org.teiid.query.sql.lang.SetQuery;
import org.teiid.query.sql.symbol.Constant;
import org.teiid.query.sql.symbol.Expression;
Expand Down Expand Up @@ -154,22 +156,14 @@ boolean canPushLimit(PlanNode rootNode, PlanNode limitNode, List<PlanNode> limit
}
case NodeConstants.Types.SET_OP:
{
if (!SetQuery.Operation.UNION.equals(child.getProperty(NodeConstants.Info.SET_OPERATION))) {
return false;
}
if (!child.hasBooleanProperty(NodeConstants.Info.USE_ALL) && !limitNode.hasBooleanProperty(Info.IS_NON_STRICT)) {
if (!canPushToBranches(limitNode, child)) {
return false;
}
//distribute the limit
List<PlanNode> grandChildren = new LinkedList<PlanNode>(child.getChildren());
for (PlanNode grandChild : grandChildren) {
PlanNode newLimit = newLimit(limitNode);
newLimit.setProperty(NodeConstants.Info.MAX_TUPLE_LIMIT, op(SourceSystemFunctions.ADD_OP, parentLimit, parentOffset, metadata.getFunctionLibrary()));
grandChild.addAsParent(newLimit);
limitNodes.add(newLimit);
if (grandChild.getType() == NodeConstants.Types.SET_OP) {
newLimit.setProperty(Info.IS_COPIED, true);
}
addBranchLimit(limitNode, limitNodes, metadata,
parentLimit, parentOffset, grandChild);
}

return false;
Expand Down Expand Up @@ -223,13 +217,146 @@ boolean canPushLimit(PlanNode rootNode, PlanNode limitNode, List<PlanNode> limit
case NodeConstants.Types.SELECT:
case NodeConstants.Types.DUP_REMOVE:
return limitNode.hasBooleanProperty(Info.IS_NON_STRICT);
case NodeConstants.Types.SORT:
if (child.getFirstChild().getType() == NodeConstants.Types.SET_OP) {
PlanNode setOp = child.getFirstChild();
if (!canPushToBranches(limitNode, setOp)) {
return false;
}
OrderBy parentOrderBy = (OrderBy) child.getProperty(NodeConstants.Info.SORT_ORDER);
distributeLimit(limitNode, setOp, parentOrderBy, metadata, limitNodes, parentLimit, parentOffset, capFinder);
}
return false;
default:
{
return false;
}
}
}

/**
 * Inserts a new limit node directly above a single branch and records it.
 * <p>
 * The new node is created from <code>limitNode</code> via <code>newLimit</code> and its
 * max tuple limit is the ADD_OP combination of the parent limit and the parent offset.
 *
 * @param limitNode the limit node used as the template for the new limit
 * @param limitNodes list that receives the newly created limit node
 * @param metadata supplies the function library used to build the limit expression
 * @param parentLimit the limit expression of the node being distributed
 * @param parentOffset the offset expression of the node being distributed
 * @param grandChild the branch that the new limit is placed above
 */
private void addBranchLimit(PlanNode limitNode, List<PlanNode> limitNodes,
        QueryMetadataInterface metadata, Expression parentLimit,
        Expression parentOffset, PlanNode grandChild) {
    final PlanNode branchLimit = newLimit(limitNode);
    branchLimit.setProperty(
            NodeConstants.Info.MAX_TUPLE_LIMIT,
            op(SourceSystemFunctions.ADD_OP, parentLimit, parentOffset, metadata.getFunctionLibrary()));
    grandChild.addAsParent(branchLimit);
    limitNodes.add(branchLimit);
    if (NodeConstants.Types.SET_OP != grandChild.getType()) {
        return;
    }
    //a limit sitting above a nested set operation is flagged as a copy
    branchLimit.setProperty(Info.IS_COPIED, true);
}

/**
 * Push the limit and order by to each union branch.
 * <p>
 * For every child of <code>setOp</code>:
 * <ul>
 * <li>if the branch already contains a sort, a limit is added above the branch only when the
 * parent ordering is a prefix of the branch ordering, compared by expression position,
 * direction and null ordering;</li>
 * <li>if the branch is itself a set operation that passes <code>canPushToBranches</code>,
 * its children are processed recursively;</li>
 * <li>otherwise a new sort is built from the branch's projected columns and, when the
 * capability checks allow it, inserted together with a new limit.</li>
 * </ul>
 * Branches that cannot be handled are left untouched.
 * <p>
 * TODO: check if the top limit is smaller and implement sorted sublist processing, rather than performing a full resort
 *
 * @param limitNode the limit being distributed
 * @param setOp the set operation whose children receive the limit
 * @param parentOrderBy the ordering of the sort found between the limit and the set operation
 * @param metadata query metadata, also used to obtain the function library
 * @param limitNodes list that receives every limit node created here
 * @param parentLimit the limit expression of <code>limitNode</code>
 * @param parentOffset the offset expression of <code>limitNode</code>
 * @param capFinder used for the source capability checks
 * @throws QueryMetadataException propagated from the metadata/capability lookups
 * @throws TeiidComponentException propagated from the metadata/capability lookups
 */
private void distributeLimit(PlanNode limitNode, PlanNode setOp,
        OrderBy parentOrderBy, QueryMetadataInterface metadata, List<PlanNode> limitNodes, Expression parentLimit, Expression parentOffset, CapabilitiesFinder capFinder) throws QueryMetadataException, TeiidComponentException {
    List<OrderByItem> parentkeys = parentOrderBy.getOrderByItems();
    //iterate over a copy of the children, as canPushLimit does, because nodes are
    //inserted above the branches while looping
    List<PlanNode> branches = new LinkedList<PlanNode>(setOp.getChildren());
    outer: for (PlanNode branch : branches) {
        PlanNode branchSort = NodeEditor.findNodePreOrder(branch, NodeConstants.Types.SORT, NodeConstants.Types.SET_OP | NodeConstants.Types.SOURCE);
        if (branchSort != null) {
            //implies there is a limit
            OrderBy orderBy = (OrderBy) branchSort.getProperty(NodeConstants.Info.SORT_ORDER);
            List<OrderByItem> keys = orderBy.getOrderByItems();
            //can only proceed if order by matches
            if (parentkeys.size() > keys.size()) {
                continue;
            }

            for (int i = 0; i < parentkeys.size(); i++) {
                OrderByItem parentKey = parentkeys.get(i);
                OrderByItem key = keys.get(i);
                int pos1 = parentKey.getExpressionPosition();
                int pos2 = key.getExpressionPosition();
                if (pos1 == -1 || pos2 == -1 || pos1 != pos2) {
                    continue outer;
                }
                //the same column sorted in the other direction (or with a different null
                //ordering) is a different ordering - limiting the branch would keep the wrong rows
                if (parentKey.isAscending() != key.isAscending()
                        || parentKey.getNullOrdering() != key.getNullOrdering()) {
                    continue outer;
                }
            }
            addBranchLimit(limitNode, limitNodes, metadata, parentLimit, parentOffset, branch);
            continue;
        }

        if (branch.getType() == NodeConstants.Types.SET_OP && canPushToBranches(limitNode, branch)) {
            //go to the children
            distributeLimit(limitNode, branch, parentOrderBy, metadata, limitNodes, parentLimit, parentOffset, capFinder);
            continue;
        }
        PlanNode newSort = NodeFactory.getNewNode(NodeConstants.Types.SORT);

        //push both the limit and order by

        PlanNode project = NodeEditor.findNodePreOrder(branch, NodeConstants.Types.PROJECT);
        if (project == null) {
            //without projected columns the ordering cannot be rewritten for this branch
            continue;
        }
        List<Expression> cols = (List<Expression>) project.getProperty(NodeConstants.Info.PROJECT_COLS);

        OrderBy newOrderBy = new OrderBy();
        for (OrderByItem parentKey : parentkeys) {
            int pos = parentKey.getExpressionPosition();
            if (pos == -1) {
                //not a positional reference, so it cannot be mapped onto the branch columns
                continue outer;
            }
            OrderByItem item = parentKey.clone();
            item.setSymbol((Expression) cols.get(pos).clone());
            newOrderBy.getOrderByItems().add(item);
        }
        newSort.setProperty(Info.SORT_ORDER, newOrderBy);
        PlanNode childLimit = NodeEditor.findNodePreOrder(branch, NodeConstants.Types.TUPLE_LIMIT, NodeConstants.Types.SET_OP | NodeConstants.Types.SOURCE);
        if (childLimit != null) {
            PlanNode parentAccess = NodeEditor.findParent(childLimit, NodeConstants.Types.ACCESS, NodeConstants.Types.SET_OP);
            if (parentAccess == null) {
                //TODO - once we support sorted sublist processing, then we'll want to push
                //branch.addAsParent(newSort);
                //branch = newSort;
                continue;
            }
            //if there is a parent access, we need to handle pushing the sort
            //the limit is temporarily detached so that the check sees the node below it
            boolean removedLimit = false;
            if (parentAccess.getFirstChild() == childLimit) {
                parentAccess.removeChild(childLimit);
                parentAccess.addFirstChild(childLimit.getFirstChild());
                removedLimit = true;
            }
            boolean canRaise = RuleRaiseAccess.canRaiseOverSort(parentAccess, metadata, capFinder, newSort, null, false);
            if (removedLimit) {
                //restore the original access -> limit -> child structure
                childLimit.addFirstChild(parentAccess.getFirstChild());
                parentAccess.addFirstChild(childLimit);
            }
            if (!canRaise) {
                //TODO - once we support sorted sublist processing, then we'll want to push
                //put over the access node
                //parentAccess.addAsParent(newSort);
                //branch = newSort;
                continue;
            }
            //put under the limit
            //TODO: check to make sure that we're narrowing the limit
            //we won't know this in all cases since it could be parameterized
            childLimit.getFirstChild().addAsParent(newSort);
        } else {
            if (branch.getType() == NodeConstants.Types.ACCESS &&
                    (!RuleRaiseAccess.canRaiseOverSort(branch, metadata, capFinder, newSort, null, false)
                    || !CapabilitiesUtil.supportsRowLimit(RuleRaiseAccess.getModelIDFromAccess(branch, metadata), metadata, capFinder))) {
                //TODO - once we support sorted sublist processing, then we'll want to push
                continue;
            }
            //TODO: a better check to see if limit is supported - as it may not be pushed
            branch.addAsParent(newSort);
            //the limit added below must sit above the new sort
            branch = newSort;
        }
        addBranchLimit(limitNode, limitNodes, metadata, parentLimit, parentOffset, branch);
    }
}

/**
 * Determines whether a limit may be copied to the branches of a set operation.
 * <p>
 * The set operation must be a UNION, and either the set operation has the USE_ALL
 * property set or the limit is flagged IS_NON_STRICT.
 *
 * @param limitNode the limit that would be distributed
 * @param child the set operation node being examined
 * @return true if both conditions above hold
 */
private boolean canPushToBranches(PlanNode limitNode, PlanNode child) {
    boolean isUnion = SetQuery.Operation.UNION.equals(child.getProperty(NodeConstants.Info.SET_OPERATION));
    return isUnion
            && (child.hasBooleanProperty(NodeConstants.Info.USE_ALL)
                    || limitNode.hasBooleanProperty(Info.IS_NON_STRICT));
}

private static PlanNode newLimit(PlanNode limitNode) {
PlanNode newLimit = NodeFactory.getNewNode(NodeConstants.Types.TUPLE_LIMIT);
if (limitNode.hasBooleanProperty(Info.IS_NON_STRICT)) {
Expand Down
135 changes: 135 additions & 0 deletions engine/src/test/java/org/teiid/query/optimizer/TestUnionPlanning.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.teiid.query.optimizer.capabilities.FakeCapabilitiesFinder;
import org.teiid.query.optimizer.capabilities.SourceCapabilities.Capability;
import org.teiid.query.processor.ProcessorPlan;
import org.teiid.query.processor.relational.LimitNode;
import org.teiid.query.unittest.RealMetadataFactory;

@SuppressWarnings("nls")
Expand Down Expand Up @@ -382,5 +383,139 @@ public void testUnionPushDown3() {
});

}

/**
 * Three union all branches, each ordered by e2 desc with limit 5000, under an outer
 * order by e2 desc limit 0, 500. Every expected pushdown query carries LIMIT 500, and
 * the plan is expected to hold one sort, one union all and one LimitNode.
 */
@Test public void testUnionWithOrderedLimits1() throws Exception {
    String userQuery = "select * from ((select e1, e2, 'a' source from pm1.g1 order by e2 desc limit 5000)"
        + " union all (select e1, e2, 'b' source from pm2.g2 order by e2 desc limit 5000)"
        + " union all (select e1, e2, 'c' source from pm1.g3 order by e2 desc limit 5000)) x"
        + " order by e2 desc limit 0, 500";

    BasicSourceCapabilities capabilities = TestOptimizer.getTypicalCapabilities();
    capabilities.setCapabilitySupport(Capability.ROW_LIMIT, true);

    String[] expectedPushdown = new String[] {
        "SELECT g_0.e1 AS c_0, g_0.e2 AS c_1 FROM pm1.g1 AS g_0 ORDER BY c_1 DESC LIMIT 500",
        "SELECT g_0.e1 AS c_0, g_0.e2 AS c_1 FROM pm2.g2 AS g_0 ORDER BY c_1 DESC LIMIT 500",
        "SELECT g_0.e1 AS c_0, g_0.e2 AS c_1 FROM pm1.g3 AS g_0 ORDER BY c_1 DESC LIMIT 500"
    };

    ProcessorPlan plan = TestOptimizer.helpPlan(userQuery, RealMetadataFactory.example1Cached(), null,
        new DefaultCapabilitiesFinder(capabilities), expectedPushdown, ComparisonMode.EXACT_COMMAND_STRING);

    int[] expectedNodeCounts = new int[] {
        3,      // Access
        0,      // DependentAccess
        0,      // DependentSelect
        0,      // DependentProject
        0,      // DupRemove
        0,      // Grouping
        0,      // NestedLoopJoinStrategy
        0,      // MergeJoinStrategy
        0,      // Null
        0,      // PlanExecution
        0,      // Project
        0,      // Select
        1,      // Sort
        1       // UnionAll
    };
    TestOptimizer.checkNodeTypes(plan, expectedNodeCounts);

    //exactly one limit remains in the processor plan
    TestOptimizer.checkNodeTypes(plan, new int[] {1}, new Class<?>[] {LimitNode.class});
}

/**
 * The second branch is ordered by e1 rather than e2, so it does not match the top level
 * ordering and the limit is not combined: its expected pushdown query keeps LIMIT 5000,
 * while the first branch gets LIMIT 500.
 */
@Test public void testUnionWithOrderedLimits2() throws Exception {
    String userQuery = "select * from ((select e1, e2, 'a' source from pm1.g1 order by e2 desc limit 5000)"
        + " union all (select e1, e2, 'b' source from pm2.g2 order by e1 desc limit 5000)) x"
        + " order by e2 desc limit 0, 500";

    BasicSourceCapabilities capabilities = TestOptimizer.getTypicalCapabilities();
    capabilities.setCapabilitySupport(Capability.ROW_LIMIT, true);

    String[] expectedPushdown = new String[] {
        "SELECT g_0.e1 AS c_0, g_0.e2 AS c_1 FROM pm1.g1 AS g_0 ORDER BY c_1 DESC LIMIT 500",
        "SELECT g_0.e1 AS c_0, g_0.e2 AS c_1 FROM pm2.g2 AS g_0 ORDER BY c_0 DESC LIMIT 5000"
    };

    ProcessorPlan plan = TestOptimizer.helpPlan(userQuery, RealMetadataFactory.example1Cached(), null,
        new DefaultCapabilitiesFinder(capabilities), expectedPushdown, ComparisonMode.EXACT_COMMAND_STRING);

    int[] expectedNodeCounts = new int[] {
        2,      // Access
        0,      // DependentAccess
        0,      // DependentSelect
        0,      // DependentProject
        0,      // DupRemove
        0,      // Grouping
        0,      // NestedLoopJoinStrategy
        0,      // MergeJoinStrategy
        0,      // Null
        0,      // PlanExecution
        0,      // Project
        0,      // Select
        1,      // Sort
        1       // UnionAll
    };
    TestOptimizer.checkNodeTypes(plan, expectedNodeCounts);

    //exactly one limit remains in the processor plan
    TestOptimizer.checkNodeTypes(plan, new int[] {1}, new Class<?>[] {LimitNode.class});
}

/**
 * The first branch has neither ordering nor limit and the second has only a limit.
 * Both expected pushdown queries carry ORDER BY c_1 DESC LIMIT 500, and the plan is
 * expected to hold one project, one sort, one union all and one LimitNode.
 */
@Test public void testUnionWithOrderedLimits3() throws Exception {
    String userQuery = "select * from ((select e1, e2, 'a' source from pm1.g1)"
        + " union all (select e1, e2, 'b' source from pm2.g2 limit 5000)) x"
        + " order by e2 desc limit 0, 500";

    BasicSourceCapabilities capabilities = TestOptimizer.getTypicalCapabilities();
    capabilities.setCapabilitySupport(Capability.ROW_LIMIT, true);

    String[] expectedPushdown = new String[] {
        "SELECT g_0.e1 AS c_0, g_0.e2 AS c_1 FROM pm1.g1 AS g_0 ORDER BY c_1 DESC LIMIT 500",
        "SELECT g_0.e1 AS c_0, g_0.e2 AS c_1 FROM pm2.g2 AS g_0 ORDER BY c_1 DESC LIMIT 500"
    };

    ProcessorPlan plan = TestOptimizer.helpPlan(userQuery, RealMetadataFactory.example1Cached(), null,
        new DefaultCapabilitiesFinder(capabilities), expectedPushdown, ComparisonMode.EXACT_COMMAND_STRING);

    int[] expectedNodeCounts = new int[] {
        2,      // Access
        0,      // DependentAccess
        0,      // DependentSelect
        0,      // DependentProject
        0,      // DupRemove
        0,      // Grouping
        0,      // NestedLoopJoinStrategy
        0,      // MergeJoinStrategy
        0,      // Null
        0,      // PlanExecution
        1,      // Project
        0,      // Select
        1,      // Sort
        1       // UnionAll
    };
    TestOptimizer.checkNodeTypes(plan, expectedNodeCounts);

    //exactly one limit remains in the processor plan
    TestOptimizer.checkNodeTypes(plan, new int[] {1}, new Class<?>[] {LimitNode.class});
}

/**
 * Same query as testUnionWithOrderedLimits3, but with the QUERY_ORDERBY capability
 * switched off. The expected pushdown queries are unchanged from the user query's
 * branches: pm2.g2 keeps LIMIT 5000 and pm1.g1 has neither ordering nor limit.
 */
@Test public void testUnionWithOrderedLimits4() throws Exception {
    String userQuery = "select * from ((select e1, e2, 'a' source from pm1.g1)"
        + " union all (select e1, e2, 'b' source from pm2.g2 limit 5000)) x"
        + " order by e2 desc limit 0, 500";

    BasicSourceCapabilities capabilities = TestOptimizer.getTypicalCapabilities();
    capabilities.setCapabilitySupport(Capability.ROW_LIMIT, true);
    capabilities.setCapabilitySupport(Capability.QUERY_ORDERBY, false);

    String[] expectedPushdown = new String[] {
        "SELECT g_0.e1 AS c_0, g_0.e2 AS c_1 FROM pm2.g2 AS g_0 LIMIT 5000",
        "SELECT g_0.e1, g_0.e2 FROM pm1.g1 AS g_0"
    };

    ProcessorPlan plan = TestOptimizer.helpPlan(userQuery, RealMetadataFactory.example1Cached(), null,
        new DefaultCapabilitiesFinder(capabilities), expectedPushdown, ComparisonMode.EXACT_COMMAND_STRING);

    int[] expectedNodeCounts = new int[] {
        2,      // Access
        0,      // DependentAccess
        0,      // DependentSelect
        0,      // DependentProject
        0,      // DupRemove
        0,      // Grouping
        0,      // NestedLoopJoinStrategy
        0,      // MergeJoinStrategy
        0,      // Null
        0,      // PlanExecution
        0,      // Project
        0,      // Select
        1,      // Sort
        1       // UnionAll
    };
    TestOptimizer.checkNodeTypes(plan, expectedNodeCounts);

}

//TODO: enhancement for ordering over a partition



}

0 comments on commit ff4071e

Please sign in to comment.