From f8434f575787198928a26334758ddbca9726b11c Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Thu, 5 Jan 2023 12:16:41 +1000 Subject: [PATCH] Loom support. (#1176) Switch from synchronized/wait/notify to ReentrantLock/Condition --- .github/workflows/build.yml | 23 +++ build-support/src/main/kotlin/jvm.kt | 2 + build.gradle.kts | 8 + .../jvmMain/kotlin/okio/TestingExecutors.kt | 46 +++++ .../commonMain/kotlin/okio/-CommonPlatform.kt | 6 +- okio/src/commonMain/kotlin/okio/FileHandle.kt | 30 +-- okio/src/jvmMain/kotlin/okio/-JvmPlatform.kt | 11 +- okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt | 22 ++- okio/src/jvmMain/kotlin/okio/JvmFileHandle.kt | 1 + okio/src/jvmMain/kotlin/okio/Pipe.kt | 47 +++-- okio/src/jvmMain/kotlin/okio/Throttler.kt | 21 +-- okio/src/jvmMain/kotlin/okio/Timeout.kt | 82 +++++++++ okio/src/jvmTest/java/okio/AwaitSignalTest.kt | 171 ++++++++++++++++++ .../jvmTest/java/okio/LargeStreamsTest.java | 2 +- okio/src/jvmTest/java/okio/PipeTest.java | 97 ++++++---- .../java/okio/WaitUntilNotifiedTest.java | 2 +- .../src/jvmTest/kotlin/okio/PipeKotlinTest.kt | 3 +- okio/src/jvmTest/kotlin/okio/ThrottlerTest.kt | 3 +- okio/src/jvmTest/kotlin/okio/TimeoutTest.kt | 3 +- .../nonJvmMain/kotlin/okio/-NonJvmPlatform.kt | 10 +- 20 files changed, 485 insertions(+), 105 deletions(-) create mode 100644 build-support/src/main/kotlin/jvm.kt create mode 100644 okio-testing-support/src/jvmMain/kotlin/okio/TestingExecutors.kt create mode 100644 okio/src/jvmTest/java/okio/AwaitSignalTest.kt diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 7a5018ab19..81a239bdbd 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -42,6 +42,29 @@ jobs: name: japicmp-report path: okio/jvm/japicmp/build/reports/japi.txt + loom: + runs-on: ubuntu-latest + + strategy: + fail-fast: false + + steps: + - name: Checkout + uses: actions/checkout@v2 + + - name: Validate Gradle Wrapper + uses: gradle/wrapper-validation-action@v1 + + - name: Configure JDK + uses: actions/setup-java@v2 + with: + distribution: 'zulu' + java-version: 19 + + - name: Test + run: | + ./gradlew -DloomEnabled=true build + all-platforms: runs-on: ${{ matrix.os }} diff --git a/build-support/src/main/kotlin/jvm.kt b/build-support/src/main/kotlin/jvm.kt new file mode 100644 index 0000000000..fe13ad5295 --- /dev/null +++ b/build-support/src/main/kotlin/jvm.kt @@ -0,0 +1,2 @@ +// If true - tests should run for a loom environment. +val loomEnabled = System.getProperty("loomEnabled", "false").toBoolean() diff --git a/build.gradle.kts b/build.gradle.kts index 08b301cccb..6509938e2a 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -176,6 +176,14 @@ subprojects { exceptionFormat = TestExceptionFormat.FULL showStandardStreams = false } + + if (loomEnabled) { + jvmArgs = jvmArgs!! + listOf( + "-Djdk.tracePinnedThread=full", + "--enable-preview", + "-DloomEnabled=true" + ) + } } tasks.withType().configureEach { diff --git a/okio-testing-support/src/jvmMain/kotlin/okio/TestingExecutors.kt b/okio-testing-support/src/jvmMain/kotlin/okio/TestingExecutors.kt new file mode 100644 index 0000000000..4561240f15 --- /dev/null +++ b/okio-testing-support/src/jvmMain/kotlin/okio/TestingExecutors.kt @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2023 Block, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okio + +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.ThreadFactory + +object TestingExecutors { + val isLoom = System.getProperty("loomEnabled").toBoolean() + + fun newScheduledExecutorService(corePoolSize: Int = 0): ScheduledExecutorService = if (isLoom) { + Executors.newScheduledThreadPool(corePoolSize, newVirtualThreadFactory()) + } else { + Executors.newScheduledThreadPool(corePoolSize) + } + + fun newExecutorService(corePoolSize: Int = 0): ExecutorService = if (isLoom) { + Executors.newScheduledThreadPool(corePoolSize, newVirtualThreadFactory()) + } else { + Executors.newScheduledThreadPool(corePoolSize) + } + + fun newVirtualThreadFactory(): ThreadFactory { + val threadBuilder = Thread::class.java.getMethod("ofVirtual").invoke(null) + return Class.forName("java.lang.Thread\$Builder").getMethod("factory").invoke(threadBuilder) as ThreadFactory + } + + fun newVirtualThreadPerTaskExecutor(): ExecutorService { + return Executors::class.java.getMethod("newVirtualThreadPerTaskExecutor").invoke(null) as ExecutorService + } +} diff --git a/okio/src/commonMain/kotlin/okio/-CommonPlatform.kt b/okio/src/commonMain/kotlin/okio/-CommonPlatform.kt index 72ccec4161..d9af5989a5 100644 --- a/okio/src/commonMain/kotlin/okio/-CommonPlatform.kt +++ b/okio/src/commonMain/kotlin/okio/-CommonPlatform.kt @@ -23,7 +23,11 @@ internal expect fun String.asUtf8ToByteArray(): ByteArray // TODO make internal https://youtrack.jetbrains.com/issue/KT-37316 expect class ArrayIndexOutOfBoundsException(message: String?) : IndexOutOfBoundsException -internal expect inline fun synchronized(lock: Any, block: () -> R): R +expect class Lock + +expect inline fun Lock.withLock(action: () -> T): T + +internal expect fun newLock(): Lock expect open class IOException(message: String?, cause: Throwable?) : Exception { constructor(message: String? = null) diff --git a/okio/src/commonMain/kotlin/okio/FileHandle.kt b/okio/src/commonMain/kotlin/okio/FileHandle.kt index 606b74cb23..472e0e47a2 100644 --- a/okio/src/commonMain/kotlin/okio/FileHandle.kt +++ b/okio/src/commonMain/kotlin/okio/FileHandle.kt @@ -52,6 +52,8 @@ abstract class FileHandle( */ private var openStreamCount = 0 + val lock: Lock = newLock() + /** * Reads at least 1, and up to [byteCount] bytes from this starting at [fileOffset] and copies * them to [array] at [arrayOffset]. Returns the number of bytes read, or -1 if [fileOffset] @@ -64,7 +66,7 @@ abstract class FileHandle( arrayOffset: Int, byteCount: Int ): Int { - synchronized(this) { + lock.withLock { check(!closed) { "closed" } } return protectedRead(fileOffset, array, arrayOffset, byteCount) @@ -76,7 +78,7 @@ abstract class FileHandle( */ @Throws(IOException::class) fun read(fileOffset: Long, sink: Buffer, byteCount: Long): Long { - synchronized(this) { + lock.withLock { check(!closed) { "closed" } } return readNoCloseCheck(fileOffset, sink, byteCount) @@ -87,7 +89,7 @@ abstract class FileHandle( */ @Throws(IOException::class) fun size(): Long { - synchronized(this) { + lock.withLock { check(!closed) { "closed" } } return protectedSize() @@ -100,7 +102,7 @@ abstract class FileHandle( @Throws(IOException::class) fun resize(size: Long) { check(readWrite) { "file handle is read-only" } - synchronized(this) { + lock.withLock { check(!closed) { "closed" } } return protectedResize(size) @@ -114,7 +116,7 @@ abstract class FileHandle( byteCount: Int ) { check(readWrite) { "file handle is read-only" } - synchronized(this) { + lock.withLock { check(!closed) { "closed" } } return protectedWrite(fileOffset, array, arrayOffset, byteCount) @@ -124,7 +126,7 @@ abstract class FileHandle( @Throws(IOException::class) fun write(fileOffset: Long, source: Buffer, byteCount: Long) { check(readWrite) { "file handle is read-only" } - synchronized(this) { + lock.withLock { check(!closed) { "closed" } } writeNoCloseCheck(fileOffset, source, byteCount) @@ -134,7 +136,7 @@ abstract class FileHandle( @Throws(IOException::class) fun flush() { check(readWrite) { "file handle is read-only" } - synchronized(this) { + lock.withLock { check(!closed) { "closed" } } return protectedFlush() @@ -146,7 +148,7 @@ abstract class FileHandle( */ @Throws(IOException::class) fun source(fileOffset: Long = 0L): Source { - synchronized(this) { + lock.withLock { check(!closed) { "closed" } openStreamCount++ } @@ -216,7 +218,7 @@ abstract class FileHandle( @Throws(IOException::class) fun sink(fileOffset: Long = 0L): Sink { check(readWrite) { "file handle is read-only" } - synchronized(this) { + lock.withLock { check(!closed) { "closed" } openStreamCount++ } @@ -282,10 +284,10 @@ abstract class FileHandle( @Throws(IOException::class) final override fun close() { - synchronized(this) { - if (closed) return@close + lock.withLock { + if (closed) return closed = true - if (openStreamCount != 0) return@close + if (openStreamCount != 0) return } protectedClose() } @@ -405,7 +407,7 @@ abstract class FileHandle( override fun close() { if (closed) return closed = true - synchronized(fileHandle) { + fileHandle.lock.withLock { fileHandle.openStreamCount-- if (fileHandle.openStreamCount != 0 || !fileHandle.closed) return@close } @@ -431,7 +433,7 @@ abstract class FileHandle( override fun close() { if (closed) return closed = true - synchronized(fileHandle) { + fileHandle.lock.withLock { fileHandle.openStreamCount-- if (fileHandle.openStreamCount != 0 || !fileHandle.closed) return@close } diff --git a/okio/src/jvmMain/kotlin/okio/-JvmPlatform.kt b/okio/src/jvmMain/kotlin/okio/-JvmPlatform.kt index b0a2b6498c..f40032db5b 100644 --- a/okio/src/jvmMain/kotlin/okio/-JvmPlatform.kt +++ b/okio/src/jvmMain/kotlin/okio/-JvmPlatform.kt @@ -16,6 +16,9 @@ package okio +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock as jvmWithLock + internal actual fun ByteArray.toUtf8String(): String = String(this, Charsets.UTF_8) internal actual fun String.asUtf8ToByteArray(): ByteArray = toByteArray(Charsets.UTF_8) @@ -23,9 +26,11 @@ internal actual fun String.asUtf8ToByteArray(): ByteArray = toByteArray(Charsets // TODO remove if https://youtrack.jetbrains.com/issue/KT-20641 provides a better solution actual typealias ArrayIndexOutOfBoundsException = java.lang.ArrayIndexOutOfBoundsException -internal actual inline fun synchronized(lock: Any, block: () -> R): R { - return kotlin.synchronized(lock, block) -} +actual typealias Lock = ReentrantLock + +internal actual fun newLock(): Lock = ReentrantLock() + +actual inline fun Lock.withLock(action: () -> T): T = jvmWithLock(action) actual typealias IOException = java.io.IOException diff --git a/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt b/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt index 2077832713..87d87d774e 100644 --- a/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt +++ b/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt @@ -18,6 +18,9 @@ package okio import java.io.IOException import java.io.InterruptedIOException import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock /** * This timeout uses a background thread to take action exactly when the timeout occurs. Use this to @@ -179,7 +182,7 @@ open class AsyncTimeout : Timeout() { while (true) { try { var timedOut: AsyncTimeout? = null - synchronized(AsyncTimeout::class.java) { + AsyncTimeout.lock.withLock { timedOut = awaitTimeout() // The queue is completely empty. Let this thread exit and let another watchdog thread @@ -199,6 +202,9 @@ open class AsyncTimeout : Timeout() { } companion object { + val lock: ReentrantLock = ReentrantLock() + val condition: Condition = lock.newCondition() + /** * Don't write more than 64 KiB of data at a time, give or take a segment. Otherwise slow * connections may suffer timeouts even when they're making (slow) progress. Without this, @@ -221,7 +227,7 @@ open class AsyncTimeout : Timeout() { private var head: AsyncTimeout? = null private fun scheduleTimeout(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) { - synchronized(AsyncTimeout::class.java) { + AsyncTimeout.lock.withLock { check(!node.inQueue) { "Unbalanced enter/exit" } node.inQueue = true @@ -253,7 +259,7 @@ open class AsyncTimeout : Timeout() { prev.next = node if (prev === head) { // Wake up the watchdog when inserting at the front. - (AsyncTimeout::class.java as Object).notify() + condition.signal() } break } @@ -264,7 +270,7 @@ open class AsyncTimeout : Timeout() { /** Returns true if the timeout occurred. */ private fun cancelScheduledTimeout(node: AsyncTimeout): Boolean { - synchronized(AsyncTimeout::class.java) { + AsyncTimeout.lock.withLock { if (!node.inQueue) return false node.inQueue = false @@ -299,7 +305,7 @@ open class AsyncTimeout : Timeout() { // The queue is empty. Wait until either something is enqueued or the idle timeout elapses. if (node == null) { val startNanos = System.nanoTime() - (AsyncTimeout::class.java as Object).wait(IDLE_TIMEOUT_MILLIS) + condition.await(IDLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS) return if (head!!.next == null && System.nanoTime() - startNanos >= IDLE_TIMEOUT_NANOS) { head // The idle timeout elapsed. } else { @@ -311,11 +317,7 @@ open class AsyncTimeout : Timeout() { // The head of the queue hasn't timed out yet. Await that. if (waitNanos > 0) { - // Waiting is made complicated by the fact that we work in nanoseconds, - // but the API wants (millis, nanos) in two arguments. - val waitMillis = waitNanos / 1000000L - waitNanos -= waitMillis * 1000000L - (AsyncTimeout::class.java as Object).wait(waitMillis, waitNanos.toInt()) + condition.await(waitNanos, TimeUnit.NANOSECONDS) return null } diff --git a/okio/src/jvmMain/kotlin/okio/JvmFileHandle.kt b/okio/src/jvmMain/kotlin/okio/JvmFileHandle.kt index 3bbb04107a..55851538c4 100644 --- a/okio/src/jvmMain/kotlin/okio/JvmFileHandle.kt +++ b/okio/src/jvmMain/kotlin/okio/JvmFileHandle.kt @@ -21,6 +21,7 @@ internal class JvmFileHandle( readWrite: Boolean, private val randomAccessFile: RandomAccessFile ) : FileHandle(readWrite) { + @Synchronized override fun protectedResize(size: Long) { val currentSize = size() diff --git a/okio/src/jvmMain/kotlin/okio/Pipe.kt b/okio/src/jvmMain/kotlin/okio/Pipe.kt index 43c23bfd3a..dfcbbc54a3 100644 --- a/okio/src/jvmMain/kotlin/okio/Pipe.kt +++ b/okio/src/jvmMain/kotlin/okio/Pipe.kt @@ -15,6 +15,10 @@ */ package okio +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock + /** * A source and a sink that are attached. The sink's output is the source's input. Typically each * is accessed by its own thread: a producer thread writes data to the sink and a consumer thread @@ -40,6 +44,9 @@ class Pipe(internal val maxBufferSize: Long) { internal var sourceClosed = false internal var foldedSink: Sink? = null + val lock: ReentrantLock = ReentrantLock() + val condition: Condition = lock.newCondition() + init { require(maxBufferSize >= 1L) { "maxBufferSize < 1: $maxBufferSize" } } @@ -51,21 +58,21 @@ class Pipe(internal val maxBufferSize: Long) { override fun write(source: Buffer, byteCount: Long) { var byteCount = byteCount var delegate: Sink? = null - synchronized(buffer) { + lock.withLock { check(!sinkClosed) { "closed" } if (canceled) throw IOException("canceled") while (byteCount > 0) { foldedSink?.let { delegate = it - return@synchronized + return@withLock } if (sourceClosed) throw IOException("source is closed") val bufferSpaceAvailable = maxBufferSize - buffer.size if (bufferSpaceAvailable == 0L) { - timeout.waitUntilNotified(buffer) // Wait until the source drains the buffer. + timeout.awaitSignal(condition) // Wait until the source drains the buffer. if (canceled) throw IOException("canceled") continue } @@ -73,7 +80,7 @@ class Pipe(internal val maxBufferSize: Long) { val bytesToWrite = minOf(bufferSpaceAvailable, byteCount) buffer.write(source, bytesToWrite) byteCount -= bytesToWrite - (buffer as Object).notifyAll() // Notify the source that it can resume reading. + condition.signalAll() // Notify the source that it can resume reading. } } @@ -82,13 +89,13 @@ class Pipe(internal val maxBufferSize: Long) { override fun flush() { var delegate: Sink? = null - synchronized(buffer) { + lock.withLock { check(!sinkClosed) { "closed" } if (canceled) throw IOException("canceled") foldedSink?.let { delegate = it - return@synchronized + return@withLock } if (sourceClosed && buffer.size > 0L) { @@ -101,17 +108,17 @@ class Pipe(internal val maxBufferSize: Long) { override fun close() { var delegate: Sink? = null - synchronized(buffer) { + lock.withLock { if (sinkClosed) return foldedSink?.let { delegate = it - return@synchronized + return@withLock } if (sourceClosed && buffer.size > 0L) throw IOException("source is closed") sinkClosed = true - (buffer as Object).notifyAll() // Notify the source that no more bytes are coming. + condition.signalAll() // Notify the source that no more bytes are coming. } delegate?.forward { close() } @@ -125,26 +132,26 @@ class Pipe(internal val maxBufferSize: Long) { private val timeout = Timeout() override fun read(sink: Buffer, byteCount: Long): Long { - synchronized(buffer) { + lock.withLock { check(!sourceClosed) { "closed" } if (canceled) throw IOException("canceled") while (buffer.size == 0L) { if (sinkClosed) return -1L - timeout.waitUntilNotified(buffer) // Wait until the sink fills the buffer. + timeout.awaitSignal(condition) // Wait until the sink fills the buffer. if (canceled) throw IOException("canceled") } val result = buffer.read(sink, byteCount) - (buffer as Object).notifyAll() // Notify the sink that it can resume writing. + condition.signalAll() // Notify the sink that it can resume writing. return result } } override fun close() { - synchronized(buffer) { + lock.withLock { sourceClosed = true - (buffer as Object).notifyAll() // Notify the sink that no more bytes are desired. + condition.signalAll() // Notify the sink that no more bytes are desired. } } @@ -166,7 +173,7 @@ class Pipe(internal val maxBufferSize: Long) { // must copy it to sink without holding any locks, then try it all again. var closed = false lateinit var sinkBuffer: Buffer - synchronized(buffer) { + lock.withLock { check(foldedSink == null) { "sink already folded" } if (canceled) { @@ -183,7 +190,7 @@ class Pipe(internal val maxBufferSize: Long) { closed = sinkClosed sinkBuffer = Buffer() sinkBuffer.write(buffer, buffer.size) - (buffer as Object).notifyAll() // Notify the sink that it can resume writing. + condition.signalAll() // Notify the sink that it can resume writing. } var success = false @@ -197,9 +204,9 @@ class Pipe(internal val maxBufferSize: Long) { success = true } finally { if (!success) { - synchronized(buffer) { + lock.withLock { sourceClosed = true - (buffer as Object).notifyAll() // Notify the sink that it can resume writing. + condition.signalAll() // Notify the sink that it can resume writing. } } } @@ -240,10 +247,10 @@ class Pipe(internal val maxBufferSize: Long) { * operating on the source or the sink. */ fun cancel() { - synchronized(buffer) { + lock.withLock { canceled = true buffer.clear() - (buffer as Object).notifyAll() // Notify the source and sink that they're canceled. + condition.signalAll() // Notify the source and sink that they're canceled. } } } diff --git a/okio/src/jvmMain/kotlin/okio/Throttler.kt b/okio/src/jvmMain/kotlin/okio/Throttler.kt index dbb83fe3ba..675322b77c 100644 --- a/okio/src/jvmMain/kotlin/okio/Throttler.kt +++ b/okio/src/jvmMain/kotlin/okio/Throttler.kt @@ -17,6 +17,9 @@ package okio import java.io.IOException import java.io.InterruptedIOException +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock /** * Enables limiting of Source and Sink throughput. Attach to this throttler via [source] and [sink] @@ -46,6 +49,9 @@ class Throttler internal constructor( private var waitByteCount: Long = 8 * 1024 // 8 KiB. private var maxByteCount: Long = 256 * 1024 // 256 KiB. + val lock: ReentrantLock = ReentrantLock() + val condition: Condition = lock.newCondition() + constructor() : this(allocatedUntil = System.nanoTime()) /** Sets the rate at which bytes will be allocated. Use 0 for no limit. */ @@ -55,7 +61,7 @@ class Throttler internal constructor( waitByteCount: Long = this.waitByteCount, maxByteCount: Long = this.maxByteCount ) { - synchronized(this) { + lock.withLock { require(bytesPerSecond >= 0) require(waitByteCount > 0) require(maxByteCount >= waitByteCount) @@ -63,7 +69,7 @@ class Throttler internal constructor( this.bytesPerSecond = bytesPerSecond this.waitByteCount = waitByteCount this.maxByteCount = maxByteCount - (this as Object).notifyAll() + condition.signalAll() } } @@ -74,15 +80,14 @@ class Throttler internal constructor( internal fun take(byteCount: Long): Long { require(byteCount > 0) - synchronized(this) { + lock.withLock { while (true) { val now = System.nanoTime() val byteCountOrWaitNanos = byteCountOrWaitNanos(now, byteCount) if (byteCountOrWaitNanos >= 0) return byteCountOrWaitNanos - waitNanos(-byteCountOrWaitNanos) + condition.awaitNanos(-byteCountOrWaitNanos) } } - throw AssertionError() // Unreachable, but synchronized() doesn't know that. } /** @@ -125,12 +130,6 @@ class Throttler internal constructor( private fun Long.bytesToNanos() = this * 1_000_000_000L / bytesPerSecond - private fun waitNanos(nanosToWait: Long) { - val millisToWait = nanosToWait / 1_000_000L - val remainderNanos = nanosToWait - (millisToWait * 1_000_000L) - (this as Object).wait(millisToWait, remainderNanos.toInt()) - } - /** Create a Source which honors this Throttler. */ fun source(source: Source): Source { return object : ForwardingSource(source) { diff --git a/okio/src/jvmMain/kotlin/okio/Timeout.kt b/okio/src/jvmMain/kotlin/okio/Timeout.kt index b45d2ab6b9..e9a9302697 100644 --- a/okio/src/jvmMain/kotlin/okio/Timeout.kt +++ b/okio/src/jvmMain/kotlin/okio/Timeout.kt @@ -18,6 +18,7 @@ package okio import java.io.IOException import java.io.InterruptedIOException import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.Condition actual open class Timeout { /** @@ -103,6 +104,87 @@ actual open class Timeout { } } + /** + * Waits on `monitor` until it is signaled. Throws [InterruptedIOException] if either the thread + * is interrupted or if this timeout elapses before `monitor` is signaled. + * The caller must hold the lock that monitor is bound to. + * + * Here's a sample class that uses `awaitSignal()` to await a specific state. Note that the + * call is made within a loop to avoid unnecessary waiting and to mitigate spurious notifications. + * ``` + * class Dice { + * Random random = new Random(); + * int latestTotal; + * + * ReentrantLock lock = new ReentrantLock(); + * Condition condition = lock.newCondition(); + * + * public void roll() { + * lock.withLock { + * latestTotal = 2 + random.nextInt(6) + random.nextInt(6); + * System.out.println("Rolled " + latestTotal); + * condition.signalAll(); + * } + * } + * + * public void rollAtFixedRate(int period, TimeUnit timeUnit) { + * Executors.newScheduledThreadPool(0).scheduleAtFixedRate(new Runnable() { + * public void run() { + * roll(); + * } + * }, 0, period, timeUnit); + * } + * + * public void awaitTotal(Timeout timeout, int total) + * throws InterruptedIOException { + * lock.withLock { + * while (latestTotal != total) { + * timeout.awaitSignal(this); + * } + * } + * } + * } + * ``` + */ + @Throws(InterruptedIOException::class) + fun awaitSignal(condition: Condition) { + try { + val hasDeadline = hasDeadline() + val timeoutNanos = timeoutNanos() + + if (!hasDeadline && timeoutNanos == 0L) { + condition.await() // There is no timeout: wait forever. + return + } + + // Compute how long we'll wait. + val start = System.nanoTime() + val waitNanos = if (hasDeadline && timeoutNanos != 0L) { + val deadlineNanos = deadlineNanoTime() - start + minOf(timeoutNanos, deadlineNanos) + } else if (hasDeadline) { + deadlineNanoTime() - start + } else { + timeoutNanos + } + + // Attempt to wait that long. This will break out early if the monitor is notified. + var elapsedNanos = 0L + if (waitNanos > 0L) { + condition.await(waitNanos, TimeUnit.NANOSECONDS) + elapsedNanos = System.nanoTime() - start + } + + // Throw if the timeout elapsed before the monitor was notified. + if (elapsedNanos >= waitNanos) { + throw InterruptedIOException("timeout") + } + } catch (e: InterruptedException) { + Thread.currentThread().interrupt() // Retain interrupted status. + throw InterruptedIOException("interrupted") + } + } + /** * Waits on `monitor` until it is notified. Throws [InterruptedIOException] if either the thread * is interrupted or if this timeout elapses before `monitor` is notified. The caller must be diff --git a/okio/src/jvmTest/java/okio/AwaitSignalTest.kt b/okio/src/jvmTest/java/okio/AwaitSignalTest.kt new file mode 100644 index 0000000000..78e5d782a2 --- /dev/null +++ b/okio/src/jvmTest/java/okio/AwaitSignalTest.kt @@ -0,0 +1,171 @@ +/* + * Copyright (C) 2023 Block Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okio + +import okio.TestUtil.assumeNotWindows +import org.junit.After +import org.junit.Assert +import org.junit.Test +import java.io.InterruptedIOException +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.ReentrantLock + +class AwaitSignalTest { + val executorService = TestingExecutors.newScheduledExecutorService(0) + + val lock: ReentrantLock = ReentrantLock() + val condition: Condition = lock.newCondition() + + @After + fun tearDown() { + executorService.shutdown() + } + + @Test + fun signaled() = lock.withLock { + val timeout = Timeout() + timeout.timeout(5000, TimeUnit.MILLISECONDS) + val start = now() + executorService.schedule( + { lock.withLock { condition.signal() } }, + 1000, + TimeUnit.MILLISECONDS + ) + timeout.awaitSignal(condition) + assertElapsed(1000.0, start) + } + + @Test + fun timeout() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + timeout.timeout(1000, TimeUnit.MILLISECONDS) + val start = now() + try { + timeout.awaitSignal(condition) + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("timeout", expected.message) + } + assertElapsed(1000.0, start) + } + + @Test + fun deadline() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + timeout.deadline(1000, TimeUnit.MILLISECONDS) + val start = now() + try { + timeout.awaitSignal(condition) + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("timeout", expected.message) + } + assertElapsed(1000.0, start) + } + + @Test + fun deadlineBeforeTimeout() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + timeout.timeout(5000, TimeUnit.MILLISECONDS) + timeout.deadline(1000, TimeUnit.MILLISECONDS) + val start = now() + try { + timeout.awaitSignal(condition) + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("timeout", expected.message) + } + assertElapsed(1000.0, start) + } + + @Test + fun timeoutBeforeDeadline() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + timeout.timeout(1000, TimeUnit.MILLISECONDS) + timeout.deadline(5000, TimeUnit.MILLISECONDS) + val start = now() + try { + timeout.awaitSignal(condition) + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("timeout", expected.message) + } + assertElapsed(1000.0, start) + } + + @Test + fun deadlineAlreadyReached() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + timeout.deadlineNanoTime(System.nanoTime()) + val start = now() + try { + timeout.awaitSignal(condition) + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("timeout", expected.message) + } + assertElapsed(0.0, start) + } + + @Test + fun threadInterrupted() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + val start = now() + Thread.currentThread().interrupt() + try { + timeout.awaitSignal(condition) + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("interrupted", expected.message) + Assert.assertTrue(Thread.interrupted()) + } + assertElapsed(0.0, start) + } + + @Test + fun threadInterruptedOnThrowIfReached() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + Thread.currentThread().interrupt() + try { + timeout.throwIfReached() + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("interrupted", expected.message) + Assert.assertTrue(Thread.interrupted()) + } + } + + /** Returns the nanotime in milliseconds as a double for measuring timeouts. */ + private fun now(): Double { + return System.nanoTime() / 1000000.0 + } + + /** + * Fails the test unless the time from start until now is duration, accepting differences in + * -50..+450 milliseconds. + */ + private fun assertElapsed(duration: Double, start: Double) { + Assert.assertEquals(duration, now() - start - 200.0, 250.0) + } +} diff --git a/okio/src/jvmTest/java/okio/LargeStreamsTest.java b/okio/src/jvmTest/java/okio/LargeStreamsTest.java index b9be1a2e42..6752017d35 100644 --- a/okio/src/jvmTest/java/okio/LargeStreamsTest.java +++ b/okio/src/jvmTest/java/okio/LargeStreamsTest.java @@ -104,7 +104,7 @@ private Long readAllAndClose(Source source, Sink sink) throws IOException { /** Calls {@link #readAllAndClose} on a background thread. */ private Future readAllAndCloseAsync(final Source source, final Sink sink) { - ExecutorService executor = Executors.newSingleThreadExecutor(); + ExecutorService executor = TestingExecutors.INSTANCE.newExecutorService(0); try { return executor.submit(new Callable() { @Override public Long call() throws Exception { diff --git a/okio/src/jvmTest/java/okio/PipeTest.java b/okio/src/jvmTest/java/okio/PipeTest.java index 030e6ba0b7..7c1e1df644 100644 --- a/okio/src/jvmTest/java/okio/PipeTest.java +++ b/okio/src/jvmTest/java/okio/PipeTest.java @@ -15,29 +15,29 @@ */ package okio; +import org.junit.After; +import org.junit.Test; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.Random; import java.util.concurrent.Callable; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.junit.After; -import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; public final class PipeTest { - final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2); + final ScheduledExecutorService executorService = TestingExecutors.INSTANCE.newScheduledExecutorService(2); - @After public void tearDown() throws Exception { + @After + public void tearDown() throws Exception { executorService.shutdown(); } - @Test public void test() throws Exception { + @Test + public void test() throws Exception { Pipe pipe = new Pipe(6); pipe.sink().write(new Buffer().writeUtf8("abc"), 3L); @@ -56,14 +56,16 @@ public final class PipeTest { * A producer writes the first 16 MiB of bytes generated by {@code new Random(0)} to a sink, and a * consumer consumes them. Both compute hashes of their data to confirm that they're as expected. */ - @Test public void largeDataset() throws Exception { + @Test + public void largeDataset() throws Exception { final Pipe pipe = new Pipe(1000L); // An awkward size to force producer/consumer exchange. final long totalBytes = 16L * 1024L * 1024L; ByteString expectedHash = ByteString.decodeHex("7c3b224bea749086babe079360cf29f98d88262d"); // Write data to the sink. Future sinkHash = executorService.submit(new Callable() { - @Override public ByteString call() throws Exception { + @Override + public ByteString call() throws Exception { HashingSink hashingSink = HashingSink.sha1(pipe.sink()); Random random = new Random(0); byte[] data = new byte[8192]; @@ -82,7 +84,8 @@ public final class PipeTest { // Read data from the source. Future sourceHash = executorService.submit(new Callable() { - @Override public ByteString call() throws Exception { + @Override + public ByteString call() throws Exception { Buffer blackhole = new Buffer(); HashingSink hashingSink = HashingSink.sha1(blackhole); @@ -101,7 +104,8 @@ public final class PipeTest { assertEquals(expectedHash, sourceHash.get()); } - @Test public void sinkTimeout() throws Exception { + @Test + public void sinkTimeout() throws Exception { TestUtil.INSTANCE.assumeNotWindows(); Pipe pipe = new Pipe(3); @@ -121,7 +125,8 @@ public final class PipeTest { assertEquals("abc", readBuffer.readUtf8()); } - @Test public void sourceTimeout() throws Exception { + @Test + public void sourceTimeout() throws Exception { TestUtil.INSTANCE.assumeNotWindows(); Pipe pipe = new Pipe(3L); @@ -142,8 +147,8 @@ public final class PipeTest { * The writer is writing 12 bytes as fast as it can to a 3 byte buffer. The reader alternates * sleeping 1000 ms, then reading 3 bytes. That should make for an approximate timeline like * this: - * - * 0: writer writes 'abc', blocks 0: reader sleeps until 1000 + *

+ * 0: writer writes 'abc', blocks 0: reader sleeps until 1000 * 1000: reader reads 'abc', sleeps until 2000 * 1000: writer writes 'def', blocks * 2000: reader reads 'def', sleeps until 3000 @@ -151,13 +156,15 @@ public final class PipeTest { * 3000: reader reads 'ghi', sleeps until 4000 * 3000: writer writes 'jkl', returns * 4000: reader reads 'jkl', returns - * + *

* Because the writer is writing to a buffer, it finishes before the reader does. */ - @Test public void sinkBlocksOnSlowReader() throws Exception { + @Test + public void sinkBlocksOnSlowReader() throws Exception { final Pipe pipe = new Pipe(3L); executorService.execute(new Runnable() { - @Override public void run() { + @Override + public void run() { try { Buffer buffer = new Buffer(); Thread.sleep(1000L); @@ -183,10 +190,12 @@ public final class PipeTest { assertElapsed(3000.0, start); } - @Test public void sinkWriteFailsByClosedReader() throws Exception { + @Test + public void sinkWriteFailsByClosedReader() throws Exception { final Pipe pipe = new Pipe(3L); executorService.schedule(new Runnable() { - @Override public void run() { + @Override + public void run() { try { pipe.source().close(); } catch (IOException e) { @@ -205,7 +214,8 @@ public final class PipeTest { } } - @Test public void sinkFlushDoesntWaitForReader() throws Exception { + @Test + public void sinkFlushDoesntWaitForReader() throws Exception { Pipe pipe = new Pipe(100L); pipe.sink().write(new Buffer().writeUtf8("abc"), 3); pipe.sink().flush(); @@ -214,7 +224,8 @@ public final class PipeTest { assertEquals("abc", bufferedSource.readUtf8(3)); } - @Test public void sinkFlushFailsIfReaderIsClosedBeforeAllDataIsRead() throws Exception { + @Test + public void sinkFlushFailsIfReaderIsClosedBeforeAllDataIsRead() throws Exception { Pipe pipe = new Pipe(100L); pipe.sink().write(new Buffer().writeUtf8("abc"), 3); pipe.source().close(); @@ -226,7 +237,8 @@ public final class PipeTest { } } - @Test public void sinkCloseFailsIfReaderIsClosedBeforeAllDataIsRead() throws Exception { + @Test + public void sinkCloseFailsIfReaderIsClosedBeforeAllDataIsRead() throws Exception { Pipe pipe = new Pipe(100L); pipe.sink().write(new Buffer().writeUtf8("abc"), 3); pipe.source().close(); @@ -238,7 +250,8 @@ public final class PipeTest { } } - @Test public void sinkClose() throws Exception { + @Test + public void sinkClose() throws Exception { Pipe pipe = new Pipe(100L); pipe.sink().close(); try { @@ -255,13 +268,15 @@ public final class PipeTest { } } - @Test public void sinkMultipleClose() throws Exception { + @Test + public void sinkMultipleClose() throws Exception { Pipe pipe = new Pipe(100L); pipe.sink().close(); pipe.sink().close(); } - @Test public void sinkCloseDoesntWaitForSourceRead() throws Exception { + @Test + public void sinkCloseDoesntWaitForSourceRead() throws Exception { Pipe pipe = new Pipe(100L); pipe.sink().write(new Buffer().writeUtf8("abc"), 3); pipe.sink().close(); @@ -271,7 +286,8 @@ public final class PipeTest { assertTrue(bufferedSource.exhausted()); } - @Test public void sourceClose() throws Exception { + @Test + public void sourceClose() throws Exception { Pipe pipe = new Pipe(100L); pipe.source().close(); try { @@ -282,16 +298,19 @@ public final class PipeTest { } } - @Test public void sourceMultipleClose() throws Exception { + @Test + public void sourceMultipleClose() throws Exception { Pipe pipe = new Pipe(100L); pipe.source().close(); pipe.source().close(); } - @Test public void sourceReadUnblockedByClosedSink() throws Exception { + @Test + public void sourceReadUnblockedByClosedSink() throws Exception { final Pipe pipe = new Pipe(3L); executorService.schedule(new Runnable() { - @Override public void run() { + @Override + public void run() { try { pipe.sink().close(); } catch (IOException e) { @@ -310,9 +329,9 @@ public final class PipeTest { /** * The writer has 12 bytes to write. It alternates sleeping 1000 ms, then writing 3 bytes. The * reader is reading as fast as it can. That should make for an approximate timeline like this: - * - * 0: writer sleeps until 1000 - * 0: reader blocks + *

+ * 0: writer sleeps until 1000 + * 0: reader blocks * 1000: writer writes 'abc', sleeps until 2000 * 1000: reader reads 'abc' * 2000: writer writes 'def', sleeps until 3000 @@ -322,10 +341,12 @@ public final class PipeTest { * 4000: writer writes 'jkl', returns * 4000: reader reads 'jkl', returns */ - @Test public void sourceBlocksOnSlowWriter() throws Exception { + @Test + public void sourceBlocksOnSlowWriter() throws Exception { final Pipe pipe = new Pipe(100L); executorService.execute(new Runnable() { - @Override public void run() { + @Override + public void run() { try { Thread.sleep(1000L); pipe.sink().write(new Buffer().writeUtf8("abc"), 3); @@ -361,7 +382,9 @@ public final class PipeTest { assertElapsed(4000.0, start); } - /** Returns the nanotime in milliseconds as a double for measuring timeouts. */ + /** + * Returns the nanotime in milliseconds as a double for measuring timeouts. + */ private double now() { return System.nanoTime() / 1000000.0d; } diff --git a/okio/src/jvmTest/java/okio/WaitUntilNotifiedTest.java b/okio/src/jvmTest/java/okio/WaitUntilNotifiedTest.java index e440528960..a3e11f222f 100644 --- a/okio/src/jvmTest/java/okio/WaitUntilNotifiedTest.java +++ b/okio/src/jvmTest/java/okio/WaitUntilNotifiedTest.java @@ -27,7 +27,7 @@ import static org.junit.Assert.fail; public final class WaitUntilNotifiedTest { - final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(0); + final ScheduledExecutorService executorService = TestingExecutors.INSTANCE.newScheduledExecutorService(0); @After public void tearDown() { executorService.shutdown(); diff --git a/okio/src/jvmTest/kotlin/okio/PipeKotlinTest.kt b/okio/src/jvmTest/kotlin/okio/PipeKotlinTest.kt index 3a41e7422b..166c98e67f 100644 --- a/okio/src/jvmTest/kotlin/okio/PipeKotlinTest.kt +++ b/okio/src/jvmTest/kotlin/okio/PipeKotlinTest.kt @@ -24,7 +24,6 @@ import org.junit.Rule import org.junit.Test import java.io.IOException import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import kotlin.test.assertFailsWith import org.junit.rules.Timeout as JUnitTimeout @@ -32,7 +31,7 @@ import org.junit.rules.Timeout as JUnitTimeout class PipeKotlinTest { @JvmField @Rule val timeout = JUnitTimeout(5, TimeUnit.SECONDS) - private val executorService = Executors.newScheduledThreadPool(1) + private val executorService = TestingExecutors.newScheduledExecutorService(1) @After @Throws(Exception::class) fun tearDown() { diff --git a/okio/src/jvmTest/kotlin/okio/ThrottlerTest.kt b/okio/src/jvmTest/kotlin/okio/ThrottlerTest.kt index 0b151794d8..cfda2141d8 100644 --- a/okio/src/jvmTest/kotlin/okio/ThrottlerTest.kt +++ b/okio/src/jvmTest/kotlin/okio/ThrottlerTest.kt @@ -19,7 +19,6 @@ import okio.TestUtil.randomSource import org.junit.After import org.junit.Before import org.junit.Test -import java.util.concurrent.Executors import kotlin.test.Ignore @Ignore("These tests are flaky and fail on slower hardware, need to be improved") @@ -31,7 +30,7 @@ class ThrottlerTest { private val throttlerSlow = Throttler() private val threads = 4 - private val executorService = Executors.newFixedThreadPool(threads) + private val executorService = TestingExecutors.newExecutorService(threads) private var stopwatch = Stopwatch() @Before fun setup() { diff --git a/okio/src/jvmTest/kotlin/okio/TimeoutTest.kt b/okio/src/jvmTest/kotlin/okio/TimeoutTest.kt index 49fb81e2e1..9bf96f33fa 100644 --- a/okio/src/jvmTest/kotlin/okio/TimeoutTest.kt +++ b/okio/src/jvmTest/kotlin/okio/TimeoutTest.kt @@ -20,14 +20,13 @@ import org.junit.Assert.assertEquals import org.junit.Assert.assertFalse import org.junit.Rule import org.junit.Test -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import org.junit.rules.Timeout as JUnitTimeout class TimeoutTest { @JvmField @Rule val timeout = JUnitTimeout(5, TimeUnit.SECONDS) - private val executorService = Executors.newScheduledThreadPool(1) + private val executorService = TestingExecutors.newExecutorService(1) @After @Throws(Exception::class) fun tearDown() { diff --git a/okio/src/nonJvmMain/kotlin/okio/-NonJvmPlatform.kt b/okio/src/nonJvmMain/kotlin/okio/-NonJvmPlatform.kt index 4f555010f2..4077149c74 100644 --- a/okio/src/nonJvmMain/kotlin/okio/-NonJvmPlatform.kt +++ b/okio/src/nonJvmMain/kotlin/okio/-NonJvmPlatform.kt @@ -29,7 +29,15 @@ actual open class ArrayIndexOutOfBoundsException actual constructor( message: String? ) : IndexOutOfBoundsException(message) -internal actual inline fun synchronized(lock: Any, block: () -> R): R = block() +actual class Lock { + companion object { + val instance = Lock() + } +} + +internal actual fun newLock(): Lock = Lock.instance + +actual inline fun Lock.withLock(action: () -> T): T = action() actual open class IOException actual constructor( message: String?,