
[SPARK-52551][SQL] Add a new v2 Predicate BOOLEAN_EXPRESSION #51247


Closed
cloud-fan wants to merge 2 commits

Conversation

cloud-fan
Contributor

@cloud-fan commented on Jun 23, 2025

What changes were proposed in this pull request?

This is an extension of #47611. It's impossible to translate every catalyst expression that returns boolean type into a v2 Predicate, because the return type of a catalyst expression can be dynamic. For example, we can't make v2 Cast extend Predicate only when it returns boolean type.

This PR adds a new type of v2 Predicate: BOOLEAN_EXPRESSION. It's a simple wrapper over any expression that returns boolean type. With it, Spark can push down any catalyst expression that returns boolean type as a predicate.
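
For illustration, a minimal Scala sketch of the wrapper idea (not the exact code in this PR), assuming the boolean child has already been translated into a v2 `Expression`:

```scala
import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.connector.expressions.filter.Predicate

// Wrap an already-translated boolean v2 expression as a Predicate so it can be
// pushed down even though its expression class is not itself a Predicate.
def wrapAsBooleanPredicate(translated: Expression): Predicate =
  new Predicate("BOOLEAN_EXPRESSION", Array(translated))
```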

Why are the changes needed?

To push down more v2 predicates.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Updated test cases in PushablePredicateSuite.

Was this patch authored or co-authored using generative AI tooling?

no

@gengliangwang
Member

@cloud-fan thanks for making the PR.
FYI, @aokolnychyi and I discussed this offline. We are thinking about adding a new method or trait for expressions that can be constant-folded without context:

  • Example of constant folding without context: 1 > 0, cast(null as boolean)
  • Example of constant folding with context: current_date(), current_catalog(), cast('2020-01-01 00:00:00' as timestamp) (requires the time zone from the session context)

We can always evaluate the constant-folding-without-context expressions before passing them to V2. WDYT?
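
As a rough illustration of the distinction (not code from this PR or the proposal), the context-free case can be folded eagerly with plain catalyst constructs:

```scala
import org.apache.spark.sql.catalyst.expressions.{GreaterThan, Literal}
import org.apache.spark.sql.types.BooleanType

// "1 > 0" needs no session context, so it can be folded to a literal eagerly.
val contextFree = GreaterThan(Literal(1), Literal(0))
val folded = Literal.create(contextFree.eval(), BooleanType) // Literal(true)

// cast('2020-01-01 00:00:00' as timestamp), by contrast, can only be folded
// once the session time zone is known.
```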

@HyukjinKwon
Member

Oh just read the comment. I will leave it to you @gengliangwang and @cloud-fan

@cloud-fan
Contributor Author

@gengliangwang I don't think #51282 can guarantee that every boolean catalyst expression can be translated to a v2 Predicate properly. Cast and the null boolean literal are just examples. What do you think?

```scala
if (isPredicate) {
  val translated = build()
  val translated0 = build()
  val conf = SQLConf.get
```
Contributor

Holding on to the SQLConf here means we can't pick up real-time changes to SQLConf.

Contributor Author

This is within a single method, so I don't think we want to be that dynamic. And in practice this should run within the same session, so the value won't change.

```scala
val translated0 = build()
val conf = SQLConf.get
val alwaysCreateV2Predicate = conf.getConf(SQLConf.DATA_SOURCE_ALWAYS_CREATE_V2_PREDICATE)
val translated = if (alwaysCreateV2Predicate && isPredicate && e.dataType == BooleanType) {
```
Contributor

We can remove `isPredicate` here.

```
@@ -145,5 +151,8 @@ public class Predicate extends GeneralScalarExpression {

  public Predicate(String name, Expression[] children) {
    super(name, children);
    if ("BOOLEAN_EXPRESSION".equals(name)) {
```
Contributor

I think "BOOLEAN_EXPRESSION" should be declared as a `final` constant.

Contributor Author

All other function names are hardcoded as well; we just match the string literal in the code.
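
For context, a hypothetical sketch of how a consumer typically matches on the hardcoded name string (illustrative helper, not code from this PR):

```scala
import org.apache.spark.sql.connector.expressions.filter.Predicate

// Connectors can recognize the wrapper by matching the predicate name.
def isBooleanExpressionWrapper(p: Predicate): Boolean =
  p.name() == "BOOLEAN_EXPRESSION"
```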

Contributor

@beliefer left a comment


LGTM except a minor comment.

@cloud-fan
Contributor Author

thanks for the review, merging to master!

cloud-fan closed this in 6f54429 on Jun 30, 2025
```scala
withSQLConf(SQLConf.DATA_SOURCE_DONT_ASSERT_ON_PREDICATE.key -> "false") {
  intercept[java.lang.AssertionError] {
    PushablePredicate.unapply(Literal.create("string"))

test("non-trivial boolean expression") {
```
Member

Seems like it fails when ANSI is off:

```
[info] - non-trivial boolean expression *** FAILED *** (21 milliseconds)
[info]   pushable.isDefined was false (PushablePredicateSuite.scala:73)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.$anonfun$new$13(PushablePredicateSuite.scala:73)
[info]   at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf(SQLConfHelper.scala:56)
[info]   at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf$(SQLConfHelper.scala:38)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(PushablePredicateSuite.scala:28)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:253)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:251)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.withSQLConf(PushablePredicateSuite.scala:28)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.$anonfun$new$12(PushablePredicateSuite.scala:69)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.$anonfun$new$12$adapted(PushablePredicateSuite.scala:66)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.$anonfun$new$11(PushablePredicateSuite.scala:66)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.$anonfun$new$11$adapted(PushablePredicateSuite.scala:65)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.$anonfun$new$10(PushablePredicateSuite.scala:65)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
[info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
```

https://github.com/apache/spark/actions/runs/15987607479/job/45094925023

Contributor Author

Ah, let me force-enable ANSI for this suite.

Member

Okay, it can only be pushed down when ANSI is on. Let me just enable ANSI for this test case:

```scala
    case Cast(child, dataType, _, evalMode)
        if evalMode == EvalMode.ANSI || Cast.canUpCast(child.dataType, dataType) =>
      generateExpression(child).map(v => new V2Cast(v, child.dataType, dataType))
```
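
For reference, a minimal sketch of forcing ANSI on for such a test case (the test body and `catalystExpr` are illustrative, not the actual suite code):

```scala
import org.apache.spark.sql.internal.SQLConf

test("non-trivial boolean expression") {
  // Force ANSI so the Cast above translates to a v2 Cast and the whole
  // expression stays pushable regardless of the build's default ANSI setting.
  withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
    val pushable = PushablePredicate.unapply(catalystExpr) // catalystExpr: illustrative
    assert(pushable.isDefined)
  }
}
```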

HyukjinKwon pushed a commit that referenced this pull request Jul 1, 2025
### What changes were proposed in this pull request?

This is a follow-up of #51247 to fix tests in the non-ANSI build

### Why are the changes needed?

fix test

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #51330

Closes #51329 from cloud-fan/follow.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>