diff --git a/okio/src/commonMain/kotlin/okio/Segment.kt b/okio/src/commonMain/kotlin/okio/Segment.kt index 8bafb27406..c8aa479bbf 100644 --- a/okio/src/commonMain/kotlin/okio/Segment.kt +++ b/okio/src/commonMain/kotlin/okio/Segment.kt @@ -35,22 +35,27 @@ import kotlin.jvm.JvmField internal class Segment { @JvmField val data: ByteArray - /** The next byte of application data byte to read in this segment. */ + /** The next byte of application data byte to read in this segment. */ @JvmField var pos: Int = 0 - /** The first byte of available data ready to be written to. */ + /** + * The first byte of available data ready to be written to. + * + * If the segment is free and linked in the segment pool, the field contains total + * byte count of this and next segments. + */ @JvmField var limit: Int = 0 - /** True if other segments or byte strings use the same byte array. */ + /** True if other segments or byte strings use the same byte array. */ @JvmField var shared: Boolean = false - /** True if this segment owns the byte array and can append to it, extending `limit`. */ + /** True if this segment owns the byte array and can append to it, extending `limit`. */ @JvmField var owner: Boolean = false - /** Next segment in a linked or circularly-linked list. */ + /** Next segment in a linked or circularly-linked list. */ @JvmField var next: Segment? = null - /** Previous segment in a circularly-linked list. */ + /** Previous segment in a circularly-linked list. */ @JvmField var prev: Segment? = null constructor() { diff --git a/okio/src/commonMain/kotlin/okio/SegmentPool.kt b/okio/src/commonMain/kotlin/okio/SegmentPool.kt index 0a887478f1..f21c7a29fb 100644 --- a/okio/src/commonMain/kotlin/okio/SegmentPool.kt +++ b/okio/src/commonMain/kotlin/okio/SegmentPool.kt @@ -20,10 +20,13 @@ package okio * This pool is a thread-safe static singleton. */ internal expect object SegmentPool { - val MAX_SIZE: Long + val MAX_SIZE: Int - /** For testing only. Returns a snapshot of the number of bytes currently in the pool. */ - val byteCount: Long + /** + * For testing only. Returns a snapshot of the number of bytes currently in the pool. If the pool + * is segmented such as by thread, this returns the byte count accessible to the calling thread. + */ + val byteCount: Int /** Return a segment for the caller's use. */ fun take(): Segment diff --git a/okio/src/commonTest/kotlin/okio/CommonBufferTest.kt b/okio/src/commonTest/kotlin/okio/CommonBufferTest.kt index d99075a341..abf9b0d471 100644 --- a/okio/src/commonTest/kotlin/okio/CommonBufferTest.kt +++ b/okio/src/commonTest/kotlin/okio/CommonBufferTest.kt @@ -88,24 +88,24 @@ class CommonBufferTest { val buffer = Buffer() // Take 2 * MAX_SIZE segments. This will drain the pool, even if other tests filled it. - buffer.write(ByteArray(SegmentPool.MAX_SIZE.toInt())) - buffer.write(ByteArray(SegmentPool.MAX_SIZE.toInt())) + buffer.write(ByteArray(SegmentPool.MAX_SIZE)) + buffer.write(ByteArray(SegmentPool.MAX_SIZE)) assertEquals(0, SegmentPool.byteCount) // Recycle MAX_SIZE segments. They're all in the pool. - buffer.skip(SegmentPool.MAX_SIZE) + buffer.skip(SegmentPool.MAX_SIZE.toLong()) assertEquals(SegmentPool.MAX_SIZE, SegmentPool.byteCount) // Recycle MAX_SIZE more segments. The pool is full so they get garbage collected. - buffer.skip(SegmentPool.MAX_SIZE) + buffer.skip(SegmentPool.MAX_SIZE.toLong()) assertEquals(SegmentPool.MAX_SIZE, SegmentPool.byteCount) // Take MAX_SIZE segments to drain the pool. - buffer.write(ByteArray(SegmentPool.MAX_SIZE.toInt())) + buffer.write(ByteArray(SegmentPool.MAX_SIZE)) assertEquals(0, SegmentPool.byteCount) // Take MAX_SIZE more segments. The pool is drained so these will need to be allocated. - buffer.write(ByteArray(SegmentPool.MAX_SIZE.toInt())) + buffer.write(ByteArray(SegmentPool.MAX_SIZE)) assertEquals(0, SegmentPool.byteCount) } @@ -253,17 +253,17 @@ class CommonBufferTest { buffer.writeUtf8("a") buffer.writeUtf8('b'.repeat(Segment.SIZE)) buffer.writeUtf8("c") - assertEquals('a'.toLong(), buffer.get(0).toLong()) - assertEquals('a'.toLong(), buffer.get(0).toLong()) // getByte doesn't mutate! - assertEquals('c'.toLong(), buffer.get(buffer.size - 1).toLong()) - assertEquals('b'.toLong(), buffer.get(buffer.size - 2).toLong()) - assertEquals('b'.toLong(), buffer.get(buffer.size - 3).toLong()) + assertEquals('a'.toLong(), buffer[0].toLong()) + assertEquals('a'.toLong(), buffer[0].toLong()) // getByte doesn't mutate! + assertEquals('c'.toLong(), buffer[buffer.size - 1].toLong()) + assertEquals('b'.toLong(), buffer[buffer.size - 2].toLong()) + assertEquals('b'.toLong(), buffer[buffer.size - 3].toLong()) } @Test fun getByteOfEmptyBuffer() { val buffer = Buffer() assertFailsWith { - buffer.get(0) + buffer[0] } } diff --git a/okio/src/jsMain/kotlin/okio/SegmentPool.kt b/okio/src/jsMain/kotlin/okio/SegmentPool.kt index b85f9bd2e0..50e709913b 100644 --- a/okio/src/jsMain/kotlin/okio/SegmentPool.kt +++ b/okio/src/jsMain/kotlin/okio/SegmentPool.kt @@ -16,9 +16,9 @@ package okio internal actual object SegmentPool { - actual val MAX_SIZE: Long = 0L + actual val MAX_SIZE: Int = 0 - actual val byteCount: Long = 0L + actual val byteCount: Int = 0 actual fun take(): Segment = Segment() diff --git a/okio/src/jvmMain/kotlin/okio/SegmentPool.kt b/okio/src/jvmMain/kotlin/okio/SegmentPool.kt index 1589c248e8..842ab31b45 100644 --- a/okio/src/jvmMain/kotlin/okio/SegmentPool.kt +++ b/okio/src/jvmMain/kotlin/okio/SegmentPool.kt @@ -16,10 +16,8 @@ package okio import okio.SegmentPool.LOCK -import okio.SegmentPool.MAX_SIZE import okio.SegmentPool.recycle import okio.SegmentPool.take -import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicReference /** @@ -36,29 +34,47 @@ import java.util.concurrent.atomic.AtomicReference * Under significant contention this pool will have fewer hits and the VM will do more GC and zero * filling of arrays. * - * Note that the [MAX_SIZE] may be exceeded if multiple calls to [recycle] race. Exceeding the - * target pool size by a few segments doesn't harm performance, and imperfect enforcement is less - * code. + * This tracks the number of bytes in each linked list in its [Segment.limit] property. Each element + * has a limit that's one segment size greater than its successor element. */ internal actual object SegmentPool { /** The maximum number of bytes to pool. */ // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments? - actual val MAX_SIZE = 64 * 1024L // 64 KiB. + actual val MAX_SIZE = 64 * 1024 // 64 KiB. /** A sentinel segment to indicate that the linked list is currently being modified. */ private val LOCK = Segment(ByteArray(0), pos = 0, limit = 0, shared = false, owner = false) - /** Singly-linked list of segments. */ - private var firstRef = AtomicReference() + /** + * The number of hash buckets. This number needs to balance keeping the pool small and contention + * low. We use the number of processors rounded up to the nearest power of two. For example a + * machine with 6 cores will have 8 hash buckets. + */ + private val HASH_BUCKET_COUNT = + Integer.highestOneBit(Runtime.getRuntime().availableProcessors() * 2 - 1) - /** Total bytes in this pool. */ - private var atomicByteCount = AtomicLong() + /** + * Hash buckets each containing a singly-linked list of segments. We use multiple hash buckets so + * different threads don't race each other. We use thread IDs as hash keys because they're handy, + * and because it may increase locality. + * + * We don't use [ThreadLocal] because we don't know how many threads the host process has and we + * don't want to leak memory for the duration of a thread's life. + */ + private val hashBuckets: Array> = Array(HASH_BUCKET_COUNT) { + AtomicReference() + } - actual val byteCount: Long - get() = atomicByteCount.get() + actual val byteCount: Int + get() { + val first = firstRef().get() ?: return 0 + return first.limit + } @JvmStatic actual fun take(): Segment { + val firstRef = firstRef() + val first = firstRef.getAndSet(LOCK) when { first === LOCK -> { @@ -74,7 +90,7 @@ internal actual object SegmentPool { // We acquired the lock and the pool was not empty. Pop the first element and return it. firstRef.set(first.next) first.next = null - atomicByteCount.addAndGet(-Segment.SIZE.toLong()) + first.limit = 0 return first } } @@ -84,21 +100,25 @@ internal actual object SegmentPool { actual fun recycle(segment: Segment) { require(segment.next == null && segment.prev == null) if (segment.shared) return // This segment cannot be recycled. - if (atomicByteCount.get() >= MAX_SIZE) return // Pool is full. + + val firstRef = firstRef() val first = firstRef.get() if (first === LOCK) return // A take() is currently in progress. + val firstLimit = first?.limit ?: 0 + if (firstLimit >= MAX_SIZE) return // Pool is full. segment.next = first - segment.limit = 0 segment.pos = 0 + segment.limit = firstLimit + Segment.SIZE - if (firstRef.compareAndSet(first, segment)) { - // We successfully recycled this segment. Adjust the pool size. - atomicByteCount.addAndGet(Segment.SIZE.toLong()) - } else { - // We raced another operation. Don't recycle this segment. - segment.next = null - } + if (!firstRef.compareAndSet(first, segment)) segment.next = null + // If we raced another operation: Don't recycle this segment. + } + + private fun firstRef(): AtomicReference { + // Get a value in [0..HASH_BUCKET_COUNT). + val hashBucket = (Thread.currentThread().id and (HASH_BUCKET_COUNT - 1L)).toInt() + return hashBuckets[hashBucket] } } diff --git a/okio/src/nativeMain/kotlin/okio/SegmentPool.kt b/okio/src/nativeMain/kotlin/okio/SegmentPool.kt index b85f9bd2e0..50e709913b 100644 --- a/okio/src/nativeMain/kotlin/okio/SegmentPool.kt +++ b/okio/src/nativeMain/kotlin/okio/SegmentPool.kt @@ -16,9 +16,9 @@ package okio internal actual object SegmentPool { - actual val MAX_SIZE: Long = 0L + actual val MAX_SIZE: Int = 0 - actual val byteCount: Long = 0L + actual val byteCount: Int = 0 actual fun take(): Segment = Segment()