finagle-http: PayloadSizeFilter now measures streaming messages
Problem

PayloadSizeFilter used to skip streaming messages (isChunked), so their payload sizes were never measured.

Solution

Fork the filter into finagle-http so it can measure chunked messages: wrap the chunk reader with an
onRead callback that records each chunk's payload size, and return a request (or response) proxy
whose chunkReader is the wrapped reader.
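
A minimal sketch of the approach, shown for the request side only (the class name is illustrative;
the complete filter is in PayloadSizeFilter.scala below):

import com.twitter.finagle.http.{Chunk, Request, RequestProxy}
import com.twitter.finagle.stats.{Stat, StatsReceiver, Verbosity}
import com.twitter.io.Reader

// Sketch: wrap the chunk reader so every chunk read is measured, then return a
// proxy whose chunkReader is the wrapped reader.
class ChunkMeasuringSketch(stats: StatsReceiver) {
  private[this] val chunkBytes: Stat =
    stats.scope("stream").scope("request").stat(Verbosity.Debug, "chunk_payload_bytes")

  def wrap(req: Request): Request =
    if (!req.isChunked) req
    else
      new RequestProxy {
        override def request: Request = req
        override def chunkReader: Reader[Chunk] =
          super.chunkReader.map { chunk =>
            chunkBytes.add(chunk.content.length.toFloat) // record this chunk's payload size
            chunk
          }
      }
}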

JIRA Issues: CSL-8134

Differential Revision: https://phabricator.twitter.biz/D337877
yufangong authored and jenkins committed Jul 11, 2019
1 parent bf55a70 commit 11f4e32
Showing 8 changed files with 409 additions and 104 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.rst
@@ -10,6 +10,11 @@ Unreleased
New Features
~~~~~~~~~~~~

* finagle-http: Measure streaming (message.isChunked) chunk payload size with two new histograms,
`stream/request/chunk_payload_bytes` and `stream/response/chunk_payload_bytes`, which are
published at a debug verbosity level. The chunk payload sizes are also traced using the same
trace keys. ``PHAB_ID=D337877``

* finagle-base-http: Add support for new "b3" tracing header. ``PHAB_ID=D334419``

* finagle-core: Allow to not bypass SOCKS proxy for localhost by using the GlobalFlag
62 changes: 2 additions & 60 deletions doc/src/sphinx/Metrics.rst
@@ -271,67 +271,9 @@ These metrics correspond to :ref:`feature toggles <toggles>`.
Streaming
---------

These metrics are added by
:finagle-http-src:`StreamingStatsFilter <com/twitter/finagle/http/filter/StreamingStatsFilter.scala>`
and can be enabled by setting `isChunked` as true on Http request and response.

**stream/request/closed**
A counter of the number of closed request streams.

**stream/request/duration_ms**
A histogram of the duration of the lifetime of request streams, from the time a stream is
initialized until it's closed, in milliseconds.

**stream/request/failures**
A counter of the number of times any failure has been observed in the middle of a request stream.

**stream/request/failures/<exception_name>**
A counter of the number of times a specific exception has been thrown in the middle of a request
stream.

**stream/request/opened**
A counter of the number of opened request streams.

**stream/request/pending**
A gauge of the number of pending request streams.

**stream/response/closed**
A counter of the number of closed response streams.

**stream/response/duration_ms**
A histogram of the duration of the lifetime of response streams, from the time a stream is
initialized until it's closed, in milliseconds.

**stream/response/failures**
A counter of the number of times any failure has been observed in the middle of a response stream.

**stream/response/failures/<exception_name>**
A counter of the number of times a specific exception has been thrown in the middle of a response
stream.

**stream/response/opened**
A counter of the number of opened response streams.

**stream/response/pending**
A gauge of the number of pending response streams.

You could derive the streaming success rate of:
- the total number of streams
number of successful streams divided by number of total streams
- closed streams
number of successful streams divided by number of closed streams
Here we assume a success stream as a stream terminated without an exception or a stream that has not
terminated yet.

Take request stream as an example, assuming your counters are not "latched", which means that their
values are monotonically increasing:

# Success rate of total number of streams:
1 - (rated_counter(stream/request/failures)
/ (gauge(stream/request/pending) + rated_counter(stream/request/closed)))
.. _streaming_metrics:

# Success rate of number of closed streams:
1 - (rated_counter(stream/request/failures) / rated_counter(stream/request/closed))
.. include:: metrics/Streaming.rst

HTTP
----
2 changes: 1 addition & 1 deletion doc/src/sphinx/metrics/Public.rst
@@ -89,7 +89,7 @@ RequestSemaphoreFilter
A gauge of the total number of requests which are waiting because of the limit
on simultaneous requests.

PayloadSizeFilter (enabled for Mux, HTTP (non-chunked), Thrift)
PayloadSizeFilter (enabled for Mux, HTTP, Thrift)
<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

**request_payload_bytes** `verbosity:debug`
69 changes: 69 additions & 0 deletions doc/src/sphinx/metrics/Streaming.rst
@@ -0,0 +1,69 @@
These metrics are added by
:finagle-http-src:`StreamingStatsFilter <com/twitter/finagle/http/filter/StreamingStatsFilter.scala>`
and are enabled by setting `isChunked` to true on HTTP requests and responses.
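
For example, a client might opt a request into streaming roughly like this (a sketch; the method,
URI, and payload are illustrative):

import com.twitter.finagle.http.{Method, Request}
import com.twitter.io.Buf

object StreamingRequestExample {
  // Sketch: mark a request as chunked so its body flows (and is measured) as a stream.
  def streamingRequest(): Request = {
    val req = Request(Method.Post, "/upload")
    req.setChunked(true)
    req.writer.write(Buf.Utf8("0123456789")) // each written chunk is measured as chunk_payload_bytes
    req.writer.close()
    req
  }
}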

**stream/request/closed**
A counter of the number of closed request streams.

**stream/request/duration_ms**
A histogram of the duration of the lifetime of request streams, from the time a stream is
initialized until it's closed, in milliseconds.

**stream/request/failures**
A counter of the number of times any failure has been observed in the middle of a request stream.

**stream/request/failures/<exception_name>**
A counter of the number of times a specific exception has been thrown in the middle of a request
stream.

**stream/request/opened**
A counter of the number of opened request streams.

**stream/request/pending**
A gauge of the number of pending request streams.

**stream/response/closed**
A counter of the number of closed response streams.

**stream/response/duration_ms**
A histogram of the duration of the lifetime of response streams, from the time a stream is
initialized until it's closed, in milliseconds.

**stream/response/failures**
A counter of the number of times any failure has been observed in the middle of a response stream.

**stream/response/failures/<exception_name>**
A counter of the number of times a specific exception has been thrown in the middle of a response
stream.

**stream/response/opened**
A counter of the number of opened response streams.

**stream/response/pending**
A gauge of the number of pending response streams.

**stream/request/chunk_payload_bytes** `verbosity:debug`
A histogram of the payload size of each chunk in request streams, in bytes. This is measured in
c.t.finagle.http.filter.PayloadSizeFilter.

**stream/response/chunk_payload_bytes** `verbosity:debug`
A histogram of the payload size of each chunk in response streams, in bytes. This is measured in
c.t.finagle.http.filter.PayloadSizeFilter.

You can derive the streaming success rate over either:
- the total number of streams:
  the number of successful streams divided by the total number of streams
- closed streams:
  the number of successful streams divided by the number of closed streams
Here a stream counts as successful if it terminated without an exception, or if it has not
terminated yet.

Taking the request stream as an example, and assuming your counters are not "latched" (that is,
their values are monotonically increasing):

# Success rate of total number of streams:
1 - (rated_counter(stream/request/failures)
/ (gauge(stream/request/pending) + rated_counter(stream/request/closed)))

# Success rate of number of closed streams:
1 - (rated_counter(stream/request/failures) / rated_counter(stream/request/closed))
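
A quick worked example of the two formulas, using hypothetical counter and gauge values:

object StreamSuccessRateExample {
  // Hypothetical values sampled over one rate window (illustrative only).
  val failures: Double = 2 // rated_counter(stream/request/failures)
  val pending: Double = 3  // gauge(stream/request/pending)
  val closed: Double = 97  // rated_counter(stream/request/closed)

  val successRateAllStreams: Double = 1 - failures / (pending + closed) // 0.98
  val successRateClosedStreams: Double = 1 - failures / closed          // ~0.979
}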
38 changes: 3 additions & 35 deletions finagle-http/src/main/scala/com/twitter/finagle/Http.scala
@@ -3,7 +3,7 @@ package com.twitter.finagle
import com.twitter.finagle.client._
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.dispatch.GenSerialClientDispatcher
import com.twitter.finagle.filter.{NackAdmissionFilter, PayloadSizeFilter}
import com.twitter.finagle.filter.NackAdmissionFilter
import com.twitter.finagle.http._
import com.twitter.finagle.http.codec.{HttpClientDispatcher, HttpServerDispatcher}
import com.twitter.finagle.http.exp.StreamTransport
@@ -157,32 +157,6 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re
param.ResponseClassifier(rc)
}

// Only record payload sizes when streaming is disabled.
private[finagle] def nonChunkedPayloadSize(
reqTraceKey: String,
repTraceKey: String
): Stackable[ServiceFactory[Request, Response]] =
new Stack.Module2[http.param.Streaming, param.Stats, ServiceFactory[Request, Response]] {
override def role: Stack.Role = PayloadSizeFilter.Role
override def description: String = PayloadSizeFilter.Description

override def make(
streaming: http.param.Streaming,
stats: param.Stats,
next: ServiceFactory[Request, Response]
): ServiceFactory[Request, Response] = {
if (streaming.disabled)
new PayloadSizeFilter[Request, Response](
stats.statsReceiver,
reqTraceKey,
repTraceKey,
_.content.length,
_.content.length
).andThen(next)
else next
}
}

object Client {
private val stack: Stack[ServiceFactory[Request, Response]] =
StackClient.newStack
@@ -213,10 +187,7 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re
// after the tracing context is initialized.
.insertAfter(
TraceInitializerFilter.role,
nonChunkedPayloadSize(
PayloadSizeFilter.ClientReqTraceKey,
PayloadSizeFilter.ClientRepTraceKey
)
PayloadSizeFilter.module(PayloadSizeFilter.clientTraceKeyPrefix)
)
.prepend(
new Stack.NoOpModule(http.filter.StatsFilter.role, http.filter.StatsFilter.description)
@@ -466,10 +437,7 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re
// after the tracing context is initialized.
.insertAfter(
TraceInitializerFilter.role,
nonChunkedPayloadSize(
PayloadSizeFilter.ServerReqTraceKey,
PayloadSizeFilter.ServerRepTraceKey
)
PayloadSizeFilter.module(PayloadSizeFilter.serverTraceKeyPrefix)
)
.replace(TraceInitializerFilter.role, new HttpServerTraceInitializer[Request, Response])
.replace(StackServer.Role.preparer, HttpNackFilter.module)
125 changes: 125 additions & 0 deletions finagle-http/src/main/scala/com/twitter/finagle/http/filter/PayloadSizeFilter.scala
@@ -0,0 +1,125 @@
package com.twitter.finagle.http.filter

import com.twitter.finagle.Stack.Module1
import com.twitter.finagle.http.{Chunk, Request, RequestProxy, Response, ResponseProxy}
import com.twitter.finagle.param.Stats
import com.twitter.finagle.{Service, ServiceFactory, SimpleFilter, Stack, param}
import com.twitter.finagle.stats.{Stat, StatsReceiver, Verbosity}
import com.twitter.finagle.tracing.{Trace, Tracing}
import com.twitter.io.Reader
import com.twitter.util.Future

private[finagle] object PayloadSizeFilter {
val Role: Stack.Role = Stack.Role("HttpPayloadSize")
val Description: String = "Reports Http request/response payload sizes"

private[finagle] def module(prefix: String) =
new Module1[param.Stats, ServiceFactory[Request, Response]] {
def make(
stats: Stats,
next: ServiceFactory[Request, Response]
): ServiceFactory[Request, Response] = {
new PayloadSizeFilter(stats.statsReceiver, prefix).andThen(next)
}

def role: Stack.Role = PayloadSizeFilter.Role
def description: String = PayloadSizeFilter.Description
}

val clientTraceKeyPrefix: String = "clnt/"
val serverTraceKeyPrefix: String = "srv/"
}

/**
* A filter that exports two histograms to a given [[StatsReceiver]].
*
* For non-streaming messages (messages that are not chunked), the two histograms are:
* 1. "request_payload_bytes" - a distribution of request payload sizes in bytes
* 2. "response_payload_bytes" - a distribution of response payload sizes in bytes
*
* For streaming messages (isChunked equals true), the two histograms are:
* 1. "stream/request/chunk_payload_bytes" - a distribution of request's chunk payload sizes in bytes
* 2. "stream/response/chunk_payload_bytes" - a distribution of response's chunk payload sizes in bytes
*
* The sizes are also traced as binary annotations, using the metric names above with a "clnt/" prefix
* on the client side and a "srv/" prefix on the server side.
*/
private[finagle] class PayloadSizeFilter(statsReceiver: StatsReceiver, prefix: String)
extends SimpleFilter[Request, Response] {

private[this] val reqKey = "request_payload_bytes"
private[this] val repKey = "response_payload_bytes"
private[this] val chunkKey = "chunk_payload_bytes"

private[this] val streamReqTraceKey = s"${prefix}stream/request/${chunkKey}"
private[this] val streamRepTraceKey = s"${prefix}stream/response/${chunkKey}"
private[this] val reqTraceKey = prefix + reqKey
private[this] val repTraceKey = prefix + repKey

private[this] val requestBytes = statsReceiver.stat(Verbosity.Debug, reqKey)
private[this] val responseBytes = statsReceiver.stat(Verbosity.Debug, repKey)

private[this] val streamRequestBytes =
statsReceiver.scope("stream").scope("request").stat(Verbosity.Debug, chunkKey)
private[this] val streamResponseBytes =
statsReceiver.scope("stream").scope("response").stat(Verbosity.Debug, chunkKey)

private[this] def handleResponse(trace: Tracing): Response => Response = { rep =>
if (rep.isChunked) {
new ResponseProxy {
override def response: Response = rep
override def chunkReader: Reader[Chunk] =
super.chunkReader
.map(onRead(recordRepSize, trace, streamResponseBytes, streamRepTraceKey))
}
} else {
recordRepSize(rep.content.length, trace, responseBytes, repTraceKey)
rep
}
}

private[this] def onRead(
record: (Int, Tracing, Stat, String) => Unit,
trace: Tracing,
stat: Stat,
traceKey: String
): Chunk => Chunk = { chunk =>
record(chunk.content.length, trace, stat, traceKey)
chunk
}

private[this] def recordReqSize(
size: Int,
trace: Tracing,
reqStat: Stat,
traceKey: String
): Unit = {
reqStat.add(size.toFloat)
if (trace.isActivelyTracing) trace.recordBinary(traceKey, size)
}

private[this] def recordRepSize(
size: Int,
trace: Tracing,
repStat: Stat,
traceKey: String
): Unit = {
repStat.add(size.toFloat)
if (trace.isActivelyTracing) trace.recordBinary(traceKey, size)
}

def apply(req: Request, service: Service[Request, Response]): Future[Response] = {
val trace = Trace()
val request = if (req.isChunked) {
new RequestProxy {
override def request: Request = req
override def chunkReader: Reader[Chunk] =
super.chunkReader.map(onRead(recordReqSize, trace, streamRequestBytes, streamReqTraceKey))
}
} else {
recordReqSize(req.content.length, trace, requestBytes, reqTraceKey)
req
}
service(request).map(handleResponse(trace))
}
}
finagle-http/src/test/scala/com/twitter/finagle/http/AbstractEndToEndTest.scala
@@ -805,17 +805,27 @@ abstract class AbstractEndToEndTest
await(client.close())
}

test(s"$implName (streaming)" + ": does not measure payload size") {
val svc = Service.mk[Request, Response] { _ =>
Future.value(Response())
test(s"$implName (streaming)" + ": measure chunk payload size") {
val svc = Service.mk[Request, Response] { req =>
req.reader.read()
val rep = Response()
rep.setChunked(true)
rep.writer.write(Buf.Utf8("01234"))
Future.value(rep)
}
val req = Request()
req.setChunked(true)
req.writer.write(Buf.Utf8("0123456789"))
val client = connect(svc)
await(client(Request()))
val response = await(client(req))
response.reader.read()

assert(statsRecv.stat("client", "request_payload_bytes")() == Nil)
assert(statsRecv.stat("client", "response_payload_bytes")() == Nil)
assert(statsRecv.stat("server", "request_payload_bytes")() == Nil)
assert(statsRecv.stat("server", "response_payload_bytes")() == Nil)
eventually {
assert(statsRecv.stat("client", "stream", "request", "chunk_payload_bytes")() == Seq(10f))
assert(statsRecv.stat("client", "stream", "response", "chunk_payload_bytes")() == Seq(5f))
assert(statsRecv.stat("server", "stream", "request", "chunk_payload_bytes")() == Seq(10f))
assert(statsRecv.stat("server", "stream", "response", "chunk_payload_bytes")() == Seq(5f))
}
await(client.close())
}
