
Commit

Merge a738be5 into 256a8f7
istreeter committed Apr 19, 2024
2 parents 256a8f7 + a738be5 commit 85d6eb6
Showing 11 changed files with 221 additions and 170 deletions.
@@ -268,8 +268,11 @@ private[schemaddl] object Mutate {

     val additionalProperties = schema.`type`.map(_.asUnion.value) match {
       case Some(set) if set.contains(CommonProperties.Type.Object) =>
-        Some(ObjectProperty.AdditionalProperties.AdditionalPropertiesAllowed(false))
-      case _ => None
+        if (hasDefinedProperties(schema))
+          Some(ObjectProperty.AdditionalProperties.AdditionalPropertiesAllowed(false))
+        else
+          schema.additionalProperties
+      case _ => schema.additionalProperties
     }

     schema.copy(
@@ -283,4 +286,12 @@
     )
   }

+  private def hasDefinedProperties(schema: Schema): Boolean =
+    schema.properties match {
+      case Some(p) if p.value.nonEmpty =>
+        true
+      case _ =>
+        (schema.oneOf.iterator.flatMap(_.value) ++ schema.anyOf.iterator.flatMap(_.value)).exists(hasDefinedProperties)
+    }
+
 }
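
Note on the Mutate change above: `forStorage` no longer forces "additionalProperties": false onto object schemas that declare no properties, either directly or via oneOf/anyOf; their own additionalProperties setting is passed through instead. A minimal sketch of the new behaviour, assuming the SpecHelpers.parseSchema helper from the spec at the bottom of this diff is in scope:

    val openObject = SpecHelpers.parseSchema("""{"type": "object"}""")

    // Before this commit: the result gained "additionalProperties": false.
    // After this commit: the schema passes through with nothing forced in.
    Mutate.forStorage(openObject)
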
@@ -14,7 +14,7 @@ package com.snowplowanalytics.iglu.schemaddl.parquet

 import io.circe._
 import cats.implicits._
-import cats.data.{ValidatedNel, Validated}
+import cats.data.{NonEmptyVector, ValidatedNel, Validated}
 import cats.Semigroup

 import java.time.{Instant, LocalDate}
@@ -33,8 +33,8 @@ trait Caster[A] {
   def decimalValue(unscaled: BigInt, details: Type.Decimal): A
   def dateValue(v: LocalDate): A
   def timestampValue(v: Instant): A
-  def structValue(vs: List[Caster.NamedValue[A]]): A
-  def arrayValue(vs: List[A]): A
+  def structValue(vs: NonEmptyVector[Caster.NamedValue[A]]): A
+  def arrayValue(vs: Vector[A]): A
 }

 object Caster {
@@ -120,7 +120,6 @@ object Caster {
   private def castArray[A](caster: Caster[A], array: Type.Array, value: Json): Result[A] =
     value.asArray match {
       case Some(values) => values
-        .toList
         .map {
           case Json.Null =>
             if (array.nullability.nullable) caster.nullValue.validNel
@@ -170,7 +169,7 @@
   /** Part of `castStruct`, mapping sub-fields of a JSON object into `FieldValue`s */
   private def castStructField[A](caster: Caster[A], field: Field, jsonObject: Map[String, Json]): ValidatedNel[CastError, NamedValue[A]] = {
     val ca = field.accessors
-      .toList
+      .iterator
       .map { name =>
         jsonObject.get(name) match {
           case Some(Json.Null) => CastAccumulate[A](None, true)
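
Note on the Caster change above: structValue now takes a cats NonEmptyVector, so an implementation never sees an empty struct and needs no defensive branch for that case. A minimal standalone sketch (toy code, not from this diff):

    import cats.data.NonEmptyVector

    // Render a struct as a string; the head element is always available.
    def renderStruct(vs: NonEmptyVector[(String, String)]): String =
      vs.toVector.map { case (n, v) => s"$n=$v" }.mkString("{", ", ", "}")

    println(renderStruct(NonEmptyVector.of("a" -> "1", "b" -> "2"))) // {a=1, b=2}
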
@@ -12,9 +12,12 @@
  */
 package com.snowplowanalytics.iglu.schemaddl.parquet

+import cats.data.NonEmptyVector
+import cats.implicits._
+
 import com.snowplowanalytics.iglu.schemaddl.StringUtils
 import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
-import com.snowplowanalytics.iglu.schemaddl.jsonschema.properties.{ArrayProperty, CommonProperties}
+import com.snowplowanalytics.iglu.schemaddl.jsonschema.properties.{ArrayProperty, CommonProperties, ObjectProperty}
 import com.snowplowanalytics.iglu.schemaddl.jsonschema.mutate.Mutate

 case class Field(name: String,
@@ -28,19 +31,17 @@ object Field {
                  fieldType: Type,
                  nullability: Type.Nullability): Field = Field(name, fieldType, nullability, Set(name))

-  def build(name: String, topSchema: Schema, enforceValuePresence: Boolean): Field = {
-    val constructedType = buildType(Mutate.forStorage(topSchema))
-    val nullability = isFieldNullable(constructedType.nullability, enforceValuePresence)
-
-    Field(name, constructedType.value, nullability)
-  }
+  def build(name: String, topSchema: Schema, enforceValuePresence: Boolean): Option[Field] =
+    buildType(Mutate.forStorage(topSchema)).map { constructedType =>
+      val nullability = isFieldNullable(constructedType.nullability, enforceValuePresence)
+      Field(name, constructedType.value, nullability)
+    }

-  def buildRepeated(name: String, itemSchema: Schema, enforceItemPresence: Boolean, nullability: Type.Nullability): Field = {
-    val constructedType = buildType(Mutate.forStorage(itemSchema))
-    val itemNullability = isFieldNullable(constructedType.nullability, enforceItemPresence)
-
-    Field(name, Type.Array(constructedType.value, itemNullability), nullability)
-  }
+  def buildRepeated(name: String, itemSchema: Schema, enforceItemPresence: Boolean, nullability: Type.Nullability): Option[Field] =
+    buildType(Mutate.forStorage(itemSchema)).map { constructedType =>
+      val itemNullability = isFieldNullable(constructedType.nullability, enforceItemPresence)
+      Field(name, Type.Array(constructedType.value, itemNullability), nullability)
+    }

   def normalize(field: Field): Field = {
     val fieldType = field.fieldType match {
@@ -53,20 +54,18 @@ object Field {
     field.copy(name = normalizeName(field), fieldType = fieldType)
   }

-  private def collapseDuplicateFields(normFields: List[Field]): List[Field] = {
+  private def collapseDuplicateFields(normFields: NonEmptyVector[Field]): NonEmptyVector[Field] = {
     val endMap = normFields
       .groupBy(_.name)
       .map { case (key, fs) =>
         // Use `min` to deterministically pick the same accessor each time when choosing the type
-        val lowest = fs.minBy(f => f.accessors.min)
-        (key, lowest.copy(accessors = fs.flatMap(_.accessors).toSet))
+        val lowest = fs.minimumBy(f => f.accessors.min)
+        (key, lowest.copy(accessors = fs.iterator.flatMap(_.accessors).toSet))
       }
     normFields
       .map(_.name)
       .distinct
-      .foldLeft(List.empty[Field])(
-        (acc, name) => acc :+ endMap(name)
-      )
+      .map(endMap(_))
   }

   private[parquet] def normalizeName(field: Field): String =
@@ -100,55 +99,69 @@ object Field {
     }
   }

-  private def buildType(topSchema: Schema): NullableType = {
+  private def buildType(topSchema: Schema): Option[NullableType] = {
     topSchema.`type` match {
       case Some(types) if types.possiblyWithNull(CommonProperties.Type.Object) =>
-        NullableType(
-          value = buildObjectType(topSchema),
-          nullability = JsonNullability.extractFrom(types)
-        )
+        buildObjectType(topSchema).map { objectType =>
+          NullableType(
+            value = objectType,
+            nullability = JsonNullability.extractFrom(types)
+          )
+        }

       case Some(types) if types.possiblyWithNull(CommonProperties.Type.Array) =>
-        NullableType(
-          value = buildArrayType(topSchema),
-          nullability = JsonNullability.extractFrom(types)
-        )
+        buildArrayType(topSchema).map { arrayType =>
+          NullableType(
+            value = arrayType,
+            nullability = JsonNullability.extractFrom(types)
+          )
+        }

       case _ =>
         provideSuggestions(topSchema) match {
-          case Some(matchingSuggestion) => matchingSuggestion
-          case None => jsonType(topSchema)
+          case Some(matchingSuggestion) => Some(matchingSuggestion)
+          case None => Some(jsonType(topSchema))
         }
     }
   }

-  private def buildObjectType(topSchema: Schema): Type = {
-    val subfields = topSchema.properties.map(_.value).getOrElse(Map.empty)
-    if (subfields.nonEmpty) {
-      val requiredKeys = topSchema.required.toList.flatMap(_.value)
-      val structFields = subfields
-        .toList
-        .map { case (key, schema) =>
-          val isSubfieldRequired = requiredKeys.contains(key)
-          val constructedType = buildType(schema)
-          val nullability = isFieldNullable(constructedType.nullability, isSubfieldRequired)
-          Field(key, constructedType.value, nullability)
-        }
-        .sortBy(_.name)
-      Type.Struct(structFields)
-    } else {
-      Type.Json
-    }
-  }
+  private def buildObjectType(topSchema: Schema): Option[Type] = {
+    val requiredKeys = topSchema.required.toList.flatMap(_.value)
+    val subfields = topSchema.properties
+      .iterator
+      .flatMap(_.value.toList)
+      .flatMap { case (key, schema) =>
+        buildType(schema).map(key -> _)
+      }.map { case (key, constructedType) =>
+        val isSubfieldRequired = requiredKeys.contains(key)
+        val nullability = isFieldNullable(constructedType.nullability, isSubfieldRequired)
+        Field(key, constructedType.value, nullability)
+      }
+      .toVector
+      .sortBy(_.name)
+
+    NonEmptyVector.fromVector(subfields) match {
+      case Some(nel) =>
+        Some(Type.Struct(nel))
+      case None =>
+        topSchema.additionalProperties match {
+          case Some(ObjectProperty.AdditionalProperties.AdditionalPropertiesAllowed(false)) =>
+            None
+          case _ =>
+            Some(Type.Json)
+        }
+    }
+  }

-  private def buildArrayType(topSchema: Schema): Type.Array = {
+  private def buildArrayType(topSchema: Schema): Option[Type.Array] = {
     topSchema.items match {
       case Some(ArrayProperty.Items.ListItems(schema)) =>
-        val typeOfArrayItem = buildType(schema)
-        val nullability = isFieldNullable(typeOfArrayItem.nullability, true)
-        Type.Array(typeOfArrayItem.value, nullability)
+        buildType(schema).map { typeOfArrayItem =>
+          val nullability = isFieldNullable(typeOfArrayItem.nullability, true)
+          Type.Array(typeOfArrayItem.value, nullability)
+        }
       case _ =>
-        Type.Array(Type.Json, Type.Nullability.Nullable)
+        Some(Type.Array(Type.Json, Type.Nullability.Nullable))
     }
   }
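
Note on the Field change above: Field.build and Field.buildRepeated now return Option[Field], with None produced when the schema yields no storable type at all, i.e. an object with no recognised properties and "additionalProperties": false. A hedged call-site sketch (the match arms are illustrative, not from this diff):

    Field.build("event", topSchema, enforceValuePresence = true) match {
      case Some(field) => println(s"column ${field.name}: ${field.fieldType}")
      case None        => println("schema maps to no column at all")
    }
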
@@ -13,6 +13,7 @@
 package com.snowplowanalytics.iglu.schemaddl.parquet

 import io.circe._
+import cats.data.NonEmptyVector

 import java.time.{Instant, LocalDate}
@@ -32,8 +33,8 @@ object FieldValue {
   case class DecimalValue(value: BigDecimal, precision: Type.DecimalPrecision) extends FieldValue
   case class TimestampValue(value: java.sql.Timestamp) extends FieldValue
   case class DateValue(value: java.sql.Date) extends FieldValue
-  case class StructValue(values: List[NamedValue]) extends FieldValue
-  case class ArrayValue(values: List[FieldValue]) extends FieldValue
+  case class StructValue(values: Vector[NamedValue]) extends FieldValue
+  case class ArrayValue(values: Vector[FieldValue]) extends FieldValue
   /* Part of [[StructValue]] */
   case class NamedValue(name: String, value: FieldValue)

@@ -49,13 +50,13 @@
       DecimalValue(BigDecimal(unscaled, details.scale), details.precision)
     def dateValue(v: LocalDate): FieldValue = DateValue(java.sql.Date.valueOf(v))
     def timestampValue(v: Instant): FieldValue = TimestampValue(java.sql.Timestamp.from(v))
-    def structValue(vs: List[Caster.NamedValue[FieldValue]]): FieldValue =
+    def structValue(vs: NonEmptyVector[Caster.NamedValue[FieldValue]]): FieldValue =
       StructValue {
-        vs.map {
+        vs.toVector.map {
          case Caster.NamedValue(n, v) => NamedValue(n, v)
        }
      }
-    def arrayValue(vs: List[FieldValue]): FieldValue = ArrayValue(vs)
+    def arrayValue(vs: Vector[FieldValue]): FieldValue = ArrayValue(vs)
   }

   /**
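
Note on the FieldValue change above: struct and array values now carry Vectors; only Caster.structValue's argument is non-empty by construction. An illustrative construction using only constructors visible in this diff:

    import com.snowplowanalytics.iglu.schemaddl.parquet.FieldValue
    import FieldValue._

    val struct: FieldValue = StructValue(Vector(
      NamedValue("event_date", DateValue(java.sql.Date.valueOf("2024-04-19")))
    ))
    val array: FieldValue = ArrayValue(Vector(struct))
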
@@ -1,6 +1,7 @@
 package com.snowplowanalytics.iglu.schemaddl.parquet

 import cats.Show
+import cats.data.NonEmptyVector
 import cats.syntax.all._
 import com.snowplowanalytics.iglu.schemaddl.parquet.Type.{Array, Struct}
@@ -85,16 +86,16 @@ object Migrations {
       // Comparing struct target fields to the source. This will detect additions.
       val reverseMigration = targetFields.map(tgtField => MigrationFieldPair(tgtField.name :: path, tgtField, sourceStruct.focus(tgtField.name)).migrations)

-      migrations ++= forwardMigration.flatMap(_.migrations)
+      migrations ++= forwardMigration.iterator.flatMap(_.migrations)

-      migrations ++= reverseMigration.flatMap(_.migrations.flatMap {
+      migrations ++= reverseMigration.iterator.flatMap(_.migrations.flatMap {
         case KeyRemoval(path, value) => List(KeyAddition(path, value))
         case _ => Nil // discard the modifications as they would have been detected in forward migration
       })

-      val tgtFields = reverseMigration.traverse(_.result).toList.flatten
+      val tgtFields = reverseMigration.toVector.traverse(_.result).toVector.flatten
       val tgtFieldNames = tgtFields.map(_.name)
-      val allSrcFields = forwardMigration.traverse(_.result).toList.flatten
+      val allSrcFields = forwardMigration.toVector.traverse(_.result).toVector.flatten
       val allSrcFieldMap = allSrcFields.map(f => f.name -> f).toMap
       // swap fields in src and target as they would be rearranged in nested structs or arrays
       val reorderedTgtFields = tgtFields.map { t =>
@@ -104,13 +105,15 @@
           case _ => t
         }
       }
-      val srcFields = allSrcFields.filter(srcField => !tgtFieldNames.contains(srcField.name)).map(
+      val srcFields: Vector[Field] = allSrcFields.filter(srcField => !tgtFieldNames.contains(srcField.name)).map(
        // drop not null constrains from removed fields.
        _.copy(nullability = Type.Nullability.Nullable)
      )

       // failed migration would produce no fields in source
-      if (allSrcFields.isEmpty) None else Type.Struct(reorderedTgtFields ++ srcFields).some
+      NonEmptyVector.fromVector(reorderedTgtFields ++ srcFields).map { nonEmpty =>
+        Type.Struct(nonEmpty)
+      }

     case _ => addIncompatibleType()
 }
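
Note on the Migrations change above: the old emptiness check on allSrcFields is replaced by building the merged struct through NonEmptyVector.fromVector, so the None case now falls directly out of the types. The idiom in isolation (plain cats, toy element type):

    import cats.data.NonEmptyVector

    def mergedStruct(fields: Vector[String]): Option[NonEmptyVector[String]] =
      NonEmptyVector.fromVector(fields) // None exactly when nothing remains to merge

    assert(mergedStruct(Vector.empty).isEmpty)
    assert(mergedStruct(Vector("id")).map(_.head).contains("id"))
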
@@ -13,6 +13,7 @@
 package com.snowplowanalytics.iglu.schemaddl.parquet

 import cats.Eq
+import cats.data.NonEmptyVector

 sealed trait Type extends Product with Serializable

@@ -26,7 +27,7 @@ object Type {
   case class Decimal(precision: DecimalPrecision, scale: Int) extends Type
   case object Date extends Type
   case object Timestamp extends Type
-  case class Struct(fields: List[Field]) extends Type
+  case class Struct(fields: NonEmptyVector[Field]) extends Type
   case class Array(element: Type, nullability: Nullability) extends Type

   /* Fallback type for when json schema does not map to a parquet primitive type (e.g. unions)
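
Note on the Type change above: Type.Struct can no longer be constructed empty. A hedged construction sketch using pieces visible elsewhere in this diff (Type.Timestamp, Type.Nullability.Nullable, and the three-argument Field constructor):

    import cats.data.NonEmptyVector

    val struct = Type.Struct(NonEmptyVector.of(
      Field("created_at", Type.Timestamp, Type.Nullability.Nullable)
    ))
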
@@ -386,8 +386,7 @@ class MutateSpec extends org.specs2.Specification {
     val expected = SpecHelpers.parseSchema(
       """
         |{
-        |"type": "object",
-        |"additionalProperties": false
+        |"type": "object"
         |}
       """.stripMargin)
     Mutate.forStorage(input) must_== expected
