Skip to content

Commit

Permalink
finagle-thrift[mux]: fix Thrift[Mux].client treats undeclared server …
Browse files Browse the repository at this point in the history
…exceptions as successes

= Problem =
Thrift[Mux] clients treat undeclared (in IDL) server exceptions as successes:
- In wire stats they are reported as successes rather than failures, which leads to an incorrectly
  high success rate (the logical/* stats are correct, though)
- Failure accrual service does not disable a faulty host
- Client's custom failure accrual policy is not used at all

= Solution =
Make `ThriftCodec.decodeResponse` return `Throw(ex)` rather than throwing the exception.

= Result =
- Wire success rate is correct
- Failure accrual service disables a faulty host
- Client's failure accrual policy is used for all exceptions, not just for declared ones

JIRA Issues: GRAPH-15029

Differential Revision: https://phabricator.twitter.biz/D698272
  • Loading branch information
Anton Ivanov authored and jenkins committed Jul 14, 2021
1 parent 9a418a5 commit 3bba41c
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 21 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,21 @@ Runtime Behavior Changes
* finagle-http2: introduce optional parameter `NackRstFrameHandling` to enable or disable NACK
conversion to RST_STREAM frames. ``PHAB_ID=D702696``

* finagle-thrift, finagle-thriftmux: clients may start reporting (correctly) lower success rate.
Previously server exceptions not declared in IDL were erroneously considered as successes.
The fix also improves failure detection and thus nodes previously considered as healthy
by failure accrual policy may be considered as unhealthy. ``PHAB_ID=D698272``

Bug Fixes
~~~~~~~~~~

* finagle-core: Add `BackupRequestFilter` to client registry when configured. ``PHAB_ID=D686981``

* finagle-thrift, finagle-thriftmux: clients now treat server exceptions
not declared in IDL as failures, rather than successes,
and do not skip the configured response classifier for failure accrual.
``PHAB_ID=D698272``

21.6.0
------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,41 @@ package com.twitter.finagle.thrift.service

import com.twitter.finagle.context.Contexts
import com.twitter.finagle.thrift.{ClientDeserializeCtx, ThriftClientRequest, maxReusableBufferSize}
import com.twitter.finagle.{Filter, Service}
import com.twitter.finagle.{Filter, Service, SourcedException}
import com.twitter.scrooge.{TReusableBuffer, ThriftMethod, ThriftStruct, ThriftStructCodec}
import com.twitter.util.{Future, Return, Throw, Try}
import java.util.Arrays
import org.apache.thrift.TApplicationException
import org.apache.thrift.protocol.{TMessage, TMessageType, TProtocolFactory}
import org.apache.thrift.transport.TMemoryInputTransport

private[thrift] object ThriftCodec {
object ThriftCodec {

/**
* A [[Filter]] that wraps a binary thrift Service[ThriftClientRequest, Array[Byte]]
* and produces a [[Service]] from a [[ThriftStruct]] to [[ThriftClientRequest]] (i.e. bytes).
*/
def filter(
private[thrift] def filter(
method: ThriftMethod,
pf: TProtocolFactory
): Filter[method.Args, method.SuccessType, ThriftClientRequest, Array[Byte]] =
new Filter[method.Args, method.SuccessType, ThriftClientRequest, Array[Byte]] {
private[this] val decodeRepFn: Array[Byte] => Try[method.SuccessType] = { bytes =>
val result: method.Result = decodeResponse(bytes, method.responseCodec, pf)
result.firstException() match {
case Some(ex) => Throw(ex)
case None =>
result.successField match {
case Some(v) => Return(v)
case None =>
Throw(
new TApplicationException(
TApplicationException.MISSING_RESULT,
s"Thrift method '${method.name}' failed: missing result"
decodeResponse(bytes, method.responseCodec, pf).flatMap { result: method.Result =>
result.firstException() match {
case Some(ex) => Throw(ex)
case None =>
result.successField match {
case Some(v) => Return(v)
case None =>
Throw(
new TApplicationException(
TApplicationException.MISSING_RESULT,
s"Thrift method '${method.name}' failed: missing result"
)
)
)
}
}
}
}
}

Expand Down Expand Up @@ -75,21 +76,22 @@ private[thrift] object ThriftCodec {
new ThriftClientRequest(bytes, oneway)
}

private def decodeResponse[T <: ThriftStruct](
def decodeResponse[T <: ThriftStruct](
resBytes: Array[Byte],
codec: ThriftStructCodec[T],
pf: TProtocolFactory
): T = {
pf: TProtocolFactory,
serviceName: String = ""
): Try[T] = {
val iprot = pf.getProtocol(new TMemoryInputTransport(resBytes))
val msg = iprot.readMessageBegin()
if (msg.`type` == TMessageType.EXCEPTION) {
val exception = TApplicationException.readFrom(iprot)
iprot.readMessageEnd()
throw exception
Throw(SourcedException.setServiceName(exception, serviceName))
} else {
val result = codec.decode(iprot)
iprot.readMessageEnd()
result
Return(result)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.client.StackClient
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.dispatch.PipeliningDispatcher
import com.twitter.finagle.liveness.FailureAccrualFactory
import com.twitter.finagle.param.{Label, Stats, Tracer => PTracer}
import com.twitter.finagle.service._
import com.twitter.finagle.stats._
Expand Down Expand Up @@ -293,6 +294,103 @@ class EndToEndTest
await(server.close())
}

test(
  "thriftmux server + thriftmux client: " +
    "if the server throws an exception, the client should treat it as a failure"
) {
  val serverSR = new InMemoryStatsReceiver()
  // The server implementation fails `query` with a plain java.lang.Exception; the other
  // endpoints are never invoked by this test.
  val server = serverImpl
    .withStatsReceiver(serverSR)
    .withLabel("aserver")
    .serveIface(
      new InetSocketAddress(InetAddress.getLoopbackAddress, 0),
      new TestService.MethodPerEndpoint {
        def query(x: String): Future[String] = throw new Exception("sad panda")
        def question(y: String): Future[String] = ???
        def inquiry(z: String): Future[String] = ???
      }
    )

  try {
    val clientSR = new InMemoryStatsReceiver()
    val markDeadForMs = 60000
    val dest = Name.bound(Address(server.boundAddress.asInstanceOf[InetSocketAddress]))

    // Shared client configuration: failure accrual is configured with numFailures = 1, so the
    // single failing request issued below is expected to produce exactly one removal.
    val clientBase = ThriftMux.client
      .withStatsReceiver(clientSR)
      .withResponseClassifier(ThriftMuxResponseClassifier.ThriftExceptionsAsFailures)
      .configured(
        FailureAccrualFactory
          .Param(numFailures = 1, markDeadFor = Duration.fromMilliseconds(markDeadForMs))
      )

    // Variant 1: client built through MethodBuilder (logical stats are asserted as well).
    val methodBuilderLabel = "methodBuilder"
    val methodBuilderClient = TestService.MethodPerEndpoint(
      clientBase
        .withLabel(methodBuilderLabel)
        .methodBuilder(dest)
        .servicePerEndpoint[TestService.ServicePerEndpoint]
    )
    testUndeclaredServerException(
      methodBuilderClient,
      methodBuilderLabel,
      clientSR,
      markDeadForMs,
      assertLogicalStats = true,
    )

    // Only one request has reached the server so far, so its counters must reflect one failure.
    assert(serverSR.counter("aserver", "failures")() == 1)
    assert(serverSR.counter("aserver", "failures", "java.lang.Exception")() == 1)
    assert(serverSR.counter("aserver", "success")() == 0)

    // Reset client stats so the second variant starts counting from zero.
    clientSR.clear()

    // Variant 2: client built directly as a MethodPerEndpoint (logical stats are not asserted).
    val methodPerEndpointLabel = "MethodPerEndpoint"
    val methodPerEndpointClient = clientBase.build[thriftscala.TestService.MethodPerEndpoint](
      dest,
      methodPerEndpointLabel,
    )
    testUndeclaredServerException(
      methodPerEndpointClient,
      methodPerEndpointLabel,
      clientSR,
      markDeadForMs,
      assertLogicalStats = false,
    )

  } finally {
    // Wait for shutdown to complete so the server does not outlive the test and a failed
    // close is surfaced instead of being silently dropped.
    await(server.close())
  }
}

/**
 * Issues a single `query` call through `client`, expects it to fail with a
 * [[TApplicationException]] carrying the server's "sad panda" error, and then
 * verifies the client-side counters recorded under `label`.
 *
 * @param client             the client under test
 * @param label              the stats scope the client reports under
 * @param clientSR           the stats receiver the client was configured with
 * @param markDeadForMs      the expected value of the `failure_accrual/removed_for_ms` counter
 * @param assertLogicalStats whether the `logical/failures` and `logical/success` counters
 *                           are checked too
 */
private[this] def testUndeclaredServerException(
  client: TestService.MethodPerEndpoint,
  label: String,
  clientSR: InMemoryStatsReceiver,
  markDeadForMs: Int,
  assertLogicalStats: Boolean,
): Unit = {
  // Current value of the client counter found at `label` followed by `path`.
  def counterValue(path: String*) = clientSR.counter((label +: path): _*)()

  val failure = intercept[TApplicationException] {
    await(client.query("ok"))
  }
  assert(failure.getMessage == "Internal error processing query: 'java.lang.Exception: sad panda'")

  // Counters are polled inside `eventually` until every assertion holds.
  eventually {
    if (assertLogicalStats) {
      assert(counterValue("logical", "failures") == 1, label)
      assert(counterValue("logical", "success") == 0, label)
    }
    assert(counterValue("failures") == 1, label)
    assert(counterValue("success") == 0, label)
    assert(counterValue("failure_accrual", "removals") == 1, label)
    assert(counterValue("failure_accrual", "removed_for_ms") == markDeadForMs, label)
  }
}

test("thriftmux server + Finagle thrift client: traceId should be passed from client to server") {
@volatile var cltTraceId: Option[TraceId] = None
@volatile var srvTraceId: Option[TraceId] = None
Expand Down

0 comments on commit 3bba41c

Please sign in to comment.