Skip to content

Commit

Permalink
CIO: encapsulate atomic reference of continuation logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Mashkov committed May 30, 2018
1 parent 940c61a commit 45d7487
Showing 1 changed file with 30 additions and 14 deletions.
44 changes: 30 additions & 14 deletions ktor-network/src/io/ktor/network/selector/ActorSelectorManager.kt
Expand Up @@ -4,7 +4,6 @@ import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.internal.*
import java.io.Closeable
import java.io.*
import java.nio.channels.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.experimental.*
Expand All @@ -19,7 +18,7 @@ class ActorSelectorManager(dispatcher: CoroutineDispatcher) : SelectorManagerSup
@Volatile
private var inSelect = false

private val continuation = AtomicReference<Continuation<Unit>?>(null)
private val continuation = ContinuationHolder<Unit, Continuation<Unit>>()

@Volatile
private var closed = false
Expand Down Expand Up @@ -114,10 +113,7 @@ class ActorSelectorManager(dispatcher: CoroutineDispatcher) : SelectorManagerSup
override fun publishInterest(selectable: Selectable) {
try {
if (mb.addLast(selectable)) {
val cont = continuation.getAndSet(null)
if (cont != null) {
cont.resume(Unit)
} else {
if (!continuation.resume(Unit)) {
selectWakeup()
}
}
Expand All @@ -141,22 +137,42 @@ class ActorSelectorManager(dispatcher: CoroutineDispatcher) : SelectorManagerSup
if (closed) return null

suspendCoroutineUninterceptedOrReturn<Unit> {
continuation.set(it)

if ((!isEmpty || closed) && continuation.compareAndSet(it, null)) Unit
else COROUTINE_SUSPENDED
continuation.suspendIf(it) { isEmpty && !closed } ?: Unit
}
}
}

override fun close() {
closed = true
mb.close()
val cont = continuation.getAndSet(null)
if (cont != null) {
cont.resume(Unit)
} else {
if (!continuation.resume(Unit)) {
selectWakeup()
}
}

private class ContinuationHolder<R, C : Continuation<R>> {
private val ref = AtomicReference<C?>(null)

fun resume(value: R): Boolean {
val continuation = ref.getAndSet(null)
if (continuation != null) {
continuation.resume(value)
return true
}

return false
}

/**
* @return `null` if not suspended due to failed condition or `COROUTINE_SUSPENDED` if successfully applied
*/
inline fun suspendIf(continuation: C, condition: () -> Boolean): Any? {
if (!condition()) return null
if (!ref.compareAndSet(null, continuation)) {
throw IllegalStateException("Continuation is already set")
}
if (!condition() && ref.compareAndSet(continuation, null)) return null
return COROUTINE_SUSPENDED
}
}
}

0 comments on commit 45d7487

Please sign in to comment.