Skip to content

Commit

Permalink
scrooge: Generate a finagle Service per thrift method
Browse files Browse the repository at this point in the history
Problem

Scrooge-generated services are not finagle Services, so they do not allow using Filters.

Solution

Generate a wrapper service that delegates to the underlying thrift service.

Result

For a thrift service, e.g.

service Logger {
  string log(1: string message, 2: i32 logLevel);
  i32 getLogSize();
}

Scrooge generates:

class Logger$ServiceImpl(underlying: Logger[Future]) {
  val log: com.twitter.finagle.Service[(String, Int), String] = ...
  val getLogSize: com.twitter.finagle.Service[Unit, Int] = ...
}

Usage in finagle:

val client = ThriftMux.newServiceIface(Logger, "localhost:8080")

client.log(Logger.Log.Args("message", 1)) onSuccess {...}

This avoids the reflection-based initialization (ThriftRichClient) by collecting the relevant types during generation.

Compatibility constructor to build a FutureIface from a ServiceIface:

val loggerFutureIface = Logger.newFutureIface(loggerServiceIface)
loggerFutureIface.log("msg")

Client configuration is done by e.g.

ThriftMux.client.withClientId(ClientId("asdf")).newServiceIface(Logger, dest)

RB_ID=663690
  • Loading branch information
nshkrob authored and travisbrown committed Jul 27, 2015
1 parent b14f232 commit ba43705
Show file tree
Hide file tree
Showing 8 changed files with 320 additions and 8 deletions.
5 changes: 5 additions & 0 deletions CHANGES
Expand Up @@ -94,6 +94,11 @@ Breaking API Changes
`FailureAccrualFactory` in the underlying stack. ``RB_ID=689076``


New Features
~~~~~~~~~~~~

- finagle-thrift: Support for finagle Services per thrift method.

6.25.0
------

Expand Down
Expand Up @@ -2,7 +2,6 @@ package com.twitter.finagle.example.thrift

import com.twitter.finagle.example.thriftscala.Hello
import com.twitter.finagle.Thrift
import com.twitter.util.Future

