From f8e9137140ccee2ceb2de9d4a159512c4f9842cc Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Sat, 27 Apr 2024 10:44:35 +0100 Subject: [PATCH 01/10] Centralise more locks --- .../kotlin/okhttp3/OkHttpClientTestRule.kt | 4 +-- .../okhttp3/internal/concurrent/TaskFaker.kt | 23 +++++++-------- .../okhttp3/internal/concurrent/TaskQueue.kt | 18 ++++++------ .../okhttp3/internal/concurrent/TaskRunner.kt | 28 +++++++++---------- .../internal/connection/ConnectPlan.kt | 1 - .../okhttp3/internal/connection/Locks.kt | 12 ++++++++ .../internal/connection/RealConnection.kt | 7 ++--- 7 files changed, 52 insertions(+), 41 deletions(-) diff --git a/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt b/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt index 275d09f6a0a7..860eb83ad83a 100644 --- a/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt +++ b/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt @@ -25,7 +25,7 @@ import java.util.logging.Level import java.util.logging.LogManager import java.util.logging.LogRecord import java.util.logging.Logger -import kotlin.concurrent.withLock +import okhttp3.internal.connection.Locks.withLock import okhttp3.internal.buildConnectionPool import okhttp3.internal.concurrent.TaskRunner import okhttp3.internal.connection.RealConnectionPool @@ -234,7 +234,7 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback { // a test timeout failure. val waitTime = (entryTime + 1_000_000_000L - System.nanoTime()) if (!queue.idleLatch().await(waitTime, TimeUnit.NANOSECONDS)) { - TaskRunner.INSTANCE.lock.withLock { + TaskRunner.INSTANCE.withLock { TaskRunner.INSTANCE.cancelAll() } fail("Queue still active after 1000 ms") diff --git a/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt b/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt index 88dfd7936742..cb8c44be41fb 100644 --- a/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt +++ b/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +@file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") package okhttp3.internal.concurrent import assertk.assertThat @@ -23,7 +24,7 @@ import java.util.concurrent.BlockingQueue import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import java.util.logging.Logger -import kotlin.concurrent.withLock +import okhttp3.internal.connection.Locks.withLock import okhttp3.OkHttpClient import okhttp3.TestUtil.threadFactory @@ -44,14 +45,14 @@ import okhttp3.TestUtil.threadFactory class TaskFaker : Closeable { @Suppress("NOTHING_TO_INLINE") internal inline fun Any.assertThreadHoldsLock() { - if (assertionsEnabled && !taskRunner.lock.isHeldByCurrentThread) { + if (assertionsEnabled && !taskRunner.lock_.isHeldByCurrentThread) { throw AssertionError("Thread ${Thread.currentThread().name} MUST hold lock on $this") } } @Suppress("NOTHING_TO_INLINE") internal inline fun Any.assertThreadDoesntHoldLock() { - if (assertionsEnabled && taskRunner.lock.isHeldByCurrentThread) { + if (assertionsEnabled && taskRunner.lock_.isHeldByCurrentThread) { throw AssertionError("Thread ${Thread.currentThread().name} MUST NOT hold lock on $this") } } @@ -166,7 +167,7 @@ class TaskFaker : Closeable { fun advanceUntil(newTime: Long) { taskRunner.assertThreadDoesntHoldLock() - taskRunner.lock.withLock { + taskRunner.withLock { check(currentTask == TestThreadSerialTask) nanoTime = newTime yieldUntil(ResumePriority.AfterOtherTasks) @@ -177,7 +178,7 @@ class TaskFaker : Closeable { fun assertNoMoreTasks() { taskRunner.assertThreadDoesntHoldLock() - taskRunner.lock.withLock { + taskRunner.withLock { assertThat(activeThreads).isEqualTo(0) } } @@ -207,7 +208,7 @@ class TaskFaker : Closeable { fun runNextTask() { taskRunner.assertThreadDoesntHoldLock() - taskRunner.lock.withLock { + taskRunner.withLock { val contextSwitchCountBefore = contextSwitchCount yieldUntil(ResumePriority.BeforeOtherTasks) { contextSwitchCount > contextSwitchCountBefore @@ -217,7 +218,7 @@ class TaskFaker : Closeable { /** Sleep until [durationNanos] elapses. For use by the task threads. */ fun sleep(durationNanos: Long) { - taskRunner.lock.withLock { + taskRunner.withLock { val sleepUntil = nanoTime + durationNanos yieldUntil { nanoTime >= sleepUntil } } @@ -229,7 +230,7 @@ class TaskFaker : Closeable { */ fun yield() { taskRunner.assertThreadDoesntHoldLock() - taskRunner.lock.withLock { + taskRunner.withLock { yieldUntil() } } @@ -328,7 +329,7 @@ class TaskFaker : Closeable { runnable.run() require(currentTask == this) { "unexpected current task: $currentTask" } } finally { - taskRunner.lock.withLock { + taskRunner.withLock { activeThreads-- startNextTask() } @@ -354,7 +355,7 @@ class TaskFaker : Closeable { timeout: Long, unit: TimeUnit, ): T? { - taskRunner.lock.withLock { + taskRunner.withLock { val waitUntil = nanoTime + unit.toNanos(timeout) while (true) { val result = poll() @@ -367,7 +368,7 @@ class TaskFaker : Closeable { } override fun put(element: T) { - taskRunner.lock.withLock { + taskRunner.withLock { delegate.put(element) editCount++ } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskQueue.kt b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskQueue.kt index 788fc0f44321..e15d197f7462 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskQueue.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskQueue.kt @@ -18,8 +18,8 @@ package okhttp3.internal.concurrent import java.util.concurrent.CountDownLatch import java.util.concurrent.RejectedExecutionException import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock import okhttp3.internal.assertNotHeld +import okhttp3.internal.connection.Locks.withLock import okhttp3.internal.okHttpName /** @@ -32,7 +32,7 @@ class TaskQueue internal constructor( internal val taskRunner: TaskRunner, internal val name: String, ) { - val lock: ReentrantLock = ReentrantLock() + internal val lock_: ReentrantLock = ReentrantLock() internal var shutdown = false @@ -50,7 +50,7 @@ class TaskQueue internal constructor( * currently-executing task unless it is also scheduled for future execution. */ val scheduledTasks: List - get() = taskRunner.lock.withLock { futureTasks.toList() } + get() = taskRunner.withLock { futureTasks.toList() } /** * Schedules [task] for execution in [delayNanos]. A task may only have one future execution @@ -66,7 +66,7 @@ class TaskQueue internal constructor( task: Task, delayNanos: Long = 0L, ) { - taskRunner.lock.withLock { + taskRunner.withLock { if (shutdown) { if (task.cancelable) { taskRunner.logger.taskLog(task, this) { "schedule canceled (queue is shutdown)" } @@ -126,7 +126,7 @@ class TaskQueue internal constructor( /** Returns a latch that reaches 0 when the queue is next idle. */ fun idleLatch(): CountDownLatch { - taskRunner.lock.withLock { + taskRunner.withLock { // If the queue is already idle, that's easy. if (activeTask == null && futureTasks.isEmpty()) { return CountDownLatch(0) @@ -206,9 +206,9 @@ class TaskQueue internal constructor( * be removed from the execution schedule. */ fun cancelAll() { - lock.assertNotHeld() + lock_.assertNotHeld() - taskRunner.lock.withLock { + taskRunner.withLock { if (cancelAllAndDecide()) { taskRunner.kickCoordinator(this) } @@ -216,9 +216,9 @@ class TaskQueue internal constructor( } fun shutdown() { - lock.assertNotHeld() + lock_.assertNotHeld() - taskRunner.lock.withLock { + taskRunner.withLock { shutdown = true if (cancelAllAndDecide()) { taskRunner.kickCoordinator(this) diff --git a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt index 6acc7b24e774..b648724713fa 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt @@ -23,10 +23,10 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.locks.Condition import java.util.concurrent.locks.ReentrantLock import java.util.logging.Logger -import kotlin.concurrent.withLock import okhttp3.internal.addIfAbsent import okhttp3.internal.assertHeld import okhttp3.internal.concurrent.TaskRunner.Companion.INSTANCE +import okhttp3.internal.connection.Locks.withLock import okhttp3.internal.okHttpName import okhttp3.internal.threadFactory @@ -45,8 +45,8 @@ class TaskRunner( val backend: Backend, internal val logger: Logger = TaskRunner.logger, ) { - val lock: ReentrantLock = ReentrantLock() - val condition: Condition = lock.newCondition() + internal val lock_: ReentrantLock = ReentrantLock() + val condition: Condition = lock_.newCondition() private var nextQueueName = 10000 private var coordinatorWaiting = false @@ -63,7 +63,7 @@ class TaskRunner( override fun run() { while (true) { val task = - this@TaskRunner.lock.withLock { + this@TaskRunner.withLock { awaitTaskToRun() } ?: return @@ -75,7 +75,7 @@ class TaskRunner( } finally { // If the task is crashing start another thread to service the queues. if (!completedNormally) { - lock.withLock { + this@TaskRunner.withLock { backend.execute(this@TaskRunner, this) } } @@ -86,7 +86,7 @@ class TaskRunner( } internal fun kickCoordinator(taskQueue: TaskQueue) { - lock.assertHeld() + lock_.assertHeld() if (taskQueue.activeTask == null) { if (taskQueue.futureTasks.isNotEmpty()) { @@ -104,7 +104,7 @@ class TaskRunner( } private fun beforeRun(task: Task) { - lock.assertHeld() + lock_.assertHeld() task.nextExecuteNanoTime = -1L val queue = task.queue!! @@ -123,7 +123,7 @@ class TaskRunner( try { delayNanos = task.runOnce() } finally { - lock.withLock { + this.withLock { afterRun(task, delayNanos) } currentThread.name = oldName @@ -134,7 +134,7 @@ class TaskRunner( task: Task, delayNanos: Long, ) { - lock.assertHeld() + lock_.assertHeld() val queue = task.queue!! check(queue.activeTask === task) @@ -160,7 +160,7 @@ class TaskRunner( * this will launch another thread to handle that work. */ fun awaitTaskToRun(): Task? { - lock.assertHeld() + lock_.assertHeld() while (true) { if (readyQueues.isEmpty()) { @@ -239,7 +239,7 @@ class TaskRunner( } fun newQueue(): TaskQueue { - val name = lock.withLock { nextQueueName++ } + val name = this.withLock { nextQueueName++ } return TaskQueue(this, "Q$name") } @@ -248,13 +248,13 @@ class TaskRunner( * necessarily track queues that have no tasks scheduled. */ fun activeQueues(): List { - lock.withLock { + this.withLock { return busyQueues + readyQueues } } fun cancelAll() { - lock.assertHeld() + lock_.assertHeld() for (i in busyQueues.size - 1 downTo 0) { busyQueues[i].cancelAllAndDecide() } @@ -315,7 +315,7 @@ class TaskRunner( taskRunner: TaskRunner, nanos: Long, ) { - taskRunner.lock.assertHeld() + taskRunner.lock_.assertHeld() if (nanos > 0) { taskRunner.condition.awaitNanos(nanos) } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/ConnectPlan.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/ConnectPlan.kt index 625791a5b835..90ecf50a8ab0 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/ConnectPlan.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/ConnectPlan.kt @@ -26,7 +26,6 @@ import java.security.cert.X509Certificate import java.util.concurrent.TimeUnit import javax.net.ssl.SSLPeerUnverifiedException import javax.net.ssl.SSLSocket -import kotlin.concurrent.withLock import okhttp3.CertificatePinner import okhttp3.ConnectionSpec import okhttp3.Handshake diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt index b09ce9a2d33a..8f9d738a964d 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt @@ -22,6 +22,8 @@ import kotlin.contracts.ExperimentalContracts import kotlin.contracts.InvocationKind import kotlin.contracts.contract import okhttp3.Dispatcher +import okhttp3.internal.concurrent.TaskQueue +import okhttp3.internal.concurrent.TaskRunner import okhttp3.internal.http2.Http2Connection import okhttp3.internal.http2.Http2Stream import okhttp3.internal.http2.Http2Writer @@ -55,6 +57,16 @@ internal object Locks { return lock.withLock(action) } + inline fun TaskRunner.withLock(action: () -> T): T { + contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } + return lock_.withLock(action) + } + + inline fun TaskQueue.withLock(action: () -> T): T { + contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } + return lock_.withLock(action) + } + inline fun Http2Writer.withLock(action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt index 432dcc074e7e..89c0bb30bd26 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt @@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.locks.ReentrantLock import javax.net.ssl.SSLPeerUnverifiedException import javax.net.ssl.SSLSocket -import kotlin.concurrent.withLock import okhttp3.Address import okhttp3.Connection import okhttp3.ConnectionListener @@ -335,7 +334,7 @@ class RealConnection( return http2Connection.isHealthy(nowNs) } - val idleDurationNs = lock.withLock { nowNs - idleAtNs } + val idleDurationNs = this.withLock { nowNs - idleAtNs } if (idleDurationNs >= IDLE_CONNECTION_HEALTHY_NS && doExtensiveChecks) { return socket.isHealthy(source) } @@ -354,7 +353,7 @@ class RealConnection( connection: Http2Connection, settings: Settings, ) { - lock.withLock { + this.withLock { val oldLimit = allocationLimit allocationLimit = settings.getMaxConcurrentStreams() @@ -398,7 +397,7 @@ class RealConnection( e: IOException?, ) { var noNewExchangesEvent = false - lock.withLock { + this.withLock { if (e is StreamResetException) { when { e.errorCode == ErrorCode.REFUSED_STREAM -> { From 54fd42839c8e09d84534631d94f029b44858fb61 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Sat, 27 Apr 2024 11:07:51 +0100 Subject: [PATCH 02/10] Centralise more locks --- .../okhttp3/internal/concurrent/TaskFaker.kt | 4 ++-- .../okhttp3/internal/concurrent/TaskQueue.kt | 6 +++--- .../okhttp3/internal/concurrent/TaskRunner.kt | 17 +++++++++-------- .../kotlin/okhttp3/internal/connection/Locks.kt | 12 ++++++++++-- .../okhttp3/internal/http2/Http2Connection.kt | 3 ++- .../okhttp3/internal/http2/Http2Stream.kt | 3 ++- 6 files changed, 28 insertions(+), 17 deletions(-) diff --git a/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt b/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt index cb8c44be41fb..11186d588a9e 100644 --- a/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt +++ b/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt @@ -45,14 +45,14 @@ import okhttp3.TestUtil.threadFactory class TaskFaker : Closeable { @Suppress("NOTHING_TO_INLINE") internal inline fun Any.assertThreadHoldsLock() { - if (assertionsEnabled && !taskRunner.lock_.isHeldByCurrentThread) { + if (assertionsEnabled && !taskRunner.lock.isHeldByCurrentThread) { throw AssertionError("Thread ${Thread.currentThread().name} MUST hold lock on $this") } } @Suppress("NOTHING_TO_INLINE") internal inline fun Any.assertThreadDoesntHoldLock() { - if (assertionsEnabled && taskRunner.lock_.isHeldByCurrentThread) { + if (assertionsEnabled && taskRunner.lock.isHeldByCurrentThread) { throw AssertionError("Thread ${Thread.currentThread().name} MUST NOT hold lock on $this") } } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskQueue.kt b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskQueue.kt index e15d197f7462..99d5b600b265 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskQueue.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskQueue.kt @@ -32,7 +32,7 @@ class TaskQueue internal constructor( internal val taskRunner: TaskRunner, internal val name: String, ) { - internal val lock_: ReentrantLock = ReentrantLock() + internal val lock: ReentrantLock = ReentrantLock() internal var shutdown = false @@ -206,7 +206,7 @@ class TaskQueue internal constructor( * be removed from the execution schedule. */ fun cancelAll() { - lock_.assertNotHeld() + lock.assertNotHeld() taskRunner.withLock { if (cancelAllAndDecide()) { @@ -216,7 +216,7 @@ class TaskQueue internal constructor( } fun shutdown() { - lock_.assertNotHeld() + lock.assertNotHeld() taskRunner.withLock { shutdown = true diff --git a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt index b648724713fa..4425369f327d 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt @@ -26,6 +26,7 @@ import java.util.logging.Logger import okhttp3.internal.addIfAbsent import okhttp3.internal.assertHeld import okhttp3.internal.concurrent.TaskRunner.Companion.INSTANCE +import okhttp3.internal.connection.Locks.newLockCondition import okhttp3.internal.connection.Locks.withLock import okhttp3.internal.okHttpName import okhttp3.internal.threadFactory @@ -45,8 +46,8 @@ class TaskRunner( val backend: Backend, internal val logger: Logger = TaskRunner.logger, ) { - internal val lock_: ReentrantLock = ReentrantLock() - val condition: Condition = lock_.newCondition() + internal val lock: ReentrantLock = ReentrantLock() + val condition: Condition = lock.newLockCondition() private var nextQueueName = 10000 private var coordinatorWaiting = false @@ -86,7 +87,7 @@ class TaskRunner( } internal fun kickCoordinator(taskQueue: TaskQueue) { - lock_.assertHeld() + lock.assertHeld() if (taskQueue.activeTask == null) { if (taskQueue.futureTasks.isNotEmpty()) { @@ -104,7 +105,7 @@ class TaskRunner( } private fun beforeRun(task: Task) { - lock_.assertHeld() + lock.assertHeld() task.nextExecuteNanoTime = -1L val queue = task.queue!! @@ -134,7 +135,7 @@ class TaskRunner( task: Task, delayNanos: Long, ) { - lock_.assertHeld() + lock.assertHeld() val queue = task.queue!! check(queue.activeTask === task) @@ -160,7 +161,7 @@ class TaskRunner( * this will launch another thread to handle that work. */ fun awaitTaskToRun(): Task? { - lock_.assertHeld() + lock.assertHeld() while (true) { if (readyQueues.isEmpty()) { @@ -254,7 +255,7 @@ class TaskRunner( } fun cancelAll() { - lock_.assertHeld() + lock.assertHeld() for (i in busyQueues.size - 1 downTo 0) { busyQueues[i].cancelAllAndDecide() } @@ -315,7 +316,7 @@ class TaskRunner( taskRunner: TaskRunner, nanos: Long, ) { - taskRunner.lock_.assertHeld() + taskRunner.lock.assertHeld() if (nanos > 0) { taskRunner.condition.awaitNanos(nanos) } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt index 8f9d738a964d..add3f7b1ba49 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt @@ -17,10 +17,14 @@ package okhttp3.internal.connection +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock import kotlin.contracts.ExperimentalContracts import kotlin.contracts.InvocationKind import kotlin.contracts.contract +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.measureTimedValue import okhttp3.Dispatcher import okhttp3.internal.concurrent.TaskQueue import okhttp3.internal.concurrent.TaskRunner @@ -59,12 +63,12 @@ internal object Locks { inline fun TaskRunner.withLock(action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } - return lock_.withLock(action) + return lock.withLock(action) } inline fun TaskQueue.withLock(action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } - return lock_.withLock(action) + return lock.withLock(action) } inline fun Http2Writer.withLock(action: () -> T): T { @@ -74,4 +78,8 @@ internal object Locks { return lock.withLock(action) } + + internal fun ReentrantLock.newLockCondition(): Condition { + return this.newCondition() + } } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Connection.kt b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Connection.kt index 7339f4fcf8df..eac412033736 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Connection.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Connection.kt @@ -27,6 +27,7 @@ import okhttp3.internal.EMPTY_HEADERS import okhttp3.internal.assertThreadDoesntHoldLock import okhttp3.internal.closeQuietly import okhttp3.internal.concurrent.TaskRunner +import okhttp3.internal.connection.Locks.newLockCondition import okhttp3.internal.connection.Locks.withLock import okhttp3.internal.http2.ErrorCode.REFUSED_STREAM import okhttp3.internal.http2.Settings.Companion.DEFAULT_INITIAL_WINDOW_SIZE @@ -56,7 +57,7 @@ import okio.source @Suppress("NAME_SHADOWING") class Http2Connection internal constructor(builder: Builder) : Closeable { internal val lock: ReentrantLock = ReentrantLock() - internal val condition: Condition = lock.newCondition() + internal val condition: Condition = lock.newLockCondition() // Internal state of this connection is guarded by 'lock'. No blocking operations may be // performed while holding this lock! diff --git a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt index ea27c00ecf5e..8c6b8ec18e02 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt @@ -26,6 +26,7 @@ import okhttp3.Headers import okhttp3.internal.EMPTY_HEADERS import okhttp3.internal.assertNotHeld import okhttp3.internal.connection.Locks.withLock +import okhttp3.internal.connection.Locks.newLockCondition import okhttp3.internal.http2.flowcontrol.WindowCounter import okhttp3.internal.toHeaderList import okio.AsyncTimeout @@ -45,7 +46,7 @@ class Http2Stream internal constructor( headers: Headers?, ) { internal val lock: ReentrantLock = ReentrantLock() - val condition: Condition = lock.newCondition() + val condition: Condition = lock.newLockCondition() // Internal state is guarded by [lock]. No long-running or potentially blocking operations are // performed while the lock is held. From 0a18964d4ad7e84cd8d8f88d1ab7e1d641a8975a Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Sat, 27 Apr 2024 11:18:55 +0100 Subject: [PATCH 03/10] Centralise more locks --- .../okhttp3/internal/connection/Locks.kt | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt index add3f7b1ba49..c356418dcbb8 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt @@ -23,8 +23,6 @@ import kotlin.concurrent.withLock import kotlin.contracts.ExperimentalContracts import kotlin.contracts.InvocationKind import kotlin.contracts.contract -import kotlin.time.Duration.Companion.milliseconds -import kotlin.time.measureTimedValue import okhttp3.Dispatcher import okhttp3.internal.concurrent.TaskQueue import okhttp3.internal.concurrent.TaskRunner @@ -38,48 +36,52 @@ import okhttp3.internal.http2.Http2Writer internal object Locks { inline fun Dispatcher.withLock(action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } - return lock.withLock(action) + return lock.withMonitoredLock(action) } inline fun RealConnection.withLock(action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } - return lock.withLock(action) + return lock.withMonitoredLock(action) } inline fun RealCall.withLock(action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } - return lock.withLock(action) + return lock.withMonitoredLock(action) } inline fun Http2Connection.withLock(action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } - return lock.withLock(action) + return lock.withMonitoredLock(action) } inline fun Http2Stream.withLock(action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } - return lock.withLock(action) + return lock.withMonitoredLock(action) } inline fun TaskRunner.withLock(action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } - return lock.withLock(action) + return lock.withMonitoredLock(action) } inline fun TaskQueue.withLock(action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } - return lock.withLock(action) + return lock.withMonitoredLock(action) } inline fun Http2Writer.withLock(action: () -> T): T { - contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } - // TODO can we assert we don't have the connection lock? - return lock.withLock(action) + contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } + return lock.withMonitoredLock(action) } internal fun ReentrantLock.newLockCondition(): Condition { return this.newCondition() } + + inline fun ReentrantLock.withMonitoredLock(action: () -> T): T { + contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } + return withLock(action) + } } From 44dc78f53ee188268f9e8a2a57c9f2537be9b7a9 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Sat, 27 Apr 2024 11:57:06 +0100 Subject: [PATCH 04/10] Experiment with locks --- .../okhttp3/internal/concurrent/TaskRunner.kt | 5 ++ .../okhttp3/internal/connection/Locks.kt | 72 ++++++++++++++++++- okhttp/src/test/java/okhttp3/CallTest.kt | 19 +++++ 3 files changed, 94 insertions(+), 2 deletions(-) diff --git a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt index 4425369f327d..6546ef54ba6b 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt @@ -26,6 +26,7 @@ import java.util.logging.Logger import okhttp3.internal.addIfAbsent import okhttp3.internal.assertHeld import okhttp3.internal.concurrent.TaskRunner.Companion.INSTANCE +import okhttp3.internal.connection.Locks import okhttp3.internal.connection.Locks.newLockCondition import okhttp3.internal.connection.Locks.withLock import okhttp3.internal.okHttpName @@ -341,5 +342,9 @@ class TaskRunner( @JvmField val INSTANCE = TaskRunner(RealBackend(threadFactory("$okHttpName TaskRunner", daemon = true))) + + init { + Locks.lockToWatch = INSTANCE.lock + } } } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt index c356418dcbb8..4a270e5cd3b9 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt @@ -17,13 +17,19 @@ package okhttp3.internal.connection +import java.util.Date +import java.util.concurrent.TimeUnit import java.util.concurrent.locks.Condition import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock import kotlin.contracts.ExperimentalContracts import kotlin.contracts.InvocationKind import kotlin.contracts.contract +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.measureTimedValue import okhttp3.Dispatcher +import okhttp3.internal.assertHeld import okhttp3.internal.concurrent.TaskQueue import okhttp3.internal.concurrent.TaskRunner import okhttp3.internal.http2.Http2Connection @@ -77,11 +83,73 @@ internal object Locks { } internal fun ReentrantLock.newLockCondition(): Condition { - return this.newCondition() + val condition = this.newCondition() + return object : Condition by condition { + override fun await() { + assertHeld() + return timeAwait { condition.await() } + } + + override fun await(time: Long, unit: TimeUnit?): Boolean { + assertHeld() + return timeAwait { condition.await(time, unit) } + } + + override fun awaitUninterruptibly() { + assertHeld() + return timeAwait { condition.awaitUninterruptibly() } + } + + override fun awaitNanos(nanosTimeout: Long): Long { + assertHeld() + return timeAwait { condition.awaitNanos(nanosTimeout) } + } + + override fun awaitUntil(deadline: Date): Boolean { + assertHeld() + return timeAwait { condition.awaitUntil(deadline) } + } + } + } + + private fun ReentrantLock.timeAwait(function: () -> T): T { + return if (this == lockToWatch) { + measureTimedValue { function() }.also { + val lockDuration = it.duration + if (lockDuration > 1.milliseconds) { +// println(Thread.currentThread().name + " await " + lockDuration) +// Exception().printStackTrace() + threadLocalAwait.set(threadLocalAwait.get() + lockDuration) + } + }.value + } else { + function() + } } inline fun ReentrantLock.withMonitoredLock(action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } - return withLock(action) + return if (this == lockToWatch) { + withLock { + measureTimedValue { + action() + } + }.also { + val awaitDuration = threadLocalAwait.get() + threadLocalAwait.remove() + if (it.duration - awaitDuration > 1.milliseconds) { + println(Thread.currentThread().name + " lock " + it.duration + " " + awaitDuration) + Exception().printStackTrace() + } + }.value + } else { + withLock(action) + } } + + @Suppress("NewApi") + val threadLocalAwait = ThreadLocal.withInitial { Duration.ZERO } + + @Volatile + var lockToWatch: ReentrantLock? = null } diff --git a/okhttp/src/test/java/okhttp3/CallTest.kt b/okhttp/src/test/java/okhttp3/CallTest.kt index e59e0de7b6ee..2c8adcc4e9d9 100644 --- a/okhttp/src/test/java/okhttp3/CallTest.kt +++ b/okhttp/src/test/java/okhttp3/CallTest.kt @@ -42,6 +42,8 @@ import java.net.HttpURLConnection import java.net.InetAddress import java.net.ProtocolException import java.net.Proxy +import java.net.Socket +import java.net.SocketAddress import java.net.SocketTimeoutException import java.net.UnknownHostException import java.net.UnknownServiceException @@ -55,6 +57,7 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicReference +import javax.net.SocketFactory import javax.net.ssl.SSLException import javax.net.ssl.SSLHandshakeException import javax.net.ssl.SSLPeerUnverifiedException @@ -142,6 +145,22 @@ open class CallTest { private var client = clientTestRule.newClientBuilder() .eventListenerFactory(clientTestRule.wrap(listener)) + .socketFactory(object: DelegatingSocketFactory(SocketFactory.getDefault()) { + override fun createSocket(): Socket { + Thread.sleep(1_000) + return object : Socket() { + override fun connect(endpoint: SocketAddress?) { + Thread.sleep(1_000) + super.connect(endpoint) + } + + override fun connect(endpoint: SocketAddress?, timeout: Int) { + Thread.sleep(1_000) + super.connect(endpoint, timeout) + } + } + } + }) .build() private val callback = RecordingCallback() private val cache = From e37da146fd3a5c04be8829c6c0817400261b3d29 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Sat, 27 Apr 2024 12:21:40 +0100 Subject: [PATCH 05/10] Slow connects --- .../kotlin/mockwebserver3/MockWebServer.kt | 7 +- .../kotlin/mockwebserver3/SocketPolicy.kt | 6 ++ .../kotlin/okhttp3/OkHttpClientTestRule.kt | 2 +- .../okhttp3/internal/concurrent/TaskFaker.kt | 3 +- .../okhttp3/internal/connection/Locks.kt | 47 +++++----- .../okhttp3/internal/http2/Http2Stream.kt | 2 +- okhttp/src/test/java/okhttp3/CallTest.kt | 31 ++++--- .../src/test/java/okhttp3/SlowNetworkTest.kt | 88 +++++++++++++++++++ 8 files changed, 147 insertions(+), 39 deletions(-) create mode 100644 okhttp/src/test/java/okhttp3/SlowNetworkTest.kt diff --git a/mockwebserver/src/main/kotlin/mockwebserver3/MockWebServer.kt b/mockwebserver/src/main/kotlin/mockwebserver3/MockWebServer.kt index 78205b7c5cde..dc7a36962af7 100644 --- a/mockwebserver/src/main/kotlin/mockwebserver3/MockWebServer.kt +++ b/mockwebserver/src/main/kotlin/mockwebserver3/MockWebServer.kt @@ -374,6 +374,12 @@ class MockWebServer : Closeable { @Throws(Exception::class) private fun acceptConnections() { while (true) { + val socketPolicy = dispatcher.peek().socketPolicy + + if (socketPolicy is SocketPolicy.DelayAccept) { + Thread.sleep(socketPolicy.delay.inWholeMilliseconds) + } + val socket: Socket try { socket = serverSocket!!.accept() @@ -382,7 +388,6 @@ class MockWebServer : Closeable { return } - val socketPolicy = dispatcher.peek().socketPolicy if (socketPolicy === DisconnectAtStart) { dispatchBookkeepingRequest(0, socket) socket.close() diff --git a/mockwebserver/src/main/kotlin/mockwebserver3/SocketPolicy.kt b/mockwebserver/src/main/kotlin/mockwebserver3/SocketPolicy.kt index f74df36c906b..be24ff0f0936 100644 --- a/mockwebserver/src/main/kotlin/mockwebserver3/SocketPolicy.kt +++ b/mockwebserver/src/main/kotlin/mockwebserver3/SocketPolicy.kt @@ -16,6 +16,7 @@ package mockwebserver3 +import kotlin.time.Duration import okhttp3.ExperimentalOkHttpApi /** @@ -59,6 +60,11 @@ sealed interface SocketPolicy { */ object DisconnectAtStart : SocketPolicy + /** + * Delay before accepting on the ServerSocket. + */ + class DelayAccept(val delay: Duration) : SocketPolicy + /** * Close connection after reading the request but before writing the response. Use this to * simulate late connection pool failures. diff --git a/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt b/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt index 860eb83ad83a..8d222346e88e 100644 --- a/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt +++ b/okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt @@ -25,9 +25,9 @@ import java.util.logging.Level import java.util.logging.LogManager import java.util.logging.LogRecord import java.util.logging.Logger -import okhttp3.internal.connection.Locks.withLock import okhttp3.internal.buildConnectionPool import okhttp3.internal.concurrent.TaskRunner +import okhttp3.internal.connection.Locks.withLock import okhttp3.internal.connection.RealConnectionPool import okhttp3.internal.http2.Http2 import okhttp3.internal.taskRunnerInternal diff --git a/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt b/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt index 11186d588a9e..b90fba17e258 100644 --- a/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt +++ b/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt @@ -14,6 +14,7 @@ * limitations under the License. */ @file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") + package okhttp3.internal.concurrent import assertk.assertThat @@ -24,9 +25,9 @@ import java.util.concurrent.BlockingQueue import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import java.util.logging.Logger -import okhttp3.internal.connection.Locks.withLock import okhttp3.OkHttpClient import okhttp3.TestUtil.threadFactory +import okhttp3.internal.connection.Locks.withLock /** * Runs a [TaskRunner] in a controlled environment so that everything is sequential and diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt index 4a270e5cd3b9..bd5564d34870 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt @@ -83,33 +83,36 @@ internal object Locks { } internal fun ReentrantLock.newLockCondition(): Condition { - val condition = this.newCondition() - return object : Condition by condition { - override fun await() { - assertHeld() - return timeAwait { condition.await() } - } + val condition = this.newCondition() + return object : Condition by condition { + override fun await() { + assertHeld() + return timeAwait { condition.await() } + } - override fun await(time: Long, unit: TimeUnit?): Boolean { - assertHeld() - return timeAwait { condition.await(time, unit) } - } + override fun await( + time: Long, + unit: TimeUnit?, + ): Boolean { + assertHeld() + return timeAwait { condition.await(time, unit) } + } - override fun awaitUninterruptibly() { - assertHeld() - return timeAwait { condition.awaitUninterruptibly() } - } + override fun awaitUninterruptibly() { + assertHeld() + return timeAwait { condition.awaitUninterruptibly() } + } - override fun awaitNanos(nanosTimeout: Long): Long { - assertHeld() - return timeAwait { condition.awaitNanos(nanosTimeout) } - } + override fun awaitNanos(nanosTimeout: Long): Long { + assertHeld() + return timeAwait { condition.awaitNanos(nanosTimeout) } + } - override fun awaitUntil(deadline: Date): Boolean { - assertHeld() - return timeAwait { condition.awaitUntil(deadline) } - } + override fun awaitUntil(deadline: Date): Boolean { + assertHeld() + return timeAwait { condition.awaitUntil(deadline) } } + } } private fun ReentrantLock.timeAwait(function: () -> T): T { diff --git a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt index 8c6b8ec18e02..fcabc4d2ca5c 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt @@ -25,8 +25,8 @@ import java.util.concurrent.locks.ReentrantLock import okhttp3.Headers import okhttp3.internal.EMPTY_HEADERS import okhttp3.internal.assertNotHeld -import okhttp3.internal.connection.Locks.withLock import okhttp3.internal.connection.Locks.newLockCondition +import okhttp3.internal.connection.Locks.withLock import okhttp3.internal.http2.flowcontrol.WindowCounter import okhttp3.internal.toHeaderList import okio.AsyncTimeout diff --git a/okhttp/src/test/java/okhttp3/CallTest.kt b/okhttp/src/test/java/okhttp3/CallTest.kt index 2c8adcc4e9d9..1d88303d6459 100644 --- a/okhttp/src/test/java/okhttp3/CallTest.kt +++ b/okhttp/src/test/java/okhttp3/CallTest.kt @@ -145,22 +145,27 @@ open class CallTest { private var client = clientTestRule.newClientBuilder() .eventListenerFactory(clientTestRule.wrap(listener)) - .socketFactory(object: DelegatingSocketFactory(SocketFactory.getDefault()) { - override fun createSocket(): Socket { - Thread.sleep(1_000) - return object : Socket() { - override fun connect(endpoint: SocketAddress?) { - Thread.sleep(1_000) - super.connect(endpoint) - } + .socketFactory( + object : DelegatingSocketFactory(SocketFactory.getDefault()) { + override fun createSocket(): Socket { + Thread.sleep(1_000) + return object : Socket() { + override fun connect(endpoint: SocketAddress?) { + Thread.sleep(1_000) + super.connect(endpoint) + } - override fun connect(endpoint: SocketAddress?, timeout: Int) { - Thread.sleep(1_000) - super.connect(endpoint, timeout) + override fun connect( + endpoint: SocketAddress?, + timeout: Int, + ) { + Thread.sleep(1_000) + super.connect(endpoint, timeout) + } } } - } - }) + }, + ) .build() private val callback = RecordingCallback() private val cache = diff --git a/okhttp/src/test/java/okhttp3/SlowNetworkTest.kt b/okhttp/src/test/java/okhttp3/SlowNetworkTest.kt new file mode 100644 index 000000000000..019c19c68bca --- /dev/null +++ b/okhttp/src/test/java/okhttp3/SlowNetworkTest.kt @@ -0,0 +1,88 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3 + +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds +import mockwebserver3.MockResponse +import mockwebserver3.MockWebServer +import mockwebserver3.SocketPolicy +import okhttp3.testing.PlatformRule +import okio.IOException +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.RegisterExtension + +class SlowNetworkTest { + @JvmField + @RegisterExtension + val clientTestRule = OkHttpClientTestRule() + + @JvmField + @RegisterExtension + val platform = PlatformRule() + + private val handshakeCertificates = platform.localhostHandshakeCertificates() + private lateinit var client: OkHttpClient + private lateinit var server: MockWebServer + + @BeforeEach + fun setUp(server: MockWebServer) { + this.server = server + + client = + clientTestRule.newClientBuilder() + .sslSocketFactory( + handshakeCertificates.sslSocketFactory(), + handshakeCertificates.trustManager, + ) + .callTimeout(15.seconds) + .connectTimeout(15.seconds) + .build() + + server.useHttps(handshakeCertificates.sslSocketFactory()) + } + + @Test + fun slowRequests() { + repeat(100) { + server.enqueue( + MockResponse.Builder() + .socketPolicy(SocketPolicy.DelayAccept(10.milliseconds)) + .build(), + ) + } + + (1..100).map { + client.newCall(Request(server.url("/"))).enqueue( + object : Callback { + override fun onFailure( + call: Call, + e: IOException, + ) { + } + + override fun onResponse( + call: Call, + response: Response, + ) { + response.body.string() + } + }, + ) + } + } +} From 399972bb24f36dddefa50d8f1750f556da9a4fc6 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Sat, 27 Apr 2024 12:28:00 +0100 Subject: [PATCH 06/10] Slow connects --- .../main/kotlin/okhttp3/internal/connection/Locks.kt | 10 +++++----- okhttp/src/test/java/okhttp3/CallTest.kt | 9 +++++++-- okhttp/src/test/java/okhttp3/SlowNetworkTest.kt | 8 ++++++++ 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt index bd5564d34870..dee4ecf8511c 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt @@ -119,11 +119,11 @@ internal object Locks { return if (this == lockToWatch) { measureTimedValue { function() }.also { val lockDuration = it.duration - if (lockDuration > 1.milliseconds) { +// if (lockDuration > 1.milliseconds) { // println(Thread.currentThread().name + " await " + lockDuration) // Exception().printStackTrace() threadLocalAwait.set(threadLocalAwait.get() + lockDuration) - } +// } }.value } else { function() @@ -140,10 +140,10 @@ internal object Locks { }.also { val awaitDuration = threadLocalAwait.get() threadLocalAwait.remove() - if (it.duration - awaitDuration > 1.milliseconds) { +// if (it.duration - awaitDuration > 1.milliseconds) { println(Thread.currentThread().name + " lock " + it.duration + " " + awaitDuration) - Exception().printStackTrace() - } +// Exception().printStackTrace() +// } }.value } else { withLock(action) diff --git a/okhttp/src/test/java/okhttp3/CallTest.kt b/okhttp/src/test/java/okhttp3/CallTest.kt index 1d88303d6459..c1aa3ac10c07 100644 --- a/okhttp/src/test/java/okhttp3/CallTest.kt +++ b/okhttp/src/test/java/okhttp3/CallTest.kt @@ -151,7 +151,7 @@ open class CallTest { Thread.sleep(1_000) return object : Socket() { override fun connect(endpoint: SocketAddress?) { - Thread.sleep(1_000) + Thread.sleep(500) super.connect(endpoint) } @@ -159,9 +159,14 @@ open class CallTest { endpoint: SocketAddress?, timeout: Int, ) { - Thread.sleep(1_000) + Thread.sleep(500) super.connect(endpoint, timeout) } + + override fun close() { + Thread.sleep(500) + super.close() + } } } }, diff --git a/okhttp/src/test/java/okhttp3/SlowNetworkTest.kt b/okhttp/src/test/java/okhttp3/SlowNetworkTest.kt index 019c19c68bca..2be3c81c516f 100644 --- a/okhttp/src/test/java/okhttp3/SlowNetworkTest.kt +++ b/okhttp/src/test/java/okhttp3/SlowNetworkTest.kt @@ -15,6 +15,7 @@ */ package okhttp3 +import java.util.concurrent.CountDownLatch import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds import mockwebserver3.MockResponse @@ -66,6 +67,8 @@ class SlowNetworkTest { ) } + val latch = CountDownLatch(100) + (1..100).map { client.newCall(Request(server.url("/"))).enqueue( object : Callback { @@ -73,6 +76,8 @@ class SlowNetworkTest { call: Call, e: IOException, ) { + println(e) + latch.countDown() } override fun onResponse( @@ -80,9 +85,12 @@ class SlowNetworkTest { response: Response, ) { response.body.string() + latch.countDown() } }, ) } + + latch.await() } } From 9899f03133fd90bafe9f30f45ff4c57166d9f9a7 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Sat, 27 Apr 2024 12:28:38 +0100 Subject: [PATCH 07/10] Slow connects --- okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt index dee4ecf8511c..8b998abb19ff 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt @@ -140,10 +140,10 @@ internal object Locks { }.also { val awaitDuration = threadLocalAwait.get() threadLocalAwait.remove() -// if (it.duration - awaitDuration > 1.milliseconds) { + if (it.duration - awaitDuration > 1.milliseconds) { println(Thread.currentThread().name + " lock " + it.duration + " " + awaitDuration) // Exception().printStackTrace() -// } + } }.value } else { withLock(action) From 633881489df5510b818c65f7f44e421e9ebdecd3 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Sat, 27 Apr 2024 12:33:45 +0100 Subject: [PATCH 08/10] Slow connects --- .../kotlin/mockwebserver3/MockWebServer.kt | 3 +- .../src/test/java/okhttp3/SlowNetworkTest.kt | 29 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/mockwebserver/src/main/kotlin/mockwebserver3/MockWebServer.kt b/mockwebserver/src/main/kotlin/mockwebserver3/MockWebServer.kt index dc7a36962af7..d869e6543e86 100644 --- a/mockwebserver/src/main/kotlin/mockwebserver3/MockWebServer.kt +++ b/mockwebserver/src/main/kotlin/mockwebserver3/MockWebServer.kt @@ -45,6 +45,7 @@ import javax.net.ssl.SSLSocket import javax.net.ssl.SSLSocketFactory import javax.net.ssl.TrustManager import javax.net.ssl.X509TrustManager +import kotlin.time.Duration.Companion.milliseconds import mockwebserver3.SocketPolicy.DisconnectAfterRequest import mockwebserver3.SocketPolicy.DisconnectAtEnd import mockwebserver3.SocketPolicy.DisconnectAtStart @@ -377,7 +378,7 @@ class MockWebServer : Closeable { val socketPolicy = dispatcher.peek().socketPolicy if (socketPolicy is SocketPolicy.DelayAccept) { - Thread.sleep(socketPolicy.delay.inWholeMilliseconds) + Thread.sleep(100.milliseconds.inWholeMilliseconds) } val socket: Socket diff --git a/okhttp/src/test/java/okhttp3/SlowNetworkTest.kt b/okhttp/src/test/java/okhttp3/SlowNetworkTest.kt index 2be3c81c516f..493065943a43 100644 --- a/okhttp/src/test/java/okhttp3/SlowNetworkTest.kt +++ b/okhttp/src/test/java/okhttp3/SlowNetworkTest.kt @@ -15,12 +15,15 @@ */ package okhttp3 +import java.net.Socket +import java.net.SocketAddress import java.util.concurrent.CountDownLatch import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds import mockwebserver3.MockResponse import mockwebserver3.MockWebServer import mockwebserver3.SocketPolicy +import okhttp3.internal.connection.RealConnection import okhttp3.testing.PlatformRule import okio.IOException import org.junit.jupiter.api.BeforeEach @@ -50,8 +53,33 @@ class SlowNetworkTest { handshakeCertificates.sslSocketFactory(), handshakeCertificates.trustManager, ) + .socketFactory(object : DelegatingSocketFactory(getDefault()) { + override fun createSocket(): Socket { + return object : Socket() { + override fun connect(endpoint: SocketAddress?) { + Thread.sleep(100) + super.connect(endpoint) + } + + override fun connect(endpoint: SocketAddress?, timeout: Int) { + Thread.sleep(100) + super.connect(endpoint, timeout) + } + + override fun close() { + Thread.sleep(100) + super.close() + } + } + } + }) .callTimeout(15.seconds) .connectTimeout(15.seconds) + .eventListener(object : EventListener() { + override fun connectionAcquired(call: Call, connection: Connection) { + (connection as RealConnection).noNewExchanges() + } + }) .build() server.useHttps(handshakeCertificates.sslSocketFactory()) @@ -84,6 +112,7 @@ class SlowNetworkTest { call: Call, response: Response, ) { +// println("response") response.body.string() latch.countDown() } From 60709f89106dd2d608b613d04aa73f1b0978980f Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Sat, 27 Apr 2024 13:30:38 +0100 Subject: [PATCH 09/10] More tests --- .../src/test/java/okhttp3/SlowNetworkTest.kt | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/okhttp/src/test/java/okhttp3/SlowNetworkTest.kt b/okhttp/src/test/java/okhttp3/SlowNetworkTest.kt index 493065943a43..b009cb895e80 100644 --- a/okhttp/src/test/java/okhttp3/SlowNetworkTest.kt +++ b/okhttp/src/test/java/okhttp3/SlowNetworkTest.kt @@ -122,4 +122,25 @@ class SlowNetworkTest { latch.await() } + + @Test + fun test1() { + repeat(10) { + slowRequests() + } + } + + @Test + fun test2() { + repeat(10) { + slowRequests() + } + } + + @Test + fun test3() { + repeat(10) { + slowRequests() + } + } } From e3af37c167f1d316d805a1d03478622d64c015d4 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Sat, 27 Apr 2024 15:17:16 +0100 Subject: [PATCH 10/10] More tests --- .../okhttp3/internal/concurrent/TaskRunner.kt | 13 +++++++++++-- .../okhttp3/internal/http2/Http2Connection.kt | 10 +++++++++- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt index 6546ef54ba6b..2e749e7c8c72 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt @@ -23,6 +23,9 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.locks.Condition import java.util.concurrent.locks.ReentrantLock import java.util.logging.Logger +import kotlin.time.Duration.Companion.microseconds +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.measureTime import okhttp3.internal.addIfAbsent import okhttp3.internal.assertHeld import okhttp3.internal.concurrent.TaskRunner.Companion.INSTANCE @@ -297,7 +300,7 @@ class TaskRunner( // keepAliveTime: 60L, TimeUnit.SECONDS, - SynchronousQueue(), + SynchronousQueue(false), threadFactory, ) @@ -329,7 +332,13 @@ class TaskRunner( taskRunner: TaskRunner, runnable: Runnable, ) { - executor.execute(runnable) + val time = measureTime { + executor.execute(runnable) + } + + if (time > 500.microseconds) { + println("executor.execute " + time) + } } fun shutdown() { diff --git a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Connection.kt b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Connection.kt index eac412033736..3da1fb82414a 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Connection.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Connection.kt @@ -22,6 +22,8 @@ import java.net.Socket import java.util.concurrent.TimeUnit import java.util.concurrent.locks.Condition import java.util.concurrent.locks.ReentrantLock +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.measureTime import okhttp3.internal.EMPTY_BYTE_ARRAY import okhttp3.internal.EMPTY_HEADERS import okhttp3.internal.assertThreadDoesntHoldLock @@ -793,7 +795,13 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { peerSettings = newPeerSettings settingsListenerQueue.execute("$connectionName onSettings") { - listener.onSettings(this@Http2Connection, newPeerSettings) + measureTime { + listener.onSettings(this@Http2Connection, newPeerSettings) + }.also { + if (it > 1.milliseconds) { + println("onSettings " + it) + } + } } } try {