Skip to content

Commit

Permalink
[finagle-http2]: Add a per session request limit option to http2
Browse files Browse the repository at this point in the history
See #954 for original description,
and included imagery.

Problem:
When running finagle in a setup using HTTP/2 via application load
balancers (ALB), the session is very likely subject to a limit on the number of
requests. E.g. when running via NGINX, the request limit is 1K requests per
connection by default. On AWS, ALB have a 10k request limit. This means that in
setups that use finagle and ALBs for their L7 features, the session is subject
to races happening on connection close regularly.
Netty fires inactive channel for any request that is in flight for the closed
session, leading Finagle to propagate a ChannelClosedException. There is no
possible remediation for this race in Netty since Finagle manages
sessions/connections.

This condition can be easily reproduced by running a Finagle server & client
with an ALB like NGINX in between, sending concurrent requests up until the
limit.

Solution:
Allow configuration of a MaxRequestsPerSession option that aligns a session with
underlying request limits. The configured MaxConcurrentStreams value is deducted to
account for any requests in flight. When this number of requests is reached,
mark the session as closed so it is shut down and a new session is created.

This option is off by default and therefore opt-in.

Result:
When the configured number of requests is reached, the session is terminated and
a new session is opened.

Closes #954

Differential Revision: https://phabricator.twitter.biz/D1107613
  • Loading branch information
mattdickinson5 authored and jenkins committed Nov 1, 2023
1 parent 0c3a8a3 commit 610a21c
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 30 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Expand Up @@ -13,6 +13,8 @@ New Features
* finagle-stats: A new implementation of BucketedHistogram that works better under high contention, but requires
more memory. ``PHAB_ID=D1028784``

* finagle-http2: Add a per session request limit option to http2. ``PHAB_ID=D1107613``


Runtime Behavior Changes
~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
Expand Up @@ -97,6 +97,20 @@ object MaxFrameSize {
implicit val param = Stack.Param(MaxFrameSize(None))
}

/**
 * A class for configuring the maximum number of requests allowed per HTTP/2
 * session (connection). When set, the session is marked closed once the limit
 * is reached so the pooling layers replace it with a fresh session.
 * `MaxConcurrentStreams` is deducted from this number so that requests already
 * in flight when the limit is reached can still complete. Disabled (`None`)
 * by default, making this behavior opt-in.
 */
case class MaxRequestsPerSession(maxRequestsPerSession: Option[Long]) {
def mk(): (MaxRequestsPerSession, Stack.Param[MaxRequestsPerSession]) =
(this, MaxRequestsPerSession.param)
}

object MaxRequestsPerSession {
  // Default is None: no per-session request limit is enforced.
  implicit val param: Stack.Param[MaxRequestsPerSession] =
    Stack.Param(MaxRequestsPerSession(None))
}

/**
* A class for configuring overrides to the default maxHeaderListSize setting.
*/
Expand Down
@@ -1,23 +1,26 @@
package com.twitter.finagle.http2.transport.client