object ThriftClient {
def main(args: Array[String]) {
Expand Down
1 change: 1 addition & 0 deletions finagle-thrift/src/main/java/BUILD
Expand Up @@ -3,6 +3,7 @@ java_library(name='java',
dependencies=[
'3rdparty/jvm/org/slf4j:slf4j-api',
'3rdparty/jvm/org/apache/thrift:libthrift-0.5.0',
'scrooge/scrooge-core',
],
provides=artifact(org='com.twitter',
name='finagle-thrift-java',
Expand Down
1 change: 1 addition & 0 deletions finagle-thrift/src/main/scala/BUILD
Expand Up @@ -9,6 +9,7 @@ scala_library(name='scala',
'3rdparty/jvm/org/apache/thrift:thrift-0.5.0',
'finagle/finagle-core',
'finagle/finagle-thrift/src/main/java',
'scrooge/scrooge-core',
'util/util-core',
],
sources=rglobs('*.scala'),
Expand Down
Expand Up @@ -57,7 +57,10 @@ object Thrift extends Client[ThriftClientRequest, Array[Byte]] with ThriftRichCl
with Server[Array[Byte], Array[Byte]] with ThriftRichServer {

val protocolFactory: TProtocolFactory = Protocols.binaryFactory()
protected val defaultClientName = "thrift"

protected lazy val Label(defaultClientName) = client.params[Label]

override protected lazy val Stats(stats) = client.params[Stats]

object param {
case class ClientId(clientId: Option[thrift.ClientId])
Expand Down Expand Up @@ -133,12 +136,13 @@ object Thrift extends Client[ThriftClientRequest, Array[Byte]] with ThriftRichCl
params: Stack.Params = this.params
): Client = copy(stack, params)

protected val defaultClientName = "thrift"
protected lazy val Label(defaultClientName) = params[Label]

protected type In = ThriftClientRequest
protected type Out = Array[Byte]

protected val param.ProtocolFactory(protocolFactory) = params[param.ProtocolFactory]
override protected lazy val Stats(stats) = params[Stats]

protected def newTransporter(): Transporter[In, Out] = {
val pipeline =
Expand Down Expand Up @@ -247,3 +251,4 @@ object Thrift extends Client[ThriftClientRequest, Array[Byte]] with ThriftRichCl
service: ServiceFactory[Array[Byte], Array[Byte]]
): ListeningServer = server.serve(addr, service)
}

48 changes: 45 additions & 3 deletions finagle-thrift/src/main/scala/com/twitter/finagle/rich.scala
@@ -1,9 +1,9 @@
package com.twitter.finagle

import com.twitter.finagle.server.StackBasedServer
import com.twitter.finagle.stats.{LoadedStatsReceiver, ClientStatsReceiver, ServerStatsReceiver, StatsReceiver}
import com.twitter.finagle.thrift.ThriftClientRequest
import com.twitter.finagle.stats._
import com.twitter.finagle.thrift._
import com.twitter.finagle.util.Showable
import com.twitter.scrooge.{ThriftMethod, ThriftStruct}
import com.twitter.util.NonFatal
import java.lang.reflect.{Constructor, Method}
import java.net.SocketAddress
Expand Down Expand Up @@ -335,6 +335,48 @@ trait ThriftRichClient { self: Client[ThriftClientRequest, Array[Byte]] =>

constructIface(underlying, cls, protocolFactory, sr)
}


/**
* Construct a Finagle Service interface for a Scrooge-generated thrift object.
*
* E.g. given a thrift service
* {{{
* service Logger {
* string log(1: string message, 2: i32 logLevel);
* i32 getLogSize();
* }
* }}}
*
* you can construct a client interface with a Finagle Service per thrift method:
*
* {{{
* val loggerService = Thrift.newServiceIface(Logger, "localhost:8000")
* val response = loggerService.log(Logger.Log.Args("log message", 1))
* }}}
*
* @param builder The builder type is generated by Scrooge for a thrift service.
* @param dest Address of the service to connect to, in the format accepted by [[Resolver.eval]].
*/
def newServiceIface[ServiceIface](dest: String)(
implicit builder: ServiceIfaceBuilder[ServiceIface]
): ServiceIface = {
val thriftService = newService(dest, "")
val scopedStats = stats.scope(defaultClientName)
builder.newServiceIface(thriftService, protocolFactory, scopedStats)
}

def newServiceIface[ServiceIface](dest: Name)(
implicit builder: ServiceIfaceBuilder[ServiceIface]
): ServiceIface = {
val thriftService = newService(dest, "")
val scopedStats = stats.scope(defaultClientName)
builder.newServiceIface(thriftService, protocolFactory, scopedStats)
}

def newMethodIface[ServiceIface, FutureIface](serviceIface: ServiceIface)(
implicit builder: MethodIfaceBuilder[ServiceIface, FutureIface]
): FutureIface = builder.newMethodIface(serviceIface)
}

/**
Expand Down
@@ -0,0 +1,257 @@
package com.twitter.finagle.thrift

import com.twitter.app.GlobalFlag
import com.twitter.conversions.storage._
import com.twitter.finagle.stats.{ClientStatsReceiver, Counter, StatsReceiver}
import com.twitter.finagle._
import com.twitter.scrooge._
import com.twitter.util._
import java.util.Arrays
import org.apache.thrift.TApplicationException
import org.apache.thrift.protocol.{TMessageType, TMessage, TProtocolFactory}
import org.apache.thrift.transport.TMemoryInputTransport


object maxReusableBufferSize extends GlobalFlag[StorageUnit](
16.kilobytes,
"Max size (bytes) for ThriftServiceIface reusable transport buffer"
)

/**
* Typeclass ServiceIfaceBuilder[T] creates T-typed interfaces from thrift clients.
* Scrooge generates implementations of this builder.
*/
trait ServiceIfaceBuilder[ServiceIface] {
/**
* Build a client ServiceIface wrapping a binary thrift service.
*
* @param thriftService An underlying thrift service that works on byte arrays.
* @param pf The protocol factory used to encode/decode thrift structures.
*/
def newServiceIface(
thriftService: Service[ThriftClientRequest, Array[Byte]],
pf: TProtocolFactory,
stats: StatsReceiver
): ServiceIface
}

/**
* A typeclass to construct a MethodIface by wrapping a ServiceIface.
* This is a compatibility constructor to replace an existing Future interface
* with one built from a ServiceIface.
*
* Scrooge generates implementations of this builder.
*/
trait MethodIfaceBuilder[ServiceIface, MethodIface] {
/**
* Build a FutureIface wrapping a ServiceIface.
*/
def newMethodIface(serviceIface: ServiceIface): MethodIface
}

object ThriftMethodStats {
def apply(stats: StatsReceiver): ThriftMethodStats =
ThriftMethodStats(
stats.counter("requests"),
stats.counter("success"),
stats.counter("failures"),
stats.scope("failures"))
}

case class ThriftMethodStats(
requestsCounter: Counter,
successCounter: Counter,
failuresCounter: Counter,
failuresScope: StatsReceiver)

/**
* Construct Service interface for a thrift method.
*
* There are two ways to use a Scrooge-generated thrift service with Finagle:
*
* 1. Using a Service interface, i.e. a collection of finagle [[Service Services]].
*
* 2. Using a method interface, i.e. a collection of methods returning [[Future Futures]].
*
* Example: for a thrift service
* {{{
* service Logger {
* string log(1: string message, 2: i32 logLevel);
* i32 getLogSize();
* }
* }}}
*
* the Service interface is
* {{{
* trait LoggerServiceIface {
* val log: com.twitter.finagle.Service[Logger.Log.Args, Logger.Log.Result]
* val getLogSize: com.twitter.finagle.Service[Logger.GetLogSize.Args, Logger.GetLogSize.Result]
* }
* }}}
*
* and the method interface is
* {{{
* trait Logger[Future] {
* def log(message: String, logLevel: Int): Future[String]
* def getLogSize(): Future[Int]
* }
* }}}
*
* Service interfaces can be modified and composed with Finagle [[Filter Filters]].
*/
object ThriftServiceIface {
private val resetCounter = ClientStatsReceiver.scope("thrift_service_iface").counter("reusable_buffer_resets")

/**
* Build a Service from a given Thrift method.
*/
def apply(
method: ThriftMethod,
thriftService: Service[ThriftClientRequest, Array[Byte]],
pf: TProtocolFactory,
stats: StatsReceiver
): Service[method.Args, method.Result] = {
statsFilter(method, stats) andThen
thriftCodecFilter(method, pf) andThen
thriftService
}

/**
* A [[Filter]] that updates success and failure stats for a thrift method.
* Thrift exceptions are counted as failures here.
*/
private def statsFilter(
method: ThriftMethod,
stats: StatsReceiver
): SimpleFilter[method.Args, method.Result] = {
val methodStats = ThriftMethodStats(stats.scope(method.serviceName).scope(method.name))
new SimpleFilter[method.Args, method.Result] {
def apply(
args: method.Args,
service: Service[method.Args, method.Result]
): Future[method.Result] = {
methodStats.requestsCounter.incr()
service(args).onSuccess { result =>
if (result.successField.isDefined) {
methodStats.successCounter.incr()
} else {
result.firstException.map { ex =>
methodStats.failuresCounter.incr()
methodStats.failuresScope.counter(Throwables.mkString(ex): _*).incr()
}
}
}
}
}
}

/**
* A [[Filter]] that wraps a binary thrift Service[ThriftClientRequest, Array[Byte]]
* and produces a [[Service]] from a [[ThriftStruct]] to [[ThriftClientRequest]] (i.e. bytes).
*/
private def thriftCodecFilter(
method: ThriftMethod,
pf: TProtocolFactory
): Filter[method.Args, method.Result, ThriftClientRequest, Array[Byte]] =
new Filter[method.Args, method.Result, ThriftClientRequest, Array[Byte]] {
override def apply(
args: method.Args,
service: Service[ThriftClientRequest, Array[Byte]]
): Future[method.Result] = {
val request = encodeRequest(method.name, args, pf, method.oneway)
service(request).map { bytes =>
decodeResponse(bytes, method.responseCodec, pf)
}
}
}

def resultFilter(
method: ThriftMethod
): Filter[method.Args, method.SuccessType, method.Args, method.Result] =
new Filter[method.Args, method.SuccessType, method.Args, method.Result] {
def apply(
args: method.Args,
service: Service[method.Args, method.Result]
): Future[method.SuccessType] = {
service(args).flatMap { response: method.Result =>
response.firstException() match {
case Some(exception) =>
setServiceName(exception, method.serviceName)
Future.exception(exception)
case None =>
response.successField match {
case Some(result) =>
Future.value(result)
case None =>
Future.exception(new TApplicationException(
TApplicationException.MISSING_RESULT,
"Thrift method '${method.name}' failed: missing result"
))
}
}
}
}
}

private[this] val tlReusableBuffer = new ThreadLocal[TReusableMemoryTransport] {
override def initialValue() = TReusableMemoryTransport(512)
}

private[this] def getReusableBuffer(): TReusableMemoryTransport = {
val buf = tlReusableBuffer.get()
buf.reset()
buf
}

private[this] def resetBuffer(trans: TReusableMemoryTransport): Unit = {
if (trans.currentCapacity > maxReusableBufferSize().inBytes) {
resetCounter.incr()
tlReusableBuffer.remove()
}
}

private def encodeRequest(
methodName: String,
args: ThriftStruct,
pf: TProtocolFactory,
oneway: Boolean
): ThriftClientRequest = {
val buf = getReusableBuffer()
val oprot = pf.getProtocol(buf)

oprot.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, 0))
args.write(oprot)
oprot.writeMessageEnd()

val bytes = Arrays.copyOfRange(buf.getArray, 0, buf.length)
resetBuffer(buf)

new ThriftClientRequest(bytes, oneway)
}

private def decodeResponse[T <: ThriftStruct](
resBytes: Array[Byte],
codec: ThriftStructCodec[T],
pf: TProtocolFactory
): T = {
val iprot = pf.getProtocol(new TMemoryInputTransport(resBytes))
val msg = iprot.readMessageBegin()
if (msg.`type` == TMessageType.EXCEPTION) {
val exception = TApplicationException.read(iprot)
iprot.readMessageEnd()
throw exception
} else {
val result = codec.decode(iprot)
iprot.readMessageEnd()
result
}
}

private def setServiceName(ex: Throwable, serviceName: String): Throwable =
ex match {
case se: SourcedException if !serviceName.isEmpty =>
se.serviceName = serviceName
se
case _ => ex
}
}

0 comments on commit ba43705

Please sign in to comment.