RDB Loader: add 2nd gen load manifest table (close #366)
chuwy committed Apr 2, 2021
1 parent d992a1d commit 90f5bd1
Expand Up @@ -53,13 +53,18 @@ object LoaderMessage {
final case object JSON extends Format
// Another options can be Parquet and InAtomic for Snowflake-like structure

def fromString(str: String): Either[String, Format] =
str match {
case "TSV" => TSV.asRight
case "JSON" => JSON.asRight
case _ => s"$str is unexpected format. TSV and JSON are possible options".asLeft

implicit val loaderMessageFormatEncoder: Encoder[Format] =
implicit val loaderMessageFormatDecoder: Decoder[Format] =
Decoder.instance { c =>[String].map(_.toUpperCase) match {
case Right("TSV") => Format.TSV.asRight
case Right("JSON") => Format.JSON.asRight
case Right(other) => DecodingFailure(s"$other is unexpected format", c.history).asLeft
case Right(str) => fromString(str).leftMap(err => DecodingFailure(err, c.history))
case Left(error) => error.asLeft
} }
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@
package com.snowplowanalytics.snowplow.rdbloader.common

final case class Message[F[_], A](data: A, ack: F[Unit])
final case class Message[F[_], A](data: A, ack: F[Unit]) {
def map[B](f: A => B): Message[F, B] =
Message(f(data), ack)
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object LoaderError {
implicit val loaderErrorShow: Show[LoaderError] = {
case d: DiscoveryError => "Data discovery error with following issues:\n" +"\n")
case m: MigrationError => s"Table migration error. Please check the table consistency. ${m.message}"
case l: StorageTargetError => s"Data loading error ${l.message}"
case l: StorageTargetError => s"Database error: ${l.message}"
case l: RuntimeError => s"Internal Exception ${l.message}"

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import cats.effect.{IOApp, IO, ExitCode, Resource}

import fs2.Stream

import com.snowplowanalytics.snowplow.rdbloader.db.Manifest
import com.snowplowanalytics.snowplow.rdbloader.dsl.{JDBC, Environment, Logging}
import com.snowplowanalytics.snowplow.rdbloader.config.CliConfig
import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery
Expand Down Expand Up @@ -65,21 +66,22 @@ object Main extends IOApp {
def process(cli: CliConfig, env: Environment[IO]): Stream[IO, Unit] = {
import env._[IO](cli.config, env.state)
.evalMap { message =>
val jdbc: Resource[IO, JDBC[IO]] = env.makeBusy *>
SSH.resource[IO]( *>
JDBC.interpreter[IO](, cli.dryRun, env.blocker)
Stream.eval_(Manifest.initialize[IO](, cli.dryRun, env.blocker)) ++[IO](cli.config, env.state)
.evalMap { discovery =>
val jdbc: Resource[IO, JDBC[IO]] = env.makeBusy *>
SSH.resource[IO]( *>
JDBC.interpreter[IO](, cli.dryRun, env.blocker)

val action = jdbc.use { implicit conn => load[IO](cli, message) *> env.incrementLoaded }
val action = jdbc.use { implicit conn => load[IO](cli, discovery) *> env.incrementLoaded }

// Make sure that stream is never interrupted
action.recoverWith {
case NonFatal(e) =>
Logging[IO].error(s"Fatal failure during message processing (base ${}), message hasn't been ack'ed. ${e.getMessage}")
// Make sure that stream is never interrupted
action.recoverWith {
case NonFatal(e) =>
Logging[IO].error(s"Fatal failure during message processing (base ${}), message hasn't been ack'ed. ${e.getMessage}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.snowplowanalytics.snowplow.rdbloader.db

import cats.{Functor, Monad}
import cats.implicits._

import com.snowplowanalytics.snowplow.rdbloader.dsl.JDBC
import com.snowplowanalytics.snowplow.rdbloader.{LoaderAction, LoaderError}

import com.snowplowanalytics.iglu.schemaddl.redshift.generators.DdlFile
import com.snowplowanalytics.iglu.schemaddl.redshift.{AlterTable, RenameTo}

/** Set of common functions to control DB entities */
object Control {
def renameTable[F[_]: Functor: JDBC](schema: String, from: String, to: String): LoaderAction[F, Unit] = {
val alterTable = DdlFile(List(AlterTable(s"$schema.$from", RenameTo(to))))

def tableExists[F[_]: Functor: JDBC](dbSchema: String, tableName: String): LoaderAction[F, Boolean] =
JDBC[F].executeQuery[Boolean](Statement.TableExists(dbSchema, tableName)).leftMap(annotateError(dbSchema, tableName))

def schemaExists[F[_]: Functor: JDBC](dbSchema: String): LoaderAction[F, Boolean] =

/** List all columns in the table */
def getColumns[F[_]: Monad: JDBC](dbSchema: String, tableName: String): LoaderAction[F, List[String]] =
for {
_ <- JDBC[F].executeUpdate(Statement.SetSchema(dbSchema))
columns <- JDBC[F].executeQueryList[String](Statement.GetColumns(tableName)).leftMap(annotateError(dbSchema, tableName))
} yield columns

def annotateError(dbSchema: String, tableName: String)(error: LoaderError): LoaderError =
error match {
case LoaderError.StorageTargetError(message) =>
LoaderError.StorageTargetError(s"$dbSchema.$tableName. " ++ message)
case other =>
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package com.snowplowanalytics.snowplow.rdbloader.db

import java.time.Instant

import cats.{Functor, Monad}
import cats.implicits._

import cats.effect.{Timer, Async, Blocker, ContextShift}

import doobie.implicits.javatimedrivernative._

import com.snowplowanalytics.iglu.schemaddl.redshift._

import com.snowplowanalytics.snowplow.rdbloader._
import com.snowplowanalytics.snowplow.rdbloader.LoaderAction
import com.snowplowanalytics.snowplow.rdbloader.common.S3
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.common.config.StorageTarget
import com.snowplowanalytics.snowplow.rdbloader.dsl.{Logging, JDBC, AWS}

object Manifest {

val Name = "manifest"
val LegacyName = "manifest_legacy"

private[db] val Columns = List(
Column("base", RedshiftVarchar(512), Set(CompressionEncoding(ZstdEncoding)),Set(Nullability(NotNull),KeyConstaint(PrimaryKey))),



private val LegacyColumns = List(

/** Add `schema` to otherwise static definition of manifest table */
def getManifestDef(schema: String): CreateTable =
Set(Diststyle(Key), DistKeyTable("base"), SortKeyTable(None,"ingestion_tstamp")))

def initialize[F[_]: Async: ContextShift: Logging: Timer: AWS](target: StorageTarget, dryRun: Boolean, blocker: Blocker): F[Unit] = {
JDBC.interpreter[F](target, dryRun, blocker).use { implicit jdbc =>
setup[F](target.schema).value.flatMap {
case Right(InitStatus.Created) =>
Logging[F].info("The manifest table has been created")
case Right(InitStatus.Migrated) =>
Logging[F].info(s"The new manifest table has been created, legacy 0.1.0 manifest can be found at $LegacyName and can be deleted manually")
case Right(InitStatus.NoChanges) =>
case Left(error) =>
Logging[F].error(s"Fatal error has happened during manifest table initialization") *>
Async[F].raiseError(new IllegalStateException(

def setup[F[_]: Monad: JDBC](schema: String): LoaderAction[F, InitStatus] =
for {
_ <- Control.schemaExists[F](schema).flatMap {
case true =>
case false =>
val msg = s"Database schema $schema does not exist. Please, create the schema and events table"
LoaderAction.raiseError[F, Unit](LoaderError.StorageTargetError(msg))
exists <- Control.tableExists[F](schema, Name)
status <- if (exists) for {
columns <- Control.getColumns[F](schema, Name)
legacy = columns.toSet === LegacyColumns.toSet
status <- if (legacy)
Control.renameTable[F](schema, Name, LegacyName) *>
LoaderAction.pure[F, InitStatus](InitStatus.NoChanges)
} yield status else create[F](schema).as(InitStatus.Created)
_ <- status match {
case InitStatus.Migrated | InitStatus.Created =>
JDBC[F].executeUpdate(Statement.CommentOn(CommentOn(s"$schema.$Name", "0.2.0")))
case _ =>
} yield status

def add[F[_]: Functor: JDBC](schema: String, message: LoaderMessage.ShreddingComplete): LoaderAction[F, Unit] =
JDBC[F].executeUpdate(Statement.ManifestAdd(schema, message)).void

def get[F[_]: Functor: JDBC](schema: String, base: S3.Folder): LoaderAction[F, Option[Entry]] =
JDBC[F].executeQueryOption[Entry](Statement.ManifestGet(schema, base))

/** Create manifest table */
def create[F[_]: Functor: JDBC](schema: String): LoaderAction[F, Unit] =

case class Entry(ingestion: Instant, meta: LoaderMessage.ShreddingComplete)

sealed trait InitStatus extends Product with Serializable
object InitStatus {
case object NoChanges extends InitStatus
case object Migrated extends InitStatus
case object Created extends InitStatus
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
package com.snowplowanalytics.snowplow.rdbloader.db

import cats.{Functor, Monad}
import cats.Monad
import cats.implicits._

Expand All @@ -22,7 +22,7 @@ import com.snowplowanalytics.iglu.schemaddl.StringUtils
import com.snowplowanalytics.iglu.schemaddl.migrations.{FlatSchema, Migration => DMigration, SchemaList => DSchemaList}
import com.snowplowanalytics.iglu.schemaddl.redshift.generators.{DdlGenerator, MigrationGenerator}

import com.snowplowanalytics.snowplow.rdbloader.{ LoaderAction, LoaderError, ActionOps }
import com.snowplowanalytics.snowplow.rdbloader.{ LoaderAction, ActionOps }
import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, DiscoveryFailure, ShreddedType}
import com.snowplowanalytics.snowplow.rdbloader.dsl.{Logging, Iglu, JDBC}

Expand All @@ -40,11 +40,11 @@ object Migration {
schemas <- EitherT(Iglu[F].getSchemas(vendor, name, model))
tableName = StringUtils.getTableName(SchemaMap(SchemaKey(vendor, name, "jsonschema", SchemaVer.Full(model, 0, 0))))
_ <- for {
exists <- tableExists[F](dbSchema, tableName)
exists <- Control.tableExists[F](dbSchema, tableName)
_ <- if (exists) for {
schemaKey <- getVersion[F](dbSchema, tableName)
matches = schemas.latest.schemaKey == schemaKey
columns <- getColumns[F](dbSchema, tableName)
columns <- Control.getColumns[F](dbSchema, tableName)
_ <- if (matches) LoaderAction.unit[F] else updateTable[F](dbSchema, schemaKey, columns, schemas)
} yield () else createTable[F](dbSchema, tableName, schemas)
} yield ()
Expand All @@ -54,12 +54,9 @@ object Migration {

/** Find the latest schema version in the table and confirm that it is the latest in `schemas` */
def getVersion[F[_]: Monad: JDBC](dbSchema: String, tableName: String): LoaderAction[F, SchemaKey] =
JDBC[F].executeQuery[SchemaKey](Statement.GetVersion(dbSchema, tableName)).leftMap(annotateError(dbSchema, tableName))
JDBC[F].executeQuery[SchemaKey](Statement.GetVersion(dbSchema, tableName)).leftMap(Control.annotateError(dbSchema, tableName))

/** Check if table exists in `dbSchema` */
def tableExists[F[_]: Functor: JDBC](dbSchema: String, tableName: String): LoaderAction[F, Boolean] =
JDBC[F].executeQuery[Boolean](Statement.TableExists(dbSchema, tableName)).leftMap(annotateError(dbSchema, tableName))

def createTable[F[_]: Monad: Logging: JDBC](dbSchema: String, name: String, schemas: DSchemaList): LoaderAction[F, Unit] = {
val subschemas = FlatSchema.extractProperties(schemas)
val tableName = StringUtils.getTableName(schemas.latest)
Expand Down Expand Up @@ -89,19 +86,4 @@ object Migration {
case s: DSchemaList.Single =>
Logging[F].info(s"Warning: updateTable executed for a table with known single schema [${s.schema.self.schemaKey.toSchemaUri}]\ncolumns: ${columns.mkString(", ")}\nstate: $state").liftA

/** List all columns in the table */
def getColumns[F[_]: Monad: JDBC](dbSchema: String, tableName: String): LoaderAction[F, List[String]] =
for {
_ <- JDBC[F].executeUpdate(Statement.SetSchema(dbSchema))
columns <- JDBC[F].executeQueryList[String](Statement.GetColumns(tableName)).leftMap(annotateError(dbSchema, tableName))
} yield columns

private def annotateError(dbSchema: String, tableName: String)(error: LoaderError): LoaderError =
error match {
case LoaderError.StorageTargetError(message) =>
LoaderError.StorageTargetError(s"$dbSchema.$tableName. " ++ message)
case other =>
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ package com.snowplowanalytics.snowplow.rdbloader.db

import doobie.Fragment
import doobie.implicits._
import doobie.implicits.javatimedrivernative._

import io.circe.syntax._

import com.snowplowanalytics.iglu.schemaddl.redshift

import com.snowplowanalytics.snowplow.rdbloader.common.{S3, Common}
import com.snowplowanalytics.snowplow.rdbloader.common.{S3, LoaderMessage, Common}
import com.snowplowanalytics.snowplow.rdbloader.common.config.Config
import com.snowplowanalytics.snowplow.rdbloader.common.config.Config.Shredder.Compression
import com.snowplowanalytics.snowplow.rdbloader.discovery.ShreddedType
Expand Down Expand Up @@ -135,13 +138,16 @@ object Statement {
// Migration
case class TableExists(schema: String, tableName: String) extends Statement {
def toFragment: Fragment =
| FROM pg_tables
| WHERE schemaname = $schema
| AND tablename = $tableName) AS exists;
| FROM pg_tables
| WHERE schemaname = $schema
| AND tablename = $tableName)
| AS exists""".stripMargin
case class SchemaExists(schema: String) extends Statement {
def toFragment: Fragment =
sql"SELECT schema_name FROM information_schema.schemata WHERE schema_name = $schema"
case class GetVersion(schema: String, tableName: String) extends Statement {
def toFragment: Fragment =
Expand All @@ -164,6 +170,26 @@ object Statement {
sql"""SELECT "column" FROM PG_TABLE_DEF WHERE tablename = $tableName"""

// Manifest
case class ManifestAdd(schema: String, message: LoaderMessage.ShreddingComplete) extends Statement {
def toFragment: Fragment = {
val types = if (message.types.isEmpty) None else Some(message.types.asJson.noSpaces)
sql"""INSERT INTO $schema.manifest " +
(base, types, shredding_started, shredding_completed,
min_collector_tstamp, max_collector_tstamp, ingestion_tstamp,
compression, processor_artifact, processor_version)
VALUES (${message.base} $types, ${message.timestamps.jobStarted}, ${message.timestamps.jobCompleted},
${message.timestamps.min}, ${message.timestamps.max}, getdate(),
${message.compression.asString}, ${message.processor.artifact}, ${message.processor.version})"""
case class ManifestGet(schema: String, base: S3.Folder) extends Statement {
def toFragment: Fragment =
sql"""SELECT ingestion_tstamp, base, types, shredding_started, shredding_completed,
min_collector_tstamp, max_collector_tstamp, ingestion_tstamp,
compression, processor_artifact, processor_version FROM $schema.manifest WHERE base = $base"""

// Schema DDL
case class CreateTable(ddl: redshift.CreateTable) extends Statement {
def toFragment: Fragment =
Expand All @@ -174,8 +200,10 @@ object Statement {
case class DdlFile(ddl: redshift.generators.DdlFile) extends Statement {
def toFragment: Fragment =
Fragment.const0(ddl.render.split("\n").filterNot(l => l.startsWith("--") || l.isBlank).mkString("\n"))
def toFragment: Fragment = {
val str = ddl.render.split("\n").filterNot(l => l.startsWith("--") || l.isBlank).mkString("\n")

private def getCompressionFormat(compression: Config.Shredder.Compression): Fragment =
Expand Down

