Skip to content

Commit

Permalink
validate Parquet reader schema
Browse files Browse the repository at this point in the history
  • Loading branch information
nevillelyh committed Feb 20, 2021
1 parent 0563278 commit 2aea4e8
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
1 change: 1 addition & 0 deletions parquet/src/main/scala/magnolify/parquet/ParquetType.scala
Expand Up @@ -110,6 +110,7 @@ object ParquetType {
parquetType = SerializationUtils.fromBase64(context.getConfiguration.get(ReadTypeKey))
}
val requestedSchema = Schema.message(parquetType.schema)
context.getFileSchema.checkContains(requestedSchema)
new hadoop.ReadSupport.ReadContext(requestedSchema, java.util.Collections.emptyMap())
}

Expand Down
Expand Up @@ -27,6 +27,7 @@ import magnolify.test._
import magnolify.test.Time._
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.io.InvalidRecordException

import scala.reflect.ClassTag

Expand Down Expand Up @@ -75,10 +76,27 @@ class ProjectionPredicateSuite extends MagnolifySuite {
testProjection[ProjectionSubset](t => ProjectionSubset(t.b1, t.i1, t.s1))
testProjection[ProjectionOrdering1](t => ProjectionOrdering1(t.s1, t.i1, t.b1))
testProjection[ProjectionOrdering2](t => ProjectionOrdering2(t.b2, t.b1, t.i2, t.i1))
testProjection[ProjectionRepetition](t => ProjectionRepetition(Some(t.s1), t.o.toList, t.r))
testProjection[ProjectionLogical](t => ProjectionLogical(t.i.toEpochMilli))
}

private def testBadProjection[T: ClassTag](implicit rt: ParquetType[T]): Unit =
test(s"BadProjection.${className[T]}") {
val in = new TestInputFile(bytes)
val reader = rt.readBuilder(in).build()
intercept[InvalidRecordException](reader.read())
}

{
testBadProjection[ProjectionBadName]
testBadProjection[ProjectionBadType]
testBadProjection[ProjectionBadRepetition1]
testBadProjection[ProjectionBadRepetition2]
testBadProjection[ProjectionBadRepetition3]
testBadProjection[ProjectionBadRepetition4]
testBadProjection[ProjectionBadRepetition5]
testBadProjection[ProjectionBadRepetition6]
}

private def testPredicate[T: ClassTag](
name: String,
predicate: FilterPredicate,
Expand Down Expand Up @@ -187,5 +205,13 @@ case class Wide(
case class ProjectionSubset(b1: Boolean, i1: Int, s1: String)
case class ProjectionOrdering1(s1: String, i1: Int, b1: Boolean)
case class ProjectionOrdering2(b2: Boolean, b1: Boolean, i2: Int, i1: Int)
case class ProjectionRepetition(s1: Option[String], o: List[Int], r: List[Int])
case class ProjectionLogical(i: Long)

case class ProjectionBadName(b1: Boolean, i3: Int)
case class ProjectionBadType(b1: Boolean, i1: Long)
case class ProjectionBadRepetition1(b1: Boolean, i1: Option[Int])
case class ProjectionBadRepetition2(b1: Boolean, i1: List[Int])
case class ProjectionBadRepetition3(b1: Boolean, o: Int)
case class ProjectionBadRepetition4(b1: Boolean, o: List[Int])
case class ProjectionBadRepetition5(b1: Boolean, r: Int)
case class ProjectionBadRepetition6(b1: Boolean, r: Option[Int])

0 comments on commit 2aea4e8

Please sign in to comment.