[SPARK-52551][SQL] Add a new v2 Predicate BOOLEAN_EXPRESSION #51247
Conversation
@cloud-fan thanks for making the PR.
We can always evaluate the constant-folding-without-context expressions before passing them to V2. WDYT?
Oh, I just read the comment. I will leave it to you, @gengliangwang and @cloud-fan.
@gengliangwang I don't think #51282 can guarantee that any boolean catalyst expression can be translated to v2.
    if (isPredicate) {
      val translated = build()
      val translated0 = build()
      val conf = SQLConf.get
Holding the SQLConf here will make it unable to pick up real-time changes to SQLConf.
This is within a single method, so I don't think we want to be that dynamic. In practice this should run within the same session, so the conf won't change.
      val translated0 = build()
      val conf = SQLConf.get
      val alwaysCreateV2Predicate = conf.getConf(SQLConf.DATA_SOURCE_ALWAYS_CREATE_V2_PREDICATE)
      val translated = if (alwaysCreateV2Predicate && isPredicate && e.dataType == BooleanType) {
We can remove isPredicate here.
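To make the intent of this branch concrete, here is a hypothetical standalone helper (the name and the match are my own sketch, not code from the PR) showing the wrapping that the new flag enables:

```scala
import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.connector.expressions.filter.Predicate

// Hypothetical helper, not part of the PR: wrap an already-translated v2 expression whose
// result type is boolean in the new BOOLEAN_EXPRESSION predicate, unless it is already a
// Predicate. This is what lets non-Predicate boolean expressions be pushed as predicates.
def asPushablePredicate(translated: Expression): Predicate = translated match {
  case p: Predicate => p
  case other        => new Predicate("BOOLEAN_EXPRESSION", Array(other))
}
```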
@@ -145,5 +151,8 @@ public class Predicate extends GeneralScalarExpression {

  public Predicate(String name, Expression[] children) {
    super(name, children);
    if ("BOOLEAN_EXPRESSION".equals(name)) {
I think we should use a final constant for "BOOLEAN_EXPRESSION".
All other function names are hardcoded, and we just match the string literal in the code.
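For illustration, the hardcoded-name convention looks like this on the consumer side (the describePredicate helper below is a made-up example, not code from Spark):

```scala
import org.apache.spark.sql.connector.expressions.filter.Predicate

// Made-up example: consumers of v2 predicates dispatch on the hardcoded name string,
// so "BOOLEAN_EXPRESSION" follows the same convention as "AND", "OR", "NOT", etc.
def describePredicate(p: Predicate): String = p.name() match {
  case "BOOLEAN_EXPRESSION" => s"boolean wrapper over ${p.children().head.describe()}"
  case "AND" | "OR" | "NOT" => "boolean connective"
  case other                => s"predicate $other"
}
```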
LGTM except a minor comment.
Thanks for the review, merging to master!
    withSQLConf(SQLConf.DATA_SOURCE_DONT_ASSERT_ON_PREDICATE.key -> "false") {
      intercept[java.lang.AssertionError] {
        PushablePredicate.unapply(Literal.create("string"))

  test("non-trivial boolean expression") {
Seems like it fails when ANSI is off:
[info] - non-trivial boolean expression *** FAILED *** (21 milliseconds)
[info]   pushable.isDefined was false (PushablePredicateSuite.scala:73)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.$anonfun$new$13(PushablePredicateSuite.scala:73)
[info]   at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf(SQLConfHelper.scala:56)
[info]   at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf$(SQLConfHelper.scala:38)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(PushablePredicateSuite.scala:28)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:253)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:251)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.withSQLConf(PushablePredicateSuite.scala:28)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.$anonfun$new$12(PushablePredicateSuite.scala:69)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.$anonfun$new$12$adapted(PushablePredicateSuite.scala:66)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.$anonfun$new$11(PushablePredicateSuite.scala:66)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.$anonfun$new$11$adapted(PushablePredicateSuite.scala:65)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.$anonfun$new$10(PushablePredicateSuite.scala:65)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
[info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
https://github.com/apache/spark/actions/runs/15987607479/job/45094925023
Ah, let me force-enable ANSI for this suite.
Okay, it can only be pushed when ANSI is on. Let me just enable ANSI for this test case:
```
case Cast(child, dataType, _, evalMode)
    if evalMode == EvalMode.ANSI || Cast.canUpCast(child.dataType, dataType) =>
  generateExpression(child).map(v => new V2Cast(v, child.dataType, dataType))
```
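For reference, a minimal sketch of that kind of test-side fix, assuming the suite mixes in SQLTestUtils (which provides withSQLConf) and that PushablePredicate is the extractor under test; the cast below is only translatable when ANSI mode is on, matching the EvalMode.ANSI branch quoted above:

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.BooleanType

// Sketch only: force ANSI so the string-to-boolean cast is translated to a v2 Cast
// and the expression becomes pushable.
withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
  val boolCast = Cast(Literal("true"), BooleanType)
  assert(PushablePredicate.unapply(boolCast).isDefined)
}
```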
Follow-up (#51329):
What changes were proposed in this pull request?
This is a followup of #51247 to fix tests in the non-ANSI build.
Why are the changes needed?
Fix test.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
N/A
Was this patch authored or co-authored using generative AI tooling?
No.
Closes #51330
Closes #51329 from cloud-fan/follow.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
This is an extension of #47611. It's impossible to translate all catalyst expressions returning boolean type into v2 Predicate, as the return type of a catalyst expression can be dynamic; for example, we can't make v2 Cast extend Predicate only when it returns boolean type. This PR adds a new type of v2 Predicate: BOOLEAN_EXPRESSION. It's a simple wrapper over any expression that returns boolean type. By doing so, Spark can push down any catalyst expression that returns boolean type as a predicate.
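As an illustration of the new predicate's shape from the public connector API (the column name and the wrapped cast below are made-up examples, not taken from this PR):

```scala
import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression, FieldReference}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.types.BooleanType

// Any v2 expression with a boolean result type can be carried as the single child of a
// Predicate named "BOOLEAN_EXPRESSION"; the column and cast here are purely illustrative.
val wrappedChild: Expression = new V2Cast(FieldReference("flag_str"), BooleanType)
val booleanExpr = new Predicate("BOOLEAN_EXPRESSION", Array(wrappedChild))
```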
Why are the changes needed?
To push down more v2 predicates.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Updated test cases in PushablePredicateSuite.
Was this patch authored or co-authored using generative AI tooling?
No.