Apply contextCapture automatically in await function #59

Open
be-hase opened this issue Oct 5, 2023 · 0 comments
Labels
❓need-triage This issue needs triage, hasn't been looked at by a team member yet

be-hase commented Oct 5, 2023

Motivation

This is similar to reactor/reactor-core#3406.

Users of Kotlin coroutines rely on ThreadContextElement to propagate MDC and other thread-local state.

Hooks.enableAutomaticContextPropagation() covers the block operator, but not the await functions (awaitSingle and so on). We are therefore forced to call contextCapture() explicitly before every await, like this.

// e.g. WebClient

webClient.post()
    .uri("...")
    // omitted
    .contextCapture()
    .awaitSingle()

It would be great if the await functions were also supported, like the block operator.

Reproduction code

I have created a sample project.
https://github.com/be-hase/context-propagation-report-202309/tree/main/coroutines-issue

The code is in one file.
https://github.com/be-hase/context-propagation-report-202309/blob/main/coroutines-issue/src/main/kotlin/example/CoroutinesIssueApplication.kt

You can start it with the following command.

./gradlew :coroutines-issue:bootRun 

Then, request with curl.

curl localhost:8080

The log shows that the context is not propagated (see the last line).

Controller. MDC={traceId=651e4b5c7f37de9f9425f6f2b46edc94, spanId=9425f6f2b46edc94}, myThreadLocal=d578c95d-5bed-471d-8571-d45493cd9298
Request. MDC={traceId=651e4b5c7f37de9f9425f6f2b46edc94, spanId=9425f6f2b46edc94}, myThreadLocal=d578c95d-5bed-471d-8571-d45493cd9298
Response. MDC={traceId=651e4b5c7f37de9f9425f6f2b46edc94, spanId=bc0acce7efb692e0}, myThreadLocal=null
doOnSuccess. MDC={}, myThreadLocal=null
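
The last line is what one would expect if the callback runs on a thread where nothing was captured or restored. The following is a minimal standard-library sketch of that effect, not the sample project's code: `myThreadLocal` and `readOnOtherThread` are hypothetical names, and the manual capture and restore only illustrates the idea behind contextCapture(), not how Reactor implements it.

```kotlin
import kotlin.concurrent.thread

// Hypothetical stand-in for the sample project's thread-local.
val myThreadLocal = ThreadLocal<String?>()

// Reads the thread-local on a fresh thread.
// If a captured value is passed in, it is restored on that thread first.
fun readOnOtherThread(captured: String? = null): String? {
    var seen: String? = null
    thread {
        if (captured != null) myThreadLocal.set(captured) // restore the snapshot
        try {
            seen = myThreadLocal.get()
        } finally {
            myThreadLocal.remove() // do not leak the value on this thread
        }
    }.join()
    return seen
}

fun main() {
    myThreadLocal.set("request-42")
    println(readOnOtherThread())                    // nothing captured: prints null
    println(readOnOtherThread(myThreadLocal.get())) // captured and restored: prints request-42
}
```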

Desired solution

The await functions are implemented in kotlinx.coroutines, so perhaps this request belongs there.
https://github.com/Kotlin/kotlinx.coroutines/blob/1ccc96890d38be52d1bdcd31af5befb54804aa05/reactive/kotlinx-coroutines-reactor/src/Mono.kt#L46

However, I found that the following workaround is possible using the ContextInjector mechanism that kotlinx.coroutines provides.

package example

import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.reactive.ContextInjector
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import kotlin.coroutines.CoroutineContext

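@OptIn(InternalCoroutinesApi::class)
class ContextCaptureInjector : ContextInjector {
    // Invoked by kotlinx.coroutines before it subscribes to the publisher being awaited.
    override fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T> {
        return when (publisher) {
            // contextCapture() captures registered ThreadLocal values into the Reactor Context at subscription time.
            is Mono -> publisher.contextCapture()
            is Flux -> publisher.contextCapture()
            // Other Publisher implementations have no contextCapture(), so pass them through unchanged.
            else -> publisher
        }
    }
}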
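
Note that the class alone is probably not enough: as far as the kotlinx-coroutines-reactive sources indicate, ContextInjector implementations are discovered through java.util.ServiceLoader, so the injector would also need a provider-configuration file on the classpath. A sketch of that file, assuming the `example` package used above and a standard Gradle resources layout:

```
# src/main/resources/META-INF/services/kotlinx.coroutines.reactive.ContextInjector
example.ContextCaptureInjector
```

The file name is the fully qualified name of the service interface, and each non-comment line names one implementation class.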

If this approach is acceptable, I will send a PR.

Considered alternatives

Additional context

I originally submitted this request at reactor/reactor-core#3563.

@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Oct 5, 2023