
Commit

Merge 330f4b4 into c7126f4
benjben committed Apr 15, 2021
2 parents c7126f4 + 330f4b4 commit cc936c6
Showing 10 changed files with 165 additions and 35 deletions.
19 changes: 16 additions & 3 deletions config/config.hocon.sample
@@ -77,11 +77,24 @@
  # Additional steps. analyze, vacuum and transit_load are valid values
  "steps": ["analyze"],

- # Observability and logging opitons
+ # Observability and logging options
  "monitoring": {
    # Snowplow tracking (optional)
-   "snowplow": null,
+   "snowplow": {
+     "appId": "redshift-loader",
+     "collector": "http://snplow.acme.com",
+   }
    # Sentry (optional)
-   "sentry": null
+   "sentry": {
+     "dsn": "http://sentry.acme.com"
+   },
+   # StatsD where metrics get sent (optional)
+   "statsD": {
+     "hostname": "localhost",
+     "port" : 8125,
+     "metricsPrefix": "snowplow.rdbloader.",
+     # Key-value pairs to be attached to every metric
+     "tags": { "app": "redshift-loader" }
+   }
  }
}
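
A minimal sketch of the same monitoring block with every integration switched off. "snowplow": null and "sentry": null come from the replaced sample lines above; "statsD": null is an assumption that the new key follows the same optional convention (it decodes to an Option[StatsD], see Config.scala below):

"monitoring": {
  # all three integrations are optional
  "snowplow": null,
  "sentry": null,
  "statsD": null
}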
Config.scala
@@ -127,7 +127,7 @@ object Config {
    }
  }

- final case class Monitoring(snowplow: Option[SnowplowMonitoring], sentry: Option[Sentry])
+ final case class Monitoring(snowplow: Option[SnowplowMonitoring], sentry: Option[Sentry], statsD: Option[StatsD])
  final case class SnowplowMonitoring(appId: String, collector: String)

  final case class Formats(default: LoaderMessage.Format,
@@ -171,6 +171,8 @@ object Config {

  final case class Sentry(dsn: URI)

+ final case class StatsD(hostname: String, port: Int, metricsPrefix: String, tags: Map[String, String])
+
  implicit val batchShredderDecoder: Decoder[Shredder.Batch] =
    deriveDecoder[Shredder.Batch]
  implicit val streamShredderDecoder: Decoder[Shredder.Stream] =
@@ -224,6 +226,9 @@ object Config {
  implicit val sentryDecoder: Decoder[Sentry] =
    deriveDecoder[Sentry]

+ implicit val statsDDecoder: Decoder[StatsD] =
+   deriveDecoder[StatsD]
+
  implicit val formatsDecoder: Decoder[Formats] =
    deriveDecoder[Formats]

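Since statsDDecoder is derived with circe's deriveDecoder, the statsD block decodes field-by-field into the case class. A hypothetical standalone check, not part of this commit (assumes circe-parser on the classpath and the derived decoder in implicit scope):

import io.circe.parser.decode
import com.snowplowanalytics.snowplow.rdbloader.common.config.Config

// the sample "statsD" block as JSON
val raw = """{"hostname": "localhost", "port": 8125, "metricsPrefix": "snowplow.rdbloader.", "tags": {"app": "redshift-loader"}}"""
decode[Config.StatsD](raw)
// Right(StatsD(localhost,8125,snowplow.rdbloader.,Map(app -> redshift-loader)))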
CommonSpec.scala
@@ -90,7 +90,8 @@ object CommonSpec {
      None,
      Config.Monitoring(
        Some(Config.SnowplowMonitoring("redshift-loader","snplow.acme.com")),
-       Some(Config.Sentry(URI.create("http://sentry.acme.com")))
+       Some(Config.Sentry(URI.create("http://sentry.acme.com"))),
+       Some(Config.StatsD("localhost", 8125, "snowplow.rdbloader.", Map("app" -> "redshift-loader")))
      ),
      "messages",
      shredder,
ConfigSpec.scala
@@ -64,11 +64,17 @@ class ConfigSpec extends Specification {
    monitoring = {
      "snowplow": {
-       "collector": "snplow.acme.com",
+       "collector": "http://snplow.acme.com",
        "appId": "redshift-loader"
      },
      "sentry": {
        "dsn": "http://sentry.acme.com"
      },
+     "statsD": {
+       "hostname": "localhost",
+       "port" : 8125,
+       "metricsPrefix": "snowplow.rdbloader.",
+       "tags": { "app": "redshift-loader" }
+     }
    },
@@ -99,7 +105,11 @@
    val expected: Config[StorageTarget] =
      (identity[Config[StorageTarget]] _)
        .compose(Config.formats.set(Config.Formats.Default))
-       .compose(Config.monitoring.set(Config.Monitoring(None, None)))
+       .compose(Config.monitoring.set(Config.Monitoring(
+         Some(Config.SnowplowMonitoring("redshift-loader","http://snplow.acme.com")),
+         Some(Config.Sentry(URI.create("http://sentry.acme.com"))),
+         Some(Config.StatsD("localhost", 8125, "snowplow.rdbloader.", Map("app" -> "redshift-loader")))
+       )))
        .apply(ConfigSpec.configExample)
        .copy(shredder = Config.Shredder.Stream(input, ConfigSpec.configExample.shredder.output, 10.minutes))

@@ -222,8 +232,9 @@ object ConfigSpec {
    "us-east-1",
    None,
    Config.Monitoring(
-     Some(Config.SnowplowMonitoring("redshift-loader","snplow.acme.com")),
-     Some(Config.Sentry(URI.create("http://sentry.acme.com")))
+     Some(Config.SnowplowMonitoring("redshift-loader","http://snplow.acme.com")),
+     Some(Config.Sentry(URI.create("http://sentry.acme.com"))),
+     Some(Config.StatsD("localhost", 8125, "snowplow.rdbloader.", Map("app" -> "redshift-loader")))
    ),
    "messages",
    Shredder.Batch(
Environment.scala
@@ -17,7 +17,7 @@ import java.net.URI
import cats.{Functor, Monad}
import cats.implicits._

- import cats.effect.{Blocker, Clock, Resource, Timer, ConcurrentEffect, Sync}
+ import cats.effect.{Blocker, Clock, ConcurrentEffect, ContextShift, Resource, Sync, Timer}
import cats.effect.concurrent.Ref

import fs2.Stream
@@ -54,7 +54,7 @@ class Environment[F[_]](cache: Cache[F], logging: Logging[F], iglu: Iglu[F], aws
}

object Environment {
-  def initialize[F[_] : ConcurrentEffect: Clock: Timer](cli: CliConfig): Resource[F, Environment[F]] = {
+  def initialize[F[_]: Clock: ConcurrentEffect: ContextShift: Timer](cli: CliConfig): Resource[F, Environment[F]] = {
    val init = for {
      _ <- initSentry[F](cli.config.monitoring.sentry.map(_.dsn))
      cacheMap <- Ref.of[F, Map[String, Option[S3.Key]]](Map.empty)
@@ -75,7 +75,8 @@ object Environment {
      blocker <- Blocker[F]
      messages <- Resource.eval(Ref.of[F, List[String]](List.empty[String]))
      tracker <- Logging.initializeTracking[F](cli.config.monitoring, blocker.blockingContext)
-     logging = Logging.loggingInterpreter[F](cli.config.storage, messages, tracker)
+     statsD = StatsD.getInstance[F](cli.config.monitoring.statsD, blocker)
+     logging = Logging.loggingInterpreter[F](cli.config.storage, messages, tracker, statsD)
      (cache, iglu, aws, state) <- Resource.eval(init)
    } yield new Environment(cache, logging, iglu, aws, state, blocker)
  }
Logging.scala
@@ -26,6 +26,8 @@ import cats.effect.concurrent.Ref

import io.circe.Json

+ import io.sentry.Sentry
+
import org.http4s.client.blaze.BlazeClientBuilder

import com.snowplowanalytics.iglu.core.{SchemaVer, SelfDescribingData, SchemaKey}
@@ -34,13 +36,10 @@ import com.snowplowanalytics.snowplow.scalatracker.{Tracker, Emitter}
import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.Http4sEmitter

import com.snowplowanalytics.snowplow.rdbloader.LoaderError
- import com.snowplowanalytics.snowplow.rdbloader.common.Common
+ import com.snowplowanalytics.snowplow.rdbloader.common.{Common, LoaderMessage}
import com.snowplowanalytics.snowplow.rdbloader.common.config.Config.Monitoring
import com.snowplowanalytics.snowplow.rdbloader.common.config.StorageTarget

- import io.sentry.Sentry
-
-
trait Logging[F[_]] {

  /** Track result via Snowplow tracker */
@@ -54,6 +53,9 @@ trait Logging[F[_]] {

  /** Log an error to Sentry if it's configured */
  def trackException(e: Throwable): F[Unit]
+
+ /** Send metrics to StatsD if configured */
+ def sendMetrics(shredded: LoaderMessage.ShreddingComplete): F[Unit]
}

object Logging {
@@ -67,7 +69,8 @@ object Logging {

  def loggingInterpreter[F[_]: Sync](targetConfig: StorageTarget,
                                     messages: Ref[F, List[String]],
-                                    tracker: Option[Tracker[F]]): Logging[F] =
+                                    tracker: Option[Tracker[F]],
+                                    statsD: StatsD[F]): Logging[F] =
    new Logging[F] {

      /** Track result via Snowplow tracker */
@@ -100,6 +103,10 @@ object Logging {
          case NonFatal(_) => ()
        }

+     def sendMetrics(shredded: LoaderMessage.ShreddingComplete): F[Unit] =
+       statsD.sendMetrics(shredded)
+         .handleErrorWith(e => error(s"Problem happened when sending metrics to StatsD: ${e.printStackTrace()}"))
+
      private def sanitize(string: String): String =
        Common.sanitize(string, List(targetConfig.password.getUnencrypted, targetConfig.username))

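A hypothetical cats-effect 2 illustration of the handleErrorWith guard above: a failing metrics action is logged and swallowed, so a StatsD outage cannot fail the surrounding load (names and message are illustrative):

import cats.effect.IO

val failing: IO[Unit] = IO.raiseError(new RuntimeException("socket closed"))
val guarded: IO[Unit] =
  failing.handleErrorWith(e => IO(println(s"Problem happened when sending metrics to StatsD: ${e.getMessage}")))
guarded.unsafeRunSync() // prints the warning and completes normally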
StatsD.scala (new file)
@@ -0,0 +1,89 @@
/*
 * Copyright (c) 2021-2021 Snowplow Analytics Ltd. All rights reserved.
 *
 * This program is licensed to you under the Apache License Version 2.0,
 * and you may not use this file except in compliance with the Apache License Version 2.0.
 * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the Apache License Version 2.0 is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
 */
package com.snowplowanalytics.snowplow.rdbloader.dsl

import java.time.Duration
import java.net.{DatagramPacket, DatagramSocket, InetAddress}
import java.nio.charset.StandardCharsets.UTF_8

import cats.Monad
import cats.implicits._

import cats.effect.{Blocker, ContextShift, Resource, Sync, Timer}

import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.common.config.Config

trait StatsD[F[_]] {
  def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[Unit]
}

object StatsD {

  final case class KVMetric(key: String, value: String)

  private val MinLatencyName = "latency.min"
  private val MaxLatencyName = "latency.max"
  private val CountGoodName = "count.good"

  def getInstance[F[_]: ContextShift: Monad: Sync: Timer](statsDConfig: Option[Config.StatsD], blocker: Blocker): StatsD[F] =
    statsDConfig match {
      case Some(config) =>
        new StatsD[F] {
          def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[Unit] =
            mkSocket[F](blocker).use { socket =>
              for {
                metrics <- getMetrics[F](loaded)
                formatted = metrics.map(statsDFormat(config))
                ip <- blocker.delay(InetAddress.getByName(config.hostname))
                _ <- formatted.traverse_(sendMetric[F](blocker, socket, ip, config.port))
              } yield ()
            }
        }
      case None =>
        new StatsD[F] {
          def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[Unit] = Monad[F].unit
        }
    }

  private def mkSocket[F[_]: ContextShift: Sync](blocker: Blocker): Resource[F, DatagramSocket] =
    Resource.fromAutoCloseableBlocking(blocker)(Sync[F].delay(new DatagramSocket))

  private def getMetrics[F[_]: Monad: Timer](loaded: LoaderMessage.ShreddingComplete): F[List[KVMetric]] =
    Timer[F].clock.instantNow.map { now =>
      List(
        loaded.timestamps.min.map(min => Duration.between(min, now).toSeconds()).map(l => KVMetric(MaxLatencyName, l.toString)),
        loaded.timestamps.max.map(max => Duration.between(max, now).toSeconds()).map(l => KVMetric(MinLatencyName, l.toString)),
        loaded.count.map(c => KVMetric(CountGoodName, c.toString))
      )
        .collect { case Some(m) => m }
    }

  private def sendMetric[F[_]: ContextShift: Sync](
    blocker: Blocker,
    socket: DatagramSocket,
    addr: InetAddress,
    port: Int
  )(
    metric: String
  ): F[Unit] = {
    val bytes = metric.getBytes(UTF_8)
    val packet = new DatagramPacket(bytes, bytes.length, addr, port)
    blocker.delay(socket.send(packet))
  }

  private def statsDFormat(config: Config.StatsD)(metric: KVMetric): String = {
    val tagStr = config.tags.map { case (k, v) => s"$k:$v" }.mkString(",")
    val prefix = config.metricsPrefix
    s"${prefix}${metric.key}:${metric.value}|g|#$tagStr"
  }
}
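
Given the sample configuration above (prefix "snowplow.rdbloader." and tag app:redshift-loader), statsDFormat renders each KVMetric as a gauge with DogStatsD-style tags. The UDP datagrams would look like this (values illustrative):

snowplow.rdbloader.latency.max:320|g|#app:redshift-loader
snowplow.rdbloader.latency.min:12|g|#app:redshift-loader
snowplow.rdbloader.count.good:5000|g|#app:redshift-loader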
Load.scala
@@ -12,7 +12,6 @@
 */
package com.snowplowanalytics.snowplow.rdbloader.loading

- import java.time.Duration

import scala.concurrent.duration._

@@ -25,14 +24,13 @@ import retry.{retryingOnSomeErrors, RetryPolicy, RetryPolicies, Sleep, RetryDeta

// This project
import com.snowplowanalytics.snowplow.rdbloader._
- import com.snowplowanalytics.snowplow.rdbloader.common.{ Message, LoaderMessage }
+ import com.snowplowanalytics.snowplow.rdbloader.common.{LoaderMessage, Message}
import com.snowplowanalytics.snowplow.rdbloader.common.config.{ Config, StorageTarget }
import com.snowplowanalytics.snowplow.rdbloader.config.CliConfig
import com.snowplowanalytics.snowplow.rdbloader.db.{ Migration, Statement, Manifest }
import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery
import com.snowplowanalytics.snowplow.rdbloader.dsl.{Logging, JDBC, Iglu}

-
/** Entry-point for loading-related logic */
object Load {

@@ -45,7 +43,10 @@ object Load {
   * @param cli RDB Loader app configuration
   * @param discovery discovered folder to load
   */
-  def load[F[_]: MonadThrow: JDBC: Iglu: Timer: Logging](cli: CliConfig, discovery: Message[F, DataDiscovery.WithOrigin]): LoaderAction[F, Unit] =
+  def load[F[_]: Iglu: JDBC: Logging: MonadThrow: Timer](
+    cli: CliConfig,
+    discovery: Message[F, DataDiscovery.WithOrigin]
+  ): LoaderAction[F, Unit] =
    cli.config.storage match {
      case redshift: StorageTarget.Redshift =>
        val redshiftConfig: Config[StorageTarget.Redshift] = cli.config.copy(storage = redshift)
@@ -63,7 +64,7 @@ object Load {
          RedshiftLoader.run[F](redshiftConfig, discovery.data.discovery) <*
            Manifest.add[F](redshiftConfig.storage.schema, discovery.data.origin) <*
            JDBC[F].executeUpdate(Statement.Commit) <*
-           congratulate[F](discovery.data.origin)
+           congratulate[F](discovery.data.origin).liftA
        }

      // With manifest protecting from double-loading it's safer to ack *after* commit
@@ -119,14 +120,9 @@ object Load {
      .limitRetries[LoaderAction[F, *]](MaxRetries)
      .join(RetryPolicies.exponentialBackoff(Backoff))

-  private def congratulate[F[_]: Monad: Logging: Timer](message: LoaderMessage.ShreddingComplete): LoaderAction[F, Unit] =
-    Timer[F].clock.instantNow.flatMap { now =>
-      val count = message.count.map(c => s"${c.good} events").getOrElse("volume is unknown")
-      val latency = message.timestamps.min match {
-        case Some(earliest) => s"${Duration.between(earliest, now).toSeconds} seconds"
-        case None => "unknown"
-      }
-      Logging[F].info(s"Folder [${message.base}] has been loaded and committed. Success!") *>
-        Logging[F].info(s"Folder [${message.base}] ($count) latency: $latency")
-    }.liftA
+  private def congratulate[F[_]: Logging: Monad](
+    loaded: LoaderMessage.ShreddingComplete
+  ): F[Unit] =
+    Logging[F].info(s"Folder ${loaded.base} loaded successfully") >>
+    Logging[F].sendMetrics(loaded)
}
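
The latency arithmetic that congratulate previously performed inline now lives in StatsD.getMetrics above: the earliest timestamp in the batch yields the maximum latency, the latest one the minimum. A worked sketch with illustrative instants:

import java.time.{Duration, Instant}

val earliest = Instant.parse("2021-04-15T10:00:00Z") // hypothetical timestamps.min
val now      = Instant.parse("2021-04-15T10:05:20Z") // hypothetical load-completion time
Duration.between(earliest, now).toSeconds            // 320, reported as latency.max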
SpecHelpers.scala
@@ -52,8 +52,9 @@ object SpecHelpers {
    "us-east-1",
    None,
    Config.Monitoring(
-     Some(Config.SnowplowMonitoring("redshift-loader","snplow.acme.com")),
-     Some(Config.Sentry(URI.create("http://sentry.acme.com")))
+     Some(Config.SnowplowMonitoring("redshift-loader","http://snplow.acme.com")),
+     Some(Config.Sentry(URI.create("http://sentry.acme.com"))),
+     Some(Config.StatsD("localhost", 8125, "snowplow.rdbloader.", Map("app" -> "redshift-loader")))
    ),
    "messages",
    Config.Shredder.Batch(
CliConfigSpec.scala
@@ -97,11 +97,17 @@ object CliConfigSpec {
    monitoring = {
      "snowplow": {
-       "collector": "snplow.acme.com",
+       "collector": "http://snplow.acme.com",
        "appId": "redshift-loader"
      },
      "sentry": {
        "dsn": "http://sentry.acme.com"
      },
+     "statsD": {
+       "hostname": "localhost",
+       "port" : 8125,
+       "metricsPrefix": "snowplow.rdbloader.",
+       "tags": { "app": "redshift-loader" }
+     }
    },
