Skip to content

Commit

Permalink
Improve foldMapMergeRedshiftSchemas signature (close #192)
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzhanunlu committed Sep 4, 2023
1 parent f934330 commit bde34b0
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.snowplowanalytics.iglu.schemaddl.redshift

import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel.{GoodModel, RecoveryModel}

/**
 * Result of folding an ordered family of schemas into Redshift shred models.
 *
 * @param goodModel      the model left after the last successful merge; when no later schema merges
 *                       successfully this is the model of the first schema
 * @param recoveryModels for every schema that could not be merged into the running good model,
 *                       the recovery model produced by that failed merge, keyed by the schema's key
 */
final case class MergeRedshiftSchemasResult(
  goodModel: GoodModel,
  recoveryModels: Map[SchemaKey, RecoveryModel]
)
Original file line number Diff line number Diff line change
Expand Up @@ -88,35 +88,38 @@ object ShredModelEntry {
if (s == NullCharacter) "\\\\N"
else s.replace('\t', ' ').replace('\n', ' ')

/**
 * Fixed columns placed ahead of the schema-derived columns when a table definition is rendered.
 * Each entry is a tuple of (column name, column type, compression encoding, nullability clause),
 * the same shape the schema-derived columns are mapped into before both lists are concatenated.
 * Names are stored unquoted here; the double quotes are added when the rows are formatted.
 */
private val extraCols = List(
("schema_vendor", "VARCHAR(128)", "ENCODE ZSTD", "NOT NULL"),
("schema_name", "VARCHAR(128)", "ENCODE ZSTD", "NOT NULL"),
("schema_format", "VARCHAR(128)", "ENCODE ZSTD", "NOT NULL"),
("schema_version", "VARCHAR(128)", "ENCODE ZSTD", "NOT NULL"),
("root_id", "CHAR(36)", "ENCODE RAW", "NOT NULL"),
("root_tstamp", "TIMESTAMP", "ENCODE ZSTD", "NOT NULL"),
("ref_root", "VARCHAR(255)", "ENCODE ZSTD", "NOT NULL"),
("ref_tree", "VARCHAR(1500)", "ENCODE ZSTD", "NOT NULL"),
("ref_parent", "VARCHAR(255)", "ENCODE ZSTD", "NOT NULL")
)

/** List of column names common across all shredded tables */
val commonColumnNames: List[String] = extraCols.map(_._1)

sealed trait ColumnType

implicit val showProps: Show[List[ShredModelEntry]] = Show.show(props => {
val colsAsString = props.map(prop =>
(s""""${prop.columnName}"""", prop.columnType.show, prop.compressionEncoding.show, if (prop.isNullable) "" else "NOT NULL")
)
val extraCols = List(
(""""schema_vendor"""", "VARCHAR(128)", "ENCODE ZSTD", "NOT NULL"),
(""""schema_name"""", "VARCHAR(128)", "ENCODE ZSTD", "NOT NULL"),
(""""schema_format"""", "VARCHAR(128)", "ENCODE ZSTD", "NOT NULL"),
(""""schema_version"""", "VARCHAR(128)", "ENCODE ZSTD", "NOT NULL"),
(""""root_id"""", "CHAR(36)", "ENCODE RAW", "NOT NULL"),
(""""root_tstamp"""", "TIMESTAMP", "ENCODE ZSTD", "NOT NULL"),
(""""ref_root"""", "VARCHAR(255)", "ENCODE ZSTD", "NOT NULL"),
(""""ref_tree"""", "VARCHAR(1500)", "ENCODE ZSTD", "NOT NULL"),
(""""ref_parent"""", "VARCHAR(255)", "ENCODE ZSTD", "NOT NULL")
(prop.columnName, prop.columnType.show, prop.compressionEncoding.show, if (prop.isNullable) "" else "NOT NULL")
)
val allCols = extraCols ++ colsAsString
val (mName, mType, mComp) = allCols.foldLeft((0, 0, 0))(
val (mName, mType, mComp) = allCols.foldLeft((2, 0, 0))(
(acc, col) => (
math.max(col._1.length, acc._1),
math.max(col._1.length + 2, acc._1),
math.max(col._2.length, acc._2),
math.max(col._3.length, acc._3),
))
val fmtStr = s" %-${mName}s %${-mType}s %-${mComp}s %s"
val fmtStr = s""" %-${mName}s %${-mType}s %-${mComp}s %s"""

allCols
.map(cols => fmtStr.format(cols._1, cols._2, cols._3, cols._4).replaceAll("""\s+$""", ""))
.map(cols => fmtStr.format(s""""${cols._1}"""", cols._2, cols._3, cols._4).replaceAll("""\s+$""", ""))
.mkString(",\n")
})

Expand Down Expand Up @@ -180,4 +183,4 @@ object ShredModelEntry {
case object ZstdEncoding extends CompressionEncoding
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import cats.data.NonEmptyList
import cats.syntax.option._
import cats.syntax.either._
import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel.GoodModel
import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel.{GoodModel, RecoveryModel}
import com.snowplowanalytics.iglu.schemaddl.redshift.internal.Migrations

import scala.collection.mutable
Expand All @@ -24,7 +24,7 @@ package object redshift {
): Either[NonEmptyList[Migrations.Breaking], List[Migrations.NonBreaking]] =
src match {
case Nil => Nil.asRight
case ::(head, tl) => getFinalMergedModel(NonEmptyList(head, tl))
case ::(head, tl) => foldMapMergeRedshiftSchemas(NonEmptyList(head, tl)).goodModel
.merge(ShredModel.good(tgt))
.leftMap(_.errors)
.map(_.getMigrationsFor(tgt.self.schemaKey))
Expand All @@ -37,11 +37,6 @@ package object redshift {
def isRedshiftMigrationBreaking(src: IgluSchema, tgt: IgluSchema): Boolean =
// True exactly when the assessment comes back as a Left.
// NOTE(review): presumably the Left side carries the breaking migrations, as in the list-based
// overload's Either[NonEmptyList[Migrations.Breaking], ...] result — confirm against this overload.
assessRedshiftMigration(src, tgt).isLeft

def getFinalMergedModel(schemas: NonEmptyList[IgluSchema]): GoodModel =
foldMapMergeRedshiftSchemas(schemas).values.collectFirst {
case model: GoodModel => model
}.get // first schema always would be there due to Nel, so `get` is safe

/**
* Build a map between schema key and their models.
*
Expand Down Expand Up @@ -69,36 +64,27 @@ package object redshift {
acc
}


/**
* Build a map between schema key and a merged or recovered model. For example, if schemas X and Y are mergeable, both
* would link to schema XY (product).
*
* @param schemas - ordered list of schemas for the same family
* @return
*/
def foldMapMergeRedshiftSchemas(schemas: NonEmptyList[IgluSchema]): collection.Map[SchemaKey, ShredModel] = {
def foldMapMergeRedshiftSchemas(schemas: NonEmptyList[IgluSchema]): MergeRedshiftSchemasResult = {
val models = schemas.map(ShredModel.good)
var lastGoodModel = models.head
val acc: mutable.Map[SchemaKey, ShredModel] = mutable.Map(models.head.schemaKey -> models.head)
val recoveryModels = mutable.Map.empty[SchemaKey, RecoveryModel]

// first pass to build the mapping between key and accumulated model
models.tail.foreach { model =>
lastGoodModel.merge(model) match {
case Left(badModel) =>
acc.update(model.schemaKey, badModel)
recoveryModels.update(model.schemaKey, badModel)
case Right(mergedModel) =>
acc.update(mergedModel.schemaKey, mergedModel)
lastGoodModel = mergedModel
}
}

// seconds pass to backfill the last model version for initial keys.
acc.map {
case (k, model) => (k, if (model.isInstanceOf[GoodModel])
lastGoodModel
else
model)
}.toMap
MergeRedshiftSchemasResult(lastGoodModel,recoveryModels.toMap)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ class ShredModelSpec extends Specification {
}}
}""".schema)

getFinalMergedModel(NonEmptyList.of(s1, s2, s3))
foldMapMergeRedshiftSchemas(NonEmptyList.of(s1, s2, s3)).goodModel
.asRight[RecoveryModel].toTestString must beRight(
"""CREATE TABLE IF NOT EXISTS s.com_acme_example_1 (
| "schema_vendor" VARCHAR(128) ENCODE ZSTD NOT NULL,
Expand Down Expand Up @@ -524,7 +524,7 @@ class ShredModelSpec extends Specification {
}}
}""".schema)

getFinalMergedModel(NonEmptyList.of(s1, s2, s3)).asInstanceOf[GoodModel]
foldMapMergeRedshiftSchemas(NonEmptyList.of(s1, s2, s3)).goodModel
.asRight[RecoveryModel].toTestString must beRight(
"""CREATE TABLE IF NOT EXISTS s.com_acme_example_1 (
| "schema_vendor" VARCHAR(128) ENCODE ZSTD NOT NULL,
Expand Down

0 comments on commit bde34b0

Please sign in to comment.