Skip to content

Commit

Permalink
handle Parquet REPEATED schema evolution
Browse files Browse the repository at this point in the history
  • Loading branch information
nevillelyh committed Feb 25, 2021
1 parent 0d2609b commit 6c00ecb
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 39 deletions.
1 change: 1 addition & 0 deletions parquet/src/main/scala/magnolify/parquet/ParquetType.scala
Expand Up @@ -381,6 +381,7 @@ object ParquetField {
}
override def start(): Unit = ()
override def end(): Unit = addValue(arrayConverter.get)
override def get: C[T] = get(_.headOption.getOrElse(fc.build(Nil)))
}
} else {
arrayConverter
Expand Down
5 changes: 4 additions & 1 deletion parquet/src/main/scala/magnolify/parquet/Schema.scala
Expand Up @@ -91,7 +91,10 @@ private object Schema {
}
if (ff.isPrimitive) Some(rf) else Some(prune(ff.asGroupType(), rf.asGroupType()))
} else {
if (rf.getRepetition != Repetition.OPTIONAL) {
if (
rf.isRepetition(Repetition.REQUIRED) &&
rf.getLogicalTypeAnnotation != LogicalTypeAnnotation.listType()
) {
throw new InvalidRecordException(
s"${rf.getRepetition} field ${rf.getName} missing in file schema"
)
Expand Down
Expand Up @@ -17,7 +17,6 @@
package magnolify.parquet.test

import magnolify.parquet._
import magnolify.parquet.ParquetArray.AvroCompat._
import magnolify.shims.JavaConverters._
import magnolify.test._
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
Expand Down Expand Up @@ -51,19 +50,11 @@ object SchemaEvolutionSuite {
(v1, v2)
}

val aliasesSchema: Schema = {
val schema = Schema.createRecord("Aliases", "", namespace, false)
val aliases =
new Schema.Field("aliases", Schema.createArray(Schema.create(Schema.Type.STRING)), "", null)
schema.setFields(List(aliases).asJava)
schema
}

/*
V2 adds 3 fields
- Nested nullable string field "location.zip"
- Top level nullable string field "email"
- Top level nullable record "aliases" with a repeated string field "aliases"
- Top level repeated string field "aliases"
*/
val (userSchema1, userSchema2): (Schema, Schema) = {
def id = new Schema.Field("id", Schema.create(Schema.Type.LONG), "", null)
Expand All @@ -78,9 +69,9 @@ object SchemaEvolutionSuite {
val email = new Schema.Field("email", nullableString, "", nullVal)
val aliases = new Schema.Field(
"aliases",
Schema.createUnion(List(Schema.create(Schema.Type.NULL), aliasesSchema).asJava),
Schema.createArray(Schema.create(Schema.Type.STRING)),
"",
nullVal
java.util.Collections.emptyList()
)
v2.setFields(List(id, name, location2, email, aliases).asJava)

Expand All @@ -97,13 +88,6 @@ object SchemaEvolutionSuite {
.set("zip", zip)
.build()

def avroAliases(aliases: Seq[String]): GenericRecord =
if (aliases.isEmpty) {
null
} else {
new GenericRecordBuilder(aliasesSchema).set("aliases", aliases.asJava).build()
}

def avroUser1(id: Long, name: String, country: String, state: String): GenericRecord =
new GenericRecordBuilder(userSchema1)
.set("id", id)
Expand All @@ -125,19 +109,18 @@ object SchemaEvolutionSuite {
.set("name", name)
.set("location", avroLocation2(country, state, zip))
.set("email", email)
.set("aliases", avroAliases(aliases))
.set("aliases", aliases.asJava)
.build()

case class Location1(country: String, state: String)
case class Location2(country: String, state: String, zip: Option[String])
case class Aliases(aliases: Seq[String])
case class User1(id: Long, name: String, location: Location1)
case class User2(
id: Long,
name: String,
location: Location2,
email: Option[String],
aliases: Option[Aliases]
aliases: Seq[String]
)

val avro1: Seq[GenericRecord] = Seq(
Expand All @@ -155,10 +138,10 @@ object SchemaEvolutionSuite {
)

val avro1as2: Seq[GenericRecord] = Seq(
avroUser2(0, "Alice", "US", "NY", null, null, Nil),
avroUser2(1, "Bob", "US", "NJ", null, null, Nil),
avroUser2(2, "Carol", "US", "CT", null, null, Nil),
avroUser2(3, "Dan", "US", "MA", null, null, Nil)
avroUser2(0, "Alice", "US", "NY", null, null, null),
avroUser2(1, "Bob", "US", "NJ", null, null, null),
avroUser2(2, "Carol", "US", "CT", null, null, null),
avroUser2(3, "Dan", "US", "MA", null, null, null)
)

val scala1: Seq[User1] = Seq(
Expand All @@ -174,18 +157,18 @@ object SchemaEvolutionSuite {
"Alice",
Location2("US", "NY", Some("12345")),
Some("alice@aol.com"),
Some(Aliases(Seq("Ada", "Ana")))
Seq("Ada", "Ana")
),
User2(1, "Bob", Location2("US", "NJ", None), None, None),
User2(2, "Carol", Location2("US", "CT", None), Some("carol@aol.com"), None),
User2(3, "Dan", Location2("US", "MA", Some("54321")), None, None)
User2(1, "Bob", Location2("US", "NJ", None), None, Nil),
User2(2, "Carol", Location2("US", "CT", None), Some("carol@aol.com"), Nil),
User2(3, "Dan", Location2("US", "MA", Some("54321")), None, Nil)
)

val scala1as2: Seq[User2] = Seq(
User2(0, "Alice", Location2("US", "NY", None), None, None),
User2(1, "Bob", Location2("US", "NJ", None), None, None),
User2(2, "Carol", Location2("US", "CT", None), None, None),
User2(3, "Dan", Location2("US", "MA", None), None, None)
User2(0, "Alice", Location2("US", "NY", None), None, Nil),
User2(1, "Bob", Location2("US", "NJ", None), None, Nil),
User2(2, "Carol", Location2("US", "CT", None), None, Nil),
User2(3, "Dan", Location2("US", "MA", None), None, Nil)
)
}

Expand Down Expand Up @@ -245,6 +228,16 @@ class SchemaEvolutionSuite extends MagnolifySuite {
private val scalaBytes1 = writeScala[User1](scala1)
private val scalaBytes2 = writeScala[User2](scala2)

private val scalaCompatBytes1 = {
import magnolify.parquet.ParquetArray.AvroCompat._
writeScala[User1](scala1)
}

private val scalaCompatBytes2 = {
import magnolify.parquet.ParquetArray.AvroCompat._
writeScala[User2](scala2)
}

//////////////////////////////////////////////////

test("Avro V1 => Avro V1") {
Expand Down Expand Up @@ -283,37 +276,67 @@ class SchemaEvolutionSuite extends MagnolifySuite {

//////////////////////////////////////////////////

test("Scala Compat V1 => Scala Compat V1") {
import magnolify.parquet.ParquetArray.AvroCompat._
assertEquals(readScala[User1](scalaCompatBytes1), scala1)
}

test("Scala Compat V2 => Scala Compat V2") {
import magnolify.parquet.ParquetArray.AvroCompat._
assertEquals(readScala[User2](scalaCompatBytes2), scala2)
}

test("Scala Compat V1 => Scala Compat V2") {
import magnolify.parquet.ParquetArray.AvroCompat._
assertEquals(readScala[User2](scalaCompatBytes1), scala1as2)
}

test("Scala Compat V2 => Scala Compat V1") {
import magnolify.parquet.ParquetArray.AvroCompat._
assertEquals(readScala[User1](scalaCompatBytes2), scala1)
}

//////////////////////////////////////////////////

test("Avro V1 => Scala V1") {
import magnolify.parquet.ParquetArray.AvroCompat._
assertEquals(readScala[User1](avroBytes1), scala1)
}

test("Avro V2 => Scala V2") {
import magnolify.parquet.ParquetArray.AvroCompat._
assertEquals(readScala[User2](avroBytes2), scala2)
}

test("Avro V1 => Scala V2") {
import magnolify.parquet.ParquetArray.AvroCompat._
assertEquals(readScala[User2](avroBytes1), scala1as2)
}

test("Avro V2 => Scala V1") {
import magnolify.parquet.ParquetArray.AvroCompat._
assertEquals(readScala[User1](avroBytes2), scala1)
}

//////////////////////////////////////////////////

test("Scala V1 => Avro V1") {
assertEquals(readAvro(scalaBytes1, userSchema1), avro1)
import magnolify.parquet.ParquetArray.AvroCompat._
assertEquals(readAvro(scalaCompatBytes1, userSchema1), avro1)
}

test("Scala V2 => Avro V2") {
assertEquals(readAvro(scalaBytes2, userSchema2), avro2)
import magnolify.parquet.ParquetArray.AvroCompat._
assertEquals(readAvro(scalaCompatBytes2, userSchema2), avro2)
}

test("Scala V1 => Avro V2") {
assertEquals(readAvro(scalaBytes1, userSchema2), avro1as2)
import magnolify.parquet.ParquetArray.AvroCompat._
assertEquals(readAvro(scalaCompatBytes1, userSchema2), avro1as2)
}

test("Scala V2 => Avro V1") {
assertEquals(readAvro(scalaBytes2, userSchema1), avro1)
import magnolify.parquet.ParquetArray.AvroCompat._
assertEquals(readAvro(scalaCompatBytes2, userSchema1), avro1)
}
}

0 comments on commit 6c00ecb

Please sign in to comment.