[SPARK-27588] Binary file data source fails fast and doesn't attempt to read very large files

If a file is too big (>2GB), we should fail fast and not try to read the file.
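For context, a minimal usage sketch of the binary file data source (not part of the patch; the input directory is hypothetical and an active SparkSession named `spark` is assumed). It shows why the content column is sensitive to file size: each matched file is materialized whole as a single byte array.

    // Sketch: load a directory of binary files; `content` holds the full bytes
    // of each file, so a single >2GB file cannot fit in one byte array.
    val df = spark.read.format("binaryFile").load("/tmp/binary-files")
    df.select("path", "length", "content").show()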

Tested with a new unit test in BinaryFileFormatSuite ("fail fast and do not attempt to read if a file is too big"). No UI changes.

Closes apache#24483 from mengxr/SPARK-27588.

Authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
mengxr authored and Will Manning committed Jan 9, 2020
1 parent 47948fc commit 6eedfd1
Showing 3 changed files with 46 additions and 1 deletion.
SQLConf.scala
@@ -1673,6 +1673,14 @@ object SQLConf {
"a SparkConf entry.")
.booleanConf
.createWithDefault(true)

val SOURCES_BINARY_FILE_MAX_LENGTH = buildConf("spark.sql.sources.binaryFile.maxLength")
.doc("The max length of a file that can be read by the binary file data source. " +
"Spark will fail fast and not attempt to read the file if its length exceeds this value. " +
"The theoretical max is Int.MaxValue, though VMs might implement a smaller max.")
.internal()
.intConf
.createWithDefault(Int.MaxValue)
}

/**
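For reference (not part of the diff): although the new entry is marked internal, it can be overridden per session like any other SQL conf. A hedged sketch, assuming an active SparkSession named `spark` and an arbitrary 8 MB limit:

    // Lower the guard so files larger than ~8 MB are rejected instead of read.
    spark.conf.set("spark.sql.sources.binaryFile.maxLength", (8 * 1024 * 1024).toString)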
BinaryFileFormat.scala
@@ -24,11 +24,13 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, GlobFilter, Path}
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.internal.SQLConf.SOURCES_BINARY_FILE_MAX_LENGTH
import org.apache.spark.sql.sources.{And, DataSourceRegister, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Not, Or}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -99,6 +101,7 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister {
val binaryFileSourceOptions = new BinaryFileSourceOptions(options)
val pathGlobPattern = binaryFileSourceOptions.pathGlobFilter
val filterFuncs = filters.map(filter => createFilterFunction(filter))
val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH)

file: PartitionedFile => {
val path = new Path(file.filePath)
@@ -115,6 +118,11 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister {
case (MODIFICATION_TIME, i) =>
writer.write(i, DateTimeUtils.fromMillis(status.getModificationTime))
case (CONTENT, i) =>
if (status.getLen > maxLength) {
throw new SparkException(
s"The length of ${status.getPath} is ${status.getLen}, " +
s"which exceeds the max length allowed: ${maxLength}.")
}
val stream = fs.open(status.getPath)
try {
writer.write(i, ByteStreams.toByteArray(stream))
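From the caller's side (again, not part of the diff), the guard surfaces as a SparkException thrown before the file is ever opened. A rough handling sketch with an illustrative path, assuming an active SparkSession named `spark`:

    import org.apache.spark.SparkException

    try {
      spark.read.format("binaryFile").load("/data/blobs").select("content").collect()
    } catch {
      // Task failures are wrapped by the job failure, but the cause message is
      // propagated, so the "exceeds the max length allowed" text is still visible.
      case e: SparkException if e.getMessage.contains("exceeds the max length allowed") =>
        println(s"Oversized file rejected: ${e.getMessage}")
    }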
BinaryFileFormatSuite.scala
@@ -27,10 +27,12 @@ import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}
import org.mockito.Mockito.{mock, when}

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf.SOURCES_BINARY_FILE_MAX_LENGTH
import org.apache.spark.sql.sources._
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._
@@ -339,4 +341,31 @@ class BinaryFileFormatSuite extends QueryTest with SharedSQLContext with SQLTest
assert(df.select("LENGTH").first().getLong(0) === content.length,
"column pruning should be case insensitive")
}

test("fail fast and do not attempt to read if a file is too big") {
assert(spark.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH) === Int.MaxValue)
withTempPath { file =>
val path = file.getPath
val content = "123".getBytes
Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
def readContent(): DataFrame = {
spark.read.format(BINARY_FILE)
.load(path)
.select(CONTENT)
}
val expected = Seq(Row(content))
QueryTest.checkAnswer(readContent(), expected)
withSQLConf(SOURCES_BINARY_FILE_MAX_LENGTH.key -> content.length.toString) {
QueryTest.checkAnswer(readContent(), expected)
}
// Disable read. If the implementation attempts to read, the exception would be different.
file.setReadable(false)
val caught = intercept[SparkException] {
withSQLConf(SOURCES_BINARY_FILE_MAX_LENGTH.key -> (content.length - 1).toString) {
QueryTest.checkAnswer(readContent(), expected)
}
}
assert(caught.getMessage.contains("exceeds the max length allowed"))
}
}
}
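A note on the test above: because the length check runs on file metadata alone, the test can revoke read permission on the file and still expect the max-length error; any attempt to actually open the file would fail differently. A condensed, illustrative sketch of the guard being exercised (mirroring the production code, not replacing it):

    import org.apache.hadoop.fs.FileStatus
    import org.apache.spark.SparkException

    // The check only needs FileStatus, so no stream is opened for oversized files.
    def checkLength(status: FileStatus, maxLength: Int): Unit = {
      if (status.getLen > maxLength) {
        throw new SparkException(
          s"The length of ${status.getPath} is ${status.getLen}, " +
            s"which exceeds the max length allowed: ${maxLength}.")
      }
    }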
