Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A prototype of vectorized UDAF No. 2. #3

Closed
wants to merge 7 commits into from
Closed

Conversation

ueshin
Copy link
Owner

@ueshin ueshin commented Nov 20, 2017

What changes were proposed in this pull request?

This is a prototype of vectorized UDAF.

Proposed API

Introduce @pandas_udaf decorator (annotation) to define vectorized UDAFs which takes one or more pandas.Series and returns one or more scalar values.

We can define vectorized UDAFs if the function supports partial aggregation as:

@pandas_udaf(LongType(), supportsPartial=True)
def p_sum(v):
    return v.sum()

or if the function does not support partial aggregation as:

@pandas_udaf(DoubleType(), supportsPartial=False)
def p_avg(v):
    return v.mean()

We can use it similar to aggregate functions as:

df.groupBy(col('g')).agg(p_sum(col('n')), expr('count(n)'), p_avg(col('n')))

ueshin pushed a commit that referenced this pull request Oct 28, 2019
### What changes were proposed in this pull request?
`org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite` failed lately. After had a look at the logs it just shows the following fact without any details:
```
Caused by: sbt.ForkMain$ForkError: sun.security.krb5.KrbException: Server not found in Kerberos database (7) - Server not found in Kerberos database
```
Since the issue is intermittent and not able to reproduce it we should add more debug information and wait for reproduction with the extended logs.

### Why are the changes needed?
Failing test doesn't give enough debug information.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
I've started the test manually and checked that such additional debug messages show up:
```
>>> KrbApReq: APOptions are 00000000 00000000 00000000 00000000
>>> EType: sun.security.krb5.internal.crypto.Aes128CtsHmacSha1EType
Looking for keys for: kafka/localhostEXAMPLE.COM
Added key: 17version: 0
Added key: 23version: 0
Added key: 16version: 0
Found unsupported keytype (3) for kafka/localhostEXAMPLE.COM
>>> EType: sun.security.krb5.internal.crypto.Aes128CtsHmacSha1EType
Using builtin default etypes for permitted_enctypes
default etypes for permitted_enctypes: 17 16 23.
>>> EType: sun.security.krb5.internal.crypto.Aes128CtsHmacSha1EType
MemoryCache: add 1571936500/174770/16C565221B70AAB2BEFE31A83D13A2F4/client/localhostEXAMPLE.COM to client/localhostEXAMPLE.COM|kafka/localhostEXAMPLE.COM
MemoryCache: Existing AuthList:
#3: 1571936493/200803/8CD70D280B0862C5DA1FF901ECAD39FE/client/localhostEXAMPLE.COM
#2: 1571936499/985009/BAD33290D079DD4E3579A8686EC326B7/client/localhostEXAMPLE.COM
#1: 1571936499/995208/B76B9D78A9BE283AC78340157107FD40/client/localhostEXAMPLE.COM
```

Closes apache#26252 from gaborgsomogyi/SPARK-29580.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions github-actions bot added the Stale label Dec 18, 2019
@github-actions github-actions bot closed this Dec 19, 2019
ueshin added a commit that referenced this pull request Jun 10, 2020
### What changes were proposed in this pull request?

This PR proposes to make `PythonFunction` holds `Seq[Byte]` instead of `Array[Byte]` to be able to compare if the byte array has the same values for the cache manager.

### Why are the changes needed?

Currently the cache manager doesn't use the cache for `udf` if the `udf` is created again even if the functions is the same.

```py
>>> func = lambda x: x

>>> df = spark.range(1)
>>> df.select(udf(func)("id")).cache()
```
```py
>>> df.select(udf(func)("id")).explain()
== Physical Plan ==
*(2) Project [pythonUDF0#14 AS <lambda>(id)apache#12]
+- BatchEvalPython [<lambda>(id#0L)], [pythonUDF0#14]
 +- *(1) Range (0, 1, step=1, splits=12)
```

