From 89cf8a8b201b06ddb0414f21c8f5940464b53267 Mon Sep 17 00:00:00 2001 From: kciesielski Date: Wed, 17 Jan 2024 21:35:32 +0100 Subject: [PATCH 1/2] Netty: Close the DefaultEventExecutor on shutdown --- .../tapir/server/netty/cats/NettyCatsServer.scala | 9 ++++++--- .../sttp/tapir/server/netty/loom/NettyIdServer.scala | 9 ++++++--- .../sttp/tapir/server/netty/NettyFutureServer.scala | 12 +++++++++--- .../sttp/tapir/server/netty/zio/NettyZioServer.scala | 9 ++++++--- 4 files changed, 27 insertions(+), 12 deletions(-) diff --git a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala index b9e2b958a3..e1dc4a61cd 100644 --- a/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala +++ b/server/netty-server/cats/src/main/scala/sttp/tapir/server/netty/cats/NettyCatsServer.scala @@ -67,7 +67,8 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty val eventLoopGroup = config.eventLoopConfig.initEventLoopGroup() implicit val monadError: MonadError[F] = new CatsMonadError[F]() val route: Route[F] = Route.combine(routes) - val channelGroup = new DefaultChannelGroup(new DefaultEventExecutor()) // thread safe + val eventExecutor = new DefaultEventExecutor() + val channelGroup = new DefaultChannelGroup(eventExecutor) // thread safe val isShuttingDown: AtomicBoolean = new AtomicBoolean(false) val channelFuture = @@ -81,7 +82,7 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty nettyChannelFutureToScala(channelFuture).map(ch => ( ch.localAddress().asInstanceOf[SA], - () => stop(ch, eventLoopGroup, channelGroup, isShuttingDown, config.gracefulShutdownTimeout) + () => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout) ) ) } @@ -102,6 +103,7 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty ch: Channel, eventLoopGroup: EventLoopGroup, channelGroup: ChannelGroup, + eventExecutor: DefaultEventExecutor, isShuttingDown: AtomicBoolean, gracefulShutdownTimeout: Option[FiniteDuration] ): F[Unit] = { @@ -114,7 +116,8 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty Async[F].defer { nettyFutureToScala(ch.close()).flatMap { _ => if (config.shutdownEventLoopGroupOnClose) { - nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ()) + nettyFutureToScala(eventLoopGroup.shutdownGracefully()) + .flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ())) } else Async[F].unit } } diff --git a/server/netty-server/loom/src/main/scala/sttp/tapir/server/netty/loom/NettyIdServer.scala b/server/netty-server/loom/src/main/scala/sttp/tapir/server/netty/loom/NettyIdServer.scala index aef0b99bdc..4b902bc55f 100644 --- a/server/netty-server/loom/src/main/scala/sttp/tapir/server/netty/loom/NettyIdServer.scala +++ b/server/netty-server/loom/src/main/scala/sttp/tapir/server/netty/loom/NettyIdServer.scala @@ -87,7 +87,8 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions, } ) } - val channelGroup = new DefaultChannelGroup(new DefaultEventExecutor()) // thread safe + val eventExecutor = new DefaultEventExecutor() + val channelGroup = new DefaultChannelGroup(eventExecutor) // thread safe val isShuttingDown: AtomicBoolean = new AtomicBoolean(false) val channelIdFuture = NettyBootstrap( @@ -106,7 +107,7 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions, ( channelId.localAddress().asInstanceOf[SA], - () => stop(channelId, eventLoopGroup, channelGroup, isShuttingDown, config.gracefulShutdownTimeout) + () => stop(channelId, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout) ) } @@ -124,6 +125,7 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions, ch: Channel, eventLoopGroup: EventLoopGroup, channelGroup: ChannelGroup, + eventExecutor: DefaultEventExecutor, isShuttingDown: AtomicBoolean, gracefulShutdownTimeout: Option[FiniteDuration] ): Unit = { @@ -135,7 +137,8 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions, ) ch.close().get() if (config.shutdownEventLoopGroupOnClose) { - val _ = eventLoopGroup.shutdownGracefully().get() + eventLoopGroup.shutdownGracefully().get(): Unit + eventExecutor.shutdownGracefully().get(): Unit } } } diff --git a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala index 0b6a4fec98..6a44ba55c4 100644 --- a/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala +++ b/server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServer.scala @@ -64,7 +64,8 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe val eventLoopGroup = config.eventLoopConfig.initEventLoopGroup() implicit val monadError: MonadError[Future] = new FutureMonad() val route = Route.combine(routes) - val channelGroup = new DefaultChannelGroup(new DefaultEventExecutor()) // thread safe + val eventExecutor = new DefaultEventExecutor() + val channelGroup = new DefaultChannelGroup(eventExecutor) // thread safe val isShuttingDown: AtomicBoolean = new AtomicBoolean(false) val channelFuture = @@ -76,7 +77,10 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe ) nettyChannelFutureToScala(channelFuture).map(ch => - (ch.localAddress().asInstanceOf[SA], () => stop(ch, eventLoopGroup, channelGroup, isShuttingDown, config.gracefulShutdownTimeout)) + ( + ch.localAddress().asInstanceOf[SA], + () => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout) + ) ) } @@ -101,6 +105,7 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe ch: Channel, eventLoopGroup: EventLoopGroup, channelGroup: ChannelGroup, + eventExecutor: DefaultEventExecutor, isShuttingDown: AtomicBoolean, gracefulShutdownTimeout: Option[FiniteDuration] ): Future[Unit] = { @@ -112,7 +117,8 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe ).flatMap { _ => nettyFutureToScala(ch.close()).flatMap { _ => if (config.shutdownEventLoopGroupOnClose) { - nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ()) + nettyFutureToScala(eventLoopGroup.shutdownGracefully()) + .flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ())) } else Future.successful(()) } } diff --git a/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala b/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala index be553f0ccf..75f02d251f 100644 --- a/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala +++ b/server/netty-server/zio/src/main/scala/sttp/tapir/server/netty/zio/NettyZioServer.scala @@ -78,7 +78,8 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options: runtime <- ZIO.runtime[R] routes <- ZIO.foreach(routes)(identity) eventLoopGroup = config.eventLoopConfig.initEventLoopGroup() - channelGroup = new DefaultChannelGroup(new DefaultEventExecutor()) // thread safe + eventExecutor = new DefaultEventExecutor() + channelGroup = new DefaultChannelGroup(eventExecutor) // thread safe isShuttingDown = new AtomicBoolean(false) channelFuture = { implicit val monadError: RIOMonadError[R] = new RIOMonadError[R] @@ -99,7 +100,7 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options: binding <- nettyChannelFutureToScala(channelFuture).map(ch => ( ch.localAddress().asInstanceOf[SA], - () => stop(ch, eventLoopGroup, channelGroup, isShuttingDown, config.gracefulShutdownTimeout) + () => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout) ) ) } yield binding @@ -120,6 +121,7 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options: ch: Channel, eventLoopGroup: EventLoopGroup, channelGroup: ChannelGroup, + eventExecutor: DefaultEventExecutor, isShuttingDown: AtomicBoolean, gracefulShutdownTimeout: Option[FiniteDuration] ): RIO[R, Unit] = { @@ -132,7 +134,8 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options: ZIO.suspend { nettyFutureToScala(ch.close()).flatMap { _ => if (config.shutdownEventLoopGroupOnClose) { - nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ()) + nettyFutureToScala(eventLoopGroup.shutdownGracefully()) + .flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ())) } else ZIO.succeed(()) } } From 5c4edf1a418397c806b83318ced6fb8b77bf5645 Mon Sep 17 00:00:00 2001 From: kciesielski Date: Wed, 17 Jan 2024 21:41:47 +0100 Subject: [PATCH 2/2] Handle "discarded non-unit value" warnings --- .../scala/sttp/tapir/server/netty/loom/NettyIdServer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/netty-server/loom/src/main/scala/sttp/tapir/server/netty/loom/NettyIdServer.scala b/server/netty-server/loom/src/main/scala/sttp/tapir/server/netty/loom/NettyIdServer.scala index 4b902bc55f..6447dff844 100644 --- a/server/netty-server/loom/src/main/scala/sttp/tapir/server/netty/loom/NettyIdServer.scala +++ b/server/netty-server/loom/src/main/scala/sttp/tapir/server/netty/loom/NettyIdServer.scala @@ -137,8 +137,8 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions, ) ch.close().get() if (config.shutdownEventLoopGroupOnClose) { - eventLoopGroup.shutdownGracefully().get(): Unit - eventExecutor.shutdownGracefully().get(): Unit + val _ = eventLoopGroup.shutdownGracefully().get() + val _ = eventExecutor.shutdownGracefully().get() } } }