Skip to content

Commit

Permalink
Merge pull request #2145 from softwaremill/prometheus-update
Browse files Browse the repository at this point in the history
Updated Prometheus to 1.2.1
  • Loading branch information
Pask423 committed Apr 25, 2024
2 parents f0e0a0d + 1149ddd commit 6d86f81
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 121 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Expand Up @@ -937,7 +937,7 @@ lazy val prometheusBackend = (projectMatrix in file("observability/prometheus-ba
.settings(
name := "prometheus-backend",
libraryDependencies ++= Seq(
"io.prometheus" % "simpleclient" % "0.16.0"
"io.prometheus" % "prometheus-metrics-core" % "1.2.1"
),
scalaTest
)
Expand Down
@@ -1,8 +1,11 @@
package sttp.client4.prometheus

import io.prometheus.metrics.core.datapoints.{GaugeDataPoint, Timer}
import io.prometheus.metrics.core.metrics.{Counter, Gauge, Histogram, Summary}
import io.prometheus.metrics.model.registry.{Collector, PrometheusRegistry}

import java.util.concurrent.ConcurrentHashMap
import sttp.client4.{wrappers, _}
import io.prometheus.client.{CollectorRegistry, Counter, Gauge, Histogram, Summary}
import sttp.client4.listener.{ListenerBackend, RequestListener}
import sttp.client4.prometheus.PrometheusBackend.RequestCollectors
import sttp.client4.wrappers.FollowRedirectsBackend
Expand All @@ -11,6 +14,7 @@ import sttp.model.StatusCode
import scala.collection.mutable

object PrometheusBackend {
// TODO: Refactor metrics names according to Prometheus/OpenTelemetry standards
val DefaultHistogramName = "sttp_request_latency"
val DefaultRequestsInProgressGaugeName = "sttp_requests_in_progress"
val DefaultSuccessCounterName = "sttp_requests_success_count"
Expand Down Expand Up @@ -64,11 +68,11 @@ object PrometheusBackend {
(r: (GenericRequest[_, _], Throwable)) => config.requestToFailureCounterMapper(r._1, r._2),
(req: GenericRequest[_, _]) => config.requestToSizeSummaryMapper(req),
(rr: (GenericRequest[_, _], Response[_])) => config.responseToSizeSummaryMapper(rr._1, rr._2),
config.collectorRegistry,
cacheFor(histograms, config.collectorRegistry),
cacheFor(gauges, config.collectorRegistry),
cacheFor(counters, config.collectorRegistry),
cacheFor(summaries, config.collectorRegistry)
config.prometheusRegistry,
cacheFor(histograms, config.prometheusRegistry),
cacheFor(gauges, config.prometheusRegistry),
cacheFor(counters, config.prometheusRegistry),
cacheFor(summaries, config.prometheusRegistry)
)

/** Add, if not present, a "method" label. That is, if the user already supplied such a label, it is left as-is.
Expand Down Expand Up @@ -117,13 +121,11 @@ object PrometheusBackend {

/** Clear cached collectors (gauges and histograms) both from the given collector registry, and from the backend.
*/
def clear(collectorRegistry: CollectorRegistry): Unit = {
collectorRegistry.clear()
histograms.remove(collectorRegistry)
gauges.remove(collectorRegistry)
counters.remove(collectorRegistry)
summaries.remove(collectorRegistry)
()
def clear(prometheusRegistry: PrometheusRegistry): Unit = {
clear(prometheusRegistry, histograms)
clear(prometheusRegistry, gauges)
clear(prometheusRegistry, counters)
clear(prometheusRegistry, summaries)
}

/*
Expand All @@ -132,19 +134,28 @@ object PrometheusBackend {
Hence, we need to store a global cache o created histograms/gauges, so that we can properly re-use them.
*/

private val histograms = new mutable.WeakHashMap[CollectorRegistry, ConcurrentHashMap[String, Histogram]]
private val gauges = new mutable.WeakHashMap[CollectorRegistry, ConcurrentHashMap[String, Gauge]]
private val counters = new mutable.WeakHashMap[CollectorRegistry, ConcurrentHashMap[String, Counter]]
private val summaries = new mutable.WeakHashMap[CollectorRegistry, ConcurrentHashMap[String, Summary]]
private def clear[T <: Collector](prometheusRegistry: PrometheusRegistry, collectors: mutable.WeakHashMap[PrometheusRegistry, ConcurrentHashMap[String, T]]): Unit = {
collectors
.getOrElse(prometheusRegistry, new ConcurrentHashMap[String, T]())
.values()
.forEach(c => prometheusRegistry.unregister(c))
collectors.remove(prometheusRegistry)
}

private val histograms = new mutable.WeakHashMap[PrometheusRegistry, ConcurrentHashMap[String, Histogram]]
private val gauges = new mutable.WeakHashMap[PrometheusRegistry, ConcurrentHashMap[String, Gauge]]
private val counters = new mutable.WeakHashMap[PrometheusRegistry, ConcurrentHashMap[String, Counter]]
private val summaries = new mutable.WeakHashMap[PrometheusRegistry, ConcurrentHashMap[String, Summary]]

private def cacheFor[T](
cache: mutable.WeakHashMap[CollectorRegistry, ConcurrentHashMap[String, T]],
collectorRegistry: CollectorRegistry
cache: mutable.WeakHashMap[PrometheusRegistry, ConcurrentHashMap[String, T]],
prometheusRegistry: PrometheusRegistry
): ConcurrentHashMap[String, T] =
cache.synchronized {
cache.getOrElseUpdate(collectorRegistry, new ConcurrentHashMap[String, T]())
// TODO remove synchronized and improve caching
cache.getOrElseUpdate(prometheusRegistry, new ConcurrentHashMap[String, T]())
}
final case class RequestCollectors(maybeTimer: Option[Histogram.Timer], maybeGauge: Option[Gauge.Child])
final case class RequestCollectors(maybeTimer: Option[Timer], maybeGauge: Option[GaugeDataPoint])
}

class PrometheusListener(
Expand All @@ -155,22 +166,22 @@ class PrometheusListener(
requestToFailureCounterMapper: ((GenericRequest[_, _], Exception)) => Option[CollectorConfig],
requestToSizeSummaryMapper: GenericRequest[_, _] => Option[CollectorConfig],
responseToSizeSummaryMapper: ((GenericRequest[_, _], Response[_])) => Option[CollectorConfig],
collectorRegistry: CollectorRegistry,
prometheusRegistry: PrometheusRegistry,
histogramsCache: ConcurrentHashMap[String, Histogram],
gaugesCache: ConcurrentHashMap[String, Gauge],
countersCache: ConcurrentHashMap[String, Counter],
summariesCache: ConcurrentHashMap[String, Summary]
) extends RequestListener[Identity, RequestCollectors] {

override def beforeRequest(request: GenericRequest[_, _]): RequestCollectors = {
val requestTimer: Option[Histogram.Timer] = for {
val requestTimer: Option[Timer] = for {
histogramData <- requestToHistogramNameMapper(request)
histogram: Histogram = getOrCreateMetric(histogramsCache, histogramData, createNewHistogram)
} yield histogram.labels(histogramData.labelValues: _*).startTimer()
} yield histogram.labelValues(histogramData.labelValues: _*).startTimer()

val gauge: Option[Gauge.Child] = for {
val gauge: Option[GaugeDataPoint] = for {
gaugeData <- requestToInProgressGaugeNameMapper(request)
} yield getOrCreateMetric(gaugesCache, gaugeData, createNewGauge).labels(gaugeData.labelValues: _*)
} yield getOrCreateMetric(gaugesCache, gaugeData, createNewGauge).labelValues(gaugeData.labelValues: _*)

observeRequestContentLengthSummaryIfMapped(request, requestToSizeSummaryMapper)

Expand Down Expand Up @@ -214,7 +225,7 @@ class PrometheusListener(
mapper: T => Option[BaseCollectorConfig]
): Unit =
mapper(request).foreach { data =>
getOrCreateMetric(countersCache, data, createNewCounter).labels(data.labelValues: _*).inc()
getOrCreateMetric(countersCache, data, createNewCounter).labelValues(data.labelValues: _*).inc()
}

private def observeResponseContentLengthSummaryIfMapped(
Expand All @@ -224,7 +235,7 @@ class PrometheusListener(
): Unit =
mapper((request, response)).foreach { data =>
response.contentLength.map(_.toDouble).foreach { size =>
getOrCreateMetric(summariesCache, data, createNewSummary).labels(data.labelValues: _*).observe(size)
getOrCreateMetric(summariesCache, data, createNewSummary).labelValues(data.labelValues: _*).observe(size)
}
}

Expand All @@ -234,7 +245,7 @@ class PrometheusListener(
): Unit =
mapper(request).foreach { data =>
(request.contentLength: Option[Long]).map(_.toDouble).foreach { size =>
getOrCreateMetric(summariesCache, data, createNewSummary).labels(data.labelValues: _*).observe(size)
getOrCreateMetric(summariesCache, data, createNewSummary).labelValues(data.labelValues: _*).observe(size)
}
}

Expand All @@ -252,36 +263,36 @@ class PrometheusListener(

private def createNewHistogram(data: HistogramCollectorConfig): Histogram =
Histogram
.build()
.buckets(data.buckets: _*)
.builder()
.classicUpperBounds(data.buckets: _*)
.name(data.collectorName)
.labelNames(data.labelNames: _*)
.help(data.collectorName)
.register(collectorRegistry)
.register(prometheusRegistry)

private def createNewGauge(data: BaseCollectorConfig): Gauge =
Gauge
.build()
.builder()
.name(data.collectorName)
.labelNames(data.labelNames: _*)
.help(data.collectorName)
.register(collectorRegistry)
.register(prometheusRegistry)

private def createNewCounter(data: BaseCollectorConfig): Counter =
Counter
.build()
.builder()
.name(data.collectorName)
.labelNames(data.labelNames: _*)
.help(data.collectorName)
.register(collectorRegistry)
.register(prometheusRegistry)

private def createNewSummary(data: BaseCollectorConfig): Summary =
Summary
.build()
.builder()
.name(data.collectorName)
.labelNames(data.labelNames: _*)
.help(data.collectorName)
.register(collectorRegistry)
.register(prometheusRegistry)
}

trait BaseCollectorConfig {
Expand Down
@@ -1,6 +1,6 @@
package sttp.client4.prometheus

import io.prometheus.client.CollectorRegistry
import io.prometheus.metrics.model.registry.PrometheusRegistry
import sttp.client4.GenericRequest
import sttp.client4.Response
import sttp.client4.prometheus.PrometheusBackend._
Expand All @@ -25,7 +25,7 @@ final case class PrometheusConfig(
responseToSizeSummaryMapper: (GenericRequest[_, _], Response[_]) => Option[CollectorConfig] =
(req: GenericRequest[_, _], resp: Response[_]) =>
Some(addStatusLabel(addMethodLabel(CollectorConfig(DefaultResponseSizeName), req), resp)),
collectorRegistry: CollectorRegistry = CollectorRegistry.defaultRegistry
prometheusRegistry: PrometheusRegistry = PrometheusRegistry.defaultRegistry
)

object PrometheusConfig {
Expand Down

0 comments on commit 6d86f81

Please sign in to comment.