Skip to content

Commit d014b91

Browse files
committed
fixup! KTOR-6734 Fixes for Jetty 12 upgrade
1 parent 03668de commit d014b91

File tree

24 files changed

+489
-258
lines changed

24 files changed

+489
-258
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package io.ktor.server.jetty.jakarta
6+
7+
import kotlinx.io.IOException
8+
import org.eclipse.jetty.util.Callback
9+
import kotlin.coroutines.Continuation
10+
import kotlin.coroutines.resume
11+
import kotlin.coroutines.resumeWithException
12+
13+
/**
14+
* Jetty works with a `Callback` type which succeeds or fails similar to a coroutine continuation.
15+
*/
16+
internal fun Continuation<Unit>.asCallback(): Callback = object : Callback {
17+
override fun failed(x: Throwable?) {
18+
resumeWithException(x ?: IOException("Failed with no exception"))
19+
}
20+
override fun succeeded() {
21+
resume(Unit)
22+
}
23+
}

ktor-server/ktor-server-jetty-jakarta/jvm/src/io/ktor/server/jetty/jakarta/JettyApplicationCall.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,24 @@ import io.ktor.server.engine.*
99
import io.ktor.utils.io.*
1010
import org.eclipse.jetty.server.Request
1111
import org.eclipse.jetty.server.Response
12+
import java.util.concurrent.Executor
1213
import kotlin.coroutines.CoroutineContext
1314

