Navigation Menu

Skip to content

Commit

Permalink
finagle-exception,zipkin: Improve client configuration
Browse files Browse the repository at this point in the history
Problem

The Finagle clients used by Reporter and RawZipkinTracer are not well
behaved in the face of a poorly behaved server and there is no
visibility into their stats.

Solution

Export stats to ClientStatsReceiver, add an upper bound on max waiters
and a global timeout of 1 second.

Result

Visibility into the client behavior and removal of an unbounded queue
which can manifest as a memory leak.

RB_ID=674628
  • Loading branch information
kevinoliver authored and jenkins committed Jun 1, 2015
1 parent 23ad89c commit eb94564
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 32 deletions.
@@ -1,19 +1,15 @@
package com.twitter.finagle.exception

import java.net.{SocketAddress, InetSocketAddress, InetAddress}

import org.apache.thrift.protocol.TBinaryProtocol
import com.twitter.finagle.exception.thriftscala.{LogEntry, ResultCode, Scribe, Scribe$FinagleClient}

import com.twitter.app.GlobalFlag
import com.twitter.util.GZIPStringEncoder
import com.twitter.util.{Future, Time, Monitor, NullMonitor}

import com.twitter.conversions.time._
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.stats.{NullStatsReceiver, StatsReceiver}
import com.twitter.finagle.thrift.ThriftClientFramedCodec
import com.twitter.finagle.exception.thriftscala.{LogEntry, ResultCode, Scribe, Scribe$FinagleClient}
import com.twitter.finagle.stats.{ClientStatsReceiver, NullStatsReceiver, StatsReceiver}
import com.twitter.finagle.thrift.{Protocols, ThriftClientFramedCodec}
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.util.ReporterFactory
import com.twitter.util.{Future, GZIPStringEncoder, Monitor, NullMonitor, Time}
import java.net.{InetAddress, InetSocketAddress, SocketAddress}

trait ClientMonitorFactory extends (String => Monitor)
trait ServerMonitorFactory extends ((String, SocketAddress) => Monitor)
Expand All @@ -22,8 +18,10 @@ trait MonitorFactory extends ServerMonitorFactory with ClientMonitorFactory {
def clientMonitor(serviceName: String): Monitor
def serverMonitor(serviceName: String, address: SocketAddress): Monitor

def apply(serviceName: String) = clientMonitor(serviceName)
def apply(serviceName: String, address: SocketAddress) = serverMonitor(serviceName, address)
def apply(serviceName: String): Monitor = clientMonitor(serviceName)

def apply(serviceName: String, address: SocketAddress): Monitor =
serverMonitor(serviceName, address)
}

