New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eliminate unnecessary cross joins #6395

Closed
wants to merge 12 commits into
base: master
from
@@ -57,6 +57,7 @@
public static final String DICTIONARY_AGGREGATION = "dictionary_aggregation";
public static final String PLAN_WITH_TABLE_NODE_PARTITIONING = "plan_with_table_node_partitioning";
public static final String COLOCATED_JOIN = "colocated_join";
public static final String REORDER_JOINS = "reorder_joins";
public static final String INITIAL_SPLITS_PER_NODE = "initial_splits_per_node";
public static final String SPLIT_CONCURRENCY_ADJUSTMENT_INTERVAL = "split_concurrency_adjustment_interval";
public static final String OPTIMIZE_METADATA_QUERIES = "optimize_metadata_queries";
@@ -230,6 +231,11 @@ public SystemSessionProperties(
"Experimental: Adapt plan to pre-partitioned tables",
true,
false),
booleanSessionProperty(
REORDER_JOINS,
"Experimental: Reorder joins to optimize plan",
featuresConfig.isJoinReorderingEnabled(),
false),
booleanSessionProperty(
COLOCATED_JOIN,
"Experimental: Use a colocated join when possible",
@@ -351,6 +357,11 @@ public static boolean planWithTableNodePartitioning(Session session)
return session.getSystemProperty(PLAN_WITH_TABLE_NODE_PARTITIONING, Boolean.class);
}
/**
 * Looks up the {@code REORDER_JOINS} system property on the given session.
 *
 * @param session session whose system properties are consulted
 * @return the boolean value stored for the property
 */
public static boolean isJoinReorderingEnabled(Session session)
{
    Boolean reorderJoins = session.getSystemProperty(REORDER_JOINS, Boolean.class);
    return reorderJoins;
}
public static boolean isColocatedJoinEnabled(Session session)
{
return session.getSystemProperty(COLOCATED_JOIN, Boolean.class);
@@ -46,6 +46,7 @@
private boolean distributedIndexJoinsEnabled;
private boolean distributedJoinsEnabled = true;
private boolean colocatedJoinsEnabled;
private boolean reorderJoins;
private boolean redistributeWrites = true;
private boolean optimizeMetadataQueries;
private boolean optimizeHashGeneration = true;
@@ -127,6 +128,19 @@ public FeaturesConfig setColocatedJoinsEnabled(boolean colocatedJoinsEnabled)
return this;
}
/**
 * @return the current value of the {@code reorderJoins} flag
 */
public boolean isJoinReorderingEnabled()
{
    return this.reorderJoins;
}
/**
 * Sets the {@code reorderJoins} flag; the setter is annotated with the
 * {@code reorder-joins} configuration key.
 *
 * @param reorderJoins new value of the flag
 * @return this instance, so that setter calls can be chained
 */
