[SPARK-29378][R] Upgrade SparkR to use Arrow 0.15 API
[[SPARK-29376] Upgrade Apache Arrow to version 0.15.1](apache#26133) upgraded Arrow to 0.15 on the Scala/Java/Python side. This PR upgrades `SparkR` to use the Arrow 0.15 API, because SparkR's Arrow support is currently broken.

First, it turns out that our Jenkins jobs (including the PR builder) skip the Arrow tests. Arrow 0.15 has a breaking R API change in [ARROW-5505](https://issues.apache.org/jira/browse/ARROW-5505), and we missed it. AppVeyor was the only CI running the SparkR Arrow tests, but it is broken now.

**Jenkins**
```
Skipped ------------------------------------------------------------------------
1. createDataFrame/collect Arrow optimization (test_sparkSQL_arrow.R#25)
- arrow not installed
```

Second, Arrow throws an OOM in the AppVeyor environment (Windows, JDK 8), as shown below, because it still has Arrow 0.14.
```
Warnings -----------------------------------------------------------------------
1. createDataFrame/collect Arrow optimization (test_sparkSQL_arrow.R#39) - createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.sparkr.enabled' is set to true; however, failed, attempting non-optimization. Reason: Error in handleErrors(returnStatus, conn): java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:669)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$3.readNextBatch(ArrowConverters.scala:243)
```

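This is due to the version mismatch. The allocation in the stack trace above comes from the following length-prefix handling in `MessageSerializer`: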
```java
int messageLength = MessageSerializer.bytesToInt(buffer.array());
if (messageLength == IPC_CONTINUATION_TOKEN) {
  buffer.clear();
  // ARROW-6313, if the first 4 bytes are continuation message, read the next 4 for the length
  if (in.readFully(buffer) == 4) {
    messageLength = MessageSerializer.bytesToInt(buffer.array());
  }
}

// Length of 0 indicates end of stream
if (messageLength != 0) {
  // Read the message into the buffer.
  ByteBuffer messageBuffer = ByteBuffer.allocate(messageLength);
```
After upgrading Arrow to 0.15 in AppVeyor, we hit ARROW-5505. This PR upgrades the Arrow version in AppVeyor and fixes the issue.
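To make the length-prefix check quoted above easier to follow, here is a minimal, self-contained sketch of the same idea. It is not Arrow's implementation: the class `LengthPrefixSketch`, its helper `frame`, and the constant value `-1` for the continuation token are assumptions made for illustration, as is the little-endian decoding. It shows that a reader accepting both framings gets the same length either way, while four unrelated bytes taken as a length can request a very large buffer. Whether that is exactly what happened on AppVeyor is not established here.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Illustrative stand-in for the quoted check; names here are hypothetical.
class LengthPrefixSketch {
    // Assumed marker value (0xFFFFFFFF) for the continuation token from ARROW-6313.
    static final int CONTINUATION_TOKEN = -1;

    // Read the message length, skipping the continuation token if present.
    static int readMessageLength(ByteBuffer in) {
        in.order(ByteOrder.LITTLE_ENDIAN);
        int length = in.getInt();
        if (length == CONTINUATION_TOKEN) {
            // The first 4 bytes were the token, so the next 4 hold the length.
            length = in.getInt();
        }
        return length;
    }

    // Build a length prefix with or without the leading continuation token.
    static byte[] frame(boolean withToken, int length) {
        ByteBuffer buf = ByteBuffer.allocate(withToken ? 8 : 4)
                .order(ByteOrder.LITTLE_ENDIAN);
        if (withToken) {
            buf.putInt(CONTINUATION_TOKEN);
        }
        buf.putInt(length);
        return buf.array();
    }

    public static void main(String[] args) {
        // Both framings decode to the same length.
        System.out.println(readMessageLength(ByteBuffer.wrap(frame(true, 16))));
        System.out.println(readMessageLength(ByteBuffer.wrap(frame(false, 16))));

        // Four arbitrary bytes that are not a length prefix at all.
        byte[] notALength = {0x10, 0x20, 0x30, 0x40};
        int bogus = readMessageLength(ByteBuffer.wrap(notALength));
        // Passing a value like this to ByteBuffer.allocate would ask for
        // roughly a gigabyte of heap.
        System.out.println(bogus);
    }
}
```

The sketch only decodes the prefix and never allocates the message buffer, so it runs safely with a default heap.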

This PR introduces no user-facing change.

Tested by passing the AppVeyor build.

This PR passed here.
- https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark/builds/28909044

```
SparkSQL Arrow optimization: Spark package found in SPARK_HOME: C:\projects\spark\bin\..
................
```

Closes apache#26555 from dongjoon-hyun/SPARK-R-TEST.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
dongjoon-hyun authored and rshkv committed Jul 15, 2020
1 parent 01340a2 commit cff4c3f
Showing 2 changed files with 3 additions and 3 deletions.
4 changes: 2 additions & 2 deletions R/pkg/R/SQLContext.R
@@ -166,9 +166,9 @@ writeToFileInArrow <- function(fileName, rdf, numPartitions) {
     for (rdf_slice in rdf_slices) {
       batch <- arrow::record_batch(rdf_slice)
       if (is.null(stream_writer)) {
-        stream <- arrow::FileOutputStream(fileName)
+        stream <- arrow::FileOutputStream$create(fileName)
         schema <- batch$schema
-        stream_writer <- arrow::RecordBatchStreamWriter(stream, schema)
+        stream_writer <- arrow::RecordBatchStreamWriter$create(stream, schema)
       }
 
       stream_writer$write_batch(batch)
2 changes: 1 addition & 1 deletion R/pkg/R/deserialize.R
@@ -242,7 +242,7 @@ readDeserializeInArrow <- function(inputCon) {
   # for now.
   dataLen <- readInt(inputCon)
   arrowData <- readBin(inputCon, raw(), as.integer(dataLen), endian = "big")
-  batches <- arrow::RecordBatchStreamReader(arrowData)$batches()
+  batches <- arrow::RecordBatchStreamReader$create(arrowData)$batches()
 
   if (useAsTibble) {
     as_tibble <- get("as_tibble", envir = asNamespace("arrow"))