import com.twitter.finagle.http2.DeadConnectionException
import com.twitter.finagle.http2.param.MaxConcurrentStreams
import com.twitter.finagle.http2.param.MaxRequestsPerSession
import com.twitter.finagle.netty4.Netty4Transporter
import com.twitter.finagle.netty4.param.Allocator
import com.twitter.finagle.transport.Transport
import com.twitter.finagle.{Failure, FailureFlags, Stack, Status}
import com.twitter.util.{Future, Promise, Time}
import io.netty.channel.{
Channel,
ChannelFuture,
ChannelFutureListener,
ChannelInitializer,
ChannelOption
}
import io.netty.handler.codec.http2.{
Http2FrameCodec,
Http2StreamChannel,
Http2StreamChannelBootstrap
}
import com.twitter.finagle.Failure
import com.twitter.finagle.FailureFlags
import com.twitter.finagle.Stack
import com.twitter.finagle.Status
import com.twitter.util.Future
import com.twitter.util.Promise
import com.twitter.util.Time
import io.netty.channel.Channel
import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.handler.codec.http2.Http2FrameCodec
import io.netty.handler.codec.http2.Http2StreamChannel
import io.netty.handler.codec.http2.Http2StreamChannelBootstrap
import io.netty.util
import io.netty.util.concurrent.GenericFutureListener
import java.lang.{Boolean => JBool}
Expand All @@ -30,7 +33,8 @@ private final class ClientSessionImpl(
failureDetectorStatus: () => Status)
extends ClientSession {

import ClientSessionImpl.StreamHighWaterMark
import ClientSessionImpl.DefaultStreamHighWaterMark
import ClientSessionImpl.DefaultMaxConcurrentStreams

// For the client we want to consider the status of the session.
private[this] final class ChildTransport(ch: Channel) extends StreamChannelTransport(ch) {
Expand All @@ -50,6 +54,15 @@ private final class ClientSessionImpl(
codec
}

// The stream id past which this session reports itself Closed so the pooling
// layers shut it down and open a new session. Client-initiated stream ids are
// odd-numbered, so a request budget of N corresponds to stream ids up to 2 * N;
// 2 * maxConcurrentStreams is subtracted to leave headroom for requests that
// may already be in flight when the watermark is crossed.
private[this] val streamHighWaterMark: Long = {
  val maxConcurrentStreams: Long =
    params[MaxConcurrentStreams].maxConcurrentStreams.getOrElse(DefaultMaxConcurrentStreams)
  // Renamed from the enclosing val to avoid shadowing the field being defined.
  val configuredWaterMark: Long = params[MaxRequestsPerSession].maxRequestsPerSession
    .map(_ * 2L)
    .getOrElse(DefaultStreamHighWaterMark)

  configuredWaterMark - 2L * maxConcurrentStreams
}

private[this] val bootstrap: Http2StreamChannelBootstrap = {
// Note that we need to invert the boolean since auto read means no backpressure.
val streamAutoRead = !params[Netty4Transporter.Backpressure].backpressure
Expand Down Expand Up @@ -96,9 +109,9 @@ private final class ClientSessionImpl(
// However, since `status` is racy anyway we tolerate it as fixing it would be much
// more complex.
if (!channel.isOpen) Status.Closed
// We're nearly out of stream ID's so signal closed so that the pooling layers will
// shut us down and start up a new session.
else if (codec.connection.local.lastStreamCreated > StreamHighWaterMark) Status.Closed
// We've reached the stream high watermark for this session. Signal closed so that
// the pooling layers will shut us down and start up a new session.
else if (codec.connection.local.lastStreamCreated > streamHighWaterMark) Status.Closed
// If we've received a GOAWAY frame we shouldn't attempt to open any new streams.
else if (codec.connection.goAwayReceived) Status.Closed
// If we can't open a stream that means that the maximum number of outstanding
Expand Down Expand Up @@ -179,9 +192,11 @@ private final class ClientSessionImpl(

private object ClientSessionImpl {

  // The default high water mark is the maximum possible 31-bit unsigned stream id.
  // We want to signal Closed before the id space is truly exhausted to avoid races,
  // so DefaultMaxConcurrentStreams (arbitrarily 50) streams of headroom are
  // subtracted when the effective watermark is computed. Client-initiated stream
  // ids are odd-numbered, which is why a factor of 2 is applied to both values at
  // the use site.
  private val DefaultStreamHighWaterMark: Int = Int.MaxValue
  private val DefaultMaxConcurrentStreams: Int = 50
}
Expand Up @@ -2,17 +2,22 @@ package com.twitter.finagle.http2.transport.client

import com.twitter.conversions.DurationOps._
import com.twitter.finagle.http.Request
import com.twitter.finagle.http2.param.MaxConcurrentStreams
import com.twitter.finagle.http2.param.MaxRequestsPerSession
import com.twitter.finagle.netty4.http.Bijections
import com.twitter.finagle.{FailureFlags, Stack, Status}
import com.twitter.util.{Await, Awaitable}
import com.twitter.finagle.FailureFlags
import com.twitter.finagle.Stack
import com.twitter.finagle.Status
import com.twitter.util.Await
import com.twitter.util.Awaitable
import io.netty.buffer.Unpooled
import io.netty.channel.embedded.EmbeddedChannel
import io.netty.channel.{Channel, ChannelHandler, ChannelInitializer}
import io.netty.handler.codec.http2.{
Http2MultiplexCodec,
Http2MultiplexCodecBuilder,
Http2StreamFrameToHttpObjectCodec
}
import io.netty.channel.Channel
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelInitializer
import io.netty.handler.codec.http2.Http2MultiplexCodec
import io.netty.handler.codec.http2.Http2MultiplexCodecBuilder
import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec
import org.scalatest.funsuite.AnyFunSuite

class ClientSessionImplTest extends AnyFunSuite {
Expand Down Expand Up @@ -111,9 +116,14 @@ class ClientSessionImplTest extends AnyFunSuite {
}
}

test("Status is Closed when we're less than 50 streams away from exhausting the identifiers") {
test(
"By default status is Closed when we're less than 50 streams away from exhausting the identifiers") {
new Ctx {
assert(clientSession.status == Status.Open)

multiplexCodec.connection.local.createStream(Int.MaxValue - 100, false)
assert(clientSession.status == Status.Open)

// client streams are odd streams so to be less than 50 we need to multiply by 2.
multiplexCodec.connection.local.createStream(Int.MaxValue - 100 + 2, false)
assert(clientSession.status == Status.Closed)
Expand All @@ -131,6 +141,23 @@ class ClientSessionImplTest extends AnyFunSuite {
}
}

test("Status is closed when we have exhausted the max requests per session limit") {
  new Ctx {
    override def params: Stack.Params =
      Stack.Params.empty + MaxConcurrentStreams(Some(128L)) + MaxRequestsPerSession(Some(50000L))

    assert(clientSession.status == Status.Open)

    // requests * 2 (client stream ids are odd-numbered) minus 2 * maxConcurrentStreams headroom.
    private val highWatermark: Int = 50000 * 2 - 128 * 2

    // One id below the watermark: session still usable.
    multiplexCodec.connection.local.createStream(highWatermark - 1, false)
    assert(clientSession.status == Status.Open)

    // One id past the watermark: session must report Closed.
    multiplexCodec.connection.local.createStream(highWatermark + 1, false)
    assert(clientSession.status == Status.Closed)
  }
}

test("dispatching results in a rejection if we have exhausted the max concurrent stream limit") {
new Ctx {
assert(clientSession.status == Status.Open)
Expand Down

0 comments on commit 610a21c

Please sign in to comment.