Skip to content

Commit

Permalink
DRILL-6910: Allow applying DrillPushProjectIntoScanRule at physical p…
Browse files Browse the repository at this point in the history
…hase

closes apache#1619
  • Loading branch information
vvysotskyi authored and lushuifeng committed Jun 21, 2019
1 parent 71f90b0 commit 977ef3f
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,8 @@ static RuleSet getPhysicalPruneScanRules(OptimizerRulesContext optimizerRulesCon
// estimation of filter operator, after filter is pushed down to scan.

ParquetPushDownFilter.getFilterOnProject(optimizerRulesContext),
ParquetPushDownFilter.getFilterOnScan(optimizerRulesContext)
ParquetPushDownFilter.getFilterOnScan(optimizerRulesContext),
DrillPushProjectIntoScanRule.DRILL_PHYSICAL_INSTANCE
)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.rules.ProjectRemoveRule;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.planner.common.DrillProjectRelBase;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.common.DrillRelOptUtil.ProjectPushInfo;
import org.apache.drill.exec.planner.common.DrillScanRelBase;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.ProjectPrel;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.util.Utilities;

import java.io.IOException;
Expand All @@ -43,51 +49,76 @@ public class DrillPushProjectIntoScanRule extends RelOptRule {
public static final RelOptRule INSTANCE =
new DrillPushProjectIntoScanRule(LogicalProject.class,
EnumerableTableScan.class,
"DrillPushProjIntoEnumerableScan");
"DrillPushProjIntoEnumerableScan") {

@Override
protected boolean skipScanConversion(RelDataType projectRelDataType, TableScan scan) {
// do not allow skipping conversion of EnumerableTableScan to DrillScanRel if rule is applicable
return false;
}
};

public static final RelOptRule DRILL_LOGICAL_INSTANCE =
new DrillPushProjectIntoScanRule(LogicalProject.class,
DrillScanRel.class,
"DrillPushProjIntoDrillRelScan");

public static final RelOptRule DRILL_PHYSICAL_INSTANCE =
new DrillPushProjectIntoScanRule(ProjectPrel.class,
ScanPrel.class,
"DrillPushProjIntoScanPrel") {

@Override
protected ScanPrel createScan(TableScan scan, ProjectPushInfo projectPushInfo) {
ScanPrel drillScan = (ScanPrel) scan;

return new ScanPrel(drillScan.getCluster(),
drillScan.getTraitSet().plus(Prel.DRILL_PHYSICAL),
drillScan.getGroupScan().clone(projectPushInfo.getFields()),
projectPushInfo.createNewRowType(drillScan.getCluster().getTypeFactory()),
drillScan.getTable());
}

@Override
protected ProjectPrel createProject(Project project, TableScan newScan, List<RexNode> newProjects) {
return new ProjectPrel(project.getCluster(),
project.getTraitSet().plus(Prel.DRILL_PHYSICAL),
newScan,
newProjects,
project.getRowType());
}
};

private DrillPushProjectIntoScanRule(Class<? extends Project> projectClass, Class<? extends TableScan> scanClass, String description) {
super(RelOptHelper.some(projectClass, RelOptHelper.any(scanClass)), description);
}

