Conversation
prometheus metrics
| // metrics
| lazy val prometheusMetrics: ProjectMatrix = (projectMatrix in file ("metrics/prometheus-metrics"))
This also needs to be added to the aggregates at the beginning of the build file.
| def queryParameters: QueryParams
| lazy val acceptsContentTypes: Either[String, Seq[ContentTypeRange]] = Accepts.parse(headers)
| val timestamp: Deadline = Deadline.now
Is that the right type to capture a timestamp?

Maybe let's call it simply requestStart? It's not clear what this timestamp is.

Deadline states that its main purpose is for repeated attempts, but on the other hand it has now, which captures the current nanosecond time, and +/- methods, which are handy for computing a time diff.
I think the most appropriate type here would be a plain Long from System.nanoTime, but that carries no time context, so maybe Duration?
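A minimal sketch of the Long-based option (the type and method names below are illustrative, not from the PR): capture raw System.nanoTime at request start, and convert to a FiniteDuration only when the elapsed time is computed, so the value carries a time unit at the point of use.

```scala
import scala.concurrent.duration._

// Hypothetical wrapper for the raw start-of-request timestamp.
final case class RequestStart(startNanos: Long)

object RequestStart {
  def now(): RequestStart = RequestStart(System.nanoTime())
}

// The unit-carrying Duration appears only when the diff is taken.
def elapsedSince(start: RequestStart): FiniteDuration =
  (System.nanoTime() - start.startNanos).nanos
```
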
| _ <- collectMetrics { case Metric(m, Some(onRequest), _, _) => onRequest(request, m) }
| response <- next(None)
| _ <- collectMetrics { case Metric(m, _, Some(onResponse), _) => onResponse(request, response, m) }
| } yield response
We should probably also somehow notify the metrics if the effect fails, that is, when an exception is thrown. That also depends on whether the metrics interceptor should wrap the exception-handling one, or the other way round.

Maybe MetricsInterceptor should always be called after ExceptionInterceptor?

Yeah, well ... unless there's an exception during metrics collection ;-) but maybe that should be handled in the server. If metrics should be called first/last (around the exception interceptor), they have to be added to the front of the interceptors list.

Another problem: the response is only really "done" (well, not even then, but it's more "done" than currently) when the whole response is sent. In terms of akka-http/http4s: when the response stream is completed. I think we won't run away from needing some way of interpreter-specific integration, which would allow us to "plug into" the response stream and get a callback once it is complete.
In other words: a ServerResponse can be created quickly, but with a stream that takes a long time to evaluate.
| `.appendInterceptor`.
| ## Observability
Let's move this to a separate top-level file.
| onResponse: Option[(ServerRequest, ServerResponse[_], M) => F[Unit]] = None,
| onDecodeFailure: Option[(ServerRequest, M) => F[Unit]] = None
| ) {
| def onRequest(f: (ServerRequest, M) => F[Unit]): Metric[F, M] = copy(onRequest = Some(f))
I think we should pass an implicit MonadError in here, as in EndpointInterceptor; that way users don't have to provide one when creating e.g. PrometheusMetrics. The MonadError is supposed to be mainly an internal thing.
| .help("Unserved HTTP requests")
| .labelNames("path", "method")
| .register(registry)
| ).onDecodeFailure { (req, counter) => monad.unit(counter.labels(path(req), method(req)).inc()) }
A decode failure is not a request failure. A single request can cause multiple decode failures, until one endpoint matches and gives a decode success. So a failed request is rather one where the status code is non-2xx/3xx.
| monad.unit(histogram.labels(path(req), method(req), status(res)).observe((Deadline.now - req.timestamp).toMillis.toDouble / 1000.0))
| }
| private def path(request: ServerRequest): String = request.pathSegments.mkString("/")
We shouldn't use the full request path, but rather the endpoint's path template. That way, any dynamic URL segments are replaced with path captures.
| Metric[F, Counter](
| Counter
| .build()
| .namespace("tapir")
These should be somehow configurable; e.g. for most apps this namespace doesn't make sense. It would be best if each metric could be configured by taking in the request/endpoint and yielding a set of labels and label values to apply. With sane defaults, of course, so that adding default metrics is just a couple of lines of code.
| }
| }
| implicit val monad: FutureMonad = new FutureMonad()
Our goal should be to be able to remove this from the example; providing it shouldn't be necessary for users of the metrics (if possible; maybe not?).
# Conflicts:
#   core/src/test/scala/sttp/tapir/server/interpreter/ServerInterpreterTest.scala
| def queryParameters: QueryParams
| lazy val acceptsContentTypes: Either[String, Seq[ContentTypeRange]] = Accepts.parse(headers)
| val requestStart: Deadline = Deadline.now
Maybe instead of adding this here, we could use a RequestInterceptor, capture the starting timestamp there, and create an EndpointInterceptor with it?

That should work: if this RequestInterceptor were the head of the interceptors list, then the EndpointInterceptor created by it would be called last, which is what we need for metrics.
But requestStart would have to be added as a further argument to the Metrics callback.
wait in metrics test
| package sttp.tapir.server.interpreter
| trait BodyListener[F[_], B] {
| def onComplete(body: B)(cb: => F[Unit]): F[B]
Maybe it would be useful to also have an onFailure callback, so that we can also capture situations where producing the body fails (which results in a broken connection, as the headers have already been sent out).
This could be a separate callback, or a single one taking a Try[Unit] parameter. Maybe see the Future/Try API for inspiration?
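A sketch of the single-callback variant, mirroring Future.onComplete: the trait matches the shape discussed here, while the Future-based instance below is purely illustrative (for a strict body there is nothing left to stream, so it reports success immediately).

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Success, Try}

// One onComplete taking Try[Unit], so both success and body failure are observable.
trait BodyListener[F[_], B] {
  def onComplete(body: B)(cb: Try[Unit] => F[Unit]): F[B]
}

// Illustrative instance (an assumption, not the PR's code).
class FutureBodyListener[B](implicit ec: ExecutionContext) extends BodyListener[Future, B] {
  def onComplete(body: B)(cb: Try[Unit] => Future[Unit]): Future[B] =
    cb(Success(())).map(_ => body)
}
```
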
| case class Metric[M](
| metric: M,
| onRequest: Option[(Endpoint[_, _, _, _], ServerRequest, M) => Unit] = None,
Idea: maybe a metric could be initialised per-request (in the request interceptor) using onRequest: (ServerRequest, M) => P, where P is the payload type. This payload would then be passed into onEndpointRequest: (Endpoint, ServerRequest, M, P) => F[Unit], and similarly into onEndpointResponse.
This would allow us to generalise over the quite specific Deadline parameter.

And another question: shouldn't the metric function return F[Unit], not Unit? Capturing a metric could be a side-effecting operation (it probably is ;) )

Or maybe simpler: we could go back to the idea of "initialising" a metric on request start, which would produce a "proper" metric (a constant instance in most cases, but for capturing timings this would include the initial timestamp). So here we'd only have a function onRequest: (ServerRequest, M) => EndpointMetric.
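The last idea can be sketched as follows. All names and signatures here are simplified assumptions to keep the snippet self-contained (in particular, the real callbacks would be effectful and take the endpoint/request): onRequest runs once per request and returns an EndpointMetric closing over per-request state such as the start timestamp, so no Deadline parameter needs to be threaded through.

```scala
// Per-request metric instance, produced by "initialising" the metric.
final case class EndpointMetric(onResponse: () => Unit)

// The metric itself only knows how to initialise per-request instances.
final case class Metric[M](metric: M, onRequest: M => EndpointMetric)

// A timing metric: capture nanoTime at initialisation, observe the diff on response.
def responseTimeMetric(record: Double => Unit): Metric[Unit] =
  Metric[Unit]((), _ => {
    val startNanos = System.nanoTime()
    EndpointMetric(() => record((System.nanoTime() - startNanos) / 1e9))
  })
```
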
| case class Metric[M](
| metric: M,
| onRequest: Option[(Endpoint[_, _, _, _], ServerRequest, M) => Unit] = None,
| onResponse: Option[(Endpoint[_, _, _, _], ServerRequest, ServerResponse[_], Deadline, M) => Unit] = None
Wouldn't Instant be a simpler choice than Deadline?
# Conflicts:
#   core/src/main/scala/sttp/tapir/server/interpreter/ServerInterpreter.scala
#   core/src/test/scala/sttp/tapir/server/interpreter/ServerInterpreterTest.scala
#   server/akka-http-server/src/main/scala/sttp/tapir/server/akkahttp/AkkaHttpServerOptions.scala
#   server/finatra-server/src/main/scala/sttp/tapir/server/finatra/FinatraServerInterpreter.scala
#   server/finatra-server/src/main/scala/sttp/tapir/server/finatra/FinatraServerOptions.scala
#   server/http4s-server/src/main/scala/sttp/tapir/server/http4s/Http4sServerOptions.scala
#   server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxCatsServerOptions.scala
#   server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxFutureServerOptions.scala
#   server/vertx/src/main/scala/sttp/tapir/server/vertx/VertxZioServerOptions.scala
| }
| } yield withMetrics
| responseWithMetrics.handleError { case e: Exception =>
onException metrics are collected here, so in the withBodyOnComplete case, Failure is left empty, since otherwise onException could be collected twice, I think. If the body is a stream which fails before the response is returned, could it be collected both in the body listener and here in handleError, if the exception is thrown?

I think both should be handled, as these are two different scenarios:
1. The effect creating the ServerResponse completes successfully, yielding an instance of ServerResponse with a streaming body. However, the streaming body might fail when streamed to the client.
2. The above effect fails, so we never even get a ServerResponse.

Note that a request can be considered "complete" only after 1. successfully finishes (the stream is done). But it can fail either with 1. or with 2., not with both.
| }
| private def collectMetrics(pf: PartialFunction[EndpointMetric[F], F[Unit]])(implicit monad: MonadError[F]): F[Unit] = {
| def collect(metrics: List[EndpointMetric[F]]): F[Unit] = {
That's usually called sequence (a function List[F[T]] => F[List[T]], which we kind of have here).
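For reference, a sketch of such sequencing specialised to Future (the generic version would go through MonadError; the thunks keep the eager Future lazy until it is chained, so the effects actually run one after another):

```scala
import scala.concurrent.{ExecutionContext, Future}

// List[F[Unit]] => F[Unit]: chain the effects with flatMap, discarding results.
def sequenceUnits(effects: List[() => Future[Unit]])(implicit ec: ExecutionContext): Future[Unit] =
  effects.foldLeft(Future.unit)((acc, next) => acc.flatMap(_ => next()))
```
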
| )(implicit monad: MonadError[F], bodyListener: BodyListener[F, B]): F[Option[ServerResponse[B]]] = {
| if (ignoreEndpoints.contains(ctx.endpoint)) endpointHandler.onDecodeFailure(ctx)
| else {
| ctx.failure match {
I think we should run the metrics not based on the nature of the decode failure, but based on how it is handled (we don't really know if a mismatch will be handled differently than other errors). So if downstream returns a response, then we need to run the on-request & on-response callbacks.
| def onComplete(body: B)(cb: Try[Unit] => F[Unit]): F[B]
| }
| object BodyListenerSyntax {
I think if you add the implicit in the companion object (BodyListener), then you won't have to import the syntax; it will be found automatically during implicit search.

I did, and it still requires both:
import sttp.tapir.server.interpreter.BodyListener
import sttp.tapir.server.interpreter.BodyListener._

Ah, I thought that this is in the implicit scope. But apparently not :)
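A minimal, tapir-independent illustration of the distinction (Show and its names are made up for this example): an instance in a companion object is found by implicit *search* without any import, but extension-method syntax defined next to it still needs to be brought into lexical scope with an import.

```scala
trait Show[A] { def show(a: A): String }

object Show {
  // Found automatically: companion objects are part of a type's implicit scope.
  implicit val showInt: Show[Int] = (a: Int) => s"Int($a)"

  // NOT found automatically as syntax: this needs `import Show._` at the use site.
  implicit class ShowOps[A](a: A) {
    def show(implicit s: Show[A]): String = s.show(a)
  }
}

// Works with no imports at all, via the implicit search for Show[Int].
def render[A: Show](a: A): String = implicitly[Show[A]].show(a)
```
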
| is match {
| case Nil => (request: ServerRequest) => firstNotNone(request, ses, eisAcc.reverse)
| case Nil =>
| new RequestHandler[F, B] {
Maybe it would make sense to add a factory method to RequestHandler which would accept a plain Request => F[Response] function?
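A sketch of such a factory, with simplified, assumed signatures (the real RequestHandler has different type parameters), so call sites can avoid the anonymous-class boilerplate shown in the diff above:

```scala
trait RequestHandler[F[_], Req, Resp] {
  def apply(request: Req): F[Resp]
}

object RequestHandler {
  // Lift a plain Req => F[Resp] function into a handler.
  def from[F[_], Req, Resp](f: Req => F[Resp]): RequestHandler[F, Req, Resp] =
    new RequestHandler[F, Req, Resp] {
      def apply(request: Req): F[Resp] = f(request)
    }
}
```
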
| val collectorRegistry = CollectorRegistry.defaultRegistry
| // Metric for counting responses labeled by path, method and status code
| val responsesTotal = Metric[Future, Counter](
I think the example should show how to use the default metrics, and the custom metric creation should move to the docs.
| m.unit {
| EndpointMetric()
| .onRequest { ep => m.eval(gauge.labels(labels.forRequest(ep, req): _*).inc()) }
| .onResponse { (ep, _) => m.eval(gauge.labels(labels.forRequest(ep, req): _*).dec()) }
We should decrease the gauge on exception as well.
| `Metric` wraps an aggregation object (like counter or gauge), and requires a function
| returning `EndpointMetric` with metric collection logic. There are three callbacks in `EndpointMetric`:
| 1. `onRequest` - called after successful request decoding
It would be good to describe the two-stage onRequest (with metric initialisation).
| })
| ```
| Labels for default metrics can be customized, any attribute from `Endpoint`, `ServerRequest` and `ServerResponse` could
It would be good to mention what the default labels are (with a path template, status code etc.).

I mentioned that a couple of lines above, before the Prometheus example, but I'll move it here.

I think it would be good to give an example of what the labels will be.
| onRequest = { (req, counter, m) =>
| m.unit {
| EndpointMetric()
| .onResponse { (ep, res) => m.eval(counter.labels(labels.forRequest(ep, req) ++ labels.forResponse(res): _*).inc()) }
Here I guess onException should be counted as a 500?
| import scala.util.{Failure, Success, Try}
| class Http4SBodyListener[F[_], G[_]](gToF: G ~> F)(implicit m: MonadError[G], a: Applicative[F])
[minor] name: Http4sBodyListener (small s :) )
| .body("""{"invalid":"body",}""")
| .send(backend)
| .map { _ =>
| wait(200) {
Why do we have to wait for the counters to be updated?

Without that it sometimes fails for the akka server. Maybe transformDataBytes, where the callback is executed on Future[Done] completion, is called after the response is returned? But it shouldn't be, right?

Ah, onComplete can run asynchronously, after the request completes and is sent. Hm... maybe we can use eventually here? I wouldn't want to always sleep 200ms, as this can be a cause of flaky tests.
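A minimal eventually sketch (a hypothetical helper; test frameworks such as ScalaTest ship a richer one): retry the assertion block until it passes or a timeout elapses, instead of always sleeping a fixed 200 ms.

```scala
// Retry `block` every intervalMillis until it stops throwing, or rethrow the
// last failure once the deadline has passed.
def eventually[T](timeoutMillis: Long = 1000, intervalMillis: Long = 10)(block: => T): T = {
  val deadline = System.currentTimeMillis() + timeoutMillis
  def attempt(): T =
    try block
    catch {
      case e: Throwable if System.currentTimeMillis() < deadline =>
        Thread.sleep(intervalMillis)
        attempt()
    }
  attempt()
}
```
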
| }
| object ServerMetricsTest {
| class Counter(var value: Int = 0) {
This should probably be an AtomicInteger, as it can be updated from multiple threads.
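A sketch of the thread-safe variant (method names are illustrative, not the test's actual API):

```scala
import java.util.concurrent.atomic.AtomicInteger

// AtomicInteger instead of a plain var, so concurrent increments aren't lost.
class Counter(initial: Int = 0) {
  private val underlying = new AtomicInteger(initial)
  def increment(): Unit = underlying.incrementAndGet()
  def value: Int = underlying.get()
}
```
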
| m.unit {
| { (ctx: RoutingContext) =>
| body {
| ctx.addBodyEndHandler(_ => cb(Success(())))
No way to intercept an exception?

The addBodyEndHandler callback will not be executed on an exception. There's addEndHandler, which will be executed, but it is called when the response is disposed or an exception has been encountered, to allow consistent cleanup. So it's not the moment we want to capture here, right, since it has to be a body failure?

Can't we use both, one for success, another for errors?
| val in_input_stream_out_input_stream: Endpoint[InputStream, Unit, InputStream, Any] =
| endpoint.post.in("api" / "echo").in(inputStreamBody).out(inputStreamBody).name("echo input stream")
| val in_empty_out_empty: Endpoint[Unit, Unit, Unit, Any] = endpoint.post.in("api" / "empty").in(emptyInput).out(emptyOutput)
Isn't this just endpoint.post.in("api" / "empty")? Also, maybe we can simply use e.g. in_root_path.
metrics interceptor
prometheus metrics