From e468c64026ce415f7687b76056b10d6b1b44d4a1 Mon Sep 17 00:00:00 2001 From: Rustam Date: Tue, 21 Nov 2023 11:00:28 +0100 Subject: [PATCH] KTOR-6482 Logging plugin blocks response body streaming when level is BODY (#3843) --- .../ktor/client/tests/LoggingMockedTests.kt | 43 +++++++++++++++++++ .../common/src/io/ktor/util/ByteChannels.kt | 14 +++--- 2 files changed, 51 insertions(+), 6 deletions(-) diff --git a/ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/LoggingMockedTests.kt b/ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/LoggingMockedTests.kt index d5b4b4e8ef..efff9256e7 100644 --- a/ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/LoggingMockedTests.kt +++ b/ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/LoggingMockedTests.kt @@ -13,7 +13,10 @@ import io.ktor.client.statement.* import io.ktor.client.tests.utils.* import io.ktor.http.* import io.ktor.util.* +import io.ktor.utils.io.* import io.ktor.utils.io.core.* +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* import kotlin.test.* class LoggingMockedTests { @@ -339,4 +342,44 @@ class LoggingMockedTests { testLogger.verify() } } + + @Test + fun testCanStream() = testWithEngine(MockEngine) { + val channel = ByteChannel(autoFlush = true) + config { + engine { + addHandler { + respond( + content = channel, + status = HttpStatusCode.OK + ) + } + } + install(Logging) { + level = LogLevel.BODY + logger = Logger.DEFAULT + } + } + test { client -> + val content = channelFlow { + launch { + client.preparePost("/").execute { + val ch = it.bodyAsChannel() + while (!ch.isClosedForRead) { + ch.awaitContent() + send(ch.readUTF8Line()) + } + } + } + } + + channel.writeStringUtf8("Hello world!\n") + + withTimeout(5_000) { // the bug will cause this to timeout + content.collect { + channel.close() + } + } + } + } } diff --git a/ktor-utils/common/src/io/ktor/util/ByteChannels.kt b/ktor-utils/common/src/io/ktor/util/ByteChannels.kt index 5ff44c9111..6994939401 100644 --- a/ktor-utils/common/src/io/ktor/util/ByteChannels.kt +++ b/ktor-utils/common/src/io/ktor/util/ByteChannels.kt @@ -6,6 +6,7 @@ package io.ktor.util import io.ktor.utils.io.* import io.ktor.utils.io.core.* +import io.ktor.utils.io.pool.* import kotlinx.coroutines.* private const val CHUNK_BUFFER_SIZE = 4096L @@ -20,14 +21,14 @@ public fun ByteReadChannel.split(coroutineScope: CoroutineScope): Pair - listOf( - async { first.writePacket(chunk.copy()) }, - async { second.writePacket(chunk.copy()) } - ).awaitAll() - } + val read = this@split.readAvailable(buffer) + listOf( + async { first.writeFully(buffer, 0, read) }, + async { second.writeFully(buffer, 0, read) } + ).awaitAll() } closedCause?.let { throw it } @@ -36,6 +37,7 @@ public fun ByteReadChannel.split(coroutineScope: CoroutineScope): Pair