Skip to content

Commit

Permalink
Merge 957ef44 into 6fc63a0
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed May 15, 2023
2 parents 6fc63a0 + 957ef44 commit 39ff502
Show file tree
Hide file tree
Showing 37 changed files with 555 additions and 318 deletions.
31 changes: 15 additions & 16 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -75,56 +75,55 @@ lazy val loader = project
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.loaderDependencies)
.dependsOn(aws % "compile->compile;test->test;runtime->runtime", gcp % "compile->compile;test->test")
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin)

lazy val redshiftLoader = project
.in(file("modules/redshift-loader"))
.settings(BuildSettings.redshiftBuildSettings)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.redshiftDependencies)
.dependsOn(loader % "compile->compile;test->test;runtime->runtime")
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin)
.enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)

lazy val redshiftLoaderDistroless = project
.in(file("modules/distroless/redshift-loader"))
.settings(sourceDirectory := (redshiftLoader / sourceDirectory).value)
.settings(BuildSettings.redshiftDistrolessBuildSettings)
.settings(BuildSettings.redshiftBuildSettings)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.redshiftDependencies)
.dependsOn(loader % "compile->compile;test->test;runtime->runtime")
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin, LauncherJarPlugin)
.enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)

lazy val snowflakeLoader = project
.in(file("modules/snowflake-loader"))
.settings(BuildSettings.snowflakeBuildSettings)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.snowflakeDependencies)
.dependsOn(common % "compile->compile;test->test",loader % "compile->compile;test->test;runtime->runtime")
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin)
.enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)

lazy val snowflakeLoaderDistroless = project
.in(file("modules/distroless/snowflake-loader"))
.settings(sourceDirectory := (snowflakeLoader / sourceDirectory).value)
.settings(BuildSettings.snowflakeDistrolessBuildSettings)
.settings(BuildSettings.snowflakeBuildSettings)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.snowflakeDependencies)
.dependsOn(loader % "compile->compile;test->test;runtime->runtime")
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin, LauncherJarPlugin)
.enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)

lazy val databricksLoader = project
.in(file("modules/databricks-loader"))
.settings(BuildSettings.databricksBuildSettings)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.dependsOn(loader % "compile->compile;test->test;runtime->runtime")
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin)
.enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)

lazy val databricksLoaderDistroless = project
.in(file("modules/distroless/databricks-loader"))
.settings(sourceDirectory := (databricksLoader / sourceDirectory).value)
.settings(BuildSettings.databricksDistrolessBuildSettings)
.settings(BuildSettings.databricksBuildSettings)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.dependsOn(loader % "compile->compile;test->test;runtime->runtime")
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin, LauncherJarPlugin)
.enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)

lazy val transformerBatch = project
.in(file("modules/transformer-batch"))
Expand All @@ -140,17 +139,17 @@ lazy val transformerKinesis = project
.settings(libraryDependencies ++= Dependencies.transformerKinesisDependencies)
.settings(excludeDependencies ++= Dependencies.commonStreamTransformerExclusions)
.dependsOn(commonTransformerStream % "compile->compile;test->test;runtime->runtime", aws % "compile->compile;test->test;runtime->runtime")
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin)
.enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)

lazy val transformerKinesisDistroless = project
.in(file("modules/distroless/transformer-kinesis"))
.settings(sourceDirectory := (transformerKinesis / sourceDirectory).value)
.settings(BuildSettings.transformerKinesisDistrolessBuildSettings)
.settings(BuildSettings.transformerKinesisBuildSettings)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.transformerKinesisDependencies)
.settings(excludeDependencies ++= Dependencies.commonStreamTransformerExclusions)
.dependsOn(commonTransformerStream % "compile->compile;test->test;runtime->runtime", aws % "compile->compile;test->test;runtime->runtime")
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin, LauncherJarPlugin)
.enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)

lazy val transformerPubsub = project
.in(file("modules/transformer-pubsub"))
Expand All @@ -159,14 +158,14 @@ lazy val transformerPubsub = project
.settings(libraryDependencies ++= Dependencies.transformerPubsubDependencies)
.settings(excludeDependencies ++= Dependencies.commonStreamTransformerExclusions)
.dependsOn(commonTransformerStream % "compile->compile;test->test;runtime->runtime", gcp % "compile->compile;test->test")
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin)
.enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)

