rdar://87911623: fix a few issues on Parquet complex type support (ap…
sunchao authored and GitHub Enterprise committed Jan 22, 2022
1 parent 2bcda28 commit c942fc1
Showing 5 changed files with 82 additions and 13 deletions.
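
Before the diff, a brief usage sketch (not part of the commit): the batch-read checks added here are gated by the nested-column vectorized reader flag that the new tests below reference as SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED. The local session and input path in this sketch are hypothetical.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

// Hypothetical local session, for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("nested-parquet-vectorized-read-sketch")
  .getOrCreate()

// Enable vectorized (batch) reads for nested Parquet columns; the helpers added to
// ParquetUtils in this commit consult this flag for array, map and struct fields.
spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key, "true")

// With the flag on, a schema such as struct<f1:int, f2:array<int>> can stay on the
// columnar read path instead of falling back to the row-based reader.
val df = spark.read.parquet("/path/to/nested.parquet")  // hypothetical path
df.printSchema()
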
@@ -169,9 +169,8 @@ class ParquetFileFormat
*/
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
val conf = sparkSession.sessionState.conf
- conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
-   schema.forall(f => isBatchReadSupported(conf, f.dataType)) &&
-   !WholeStageCodegenExec.isTooManyFields(conf, schema)
+ conf.wholeStageEnabled && ParquetUtils.isBatchReadSupportedForSchema(conf, schema) &&
+   !WholeStageCodegenExec.isTooManyFields(conf, schema)
}

private def isBatchReadSupported(sqlConf: SQLConf, dt: DataType): Boolean = dt match {
@@ -256,8 +255,7 @@ class ParquetFileFormat
val sqlConf = sparkSession.sessionState.conf
val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
val enableVectorizedReader: Boolean =
- sqlConf.parquetVectorizedReaderEnabled &&
-   resultSchema.map(_.dataType).forall(isBatchReadSupported(sqlConf, _))
+ ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema)
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.datasources.parquet

+ import java.util.Locale

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.io.{ColumnIO, ColumnIOFactory, GroupColumnIO, PrimitiveColumnIO}
import org.apache.parquet.schema._
@@ -86,11 +88,18 @@ class ParquetToSparkSchemaConverter(
groupColumn: GroupColumnIO,
sparkReadSchema: Option[StructType] = None,
caseSensitive: Boolean = true): ParquetType = {
+ // First convert the read schema into a map from field name to the field itself, to avoid O(n)
+ // lookup cost below.
+ val schemaMapOpt = sparkReadSchema.map { schema =>
+   schema.map(f => normalizeFieldName(f.name, caseSensitive) -> f).toMap
+ }
+
val converted = (0 until groupColumn.getChildrenCount).map { i =>
val field = groupColumn.getChild(i)
- var fieldReadType = sparkReadSchema.flatMap { schema =>
-   schema.find(f => isSameFieldName(f.name, field.getName, caseSensitive)).map(_.dataType)
+ val fieldFromReadSchema = schemaMapOpt.flatMap { schemaMap =>
+   schemaMap.get(normalizeFieldName(field.getName, caseSensitive))
}
+ var fieldReadType = fieldFromReadSchema.map(_.dataType)

// if a field is repeated here then it is neither contained by a `LIST` nor `MAP`
// annotated group (these should've been handled in `convertGroupField`), e.g.:
@@ -138,9 +147,8 @@
ParquetType(StructType(converted.map(_._1)), groupColumn, converted.map(_._2))
}

- private def isSameFieldName(left: String, right: String, caseSensitive: Boolean): Boolean =
-   if (!caseSensitive) left.equalsIgnoreCase(right)
-   else left == right
+ private def normalizeFieldName(name: String, caseSensitive: Boolean): String =
+   if (caseSensitive) name else name.toLowerCase(Locale.ROOT)

/**
* Converts a Parquet [[Type]] to a Spark SQL [[DataType]].
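
The converter change above swaps a per-field linear scan of the Spark read schema for a map built once from normalized field names. A self-contained sketch of the same lookup strategy (the object and method names here are illustrative, not the converter's API):

import java.util.Locale

import org.apache.spark.sql.types.{StructField, StructType}

object FieldLookupSketch {
  // Mirror of the normalization rule: lower-case the name unless analysis is case sensitive.
  private def normalize(name: String, caseSensitive: Boolean): String =
    if (caseSensitive) name else name.toLowerCase(Locale.ROOT)

  // Build the map once, then resolve each Parquet field name in O(1)
  // instead of calling schema.find for every field.
  def resolveAll(
      readSchema: StructType,
      parquetFieldNames: Seq[String],
      caseSensitive: Boolean): Seq[Option[StructField]] = {
    val byName = readSchema.fields.map(f => normalize(f.name, caseSensitive) -> f).toMap
    parquetFieldNames.map(n => byName.get(normalize(n, caseSensitive)))
  }
}
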
@@ -20,7 +20,8 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.parquet.hadoop.ParquetFileWriter

import org.apache.spark.sql.SparkSession
- import org.apache.spark.sql.types.StructType
+ import org.apache.spark.sql.internal.SQLConf
+ import org.apache.spark.sql.types._

object ParquetUtils {
def inferSchema(
@@ -107,6 +108,30 @@ object ParquetUtils {
ParquetFileFormat.mergeSchemasInParallel(parameters, filesToTouch, sparkSession)
}

+ /**
+  * Whether columnar read is supported for the input `schema`.
+  */
+ def isBatchReadSupportedForSchema(sqlConf: SQLConf, schema: StructType): Boolean =
+   sqlConf.parquetVectorizedReaderEnabled &&
+     schema.forall(f => isBatchReadSupported(sqlConf, f.dataType))
+
+ def isBatchReadSupported(sqlConf: SQLConf, dt: DataType): Boolean = dt match {
+   case _: AtomicType =>
+     true
+   case at: ArrayType =>
+     sqlConf.parquetVectorizedReaderNestedColumnEnabled &&
+       isBatchReadSupported(sqlConf, at.elementType)
+   case mt: MapType =>
+     sqlConf.parquetVectorizedReaderNestedColumnEnabled &&
+       isBatchReadSupported(sqlConf, mt.keyType) &&
+       isBatchReadSupported(sqlConf, mt.valueType)
+   case st: StructType =>
+     sqlConf.parquetVectorizedReaderNestedColumnEnabled &&
+       st.fields.forall(f => isBatchReadSupported(sqlConf, f.dataType))
+   case _ =>
+     false
+ }
+
case class FileTypes(
data: Seq[FileStatus],
metadata: Seq[FileStatus],
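
For a concrete sense of the recursion in ParquetUtils.isBatchReadSupported: with the nested-column flag on, a type is accepted only if every leaf is an atomic type, so a UDT anywhere in the tree rejects the whole column. A rough sketch, assuming a standalone SQLConf can be mutated directly with setConf (the new tests below use withSQLConf instead):

import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

val sqlConf = new SQLConf()
sqlConf.setConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED, true)

// Every leaf below is atomic, so the whole nested column is batch-readable.
val nested = MapType(
  keyType = LongType,
  valueType = ArrayType(StructType(Seq(StructField("ts", TimestampType)))))
assert(ParquetUtils.isBatchReadSupported(sqlConf, nested))

// Turning the flag off pushes any nested type back to the row-based reader.
sqlConf.setConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED, false)
assert(!ParquetUtils.isBatchReadSupported(sqlConf, nested))
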
@@ -66,8 +66,8 @@ case class ParquetPartitionReaderFactory(
private val isCaseSensitive = sqlConf.caseSensitiveAnalysis
private val resultSchema = StructType(partitionSchema.fields ++ readDataSchema.fields)
private val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
- private val enableVectorizedReader: Boolean = sqlConf.parquetVectorizedReaderEnabled &&
-   resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+ private val enableVectorizedReader: Boolean =
+   ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema)
private val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
private val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
private val capacity = sqlConf.parquetVectorizedReaderBatchSize
@@ -24,6 +24,7 @@ import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
+ import org.apache.spark.sql.types._

abstract class ParquetFileFormatSuite
extends QueryTest
@@ -63,6 +64,43 @@ abstract class ParquetFileFormatSuite
}.getCause
assert(exception.getMessage().contains("Could not read footer for file"))
}

test("support batch reads for schema") {
val testUDT = new TestUDT.MyDenseVectorUDT
Seq(true, false).foreach { enabled =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> enabled.toString) {
Seq(
Seq(StructField("f1", IntegerType), StructField("f2", BooleanType)) -> true,
Seq(StructField("f1", IntegerType), StructField("f2", ArrayType(IntegerType))) -> enabled,
Seq(StructField("f1", BooleanType), StructField("f2", testUDT)) -> false,
).foreach { case (schema, expected) =>
assert(ParquetUtils.isBatchReadSupportedForSchema(conf, StructType(schema)) == expected)
}
}
}
}

test("support batch reads for data type") {
val testUDT = new TestUDT.MyDenseVectorUDT
Seq(true, false).foreach { enabled =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> enabled.toString) {
Seq(
IntegerType -> true,
BooleanType -> true,
ArrayType(TimestampType) -> enabled,
StructType(Seq(StructField("f1", DecimalType.SYSTEM_DEFAULT),
StructField("f2", StringType))) -> enabled,
MapType(keyType = LongType, valueType = DateType) -> enabled,
testUDT -> false,
ArrayType(testUDT) -> false,
StructType(Seq(StructField("f1", ByteType), StructField("f2", testUDT))) -> false,
MapType(keyType = testUDT, valueType = BinaryType) -> false
).foreach { case (dt, expected) =>
assert(ParquetUtils.isBatchReadSupported(conf, dt) == expected)
}
}
}
}
}

class ParquetFileFormatV1Suite extends ParquetFileFormatSuite {
