Skip to content

Commit

Permalink
Rework reusability control in cancellable continuation (Kotlin#2581)
Browse files Browse the repository at this point in the history
* Rework reusability control in cancellable continuation

    * Update initCancellability documentation and implementation to be aligned with current invariants
    * Make parentHandle non-volatile and ensure there are no races around it
    * Establish new reusability invariants
      - Reusable continuation can be used _only_ if its state is not REUSABLE_CLAIMED
      - If it is, spin-loop and wait for release
      - Now the parent is attached to reusable continuation only if it was suspended at least once. Otherwise, the state machine can return via fast-path and no one will be able to release intercepted continuation (-> detach from parent)
      - It implies that the parent is attached after trySuspend call and can be concurrently reused, this is where new invariant comes into play
    * Leverage the fact that it's non-atomic and do not check it for cancellation prematurely. It increases the performance of fast-path, but potentially affects rare cancellation cases

Fixes Kotlin#2564
  • Loading branch information
qwwdfsad authored and pablobaxter committed Sep 14, 2022
1 parent d19ef82 commit 245b260
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 66 deletions.
8 changes: 5 additions & 3 deletions kotlinx-coroutines-core/common/src/CancellableContinuation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ public interface CancellableContinuation<in T> : Continuation<T> {
public fun completeResume(token: Any)

/**
* Legacy function that turned on cancellation behavior in [suspendCancellableCoroutine] before kotlinx.coroutines 1.1.0.
* This function does nothing and is left only for binary compatibility with old compiled code.
* Internal function that sets up cancellation behavior in [suspendCancellableCoroutine].
* It's illegal to call this function in any non-`kotlinx.coroutines` code and
* such calls lead to undefined behaviour.
* Exposed in our ABI since 1.0.0 within `suspendCancellableCoroutine` body.
*
* @suppress **This is unstable API and it is subject to change.**
*/
Expand Down Expand Up @@ -332,7 +334,7 @@ internal suspend inline fun <T> suspendCancellableCoroutineReusable(
internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
// If used outside of our dispatcher
if (delegate !is DispatchedContinuation<T>) {
return CancellableContinuationImpl(delegate, MODE_CANCELLABLE_REUSABLE)
return CancellableContinuationImpl(delegate, MODE_CANCELLABLE)
}
/*
* Attempt to claim reusable instance.
Expand Down
116 changes: 70 additions & 46 deletions kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,7 @@ internal open class CancellableContinuationImpl<in T>(
*/
private val _state = atomic<Any?>(Active)

private val _parentHandle = atomic<DisposableHandle?>(null)
private var parentHandle: DisposableHandle?
get() = _parentHandle.value
set(value) { _parentHandle.value = value }
private var parentHandle: DisposableHandle? = null

internal val state: Any? get() = _state.value

Expand All @@ -93,7 +90,21 @@ internal open class CancellableContinuationImpl<in T>(
}

public override fun initCancellability() {
setupCancellation()
/*
* Invariant: at the moment of invocation, `this` has not yet
* leaked to user code and no one is able to invoke `resume` or `cancel`
* on it yet. Also, this function is not invoked for reusable continuations.
*/
val handle = installParentHandle()
?: return // fast path -- don't do anything without parent
// now check our state _after_ registering, could have completed while we were registering,
// but only if parent was cancelled. Parent could be in a "cancelling" state for a while,
// so we are helping it and cleaning the node ourselves
if (isCompleted) {
// Can be invoked concurrently in 'parentCancelled', no problems here
handle.dispose()
parentHandle = NonDisposableHandle
}
}

private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this)
Expand All @@ -118,40 +129,6 @@ internal open class CancellableContinuationImpl<in T>(
return true
}

/**
* Setups parent cancellation and checks for postponed cancellation in the case of reusable continuations.
* It is only invoked from an internal [getResult] function for reusable continuations
* and from [suspendCancellableCoroutine] to establish a cancellation before registering CC anywhere.
*/
private fun setupCancellation() {
if (checkCompleted()) return
if (parentHandle !== null) return // fast path 2 -- was already initialized
val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent
val handle = parent.invokeOnCompletion(
onCancelling = true,
handler = ChildContinuation(this).asHandler
)
parentHandle = handle
// now check our state _after_ registering (could have completed while we were registering)
// Also note that we do not dispose parent for reusable continuations, dispatcher will do that for us
if (isCompleted && !isReusable()) {
handle.dispose() // it is Ok to call dispose twice -- here and in disposeParentHandle
parentHandle = NonDisposableHandle // release it just in case, to aid GC
}
}

private fun checkCompleted(): Boolean {
val completed = isCompleted
if (!resumeMode.isReusableMode) return completed // Do not check postponed cancellation for non-reusable continuations
val dispatched = delegate as? DispatchedContinuation<*> ?: return completed
val cause = dispatched.checkPostponedCancellation(this) ?: return completed
if (!completed) {
// Note: this cancel may fail if one more concurrent cancel is currently being invoked
cancel(cause)
}
return true
}

public override val callerFrame: CoroutineStackFrame?
get() = delegate as? CoroutineStackFrame

Expand Down Expand Up @@ -188,7 +165,9 @@ internal open class CancellableContinuationImpl<in T>(
*/
private fun cancelLater(cause: Throwable): Boolean {
if (!resumeMode.isReusableMode) return false
val dispatched = (delegate as? DispatchedContinuation<*>) ?: return false
// Ensure that we are postponing cancellation to the right instance
if (!isReusable()) return false
val dispatched = delegate as DispatchedContinuation<*>
return dispatched.postponeCancellation(cause)
}

Expand Down Expand Up @@ -216,7 +195,7 @@ internal open class CancellableContinuationImpl<in T>(

private inline fun callCancelHandlerSafely(block: () -> Unit) {
try {
block()
block()
} catch (ex: Throwable) {
// Handler should never fail, if it does -- it is an unhandled exception
handleCoroutineException(
Expand Down Expand Up @@ -276,9 +255,33 @@ internal open class CancellableContinuationImpl<in T>(

@PublishedApi
internal fun getResult(): Any? {
setupCancellation()
if (trySuspend()) return COROUTINE_SUSPENDED
val isReusable = isReusable()
// trySuspend may fail either if 'block' has resumed/cancelled a continuation
// or we got async cancellation from parent.
if (trySuspend()) {
/*
* We were neither resumed nor cancelled, time to suspend.
* But first we have to install parent cancellation handle (if we didn't yet),
* so CC could be properly resumed on parent cancellation.
*/
if (parentHandle == null) {
installParentHandle()
}
/*
* Release the continuation after installing the handle (if needed).
* If we were successful, then do nothing, it's ok to reuse the instance now.
* Otherwise, dispose the handle by ourselves.
*/
if (isReusable) {
releaseClaimedReusableContinuation()
}
return COROUTINE_SUSPENDED
}
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
if (isReusable) {
// release claimed reusable continuation for the future reuse
releaseClaimedReusableContinuation()
}
val state = this.state
if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
// if the parent job was already cancelled, then throw the corresponding cancellation exception
Expand All @@ -296,6 +299,28 @@ internal open class CancellableContinuationImpl<in T>(
return getSuccessfulResult(state)
}

/**
 * Registers this continuation as a completion handler on the [Job] found in its context, if any.
 *
 * The handler is installed with `onCancelling = true`. The resulting handle is stored
 * in [parentHandle] and is also handed back to the caller.
 *
 * @return the installed handle, or `null` when the context contains no [Job].
 */
private fun installParentHandle(): DisposableHandle? {
    // Without a parent job there is nothing to attach to
    val parentJob = context[Job] ?: return null
    return parentJob.invokeOnCompletion(
        onCancelling = true,
        handler = ChildContinuation(this).asHandler
    ).also { installed ->
        // Remember the handle before returning it
        parentHandle = installed
    }
}

/**
 * Tries to release the claimed reusable continuation. The release can fail if there was
 * an asynchronous cancellation; in that case this continuation detaches from the parent
 * and is cancelled with the reported cause.
 */
private fun releaseClaimedReusableContinuation() {
    // The delegate is not necessarily a DispatchedContinuation (e.g. a context without
    // dispatchers, but with a Job in it), so a safe cast is used and we bail out otherwise
    val dispatched = delegate as? DispatchedContinuation<*> ?: return
    // `null` means there is no cancellation cause to act upon
    val cause = dispatched.tryReleaseClaimedContinuation(this) ?: return
    detachChild()
    cancel(cause)
}

override fun resumeWith(result: Result<T>) =
resumeImpl(result.toState(this), resumeMode)

Expand Down Expand Up @@ -462,11 +487,10 @@ internal open class CancellableContinuationImpl<in T>(

/**
* Detaches from the parent.
* Invariant: used from [CoroutineDispatcher.releaseInterceptedContinuation] iff [isReusable] is `true`
*/
internal fun detachChild() {
val handle = parentHandle
handle?.dispose()
val handle = parentHandle ?: return
handle.dispose()
parentHandle = NonDisposableHandle
}

Expand Down
7 changes: 6 additions & 1 deletion kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,12 @@ public abstract class CoroutineDispatcher :

@InternalCoroutinesApi
public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
(continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
/*
* Unconditional cast is safe here: we only return DispatchedContinuation from `interceptContinuation`,
* any ClassCastException can only indicate compiler bug
*/
val dispatched = continuation as DispatchedContinuation<*>
dispatched.release()
}

/**
Expand Down
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* thrown and not a JobCancellationException.
*/
val cont = AwaitContinuation(uCont.intercepted(), this)
// we are mimicking suspendCancellableCoroutine here and call initCancellability, too.
cont.initCancellability()
cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler))
cont.getResult()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,15 @@ internal class DispatchedContinuation<in T>(
* 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable],
* [CancellableContinuationImpl.getResult] will check for cancellation later.
*
* [REUSABLE_CLAIMED] state is required to prevent the lost resume in the channel.
* AbstractChannel.receive method relies on the fact that the following pattern
* [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation.
* In the `getResult`, we have the following code:
* ```
* suspendCancellableCoroutineReusable { cont ->
* val result = pollFastPath()
* if (result != null) cont.resume(result)
* if (trySuspend()) {
* // <- at this moment current continuation can be redispatched and claimed again.
* attachChildToParent()
* releaseClaimedContinuation()
* }
* ```
* always succeeds.
* To make it always successful, we actually postpone "reusable" cancellation
* to this phase and set cancellation only at the moment of instantiation.
*/
private val _reusableCancellableContinuation = atomic<Any?>(null)

Expand All @@ -66,9 +64,9 @@ internal class DispatchedContinuation<in T>(
public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean {
/*
* Reusability control:
* `null` -> no reusability at all, false
* `null` -> no reusability at all, `false`
* If current state is not CCI, then we are within `suspendCancellableCoroutineReusable`, true
* Else, if result is CCI === requester.
* Else, if result is CCI === requester, then it's our reusable continuation
* Identity check may fail for the following pattern:
* ```
* loop:
Expand All @@ -82,6 +80,27 @@ internal class DispatchedContinuation<in T>(
return true
}


/**
 * Busy-waits while the cached continuation slot holds [REUSABLE_CLAIMED], i.e. until the previous
 * call to `suspendCancellableCoroutineReusable` has stopped mutating the cached instance.
 */
public fun awaitReusability() {
    // Spin: re-read the slot until it holds anything other than the REUSABLE_CLAIMED marker
    while (_reusableCancellableContinuation.value === REUSABLE_CLAIMED) {
        // still claimed -- keep waiting for the release
    }
}

/**
 * Waits for the reusable continuation to leave the claimed state and then detaches
 * the cached reusable continuation (if there is one) from its parent.
 */
public fun release() {
    // Invoked from `releaseInterceptedContinuation` and may run concurrently with the code
    // in `getResult` that follows a successful `trySuspend`, hence the wait before detaching.
    awaitReusability()
    val cached = reusableCancellableContinuation ?: return
    cached.detachChild()
}

/**
* Claims the continuation for [suspendCancellableCoroutineReusable] block,
* so all cancellations will be postponed.
Expand All @@ -103,11 +122,20 @@ internal class DispatchedContinuation<in T>(
_reusableCancellableContinuation.value = REUSABLE_CLAIMED
return null
}
// potentially competing with cancel
state is CancellableContinuationImpl<*> -> {
if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) {
return state as CancellableContinuationImpl<T>
}
}
state === REUSABLE_CLAIMED -> {
// Do nothing, wait until reusable instance will be returned from
// getResult() of a previous `suspendCancellableCoroutineReusable`
}
state is Throwable -> {
// Also do nothing, Throwable can only indicate that the CC
// is in REUSABLE_CLAIMED state, but with postponed cancellation
}
else -> error("Inconsistent state $state")
}
}
Expand All @@ -127,14 +155,13 @@ internal class DispatchedContinuation<in T>(
*
* See [CancellableContinuationImpl.getResult].
*/
fun checkPostponedCancellation(continuation: CancellableContinuation<*>): Throwable? {
fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? {
_reusableCancellableContinuation.loop { state ->
// not when(state) to avoid Intrinsics.equals call
when {
state === REUSABLE_CLAIMED -> {
if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
}
state === null -> return null
state is Throwable -> {
require(_reusableCancellableContinuation.compareAndSet(state, null))
return state
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines
Expand Down Expand Up @@ -52,4 +52,4 @@ class CancelledAwaitStressTest : TestBase() {
/**
 * Intentionally a no-op: passing [a] here makes sure the variable is kept in the state-machine.
 */
private fun keepMe(a: ByteArray) = Unit
}
}
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/jvm/test/FieldWalker.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines
Expand Down Expand Up @@ -56,7 +56,7 @@ object FieldWalker {
* Reflectively walks the object graph starting from the root and maps every reached object to its path
* from the root. Use [showPath] to display a path if needed.
*/
private fun walkRefs(root: Any?, rootStatics: Boolean): Map<Any, Ref> {
private fun walkRefs(root: Any?, rootStatics: Boolean): IdentityHashMap<Any, Ref> {
val visited = IdentityHashMap<Any, Ref>()
if (root == null) return visited
visited[root] = Ref.RootRef
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlinx.coroutines.channels.*
import org.junit.Test
import kotlin.test.*

/**
 * Stress test for the scenario from issue #2564: after all [Leak] elements sent through a channel
 * have been received, none of them is expected to stay reachable from the receiving coroutine's job.
 */
class ReusableCancellableContinuationLeakStressTest : TestBase() {

// Receives one element through a separate suspend function. The result is deliberately
// stored in a local before being returned (see the inline note below).
@Suppress("UnnecessaryVariable")
private suspend fun <T : Any> ReceiveChannel<T>.receiveBatch(): T {
val r = receive() // DO NOT MERGE LINES, otherwise TCE will kick in
return r
}

// Number of elements pushed through the channel, scaled by the stress-test multiplier
private val iterations = 100_000 * stressTestMultiplier

// Payload type; the final assertion looks for instances of this class
class Leak(val i: Int)

@Test // Simplified version of #2564
fun testReusableContinuationLeak() = runTest {
// Producer sends `iterations` elements carrying consecutive indices
val channel = produce(capacity = 1) { // from the main thread
(0 until iterations).forEach {
send(Leak(it))
}
}

// Consumer runs on Dispatchers.Default and checks that elements arrive in order
launch(Dispatchers.Default) {
repeat (iterations) {
val value = channel.receiveBatch()
assertEquals(it, value.i)
}
// Wait for the producer to complete before inspecting the object graph
(channel as Job).join()

// Expect zero objects matching `it is Leak` -- presumably counted as reachable from this coroutine's job
FieldWalker.assertReachableCount(0, coroutineContext.job, false) { it is Leak }
}
}
}

0 comments on commit 245b260

Please sign in to comment.