Skip to content

Commit

Permalink
Fix bug in Pipe.fold where sink can incorrectly be left open (#1413)
Browse files Browse the repository at this point in the history
* add new reproducer test

* make test green

* spotlessApply
  • Loading branch information
jhump authored Feb 4, 2024
1 parent 520fc8b commit 775c1e5
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 7 deletions.
19 changes: 12 additions & 7 deletions okio/src/jvmMain/kotlin/okio/Pipe.kt
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ class Pipe(internal val maxBufferSize: Long) {
// Either the buffer is empty and we can swap and return. Or the buffer is non-empty and we
// must copy it to sink without holding any locks, then try it all again.
var closed = false
var done = false
lateinit var sinkBuffer: Buffer
lock.withLock {
check(foldedSink == null) { "sink already folded" }
Expand All @@ -181,26 +182,30 @@ class Pipe(internal val maxBufferSize: Long) {
throw IOException("canceled")
}

closed = sinkClosed
if (buffer.exhausted()) {
sourceClosed = true
foldedSink = sink
return@fold
done = true
return@withLock
}

closed = sinkClosed
sinkBuffer = Buffer()
sinkBuffer.write(buffer, buffer.size)
condition.signalAll() // Notify the sink that it can resume writing.
}

var success = false
try {
sink.write(sinkBuffer, sinkBuffer.size)
if (done) {
if (closed) {
sink.close()
} else {
sink.flush()
}
return
}

var success = false
try {
sink.write(sinkBuffer, sinkBuffer.size)
sink.flush()
success = true
} finally {
if (!success) {
Expand Down
41 changes: 41 additions & 0 deletions okio/src/jvmTest/kotlin/okio/PipeKotlinTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package okio
import java.io.IOException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.test.assertFailsWith
import org.junit.After
import org.junit.Assert.assertEquals
Expand Down Expand Up @@ -129,6 +130,46 @@ class PipeKotlinTest {
}
}

@Test fun closeWhileFolding() {
val pipe = Pipe(100L)
val writing = CountDownLatch(1)
val closed = CountDownLatch(1)
val sinkBuffer = Buffer()
val sinkClosed = AtomicBoolean()
val data = byteArrayOf(1, 2, 3, 4, 5, 6, 7, 8)
pipe.sink.write(Buffer().write(data), data.size.toLong())
val foldResult = executorService.submit {
val sink = object : Sink {
override fun write(source: Buffer, byteCount: Long) {
writing.countDown()
closed.await()
sinkBuffer.write(source, byteCount)
}

override fun flush() {
sinkBuffer.flush()
}

override fun timeout(): Timeout {
return sinkBuffer.timeout()
}

override fun close() {
sinkBuffer.close()
sinkClosed.set(true)
}
}
pipe.fold(sink)
}
writing.await()
pipe.sink.close()
closed.countDown()
foldResult.get()

assertTrue(sinkClosed.get())
assertArrayEquals(data, sinkBuffer.readByteArray())
}

@Test fun honorsPipeSinkTimeoutOnWritingWhenItIsSmaller() {
val pipe = Pipe(4)
val underlying = TimeoutWritingSink()
Expand Down

0 comments on commit 775c1e5

Please sign in to comment.