[SPARK-38273][SQL] decodeUnsafeRows's iterators should close underlying input streams

### What changes were proposed in this pull request?
Wrap the `DataInputStream` in `SparkPlan.decodeUnsafeRows` in a `NextIterator` instead of a plain `Iterator`, so that the stream is closed once iteration completes (or fails). This code path runs on the Spark driver only.
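
As a rough sketch of the pattern (condensed, not the exact patch — the full change is in the diff at the bottom): `NextIterator` asks subclasses to implement `getNext()`, which sets `finished = true` at end of input, and `close()`, which the base class calls at most once when `finished` is observed. Because `NextIterator` is `private[spark]`, the sketch assumes a hypothetical `org.apache.spark` sub-package, and the object and method names are illustrative:

```scala
// Hypothetical package: NextIterator is private[spark], so a sketch like this
// only compiles somewhere under org.apache.spark.
package org.apache.spark.example

import java.io.DataInputStream

import org.apache.spark.util.NextIterator

object DecodeSketch {
  // Reads length-prefixed payloads until a negative length sentinel, then lets
  // the NextIterator machinery close the stream exactly once.
  def decode(ins: DataInputStream): Iterator[Array[Byte]] = new NextIterator[Array[Byte]] {
    override def getNext(): Array[Byte] = {
      val size = ins.readInt()
      if (size < 0) {
        finished = true // end of stream: hasNext will now return false and trigger close()
        null
      } else {
        val buf = new Array[Byte](size)
        ins.readFully(buf)
        buf
      }
    }

    override def close(): Unit = ins.close() // releases the underlying (compressed) stream
  }
}
```

Compared with a plain `Iterator`, simply exhausting the iterator is enough to release the stream; no caller has to remember a separate `close()`.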

### Why are the changes needed?
SPARK-34647 replaced `ZstdInputStream` with `ZstdInputStreamNoFinalizer`. As a result, every caller of `CompressionCodec.compressedInputStream` must close the returned stream explicitly, since the finalizer mechanism no longer releases it.

In `SparkPlan.decodeUnsafeRows`, however, the stream returned by `CompressionCodec.compressedInputStream` is wrapped in an `Iterator` that never calls `close`, so the native memory held by the zstd stream is never released.
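
Concretely, with the zstd codec `compressedInputStream` now returns a stream backed by `ZstdInputStreamNoFinalizer`, whose off-heap buffers are freed only by an explicit `close()`. A minimal, self-contained sketch of that obligation, assuming only spark-core on the classpath (the object name and the `-1` end-of-stream sentinel are illustrative; the sentinel mirrors how `decodeUnsafeRows` delimits rows):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import org.apache.spark.SparkConf
import org.apache.spark.io.ZstdCompressionCodec

object ManualCloseSketch {
  def main(args: Array[String]): Unit = {
    val codec = new ZstdCompressionCodec(new SparkConf())

    // Write a few ints followed by a -1 end-of-stream marker.
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(codec.compressedOutputStream(bos))
    (1 to 5).foreach(i => out.writeInt(i))
    out.writeInt(-1)
    out.close()

    // Read the ints back. The stream is backed by ZstdInputStreamNoFinalizer,
    // so its native memory is released only when close() is called explicitly.
    val ins = new DataInputStream(
      codec.compressedInputStream(new ByteArrayInputStream(bos.toByteArray)))
    try {
      println(Iterator.continually(ins.readInt()).takeWhile(_ >= 0).toList)
    } finally {
      ins.close() // without this, every decode leaks off-heap memory
    }
  }
}
```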

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

#### Spark Shell Configuration
```bash
$> export SPARK_SUBMIT_OPTS="-XX:+AlwaysPreTouch -Xms1g"
$> $SPARK_HOME/bin/spark-shell --conf spark.io.compression.codec=zstd
```

#### Test Script
```scala
import java.sql.Timestamp
import java.time.Instant
import spark.implicits._

case class Record(timestamp: Timestamp, batch: Long, value: Long)

(1 to 300).foreach { batch =>
  sc.parallelize(1 to 1000000).map(Record(Timestamp.from(Instant.now()), batch, _)).toDS.write.parquet(s"test_data/batch_$batch")
}

(1 to 300).foreach(batch => spark.read.parquet(s"test_data/batch_$batch").as[Record].repartition().collect())

```

#### Memory Monitor
```shell
$> while true; do echo \"$(date +%Y-%m-%d' '%H:%M:%S)\",$(pmap -x <PID> | grep "total kB" | awk '{print $4}'); sleep 10; done;
```

#### Results

Without the fix, driver resident memory grows to roughly 3.2 GB over the run; with it, it levels off around 1.9 GB.

##### Before
```
"2022-02-22 11:55:23",1400016
"2022-02-22 11:55:33",1522024
"2022-02-22 11:55:43",1587812
"2022-02-22 11:55:53",1631868
"2022-02-22 11:56:03",1657252
"2022-02-22 11:56:13",1659728
"2022-02-22 11:56:23",1664640
"2022-02-22 11:56:33",1674152
"2022-02-22 11:56:43",1697320
"2022-02-22 11:56:53",1689636
"2022-02-22 11:57:03",1783888
"2022-02-22 11:57:13",1896920
"2022-02-22 11:57:23",1950492
"2022-02-22 11:57:33",2010968
"2022-02-22 11:57:44",2066560
"2022-02-22 11:57:54",2108232
"2022-02-22 11:58:04",2158188
"2022-02-22 11:58:14",2211344
"2022-02-22 11:58:24",2260180
"2022-02-22 11:58:34",2316352
"2022-02-22 11:58:44",2367412
"2022-02-22 11:58:54",2420916
"2022-02-22 11:59:04",2472132
"2022-02-22 11:59:14",2519888
"2022-02-22 11:59:24",2571372
"2022-02-22 11:59:34",2621992
"2022-02-22 11:59:44",2672400
"2022-02-22 11:59:54",2728924
"2022-02-22 12:00:04",2777712
"2022-02-22 12:00:14",2834272
"2022-02-22 12:00:24",2881344
"2022-02-22 12:00:34",2935552
"2022-02-22 12:00:44",2984896
"2022-02-22 12:00:54",3034116
"2022-02-22 12:01:04",3087092
"2022-02-22 12:01:14",3134432
"2022-02-22 12:01:25",3198316
"2022-02-22 12:01:35",3193484
"2022-02-22 12:01:45",3193212
"2022-02-22 12:01:55",3192872
"2022-02-22 12:02:05",3191772
"2022-02-22 12:02:15",3187780
"2022-02-22 12:02:25",3177084
"2022-02-22 12:02:35",3173292
"2022-02-22 12:02:45",3173292
"2022-02-22 12:02:55",3173292
```

##### After
```
"2022-02-22 12:05:03",1377124
"2022-02-22 12:05:13",1425132
"2022-02-22 12:05:23",1564060
"2022-02-22 12:05:33",1616116
"2022-02-22 12:05:43",1637448
"2022-02-22 12:05:53",1637700
"2022-02-22 12:06:03",1653912
"2022-02-22 12:06:13",1659532
"2022-02-22 12:06:23",1673368
"2022-02-22 12:06:33",1687580
"2022-02-22 12:06:43",1711076
"2022-02-22 12:06:53",1849752
"2022-02-22 12:07:03",1861528
"2022-02-22 12:07:13",1871200
"2022-02-22 12:07:24",1878860
"2022-02-22 12:07:34",1879332
"2022-02-22 12:07:44",1886552
"2022-02-22 12:07:54",1884160
"2022-02-22 12:08:04",1880924
"2022-02-22 12:08:14",1876084
"2022-02-22 12:08:24",1878800
"2022-02-22 12:08:34",1879068
"2022-02-22 12:08:44",1880088
"2022-02-22 12:08:54",1880160
"2022-02-22 12:09:04",1880496
"2022-02-22 12:09:14",1891672
"2022-02-22 12:09:24",1878552
"2022-02-22 12:09:34",1876136
"2022-02-22 12:09:44",1890056
"2022-02-22 12:09:54",1878076
"2022-02-22 12:10:04",1882440
"2022-02-22 12:10:14",1893172
"2022-02-22 12:10:24",1894216
"2022-02-22 12:10:34",1894204
"2022-02-22 12:10:44",1894716
"2022-02-22 12:10:54",1894720
"2022-02-22 12:11:04",1894720
"2022-02-22 12:11:15",1895232
"2022-02-22 12:11:25",1895496
"2022-02-22 12:11:35",1895496
```

Closes apache#35613 from kevins-29/spark-38273.

Lead-authored-by: Kevin Sewell <kevins_25@apple.com>
Co-authored-by: kevins-29 <100220899+kevins-29@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 43c89dc)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2 people authored and sunchao committed Feb 28, 2022
1 parent 67ffb49 commit 6065a0f
Showing 1 changed file with 19 additions and 3 deletions.
`sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala`

```diff
@@ -37,6 +37,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.NextIterator
 
 object SparkPlan {
   /** The original [[LogicalPlan]] from which this [[SparkPlan]] is converted. */
@@ -370,17 +371,32 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
+      private def _next(): InternalRow = {
         val bs = new Array[Byte](sizeOfNextRow)
         ins.readFully(bs)
         val row = new UnsafeRow(nFields)
         row.pointTo(bs, sizeOfNextRow)
         sizeOfNextRow = ins.readInt()
         row
       }
+
+      override def getNext(): InternalRow = {
+        if (sizeOfNextRow >= 0) {
+          try {
+            _next()
+          } catch {
+            case t: Throwable if ins != null =>
+              ins.close()
+              throw t
+          }
+        } else {
+          finished = true
+          null
+        }
+      }
+      override def close(): Unit = ins.close()
     }
   }
 
```
