From 96c394f231099599900d1750e4bc84851620aa1a Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Fri, 15 Nov 2019 13:27:30 +0900
Subject: [PATCH] [SPARK-29376][SQL][PYTHON] Upgrade Apache Arrow to version
 0.15.1

Upgrade Apache Arrow to version 0.15.1. This includes the Java artifacts and also raises the minimum required version of PyArrow.

The changes from version 0.12.0 to 0.15.1 include the following selected fixes/improvements relevant to Spark users:

* ARROW-6898 - [Java] Fix potential memory leak in ArrowWriter and several test classes
* ARROW-6874 - [Python] Memory leak in Table.to_pandas() when conversion to object dtype
* ARROW-5579 - [Java] shade flatbuffer dependency
* ARROW-5843 - [Java] Improve the readability and performance of BitVectorHelper#getNullCount
* ARROW-5881 - [Java] Provide functionalities to efficiently determine if a validity buffer has completely 1 bits/0 bits
* ARROW-5893 - [C++] Remove arrow::Column class from C++ library
* ARROW-5970 - [Java] Provide pointer to Arrow buffer
* ARROW-6070 - [Java] Avoid creating new schema before IPC sending
* ARROW-6279 - [Python] Add Table.slice method or allow slices in __getitem__
* ARROW-6313 - [Format] Tracking for ensuring flatbuffer serialized values are aligned in stream/files.
* ARROW-6557 - [Python] Always return pandas.Series from Array/ChunkedArray.to_pandas, propagate field names to Series from RecordBatch, Table
* ARROW-2015 - [Java] Use Java Time and Date APIs instead of JodaTime
* ARROW-1261 - [Java] Add container type for Map logical type
* ARROW-1207 - [C++] Implement Map logical type

The changelog can be seen at https://arrow.apache.org/release/0.15.0.html

The upgrade brings bug fixes and improvements and maintains compatibility with future versions of PyArrow.

No user-facing change.

Existing tests, manually tested with Python 3.7, 3.8.

Closes #26133 from BryanCutler/arrow-upgrade-015-SPARK-29376.

Authored-by: Bryan Cutler
Signed-off-by: HyukjinKwon
---
 dev/deps/spark-deps-hadoop-palantir                           | 7 +++----
 pom.xml                                                       | 4 ++--
 python/pyspark/sql/utils.py                                   | 6 +++++-
 python/setup.py                                               | 2 +-
 .../spark/sql/execution/arrow/ArrowConverters.scala           | 8 ++++----
 5 files changed, 15 insertions(+), 12 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-palantir b/dev/deps/spark-deps-hadoop-palantir
index bdeecbabe0f38..41cfa70fbc02b 100644
--- a/dev/deps/spark-deps-hadoop-palantir
+++ b/dev/deps/spark-deps-hadoop-palantir
@@ -10,9 +10,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
 api-asn1-api-1.0.0-M20.jar
 api-util-1.0.0-M20.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.12.0.jar
-arrow-memory-0.12.0.jar
-arrow-vector-0.12.0.jar
+arrow-format-0.15.1.jar
+arrow-memory-0.15.1.jar
+arrow-vector-0.15.1.jar
 audience-annotations-0.7.0.jar
 automaton-1.11-8.jar
 avro-1.8.2.jar
@@ -87,7 +87,6 @@ hibernate-validator-5.2.4.Final.jar
 hk2-api-2.5.0-b32.jar
 hk2-locator-2.5.0-b32.jar
 hk2-utils-2.5.0-b32.jar
-hppc-0.7.2.jar
 htrace-core4-4.1.0-incubating.jar
 httpclient-4.5.6.jar
 httpcore-4.4.10.jar
diff --git a/pom.xml b/pom.xml
index ca37f9d0e73f5..623ae764b4ca5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -219,9 +219,9 @@
     1.0.0
-    0.12.0
+    0.15.1
     1.13.0
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index 709d3a0642616..ab6c3f4c998f4 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -136,9 +136,10 @@ def require_minimum_pyarrow_version():
     """ Raise ImportError if minimum version of pyarrow is not installed
     """
     # TODO(HyukjinKwon): Relocate and deduplicate the version specification.
- minimum_pyarrow_version = "0.12.1" + minimum_pyarrow_version = "0.15.1" from distutils.version import LooseVersion + import os try: import pyarrow have_arrow = True @@ -150,6 +151,9 @@ def require_minimum_pyarrow_version(): if LooseVersion(pyarrow.__version__) < LooseVersion(minimum_pyarrow_version): raise ImportError("PyArrow >= %s must be installed; however, " "your version was %s." % (minimum_pyarrow_version, pyarrow.__version__)) + if os.environ.get("ARROW_PRE_0_15_IPC_FORMAT", "0") == "1": + raise RuntimeError("Arrow legacy IPC format is not supported in PySpark, " + "please unset ARROW_PRE_0_15_IPC_FORMAT") def require_test_compiled(): diff --git a/python/setup.py b/python/setup.py index 503c721d5a534..36e47869f17c2 100644 --- a/python/setup.py +++ b/python/setup.py @@ -104,7 +104,7 @@ def _supports_symlinks(): # For Arrow, you should also check ./pom.xml and ensure there are no breaking changes in the # binary format protocol with the Java version, see ARROW_HOME/format/* for specifications. _minimum_pandas_version = "0.19.2" -_minimum_pyarrow_version = "0.12.1" +_minimum_pyarrow_version = "0.15.1" try: # We copy the shell script to be under pyspark/python/pyspark so that the launcher scripts diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala index 884dc8c6215ff..d6a9b033bd924 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala @@ -26,7 +26,7 @@ import org.apache.arrow.flatbuf.MessageHeader import org.apache.arrow.memory.BufferAllocator import org.apache.arrow.vector._ import org.apache.arrow.vector.ipc.{ArrowStreamWriter, ReadChannel, WriteChannel} -import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, MessageSerializer} +import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, IpcOption, MessageSerializer} import org.apache.spark.TaskContext import org.apache.spark.api.java.JavaRDD @@ -63,7 +63,7 @@ private[sql] class ArrowBatchStreamWriter( * End the Arrow stream, does not close output stream. */ def end(): Unit = { - ArrowStreamWriter.writeEndOfStream(writeChannel) + ArrowStreamWriter.writeEndOfStream(writeChannel, new IpcOption) } } @@ -250,8 +250,8 @@ private[sql] object ArrowConverters { // Only care about RecordBatch messages, skip Schema and unsupported Dictionary messages if (msgMetadata.getMessage.headerType() == MessageHeader.RecordBatch) { - // Buffer backed output large enough to hold the complete serialized message - val bbout = new ByteBufferOutputStream(4 + msgMetadata.getMessageLength + bodyLength) + // Buffer backed output large enough to hold 8-byte length + complete serialized message + val bbout = new ByteBufferOutputStream(8 + msgMetadata.getMessageLength + bodyLength) // Write message metadata to ByteBuffer output stream MessageSerializer.writeMessageBuffer(