Skip to content

Commit

Permalink
Loom support. (#1176)
Browse files Browse the repository at this point in the history
Switch from synchronized/wait/notify to ReentrantLock/Condition
  • Loading branch information
yschimke committed Jan 5, 2023
1 parent 4b4b1fd commit f8434f5
Show file tree
Hide file tree
Showing 20 changed files with 485 additions and 105 deletions.
23 changes: 23 additions & 0 deletions .github/workflows/build.yml
Expand Up @@ -42,6 +42,29 @@ jobs:
name: japicmp-report
path: okio/jvm/japicmp/build/reports/japi.txt

loom:
runs-on: ubuntu-latest

strategy:
fail-fast: false

steps:
- name: Checkout
uses: actions/checkout@v2

- name: Validate Gradle Wrapper
uses: gradle/wrapper-validation-action@v1

- name: Configure JDK
uses: actions/setup-java@v2
with:
distribution: 'zulu'
java-version: 19

- name: Test
run: |
./gradlew -DloomEnabled=true build
all-platforms:
runs-on: ${{ matrix.os }}

Expand Down
2 changes: 2 additions & 0 deletions build-support/src/main/kotlin/jvm.kt
@@ -0,0 +1,2 @@
// If true - tests should run for a loom environment.
val loomEnabled = System.getProperty("loomEnabled", "false").toBoolean()
8 changes: 8 additions & 0 deletions build.gradle.kts
Expand Up @@ -176,6 +176,14 @@ subprojects {
exceptionFormat = TestExceptionFormat.FULL
showStandardStreams = false
}

if (loomEnabled) {
jvmArgs = jvmArgs!! + listOf(
"-Djdk.tracePinnedThread=full",
"--enable-preview",
"-DloomEnabled=true"
)
}
}

