Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: Use ForkJoinPool with nCPU x 2 max thread by default #1557

Merged
merged 2 commits into from Mar 29, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -21,7 +21,9 @@ import wvlet.airframe.{Design, Session}
import wvlet.log.LogSupport
import wvlet.log.io.IOUtil

import java.util.concurrent.{ExecutorService, Executors}
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ExecutorService, ForkJoinPool, ForkJoinWorkerThread}
import scala.language.existentials

/**
Expand All @@ -33,9 +35,8 @@ case class GrpcServerConfig(
router: Router = Router.empty,
interceptors: Seq[ServerInterceptor] = Seq.empty,
serverInitializer: ServerBuilder[_] => ServerBuilder[_] = identity,
executorProvider: GrpcServerConfig => ExecutorService = { config: GrpcServerConfig =>
Executors.newCachedThreadPool()
},
executorProvider: GrpcServerConfig => ExecutorService = GrpcServer.newAsyncExecutorFactory,
maxThreads: Int = (Runtime.getRuntime.availableProcessors() * 2).max(2),
codecFactory: MessageCodecFactory = MessageCodecFactory.defaultFactoryForMapOutput,
requestLoggerProvider: GrpcServerConfig => GrpcRequestLogger = { config: GrpcServerConfig =>
GrpcRequestLogger
Expand Down Expand Up @@ -73,6 +74,12 @@ case class GrpcServerConfig(
def withExecutorServiceProvider(provider: GrpcServerConfig => ExecutorService) =
this.copy(executorProvider = provider)

/**
* Set the maximum number of grpc server threads. The default is max(the number of CPU x 2, 2).
* If you are using a custom ExecutorService, this setting might not be effective.
*/
def withMaxThreads(numThreads: Int) = this.copy(maxThreads = numThreads)

def withCodecFactory(newCodecFactory: MessageCodecFactory) = this.copy(codecFactory = newCodecFactory)

def withRequestLoggerProvider(provider: GrpcServerConfig => GrpcRequestLogger) = this
Expand Down Expand Up @@ -145,3 +152,24 @@ class GrpcServer(grpcService: GrpcService, server: Server) extends AutoCloseable
grpcService.close()
}
}

object GrpcServer {
private[grpc] def newAsyncExecutorFactory(config: GrpcServerConfig): ExecutorService = {
new ForkJoinPool(
// The number of threads
config.maxThreads,
new ForkJoinWorkerThreadFactory() {
private val threadCount = new AtomicInteger()
override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = {
val thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool)
val name = s"grpc-${config.name}-${threadCount.getAndIncrement()}"
thread.setDaemon(true)
thread.setName(name)
thread
}
},
null, // Use the default behavior for unrecoverable exceptions
true // Enable asyncMode as grpc server will never join
);
}
}