Skip to content

Commit

Permalink
add stdout reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Apr 22, 2021
1 parent c3b2d03 commit 92a6357
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 99 deletions.
20 changes: 12 additions & 8 deletions config/config.hocon.sample
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,21 @@
"appId": "redshift-loader",
"collector": "http://snplow.acme.com",
}

# Sentry (optional)
"sentry": {
"dsn": "http://sentry.acme.com"
},
# StatsD where metrics get sent (optional)
"statsD": {
"hostname": "localhost",
"port" : 8125,
"metricsPrefix": "snowplow.rdbloader.",
# Key-value pairs to be attached to every metric
"tags": { "app": "redshift-loader" }
}

# Metrics reporting (optional)
"metrics": {
"prefix": "snowplow.rdbloader.",
"type": "Stdout" # to have the raw metrics printed on stdout

# config below is to send metrics to StatsD instead of stdout
#"type": "StatsD",
#"hostname": "localhost",
#"port" : 8125,
#"tags": { "app": "redshift-loader" } # Key-value pairs to be attached to every metric
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ object Config {
}
}

final case class Monitoring(snowplow: Option[SnowplowMonitoring], sentry: Option[Sentry], statsD: Option[StatsD])
final case class Monitoring(snowplow: Option[SnowplowMonitoring], sentry: Option[Sentry], metrics: Option[Metrics])
final case class SnowplowMonitoring(appId: String, collector: String)