tasks.withType<AbstractArchiveTask>().configureEach {
Expand Down
46 changes: 46 additions & 0 deletions okio-testing-support/src/jvmMain/kotlin/okio/TestingExecutors.kt
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2023 Block, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okio

import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ThreadFactory

object TestingExecutors {
val isLoom = System.getProperty("loomEnabled").toBoolean()

fun newScheduledExecutorService(corePoolSize: Int = 0): ScheduledExecutorService = if (isLoom) {
Executors.newScheduledThreadPool(corePoolSize, newVirtualThreadFactory())
} else {
Executors.newScheduledThreadPool(corePoolSize)
}

fun newExecutorService(corePoolSize: Int = 0): ExecutorService = if (isLoom) {
Executors.newScheduledThreadPool(corePoolSize, newVirtualThreadFactory())
} else {
Executors.newScheduledThreadPool(corePoolSize)
}

fun newVirtualThreadFactory(): ThreadFactory {
val threadBuilder = Thread::class.java.getMethod("ofVirtual").invoke(null)
return Class.forName("java.lang.Thread\$Builder").getMethod("factory").invoke(threadBuilder) as ThreadFactory
}

fun newVirtualThreadPerTaskExecutor(): ExecutorService {
return Executors::class.java.getMethod("newVirtualThreadPerTaskExecutor").invoke(null) as ExecutorService
}
}
6 changes: 5 additions & 1 deletion okio/src/commonMain/kotlin/okio/-CommonPlatform.kt
Expand Up @@ -23,7 +23,11 @@ internal expect fun String.asUtf8ToByteArray(): ByteArray
// TODO make internal https://youtrack.jetbrains.com/issue/KT-37316
expect class ArrayIndexOutOfBoundsException(message: String?) : IndexOutOfBoundsException

internal expect inline fun <R> synchronized(lock: Any, block: () -> R): R
expect class Lock

expect inline fun <T> Lock.withLock(action: () -> T): T

internal expect fun newLock(): Lock

expect open class IOException(message: String?, cause: Throwable?) : Exception {
constructor(message: String? = null)
Expand Down
30 changes: 16 additions & 14 deletions okio/src/commonMain/kotlin/okio/FileHandle.kt
Expand Up @@ -52,6 +52,8 @@ abstract class FileHandle(
*/
private var openStreamCount = 0

val lock: Lock = newLock()

/**
* Reads at least 1, and up to [byteCount] bytes from this starting at [fileOffset] and copies
* them to [array] at [arrayOffset]. Returns the number of bytes read, or -1 if [fileOffset]
Expand All @@ -64,7 +66,7 @@ abstract class FileHandle(
arrayOffset: Int,
byteCount: Int
): Int {
synchronized(this) {
lock.withLock {
check(!closed) { "closed" }
}
return protectedRead(fileOffset, array, arrayOffset, byteCount)
Expand All @@ -76,7 +78,7 @@ abstract class FileHandle(
*/
@Throws(IOException::class)
fun read(fileOffset: Long, sink: Buffer, byteCount: Long): Long {
synchronized(this) {
lock.withLock {
check(!closed) { "closed" }
}
return readNoCloseCheck(fileOffset, sink, byteCount)
Expand All @@ -87,7 +89,7 @@ abstract class FileHandle(
*/
@Throws(IOException::class)
fun size(): Long {
synchronized(this) {
lock.withLock {
check(!closed) { "closed" }
}
return protectedSize()
Expand All @@ -100,7 +102,7 @@ abstract class FileHandle(
@Throws(IOException::class)
fun resize(size: Long) {
check(readWrite) { "file handle is read-only" }
synchronized(this) {
lock.withLock {
check(!closed) { "closed" }
}
return protectedResize(size)
Expand All @@ -114,7 +116,7 @@ abstract class FileHandle(
byteCount: Int
) {
check(readWrite) { "file handle is read-only" }
synchronized(this) {
lock.withLock {
check(!closed) { "closed" }
}
return protectedWrite(fileOffset, array, arrayOffset, byteCount)
Expand All @@ -124,7 +126,7 @@ abstract class FileHandle(
@Throws(IOException::class)
fun write(fileOffset: Long, source: Buffer, byteCount: Long) {
check(readWrite) { "file handle is read-only" }
synchronized(this) {
lock.withLock {
check(!closed) { "closed" }
}
writeNoCloseCheck(fileOffset, source, byteCount)
Expand All @@ -134,7 +136,7 @@ abstract class FileHandle(
@Throws(IOException::class)
fun flush() {
check(readWrite) { "file handle is read-only" }
synchronized(this) {
lock.withLock {
check(!closed) { "closed" }
}
return protectedFlush()
Expand All @@ -146,7 +148,7 @@ abstract class FileHandle(
*/
@Throws(IOException::class)
fun source(fileOffset: Long = 0L): Source {
synchronized(this) {
lock.withLock {
check(!closed) { "closed" }
openStreamCount++
}
Expand Down Expand Up @@ -216,7 +218,7 @@ abstract class FileHandle(
@Throws(IOException::class)
fun sink(fileOffset: Long = 0L): Sink {
check(readWrite) { "file handle is read-only" }
synchronized(this) {
lock.withLock {
check(!closed) { "closed" }
openStreamCount++
}
Expand Down Expand Up @@ -282,10 +284,10 @@ abstract class FileHandle(

@Throws(IOException::class)
final override fun close() {
synchronized(this) {
if (closed) return@close
lock.withLock {
if (closed) return
closed = true
if (openStreamCount != 0) return@close
if (openStreamCount != 0) return
}
protectedClose()
}
Expand Down Expand Up @@ -405,7 +407,7 @@ abstract class FileHandle(
override fun close() {
if (closed) return
closed = true
synchronized(fileHandle) {
fileHandle.lock.withLock {
fileHandle.openStreamCount--
if (fileHandle.openStreamCount != 0 || !fileHandle.closed) return@close
}
Expand All @@ -431,7 +433,7 @@ abstract class FileHandle(
override fun close() {
if (closed) return
closed = true
synchronized(fileHandle) {
fileHandle.lock.withLock {
fileHandle.openStreamCount--
if (fileHandle.openStreamCount != 0 || !fileHandle.closed) return@close
}
Expand Down
11 changes: 8 additions & 3 deletions okio/src/jvmMain/kotlin/okio/-JvmPlatform.kt
Expand Up @@ -16,16 +16,21 @@

package okio

import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock as jvmWithLock

internal actual fun ByteArray.toUtf8String(): String = String(this, Charsets.UTF_8)

internal actual fun String.asUtf8ToByteArray(): ByteArray = toByteArray(Charsets.UTF_8)

// TODO remove if https://youtrack.jetbrains.com/issue/KT-20641 provides a better solution
actual typealias ArrayIndexOutOfBoundsException = java.lang.ArrayIndexOutOfBoundsException

internal actual inline fun <R> synchronized(lock: Any, block: () -> R): R {
return kotlin.synchronized(lock, block)
}
actual typealias Lock = ReentrantLock

internal actual fun newLock(): Lock = ReentrantLock()

actual inline fun <T> Lock.withLock(action: () -> T): T = jvmWithLock(action)

actual typealias IOException = java.io.IOException

Expand Down
22 changes: 12 additions & 10 deletions okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt
Expand Up @@ -18,6 +18,9 @@ package okio
import java.io.IOException
import java.io.InterruptedIOException
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.Condition
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

/**
* This timeout uses a background thread to take action exactly when the timeout occurs. Use this to
Expand Down Expand Up @@ -179,7 +182,7 @@ open class AsyncTimeout : Timeout() {
while (true) {
try {
var timedOut: AsyncTimeout? = null
synchronized(AsyncTimeout::class.java) {
AsyncTimeout.lock.withLock {
timedOut = awaitTimeout()

// The queue is completely empty. Let this thread exit and let another watchdog thread
Expand All @@ -199,6 +202,9 @@ open class AsyncTimeout : Timeout() {
}

companion object {
val lock: ReentrantLock = ReentrantLock()
val condition: Condition = lock.newCondition()

/**
* Don't write more than 64 KiB of data at a time, give or take a segment. Otherwise slow
* connections may suffer timeouts even when they're making (slow) progress. Without this,
Expand All @@ -221,7 +227,7 @@ open class AsyncTimeout : Timeout() {
private var head: AsyncTimeout? = null

private fun scheduleTimeout(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) {
synchronized(AsyncTimeout::class.java) {
AsyncTimeout.lock.withLock {
check(!node.inQueue) { "Unbalanced enter/exit" }
node.inQueue = true

Expand Down Expand Up @@ -253,7 +259,7 @@ open class AsyncTimeout : Timeout() {
prev.next = node
if (prev === head) {
// Wake up the watchdog when inserting at the front.
(AsyncTimeout::class.java as Object).notify()
condition.signal()
}
break
}
Expand All @@ -264,7 +270,7 @@ open class AsyncTimeout : Timeout() {

/** Returns true if the timeout occurred. */
private fun cancelScheduledTimeout(node: AsyncTimeout): Boolean {
synchronized(AsyncTimeout::class.java) {
AsyncTimeout.lock.withLock {
if (!node.inQueue) return false
node.inQueue = false

Expand Down Expand Up @@ -299,7 +305,7 @@ open class AsyncTimeout : Timeout() {
// The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
if (node == null) {
val startNanos = System.nanoTime()
(AsyncTimeout::class.java as Object).wait(IDLE_TIMEOUT_MILLIS)
condition.await(IDLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
return if (head!!.next == null && System.nanoTime() - startNanos >= IDLE_TIMEOUT_NANOS) {
head // The idle timeout elapsed.
} else {
Expand All @@ -311,11 +317,7 @@ open class AsyncTimeout : Timeout() {

// The head of the queue hasn't timed out yet. Await that.
if (waitNanos > 0) {
// Waiting is made complicated by the fact that we work in nanoseconds,
// but the API wants (millis, nanos) in two arguments.
val waitMillis = waitNanos / 1000000L
waitNanos -= waitMillis * 1000000L
(AsyncTimeout::class.java as Object).wait(waitMillis, waitNanos.toInt())
condition.await(waitNanos, TimeUnit.NANOSECONDS)
return null
}

Expand Down
1 change: 1 addition & 0 deletions okio/src/jvmMain/kotlin/okio/JvmFileHandle.kt
Expand Up @@ -21,6 +21,7 @@ internal class JvmFileHandle(
readWrite: Boolean,
private val randomAccessFile: RandomAccessFile
) : FileHandle(readWrite) {

@Synchronized
override fun protectedResize(size: Long) {
val currentSize = size()
Expand Down

0 comments on commit f8434f5

Please sign in to comment.