Skip to content

Commit

Permalink
finagle: Add statsFilter for streaming
Browse files Browse the repository at this point in the history
Problem

The Finagle Http StatsFilter is only enabled with `withHttpStats`, however, we
need all stream related stats to be populated as long as `isChunked` is set to
true in request or response.

Solution

Have a StatsFilter dedicated for stream metrics.

Result

Created a StreamingStatsFilter, moved stream duration metrics from
`http/StatsFilter` to it, added counters for closed streams, total streams, and
stream failures.

Note: We are collecting stream failures metrics in `HttpClientDispatcher` and
`HttpServerDispatcher` as `stream/failures/<exception_name>` and
`stream/failures`. However, we don't distinguish a request stream failure from a
response stream failure. Also, maintaining stats in a dispatcher is expensive as
the stats are initialized for every connection. The plan is to deprecate these
counters after this change is approved.

JIRA Issues: CSL-8041

Differential Revision: https://phabricator.twitter.biz/D315041
  • Loading branch information
jyanJing authored and jenkins committed Jun 1, 2019
1 parent f175cea commit d9b69bd
Show file tree
Hide file tree
Showing 7 changed files with 409 additions and 60 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,26 @@ Unreleased
New Features
~~~~~~~~~~~~

* finagle-http: Added counters for request/response stream as: `stream/request/closed`,
`stream/request/failures`, `stream/request/failures/<exception_name>`, `stream/request/opened`,
`stream/request/pending` and `stream/response/closed`, `stream/response/failures`,
`stream/response/failures/<exception_name>`, `stream/response/opened`, `stream/response/pending`.
The counters will be populated when `isChunked` is set to true, the failures counters will be
populated when `isChunked` is set to true and the stream fails before it has been fully read in the
request and response respectively. ``PHAB_ID=D315041``

* finagle-http: Add two new API variants in `CookieMap`: `addAll` and `removeAll` that allow for
adding and removing cookies in bulk, without triggering a header rewrite on each item.
``PHAB_ID=D318013``

Runtime Behavior Changes
~~~~~~~~~~~~~~~~~~~~~~~~

* finagle-http: Rename `request_stream_duration_ms` to `stream/request/duration_ms` and
`response_stream_duration_ms` to `stream/response/duration_ms`. The stats will be
populated when `isChunked` is set to true in the request and response respectively.
``PHAB_ID=D315041``

* finagle: Upgrade to Netty 4.1.35.Final and netty-tcnative 2.0.25.Final.
``PHAB_ID=D312439``

Expand Down
79 changes: 69 additions & 10 deletions doc/src/sphinx/Metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,71 @@ These metrics correspond to :ref:`feature toggles <toggles>`.
A gauge summarizing the current state of a `ToggleMap` which may be useful
for comparing state across a cluster or over time.

Streaming
---------

These metrics are added by
:finagle-http-src:`StreamingStatsFilter <com/twitter/finagle/http/filter/StreamingStatsFilter.scala>`
and can be enabled by setting `isChunked` as true on Http request and response.

**stream/request/closed**
A counter of the number of closed request streams.

**stream/request/duration_ms**
A histogram of the duration of the lifetime of request streams, from the time a stream is
initialized until it's closed, in milliseconds.

**stream/request/failures**
A counter of the number of times any failure has been observed in the middle of a request stream.

**stream/request/failures/<exception_name>**
A counter of the number of times a specific exception has been thrown in the middle of a request
stream.

**stream/request/opened**
A counter of the number of opened request streams.

**stream/request/pending**
A gauge of the number of pending request streams.

**stream/response/closed**
A counter of the number of closed response streams.

**stream/response/duration_ms**
A histogram of the duration of the lifetime of response streams, from the time a stream is
initialized until it's closed, in milliseconds.

**stream/response/failures**
A counter of the number of times any failure has been observed in the middle of a response stream.

**stream/response/failures/<exception_name>**
A counter of the number of times a specific exception has been thrown in the middle of a response
stream.

**stream/response/opened**
A counter of the number of opened response streams.

**stream/response/pending**
A gauge of the number of pending response streams.

You could derive the streaming success rate of:
- the total number of streams
number of successful streams divided by number of total streams
- closed streams
number of successful streams divided by number of closed streams
Here we assume a success stream as a stream terminated without an exception or a stream that has not
terminated yet.

Take request stream as an example, assuming your counters are not "latched", which means that their
values are monotonically increasing:

# Success rate of total number of streams:
1 - (rated_counter(stream/request/failures)
/ (gauge(stream/request/pending) + rated_counter(stream/request/closed)))

# Success rate of number of closed streams:
1 - (rated_counter(stream/request/failures) / rated_counter(stream/request/closed))

HTTP
----
.. _http_stats:
Expand All @@ -282,10 +347,12 @@ These stats pertain to the HTTP protocol.
A counter of the number of non-retryable HTTP 503 responses the HTTP server returns. Those
responses are not automatically retried.