@Config("reorder-joins")
@ConfigDescription("Experimental: Reorder joins to optimize plan")
public FeaturesConfig setJoinReorderingEnabled(boolean reorderJoins)
{
this.reorderJoins = reorderJoins;
return this;
}
public boolean isRedistributeWrites()
{
return redistributeWrites;
@@ -22,6 +22,7 @@
import com.facebook.presto.sql.planner.optimizations.CanonicalizeExpressions;
import com.facebook.presto.sql.planner.optimizations.CountConstantOptimizer;
import com.facebook.presto.sql.planner.optimizations.DesugaringOptimizer;
import com.facebook.presto.sql.planner.optimizations.EliminateCrossJoins;
import com.facebook.presto.sql.planner.optimizations.EmptyDeleteOptimizer;
import com.facebook.presto.sql.planner.optimizations.HashGenerationOptimizer;
import com.facebook.presto.sql.planner.optimizations.ImplementFilteredAggregations;
@@ -104,7 +105,10 @@ public PlanOptimizers(Metadata metadata, SqlParser sqlParser, FeaturesConfig fea
new MergeProjections(),
new PruneUnreferencedOutputs(), // Make sure to run this at the end to help clean the plan for logging/execution and not remove info that other optimizers might need at an earlier point
new PruneIdentityProjections(), // This MUST run after PruneUnreferencedOutputs as it may introduce new redundant projections
new MetadataQueryOptimizer(metadata));
new MetadataQueryOptimizer(metadata),
new EliminateCrossJoins(), // This can pull up Filter and Project nodes from between Joins, so we need to push them down again
new PredicatePushDown(metadata, sqlParser),
new ProjectionPushDown());
if (featuresConfig.isOptimizeSingleDistinct()) {
builder.add(new SingleDistinctOptimizer());
@@ -0,0 +1,206 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.planner.optimizations;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.planner.PlanNodeIdAllocator;
import com.facebook.presto.sql.planner.Symbol;
import com.facebook.presto.sql.planner.SymbolAllocator;
import com.facebook.presto.sql.planner.optimizations.joins.JoinGraph;
import com.facebook.presto.sql.planner.plan.FilterNode;
import com.facebook.presto.sql.planner.plan.JoinNode;
import com.facebook.presto.sql.planner.plan.PlanNode;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.sql.planner.plan.ProjectNode;
import com.facebook.presto.sql.planner.plan.SimplePlanRewriter;
import com.facebook.presto.sql.tree.Expression;
import com.google.common.collect.ImmutableList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import static com.facebook.presto.sql.planner.plan.SimplePlanRewriter.rewriteWith;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
/**
 * Plan optimizer that re-orders the inner joins described by each {@link JoinGraph}
 * found in the plan, so that nodes connected by an edge are joined next to each other.
 * <p>
 * The optimizer is a no-op unless the join reordering session property is enabled.
 */
public class EliminateCrossJoins
        implements PlanOptimizer
{
    /**
     * Rewrites every join graph of {@code plan} whose computed join order differs
     * from the original one.
     *
     * @param plan plan to optimize
     * @param session session consulted for the join reordering property
     * @param types symbol types (not used by this optimizer)
     * @param symbolAllocator symbol allocator (not used by this optimizer)
     * @param idAllocator source of ids for the newly created plan nodes
     * @return the rewritten plan, or {@code plan} itself when nothing had to change
     */
    @Override
    public PlanNode optimize(
            PlanNode plan,
            Session session,
            Map<Symbol, Type> types,
            SymbolAllocator symbolAllocator,
            PlanNodeIdAllocator idAllocator)
    {
        if (!SystemSessionProperties.isJoinReorderingEnabled(session)) {
            return plan;
        }

        List<JoinGraph> joinGraphs = JoinGraph.buildFrom(plan);

        // Graphs are processed from the last one to the first one.
        // NOTE(review): presumably so that inner graphs are rewritten before outer ones -- confirm against JoinGraph.buildFrom.
        for (int i = joinGraphs.size() - 1; i >= 0; i--) {
            JoinGraph graph = joinGraphs.get(i);
            List<Integer> joinOrder = getJoinOrder(graph);
            if (isOriginalOrder(joinOrder)) {
                continue;
            }

            plan = rewriteWith(new Rewriter(idAllocator, graph, joinOrder), plan);
        }

        return plan;
    }

    /**
     * Tells whether {@code joinOrder} is the identity permutation, i.e. every
     * position {@code i} holds the value {@code i}.
     *
     * @param joinOrder node indexes in the order they should be joined
     * @return {@code true} when the order is {@code 0, 1, 2, ...} (or empty)
     */
    public static boolean isOriginalOrder(List<Integer> joinOrder)
    {
        for (int i = 0; i < joinOrder.size(); i++) {
            if (joinOrder.get(i) != i) {
                return false;
            }
        }
        return true;
    }

    /**
     * Given JoinGraph determine the order of joins between graph nodes
     * by traversing JoinGraph. Any graph traversal algorithm could be used
     * here (like BFS or DFS), but we use PriorityQueue to preserve
     * original JoinOrder as much as it is possible. PriorityQueue returns
     * next nodes to join in order of their occurrence in original Plan.
     *
     * @param graph join graph to traverse; node {@code 0} is the starting point
     * @return indexes of the graph nodes, in the order in which they were visited
     */
    public static List<Integer> getJoinOrder(JoinGraph graph)
    {
        ImmutableList.Builder<PlanNode> joinOrder = ImmutableList.builder();

        // Position of every node in the graph, keyed by node id; used as its queue priority.
        Map<PlanNodeId, Integer> priorities = new HashMap<>();
        for (int i = 0; i < graph.size(); i++) {
            priorities.put(graph.getNode(i).getId(), i);
        }

        Comparator<PlanNode> byOriginalPosition = Comparator.comparing(node -> priorities.get(node.getId()));
        PriorityQueue<PlanNode> nodesToVisit = new PriorityQueue<>(graph.size(), byOriginalPosition);
        Set<PlanNode> visited = new HashSet<>();

        nodesToVisit.add(graph.getNode(0));

        while (!nodesToVisit.isEmpty()) {
            PlanNode node = nodesToVisit.poll();
            // A node can be queued several times (once per incoming edge); Set.add reports the first visit.
            if (visited.add(node)) {
                joinOrder.add(node);
                for (JoinGraph.Edge edge : graph.getEdges(node)) {
                    nodesToVisit.add(edge.getTargetNode());
                }
            }

            if (nodesToVisit.isEmpty() && visited.size() < graph.size()) {
                // disconnected graph, find new starting point
                graph.getNodes().stream()
                        .filter(graphNode -> !visited.contains(graphNode))
                        .findFirst()
                        .ifPresent(nodesToVisit::add);
            }
        }

        checkState(visited.size() == graph.size(), "visited %s of %s join graph nodes", visited.size(), graph.size());
        return joinOrder.build().stream()
                .map(node -> priorities.get(node.getId()))
                .collect(toImmutableList());
    }

    /**
     * Replaces the plan node that is the root of {@code graph} with a chain of
     * inner joins built in {@code joinOrder}, followed by the graph's filters and,
     * if present, its projection. All other nodes are rewritten with the default rewrite.
     */
    // static: the rewriter never touches the enclosing optimizer instance
    private static class Rewriter
            extends SimplePlanRewriter<PlanNode>
    {
        private final PlanNodeIdAllocator idAllocator;
        private final JoinGraph graph;
        private final List<Integer> joinOrder;

        public Rewriter(PlanNodeIdAllocator idAllocator, JoinGraph graph, List<Integer> joinOrder)
        {
            this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
            this.graph = requireNonNull(graph, "graph is null");
            this.joinOrder = requireNonNull(joinOrder, "joinOrder is null");
            checkState(joinOrder.size() >= 2, "joinOrder must contain at least two nodes");
        }

        @Override
        public PlanNode visitPlan(PlanNode node, RewriteContext<PlanNode> context)
        {
            // Ids are compared by value: an equal id that is a different instance must still match the root.
            if (!node.getId().equals(graph.getRootId())) {
                return context.defaultRewrite(node, context.get());
            }

            PlanNode result = graph.getNode(joinOrder.get(0));
            Set<PlanNodeId> alreadyJoinedNodes = new HashSet<>();
            alreadyJoinedNodes.add(result.getId());

            for (int i = 1; i < joinOrder.size(); i++) {
                PlanNode rightNode = graph.getNode(joinOrder.get(i));
                alreadyJoinedNodes.add(rightNode.getId());

                // Collect a clause for every edge from the new node to a node that is already part of the join tree.
                ImmutableList.Builder<JoinNode.EquiJoinClause> criteria = ImmutableList.builder();
                for (JoinGraph.Edge edge : graph.getEdges(rightNode)) {
                    PlanNode targetNode = edge.getTargetNode();
                    if (alreadyJoinedNodes.contains(targetNode.getId())) {
                        // The target symbol goes first because the target node sits on the left (already joined) side.
                        criteria.add(new JoinNode.EquiJoinClause(
                                edge.getTargetSymbol(),
                                edge.getSourceSymbol()));
                    }
                }

                result = new JoinNode(
                        idAllocator.getNextId(),
                        JoinNode.Type.INNER,
                        result,
                        rightNode,
                        criteria.build(),
                        Optional.empty(),
                        Optional.empty(),
                        Optional.empty());
            }

            // Each filter of the graph becomes its own FilterNode on top of the join tree.
            for (Expression filter : graph.getFilters()) {
                result = new FilterNode(
                        idAllocator.getNextId(),
                        result,
                        filter);
            }

            // Only wrap the result in a projection when the graph carries assignments.
            Optional<Map<Symbol, Expression>> assignments = graph.getAssignments();
            if (!assignments.isPresent()) {
                return result;
            }

            return new ProjectNode(
                    idAllocator.getNextId(),
                    result,
                    assignments.get());
        }
    }
}
@@ -21,8 +21,6 @@
import com.facebook.presto.sql.planner.plan.PlanNode;
import com.facebook.presto.sql.planner.plan.ProjectNode;
import com.facebook.presto.sql.planner.plan.SimplePlanRewriter;
import com.facebook.presto.sql.tree.Expression;
import com.facebook.presto.sql.tree.SymbolReference;
import com.google.common.collect.ImmutableList;
import java.util.Map;
@@ -61,17 +59,7 @@ public PlanNode visitProject(ProjectNode node, RewriteContext<Void> context)
return replaceChildren(node, ImmutableList.of(source));
}
boolean canElide = true;
for (Map.Entry<Symbol, Expression> entry : node.getAssignments().entrySet()) {
Expression expression = entry.getValue();
Symbol symbol = entry.getKey();
if (!(expression instanceof SymbolReference && ((SymbolReference) expression).getName().equals(symbol.getName()))) {
canElide = false;
break;
}
}
if (canElide) {
if (node.isIdentity()) {
return source;
}
Oops, something went wrong.
ProTip! Use n and p to navigate between commits in a pull request.