final case class Formats(default: LoaderMessage.Format,
Expand Down Expand Up @@ -171,7 +171,20 @@ object Config {

final case class Sentry(dsn: URI)

final case class StatsD(hostname: String, port: Int, metricsPrefix: String, tags: Map[String, String])
sealed trait Metrics {
def prefix: String
}

object Metrics {
final case class Stdout(prefix: String) extends Metrics
final case class StatsD(
prefix: String,
hostname: String,
port: Int,
metricsPrefix: String,
tags: Map[String, String]
) extends Metrics
}

implicit val batchShredderDecoder: Decoder[Shredder.Batch] =
deriveDecoder[Shredder.Batch]
Expand Down Expand Up @@ -226,9 +239,9 @@ object Config {
implicit val sentryDecoder: Decoder[Sentry] =
deriveDecoder[Sentry]

implicit val statsDDecoder: Decoder[StatsD] =
deriveDecoder[StatsD]

implicit val metricsDecoder: Decoder[Metrics] =
deriveDecoder[Metrics]
implicit val formatsDecoder: Decoder[Formats] =
deriveDecoder[Formats]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ package com.snowplowanalytics.snowplow.rdbloader.dsl

import java.net.URI

import io.sentry.{Sentry, SentryOptions}

import cats.{Functor, Monad}
import cats.implicits._

Expand All @@ -25,19 +27,20 @@ import fs2.concurrent.SignallingRef

import com.snowplowanalytics.iglu.client.Client

import io.sentry.{Sentry, SentryOptions}
import com.snowplowanalytics.snowplow.rdbloader.State
import com.snowplowanalytics.snowplow.rdbloader.common.S3
import com.snowplowanalytics.snowplow.rdbloader.config.CliConfig
import com.snowplowanalytics.snowplow.rdbloader.dsl.metrics.Reporter

/** Container for most of interepreters to be used in Main
* JDBC will be instantiated only when necessary, and as a `Reousrce`
*/
class Environment[F[_]](cache: Cache[F], logging: Logging[F], iglu: Iglu[F], aws: AWS[F], val state: State.Ref[F], val blocker: Blocker) {
class Environment[F[_]](cache: Cache[F], logging: Logging[F], iglu: Iglu[F], aws: AWS[F], reporter: Reporter[F], val state: State.Ref[F], val blocker: Blocker) {
implicit val cacheF: Cache[F] = cache
implicit val loggingF: Logging[F] = logging
implicit val igluF: Iglu[F] = iglu
implicit val awsF: AWS[F] = aws
implicit val reporterF: Reporter[F] = reporter

def makeBusy(implicit F: Monad[F]): Resource[F, SignallingRef[F, Boolean]] =
Resource.make(busy.flatMap(x => x.set(true).as(x)))(_.set(false))
Expand All @@ -50,7 +53,6 @@ class Environment[F[_]](cache: Cache[F], logging: Logging[F], iglu: Iglu[F], aws

private def busy(implicit F: Functor[F]): F[SignallingRef[F, Boolean]] =
state.get.map(_.busy)

}

object Environment {
Expand All @@ -75,10 +77,10 @@ object Environment {
blocker <- Blocker[F]
messages <- Resource.eval(Ref.of[F, List[String]](List.empty[String]))
tracker <- Logging.initializeTracking[F](cli.config.monitoring, blocker.blockingContext)
statsD = StatsD.getInstance[F](cli.config.monitoring.statsD, blocker)
logging = Logging.loggingInterpreter[F](cli.config.storage, messages, tracker, statsD)
logging = Logging.loggingInterpreter[F](cli.config.storage, messages, tracker)
reporter = Reporter.build[F](cli.config.monitoring.metrics, logging, blocker)
(cache, iglu, aws, state) <- Resource.eval(init)
} yield new Environment(cache, logging, iglu, aws, state, blocker)
} yield new Environment(cache, logging, iglu, aws, reporter, state, blocker)
}

def initSentry[F[_]: Sync](dsn: Option[URI]): F[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import com.snowplowanalytics.snowplow.scalatracker.{Tracker, Emitter}
import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.Http4sEmitter

import com.snowplowanalytics.snowplow.rdbloader.LoaderError
import com.snowplowanalytics.snowplow.rdbloader.common.{Common, LoaderMessage}
import com.snowplowanalytics.snowplow.rdbloader.common.Common
import com.snowplowanalytics.snowplow.rdbloader.common.config.Config.Monitoring
import com.snowplowanalytics.snowplow.rdbloader.common.config.StorageTarget

Expand All @@ -53,9 +53,6 @@ trait Logging[F[_]] {

/** Log an error to Sentry if it's configured */
def trackException(e: Throwable): F[Unit]

/** Send metrics to StatsD if configured */
def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[Unit]
}

object Logging {
Expand All @@ -68,9 +65,8 @@ object Logging {
DateTimeFormatter.ofPattern("YYYY-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault())

def loggingInterpreter[F[_]: Sync](targetConfig: StorageTarget,
messages: Ref[F, List[String]],
tracker: Option[Tracker[F]],
statsD: StatsD[F]): Logging[F] =
messages: Ref[F, List[String]],
tracker: Option[Tracker[F]]): Logging[F] =
new Logging[F] {

/** Track result via Snowplow tracker */
Expand Down Expand Up @@ -103,12 +99,6 @@ object Logging {
case NonFatal(_) => ()
}

def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[Unit] =
for {
metrics <- statsD.sendMetrics(loaded)
_ <- info(metrics.toHumanReadableString)
} yield ()

private def sanitize(string: String): String =
Common.sanitize(string, List(targetConfig.password.getUnencrypted, targetConfig.username))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,18 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader.dsl
package com.snowplowanalytics.snowplow.rdbloader.dsl.metrics

import java.time.Duration
import java.net.{DatagramPacket, DatagramSocket, InetAddress}
import java.nio.charset.StandardCharsets.UTF_8

import cats.Functor
import cats.implicits._
import cats.Functor

import cats.effect.{Blocker, Clock, ContextShift, Resource, Sync, Timer}
import cats.effect.Clock

import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.common.config.Config

trait StatsD[F[_]] {
def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[StatsD.KVMetrics]
}

object StatsD {
object Metrics {

final case class KVMetric(key: String, value: String)

Expand All @@ -52,41 +45,12 @@ object StatsD {
s"""${countGood.value} good events were loaded.
| It took minimum ${collectorLatencyMin.map(_.value).getOrElse("unknown")} seconds and maximum
| ${collectorLatencyMax.map(_.value).getOrElse("unknown")} seconds between the collector and Redshift for these events.
| It took ${shredderStartLatency.value} seconds between the start of the shredder and Redshift
| and ${shredderEndLatency.value} seconds between the completion of the shredder and Redshift
| It took ${shredderStartLatency.value} seconds between the start of shredder and Redshift
| and ${shredderEndLatency.value} seconds between the completion of shredder and Redshift
""".stripMargin
}

private val CountGoodName = "count.good"
private val CollectorLatencyMinName = "latency.collector_to_load.min"
private val CollectorLatencyMaxName = "latency.collector_to_load.max"
private val ShredderStartLatencyName = "latency.shredder_start_to_load"
private val ShredderEndLatencyName = "latency.shredder_end_to_load"

def getInstance[F[_]: ContextShift: Sync: Timer](statsDConfig: Option[Config.StatsD], blocker: Blocker): StatsD[F] =
statsDConfig match {
case Some(config) =>
new StatsD[F] {
def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[KVMetrics] =
mkSocket[F](blocker).use { socket =>
for {
metrics <- getMetrics[F](loaded)
formatted = metrics.toList.map(statsDFormat(config))
ip <- blocker.delay(InetAddress.getByName(config.hostname))
_ <- formatted.traverse_(sendMetric[F](blocker, socket, ip, config.port))
} yield metrics
}
}
case None =>
new StatsD[F] {
def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[KVMetrics] = getMetrics[F](loaded)
}
}

private def mkSocket[F[_]: ContextShift: Sync](blocker: Blocker): Resource[F, DatagramSocket] =
Resource.fromAutoCloseableBlocking(blocker)(Sync[F].delay(new DatagramSocket))

private def getMetrics[F[_]: Clock: Functor](loaded: LoaderMessage.ShreddingComplete): F[KVMetrics] =
def getMetrics[F[_]: Clock: Functor](loaded: LoaderMessage.ShreddingComplete): F[KVMetrics] =
Clock[F].instantNow.map { now =>
KVMetrics(
KVMetric(CountGoodName, loaded.count.getOrElse(0).toString),
Expand All @@ -97,22 +61,9 @@ object StatsD {
)
}

private def sendMetric[F[_]: ContextShift: Sync](
blocker: Blocker,
socket: DatagramSocket,
addr: InetAddress,
port: Int
)(
metric: String
): F[Unit] = {
val bytes = metric.getBytes(UTF_8)
val packet = new DatagramPacket(bytes, bytes.length, addr, port)
blocker.delay(socket.send(packet))
}

private def statsDFormat(config: Config.StatsD)(metric: KVMetric): String = {
val tagStr = config.tags.map { case (k, v) => s"$k:$v" }.mkString(",")
val prefix = config.metricsPrefix
s"${prefix}${metric.key}:${metric.value}|g|#$tagStr"
}
private val CountGoodName = "count.good"
private val CollectorLatencyMinName = "latency.collector_to_load.min"
private val CollectorLatencyMaxName = "latency.collector_to_load.max"
private val ShredderStartLatencyName = "latency.shredder_start_to_load"
private val ShredderEndLatencyName = "latency.shredder_end_to_load"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2021-2021 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader.dsl.metrics

import cats.Applicative

import cats.effect.{Blocker, ContextShift, Sync, Timer}

import com.snowplowanalytics.snowplow.rdbloader.common.config.Config
import com.snowplowanalytics.snowplow.rdbloader.dsl.Logging

trait Reporter[F[_]] {
def report(metrics: List[Metrics.KVMetric]): F[Unit]
}

object Reporter {
def apply[F[_]](implicit ev: Reporter[F]): Reporter[F] = ev

def build[F[_]: ContextShift: Sync: Timer](
metricsConfig: Option[Config.Metrics],
logging: Logging[F],
blocker: Blocker
): Reporter[F] =
metricsConfig match {
case Some(statsDConfig: Config.Metrics.StatsD) =>
StatsDReporter.build[F](statsDConfig, blocker)
case Some(stdoutConfig: Config.Metrics.Stdout) =>
StdoutReporter.build[F](stdoutConfig, logging)
case None =>
noop[F]
}

def noop[F[_]: Applicative]: Reporter[F] = new Reporter[F] {
def report(metrics: List[Metrics.KVMetric]) = Applicative[F].unit
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2021-2021 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader.dsl.metrics

import java.net.{DatagramPacket, DatagramSocket, InetAddress}
import java.nio.charset.StandardCharsets.UTF_8

import cats.implicits._

import cats.effect.{Blocker, ContextShift, Resource, Sync, Timer}

import com.snowplowanalytics.snowplow.rdbloader.common.config.Config

object StatsDReporter {

def build[F[_]: ContextShift: Sync: Timer](statsDConfig: Config.Metrics.StatsD, blocker: Blocker): Reporter[F] =
new Reporter[F] {
def report(metrics: List[Metrics.KVMetric]): F[Unit] = {
mkSocket[F](blocker).use { socket =>
for {
formatted <- Sync[F].pure(metrics.map(statsDFormat(statsDConfig)))
ip <- blocker.delay(InetAddress.getByName(statsDConfig.hostname))
_ <- formatted.traverse_(sendMetric[F](blocker, socket, ip, statsDConfig.port))
} yield ()
}
}
}

private def mkSocket[F[_]: ContextShift: Sync](blocker: Blocker): Resource[F, DatagramSocket] =
Resource.fromAutoCloseableBlocking(blocker)(Sync[F].delay(new DatagramSocket))

private def sendMetric[F[_]: ContextShift: Sync](
blocker: Blocker,
socket: DatagramSocket,
addr: InetAddress,
port: Int
)(
metric: String
): F[Unit] = {
val bytes = metric.getBytes(UTF_8)
val packet = new DatagramPacket(bytes, bytes.length, addr, port)
blocker.delay(socket.send(packet))
}

private def statsDFormat(config: Config.Metrics.StatsD)(metric: Metrics.KVMetric): String = {
val tagStr = config.tags.map { case (k, v) => s"$k:$v" }.mkString(",")
val prefix = config.metricsPrefix
s"${prefix}${metric.key}:${metric.value}|g|#$tagStr"
}
}
Loading

0 comments on commit 92a6357

Please sign in to comment.