Implement loading via SQS [WIP]
chuwy committed Dec 22, 2020
1 parent 3c2439a commit f873a43
Showing 25 changed files with 565 additions and 527 deletions.
1 change: 1 addition & 0 deletions build.sbt
@@ -68,6 +68,7 @@ lazy val loader = project.in(file("modules/loader"))
Dependencies.circeLiteral,
Dependencies.fs2,
Dependencies.schemaDdl,
Dependencies.catsRetry,

Dependencies.redshift,
Dependencies.redshiftSdk,
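The only build change is the new cats-retry dependency, which suggests transient load failures will now be retried under a declarative policy. A minimal sketch of how it could be wired — the policy, the load action and the logging hook below are assumptions for illustration, not code from this commit:

import scala.concurrent.duration._
import cats.effect.{ExitCode, IO, IOApp}
import retry.{RetryDetails, RetryPolicies, retryingOnAllErrors}

object RetrySketch extends IOApp {
  // Hypothetical stand-in for a single load attempt against the warehouse
  val loadAttempt: IO[Unit] = IO(println("loading..."))

  // Up to three retries with exponential backoff between them
  val policy = RetryPolicies.limitRetries[IO](3) join RetryPolicies.exponentialBackoff[IO](1.second)

  def logRetry(err: Throwable, details: RetryDetails): IO[Unit] =
    IO(println(s"Load failed (${err.getMessage}), ${details.retriesSoFar} retries so far"))

  def run(args: List[String]): IO[ExitCode] =
    retryingOnAllErrors(policy, logRetry)(loadAttempt).as(ExitCode.Success)
}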
@@ -46,24 +46,24 @@ object LoaderMessageSpec {
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/1-0-0",
"data": {
"base" : "s3://bucket/folder/",
"types" : [
{
"schemaKey" : "iglu:com.acme/event-a/jsonschema/1-0-0",
"format" : "TSV"
}
],
"timestamps" : {
"jobStarted" : "2020-09-17T11:32:21.145Z",
"jobCompleted" : "2020-09-17T11:32:21.145Z",
"min" : null,
"max" : null
},
"processor": {
"artifact" : "test-shredder",
"version" : "1.1.2"
"types" : [
{
"schemaKey" : "iglu:com.acme/event-a/jsonschema/1-0-0",
"format" : "TSV"
}
],
"timestamps" : {
"jobStarted" : "2020-09-17T11:32:21.145Z",
"jobCompleted" : "2020-09-17T11:32:21.145Z",
"min" : null,
"max" : null
},
"processor": {
"artifact" : "test-shredder",
"version" : "1.1.2"
}
}"""
}
}"""

val ValidMessage: LoaderMessage = LoaderMessage.ShreddingComplete(
S3.Folder.coerce("s3://bucket/folder/"),
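The fixture above is the self-describing shredding_complete payload the loader now consumes from the queue. A small sketch of pulling the S3 base out of such a message with circe — the decoding shape here is an assumption; the real module defines its own LoaderMessage codec:

import io.circe.parser.parse

object MessageSketch {
  // Abbreviated copy of the fixture above
  val raw: String =
    """{
      |  "schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/1-0-0",
      |  "data": { "base": "s3://bucket/folder/" }
      |}""".stripMargin

  // The S3 prefix the loader would attempt to load
  val base: Either[io.circe.Error, String] =
    parse(raw).flatMap(_.hcursor.downField("data").get[String]("base"))
}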
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012-2019 Snowplow Analytics Ltd. All rights reserved.
* Copyright (c) 2012-2020 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -13,60 +13,47 @@
package com.snowplowanalytics.snowplow.rdbloader

import cats.Show
import cats.data.ValidatedNel
import cats.data.{ValidatedNel, NonEmptyList}

import com.snowplowanalytics.snowplow.rdbloader.discovery.DiscoveryFailure

/** Root error type */
sealed trait LoaderError extends Product with Serializable
sealed trait LoaderError extends Throwable with Product with Serializable {
override def getMessage: String =
LoaderError.loaderErrorShow.show(this)
}

object LoaderError {

implicit object ErrorShow extends Show[LoaderError] {
def show(error: LoaderError): String = error match {
case c: ConfigError => "Configuration error" + c.message
case d: DiscoveryError => "Data discovery error with following issues:\n" + d.failures.map(_.getMessage).mkString("\n")
case l: StorageTargetError => "Data loading error " + l.message
case l: LoaderLocalError => "Internal Exception " + l.message
case m: LoadManifestError => "Load Manifest: " + m.message
case m: MigrationError => s"Table migration error. Please check the table consistency. ${m.message}"
}
implicit val loaderErrorShow: Show[LoaderError] = {
case d: DiscoveryError => "Data discovery error with following issues:\n" + d.failures.toList.map(_.getMessage).mkString("\n")
case m: MigrationError => s"Table migration error. Please check the table consistency. ${m.message}"
case l: StorageTargetError => s"Data loading error ${l.message}"
case l: RuntimeError => s"Internal Exception ${l.message}"
}

/**
* Top-level error, representing error in configuration
* Will always be printed to EMR stderr
*/
case class ConfigError(message: String) extends LoaderError

/**
* Error representing failure on events (or types, or JSONPaths) discovery
* Contains multiple step failures
*/
case class DiscoveryError(failures: List[DiscoveryFailure]) extends LoaderError
final case class DiscoveryError(failures: NonEmptyList[DiscoveryFailure]) extends LoaderError
object DiscoveryError {
def apply(single: DiscoveryFailure): LoaderError = DiscoveryError(List(single))
def apply(single: DiscoveryFailure): LoaderError = DiscoveryError(NonEmptyList.one(single))

/** Turn non-empty list of discovery failures into top-level `LoaderError` */
def fromValidated[A](validated: ValidatedNel[DiscoveryFailure, A]): Either[LoaderError, A] =
validated.leftMap(errors => DiscoveryError(errors): LoaderError).toEither
}

/**
* Error representing failure on database loading (or executing any statements)
* These errors have short-circuit semantics (as in `scala.Either`)
*/
case class StorageTargetError(message: String) extends LoaderError
final case class StorageTargetError(message: String) extends LoaderError

/** `atomic.manifest` prevents this folder to be loaded */
case class LoadManifestError(message: String) extends LoaderError

/** Turn non-empty list of discovery failures into top-level `LoaderError` */
def flattenValidated[A](validated: ValidatedNel[DiscoveryFailure, A]): Either[LoaderError, A] =
validated.leftMap(errors => DiscoveryError(errors.toList): LoaderError).toEither

/** Other errors */
case class LoaderLocalError(message: String) extends Throwable with LoaderError {
override def getMessage: String = message
}
/** Other errors, not related to a warehouse */
final case class RuntimeError(message: String) extends LoaderError

/** Error happened during DDL-statements execution. Critical */
case class MigrationError(message: String) extends LoaderError

final case class MigrationError(message: String) extends LoaderError
}
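With LoaderError now extending Throwable and routing getMessage through the Show instance, throwing, logging and tracking all render the same text. A small usage sketch against the new hierarchy — the error message is illustrative only:

import cats.syntax.show._
import com.snowplowanalytics.snowplow.rdbloader.LoaderError
import com.snowplowanalytics.snowplow.rdbloader.LoaderError._

object LoaderErrorSketch {
  // A failure not related to the warehouse, e.g. an unexpected exception in the stream
  val runtime: LoaderError = RuntimeError("SQS receive timed out")

  // getMessage is derived from the Show instance, so exceptions and logs agree
  assert(runtime.getMessage == runtime.show)
  assert(runtime.show == "Internal Exception SQS receive timed out")
}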
@@ -12,64 +12,74 @@
*/
package com.snowplowanalytics.snowplow.rdbloader

import cats.Monad
import scala.util.control.NonFatal

import cats.data.Validated._
import cats.implicits._

import cats.effect.{IOApp, IO, ExitCode}
import cats.effect.{IOApp, IO, ExitCode, Resource}

import fs2.Stream

import com.snowplowanalytics.snowplow.rdbloader.dsl.{Logging, JDBC, RealWorld, AWS}
import com.snowplowanalytics.snowplow.rdbloader.dsl.{JDBC, Environment, Logging}
import com.snowplowanalytics.snowplow.rdbloader.config.CliConfig
import com.snowplowanalytics.snowplow.rdbloader.loaders.Common.{discover, load}
import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery
import com.snowplowanalytics.snowplow.rdbloader.loading.Common.load
import com.snowplowanalytics.snowplow.rdbloader.utils.SSH

import io.sentry.Sentry

object Main extends IOApp {
/**
* If arguments or config is invalid exit with 1
* and print errors to EMR stdout
* If arguments and config are valid, but loading failed
* print message to `track` bucket
*/

def run(argv: List[String]): IO[ExitCode] =
CliConfig.parse(argv) match {
case Valid(config) =>
RealWorld.initialize[IO](config).flatMap { dsls =>
import dsls._
workStream(config, dsls)
.compile
.drain
.value
.attempt
.map {
case Left(e) =>
Sentry.captureException(e)
e.printStackTrace(System.out)
(LoaderError.LoaderLocalError(e.getMessage): LoaderError).asLeft
case Right(e) => e
}
.flatMap(close[IO])
case Valid(cli) =>
Environment.initialize[IO](cli).flatMap { env =>
env.loggingF.info(s"RDB Loader [${cli.config.name}] has started. Listening ${cli.config.messageQueue}") *>
process(cli, env)
.compile
.drain
.attempt
.flatMap {
case Left(e) =>
Sentry.captureException(e)
e.printStackTrace(System.out)
env.loggingF.track(LoaderError.RuntimeError(e.getMessage).asLeft).as(ExitCode.Error)
case Right(_) =>
IO.pure(ExitCode.Success)
}
}
case Invalid(errors) =>
IO.delay(println("Configuration error")) *>
errors.traverse_(message => IO.delay(println(message))).as(ExitCode.Error)
IO.delay(System.err.println("Configuration error")) *>
errors.traverse_(message => IO.delay(System.err.println(message))).as(ExitCode(2))
}

def workStream(config: CliConfig, dsls: RealWorld[IO]): Stream[LoaderAction[IO, ?], Unit] = {
import dsls._
/**
* Main application workflow, responsible for discovering new data via message queue
* and processing this data with loaders
*
* @param cli whole app configuration
* @param env initialised environment containing resources and effect interpreters
* @return endless stream waiting for messages
*/
def process(cli: CliConfig, env: Environment[IO]): Stream[IO, Unit] = {
import env._

DataDiscovery.discover[IO](cli.config, env.state)
.pauseWhen[IO](env.isBusy)
.evalMap { message =>
val jdbc: Resource[IO, JDBC[IO]] = env.makeBusy *>
SSH.resource[IO](cli.config.storage.sshTunnel) *>
JDBC.interpreter[IO](cli.config.storage, cli.dryRun)

discover[IO](config).evalMap { case (discovery, _) =>
val jdbc = SSH.resource[IO](config.config.storage.sshTunnel) *>
JDBC.interpreter[IO](config.config.storage, config.dryRun)
val action = jdbc.use { implicit conn => load[IO](cli, message) *> env.incrementLoaded }

LoaderAction(jdbc.use { implicit conn => load[IO](config, discovery).value })
}
// Make sure that stream is never interrupted
action.recoverWith {
case NonFatal(e) =>
Sentry.captureException(e)
Logging[IO].error(s"Fatal failure during message processing (base ${message.data.base}), message hasn't been ack'ed. ${e.getMessage}")
}
}
}

/** Get exit status based on all previous steps */
private def close[F[_]: Monad: Logging: AWS](result: Either[LoaderError, Unit]): F[ExitCode] =
Logging[F].track(result).as(result.fold(_ => ExitCode.Error, _ => ExitCode.Success))
}
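The new process stream keeps exactly one load in flight: discovery is gated with pauseWhen on a busy signal that env.makeBusy holds as a Resource for the duration of each load. A self-contained sketch of that gating pattern with fs2 — the ticking source stands in for SQS polling and every name here is invented for illustration:

import scala.concurrent.duration._
import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.implicits._
import fs2.Stream
import fs2.concurrent.SignallingRef

object BusySignalSketch extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    for {
      busy <- SignallingRef[IO, Boolean](false)
      // Equivalent of env.makeBusy: hold the flag for the lifetime of a load
      makeBusy = Resource.make(busy.set(true))(_ => busy.set(false))
      _ <- Stream
             .awakeEvery[IO](500.millis)   // stand-in for messages arriving from the queue
             .pauseWhen[IO](busy)          // no new message is picked up while a load is running
             .evalMap(_ => makeBusy.use(_ => IO(println("loading one folder")) *> IO.sleep(1.second)))
             .take(3)
             .compile
             .drain
    } yield ExitCode.Success
}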
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2012-2020 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader

import cats.effect.Concurrent
import cats.effect.concurrent.{ Ref => CERef }

import cats.implicits._

import fs2.concurrent.SignallingRef

/**
* Primary (mutable) state of the loader
* Every Loader's action has two input parameters: message and current state
* The state is used to exchange data between data discovery stream and load actions
*
* @param attempts amount of attempts the Loader took to load **current** folder
* zero'ed after every message ack'ed
* @param busy whether Loader is ready to accept new message at the moment
* if Loader is busy - it must be retrying until fubr, then it must
* set `busy` back to `false`
* @param loaded amount of folders the loader managed to load
*/
case class State[F[_]](attempts: Int,
busy: SignallingRef[F, Boolean],
loaded: Int,
messages: Int) {
def incrementAttempts: State[F] =
this.copy(attempts = attempts + 1)
def incrementMessages: State[F] =
this.copy(messages = messages + 1)
def incrementLoaded: State[F] =
this.copy(loaded = loaded + 1)
}

object State {

/** Mutable state */
type Ref[F[_]] = CERef[F, State[F]]

/** Initiate state for a fresh app */
def mk[F[_]: Concurrent]: F[CERef[F, State[F]]] =
for {
busy <- SignallingRef[F, Boolean](false)
ref <- CERef.of[F, State[F]](State(0, busy, 0, 0))
} yield ref
}
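The state is shared behind a cats-effect Ref, so the copy-returning helpers above are applied with atomic update calls. A minimal usage sketch — the bookkeeping functions are assumed callers, and zeroing attempts after a successful load follows the "zero'ed after every message ack'ed" note in the doc comment:

import scala.concurrent.ExecutionContext
import cats.effect.{ContextShift, IO}
import cats.effect.concurrent.Ref
import com.snowplowanalytics.snowplow.rdbloader.State

object StateSketch {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Bump the attempt counter before retrying the current folder
  def retryBookkeeping(state: Ref[IO, State[IO]]): IO[Unit] =
    state.update(_.incrementAttempts)

  // Record a successful load and zero the attempts, as the doc comment prescribes
  def folderLoaded(state: Ref[IO, State[IO]]): IO[Unit] =
    state.update(s => s.incrementLoaded.copy(attempts = 0))

  val program: IO[Int] =
    for {
      ref    <- State.mk[IO]
      _      <- retryBookkeeping(ref)
      _      <- folderLoaded(ref)
      loaded <- ref.get.map(_.loaded)   // 1
    } yield loaded
}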
@@ -10,31 +10,32 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader
package db
package com.snowplowanalytics.snowplow.rdbloader.db

import cats.{Functor, Monad}
import cats.data.EitherT
import cats.implicits._

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaMap, SchemaVer}

import com.snowplowanalytics.iglu.schemaddl.StringUtils
import com.snowplowanalytics.iglu.schemaddl.migrations.{FlatSchema, Migration => DMigration, SchemaList => DSchemaList}
import com.snowplowanalytics.iglu.schemaddl.redshift.Ddl
import com.snowplowanalytics.iglu.schemaddl.redshift.generators.{DdlGenerator, MigrationGenerator}

import com.snowplowanalytics.snowplow.rdbloader.{ LoaderAction, LoaderError, ActionOps }
import com.snowplowanalytics.snowplow.rdbloader.db.Entities.{Columns, TableState}
import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, DiscoveryFailure, ShreddedType}
import com.snowplowanalytics.snowplow.rdbloader.dsl.{Logging, Iglu, JDBC}
import com.snowplowanalytics.snowplow.rdbloader.loaders.Common.SqlString
import com.snowplowanalytics.snowplow.rdbloader.loading.Common.SqlString

object Migration {
/**
* Perform all the machinery to check if any tables for tabular data do not match
* latest state on the Iglu Server. Create or update tables in that case.
* Do nothing in case there's only legacy JSON data
*/
def perform[F[_]: Monad: Logging: Iglu: JDBC](dbSchema: String)(discovery: DataDiscovery): LoaderAction[F, Unit] =
def perform[F[_]: Monad: Logging: Iglu: JDBC](dbSchema: String, discovery: DataDiscovery): LoaderAction[F, Unit] =
discovery.shreddedTypes.traverse_ {
case ShreddedType.Tabular(ShreddedType.Info(_, vendor, name, model, _)) =>
for {
Expand Down Expand Up @@ -69,28 +70,28 @@ object Migration {
}

/** Check if table exists in `dbSchema` */
def tableExists[F[_]: Functor: JDBC](dbSchema: String, table: String): LoaderAction[F, Boolean] = {
def tableExists[F[_]: Functor: JDBC](dbSchema: String, tableName: String): LoaderAction[F, Boolean] = {
val query = SqlString.unsafeCoerce(
s"""
|SELECT EXISTS (
| SELECT 1
| FROM pg_tables
| WHERE schemaname = '$dbSchema'
| AND tablename = '$table') AS exists;
| AND tablename = '$tableName') AS exists;
""".stripMargin)

JDBC[F].executeQuery[Boolean](query).leftMap(annotateError(dbSchema, table))
JDBC[F].executeQuery[Boolean](query).leftMap(annotateError(dbSchema, tableName))
}

def createTable[F[_]: Monad: Logging: JDBC](dbSchema: String, name: String, schemas: DSchemaList): LoaderAction[F, Unit] = {
val subschemas = FlatSchema.extractProperties(schemas)
val tableName = StringUtils.getTableName(schemas.latest)
val ddl = DdlGenerator.generateTableDdl(subschemas, tableName, Some(dbSchema), 4096, false)
val comment = DdlGenerator.getTableComment(name, Some(dbSchema), schemas.latest)
Logging[F].print(s"Creating $dbSchema.$name table for ${comment.comment}").liftA *>
Logging[F].info(s"Creating $dbSchema.$name table for ${comment.comment}").liftA *>
JDBC[F].executeUpdate(ddl.toSql).void *>
JDBC[F].executeUpdate(comment.toSql).void *>
Logging[F].print(s"Table created").liftA
Logging[F].info(s"Table created").liftA
}

/** Update existing table specified by `current` into a final version present in `state` */
@@ -102,15 +103,15 @@
case Some(relevantMigration) =>
val ddlFile = MigrationGenerator.generateMigration(relevantMigration, 4096, Some(dbSchema))
val ddl = SqlString.unsafeCoerce(ddlFile.render)
LoaderAction.liftF(ddlFile.warnings.traverse_(Logging[F].print)) *>
LoaderAction.liftF(Logging[F].print(s"Executing migration DDL statement: $ddl")) *>
LoaderAction.liftF(ddlFile.warnings.traverse_(Logging[F].info)) *>
LoaderAction.liftF(Logging[F].info(s"Executing migration DDL statement: $ddl")) *>
JDBC[F].executeUpdate(ddl).void
case None =>
val message = s"Warning: Table's schema key '${current.toSchemaUri}' cannot be found in fetched schemas $state. Migration cannot be created"
LoaderAction.liftE[F, Unit](DiscoveryFailure.IgluError(message).toLoaderError.asLeft)
}
case _: DSchemaList.Single =>
Logging[F].print(s"Warning: updateTable executed for a table with single schema\ncolumns: $columns\nstate: $state").liftA
Logging[F].info(s"Warning: updateTable executed for a table with single schema\ncolumns: $columns\nstate: $state").liftA
}

/** List all columns in the table */
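Taken together, perform walks every tabular shredded type, checks pg_tables for the table and either creates it from the latest Iglu schema or migrates the existing one; legacy JSON types are skipped. A toy, self-contained sketch of that decision flow — every type and method name below is a stand-in, not the module's real API:

import cats.Monad
import cats.implicits._

object MigrationSketch {
  // Stand-in operations; the real module works through LoaderAction with JDBC/Logging/Iglu type classes
  trait TableOps[F[_]] {
    def tableExists(dbSchema: String, tableName: String): F[Boolean]
    def createTable(dbSchema: String, tableName: String): F[Unit]
    def migrateTable(dbSchema: String, tableName: String): F[Unit]
  }

  // Create-or-migrate decision applied to every tabular shredded type
  def performSketch[F[_]: Monad](ops: TableOps[F], dbSchema: String, tabularTables: List[String]): F[Unit] =
    tabularTables.traverse_ { tableName =>
      ops.tableExists(dbSchema, tableName).flatMap {
        case true  => ops.migrateTable(dbSchema, tableName)
        case false => ops.createTable(dbSchema, tableName)
      }
    }
}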