[SPARK-29376][SQL][PYTHON] Upgrade Apache Arrow to version 0.15.1
Upgrade Apache Arrow to version 0.15.1. This covers the Java artifacts and also raises the minimum required version of PyArrow.

Versions 0.12.0 through 0.15.1 include the following selected fixes/improvements relevant to Spark users:

* ARROW-6898 - [Java] Fix potential memory leak in ArrowWriter and several test classes
* ARROW-6874 - [Python] Memory leak in Table.to_pandas() when conversion to object dtype
* ARROW-5579 - [Java] shade flatbuffer dependency
* ARROW-5843 - [Java] Improve the readability and performance of BitVectorHelper#getNullCount
* ARROW-5881 - [Java] Provide functionalities to efficiently determine if a validity buffer has completely 1 bits/0 bits
* ARROW-5893 - [C++] Remove arrow::Column class from C++ library
* ARROW-5970 - [Java] Provide pointer to Arrow buffer
* ARROW-6070 - [Java] Avoid creating new schema before IPC sending
* ARROW-6279 - [Python] Add Table.slice method or allow slices in __getitem__
* ARROW-6313 - [Format] Tracking for ensuring flatbuffer serialized values are aligned in stream/files.
* ARROW-6557 - [Python] Always return pandas.Series from Array/ChunkedArray.to_pandas, propagate field names to Series from RecordBatch, Table
* ARROW-2015 - [Java] Use Java Time and Date APIs instead of JodaTime
* ARROW-1261 - [Java] Add container type for Map logical type
* ARROW-1207 - [C++] Implement Map logical type

Changelog can be seen at https://arrow.apache.org/release/0.15.0.html

The upgrade brings bug fixes and improvements, and keeps Spark compatible with future versions of PyArrow.

No user-facing changes.

Tested with existing tests and manually with Python 3.7 and 3.8.
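As an illustration only (not part of this commit), a minimal sketch of checking a PySpark environment against the new requirement before relying on Arrow-based conversions; the session setup and the Arrow config key used here are assumptions and may differ by Spark version:

# Illustrative sketch only; assumes PySpark and PyArrow are installed locally.
# The config key "spark.sql.execution.arrow.pyspark.enabled" is an assumption
# (Spark 3.x naming; older releases use "spark.sql.execution.arrow.enabled").
import os
from distutils.version import LooseVersion

import pyarrow
from pyspark.sql import SparkSession

assert LooseVersion(pyarrow.__version__) >= LooseVersion("0.15.1"), \
    "PyArrow >= 0.15.1 is required after this upgrade"
assert os.environ.get("ARROW_PRE_0_15_IPC_FORMAT", "0") != "1", \
    "unset ARROW_PRE_0_15_IPC_FORMAT; the legacy IPC format is not supported"

spark = SparkSession.builder.master("local[2]").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Arrow-accelerated conversion to pandas exercises the upgraded IPC path.
pdf = spark.range(1000).toPandas()
print(pdf.shape)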

Closes apache#26133 from BryanCutler/arrow-upgrade-015-SPARK-29376.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
BryanCutler authored and rshkv committed May 23, 2020
1 parent c2daee4 commit 96c394f
Showing 5 changed files with 15 additions and 12 deletions.
7 changes: 3 additions & 4 deletions dev/deps/spark-deps-hadoop-palantir
@@ -10,9 +10,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
 api-asn1-api-1.0.0-M20.jar
 api-util-1.0.0-M20.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.12.0.jar
-arrow-memory-0.12.0.jar
-arrow-vector-0.12.0.jar
+arrow-format-0.15.1.jar
+arrow-memory-0.15.1.jar
+arrow-vector-0.15.1.jar
 audience-annotations-0.7.0.jar
 automaton-1.11-8.jar
 avro-1.8.2.jar
@@ -87,7 +87,6 @@ hibernate-validator-5.2.4.Final.jar
 hk2-api-2.5.0-b32.jar
 hk2-locator-2.5.0-b32.jar
 hk2-utils-2.5.0-b32.jar
-hppc-0.7.2.jar
 htrace-core4-4.1.0-incubating.jar
 httpclient-4.5.6.jar
 httpcore-4.4.10.jar
4 changes: 2 additions & 2 deletions pom.xml
@@ -219,9 +219,9 @@
     <commons-crypto.version>1.0.0</commons-crypto.version>
     <!--
     If you are changing Arrow version specification, please check ./python/pyspark/sql/utils.py,
-    ./python/run-tests.py and ./python/setup.py too.
+    and ./python/setup.py too.
     -->
-    <arrow.version>0.12.0</arrow.version>
+    <arrow.version>0.15.1</arrow.version>

     <!-- Async shuffle upload plugin dependency versions -->
     <safe-logging.version>1.13.0</safe-logging.version>
6 changes: 5 additions & 1 deletion python/pyspark/sql/utils.py
@@ -136,9 +136,10 @@ def require_minimum_pyarrow_version():
     """ Raise ImportError if minimum version of pyarrow is not installed
     """
     # TODO(HyukjinKwon): Relocate and deduplicate the version specification.
-    minimum_pyarrow_version = "0.12.1"
+    minimum_pyarrow_version = "0.15.1"

     from distutils.version import LooseVersion
+    import os
     try:
         import pyarrow
         have_arrow = True
@@ -150,6 +151,9 @@ def require_minimum_pyarrow_version():
     if LooseVersion(pyarrow.__version__) < LooseVersion(minimum_pyarrow_version):
         raise ImportError("PyArrow >= %s must be installed; however, "
                           "your version was %s." % (minimum_pyarrow_version, pyarrow.__version__))
+    if os.environ.get("ARROW_PRE_0_15_IPC_FORMAT", "0") == "1":
+        raise RuntimeError("Arrow legacy IPC format is not supported in PySpark, "
+                           "please unset ARROW_PRE_0_15_IPC_FORMAT")


 def require_test_compiled():
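For reference, a rough sketch (not from this commit) of how the strengthened check above behaves when called directly; it assumes a local installation with pyarrow >= 0.15.1:

# Illustrative only: exercising require_minimum_pyarrow_version() after the change.
import os
from pyspark.sql.utils import require_minimum_pyarrow_version

require_minimum_pyarrow_version()          # passes with pyarrow >= 0.15.1 installed

os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
try:
    require_minimum_pyarrow_version()      # now raises RuntimeError
except RuntimeError as e:
    print(e)                               # asks to unset ARROW_PRE_0_15_IPC_FORMAT
finally:
    del os.environ["ARROW_PRE_0_15_IPC_FORMAT"]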
2 changes: 1 addition & 1 deletion python/setup.py
@@ -104,7 +104,7 @@ def _supports_symlinks():
 # For Arrow, you should also check ./pom.xml and ensure there are no breaking changes in the
 # binary format protocol with the Java version, see ARROW_HOME/format/* for specifications.
 _minimum_pandas_version = "0.19.2"
-_minimum_pyarrow_version = "0.12.1"
+_minimum_pyarrow_version = "0.15.1"

 try:
     # We copy the shell script to be under pyspark/python/pyspark so that the launcher scripts
8 changes: 4 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -26,7 +26,7 @@ import org.apache.arrow.flatbuf.MessageHeader
 import org.apache.arrow.memory.BufferAllocator
 import org.apache.arrow.vector._
 import org.apache.arrow.vector.ipc.{ArrowStreamWriter, ReadChannel, WriteChannel}
-import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, MessageSerializer}
+import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, IpcOption, MessageSerializer}

 import org.apache.spark.TaskContext
 import org.apache.spark.api.java.JavaRDD
@@ -63,7 +63,7 @@ private[sql] class ArrowBatchStreamWriter(
    * End the Arrow stream, does not close output stream.
    */
   def end(): Unit = {
-    ArrowStreamWriter.writeEndOfStream(writeChannel)
+    ArrowStreamWriter.writeEndOfStream(writeChannel, new IpcOption)
   }
 }

@@ -250,8 +250,8 @@ private[sql] object ArrowConverters {
       // Only care about RecordBatch messages, skip Schema and unsupported Dictionary messages
       if (msgMetadata.getMessage.headerType() == MessageHeader.RecordBatch) {

-        // Buffer backed output large enough to hold the complete serialized message
-        val bbout = new ByteBufferOutputStream(4 + msgMetadata.getMessageLength + bodyLength)
+        // Buffer backed output large enough to hold 8-byte length + complete serialized message
+        val bbout = new ByteBufferOutputStream(8 + msgMetadata.getMessageLength + bodyLength)

         // Write message metadata to ByteBuffer output stream
         MessageSerializer.writeMessageBuffer(
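The buffer-size change above (4 + ... becoming 8 + ...) reflects the Arrow 0.15 IPC format (see ARROW-6313 in the list above), where each encapsulated message is prefixed by a 4-byte continuation marker (0xFFFFFFFF) followed by the 4-byte metadata length, i.e. an 8-byte prefix instead of 4. A rough Python sketch, assuming pyarrow >= 0.15 with default IPC options, to observe that prefix:

# Illustrative sketch only; assumes pyarrow >= 0.15 and the default (non-legacy)
# IPC options, so the serialized message starts with the continuation marker.
import struct
import pyarrow as pa

batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], names=["id"])
buf = batch.serialize().to_pybytes()   # encapsulated IPC message bytes

marker, metadata_len = struct.unpack("<Ii", buf[:8])
print(hex(marker))       # 0xffffffff continuation marker (new in Arrow 0.15)
print(metadata_len)      # length of the flatbuffer metadata that follows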
