diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 2d73ab8c83..3014ddb0b2 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -35,11 +35,21 @@ Runtime Behavior Changes * finagle-http2: introduce optional parameter `NackRstFrameHandling` to enable or disable NACK conversion to RST_STREAM frames. ``PHAB_ID=D702696`` +* finagle-thrift, finagle-thriftmux: clients may start reporting (correctly) lower success rate. + Previously server exceptions not declared in IDL were erroneously considered as successes. + The fgix also improves failure detection and thus nodes previously considered as healthy + by failure accrual policy may be considered as unhealthy. ``PHAB_ID=D698272`` + Bug Fixes ~~~~~~~~~~ * finagle-core: Add `BackupRequestFilter` to client registry when configured. ``PHAB_ID=D686981`` +* finagle-thrift, finagle-thriftmux: clients now treat server exceptions + not declared in IDL as failures, rather than successes, + and do not skip the configured response classifier for failure accrual. + ``PHAB_ID=D698272`` + 21.6.0 ------ diff --git a/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/service/ThriftCodec.scala b/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/service/ThriftCodec.scala index 191ea11f4e..a93659ba78 100644 --- a/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/service/ThriftCodec.scala +++ b/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/service/ThriftCodec.scala @@ -2,7 +2,7 @@ package com.twitter.finagle.thrift.service import com.twitter.finagle.context.Contexts import com.twitter.finagle.thrift.{ClientDeserializeCtx, ThriftClientRequest, maxReusableBufferSize} -import com.twitter.finagle.{Filter, Service} +import com.twitter.finagle.{Filter, Service, SourcedException} import com.twitter.scrooge.{TReusableBuffer, ThriftMethod, ThriftStruct, ThriftStructCodec} import com.twitter.util.{Future, Return, Throw, Try} import java.util.Arrays @@ -10,32 +10,33 @@ import org.apache.thrift.TApplicationException import org.apache.thrift.protocol.{TMessage, TMessageType, TProtocolFactory} import org.apache.thrift.transport.TMemoryInputTransport -private[thrift] object ThriftCodec { +object ThriftCodec { /** * A [[Filter]] that wraps a binary thrift Service[ThriftClientRequest, Array[Byte]] * and produces a [[Service]] from a [[ThriftStruct]] to [[ThriftClientRequest]] (i.e. bytes). */ - def filter( + private[thrift] def filter( method: ThriftMethod, pf: TProtocolFactory ): Filter[method.Args, method.SuccessType, ThriftClientRequest, Array[Byte]] = new Filter[method.Args, method.SuccessType, ThriftClientRequest, Array[Byte]] { private[this] val decodeRepFn: Array[Byte] => Try[method.SuccessType] = { bytes => - val result: method.Result = decodeResponse(bytes, method.responseCodec, pf) - result.firstException() match { - case Some(ex) => Throw(ex) - case None => - result.successField match { - case Some(v) => Return(v) - case None => - Throw( - new TApplicationException( - TApplicationException.MISSING_RESULT, - s"Thrift method '${method.name}' failed: missing result" + decodeResponse(bytes, method.responseCodec, pf).flatMap { result: method.Result => + result.firstException() match { + case Some(ex) => Throw(ex) + case None => + result.successField match { + case Some(v) => Return(v) + case None => + Throw( + new TApplicationException( + TApplicationException.MISSING_RESULT, + s"Thrift method '${method.name}' failed: missing result" + ) ) - ) - } + } + } } } @@ -75,21 +76,22 @@ private[thrift] object ThriftCodec { new ThriftClientRequest(bytes, oneway) } - private def decodeResponse[T <: ThriftStruct]( + def decodeResponse[T <: ThriftStruct]( resBytes: Array[Byte], codec: ThriftStructCodec[T], - pf: TProtocolFactory - ): T = { + pf: TProtocolFactory, + serviceName: String = "" + ): Try[T] = { val iprot = pf.getProtocol(new TMemoryInputTransport(resBytes)) val msg = iprot.readMessageBegin() if (msg.`type` == TMessageType.EXCEPTION) { val exception = TApplicationException.readFrom(iprot) iprot.readMessageEnd() - throw exception + Throw(SourcedException.setServiceName(exception, serviceName)) } else { val result = codec.decode(iprot) iprot.readMessageEnd() - result + Return(result) } } } diff --git a/finagle-thriftmux/src/test/scala/com/twitter/finagle/thriftmux/EndToEndTest.scala b/finagle-thriftmux/src/test/scala/com/twitter/finagle/thriftmux/EndToEndTest.scala index f6bc2c17ad..7fb6cdd5b7 100644 --- a/finagle-thriftmux/src/test/scala/com/twitter/finagle/thriftmux/EndToEndTest.scala +++ b/finagle-thriftmux/src/test/scala/com/twitter/finagle/thriftmux/EndToEndTest.scala @@ -7,6 +7,7 @@ import com.twitter.finagle.builder.ClientBuilder import com.twitter.finagle.client.StackClient import com.twitter.finagle.context.Contexts import com.twitter.finagle.dispatch.PipeliningDispatcher +import com.twitter.finagle.liveness.FailureAccrualFactory import com.twitter.finagle.param.{Label, Stats, Tracer => PTracer} import com.twitter.finagle.service._ import com.twitter.finagle.stats._ @@ -293,6 +294,103 @@ class EndToEndTest await(server.close()) } + test( + "thriftmux server + thriftmux client: " + + "if the server throws an exception, the client should treat it as a failure" + ) { + val serverSR = new InMemoryStatsReceiver() + val server = serverImpl + .withStatsReceiver(serverSR) + .withLabel("aserver") + .serveIface( + new InetSocketAddress(InetAddress.getLoopbackAddress, 0), + new TestService.MethodPerEndpoint { + def query(x: String): Future[String] = throw new Exception("sad panda") + def question(y: String): Future[String] = ??? + def inquiry(z: String): Future[String] = ??? + } + ) + + try { + val clientSR = new InMemoryStatsReceiver() + val markDeadForMs = 60000 + val dest = Name.bound(Address(server.boundAddress.asInstanceOf[InetSocketAddress])) + + val clientBase = ThriftMux.client + .withStatsReceiver(clientSR) + .withResponseClassifier(ThriftMuxResponseClassifier.ThriftExceptionsAsFailures) + .configured( + FailureAccrualFactory + .Param(numFailures = 1, markDeadFor = Duration.fromMilliseconds(markDeadForMs)) + ) + + val methodBuilderLabel = "methodBuilder" + val methodBuilderClient = TestService.MethodPerEndpoint( + clientBase + .withLabel(methodBuilderLabel) + .methodBuilder(dest) + .servicePerEndpoint[TestService.ServicePerEndpoint] + ) + testUndeclaredServerException( + methodBuilderClient, + methodBuilderLabel, + clientSR, + markDeadForMs, + assertLogicalStats = true, + ) + + assert(serverSR.counter("aserver", "failures")() == 1) + assert(serverSR.counter("aserver", "failures", "java.lang.Exception")() == 1) + assert(serverSR.counter("aserver", "success")() == 0) + + clientSR.clear() + + val methodPerEndpointLabel = "MethodPerEndpoint" + val methodPerEndpointClient = clientBase.build[thriftscala.TestService.MethodPerEndpoint]( + dest, + methodPerEndpointLabel, + ) + testUndeclaredServerException( + methodPerEndpointClient, + methodPerEndpointLabel, + clientSR, + markDeadForMs, + assertLogicalStats = false, + ) + + } finally { + server.close() + } + } + + private[this] def testUndeclaredServerException( + client: TestService.MethodPerEndpoint, + label: String, + clientSR: InMemoryStatsReceiver, + markDeadForMs: Int, + assertLogicalStats: Boolean, + ): Unit = { + val thrown = intercept[TApplicationException] { + await(client.query("ok")) + } + + assert(thrown.getMessage == "Internal error processing query: 'java.lang.Exception: sad panda'") + + eventually { + if (assertLogicalStats) { + assert(clientSR.counter(label, "logical", "failures")() == 1, label) + assert(clientSR.counter(label, "logical", "success")() == 0, label) + } + assert(clientSR.counter(label, "failures")() == 1, label) + assert(clientSR.counter(label, "success")() == 0, label) + assert(clientSR.counter(label, "failure_accrual", "removals")() == 1, label) + assert( + clientSR.counter(label, "failure_accrual", "removed_for_ms")() == markDeadForMs, + label, + ) + } + } + test("thriftmux server + Finagle thrift client: traceId should be passed from client to server") { @volatile var cltTraceId: Option[TraceId] = None @volatile var srvTraceId: Option[TraceId] = None