Merge the latest spark update 20201230 #1

Open: wants to merge 10,000 commits into base: master

Conversation

joohnnie

What changes were proposed in this pull request?

Merge the latest spark update 20201230

How was this patch tested?

don't need to test

subham611 and others added 30 commits October 17, 2024 16:01
### What changes were proposed in this pull request?
Adds support for size based partition creation during kafka read.

### Why are the changes needed?
Currently, Spark Structured Streaming provides the `minPartitions` config to create more partitions than Kafka has. This is helpful for increasing parallelism, but the value cannot be changed dynamically.

It would be better to increase the number of Spark partitions dynamically based on input size: if the input size is high, create more partitions. With this change we can dynamically create more partitions to handle varying loads.

### Does this PR introduce _any_ user-facing change?
An additional parameter (`maxRecordsPerPartition`) will be accepted by the Kafka source provider.

<img width="940" alt="Screenshot 2024-10-17 at 11 13 27 AM" src="https://github.com/user-attachments/assets/29ecc65e-98fa-40ff-8565-480eeb207ff7">

<img width="1580" alt="Screenshot 2024-10-17 at 11 11 51 AM" src="https://github.com/user-attachments/assets/63652f82-f24f-4a24-ab24-acd3feb5e0d6">

### How was this patch tested?
Added Unit tests

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #47927 from SubhamSinghal/SPARK-49259-structured-streaming-size-based-partition-creation-kafka.

Authored-by: subham611 <subhamsinghal@sharechat.co>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…ed repository

### What changes were proposed in this pull request?

This PR is a follow-up of 4f640e2 that disables the GitHub Pages workflow in forked repositories.

### Why are the changes needed?

To automatically disable the GitHub Pages workflow in developers' forked repositories. It can be disabled manually too, but this is a bit easier.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

Manually.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48515 from HyukjinKwon/SPARK-49495-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…ABLE_NOT_VIEW`

### What changes were proposed in this pull request?

This PR proposes to Integrate `_LEGACY_ERROR_TEMP_1252` into `EXPECT_TABLE_NOT_VIEW`

### Why are the changes needed?

To improve the error message by assigning proper error condition and SQLSTATE

### Does this PR introduce _any_ user-facing change?

No, only user-facing error message improved

### How was this patch tested?

Updated the existing tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48510 from itholic/SPARK-49998.

Authored-by: Haejoon Lee <haejoon.lee@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…MP_1097

### What changes were proposed in this pull request?

This PR proposes to assign proper error condition & sqlstate for` _LEGACY_ERROR_TEMP_1097`

### Why are the changes needed?

To improve the error message by assigning proper error condition and SQLSTATE

### Does this PR introduce _any_ user-facing change?

No, only user-facing error message improved

### How was this patch tested?

Updated the existing tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48471 from itholic/LEGACY_1097.

Authored-by: Haejoon Lee <haejoon.lee@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…D_RECORD_IN_PARSING`

### What changes were proposed in this pull request?

This PR proposes to Integrate `_LEGACY_ERROR_TEMP_2165` into `MALFORMED_RECORD_IN_PARSING`

### Why are the changes needed?

To improve the error message by assigning proper error condition and SQLSTATE

### Does this PR introduce _any_ user-facing change?

No, only user-facing error message improved

### How was this patch tested?

Updated the existing tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48508 from itholic/SPARK-49997.

Authored-by: Haejoon Lee <haejoon.lee@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…re` module

### What changes were proposed in this pull request?
Exclude pandas/resource/testing from `pyspark-core` module

### Why are the changes needed?
Avoid unnecessary tests; e.g. in #48516, a pyspark-pandas-only change triggers `spark-core` and then all the PySpark tests.

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
Manually checked, e.g.:
```
In [6]: re.match("python/(?!pyspark/(ml|mllib|sql|streaming))", "python/pyspark/pandas/plots")
Out[6]: <re.Match object; span=(0, 7), match='python/'>

In [7]: re.match("python/(?!pyspark/(ml|mllib|sql|streaming|pandas))", "python/pyspark/pandas/plots")
```

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #48518 from zhengruifeng/infra_pyspark_core.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
### What changes were proposed in this pull request?

Remove unused vanilla hadoop dependency(and transitive deps) management, i.e. `hadoop-client`, `xerces:xercesImpl`, and inline deps defined in `hadoop3` because it's the only supported hadoop profile.

### Why are the changes needed?

Simplify pom.xml.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass CI and verified that runtime jars are not affected by running `dev/test-dependencies.sh`.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48491 from pan3793/SPARK-49988.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Add a separate docker file for doc build

### Why are the changes needed?
Currently we only have a single test image for `pyspark`, `sparkr`, `lint` and `docs`; it has two major issues:
1. Disk space limitation: we are adding more and more packages to it, the disk space left for testing is very limited, and it causes `No space left on device` from time to time;
2. Environment conflicts: for example, even though we already install some packages for `docs` in the docker file, we still need to install some additional Python packages in `build_and_test` due to the conflicts between `docs` and `pyspark`. This is hard to maintain because the related packages are installed in two different places.

So I am thinking of spinning off some installations (e.g. `docs`) from the base image, so that:
1. we can completely cache all the dependencies for `docs`;
2. the related installations are centralized;
3. we can free up disk space on the base image (after we spin off other dependencies, we can remove unneeded packages from it).

Furthermore, if we want to apply multiple images, we can easily support different environments, e.g. adding a separate image for old versions of `pandas/pyarrow/etc`.

### Does this PR introduce _any_ user-facing change?
no, infra-only

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #48520 from zhengruifeng/infra_multiple_docker_file.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?

This PR proposes to add API compatibility check for I/O

### Why are the changes needed?

To guarantee of the same behavior between Spark Classic and Spark Connect

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added UTs

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48511 from itholic/compat_readwriter.

Authored-by: Haejoon Lee <haejoon.lee@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…a plot parameter type hints

### What changes were proposed in this pull request?
Add doc examples to `pie` plot and correct type hints of `area` plot parameter.

### Why are the changes needed?
Improve readability and typing.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48512 from xinrong-meng/minor_fix.

Authored-by: Xinrong Meng <xinrong@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…stributed_sequence_column`

### What changes were proposed in this pull request?
Avoid unnecessary operations in `attach_distributed_sequence_column`

### Why are the changes needed?
1. `attach_distributed_sequence_column` always needs `sdf.columns`, which may trigger an analysis task in Spark Connect if `sdf.schema` has not been cached;
2. for a zero-column dataframe, it triggers `sdf.count`, which seems redundant;

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #48516 from zhengruifeng/ps_attach_distributed_column.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Adding the Python API for the 4 new string validation expressions (a usage sketch follows the list):
- is_valid_utf8
- make_valid_utf8
- validate_utf8
- try_validate_utf8
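For reference, a hedged sketch exercising the underlying SQL expressions; the PR itself adds the matching Python wrappers in `pyspark.sql.functions`, so this only shows the expressions being called via `spark.sql` from a Scala shell:

```
// Sketch only: calls the SQL expressions that the new Python functions wrap.
spark.sql("SELECT is_valid_utf8('Spark'), try_validate_utf8('Spark')").show()
// is_valid_utf8 returns a boolean; try_validate_utf8 returns the input string,
// or NULL if it is not valid UTF-8.
```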

### Why are the changes needed?
Offer a complete Python API for the new expressions in Spark 4.0.

### Does this PR introduce _any_ user-facing change?
Yes, adding Python API for the 4 new Spark expressions.

### How was this patch tested?
New tests for the Python API.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48455 from uros-db/api-validation-python.

Authored-by: Uros Bojanic <uros.bojanic@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
This pr aims to upgrade `datasketches-java` from 6.0.0 to 6.1.1.

### Why are the changes needed?
The new version depends on `datasketches-memory` 3.x. The full release notes are as follows:
- https://github.com/apache/datasketches-java/releases/tag/6.1.0
- https://github.com/apache/datasketches-java/releases/tag/6.1.1
- https://github.com/apache/datasketches-memory/releases/tag/3.0.0
- https://github.com/apache/datasketches-memory/releases/tag/3.0.1
- https://github.com/apache/datasketches-memory/releases/tag/3.0.2

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #48380 from LuciferYang/test-dm-3.0.2.

Lead-authored-by: yangjie01 <yangjie01@baidu.com>
Co-authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Simplifying the AbstractStringType hierarchy.

### Why are the changes needed?
The addition of trim-sensitive collation (#48336) highlighted the complexity of extending the existing AbstractStringType structure. Besides adding a new parameter to all types inheriting from AbstractStringType, it required changing the logic of every subclass as well as renaming the derived class StringTypeAnyCollation to StringTypeWithCaseAccentSensitivity, which could again be subject to change if we keep adding new specifiers.

Looking ahead, the introduction of support for indeterminate collation would further complicate these types. To address this, the proposed changes simplify the design by consolidating common logic into a single base class. This base class will handle core functionality such as trim or indeterminate collation, while a derived class, StringTypeWithCollation (previously awkwardly called StringTypeWithCaseAccentSensitivity), will manage collation specifiers.

This approach allows for easier future extensions: fundamental checks can be handled in the base class, while any new specifiers can be added as optional fields in StringTypeWithCollation.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
With existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48459 from stefankandic/refactorStringTypes.

Authored-by: Stefan Kandic <stefan.kandic@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?

This PR aims to use `grpcio` and `grpcio-status` `1.67.0` for Python 3.13 tests in order to reveal the remaining test failures after installing the official `grpcio` in Python 3.13 environment.

### Why are the changes needed?

`grpcio` added Python 3.13 support since 1.66.2.
- https://pypi.org/project/grpcio/1.67.0/
- https://pypi.org/project/grpcio/1.66.2/

### Does this PR introduce _any_ user-facing change?

No, this is an infra change for test coverage. Currently, `pyspark-connect` module test fails due to the missing required package, `grpc`, like the following.

- https://github.com/apache/spark/actions/runs/11372942311/job/31638495254
```
ModuleNotFoundError: No module named 'grpc'
```

### How was this patch tested?

Manual check the generated image of this PR builder.
```
$ docker run -it --rm ghcr.io/dongjoon-hyun/apache-spark-ci-image:master-11389776259 python3.13 -m pip list | grep grpcio
grpcio                   1.67.0
grpcio-status            1.67.0
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48522 from dongjoon-hyun/SPARK-50014.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…n 3.13 image

### What changes were proposed in this pull request?

This PR aims to install `BASIC_PIP_PKGS` except `pyarrow` in Python 3.13 image.

### Why are the changes needed?

- https://github.com/apache/spark/actions/runs/11392144577/job/31698382766
```
Traceback (most recent call last):
  File "/__w/spark/spark/python/pyspark/sql/pandas/utils.py", line 28, in require_minimum_pandas_version
    import pandas
ModuleNotFoundError: No module named 'pandas'
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual check with the built image of this PR builder.

```
$ docker run -it --rm ghcr.io/dongjoon-hyun/apache-spark-ci-image:master-11392974455 python3.13 -m pip list | grep pandas
pandas                   2.2.3
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48528 from dongjoon-hyun/SPARK-50019.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…`spark.default.parallelism`

### What changes were proposed in this pull request?

spark.session.set should not fail when setting `spark.default.parallelism`.

### Why are the changes needed?

This fixes a behavior change: before `SPARK-48773`, setting `spark.default.parallelism` through the Spark session did not fail and was a no-op.

### Does this PR introduce _any_ user-facing change?

Yes.

Before `SPARK-48773`, spark.conf.set("spark.default.parallelism") does not fail and is a no-op.
After `SPARK-48773`, spark.conf.set("spark.default.parallelism") fails with a `CANNOT_MODIFY_CONFIG` exception.
With this follow-up, we restore the original behavior: spark.conf.set("spark.default.parallelism") does not fail and is a no-op.
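A minimal sketch of the restored behavior described above, assuming an active SparkSession:

```
// With this follow-up, this call is a no-op again instead of throwing
// a CANNOT_MODIFY_CONFIG error.
spark.conf.set("spark.default.parallelism", "200")
```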

### How was this patch tested?

manually testing.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48526 from amaliujia/SPARK-48773.

Authored-by: Rui Wang <rui.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…state store in stream-stream join

### What changes were proposed in this pull request?

The PR proposes to revise the optimization on adding input to state store in stream-stream join, to fix correctness issue.

### Why are the changes needed?

Here is the logic of optimization before this PR:

https://github.com/apache/spark/blob/039fd13eacb1cef835045e3a60cebf958589e1a2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L671-L677

```
        val isLeftSemiWithMatch =
          joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
        // Add to state store only if both removal predicates do not match,
        // and the row is not matched for left side of left semi join.
        val shouldAddToState =
          !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
          !isLeftSemiWithMatch
```

The criteria of `both removal predicates do not match` means the input is going to be evicted in this batch. I'm not sure about the coverage of this optimization, but there are two major issues with it:

1) Missing to add the input to the state store on the left side prevents inputs on the right side from matching with "that" input. Even though the input is going to be evicted in this batch, there could still be inputs on the right side in this batch which can match with that input.

2) Missing to add the input to the state store prevents that input from producing unmatched (null-outer) output, as we produce unmatched output during the eviction of state.

Worth noting that `state watermark != watermark for eviction`, and the eviction mentioned above is based on the "state watermark". The state watermark could be either 1) equal to or earlier than the watermark for eviction, or 2) "later" than the watermark for eviction.

### Does this PR introduce _any_ user-facing change?

Yes, there are correctness issues among stream-stream join, especially when the output of the stateful operator is provided as input of stream-stream join. The correctness issue is fixed with the PR.

### How was this patch tested?

New UTs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48297 from HeartSaVioR/SPARK-49829.

Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Co-authored-by: Andrzej Zera <andrzejzera@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…uce.output.basename' to generate file names

### What changes were proposed in this pull request?

In 'HadoopMapReduceCommitProtocol', task output file names are generated ahead of time instead of calling `org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getDefaultWorkFile`, which uses `mapreduce.output.basename` as the prefix of output files.
In this pull request, we modify the `HadoopMapReduceCommitProtocol.getFilename` method to also look up this config instead of using the hardcoded 'part'.
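As a rough usage sketch of the config named above; the output path and basename are placeholders, and the exact file-name suffix depends on the committer:

```
// Sketch only: set the Hadoop config so file-source writes pick up the custom prefix.
spark.sparkContext.hadoopConfiguration.set("mapreduce.output.basename", "nightly")
spark.range(10).write.mode("overwrite").parquet("/tmp/basename-demo")
// Output files are then prefixed with "nightly-..." instead of the hardcoded "part-...".
```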

### Why are the changes needed?

Giving output files a custom name is a useful feature for users. They can use it to distinguish files added by different engines, on different days, etc. We can also align the usage scenario with other SQL-on-Hadoop engines for better Hadoop compatibility.

### Does this PR introduce _any_ user-facing change?

Yes, a Hadoop configuration 'mapreduce.output.basename' can be used in file datasource output files

### How was this patch tested?

new tests

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #48494 from yaooqinn/SPARK-49991.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…ypes used in stateful processors

### What changes were proposed in this pull request?
Add support for read change feed for map and list types used in stateful processors

### Why are the changes needed?
Without this change, reading change feed for map and list types is not supported.

### Does this PR introduce _any_ user-facing change?
Yes

Users can query state using following query:
```
        val stateReaderDf = spark.read
          .format("statestore")
          .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
          .option(StateSourceOptions.STATE_VAR_NAME, "mapState")
          .option(StateSourceOptions.READ_CHANGE_FEED, true)
          .option(StateSourceOptions.CHANGE_START_BATCH_ID, 0)
          .load()
```

### How was this patch tested?
Added unit tests

```
[info] Run completed in 24 seconds, 422 milliseconds.
[info] Total number of tests run: 4
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #48274 from anishshri-db/task/SPARK-49802.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…r and stateful operators

### What changes were proposed in this pull request?

This is an incremental step to implement RocksDB state store checkpoint format V2.

Once the conf STATE_STORE_CHECKPOINT_FORMAT_VERSION is set to be higher than version 2, the executor returns the checkpoint ID to the driver (only done for RocksDB). The driver stores it locally. For the next batch, the state store checkpoint ID is sent to the executor to be used to load the state store. If the local version on the executor doesn't match the unique ID, it will reload from the checkpoint.

There is no behavior change if the default checkpoint format is used.

### Why are the changes needed?

This is an incremental step in the project for a new RocksDB State Store checkpoint format. The new format simplifies the checkpoint mechanism to make it less bug-prone, and fixes some unexpected query results in rare queries.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
A new unit test is added to cover the format version, and another unit test is added to validate that the unique ID is passed back and forth as expected.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #47895 from siying/unique_id2.

Authored-by: Siying Dong <siying.dong@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…T/CLUSTER/DISTRIBUTE BY

### What changes were proposed in this pull request?

This PR adds SQL pipe syntax support for LIMIT/OFFSET and ORDER/SORT/CLUSTER/DISTRIBUTE BY.

For example:

```
CREATE TABLE t(x INT, y STRING) USING CSV;
INSERT INTO t VALUES (0, 'abc'), (1, 'def');

TABLE t
|> ORDER BY x
|> LIMIT 1 OFFSET 1

1	def
```

### Why are the changes needed?

The SQL pipe operator syntax will let users compose queries in a more flexible fashion.

### Does this PR introduce _any_ user-facing change?

Yes, see above.

### How was this patch tested?

This PR adds a few unit test cases, but mostly relies on golden file test coverage. I did this to make sure the answers are correct as this feature is implemented and also so we can look at the analyzer output plans to ensure they look right as well.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48413 from dtenedor/pipe-order-by.

Authored-by: Daniel Tenedorio <daniel.tenedorio@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Refresh the image cache job

### Why are the changes needed?
this job has been broken: https://github.com/apache/spark/actions/runs/11387123331/job/31682246504

### Does this PR introduce _any_ user-facing change?
no, infra-only

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #48533 from zhengruifeng/infra_build_cache.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
### What changes were proposed in this pull request?
Include the implicit string collations in the `COLLATION_MISMATCH.IMPLICIT` error message.

### Why are the changes needed?
Make the implicit collation mismatch error more user-friendly.
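For context, a hedged example of a query that can hit this error; the collation names are illustrative and the exact message text comes from this PR:

```
// Sketch only: comparing columns with two different implicit collations should
// now report the mismatching collation names in COLLATION_MISMATCH.IMPLICIT.
spark.sql("""
  SELECT c1 = c2
  FROM VALUES ('a' COLLATE UTF8_LCASE, 'a' COLLATE UNICODE) AS t(c1, c2)
""").show()
```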

### Does this PR introduce _any_ user-facing change?
Yes, implicit collation mismatch error is changed.

### How was this patch tested?
Updated existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48495 from uros-db/implicit-collation-mismatch.

Authored-by: Uros Bojanic <uros.bojanic@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
Make `AbstractStringType` class serializable, so that the derived classes can be used in expressions that perform replacement using `Invoke`-like Spark expressions.

### Why are the changes needed?
Objects with custom parameters cannot be used as inputTypes unless the underlying class is serializable. For example, `ValidateUTF8` is a string function that uses `StaticInvoke` replacement, and an object derived from `AbstractStringType` as one of the input types.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests suffice. More will be added with appropriate collation support in various Spark expressions.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48527 from uros-db/fix-abstractstringtype.

Authored-by: Uros Bojanic <uros.bojanic@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…x CI

### What changes were proposed in this pull request?

This PR is a followup of #48413 which formats the Scala API module

### Why are the changes needed?

To fix the CI.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually checked by `./dev/lint-scala`

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48537 from HyukjinKwon/followup-fmt.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
1, Introduce a helper class `Lazy` to replace the lazy vals
2, Fix a deadlock in subquery execution

### Why are the changes needed?

we observed a deadlock between `QueryPlan.canonicalized` and `QueryPlan.references`:

The main thread, in `TakeOrderedAndProject.doExecute`, is trying to compute `outputOrdering`; it traverses the tree top-down and requires the lock of `QueryPlan.canonicalized` along the path.
In this deadlock, it has successfully obtained the lock of `WholeStageCodegenExec` and requires the lock of `HashAggregateExec`.

Concurrently, a subquery execution thread is performing code generation and traverses the tree bottom-up via `def consume`, which checks `WholeStageCodegenExec.usedInputs` and references the lazy val `QueryPlan.references`; it requires the lock of `QueryPlan.references` along the path.
In this deadlock, it has successfully obtained the lock of `HashAggregateExec` and requires the lock of `WholeStageCodegenExec`.

This is because Scala's lazy val internally calls this.synchronized on the instance that contains the val, which creates the potential for deadlocks.
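A minimal sketch of the idea behind the `Lazy` helper (not the actual class added by this PR): move the initialization lock off the plan node and onto a dedicated holder object, so two nodes initializing each other's fields can no longer deadlock. The usage comments use hypothetical names.

```
// Sketch only: the value is initialized under the holder's own lock,
// not under the lock of the object that owns the field.
final class Lazy[T](initializer: => T) extends Serializable {
  private lazy val value: T = initializer   // synchronizes on this Lazy instance
  def get: T = value
}

// hypothetical usage inside a plan node:
// private val referencesHolder = new Lazy(computeReferences())
// def references = referencesHolder.get
```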

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
manually test:

Before the fix, the deadlock happened twice in the first 20 runs;
after the fix, the deadlock didn't happen in 100+ consecutive runs.

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #48391 from zhengruifeng/query_plan_lazy_ref.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Include the explicit string collations in the `COLLATION_MISMATCH.EXPLICIT` error message.

### Why are the changes needed?
Make the explicit collation mismatch error more user-friendly.

### Does this PR introduce _any_ user-facing change?
Yes, explicit collation mismatch error is changed.

### How was this patch tested?
Updated existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48525 from uros-db/explicit-collation-mismatch.

Authored-by: Uros Bojanic <uros.bojanic@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
The pr aims to revert #48385.
This reverts commit 52538f0.

### Why are the changes needed?
When upgrading Spark from `an old version` to `the latest version`, some end-users may rely on the `original schema` (`although it may not be correct`), which can make the `upgrade` very difficult. So, let's first restore it to its original state.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48530 from panbingkun/SPARK-49909_revert.

Authored-by: panbingkun <panbingkun@baidu.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…API when physical node is lost in executed plan

### What changes were proposed in this pull request?

This PR proposes to provide default values for metrics of the observe API when the physical node (CollectMetricsExec) is lost in the executed plan. This includes the case where the logical node (CollectMetrics) is lost during optimization (which is mostly the case).

### Why are the changes needed?

When users define metrics via the observe API, they expect the metrics to be retrievable via Observation (batch query) or the update event of StreamingQueryListener.

But when the node (CollectMetrics) is lost for any reason (e.g. the subtree is pruned by PruneFilters), Spark behaves as if the metrics were not defined, instead of providing default values.

When the query runs successfully, users wouldn't expect a metric bound to the query to be unavailable, hence they fail to guard the code for this case and run into issues. Arguably it's a lot better to provide default values: when the node is pruned out by the optimizer, it is mostly logically equivalent to no input being processed by that node (barring a bug in the analyzer/optimizer/etc. which drops the node incorrectly), hence it's valid to just provide default values.
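A hedged sketch of the scenario using the `Observation` helper; the metric name and the always-false filter are illustrative:

```
// Sketch only: if the optimizer prunes the whole subtree (and the CollectMetrics
// node with it), the metrics are now reported with default values instead of missing.
import org.apache.spark.sql.Observation
import org.apache.spark.sql.functions._

val observation = Observation("my_metrics")
val observed = spark.range(10)
  .observe(observation, count(lit(1)).as("rows"))
  .filter(lit(false))            // PruneFilters can drop the whole subtree,
                                 // including the CollectMetrics node below it
observed.collect()
println(observation.get)         // e.g. Map(rows -> 0) after this PR
```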

### Does this PR introduce _any_ user-facing change?

Yes, users can consistently query metrics defined with the observe API. They are available even with aggressive optimization which drops the CollectMetrics(Exec) node.

### How was this patch tested?

New UTs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48517 from HeartSaVioR/SPARK-50007.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
jingz-db and others added 30 commits November 6, 2024 10:51
…mWithStateInPandas

### What changes were proposed in this pull request?

This PR adds support for users to provide a Dataframe that can be used to instantiate state for the query in the first batch for arbitrary state API v2 in Python.
The Scala PR for supporting initial state is here: #45467

We propose to create a new PythonRunner that handles initial state specifically for TransformWithStateInPandas. From the JVM, we cogroup input rows and initial state rows on the same grouping key. Then we create a new row that contains one row from the input rows iterator and one row from the initial state iterator, and send the grouped row to Py4J. Inside the Python worker, we deserialize the grouped row into input rows and initial state rows separately and feed them into `handleInitialState` and `handleInputRows`.
We launch a Python worker for each partition that has non-empty rows in either the input rows or the initial state. This guarantees that all keys in the initial state will be processed even if they do not appear in the first batch or do not lie in the same partition as the keys in the first batch.

### Why are the changes needed?

We need to couple the API as we support initial state handling in Scala.

### Does this PR introduce _any_ user-facing change?

Yes.
This PR introduces a new API in the `StatefulProcessor` which allows users to define their own udf for processing initial state:
```
 def handleInitialState(
        self, key: Any, initialState: "PandasDataFrameLike"
    ) -> None:
```
The implementation of this function is optional. If not defined, then it will act as no-op.

### How was this patch tested?

Unit tests & integration tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48005 from jingz-db/python-init-state-impl.

Authored-by: jingz-db <jing.zhan@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?
Add a separate docker file for doc build

### Why are the changes needed?
Currently we only have a single test image for `pyspark`, `sparkr`, `lint` and `docs`; it has two major issues:
1. Disk space limitation: we are adding more and more packages to it, the disk space left for testing is very limited, and it causes `No space left on device` from time to time;
2. Environment conflicts: for example, even though we already install some packages for `docs` in the docker file, we still need to install some additional Python packages in `build_and_test` due to the conflicts between `docs` and `pyspark`. This is hard to maintain because the related packages are installed in two different places.

So I am thinking of spinning off some installations (e.g. `docs`) from the base image, so that:
1. we can completely cache all the dependencies for `docs`;
2. the related installations are centralized;
3. we can free up disk space on the base image (after we spin off other dependencies, we can remove unneeded packages from it).

Furthermore, if we want to apply multiple images, we can easily support different environments, e.g. adding a separate image for old versions of `pandas/pyarrow/etc`.

### Does this PR introduce _any_ user-facing change?
no, infra-only

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #48690 from zhengruifeng/infra_separate_doc.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
…ing the lifetime of AttributeReference objects created during logical planning

### What changes were proposed in this pull request?

This PR changes the `allAttributes` in `QueryPlan` to be a `def` instead of a `lazy val` to avoid holding on to a copy of `AttributeReferences` in the `QueryPlan` object. This change should not result in a performance penalty as `allAttributes` is used only once during canonicalization in `doCanonicalize` (which only happens once since it itself is a `lazy val`).
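A simplified illustration of the change; the stand-in types below are hypothetical and not the actual `QueryPlan` code:

```
// Sketch only: with a `def`, the attribute sequence is rebuilt on demand and only
// consulted once (during canonicalization), instead of being cached on every node.
trait PlanNodeSketch {
  def children: Seq[PlanNodeSketch]
  def output: Seq[String]                                  // stand-in for Attribute
  // before: lazy val allAttributes: Seq[String] = children.flatMap(_.output)
  def allAttributes: Seq[String] = children.flatMap(_.output)
}
```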

## Context
The allAttributes method in QueryPlan ([code](https://github.com/apache/spark/blob/57f6824e78e2e615778827ddebce9d7fcaae1698/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L2101)) unions the output of all of its children. Although this is okay in an optimized plan, in a pre-optimized analyzed plan, these attributes add up multiplicatively with the size of the plan. Because output is usually defined as a def, each node’s allAttributes also ends up with a distinct copy of each attribute, potentially causing significant memory pressure on the driver (especially under concurrency and with wide tables).

Here is a simple example with TPC-DS Q42. SQL:
```
  select dt.d_year, item.i_category_id, item.i_category, sum(ss_ext_sales_price)
 from 	date_dim dt, store_sales, item
 where dt.d_date_sk = store_sales.ss_sold_date_sk
 	and store_sales.ss_item_sk = item.i_item_sk
 	and item.i_manager_id = 1
 	and dt.d_moy=11
 	and dt.d_year=2000
 group by 	dt.d_year
 		,item.i_category_id
 		,item.i_category
 order by       sum(ss_ext_sales_price) desc,dt.d_year
 		,item.i_category_id
 		,item.i_category
 limit 100
```
If we print out the size of each operator’s output and the size of its allAttributes:
```
GlobalLimit: allAttrs: 4, output: 4
  LocalLimit: allAttrs: 4, output: 4
    Sort: allAttrs: 4, output: 4
      Aggregate: allAttrs: 73, output: 4
        Filter: allAttrs: 73, output: 73
          Join: allAttrs: 73, output: 73
            Join: allAttrs: 51, output: 51
              SubqueryAlias: allAttrs: 28, output: 28
                SubqueryAlias: allAttrs: 28, output: 28
                  LogicalRelation: allAttrs: 0, output: 28
              SubqueryAlias: allAttrs: 23, output: 23
                LogicalRelation: allAttrs: 0, output: 23
            SubqueryAlias: allAttrs: 22, output: 22
              LogicalRelation: allAttrs: 0, output: 22
```
Note how the joins and aggregate have 73 attributes each, by adding the width of each relation. For prod queries with wide schemas, this issue is much worse. Optimized plans after column pruning look far better:
```
Aggregate: allAttrs: 0, output: 1
  Project: allAttrs: 2, output: 0
    Join: allAttrs: 2, output: 2
      Project: allAttrs: 3, output: 1
        Join: allAttrs: 3, output: 3
          Project: allAttrs: 4, output: 2
            Join: allAttrs: 4, output: 4
              Project: allAttrs: 5, output: 3
                Join: allAttrs: 5, output: 5
                  Project: allAttrs: 23, output: 4
                    Filter: allAttrs: 23, output: 23
                      LogicalRelation: allAttrs: 0, output: 23
                  Project: allAttrs: 29, output: 1
                    Filter: allAttrs: 29, output: 29
                      LogicalRelation: allAttrs: 0, output: 29
              Project: allAttrs: 13, output: 1
                Filter: allAttrs: 13, output: 13
                  LogicalRelation: allAttrs: 0, output: 13
          Project: allAttrs: 22, output: 1
            Filter: allAttrs: 22, output: 22
              LogicalRelation: allAttrs: 0, output: 22
      Project: allAttrs: 18, output: 1
        Filter: allAttrs: 18, output: 18
          LogicalRelation: allAttrs: 0, output: 18
```

### Why are the changes needed?

Reduce driver's heap usage.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48762 from utkarsh39/SPARK-50229.

Authored-by: Utkarsh <utkarsh.agarwal@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…lysis`

### What changes were proposed in this pull request?
In the PR, I propose to move the `RewriteCollationJoin` rule added by #46599 to `FinishAnalysis`.

### Why are the changes needed?
The conversions of join keys that the rule does **should not be** considered an optimization; they are needed for correctness. So we shall not run it as an optimization rule; it should be applied before any optimization. Currently, optimization rules can produce plans (for instance, sub-queries) that become semantically incorrect after `RewriteCollationJoin`.

We could modify `RewriteCollationJoin` and take DPP and maybe other optimizations into account, but we might miss something else. Fixing **all** results of optimizations in `RewriteCollationJoin` introduces some unnecessary dependencies.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By existing GAs. It is impossible to test the wrong results of DPP because non-binary collated strings as partitions are not supported by any built-in datasource yet.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48759 from MaxGekk/dpp-collation-bug.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…IZED_STATISTIC`

### What changes were proposed in this pull request?

This PR proposes to Integrate `_LEGACY_ERROR_TEMP_2113` into `UNRECOGNIZED_STATISTIC`

### Why are the changes needed?

To improve the error message by assigning proper error condition and SQLSTATE

### Does this PR introduce _any_ user-facing change?

No, only user-facing error message improved

### How was this patch tested?

Updated the existing tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48692 from itholic/LEGACY_2113.

Authored-by: Haejoon Lee <haejoon.lee@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…ROR_TEMP_2150`: `TUPLE_SIZE_EXCEEDS_LIMIT`

### What changes were proposed in this pull request?

This PR proposes to assign proper error condition & sqlstate for `_LEGACY_ERROR_TEMP_2150`: `TUPLE_SIZE_EXCEEDS_LIMIT`

### Why are the changes needed?

To improve the error message by assigning proper error condition and SQLSTATE

### Does this PR introduce _any_ user-facing change?

No, only user-facing error message improved

### How was this patch tested?

Updated the existing tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48631 from itholic/LEGACY_2150.

Lead-authored-by: Haejoon Lee <haejoon.lee@databricks.com>
Co-authored-by: Haejoon Lee <haejoon@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…l rows in ColumnarToRowExec

### What changes were proposed in this pull request?

This patch cleans up ColumnVector resources after processing all rows in ColumnarToRowExec. It only focuses on the codegen implementation of ColumnarToRowExec. The non-codegen path should be relatively rare to use, and currently no good way has been proposed, so it is left to a follow-up.

### Why are the changes needed?

Currently we only assign null to the ColumnarBatch object, but that doesn't release the resources held by the vectors in the batch. For OnHeapColumnVector, the Java arrays may be automatically collected by the JVM, but for OffHeapColumnVector, the allocated off-heap memory will be leaked.

For custom ColumnVector implementations like Arrow-based ones, this also possibly causes memory-safety issues if the underlying buffers are reused across batches, because when ColumnarToRowExec begins to fill values for the next batch, the arrays from the previous batch are still held.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48767 from viirya/close_if_not_writable.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
### What changes were proposed in this pull request?
The pr aims to upgrade `netty` from `4.1.110.Final` to `4.1.114.Final`.

### Why are the changes needed?
https://netty.io/news/2024/10/01/4-1-114-Final.html
https://netty.io/news/2024/09/04/4-1-113-Final.html
https://netty.io/news/2024/07/19/4-1-112-Final.html
https://netty.io/news/2024/06/11/4-1-111-Final.html

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #46945 from panbingkun/SPARK-48590.

Authored-by: panbingkun <panbingkun@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…RTITION


### What changes were proposed in this pull request?

This patch adds more description to SQL config `SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION`.

### Why are the changes needed?

We had a customer issue not long ago where they could not read a partitioned table correctly. It turned out that the partition value was normalized and didn't match the unnormalized value in the table. This config needs to be enabled to read the partition correctly. This patch makes the config description clearer.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing test. This is doc only change.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48765 from viirya/add_config_desc.

Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?

Adds support for target encoding of ml features.
Target Encoding maps a column of categorical indices into a numerical feature derived from the target.
Leveraging the relationship between categorical variables and the target variable, target encoding usually performs better than one-hot encoding (while avoiding the need to add extra columns)

### Why are the changes needed?

Target Encoding is a well-known encoding technique for categorical features.
It's supported on most ml frameworks
https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.TargetEncoder.html
https://search.r-project.org/CRAN/refmans/dataPreparation/html/target_encode.html

### Does this PR introduce _any_ user-facing change?

Spark API now includes 2 new classes in package org.apache.spark.ml
- TargetEncoder (estimator)
- TargetEncoderModel (transformer)

### How was this patch tested?

Scala => org.apache.spark.ml.feature.TargetEncoderSuite
Java => org.apache.spark.ml.feature.JavaTargetEncoderSuite
Python => python.pyspark.ml.tests.test_feature.FeatureTests (added 2 tests)

### Was this patch authored or co-authored using generative AI tooling?

No

### Some design notes (see the usage sketch after this list)
- binary and continuous target types (no multi-label yet)
- available in Scala, Java and Python APIs
- fitting implemented on RDD API (treeAggregate)
- transformation implemented on Dataframe API (no UDFs)
- categorical features must be indices (integers) in Double-typed columns (as if StringIndexer were used before)
- unseen categories in training are represented as class -1.0

- <b>Encodings structure</b>
    - `Map[String, Map[Double, Double]]` => Map[ feature_name, Map[ original_category, encoded_category ] ]

- <b>Parameters</b>
   - inputCol(s) / outputCol(s) / labelCol => as usual
   - targetType
      - binary => encodings calculated as in-category conditional probability (counting)
      - continuous => encodings calculated as in-category target mean (incrementally)
  - handleInvalid
      - error => raises an error if trying to encode an unseen category
      - keep => encodes an unseen category with the overall statistics
  - smoothing => controls how in-category stats and overall stats are weighted to calculate final encodings (to avoid overfitting)
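A hedged usage sketch based on the parameters listed above; the setter names are assumed to follow the usual `ml` conventions, and the data is illustrative:

```
// Sketch only: fit a TargetEncoder on an indexed categorical column.
import org.apache.spark.ml.feature.TargetEncoder

val train = spark.createDataFrame(Seq(
  (0.0, 1.0), (1.0, 0.0), (0.0, 1.0)
)).toDF("categoryIdx", "label")

val encoder = new TargetEncoder()
  .setInputCols(Array("categoryIdx"))     // indices produced e.g. by StringIndexer
  .setOutputCols(Array("categoryEnc"))
  .setLabelCol("label")
  .setTargetType("binary")                // or "continuous"
  .setHandleInvalid("keep")               // or "error"

val model = encoder.fit(train)
val encoded = model.transform(train)
```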

Closes #48347 from rebo16v/sparkml-target-encoding.

Lead-authored-by: Enrique Rebollo <enrique.rebollo@databricks.com>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
… `useDriverIdAsAppName`

### What changes were proposed in this pull request?

This is a follow-up of SPARK-50208 in order to rename a variable `useDriverIdAsAppId` to `useDriverIdAsAppName`.
- #48740

### Why are the changes needed?

To fix a misleading typo.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48782 from dongjoon-hyun/SPARK-50208-2.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…ExecutorExitCode`

### What changes were proposed in this pull request?

This PR aims to define a new error code, `BLOCK_MANAGER_REREGISTRATION_FAILED` as `ExecutorExitCode` officially from Apache Spark 4.0 like the existing `HEARTBEAT_FAILURE`.

https://github.com/apache/spark/blob/0cb7a42731076b9960eb9ad27067cab7ae570356/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala#L46

### Why are the changes needed?

Until Spark 3, the Spark executor fails with `-1` like the following, without providing a way to handle this specific error.

https://github.com/apache/spark/blob/0cb7a42731076b9960eb9ad27067cab7ae570356/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L673-L674

### Does this PR introduce _any_ user-facing change?

To handle this executor failure reason properly.

### How was this patch tested?

Pass with the newly added test cases.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48776 from dongjoon-hyun/SPARK-50247.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
### What changes were proposed in this pull request?
Enable test `DataFramePlotParityTests`; we have reimplemented these plotting functions with Spark SQL, so they no longer depend on ML.

### Why are the changes needed?
test coverage

### Does this PR introduce _any_ user-facing change?
no, test only

### How was this patch tested?
CI

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #48784 from zhengruifeng/enable_ps_plot_test.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
### What changes were proposed in this pull request?
Avoid unnecessary casting in `compute_hist`

### Why are the changes needed?
The `__bucket` column should be an integer by nature; it was a double just because of the output type of `Bucketizer` in MLlib (almost all ML implementations return double in transformation).
After reimplementing it with Spark SQL, it no longer needs to be a float.

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
CI

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #48785 from zhengruifeng/plt_hist_cast.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
### What changes were proposed in this pull request?
The pr aims to upgrade `ZooKeeper` from `3.9.2` to `3.9.3`.
This PR is to fix potential issues with PR #48666

### Why are the changes needed?
The full release notes: https://zookeeper.apache.org/doc/r3.9.3/releasenotes.html

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Pass GA.
- Manually check
```shell
./build/sbt -Phadoop-3 -Pkubernetes -Pkinesis-asl -Phive-thriftserver -Pdocker-integration-tests -Pyarn -Phadoop-cloud -Pspark-ganglia-lgpl -Phive -Pjvm-profiler clean package

[info] Note: Some input files use or override a deprecated API.
[info] Note: Recompile with -Xlint:deprecation for details.
[warn] multiple main classes detected: run 'show discoveredMainClasses' to see the list
[success] Total time: 272 s (04:32), completed Nov 6, 2024, 4:29:52 PM
```

```shell
(pyspark) ➜  spark-community git:(SPARK-50135_FOLLOWUP) ✗ ./python/run-tests --python-executables=python3 --testnames "pyspark.sql.tests.connect.test_connect_collection"
Running PySpark tests. Output is in /Users/panbingkun/Developer/spark/spark-community/python/unit-tests.log
Will test against the following Python executables: ['python3']
Will test the following Python tests: ['pyspark.sql.tests.connect.test_connect_collection']
python3 python_implementation is CPython
python3 version is: Python 3.9.19
Starting test(python3): pyspark.sql.tests.connect.test_connect_collection (temp output: /Users/panbingkun/Developer/spark/spark-community/python/target/097bd7e0-9311-4484-ae2d-c0f4c63fc6f9/python3__pyspark.sql.tests.connect.test_connect_collection__8dzaeio9.log)
Finished test(python3): pyspark.sql.tests.connect.test_connect_collection (14s)
Tests passed in 14 seconds

```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48771 from panbingkun/SPARK-50135_FOLLOWUP.

Authored-by: panbingkun <panbingkun@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…f not supported

### What changes were proposed in this pull request?
In stream-stream join, only call getLatestCheckpointInfo() when the format version indicates that the checkpoint ID is supported.
The other place already has it applied: https://github.com/apache/spark/blob/07301ddb889bdf361499f65e1708b5fdcab7e539/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L350-L362

### Why are the changes needed?
This code is not needed and is wasteful. It also contains an assertion that we are not sure is correct when the function is called while the state store was not created with the checkpoint ID feature supported.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Make sure existing CI passes.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48783 from siying/idjoinfix.

Authored-by: Siying Dong <siying.dong@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?

This PR aims to add `getSystemProperty` to PySpark `SparkContext` like `setSystemProperty` at Apache Spark 4.0.0.

https://spark.apache.org/docs/4.0.0-preview2/api/python/reference/api/pyspark.SparkContext.setSystemProperty.html

### Why are the changes needed?

Since Apache Spark 0.9.0, `setSystemProperty` has been provided because Python doesn't have JVM's `SystemProperties` concept. This is usefully used like the following.

https://github.com/apache/spark/blob/99d27c9701019a0f534cc13c084895a00badee12/python/pyspark/shell.py#L84

This PR aims to add `getSystemProperty` additionally and symmetrically to provide an easier and better experience for both Spark developers and Spark App developers.

### Does this PR introduce _any_ user-facing change?

No. This is a new API.

### How was this patch tested?

Pass the CIs with newly added doctests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48781 from dongjoon-hyun/SPARK-50251.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Addition of a new expression, `try_make_interval`.
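
As a hedged illustration of the usual `try_` semantics (NULL instead of an error on invalid or overflowing inputs), with the argument list following `make_interval(years, months, weeks, days, hours, mins, secs)`; the values are illustrative only:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# make_interval raises on arithmetic overflow; the try_ variant is expected
# to return NULL for the overflowing input instead (sketch, not a spec).
spark.sql("""
    SELECT
        try_make_interval(1, 2, 0, 3, 4, 5, 6.7)          AS ok,
        try_make_interval(2147483647, 12, 0, 0, 0, 0, 0)  AS overflows_to_null
""").show(truncate=False)
```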

### Why are the changes needed?
This PR is split out from #48499 so that the reasoning for each change can be reviewed separately.

### Does this PR introduce _any_ user-facing change?
Yes, new expressions added.

### How was this patch tested?
Tests added.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48580 from mihailom-db/addTryConv-TryMakeInterval.

Authored-by: Mihailo Milosevic <mihailo.milosevic@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…ame as utf8_binary

### What changes were proposed in this pull request?

I propose adding a new `SQLConf` entry which enables Spark to read an invalid collation name as `UTF8_BINARY`.

### Why are the changes needed?

These changes are needed when Spark has to read a Delta table whose metadata uses a different convention for naming collations. With this conf enabled, instead of failing, Spark returns the `UTF8_BINARY` collation.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This patch was tested by adding tests in `DataTypeSuite`.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48760 from vladanvasi-db/vladanvasi-db/unknown-collation-name-enablement.

Authored-by: Vladan Vasić <vladan.vasic@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…lan becomes unresolved after every optimizer rule

### What changes were proposed in this pull request?

This PR adds a new "lightweight" plan change validation, that will be run after every optimizer rule and enabled by default. Note that this is an extension to existing validation logic, which is currently enabled for tests but disabled by default in production. This new validation will be enabled by default but will be skipped if regular validation is ever enabled (since regular validation is a superset of this).

Right now, the lightweight validation only consists of checking if the plan becomes unresolved (which is a cheap O(1) lookup).
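
Conceptually, the new check amounts to the following sketch (Python pseudocode rather than the actual Catalyst `RuleExecutor` code; `resolved` stands in for the plan's cached resolution flag):

```python
# Conceptual sketch only; the real logic lives in Catalyst's RuleExecutor
# (Scala) and reads the plan's cached `resolved` flag after each rule.
def run_optimizer(plan, rules, lightweight_validation=True):
    for rule in rules:
        new_plan = rule(plan)
        # O(1): `resolved` is assumed to be a precomputed boolean on the plan.
        if lightweight_validation and not new_plan.resolved:
            raise RuntimeError(
                f"Plan became unresolved after rule {rule.__name__}"
            )
        plan = new_plan
    return plan
```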

### Why are the changes needed?

If a query fails somewhere in optimization or physical planning due to an unresolved reference, it is likely due to a bug somewhere in optimization. Adding this validation helps us know exactly where in optimization the plan became invalid.

### Does this PR introduce _any_ user-facing change?

No, this should only fail queries that would crash somewhere else during query compilation. It effectively brings the query failure closer to the cause.

### How was this patch tested?

Added a UT.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48787 from kelvinjian-db/SPARK-50256-lightweight-validation.

Authored-by: Kelvin Jiang <kelvin.jiang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Changed the type of the parameter used in the error message for an invalid fraction of second from `Decimal` to `Double`.

### Why are the changes needed?
To keep things consistent with other errors.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tests were updated to reflect the changes.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #48793 from markonik-db/SPARK-50261-FollowUp.

Authored-by: Marko Nikacevic <marko.nikacevic@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…teInPandas

### What changes were proposed in this pull request?

- Support deleteIfExists for TransformWithStateInPandas.
- Added `close()` support for StatefulProcessor (see the sketch after this list).
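
A minimal sketch of how the two additions might be used from Python, assuming the PySpark handle mirrors Scala's `StatefulProcessorHandle.deleteIfExists(stateName)` and that `close()` is the new teardown hook; the module path and state name are assumptions, not taken from this PR:

```python
import pandas as pd
from pyspark.sql.streaming.stateful_processor import (
    StatefulProcessor,
    StatefulProcessorHandle,
)

class CleanupProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        self.handle = handle

    def handleInputRows(self, key, rows, timer_values):
        # Remove a previously registered state variable if it exists
        # (state name is illustrative only).
        self.handle.deleteIfExists("obsoleteState")
        yield pd.DataFrame({"key": [key[0]]})

    def close(self) -> None:
        # Release any non-state resources acquired in init().
        pass
```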

### Why are the changes needed?

Adds parity to TransformWithStateInPandas for functionality we already support in TransformWithState.

### Does this PR introduce _any_ user-facing change?

Yes

### How was this patch tested?

New unit test.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48373 from bogao007/delete-if-exists.

Authored-by: bogao007 <bo.gao@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…or various collations

### What changes were proposed in this pull request?

Extend collation-related unit and e2e sql tests for various collations in addition to the 4 common collations already used.
This is a follow-up PR from #48608, where it was decided to split the changes into separate PRs. This PR includes the additional test suites mentioned in the comments of the original PR.

### Why are the changes needed?

Further expand collation testing coverage for various collations, incorporating different languages, scripts, case/accent sensitivity, etc.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Extending existing collation-related unit and e2e sql tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48799 from dejankrak-db/collation-additional-tests.

Authored-by: Dejan Krakovic <dejan.krakovic@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…park

### What changes were proposed in this pull request?

This PR is a follow-up in order to support `spark.submit.appName` in PySpark.
- #48755

### Why are the changes needed?

This follow-up allows `spark.submit.appName` to override a PySpark application's statically set appName, like the following, in the same way as for Java/Scala.

https://github.com/apache/spark/blob/d9c596c040b021f8062bbe6fd38e711a25536421/examples/src/main/python/pi.py#L29-L32

**BEFORE**
```
$ bin/spark-submit --name NAME examples/src/main/python/pi.py 2>&1 | grep app_name | jq
{
  "ts": "2024-11-07T08:07:20.806Z",
  "level": "INFO",
  "msg": "Submitted application: PythonPi",
  "context": {
    "app_name": "PythonPi"
  },
  "logger": "SparkContext"
}
```

```
$ bin/spark-submit -c spark.app.name=NAME examples/src/main/python/pi.py 2>&1 | grep app_name | jq
{
  "ts": "2024-11-07T08:08:23.685Z",
  "level": "INFO",
  "msg": "Submitted application: PythonPi",
  "context": {
    "app_name": "PythonPi"
  },
  "logger": "SparkContext"
}
```

**AFTER**
```
$ bin/spark-submit -c spark.submit.appName=NAME examples/src/main/python/pi.py 2>&1 | grep app_name | jq
{
  "ts": "2024-11-07T08:09:02.084Z",
  "level": "INFO",
  "msg": "Submitted application: NAME",
  "context": {
    "app_name": "NAME"
  },
  "logger": "SparkContext"
}
```

### Does this PR introduce _any_ user-facing change?

No, this is a new configuration.

### How was this patch tested?

Pass the CIs with a newly added test case.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48788 from dongjoon-hyun/SPARK-50222-3.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Improve logging for RocksDB lock acquire/release cases

### Why are the changes needed?
Improve log formatting for lock acquire/release scenarios

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit tests

Sample logs when task context is not available:
```
sql/core/target/unit-tests.log:3728150:15:17:27.982 pool-1-thread-1-ScalaTest-running-RocksDBSuite INFO RocksDB [Thread-17]: RocksDB instance was acquired by ownerThread=[ThreadId: Some(17)] for opType=close_store
sql/core/target/unit-tests.log:3728172:15:17:27.985 pool-1-thread-1-ScalaTest-running-RocksDBSuite INFO RocksDB [Thread-17]: RocksDB instance was released by releaseThread=[ThreadId: Some(17)] with ownerThread=[ThreadId: Some(17)] for opType=close_store
```

Sample logs when task context is available:
```
sql/core/target/unit-tests.log:3771705:15:23:03.517 Executor task launch worker for task 0.0 in stage 45.0 (TID 136) INFO RocksDB StateStoreId(opId=0,partId=0,name=default): RocksDB instance was acquired by ownerThread=[ThreadId: Some(104), task: partition 0.0 in stage 45.0, TID 136] for opType=load_store
```
```
sql/core/target/unit-tests.log:3771832:15:23:03.781 Executor task launch worker for task 0.0 in stage 45.0 (TID 136) INFO RocksDB StateStoreId(opId=0,partId=0,name=default): RocksDB instance was released by releaseThread=[ThreadId: Some(104), task: partition 0.0 in stage 45.0, TID 136] with ownerThread=[ThreadId: Some(104), task: partition 0.0 in stage 45.0, TID 136] for opType=load_store
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #48806 from anishshri-db/task/SPARK-50273.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…hStateInPandas

### What changes were proposed in this pull request?

- Added custom state metrics for TransformWithStateInPandas.
- Cleaned up TTL properly.

### Why are the changes needed?

Bring parity with Scala TransformWithState.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Python unit test.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48808 from bogao007/state-metrics.

Authored-by: bogao007 <bo.gao@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?
Improve `TargetEncoder.fit` to be based on DataFrame APIs

### Why are the changes needed?
1. Simplify the implementation.
2. With DataFrame APIs, the fit will benefit from Spark SQL's optimizations (see the sketch below).
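
As a rough, non-authoritative sketch of the DataFrame-based approach (plain per-category target means, ignoring smoothing and the other options the real `TargetEncoder` supports; column names are illustrative):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("a", 1.0), ("a", 0.0), ("b", 1.0), ("b", 1.0)],
    ["category", "label"],
)

# Per-category target statistics computed with DataFrame aggregations,
# so Spark SQL can optimize the whole job.
encodings = (
    df.groupBy("category")
      .agg(F.avg("label").alias("target_mean"), F.count("*").alias("n"))
)
encodings.show()
```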

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #48797 from zhengruifeng/target_encoder_fit.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…OutputStream

### What changes were proposed in this pull request?

`DirectByteBufferOutputStream#close()` calls `StorageUtils.dispose()` to free its direct byte buffer. This puts the object into an unspecified and dangerous state after being closed, and can cause unpredictable JVM crashes if the object is used after close.

This PR makes this safer by modifying `close()` to place the object into a known-closed state, and modifying all methods to assert not closed.

To minimize the performance impact from the extra checks, this PR also changes `DirectByteBufferOutputStream#buffer` from `private` to `private[this]`, which should produce more efficient direct field accesses.
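
The guarded-close pattern described above looks roughly like this generic sketch (a Python stand-in, not the actual Scala class):

```python
# Generic sketch of the guarded-close pattern; the real class is Scala's
# DirectByteBufferOutputStream, which frees a direct ByteBuffer on close().
class GuardedBuffer:
    def __init__(self) -> None:
        self._buffer = bytearray()
        self._closed = False

    def _check_not_closed(self) -> None:
        if self._closed:
            raise RuntimeError("Stream is already closed")

    def write(self, data: bytes) -> None:
        self._check_not_closed()
        self._buffer.extend(data)

    def close(self) -> None:
        # Put the object into a known-closed state instead of leaving a
        # dangling reference to freed memory.
        self._buffer = None
        self._closed = True
```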

### Why are the changes needed?

Improves debuggability for users of DirectByteBufferOutputStream such as PythonRunner.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a test in DirectByteBufferOutputStreamSuite to verify that use after close throws IllegalStateException.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48807 from ankurdave/SPARK-50274-DirectByteBufferOutputStream-checkNotClosed.

Authored-by: Ankur Dave <ankurdave@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…hods

### What changes were proposed in this pull request?
This PR makes the following changes:
- It adds shims for a number of classes exposed in the (classic) SQL interface: `BaseRelation`, `ExperimentalMethods`, `ExecutionListenerManager`, `SharedState`, `SessionState`, `SparkConf`, `SparkSessionExtensions`, `QueryExecution`, and `SQLContext`.
- It adds all public methods involving these classes. For classic they will just work like before. For connect they will throw `SparkUnsupportedOperationExceptions` when used.
- It reduces the visibility of a couple of classes added recently: `DataSourceRegistration`, and `UDTFRegistration`.
- I have also reorganized all the shims into a single class.

Please note that this by no means reflects the final state:
- We intend to support `SQLContext`.
- We intend to support `SparkSession.executeCommand`.
- We are thinking about removing `ExperimentalMethods`, `SharedState`, and `SessionState` from the public interface.
- For `QueryExecution` and `ExecutionListenerManager`, we are considering adding a similar plan representation that is not tied to Catalyst.

### Why are the changes needed?
We are creating a shared Scala (JVM) SQL interface for both Classic and Connect.

### Does this PR introduce _any_ user-facing change?
It adds unusable public methods to the connect interface.

### How was this patch tested?
I have added tests that check whether the connect client throws the proper exceptions.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48687 from hvanhovell/SPARK-50102.

Authored-by: Herman van Hovell <herman@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
### What changes were proposed in this pull request?
We missed a couple of methods when we introduced the `DataStreamWriter` interface. This PR adds them back.

### Why are the changes needed?
The `DataStreamWriter` interface must have all user-facing methods.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48796 from hvanhovell/SPARK-50264.

Authored-by: Herman van Hovell <herman@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>