Skip to content

Commit

Permalink
Add strategy to select layouts in AddExchanges
Browse files Browse the repository at this point in the history
  • Loading branch information
erichwang committed Apr 25, 2015
1 parent 87ed3f0 commit cc9fd70
Show file tree
Hide file tree
Showing 4 changed files with 965 additions and 33 deletions.
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.spi.ConstantProperty;
import com.facebook.presto.spi.LocalProperty;
import com.facebook.presto.sql.planner.Symbol;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand All @@ -24,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -270,4 +272,41 @@ public ActualProperties build()
return new ActualProperties(partitioningColumns, hashingColumns, localProperties, partitioned, coordinatorOnly, constants);
}
}

@Override
public String toString()
{
return MoreObjects.toStringHelper(this)
.add("partitioningColumns", partitioningColumns)
.add("hashingColumns", hashingColumns)
.add("partitioned", partitioned)
.add("coordinatorOnly", coordinatorOnly)
.add("localProperties", localProperties)
.add("constants", constants)
.toString();
}

@Override
public int hashCode()
{
return Objects.hash(partitioningColumns, hashingColumns, partitioned, coordinatorOnly, localProperties, constants.keySet());
}

@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final ActualProperties other = (ActualProperties) obj;
return Objects.equals(this.partitioningColumns, other.partitioningColumns)
&& Objects.equals(this.hashingColumns, other.hashingColumns)
&& Objects.equals(this.partitioned, other.partitioned)
&& Objects.equals(this.coordinatorOnly, other.coordinatorOnly)
&& Objects.equals(this.localProperties, other.localProperties)
&& Objects.equals(this.constants.keySet(), other.constants.keySet());
}
}
Expand Up @@ -60,6 +60,11 @@
import com.facebook.presto.sql.tree.FunctionCall;
import com.facebook.presto.sql.tree.NullLiteral;
import com.facebook.presto.sql.tree.QualifiedNameReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
Expand All @@ -70,14 +75,17 @@
import org.jetbrains.annotations.NotNull;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.function.Predicate;

import static com.facebook.presto.SystemSessionProperties.isBigQueryEnabled;
import static com.facebook.presto.sql.ExpressionUtils.combineConjuncts;
Expand All @@ -92,7 +100,9 @@
import static com.facebook.presto.sql.planner.plan.ExchangeNode.partitionedExchange;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableSet;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.getOnlyElement;
import static java.util.stream.Collectors.toList;

public class AddExchanges
extends PlanOptimizer
Expand Down Expand Up @@ -515,44 +525,64 @@ private PlanWithProperties planTableScan(TableScanNode node, Expression predicat
ActualProperties.unpartitioned());
}

TableLayoutResult layout = pickLayout(layouts, preferred);
// Filter out layouts that cannot supply all the required columns
layouts = layouts.stream()
.filter(layoutHasAllNeededOutputs(node))
.collect(toList());
checkState(!layouts.isEmpty(), "No usable layouts for %s", node);

