Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Project operator pushdown (#933)
Browse files Browse the repository at this point in the history
* init

* update

* update

* update doc
  • Loading branch information
penghuo committed Dec 15, 2020
1 parent c377026 commit c700249
Show file tree
Hide file tree
Showing 17 changed files with 476 additions and 34 deletions.
Expand Up @@ -62,7 +62,7 @@ public T visitLiteral(LiteralExpression node, C context) {
}

/**
 * Visit a named expression by forwarding the visit to the expression it wraps.
 *
 * @param node    named expression.
 * @param context visitor context, handed to the delegated expression unchanged.
 * @return the result of the delegated expression accepting this visitor.
 */
public T visitNamed(NamedExpression node, C context) {
  // Delegate instead of falling back to visitNode so that visitors overriding a more
  // specific method (e.g. visitReference) are invoked for the wrapped expression.
  return node.getDelegated().accept(this, context);
}

public T visitReference(ReferenceExpression node, C context) {
Expand Down
35 changes: 30 additions & 5 deletions docs/user/optimization/optimization.rst
Expand Up @@ -44,7 +44,7 @@ The consecutive Filter operator will be merged as one Filter operator::
{
"name": "ElasticsearchIndexScan",
"description": {
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false)"
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false)"
},
"children": []
}
Expand All @@ -71,7 +71,7 @@ The Filter operator should be push down under Sort operator::
{
"name": "ElasticsearchIndexScan",
"description": {
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false)"
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false)"
},
"children": []
}
Expand All @@ -85,6 +85,31 @@ Elasticsearch Specific Optimization

The Elasticsearch `Query DSL <https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html>`_ and `Aggregation <https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html>`_ also enable storage-engine-specific optimizations.

Push Project Into Query DSL
---------------------------
The Project list will be pushed down to Query DSL to `filter the source <https://www.elastic.co/guide/en/elasticsearch/reference/7.x/search-fields.html#source-filtering>`_::

sh$ curl -sS -H 'Content-Type: application/json' \
... -X POST localhost:9200/_opendistro/_sql/_explain \
... -d '{"query" : "SELECT age FROM accounts"}'
{
"root": {
"name": "ProjectOperator",
"description": {
"fields": "[age]"
},
"children": [
{
"name": "ElasticsearchIndexScan",
"description": {
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, searchDone=false)"
},
"children": []
}
]
}
}

Filter Merge Into Query DSL
---------------------------

Expand All @@ -103,7 +128,7 @@ The Filter operator will merge into Elasticsearch Query DSL::
{
"name": "ElasticsearchIndexScan",
"description": {
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false)"
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false)"
},
"children": []
}
Expand All @@ -129,7 +154,7 @@ The Sort operator will merge into Elasticsearch Query DSL::
{
"name": "ElasticsearchIndexScan",
"description": {
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false)"
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false)"
},
"children": []
}
Expand Down Expand Up @@ -191,7 +216,7 @@ The Limit operator will merge in Elasticsearch Query DSL::
{
"name": "ElasticsearchIndexScan",
"description": {
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":5,\"size\":10,\"timeout\":\"1m\"}, searchDone=false)"
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":5,\"size\":10,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, searchDone=false)"
},
"children": []
}
Expand Down
Expand Up @@ -20,10 +20,12 @@
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Sort;
import com.amazon.opendistroforelasticsearch.sql.expression.Expression;
import com.amazon.opendistroforelasticsearch.sql.expression.NamedExpression;
import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlanNodeVisitor;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Set;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down Expand Up @@ -54,7 +56,7 @@ public class ElasticsearchLogicalIndexScan extends LogicalPlan {
* Projection List.
*/
@Setter
private List<NamedExpression> projectList;
private Set<ReferenceExpression> projectList;

/**
* Sort List.
Expand All @@ -75,7 +77,7 @@ public class ElasticsearchLogicalIndexScan extends LogicalPlan {
public ElasticsearchLogicalIndexScan(
String relationName,
Expression filter,
List<NamedExpression> projectList,
Set<ReferenceExpression> projectList,
List<Pair<Sort.SortOption, Expression>> sortList,
Integer limit, Integer offset) {
super(ImmutableList.of());
Expand All @@ -95,4 +97,13 @@ public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
public boolean hasLimit() {
return limit != null;
}

/**
 * Check whether a project list has been set on this index scan.
 *
 * @return true if the project list is non-null and not empty, otherwise false.
 */
public boolean hasProjects() {
  if (projectList == null) {
    return false;
  }
  return !projectList.isEmpty();
}
}
Expand Up @@ -25,6 +25,8 @@
import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.MergeSortAndIndexAgg;
import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.MergeSortAndIndexScan;
import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.MergeSortAndRelation;
import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.PushProjectAndIndexScan;
import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.PushProjectAndRelation;
import com.amazon.opendistroforelasticsearch.sql.planner.optimizer.LogicalPlanOptimizer;
import java.util.Arrays;
import lombok.experimental.UtilityClass;
Expand All @@ -48,7 +50,9 @@ public static LogicalPlanOptimizer create() {
new MergeSortAndIndexAgg(),
new MergeSortAndIndexScan(),
new MergeLimitAndRelation(),
new
MergeLimitAndIndexScan()));
new MergeLimitAndIndexScan(),
new PushProjectAndRelation(),
new PushProjectAndIndexScan()
));
}
}
Expand Up @@ -18,8 +18,14 @@
package com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule;