**stream/failures/<exception_name>**
**Deprecated: stream/failures/<exception_name>**
The replacement is `stream/request/failures/<exception_name>` and `stream/response/failures/<exception_name>`.
A counter of the number of times a specific exception has been thrown in the middle of a stream.

**stream/failures**
**Deprecated: stream/failures**
The replacement is `stream/request/failures` and `stream/response/failures`.
A counter of the number of times any failure has been observed in the middle of a stream.

**http/cookie/samesite_failures** `verbosity:debug`
Expand All @@ -303,14 +370,6 @@ These metrics are added by
:finagle-http-src:`StatsFilter <com/twitter/finagle/http/filter/StatsFilter.scala>` and can be enabled by
using `.withHttpStats` on `Http.Client` and `Http.Server`.

**request_stream_duration_ms**
A histogram of the duration of the lifetime of request streams, from the time a stream is initialized
until it's closed, in milliseconds.

**response_stream_duration_ms**
A histogram of the duration of the lifetime of response streams, from the time a stream is initialized
until it's closed, in milliseconds.

**status/<statusCode>**
A counter of the number of responses received, or returned for servers, that had this
statusCode.
Expand Down
2 changes: 2 additions & 0 deletions finagle-http/src/main/scala/com/twitter/finagle/Http.scala
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re
.prepend(
new Stack.NoOpModule(http.filter.StatsFilter.role, http.filter.StatsFilter.description)
)
.insertAfter(http.filter.StatsFilter.role, StreamingStatsFilter.module)