Expression originalConstraint = node.getOriginalConstraint();
if (originalConstraint == null) {
originalConstraint = predicate;
}
List<PlanWithProperties> possiblePlans = layouts.stream()
.map(layout -> {
TableScanNode tableScan = new TableScanNode(
node.getId(),
node.getTable(),
node.getOutputSymbols(),
node.getAssignments(),
Optional.of(layout.getLayout().getHandle()),
simplifiedConstraint.intersect(layout.getLayout().getPredicate()),
Optional.ofNullable(node.getOriginalConstraint()).orElse(predicate));

TableScanNode tableScan = new TableScanNode(
node.getId(),
node.getTable(),
node.getOutputSymbols(),
node.getAssignments(),
Optional.of(layout.getLayout().getHandle()),
simplifiedConstraint.intersect(layout.getLayout().getPredicate()),
originalConstraint);
PlanWithProperties result = new PlanWithProperties(tableScan, deriveProperties(tableScan, ImmutableList.of()));

PlanWithProperties result = new PlanWithProperties(tableScan, deriveProperties(tableScan, ImmutableList.of()));
Expression resultingPredicate = combineConjuncts(
DomainTranslator.toPredicate(
layout.getUnenforcedConstraint().transform(assignments::get),
symbolAllocator.getTypes()),
stripDeterministicConjuncts(predicate),
decomposedPredicate.getRemainingExpression());

Expression resultingPredicate = combineConjuncts(
DomainTranslator.toPredicate(
layout.getUnenforcedConstraint().transform(assignments::get),
symbolAllocator.getTypes()),
stripDeterministicConjuncts(predicate),
decomposedPredicate.getRemainingExpression());
if (!BooleanLiteral.TRUE_LITERAL.equals(resultingPredicate)) {
return withDerivedProperties(
new FilterNode(idAllocator.getNextId(), result.getNode(), resultingPredicate),
deriveProperties(tableScan, ImmutableList.of()));
}

if (!BooleanLiteral.TRUE_LITERAL.equals(resultingPredicate)) {
return withDerivedProperties(
new FilterNode(idAllocator.getNextId(), result.getNode(), resultingPredicate),
deriveProperties(tableScan, ImmutableList.of()));
}
return result;
})
.collect(toList());

return result;
return pickPlan(possiblePlans, preferred);
}

private TableLayoutResult pickLayout(List<TableLayoutResult> layouts, PreferredProperties preferred)
private Predicate<TableLayoutResult> layoutHasAllNeededOutputs(TableScanNode node)
{
// TODO: for now, pick first available layout
return layouts.get(0);
return layout -> !layout.getLayout().getColumns().isPresent()
|| layout.getLayout().getColumns().get().containsAll(Lists.transform(node.getOutputSymbols(), node.getAssignments()::get));
}

/**
* possiblePlans should be provided in layout preference order
*/
private PlanWithProperties pickPlan(List<PlanWithProperties> possiblePlans, PreferredProperties preferred)
{
checkArgument(!possiblePlans.isEmpty());

if (SystemSessionProperties.preferStreamingOperators(session, false)) {
possiblePlans = new ArrayList<>(possiblePlans);
Collections.sort(possiblePlans, Comparator.comparing(PlanWithProperties::getProperties, streamingExecutionPreference(preferred))); // stable sort; is Collections.min() guaranteed to be stable?
}

return possiblePlans.get(0);
}

private boolean shouldPrune(Expression predicate, Map<Symbol, ColumnHandle> assignments, Map<ColumnHandle, ?> bindings)
Expand Down Expand Up @@ -794,8 +824,8 @@ private PlanWithProperties rebaseAndDeriveProperties(PlanNode node, PlanWithProp

private PlanWithProperties rebaseAndDeriveProperties(PlanNode node, List<PlanWithProperties> children)
{
PlanNode result = ChildReplacer.replaceChildren(node, children.stream().map(PlanWithProperties::getNode).collect(Collectors.toList()));
return new PlanWithProperties(result, deriveProperties(result, children.stream().map(PlanWithProperties::getProperties).collect(Collectors.toList())));
PlanNode result = ChildReplacer.replaceChildren(node, children.stream().map(PlanWithProperties::getNode).collect(toList()));
return new PlanWithProperties(result, deriveProperties(result, children.stream().map(PlanWithProperties::getProperties).collect(toList())));
}

private PlanWithProperties withDerivedProperties(PlanNode node, ActualProperties inputProperties)
Expand All @@ -814,7 +844,83 @@ private ActualProperties deriveProperties(PlanNode result, List<ActualProperties
}
}

