Skip to content

Commit

Permalink
KTOR-4475 Limit current running requests (#3065)
Browse files Browse the repository at this point in the history
* KTOR-4475 Limit current running requests
  • Loading branch information
rsinukov committed Jun 22, 2022
1 parent 30e05ae commit 8214a32
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class NettyApplicationEngine(
/**
* Number of concurrently running requests from the same http pipeline
*/
public var runningLimit: Int = 10
public var runningLimit: Int = 32

/**
* Do not create separate call event group and reuse worker group for processing calls
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,33 +42,6 @@ public class NettyChannelInitializer(
) : ChannelInitializer<SocketChannel>() {
private var sslContext: SslContext? = null

internal constructor(
enginePipeline: EnginePipeline,
environment: ApplicationEngineEnvironment,
callEventGroup: EventExecutorGroup,
engineContext: CoroutineContext,
userContext: CoroutineContext,
connector: EngineConnectorConfig,
requestQueueLimit: Int,
runningLimit: Int,
responseWriteTimeout: Int,
requestReadTimeout: Int,
httpServerCodec: () -> HttpServerCodec
) : this(
enginePipeline,
environment,
callEventGroup,
engineContext,
userContext,
connector,
requestQueueLimit,
runningLimit,
responseWriteTimeout,
requestReadTimeout,
httpServerCodec,
{}
)

init {
if (connector is EngineSSLConnectorConfig) {

Expand Down Expand Up @@ -144,7 +117,8 @@ public class NettyChannelInitializer(
environment,
callEventGroup,
engineContext,
userContext
userContext,
runningLimit
)

with(pipeline) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ internal class NettyHttpResponsePipeline constructor(
null
}

httpHandler.activeRequests.decrementAndGet()
httpHandler.onLastResponseMessage(context)
call.finishedEvent.setSuccess()

lastMessageFuture?.addListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ internal class RequestBodyHandler(
current.close()
current = null
}
requestMoreEvents()
}
is ByteBuf -> {
val channel =
current ?: throw IllegalStateException("No current channel but received a byte buf")
processContent(channel, event)
requestMoreEvents()
}
is ByteWriteChannel -> {
current?.close()
Expand All @@ -59,7 +61,6 @@ internal class RequestBodyHandler(
upgraded = true
}
}
requestMoreEvents()
}
} catch (t: Throwable) {
queue.close(t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ internal class NettyHttp1Handler(
private val environment: ApplicationEngineEnvironment,
private val callEventGroup: EventExecutorGroup,
private val engineContext: CoroutineContext,
private val userContext: CoroutineContext
private val userContext: CoroutineContext,
private val runningLimit: Int
) : ChannelInboundHandlerAdapter(), CoroutineScope {
private val handlerJob = CompletableDeferred<Nothing>()

override val coroutineContext: CoroutineContext get() = handlerJob

private var skipEmpty = false
private val skippedRead: AtomicBoolean = atomic(false)

private lateinit var responseWriter: NettyHttpResponsePipeline

private var currentRequest: ByteReadChannel? = null

/**
* Represents current number of processing requests
*/
Expand Down Expand Up @@ -82,12 +82,12 @@ internal class NettyHttp1Handler(
activeRequests.incrementAndGet()

handleRequest(context, message)
context.read()
callReadIfNeeded(context)
}
message is LastHttpContent && !message.content().isReadable && skipEmpty -> {
skipEmpty = false
message.release()
context.read()
callReadIfNeeded(context)
}
else -> {
context.fireChannelRead(message)
Expand Down Expand Up @@ -140,8 +140,6 @@ internal class NettyHttp1Handler(
null
}
else -> prepareRequestContentChannel(context, message)
}?.also {
currentRequest = it
}

return NettyHttp1ApplicationCall(
Expand All @@ -166,4 +164,21 @@ internal class NettyHttp1Handler(
}
}
}

private fun callReadIfNeeded(context: ChannelHandlerContext) {
if (activeRequests.value < runningLimit) {
context.read()
skippedRead.value = false
} else {
skippedRead.value = true
}
}

internal fun onLastResponseMessage(context: ChannelHandlerContext) {
activeRequests.decrementAndGet()

if (skippedRead.compareAndSet(expect = false, update = true) && activeRequests.value < runningLimit) {
context.read()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,6 @@ public class HighLoadHttpGenerator(
}
}

// private fun findEol0(bb: ByteBuffer) {
// if (scan(bb) { it == N }) {
// parseState = ParseState.HTTP
// tokenSize = 0
// }
// }

private fun findEol(bb: ByteBuffer) {
val position = bb.position()
val limit = bb.limit()
Expand Down

0 comments on commit 8214a32

Please sign in to comment.