1415
@InternalAPI
1516
public class JettyApplicationCall(
1617
application: Application,
1718
request: Request,
1819
response: Response,
20+
engineExecutor: Executor,
21+
engineDispatcher: CoroutineContext,
22+
appDispatcher: CoroutineContext,
1923
override val coroutineContext: CoroutineContext
2024
) : BaseApplicationCall(application) {
2125

2226
override val request: JettyApplicationRequest =
2327
JettyApplicationRequest(this, request)
2428
override val response: JettyApplicationResponse =
25-
JettyApplicationResponse(this, request, response, coroutineContext)
29+
JettyApplicationResponse(this, request, response, engineExecutor, engineDispatcher, appDispatcher)
2630

2731
init {
2832
putResponseAttribute()

ktor-server/ktor-server-jetty-jakarta/jvm/src/io/ktor/server/jetty/jakarta/JettyApplicationEngineBase.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ public open class JettyApplicationEngineBase(
7171
configuration.shutdownTimeout
7272
)
7373

74-
val connectors = server.connectors.zip(configuration.connectors)
74+
val connectors = server.connectors
75+
.zip(configuration.connectors)
7576
.map { it.second.withPort((it.first as ServerConnector).localPort) }
7677
resolvedConnectorsDeferred.complete(connectors)
7778

ktor-server/ktor-server-jetty-jakarta/jvm/src/io/ktor/server/jetty/jakarta/JettyApplicationRequest.kt

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,68 @@ import io.ktor.server.application.*
99
import io.ktor.server.engine.*
1010
import io.ktor.server.request.*
1111
import io.ktor.utils.io.*
12-
import io.ktor.utils.io.jvm.javaio.*
13-
import org.eclipse.jetty.io.*
14-
import org.eclipse.jetty.server.*
12+
import kotlinx.coroutines.CoroutineName
13+
import kotlinx.coroutines.Dispatchers
14+
import kotlinx.coroutines.suspendCancellableCoroutine
15+
import kotlinx.io.EOFException
16+
import kotlinx.io.IOException
17+
import org.eclipse.jetty.server.Request
18+
import kotlin.coroutines.resume
1519

1620
@InternalAPI
1721
public class JettyApplicationRequest(
1822
call: PipelineCall,
1923
request: Request
2024
) : BaseApplicationRequest(call) {
2125

26+
// See https://jetty.org/docs/jetty/12/programming-guide/arch/io.html#content-source
27+
private val requestBodyJob: WriterJob =
28+
call.writer(Dispatchers.IO + CoroutineName("request-reader")) {
29+
val contentLength = if (request.headers.contains(HttpHeaders.ContentLength)) {
30+
request.headers.get(HttpHeaders.ContentLength)?.toLong()
31+
} else null
32+
33+
var bytesRead = 0L
34+
while (true) {
35+
when(val chunk = request.read()) {
36+
// nothing available, suspend for more content
37+
null -> {
38+
suspendCancellableCoroutine { continuation ->
39+
request.demand { continuation.resume(Unit) }
40+
}
41+
}
42+
// read the chunk, exit and close channel if last chunk or failure
43+
else -> {
44+
with(chunk) {
45+
if (failure != null) {
46+
if (isLast)
47+
throw failure
48+
call.application.log.warn("Recoverable error reading request body; continuing", failure)
49+
} else {
50+
bytesRead += byteBuffer.remaining()
51+
channel.writeFully(byteBuffer)
52+
release()
53+
if (contentLength != null && bytesRead > contentLength) {
54+
channel.cancel(IOException("Request body exceeded content length limit"))
55+
}
56+
if (isLast) {
57+
if (contentLength != null && bytesRead < contentLength) {
58+
channel.cancel(EOFException("Expected $contentLength bytes, received only $bytesRead"))
59+
}
60+
return@writer
61+
}
62+
}
63+
}
64+
}
65+
}
66+
}
67+
}
68+
2269
override val cookies: RequestCookies = JettyRequestCookies(this, request)
2370

2471
override val engineHeaders: Headers = JettyHeaders(request)
2572

26-
override val engineReceiveChannel: ByteReadChannel = Content.Source.asInputStream(request).toByteReadChannel()
73+
override val engineReceiveChannel: ByteReadChannel by lazy { requestBodyJob.channel }
2774

2875
override val local: RequestConnectionPoint = JettyConnectionPoint(request)
2976

ktor-server/ktor-server-jetty-jakarta/jvm/src/io/ktor/server/jetty/jakarta/JettyApplicationResponse.kt

Lines changed: 39 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -11,26 +11,23 @@ import io.ktor.server.engine.*
1111
import io.ktor.server.response.*
1212
import io.ktor.utils.io.*
1313
import io.ktor.utils.io.pool.*
14-
import kotlinx.coroutines.CoroutineName
15-
import kotlinx.coroutines.CoroutineScope
16-
import kotlinx.coroutines.Dispatchers
17-
import kotlinx.coroutines.InternalCoroutinesApi
14+
import kotlinx.coroutines.*
15+
import kotlinx.io.InternalIoApi
1816
import org.eclipse.jetty.server.Request
1917
import org.eclipse.jetty.server.Response
2018
import org.eclipse.jetty.util.Callback
2119
import java.nio.ByteBuffer
22-
import java.nio.ByteBuffer.allocate
20+
import java.util.concurrent.Executor
2321
import kotlin.coroutines.CoroutineContext
24-
import kotlin.coroutines.resume
25-
import kotlin.coroutines.resumeWithException
26-
import kotlin.coroutines.suspendCoroutine
2722

2823
@InternalAPI
2924
public class JettyApplicationResponse(
3025
call: PipelineCall,
3126
private val request: Request,
3227
private val response: Response,
33-
override val coroutineContext: CoroutineContext
28+
private val executor: Executor,
29+
override val coroutineContext: CoroutineContext,
30+
private val userContext: CoroutineContext,
3431
) : BaseApplicationResponse(call), CoroutineScope {
3532
private companion object {
3633
private val bufferPool = ByteBufferPool(bufferSize = 8192)
@@ -48,42 +45,33 @@ public class JettyApplicationResponse(
4845
}
4946
}
5047

51-
@OptIn(InternalCoroutinesApi::class)
48+
@OptIn(InternalCoroutinesApi::class, InternalIoApi::class)
5249
private val responseJob: Lazy<ReaderJob> = lazy {
5350
reader(Dispatchers.IO + coroutineContext + CoroutineName("response-writer")) {
5451
var count = 0
5552
val buffer = bufferPool.borrow()
5653
try {
57-
while(true) {
58-
when(val current = channel.readAvailable(buffer)) {
54+
while (true) {
55+
when (val current = channel.readAvailable(buffer)) {
5956
-1 -> break
60-
0 -> {
61-
channel.awaitContent()
62-
continue
63-
}
57+
0 -> continue
6458
else -> count += current
6559
}
6660

67-
suspendCoroutine<Unit> { continuation ->
68-
try {
69-
response.write(
70-
channel.isClosedForRead,
71-
buffer.flip(),
72-
Callback.from {
73-
buffer.flip()
74-
continuation.resume(Unit)
75-
}
76-
)
77-
} catch (cause: Throwable) {
78-
continuation.resumeWithException(cause)
79-
}
61+
suspendCancellableCoroutine { continuation ->
62+
response.write(
63+
channel.isClosedForRead,
64+
buffer.flip(),
65+
continuation.asCallback()
66+
)
8067
}
68+
buffer.compact()
8169
}
8270
} finally {
8371
bufferPool.recycle(buffer)
8472
runCatching {
8573
if (!response.isCommitted)
86-
response.write(true, allocate(0), Callback.NOOP)
74+
response.write(true, emptyBuffer, Callback.NOOP)
8775
}
8876
}
8977
}
@@ -98,23 +86,37 @@ public class JettyApplicationResponse(
9886
override fun getEngineHeaderValues(name: String): List<String> = response.headers.getValuesList(name)
9987
}
10088

89+
// TODO set idle timeout from websocket config on endpoint
10190
override suspend fun respondUpgrade(upgrade: OutgoingContent.ProtocolUpgrade) {
102-
val connection = request.connectionMetaData.connection
103-
val endpoint = connection.endPoint
91+
if (responseJob.isInitialized())
92+
responseJob.value.cancel()
93+
94+
// use the underlying endpoint instance for two-way connection
95+
val endpoint = request.connectionMetaData.connection.endPoint
10496
endpoint.idleTimeout = 6000 * 1000
10597

106-
val websocketConnection = JettyWebsocketConnection(endpoint, coroutineContext)
107-
response.write(true, allocate(0), Callback.from { endpoint.upgrade(websocketConnection) })
98+
val websocketConnection = JettyWebsocketConnection2(
99+
endpoint,
100+
executor,
101+
bufferPool,
102+
coroutineContext
103+
)
104+
105+
suspendCancellableCoroutine { continuation ->
106+
response.write(true, emptyBuffer, continuation.asCallback())
107+
}
108+
109+
endpoint.upgrade(websocketConnection)
108110

109111
val upgradeJob = upgrade.upgrade(
110112
websocketConnection.inputChannel,
111113
websocketConnection.outputChannel,
112114
coroutineContext,
113-
coroutineContext,
115+
userContext,
114116
)
115117

116118
upgradeJob.invokeOnCompletion {
117-
websocketConnection.inputChannel.close()
119+
websocketConnection.inputChannel.cancel()
118120
websocketConnection.outputChannel.close()
119121
}
120122

@@ -126,7 +128,7 @@ public class JettyApplicationResponse(
126128
}
127129

128130
override suspend fun respondNoContent(content: OutgoingContent.NoContent) {
129-
response.write(true, allocate(0), Callback.NOOP)
131+
response.write(true, emptyBuffer, Callback.NOOP)
130132
}
131133

132134
override suspend fun responseChannel(): ByteWriteChannel =
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package io.ktor.server.jetty.jakarta
6+
7+
import io.ktor.utils.io.*
8+
import io.ktor.utils.io.pool.*
9+
import kotlinx.coroutines.CoroutineName
10+
import kotlinx.coroutines.CoroutineScope
11+
import kotlinx.coroutines.Dispatchers
12+
import kotlinx.coroutines.suspendCancellableCoroutine
13+
import org.eclipse.jetty.io.EndPoint
14+
import kotlin.coroutines.CoroutineContext
15+
16+
public fun CoroutineScope.connect(
17+
endpoint: EndPoint,
18+
coroutineNamePrefix: String,
19+
bufferPool: ByteBufferPool,
20+
coroutineContext: CoroutineContext
21+
): Pair<WriterJob, ReaderJob> {
22+
val inputJob: WriterJob =
23+
writer(Dispatchers.IO + coroutineContext + CoroutineName("$coroutineNamePrefix-input")) {
24+
val buffer = bufferPool.borrow()
25+
try {
26+
while (true) {
27+
suspendCancellableCoroutine { continuation ->
28+
endpoint.tryFillInterested(continuation.asCallback())
29+
}
30+
when (endpoint.fill(buffer)) {
31+
-1 -> break
32+
0 -> continue
33+
else -> {}
34+
}
35+
channel.writeFully(buffer.flip())
36+
buffer.compact()
37+
}
38+
} finally {
39+
bufferPool.recycle(buffer)
40+
}
41+
}
42+
43+
val outputJob: ReaderJob =
44+
reader(Dispatchers.IO + coroutineContext + CoroutineName("$coroutineNamePrefix-output")) {
45+
val buffer = bufferPool.borrow()
46+
try {
47+
while (true) {
48+
when (channel.readAvailable(buffer)) {
49+
-1 -> break
50+
0 -> continue
51+
else -> {}
52+
}
53+
suspendCancellableCoroutine<Unit> { continuation ->
54+
endpoint.write(continuation.asCallback(), buffer.flip())
55+
}
56+
buffer.compact()
57+
}
58+
} finally {
59+
bufferPool.recycle(buffer)
60+
}
61+
}
62+
63+
return inputJob to outputJob
64+
}

0 commit comments

Comments
 (0)