object NullMonitorFactory extends MonitorFactory {
Expand Down Expand Up @@ -59,7 +57,7 @@ object Reporter {
*/
@deprecated("Use reporterFactory instead")
def clientReporter(scribeHost: String, scribePort: Int): String => Monitor = {
monitorFactory(scribeHost, scribePort).clientMonitor _
monitorFactory(scribeHost, scribePort).clientMonitor
}

/**
Expand All @@ -73,7 +71,7 @@ object Reporter {
*/
@deprecated("Use reporterFactory instead")
def sourceReporter(scribeHost: String, scribePort: Int): (String, SocketAddress) => Monitor = {
monitorFactory(scribeHost, scribePort).serverMonitor _
monitorFactory(scribeHost, scribePort).serverMonitor
}


Expand All @@ -84,22 +82,28 @@ object Reporter {
def monitorFactory(scribeHost: String, scribePort: Int): MonitorFactory = new MonitorFactory {
private[this] val scribeClient = makeClient(scribeHost, scribePort)

def clientMonitor(serviceName: String) =
def clientMonitor(serviceName: String): Reporter =
new Reporter(scribeClient, serviceName).withClient()
def serverMonitor(serviceName: String, address: SocketAddress) =
def serverMonitor(serviceName: String, address: SocketAddress): Reporter =
new Reporter(scribeClient, serviceName).withSource(address)
}


private[exception] def makeClient(scribeHost: String, scribePort: Int) = {
val service = ClientBuilder() // these are from the zipkin tracer
.name("exception_reporter")
.hosts(new InetSocketAddress(scribeHost, scribePort))
.codec(ThriftClientFramedCodec())
.reportTo(ClientStatsReceiver)
.hostConnectionLimit(5)
// using an arbitrary, but bounded number of waiters to avoid memory leaks
.hostConnectionMaxWaiters(250)
// somewhat arbitrary, but bounded timeouts
.timeout(1.second)
.daemon(true)
.build()

new Scribe$FinagleClient(service, new TBinaryProtocol.Factory())
new Scribe$FinagleClient(service, Protocols.binaryFactory())
}
}

Expand All @@ -115,11 +119,12 @@ object Reporter {
* is very wrong!
*/
sealed case class Reporter(
client: Scribe[Future],
serviceName: String,
statsReceiver: StatsReceiver = NullStatsReceiver,
private val sourceAddress: Option[String] = Some(InetAddress.getLoopbackAddress.getHostName),
private val clientAddress: Option[String] = None) extends Monitor {
client: Scribe[Future],
serviceName: String,
statsReceiver: StatsReceiver = NullStatsReceiver,
private val sourceAddress: Option[String] = Some(InetAddress.getLoopbackAddress.getHostName),
private val clientAddress: Option[String] = None)
extends Monitor {

private[this] val okCounter = statsReceiver.counter("report_exception_ok")
private[this] val tryLaterCounter = statsReceiver.counter("report_exception_ok")
Expand All @@ -129,7 +134,7 @@ sealed case class Reporter(
*
* The endpoint string is the ip address of the host (e.g. "127.0.0.1").
*/
def withClient(address: InetAddress = InetAddress.getLoopbackAddress) =
def withClient(address: InetAddress = InetAddress.getLoopbackAddress): Reporter =
copy(clientAddress = Some(address.getHostAddress))

/**
Expand All @@ -139,7 +144,7 @@ sealed case class Reporter(
* "127.0.0.1:8080"). This is retained for orthogonality of exterior
* interfaces. We use the host name internaly.
*/
def withSource(address: SocketAddress) =
def withSource(address: SocketAddress): Reporter =
address match {
case isa: InetSocketAddress => copy(sourceAddress = Some(isa.getAddress.getHostName))
case _ => this // don't deal with non-InetSocketAddress types, but don't crash either
Expand All @@ -149,7 +154,7 @@ sealed case class Reporter(
* Create a default ServiceException and fold in the modifiers (i.e. to add a source/client
* endpoint).
*/
def createEntry(e: Throwable) = {
def createEntry(e: Throwable): LogEntry = {
var se = new ServiceException(serviceName, e, Time.now, Trace.id.traceId.toLong)

sourceAddress foreach { sa => se = se withSource sa }
Expand All @@ -164,7 +169,7 @@ sealed case class Reporter(
* See top level comment for this class for more details on performance
* implications.
*/
def handle(t: Throwable) = {
def handle(t: Throwable): Boolean = {
client.log(createEntry(t) :: Nil) onSuccess {
case ResultCode.Ok => okCounter.incr()
case ResultCode.TryLater => tryLaterCounter.incr()
Expand All @@ -176,7 +181,9 @@ sealed case class Reporter(
}
}

object host extends GlobalFlag(new InetSocketAddress("localhost", 1463), "Host to scribe exception messages")
object host extends GlobalFlag[InetSocketAddress](
new InetSocketAddress("localhost", 1463),
"Host to scribe exception messages")

class ExceptionReporter extends ReporterFactory {
private[this] val client = Reporter.makeClient(host().getHostName, host().getPort)
Expand Down
@@ -1,11 +1,10 @@
package com.twitter.finagle.zipkin.thrift

import com.google.common.io.BaseEncoding
import com.twitter.conversions.time._
import com.twitter.conversions.storage._
import com.twitter.conversions.time._
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.service.TimeoutFilter
import com.twitter.finagle.stats.{NullStatsReceiver, StatsReceiver}
import com.twitter.finagle.stats.{ClientStatsReceiver, NullStatsReceiver, StatsReceiver}
import com.twitter.finagle.thrift.{Protocols, ThriftClientFramedCodec, thrift}
import com.twitter.finagle.tracing._
import com.twitter.finagle.util.DefaultTimer
Expand All @@ -18,8 +17,6 @@ import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.util.concurrent.{ArrayBlockingQueue, TimeoutException}
import org.apache.thrift.TByteArrayOutputStream
import org.apache.thrift.protocol.TProtocol
import org.apache.thrift.transport.TTransport
import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedMap}
import scala.language.reflectiveCalls

Expand All @@ -32,7 +29,12 @@ object RawZipkinTracer {
.name("zipkin-tracer")
.hosts(new InetSocketAddress(scribeHost, scribePort))
.codec(ThriftClientFramedCodec())
.reportTo(ClientStatsReceiver)
.hostConnectionLimit(5)
// using an arbitrary, but bounded number of waiters to avoid memory leaks
.hostConnectionMaxWaiters(250)
// somewhat arbitrary, but bounded timeouts
.timeout(1.second)
.daemon(true)
.build()

Expand Down

0 comments on commit eb94564

Please sign in to comment.