Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix metric collection in vertx and netty #2579

Merged
merged 2 commits into from
Nov 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,24 @@ import sttp.monad.MonadError
import sttp.tapir.integ.cats.CatsMonadError
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.netty.Route
import sttp.tapir.server.netty.internal.NettyServerInterpreter
import sttp.tapir.server.netty.internal.{NettyServerInterpreter, RunAsync}

trait NettyCatsServerInterpreter[F[_]] {
implicit def async: Async[F]
def nettyServerOptions: NettyCatsServerOptions[F, _]

def toRoute(ses: List[ServerEndpoint[Any, F]]): Route[F] = {
implicit val monad: MonadError[F] = new CatsMonadError[F]
NettyServerInterpreter.toRoute(ses, nettyServerOptions.interceptors, nettyServerOptions.createFile, nettyServerOptions.deleteFile)
val runAsync = new RunAsync[F] {
override def apply[T](f: => F[T]): Unit = nettyServerOptions.dispatcher.unsafeRunAndForget(f)
}
NettyServerInterpreter.toRoute(
ses,
nettyServerOptions.interceptors,
nettyServerOptions.createFile,
nettyServerOptions.deleteFile,
runAsync
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package sttp.tapir.server.netty

import sttp.monad.FutureMonad
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.netty.internal.NettyServerInterpreter
import sttp.tapir.server.netty.NettyFutureServerInterpreter.FutureRunAsync
import sttp.tapir.server.netty.internal.{NettyServerInterpreter, RunAsync}

import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -13,7 +14,13 @@ trait NettyFutureServerInterpreter {
ses: List[ServerEndpoint[Any, Future]]
)(implicit ec: ExecutionContext): FutureRoute = {
implicit val monad: FutureMonad = new FutureMonad()
NettyServerInterpreter.toRoute(ses, nettyServerOptions.interceptors, nettyServerOptions.createFile, nettyServerOptions.deleteFile)
NettyServerInterpreter.toRoute(
ses,
nettyServerOptions.interceptors,
nettyServerOptions.createFile,
nettyServerOptions.deleteFile,
FutureRunAsync
)
}
}

Expand All @@ -23,4 +30,8 @@ object NettyFutureServerInterpreter {
override def nettyServerOptions: NettyFutureServerOptions[_] = serverOptions
}
}

private object FutureRunAsync extends RunAsync[Future] {
override def apply[T](f: => Future[T]): Unit = f
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import sttp.tapir.server.netty.NettyResponse

import scala.util.{Failure, Success, Try}

class NettyBodyListener[F[_]](implicit m: MonadError[F]) extends BodyListener[F, NettyResponse] {
class NettyBodyListener[F[_]](runAsync: RunAsync[F])(implicit m: MonadError[F]) extends BodyListener[F, NettyResponse] {
override def onComplete(body: NettyResponse)(cb: Try[Unit] => F[Unit]): F[NettyResponse] = {
m.eval((ctx: ChannelHandlerContext) => {
val nettyResponseContent = body(ctx)
nettyResponseContent.channelPromise.addListener((future: Future[_ >: Void]) => {
if (future.isSuccess) {
cb(Success(()))
runAsync(cb(Success(())))
} else {
cb(Failure(future.cause()))
runAsync(cb(Failure(future.cause())))
}
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ object NettyServerInterpreter {
ses: List[ServerEndpoint[Any, F]],
interceptors: List[Interceptor[F]],
createFile: ServerRequest => F[TapirFile],
deleteFile: TapirFile => F[Unit]
deleteFile: TapirFile => F[Unit],
runAsync: RunAsync[F]
): Route[F] = {
implicit val bodyListener: BodyListener[F, NettyResponse] = new NettyBodyListener
implicit val bodyListener: BodyListener[F, NettyResponse] = new NettyBodyListener(runAsync)
val serverInterpreter = new ServerInterpreter[Any, F, NettyResponse, NoStreams](
FilterServerEndpoints(ses),
new NettyRequestBody(createFile),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package sttp.tapir.server.netty.internal

trait RunAsync[F[_]] {
def apply[T](f: => F[T]): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ object ServerMetricsTest {
}

def newRequestCounter[F[_]]: Metric[F, Counter] =
Metric[F, Counter](Counter(), onRequest = { (_, c, m) => m.unit(EndpointMetric().onEndpointRequest { _ => m.unit(c.++()) }) })
Metric[F, Counter](Counter(), onRequest = { (_, c, m) => m.unit(EndpointMetric().onEndpointRequest { _ => m.eval(c.++()) }) })

def newResponseCounter[F[_]]: Metric[F, Counter] =
Metric[F, Counter](Counter(), onRequest = { (_, c, m) => m.unit(EndpointMetric().onResponseBody { (_, _) => m.unit(c.++()) }) })
Metric[F, Counter](Counter(), onRequest = { (_, c, m) => m.unit(EndpointMetric().onResponseBody { (_, _) => m.eval(c.++()) }) })
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.interceptor.RequestResult
import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter}
import sttp.tapir.server.vertx.VertxBodyListener
import sttp.tapir.server.vertx.cats.VertxCatsServerInterpreter.{CatsFFromVFuture, monadError}
import sttp.tapir.server.vertx.cats.VertxCatsServerInterpreter.{CatsFFromVFuture, CatsRunAsync, monadError}
import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest}
import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync}
import sttp.tapir.server.vertx.routing.PathMapping.extractRouteDefinition
import sttp.tapir.server.vertx.streams.ReadStreamCompatible
import sttp.tapir.server.vertx.cats.streams.fs2.fs2ReadStreamCompatible
Expand Down Expand Up @@ -47,7 +47,8 @@ trait VertxCatsServerInterpreter[F[_]] extends CommonServerInterpreter {
readStreamCompatible: ReadStreamCompatible[S]
): Handler[RoutingContext] = {
implicit val monad: MonadError[F] = monadError[F]
implicit val bodyListener: BodyListener[F, RoutingContext => Future[Void]] = new VertxBodyListener[F]
implicit val bodyListener: BodyListener[F, RoutingContext => Future[Void]] =
new VertxBodyListener[F](new CatsRunAsync(vertxCatsServerOptions.dispatcher))
val fFromVFuture = new CatsFFromVFuture[F]
val interpreter: ServerInterpreter[Fs2Streams[F], F, RoutingContext => Future[Void], S] = new ServerInterpreter(
_ => List(e),
Expand Down Expand Up @@ -126,6 +127,10 @@ object VertxCatsServerInterpreter {
def apply[T](f: => Future[T]): F[T] = f.asF
}

private[cats] class CatsRunAsync[F[_]: Async](dispatcher: Dispatcher[F]) extends RunAsync[F] {
override def apply[T](f: => F[T]): Unit = dispatcher.unsafeRunAndForget(f)
}

implicit class VertxFutureToCatsF[A](f: => Future[A]) {
def asF[F[_]: Async]: F[A] = {
Async[F].async_ { cb =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ import io.vertx.core.Future
import io.vertx.ext.web.RoutingContext
import sttp.monad.MonadError
import sttp.tapir.server.interpreter.BodyListener
import sttp.tapir.server.vertx.interpreters.RunAsync

import scala.util.{Failure, Success, Try}

class VertxBodyListener[F[_]](implicit m: MonadError[F]) extends BodyListener[F, RoutingContext => Future[Void]] {
class VertxBodyListener[F[_]](runAsync: RunAsync[F])(implicit m: MonadError[F]) extends BodyListener[F, RoutingContext => Future[Void]] {
override def onComplete(body: RoutingContext => Future[Void])(cb: Try[Unit] => F[Unit]): F[RoutingContext => Future[Void]] = {
m.unit {
{ (ctx: RoutingContext) =>
body {
ctx.addBodyEndHandler(_ => cb(Success(())))
ctx.addEndHandler(res => if (res.failed()) cb(Failure(res.cause())))
ctx.addBodyEndHandler(_ => runAsync(cb(Success(()))))
ctx.addEndHandler(res => if (res.failed()) runAsync(cb(Failure(res.cause()))))
ctx
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import sttp.monad.FutureMonad
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.interceptor.RequestResult
import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter}
import sttp.tapir.server.vertx.VertxFutureServerInterpreter.FutureFromVFuture
import sttp.tapir.server.vertx.VertxFutureServerInterpreter.{FutureFromVFuture, FutureRunAsync}
import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest}
import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync}
import sttp.tapir.server.vertx.routing.PathMapping.extractRouteDefinition
import sttp.tapir.server.vertx.streams.{VertxStreams, ReadStreamCompatible}
import sttp.tapir.server.vertx.streams.{ReadStreamCompatible, VertxStreams}

import scala.concurrent.{ExecutionContext, Future, Promise}

Expand Down Expand Up @@ -48,7 +48,7 @@ trait VertxFutureServerInterpreter extends CommonServerInterpreter {
): Handler[RoutingContext] = { rc =>
implicit val ec: ExecutionContext = vertxFutureServerOptions.executionContextOrCurrentCtx(rc)
implicit val monad: FutureMonad = new FutureMonad()
implicit val bodyListener: BodyListener[Future, RoutingContext => VFuture[Void]] = new VertxBodyListener[Future]
implicit val bodyListener: BodyListener[Future, RoutingContext => VFuture[Void]] = new VertxBodyListener[Future](FutureRunAsync)
val reactiveStreamsReadStream: ReadStreamCompatible[VertxStreams] = streams.reactiveStreamsReadStreamCompatible()
val interpreter = new ServerInterpreter[VertxStreams, Future, RoutingContext => VFuture[Void], VertxStreams](
_ => List(e),
Expand Down Expand Up @@ -86,6 +86,10 @@ object VertxFutureServerInterpreter {
def apply[T](f: => VFuture[T]): Future[T] = f.asScala
}

private[vertx] object FutureRunAsync extends RunAsync[Future] {
override def apply[T](f: => Future[T]): Unit = f
}

implicit class VertxFutureToScalaFuture[A](future: => VFuture[A]) {
def asScala: Future[A] = {
val promise = Promise[A]()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package sttp.tapir.server.vertx.interpreters

trait RunAsync[F[_]] {
def apply[T](f: => F[T]): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter}
import sttp.tapir.server.vertx.VertxBodyListener
import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest}
import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync}
import sttp.tapir.server.vertx.routing.PathMapping.extractRouteDefinition
import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.RioFromVFuture
import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.{RioFromVFuture, ZioRunAsync}
import sttp.tapir.server.vertx.zio.streams._
import sttp.tapir.ztapir.{RIOMonadError, ZServerEndpoint}
import zio._
Expand All @@ -32,7 +32,8 @@ trait VertxZioServerInterpreter[R] extends CommonServerInterpreter {
)(implicit runtime: Runtime[R]): Handler[RoutingContext] = {
val fromVFuture = new RioFromVFuture[R]
implicit val monadError: RIOMonadError[R] = new RIOMonadError[R]
implicit val bodyListener: BodyListener[RIO[R, *], RoutingContext => Future[Void]] = new VertxBodyListener[RIO[R, *]]
implicit val bodyListener: BodyListener[RIO[R, *], RoutingContext => Future[Void]] =
new VertxBodyListener[RIO[R, *]](new ZioRunAsync(runtime))
val zioReadStream = zioReadStreamCompatible(vertxZioServerOptions)
val interpreter = new ServerInterpreter[ZioStreams, RIO[R, *], RoutingContext => Future[Void], ZioStreams](
_ => List(e),
Expand Down Expand Up @@ -119,6 +120,12 @@ object VertxZioServerInterpreter {
def apply[T](f: => Future[T]): RIO[R, T] = f.asRIO
}

private[vertx] class ZioRunAsync[R](runtime: Runtime[R]) extends RunAsync[RIO[R, *]] {
override def apply[T](f: => RIO[R, T]): Unit = Unsafe.unsafe(implicit u => {
runtime.unsafe.runToFuture(f)
})
}

implicit class VertxFutureToRIO[A](f: => Future[A]) {
def asRIO[R]: RIO[R, A] = {
ZIO.async { cb =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import io.vertx.ext.web.{Route, Router, RoutingContext}
import sttp.capabilities.zio.ZioStreams
import sttp.tapir.server.interceptor.RequestResult
import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter}
import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.RioFromVFuture
import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.{RioFromVFuture, ZioRunAsync}
import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest}
import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync}
import sttp.tapir.server.vertx.routing.PathMapping.extractRouteDefinition
import sttp.tapir.server.vertx.zio.streams._
import sttp.tapir.server.vertx.VertxBodyListener
Expand All @@ -33,7 +33,8 @@ trait VertxZioServerInterpreter[R <: Blocking] extends CommonServerInterpreter {
)(implicit runtime: Runtime[R]): Handler[RoutingContext] = {
val fromVFuture = new RioFromVFuture[R]
implicit val monadError: RIOMonadError[R] = new RIOMonadError[R]
implicit val bodyListener: BodyListener[RIO[R, *], RoutingContext => Future[Void]] = new VertxBodyListener[RIO[R, *]]
implicit val bodyListener: BodyListener[RIO[R, *], RoutingContext => Future[Void]] =
new VertxBodyListener[RIO[R, *]](new ZioRunAsync(runtime))
val zioReadStream = zioReadStreamCompatible(vertxZioServerOptions)
val interpreter = new ServerInterpreter[ZioStreams, RIO[R, *], RoutingContext => Future[Void], ZioStreams](
_ => List(e),
Expand Down Expand Up @@ -119,6 +120,10 @@ object VertxZioServerInterpreter {
def apply[T](f: => Future[T]): RIO[R, T] = f.asRIO
}

private[vertx] class ZioRunAsync[R](runtime: Runtime[R]) extends RunAsync[RIO[R, *]] {
override def apply[T](f: => RIO[R, T]): Unit = runtime.unsafeRunAsync(f)(_ => ())
}

implicit class VertxFutureToRIO[A](f: => Future[A]) {
def asRIO[R]: RIO[R, A] = {
RIO.effectAsync { cb =>
Expand Down