From 35f9a2861e9fcdbf1eff143b704968d97988b2da Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Thu, 15 Apr 2021 13:17:11 +0200 Subject: [PATCH] print metrics on stdout --- .../snowplow/rdbloader/dsl/Logging.scala | 7 +++++-- .../snowplow/rdbloader/dsl/StatsD.scala | 10 +++++----- .../snowplow/rdbloader/test/PureLogging.scala | 2 +- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Logging.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Logging.scala index 6bf84b1c5..99174ea77 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Logging.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Logging.scala @@ -104,8 +104,11 @@ object Logging { } def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[Unit] = - statsD.sendMetrics(loaded) - .handleErrorWith(e => error(s"Problem happened when sending metrics to StatsD: ${e.getMessage()}\n${e.printStackTrace()}")) + for { + metrics <- statsD.sendMetrics(loaded) + metricsStr = metrics.map { case StatsD.KVMetric(k, v) => s"$k:$v"}.mkString(", ") + _ <- if(metricsStr.nonEmpty) info(s"Metrics: $metricsStr") else Sync[F].unit + } yield () private def sanitize(string: String): String = Common.sanitize(string, List(targetConfig.password.getUnencrypted, targetConfig.username)) diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/StatsD.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/StatsD.scala index 6453e95e0..c3d4059c0 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/StatsD.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/StatsD.scala @@ -25,7 +25,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage import com.snowplowanalytics.snowplow.rdbloader.common.config.Config trait StatsD[F[_]] { - def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[Unit] + def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[List[StatsD.KVMetric]] } object StatsD { @@ -41,26 +41,26 @@ object StatsD { statsDConfig match { case Some(config) => new StatsD[F] { - def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[Unit] = + def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[List[KVMetric]] = mkSocket[F](blocker).use { socket => for { metrics <- getMetrics[F](loaded) formatted = metrics.map(statsDFormat(config)) ip <- blocker.delay(InetAddress.getByName(config.hostname)) _ <- formatted.traverse_(sendMetric[F](blocker, socket, ip, config.port)) - } yield () + } yield (metrics) } } case None => new StatsD[F] { - def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[Unit] = Monad[F].unit + def sendMetrics(loaded: LoaderMessage.ShreddingComplete): F[List[KVMetric]] = getMetrics[F](loaded) } } private def mkSocket[F[_]: ContextShift: Sync](blocker: Blocker): Resource[F, DatagramSocket] = Resource.fromAutoCloseableBlocking(blocker)(Sync[F].delay(new DatagramSocket)) - def getMetrics[F[_]: Monad: Timer](loaded: LoaderMessage.ShreddingComplete): F[List[KVMetric]] = + private def getMetrics[F[_]: Monad: Timer](loaded: LoaderMessage.ShreddingComplete): F[List[KVMetric]] = Timer[F].clock.instantNow.map { now => List( loaded.timestamps.min.map(min => Duration.between(min, now).toSeconds()).map(l => KVMetric(MaxCollectorLatencyName, l.toString)), diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureLogging.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureLogging.scala index 857c723ee..a2fc9ce7f 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureLogging.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureLogging.scala @@ -24,7 +24,7 @@ object PureLogging { def trackException(e: Throwable): Pure[Unit] = results.print(s"EXCEPTION ${e.getMessage}") def sendMetrics(loaded: LoaderMessage.ShreddingComplete): Pure[Unit] = - Pure.pure(()) + results.print(s"sendMetrics") } def print(message: String): Pure[Unit] =