Skip to content

Commit

Permalink
fixup! KTOR-6734 Fixes for Jetty 12 upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
bjhham committed Mar 4, 2025
1 parent d014b91 commit 828c18d
Show file tree
Hide file tree
Showing 11 changed files with 151 additions and 255 deletions.
23 changes: 23 additions & 0 deletions ktor-http/common/src/io/ktor/http/Query.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.ktor.http

import io.ktor.utils.io.InternalAPI

/**
* Parse query string withing starting at the specified [startIndex] but up to [limit] pairs
*
Expand Down Expand Up @@ -94,3 +96,24 @@ private fun trimStart(start: Int, end: Int, query: CharSequence): Int {
while (spaceIndex < end && query[spaceIndex].isWhitespace()) spaceIndex++
return spaceIndex
}

/**
* Converts parameters to query parameters by fixing the [Parameters.get] method
* to make it return an empty string for the query parameter without value
*/
@InternalAPI
public fun Parameters.toQueryParameters(): Parameters {
val parameters = this
return object : Parameters {
override fun get(name: String): String? {
val values = getAll(name) ?: return null
return if (values.isEmpty()) "" else values.first()
}
override val caseInsensitiveName: Boolean
get() = parameters.caseInsensitiveName
override fun getAll(name: String): List<String>? = parameters.getAll(name)
override fun names(): Set<String> = parameters.names()
override fun entries(): Set<Map.Entry<String, List<String>>> = parameters.entries()
override fun isEmpty(): Boolean = parameters.isEmpty()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ internal class CIOApplicationRequest(

override var engineHeaders: Headers = CIOHeaders(request.headers)

@OptIn(InternalAPI::class)
override val queryParameters: Parameters by lazy { encodeParameters(rawQueryParameters).toQueryParameters() }

override val rawQueryParameters: Parameters by lazy {
Expand All @@ -47,26 +48,6 @@ internal class CIOApplicationRequest(
}
}

/**
* Converts parameters to query parameters by fixing the [Parameters.get] method
* to make it return an empty string for the query parameter without value
*/
private fun Parameters.toQueryParameters(): Parameters {
val parameters = this
return object : Parameters {
override fun get(name: String): String? {
val values = getAll(name) ?: return null
return if (values.isEmpty()) "" else values.first()
}
override val caseInsensitiveName: Boolean
get() = parameters.caseInsensitiveName
override fun getAll(name: String): List<String>? = parameters.getAll(name)
override fun names(): Set<String> = parameters.names()
override fun entries(): Set<Map.Entry<String, List<String>>> = parameters.entries()
override fun isEmpty(): Boolean = parameters.isEmpty()
}
}

internal class CIOConnectionPoint(
private val remoteNetworkAddress: NetworkAddress?,
private val localNetworkAddress: NetworkAddress?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ public class JettyApplicationRequest(
call.writer(Dispatchers.IO + CoroutineName("request-reader")) {
val contentLength = if (request.headers.contains(HttpHeaders.ContentLength)) {
request.headers.get(HttpHeaders.ContentLength)?.toLong()
} else null
} else {
null
}

var bytesRead = 0L
while (true) {
when(val chunk = request.read()) {
when (val chunk = request.read()) {
// nothing available, suspend for more content
null -> {
suspendCancellableCoroutine { continuation ->
Expand All @@ -43,8 +45,9 @@ public class JettyApplicationRequest(
else -> {
with(chunk) {
if (failure != null) {
if (isLast)
if (isLast) {
throw failure
}
call.application.log.warn("Recoverable error reading request body; continuing", failure)
} else {
bytesRead += byteBuffer.remaining()
Expand All @@ -55,7 +58,9 @@ public class JettyApplicationRequest(
}
if (isLast) {
if (contentLength != null && bytesRead < contentLength) {
channel.cancel(EOFException("Expected $contentLength bytes, received only $bytesRead"))
channel.cancel(
EOFException("Expected $contentLength bytes, received only $bytesRead")
)
}
return@writer
}
Expand All @@ -75,7 +80,7 @@ public class JettyApplicationRequest(
override val local: RequestConnectionPoint = JettyConnectionPoint(request)

override val queryParameters: Parameters by lazy {
encodeParameters(rawQueryParameters)
encodeParameters(rawQueryParameters).toQueryParameters()
}

override val rawQueryParameters: Parameters by lazy(LazyThreadSafetyMode.NONE) {
Expand Down Expand Up @@ -109,7 +114,9 @@ public class JettyHeaders(
}.toSet()
}

override fun getAll(name: String): List<String>? = jettyRequest.headers.getValuesList(name).takeIf { it.isNotEmpty() }
override fun getAll(name: String): List<String>? = jettyRequest.headers.getValuesList(name).takeIf {
it.isNotEmpty()
}

override fun get(name: String): String? = jettyRequest.headers.get(name).takeIf { it.isNotEmpty() }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.ktor.http.*
import io.ktor.http.content.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.jetty.jakarta.JettyWebsocketConnection.Companion.upgradeAndAwait
import io.ktor.server.response.*
import io.ktor.utils.io.*
import io.ktor.utils.io.pool.*
Expand Down Expand Up @@ -70,8 +71,9 @@ public class JettyApplicationResponse(
} finally {
bufferPool.recycle(buffer)
runCatching {
if (!response.isCommitted)
if (!response.isCommitted) {
response.write(true, emptyBuffer, Callback.NOOP)
}
}
}
}
Expand All @@ -82,45 +84,41 @@ public class JettyApplicationResponse(
response.headers.add(name, value)
}

override fun getEngineHeaderNames(): List<String> = response.headers.fieldNamesCollection.toList()
override fun getEngineHeaderValues(name: String): List<String> = response.headers.getValuesList(name)
override fun getEngineHeaderNames(): List<String> =
response.headers.fieldNamesCollection.toList()

override fun getEngineHeaderValues(name: String): List<String> =
response.headers.getValuesList(name)
}

// TODO set idle timeout from websocket config on endpoint
override suspend fun respondUpgrade(upgrade: OutgoingContent.ProtocolUpgrade) {
if (responseJob.isInitialized())
if (responseJob.isInitialized()) {
responseJob.value.cancel()
}

// use the underlying endpoint instance for two-way connection
// Use the underlying endpoint instance for two-way connection
val endpoint = request.connectionMetaData.connection.endPoint
endpoint.idleTimeout = 6000 * 1000

val websocketConnection = JettyWebsocketConnection2(
// An [AbstractConnection] implementation for translating I/O
val websocketConnection = JettyWebsocketConnection(
endpoint,
executor,
bufferPool,
coroutineContext
)

// Finish the current response channel by writing an empty message with last=true
suspendCancellableCoroutine { continuation ->
response.write(true, emptyBuffer, continuation.asCallback())
}

endpoint.upgrade(websocketConnection)

val upgradeJob = upgrade.upgrade(
websocketConnection.inputChannel,
websocketConnection.outputChannel,
coroutineContext,
userContext,
// Start a job for handling the websocket connection and wait for it to finish
upgrade.upgradeAndAwait(
websocketConnection,
userContext
)

upgradeJob.invokeOnCompletion {
websocketConnection.inputChannel.cancel()
websocketConnection.outputChannel.close()
}

upgradeJob.join()
}

override suspend fun respondFromBytes(bytes: ByteArray) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ internal class JettyKtorHandler(
try {
logError(call, error)
if (!response.isCommitted) {
Response.writeError(request,
Response.writeError(
request,
response,
callback,
HttpStatus.INTERNAL_SERVER_ERROR_500,
Expand Down
Loading

0 comments on commit 828c18d

Please sign in to comment.