This is because `PythonFunction` holds `Array[Byte]`, and `equals` method of array equals only when the both array is the same instance.

### Does this PR introduce _any_ user-facing change?

Yes, if the user reuse the Python function for the UDF, the cache manager will detect the same function and use the cache for it.

### How was this patch tested?

I added a test case and manually.

```py
>>> df.select(udf(func)("id")).explain()
== Physical Plan ==
InMemoryTableScan [<lambda>(id)apache#12]
   +- InMemoryRelation [<lambda>(id)apache#12], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(2) Project [pythonUDF0#5 AS <lambda>(id)#3]
            +- BatchEvalPython [<lambda>(id#0L)], [pythonUDF0#5]
               +- *(1) Range (0, 1, step=1, splits=12)
```

Closes apache#28774 from ueshin/issues/SPARK-31945/udf_cache.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
ueshin pushed a commit that referenced this pull request Jul 10, 2020
… without WindowExpression

### What changes were proposed in this pull request?

Add WindowFunction check at `CheckAnalysis`.

### Why are the changes needed?
Provide friendly error msg.

**BEFORE**
```scala
scala> sql("select rank() from values(1)").show
java.lang.UnsupportedOperationException: Cannot generate code for expression: rank()
```

**AFTER**
```scala
scala> sql("select rank() from values(1)").show
org.apache.spark.sql.AnalysisException: Window function rank() requires an OVER clause.;;
Project [rank() AS RANK()#3]
+- LocalRelation [col1#2]
```

### Does this PR introduce _any_ user-facing change?

Yes, user wiill be given a better error msg.

### How was this patch tested?

Pass the newly added UT.

Closes apache#28808 from ulysses-you/SPARK-31975.

Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
ueshin pushed a commit that referenced this pull request Oct 20, 2022
…ly equivalent children in `RewriteDistinctAggregates`

### What changes were proposed in this pull request?

In `RewriteDistinctAggregates`, when grouping aggregate expressions by function children, treat children that are semantically equivalent as the same.

### Why are the changes needed?

This PR will reduce the number of projections in the Expand operator when there are multiple distinct aggregations with superficially different children. In some cases, it will eliminate the need for an Expand operator.

Example: In the following query, the Expand operator creates 3\*n rows (where n is the number of incoming rows) because it has a projection for each of function children `b + 1`, `1 + b` and `c`.

```
create or replace temp view v1 as
select * from values
(1, 2, 3.0),
(1, 3, 4.0),
(2, 4, 2.5),
(2, 3, 1.0)
v1(a, b, c);

select
  a,
  count(distinct b + 1),
  avg(distinct 1 + b) filter (where c > 0),
  sum(c)
from
  v1
group by a;
```
The Expand operator has three projections (each producing a row for each incoming row):
```
[a#87, null, null, 0, null, UnscaledValue(c#89)], <== projection #1 (for regular aggregation)
[a#87, (b#88 + 1), null, 1, null, null],          <== projection #2 (for distinct aggregation of b + 1)
[a#87, null, (1 + b#88), 2, (c#89 > 0.0), null]], <== projection #3 (for distinct aggregation of 1 + b)
```
In reality, the Expand only needs one projection for `1 + b` and `b + 1`, because they are semantically equivalent.

With the proposed change, the Expand operator's projections look like this:
```
[a#67, null, 0, null, UnscaledValue(c#69)],  <== projection #1 (for regular aggregations)
[a#67, (b#68 + 1), 1, (c#69 > 0.0), null]],  <== projection #2 (for distinct aggregation on b + 1 and 1 + b)
```
With one less projection, Expand produces 2\*n rows instead of 3\*n rows, but still produces the correct result.

In the case where all distinct aggregates have semantically equivalent children, the Expand operator is not needed at all.

Benchmark code in the JIRA (SPARK-40382).

