Skip to content

Commit

Permalink
Move RealCall and RealConnection to loom safe locks (#8290)
Browse files Browse the repository at this point in the history
  • Loading branch information
yschimke committed Apr 15, 2024
1 parent ce28a31 commit a673f45
Show file tree
Hide file tree
Showing 11 changed files with 419 additions and 355 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import javax.net.SocketFactory
import javax.net.ssl.HostnameVerifier
import javax.net.ssl.HttpsURLConnection
import javax.net.ssl.SSLSocketFactory
import kotlin.concurrent.withLock
import okhttp3.internal.RecordingOkAuthenticator
import okhttp3.internal.concurrent.TaskFaker
import okhttp3.internal.concurrent.TaskRunner
Expand Down Expand Up @@ -93,7 +94,7 @@ class TestValueFactory : Closeable {
socket = Socket(),
idleAtNs = idleAtNanos,
)
synchronized(result) { pool.put(result) }
result.lock.withLock { pool.put(result) }
return result
}

Expand Down
76 changes: 43 additions & 33 deletions okhttp/src/main/kotlin/okhttp3/Dispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import java.util.concurrent.ExecutorService
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import okhttp3.internal.assertThreadDoesntHoldLock
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import okhttp3.internal.assertNotHeld
import okhttp3.internal.connection.RealCall
import okhttp3.internal.connection.RealCall.AsyncCall
import okhttp3.internal.okHttpName
Expand All @@ -36,17 +38,20 @@ import okhttp3.internal.threadFactory
* concurrently.
*/
class Dispatcher() {
internal val lock: ReentrantLock = ReentrantLock()

/**
* The maximum number of requests to execute concurrently. Above this requests queue in memory,
* waiting for the running calls to complete.
*
* If more than [maxRequests] requests are in flight when this is invoked, those requests will
* remain in flight.
*/
@get:Synchronized var maxRequests = 64
var maxRequests = 64
get() = lock.withLock { field }
set(maxRequests) {
require(maxRequests >= 1) { "max < 1: $maxRequests" }
synchronized(this) {
lock.withLock {
field = maxRequests
}
promoteAndExecute()
Expand All @@ -62,10 +67,11 @@ class Dispatcher() {
*
* WebSocket connections to hosts **do not** count against this limit.
*/
@get:Synchronized var maxRequestsPerHost = 5
var maxRequestsPerHost = 5
get() = lock.withLock { field }
set(maxRequestsPerHost) {
require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
synchronized(this) {
lock.withLock {
field = maxRequestsPerHost
}
promoteAndExecute()
Expand All @@ -82,29 +88,31 @@ class Dispatcher() {
* This means that if you are doing synchronous calls the network layer will not truly be idle
* until every returned [Response] has been closed.
*/
@set:Synchronized
@get:Synchronized
var idleCallback: Runnable? = null
get() = lock.withLock { field }
set(value) {
lock.withLock { field = value }
}

private var executorServiceOrNull: ExecutorService? = null

@get:Synchronized
@get:JvmName("executorService")
val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull =
ThreadPoolExecutor(
0,
Int.MAX_VALUE,
60,
TimeUnit.SECONDS,
SynchronousQueue(),
threadFactory("$okHttpName Dispatcher", false),
)
get() =
lock.withLock {
if (executorServiceOrNull == null) {
executorServiceOrNull =
ThreadPoolExecutor(
0,
Int.MAX_VALUE,
60,
TimeUnit.SECONDS,
SynchronousQueue(),
threadFactory("$okHttpName Dispatcher", false),
)
}
return executorServiceOrNull!!
}
return executorServiceOrNull!!
}

/** Ready async calls in the order they'll be run. */
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
Expand All @@ -120,7 +128,7 @@ class Dispatcher() {
}

internal fun enqueue(call: AsyncCall) {
synchronized(this) {
lock.withLock {
readyAsyncCalls.add(call)

// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
Expand All @@ -147,15 +155,17 @@ class Dispatcher() {
* Cancel all calls currently enqueued or executing. Includes calls executed both
* [synchronously][Call.execute] and [asynchronously][Call.enqueue].
*/
@Synchronized fun cancelAll() {
for (call in readyAsyncCalls) {
call.call.cancel()
}
for (call in runningAsyncCalls) {
call.call.cancel()
}
for (call in runningSyncCalls) {
call.cancel()
fun cancelAll() {
lock.withLock {
for (call in readyAsyncCalls) {
call.call.cancel()
}
for (call in runningAsyncCalls) {
call.call.cancel()
}
for (call in runningSyncCalls) {
call.cancel()
}
}
}

Expand All @@ -167,11 +177,11 @@ class Dispatcher() {
* @return true if the dispatcher is currently running calls.
*/
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
lock.assertNotHeld()

val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
lock.withLock {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import java.security.cert.X509Certificate
import java.util.concurrent.TimeUnit
import javax.net.ssl.SSLPeerUnverifiedException
import javax.net.ssl.SSLSocket
import kotlin.concurrent.withLock
import okhttp3.CertificatePinner
import okhttp3.ConnectionSpec
import okhttp3.Handshake
Expand Down Expand Up @@ -503,7 +504,7 @@ class ConnectPlan(
val pooled3 = routePlanner.planReusePooledConnection(this, routes)
if (pooled3 != null) return pooled3.connection

synchronized(connection) {
connection.lock.withLock {
connectionPool.put(connection)
user.acquireConnectionNoEvents(connection)
}
Expand Down
31 changes: 18 additions & 13 deletions okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@ import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import okhttp3.Call
import okhttp3.Callback
import okhttp3.EventListener
import okhttp3.Interceptor
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.internal.assertHeld
import okhttp3.internal.assertNotHeld
import okhttp3.internal.assertThreadDoesntHoldLock
import okhttp3.internal.assertThreadHoldsLock
import okhttp3.internal.cache.CacheInterceptor
import okhttp3.internal.closeQuietly
import okhttp3.internal.http.BridgeInterceptor
Expand All @@ -60,6 +63,8 @@ class RealCall(
val originalRequest: Request,
val forWebSocket: Boolean,
) : Call, Cloneable {
internal val lock: ReentrantLock = ReentrantLock()

private val connectionPool: RealConnectionPool = client.connectionPool.delegate

internal val eventListener: EventListener = client.eventListenerFactory.create(this)
Expand Down Expand Up @@ -95,7 +100,7 @@ class RealCall(
internal var interceptorScopedExchange: Exchange? = null
private set

// These properties are guarded by this. They are typically only accessed by the thread executing
// These properties are guarded by lock. They are typically only accessed by the thread executing
// the call, but they may be accessed by other threads for duplex requests.

/** True if this call still has a request body open. */
Expand Down Expand Up @@ -231,7 +236,7 @@ class RealCall(
) {
check(interceptorScopedExchange == null)

synchronized(this) {
lock.withLock {
check(!responseBodyOpen) {
"cannot make a new request because the previous response is still open: " +
"please call response.close()"
Expand Down Expand Up @@ -265,7 +270,7 @@ class RealCall(

/** Finds a new or pooled connection to carry a forthcoming request and response. */
internal fun initExchange(chain: RealInterceptorChain): Exchange {
synchronized(this) {
lock.withLock {
check(expectMoreExchanges) { "released" }
check(!responseBodyOpen)
check(!requestBodyOpen)
Expand All @@ -277,7 +282,7 @@ class RealCall(
val result = Exchange(this, eventListener, exchangeFinder, codec)
this.interceptorScopedExchange = result
this.exchange = result
synchronized(this) {
lock.withLock {
this.requestBodyOpen = true
this.responseBodyOpen = true
}
Expand All @@ -287,7 +292,7 @@ class RealCall(
}

fun acquireConnectionNoEvents(connection: RealConnection) {
connection.assertThreadHoldsLock()
connection.lock.assertHeld()

check(this.connection == null)
this.connection = connection
Expand All @@ -312,7 +317,7 @@ class RealCall(

var bothStreamsDone = false
var callDone = false
synchronized(this) {
lock.withLock {
if (requestDone && requestBodyOpen || responseDone && responseBodyOpen) {
if (requestDone) requestBodyOpen = false
if (responseDone) responseBodyOpen = false
Expand All @@ -335,7 +340,7 @@ class RealCall(

internal fun noMoreExchanges(e: IOException?): IOException? {
var callDone = false
synchronized(this) {
lock.withLock {
if (expectMoreExchanges) {
expectMoreExchanges = false
callDone = !requestBodyOpen && !responseBodyOpen
Expand All @@ -362,13 +367,13 @@ class RealCall(
* additional context. Otherwise [e] is returned as-is.
*/
private fun <E : IOException?> callDone(e: E): E {
assertThreadDoesntHoldLock()
lock.assertNotHeld()

val connection = this.connection
if (connection != null) {
connection.assertThreadDoesntHoldLock()
connection.lock.assertNotHeld()
val toClose: Socket? =
synchronized(connection) {
connection.lock.withLock {
// Sets this.connection to null.
releaseConnectionNoEvents()
}
Expand Down Expand Up @@ -399,7 +404,7 @@ class RealCall(
*/
internal fun releaseConnectionNoEvents(): Socket? {
val connection = this.connection!!
connection.assertThreadHoldsLock()
connection.lock.assertHeld()

val calls = connection.calls
val index = calls.indexOfFirst { it.get() == this@RealCall }
Expand Down Expand Up @@ -443,7 +448,7 @@ class RealCall(
* This is usually due to either an exception or a retry.
*/
internal fun exitNetworkInterceptorExchange(closeExchange: Boolean) {
synchronized(this) {
lock.withLock {
check(expectMoreExchanges) { "released" }
}

Expand Down

0 comments on commit a673f45

Please sign in to comment.