Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions okio/src/commonMain/kotlin/okio/Segment.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,27 @@ import kotlin.jvm.JvmField
internal class Segment {
@JvmField val data: ByteArray

/** The next byte of application data byte to read in this segment. */
/** The next byte of application data byte to read in this segment. */
@JvmField var pos: Int = 0

/** The first byte of available data ready to be written to. */
/**
* The first byte of available data ready to be written to.
*
* If the segment is free and linked in the segment pool, the field contains total
* byte count of this and next segments.
*/
@JvmField var limit: Int = 0

/** True if other segments or byte strings use the same byte array. */
/** True if other segments or byte strings use the same byte array. */
@JvmField var shared: Boolean = false

/** True if this segment owns the byte array and can append to it, extending `limit`. */
/** True if this segment owns the byte array and can append to it, extending `limit`. */
@JvmField var owner: Boolean = false

/** Next segment in a linked or circularly-linked list. */
/** Next segment in a linked or circularly-linked list. */
@JvmField var next: Segment? = null

/** Previous segment in a circularly-linked list. */
/** Previous segment in a circularly-linked list. */
@JvmField var prev: Segment? = null

constructor() {
Expand Down
9 changes: 6 additions & 3 deletions okio/src/commonMain/kotlin/okio/SegmentPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ package okio
* This pool is a thread-safe static singleton.
*/
internal expect object SegmentPool {
val MAX_SIZE: Long
val MAX_SIZE: Int

/** For testing only. Returns a snapshot of the number of bytes currently in the pool. */
val byteCount: Long
/**
* For testing only. Returns a snapshot of the number of bytes currently in the pool. If the pool
* is segmented such as by thread, this returns the byte count accessible to the calling thread.
*/
val byteCount: Int

/** Return a segment for the caller's use. */
fun take(): Segment
Expand Down
24 changes: 12 additions & 12 deletions okio/src/commonTest/kotlin/okio/CommonBufferTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -88,24 +88,24 @@ class CommonBufferTest {
val buffer = Buffer()

// Take 2 * MAX_SIZE segments. This will drain the pool, even if other tests filled it.
buffer.write(ByteArray(SegmentPool.MAX_SIZE.toInt()))
buffer.write(ByteArray(SegmentPool.MAX_SIZE.toInt()))
buffer.write(ByteArray(SegmentPool.MAX_SIZE))
buffer.write(ByteArray(SegmentPool.MAX_SIZE))
assertEquals(0, SegmentPool.byteCount)

// Recycle MAX_SIZE segments. They're all in the pool.
buffer.skip(SegmentPool.MAX_SIZE)
buffer.skip(SegmentPool.MAX_SIZE.toLong())
assertEquals(SegmentPool.MAX_SIZE, SegmentPool.byteCount)

// Recycle MAX_SIZE more segments. The pool is full so they get garbage collected.
buffer.skip(SegmentPool.MAX_SIZE)
buffer.skip(SegmentPool.MAX_SIZE.toLong())
assertEquals(SegmentPool.MAX_SIZE, SegmentPool.byteCount)

// Take MAX_SIZE segments to drain the pool.
buffer.write(ByteArray(SegmentPool.MAX_SIZE.toInt()))
buffer.write(ByteArray(SegmentPool.MAX_SIZE))
assertEquals(0, SegmentPool.byteCount)

// Take MAX_SIZE more segments. The pool is drained so these will need to be allocated.
buffer.write(ByteArray(SegmentPool.MAX_SIZE.toInt()))
buffer.write(ByteArray(SegmentPool.MAX_SIZE))
assertEquals(0, SegmentPool.byteCount)
}

Expand Down Expand Up @@ -253,17 +253,17 @@ class CommonBufferTest {
buffer.writeUtf8("a")
buffer.writeUtf8('b'.repeat(Segment.SIZE))
buffer.writeUtf8("c")
assertEquals('a'.toLong(), buffer.get(0).toLong())
assertEquals('a'.toLong(), buffer.get(0).toLong()) // getByte doesn't mutate!
assertEquals('c'.toLong(), buffer.get(buffer.size - 1).toLong())
assertEquals('b'.toLong(), buffer.get(buffer.size - 2).toLong())
assertEquals('b'.toLong(), buffer.get(buffer.size - 3).toLong())
assertEquals('a'.toLong(), buffer[0].toLong())
assertEquals('a'.toLong(), buffer[0].toLong()) // getByte doesn't mutate!
assertEquals('c'.toLong(), buffer[buffer.size - 1].toLong())
assertEquals('b'.toLong(), buffer[buffer.size - 2].toLong())
assertEquals('b'.toLong(), buffer[buffer.size - 3].toLong())
}

@Test fun getByteOfEmptyBuffer() {
val buffer = Buffer()
assertFailsWith<IndexOutOfBoundsException> {
buffer.get(0)
buffer[0]
}
}

Expand Down
4 changes: 2 additions & 2 deletions okio/src/jsMain/kotlin/okio/SegmentPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
package okio

internal actual object SegmentPool {
actual val MAX_SIZE: Long = 0L
actual val MAX_SIZE: Int = 0

actual val byteCount: Long = 0L
actual val byteCount: Int = 0

actual fun take(): Segment = Segment()

Expand Down
64 changes: 42 additions & 22 deletions okio/src/jvmMain/kotlin/okio/SegmentPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
package okio

import okio.SegmentPool.LOCK
import okio.SegmentPool.MAX_SIZE
import okio.SegmentPool.recycle
import okio.SegmentPool.take
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference

/**
Expand All @@ -36,29 +34,47 @@ import java.util.concurrent.atomic.AtomicReference
* Under significant contention this pool will have fewer hits and the VM will do more GC and zero
* filling of arrays.
*
* Note that the [MAX_SIZE] may be exceeded if multiple calls to [recycle] race. Exceeding the
* target pool size by a few segments doesn't harm performance, and imperfect enforcement is less
* code.
* This tracks the number of bytes in each linked list in its [Segment.limit] property. Each element
* has a limit that's one segment size greater than its successor element.
*/
internal actual object SegmentPool {
/** The maximum number of bytes to pool. */
// TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?
actual val MAX_SIZE = 64 * 1024L // 64 KiB.
actual val MAX_SIZE = 64 * 1024 // 64 KiB.

/** A sentinel segment to indicate that the linked list is currently being modified. */
private val LOCK = Segment(ByteArray(0), pos = 0, limit = 0, shared = false, owner = false)

/** Singly-linked list of segments. */
private var firstRef = AtomicReference<Segment?>()
/**
* The number of hash buckets. This number needs to balance keeping the pool small and contention
* low. We use the number of processors rounded up to the nearest power of two. For example a
* machine with 6 cores will have 8 hash buckets.
*/
private val HASH_BUCKET_COUNT =
Integer.highestOneBit(Runtime.getRuntime().availableProcessors() * 2 - 1)

/** Total bytes in this pool. */
private var atomicByteCount = AtomicLong()
/**
* Hash buckets each containing a singly-linked list of segments. We use multiple hash buckets so
* different threads don't race each other. We use thread IDs as hash keys because they're handy,
* and because it may increase locality.
*
* We don't use [ThreadLocal] because we don't know how many threads the host process has and we
* don't want to leak memory for the duration of a thread's life.
*/
private val hashBuckets: Array<AtomicReference<Segment?>> = Array(HASH_BUCKET_COUNT) {
AtomicReference<Segment?>()
}

actual val byteCount: Long
get() = atomicByteCount.get()
actual val byteCount: Int
get() {
val first = firstRef().get() ?: return 0
return first.limit
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rewlad rather than trying to make this correct, I’m just redefining the field to only return the byte count accessible to the current thread. That’s certainly enough to make our tests happy!

}

@JvmStatic
actual fun take(): Segment {
val firstRef = firstRef()

val first = firstRef.getAndSet(LOCK)
when {
first === LOCK -> {
Expand All @@ -74,7 +90,7 @@ internal actual object SegmentPool {
// We acquired the lock and the pool was not empty. Pop the first element and return it.
firstRef.set(first.next)
first.next = null
atomicByteCount.addAndGet(-Segment.SIZE.toLong())
first.limit = 0
return first
}
}
Expand All @@ -84,21 +100,25 @@ internal actual object SegmentPool {
actual fun recycle(segment: Segment) {
require(segment.next == null && segment.prev == null)
if (segment.shared) return // This segment cannot be recycled.
if (atomicByteCount.get() >= MAX_SIZE) return // Pool is full.

val firstRef = firstRef()

val first = firstRef.get()
if (first === LOCK) return // A take() is currently in progress.
val firstLimit = first?.limit ?: 0
if (firstLimit >= MAX_SIZE) return // Pool is full.

segment.next = first
segment.limit = 0
segment.pos = 0
segment.limit = firstLimit + Segment.SIZE

if (firstRef.compareAndSet(first, segment)) {
// We successfully recycled this segment. Adjust the pool size.
atomicByteCount.addAndGet(Segment.SIZE.toLong())
} else {
// We raced another operation. Don't recycle this segment.
segment.next = null
}
if (!firstRef.compareAndSet(first, segment)) segment.next = null
// If we raced another operation: Don't recycle this segment.
}

private fun firstRef(): AtomicReference<Segment?> {
// Get a value in [0..HASH_BUCKET_COUNT).
val hashBucket = (Thread.currentThread().id and (HASH_BUCKET_COUNT - 1L)).toInt()
return hashBuckets[hashBucket]
}
}
4 changes: 2 additions & 2 deletions okio/src/nativeMain/kotlin/okio/SegmentPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
package okio

internal actual object SegmentPool {
actual val MAX_SIZE: Long = 0L
actual val MAX_SIZE: Int = 0

actual val byteCount: Long = 0L
actual val byteCount: Int = 0

actual fun take(): Segment = Segment()

Expand Down