From ef98a2d54bcd7d53bfe60416df00b1bfa5bb6253 Mon Sep 17 00:00:00 2001 From: Sergei Kirjanov Date: Sun, 31 May 2020 14:07:21 -0400 Subject: [PATCH 1/3] Use N segment pools for N processors --- okio/src/commonMain/kotlin/okio/Segment.kt | 3 ++ okio/src/jvmMain/kotlin/okio/SegmentPool.kt | 49 ++++++++++++++------- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/okio/src/commonMain/kotlin/okio/Segment.kt b/okio/src/commonMain/kotlin/okio/Segment.kt index 8bafb27406..bffecf8921 100644 --- a/okio/src/commonMain/kotlin/okio/Segment.kt +++ b/okio/src/commonMain/kotlin/okio/Segment.kt @@ -53,6 +53,9 @@ internal class Segment { /** Previous segment in a circularly-linked list. */ @JvmField var prev: Segment? = null + /** If the segment is free **/ + @JvmField var freeSegmentCount: Long = 0L + constructor() { this.data = ByteArray(SIZE) this.owner = true diff --git a/okio/src/jvmMain/kotlin/okio/SegmentPool.kt b/okio/src/jvmMain/kotlin/okio/SegmentPool.kt index 1589c248e8..902a2bffce 100644 --- a/okio/src/jvmMain/kotlin/okio/SegmentPool.kt +++ b/okio/src/jvmMain/kotlin/okio/SegmentPool.kt @@ -19,7 +19,6 @@ import okio.SegmentPool.LOCK import okio.SegmentPool.MAX_SIZE import okio.SegmentPool.recycle import okio.SegmentPool.take -import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicReference /** @@ -44,21 +43,33 @@ internal actual object SegmentPool { /** The maximum number of bytes to pool. */ // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments? actual val MAX_SIZE = 64 * 1024L // 64 KiB. + private val maxSegmentCount = MAX_SIZE / Segment.SIZE + /** A sentinel segment to indicate that the linked list is currently being modified. */ private val LOCK = Segment(ByteArray(0), pos = 0, limit = 0, shared = false, owner = false) - /** Singly-linked list of segments. */ - private var firstRef = AtomicReference() - - /** Total bytes in this pool. 
*/ - private var atomicByteCount = AtomicLong() + /** + * 16 hash buckets, each containing a singly-linked list of segments. We use multiple hash buckets + * so different threads don't race each other. We use thread IDs as hash keys because they're + * handy, and because it may increase locality. + * + * We don't use [ThreadLocal] because we don't know how many threads the host process has and we + * don't want to leak memory for the duration of a thread's life. + */ + private val hashBucketCount = Runtime.getRuntime().availableProcessors() + private val hashBuckets: Array> = Array(hashBucketCount) { + AtomicReference() + } + // Not optimized, use for stats or so. actual val byteCount: Long - get() = atomicByteCount.get() + get() = hashBuckets.map{ first -> getSegmentCount(first.get()) }.sum() * Segment.SIZE @JvmStatic actual fun take(): Segment { + val firstRef = firstRef() + val first = firstRef.getAndSet(LOCK) when { first === LOCK -> { @@ -74,31 +85,37 @@ internal actual object SegmentPool { // We acquired the lock and the pool was not empty. Pop the first element and return it. firstRef.set(first.next) first.next = null - atomicByteCount.addAndGet(-Segment.SIZE.toLong()) return first } } } + private fun getSegmentCount(first: Segment?): Long = + first?.freeSegmentCount ?: 0L + @JvmStatic actual fun recycle(segment: Segment) { require(segment.next == null && segment.prev == null) if (segment.shared) return // This segment cannot be recycled. - if (atomicByteCount.get() >= MAX_SIZE) return // Pool is full. + + val firstRef = firstRef() val first = firstRef.get() if (first === LOCK) return // A take() is currently in progress. + val wasSegmentCount = getSegmentCount(first) + if (wasSegmentCount >= maxSegmentCount) return // Pool is full. segment.next = first segment.limit = 0 segment.pos = 0 + segment.freeSegmentCount = wasSegmentCount + 1 - if (firstRef.compareAndSet(first, segment)) { - // We successfully recycled this segment. Adjust the pool size. 
- atomicByteCount.addAndGet(Segment.SIZE.toLong()) - } else { - // We raced another operation. Don't recycle this segment. - segment.next = null - } + if (!firstRef.compareAndSet(first, segment)) segment.next = null + // If we raced another operation: Don't recycle this segment. + } + + private fun firstRef(): AtomicReference { + val hashBucket = (Thread.currentThread().id % hashBucketCount).toInt() // Get a value in [0..hashBucketCount). + return hashBuckets[hashBucket] } } From 32b935b38a4ab0c63effe56e62c4472255876cb4 Mon Sep 17 00:00:00 2001 From: Jesse Wilson Date: Sun, 31 May 2020 14:51:46 -0400 Subject: [PATCH 2/3] Overload the Segment.limit field to track the pooled size Also round up the number of cores to the nearest power of two. Also change the behavior of SegmentPool.byteCount to only track the parts of the pool accessible to the current caller. This builds upon rewlad's fine work of tuning the segment pool. --- okio/src/commonMain/kotlin/okio/Segment.kt | 15 +++--- .../src/commonMain/kotlin/okio/SegmentPool.kt | 9 ++-- .../kotlin/okio/CommonBufferTest.kt | 24 ++++----- okio/src/jsMain/kotlin/okio/SegmentPool.kt | 4 +- okio/src/jvmMain/kotlin/okio/SegmentPool.kt | 49 ++++++++++--------- .../src/nativeMain/kotlin/okio/SegmentPool.kt | 4 +- 6 files changed, 54 insertions(+), 51 deletions(-) diff --git a/okio/src/commonMain/kotlin/okio/Segment.kt b/okio/src/commonMain/kotlin/okio/Segment.kt index bffecf8921..8fe5b016bd 100644 --- a/okio/src/commonMain/kotlin/okio/Segment.kt +++ b/okio/src/commonMain/kotlin/okio/Segment.kt @@ -35,27 +35,24 @@ import kotlin.jvm.JvmField internal class Segment { @JvmField val data: ByteArray - /** The next byte of application data byte to read in this segment. */ + /** The next byte of application data byte to read in this segment. */ @JvmField var pos: Int = 0 - /** The first byte of available data ready to be written to. */ + /** The first byte of available data ready to be written to. 
*/ @JvmField var limit: Int = 0 - /** True if other segments or byte strings use the same byte array. */ + /** True if other segments or byte strings use the same byte array. */ @JvmField var shared: Boolean = false - /** True if this segment owns the byte array and can append to it, extending `limit`. */ + /** True if this segment owns the byte array and can append to it, extending `limit`. */ @JvmField var owner: Boolean = false - /** Next segment in a linked or circularly-linked list. */ + /** Next segment in a linked or circularly-linked list. */ @JvmField var next: Segment? = null - /** Previous segment in a circularly-linked list. */ + /** Previous segment in a circularly-linked list. */ @JvmField var prev: Segment? = null - /** If the segment is free **/ - @JvmField var freeSegmentCount: Long = 0L - constructor() { this.data = ByteArray(SIZE) this.owner = true diff --git a/okio/src/commonMain/kotlin/okio/SegmentPool.kt b/okio/src/commonMain/kotlin/okio/SegmentPool.kt index 0a887478f1..f21c7a29fb 100644 --- a/okio/src/commonMain/kotlin/okio/SegmentPool.kt +++ b/okio/src/commonMain/kotlin/okio/SegmentPool.kt @@ -20,10 +20,13 @@ package okio * This pool is a thread-safe static singleton. */ internal expect object SegmentPool { - val MAX_SIZE: Long + val MAX_SIZE: Int - /** For testing only. Returns a snapshot of the number of bytes currently in the pool. */ - val byteCount: Long + /** + * For testing only. Returns a snapshot of the number of bytes currently in the pool. If the pool + * is segmented such as by thread, this returns the byte count accessible to the calling thread. + */ + val byteCount: Int /** Return a segment for the caller's use. 
*/ fun take(): Segment diff --git a/okio/src/commonTest/kotlin/okio/CommonBufferTest.kt b/okio/src/commonTest/kotlin/okio/CommonBufferTest.kt index d99075a341..abf9b0d471 100644 --- a/okio/src/commonTest/kotlin/okio/CommonBufferTest.kt +++ b/okio/src/commonTest/kotlin/okio/CommonBufferTest.kt @@ -88,24 +88,24 @@ class CommonBufferTest { val buffer = Buffer() // Take 2 * MAX_SIZE segments. This will drain the pool, even if other tests filled it. - buffer.write(ByteArray(SegmentPool.MAX_SIZE.toInt())) - buffer.write(ByteArray(SegmentPool.MAX_SIZE.toInt())) + buffer.write(ByteArray(SegmentPool.MAX_SIZE)) + buffer.write(ByteArray(SegmentPool.MAX_SIZE)) assertEquals(0, SegmentPool.byteCount) // Recycle MAX_SIZE segments. They're all in the pool. - buffer.skip(SegmentPool.MAX_SIZE) + buffer.skip(SegmentPool.MAX_SIZE.toLong()) assertEquals(SegmentPool.MAX_SIZE, SegmentPool.byteCount) // Recycle MAX_SIZE more segments. The pool is full so they get garbage collected. - buffer.skip(SegmentPool.MAX_SIZE) + buffer.skip(SegmentPool.MAX_SIZE.toLong()) assertEquals(SegmentPool.MAX_SIZE, SegmentPool.byteCount) // Take MAX_SIZE segments to drain the pool. - buffer.write(ByteArray(SegmentPool.MAX_SIZE.toInt())) + buffer.write(ByteArray(SegmentPool.MAX_SIZE)) assertEquals(0, SegmentPool.byteCount) // Take MAX_SIZE more segments. The pool is drained so these will need to be allocated. - buffer.write(ByteArray(SegmentPool.MAX_SIZE.toInt())) + buffer.write(ByteArray(SegmentPool.MAX_SIZE)) assertEquals(0, SegmentPool.byteCount) } @@ -253,17 +253,17 @@ class CommonBufferTest { buffer.writeUtf8("a") buffer.writeUtf8('b'.repeat(Segment.SIZE)) buffer.writeUtf8("c") - assertEquals('a'.toLong(), buffer.get(0).toLong()) - assertEquals('a'.toLong(), buffer.get(0).toLong()) // getByte doesn't mutate! 
- assertEquals('c'.toLong(), buffer.get(buffer.size - 1).toLong()) - assertEquals('b'.toLong(), buffer.get(buffer.size - 2).toLong()) - assertEquals('b'.toLong(), buffer.get(buffer.size - 3).toLong()) + assertEquals('a'.toLong(), buffer[0].toLong()) + assertEquals('a'.toLong(), buffer[0].toLong()) // getByte doesn't mutate! + assertEquals('c'.toLong(), buffer[buffer.size - 1].toLong()) + assertEquals('b'.toLong(), buffer[buffer.size - 2].toLong()) + assertEquals('b'.toLong(), buffer[buffer.size - 3].toLong()) } @Test fun getByteOfEmptyBuffer() { val buffer = Buffer() assertFailsWith { - buffer.get(0) + buffer[0] } } diff --git a/okio/src/jsMain/kotlin/okio/SegmentPool.kt b/okio/src/jsMain/kotlin/okio/SegmentPool.kt index b85f9bd2e0..50e709913b 100644 --- a/okio/src/jsMain/kotlin/okio/SegmentPool.kt +++ b/okio/src/jsMain/kotlin/okio/SegmentPool.kt @@ -16,9 +16,9 @@ package okio internal actual object SegmentPool { - actual val MAX_SIZE: Long = 0L + actual val MAX_SIZE: Int = 0 - actual val byteCount: Long = 0L + actual val byteCount: Int = 0 actual fun take(): Segment = Segment() diff --git a/okio/src/jvmMain/kotlin/okio/SegmentPool.kt b/okio/src/jvmMain/kotlin/okio/SegmentPool.kt index 902a2bffce..842ab31b45 100644 --- a/okio/src/jvmMain/kotlin/okio/SegmentPool.kt +++ b/okio/src/jvmMain/kotlin/okio/SegmentPool.kt @@ -16,7 +16,6 @@ package okio import okio.SegmentPool.LOCK -import okio.SegmentPool.MAX_SIZE import okio.SegmentPool.recycle import okio.SegmentPool.take import java.util.concurrent.atomic.AtomicReference @@ -35,36 +34,42 @@ import java.util.concurrent.atomic.AtomicReference * Under significant contention this pool will have fewer hits and the VM will do more GC and zero * filling of arrays. * - * Note that the [MAX_SIZE] may be exceeded if multiple calls to [recycle] race. Exceeding the - * target pool size by a few segments doesn't harm performance, and imperfect enforcement is less - * code. 
+ * This tracks the number of bytes in each linked list in its [Segment.limit] property. Each element + * has a limit that's one segment size greater than its successor element. */ internal actual object SegmentPool { /** The maximum number of bytes to pool. */ // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments? - actual val MAX_SIZE = 64 * 1024L // 64 KiB. - private val maxSegmentCount = MAX_SIZE / Segment.SIZE - + actual val MAX_SIZE = 64 * 1024 // 64 KiB. /** A sentinel segment to indicate that the linked list is currently being modified. */ private val LOCK = Segment(ByteArray(0), pos = 0, limit = 0, shared = false, owner = false) /** - * 16 hash buckets, each containing a singly-linked list of segments. We use multiple hash buckets - * so different threads don't race each other. We use thread IDs as hash keys because they're - * handy, and because it may increase locality. + * The number of hash buckets. This number needs to balance keeping the pool small and contention + * low. We use the number of processors rounded up to the nearest power of two. For example a + * machine with 6 cores will have 8 hash buckets. + */ + private val HASH_BUCKET_COUNT = + Integer.highestOneBit(Runtime.getRuntime().availableProcessors() * 2 - 1) + + /** + * Hash buckets each containing a singly-linked list of segments. We use multiple hash buckets so + * different threads don't race each other. We use thread IDs as hash keys because they're handy, + * and because it may increase locality. * * We don't use [ThreadLocal] because we don't know how many threads the host process has and we * don't want to leak memory for the duration of a thread's life. */ - private val hashBucketCount = Runtime.getRuntime().availableProcessors() - private val hashBuckets: Array> = Array(hashBucketCount) { + private val hashBuckets: Array> = Array(HASH_BUCKET_COUNT) { AtomicReference() } - // Not optimized, use for stats or so. 
- actual val byteCount: Long - get() = hashBuckets.map{ first -> getSegmentCount(first.get()) }.sum() * Segment.SIZE + actual val byteCount: Int + get() { + val first = firstRef().get() ?: return 0 + return first.limit + } @JvmStatic actual fun take(): Segment { @@ -85,14 +90,12 @@ internal actual object SegmentPool { // We acquired the lock and the pool was not empty. Pop the first element and return it. firstRef.set(first.next) first.next = null + first.limit = 0 return first } } } - private fun getSegmentCount(first: Segment?): Long = - first?.freeSegmentCount ?: 0L - @JvmStatic actual fun recycle(segment: Segment) { require(segment.next == null && segment.prev == null) @@ -102,20 +105,20 @@ internal actual object SegmentPool { val first = firstRef.get() if (first === LOCK) return // A take() is currently in progress. - val wasSegmentCount = getSegmentCount(first) - if (wasSegmentCount >= maxSegmentCount) return // Pool is full. + val firstLimit = first?.limit ?: 0 + if (firstLimit >= MAX_SIZE) return // Pool is full. segment.next = first - segment.limit = 0 segment.pos = 0 - segment.freeSegmentCount = wasSegmentCount + 1 + segment.limit = firstLimit + Segment.SIZE if (!firstRef.compareAndSet(first, segment)) segment.next = null // If we raced another operation: Don't recycle this segment. } private fun firstRef(): AtomicReference { - val hashBucket = (Thread.currentThread().id % hashBucketCount).toInt() // Get a value in [0..hashBucketCount). + // Get a value in [0..HASH_BUCKET_COUNT). 
+ val hashBucket = (Thread.currentThread().id and (HASH_BUCKET_COUNT - 1L)).toInt() return hashBuckets[hashBucket] } } diff --git a/okio/src/nativeMain/kotlin/okio/SegmentPool.kt b/okio/src/nativeMain/kotlin/okio/SegmentPool.kt index b85f9bd2e0..50e709913b 100644 --- a/okio/src/nativeMain/kotlin/okio/SegmentPool.kt +++ b/okio/src/nativeMain/kotlin/okio/SegmentPool.kt @@ -16,9 +16,9 @@ package okio internal actual object SegmentPool { - actual val MAX_SIZE: Long = 0L + actual val MAX_SIZE: Int = 0 - actual val byteCount: Long = 0L + actual val byteCount: Int = 0 actual fun take(): Segment = Segment() From f390906aa9520c92c189b1e2fa48b4bcb4a3d1d7 Mon Sep 17 00:00:00 2001 From: Sergei Kirjanov Date: Tue, 2 Jun 2020 04:45:50 +0300 Subject: [PATCH 3/3] Comment on var limit (#728) --- okio/src/commonMain/kotlin/okio/Segment.kt | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/okio/src/commonMain/kotlin/okio/Segment.kt b/okio/src/commonMain/kotlin/okio/Segment.kt index 8fe5b016bd..c8aa479bbf 100644 --- a/okio/src/commonMain/kotlin/okio/Segment.kt +++ b/okio/src/commonMain/kotlin/okio/Segment.kt @@ -38,7 +38,12 @@ internal class Segment { /** The next byte of application data byte to read in this segment. */ @JvmField var pos: Int = 0 - /** The first byte of available data ready to be written to. */ + /** + * The first byte of available data ready to be written to. + * + * If the segment is free and linked in the segment pool, this field instead holds the total + * byte count of this segment and every segment linked after it. + */ @JvmField var limit: Int = 0 /** True if other segments or byte strings use the same byte array. */