Skip to content

Commit

Permalink
finagle-http2: Support configuring encoderEnforceMaxConcurrentStreams…
Browse files Browse the repository at this point in the history
… for http2 connections

Problem:

By default, netty will reject attempts to open more than maxConcurrentStreams
with an exception. Users maybe instead want to queue new streams rather than
rejecting them.

Solution:

Add a param for enabling `encoderEnforceMaxConcurrentStreams` on the netty
Http2FrameCodecBuilder.  Enabling it will configure netty to queue streams
once more than `maxConcurrentStreams` are open.

Differential Revision: https://phabricator.twitter.biz/D643138
  • Loading branch information
Steve Niemitz authored and jenkins committed May 3, 2021
1 parent 0906fc6 commit c6d5f52
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 14 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.rst
Expand Up @@ -7,6 +7,14 @@ Note that ``PHAB_ID=#`` and ``RB_ID=#`` correspond to associated messages in com
Unreleased
----------

New Features
~~~~~~~~~~~~

* finagle-http2: Added `c.t.f.http2.param.EnforceMaxConcurrentStreams` which allows users to
configure http2 clients to buffer streams once a connection has hit the max concurrent stream
limit rather than rejecting them. A `buffered_streams` gauge has been added to track the
current number of buffered streams. ``PHAB_ID=D643138``

Breaking API Changes
~~~~~~~~~~~~~~~~~~~~

Expand All @@ -20,6 +28,9 @@ Bug Fixes
be properly seen as a `c.t.f.ChannelClosedException` instead of a
`c.t.f.UnknownChannelException`. ``PHAB_ID=D661550``

* finagle-http2: The `streams` gauge is now correctly added for http2 connections over TLS.
``PHAB_ID=D643138``

21.4.0
------

Expand Down
3 changes: 3 additions & 0 deletions doc/src/sphinx/metrics/Http2.rst
Expand Up @@ -19,6 +19,9 @@
**<client_label>/streams**
A gauge exposing the number of opened streams on the client.

**<client_label>/buffered_streams**
A gauge exposing the number of buffered streams on the client.

**<client_label>/dead_session**
A counter of the number of closed sessions evicted.

Expand Down
@@ -1,27 +1,17 @@
package com.twitter.finagle.http2

import com.twitter.finagle.http2.param._
import com.twitter.finagle.Stack
import com.twitter.finagle.http2.param.{
EncoderIgnoreMaxHeaderListSize,
FrameLoggerNamePrefix,
FrameLogging,
HeaderSensitivity
}
import com.twitter.finagle.param.Stats
import com.twitter.finagle.stats.StatsReceiver
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.{
Channel,
ChannelFuture,
ChannelFutureListener,
ChannelHandler,
ChannelInitializer
}
import io.netty.channel._
import io.netty.handler.codec.http2.{
Http2FrameCodec,
Http2FrameCodecBuilder,
Http2HeadersEncoder,
Http2MultiplexHandler
Http2MultiplexHandler,
StreamBufferingEncoder
}

/**
Expand Down Expand Up @@ -57,6 +47,29 @@ private object MultiplexHandlerBuilder {
channel.closeFuture.addListener(new ChannelFutureListener {
def operationComplete(f: ChannelFuture): Unit = streams.remove()
})

addBufferedStreamsGaugeIfNeeded(statsReceiver, frameCodec, channel)
}

private def addBufferedStreamsGaugeIfNeeded(
statsReceiver: StatsReceiver,
frameCodec: Http2FrameCodec,
channel: Channel
): Unit = {
frameCodec.encoder() match {
case sb: StreamBufferingEncoder =>
// scalafix:off StoreGaugesAsMemberVariables
val bufferedStreams = statsReceiver.addGauge("buffered_streams") {
sb.numBufferedStreams()
}
// scalafix:on StoreGaugesAsMemberVariables

channel
.closeFuture().addListener(new ChannelFutureListener {
override def operationComplete(future: ChannelFuture): Unit = bufferedStreams.remove()
})
case _ => // noop
}
}

/** Construct a `Http2MultiplexHandler` for server pipelines */
Expand Down Expand Up @@ -103,6 +116,10 @@ private object MultiplexHandlerBuilder {
)
.headerSensitivityDetector(detector(params))

if (params[EnforceMaxConcurrentStreams].enabled) {
builder.encoderEnforceMaxConcurrentStreams(true)
}

if (params[FrameLogging].enabled) {
builder.frameLogger(
new LoggerPerFrameTypeLogger(params[FrameLoggerNamePrefix].loggerNamePrefix))
Expand Down
Expand Up @@ -71,6 +71,20 @@ object InitialWindowSize {
implicit val param = Stack.Param(InitialWindowSize(None))
}

/**
* A class for configuring overrides to the default `encoderEnforceMaxConcurrentStreams` setting. If
* enabled the encoder will queue frames if the maximum number of concurrent streams would otherwise
* be exceeded.
*/
final case class EnforceMaxConcurrentStreams(enabled: Boolean) {
def mk(): (EnforceMaxConcurrentStreams, Stack.Param[EnforceMaxConcurrentStreams]) =
(this, EnforceMaxConcurrentStreams.param)
}

object EnforceMaxConcurrentStreams {
implicit val param = Stack.Param(EnforceMaxConcurrentStreams(enabled = false))
}

/**
* A class for configuring overrides to the default maxFrameSize setting.
*/
Expand Down
Expand Up @@ -120,6 +120,7 @@ object TlsTransporter {
val (codec, handler) = MultiplexHandlerBuilder.clientFrameCodec(params, None)
val streamChannelInit = H2StreamChannelInit.initClient(params)
val pingDetectionHandler = new H2ClientFilter(params)
MultiplexHandlerBuilder.addStreamsGauge(params[Stats].statsReceiver, codec, channel)

channel.pipeline
.addLast(Http2CodecName, codec)
Expand Down

0 comments on commit c6d5f52

Please sign in to comment.