lazy val transformerPubsubDistroless = project
.in(file("modules/distroless/transformer-pubsub"))
.settings(sourceDirectory := (transformerPubsub / sourceDirectory).value)
.settings(BuildSettings.transformerPubsubDistrolessBuildSettings)
.settings(BuildSettings.transformerPubsubBuildSettings)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.transformerPubsubDependencies)
.settings(excludeDependencies ++= Dependencies.commonStreamTransformerExclusions)
.dependsOn(commonTransformerStream % "compile->compile;test->test;runtime->runtime", gcp % "compile->compile;test->test")
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin, LauncherJarPlugin)
.enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ object DatabricksEventsTable {
| event_version VARCHAR(128),
| event_fingerprint VARCHAR(128),
| true_tstamp TIMESTAMP,
| load_tstamp TIMESTAMP,
| collector_tstamp_date DATE GENERATED ALWAYS AS (DATE(collector_tstamp))
|)
|PARTITIONED BY (collector_tstamp_date, event_name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class ConfigSpec extends Specification {
password = StorageTarget.PasswordConfig.PlainText("Supersecret1")
)
val cloud = Config.Cloud.AWS(RegionSpec.DefaultTestRegion, exampleMessageQueue.copy(region = Some(RegionSpec.DefaultTestRegion)))
val retries = exampleRetries.copy(cumulativeBound = None)
val retries = exampleRetries.copy(cumulativeBound = Some(20.minutes))
val readyCheck = exampleReadyCheck.copy(strategy = Config.Strategy.Constant, backoff = 15.seconds)
val initRetries = exampleInitRetries.copy(attempts = None, cumulativeBound = Some(10.minutes))
val expected = Config(
Expand All @@ -110,7 +110,7 @@ class ConfigSpec extends Specification {
catalog = None,
password = StorageTarget.PasswordConfig.PlainText("Supersecret1")
)
val retries = exampleRetries.copy(cumulativeBound = None)
val retries = exampleRetries.copy(cumulativeBound = Some(20.minutes))
val readyCheck = exampleReadyCheck.copy(strategy = Config.Strategy.Constant, backoff = 15.seconds)
val initRetries = exampleInitRetries.copy(attempts = None, cumulativeBound = Some(10.minutes))
val expected = Config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ object DatabricksSpec {
Config.Monitoring(None, None, Config.Metrics(None, None, 1.minute), None, None, None),
None,
Config.Schedules(Nil),
Config.Timeouts(1.minute, 1.minute, 1.minute, 1.minute),
Config.Timeouts(1.minute, 1.minute, 1.minute, 1.minute, 30.seconds),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Expand Down
4 changes: 3 additions & 1 deletion modules/loader/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"backoff": "30 seconds",
"strategy": "EXPONENTIAL",
"attempts": 3
"cumulativeBound": "20 minutes"
},
"readyCheck": {
"backoff": "15 seconds",
Expand All @@ -34,7 +35,8 @@
"loading": "45 minutes",
"nonLoading": "10 minutes",
"sqsVisibility": "5 minutes",
"rollbackCommit": "20 minutes"
"rollbackCommit": "20 minutes",
"connectionIsValid": "30 seconds"
},
"featureFlags": {
"addLoadTstampColumn": true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
package com.snowplowanalytics.snowplow.rdbloader

import scala.concurrent.duration._
import cats.{Applicative, Apply, Monad, MonadThrow}
import cats.{Apply, Monad, MonadThrow}
import cats.implicits._
import cats.effect.{Async, Clock}
import retry._
Expand All @@ -25,6 +25,7 @@ import com.snowplowanalytics.snowplow.rdbloader.db.Columns._
import com.snowplowanalytics.snowplow.rdbloader.db.{AtomicColumns, Control => DbControl, HealthCheck, Manifest, Statement, Target}
import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, NoOperation, Retries}
import com.snowplowanalytics.snowplow.rdbloader.dsl.{
AlertMessage,
Cache,
DAO,
FolderMonitoring,
Expand All @@ -35,7 +36,7 @@ import com.snowplowanalytics.snowplow.rdbloader.dsl.{
Transaction,
VacuumScheduling
}
import com.snowplowanalytics.snowplow.rdbloader.dsl.Monitoring.AlertPayload
import com.snowplowanalytics.snowplow.rdbloader.dsl.Monitoring
import com.snowplowanalytics.snowplow.rdbloader.loading.{EventsTable, Load, Retry, Stage, TargetCheck}
import com.snowplowanalytics.snowplow.rdbloader.loading.Retry._
import com.snowplowanalytics.snowplow.rdbloader.cloud.{JsonPathDiscovery, LoadAuthService}
Expand Down Expand Up @@ -77,27 +78,23 @@ object Loader {
telemetry: Telemetry[F],
target: Target[I]
): F[Unit] = {
val folderMonitoring: Stream[F, Unit] = for {
initQueryResult <- initQuery[F, C, I](target)
_ <- FolderMonitoring.run[F, C, I](
config.monitoring.folders,
control.isBusy,
initQueryResult,
target.prepareAlertTable
)
} yield ()
def folderMonitoring(initQueryResult: I): Stream[F, Unit] =
FolderMonitoring.run[F, C, I](
config.monitoring.folders,
control.isBusy,
initQueryResult,
target.prepareAlertTable
)

val noOpScheduling: Stream[F, Unit] =
NoOperation.run(config.schedules.noOperation, control.makePaused, control.signal.map(_.loading))

val vacuumScheduling: Stream[F, Unit] =
VacuumScheduling.run[F, C](config.storage, config.schedules)
val healthCheck =
HealthCheck.start[F, C](config.monitoring.healthCheck)
val loading: Stream[F, Unit] =
for {
initQueryResult <- initQuery[F, C, I](target)
_ <- loadStream[F, C, I](config, control, initQueryResult, target)
} yield ()
def loading(initQueryResult: I): Stream[F, Unit] =
loadStream[F, C, I](config, control, initQueryResult, target)
val stateLogging: Stream[F, Unit] =
Stream
.awakeDelay[F](StateLoggingFrequency)
Expand All @@ -106,24 +103,37 @@ object Loader {
val periodicMetrics: Stream[F, Unit] =
Monitoring[F].periodicMetrics.report

def initRetry(f: F[Unit]) = retryingOnAllErrors(Retry.getRetryPolicy[F](config.initRetries), initRetryLog[F])(f)
def initRetry(f: F[Unit]) = Clock[F].realTime.flatMap { now =>
retryingOnSomeErrors(
Retry.getRetryPolicy[F](config.initRetries),
{ _: Throwable => Retry.isWithinCumulativeBound[F](config.initRetries, now) },
initRetryLog[F]
)(f)
}

val blockUntilReady = initRetry(TargetCheck.prepareTarget[F, C]).onError { case t: Throwable =>
Monitoring[F].alert(AlertMessage.FailedInitialConnection(t))
} *> Logging[F].info("Target check is completed")

val blockUntilReady = initRetry(TargetCheck.prepareTarget[F, C]) *>
Logging[F].info("Target check is completed")
val noOperationPrepare = NoOperation.prepare(config.schedules.noOperation, control.makePaused) *>
Logging[F].info("No operation prepare step is completed")
val eventsTableInit = initRetry(createEventsTable[F, C](target)) *>
Logging[F].info("Events table initialization is completed")
val manifestInit = initRetry(Manifest.initialize[F, C, I](config.storage, target)) *>
Logging[F].info("Manifest initialization is completed")

val eventsTableInit = initRetry(createEventsTable[F, C](target)).onError { case t: Throwable =>
Monitoring[F].alert(AlertMessage.FailedToCreateEventsTable(t))
} *> Logging[F].info("Events table initialization is completed")

val manifestInit = initRetry(Manifest.initialize[F, C, I](config.storage, target)).onError { case t: Throwable =>
Monitoring[F].alert(AlertMessage.FailedToCreateManifestTable(t))
} *> Logging[F].info("Manifest initialization is completed")

val addLoadTstamp = addLoadTstampColumn[F, C](config.featureFlags.addLoadTstampColumn, config.storage) *>
Logging[F].info("Adding load_tstamp column is completed")

val init: F[Unit] = blockUntilReady *> noOperationPrepare *> eventsTableInit *> manifestInit *> addLoadTstamp
val init: F[I] = blockUntilReady *> noOperationPrepare *> eventsTableInit *> manifestInit *> addLoadTstamp *> initQuery[F, C, I](target)

val process = Stream.eval(init).flatMap { _ =>
loading
.merge(folderMonitoring)
val process = Stream.eval(init).flatMap { initQueryResult =>
loading(initQueryResult)
.merge(folderMonitoring(initQueryResult))
.merge(noOpScheduling)
.merge(healthCheck)
.merge(stateLogging)
Expand Down Expand Up @@ -208,13 +218,34 @@ object Loader {
_ <- control.incrementLoaded
} yield ()
case fal: Load.FolderAlreadyLoaded =>
Monitoring[F].alert(fal.toAlertPayload)
Monitoring[F].alert(fal.toAlertMessage)
}
_ <- control.removeFailure(folder)
} yield ()
}

loading.handleErrorWith(reportLoadFailure[F](discovery, addFailure))
// Handle all three possible failure scenarios:
// 1. The kind of error for which it is not worth proceeding to the next batch from the queue,
// because the next batch is unlikely to succeed. e.g. a connection error. We keep on
// retrying the same batch, because we don't want to ack the message from the queue.
// 2. A failure which is worth retrying later, and we have space available in the internal
// retry queue.
// 3. A failure which is not worth retrying, or we don't have space to add to the retry queue.
loading.handleErrorWith {
case UnskippableException(t) =>
Logging[F].error(t)(s"Loading of $folder has failed. This exception type cannot be skipped so it will be retried immediately.") *>
Monitoring[F].alert(AlertMessage.UnskippableLoadFailure(folder, t)) *>
processDiscovery(config, control, initQueryResult, target)(discovery)
case t: Throwable =>
addFailure(t).flatMap {
case true =>
Logging[F].error(t)(s"Loading of $folder has failed. Adding into retry queue.") *>
Monitoring[F].alert(AlertMessage.RetryableLoadFailure(folder, t))
case false =>
Logging[F].error(t)(s"Loading of $folder has failed. Not adding into retry queue.") *>
Monitoring[F].alert(AlertMessage.TerminalLoadFailure(folder, t))
}
}
}

private def addLoadTstampColumn[F[_]: Transaction[*[_], C]: Monitoring: Logging: MonadThrow, C[_]: DAO: Monad: Logging](
Expand All @@ -239,7 +270,7 @@ object Loader {
}
Transaction[F, C].transact(f).recoverWith { case e: Throwable =>
val err = s"Error while adding load_tstamp column: ${getErrorMessage(e)}"
Logging[F].error(err) *> Monitoring[F].alert(AlertPayload.error(err))
Logging[F].error(err)
}
}

Expand All @@ -249,53 +280,30 @@ object Loader {
.contains(AtomicColumns.ColumnsWithDefault.LoadTstamp.value)

/** Query to get necessary bits from the warehouse during initialization of the application */
private def initQuery[F[_]: Transaction[*[_], C], C[_]: DAO: MonadThrow, I](target: Target[I]): Stream[F, I] =
Stream.eval(Transaction[F, C].run(target.initQuery))
private def initQuery[F[_]: MonadThrow: Monitoring: Transaction[*[_], C], C[_]: DAO: MonadThrow, I](target: Target[I]): F[I] =
Transaction[F, C]
.run(target.initQuery)
.onError { case t: Throwable =>
Monitoring[F].alert(AlertMessage.FailedInitialConnection(t))
}

private def createEventsTable[F[_]: Transaction[*[_], C], C[_]: DAO: Monad](target: Target[_]): F[Unit] =
Transaction[F, C].transact(DAO[C].executeUpdate(target.getEventTable, DAO.Purpose.NonLoading).void)

/**
* Handle a failure during loading. `Load.getTransaction` can fail in only one "expected" way: if
* the folder is already loaded. Everything else, inside the transaction and outside it
* (migration building, pre-transaction migrations, ack), is handled by this function. It's called
* on a non-fatal loading failure and just reports the failure, without crashing the process.
*
* @param discovery
* the original discovery
* @param error
* the actual error, typically `SQLException`
*/
private def reportLoadFailure[F[_]: Logging: Monitoring: Monad](
discovery: DataDiscovery.WithOrigin,
addFailure: Throwable => F[Boolean]
)(
error: Throwable
): F[Unit] = {
val message = getErrorMessage(error)
val alert = Monitoring.AlertPayload.warn(message, discovery.origin.base)
val logNoRetry = Logging[F].error(s"Loading of ${discovery.origin.base} has failed. Not adding into retry queue. $message")
val logRetry = Logging[F].error(s"Loading of ${discovery.origin.base} has failed. Adding intro retry queue. $message")

Monitoring[F].alert(alert) *>
addFailure(error).ifM(logRetry, logNoRetry)
}

/**
* Last level of failure handling, called when a non-loading stream fails. Called on an application
* crash.
*/
private def reportFatal[F[_]: Apply: Logging: Monitoring]: PartialFunction[Throwable, F[Unit]] = { case error =>
Logging[F].error("Loader shutting down") *>
Monitoring[F].alert(Monitoring.AlertPayload.error(error.toString)) *>
Monitoring[F].trackException(error)
}

private def initRetryLog[F[_]: Logging: Applicative: Monitoring](e: Throwable, d: RetryDetails): F[Unit] = {
private def initRetryLog[F[_]: Logging](e: Throwable, d: RetryDetails): F[Unit] = {
val errMessage =
show"""Exception from init block. $d
|${getErrorMessage(e)}""".stripMargin
Logging[F].error(errMessage) *> Monitoring[F].alert(Monitoring.AlertPayload.error(errMessage))
Logging[F].error(errMessage)
}

private def getErrorMessage(error: Throwable): String =
Expand Down
Loading

0 comments on commit 39ff502

Please sign in to comment.