Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loom Support #7367

Merged
merged 19 commits into from
Jan 3, 2023
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ kotlin.js.compiler=ir
kotlin.incremental.js.ir=true
androidBuild=false
graalBuild=false
loomBuild=false
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package okhttp3

import android.annotation.SuppressLint
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.logging.Handler
import java.util.logging.Level
Expand All @@ -26,6 +27,8 @@ import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.connection.RealConnectionPool
import okhttp3.internal.http2.Http2
import okhttp3.testing.Flaky
import okhttp3.testing.PlatformRule.Companion.LOOM_PROPERTY
import okhttp3.testing.PlatformRule.Companion.getPlatformSystemProperty
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.fail
import org.junit.jupiter.api.extension.AfterEachCallback
Expand Down Expand Up @@ -116,10 +119,9 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback {
fun newClient(): OkHttpClient {
var client = testClient
if (client == null) {
client = OkHttpClient.Builder()
client = initialClientBuilder()
.dns(SINGLE_INET_ADDRESS_DNS) // Prevent unexpected fallback addresses.
.eventListenerFactory { ClientRuleEventListener(logger = ::addEvent) }
.connectionPool(ConnectionPool(connectionListener = connectionListener))
.build()
connectionListener.forbidLock(RealConnectionPool.get(client.connectionPool))
connectionListener.forbidLock(client.dispatcher)
Expand All @@ -128,6 +130,29 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback {
return client
}

private fun initialClientBuilder(): OkHttpClient.Builder = if (isLoom()) {
val backend = TaskRunner.RealBackend(loomThreadFactory())
val taskRunner = TaskRunner(backend)

OkHttpClient.Builder()
.connectionPool(ConnectionPool(connectionListener = connectionListener, taskRunner = taskRunner))
.dispatcher(Dispatcher(backend.executor))
.taskRunner(taskRunner)
} else {
OkHttpClient.Builder()
.connectionPool(ConnectionPool(connectionListener = connectionListener))
}

private fun loomThreadFactory(): ThreadFactory {
val ofVirtual = Thread::class.java.getMethod("ofVirtual").invoke(null)

return Class.forName("java.lang.Thread\$Builder").getMethod("factory").invoke(ofVirtual) as ThreadFactory
}

private fun isLoom(): Boolean {
return getPlatformSystemProperty() == LOOM_PROPERTY
}

fun newClientBuilder(): OkHttpClient.Builder {
return newClient().newBuilder()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.logging.Logger
import kotlin.concurrent.withLock

/**
* Runs a [TaskRunner] in a controlled environment so that everything is sequential and
Expand All @@ -47,24 +48,18 @@ import java.util.logging.Logger
class TaskFaker : Closeable {
@Suppress("NOTHING_TO_INLINE")
internal inline fun Any.assertThreadHoldsLock() {
if (assertionsEnabled && !Thread.holdsLock(this)) {
if (assertionsEnabled && !taskRunner.lock.isHeldByCurrentThread) {
throw AssertionError("Thread ${Thread.currentThread().name} MUST hold lock on $this")
}
}

@Suppress("NOTHING_TO_INLINE")
internal inline fun Any.assertThreadDoesntHoldLock() {
if (assertionsEnabled && Thread.holdsLock(this)) {
if (assertionsEnabled && taskRunner.lock.isHeldByCurrentThread) {
throw AssertionError("Thread ${Thread.currentThread().name} MUST NOT hold lock on $this")
}
}

@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "NOTHING_TO_INLINE")
internal inline fun Any.wait() = (this as Object).wait()

@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "NOTHING_TO_INLINE")
internal inline fun Any.notifyAll() = (this as Object).notifyAll()

val logger = Logger.getLogger("TaskFaker." + instance++)

/** Though this executor service may hold many threads, they are not executed concurrently. */
Expand Down Expand Up @@ -108,9 +103,9 @@ class TaskFaker : Closeable {
val acquiredTaskRunnerLock = AtomicBoolean()

tasksExecutor.execute {
synchronized(taskRunner) {
taskRunner.lock.withLock {
acquiredTaskRunnerLock.set(true)
taskRunner.notifyAll()
taskRunner.condition.signalAll()

tasksRunningCount++
if (tasksRunningCount > 1) isParallel = true
Expand All @@ -130,7 +125,7 @@ class TaskFaker : Closeable {

// Execute() must not return until the launched task stalls.
while (!acquiredTaskRunnerLock.get()) {
taskRunner.wait()
taskRunner.condition.await()
}
}

Expand All @@ -141,7 +136,7 @@ class TaskFaker : Closeable {
check(waitingCoordinatorThread != null)

stalledTasks.remove(waitingCoordinatorThread)
taskRunner.notifyAll()
taskRunner.condition.signalAll()
}

override fun coordinatorWait(taskRunner: TaskRunner, nanos: Long) {
Expand Down Expand Up @@ -170,7 +165,7 @@ class TaskFaker : Closeable {
stalledTasks += currentThread
try {
while (currentThread in stalledTasks) {
taskRunner.wait()
taskRunner.condition.await()
}
} catch (e: InterruptedException) {
stalledTasks.remove(currentThread)
Expand All @@ -182,7 +177,7 @@ class TaskFaker : Closeable {
taskRunner.assertThreadHoldsLock()

stalledTasks.clear()
taskRunner.notifyAll()
taskRunner.condition.signalAll()
}

/** Runs all tasks that are ready. Used by the test thread only. */
Expand All @@ -194,7 +189,7 @@ class TaskFaker : Closeable {
fun advanceUntil(newTime: Long) {
taskRunner.assertThreadDoesntHoldLock()

synchronized(taskRunner) {
taskRunner.lock.withLock {
isRunningAllTasks = true
nanoTime = newTime
unstallTasks()
Expand All @@ -207,7 +202,7 @@ class TaskFaker : Closeable {
taskRunner.assertThreadDoesntHoldLock()

while (true) {
synchronized(taskRunner) {
taskRunner.lock.withLock {
if (tasksRunningCount == stalledTasks.size) {
isRunningAllTasks = false
return@waitForTasksToStall // All stalled.
Expand All @@ -222,7 +217,7 @@ class TaskFaker : Closeable {
fun assertNoMoreTasks() {
taskRunner.assertThreadDoesntHoldLock()

synchronized(taskRunner) {
taskRunner.lock.withLock {
assertThat(stalledTasks).isEmpty()
}
}
Expand All @@ -234,7 +229,7 @@ class TaskFaker : Closeable {
// Make sure the coordinator is ready to be interrupted.
runTasks()

synchronized(taskRunner) {
taskRunner.lock.withLock {
val toInterrupt = waitingCoordinatorThread ?: error("no thread currently waiting")
taskBecameStalled.drainPermits()
toInterrupt.interrupt()
Expand All @@ -247,10 +242,10 @@ class TaskFaker : Closeable {
fun runNextTask() {
taskRunner.assertThreadDoesntHoldLock()

synchronized(taskRunner) {
taskRunner.lock.withLock {
check(stalledTasks.size >= 1) { "no tasks to run" }
stalledTasks.removeFirst()
taskRunner.notifyAll()
taskRunner.condition.signalAll()
}

waitForTasksToStall()
Expand Down
8 changes: 6 additions & 2 deletions okhttp/api/okhttp.api
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public final class okhttp3/Cache : java/io/Closeable, java/io/Flushable {
public final fun -deprecated_directory ()Ljava/io/File;
public fun <init> (Ljava/io/File;J)V
public fun <init> (Lokio/Path;JLokio/FileSystem;)V
public fun <init> (Lokio/Path;JLokio/FileSystem;Lokhttp3/internal/concurrent/TaskRunner;)V
yschimke marked this conversation as resolved.
Show resolved Hide resolved
public fun close ()V
public final fun delete ()V
public final fun directory ()Ljava/io/File;
Expand Down Expand Up @@ -378,8 +379,10 @@ public final class okhttp3/ConnectionListener$Companion {
public final class okhttp3/ConnectionPool {
public fun <init> ()V
public fun <init> (IJLjava/util/concurrent/TimeUnit;)V
public fun <init> (IJLjava/util/concurrent/TimeUnit;Lokhttp3/ConnectionListener;)V
public synthetic fun <init> (IJLjava/util/concurrent/TimeUnit;Lokhttp3/ConnectionListener;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (IJLjava/util/concurrent/TimeUnit;Lokhttp3/internal/concurrent/TaskRunner;)V
public synthetic fun <init> (IJLjava/util/concurrent/TimeUnit;Lokhttp3/internal/concurrent/TaskRunner;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (IJLjava/util/concurrent/TimeUnit;Lokhttp3/internal/concurrent/TaskRunner;Lokhttp3/ConnectionListener;)V
public synthetic fun <init> (IJLjava/util/concurrent/TimeUnit;Lokhttp3/internal/concurrent/TaskRunner;Lokhttp3/ConnectionListener;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun connectionCount ()I
public final fun evictAll ()V
public final fun idleConnectionCount ()I
Expand Down Expand Up @@ -980,6 +983,7 @@ public final class okhttp3/OkHttpClient$Builder {
public final fun socketFactory (Ljavax/net/SocketFactory;)Lokhttp3/OkHttpClient$Builder;
public final fun sslSocketFactory (Ljavax/net/ssl/SSLSocketFactory;)Lokhttp3/OkHttpClient$Builder;
public final fun sslSocketFactory (Ljavax/net/ssl/SSLSocketFactory;Ljavax/net/ssl/X509TrustManager;)Lokhttp3/OkHttpClient$Builder;
public final fun taskRunner (Lokhttp3/internal/concurrent/TaskRunner;)Lokhttp3/OkHttpClient$Builder;
public final fun writeTimeout (JLjava/util/concurrent/TimeUnit;)Lokhttp3/OkHttpClient$Builder;
public final fun writeTimeout (Ljava/time/Duration;)Lokhttp3/OkHttpClient$Builder;
}
Expand Down
18 changes: 16 additions & 2 deletions okhttp/src/jvmMain/kotlin/okhttp3/Cache.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import java.security.cert.CertificateEncodingException
import java.security.cert.CertificateException
import java.security.cert.CertificateFactory
import java.util.TreeSet
import java.util.concurrent.TimeUnit
import okhttp3.HttpUrl.Companion.toHttpUrlOrNull
import okhttp3.MediaType.Companion.toMediaTypeOrNull
import okhttp3.internal.EMPTY_HEADERS
Expand All @@ -32,6 +33,7 @@ import okhttp3.internal.cache.CacheStrategy
import okhttp3.internal.cache.DiskLruCache
import okhttp3.internal.closeQuietly
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.connection.RealConnectionPool
import okhttp3.internal.http.HttpMethod
import okhttp3.internal.http.StatusLine
import okhttp3.internal.platform.Platform
Expand Down Expand Up @@ -144,15 +146,27 @@ import okio.buffer
class Cache(
directory: Path,
maxSize: Long,
fileSystem: FileSystem
fileSystem: FileSystem,
taskRunner: TaskRunner
yschimke marked this conversation as resolved.
Show resolved Hide resolved
) : Closeable, Flushable {
constructor(
directory: Path,
maxSize: Long,
fileSystem: FileSystem,
) : this(
directory,
maxSize,
fileSystem,
TaskRunner.INSTANCE
)

internal val cache = DiskLruCache(
fileSystem = fileSystem,
directory = directory,
appVersion = VERSION,
valueCount = ENTRY_COUNT,
maxSize = maxSize,
taskRunner = TaskRunner.INSTANCE
taskRunner = taskRunner
)

// read and write statistics, all guarded by 'this'.
Expand Down
35 changes: 24 additions & 11 deletions okhttp/src/jvmMain/kotlin/okhttp3/ConnectionPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,29 +34,42 @@ class ConnectionPool internal constructor(
internal val delegate: RealConnectionPool
) {
constructor(
maxIdleConnections: Int,
keepAliveDuration: Long,
timeUnit: TimeUnit
maxIdleConnections: Int = 5,
keepAliveDuration: Long = 5,
timeUnit: TimeUnit = TimeUnit.MINUTES,
taskRunner: TaskRunner = TaskRunner.INSTANCE,
yschimke marked this conversation as resolved.
Show resolved Hide resolved
connectionListener: ConnectionListener = ConnectionListener.NONE,
) : this(RealConnectionPool(
taskRunner = TaskRunner.INSTANCE,
taskRunner = taskRunner,
maxIdleConnections = maxIdleConnections,
keepAliveDuration = keepAliveDuration,
timeUnit = timeUnit,
connectionListener = ConnectionListener.NONE
connectionListener = connectionListener
))

constructor(
maxIdleConnections: Int = 5,
keepAliveDuration: Long = 5,
timeUnit: TimeUnit = TimeUnit.MINUTES,
connectionListener: ConnectionListener = ConnectionListener.NONE
) : this(RealConnectionPool(
taskRunner = TaskRunner.INSTANCE,
taskRunner: TaskRunner = TaskRunner.INSTANCE,
yschimke marked this conversation as resolved.
Show resolved Hide resolved
) : this(
taskRunner = taskRunner,
maxIdleConnections = maxIdleConnections,
keepAliveDuration = keepAliveDuration,
timeUnit = timeUnit,
connectionListener = connectionListener
))
connectionListener = ConnectionListener.NONE
)

constructor(
maxIdleConnections: Int,
keepAliveDuration: Long,
timeUnit: TimeUnit,
) : this(
maxIdleConnections = maxIdleConnections,
keepAliveDuration = keepAliveDuration,
timeUnit = timeUnit,
taskRunner = TaskRunner.INSTANCE,
connectionListener = ConnectionListener.NONE
)

constructor() : this(5, 5, TimeUnit.MINUTES)

Expand Down
4 changes: 4 additions & 0 deletions okhttp/src/jvmMain/kotlin/okhttp3/OkHttpClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,10 @@ open class OkHttpClient internal constructor(
this.cache = cache
}

fun taskRunner(taskRunner: TaskRunner) = apply {
yschimke marked this conversation as resolved.
Show resolved Hide resolved
this.taskRunner = taskRunner
}

/**
* Sets the DNS service used to lookup IP addresses for hostnames.
*
Expand Down
17 changes: 16 additions & 1 deletion okhttp/src/jvmMain/kotlin/okhttp3/internal/-UtilJvm.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import java.util.Locale
import java.util.TimeZone
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.text.Charsets.UTF_16BE
import kotlin.text.Charsets.UTF_16LE
import kotlin.text.Charsets.UTF_32BE
Expand Down Expand Up @@ -257,7 +258,7 @@ internal fun Int.toHexString(): String = Integer.toHexString(this)
internal inline fun Any.wait() = (this as Object).wait()

@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "NOTHING_TO_INLINE")
internal inline fun Any.notify() = (this as Object).notify()
inline fun Any.notify() = (this as Object).notify()
yschimke marked this conversation as resolved.
Show resolved Hide resolved

@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "NOTHING_TO_INLINE")
internal inline fun Any.notifyAll() = (this as Object).notifyAll()
Expand Down Expand Up @@ -299,13 +300,27 @@ internal val assertionsEnabled: Boolean = OkHttpClient::class.java.desiredAssert
internal val okHttpName: String =
OkHttpClient::class.java.name.removePrefix("okhttp3.").removeSuffix("Client")

@Suppress("NOTHING_TO_INLINE")
internal inline fun ReentrantLock.assertThreadHolds() {
yschimke marked this conversation as resolved.
Show resolved Hide resolved
if (assertionsEnabled && !this.isHeldByCurrentThread) {
throw AssertionError("Thread ${Thread.currentThread().name} MUST hold lock on $this")
yschimke marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Suppress("NOTHING_TO_INLINE")
internal inline fun Any.assertThreadHoldsLock() {
if (assertionsEnabled && !Thread.holdsLock(this)) {
throw AssertionError("Thread ${Thread.currentThread().name} MUST hold lock on $this")
}
}

@Suppress("NOTHING_TO_INLINE")
internal inline fun ReentrantLock.assertThreadDoesntHold() {
if (assertionsEnabled && this.isHeldByCurrentThread) {
throw AssertionError("Thread ${Thread.currentThread().name} MUST NOT hold lock on $this")
}
}

@Suppress("NOTHING_TO_INLINE")
internal inline fun Any.assertThreadDoesntHoldLock() {
if (assertionsEnabled && Thread.holdsLock(this)) {
Expand Down