Even with useBlockingTaskExecutor enabled, gRPC's ServerInterceptor runs in a non-blocking thread #4275

Closed
be-hase opened this issue Jun 1, 2022 · 4 comments · Fixed by #4331

be-hase commented Jun 1, 2022

Issue

First of all, gRPC's ServerInterceptor is a synchronous API.

Therefore, when an authentication step that requires network I/O is implemented in a ServerInterceptor, blocking the thread is the only option.

In gRPC-Java, this is not a major problem because the interceptor runs on a thread pool. (ref)
However, in Armeria, it is a problem because it runs on the event loop.
Unfortunately, even if I specify useBlockingTaskExecutor, the ServerInterceptor is executed on the event loop. (ref)

Suggestion

Unfortunately, the ServerInterceptor is synchronous at this time, so it would be better to allow the ServerInterceptor to run on a thread pool, as in gRPC-java.
(At least, if useBlockingTaskExecutor is enabled, it would be better to do so.)
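
To illustrate the suggestion, here is a minimal, stdlib-only sketch of moving a blocking check off an event-loop thread and onto a separate pool. It does not use Armeria or gRPC classes; `BlockingOffloadSketch`, `blockingAuthCheck`, `runOffloaded`, and the thread names are hypothetical stand-ins chosen for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the situation in this issue; not Armeria or gRPC API.
final class BlockingOffloadSketch {

    // Simulates a synchronous check that blocks the calling thread on network I/O
    // and reports the name of the thread that was blocked.
    static String blockingAuthCheck(String token) {
        try {
            Thread.sleep(50); // pretend network round trip
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return Thread.currentThread().getName();
    }

    // Runs the blocking check on `blockingPool` so that the caller's thread stays free.
    static CompletableFuture<String> runOffloaded(String token, ExecutorService blockingPool) {
        return CompletableFuture.supplyAsync(() -> blockingAuthCheck(token), blockingPool);
    }

    public static void main(String[] args) {
        ExecutorService eventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        ExecutorService blockingPool =
                Executors.newFixedThreadPool(4, r -> new Thread(r, "blocking-task"));
        try {
            // The request is accepted on the "event loop", but the sleep happens on the pool.
            String ranOn = CompletableFuture
                    .supplyAsync(() -> runOffloaded("token", blockingPool), eventLoop)
                    .thenCompose(f -> f)
                    .join();
            System.out.println("blocking check ran on: " + ranOn);
        } finally {
            eventLoop.shutdown();
            blockingPool.shutdown();
        }
    }
}
```

The event-loop thread only schedules the work and returns immediately, which is the behavior one would expect from useBlockingTaskExecutor for interceptors as well.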

For reference, in the case of gRPC-Kotlin, the ServerInterceptor runs in a thread pool and the Service runs in coroutines.

Ideally, of course, the ServerInterceptor should support async.

Or we could consider another method like this.
#2607 (comment)

be-hase commented Jun 2, 2022

Or we may prepare an AsyncServerInterceptor using the technique introduced here.
https://stackoverflow.com/questions/53651024/grpc-java-async-call-in-serverinterceptor

But, it would be nice if it was prepared on the gRPC-Java side...

ikhoon commented Jun 2, 2022

> Unfortunately, even if I specify useBlockingTaskExecutor, the ServerInterceptor is executed on the event loop. (ref)

😱 It should be a bug. Let us fix it soon.

@ikhoon ikhoon added the defect label Jun 2, 2022
@ikhoon ikhoon added this to the 1.17.0 milestone Jun 2, 2022

ikhoon commented Jun 2, 2022

I was working on refactoring Armeria's ServerCall implementations. #4192
The change to fix this issue won't be difficult but it may heavily conflict with #4192. I am going to fix this problem once the refactoring PR is merged.

be-hase commented Jun 2, 2022

Thank you.

In my opinion, ServerInterceptor should be run on the BlockingTaskExecutor when the user has specified useBlockingTaskExecutor=true.

By the way, I am using gRPC-Kotlin. I wrote the following code using the technique introduced in this article and it seems to work fine.
(However, the code is still rough and not sufficiently tested.)

It would be very convenient if Armeria had a ServerInterceptor that could be written asynchronously.
(This code uses coroutines, but Armeria needs to be usable from Java, so it would be better to use CompletableFuture.)

/**
 * ref: https://stackoverflow.com/questions/53651024/grpc-java-async-call-in-serverinterceptor
 */
abstract class SuspendableServerInterceptor(
    private val context: CoroutineContext = EmptyCoroutineContext
) : ServerInterceptor {
    override fun <ReqT : Any, RespT : Any> interceptCall(
        call: ServerCall<ReqT, RespT>,
        headers: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT> {
        val armeriaCtx = ServiceRequestContext.current()
        val delayedListener = DelayedListener<ReqT>()
        delayedListener.job = CoroutineScope(
            ArmeriaRequestCoroutineContext(armeriaCtx)
                    + GrpcContextElement.current()
                    + COROUTINE_CONTEXT_KEY.get()
                    + context
        ).launch {
            try {
                delayedListener.realListener = suspendableInterceptCall(call, headers, next)
                delayedListener.drainPendingCallbacks()
            } catch (e: CancellationException) {
                log.debug { "Caught CancellationException. $e" }
                call.close(Status.CANCELLED, Metadata())
            } catch (e: Exception) {
                log.error(e) { "Unhandled exception. $e" }
                call.close(Status.UNKNOWN, Metadata())
            }
        }
        return delayedListener
    }

    abstract suspend fun <ReqT : Any, RespT : Any> suspendableInterceptCall(
        call: ServerCall<ReqT, RespT>,
        headers: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT>

    /**
     * ref: https://github.com/grpc/grpc-java/blob/84edc332397ed01fae2400c25196fc90d8c1a6dd/core/src/main/java/io/grpc/internal/DelayedClientCall.java#L415
     */
    private class DelayedListener<ReqT> : ServerCall.Listener<ReqT>() {
        var realListener: ServerCall.Listener<ReqT>? = null

        @Volatile
        private var passThrough = false

        @GuardedBy("this")
        private var pendingCallbacks: MutableList<Runnable> = mutableListOf()

        var job: Job? = null

        override fun onMessage(message: ReqT) {
            if (passThrough) {
                checkNotNull(realListener).onMessage(message)
            } else {
                delayOrExecute { checkNotNull(realListener).onMessage(message) }
            }
        }

        override fun onHalfClose() {
            if (passThrough) {
                checkNotNull(realListener).onHalfClose()
            } else {
                delayOrExecute { checkNotNull(realListener).onHalfClose() }
            }
        }

        override fun onCancel() {
            job?.cancel()
            if (passThrough) {
                checkNotNull(realListener).onCancel()
            } else {
                delayOrExecute { checkNotNull(realListener).onCancel() }
            }
        }

        override fun onComplete() {
            if (passThrough) {
                checkNotNull(realListener).onComplete()
            } else {
                delayOrExecute { checkNotNull(realListener).onComplete() }
            }
        }

        override fun onReady() {
            if (passThrough) {
                checkNotNull(realListener).onReady()
            } else {
                delayOrExecute { checkNotNull(realListener).onReady() }
            }
        }

        private fun delayOrExecute(runnable: Runnable) {
            synchronized(this) {
                if (!passThrough) {
                    pendingCallbacks.add(runnable)
                    return
                }
            }
            runnable.run()
        }

        fun drainPendingCallbacks() {
            check(!passThrough)
            var toRun: MutableList<Runnable> = mutableListOf()
            while (true) {
                synchronized(this) {
                    if (pendingCallbacks.isEmpty()) {
                        pendingCallbacks = mutableListOf()
                        passThrough = true
                        return
                    }
                    // Since there were pendingCallbacks, we need to process them. To maintain ordering we
                    // can't set passThrough=true until we run all pendingCallbacks, but new Runnables may be
                    // added after we drop the lock. So we will have to re-check pendingCallbacks.
                    val tmp: MutableList<Runnable> = toRun
                    toRun = pendingCallbacks
                    pendingCallbacks = tmp
                }
                for (runnable in toRun) {
                    // Avoid calling listener while lock is held to prevent deadlocks.
                    runnable.run()
                }
                toRun.clear()
            }
        }
    }

    companion object {
        private val log = KotlinLogging.logger {}

        @Suppress("UNCHECKED_CAST")
        // Get by using reflection
        internal val COROUTINE_CONTEXT_KEY: Context.Key<CoroutineContext> =
            CoroutineContextServerInterceptor::class.let { kclass ->
                val companionObject = kclass.companionObject!!
                val property = companionObject.memberProperties.single { it.name == "COROUTINE_CONTEXT_KEY" }
                checkNotNull(property.getter.call(kclass.companionObjectInstance!!)) as Context.Key<CoroutineContext>
            }
    }
}
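
For comparison, the same buffer-then-replay idea can be sketched in plain Java with CompletableFuture. This is only an illustration under simplifying assumptions: `SimpleListener` is a hypothetical two-method stand-in for `ServerCall.Listener`, `DeferredSimpleListener` is an invented name, and a future that completes exceptionally is not handled here (a real implementation would have to close the call in that case).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified listener; the real type would be io.grpc.ServerCall.Listener.
interface SimpleListener {
    void onMessage(String message);
    void onHalfClose();
}

// Buffers callbacks until the real listener is available, then replays them in order.
final class DeferredSimpleListener implements SimpleListener {
    private final List<Runnable> pending = new ArrayList<>(); // guarded by `this`
    private volatile boolean passThrough;
    private volatile SimpleListener delegate;

    DeferredSimpleListener(CompletableFuture<SimpleListener> future) {
        future.thenAccept(this::attach);
    }

    private void attach(SimpleListener real) {
        delegate = real;
        while (true) {
            List<Runnable> toRun;
            synchronized (this) {
                if (pending.isEmpty()) {
                    passThrough = true; // later callbacks skip the queue
                    return;
                }
                toRun = new ArrayList<>(pending);
                pending.clear();
            }
            // Run outside the lock so that a callback cannot deadlock against delayOrExecute().
            toRun.forEach(Runnable::run);
        }
    }

    private void delayOrExecute(Runnable task) {
        if (!passThrough) {
            synchronized (this) {
                if (!passThrough) {
                    pending.add(task);
                    return;
                }
            }
        }
        task.run();
    }

    @Override
    public void onMessage(String message) {
        delayOrExecute(() -> delegate.onMessage(message));
    }

    @Override
    public void onHalfClose() {
        delayOrExecute(() -> delegate.onHalfClose());
    }
}

final class DeferredSimpleListenerDemo {
    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        CompletableFuture<SimpleListener> future = new CompletableFuture<>();
        SimpleListener deferred = new DeferredSimpleListener(future);

        deferred.onMessage("a"); // buffered: the real listener is not ready yet

        future.complete(new SimpleListener() {
            @Override
            public void onMessage(String message) {
                seen.add("message:" + message);
            }

            @Override
            public void onHalfClose() {
                seen.add("halfClose");
            }
        });

        deferred.onMessage("b"); // delivered directly after the drain
        deferred.onHalfClose();
        System.out.println(seen); // [message:a, message:b, halfClose]
    }
}
```

As in the Kotlin version, the pass-through flag is only set once the queue has been observed empty under the lock, so callbacks that arrive during the drain keep their order.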

ikhoon added a commit to ikhoon/armeria that referenced this issue Jul 4, 2022
…TaskExecutor` is enabled.

Motivation:

`ServerInterceptor`s bound to a `GrpcService` run in an event loop even if
`GrpcServiceBuilder.useBlockingTaskExecutor(true)` option is specified.
See line#4275 for details

Modifications:

- Use a blocking executor to run `ServerInterceptor`s

Result:

- You can now run `ServerInterceptor`s in a blocking task executor when
  `GrpcServiceBuilder.useBlockingTaskExecutor(true)` is enabled.
- Fixes line#4275
ikhoon added a commit that referenced this issue Jul 5, 2022
…TaskExecutor` is enabled. (#4331)

Motivation:

`ServerInterceptor`s bound to a `GrpcService` run in an event loop even if
`GrpcServiceBuilder.useBlockingTaskExecutor(true)` option is specified.
See #4275 for details

Modifications:

- Use a blocking executor to run `ServerInterceptor`s

Result:

- You can now run `ServerInterceptor`s in a blocking task executor when
  `GrpcServiceBuilder.useBlockingTaskExecutor(true)` is enabled.
- Fixes #4275
ikhoon added a commit to ikhoon/armeria that referenced this issue Feb 1, 2023
Motivation:

The upstream `ServerInterceptor` of gRPC-Java does not allow executing
an asynchronous task in an interceptor. The API is designed to return a
`Listener` synchronously.
https://github.com/grpc/grpc-java/blob/b7164f0791d405e002d67c1e01603e0df9d1cc9f/api/src/main/java/io/grpc/ServerInterceptor.java#L55

This makes it difficult for users to use existing asynchronous APIs in
interceptors. The blocking threads can be exhausted easily if there are
slow responses from servers having problems.

It would be useful to be able to use `CompletableFuture` to compose
asynchronous tasks without blocking threads.

The original idea was suggested in line#4275 (comment)
/cc @be-hase

Modifications:

- Add `AsyncServerInterceptor` that allows returning
  `ServerCall.Listener` with `CompletableFuture`.
- Add `DeferredListener` to execute pending tasks in order when the
  future returned by `AsyncServerInterceptor` completes.
  - `DeferredListener` also propagates `ServiceRequestContext` by
    performing the tasks with a context-aware executor.
- Fixed to peel exceptions before applying `GrpcStatusFunction`.

Result:

You can now execute asynchronous tasks in gRPC `ServerInterceptor` using
`AsyncServerInterceptor`.

```java
class AuthServerInterceptor implements AsyncServerInterceptor {

    @Override
    public <I, O> CompletableFuture<Listener<I>> asyncInterceptCall(
            ServerCall<I, O> call, Metadata headers, ServerCallHandler<I, O> next) {

        // `authorizer` is assumed to return a CompletableFuture<Boolean>.
        return authorizer.authorize(headers).thenApply(result -> {
            if (result) {
                return next.startCall(call, headers);
            } else {
                throw new AuthenticationException("Invalid access");
            }
        });
    }
}

GrpcService.builder()
           .intercept(new AuthServerInterceptor())
           .addService(...)
           ...
```
ikhoon added a commit that referenced this issue Feb 8, 2023
Motivation:

The upstream `ServerInterceptor` of gRPC-Java does not allow executing an asynchronous task in an interceptor. The API is designed to return a `Listener` synchronously.
https://github.com/grpc/grpc-java/blob/b7164f0791d405e002d67c1e01603e0df9d1cc9f/api/src/main/java/io/grpc/ServerInterceptor.java#L55

This makes it difficult for users to use existing asynchronous APIs in interceptors. The blocking threads can be exhausted easily if there are slow responses from servers having problems.

It would be useful to be able to use `CompletableFuture` to compose asynchronous tasks without blocking threads.

The original idea was suggested in #4275 (comment) /cc @be-hase

Modifications:

- Add `AsyncServerInterceptor` that allows returning `ServerCall.Listener` with `CompletableFuture`.
- Add `DeferredListener` to execute pending tasks in order when the future returned by `AsyncServerInterceptor` completes.
  - `DeferredListener` also propagates `ServiceRequestContext` by performing the tasks with a context-aware executor.
- Fixed to peel exceptions before applying `GrpcStatusFunction`.
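
As background for the last item: an exception thrown inside `thenApply` reaches downstream stages wrapped in a `CompletionException`, so a status mapping that inspects the exception type sees the wrapper unless it is peeled first. The following is a minimal stdlib-only sketch of that effect; `PeelSketch`, `peel`, and `toStatus` are hypothetical names for this illustration and are not the Armeria implementation or `GrpcStatusFunction` itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

final class PeelSketch {

    // Hypothetical helper: strips CompletableFuture wrapper exceptions to reach the real cause.
    static Throwable peel(Throwable t) {
        Throwable current = t;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // Hypothetical mapping from an exception to a status name, standing in for a status function.
    static String toStatus(Throwable t) {
        if (t instanceof SecurityException) {
            return "UNAUTHENTICATED";
        }
        return "UNKNOWN";
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = CompletableFuture.completedFuture(false)
                .thenApply(ok -> {
                    if (!ok) {
                        throw new SecurityException("Invalid access");
                    }
                    return "listener";
                });

        failed.handle((value, cause) -> {
            // `cause` is a CompletionException wrapping the SecurityException.
            System.out.println("without peeling: " + toStatus(cause));
            System.out.println("with peeling:    " + toStatus(peel(cause)));
            return null;
        });
    }
}
```

Without peeling, the mapping in this sketch falls through to its default branch because it only ever sees the wrapper type.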

Result:

You can now execute asynchronous tasks in gRPC `ServerInterceptor` using `AsyncServerInterceptor`.

```java
class AuthServerInterceptor implements AsyncServerInterceptor {

    @Override
    public <I, O> CompletableFuture<Listener<I>> asyncInterceptCall(
            ServerCall<I, O> call, Metadata headers, ServerCallHandler<I, O> next) {

        // `authorizer` is assumed to return a CompletableFuture<Boolean>.
        return authorizer.authorize(headers).thenApply(result -> {
            if (result) {
                return next.startCall(call, headers);
            } else {
                throw new AuthenticationException("Invalid access");
            }
        });
    }
}

GrpcService.builder()
           .intercept(new AuthServerInterceptor())
           .addService(...)
           ...
```