Merge acf1120 into c7126f4
benjben committed Apr 22, 2021
2 parents c7126f4 + acf1120 commit eeddb1b
Showing 16 changed files with 436 additions and 46 deletions.
26 changes: 23 additions & 3 deletions config/config.hocon.sample
@@ -77,11 +77,31 @@
# Additional steps. analyze, vacuum and transit_load are valid values
"steps": ["analyze"],

# Observability and logging opitons
# Observability and logging options
"monitoring": {
# Snowplow tracking (optional)
"snowplow": null,
"snowplow": {
"appId": "redshift-loader",
"collector": "http://snplow.acme.com",
}

# Sentry (optional)
"sentry": null
"sentry": {
"dsn": "http://sentry.acme.com"
},

# Metrics reporting (optional)
"metrics": {
# Config below is to print metrics on stdout
"type": "stdout", # to have the raw metrics printed on stdout
"prefix": "snowplow.rdbloader."

# Config below is to send metrics to StatsD instead of stdout
#"type": "statsD",
#"prefix": "snowplow.rdbloader.",
#"hostname": "localhost",
#"port" : 8125,
#"tags": { "app": "redshift-loader" } # Key-value pairs to be attached to every metric
}
}
}
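
Decoded, the monitoring block above corresponds to the following Config.Monitoring value, matching the expectation asserted in ConfigSpec further down this diff:

import java.net.URI
import com.snowplowanalytics.snowplow.rdbloader.common.config.Config

Config.Monitoring(
  Some(Config.SnowplowMonitoring("redshift-loader", "http://snplow.acme.com")),
  Some(Config.Sentry(URI.create("http://sentry.acme.com"))),
  Some(Config.Metrics.Stdout("snowplow.rdbloader."))
)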
@@ -127,9 +127,6 @@ object Config {
}
}

final case class Monitoring(snowplow: Option[SnowplowMonitoring], sentry: Option[Sentry])
final case class SnowplowMonitoring(appId: String, collector: String)

final case class Formats(default: LoaderMessage.Format,
tsv: List[SchemaCriterion],
json: List[SchemaCriterion],
@@ -169,8 +166,48 @@
}
}

final case class Monitoring(snowplow: Option[SnowplowMonitoring], sentry: Option[Sentry], metrics: Option[Metrics])
final case class SnowplowMonitoring(appId: String, collector: String)

final case class Sentry(dsn: URI)

sealed trait Metrics {
def prefix: String
}

object Metrics {
final case class Stdout(prefix: String) extends Metrics
final case class StatsD(
prefix: String,
hostname: String,
port: Int,
tags: Map[String, String]
) extends Metrics

implicit val stdoutDecoder: Decoder[Metrics.Stdout] =
deriveDecoder[Metrics.Stdout]

implicit val statsdDecoder: Decoder[Metrics.StatsD] =
deriveDecoder[Metrics.StatsD]

implicit val metricsDecoder: Decoder[Metrics] =
Decoder.instance { cur =>
val typeCur = cur.downField("type")
typeCur.as[String].map(_.toLowerCase) match {
case Right("stdout") =>
cur.as[Metrics.Stdout]
case Right("statsd") =>
cur.as[Metrics.StatsD]
case Right(other) =>
Left(DecodingFailure(s"Metrics type '$other' is not supported. Supported types: 'stdout' and 'statsD'", typeCur.history))
case Left(DecodingFailure(_, List(CursorOp.DownField("type")))) =>
Left(DecodingFailure("Cannot find 'type' string in metrics configuration", typeCur.history))
case Left(other) =>
Left(other)
}
}
}

implicit val batchShredderDecoder: Decoder[Shredder.Batch] =
deriveDecoder[Shredder.Batch]
implicit val streamShredderDecoder: Decoder[Shredder.Stream] =
@@ -221,12 +258,12 @@ object Config {
implicit val uriDecoder: Decoder[URI] =
Decoder[String].emap(s => Either.catchOnly[IllegalArgumentException](URI.create(s)).leftMap(_.toString))

implicit val sentryDecoder: Decoder[Sentry] =
deriveDecoder[Sentry]

implicit val formatsDecoder: Decoder[Formats] =
deriveDecoder[Formats]

implicit val sentryDecoder: Decoder[Sentry] =
deriveDecoder[Sentry]

implicit val snowplowMonitoringDecoder: Decoder[SnowplowMonitoring] =
deriveDecoder[SnowplowMonitoring]

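A minimal sketch of how the 'type' discrimination in the metricsDecoder above behaves, assuming circe's parser is on the classpath (the JSON literals are illustrative only):

import io.circe.parser.decode
import com.snowplowanalytics.snowplow.rdbloader.common.config.Config

// matching is case-insensitive because the decoder lowercases the "type" field first
decode[Config.Metrics]("""{"type": "statsD", "prefix": "snowplow.rdbloader.", "hostname": "localhost", "port": 8125, "tags": {"app": "redshift-loader"}}""")
// Right(StatsD("snowplow.rdbloader.", "localhost", 8125, Map("app" -> "redshift-loader")))

decode[Config.Metrics]("""{"prefix": "snowplow.rdbloader."}""")
// Left(..) carrying "Cannot find 'type' string in metrics configuration"
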
@@ -89,8 +89,9 @@ object CommonSpec {
"us-east-1",
None,
Config.Monitoring(
Some(Config.SnowplowMonitoring("redshift-loader","snplow.acme.com")),
Some(Config.Sentry(URI.create("http://sentry.acme.com")))
None,
None,
None
),
"messages",
shredder,
@@ -64,11 +64,15 @@ class ConfigSpec extends Specification {
monitoring = {
"snowplow": {
"collector": "snplow.acme.com",
"collector": "http://snplow.acme.com",
"appId": "redshift-loader"
},
"sentry": {
"dsn": "http://sentry.acme.com"
},
"metrics": {
"type" : "Stdout",
"prefix": "snowplow.rdbloader."
}
},
@@ -99,7 +103,11 @@
val expected: Config[StorageTarget] =
(identity[Config[StorageTarget]] _)
.compose(Config.formats.set(Config.Formats.Default))
.compose(Config.monitoring.set(Config.Monitoring(None, None)))
.compose(Config.monitoring.set(Config.Monitoring(
Some(Config.SnowplowMonitoring("redshift-loader","http://snplow.acme.com")),
Some(Config.Sentry(URI.create("http://sentry.acme.com"))),
Some(Config.Metrics.Stdout("snowplow.rdbloader."))
)))
.apply(ConfigSpec.configExample)
.copy(shredder = Config.Shredder.Stream(input, ConfigSpec.configExample.shredder.output, 10.minutes))

@@ -222,8 +230,9 @@ object ConfigSpec {
"us-east-1",
None,
Config.Monitoring(
Some(Config.SnowplowMonitoring("redshift-loader","snplow.acme.com")),
Some(Config.Sentry(URI.create("http://sentry.acme.com")))
Some(Config.SnowplowMonitoring("redshift-loader","http://snplow.acme.com")),
Some(Config.Sentry(URI.create("http://sentry.acme.com"))),
Some(Config.Metrics.Stdout("snowplow.rdbloader."))
),
"messages",
Shredder.Batch(
@@ -14,30 +14,33 @@ package com.snowplowanalytics.snowplow.rdbloader.dsl

import java.net.URI

import io.sentry.{Sentry, SentryOptions}

import cats.{Functor, Monad}
import cats.implicits._

import cats.effect.{Blocker, Clock, Resource, Timer, ConcurrentEffect, Sync}
import cats.effect.{Blocker, Clock, ConcurrentEffect, ContextShift, Resource, Sync, Timer}
import cats.effect.concurrent.Ref

import fs2.Stream
import fs2.concurrent.SignallingRef

import com.snowplowanalytics.iglu.client.Client

import io.sentry.{Sentry, SentryOptions}
import com.snowplowanalytics.snowplow.rdbloader.State
import com.snowplowanalytics.snowplow.rdbloader.common.S3
import com.snowplowanalytics.snowplow.rdbloader.config.CliConfig
import com.snowplowanalytics.snowplow.rdbloader.dsl.metrics.Reporter

/** Container for most of the interpreters to be used in Main
 * JDBC will be instantiated only when necessary, and as a `Resource`
*/
class Environment[F[_]](cache: Cache[F], logging: Logging[F], iglu: Iglu[F], aws: AWS[F], val state: State.Ref[F], val blocker: Blocker) {
class Environment[F[_]](cache: Cache[F], logging: Logging[F], iglu: Iglu[F], aws: AWS[F], reporter: Reporter[F], val state: State.Ref[F], val blocker: Blocker) {
implicit val cacheF: Cache[F] = cache
implicit val loggingF: Logging[F] = logging
implicit val igluF: Iglu[F] = iglu
implicit val awsF: AWS[F] = aws
implicit val reporterF: Reporter[F] = reporter

def makeBusy(implicit F: Monad[F]): Resource[F, SignallingRef[F, Boolean]] =
Resource.make(busy.flatMap(x => x.set(true).as(x)))(_.set(false))
@@ -50,11 +53,10 @@ class Environment[F[_]](cache: Cache[F], logging: Logging[F], iglu: Iglu[F], aws

private def busy(implicit F: Functor[F]): F[SignallingRef[F, Boolean]] =
state.get.map(_.busy)

}

object Environment {
def initialize[F[_] : ConcurrentEffect: Clock: Timer](cli: CliConfig): Resource[F, Environment[F]] = {
def initialize[F[_]: Clock: ConcurrentEffect: ContextShift: Timer](cli: CliConfig): Resource[F, Environment[F]] = {
val init = for {
_ <- initSentry[F](cli.config.monitoring.sentry.map(_.dsn))
cacheMap <- Ref.of[F, Map[String, Option[S3.Key]]](Map.empty)
@@ -76,8 +78,9 @@ object Environment {
messages <- Resource.eval(Ref.of[F, List[String]](List.empty[String]))
tracker <- Logging.initializeTracking[F](cli.config.monitoring, blocker.blockingContext)
logging = Logging.loggingInterpreter[F](cli.config.storage, messages, tracker)
reporter = Reporter.build[F](cli.config.monitoring.metrics, logging, blocker)
(cache, iglu, aws, state) <- Resource.eval(init)
} yield new Environment(cache, logging, iglu, aws, state, blocker)
} yield new Environment(cache, logging, iglu, aws, reporter, state, blocker)
}

def initSentry[F[_]: Sync](dsn: Option[URI]): F[Unit] =
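Since reporterF is now exposed as an implicit on Environment, downstream loader code can summon the reporter the same way as the other interpreters. A hypothetical call site (the env value and the metric are invented for illustration):

import com.snowplowanalytics.snowplow.rdbloader.dsl.metrics.{Metrics, Reporter}

// with an initialized Environment[F] named env in scope,
// import env._ brings the implicit reporterF into implicit scope
import env._
Reporter[F].report(List(Metrics.KVMetric("count.good", "42"))) // resolves via the Reporter.apply summoner
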
@@ -26,6 +26,8 @@ import cats.effect.concurrent.Ref

import io.circe.Json

import io.sentry.Sentry

import org.http4s.client.blaze.BlazeClientBuilder

import com.snowplowanalytics.iglu.core.{SchemaVer, SelfDescribingData, SchemaKey}
@@ -38,9 +40,6 @@ import com.snowplowanalytics.snowplow.rdbloader.common.Common
import com.snowplowanalytics.snowplow.rdbloader.common.config.Config.Monitoring
import com.snowplowanalytics.snowplow.rdbloader.common.config.StorageTarget

import io.sentry.Sentry


trait Logging[F[_]] {

/** Track result via Snowplow tracker */
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2021-2021 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader.dsl.metrics

import java.time.Duration

import cats.implicits._
import cats.Functor

import cats.effect.Clock

import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage

object Metrics {

final case class KVMetric(key: String, value: String)

case class KVMetrics(
countGood: KVMetric,
collectorLatencyMin: Option[KVMetric],
collectorLatencyMax: Option[KVMetric],
shredderStartLatency: KVMetric,
shredderEndLatency: KVMetric
) {
def toList =
List(
Some(countGood),
collectorLatencyMin,
collectorLatencyMax,
Some(shredderStartLatency),
Some(shredderEndLatency)
).unite

def toHumanReadableString =
s"""${countGood.value} good events were loaded.
| It took minimum ${collectorLatencyMin.map(_.value).getOrElse("unknown")} seconds and maximum
| ${collectorLatencyMax.map(_.value).getOrElse("unknown")} seconds between the collector and Redshift for these events.
| It took ${shredderStartLatency.value} seconds between the start of shredder and Redshift
| and ${shredderEndLatency.value} seconds between the completion of shredder and Redshift
""".stripMargin
}

def getMetrics[F[_]: Clock: Functor](loaded: LoaderMessage.ShreddingComplete): F[KVMetrics] =
Clock[F].instantNow.map { now =>
KVMetrics(
KVMetric(CountGoodName, loaded.count.map(_.good).getOrElse(0).toString),
loaded.timestamps.max.map(max => Duration.between(max, now).toSeconds()).map(l => KVMetric(CollectorLatencyMinName, l.toString)),
loaded.timestamps.min.map(min => Duration.between(min, now).toSeconds()).map(l => KVMetric(CollectorLatencyMaxName, l.toString)),
KVMetric(ShredderStartLatencyName, Duration.between(loaded.timestamps.jobStarted, now).toSeconds().toString),
KVMetric(ShredderEndLatencyName, Duration.between(loaded.timestamps.jobCompleted, now).toSeconds().toString)
)
}

private[metrics] val CountGoodName = "count.good"
private[metrics] val CollectorLatencyMinName = "latency.collector_to_load.min"
private[metrics] val CollectorLatencyMaxName = "latency.collector_to_load.max"
private[metrics] val ShredderStartLatencyName = "latency.shredder_start_to_load"
private[metrics] val ShredderEndLatencyName = "latency.shredder_end_to_load"
}
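
To make the latency arithmetic in getMetrics concrete: the latest collector timestamp in a batch yields the smallest collector-to-load latency and the earliest one the largest, which is why timestamps.max feeds the min metric and vice versa. A self-contained sketch with invented timestamps:

import java.time.{Duration, Instant}

val now       = Instant.parse("2021-04-22T12:00:00Z")
val maxTstamp = Instant.parse("2021-04-22T11:59:30Z") // latest event in the batch
val minTstamp = Instant.parse("2021-04-22T11:50:00Z") // earliest event in the batch

Duration.between(maxTstamp, now).toSeconds // 30  -> latency.collector_to_load.min
Duration.between(minTstamp, now).toSeconds // 600 -> latency.collector_to_load.max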
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2021-2021 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader.dsl.metrics

import cats.Applicative

import cats.effect.{Blocker, ContextShift, Sync, Timer}

import com.snowplowanalytics.snowplow.rdbloader.common.config.Config
import com.snowplowanalytics.snowplow.rdbloader.dsl.Logging

trait Reporter[F[_]] {
def report(metrics: List[Metrics.KVMetric]): F[Unit]
}

object Reporter {
def apply[F[_]](implicit ev: Reporter[F]): Reporter[F] = ev

def build[F[_]: ContextShift: Sync: Timer](
metricsConfig: Option[Config.Metrics],
logging: Logging[F],
blocker: Blocker
): Reporter[F] =
metricsConfig match {
case Some(statsDConfig: Config.Metrics.StatsD) =>
StatsDReporter.build[F](statsDConfig, blocker)
case Some(stdoutConfig: Config.Metrics.Stdout) =>
StdoutReporter.build[F](stdoutConfig, logging)
case None =>
noop[F]
}

def noop[F[_]: Applicative]: Reporter[F] = new Reporter[F] {
def report(metrics: List[Metrics.KVMetric]) = Applicative[F].unit
}
}
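
A quick usage sketch of the no-op fallback, purely illustrative:

import cats.effect.IO
import com.snowplowanalytics.snowplow.rdbloader.dsl.metrics.{Metrics, Reporter}

val reporter: Reporter[IO] = Reporter.noop[IO]
val done: IO[Unit] = reporter.report(List(Metrics.KVMetric("count.good", "42"))) // completes as IO.unit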

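StatsDReporter itself is not part of this hunk; as a rough illustration of the standard StatsD datagram such a reporter would typically emit, here is a hypothetical formatter (the gauge metric type and DogStatsD-style tags are assumptions, not the loader's confirmed wire format):

import com.snowplowanalytics.snowplow.rdbloader.dsl.metrics.Metrics

// hypothetical formatter, not the loader's actual StatsDReporter
def statsDDatagram(prefix: String, metric: Metrics.KVMetric, tags: Map[String, String]): String = {
  val tagSuffix = if (tags.isEmpty) "" else tags.map { case (k, v) => s"$k:$v" }.mkString("|#", ",", "")
  s"$prefix${metric.key}:${metric.value}|g$tagSuffix"
}

statsDDatagram("snowplow.rdbloader.", Metrics.KVMetric("count.good", "42"), Map("app" -> "redshift-loader"))
// "snowplow.rdbloader.count.good:42|g|#app:redshift-loader"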