From acf112065fa5ccca6d6b2e4152c8d6965c74594e Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Wed, 14 Apr 2021 15:00:22 +0200 Subject: [PATCH] RDB Loader: report metrics to StatsD or Stdout (close #384) --- config/config.hocon.sample | 26 ++++++- .../rdbloader/common/config/Config.scala | 49 ++++++++++-- .../CommonSpec.scala | 5 +- .../ConfigSpec.scala | 17 ++++- .../snowplow/rdbloader/dsl/Environment.scala | 15 ++-- .../snowplow/rdbloader/dsl/Logging.scala | 5 +- .../rdbloader/dsl/metrics/Metrics.scala | 69 +++++++++++++++++ .../rdbloader/dsl/metrics/Reporter.scala | 46 ++++++++++++ .../dsl/metrics/StatsDReporter.scala | 60 +++++++++++++++ .../dsl/metrics/StdoutReporter.scala | 31 ++++++++ .../snowplow/rdbloader/loading/Load.scala | 35 +++++---- .../snowplow/rdbloader/SpecHelpers.scala | 5 +- .../rdbloader/config/CliConfigSpec.scala | 9 ++- .../rdbloader/dsl/metrics/MetricsSpec.scala | 74 +++++++++++++++++++ .../snowplow/rdbloader/loading/LoadSpec.scala | 13 +++- .../rdbloader/test/PureReporter.scala | 23 ++++++ 16 files changed, 436 insertions(+), 46 deletions(-) create mode 100644 modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/Metrics.scala create mode 100644 modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/Reporter.scala create mode 100644 modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/StatsDReporter.scala create mode 100644 modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/StdoutReporter.scala create mode 100644 modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/MetricsSpec.scala create mode 100644 modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureReporter.scala diff --git a/config/config.hocon.sample b/config/config.hocon.sample index 8360514ba..c17c33dc1 100644 --- a/config/config.hocon.sample +++ b/config/config.hocon.sample @@ -77,11 +77,31 @@ # Additional steps. analyze, vacuum and transit_load are valid values "steps": ["analyze"], - # Observability and logging opitons + # Observability and logging options "monitoring": { # Snowplow tracking (optional) - "snowplow": null, + "snowplow": { + "appId": "redshift-loader", + "collector": "http://snplow.acme.com", + } + # Sentry (optional) - "sentry": null + "sentry": { + "dsn": "http://sentry.acme.com" + }, + + # Metrics reporting (optional) + "metrics": { + # Config below is to print metrics on stdout + "type": "stdout", # to have the raw metrics printed on stdout + "prefix": "snowplow.rdbloader." 
+ + # Config below is to send metrics to StatsD instead of stdout + #"type": "statsD", + #"prefix": "snowplow.rdbloader.", + #"hostname": "localhost", + #"port" : 8125, + #"tags": { "app": "redshift-loader" } # Key-value pairs to be attached to every metric + } } } diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/config/Config.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/config/Config.scala index 02cb3146d..311d90369 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/config/Config.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/config/Config.scala @@ -127,9 +127,6 @@ object Config { } } - final case class Monitoring(snowplow: Option[SnowplowMonitoring], sentry: Option[Sentry]) - final case class SnowplowMonitoring(appId: String, collector: String) - final case class Formats(default: LoaderMessage.Format, tsv: List[SchemaCriterion], json: List[SchemaCriterion], @@ -169,8 +166,48 @@ object Config { } } + final case class Monitoring(snowplow: Option[SnowplowMonitoring], sentry: Option[Sentry], metrics: Option[Metrics]) + final case class SnowplowMonitoring(appId: String, collector: String) + final case class Sentry(dsn: URI) + sealed trait Metrics { + def prefix: String + } + + object Metrics { + final case class Stdout(prefix: String) extends Metrics + final case class StatsD( + prefix: String, + hostname: String, + port: Int, + tags: Map[String, String] + ) extends Metrics + + implicit val stdoutDecoder: Decoder[Metrics.Stdout] = + deriveDecoder[Metrics.Stdout] + + implicit val statsdDecoder: Decoder[Metrics.StatsD] = + deriveDecoder[Metrics.StatsD] + + implicit val metricsDecoder: Decoder[Metrics] = + Decoder.instance { cur => + val typeCur = cur.downField("type") + typeCur.as[String].map(_.toLowerCase) match { + case Right("stdout") => + cur.as[Metrics.Stdout] + case Right("statsd") => + cur.as[Metrics.StatsD] + case Right(other) => + Left(DecodingFailure(s"Metrics type '$other' is not supported. 
Supported types: 'stdout' and 'statsD'", typeCur.history)) + case Left(DecodingFailure(_, List(CursorOp.DownField("type")))) => + Left(DecodingFailure("Cannot find 'type' string in metrics configuration", typeCur.history)) + case Left(other) => + Left(other) + } + } + } + implicit val batchShredderDecoder: Decoder[Shredder.Batch] = deriveDecoder[Shredder.Batch] implicit val streamShredderDecoder: Decoder[Shredder.Stream] = @@ -221,12 +258,12 @@ object Config { implicit val uriDecoder: Decoder[URI] = Decoder[String].emap(s => Either.catchOnly[IllegalArgumentException](URI.create(s)).leftMap(_.toString)) - implicit val sentryDecoder: Decoder[Sentry] = - deriveDecoder[Sentry] - implicit val formatsDecoder: Decoder[Formats] = deriveDecoder[Formats] + implicit val sentryDecoder: Decoder[Sentry] = + deriveDecoder[Sentry] + implicit val snowplowMonitoringDecoder: Decoder[SnowplowMonitoring] = deriveDecoder[SnowplowMonitoring] diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/CommonSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/CommonSpec.scala index 810ea49e0..6b4bc09a5 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/CommonSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/CommonSpec.scala @@ -89,8 +89,9 @@ object CommonSpec { "us-east-1", None, Config.Monitoring( - Some(Config.SnowplowMonitoring("redshift-loader","snplow.acme.com")), - Some(Config.Sentry(URI.create("http://sentry.acme.com"))) + None, + None, + None ), "messages", shredder, diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/ConfigSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/ConfigSpec.scala index 65c1b70ce..583895648 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/ConfigSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/ConfigSpec.scala @@ -64,11 +64,15 @@ class ConfigSpec extends Specification { monitoring = { "snowplow": { - "collector": "snplow.acme.com", + "collector": "http://snplow.acme.com", "appId": "redshift-loader" }, "sentry": { "dsn": "http://sentry.acme.com" + }, + "metrics": { + "type" : "Stdout", + "prefix": "snowplow.rdbloader." 
} }, @@ -99,7 +103,11 @@ class ConfigSpec extends Specification { val expected: Config[StorageTarget] = (identity[Config[StorageTarget]] _) .compose(Config.formats.set(Config.Formats.Default)) - .compose(Config.monitoring.set(Config.Monitoring(None, None))) + .compose(Config.monitoring.set(Config.Monitoring( + Some(Config.SnowplowMonitoring("redshift-loader","http://snplow.acme.com")), + Some(Config.Sentry(URI.create("http://sentry.acme.com"))), + Some(Config.Metrics.Stdout("snowplow.rdbloader.")) + ))) .apply(ConfigSpec.configExample) .copy(shredder = Config.Shredder.Stream(input, ConfigSpec.configExample.shredder.output, 10.minutes)) @@ -222,8 +230,9 @@ object ConfigSpec { "us-east-1", None, Config.Monitoring( - Some(Config.SnowplowMonitoring("redshift-loader","snplow.acme.com")), - Some(Config.Sentry(URI.create("http://sentry.acme.com"))) + Some(Config.SnowplowMonitoring("redshift-loader","http://snplow.acme.com")), + Some(Config.Sentry(URI.create("http://sentry.acme.com"))), + Some(Config.Metrics.Stdout("snowplow.rdbloader.")) ), "messages", Shredder.Batch( diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Environment.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Environment.scala index 548ec665c..ebd809f2b 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Environment.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Environment.scala @@ -14,10 +14,12 @@ package com.snowplowanalytics.snowplow.rdbloader.dsl import java.net.URI +import io.sentry.{Sentry, SentryOptions} + import cats.{Functor, Monad} import cats.implicits._ -import cats.effect.{Blocker, Clock, Resource, Timer, ConcurrentEffect, Sync} +import cats.effect.{Blocker, Clock, ConcurrentEffect, ContextShift, Resource, Sync, Timer} import cats.effect.concurrent.Ref import fs2.Stream @@ -25,19 +27,20 @@ import fs2.concurrent.SignallingRef import com.snowplowanalytics.iglu.client.Client -import io.sentry.{Sentry, SentryOptions} import com.snowplowanalytics.snowplow.rdbloader.State import com.snowplowanalytics.snowplow.rdbloader.common.S3 import com.snowplowanalytics.snowplow.rdbloader.config.CliConfig +import com.snowplowanalytics.snowplow.rdbloader.dsl.metrics.Reporter /** Container for most of interepreters to be used in Main * JDBC will be instantiated only when necessary, and as a `Reousrce` */ -class Environment[F[_]](cache: Cache[F], logging: Logging[F], iglu: Iglu[F], aws: AWS[F], val state: State.Ref[F], val blocker: Blocker) { +class Environment[F[_]](cache: Cache[F], logging: Logging[F], iglu: Iglu[F], aws: AWS[F], reporter: Reporter[F], val state: State.Ref[F], val blocker: Blocker) { implicit val cacheF: Cache[F] = cache implicit val loggingF: Logging[F] = logging implicit val igluF: Iglu[F] = iglu implicit val awsF: AWS[F] = aws + implicit val reporterF: Reporter[F] = reporter def makeBusy(implicit F: Monad[F]): Resource[F, SignallingRef[F, Boolean]] = Resource.make(busy.flatMap(x => x.set(true).as(x)))(_.set(false)) @@ -50,11 +53,10 @@ class Environment[F[_]](cache: Cache[F], logging: Logging[F], iglu: Iglu[F], aws private def busy(implicit F: Functor[F]): F[SignallingRef[F, Boolean]] = state.get.map(_.busy) - } object Environment { - def initialize[F[_] : ConcurrentEffect: Clock: Timer](cli: CliConfig): Resource[F, Environment[F]] = { + def initialize[F[_]: Clock: ConcurrentEffect: ContextShift: Timer](cli: CliConfig): Resource[F, Environment[F]] = { val init = for { _ 
<- initSentry[F](cli.config.monitoring.sentry.map(_.dsn)) cacheMap <- Ref.of[F, Map[String, Option[S3.Key]]](Map.empty) @@ -76,8 +78,9 @@ object Environment { messages <- Resource.eval(Ref.of[F, List[String]](List.empty[String])) tracker <- Logging.initializeTracking[F](cli.config.monitoring, blocker.blockingContext) logging = Logging.loggingInterpreter[F](cli.config.storage, messages, tracker) + reporter = Reporter.build[F](cli.config.monitoring.metrics, logging, blocker) (cache, iglu, aws, state) <- Resource.eval(init) - } yield new Environment(cache, logging, iglu, aws, state, blocker) + } yield new Environment(cache, logging, iglu, aws, reporter, state, blocker) } def initSentry[F[_]: Sync](dsn: Option[URI]): F[Unit] = diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Logging.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Logging.scala index 77c2ac676..8aaf60afa 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Logging.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Logging.scala @@ -26,6 +26,8 @@ import cats.effect.concurrent.Ref import io.circe.Json +import io.sentry.Sentry + import org.http4s.client.blaze.BlazeClientBuilder import com.snowplowanalytics.iglu.core.{SchemaVer, SelfDescribingData, SchemaKey} @@ -38,9 +40,6 @@ import com.snowplowanalytics.snowplow.rdbloader.common.Common import com.snowplowanalytics.snowplow.rdbloader.common.config.Config.Monitoring import com.snowplowanalytics.snowplow.rdbloader.common.config.StorageTarget -import io.sentry.Sentry - - trait Logging[F[_]] { /** Track result via Snowplow tracker */ diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/Metrics.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/Metrics.scala new file mode 100644 index 000000000..73556882c --- /dev/null +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/Metrics.scala @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021-2021 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.rdbloader.dsl.metrics + +import java.time.Duration + +import cats.implicits._ +import cats.Functor + +import cats.effect.Clock + +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage + +object Metrics { + + final case class KVMetric(key: String, value: String) + + case class KVMetrics( + countGood: KVMetric, + collectorLatencyMin: Option[KVMetric], + collectorLatencyMax: Option[KVMetric], + shredderStartLatency: KVMetric, + shredderEndLatency: KVMetric + ) { + def toList = + List( + Some(countGood), + collectorLatencyMin, + collectorLatencyMax, + Some(shredderStartLatency), + Some(shredderEndLatency) + ).unite + + def toHumanReadableString = + s"""${countGood.value} good events were loaded. + | It took minimum ${collectorLatencyMin.map(_.value).getOrElse("unknown")} seconds and maximum + | ${collectorLatencyMax.map(_.value).getOrElse("unknown")} seconds between the collector and Redshift for these events. + | It took ${shredderStartLatency.value} seconds between the start of shredder and Redshift + | and ${shredderEndLatency.value} seconds between the completion of shredder and Redshift + """.stripMargin + } + + def getMetrics[F[_]: Clock: Functor](loaded: LoaderMessage.ShreddingComplete): F[KVMetrics] = + Clock[F].instantNow.map { now => + KVMetrics( + KVMetric(CountGoodName, loaded.count.map(_.good).getOrElse(0).toString), + loaded.timestamps.max.map(max => Duration.between(max, now).toSeconds()).map(l => KVMetric(CollectorLatencyMinName, l.toString)), + loaded.timestamps.min.map(min => Duration.between(min, now).toSeconds()).map(l => KVMetric(CollectorLatencyMaxName, l.toString)), + KVMetric(ShredderStartLatencyName, Duration.between(loaded.timestamps.jobStarted, now).toSeconds().toString), + KVMetric(ShredderEndLatencyName, Duration.between(loaded.timestamps.jobCompleted, now).toSeconds().toString) + ) + } + + private[metrics] val CountGoodName = "count.good" + private[metrics] val CollectorLatencyMinName = "latency.collector_to_load.min" + private[metrics] val CollectorLatencyMaxName = "latency.collector_to_load.max" + private[metrics] val ShredderStartLatencyName = "latency.shredder_start_to_load" + private[metrics] val ShredderEndLatencyName = "latency.shredder_end_to_load" +} diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/Reporter.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/Reporter.scala new file mode 100644 index 000000000..81374c19a --- /dev/null +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/Reporter.scala @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021-2021 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.rdbloader.dsl.metrics + +import cats.Applicative + +import cats.effect.{Blocker, ContextShift, Sync, Timer} + +import com.snowplowanalytics.snowplow.rdbloader.common.config.Config +import com.snowplowanalytics.snowplow.rdbloader.dsl.Logging + +trait Reporter[F[_]] { + def report(metrics: List[Metrics.KVMetric]): F[Unit] +} + +object Reporter { + def apply[F[_]](implicit ev: Reporter[F]): Reporter[F] = ev + + def build[F[_]: ContextShift: Sync: Timer]( + metricsConfig: Option[Config.Metrics], + logging: Logging[F], + blocker: Blocker + ): Reporter[F] = + metricsConfig match { + case Some(statsDConfig: Config.Metrics.StatsD) => + StatsDReporter.build[F](statsDConfig, blocker) + case Some(stdoutConfig: Config.Metrics.Stdout) => + StdoutReporter.build[F](stdoutConfig, logging) + case None => + noop[F] + } + + def noop[F[_]: Applicative]: Reporter[F] = new Reporter[F] { + def report(metrics: List[Metrics.KVMetric]) = Applicative[F].unit + } +} \ No newline at end of file diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/StatsDReporter.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/StatsDReporter.scala new file mode 100644 index 000000000..983896e95 --- /dev/null +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/StatsDReporter.scala @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2021-2021 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.rdbloader.dsl.metrics + +import java.net.{DatagramPacket, DatagramSocket, InetAddress} +import java.nio.charset.StandardCharsets.UTF_8 + +import cats.implicits._ + +import cats.effect.{Blocker, ContextShift, Resource, Sync, Timer} + +import com.snowplowanalytics.snowplow.rdbloader.common.config.Config + +object StatsDReporter { + + def build[F[_]: ContextShift: Sync: Timer](statsDConfig: Config.Metrics.StatsD, blocker: Blocker): Reporter[F] = + new Reporter[F] { + def report(metrics: List[Metrics.KVMetric]): F[Unit] = { + mkSocket[F](blocker).use { socket => + for { + formatted <- Sync[F].pure(metrics.map(statsDFormat(statsDConfig))) + ip <- blocker.delay(InetAddress.getByName(statsDConfig.hostname)) + _ <- formatted.traverse_(sendMetric[F](blocker, socket, ip, statsDConfig.port)) + } yield () + } + } + } + + private def mkSocket[F[_]: ContextShift: Sync](blocker: Blocker): Resource[F, DatagramSocket] = + Resource.fromAutoCloseableBlocking(blocker)(Sync[F].delay(new DatagramSocket)) + + private def sendMetric[F[_]: ContextShift: Sync]( + blocker: Blocker, + socket: DatagramSocket, + addr: InetAddress, + port: Int + )( + metric: String + ): F[Unit] = { + val bytes = metric.getBytes(UTF_8) + val packet = new DatagramPacket(bytes, bytes.length, addr, port) + blocker.delay(socket.send(packet)) + } + + private def statsDFormat(config: Config.Metrics.StatsD)(metric: Metrics.KVMetric): String = { + val tagStr = config.tags.map { case (k, v) => s"$k:$v" }.mkString(",") + val prefix = config.prefix + s"${prefix}${metric.key}:${metric.value}|g|#$tagStr" + } +} diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/StdoutReporter.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/StdoutReporter.scala new file mode 100644 index 000000000..f7fed69db --- /dev/null +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/StdoutReporter.scala @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2021-2021 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.rdbloader.dsl.metrics + +import com.snowplowanalytics.snowplow.rdbloader.common.config.Config +import com.snowplowanalytics.snowplow.rdbloader.dsl.Logging + +object StdoutReporter { + + def build[F[_]](stdoutConfig: Config.Metrics.Stdout, logging: Logging[F]): Reporter[F] = + new Reporter[F] { + def report(metrics: List[Metrics.KVMetric]): F[Unit] = { + val formatted = metrics.map(formatMetric(stdoutConfig.prefix)) + val metricsStr = s"METRICS - ${formatted.mkString(" ")}" + logging.info(metricsStr) + } + } + + private def formatMetric(prefix: String)(metric: Metrics.KVMetric): String = + s"$prefix${metric.key}=${metric.value}" +} diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala index 7f19e403d..9ac0aab40 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala @@ -12,26 +12,25 @@ */ package com.snowplowanalytics.snowplow.rdbloader.loading -import java.time.Duration import scala.concurrent.duration._ import cats.{Applicative, Monad, MonadError} import cats.implicits._ -import cats.effect.Timer +import cats.effect.{Clock, Timer} import retry.{retryingOnSomeErrors, RetryPolicy, RetryPolicies, Sleep, RetryDetails} // This project import com.snowplowanalytics.snowplow.rdbloader._ -import com.snowplowanalytics.snowplow.rdbloader.common.{ Message, LoaderMessage } +import com.snowplowanalytics.snowplow.rdbloader.common.{LoaderMessage, Message} import com.snowplowanalytics.snowplow.rdbloader.common.config.{ Config, StorageTarget } import com.snowplowanalytics.snowplow.rdbloader.config.CliConfig import com.snowplowanalytics.snowplow.rdbloader.db.{ Migration, Statement, Manifest } import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery import com.snowplowanalytics.snowplow.rdbloader.dsl.{Logging, JDBC, Iglu} - +import com.snowplowanalytics.snowplow.rdbloader.dsl.metrics.{Metrics, Reporter} /** Entry-point for loading-related logic */ object Load { @@ -45,7 +44,10 @@ object Load { * @param cli RDB Loader app configuration * @param discovery discovered folder to load */ - def load[F[_]: MonadThrow: JDBC: Iglu: Timer: Logging](cli: CliConfig, discovery: Message[F, DataDiscovery.WithOrigin]): LoaderAction[F, Unit] = + def load[F[_]: Iglu: JDBC: Logging: MonadThrow: Reporter: Timer]( + cli: CliConfig, + discovery: Message[F, DataDiscovery.WithOrigin] + ): LoaderAction[F, Unit] = cli.config.storage match { case redshift: StorageTarget.Redshift => val redshiftConfig: Config[StorageTarget.Redshift] = cli.config.copy(storage = redshift) @@ -63,7 +65,7 @@ object Load { RedshiftLoader.run[F](redshiftConfig, discovery.data.discovery) <* Manifest.add[F](redshiftConfig.storage.schema, discovery.data.origin) <* JDBC[F].executeUpdate(Statement.Commit) <* - congratulate[F](discovery.data.origin) + congratulate[F](discovery.data.origin).liftA } // With manifest protecting from double-loading it's safer to ack *after* commit @@ -119,14 +121,15 @@ object Load { .limitRetries[LoaderAction[F, *]](MaxRetries) .join(RetryPolicies.exponentialBackoff(Backoff)) - private def congratulate[F[_]: Monad: Logging: Timer](message: LoaderMessage.ShreddingComplete): LoaderAction[F, Unit] = - Timer[F].clock.instantNow.flatMap { now => - val count = message.count.map(c => s"${c.good} 
events").getOrElse("volume is unknown") - val latency = message.timestamps.min match { - case Some(earliest) => s"${Duration.between(earliest, now).toSeconds} seconds" - case None => "unknown" - } - Logging[F].info(s"Folder [${message.base}] has been loaded and committed. Success!") *> - Logging[F].info(s"Folder [${message.base}] ($count) latency: $latency") - }.liftA + private def congratulate[F[_]: Clock: Logging: Monad: Reporter]( + loaded: LoaderMessage.ShreddingComplete + ): F[Unit] = { + val reportMetrics: F[Unit] = + for { + metrics <- Metrics.getMetrics[F](loaded) + _ <- Reporter[F].report(metrics.toList) + _ <- Logging[F].info(metrics.toHumanReadableString) + } yield () + Logging[F].info(s"Folder ${loaded.base} loaded successfully") >> reportMetrics + } } diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala index 7b8e063d9..ee4d17ae2 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala @@ -52,8 +52,9 @@ object SpecHelpers { "us-east-1", None, Config.Monitoring( - Some(Config.SnowplowMonitoring("redshift-loader","snplow.acme.com")), - Some(Config.Sentry(URI.create("http://sentry.acme.com"))) + Some(Config.SnowplowMonitoring("redshift-loader","http://snplow.acme.com")), + Some(Config.Sentry(URI.create("http://sentry.acme.com"))), + Some(Config.Metrics.StatsD("snowplow.rdbloader.", "localhost", 8125, Map("app" -> "redshift-loader"))) ), "messages", Config.Shredder.Batch( diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfigSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfigSpec.scala index 64b5ee02b..50c4181a9 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfigSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfigSpec.scala @@ -97,11 +97,18 @@ object CliConfigSpec { monitoring = { "snowplow": { - "collector": "snplow.acme.com", + "collector": "http://snplow.acme.com", "appId": "redshift-loader" }, "sentry": { "dsn": "http://sentry.acme.com" + }, + "metrics": { + "type": "statsd", + "prefix": "snowplow.rdbloader.", + "hostname": "localhost", + "port" : 8125, + "tags": { "app": "redshift-loader" } } }, diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/MetricsSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/MetricsSpec.scala new file mode 100644 index 000000000..c707f7b76 --- /dev/null +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/metrics/MetricsSpec.scala @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2021-2021 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.rdbloader.dsl.metrics
+
+import java.time.Instant
+
+import scala.concurrent.duration.TimeUnit
+
+import cats.Id
+
+import cats.effect.Clock
+
+import org.specs2.mutable.Specification
+
+import com.snowplowanalytics.snowplow.rdbloader.common.config.{Config, Semver}
+import com.snowplowanalytics.snowplow.rdbloader.common.S3
+import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage._
+
+class MetricsSpec extends Specification {
+
+  val nanos = System.nanoTime()
+  val now = Instant.ofEpochSecond(0L, nanos)
+
+  implicit val clockIdImpl: Clock[Id] = new Clock[Id] {
+    def realTime(unit: TimeUnit): Id[Long] = nanos
+    def monotonic(unit: TimeUnit): Id[Long] = 0L
+  }
+
+  "getMetrics" should {
+    "compute the metrics" in {
+      val countGood = 42L
+      val collectorLatencyMin = 120L
+      val collectorLatencyMax = 200L
+      val shredderStartLatency = 50L
+      val shredderEndLatency = 10L
+
+      val loaded = ShreddingComplete(
+        S3.Folder.coerce("s3://shredded/run_id/"),
+        Nil,
+        Timestamps(
+          jobStarted = now.minusSeconds(shredderStartLatency),
+          jobCompleted = now.minusSeconds(shredderEndLatency),
+          min = Some(now.minusSeconds(collectorLatencyMax)),
+          max = Some(now.minusSeconds(collectorLatencyMin))
+        ),
+        Config.Shredder.Compression.Gzip,
+        Processor("loader_unit_tests", Semver(0, 0, 0, None)),
+        Some(Count(countGood))
+      )
+
+      val expected = Metrics.KVMetrics(
+        Metrics.KVMetric(Metrics.CountGoodName, countGood.toString),
+        Some(Metrics.KVMetric(Metrics.CollectorLatencyMinName, collectorLatencyMin.toString)),
+        Some(Metrics.KVMetric(Metrics.CollectorLatencyMaxName, collectorLatencyMax.toString)),
+        Metrics.KVMetric(Metrics.ShredderStartLatencyName, shredderStartLatency.toString),
+        Metrics.KVMetric(Metrics.ShredderEndLatencyName, shredderEndLatency.toString)
+      )
+
+      val actual = Metrics.getMetrics[Id](loaded)
+
+      actual === expected
+    }
+  }
+}
\ No newline at end of file
diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala
index 849ebce5b..7893e62d6 100644
--- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala
+++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala
@@ -27,12 +27,13 @@ import com.snowplowanalytics.snowplow.rdbloader.common.config.Config.Shredder.Co
 import com.snowplowanalytics.snowplow.rdbloader.common.config.{Step, Semver}
 import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType}
 import com.snowplowanalytics.snowplow.rdbloader.dsl.{Logging, Iglu, JDBC}
+import com.snowplowanalytics.snowplow.rdbloader.dsl.metrics.Reporter
 import com.snowplowanalytics.snowplow.rdbloader.loading.LoadSpec.{isVacuum, failCommit, isFirstCommit, failVacuum}
 import com.snowplowanalytics.snowplow.rdbloader.db.{Statement, Manifest}
 import com.snowplowanalytics.snowplow.rdbloader.SpecHelpers._
 import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{Timestamps, Processor, Format}
 import com.snowplowanalytics.snowplow.rdbloader.test.TestState.LogEntry
-import com.snowplowanalytics.snowplow.rdbloader.test.{Pure, TestState, PureIglu, PureJDBC, PureOps, PureLogging, PureTimer}
+import com.snowplowanalytics.snowplow.rdbloader.test.{Pure, PureIglu, PureJDBC, PureLogging, PureOps, 
PureReporter, PureTimer, TestState} import org.specs2.mutable.Specification @@ -43,6 +44,7 @@ class LoadSpec extends Specification { implicit val jdbc: JDBC[Pure] = PureJDBC.interpreter(PureJDBC.init) implicit val iglu: Iglu[Pure] = PureIglu.interpreter implicit val timer: Timer[Pure] = PureTimer.interpreter + implicit val reporter: Reporter[Pure] = PureReporter.interpreter val message = Message(LoadSpec.dataDiscoveryWithOrigin, Pure.pure(())) @@ -68,6 +70,7 @@ class LoadSpec extends Specification { implicit val jdbc: JDBC[Pure] = PureJDBC.interpreter(PureJDBC.init) implicit val iglu: Iglu[Pure] = PureIglu.interpreter implicit val timer: Timer[Pure] = PureTimer.interpreter + implicit val reporter: Reporter[Pure] = PureReporter.interpreter val message = Message(LoadSpec.dataDiscoveryWithOrigin, Pure.modify(_.log("ACK"))) @@ -94,6 +97,7 @@ class LoadSpec extends Specification { implicit val jdbc: JDBC[Pure] = PureJDBC.interpreter(PureJDBC.init) implicit val iglu: Iglu[Pure] = PureIglu.interpreter implicit val timer: Timer[Pure] = PureTimer.interpreter + implicit val reporter: Reporter[Pure] = PureReporter.interpreter val message = Message(LoadSpec.dataDiscoveryWithOrigin, Pure.fail[Unit](new RuntimeException("Failed ack"))) @@ -106,7 +110,7 @@ class LoadSpec extends Specification { LogEntry.Sql(Statement.ShreddedCopy("atomic",info, "us-east-1",1,arn,Compression.Gzip)), LogEntry.Sql(Statement.ManifestAdd("atomic",LoadSpec.dataDiscoveryWithOrigin.origin)), LogEntry.Sql(Statement.Commit), - LogEntry.Message("TICK REALTIME"), + LogEntry.Message("TICK REALTIME") ) val result = Load.load[Pure](SpecHelpers.validCliConfig, message).value.runS @@ -119,6 +123,7 @@ class LoadSpec extends Specification { implicit val jdbc: JDBC[Pure] = PureJDBC.interpreter(PureJDBC.init.withExecuteUpdate(isFirstCommit, failCommit)) implicit val iglu: Iglu[Pure] = PureIglu.interpreter implicit val timer: Timer[Pure] = PureTimer.interpreter + implicit val reporter: Reporter[Pure] = PureReporter.interpreter val message = Message(LoadSpec.dataDiscoveryWithOrigin, Pure.pure(())) @@ -138,7 +143,7 @@ class LoadSpec extends Specification { LogEntry.Sql(Statement.ShreddedCopy("atomic",info, "us-east-1",1,arn,Compression.Gzip)), LogEntry.Sql(Statement.ManifestAdd("atomic",LoadSpec.dataDiscoveryWithOrigin.origin)), LogEntry.Sql(Statement.Commit), - LogEntry.Message("TICK REALTIME"), + LogEntry.Message("TICK REALTIME") ) val result = Load.load[Pure](SpecHelpers.validCliConfig, message).value.runS @@ -158,6 +163,7 @@ class LoadSpec extends Specification { implicit val jdbc: JDBC[Pure] = PureJDBC.interpreter(PureJDBC.custom(getResult)) implicit val iglu: Iglu[Pure] = PureIglu.interpreter implicit val timer: Timer[Pure] = PureTimer.interpreter + implicit val reporter: Reporter[Pure] = PureReporter.interpreter val message = Message(LoadSpec.dataDiscoveryWithOrigin, Pure.pure(())) @@ -176,6 +182,7 @@ class LoadSpec extends Specification { implicit val jdbc: JDBC[Pure] = PureJDBC.interpreter(PureJDBC.init.withExecuteUpdate(isVacuum, failVacuum)) implicit val iglu: Iglu[Pure] = PureIglu.interpreter implicit val timer: Timer[Pure] = PureTimer.interpreter + implicit val reporter: Reporter[Pure] = PureReporter.interpreter val message = Message(LoadSpec.dataDiscoveryWithOrigin, Pure.pure(())) diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureReporter.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureReporter.scala new file mode 100644 index 000000000..7cf25cadb 
--- /dev/null +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureReporter.scala @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2021-2021 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.rdbloader.test + +import com.snowplowanalytics.snowplow.rdbloader.dsl.metrics.{Metrics, Reporter} + +object PureReporter { + + def interpreter: Reporter[Pure] = new Reporter[Pure] { + def report(metrics: List[Metrics.KVMetric]): Pure[Unit] = + Pure.pure(()) + } +}
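
Note: the snippets below are illustrative sketches, not part of the patch.

The new Config.Metrics decoder dispatches on the "type" field and lowercases it
before matching, which is why the sample config's "statsD" and ConfigSpec's
"Stdout" both decode successfully. A standalone circe equivalent of the patch's
dispatch logic (the object name is invented for the sketch, and error handling
is simplified: the patch additionally emits a dedicated message when "type" is
missing):

object MetricsDecoderSketch {
  import io.circe.{Decoder, DecodingFailure}
  import io.circe.generic.semiauto.deriveDecoder
  import io.circe.parser.decode

  sealed trait Metrics { def prefix: String }
  final case class Stdout(prefix: String) extends Metrics
  final case class StatsD(prefix: String, hostname: String, port: Int, tags: Map[String, String]) extends Metrics

  implicit val stdoutDecoder: Decoder[Stdout] = deriveDecoder
  implicit val statsdDecoder: Decoder[StatsD] = deriveDecoder

  // Same dispatch as the patch's metricsDecoder: read "type", lowercase, branch
  implicit val metricsDecoder: Decoder[Metrics] = Decoder.instance { cur =>
    cur.downField("type").as[String].map(_.toLowerCase).flatMap {
      case "stdout" => cur.as[Stdout]
      case "statsd" => cur.as[StatsD]
      case other    => Left(DecodingFailure(s"Metrics type '$other' is not supported", cur.history))
    }
  }

  def main(args: Array[String]): Unit =
    println(decode[Metrics]("""{"type": "Stdout", "prefix": "snowplow.rdbloader."}"""))
    // prints: Right(Stdout(snowplow.rdbloader.))
}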
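
One subtlety in Metrics.getMetrics worth spelling out: the minimum collector
latency is derived from the maximum collector timestamp (the newest event) and
vice versa. A worked example with invented timestamps, matching the values used
in MetricsSpec:

import java.time.{Duration, Instant}

object LatencySketch {
  def main(args: Array[String]): Unit = {
    val now   = Instant.parse("2021-04-14T12:00:00Z")
    val tsMin = now.minusSeconds(200) // oldest event seen by the collector
    val tsMax = now.minusSeconds(120) // newest event seen by the collector

    val collectorLatencyMin = Duration.between(tsMax, now).toSeconds // 120
    val collectorLatencyMax = Duration.between(tsMin, now).toSeconds // 200
    println(s"latency.collector_to_load.min=$collectorLatencyMin, max=$collectorLatencyMax")
  }
}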
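
On the wire, each KVMetric becomes either a StatsD gauge with DogStatsD-style
#-tags or a key=value pair on stdout. The formatting below mirrors statsDFormat
in StatsDReporter.scala and formatMetric in StdoutReporter.scala; the object
name and sample metric are invented, while the prefix and tag come from the
sample config above:

object ReporterFormatSketch {
  final case class KVMetric(key: String, value: String)

  // StatsD datagram payload: <prefix><key>:<value>|g|#<tag>,<tag>,...
  def statsDFormat(prefix: String, tags: Map[String, String])(m: KVMetric): String = {
    val tagStr = tags.map { case (k, v) => s"$k:$v" }.mkString(",")
    s"$prefix${m.key}:${m.value}|g|#$tagStr"
  }

  // Stdout line, later wrapped into "METRICS - ..." by StdoutReporter
  def stdoutFormat(prefix: String)(m: KVMetric): String =
    s"$prefix${m.key}=${m.value}"

  def main(args: Array[String]): Unit = {
    val m = KVMetric("count.good", "42")
    println(statsDFormat("snowplow.rdbloader.", Map("app" -> "redshift-loader"))(m))
    // snowplow.rdbloader.count.good:42|g|#app:redshift-loader
    println(stdoutFormat("snowplow.rdbloader.")(m))
    // snowplow.rdbloader.count.good=42
  }
}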