import com.amazon.opendistroforelasticsearch.sql.ast.tree.Sort;
import com.amazon.opendistroforelasticsearch.sql.expression.ExpressionNodeVisitor;
import com.amazon.opendistroforelasticsearch.sql.expression.NamedExpression;
import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalSort;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import lombok.experimental.UtilityClass;

@UtilityClass
Expand Down Expand Up @@ -50,4 +56,37 @@ public static boolean sortByDefaultOptionOnly(LogicalSort logicalSort) {
|| Sort.SortOption.DEFAULT_DESC.equals(sort.getLeft()))
.reduce(true, Boolean::logicalAnd);
}

/**
 * Collect the reference expressions found in each of the given named expressions.
 *
 * @param expressions a list of named expressions.
 * @return a set holding every ReferenceExpression found by
 *     {@link #findReferenceExpression(NamedExpression)} across the list.
 */
public static Set<ReferenceExpression> findReferenceExpressions(
    List<NamedExpression> expressions) {
  final Set<ReferenceExpression> references = new HashSet<>();
  expressions.forEach(named -> references.addAll(findReferenceExpression(named)));
  return references;
}

/**
 * Collect the reference expressions visited while walking a single named expression.
 *
 * @param expression named expression to walk.
 * @return a list of the ReferenceExpression nodes passed to visitReference, in visit order.
 */
public static List<ReferenceExpression> findReferenceExpression(
    NamedExpression expression) {
  final List<ReferenceExpression> collected = new ArrayList<>();
  // Only visitReference is overridden; every other node type keeps the visitor's default.
  final ExpressionNodeVisitor<Object, Object> collector =
      new ExpressionNodeVisitor<Object, Object>() {
        @Override
        public Object visitReference(ReferenceExpression reference, Object unused) {
          return collected.add(reference);
        }
      };
  expression.accept(collector, null);
  return collected;
}
}
@@ -0,0 +1,73 @@
/*
*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule;

import static com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.OptimizationRuleUtils.findReferenceExpressions;
import static com.amazon.opendistroforelasticsearch.sql.planner.optimizer.pattern.Patterns.source;
import static com.facebook.presto.matching.Pattern.typeOf;

import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.ElasticsearchLogicalIndexScan;
import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalProject;
import com.amazon.opendistroforelasticsearch.sql.planner.optimizer.Rule;
import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import java.util.Set;

/**
 * Push Project list into ElasticsearchLogicalIndexScan.
 *
 * <p>Matches a LogicalProject that references at least one field and whose source is an
 * ElasticsearchLogicalIndexScan without a project list, then sets the referenced fields on
 * that index scan. No state is carried over from pattern matching to {@link #apply}.
 */
public class PushProjectAndIndexScan implements Rule<LogicalProject> {

  private final Capture<ElasticsearchLogicalIndexScan> indexScanCapture;

