Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KTOR-4475 Limit current running requests #3065

Merged
merged 2 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class NettyApplicationEngine(
/**
* Number of concurrently running requests from the same http pipeline
*/
public var runningLimit: Int = 10
public var runningLimit: Int = 30
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would increase it at least to 32 or 64


/**
* Do not create separate call event group and reuse worker group for processing calls
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,33 +42,6 @@ public class NettyChannelInitializer(
) : ChannelInitializer<SocketChannel>() {
private var sslContext: SslContext? = null

internal constructor(
enginePipeline: EnginePipeline,
environment: ApplicationEngineEnvironment,
callEventGroup: EventExecutorGroup,
engineContext: CoroutineContext,
userContext: CoroutineContext,
connector: EngineConnectorConfig,
requestQueueLimit: Int,
runningLimit: Int,
responseWriteTimeout: Int,
requestReadTimeout: Int,
httpServerCodec: () -> HttpServerCodec
) : this(
enginePipeline,
environment,
callEventGroup,
engineContext,
userContext,
connector,
requestQueueLimit,
runningLimit,
responseWriteTimeout,
requestReadTimeout,
httpServerCodec,
{}
)

init {
if (connector is EngineSSLConnectorConfig) {

Expand Down Expand Up @@ -144,7 +117,8 @@ public class NettyChannelInitializer(
environment,
callEventGroup,
engineContext,
userContext
userContext,
runningLimit
)

with(pipeline) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ internal class NettyHttpResponsePipeline constructor(
null
}

httpHandler.activeRequests.decrementAndGet()
httpHandler.onLastResponseMessage(context)
call.finishedEvent.setSuccess()

lastMessageFuture?.addListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ internal class RequestBodyHandler(
current.close()
current = null
}
requestMoreEvents()
}
is ByteBuf -> {
val channel =
current ?: throw IllegalStateException("No current channel but received a byte buf")
processContent(channel, event)
requestMoreEvents()
}
is ByteWriteChannel -> {
current?.close()
Expand All @@ -59,7 +61,6 @@ internal class RequestBodyHandler(
upgraded = true
}
}
requestMoreEvents()
}
} catch (t: Throwable) {
queue.close(t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ internal class NettyHttp1Handler(
private val environment: ApplicationEngineEnvironment,
private val callEventGroup: EventExecutorGroup,
private val engineContext: CoroutineContext,
private val userContext: CoroutineContext
private val userContext: CoroutineContext,
private val runningLimit: Int
) : ChannelInboundHandlerAdapter(), CoroutineScope {
private val handlerJob = CompletableDeferred<Nothing>()

override val coroutineContext: CoroutineContext get() = handlerJob

private var skipEmpty = false
private val skippedRead: AtomicBoolean = atomic(false)

private lateinit var responseWriter: NettyHttpResponsePipeline

private var currentRequest: ByteReadChannel? = null

/**
* Represents current number of processing requests
*/
Expand Down Expand Up @@ -82,12 +82,12 @@ internal class NettyHttp1Handler(
activeRequests.incrementAndGet()

handleRequest(context, message)
context.read()
callReadIfNeeded(context)
}
message is LastHttpContent && !message.content().isReadable && skipEmpty -> {
skipEmpty = false
message.release()
context.read()
callReadIfNeeded(context)
}
else -> {
context.fireChannelRead(message)
Expand Down Expand Up @@ -140,8 +140,6 @@ internal class NettyHttp1Handler(
null
}
else -> prepareRequestContentChannel(context, message)
}?.also {
currentRequest = it
}

return NettyHttp1ApplicationCall(
Expand All @@ -166,4 +164,21 @@ internal class NettyHttp1Handler(
}
}
}

private fun callReadIfNeeded(context: ChannelHandlerContext) {
if (activeRequests.value < runningLimit) {
context.read()
skippedRead.value = false
} else {
skippedRead.value = true
}
}

internal fun onLastResponseMessage(context: ChannelHandlerContext) {
activeRequests.decrementAndGet()

if (skippedRead.compareAndSet(expect = false, update = true) && activeRequests.value < runningLimit) {
context.read()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,6 @@ public class HighLoadHttpGenerator(
}
}

// private fun findEol0(bb: ByteBuffer) {
// if (scan(bb) { it == N }) {
// parseState = ParseState.HTTP
// tokenSize = 0
// }
// }

private fun findEol(bb: ByteBuffer) {
val position = bb.position()
val limit = bb.limit()
Expand Down