Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loom Support #7367

Merged
merged 19 commits into from
Jan 3, 2023
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ kotlin.js.compiler=ir
kotlin.incremental.js.ir=true
androidBuild=false
graalBuild=false
loomBuild=false
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,21 @@
package okhttp3

import android.annotation.SuppressLint
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.logging.Handler
import java.util.logging.Level
import java.util.logging.LogManager
import java.util.logging.LogRecord
import java.util.logging.Logger
import okhttp3.internal.buildConnectionPool
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.connection.RealConnectionPool
import okhttp3.internal.http2.Http2
import okhttp3.internal.taskRunnerInternal
import okhttp3.testing.Flaky
import okhttp3.testing.PlatformRule.Companion.LOOM_PROPERTY
import okhttp3.testing.PlatformRule.Companion.getPlatformSystemProperty
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.fail
import org.junit.jupiter.api.extension.AfterEachCallback
Expand Down Expand Up @@ -116,10 +121,9 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback {
fun newClient(): OkHttpClient {
var client = testClient
if (client == null) {
client = OkHttpClient.Builder()
client = initialClientBuilder()
.dns(SINGLE_INET_ADDRESS_DNS) // Prevent unexpected fallback addresses.
.eventListenerFactory { ClientRuleEventListener(logger = ::addEvent) }
.connectionPool(ConnectionPool(connectionListener = connectionListener))
.build()
connectionListener.forbidLock(RealConnectionPool.get(client.connectionPool))
connectionListener.forbidLock(client.dispatcher)
Expand All @@ -128,6 +132,29 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback {
return client
}

private fun initialClientBuilder(): OkHttpClient.Builder = if (isLoom()) {
val backend = TaskRunner.RealBackend(loomThreadFactory())
val taskRunner = TaskRunner(backend)

OkHttpClient.Builder()
.connectionPool(buildConnectionPool(connectionListener = connectionListener, taskRunner = taskRunner))
.dispatcher(Dispatcher(backend.executor))
.taskRunnerInternal(taskRunner)
} else {
OkHttpClient.Builder()
.connectionPool(ConnectionPool(connectionListener = connectionListener))
}

private fun loomThreadFactory(): ThreadFactory {
val ofVirtual = Thread::class.java.getMethod("ofVirtual").invoke(null)

return Class.forName("java.lang.Thread\$Builder").getMethod("factory").invoke(ofVirtual) as ThreadFactory
}

private fun isLoom(): Boolean {
return getPlatformSystemProperty() == LOOM_PROPERTY
}

fun newClientBuilder(): OkHttpClient.Builder {
return newClient().newBuilder()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.logging.Logger
import kotlin.concurrent.withLock

/**
* Runs a [TaskRunner] in a controlled environment so that everything is sequential and
Expand All @@ -47,24 +48,18 @@ import java.util.logging.Logger
class TaskFaker : Closeable {
@Suppress("NOTHING_TO_INLINE")
internal inline fun Any.assertThreadHoldsLock() {
if (assertionsEnabled && !Thread.holdsLock(this)) {
if (assertionsEnabled && !taskRunner.lock.isHeldByCurrentThread) {
throw AssertionError("Thread ${Thread.currentThread().name} MUST hold lock on $this")
}
}

@Suppress("NOTHING_TO_INLINE")
internal inline fun Any.assertThreadDoesntHoldLock() {
if (assertionsEnabled && Thread.holdsLock(this)) {
if (assertionsEnabled && taskRunner.lock.isHeldByCurrentThread) {
throw AssertionError("Thread ${Thread.currentThread().name} MUST NOT hold lock on $this")
}
}

@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "NOTHING_TO_INLINE")
internal inline fun Any.wait() = (this as Object).wait()

@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "NOTHING_TO_INLINE")
internal inline fun Any.notifyAll() = (this as Object).notifyAll()

val logger = Logger.getLogger("TaskFaker." + instance++)

/** Though this executor service may hold many threads, they are not executed concurrently. */
Expand Down Expand Up @@ -108,9 +103,9 @@ class TaskFaker : Closeable {
val acquiredTaskRunnerLock = AtomicBoolean()

tasksExecutor.execute {
synchronized(taskRunner) {
taskRunner.lock.withLock {
acquiredTaskRunnerLock.set(true)
taskRunner.notifyAll()
taskRunner.condition.signalAll()

tasksRunningCount++
if (tasksRunningCount > 1) isParallel = true
Expand All @@ -130,7 +125,7 @@ class TaskFaker : Closeable {

// Execute() must not return until the launched task stalls.
while (!acquiredTaskRunnerLock.get()) {
taskRunner.wait()
taskRunner.condition.await()
}
}

Expand All @@ -141,7 +136,7 @@ class TaskFaker : Closeable {
check(waitingCoordinatorThread != null)

stalledTasks.remove(waitingCoordinatorThread)
taskRunner.notifyAll()
taskRunner.condition.signalAll()
}

override fun coordinatorWait(taskRunner: TaskRunner, nanos: Long) {
Expand Down Expand Up @@ -170,7 +165,7 @@ class TaskFaker : Closeable {
stalledTasks += currentThread
try {
while (currentThread in stalledTasks) {
taskRunner.wait()
taskRunner.condition.await()
}
} catch (e: InterruptedException) {
stalledTasks.remove(currentThread)
Expand All @@ -182,7 +177,7 @@ class TaskFaker : Closeable {
taskRunner.assertThreadHoldsLock()

stalledTasks.clear()
taskRunner.notifyAll()
taskRunner.condition.signalAll()
}

/** Runs all tasks that are ready. Used by the test thread only. */
Expand All @@ -194,7 +189,7 @@ class TaskFaker : Closeable {
fun advanceUntil(newTime: Long) {
taskRunner.assertThreadDoesntHoldLock()

synchronized(taskRunner) {
taskRunner.lock.withLock {
isRunningAllTasks = true
nanoTime = newTime
unstallTasks()
Expand All @@ -207,7 +202,7 @@ class TaskFaker : Closeable {
taskRunner.assertThreadDoesntHoldLock()

while (true) {
synchronized(taskRunner) {
taskRunner.lock.withLock {
if (tasksRunningCount == stalledTasks.size) {
isRunningAllTasks = false
return@waitForTasksToStall // All stalled.
Expand All @@ -222,7 +217,7 @@ class TaskFaker : Closeable {
fun assertNoMoreTasks() {
taskRunner.assertThreadDoesntHoldLock()

synchronized(taskRunner) {
taskRunner.lock.withLock {
assertThat(stalledTasks).isEmpty()
}
}
Expand All @@ -234,7 +229,7 @@ class TaskFaker : Closeable {
// Make sure the coordinator is ready to be interrupted.
runTasks()

synchronized(taskRunner) {
taskRunner.lock.withLock {
val toInterrupt = waitingCoordinatorThread ?: error("no thread currently waiting")
taskBecameStalled.drainPermits()
toInterrupt.interrupt()
Expand All @@ -247,10 +242,10 @@ class TaskFaker : Closeable {
fun runNextTask() {
taskRunner.assertThreadDoesntHoldLock()

synchronized(taskRunner) {
taskRunner.lock.withLock {
check(stalledTasks.size >= 1) { "no tasks to run" }
stalledTasks.removeFirst()
taskRunner.notifyAll()
taskRunner.condition.signalAll()
}

waitForTasksToStall()
Expand Down
4 changes: 2 additions & 2 deletions okhttp/api/okhttp.api
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,8 @@ public final class okhttp3/ConnectionListener$Companion {
public final class okhttp3/ConnectionPool {
public fun <init> ()V
public fun <init> (IJLjava/util/concurrent/TimeUnit;)V
public fun <init> (IJLjava/util/concurrent/TimeUnit;Lokhttp3/ConnectionListener;)V
public synthetic fun <init> (IJLjava/util/concurrent/TimeUnit;Lokhttp3/ConnectionListener;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (Lokhttp3/ConnectionListener;IJLjava/util/concurrent/TimeUnit;)V
public synthetic fun <init> (Lokhttp3/ConnectionListener;IJLjava/util/concurrent/TimeUnit;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun connectionCount ()I
public final fun evictAll ()V
public final fun idleConnectionCount ()I
Expand Down
20 changes: 17 additions & 3 deletions okhttp/src/jvmMain/kotlin/okhttp3/Cache.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import java.security.cert.CertificateEncodingException
import java.security.cert.CertificateException
import java.security.cert.CertificateFactory
import java.util.TreeSet
import java.util.concurrent.TimeUnit
import okhttp3.HttpUrl.Companion.toHttpUrlOrNull
import okhttp3.MediaType.Companion.toMediaTypeOrNull
import okhttp3.internal.EMPTY_HEADERS
Expand All @@ -32,6 +33,7 @@ import okhttp3.internal.cache.CacheStrategy
import okhttp3.internal.cache.DiskLruCache
import okhttp3.internal.closeQuietly
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.connection.RealConnectionPool
import okhttp3.internal.http.HttpMethod
import okhttp3.internal.http.StatusLine
import okhttp3.internal.platform.Platform
Expand Down Expand Up @@ -141,18 +143,30 @@ import okio.buffer
*
* [rfc_7234]: http://tools.ietf.org/html/rfc7234
*/
class Cache(
class Cache internal constructor(
directory: Path,
maxSize: Long,
fileSystem: FileSystem
fileSystem: FileSystem,
taskRunner: TaskRunner
yschimke marked this conversation as resolved.
Show resolved Hide resolved
) : Closeable, Flushable {
constructor(
directory: Path,
maxSize: Long,
fileSystem: FileSystem,
) : this(
directory,
maxSize,
fileSystem,
TaskRunner.INSTANCE
)

internal val cache = DiskLruCache(
fileSystem = fileSystem,
directory = directory,
appVersion = VERSION,
valueCount = ENTRY_COUNT,
maxSize = maxSize,
taskRunner = TaskRunner.INSTANCE
taskRunner = taskRunner
)

// read and write statistics, all guarded by 'this'.
Expand Down
33 changes: 24 additions & 9 deletions okhttp/src/jvmMain/kotlin/okhttp3/ConnectionPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,30 +33,45 @@ import okhttp3.internal.connection.RealConnectionPool
class ConnectionPool internal constructor(
internal val delegate: RealConnectionPool
) {
constructor(
maxIdleConnections: Int,
keepAliveDuration: Long,
timeUnit: TimeUnit
internal constructor(
maxIdleConnections: Int = 5,
keepAliveDuration: Long = 5,
timeUnit: TimeUnit = TimeUnit.MINUTES,
taskRunner: TaskRunner = TaskRunner.INSTANCE,
yschimke marked this conversation as resolved.
Show resolved Hide resolved
connectionListener: ConnectionListener = ConnectionListener.NONE,
) : this(RealConnectionPool(
taskRunner = TaskRunner.INSTANCE,
taskRunner = taskRunner,
maxIdleConnections = maxIdleConnections,
keepAliveDuration = keepAliveDuration,
timeUnit = timeUnit,
connectionListener = ConnectionListener.NONE
connectionListener = connectionListener
))

constructor(
connectionListener: ConnectionListener = ConnectionListener.NONE,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: surprising that the new parameter is first?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can reorder. It seemed right to me. But should we go full Builder mode on it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow up #7631

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No Builder.

maxIdleConnections: Int = 5,
keepAliveDuration: Long = 5,
timeUnit: TimeUnit = TimeUnit.MINUTES,
connectionListener: ConnectionListener = ConnectionListener.NONE
) : this(RealConnectionPool(
) : this(
taskRunner = TaskRunner.INSTANCE,
maxIdleConnections = maxIdleConnections,
keepAliveDuration = keepAliveDuration,
timeUnit = timeUnit,
connectionListener = connectionListener
))
)

// Public API
constructor(
maxIdleConnections: Int,
keepAliveDuration: Long,
timeUnit: TimeUnit,
) : this(
maxIdleConnections = maxIdleConnections,
keepAliveDuration = keepAliveDuration,
timeUnit = timeUnit,
taskRunner = TaskRunner.INSTANCE,
connectionListener = ConnectionListener.NONE
)

constructor() : this(5, 5, TimeUnit.MINUTES)

Expand Down
4 changes: 4 additions & 0 deletions okhttp/src/jvmMain/kotlin/okhttp3/OkHttpClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,10 @@ open class OkHttpClient internal constructor(
this.cache = cache
}

internal fun taskRunner(taskRunner: TaskRunner) = apply {
this.taskRunner = taskRunner
}

/**
* Sets the DNS service used to lookup IP addresses for hostnames.
*
Expand Down
15 changes: 15 additions & 0 deletions okhttp/src/jvmMain/kotlin/okhttp3/internal/-UtilJvm.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import java.util.Locale
import java.util.TimeZone
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.text.Charsets.UTF_16BE
import kotlin.text.Charsets.UTF_16LE
import kotlin.text.Charsets.UTF_32BE
Expand Down Expand Up @@ -299,13 +300,27 @@ internal val assertionsEnabled: Boolean = OkHttpClient::class.java.desiredAssert
internal val okHttpName: String =
OkHttpClient::class.java.name.removePrefix("okhttp3.").removeSuffix("Client")

@Suppress("NOTHING_TO_INLINE")
internal inline fun ReentrantLock.assertHeld() {
if (assertionsEnabled && !this.isHeldByCurrentThread) {
throw AssertionError("Thread ${Thread.currentThread().name} MUST hold lock on $this")
yschimke marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Suppress("NOTHING_TO_INLINE")
internal inline fun Any.assertThreadHoldsLock() {
if (assertionsEnabled && !Thread.holdsLock(this)) {
throw AssertionError("Thread ${Thread.currentThread().name} MUST hold lock on $this")
}
}

@Suppress("NOTHING_TO_INLINE")
internal inline fun ReentrantLock.assertNotHeld() {
if (assertionsEnabled && this.isHeldByCurrentThread) {
throw AssertionError("Thread ${Thread.currentThread().name} MUST NOT hold lock on $this")
}
}

@Suppress("NOTHING_TO_INLINE")
internal inline fun Any.assertThreadDoesntHoldLock() {
if (assertionsEnabled && Thread.holdsLock(this)) {
Expand Down