Skip to content

Commit

Permalink
Loom Support (#7367)
Browse files Browse the repository at this point in the history
  • Loading branch information
yschimke committed Jan 3, 2023
1 parent 4ae6ce4 commit 33ace75
Show file tree
Hide file tree
Showing 13 changed files with 152 additions and 62 deletions.
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ kotlin.js.compiler=ir
kotlin.incremental.js.ir=true
androidBuild=false
graalBuild=false
loomBuild=false
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,21 @@
package okhttp3

import android.annotation.SuppressLint
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.logging.Handler
import java.util.logging.Level
import java.util.logging.LogManager
import java.util.logging.LogRecord
import java.util.logging.Logger
import okhttp3.internal.buildConnectionPool
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.connection.RealConnectionPool
import okhttp3.internal.http2.Http2
import okhttp3.internal.taskRunnerInternal
import okhttp3.testing.Flaky
import okhttp3.testing.PlatformRule.Companion.LOOM_PROPERTY
import okhttp3.testing.PlatformRule.Companion.getPlatformSystemProperty
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.fail
import org.junit.jupiter.api.extension.AfterEachCallback
Expand Down Expand Up @@ -116,10 +121,9 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback {
fun newClient(): OkHttpClient {
var client = testClient
if (client == null) {
client = OkHttpClient.Builder()
client = initialClientBuilder()
.dns(SINGLE_INET_ADDRESS_DNS) // Prevent unexpected fallback addresses.
.eventListenerFactory { ClientRuleEventListener(logger = ::addEvent) }
.connectionPool(ConnectionPool(connectionListener = connectionListener))
.build()
connectionListener.forbidLock(RealConnectionPool.get(client.connectionPool))
connectionListener.forbidLock(client.dispatcher)
Expand All @@ -128,6 +132,29 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback {
return client
}

private fun initialClientBuilder(): OkHttpClient.Builder = if (isLoom()) {
val backend = TaskRunner.RealBackend(loomThreadFactory())
val taskRunner = TaskRunner(backend)

OkHttpClient.Builder()
.connectionPool(buildConnectionPool(connectionListener = connectionListener, taskRunner = taskRunner))
.dispatcher(Dispatcher(backend.executor))
.taskRunnerInternal(taskRunner)
} else {
OkHttpClient.Builder()
.connectionPool(ConnectionPool(connectionListener = connectionListener))
}

private fun loomThreadFactory(): ThreadFactory {
val ofVirtual = Thread::class.java.getMethod("ofVirtual").invoke(null)

return Class.forName("java.lang.Thread\$Builder").getMethod("factory").invoke(ofVirtual) as ThreadFactory
}

private fun isLoom(): Boolean {
return getPlatformSystemProperty() == LOOM_PROPERTY
}

fun newClientBuilder(): OkHttpClient.Builder {
return newClient().newBuilder()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.logging.Logger
import kotlin.concurrent.withLock

/**
* Runs a [TaskRunner] in a controlled environment so that everything is sequential and
Expand All @@ -47,24 +48,18 @@ import java.util.logging.Logger
class TaskFaker : Closeable {
@Suppress("NOTHING_TO_INLINE")
internal inline fun Any.assertThreadHoldsLock() {
if (assertionsEnabled && !Thread.holdsLock(this)) {
if (assertionsEnabled && !taskRunner.lock.isHeldByCurrentThread) {
throw AssertionError("Thread ${Thread.currentThread().name} MUST hold lock on $this")
}
}

@Suppress("NOTHING_TO_INLINE")
internal inline fun Any.assertThreadDoesntHoldLock() {
if (assertionsEnabled && Thread.holdsLock(this)) {
if (assertionsEnabled && taskRunner.lock.isHeldByCurrentThread) {
throw AssertionError("Thread ${Thread.currentThread().name} MUST NOT hold lock on $this")
}
}

@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "NOTHING_TO_INLINE")
internal inline fun Any.wait() = (this as Object).wait()

@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "NOTHING_TO_INLINE")
internal inline fun Any.notifyAll() = (this as Object).notifyAll()

val logger = Logger.getLogger("TaskFaker." + instance++)

/** Though this executor service may hold many threads, they are not executed concurrently. */
Expand Down Expand Up @@ -108,9 +103,9 @@ class TaskFaker : Closeable {
val acquiredTaskRunnerLock = AtomicBoolean()

tasksExecutor.execute {
synchronized(taskRunner) {
taskRunner.lock.withLock {
acquiredTaskRunnerLock.set(true)
taskRunner.notifyAll()
taskRunner.condition.signalAll()

tasksRunningCount++
if (tasksRunningCount > 1) isParallel = true
Expand All @@ -130,7 +125,7 @@ class TaskFaker : Closeable {

// Execute() must not return until the launched task stalls.
while (!acquiredTaskRunnerLock.get()) {
taskRunner.wait()
taskRunner.condition.await()
}
}

Expand All @@ -141,7 +136,7 @@ class TaskFaker : Closeable {
check(waitingCoordinatorThread != null)

stalledTasks.remove(waitingCoordinatorThread)
taskRunner.notifyAll()
taskRunner.condition.signalAll()
}

override fun coordinatorWait(taskRunner: TaskRunner, nanos: Long) {
Expand Down Expand Up @@ -170,7 +165,7 @@ class TaskFaker : Closeable {
stalledTasks += currentThread
try {
while (currentThread in stalledTasks) {
taskRunner.wait()
taskRunner.condition.await()
}
} catch (e: InterruptedException) {
stalledTasks.remove(currentThread)
Expand All @@ -182,7 +177,7 @@ class TaskFaker : Closeable {
taskRunner.assertThreadHoldsLock()

stalledTasks.clear()
taskRunner.notifyAll()
taskRunner.condition.signalAll()
}

/** Runs all tasks that are ready. Used by the test thread only. */
Expand All @@ -194,7 +189,7 @@ class TaskFaker : Closeable {
fun advanceUntil(newTime: Long) {
taskRunner.assertThreadDoesntHoldLock()

synchronized(taskRunner) {
taskRunner.lock.withLock {
isRunningAllTasks = true
nanoTime = newTime
unstallTasks()
Expand All @@ -207,7 +202,7 @@ class TaskFaker : Closeable {
taskRunner.assertThreadDoesntHoldLock()

while (true) {
synchronized(taskRunner) {
taskRunner.lock.withLock {
if (tasksRunningCount == stalledTasks.size) {
isRunningAllTasks = false
return@waitForTasksToStall // All stalled.
Expand All @@ -222,7 +217,7 @@ class TaskFaker : Closeable {
fun assertNoMoreTasks() {
taskRunner.assertThreadDoesntHoldLock()

synchronized(taskRunner) {
taskRunner.lock.withLock {
assertThat(stalledTasks).isEmpty()
}
}
Expand All @@ -234,7 +229,7 @@ class TaskFaker : Closeable {
// Make sure the coordinator is ready to be interrupted.
runTasks()

synchronized(taskRunner) {
taskRunner.lock.withLock {
val toInterrupt = waitingCoordinatorThread ?: error("no thread currently waiting")
taskBecameStalled.drainPermits()
toInterrupt.interrupt()
Expand All @@ -247,10 +242,10 @@ class TaskFaker : Closeable {
fun runNextTask() {
taskRunner.assertThreadDoesntHoldLock()

synchronized(taskRunner) {
taskRunner.lock.withLock {
check(stalledTasks.size >= 1) { "no tasks to run" }
stalledTasks.removeFirst()
taskRunner.notifyAll()
taskRunner.condition.signalAll()
}

waitForTasksToStall()
Expand Down
4 changes: 2 additions & 2 deletions okhttp/api/okhttp.api
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,8 @@ public final class okhttp3/ConnectionListener$Companion {
public final class okhttp3/ConnectionPool {
public fun <init> ()V
public fun <init> (IJLjava/util/concurrent/TimeUnit;)V
public fun <init> (IJLjava/util/concurrent/TimeUnit;Lokhttp3/ConnectionListener;)V
public synthetic fun <init> (IJLjava/util/concurrent/TimeUnit;Lokhttp3/ConnectionListener;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (Lokhttp3/ConnectionListener;IJLjava/util/concurrent/TimeUnit;)V
public synthetic fun <init> (Lokhttp3/ConnectionListener;IJLjava/util/concurrent/TimeUnit;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun connectionCount ()I
public final fun evictAll ()V
public final fun idleConnectionCount ()I
Expand Down
20 changes: 17 additions & 3 deletions okhttp/src/jvmMain/kotlin/okhttp3/Cache.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import java.security.cert.CertificateEncodingException
import java.security.cert.CertificateException
import java.security.cert.CertificateFactory
import java.util.TreeSet
import java.util.concurrent.TimeUnit
import okhttp3.HttpUrl.Companion.toHttpUrlOrNull
import okhttp3.MediaType.Companion.toMediaTypeOrNull
import okhttp3.internal.EMPTY_HEADERS
Expand All @@ -32,6 +33,7 @@ import okhttp3.internal.cache.CacheStrategy
import okhttp3.internal.cache.DiskLruCache
import okhttp3.internal.closeQuietly
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.connection.RealConnectionPool
import okhttp3.internal.http.HttpMethod
import okhttp3.internal.http.StatusLine
import okhttp3.internal.platform.Platform
Expand Down Expand Up @@ -141,18 +143,30 @@ import okio.buffer
*
* [rfc_7234]: http://tools.ietf.org/html/rfc7234
*/
class Cache(
class Cache internal constructor(
directory: Path,
maxSize: Long,
fileSystem: FileSystem
fileSystem: FileSystem,
taskRunner: TaskRunner
) : Closeable, Flushable {
constructor(
directory: Path,
maxSize: Long,
fileSystem: FileSystem,
) : this(
directory,
maxSize,
fileSystem,
TaskRunner.INSTANCE
)

internal val cache = DiskLruCache(
fileSystem = fileSystem,
directory = directory,
appVersion = VERSION,
valueCount = ENTRY_COUNT,
maxSize = maxSize,
taskRunner = TaskRunner.INSTANCE
taskRunner = taskRunner
)

// read and write statistics, all guarded by 'this'.
Expand Down
33 changes: 24 additions & 9 deletions okhttp/src/jvmMain/kotlin/okhttp3/ConnectionPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,30 +33,45 @@ import okhttp3.internal.connection.RealConnectionPool
class ConnectionPool internal constructor(
internal val delegate: RealConnectionPool
) {
constructor(
maxIdleConnections: Int,
keepAliveDuration: Long,
timeUnit: TimeUnit
internal constructor(
maxIdleConnections: Int = 5,
keepAliveDuration: Long = 5,
timeUnit: TimeUnit = TimeUnit.MINUTES,
taskRunner: TaskRunner = TaskRunner.INSTANCE,
connectionListener: ConnectionListener = ConnectionListener.NONE,
) : this(RealConnectionPool(
taskRunner = TaskRunner.INSTANCE,
taskRunner = taskRunner,
maxIdleConnections = maxIdleConnections,
keepAliveDuration = keepAliveDuration,
timeUnit = timeUnit,
connectionListener = ConnectionListener.NONE
connectionListener = connectionListener
))

constructor(
connectionListener: ConnectionListener = ConnectionListener.NONE,
maxIdleConnections: Int = 5,
keepAliveDuration: Long = 5,
timeUnit: TimeUnit = TimeUnit.MINUTES,
connectionListener: ConnectionListener = ConnectionListener.NONE
) : this(RealConnectionPool(
) : this(
taskRunner = TaskRunner.INSTANCE,
maxIdleConnections = maxIdleConnections,
keepAliveDuration = keepAliveDuration,
timeUnit = timeUnit,
connectionListener = connectionListener
))
)

// Public API
constructor(
maxIdleConnections: Int,
keepAliveDuration: Long,
timeUnit: TimeUnit,
) : this(
maxIdleConnections = maxIdleConnections,
keepAliveDuration = keepAliveDuration,
timeUnit = timeUnit,
taskRunner = TaskRunner.INSTANCE,
connectionListener = ConnectionListener.NONE
)

constructor() : this(5, 5, TimeUnit.MINUTES)

Expand Down
4 changes: 4 additions & 0 deletions okhttp/src/jvmMain/kotlin/okhttp3/OkHttpClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,10 @@ open class OkHttpClient internal constructor(
this.cache = cache
}

internal fun taskRunner(taskRunner: TaskRunner) = apply {
this.taskRunner = taskRunner
}

/**
* Sets the DNS service used to lookup IP addresses for hostnames.
*
Expand Down
15 changes: 15 additions & 0 deletions okhttp/src/jvmMain/kotlin/okhttp3/internal/-UtilJvm.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import java.util.Locale
import java.util.TimeZone
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.text.Charsets.UTF_16BE
import kotlin.text.Charsets.UTF_16LE
import kotlin.text.Charsets.UTF_32BE
Expand Down Expand Up @@ -299,13 +300,27 @@ internal val assertionsEnabled: Boolean = OkHttpClient::class.java.desiredAssert
internal val okHttpName: String =
OkHttpClient::class.java.name.removePrefix("okhttp3.").removeSuffix("Client")

@Suppress("NOTHING_TO_INLINE")
internal inline fun ReentrantLock.assertHeld() {
if (assertionsEnabled && !this.isHeldByCurrentThread) {
throw AssertionError("Thread ${Thread.currentThread().name} MUST hold lock on $this")
}
}

@Suppress("NOTHING_TO_INLINE")
internal inline fun Any.assertThreadHoldsLock() {
if (assertionsEnabled && !Thread.holdsLock(this)) {
throw AssertionError("Thread ${Thread.currentThread().name} MUST hold lock on $this")
}
}

@Suppress("NOTHING_TO_INLINE")
internal inline fun ReentrantLock.assertNotHeld() {
if (assertionsEnabled && this.isHeldByCurrentThread) {
throw AssertionError("Thread ${Thread.currentThread().name} MUST NOT hold lock on $this")
}
}

@Suppress("NOTHING_TO_INLINE")
internal inline fun Any.assertThreadDoesntHoldLock() {
if (assertionsEnabled && Thread.holdsLock(this)) {
Expand Down

0 comments on commit 33ace75

Please sign in to comment.