Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Kotlin-gRPC client CoroutineStub #2669

Merged
merged 10 commits into from
May 6, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion dependencies.yml
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ io.grpc:
- io.netty:netty-handler-proxy
- io.netty:netty-transport
- io.netty:netty-tcnative-boringssl-static
grpc-kotlin-stub: { version: &GRPC_KOTLIN_VERSION '0.1.1' }
protoc-gen-grpc-kotlin: { version: *GRPC_KOTLIN_VERSION }

io.micrometer:
micrometer-core:
Expand Down Expand Up @@ -420,7 +422,7 @@ org.jctools:
- from: org.jctools
to: com.linecorp.armeria.internal.shaded.jctools

# If you want to change `org.jetbrains.kotlin:kotlin-allopen` version,
# If you want to change `org.jetbrains.kotlin:kotlin-allopen` version,
# you also need to change `org.jetbrains.kotlin.jvm` version in `build.gradle`.
org.jetbrains.kotlin:
kotlin-allopen: { version: &KOTLIN_VERSION '1.3.71' }
Expand Down
6 changes: 4 additions & 2 deletions examples/grpc-kotlin/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ dependencies {
runtimeOnly("org.slf4j:slf4j-simple")

implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation(kotlin("reflect"))
implementation(kotlin("stdlib-jdk8"))
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
implementation("io.grpc:grpc-kotlin-stub")

testImplementation("jakarta.annotation:jakarta.annotation-api")
testImplementation("net.javacrumbs.json-unit:json-unit-fluent")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package example.armeria.grpc.kotlin

import com.linecorp.armeria.common.RequestContext
import com.linecorp.armeria.server.ServiceRequestContext
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher

/**
* A [CoroutineContext] that propagates an [ServiceRequestContext] to coroutines run using
* an Armeria context aware executor.
*/
internal object ArmeriaContext : CoroutineContext {
ikhoon marked this conversation as resolved.
Show resolved Hide resolved
override fun <R> fold(initial: R, operation: (R, CoroutineContext.Element) -> R): R =
EmptyCoroutineContext.fold(initial, operation)

override fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? =
EmptyCoroutineContext.get(key)

override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext =
EmptyCoroutineContext.minusKey(key)

override fun plus(context: CoroutineContext): CoroutineContext {
val requestCtx: RequestContext = RequestContext.current()
return requestCtx.contextAwareExecutor().asCoroutineDispatcher() + context
}
}

val Dispatchers.Armeria: CoroutineContext
get() = ArmeriaContext
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,51 @@ package example.armeria.grpc.kotlin
import com.linecorp.armeria.server.ServiceRequestContext
import example.armeria.grpc.kotlin.Hello.HelloReply
import example.armeria.grpc.kotlin.Hello.HelloRequest
import io.grpc.stub.StreamObserver
import java.time.Duration
import java.util.concurrent.TimeUnit
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers

class HelloServiceImpl : HelloServiceGrpc.HelloServiceImplBase() {
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.withContext

/**
* Note that if you want to access a current [ServiceRequestContext] in [HelloServiceImpl],
* you should initialize [HelloServiceImpl] with [ArmeriaContext].
*/
class HelloServiceImpl : HelloServiceGrpcKt.HelloServiceCoroutineImplBase(Dispatchers.Armeria) {

/**
* Sends a [HelloReply] immediately when receiving a request.
*/
override fun hello(request: HelloRequest, responseObserver: StreamObserver<HelloReply>) {
responseObserver.onNext(buildReply(toMessage(request.name)))
responseObserver.onCompleted()
override suspend fun hello(request: HelloRequest): HelloReply {
// Make sure that current thread is request context aware
ServiceRequestContext.current()
return buildReply(toMessage(request.name))
}

override fun lazyHello(request: HelloRequest, responseObserver: StreamObserver<HelloReply>) {
// You can use the event loop for scheduling a task.
ServiceRequestContext.current().contextAwareEventLoop().schedule({
responseObserver.onNext(buildReply(toMessage(request.name)))
responseObserver.onCompleted()
}, 3, TimeUnit.SECONDS)
override suspend fun lazyHello(request: HelloRequest): HelloReply {
delay(3000L)
ikhoon marked this conversation as resolved.
Show resolved Hide resolved
ServiceRequestContext.current()
return buildReply(toMessage(request.name))
}

/**
* Sends a [HelloReply] using `blockingTaskExecutor`.
*
* @see [Blocking service implementation](https://line.github.io/armeria/server-grpc.html#blocking-service-implementation)
*/
override fun blockingHello(request: HelloRequest, responseObserver: StreamObserver<HelloReply>) {
// Unlike upstream gRPC-Java, Armeria does not run service logic in a separate thread pool by default.
// Therefore, this method will run in the event loop, which means that you can suffer the performance
// degradation if you call a blocking API in this method. In this case, you have the following options:
//
// 1. Call a blocking API in the blockingTaskExecutor provided by Armeria.
// 2. Set `GrpcServiceBuilder.useBlockingTaskExecutor(true)` when building your GrpcService.
// 3. Call a blocking API in the separate thread pool you manage.
//
// In this example, we chose the option 1:
ServiceRequestContext.current().blockingTaskExecutor().submit {
override suspend fun blockingHello(request: HelloRequest): HelloReply {
return withContext(ServiceRequestContext.current().blockingTaskExecutor().asCoroutineDispatcher()) {
try { // Simulate a blocking API call.
Thread.sleep(3000)
} catch (ignored: Exception) { // Do nothing.
}
responseObserver.onNext(buildReply(toMessage(request.name)))
responseObserver.onCompleted()
// Make sure that current thread is request context aware
ServiceRequestContext.current()
buildReply(toMessage(request.name))
}
}

Expand All @@ -57,69 +56,39 @@ class HelloServiceImpl : HelloServiceGrpc.HelloServiceImplBase() {
*
* @see lazyHello(HelloRequest, StreamObserver)
*/
override fun lotsOfReplies(request: HelloRequest, responseObserver: StreamObserver<HelloReply>) {
override fun lotsOfReplies(request: HelloRequest): Flow<HelloReply> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be flow? I wouldn't expect a for loop in a coroutine to cause any differences compared to our normal withArmeriaContext methods

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify, it doesn't matter so much whether it's a flow or not but was hoping withArmeriaContext applies to all the methods without having yet another pattern of flowOn

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’ve tried but it was impossible to apply because withContext only could be applied to suspend function. This function is just return flow and not suspend :-(

Copy link
Contributor Author

@ikhoon ikhoon Apr 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we can wrap withArmeriaContext with the inside of flow block.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see - does that mean we can have

flow {
  withArmeriaContext {
    for (i...)

?

Definitely not great but I guess a bit better than a new concept flowOn. And probably good evidence to add to our issue about getting better support for customizing the context globally :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see - does that mean we can have

Yes, flow takes suspend function.
https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/flow/Builders.kt#L49

And probably good evidence to add to our issue about getting better support for customizing the context globally :)

That sounds good. 👍

Copy link
Contributor Author

@ikhoon ikhoon Apr 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test failed, the nested block does not work...

flow {
  withArmeriaContext {
    for (i...)

emit() should be called from dispatchers of flow block unless it throws ISE.
https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/flow/Builders.kt#L37-L45

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too bad :( Let's stick with flowOn then since best we can do

// You can also write this code without Reactor like 'lazyHello' example.
Flux.interval(Duration.ofSeconds(1))
.take(5)
.map { "Hello, ${request.name}! (sequence: ${it + 1})" }
// You can make your Flux/Mono publish the signals in the RequestContext-aware executor.
.publishOn(Schedulers.fromExecutor(ServiceRequestContext.current().contextAwareExecutor()))
.subscribe({
// Confirm this callback is being executed on the RequestContext-aware executor.
ServiceRequestContext.current()
responseObserver.onNext(buildReply(it))
},
{
// Confirm this callback is being executed on the RequestContext-aware executor.
ServiceRequestContext.current()
responseObserver.onError(it)
},
{
// Confirm this callback is being executed on the RequestContext-aware executor.
ServiceRequestContext.current()
responseObserver.onCompleted()
})
return flow {
for (i in 1..5) {
// Check context between delay and emit
ServiceRequestContext.current()
delay(1000)
ServiceRequestContext.current()
emit(buildReply("Hello, ${request.name}! (sequence: $i)")) // emit next value
ikhoon marked this conversation as resolved.
Show resolved Hide resolved
ServiceRequestContext.current()
}
}
}

/**
* Sends a [HelloReply] when a request has been completed with multiple [HelloRequest]s.
*/
override fun lotsOfGreetings(responseObserver: StreamObserver<HelloReply>): StreamObserver<HelloRequest> {
return object : StreamObserver<HelloRequest> {
val names = arrayListOf<String>()

override fun onNext(value: HelloRequest) {
names.add(value.name)
}

override fun onError(t: Throwable) {
responseObserver.onError(t)
}

override fun onCompleted() {
responseObserver.onNext(buildReply(toMessage(names.joinToString())))
responseObserver.onCompleted()
}
}
override suspend fun lotsOfGreetings(requests: Flow<HelloRequest>): HelloReply {
val names = mutableListOf<String>()
requests.map { it.name }.toList(names)
// Make sure that current thread is request context aware
ServiceRequestContext.current()
return buildReply(toMessage(names.joinToString()))
}

/**
* Sends a [HelloReply] when each [HelloRequest] is received. The response will be completed
* when the request is completed.
*/
override fun bidiHello(responseObserver: StreamObserver<HelloReply>): StreamObserver<HelloRequest> {
return object : StreamObserver<HelloRequest> {
override fun onNext(value: HelloRequest) { // Respond to every request received.
responseObserver.onNext(buildReply(toMessage(value.name)))
}

override fun onError(t: Throwable) {
responseObserver.onError(t)
}

override fun onCompleted() {
responseObserver.onCompleted()
}
override fun bidiHello(requests: Flow<HelloRequest>): Flow<HelloReply> = flow {
requests.collect { request ->
ServiceRequestContext.current()
emit(buildReply(toMessage(request.name)))
}
}

Expand Down
Loading