Before the PR:
```
distinct aggregates:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
all semantically equivalent                       14721          14859         195          5.7         175.5       1.0X
some semantically equivalent                      14569          14572           5          5.8         173.7       1.0X
none semantically equivalent                      14408          14488         113          5.8         171.8       1.0X
```
After the PR:
```
distinct aggregates:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
all semantically equivalent                        3658           3692          49         22.9          43.6       1.0X
some semantically equivalent                       9124           9214         127          9.2         108.8       0.4X
none semantically equivalent                      14601          14777         250          5.7         174.1       0.3X
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit tests.

Closes apache#37825 from bersprockets/rewritedistinct_issue.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
ueshin pushed a commit that referenced this pull request Mar 21, 2023
…edExpression()

### What changes were proposed in this pull request?

In `EquivalentExpressions.addExpr()`, add a guard `supportedExpression()` to make it consistent with `addExprTree()` and `getExprState()`.

### Why are the changes needed?

This fixes a regression caused by apache#39010 which added the `supportedExpression()` to `addExprTree()` and `getExprState()` but not `addExpr()`.

One example of a use case affected by the inconsistency is the `PhysicalAggregation` pattern in physical planning. There, it calls `addExpr()` to deduplicate the aggregate expressions, and then calls `getExprState()` to deduplicate the result expressions. Guarding inconsistently will cause the aggregate and result expressions go out of sync, eventually resulting in query execution error (or whole-stage codegen error).

### Does this PR introduce _any_ user-facing change?

This fixes a regression affecting Spark 3.3.2+, where it may manifest as an error running aggregate operators with higher-order functions.

Example running the SQL command:
```sql
select max(transform(array(id), x -> x)), max(transform(array(id), x -> x)) from range(2)
```
example error message before the fix:
```
java.lang.IllegalStateException: Couldn't find max(transform(array(id#0L), lambdafunction(lambda x#2L, lambda x#2L, false)))#4 in [max(transform(array(id#0L), lambdafunction(lambda x#1L, lambda x#1L, false)))#3]
```
after the fix this error is gone.

### How was this patch tested?

Added new test cases to `SubexpressionEliminationSuite` for the immediate issue, and to `DataFrameAggregateSuite` for an example of user-visible symptom.

Closes apache#40473 from rednaxelafx/spark-42851.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
ueshin pushed a commit that referenced this pull request Mar 24, 2023
…edExpression()

### What changes were proposed in this pull request?

In `EquivalentExpressions.addExpr()`, add a guard `supportedExpression()` to make it consistent with `addExprTree()` and `getExprState()`.

### Why are the changes needed?

This fixes a regression caused by apache#39010 which added the `supportedExpression()` to `addExprTree()` and `getExprState()` but not `addExpr()`.

One example of a use case affected by the inconsistency is the `PhysicalAggregation` pattern in physical planning. There, it calls `addExpr()` to deduplicate the aggregate expressions, and then calls `getExprState()` to deduplicate the result expressions. Guarding inconsistently will cause the aggregate and result expressions go out of sync, eventually resulting in query execution error (or whole-stage codegen error).

### Does this PR introduce _any_ user-facing change?

This fixes a regression affecting Spark 3.3.2+, where it may manifest as an error running aggregate operators with higher-order functions.

Example running the SQL command:
```sql
select max(transform(array(id), x -> x)), max(transform(array(id), x -> x)) from range(2)
```
example error message before the fix:
```
java.lang.IllegalStateException: Couldn't find max(transform(array(id#0L), lambdafunction(lambda x#2L, lambda x#2L, false)))#4 in [max(transform(array(id#0L), lambdafunction(lambda x#1L, lambda x#1L, false)))#3]
```
after the fix this error is gone.

### How was this patch tested?

Added new test cases to `SubexpressionEliminationSuite` for the immediate issue, and to `DataFrameAggregateSuite` for an example of user-visible symptom.

Closes apache#40473 from rednaxelafx/spark-42851.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit ef0a76e)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
1 participant