private def params: Stack.Params =
StackClient.defaultParams +
Expand Down Expand Up @@ -462,6 +463,7 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re
.prepend(
new Stack.NoOpModule(http.filter.StatsFilter.role, http.filter.StatsFilter.description)
)
.insertAfter(http.filter.StatsFilter.role, StreamingStatsFilter.module)
// the backup request module adds tracing annotations and as such must come
// after trace initialization and deserialization of contexts.
.insertAfter(TraceInitializerFilter.role, ServerContextFilter.module)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ object StatsFilter {

def module: Stackable[ServiceFactory[Request, Response]] =
new Stack.Module1[param.Stats, ServiceFactory[Request, Response]] {
val role = StatsFilter.role
val description = StatsFilter.description
val role: Stack.Role = StatsFilter.role
val description: String = StatsFilter.description

def make(
statsParam: param.Stats,
Expand Down Expand Up @@ -44,8 +44,6 @@ class StatsFilter[REQUEST <: Request](stats: StatsReceiver)
private[this] val statusReceiver = stats.scope("status")
private[this] val timeReceiver = stats.scope("time")
private[this] val responseSizeStat = stats.stat("response_size")
private[this] val requestStreamDurationMs = stats.stat("request_stream_duration_ms")
private[this] val responseStreamDurationMs = stats.stat("response_stream_duration_ms")

private[this] val counterCache: String => Counter =
Memoize(statusReceiver.counter(_))
Expand All @@ -56,15 +54,9 @@ class StatsFilter[REQUEST <: Request](stats: StatsReceiver)
def apply(request: REQUEST, service: Service[REQUEST, Response]): Future[Response] = {
val elapsed = Stopwatch.start()

if (request.isChunked) {
countRequestStreamDuration(request)
}
val future = service(request)
future respond {
case Return(response) =>
if (response.isChunked) {
countResponseStreamDuration(response)
}
count(elapsed(), response)
case Throw(_) =>
// Treat exceptions as empty 500 errors
Expand All @@ -86,16 +78,4 @@ class StatsFilter[REQUEST <: Request](stats: StatsReceiver)

responseSizeStat.add(response.length)
}

private def countRequestStreamDuration(request: REQUEST): Unit = {
val streamingRequestElapsed = Stopwatch.start()
request.reader.onClose.respond(_ =>
requestStreamDurationMs.add(streamingRequestElapsed().inMilliseconds))
}

private def countResponseStreamDuration(response: Response): Unit = {
val streamingResponseElapsed = Stopwatch.start()
response.reader.onClose.respond(_ =>
responseStreamDurationMs.add(streamingResponseElapsed().inMilliseconds))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package com.twitter.finagle.http.filter

import com.twitter.finagle._
import com.twitter.finagle.http.{Request, Response}
import com.twitter.finagle.stats.{Counter, ExceptionStatsHandler, Stat, StatsReceiver}
import com.twitter.io.{Buf, Reader}
import com.twitter.util._
import java.util.concurrent.atomic.LongAdder

object StreamingStatsFilter {
def module: Stackable[ServiceFactory[Request, Response]] =
new Stack.Module2[param.Stats, param.ExceptionStatsHandler, ServiceFactory[Request, Response]] {
val role: Stack.Role = Stack.Role("HttpStreamingStatsFilter")
val description: String = "HTTP Streaming Stats"

def make(
statsParam: param.Stats,
excStatParam: param.ExceptionStatsHandler,
next: ServiceFactory[Request, Response]
): ServiceFactory[Request, Response] = {
if (statsParam.statsReceiver.isNull) next
else
new StreamingStatsFilter(statsParam.statsReceiver.scope("http"), excStatParam.categorizer)
.andThen(next)
}
}
}

/**
* A filter to export statistics for HTTP streaming requests and responses.
* A streaming request/response is an HTTP request/response with `isChunked` set to true.
*
* This filter is included in HTTP stack by default, all metrics defined in this filter are only
* populated for HTTP streaming requests/responses.
*
* Stats:
* ---------------Request Stream------------
* - stream/request/duration_ms:
* A histogram of the duration of the lifetime of request streams, from the time a stream is
* initialized until it's closed, in milliseconds.
* ---------------Response Stream-----------
* - stream/response/duration_ms:
* A histogram of the duration of the lifetime of response streams, from the time a stream is
* initialized until it's closed, in milliseconds.
* Counters:
* ---------------Request Stream------------
* - stream/request/closed:
* A counter of the number of closed request streams.
* - stream/request/failures
* A counter of the number of times any failure has been observed in the middle of a request
* stream.
* - stream/request/failures/<exception_name>:
* A counter of the number of times a specific exception has been thrown in the middle of a
* request stream.
* - stream/request/opened:
* A counter of the number of opened request streams.
* ---------------Response Stream-----------
* - stream/response/closed:
* A counter of the number of closed response streams.
* - stream/response/failures
* A counter of the number of times any failure has been observed in the middle of a response
* stream.
* - stream/response/failures/<exception_name>:
* A counter of the number of times a specific exception has been thrown in the middle of a
* response stream.
* - stream/response/opened:
* A counter of the number of opened response streams.
* Gauges:
* ---------------Request Stream------------
* - stream/request/pending:
* A gauge of the number of pending request streams.
* ---------------Response Stream------------
* - stream/response/pending:
* A gauge of the number of pending response streams.
*
* You could derive the streaming success rate of:
* - the total number of streams
* number of successful streams divided by number of total streams
* - closed streams
* number of successful streams divided by number of closed streams
* Here we assume a success stream as a stream terminated without an exception or a stream that has
* not terminated yet.
*
* Take request stream as an example, assuming your counters are not "latched", which means that
* their values are monotonically increasing:
* - Success rate of total number of streams:
* 1 - (rated_counter(stream/request/failures)
* / (gauge(stream/request/pending) + rated_counter(stream/request/closed)))
* - Success rate of number of closed streams:
* 1 - (rated_counter(stream/request/failures) / rated_counter(stream/request/closed))
*/
class StreamingStatsFilter(
stats: StatsReceiver,
exceptionStatsHandler: ExceptionStatsHandler,
nowMillis: () => Long = Stopwatch.systemMillis)
extends SimpleFilter[Request, Response] {

private[this] val pendingRequestStreamsCount: LongAdder = new LongAdder()
private[this] val pendingResponseStreamsCount: LongAdder = new LongAdder()

// Request stream stats
private[this] val requestStreamStat = stats.scope("stream", "request")
private[this] val requestStreamDurationMs = requestStreamStat.stat("duration_ms")
private[this] val openedRequestStream = requestStreamStat.counter("opened")
private[this] val closedRequestStream = requestStreamStat.counter("closed")
private[this] val pendingRequestStream = requestStreamStat.addGauge("pending") {
pendingRequestStreamsCount.sum()
}

// Response stream stats
private[this] val responseStreamStat = stats.scope("stream", "response")
private[this] val responseStreamDurationMs = responseStreamStat.stat("duration_ms")
private[this] val openedResponseStream = responseStreamStat.counter("opened")
private[this] val closedResponseStream = responseStreamStat.counter("closed")
private[this] val pendingResponseStream = responseStreamStat.addGauge("pending") {
pendingResponseStreamsCount.sum()
}

def apply(request: Request, service: Service[Request, Response]): Future[Response] = {
if (request.isChunked) {
// Update stream metrics for request stream
openedRequestStream.incr()
pendingRequestStreamsCount.increment()
updateClosedStream(
request.reader,
requestStreamStat,
closedRequestStream,
pendingRequestStreamsCount,
requestStreamDurationMs)
}
service(request).respond {
case Return(response) =>
if (response.isChunked) {
// Update stream metrics for response stream
openedResponseStream.incr()
pendingResponseStreamsCount.increment()
updateClosedStream(
response.reader,
responseStreamStat,
closedResponseStream,
pendingResponseStreamsCount,
responseStreamDurationMs)
}
case _ =>
}
}

private def updateClosedStream(
reader: Reader[Buf],
statsReceiver: StatsReceiver,
closedStreamCounter: Counter,
pendingStreamCount: LongAdder,
streamDurationStat: Stat
): Unit = {
val streamingStart = nowMillis()
reader.onClose.respond { closeP =>
closedStreamCounter.incr()
pendingStreamCount.decrement()
val streamingEnd = nowMillis()
streamDurationStat.add(streamingEnd - streamingStart)
closeP match {
case Throw(exception) =>
exceptionStatsHandler.record(statsReceiver, exception)
case _ =>
}
}
}

}
Loading

0 comments on commit d9b69bd

Please sign in to comment.