Skip to content

Commit

Permalink
print metrics on stdout
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Apr 15, 2021
1 parent 1e5791f commit 35f9a28
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,11 @@ object Logging {
}

def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[Unit] =
statsD.sendMetrics(loaded)
.handleErrorWith(e => error(s"Problem happened when sending metrics to StatsD: ${e.getMessage()}\n${e.printStackTrace()}"))
for {
metrics <- statsD.sendMetrics(loaded)
metricsStr = metrics.map { case StatsD.KVMetric(k, v) => s"$k:$v"}.mkString(", ")
_ <- if(metricsStr.nonEmpty) info(s"Metrics: $metricsStr") else Sync[F].unit
} yield ()

private def sanitize(string: String): String =
Common.sanitize(string, List(targetConfig.password.getUnencrypted, targetConfig.username))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.common.config.Config

trait StatsD[F[_]] {
def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[Unit]
def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[List[StatsD.KVMetric]]
}

object StatsD {
Expand All @@ -41,26 +41,26 @@ object StatsD {
statsDConfig match {
case Some(config) =>
new StatsD[F] {
def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[Unit] =
def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[List[KVMetric]] =
mkSocket[F](blocker).use { socket =>
for {
metrics <- getMetrics[F](loaded)
formatted = metrics.map(statsDFormat(config))
ip <- blocker.delay(InetAddress.getByName(config.hostname))
_ <- formatted.traverse_(sendMetric[F](blocker, socket, ip, config.port))
} yield ()
} yield (metrics)
}
}
case None =>
new StatsD[F] {
def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[Unit] = Monad[F].unit
def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[List[KVMetric]] = getMetrics[F](loaded)
}
}

private def mkSocket[F[_]: ContextShift: Sync](blocker: Blocker): Resource[F, DatagramSocket] =
Resource.fromAutoCloseableBlocking(blocker)(Sync[F].delay(new DatagramSocket))

def getMetrics[F[_]: Monad: Timer](loaded: LoaderMessage.ShreddingComplete): F[List[KVMetric]] =
private def getMetrics[F[_]: Monad: Timer](loaded: LoaderMessage.ShreddingComplete): F[List[KVMetric]] =
Timer[F].clock.instantNow.map { now =>
List(
loaded.timestamps.min.map(min => Duration.between(min, now).toSeconds()).map(l => KVMetric(MaxCollectorLatencyName, l.toString)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ object PureLogging {
def trackException(e: Throwable): Pure[Unit] =
results.print(s"EXCEPTION ${e.getMessage}")
def sendMetrics(loaded: LoaderMessage.ShreddingComplete): Pure[Unit] =
Pure.pure(())
results.print(s"sendMetrics")
}

def print(message: String): Pure[Unit] =
Expand Down

0 comments on commit 35f9a28

Please sign in to comment.