From 11f4e32cd8e5fcab9fc0de998f3e844f37ab741c Mon Sep 17 00:00:00 2001 From: Yufan Gong Date: Thu, 11 Jul 2019 01:07:22 +0000 Subject: [PATCH] finagle-http: PayloadSizeFilter now measures streaming messages Problem We used to skip streaming messages (isChunked) in PayloadSizeFilter. Solution Fork the filter in HTTP and measure chunked messages. Wrap the reader with an onRead callback which measures the payload size. Return a request (or response) proxy whose reader is wrapped with that callback. JIRA Issues: CSL-8134 Differential Revision: https://phabricator.twitter.biz/D337877 --- CHANGELOG.rst | 5 + doc/src/sphinx/Metrics.rst | 62 +----- doc/src/sphinx/metrics/Public.rst | 2 +- doc/src/sphinx/metrics/Streaming.rst | 69 +++++++ .../main/scala/com/twitter/finagle/Http.scala | 38 +--- .../http/filter/PayloadSizeFilter.scala | 125 ++++++++++++ .../finagle/http/AbstractEndToEndTest.scala | 26 ++- .../http/filter/PayloadSizeFilterTest.scala | 186 ++++++++++++++++++ 8 files changed, 409 insertions(+), 104 deletions(-) create mode 100644 doc/src/sphinx/metrics/Streaming.rst create mode 100644 finagle-http/src/main/scala/com/twitter/finagle/http/filter/PayloadSizeFilter.scala create mode 100644 finagle-http/src/test/scala/com/twitter/finagle/http/filter/PayloadSizeFilterTest.scala diff --git a/CHANGELOG.rst b/CHANGELOG.rst index f47bbdbf05..cd8e8659ad 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -10,6 +10,11 @@ Unreleased New Features ~~~~~~~~~~~~ +* finagle-http: Measure streaming (message.isChunked) chunk payload size with two new histograms: + `stream/request/chunk_payload_bytes` and `stream/response/chunk_payload_bytes`, which are + published at the debug verbosity level. These chunk payload sizes are also traced via the same + trace keys. ``PHAB_ID=D337877`` + * finagle-base-http: Add support for new "b3" tracing header. 
``PHAB_ID=D334419`` * finagle-core: Allow to not bypass SOCKS proxy for localhost by using the GlobalFlag diff --git a/doc/src/sphinx/Metrics.rst b/doc/src/sphinx/Metrics.rst index 3f3df06121..284529f78c 100644 --- a/doc/src/sphinx/Metrics.rst +++ b/doc/src/sphinx/Metrics.rst @@ -271,67 +271,9 @@ These metrics correspond to :ref:`feature toggles `. Streaming --------- -These metrics are added by -:finagle-http-src:`StreamingStatsFilter ` -and can be enabled by setting `isChunked` as true on Http request and response. - -**stream/request/closed** - A counter of the number of closed request streams. - -**stream/request/duration_ms** - A histogram of the duration of the lifetime of request streams, from the time a stream is - initialized until it's closed, in milliseconds. - -**stream/request/failures** - A counter of the number of times any failure has been observed in the middle of a request stream. - -**stream/request/failures/** - A counter of the number of times a specific exception has been thrown in the middle of a request - stream. - -**stream/request/opened** - A counter of the number of opened request streams. - -**stream/request/pending** - A gauge of the number of pending request streams. - -**stream/response/closed** - A counter of the number of closed response streams. - -**stream/response/duration_ms** - A histogram of the duration of the lifetime of response streams, from the time a stream is - initialized until it's closed, in milliseconds. - -**stream/response/failures** - A counter of the number of times any failure has been observed in the middle of a response stream. - -**stream/response/failures/** - A counter of the number of times a specific exception has been thrown in the middle of a response - stream. - -**stream/response/opened** - A counter of the number of opened response streams. - -**stream/response/pending** - A gauge of the number of pending response streams. 
- -You could derive the streaming success rate of: - - the total number of streams - number of successful streams divided by number of total streams - - closed streams - number of successful streams divided by number of closed streams -Here we assume a success stream as a stream terminated without an exception or a stream that has not -terminated yet. - -Take request stream as an example, assuming your counters are not "latched", which means that their -values are monotonically increasing: - - # Success rate of total number of streams: - 1 - (rated_counter(stream/request/failures) - / (gauge(stream/request/pending) + rated_counter(stream/request/closed))) +.. _streaming_metrics: - # Success rate of number of closed streams: - 1 - (rated_counter(stream/request/failures) / rated_counter(stream/request/closed)) +.. include:: metrics/Streaming.rst HTTP ---- diff --git a/doc/src/sphinx/metrics/Public.rst b/doc/src/sphinx/metrics/Public.rst index d863c9fa0c..a5981474ab 100644 --- a/doc/src/sphinx/metrics/Public.rst +++ b/doc/src/sphinx/metrics/Public.rst @@ -89,7 +89,7 @@ RequestSemaphoreFilter A gauge of the total number of requests which are waiting because of the limit on simultaneous requests. -PayloadSizeFilter (enabled for Mux, HTTP (non-chunked), Thrift) +PayloadSizeFilter (enabled for Mux, HTTP, Thrift) <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< **request_payload_bytes** `verbosity:debug` diff --git a/doc/src/sphinx/metrics/Streaming.rst b/doc/src/sphinx/metrics/Streaming.rst new file mode 100644 index 0000000000..6c24d5b9af --- /dev/null +++ b/doc/src/sphinx/metrics/Streaming.rst @@ -0,0 +1,69 @@ +These metrics are added by +:finagle-http-src:`StreamingStatsFilter ` +and can be enabled by setting `isChunked` as true on Http request and response. + +**stream/request/closed** + A counter of the number of closed request streams. 
+ +**stream/request/duration_ms** + A histogram of the duration of the lifetime of request streams, from the time a stream is + initialized until it's closed, in milliseconds. + +**stream/request/failures** + A counter of the number of times any failure has been observed in the middle of a request stream. + +**stream/request/failures/** + A counter of the number of times a specific exception has been thrown in the middle of a request + stream. + +**stream/request/opened** + A counter of the number of opened request streams. + +**stream/request/pending** + A gauge of the number of pending request streams. + +**stream/response/closed** + A counter of the number of closed response streams. + +**stream/response/duration_ms** + A histogram of the duration of the lifetime of response streams, from the time a stream is + initialized until it's closed, in milliseconds. + +**stream/response/failures** + A counter of the number of times any failure has been observed in the middle of a response stream. + +**stream/response/failures/** + A counter of the number of times a specific exception has been thrown in the middle of a response + stream. + +**stream/response/opened** + A counter of the number of opened response streams. + +**stream/response/pending** + A gauge of the number of pending response streams. + +**stream/request/chunk_payload_bytes** `verbosity:debug` + A histogram of the number of bytes per chunk's payload of request streams. This is measured in + c.t.finagle.http.filter.PayloadSizeFilter. + +**stream/response/chunk_payload_bytes** `verbosity:debug` + A histogram of the number of bytes per chunk's payload of response streams. This is measured in + c.t.finagle.http.filter.PayloadSizeFilter. 
+ +You can derive the streaming success rate over: + - the total number of streams + number of successful streams divided by number of total streams + - closed streams + number of successful streams divided by number of closed streams +Here a successful stream is one that terminated without an exception or one that has not +terminated yet. + +Taking request streams as an example, assuming your counters are not "latched", which means that their +values are monotonically increasing: + + # Success rate of total number of streams: + 1 - (rated_counter(stream/request/failures) + / (gauge(stream/request/pending) + rated_counter(stream/request/closed))) + + # Success rate of number of closed streams: + 1 - (rated_counter(stream/request/failures) / rated_counter(stream/request/closed)) diff --git a/finagle-http/src/main/scala/com/twitter/finagle/Http.scala b/finagle-http/src/main/scala/com/twitter/finagle/Http.scala index 2b6ea2bbe9..d932e78b02 100644 --- a/finagle-http/src/main/scala/com/twitter/finagle/Http.scala +++ b/finagle-http/src/main/scala/com/twitter/finagle/Http.scala @@ -3,7 +3,7 @@ package com.twitter.finagle import com.twitter.finagle.client._ import com.twitter.finagle.context.Contexts import com.twitter.finagle.dispatch.GenSerialClientDispatcher -import com.twitter.finagle.filter.{NackAdmissionFilter, PayloadSizeFilter} +import com.twitter.finagle.filter.NackAdmissionFilter import com.twitter.finagle.http._ import com.twitter.finagle.http.codec.{HttpClientDispatcher, HttpServerDispatcher} import com.twitter.finagle.http.exp.StreamTransport @@ -157,32 +157,6 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re param.ResponseClassifier(rc) } - // Only record payload sizes when streaming is disabled. 
- private[finagle] def nonChunkedPayloadSize( - reqTraceKey: String, - repTraceKey: String - ): Stackable[ServiceFactory[Request, Response]] = - new Stack.Module2[http.param.Streaming, param.Stats, ServiceFactory[Request, Response]] { - override def role: Stack.Role = PayloadSizeFilter.Role - override def description: String = PayloadSizeFilter.Description - - override def make( - streaming: http.param.Streaming, - stats: param.Stats, - next: ServiceFactory[Request, Response] - ): ServiceFactory[Request, Response] = { - if (streaming.disabled) - new PayloadSizeFilter[Request, Response]( - stats.statsReceiver, - reqTraceKey, - repTraceKey, - _.content.length, - _.content.length - ).andThen(next) - else next - } - } - object Client { private val stack: Stack[ServiceFactory[Request, Response]] = StackClient.newStack @@ -213,10 +187,7 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re // after the tracing context is initialized. .insertAfter( TraceInitializerFilter.role, - nonChunkedPayloadSize( - PayloadSizeFilter.ClientReqTraceKey, - PayloadSizeFilter.ClientRepTraceKey - ) + PayloadSizeFilter.module(PayloadSizeFilter.clientTraceKeyPrefix) ) .prepend( new Stack.NoOpModule(http.filter.StatsFilter.role, http.filter.StatsFilter.description) @@ -466,10 +437,7 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re // after the tracing context is initialized. 
.insertAfter( TraceInitializerFilter.role, - nonChunkedPayloadSize( - PayloadSizeFilter.ServerReqTraceKey, - PayloadSizeFilter.ServerRepTraceKey - ) + PayloadSizeFilter.module(PayloadSizeFilter.serverTraceKeyPrefix) ) .replace(TraceInitializerFilter.role, new HttpServerTraceInitializer[Request, Response]) .replace(StackServer.Role.preparer, HttpNackFilter.module) diff --git a/finagle-http/src/main/scala/com/twitter/finagle/http/filter/PayloadSizeFilter.scala b/finagle-http/src/main/scala/com/twitter/finagle/http/filter/PayloadSizeFilter.scala new file mode 100644 index 0000000000..09db77abb9 --- /dev/null +++ b/finagle-http/src/main/scala/com/twitter/finagle/http/filter/PayloadSizeFilter.scala @@ -0,0 +1,125 @@ +package com.twitter.finagle.http.filter + +import com.twitter.finagle.Stack.Module1 +import com.twitter.finagle.http.{Chunk, Request, RequestProxy, Response, ResponseProxy} +import com.twitter.finagle.param.Stats +import com.twitter.finagle.{Service, ServiceFactory, SimpleFilter, Stack, param} +import com.twitter.finagle.stats.{Stat, StatsReceiver, Verbosity} +import com.twitter.finagle.tracing.{Trace, Tracing} +import com.twitter.io.Reader +import com.twitter.util.Future + +private[finagle] object PayloadSizeFilter { + val Role: Stack.Role = Stack.Role("HttpPayloadSize") + val Description: String = "Reports Http request/response payload sizes" + + private[finagle] def module(prefix: String) = + new Module1[param.Stats, ServiceFactory[Request, Response]] { + def make( + stats: Stats, + next: ServiceFactory[Request, Response] + ): ServiceFactory[Request, Response] = { + new PayloadSizeFilter(stats.statsReceiver, prefix).andThen(next) + } + + def role: Stack.Role = PayloadSizeFilter.Role + def description: String = PayloadSizeFilter.Description + } + + val clientTraceKeyPrefix: String = "clnt/" + val serverTraceKeyPrefix: String = "srv/" +} + +/** + * A filter that exports two histograms to a given [[StatsReceiver]]. 
+ * + * For non-streaming messages (messages are not chunked), the two histograms are: + * 1. "request_payload_bytes" - a distribution of request payload sizes in bytes + * 2. "response_payload_bytes" - a distribution of response payload sizes in bytes + * + * For streaming messages (isChunked equals true), the two histograms are: + * 1. "stream/request/chunk_payload_bytes" - a distribution of request chunk payload sizes in bytes + * 2. "stream/response/chunk_payload_bytes" - a distribution of response chunk payload sizes in bytes + * + * The sizes are also traced as binary annotations named after the metrics above, with a "clnt/" prefix + * on the client side and a "srv/" prefix on the server side. + */ +private[finagle] class PayloadSizeFilter(statsReceiver: StatsReceiver, prefix: String) + extends SimpleFilter[Request, Response] { + + private[this] val reqKey = "request_payload_bytes" + private[this] val repKey = "response_payload_bytes" + private[this] val chunkKey = "chunk_payload_bytes" + + private[this] val streamReqTraceKey = s"${prefix}stream/request/${chunkKey}" + private[this] val streamRepTraceKey = s"${prefix}stream/response/${chunkKey}" + private[this] val reqTraceKey = prefix + reqKey + private[this] val repTraceKey = prefix + repKey + + private[this] val requestBytes = statsReceiver.stat(Verbosity.Debug, reqKey) + private[this] val responseBytes = statsReceiver.stat(Verbosity.Debug, repKey) + + private[this] val streamRequestBytes = + statsReceiver.scope("stream").scope("request").stat(Verbosity.Debug, chunkKey) + private[this] val streamResponseBytes = + statsReceiver.scope("stream").scope("response").stat(Verbosity.Debug, chunkKey) + + private[this] def handleResponse(trace: Tracing): Response => Response = { rep => + if (rep.isChunked) { + new ResponseProxy { + override def response: Response = rep + override def chunkReader: Reader[Chunk] = + super.chunkReader + .map(onRead(recordRepSize, trace, streamResponseBytes, streamRepTraceKey)) + } + } 
else { + recordRepSize(rep.content.length, trace, responseBytes, repTraceKey) + rep + } + } + + private[this] def onRead( + record: (Int, Tracing, Stat, String) => Unit, + trace: Tracing, + stat: Stat, + traceKey: String + ): Chunk => Chunk = { chunk => + record(chunk.content.length, trace, stat, traceKey) + chunk + } + + private[this] def recordReqSize( + size: Int, + trace: Tracing, + reqStat: Stat, + traceKey: String + ): Unit = { + reqStat.add(size.toFloat) + if (trace.isActivelyTracing) trace.recordBinary(traceKey, size) + } + + private[this] def recordRepSize( + size: Int, + trace: Tracing, + repStat: Stat, + traceKey: String + ): Unit = { + repStat.add(size.toFloat) + if (trace.isActivelyTracing) trace.recordBinary(traceKey, size) + } + + def apply(req: Request, service: Service[Request, Response]): Future[Response] = { + val trace = Trace() + val request = if (req.isChunked) { + new RequestProxy { + override def request: Request = req + override def chunkReader: Reader[Chunk] = + super.chunkReader.map(onRead(recordReqSize, trace, streamRequestBytes, streamReqTraceKey)) + } + } else { + recordReqSize(req.content.length, trace, requestBytes, reqTraceKey) + req + } + service(request).map(handleResponse(trace)) + } +} diff --git a/finagle-http/src/test/scala/com/twitter/finagle/http/AbstractEndToEndTest.scala b/finagle-http/src/test/scala/com/twitter/finagle/http/AbstractEndToEndTest.scala index 1438f9d5c1..5367e97636 100644 --- a/finagle-http/src/test/scala/com/twitter/finagle/http/AbstractEndToEndTest.scala +++ b/finagle-http/src/test/scala/com/twitter/finagle/http/AbstractEndToEndTest.scala @@ -805,17 +805,27 @@ abstract class AbstractEndToEndTest await(client.close()) } - test(s"$implName (streaming)" + ": does not measure payload size") { - val svc = Service.mk[Request, Response] { _ => - Future.value(Response()) + test(s"$implName (streaming)" + ": measure chunk payload size") { + val svc = Service.mk[Request, Response] { req => + req.reader.read() + val 
rep = Response() + rep.setChunked(true) + rep.writer.write(Buf.Utf8("01234")) + Future.value(rep) } + val req = Request() + req.setChunked(true) + req.writer.write(Buf.Utf8("0123456789")) val client = connect(svc) - await(client(Request())) + val response = await(client(req)) + response.reader.read() - assert(statsRecv.stat("client", "request_payload_bytes")() == Nil) - assert(statsRecv.stat("client", "response_payload_bytes")() == Nil) - assert(statsRecv.stat("server", "request_payload_bytes")() == Nil) - assert(statsRecv.stat("server", "response_payload_bytes")() == Nil) + eventually { + assert(statsRecv.stat("client", "stream", "request", "chunk_payload_bytes")() == Seq(10f)) + assert(statsRecv.stat("client", "stream", "response", "chunk_payload_bytes")() == Seq(5f)) + assert(statsRecv.stat("server", "stream", "request", "chunk_payload_bytes")() == Seq(10f)) + assert(statsRecv.stat("server", "stream", "response", "chunk_payload_bytes")() == Seq(5f)) + } await(client.close()) } diff --git a/finagle-http/src/test/scala/com/twitter/finagle/http/filter/PayloadSizeFilterTest.scala b/finagle-http/src/test/scala/com/twitter/finagle/http/filter/PayloadSizeFilterTest.scala new file mode 100644 index 0000000000..57f9bff941 --- /dev/null +++ b/finagle-http/src/test/scala/com/twitter/finagle/http/filter/PayloadSizeFilterTest.scala @@ -0,0 +1,186 @@ +package com.twitter.finagle.http.filter + +import com.twitter.conversions.DurationOps._ +import com.twitter.finagle.Service +import com.twitter.finagle.http.{Method, Request, Response, Status, Version} +import com.twitter.finagle.stats.{InMemoryStatsReceiver, NullStatsReceiver, StatsReceiver} +import com.twitter.finagle.tracing.{Annotation, BufferingTracer, Record, Trace, TraceId} +import com.twitter.io.{Buf, Reader} +import com.twitter.util.{Await, Future, Time} +import org.scalatest.FunSuite +import org.scalatest.concurrent.Eventually + +class PayloadSizeFilterTest extends FunSuite with Eventually { + + private def filter(sr: 
StatsReceiver) = + new PayloadSizeFilter(sr, PayloadSizeFilter.serverTraceKeyPrefix) + + private def nonStreamingService(sr: StatsReceiver) = filter(sr).andThen { + Service.mk[Request, Response] { req => + Future.value { + val rep = Response.apply(req) + rep.contentString = "key=value2" + rep + } + } + } + + private def streamingService(sr: StatsReceiver) = filter(sr).andThen { + Service.mk[Request, Response] { req => + Reader.readAll(req.reader) + val reader = Reader.fromSeq(List("1234", "12345", "123456", "1234567")) + Future.value(Response.apply(Version.Http11, Status.Ok, reader.map(Buf.Utf8(_)))) + } + } + + private val nonStreamingRequest = { + val req = Request.apply("/test") + req.contentString = "key=value" + req + } + + private def streamingRequest: Request = { + val reader = Reader.fromSeq(List("1", "12", "123")) + Request.apply(Version.Http11, Method.Get, "/test", reader.map(Buf.Utf8(_))) + } + + def await[T](f: Future[T]): T = Await.result(f, 5.seconds) + + test("nonStreaming -- traces sizes when actively tracing") { + val svc = nonStreamingService(NullStatsReceiver) + Time.withCurrentTimeFrozen { _ => + val tracer = new BufferingTracer + Trace.letTracer(tracer) { + assert(Trace.isActivelyTracing) + assert(await(svc(nonStreamingRequest)).contentString == "key=value2") + } + assert( + tracer.toSeq == Seq( + Record( + Trace.id, + Time.now, + Annotation.BinaryAnnotation("srv/request_payload_bytes", 9), + None + ), + Record( + Trace.id, + Time.now, + Annotation.BinaryAnnotation("srv/response_payload_bytes", 10), + None + ) + ) + ) + } + } + + test("nonStreaming -- doesn't trace sizes when not actively tracing") { + val svc = nonStreamingService(NullStatsReceiver) + val tracer = new BufferingTracer { + override def isActivelyTracing(traceId: TraceId): Boolean = false + } + Trace.letTracer(tracer) { + assert(!Trace.isActivelyTracing) + assert(await(svc(nonStreamingRequest)).contentString == "key=value2") + } + assert(tracer.toSeq == Nil) + } + + 
test("nonStreaming -- records metrics") { + val stats = new InMemoryStatsReceiver() + val svc = nonStreamingService(stats) + assert(await(svc(nonStreamingRequest)).contentString == "key=value2") + assert(stats.stat("request_payload_bytes")() == Seq(9f)) + eventually { + assert(stats.stat("response_payload_bytes")() == Seq(10f)) + } + } + + test("streaming -- traces sizes when actively tracing") { + val svc = streamingService(NullStatsReceiver) + Time.withCurrentTimeFrozen { _ => + val tracer = new BufferingTracer + Trace.letTracer(tracer) { + assert(Trace.isActivelyTracing) + val rep = await(svc(streamingRequest)) + assert( + Buf.Utf8.unapply(await(Reader.readAll(rep.reader))) == Some( + "1234" + "12345" + "123456" + "1234567")) + } + assert( + tracer.toSeq == Seq( + Record( + Trace.id, + Time.now, + Annotation.BinaryAnnotation("srv/stream/request/chunk_payload_bytes", 1), + None + ), + Record( + Trace.id, + Time.now, + Annotation.BinaryAnnotation("srv/stream/request/chunk_payload_bytes", 2), + None + ), + Record( + Trace.id, + Time.now, + Annotation.BinaryAnnotation("srv/stream/request/chunk_payload_bytes", 3), + None + ), + Record( + Trace.id, + Time.now, + Annotation.BinaryAnnotation("srv/stream/response/chunk_payload_bytes", 4), + None + ), + Record( + Trace.id, + Time.now, + Annotation.BinaryAnnotation("srv/stream/response/chunk_payload_bytes", 5), + None + ), + Record( + Trace.id, + Time.now, + Annotation.BinaryAnnotation("srv/stream/response/chunk_payload_bytes", 6), + None + ), + Record( + Trace.id, + Time.now, + Annotation.BinaryAnnotation("srv/stream/response/chunk_payload_bytes", 7), + None + ) + ) + ) + } + } + + test("streaming -- doesn't trace sizes when not actively tracing") { + val svc = streamingService(NullStatsReceiver) + val tracer = new BufferingTracer { + override def isActivelyTracing(traceId: TraceId): Boolean = false + } + Trace.letTracer(tracer) { + assert(!Trace.isActivelyTracing) + val rep = await(svc(streamingRequest)) + assert( + 
Buf.Utf8.unapply(await(Reader.readAll(rep.reader))) == Some( + "1234" + "12345" + "123456" + "1234567")) + } + assert(tracer.toSeq == Nil) + } + + test("streaming -- records metrics") { + val stats = new InMemoryStatsReceiver() + val svc = streamingService(stats) + val rep = await(svc(streamingRequest)) + assert( + Buf.Utf8.unapply(await(Reader.readAll(rep.reader))) == Some( + "1234" + "12345" + "123456" + "1234567")) + assert(stats.stat("stream", "request", "chunk_payload_bytes")() == Seq(1f, 2f, 3f)) + eventually { + Reader.readAll(rep.reader) + assert(stats.stat("stream", "response", "chunk_payload_bytes")() == Seq(4f, 5f, 6f, 7f)) + } + } +}