[SPARK-39148][SQL] DS V2 aggregate push down can work with OFFSET or LIMIT (Kyligence#505)

* [SPARK-39139][SQL] DS V2 supports push down DS V2 UDF

### What changes were proposed in this pull request?
Currently, the Spark DS V2 push-down framework supports pushing SQL down to data sources, but it only pushes down Spark's built-in functions.
Each database has many useful functions that Spark does not support.
If we can push these functions down to the data source, we reduce disk I/O and network I/O and improve performance when querying databases.
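
As a rough sketch of how a JDBC dialect could take advantage of this (the translator object and the `MY_STRLEN` remote function below are hypothetical; only the `UserDefinedScalarFunc` API with `name()`, `canonicalName()` and `children()` comes from this change):

```scala
import org.apache.spark.sql.connector.expressions.{Expression, UserDefinedScalarFunc}

// Illustrative translator: map a pushed-down DS V2 UDF to the remote database's
// native function. Unknown UDFs return None and keep running in Spark.
object ExampleUdfCompiler {
  def compileUdf(
      f: UserDefinedScalarFunc,
      compileChild: Expression => String): Option[String] =
    f.canonicalName() match {
      case "h2.my_strlen" => // assumed UDF registered against an H2 catalog
        Some(s"MY_STRLEN(${f.children().map(compileChild).mkString(", ")})")
      case _ => None
    }
}
```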

### Why are the changes needed?

1. Spark can leverage the functions supported by databases
2. Improve the query performance.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
New tests.

Closes apache#36593 from beliefer/SPARK-39139.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-39453][SQL][TESTS][FOLLOWUP] Make `RAND` in the filter more meaningful

### What changes were proposed in this pull request?
apache#36830 makes DS V2 support push-down of misc non-aggregate functions (non-ANSI).
But the `Rand` in the test case does not look meaningful.

### Why are the changes needed?
Make the `Rand` in the filter more meaningful.

### Does this PR introduce _any_ user-facing change?
'No'.
Just update test case.

### How was this patch tested?
Just update test case.

Closes apache#37033 from beliefer/SPARK-39453_followup.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-37527][SQL][FOLLOWUP] Do not compile COVAR_POP, COVAR_SAMP and CORR in `H2Dialect` if they are used with `DISTINCT`

### What changes were proposed in this pull request?
apache#35145 compiles COVAR_POP, COVAR_SAMP and CORR in H2Dialect.
However, H2 does not support COVAR_POP, COVAR_SAMP and CORR with DISTINCT.
So apache#35145 introduced a bug: these aggregate functions are compiled even when they are used with DISTINCT.
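
A minimal sketch of the kind of guard this follow-up calls for (the method and the match shape are illustrative; `GeneralAggregateFunc`, `name()`, `isDistinct()` and `Expression.describe()` are the real DS V2 APIs):

```scala
import org.apache.spark.sql.connector.expressions.aggregate.GeneralAggregateFunc

// Only translate COVAR_POP/COVAR_SAMP/CORR when DISTINCT is absent, because H2
// cannot evaluate them with DISTINCT; returning None keeps the aggregate in Spark.
def compileH2Aggregate(agg: GeneralAggregateFunc): Option[String] = agg match {
  case f if Set("COVAR_POP", "COVAR_SAMP", "CORR").contains(f.name()) && !f.isDistinct() =>
    Some(s"${f.name()}(${f.children().map(_.describe()).mkString(", ")})")
  case _ => None
}
```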

### Why are the changes needed?
Fix the bug where COVAR_POP, COVAR_SAMP and CORR are compiled even when used with DISTINCT.

### Does this PR introduce _any_ user-facing change?
'Yes'.
The bug will be fixed.

### How was this patch tested?
New test cases.

Closes apache#37090 from beliefer/SPARK-37527_followup2.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-39627][SQL] DS V2 pushdown should unify the compile API

### What changes were proposed in this pull request?
Currently, `JdbcDialect` has two APIs, `compileAggregate` and `compileExpression`; we can unify them.
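
Conceptually, the unification can be pictured like this (simplified signatures, not the actual `JdbcDialect` code):

```scala
import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.connector.expressions.aggregate.AggregateFunc

// One general expression compiler; since AggregateFunc is itself an Expression,
// the aggregate-specific entry point can simply delegate to it.
trait ExampleDialect {
  def compileExpression(expr: Expression): Option[String]

  def compileAggregate(aggFunction: AggregateFunc): Option[String] =
    compileExpression(aggFunction)
}
```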

### Why are the changes needed?
Improve ease of use.

### Does this PR introduce _any_ user-facing change?
'No'.
`compileAggregate` now delegates to `compileExpression`; its behavior is not changed.

### How was this patch tested?
N/A

Closes apache#37047 from beliefer/SPARK-39627.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-39384][SQL] Compile built-in linear regression aggregate functions for JDBC dialect

### What changes were proposed in this pull request?
Recently, the Spark DS V2 push-down framework gained translation for a number of standard linear regression aggregate functions.
Currently, only H2Dialect compiles these functions. This PR compiles them for the other built-in JDBC dialects.
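
For illustration (the `h2` catalog, `test.employee` table and `spark` session are assumptions mirroring the JDBC integration tests), once a dialect can render `REGR_SLOPE`, a query like this can be aggregated entirely on the remote database:

```scala
// Assumes a SparkSession `spark` with an H2 JDBC catalog named "h2" configured.
val df = spark.sql(
  """SELECT REGR_SLOPE(bonus, bonus)
    |FROM h2.test.employee
    |WHERE dept > 0
    |GROUP BY dept""".stripMargin)

// Roughly what would be sent over JDBC instead of aggregating in Spark:
//   SELECT REGR_SLOPE("bonus","bonus") FROM "test"."employee"
//   WHERE "dept" > 0 GROUP BY "dept"
df.explain() // the scan node should show the pushed REGR_SLOPE aggregate
```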

### Why are the changes needed?
Make the built-in JDBC dialects support compiling pushed-down linear regression aggregates.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
New test cases.

Closes apache#37188 from beliefer/SPARK-39384.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Sean Owen <srowen@gmail.com>

* [SPARK-39148][SQL] DS V2 aggregate push down can work with OFFSET or LIMIT

### What changes were proposed in this pull request?

This PR refactors the v2 agg pushdown code. The main change is that we no longer build the `Scan` immediately when pushing the agg. We did so before because we wanted to know the data schema with the agg pushed, so that we could add casts when rewriting the query plan after pushdown. The problem is that building the `Scan` so early means we can't push down any more operators, while it's common to see LIMIT/OFFSET after an agg.

The idea of the refactor is that we don't need to know the data schema with the agg pushed. We just give an expectation (the data types should be the same as those of Spark's agg functions), use it to define the output of `ScanBuilderHolder`, and then rewrite the query plan. Later on, when we build the `Scan` and replace `ScanBuilderHolder` with `DataSourceV2ScanRelation`, we check the actual data schema and add a `Project` to cast types if necessary.
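
A hedged example of the kind of query that benefits (DataFrame API; the `h2.test.employee` table and the `spark` session are assumptions, as in the JDBC integration tests):

```scala
import org.apache.spark.sql.functions.sum

// With this refactor, the LIMIT below can be pushed into the same JDBC scan as the
// aggregate, instead of the aggregate push-down freezing the Scan before the LIMIT.
val top = spark.table("h2.test.employee")
  .where("dept > 0")
  .groupBy("dept")
  .agg(sum("bonus").as("total_bonus"))
  .limit(1)

top.explain() // expected: both the aggregate and the limit appear in the pushed scan
```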

### Why are the changes needed?

Support pushing down LIMIT/OFFSET after an agg.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Updated tests.

Closes apache#37195 from cloud-fan/agg.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

Co-authored-by: Jiaan Geng <beliefer@163.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
4 people authored and yhcast0 committed Aug 8, 2022
1 parent 7c2fd74 commit 4c5d98a
Showing 24 changed files with 1,029 additions and 534 deletions.
@@ -106,4 +106,8 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
testStddevSamp(true)
testCovarPop()
testCovarSamp()
testRegrIntercept()
testRegrSlope()
testRegrR2()
testRegrSXY()
}
@@ -107,4 +107,8 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTes
testCovarPop()
testCovarSamp()
testCorr()
testRegrIntercept()
testRegrSlope()
testRegrR2()
testRegrSXY()
}
@@ -104,4 +104,12 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
testCovarSamp(true)
testCorr()
testCorr(true)
testRegrIntercept()
testRegrIntercept(true)
testRegrSlope()
testRegrSlope(true)
testRegrR2()
testRegrR2(true)
testRegrSXY()
testRegrSXY(true)
}
@@ -386,25 +386,27 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu

protected def caseConvert(tableName: String): String = tableName

private def withOrWithout(isDistinct: Boolean): String = if (isDistinct) "with" else "without"

protected def testVarPop(isDistinct: Boolean = false): Unit = {
val distinct = if (isDistinct) "DISTINCT " else ""
test(s"scan with aggregate push-down: VAR_POP with distinct: $isDistinct") {
test(s"scan with aggregate push-down: VAR_POP ${withOrWithout(isDistinct)} DISTINCT") {
val df = sql(s"SELECT VAR_POP(${distinct}bonus) FROM $catalogAndNamespace." +
s"${caseConvert("employee")} WHERE dept > 0 GROUP BY dept ORDER BY dept")
checkFilterPushed(df)
checkAggregateRemoved(df)
checkAggregatePushed(df, "VAR_POP")
val row = df.collect()
assert(row.length === 3)
assert(row(0).getDouble(0) === 10000d)
assert(row(1).getDouble(0) === 2500d)
assert(row(2).getDouble(0) === 0d)
assert(row(0).getDouble(0) === 10000.0)
assert(row(1).getDouble(0) === 2500.0)
assert(row(2).getDouble(0) === 0.0)
}
}

protected def testVarSamp(isDistinct: Boolean = false): Unit = {
val distinct = if (isDistinct) "DISTINCT " else ""
test(s"scan with aggregate push-down: VAR_SAMP with distinct: $isDistinct") {
test(s"scan with aggregate push-down: VAR_SAMP ${withOrWithout(isDistinct)} DISTINCT") {
val df = sql(
s"SELECT VAR_SAMP(${distinct}bonus) FROM $catalogAndNamespace." +
s"${caseConvert("employee")} WHERE dept > 0 GROUP BY dept ORDER BY dept")
@@ -413,15 +415,15 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
checkAggregatePushed(df, "VAR_SAMP")
val row = df.collect()
assert(row.length === 3)
assert(row(0).getDouble(0) === 20000d)
assert(row(1).getDouble(0) === 5000d)
assert(row(0).getDouble(0) === 20000.0)
assert(row(1).getDouble(0) === 5000.0)
assert(row(2).isNullAt(0))
}
}

protected def testStddevPop(isDistinct: Boolean = false): Unit = {
val distinct = if (isDistinct) "DISTINCT " else ""
test(s"scan with aggregate push-down: STDDEV_POP with distinct: $isDistinct") {
test(s"scan with aggregate push-down: STDDEV_POP ${withOrWithout(isDistinct)} DISTINCT") {
val df = sql(
s"SELECT STDDEV_POP(${distinct}bonus) FROM $catalogAndNamespace." +
s"${caseConvert("employee")} WHERE dept > 0 GROUP BY dept ORDER BY dept")
@@ -430,15 +432,15 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
checkAggregatePushed(df, "STDDEV_POP")
val row = df.collect()
assert(row.length === 3)
assert(row(0).getDouble(0) === 100d)
assert(row(1).getDouble(0) === 50d)
assert(row(2).getDouble(0) === 0d)
assert(row(0).getDouble(0) === 100.0)
assert(row(1).getDouble(0) === 50.0)
assert(row(2).getDouble(0) === 0.0)
}
}

protected def testStddevSamp(isDistinct: Boolean = false): Unit = {
val distinct = if (isDistinct) "DISTINCT " else ""
test(s"scan with aggregate push-down: STDDEV_SAMP with distinct: $isDistinct") {
test(s"scan with aggregate push-down: STDDEV_SAMP ${withOrWithout(isDistinct)} DISTINCT") {
val df = sql(
s"SELECT STDDEV_SAMP(${distinct}bonus) FROM $catalogAndNamespace." +
s"${caseConvert("employee")} WHERE dept > 0 GROUP BY dept ORDER BY dept")
@@ -447,15 +449,15 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
checkAggregatePushed(df, "STDDEV_SAMP")
val row = df.collect()
assert(row.length === 3)
assert(row(0).getDouble(0) === 141.4213562373095d)
assert(row(1).getDouble(0) === 70.71067811865476d)
assert(row(0).getDouble(0) === 141.4213562373095)
assert(row(1).getDouble(0) === 70.71067811865476)
assert(row(2).isNullAt(0))
}
}

protected def testCovarPop(isDistinct: Boolean = false): Unit = {
val distinct = if (isDistinct) "DISTINCT " else ""
test(s"scan with aggregate push-down: COVAR_POP with distinct: $isDistinct") {
test(s"scan with aggregate push-down: COVAR_POP ${withOrWithout(isDistinct)} DISTINCT") {
val df = sql(
s"SELECT COVAR_POP(${distinct}bonus, bonus) FROM $catalogAndNamespace." +
s"${caseConvert("employee")} WHERE dept > 0 GROUP BY dept ORDER BY dept")
@@ -464,15 +466,15 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
checkAggregatePushed(df, "COVAR_POP")
val row = df.collect()
assert(row.length === 3)
assert(row(0).getDouble(0) === 10000d)
assert(row(1).getDouble(0) === 2500d)
assert(row(2).getDouble(0) === 0d)
assert(row(0).getDouble(0) === 10000.0)
assert(row(1).getDouble(0) === 2500.0)
assert(row(2).getDouble(0) === 0.0)
}
}

protected def testCovarSamp(isDistinct: Boolean = false): Unit = {
val distinct = if (isDistinct) "DISTINCT " else ""
test(s"scan with aggregate push-down: COVAR_SAMP with distinct: $isDistinct") {
test(s"scan with aggregate push-down: COVAR_SAMP ${withOrWithout(isDistinct)} DISTINCT") {
val df = sql(
s"SELECT COVAR_SAMP(${distinct}bonus, bonus) FROM $catalogAndNamespace." +
s"${caseConvert("employee")} WHERE dept > 0 GROUP BY dept ORDER BY dept")
@@ -481,15 +483,15 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
checkAggregatePushed(df, "COVAR_SAMP")
val row = df.collect()
assert(row.length === 3)
assert(row(0).getDouble(0) === 20000d)
assert(row(1).getDouble(0) === 5000d)
assert(row(0).getDouble(0) === 20000.0)
assert(row(1).getDouble(0) === 5000.0)
assert(row(2).isNullAt(0))
}
}

protected def testCorr(isDistinct: Boolean = false): Unit = {
val distinct = if (isDistinct) "DISTINCT " else ""
test(s"scan with aggregate push-down: CORR with distinct: $isDistinct") {
test(s"scan with aggregate push-down: CORR ${withOrWithout(isDistinct)} DISTINCT") {
val df = sql(
s"SELECT CORR(${distinct}bonus, bonus) FROM $catalogAndNamespace." +
s"${caseConvert("employee")} WHERE dept > 0 GROUP BY dept ORDER BY dept")
@@ -498,9 +500,77 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
checkAggregatePushed(df, "CORR")
val row = df.collect()
assert(row.length === 3)
assert(row(0).getDouble(0) === 1d)
assert(row(1).getDouble(0) === 1d)
assert(row(0).getDouble(0) === 1.0)
assert(row(1).getDouble(0) === 1.0)
assert(row(2).isNullAt(0))
}
}

protected def testRegrIntercept(isDistinct: Boolean = false): Unit = {
val distinct = if (isDistinct) "DISTINCT " else ""
test(s"scan with aggregate push-down: REGR_INTERCEPT ${withOrWithout(isDistinct)} DISTINCT") {
val df = sql(
s"SELECT REGR_INTERCEPT(${distinct}bonus, bonus) FROM $catalogAndNamespace." +
s"${caseConvert("employee")} WHERE dept > 0 GROUP BY dept ORDER BY dept")
checkFilterPushed(df)
checkAggregateRemoved(df)
checkAggregatePushed(df, "REGR_INTERCEPT")
val row = df.collect()
assert(row.length === 3)
assert(row(0).getDouble(0) === 0.0)
assert(row(1).getDouble(0) === 0.0)
assert(row(2).isNullAt(0))
}
}

protected def testRegrSlope(isDistinct: Boolean = false): Unit = {
val distinct = if (isDistinct) "DISTINCT " else ""
test(s"scan with aggregate push-down: REGR_SLOPE ${withOrWithout(isDistinct)} DISTINCT") {
val df = sql(
s"SELECT REGR_SLOPE(${distinct}bonus, bonus) FROM $catalogAndNamespace." +
s"${caseConvert("employee")} WHERE dept > 0 GROUP BY dept ORDER BY dept")
checkFilterPushed(df)
checkAggregateRemoved(df)
checkAggregatePushed(df, "REGR_SLOPE")
val row = df.collect()
assert(row.length === 3)
assert(row(0).getDouble(0) === 1.0)
assert(row(1).getDouble(0) === 1.0)
assert(row(2).isNullAt(0))
}
}

protected def testRegrR2(isDistinct: Boolean = false): Unit = {
val distinct = if (isDistinct) "DISTINCT " else ""
test(s"scan with aggregate push-down: REGR_R2 ${withOrWithout(isDistinct)} DISTINCT") {
val df = sql(
s"SELECT REGR_R2(${distinct}bonus, bonus) FROM $catalogAndNamespace." +
s"${caseConvert("employee")} WHERE dept > 0 GROUP BY dept ORDER BY dept")
checkFilterPushed(df)
checkAggregateRemoved(df)
checkAggregatePushed(df, "REGR_R2")
val row = df.collect()
assert(row.length === 3)
assert(row(0).getDouble(0) === 1.0)
assert(row(1).getDouble(0) === 1.0)
assert(row(2).isNullAt(0))
}
}

protected def testRegrSXY(isDistinct: Boolean = false): Unit = {
val distinct = if (isDistinct) "DISTINCT " else ""
test(s"scan with aggregate push-down: REGR_SXY ${withOrWithout(isDistinct)} DISTINCT") {
val df = sql(
s"SELECT REGR_SXY(${distinct}bonus, bonus) FROM $catalogAndNamespace." +
s"${caseConvert("employee")} WHERE dept > 0 GROUP BY dept ORDER BY dept")
checkFilterPushed(df)
checkAggregateRemoved(df)
checkAggregatePushed(df, "REGR_SXY")
val row = df.collect()
assert(row.length === 3)
assert(row(0).getDouble(0) === 20000.0)
assert(row(1).getDouble(0) === 5000.0)
assert(row(2).getDouble(0) === 0.0)
}
}
}
@@ -23,7 +23,7 @@

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.connector.util.V2ExpressionSQLBuilder;
import org.apache.spark.sql.internal.connector.ToStringSQLBuilder;

/**
* The general representation of SQL scalar expressions, which contains the upper-cased
@@ -398,12 +398,7 @@ public int hashCode() {

@Override
public String toString() {
V2ExpressionSQLBuilder builder = new V2ExpressionSQLBuilder();
try {
return builder.build(this);
} catch (Throwable e) {
return name + "(" +
Arrays.stream(children).map(child -> child.toString()).reduce((a,b) -> a + "," + b) + ")";
}
ToStringSQLBuilder builder = new ToStringSQLBuilder();
return builder.build(this);
}
}
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.expressions;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.internal.connector.ToStringSQLBuilder;

/**
* The general representation of user defined scalar function, which contains the upper-cased
* function name, canonical function name and all the children expressions.
*
* @since 3.4.0
*/
@Evolving
public class UserDefinedScalarFunc implements Expression, Serializable {
private String name;
private String canonicalName;
private Expression[] children;

public UserDefinedScalarFunc(String name, String canonicalName, Expression[] children) {
this.name = name;
this.canonicalName = canonicalName;
this.children = children;
}

public String name() { return name; }
public String canonicalName() { return canonicalName; }

@Override
public Expression[] children() { return children; }

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UserDefinedScalarFunc that = (UserDefinedScalarFunc) o;
return Objects.equals(name, that.name) && Objects.equals(canonicalName, that.canonicalName) &&
Arrays.equals(children, that.children);
}

@Override
public int hashCode() {
return Objects.hash(name, canonicalName, children);
}

@Override
public String toString() {
ToStringSQLBuilder builder = new ToStringSQLBuilder();
return builder.build(this);
}
}
@@ -17,11 +17,9 @@

package org.apache.spark.sql.connector.expressions.aggregate;

import java.util.Arrays;
import java.util.stream.Collectors;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.internal.connector.ToStringSQLBuilder;

/**
* The general implementation of {@link AggregateFunc}, which contains the upper-cased function
@@ -47,27 +45,21 @@ public final class GeneralAggregateFunc implements AggregateFunc {
private final boolean isDistinct;
private final Expression[] children;

public String name() { return name; }
public boolean isDistinct() { return isDistinct; }

public GeneralAggregateFunc(String name, boolean isDistinct, Expression[] children) {
this.name = name;
this.isDistinct = isDistinct;
this.children = children;
}

public String name() { return name; }
public boolean isDistinct() { return isDistinct; }

@Override
public Expression[] children() { return children; }

@Override
public String toString() {
String inputsString = Arrays.stream(children)
.map(Expression::describe)
.collect(Collectors.joining(", "));
if (isDistinct) {
return name + "(DISTINCT " + inputsString + ")";
} else {
return name + "(" + inputsString + ")";
}
ToStringSQLBuilder builder = new ToStringSQLBuilder();
return builder.build(this);
}
}