@Override
public void onMatch(RelOptRuleCall call) {
final Project project = call.rel(0);
final TableScan scan = call.rel(1);
Project project = call.rel(0);
TableScan scan = call.rel(1);

try {

if (scan.getRowType().getFieldList().isEmpty()) {
return;
}

ProjectPushInfo projectPushInfo = DrillRelOptUtil.getFieldsInformation(scan.getRowType(), project.getProjects());
if (!canPushProjectIntoScan(scan.getTable(), projectPushInfo)) {
if (!canPushProjectIntoScan(scan.getTable(), projectPushInfo)
|| skipScanConversion(projectPushInfo.createNewRowType(project.getCluster().getTypeFactory()), scan)) {
// project above scan may be removed in ProjectRemoveRule for the case when it is trivial
return;
}

final DrillScanRel newScan =
new DrillScanRel(scan.getCluster(),
scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
scan.getTable(),
projectPushInfo.createNewRowType(project.getInput().getCluster().getTypeFactory()),
projectPushInfo.getFields());
DrillScanRelBase newScan = createScan(scan, projectPushInfo);

List<RexNode> newProjects = new ArrayList<>();
for (RexNode n : project.getChildExps()) {
newProjects.add(n.accept(projectPushInfo.getInputReWriter()));
}

final DrillProjectRel newProject =
new DrillProjectRel(project.getCluster(),
project.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
newScan,
newProjects,
project.getRowType());
DrillProjectRelBase newProject =
createProject(project, newScan, newProjects);

if (ProjectRemoveRule.isTrivial(newProject)) {
call.transformTo(newScan);
Expand All @@ -99,6 +130,52 @@ public void onMatch(RelOptRuleCall call) {
}
}

/**
* Checks whether conversion of input {@code TableScan} instance to target
* {@code TableScan} may be omitted.
*
* @param projectRelDataType project rel data type
* @param scan TableScan to be checked
* @return true if specified {@code TableScan} has the same row type as specified.
*/
protected boolean skipScanConversion(RelDataType projectRelDataType, TableScan scan) {
return projectRelDataType.equals(scan.getRowType());
}

/**
* Creates new {@code DrillProjectRelBase} instance with specified {@code TableScan newScan} child
* and {@code List<RexNode> newProjects} expressions using specified {@code Project project} as prototype.
*
* @param project the prototype of resulting project
* @param newScan new project child
* @param newProjects new project expressions
* @return new project instance
*/
protected DrillProjectRelBase createProject(Project project, TableScan newScan, List<RexNode> newProjects) {
return new DrillProjectRel(project.getCluster(),
project.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
newScan,
newProjects,
project.getRowType());
}

/**
* Creates new {@code DrillScanRelBase} instance with row type and fields list
* obtained from specified {@code ProjectPushInfo projectPushInfo}
* using specified {@code TableScan scan} as prototype.
*
* @param scan the prototype of resulting scan
* @param projectPushInfo the source of row type and fields list
* @return new scan instance
*/
protected DrillScanRelBase createScan(TableScan scan, ProjectPushInfo projectPushInfo) {
return new DrillScanRel(scan.getCluster(),
scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
scan.getTable(),
projectPushInfo.createNewRowType(scan.getCluster().getTypeFactory()),
projectPushInfo.getFields());
}

/**
* Push project into scan be done only if this is not a star query and
* table supports project push down.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public static void setupTestFiles() {
}

@Test
@Ignore
public void testGroupBy() throws Exception {
String expectedColNames = " \"columns\" : [ \"`marital_status`\" ]";
testPhysicalPlan(
Expand All @@ -48,7 +47,6 @@ public void testGroupBy() throws Exception {
}

@Test
@Ignore
public void testOrderBy() throws Exception {
String expectedColNames = "\"columns\" : [ \"`employee_id`\", \"`full_name`\", \"`first_name`\", \"`last_name`\" ]";
testPhysicalPlan("select employee_id , full_name, first_name , last_name "
Expand All @@ -57,7 +55,6 @@ public void testOrderBy() throws Exception {
}

@Test
@Ignore
public void testExprInSelect() throws Exception {
String expectedColNames = "\"columns\" : [ \"`employee_id`\", \"`full_name`\", \"`first_name`\", \"`last_name`\" ]";
testPhysicalPlan(
Expand All @@ -67,7 +64,6 @@ public void testExprInSelect() throws Exception {
}

@Test
@Ignore
public void testExprInWhere() throws Exception {
String expectedColNames = "\"columns\" : [ \"`employee_id`\", \"`full_name`\", \"`first_name`\", \"`last_name`\" ]";
testPhysicalPlan(
Expand Down Expand Up @@ -291,12 +287,26 @@ public void testProjectPushdownPastJoinWithJoinPushExpressions() throws Exceptio
final String query = "SELECT L.L_QUANTITY FROM cp.`tpch/lineitem.parquet` L, cp.`tpch/orders.parquet` O" +
" WHERE cast(L.L_ORDERKEY as int) = cast(O.O_ORDERKEY as int)";
final String[] expectedPatterns = {
".*HashJoin.*", "Project.*\\(L_QUANTITY.*CAST\\(\\$0\\)\\:INTEGER.*", "Project.*CAST\\(\\$0\\)\\:INTEGER.*"};
".*HashJoin.*", "Project.*\\(L_QUANTITY\\=\\[\\$0\\].*CAST\\(\\$1\\)\\:INTEGER.*", "Project.*CAST\\(\\$0\\)\\:INTEGER.*"};
// L_ORDERKEY, O_ORDERKEY should not be present in the projects below the join
final String[] excludedPatterns = {".*Project\\(L_ORDERKEY=.*", ".*Project\\(O_ORDERKEY=.*"};
PlanTestBase.testPlanMatchingPatterns(query, expectedPatterns, excludedPatterns);
}

@Test
public void testProjectPushdownAfterFilterRemoving() throws Exception {
test("create table dfs.tmp.`nation` as\n" +
"select * from cp.`tpch/nation.parquet` where n_regionkey < 10");
try {
// filter will be removed form the plan
String query = "select n_nationkey from dfs.tmp.`nation` where n_regionkey < 10";
PlanTestBase.testPlanMatchingPatterns(query,
new String[]{"columns\\=\\[`n_nationkey`\\]"}, new String[]{"n_regionkey"});
} finally {
test("drop table if exists dfs.tmp.`nation`");
}
}

protected void testPushDown(PushDownTestInstance test) throws Exception {
testPhysicalPlan(test.getSql(), test.getExpected());
}
Expand Down

0 comments on commit 977ef3f

Please sign in to comment.