private static class PlanWithProperties
@VisibleForTesting
static Comparator<ActualProperties> streamingExecutionPreference(PreferredProperties preferred)
{
// Calculating the matches can be a bit expensive, so cache the results between comparisons
LoadingCache<List<LocalProperty<Symbol>>, List<Optional<LocalProperty<Symbol>>>> matchCache = CacheBuilder.newBuilder()
.build(new CacheLoader<List<LocalProperty<Symbol>>, List<Optional<LocalProperty<Symbol>>>>()
{
@Override
public List<Optional<LocalProperty<Symbol>>> load(List<LocalProperty<Symbol>> actualProperties)
{
return LocalProperties.match(actualProperties, preferred.getLocalProperties());
}
});

return (actual1, actual2) -> {
List<Optional<LocalProperty<Symbol>>> matchLayout1 = matchCache.getUnchecked(actual1.getLocalProperties());
List<Optional<LocalProperty<Symbol>>> matchLayout2 = matchCache.getUnchecked(actual2.getLocalProperties());

return ComparisonChain.start()
.compareTrueFirst(hasLocalOptimization(preferred.getLocalProperties(), matchLayout1), hasLocalOptimization(preferred.getLocalProperties(), matchLayout2))
.compareTrueFirst(meetsPartitioningRequirements(preferred, actual1), meetsPartitioningRequirements(preferred, actual2))
.compare(matchLayout1, matchLayout2, matchedLayoutPreference())
.result();
};
}

private static <T> boolean hasLocalOptimization(List<LocalProperty<T>> desiredLayout, List<Optional<LocalProperty<T>>> matchResult)
{
checkArgument(desiredLayout.size() == matchResult.size());
if (matchResult.isEmpty()) {
return false;
}
// Optimizations can be applied if the first LocalProperty has been modified in the match in any way
return !matchResult.get(0).equals(Optional.of(desiredLayout.get(0)));
}

private static boolean meetsPartitioningRequirements(PreferredProperties preferred, ActualProperties actual)
{
if (!preferred.getPartitioningProperties().isPresent()) {
return true;
}
PartitioningPreferences partitioningPreferences = preferred.getPartitioningProperties().get();
if (!partitioningPreferences.isPartitioned()) {
return !actual.isPartitioned();
}
if (!partitioningPreferences.getPartitioningColumns().isPresent()) {
return actual.isPartitioned();
}
return actual.isPartitionedOn(partitioningPreferences.getPartitioningColumns().get());
}

// Prefer the match result that satisfied the most requirements
private static <T> Comparator<List<Optional<LocalProperty<T>>>> matchedLayoutPreference()
{
return (matchLayout1, matchLayout2) -> {
Iterator<Optional<LocalProperty<T>>> match1Iterator = matchLayout1.iterator();
Iterator<Optional<LocalProperty<T>>> match2Iterator = matchLayout2.iterator();
while (match1Iterator.hasNext() && match2Iterator.hasNext()) {
Optional<LocalProperty<T>> match1 = match1Iterator.next();
Optional<LocalProperty<T>> match2 = match2Iterator.next();
if (match1.isPresent() && match2.isPresent()) {
return Integer.compare(match1.get().getColumns().size(), match2.get().getColumns().size());
}
else if (match1.isPresent()) {
return 1;
}
else if (match2.isPresent()) {
return -1;
}
}
checkState(!match1Iterator.hasNext() && !match2Iterator.hasNext()); // Should be the same size
return 0;
};
}

@VisibleForTesting
static class PlanWithProperties
{
private final PlanNode node;
private final ActualProperties properties;
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.facebook.presto.sql.tree.Expression;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

Expand Down Expand Up @@ -133,4 +134,17 @@ public <C, R> R accept(PlanVisitor<C, R> visitor, C context)
{
return visitor.visitTableScan(this, context);
}

@Override
public String toString()
{
return MoreObjects.toStringHelper(this)
.add("table", table)
.add("tableLayout", tableLayout)
.add("outputSymbols", outputSymbols)
.add("assignments", assignments)
.add("currentConstraint", currentConstraint)
.add("originalConstraint", originalConstraint)
.toString();
}
}

0 comments on commit cc9fd70

Please sign in to comment.