RDB Loader: add monitoring for unloaded and corrupted runs (close #457)
chuwy committed Jul 19, 2021
1 parent 67e6813 commit c9a2b5b
Showing 6 changed files with 109 additions and 9 deletions.
7 changes: 5 additions & 2 deletions config/config.hocon.sample
@@ -92,7 +92,7 @@
"snowplow": {
"appId": "redshift-loader",
"collector": "snplow.acme.com",
}
},

# Optional, for tracking runtime exceptions
"sentry": {
@@ -118,6 +118,9 @@
# Optional, override the default metric prefix
# "prefix": "snowplow.rdbloader."
}
}
},

# Optional, a path for storing temporary data, necessary for corrupted folders monitoring
"logs": "s3://acme-snowplow/loader/logs/"
}
}
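
For illustration, here is a small standalone sketch (not part of the commit) of the key layout that Alerting.getOutputKey further down derives from this setting: each periodic scan writes its folder listing under a timestamped key inside monitoring.logs.

import java.time.Instant
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter

// Sketch only: mirrors the "shredded" subfolder and ".keys" suffix used by
// Alerting.getOutputKey in this commit
object LogsKeyLayout extends App {
  val LogTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss").withZone(ZoneOffset.UTC)
  val logs = "s3://acme-snowplow/loader/logs/" // monitoring.logs from the sample above
  val key = s"${logs}shredded/${LogTimeFormatter.format(Instant.now())}.keys"
  println(key) // e.g. s3://acme-snowplow/loader/logs/shredded/2021-07-19-14-30-00.keys
}
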
@@ -172,7 +172,7 @@ object Config {
}
}

final case class Monitoring(snowplow: Option[SnowplowMonitoring], sentry: Option[Sentry], metrics: Option[Metrics], webhook: Option[Webhook])
final case class Monitoring(snowplow: Option[SnowplowMonitoring], sentry: Option[Sentry], metrics: Option[Metrics], webhook: Option[Webhook], logs: Option[S3.Folder])
final case class SnowplowMonitoring(appId: String, collector: String)
final case class Sentry(dsn: URI)
final case class Metrics(statsd: Option[StatsD], stdout: Option[Stdout])
@@ -92,6 +92,7 @@ object CommonSpec {
None,
None,
None,
None,
None
),
"messages",
@@ -152,7 +152,8 @@ object ConfigSpec {
Some(Config.SnowplowMonitoring("redshift-loader","snplow.acme.com")),
Some(Config.Sentry(URI.create("http://sentry.acme.com"))),
Some(Config.Metrics(Some(Config.StatsD("localhost", 8125, Map("app" -> "rdb-loader"), None)), Some(Config.Stdout(None)))),
None
None,
Some(S3.Folder.coerce("s3://acme-snowplow/loader/logs/"))
),
"messages",
Shredder.Batch(
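
A note on the two S3.Folder constructors this diff uses: coerce is the unchecked constructor for literals known to be good (as in the fixture above), while parse returns an Either and handles runtime input (see the Get instance and Alerting.run below). A minimal sketch, assuming only the common S3 module:

import com.snowplowanalytics.snowplow.rdbloader.common.S3

val runtime: Either[String, S3.Folder] =
  S3.Folder.parse("s3://acme-snowplow/loader/logs/") // Right(...) for valid input
val fixture: S3.Folder =
  S3.Folder.coerce("s3://acme-snowplow/loader/logs/") // unchecked; fine for test literals
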
@@ -22,7 +22,7 @@ import fs2.Stream
import org.typelevel.log4cats.slf4j.Slf4jLogger

import com.snowplowanalytics.snowplow.rdbloader.db.Manifest
import com.snowplowanalytics.snowplow.rdbloader.dsl.Environment
import com.snowplowanalytics.snowplow.rdbloader.dsl.{Environment, Alerting}
import com.snowplowanalytics.snowplow.rdbloader.config.CliConfig
import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery
import com.snowplowanalytics.snowplow.rdbloader.loading.Load.load
@@ -58,6 +58,9 @@ object Main extends IOApp {
def process(cli: CliConfig, env: Environment[IO]): Stream[IO, Unit] = {
import env._

val alertingStream: Stream[IO, Unit] =
Alerting.run[IO](cli.config.monitoring.logs, cli.config.monitoring.webhook, cli.config.storage, cli.config.shredder.output.path)

Stream.eval_(Manifest.initialize[IO](cli.config.storage)) ++
DataDiscovery
.discover[IO](cli.config, env.state)
@@ -75,6 +78,7 @@
IO.raiseError(error)
}
}
.merge(alertingStream)
}

/**
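
Since the alerting checks are merged into the main stream (.merge(alertingStream) above), both run concurrently. A toy, self-contained sketch of the pattern, assuming the cats-effect 2 / fs2 2.x stack this commit targets (not part of the diff):

import scala.concurrent.duration._

import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream

object MergeSketch extends IOApp {
  def run(args: List[String]): IO[ExitCode] = {
    val loading = Stream.awakeEvery[IO](1.second).take(4).evalMap(t => IO(println(s"loading  $t")))
    val alerting = Stream.awakeEvery[IO](2.seconds).take(2).evalMap(t => IO(println(s"alerting $t")))
    // merge runs both streams concurrently; neither blocks the other
    loading.merge(alerting).compile.drain.as(ExitCode.Success)
  }
}
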
@@ -13,16 +13,28 @@

package com.snowplowanalytics.snowplow.rdbloader.dsl

import cats.Applicative
import java.net.URI
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter
import java.util.concurrent.TimeUnit

import cats.{Functor, Applicative}
import cats.implicits._

import cats.effect.{Sync, ContextShift}
import cats.effect.{Timer, Sync, ContextShift}

import doobie.util.Get
import fs2.Stream
import fs2.text.utf8Encode

import org.http4s.{Request, MediaType, Method}
import org.http4s.client.Client
import org.http4s.headers.`Content-Type`

import com.snowplowanalytics.snowplow.rdbloader.common.config.Config
import com.snowplowanalytics.snowplow.rdbloader.generated.BuildInfo
import com.snowplowanalytics.snowplow.rdbloader.LoaderAction
import com.snowplowanalytics.snowplow.rdbloader.common.S3
import com.snowplowanalytics.snowplow.rdbloader.common.config.{Config, StorageTarget}
import com.snowplowanalytics.snowplow.rdbloader.db.Statement._
import com.snowplowanalytics.snowplow.rdbloader.dsl.Monitoring.AlertPayload


@@ -37,6 +49,85 @@ object Alerting {
def noop[F[_]: Applicative]: Alerting[F] =
(_: AlertPayload) => Applicative[F].unit

def createAlertPayload(folder: S3.Folder, message: String, tags: Map[String, String]): AlertPayload =
AlertPayload(BuildInfo.version, folder, message, tags)

implicit val s3FolderGet: Get[S3.Folder] =
Get[String].temap(S3.Folder.parse)

// Note: a zone is required to format a bare Instant; without withZone the
// formatter would throw at runtime
val LogTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss").withZone(ZoneOffset.UTC)

/** Sink all processed folders in `input` into `output` */
def sinkFolders[F[_]: AWS](input: S3.Folder, output: S3.Key): Stream[F, Unit] =
AWS[F].listS3(input, recursive = false).map(_.key) // TODO: technically this should be an S3.Folder
.intersperse(AlertingScanResultSeparator)
.through(utf8Encode[F])
.through(AWS[F].sinkS3(output, true))
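// The net effect: a run-folder listing (e.g. run=2021-07-18-21-50-00/, ...) is
// written as a single separator-delimited text object at output, one object per scan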


def completeKeyExists[F[_]: AWS]: S3.Folder => F[Boolean] =
folder => AWS[F].keyExists(folder.withKey("shredding_complete.json"))

// TODO: this is potentially a blocking operation
def checkShreddingComplete[F[_]: Applicative: AWS](folders: List[S3.Folder]): F[List[(S3.Folder, Boolean)]] =
folders.traverse(folder => AWS[F].keyExists(folder.withKey("shredding_complete.json")).tupleLeft(folder))
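// On the TODO above: traverse issues the existence checks strictly one after
// another, and each keyExists presumably performs a blocking S3 call; a large
// backlog could be parallelised (e.g. parTraverse) or shifted to a blocking pool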

def check[F[_]: Sync: AWS: JDBC](loadFrom: S3.Folder, redshiftConfig: StorageTarget.Redshift, tags: Map[String, String]): LoaderAction[F, List[AlertPayload]] = {
for {
_ <- JDBC[F].executeUpdate(CreateAlertingTempTable)
_ <- JDBC[F].executeUpdate(FoldersCopy(loadFrom, redshiftConfig.roleArn))
onlyS3Batches <- JDBC[F].executeQueryList[S3.Folder](FoldersMinusManifest(redshiftConfig.schema))
foldersWithChecks <- LoaderAction.liftF(checkShreddingComplete[F](onlyS3Batches))
} yield foldersWithChecks.map { case (folder, exists) =>
if (exists) createAlertPayload(folder, "Unloaded Batch", tags)
else createAlertPayload(folder, "Corrupted Batch", tags)
}
}
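// Classification: FoldersMinusManifest yields folders that are present in S3 but
// absent from the manifest. shredding_complete.json then disambiguates: if it
// exists, shredding finished but the batch was never loaded ("Unloaded Batch");
// if not, shredding itself never finished ("Corrupted Batch")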

def getOutputKey[F[_]: Timer: Functor](webhook: Config.Webhook, logs: S3.Folder): Stream[F, S3.Key] =
Stream.fixedRate[F](webhook.period).evalMap { _ =>
Timer[F]
.clock
.realTime(TimeUnit.MILLISECONDS)
.map(Instant.ofEpochMilli)
.map(LogTimeFormatter.format)
.map(time => logs.append("shredded").withKey(s"$time.keys"))
}
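// One scan per webhook.period: deriving the key from wall-clock time means
// consecutive scans never overwrite each other's folder listings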

/**
 * Alerting entrypoint. Parses all configuration necessary for monitoring of
 * corrupted and unloaded folders and launches a stream of periodic checks.
 * If any required configuration is missing, it just prints a warning.
 * The resulting stream has to be run in the background.
 */
def run[F[_]: Sync: Timer: AWS: JDBC: Alerting: Logging](logs: Option[S3.Folder], webhook: Option[Config.Webhook], storage: StorageTarget, output: URI): Stream[F, Unit] =
(logs, webhook, storage) match {
case (Some(logsFolder), Some(webhookConfig), redshift: StorageTarget.Redshift) =>
S3.Folder.parse(output.toString) match {
case Right(folder) =>
stream[F](logsFolder, webhookConfig, redshift, folder)
case Left(error) =>
Stream.raiseError[F](new IllegalArgumentException(s"Shredder output could not be parsed into an S3 URI: $error"))
}
case (_, _, _: StorageTarget.Redshift) =>
Stream.eval[F, Unit](Logging[F].info("Both monitoring.logs and monitoring.webhook need to be provided to enable monitoring for corrupted and unloaded folders"))
}

/** Same as [[run]], but with configuration already validated: each tick sinks the current folder listing and checks it against the manifest */
def stream[F[_]: Sync: Timer: AWS: JDBC: Alerting: Logging](logs: S3.Folder, webhook: Config.Webhook, storage: StorageTarget.Redshift, output: S3.Folder): Stream[F, Unit] =
getOutputKey[F](webhook, logs).evalMap { outputKey =>
sinkFolders[F](output, outputKey).compile.drain.flatMap { _ =>
check[F](output, storage, webhook.tags)
.value
.flatMap {
case Left(loaderError) =>
Logging[F].error(loaderError)("Folder monitoring has failed")
case Right(alerts) =>
alerts.traverse_(Alerting[F].alert)
}
}
}

def webhook[F[_]: ContextShift: Sync: Logging](webhookConfig: Option[Config.Webhook], httpClient: Client[F]): Alerting[F] =
webhookConfig match {
case Some(webhook) =>
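
The webhook constructor is truncated here. Purely for illustration, a minimal http4s-backed Alerting for the Some(webhook) branch could look like the sketch below; the endpoint parameter and the toJson serialiser are assumptions, not shown in this diff:

import cats.effect.Sync
import cats.implicits._

import org.http4s.{MediaType, Method, Request, Uri}
import org.http4s.client.Client
import org.http4s.headers.`Content-Type`

// Hypothetical sketch; toJson stands in for whatever AlertPayload
// serialisation the real implementation uses
def webhookSketch[F[_]: Sync](endpoint: Uri, client: Client[F])(toJson: AlertPayload => String): Alerting[F] =
  (payload: AlertPayload) => {
    val request = Request[F](Method.POST, endpoint)
      .withEntity(toJson(payload))
      .withContentType(`Content-Type`(MediaType.application.json))
    // a failing webhook must not fail the load, so the status is discarded
    client.status(request).void.handleError(_ => ())
  }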
