Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty
val eventLoopGroup = config.eventLoopConfig.initEventLoopGroup()
implicit val monadError: MonadError[F] = new CatsMonadError[F]()
val route: Route[F] = Route.combine(routes)
val channelGroup = new DefaultChannelGroup(new DefaultEventExecutor()) // thread safe
val eventExecutor = new DefaultEventExecutor()
val channelGroup = new DefaultChannelGroup(eventExecutor) // thread safe
val isShuttingDown: AtomicBoolean = new AtomicBoolean(false)

val channelFuture =
Expand All @@ -81,7 +82,7 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty
nettyChannelFutureToScala(channelFuture).map(ch =>
(
ch.localAddress().asInstanceOf[SA],
() => stop(ch, eventLoopGroup, channelGroup, isShuttingDown, config.gracefulShutdownTimeout)
() => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
)
)
}
Expand All @@ -102,6 +103,7 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty
ch: Channel,
eventLoopGroup: EventLoopGroup,
channelGroup: ChannelGroup,
eventExecutor: DefaultEventExecutor,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): F[Unit] = {
Expand All @@ -114,7 +116,8 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty
Async[F].defer {
nettyFutureToScala(ch.close()).flatMap { _ =>
if (config.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ())
nettyFutureToScala(eventLoopGroup.shutdownGracefully())
.flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ()))
} else Async[F].unit
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions,
}
)
}
val channelGroup = new DefaultChannelGroup(new DefaultEventExecutor()) // thread safe
val eventExecutor = new DefaultEventExecutor()
val channelGroup = new DefaultChannelGroup(eventExecutor) // thread safe
val isShuttingDown: AtomicBoolean = new AtomicBoolean(false)

val channelIdFuture = NettyBootstrap(
Expand All @@ -106,7 +107,7 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions,

(
channelId.localAddress().asInstanceOf[SA],
() => stop(channelId, eventLoopGroup, channelGroup, isShuttingDown, config.gracefulShutdownTimeout)
() => stop(channelId, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
)
}

Expand All @@ -124,6 +125,7 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions,
ch: Channel,
eventLoopGroup: EventLoopGroup,
channelGroup: ChannelGroup,
eventExecutor: DefaultEventExecutor,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): Unit = {
Expand All @@ -136,6 +138,7 @@ case class NettyIdServer(routes: Vector[IdRoute], options: NettyIdServerOptions,
ch.close().get()
if (config.shutdownEventLoopGroupOnClose) {
val _ = eventLoopGroup.shutdownGracefully().get()
val _ = eventExecutor.shutdownGracefully().get()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe
val eventLoopGroup = config.eventLoopConfig.initEventLoopGroup()
implicit val monadError: MonadError[Future] = new FutureMonad()
val route = Route.combine(routes)
val channelGroup = new DefaultChannelGroup(new DefaultEventExecutor()) // thread safe
val eventExecutor = new DefaultEventExecutor()
val channelGroup = new DefaultChannelGroup(eventExecutor) // thread safe
val isShuttingDown: AtomicBoolean = new AtomicBoolean(false)

val channelFuture =
Expand All @@ -76,7 +77,10 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe
)

nettyChannelFutureToScala(channelFuture).map(ch =>
(ch.localAddress().asInstanceOf[SA], () => stop(ch, eventLoopGroup, channelGroup, isShuttingDown, config.gracefulShutdownTimeout))
(
ch.localAddress().asInstanceOf[SA],
() => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
)
)
}

Expand All @@ -101,6 +105,7 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe
ch: Channel,
eventLoopGroup: EventLoopGroup,
channelGroup: ChannelGroup,
eventExecutor: DefaultEventExecutor,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): Future[Unit] = {
Expand All @@ -112,7 +117,8 @@ case class NettyFutureServer(routes: Vector[FutureRoute], options: NettyFutureSe
).flatMap { _ =>
nettyFutureToScala(ch.close()).flatMap { _ =>
if (config.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ())
nettyFutureToScala(eventLoopGroup.shutdownGracefully())
.flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ()))
} else Future.successful(())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options:
runtime <- ZIO.runtime[R]
routes <- ZIO.foreach(routes)(identity)
eventLoopGroup = config.eventLoopConfig.initEventLoopGroup()
channelGroup = new DefaultChannelGroup(new DefaultEventExecutor()) // thread safe
eventExecutor = new DefaultEventExecutor()
channelGroup = new DefaultChannelGroup(eventExecutor) // thread safe
isShuttingDown = new AtomicBoolean(false)
channelFuture = {
implicit val monadError: RIOMonadError[R] = new RIOMonadError[R]
Expand All @@ -99,7 +100,7 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options:
binding <- nettyChannelFutureToScala(channelFuture).map(ch =>
(
ch.localAddress().asInstanceOf[SA],
() => stop(ch, eventLoopGroup, channelGroup, isShuttingDown, config.gracefulShutdownTimeout)
() => stop(ch, eventLoopGroup, channelGroup, eventExecutor, isShuttingDown, config.gracefulShutdownTimeout)
)
)
} yield binding
Expand All @@ -120,6 +121,7 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options:
ch: Channel,
eventLoopGroup: EventLoopGroup,
channelGroup: ChannelGroup,
eventExecutor: DefaultEventExecutor,
isShuttingDown: AtomicBoolean,
gracefulShutdownTimeout: Option[FiniteDuration]
): RIO[R, Unit] = {
Expand All @@ -132,7 +134,8 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options:
ZIO.suspend {
nettyFutureToScala(ch.close()).flatMap { _ =>
if (config.shutdownEventLoopGroupOnClose) {
nettyFutureToScala(eventLoopGroup.shutdownGracefully()).map(_ => ())
nettyFutureToScala(eventLoopGroup.shutdownGracefully())
.flatMap(_ => nettyFutureToScala(eventExecutor.shutdownGracefully()).map(_ => ()))
} else ZIO.succeed(())
}
}
Expand Down