Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add applyFilter equivalent to Constraint-based table layout selection #541

Merged
merged 3 commits into from
Apr 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2874,7 +2874,7 @@ private Consumer<Plan> assertRemoteExchangesCount(int expectedRemoteExchangesCou
if (actualRemoteExchangesCount != expectedRemoteExchangesCount) {
Session session = getSession();
Metadata metadata = ((DistributedQueryRunner) getQueryRunner()).getCoordinator().getMetadata();
String formattedPlan = textLogicalPlan(plan.getRoot(), plan.getTypes(), metadata.getFunctionRegistry(), StatsAndCosts.empty(), session, 0);
String formattedPlan = textLogicalPlan(plan.getRoot(), plan.getTypes(), metadata.getFunctionRegistry(), Optional.of(metadata), StatsAndCosts.empty(), session, 0);
throw new AssertionError(format(
"Expected [\n%s\n] remote exchanges but found [\n%s\n] remote exchanges. Actual plan is [\n\n%s\n]",
expectedRemoteExchangesCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public Pattern<TableScanNode> getPattern()
protected Optional<PlanNodeStatsEstimate> doCalculate(TableScanNode node, StatsProvider sourceStats, Lookup lookup, Session session, TypeProvider types)
{
// TODO Construct predicate like AddExchanges's LayoutConstraintEvaluator
Constraint<ColumnHandle> constraint = new Constraint<>(node.getCurrentConstraint());
Constraint<ColumnHandle> constraint = new Constraint<>(metadata.getTableProperties(session, node.getTable()).getPredicate());

TableStatistics tableStatistics = metadata.getTableStatistics(session, node.getTable(), constraint);
verify(tableStatistics != null, "tableStatistics is null for %s", node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class QueryMonitor
private final String environment;
private final SessionPropertyManager sessionPropertyManager;
private final FunctionRegistry functionRegistry;
private final Metadata metadata;
private final int maxJsonLimit;

@Inject
Expand All @@ -113,7 +114,8 @@ public QueryMonitor(
this.serverAddress = requireNonNull(nodeInfo, "nodeInfo is null").getExternalAddress();
this.environment = requireNonNull(nodeInfo, "nodeInfo is null").getEnvironment();
this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
this.functionRegistry = requireNonNull(metadata, "metadata is null").getFunctionRegistry();
this.metadata = requireNonNull(metadata, "metadata is null");
this.functionRegistry = metadata.getFunctionRegistry();
this.maxJsonLimit = toIntExact(requireNonNull(config, "config is null").getMaxOutputStageJsonSize().toBytes());
}

Expand Down Expand Up @@ -306,6 +308,7 @@ private Optional<String> createTextQueryPlan(QueryInfo queryInfo)
return Optional.of(textDistributedPlan(
queryInfo.getOutputStage().get(),
functionRegistry,
Optional.empty(), // transaction is no longer active, so metadata is useless
queryInfo.getSession().toSession(sessionPropertyManager),
false));
}
Expand Down
3 changes: 3 additions & 0 deletions presto-main/src/main/java/io/prestosql/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.prestosql.spi.connector.ConnectorOutputMetadata;
import io.prestosql.spi.connector.ConnectorTableMetadata;
import io.prestosql.spi.connector.Constraint;
import io.prestosql.spi.connector.ConstraintApplicationResult;
import io.prestosql.spi.connector.LimitApplicationResult;
import io.prestosql.spi.connector.SystemTable;
import io.prestosql.spi.predicate.TupleDomain;
Expand Down Expand Up @@ -390,4 +391,6 @@ public interface Metadata
boolean usesLegacyTableLayouts(Session session, TableHandle table);

Optional<LimitApplicationResult<TableHandle>> applyLimit(Session session, TableHandle table, long limit);

Optional<ConstraintApplicationResult<TableHandle>> applyFilter(Session session, TableHandle table, Constraint<ColumnHandle> constraint);
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.ConnectorViewDefinition;
import io.prestosql.spi.connector.Constraint;
import io.prestosql.spi.connector.ConstraintApplicationResult;
import io.prestosql.spi.connector.LimitApplicationResult;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.connector.SchemaTablePrefix;
Expand Down Expand Up @@ -1194,6 +1195,21 @@ public Optional<LimitApplicationResult<TableHandle>> applyLimit(Session session,
result.isLimitGuaranteed()));
}

@Override
public Optional<ConstraintApplicationResult<TableHandle>> applyFilter(Session session, TableHandle table, Constraint<ColumnHandle> constraint)
{
ConnectorMetadata metadata = getMetadata(session, table.getCatalogName());

if (metadata.usesLegacyTableLayouts()) {
return Optional.empty();
}

return metadata.applyFilter(table.getConnectorHandle(), constraint)
.map(result -> new ConstraintApplicationResult<>(
new TableHandle(table.getCatalogName(), result.getHandle(), table.getTransaction(), Optional.empty()),
result.getRemainingFilter()));
}

private ViewDefinition deserializeView(String data)
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import io.prestosql.execution.StageId;
import io.prestosql.execution.StageInfo;
import io.prestosql.metadata.FunctionRegistry;
import io.prestosql.metadata.Metadata;
import io.prestosql.spi.Page;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.sql.planner.plan.PlanNodeId;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkState;
Expand All @@ -41,6 +43,7 @@ public static class ExplainAnalyzeOperatorFactory
private final PlanNodeId planNodeId;
private final QueryPerformanceFetcher queryPerformanceFetcher;
private final FunctionRegistry functionRegistry;
private final Metadata metadata;
private final boolean verbose;
private boolean closed;

Expand All @@ -49,12 +52,14 @@ public ExplainAnalyzeOperatorFactory(
PlanNodeId planNodeId,
QueryPerformanceFetcher queryPerformanceFetcher,
FunctionRegistry functionRegistry,
Metadata metadata,
boolean verbose)
{
this.operatorId = operatorId;
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
this.queryPerformanceFetcher = requireNonNull(queryPerformanceFetcher, "queryPerformanceFetcher is null");
this.functionRegistry = requireNonNull(functionRegistry, "functionRegistry is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.verbose = verbose;
}

Expand All @@ -63,7 +68,7 @@ public Operator createOperator(DriverContext driverContext)
{
checkState(!closed, "Factory is already closed");
OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, ExplainAnalyzeOperator.class.getSimpleName());
return new ExplainAnalyzeOperator(operatorContext, queryPerformanceFetcher, functionRegistry, verbose);
return new ExplainAnalyzeOperator(operatorContext, queryPerformanceFetcher, functionRegistry, metadata, verbose);
}

@Override
Expand All @@ -75,13 +80,14 @@ public void noMoreOperators()
@Override
public OperatorFactory duplicate()
{
return new ExplainAnalyzeOperatorFactory(operatorId, planNodeId, queryPerformanceFetcher, functionRegistry, verbose);
return new ExplainAnalyzeOperatorFactory(operatorId, planNodeId, queryPerformanceFetcher, functionRegistry, metadata, verbose);
}
}

private final OperatorContext operatorContext;
private final QueryPerformanceFetcher queryPerformanceFetcher;
private final FunctionRegistry functionRegistry;
private final Metadata metadata;
private final boolean verbose;
private boolean finishing;
private boolean outputConsumed;
Expand All @@ -90,11 +96,13 @@ public ExplainAnalyzeOperator(
OperatorContext operatorContext,
QueryPerformanceFetcher queryPerformanceFetcher,
FunctionRegistry functionRegistry,
Metadata metadata,
boolean verbose)
{
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.queryPerformanceFetcher = requireNonNull(queryPerformanceFetcher, "queryPerformanceFetcher is null");
this.functionRegistry = requireNonNull(functionRegistry, "functionRegistry is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.verbose = verbose;
}

Expand Down Expand Up @@ -145,7 +153,7 @@ public Page getOutput()
return null;
}

String plan = textDistributedPlan(queryInfo.getOutputStage().get().getSubStages().get(0), functionRegistry, operatorContext.getSession(), verbose);
String plan = textDistributedPlan(queryInfo.getOutputStage().get().getSubStages().get(0), functionRegistry, Optional.of(metadata), operatorContext.getSession(), verbose);
BlockBuilder builder = VARCHAR.createBlockBuilder(null, 1);
VARCHAR.writeString(builder, plan);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ public String getPlan(Session session, Statement statement, Type planType, List<
switch (planType) {
case LOGICAL:
Plan plan = getLogicalPlan(session, statement, parameters, warningCollector);
return PlanPrinter.textLogicalPlan(plan.getRoot(), plan.getTypes(), metadata.getFunctionRegistry(), plan.getStatsAndCosts(), session, 0, false);
return PlanPrinter.textLogicalPlan(plan.getRoot(), plan.getTypes(), metadata.getFunctionRegistry(), Optional.of(metadata), plan.getStatsAndCosts(), session, 0, false);
case DISTRIBUTED:
SubPlan subPlan = getDistributedPlan(session, statement, parameters, warningCollector);
return PlanPrinter.textDistributedPlan(subPlan, metadata.getFunctionRegistry(), session, false);
return PlanPrinter.textDistributedPlan(subPlan, metadata.getFunctionRegistry(), Optional.of(metadata), session, false);
case IO:
return IoPlanPrinter.textIoPlan(getLogicalPlan(session, statement, parameters, warningCollector).getRoot(), metadata, session);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.prestosql.Session;
import io.prestosql.metadata.Metadata;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.sql.planner.plan.AggregationNode;
import io.prestosql.sql.planner.plan.AssignUniqueId;
import io.prestosql.sql.planner.plan.DistinctLimitNode;
Expand Down Expand Up @@ -79,25 +82,31 @@ public class EffectivePredicateExtractor
};

private final DomainTranslator domainTranslator;
private final Metadata metadata;

public EffectivePredicateExtractor(DomainTranslator domainTranslator)
public EffectivePredicateExtractor(DomainTranslator domainTranslator, Metadata metadata)
{
this.domainTranslator = requireNonNull(domainTranslator, "domainTranslator is null");
this.metadata = requireNonNull(metadata, "metadata is null");
}

public Expression extract(PlanNode node)
public Expression extract(Session session, PlanNode node)
{
return node.accept(new Visitor(domainTranslator), null);
return node.accept(new Visitor(domainTranslator, metadata, session), null);
}

private static class Visitor
extends PlanVisitor<Expression, Void>
{
private final DomainTranslator domainTranslator;
private final Metadata metadata;
private final Session session;

public Visitor(DomainTranslator domainTranslator)
public Visitor(DomainTranslator domainTranslator, Metadata metadata, Session session)
{
this.domainTranslator = requireNonNull(domainTranslator, "domainTranslator is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.session = requireNonNull(session, "session is null");
}

@Override
Expand Down Expand Up @@ -198,7 +207,9 @@ public Expression visitDistinctLimit(DistinctLimitNode node, Void context)
public Expression visitTableScan(TableScanNode node, Void context)
{
Map<ColumnHandle, Symbol> assignments = ImmutableBiMap.copyOf(node.getAssignments()).inverse();
return domainTranslator.toPredicate(node.getCurrentConstraint().simplify().transform(assignments::get));

TupleDomain<ColumnHandle> predicate = metadata.getTableProperties(session, node.getTable()).getPredicate();
return domainTranslator.toPredicate(predicate.simplify().transform(assignments::get));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,7 @@ public PhysicalOperation visitExplainAnalyze(ExplainAnalyzeNode node, LocalExecu
node.getId(),
analyzeContext.getQueryPerformanceFetcher(),
metadata.getFunctionRegistry(),
metadata,
node.isVerbose());
return new PhysicalOperation(operatorFactory, makeLayout(node), context, source);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ private SubPlan buildFragment(PlanNode root, FragmentProperties properties, Plan
properties.getPartitioningScheme(),
ungroupedExecution(),
statsAndCosts.getForSubplan(root),
Optional.of(jsonFragmentPlan(root, symbols, metadata.getFunctionRegistry(), session)));
Optional.of(jsonFragmentPlan(root, symbols, metadata.getFunctionRegistry(), Optional.of(metadata), session)));

return new SubPlan(fragment, properties.getChildren());
}
Expand Down Expand Up @@ -775,7 +775,6 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)
newTable,
node.getOutputSymbols(),
node.getAssignments(),
node.getCurrentConstraint(),
node.getEnforcedConstraint());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ protected Optional<PlanNode> pushDownProjectOff(PlanNodeIdAllocator idAllocator,
tableScanNode.getTable(),
filteredCopy(tableScanNode.getOutputSymbols(), referencedOutputs::contains),
filterKeys(tableScanNode.getAssignments(), referencedOutputs::contains),
tableScanNode.getCurrentConstraint(),
tableScanNode.getEnforcedConstraint()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public Rule.Result apply(LimitNode limit, Captures captures, Rule.Context contex
result.getHandle(),
tableScan.getOutputSymbols(),
tableScan.getAssignments(),
tableScan.getCurrentConstraint(),
tableScan.getEnforcedConstraint());

if (!result.isLimitGuaranteed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import io.prestosql.matching.Captures;
import io.prestosql.matching.Pattern;
import io.prestosql.metadata.Metadata;
import io.prestosql.metadata.TableHandle;
import io.prestosql.metadata.TableLayoutResult;
import io.prestosql.operator.scalar.TryFunction;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.Constraint;
import io.prestosql.spi.connector.ConstraintApplicationResult;
import io.prestosql.spi.predicate.NullableValue;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.sql.planner.DomainTranslator;
Expand Down Expand Up @@ -137,8 +139,7 @@ private boolean arePlansSame(FilterNode filter, TableScanNode tableScan, PlanNod

TableScanNode rewrittenTableScan = (TableScanNode) rewrittenFilter.getSource();

return Objects.equals(tableScan.getCurrentConstraint(), rewrittenTableScan.getCurrentConstraint())
&& Objects.equals(tableScan.getEnforcedConstraint(), rewrittenTableScan.getEnforcedConstraint());
return Objects.equals(tableScan.getEnforcedConstraint(), rewrittenTableScan.getEnforcedConstraint());
}

public static Optional<PlanNode> pushFilterIntoTableScan(
Expand All @@ -152,10 +153,6 @@ public static Optional<PlanNode> pushFilterIntoTableScan(
TypeAnalyzer typeAnalyzer,
DomainTranslator domainTranslator)
{
if (!metadata.usesLegacyTableLayouts(session, node.getTable())) {
return Optional.empty();
}

// don't include non-deterministic predicates
Expression deterministicPredicate = filterDeterministicConjuncts(predicate);

Expand Down Expand Up @@ -192,25 +189,41 @@ public static Optional<PlanNode> pushFilterIntoTableScan(
constraint = new Constraint<>(newDomain);
}

Optional<TableLayoutResult> layout = metadata.getLayout(
session,
node.getTable(),
constraint,
Optional.of(node.getOutputSymbols().stream()
.map(node.getAssignments()::get)
.collect(toImmutableSet())));

if (!layout.isPresent() || layout.get().getTableProperties().getPredicate().isNone()) {
return Optional.of(new ValuesNode(idAllocator.getNextId(), node.getOutputSymbols(), ImmutableList.of()));
TableHandle newTable;
TupleDomain<ColumnHandle> remainingFilter;
if (!metadata.usesLegacyTableLayouts(session, node.getTable())) {
Optional<ConstraintApplicationResult<TableHandle>> result = metadata.applyFilter(session, node.getTable(), constraint);

if (!result.isPresent()) {
return Optional.empty();
}

newTable = result.get().getHandle();
remainingFilter = result.get().getRemainingFilter();
}
else {
Optional<TableLayoutResult> layout = metadata.getLayout(
session,
node.getTable(),
constraint,
Optional.of(node.getOutputSymbols().stream()
.map(node.getAssignments()::get)
.collect(toImmutableSet())));

if (!layout.isPresent() || layout.get().getTableProperties().getPredicate().isNone()) {
return Optional.of(new ValuesNode(idAllocator.getNextId(), node.getOutputSymbols(), ImmutableList.of()));
}

newTable = layout.get().getNewTableHandle();
remainingFilter = layout.get().getUnenforcedConstraint();
}

TableScanNode tableScan = new TableScanNode(
node.getId(),
layout.get().getNewTableHandle(),
newTable,
node.getOutputSymbols(),
node.getAssignments(),
layout.get().getTableProperties().getPredicate(),
computeEnforced(newDomain, layout.get().getUnenforcedConstraint()));
computeEnforced(newDomain, remainingFilter));

// The order of the arguments to combineConjuncts matters:
// * Unenforced constraints go first because they can only be simple column references,
Expand All @@ -221,7 +234,7 @@ public static Optional<PlanNode> pushFilterIntoTableScan(
// and non-TupleDomain-expressible expressions should be retained. Changing the order can lead
// to failures of previously successful queries.
Expression resultingPredicate = combineConjuncts(
domainTranslator.toPredicate(layout.get().getUnenforcedConstraint().transform(assignments::get)),
domainTranslator.toPredicate(remainingFilter.transform(assignments::get)),
filterNonDeterministicConjuncts(predicate),
decomposedPredicate.getRemainingExpression());

Expand Down