  private final Pattern<LogicalProject> pattern;

  /**
   * Constructor of PushProjectAndIndexScan.
   */
  public PushProjectAndIndexScan() {
    this.indexScanCapture = Capture.newCapture();
    this.pattern = typeOf(LogicalProject.class)
        // Nothing to push down when the project list references no field.
        .matching(project -> !findReferenceExpressions(project.getProjectList()).isEmpty())
        .with(source()
            .matching(typeOf(ElasticsearchLogicalIndexScan.class)
                // Skip index scans that already carry a project list.
                .matching(indexScan -> !indexScan.hasProjects())
                .capturedAs(indexScanCapture)));
  }

  @Override
  public Pattern<LogicalProject> pattern() {
    return pattern;
  }

  /**
   * Set the fields referenced by the project list on the captured index scan.
   *
   * @param project  the matched project operator.
   * @param captures captures produced by the pattern match.
   * @return a new LogicalProject over the updated index scan with the same project list.
   */
  @Override
  public LogicalPlan apply(LogicalProject project,
                           Captures captures) {
    ElasticsearchLogicalIndexScan indexScan = captures.get(indexScanCapture);
    // Derive the fields from the project being rewritten instead of relying on a value
    // cached by the match predicate.
    Set<ReferenceExpression> pushDownProjects =
        findReferenceExpressions(project.getProjectList());
    indexScan.setProjectList(pushDownProjects);
    return new LogicalProject(indexScan, project.getProjectList());
  }
}
@@ -0,0 +1,77 @@
/*
*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule;

import static com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.OptimizationRuleUtils.findReferenceExpressions;
import static com.amazon.opendistroforelasticsearch.sql.planner.optimizer.pattern.Patterns.source;
import static com.facebook.presto.matching.Pattern.typeOf;

import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.ElasticsearchLogicalIndexScan;
import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalProject;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalRelation;
import com.amazon.opendistroforelasticsearch.sql.planner.optimizer.Rule;
import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import java.util.Set;

/**
 * Push Project list into Relation. The transformed plan is Project - IndexScan.
 *
 * <p>Matches a LogicalProject that references at least one field and whose source is a
 * LogicalRelation. No state is carried over from pattern matching to {@link #apply}.
 */
public class PushProjectAndRelation implements Rule<LogicalProject> {

  private final Capture<LogicalRelation> relationCapture;

  private final Pattern<LogicalProject> pattern;

  /**
   * Constructor of PushProjectAndRelation.
   */
  public PushProjectAndRelation() {
    this.relationCapture = Capture.newCapture();
    this.pattern = typeOf(LogicalProject.class)
        // Nothing to push down when the project list references no field.
        .matching(project -> !findReferenceExpressions(project.getProjectList()).isEmpty())
        .with(source().matching(typeOf(LogicalRelation.class).capturedAs(relationCapture)));
  }

  @Override
  public Pattern<LogicalProject> pattern() {
    return pattern;
  }

  /**
   * Replace the captured relation with an index scan that carries the referenced fields.
   *
   * @param project  the matched project operator.
   * @param captures captures produced by the pattern match.
   * @return a new LogicalProject with the same project list over an
   *     ElasticsearchLogicalIndexScan built from the relation name and the referenced fields.
   */
  @Override
  public LogicalPlan apply(LogicalProject project,
                           Captures captures) {
    LogicalRelation relation = captures.get(relationCapture);
    Set<ReferenceExpression> pushDownProjects =
        findReferenceExpressions(project.getProjectList());
    return new LogicalProject(
        ElasticsearchLogicalIndexScan
            .builder()
            .relationName(relation.getRelationName())
            .projectList(pushDownProjects)
            .build(),
        project.getProjectList()
    );
  }
}
Expand Up @@ -124,6 +124,10 @@ public PhysicalPlan visitIndexScan(ElasticsearchLogicalIndexScan node,
if (node.getLimit() != null) {
context.pushDownLimit(node.getLimit(), node.getOffset());
}

if (node.hasProjects()) {
context.pushDownProjects(node.getProjectList());
}
return indexScan;
}

Expand Down

0 comments on commit c700249

Please sign in to comment.