Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-6761][ML][SQL] Approximate quantiles - take 2 #4

Merged
merged 12 commits into from
Feb 21, 2016

Conversation

thunterdb
Copy link

Against the good branch this time.

Hi @viirya , I took your original PR and I made some changes to improve correctness and performance. I suggest we merge these changes into your public PR (apache#6042), and have someone from SQL or MLlib review the changes. What do you think?

Test coverage: should be close to 100% now (with 48 tests that cover most of the scenarios)

Performance: the cost of running the algorithm is now negligible when running benchmarks, most of the time is spent on deserializing RDDs. This was verified by running synthetic benchmarks with the sampler: the time spent in the quantile code itself is now < 5% of the total runtime as reported by VisualVM. However, the cost of using RDDs in DataFrame.describe is prohibitively high, so I suggest just leaving the quantiles in DataFrame.stats for now.

Note that this code only works by appending to buffers or by writing full buffers in an amortized manner, so it should be very easy to port to UDAFs later if someone wants to do it.

viirya added a commit that referenced this pull request Feb 21, 2016
[SPARK-6761][ML][SQL] Approximate quantiles - take 2
@viirya viirya merged commit 9314f1e into viirya:approximate_quantile Feb 21, 2016
viirya pushed a commit that referenced this pull request Apr 6, 2016
…l` in IF/CASEWHEN

## What changes were proposed in this pull request?

Currently, `SimplifyConditionals` handles `true` and `false` to optimize branches. This PR improves `SimplifyConditionals` to take advantage of `null` conditions for `if` and `CaseWhen` expressions, too.

**Before**
```
scala> sql("SELECT IF(null, 1, 0)").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [if (null) 1 else 0 AS (IF(CAST(NULL AS BOOLEAN), 1, 0))#4]
:     +- INPUT
+- Scan OneRowRelation[]
scala> sql("select case when cast(null as boolean) then 1 else 2 end").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [CASE WHEN null THEN 1 ELSE 2 END AS CASE WHEN CAST(NULL AS BOOLEAN) THEN 1 ELSE 2 END#14]
:     +- INPUT
+- Scan OneRowRelation[]
```

**After**
```
scala> sql("SELECT IF(null, 1, 0)").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [0 AS (IF(CAST(NULL AS BOOLEAN), 1, 0))#4]
:     +- INPUT
+- Scan OneRowRelation[]
scala> sql("select case when cast(null as boolean) then 1 else 2 end").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [2 AS CASE WHEN CAST(NULL AS BOOLEAN) THEN 1 ELSE 2 END#4]
:     +- INPUT
+- Scan OneRowRelation[]
```

**Hive**
```
hive> select if(null,1,2);
OK
2
hive> select case when cast(null as boolean) then 1 else 2 end;
OK
2
```

## How was this patch tested?

Pass the Jenkins tests (including new extended test cases).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes apache#12122 from dongjoon-hyun/SPARK-14338.
viirya pushed a commit that referenced this pull request Jul 20, 2017
…pressions

## What changes were proposed in this pull request?

This PR changes the direction of expression transformation in the DecimalPrecision rule. Previously, the expressions were transformed down, which led to incorrect result types when decimal expressions had other decimal expressions as their operands. The root cause of this issue was in visiting outer nodes before their children. Consider the example below:

```
    val inputSchema = StructType(StructField("col", DecimalType(26, 6)) :: Nil)
    val sc = spark.sparkContext
    val rdd = sc.parallelize(1 to 2).map(_ => Row(BigDecimal(12)))
    val df = spark.createDataFrame(rdd, inputSchema)

    // Works correctly since no nested decimal expression is involved
    // Expected result type: (26, 6) * (26, 6) = (38, 12)
    df.select($"col" * $"col").explain(true)
    df.select($"col" * $"col").printSchema()

    // Gives a wrong result since there is a nested decimal expression that should be visited first
    // Expected result type: ((26, 6) * (26, 6)) * (26, 6) = (38, 12) * (26, 6) = (38, 18)
    df.select($"col" * $"col" * $"col").explain(true)
    df.select($"col" * $"col" * $"col").printSchema()
```

The example above gives the following output:

```
// Correct result without sub-expressions
== Parsed Logical Plan ==
'Project [('col * 'col) AS (col * col)#4]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
(col * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- (col * col): decimal(38,12) (nullable = true)

// Incorrect result with sub-expressions
== Parsed Logical Plan ==
'Project [(('col * 'col) * 'col) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
((col * col) * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- ((col * col) * col): decimal(38,12) (nullable = true)
```

## How was this patch tested?

This PR was tested with available unit tests. Moreover, there are tests to cover previously failing scenarios.

Author: aokolnychyi <anton.okolnychyi@sap.com>

Closes apache#18583 from aokolnychyi/spark-21332.
viirya pushed a commit that referenced this pull request Aug 15, 2017
…pressions

## What changes were proposed in this pull request?

This PR changes the direction of expression transformation in the DecimalPrecision rule. Previously, the expressions were transformed down, which led to incorrect result types when decimal expressions had other decimal expressions as their operands. The root cause of this issue was in visiting outer nodes before their children. Consider the example below:

```
    val inputSchema = StructType(StructField("col", DecimalType(26, 6)) :: Nil)
    val sc = spark.sparkContext
    val rdd = sc.parallelize(1 to 2).map(_ => Row(BigDecimal(12)))
    val df = spark.createDataFrame(rdd, inputSchema)

    // Works correctly since no nested decimal expression is involved
    // Expected result type: (26, 6) * (26, 6) = (38, 12)
    df.select($"col" * $"col").explain(true)
    df.select($"col" * $"col").printSchema()

    // Gives a wrong result since there is a nested decimal expression that should be visited first
    // Expected result type: ((26, 6) * (26, 6)) * (26, 6) = (38, 12) * (26, 6) = (38, 18)
    df.select($"col" * $"col" * $"col").explain(true)
    df.select($"col" * $"col" * $"col").printSchema()
```

The example above gives the following output:

```
// Correct result without sub-expressions
== Parsed Logical Plan ==
'Project [('col * 'col) AS (col * col)#4]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
(col * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- (col * col): decimal(38,12) (nullable = true)

// Incorrect result with sub-expressions
== Parsed Logical Plan ==
'Project [(('col * 'col) * 'col) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
((col * col) * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- ((col * col) * col): decimal(38,12) (nullable = true)
```

## How was this patch tested?

This PR was tested with available unit tests. Moreover, there are tests to cover previously failing scenarios.

Author: aokolnychyi <anton.okolnychyi@sap.com>

Closes apache#18583 from aokolnychyi/spark-21332.

(cherry picked from commit 0be5fb4)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
viirya pushed a commit that referenced this pull request Nov 13, 2017
…pressions

## What changes were proposed in this pull request?

This PR changes the direction of expression transformation in the DecimalPrecision rule. Previously, the expressions were transformed down, which led to incorrect result types when decimal expressions had other decimal expressions as their operands. The root cause of this issue was in visiting outer nodes before their children. Consider the example below:

```
    val inputSchema = StructType(StructField("col", DecimalType(26, 6)) :: Nil)
    val sc = spark.sparkContext
    val rdd = sc.parallelize(1 to 2).map(_ => Row(BigDecimal(12)))
    val df = spark.createDataFrame(rdd, inputSchema)

    // Works correctly since no nested decimal expression is involved
    // Expected result type: (26, 6) * (26, 6) = (38, 12)
    df.select($"col" * $"col").explain(true)
    df.select($"col" * $"col").printSchema()

    // Gives a wrong result since there is a nested decimal expression that should be visited first
    // Expected result type: ((26, 6) * (26, 6)) * (26, 6) = (38, 12) * (26, 6) = (38, 18)
    df.select($"col" * $"col" * $"col").explain(true)
    df.select($"col" * $"col" * $"col").printSchema()
```

The example above gives the following output:

```
// Correct result without sub-expressions
== Parsed Logical Plan ==
'Project [('col * 'col) AS (col * col)#4]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
(col * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- (col * col): decimal(38,12) (nullable = true)

// Incorrect result with sub-expressions
== Parsed Logical Plan ==
'Project [(('col * 'col) * 'col) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
((col * col) * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- ((col * col) * col): decimal(38,12) (nullable = true)
```

## How was this patch tested?

This PR was tested with available unit tests. Moreover, there are tests to cover previously failing scenarios.

Author: aokolnychyi <anton.okolnychyi@sap.com>

Closes apache#18583 from aokolnychyi/spark-21332.

(cherry picked from commit 0be5fb4)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
viirya pushed a commit that referenced this pull request Oct 13, 2019
…ver)QueryTestSuite

### What changes were proposed in this pull request?
This PR adds 2 changes regarding exception handling in `SQLQueryTestSuite` and `ThriftServerQueryTestSuite`
- fixes an expected output sorting issue in `ThriftServerQueryTestSuite` as if there is an exception then there is no need for sort
- introduces common exception handling in those 2 suites with a new `handleExceptions` method

### Why are the changes needed?

Currently `ThriftServerQueryTestSuite` passes on master, but it fails on one of my PRs (apache#23531) with this error  (https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/111651/testReport/org.apache.spark.sql.hive.thriftserver/ThriftServerQueryTestSuite/sql_3/):
```
org.scalatest.exceptions.TestFailedException: Expected "
[Recursion level limit 100 reached but query has not exhausted, try increasing spark.sql.cte.recursion.level.limit
org.apache.spark.SparkException]
", but got "
[org.apache.spark.SparkException
Recursion level limit 100 reached but query has not exhausted, try increasing spark.sql.cte.recursion.level.limit]
" Result did not match for query #4 WITH RECURSIVE r(level) AS (   VALUES (0)   UNION ALL   SELECT level + 1 FROM r ) SELECT * FROM r
```
The unexpected reversed order of expected output (error message comes first, then the exception class) is due to this line: https://github.com/apache/spark/pull/26028/files#diff-b3ea3021602a88056e52bf83d8782de8L146. It should not sort the expected output if there was an error during execution.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing UTs.

Closes apache#26028 from peter-toth/SPARK-29359-better-exception-handling.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Yuming Wang <wgyumg@gmail.com>
viirya pushed a commit that referenced this pull request Feb 16, 2021
Change-Id: I564062ba02947b133d9d850977d38470dc84ef5e
viirya pushed a commit that referenced this pull request Feb 9, 2023
### What changes were proposed in this pull request?
This PR introduces sasl retry count in RetryingBlockTransferor.

### Why are the changes needed?
Previously a boolean variable, saslTimeoutSeen, was used. However, the boolean variable wouldn't cover the following scenario:

1. SaslTimeoutException
2. IOException
3. SaslTimeoutException
4. IOException

Even though IOException at #2 is retried (resulting in increment of retryCount), the retryCount would be cleared at step #4.
Since the intention of saslTimeoutSeen is to undo the increment due to retrying SaslTimeoutException, we should keep a counter for SaslTimeoutException retries and subtract the value of this counter from retryCount.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New test is added, courtesy of Mridul.

Closes apache#39611 from tedyu/sasl-cnt.

Authored-by: Ted Yu <yuzhihong@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
viirya pushed a commit that referenced this pull request Oct 10, 2023
…edExpression()

### What changes were proposed in this pull request?

In `EquivalentExpressions.addExpr()`, add a guard `supportedExpression()` to make it consistent with `addExprTree()` and `getExprState()`.

### Why are the changes needed?

This fixes a regression caused by apache#39010 which added the `supportedExpression()` to `addExprTree()` and `getExprState()` but not `addExpr()`.

One example of a use case affected by the inconsistency is the `PhysicalAggregation` pattern in physical planning. There, it calls `addExpr()` to deduplicate the aggregate expressions, and then calls `getExprState()` to deduplicate the result expressions. Guarding inconsistently will cause the aggregate and result expressions go out of sync, eventually resulting in query execution error (or whole-stage codegen error).

### Does this PR introduce _any_ user-facing change?

This fixes a regression affecting Spark 3.3.2+, where it may manifest as an error running aggregate operators with higher-order functions.

Example running the SQL command:
```sql
select max(transform(array(id), x -> x)), max(transform(array(id), x -> x)) from range(2)
```
example error message before the fix:
```
java.lang.IllegalStateException: Couldn't find max(transform(array(id#0L), lambdafunction(lambda x#2L, lambda x#2L, false)))#4 in [max(transform(array(id#0L), lambdafunction(lambda x#1L, lambda x#1L, false)))#3]
```
after the fix this error is gone.

### How was this patch tested?

Added new test cases to `SubexpressionEliminationSuite` for the immediate issue, and to `DataFrameAggregateSuite` for an example of user-visible symptom.

Closes apache#40473 from rednaxelafx/spark-42851.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit ef0a76e)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
viirya pushed a commit that referenced this pull request Oct 19, 2023
…edExpression()

### What changes were proposed in this pull request?

In `EquivalentExpressions.addExpr()`, add a guard `supportedExpression()` to make it consistent with `addExprTree()` and `getExprState()`.

### Why are the changes needed?

This fixes a regression caused by apache#39010 which added the `supportedExpression()` to `addExprTree()` and `getExprState()` but not `addExpr()`.

One example of a use case affected by the inconsistency is the `PhysicalAggregation` pattern in physical planning. There, it calls `addExpr()` to deduplicate the aggregate expressions, and then calls `getExprState()` to deduplicate the result expressions. Guarding inconsistently will cause the aggregate and result expressions go out of sync, eventually resulting in query execution error (or whole-stage codegen error).

### Does this PR introduce _any_ user-facing change?

This fixes a regression affecting Spark 3.3.2+, where it may manifest as an error running aggregate operators with higher-order functions.

Example running the SQL command:
```sql
select max(transform(array(id), x -> x)), max(transform(array(id), x -> x)) from range(2)
```
example error message before the fix:
```
java.lang.IllegalStateException: Couldn't find max(transform(array(id#0L), lambdafunction(lambda x#2L, lambda x#2L, false)))#4 in [max(transform(array(id#0L), lambdafunction(lambda x#1L, lambda x#1L, false)))#3]
```
after the fix this error is gone.

### How was this patch tested?

Added new test cases to `SubexpressionEliminationSuite` for the immediate issue, and to `DataFrameAggregateSuite` for an example of user-visible symptom.

Closes apache#40473 from rednaxelafx/spark-42851.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit ef0a76e)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
viirya pushed a commit that referenced this pull request Jul 16, 2024
### What changes were proposed in this pull request?

In the `Window` node, both `partitionSpec` and `orderSpec` must be orderable, but the current type check only verifies `orderSpec` is orderable. This can cause an error in later optimizing phases.

Given a query:

```
with t as (select id, map(id, id) as m from range(0, 10))
select rank() over (partition by m order by id) from t
```

Before the PR, it fails with an `INTERNAL_ERROR`:

```
org.apache.spark.SparkException: [INTERNAL_ERROR] grouping/join/window partition keys cannot be map type. SQLSTATE: XX000
at org.apache.spark.SparkException$.internalError(SparkException.scala:92)
at org.apache.spark.SparkException$.internalError(SparkException.scala:96)
at org.apache.spark.sql.catalyst.optimizer.NormalizeFloatingNumbers$.needNormalize(NormalizeFloatingNumbers.scala:103)
at org.apache.spark.sql.catalyst.optimizer.NormalizeFloatingNumbers$.org$apache$spark$sql$catalyst$optimizer$NormalizeFloatingNumbers$$needNormalize(NormalizeFloatingNumbers.scala:94)
...
```

After the PR, it fails with a `EXPRESSION_TYPE_IS_NOT_ORDERABLE`, which is expected:

```
  org.apache.spark.sql.catalyst.ExtendedAnalysisException: [EXPRESSION_TYPE_IS_NOT_ORDERABLE] Column expression "m" cannot be sorted because its type "MAP<BIGINT, BIGINT>" is not orderable. SQLSTATE: 42822; line 2 pos 53;
Project [RANK() OVER (PARTITION BY m ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#4]
+- Project [id#1L, m#0, RANK() OVER (PARTITION BY m ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#4, RANK() OVER (PARTITION BY m ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#4]
   +- Window [rank(id#1L) windowspecdefinition(m#0, id#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY m ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#4], [m#0], [id#1L ASC NULLS FIRST]
      +- Project [id#1L, m#0]
         +- SubqueryAlias t
            +- SubqueryAlias t
               +- Project [id#1L, map(id#1L, id#1L) AS m#0]
                  +- Range (0, 10, step=1, splits=None)
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:52)
...
```

### How was this patch tested?

Unit test.

Closes apache#45730 from chenhao-db/SPARK-47572.

Authored-by: Chenhao Li <chenhao.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
viirya pushed a commit that referenced this pull request Jul 16, 2024
… throw internal error

### What changes were proposed in this pull request?

This PR fixes the error messages and classes when Python UDFs are used in higher order functions.

### Why are the changes needed?

To show the proper user-facing exceptions with error classes.

### Does this PR introduce _any_ user-facing change?

Yes, previously it threw internal error such as:

```python
from pyspark.sql.functions import transform, udf, col, array
spark.range(1).select(transform(array("id"), lambda x: udf(lambda y: y)(x))).collect()
```

Before:

```
py4j.protocol.Py4JJavaError: An error occurred while calling o74.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 0.0 failed 1 times, most recent failure: Lost task 15.0 in stage 0.0 (TID 15) (ip-192-168-123-103.ap-northeast-2.compute.internal executor driver): org.apache.spark.SparkException: [INTERNAL_ERROR] Cannot evaluate expression: <lambda>(lambda x_0#3L)#2 SQLSTATE: XX000
	at org.apache.spark.SparkException$.internalError(SparkException.scala:92)
	at org.apache.spark.SparkException$.internalError(SparkException.scala:96)
```

After:

```
pyspark.errors.exceptions.captured.AnalysisException: [INVALID_LAMBDA_FUNCTION_CALL.UNEVALUABLE] Invalid lambda function call. Python UDFs should be used in a lambda function at a higher order function. However, "<lambda>(lambda x_0#3L)" was a Python UDF. SQLSTATE: 42K0D;
Project [transform(array(id#0L), lambdafunction(<lambda>(lambda x_0#3L)#2, lambda x_0#3L, false)) AS transform(array(id), lambdafunction(<lambda>(lambda x_0#3L), namedlambdavariable()))#4]
+- Range (0, 1, step=1, splits=Some(16))
```

### How was this patch tested?

Unittest was added

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47079 from HyukjinKwon/SPARK-48706.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants