Cast to parquet without FieldValue intermediate
istreeter authored and oguzhanunlu committed Oct 13, 2023
1 parent 568a2c6 commit a216771
Showing 5 changed files with 284 additions and 135 deletions.
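The commit introduces a Caster[A] trait so that a loader can cast a JSON value directly into its own target representation, instead of first materialising the intermediate FieldValue ADT. A minimal sketch of the new call pattern (myCaster and myField are placeholder names for any Caster implementation and Field definition, not part of this commit):

import cats.data.ValidatedNel
import io.circe.Json
import com.snowplowanalytics.iglu.schemaddl.parquet.{Caster, CastError, Field}

def castColumn[A](myCaster: Caster[A], myField: Field, json: Json): ValidatedNel[CastError, A] =
  Caster.cast(myCaster, myField, json)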
@@ -0,0 +1,196 @@
/*
* Copyright (c) 2021-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.iglu.schemaddl.parquet

import io.circe._
import cats.implicits._
import cats.data.{ValidatedNel, Validated}
import cats.Semigroup

import java.time.{Instant, LocalDate}
import java.time.format.DateTimeFormatter
import CastError._

trait Caster[A] {

  def nullValue: A
  def jsonValue(v: Json): A
  def stringValue(v: String): A
  def booleanValue(v: Boolean): A
  def intValue(v: Int): A
  def longValue(v: Long): A
  def doubleValue(v: Double): A
  def decimalValue(unscaled: BigInt, details: Type.Decimal): A
  def dateValue(v: LocalDate): A
  def timestampValue(v: Instant): A
  def structValue(vs: List[Caster.NamedValue[A]]): A
  def arrayValue(vs: List[A]): A
}
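
// Illustrative only, not part of this commit: a hypothetical Caster[String] that renders
// casted values as debug strings, showing how an implementation maps each callback onto its
// own target type without going through an intermediate ADT such as FieldValue.
object DebugStringCaster extends Caster[String] {
  def nullValue: String = "null"
  def jsonValue(v: Json): String = v.noSpaces
  def stringValue(v: String): String = v
  def booleanValue(v: Boolean): String = v.toString
  def intValue(v: Int): String = v.toString
  def longValue(v: Long): String = v.toString
  def doubleValue(v: Double): String = v.toString
  def decimalValue(unscaled: BigInt, details: Type.Decimal): String =
    BigDecimal(unscaled, details.scale).toString
  def dateValue(v: LocalDate): String = v.toString
  def timestampValue(v: Instant): String = v.toString
  def structValue(vs: List[Caster.NamedValue[String]]): String =
    vs.map(nv => s"${nv.name}=${nv.value}").mkString("{", ",", "}")
  def arrayValue(vs: List[String]): String = vs.mkString("[", ",", "]")
}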

object Caster {

  case class NamedValue[A](name: String, value: A)

  /** Result of (Schema, JSON) -> A transformation */
  type Result[A] = ValidatedNel[CastError, A]

  /**
   * Turn a JSON value into a Parquet-compatible row, matching the schema defined in `field`.
   * Top-level function, called only on columns.
   * Performs the following operations in order to prevent runtime insert failures:
   *  - removes unexpected additional properties
   *  - turns all unexpected types into string
   */
  def cast[A](caster: Caster[A], field: Field, value: Json): Result[A] =
    value match {
      case Json.Null =>
        if (field.nullability.nullable) caster.nullValue.validNel
        else if (field.fieldType === Type.Json) caster.jsonValue(value).validNel
        else WrongType(value, field.fieldType).invalidNel
      case nonNull =>
        castNonNull(caster, field.fieldType, nonNull)
    }

  private def castNonNull[A](caster: Caster[A], fieldType: Type, value: Json): Result[A] =
    fieldType match {
      case Type.Json => castJson(caster, value)
      case Type.String => castString(caster, value)
      case Type.Boolean => castBoolean(caster, value)
      case Type.Integer => castInteger(caster, value)
      case Type.Long => castLong(caster, value)
      case Type.Double => castDouble(caster, value)
      case d: Type.Decimal => castDecimal(caster, d, value)
      case Type.Timestamp => castTimestamp(caster, value)
      case Type.Date => castDate(caster, value)
      case s: Type.Struct => castStruct(caster, s, value)
      case a: Type.Array => castArray(caster, a, value)
    }

  private def castJson[A](caster: Caster[A], value: Json): Result[A] =
    caster.jsonValue(value).validNel

  private def castString[A](caster: Caster[A], value: Json): Result[A] =
    value.asString.fold(WrongType(value, Type.String).invalidNel[A])(caster.stringValue(_).validNel)

  private def castBoolean[A](caster: Caster[A], value: Json): Result[A] =
    value.asBoolean.fold(WrongType(value, Type.Boolean).invalidNel[A])(caster.booleanValue(_).validNel)

  private def castInteger[A](caster: Caster[A], value: Json): Result[A] =
    value.asNumber.flatMap(_.toInt).fold(WrongType(value, Type.Integer).invalidNel[A])(caster.intValue(_).validNel)

  private def castLong[A](caster: Caster[A], value: Json): Result[A] =
    value.asNumber.flatMap(_.toLong).fold(WrongType(value, Type.Long).invalidNel[A])(caster.longValue(_).validNel)

  private def castDouble[A](caster: Caster[A], value: Json): Result[A] =
    value.asNumber.fold(WrongType(value, Type.Double).invalidNel[A])(num => caster.doubleValue(num.toDouble).validNel)

  private def castDecimal[A](caster: Caster[A], decimalType: Type.Decimal, value: Json): Result[A] =
    value.asNumber.flatMap(_.toBigDecimal).fold(WrongType(value, decimalType).invalidNel[A]) { bigDec =>
      Either.catchOnly[java.lang.ArithmeticException](bigDec.setScale(decimalType.scale, BigDecimal.RoundingMode.UNNECESSARY)) match {
        case Right(scaled) if bigDec.precision <= Type.DecimalPrecision.toInt(decimalType.precision) =>
          caster.decimalValue(scaled.underlying.unscaledValue, decimalType).validNel
        case _ =>
          WrongType(value, decimalType).invalidNel
      }
    }
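  // Worked example (illustrative): for a decimal column with precision 9 and scale 2, the JSON
  // number 12.34 is rescaled to unscaled value 1234 at scale 2 and passed to decimalValue, while
  // 12.345 cannot be rescaled without rounding, so setScale(..., UNNECESSARY) throws
  // ArithmeticException and the cast fails with WrongType.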

  private def castTimestamp[A](caster: Caster[A], value: Json): Result[A] =
    value.asString
      .flatMap(s => Either.catchNonFatal(DateTimeFormatter.ISO_DATE_TIME.parse(s)).toOption)
      .flatMap(a => Either.catchNonFatal(Instant.from(a)).toOption)
      .fold(WrongType(value, Type.Timestamp).invalidNel[A])(caster.timestampValue(_).validNel)

  private def castDate[A](caster: Caster[A], value: Json): Result[A] =
    value.asString
      .flatMap(s => Either.catchNonFatal(DateTimeFormatter.ISO_LOCAL_DATE.parse(s)).toOption)
      .flatMap(a => Either.catchNonFatal(LocalDate.from(a)).toOption)
      .fold(WrongType(value, Type.Date).invalidNel[A])(caster.dateValue(_).validNel)
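  // Illustrative examples: "2023-10-13T12:30:00Z" parses via ISO_DATE_TIME and converts to an
  // Instant, whereas "2023-10-13T12:30:00" (no offset) parses but cannot be converted to an
  // Instant, so castTimestamp fails with WrongType; castDate accepts "2023-10-13" via ISO_LOCAL_DATE.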

  private def castArray[A](caster: Caster[A], array: Type.Array, value: Json): Result[A] =
    value.asArray match {
      case Some(values) =>
        values
          .toList
          .map {
            case Json.Null =>
              if (array.nullability.nullable) caster.nullValue.validNel
              else if (array.element === Type.Json) caster.jsonValue(Json.Null).validNel
              else WrongType(Json.Null, array.element).invalidNel
            case nonNull => castNonNull(caster, array.element, nonNull)
          }
          .sequence[Result, A]
          .map(caster.arrayValue(_))
      case None => WrongType(value, array).invalidNel
    }


  private def castStruct[A](caster: Caster[A], struct: Type.Struct, value: Json): Result[A] =
    value.asObject match {
      case None =>
        WrongType(value, struct).invalidNel[A]
      case Some(obj) =>
        val map = obj.toMap
        struct.fields
          .map(castStructField(caster, _, map))
          .sequence[Result, NamedValue[A]]
          .map(caster.structValue(_))
    }


  // Used internally as part of `castStructField`
  private case class CastAccumulate[A](result: Option[ValidatedNel[CastError, A]], observedNull: Boolean)

  private implicit def semigroupCastAccumulate[A]: Semigroup[CastAccumulate[A]] = new Semigroup[CastAccumulate[A]] {
    def combine(x: CastAccumulate[A], y: CastAccumulate[A]): CastAccumulate[A] = {
      val result = (x.result, y.result) match {
        case (None, r) => r
        case (r, None) => r
        case (Some(Validated.Valid(f)), Some(Validated.Invalid(_))) => Some(Validated.Valid(f))
        case (Some(Validated.Invalid(_)), Some(Validated.Valid(f))) => Some(Validated.Valid(f))
        case (Some(Validated.Valid(f)), Some(Validated.Valid(_))) =>
          // We are forced to pick one or the other
          Some(Validated.Valid(f))
        case (Some(Validated.Invalid(f)), Some(Validated.Invalid(_))) =>
          Some(Validated.Invalid(f))
      }
      CastAccumulate(result, x.observedNull || y.observedNull)
    }
  }
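  // Illustrative note: a Field may expose several accessors for the same JSON property (for
  // example snake_case and camelCase variants; an assumption about how accessors are populated).
  // castStructField below casts whichever accessor is present, and the Semigroup keeps the first
  // successful cast, prefers a success over a failure, and records whether a null or absent
  // accessor was seen so that nullable fields can fall back to nullValue.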

  /** Part of `castStruct`, mapping sub-fields of a JSON object into `NamedValue`s */
  private def castStructField[A](caster: Caster[A], field: Field, jsonObject: Map[String, Json]): ValidatedNel[CastError, NamedValue[A]] = {
    val ca = field.accessors
      .toList
      .map { name =>
        jsonObject.get(name) match {
          case Some(Json.Null) => CastAccumulate[A](None, true)
          case None => CastAccumulate[A](None, true)
          case Some(json) => CastAccumulate(cast(caster, field, json).some, false)
        }
      }
      .reduce(_ |+| _)

    ca match {
      case CastAccumulate(Some(Validated.Invalid(_)), true) if field.nullability.nullable =>
        NamedValue(Field.normalizeName(field), caster.nullValue).validNel
      case CastAccumulate(None, true) if field.nullability.nullable =>
        NamedValue(Field.normalizeName(field), caster.nullValue).validNel
      case CastAccumulate(None, _) =>
        MissingInValue(field.name, Json.fromFields(jsonObject)).invalidNel
      case CastAccumulate(Some(Validated.Invalid(f)), _) =>
        Validated.Invalid(f)
      case CastAccumulate(Some(Validated.Valid(f)), _) =>
        Validated.Valid(NamedValue(Field.normalizeName(field), f))
    }
  }
}
@@ -13,16 +13,14 @@
package com.snowplowanalytics.iglu.schemaddl.parquet

import io.circe._
import cats.implicits._
import cats.data.{ValidatedNel, Validated}

import java.time.Instant
import java.time.format.DateTimeFormatter
import CastError._
import java.time.{Instant, LocalDate}

/** Run-time value conforming to the [[Field]] type */
@deprecated("Use `Caster` instead", "0.20.0")
sealed trait FieldValue extends Product with Serializable

@deprecated("Use `Caster` instead", "0.20.0")
object FieldValue {
case object NullValue extends FieldValue
case class JsonValue(value: Json) extends FieldValue
@@ -39,6 +37,27 @@ object FieldValue {
/* Part of [[StructValue]] */
case class NamedValue(name: String, value: FieldValue)

private val caster: Caster[FieldValue] = new Caster[FieldValue] {
def nullValue: FieldValue = NullValue
def jsonValue(v: Json): FieldValue = JsonValue(v)
def stringValue(v: String): FieldValue = StringValue(v)
def booleanValue(v: Boolean): FieldValue = BooleanValue(v)
def intValue(v: Int): FieldValue = IntValue(v)
def longValue(v: Long): FieldValue = LongValue(v)
def doubleValue(v: Double): FieldValue = DoubleValue(v)
def decimalValue(unscaled: BigInt, details: Type.Decimal): FieldValue =
DecimalValue(BigDecimal(unscaled, details.scale), details.precision)
def dateValue(v: LocalDate): FieldValue = DateValue(java.sql.Date.valueOf(v))
def timestampValue(v: Instant): FieldValue = TimestampValue(java.sql.Timestamp.from(v))
def structValue(vs: List[Caster.NamedValue[FieldValue]]): FieldValue =
StructValue {
vs.map {
case Caster.NamedValue(n, v) => NamedValue(n, v)
}
}
def arrayValue(vs: List[FieldValue]): FieldValue = ArrayValue(vs)
}
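// Illustrative only, not part of this commit: with this caster, casting the JSON object {"a": 1}
// against a struct field that has a single required integer sub-field "a" yields
// StructValue(List(NamedValue("a", IntValue(1)))), the same value the removed code below produced.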

/**
* Turn JSON value into a Parquet-compatible row, matching the schema defined in `field`
* Top-level function, called only on columns
@@ -47,124 +66,5 @@
* * turns all unexpected types into string
*/
def cast(field: Field)(value: Json): CastResult =
value match {
case Json.Null =>
if (field.nullability.nullable) NullValue.validNel
else if (field.fieldType === Type.Json) JsonValue(value).validNel
else WrongType(value, field.fieldType).invalidNel
case nonNull =>
castNonNull(field.fieldType)(nonNull)
}

/**
* Turn primitive JSON or JSON object into Parquet row
*/
private def castNonNull(fieldType: Type): Json => CastResult =
fieldType match {
case Type.Json => castJson
case Type.String => castString
case Type.Boolean => castBoolean
case Type.Integer => castInteger
case Type.Long => castLong
case Type.Double => castDouble
case d: Type.Decimal => castDecimal(d)
case Type.Timestamp => castTimestamp
case Type.Date => castDate
case s: Type.Struct => castStruct(s)
case a: Type.Array => castArray(a)
}

private def castString(value: Json): CastResult =
value.asString.fold(WrongType(value, Type.String).invalidNel[FieldValue])(StringValue(_).validNel)

private def castBoolean(value: Json): CastResult =
value.asBoolean.fold(WrongType(value, Type.Boolean).invalidNel[FieldValue])(BooleanValue(_).validNel)

private def castInteger(value: Json): CastResult =
value.asNumber.flatMap(_.toInt).fold(WrongType(value, Type.Integer).invalidNel[FieldValue])(IntValue(_).validNel)

private def castLong(value: Json): CastResult =
value.asNumber.flatMap(_.toLong).fold(WrongType(value, Type.Long).invalidNel[FieldValue])(LongValue(_).validNel)

private def castDouble(value: Json): CastResult =
value.asNumber.fold(WrongType(value, Type.Double).invalidNel[FieldValue])(num => DoubleValue(num.toDouble).validNel)

private def castDecimal(decimalType: Type.Decimal)(value: Json): CastResult =
value.asNumber.flatMap(_.toBigDecimal).fold(WrongType(value, decimalType).invalidNel[FieldValue]) {
bigDec =>
Either.catchOnly[java.lang.ArithmeticException](bigDec.setScale(decimalType.scale, BigDecimal.RoundingMode.UNNECESSARY)) match {
case Right(scaled) if bigDec.precision <= Type.DecimalPrecision.toInt(decimalType.precision) =>
DecimalValue(scaled, decimalType.precision).validNel
case _ =>
WrongType(value, decimalType).invalidNel
}
}

private def castTimestamp(value: Json): CastResult =
value.asString
.flatMap(s => Either.catchNonFatal(DateTimeFormatter.ISO_DATE_TIME.parse(s)).toOption)
.flatMap(a => Either.catchNonFatal(java.sql.Timestamp.from(Instant.from(a))).toOption)
.fold(WrongType(value, Type.Timestamp).invalidNel[FieldValue])(TimestampValue(_).validNel)

private def castDate(value: Json): CastResult =
value.asString
.flatMap(s => Either.catchNonFatal(java.sql.Date.valueOf(s)).toOption)
.fold(WrongType(value, Type.Date).invalidNel[FieldValue])(DateValue(_).validNel)

private def castJson(value: Json): CastResult =
JsonValue(value).validNel

private def castArray(array: Type.Array)(value: Json): CastResult =
value.asArray match {
case Some(values) => values
.toList
.map {
case Json.Null =>
if (array.nullability.nullable) NullValue.validNel
else if (array.element === Type.Json) JsonValue(Json.Null).validNel
else WrongType(Json.Null, array.element).invalidNel
case nonNull => castNonNull(array.element)(nonNull)
}
.sequence[ValidatedNel[CastError, *], FieldValue]
.map(ArrayValue.apply)
case None => WrongType(value, array).invalidNel
}

private def castStruct(struct: Type.Struct)(value: Json): CastResult =
value.asObject match {
case None =>
WrongType(value, struct).invalidNel[FieldValue]
case Some(obj) =>
val map = obj.toMap
struct.fields
.map(castStructField(_, map))
.sequence[ValidatedNel[CastError, *], NamedValue]
.map(StructValue.apply)
}

/** Part of `castStruct`, mapping sub-fields of a JSON object into `FieldValue`s */
private def castStructField(field: Field, jsonObject: Map[String, Json]): ValidatedNel[CastError, NamedValue] =
field.accessors
.toList
.map { name =>
jsonObject.get(name) match {
case Some(json) => cast(field)(json).map(NamedValue(Field.normalizeName(field), _))
case None =>
field.nullability match {
case Type.Nullability.Nullable => NamedValue(Field.normalizeName(field), NullValue).validNel
case Type.Nullability.Required => MissingInValue(field.name, Json.fromFields(jsonObject)).invalidNel
}
}
}
.reduce[ValidatedNel[CastError, NamedValue]] {
case (Validated.Valid(f), Validated.Invalid(_)) => Validated.Valid(f)
case (Validated.Invalid(_), Validated.Valid(f)) => Validated.Valid(f)
case (Validated.Valid(f), Validated.Valid(NamedValue(_, NullValue))) => Validated.Valid(f)
case (Validated.Valid(NamedValue(_, NullValue)), Validated.Valid(f)) => Validated.Valid(f)
case (Validated.Valid(f), Validated.Valid(_)) =>
// We must non-deterministically pick one or the other
Validated.Valid(f)
case (Validated.Invalid(f), Validated.Invalid(_)) =>
Validated.Invalid(f)
}
Caster.cast(caster, field, value)
}
@@ -20,5 +20,6 @@ package object parquet {
private[parquet] type Suggestion = Schema => Option[Field.NullableType]

/** Result of (Schema, JSON) -> Row transformation */
@deprecated("Use `Cast.Result` instead", "0.20.0")
type CastResult = ValidatedNel[CastError, FieldValue]
}
