Skip to content

Commit

Permalink
KTOR-6482 Logging plugin blocks response body streaming when level is…
Browse files Browse the repository at this point in the history
… BODY (#3843)
  • Loading branch information
rsinukov committed Nov 21, 2023
1 parent 9a67d61 commit e468c64
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import io.ktor.client.statement.*
import io.ktor.client.tests.utils.*
import io.ktor.http.*
import io.ktor.util.*
import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.test.*

class LoggingMockedTests {
Expand Down Expand Up @@ -339,4 +342,44 @@ class LoggingMockedTests {
testLogger.verify()
}
}

@Test
fun testCanStream() = testWithEngine(MockEngine) {
val channel = ByteChannel(autoFlush = true)
config {
engine {
addHandler {
respond(
content = channel,
status = HttpStatusCode.OK
)
}
}
install(Logging) {
level = LogLevel.BODY
logger = Logger.DEFAULT
}
}
test { client ->
val content = channelFlow {
launch {
client.preparePost("/").execute {
val ch = it.bodyAsChannel()
while (!ch.isClosedForRead) {
ch.awaitContent()
send(ch.readUTF8Line())
}
}
}
}

channel.writeStringUtf8("Hello world!\n")

withTimeout(5_000) { // the bug will cause this to timeout
content.collect {
channel.close()
}
}
}
}
}
14 changes: 8 additions & 6 deletions ktor-utils/common/src/io/ktor/util/ByteChannels.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package io.ktor.util

import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import io.ktor.utils.io.pool.*
import kotlinx.coroutines.*

private const val CHUNK_BUFFER_SIZE = 4096L
Expand All @@ -20,14 +21,14 @@ public fun ByteReadChannel.split(coroutineScope: CoroutineScope): Pair<ByteReadC
val second = ByteChannel(autoFlush = true)

coroutineScope.launch {
val buffer = ByteArrayPool.borrow()
try {
while (!isClosedForRead) {
this@split.readRemaining(CHUNK_BUFFER_SIZE).use { chunk ->
listOf(
async { first.writePacket(chunk.copy()) },
async { second.writePacket(chunk.copy()) }
).awaitAll()
}
val read = this@split.readAvailable(buffer)
listOf(
async { first.writeFully(buffer, 0, read) },
async { second.writeFully(buffer, 0, read) }
).awaitAll()
}

closedCause?.let { throw it }
Expand All @@ -36,6 +37,7 @@ public fun ByteReadChannel.split(coroutineScope: CoroutineScope): Pair<ByteReadC
first.cancel(cause)
second.cancel(cause)
} finally {
ByteArrayPool.recycle(buffer)
first.close()
second.close()
}
Expand Down

0 comments on commit e468c64

Please sign in to comment.