Skip to content

Commit

Permalink
finatra: StackTransformer for Finatra
Browse files Browse the repository at this point in the history
Problem / Solution

There isn't a general mechanism to modify the filters applied to routes when
they get built. Finagle uses StackTransformer to modify server stacks, Finatra
can use it too if we replace its filter building mechanisms with equivalent
stack building ones.

JIRA Issues: CSL-7673

Differential Revision: https://phabricator.twitter.biz/D277493
  • Loading branch information
luciferous authored and jenkins committed Mar 20, 2019
1 parent ce05c72 commit a96312d
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 29 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ Note that ``RB_ID=#`` and ``PHAB_ID=#`` correspond to associated message in comm
Unreleased
----------

Changed
~~~~~~~

* finatra-thrift: Constructing a `ThriftRouter` now requires a `c.t.f.StackTransformer`
``PHAB_ID=D277493``

19.3.0
------

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.twitter.inject.modules

import com.twitter.finagle.{ServiceFactory, Stack, StackTransformer}
import com.twitter.finagle.util.LoadService
import com.twitter.inject.TwitterModule

/**
* Provides a [[com.twitter.finagle.StackTransformer]] to the dependency
* injection context.
*/
object StackTransformerModule extends TwitterModule {
override protected def configure(): Unit =
bindSingleton[StackTransformer].toInstance(loadStackTransformer)

private[this] def loadStackTransformer: StackTransformer =
new StackTransformer {
private[this] val transformers = LoadService[StackTransformer]

val name: String = StackTransformerModule.this.getClass.getName

def apply[Req, Rep](stackSvcFac: Stack[ServiceFactory[Req, Rep]]): Stack[ServiceFactory[Req, Rep]] =
transformers.foldLeft(stackSvcFac)((stack, transform) => transform(stack))
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.twitter.finatra.thrift.internal

import com.twitter.finagle.{Filter, Service}
import com.twitter.finagle
import com.twitter.finagle.{Service, ServiceFactory, Stack}
import com.twitter.scrooge.ThriftMethod
import com.twitter.util.Future

Expand All @@ -9,13 +10,20 @@ private[thrift] class ThriftMethodService[Args, Result](
val service: Service[Args, Result]
) extends Service[Args, Result] {

private[this] var filter: Filter[Args, Result, Args, Result] = Filter.identity
private[this] val leaf: Stack[ServiceFactory[Args, Result]] =
Stack.leaf(finagle.stack.Endpoint, ServiceFactory.const(service))

private[this] var stack: Stack[ServiceFactory[Args, Result]] = leaf

private[this] lazy val filteredService: Service[Args, Result] =
// Materialize the stack with default params.
Service.pending(stack.make(Stack.Params.empty)())

private[finatra] def name: String = method.name

override def apply(request: Args): Future[Result] =
filter.andThen(service)(request)
private[finatra] def setStack(newStack: Stack[ServiceFactory[Args, Result]]): Unit =
stack = newStack ++ leaf

private[finatra] def setFilter(f: Filter.TypeAgnostic): Unit =
filter = filter.andThen(f.toFilter[Args, Result])
def apply(request: Args): Future[Result] =
filteredService(request)
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.twitter.finatra.thrift.routing

import com.twitter.finagle
import com.twitter.finagle.service.NilService
import com.twitter.finagle.thrift.{RichServerParam, ThriftService, ToThriftService}
import com.twitter.finagle.{Filter, Service, Thrift, ThriftMux}
import com.twitter.finagle.{Filter, Thrift, ThriftMux, Service, ServiceFactory, Stack, StackTransformer}
import com.twitter.finatra.thrift._
import com.twitter.finatra.thrift.exceptions.{ExceptionManager, ExceptionMapper}
import com.twitter.finatra.thrift.internal.routing.{NullThriftService, Registrar}
Expand Down Expand Up @@ -101,9 +102,9 @@ private[routing] abstract class BaseThriftRouter[Router <: BaseThriftRouter[Rout
result
}

protected[this] def registerGlobalFilter(thriftFilter: Filter.TypeAgnostic): Unit = {
protected[this] def registerGlobalFilter(thriftFilter: Object, registry: LibraryRegistry): Unit = {
if (thriftFilter ne Filter.TypeAgnostic.Identity) {
libraryRegistry
registry
.withSection("thrift")
.put("filters", thriftFilter.toString)
}
Expand All @@ -122,15 +123,23 @@ private object ThriftRouter {
* are encouraged to use the [[JavaThriftRouter]].
*/
@Singleton
class ThriftRouter @Inject()(injector: Injector, exceptionManager: ExceptionManager)
class ThriftRouter @Inject()(injector: Injector, exceptionManager: ExceptionManager, stackTransformer: StackTransformer)
extends BaseThriftRouter[ThriftRouter](injector, exceptionManager) {

private[this] var underlying: ThriftService = NullThriftService

// This map of routes is generated based on the controller and set once.
private[this] var routes: Map[ThriftMethod, ScroogeServiceImpl] = _

protected[this] var filters: Filter.TypeAgnostic = Filter.TypeAgnostic.Identity
private[this] def filterStack[Req, Rep]: Stack[ServiceFactory[Req, Rep]] = {
val nilStack = finagle.stack.nilStack[Req, Rep]
val stackSvcFac = filters.foldLeft(nilStack) { (stack, filter) =>
stack.prepend(Stack.Role(filter.toString), filter)
}
stackTransformer.apply(stackSvcFac)
}

private[this] var filters: Seq[Filter.TypeAgnostic] = Nil

private[finatra] def routeWarmup[M <: ThriftMethod](
m: M
Expand Down Expand Up @@ -198,7 +207,7 @@ class ThriftRouter @Inject()(injector: Injector, exceptionManager: ExceptionMana
*/
def filter(filter: Filter.TypeAgnostic): ThriftRouter =
preConfig("'filter' must be called before 'add'.") {
filters = filters.andThen(filter)
filters = filter +: filters
this
}

Expand Down Expand Up @@ -226,7 +235,7 @@ class ThriftRouter @Inject()(injector: Injector, exceptionManager: ExceptionMana
.instance[LibraryRegistry]
.withSection("thrift", "methods")

registerGlobalFilter(reg, filters)
registerGlobalFilter(filterStack, reg)

underlying = controller.config match {
case c: Controller.ControllerConfig => addController(controller, c)
Expand All @@ -251,8 +260,14 @@ class ThriftRouter @Inject()(injector: Injector, exceptionManager: ExceptionMana
val method: ThriftMethod = cm.method
val service = cm.impl.asInstanceOf[Service[Request[method.Args], Response[method.SuccessType]]]
thriftMethodRegistrar.register(controller.getClass, method, cm.filters)
method -> {
val endpoint = ServiceFactory.const(cm.filters.andThen(service))
val stack = filterStack ++ Stack.leaf(finagle.stack.Endpoint, endpoint)

method -> filters.andThen(cm.filters).andThen(service).asInstanceOf[ScroogeServiceImpl]
// Materialize a stack with default params.
val svcFac = stack.make(Stack.Params.empty)
Service.pending(svcFac()).asInstanceOf[ScroogeServiceImpl]
}
}.toMap

info(
Expand All @@ -277,8 +292,7 @@ class ThriftRouter @Inject()(injector: Injector, exceptionManager: ExceptionMana
routes = conf.methods.map { methodService =>
val method = methodService.method
thriftMethodRegistrar.register(controller.getClass, method, Filter.TypeAgnostic.Identity)
methodService.setFilter(filters)

methodService.setStack(filterStack)
// Convert to a ScroogeServiceImpl for issuing warmup requests
val castedService = methodService.asInstanceOf[Service[method.Args, method.SuccessType]]
val reqRepService = Service.mk[Request[method.Args], Response[method.SuccessType]] { req =>
Expand All @@ -295,17 +309,6 @@ class ThriftRouter @Inject()(injector: Injector, exceptionManager: ExceptionMana
}
controller.asInstanceOf[ToThriftService].toThriftService
}

private[this] def registerGlobalFilter(
registry: LibraryRegistry,
thriftFilter: Filter.TypeAgnostic
): Unit = {
if (thriftFilter ne Filter.TypeAgnostic.Identity) {
libraryRegistry
.withSection("thrift")
.put("filters", thriftFilter.toString)
}
}
}

/**
Expand Down Expand Up @@ -445,7 +448,7 @@ class JavaThriftRouter @Inject()(injector: Injector, exceptionManager: Exception
declaredMethods.map(method => s"$serviceName.${method.getName}").mkString("\n")
)

registerGlobalFilter(filters)
registerGlobalFilter(filters, libraryRegistry)
registerMethods(serviceName, controller, declaredMethods.toSeq)
underlying = serviceInstance
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import com.twitter.finatra.thrift.response.ThriftResponseClassifier
import com.twitter.finatra.thrift.routing.{JavaThriftRouter, ThriftRouter}
import com.twitter.inject.annotations.Lifecycle
import com.twitter.inject.internal.LibraryRegistry
import com.twitter.inject.modules.StackTransformerModule
import com.twitter.inject.server.{PortUtils, TwitterServer}
import com.twitter.util.{Await, Duration}

Expand All @@ -35,6 +36,7 @@ trait ThriftServerTrait extends TwitterServer {
/** Add Framework Modules */
addFrameworkModules(
ExceptionManagerModule,
StackTransformerModule,
thriftResponseClassifierModule)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.twitter.inject.Test
class ThriftWarmupTest extends Test {
test("ThriftWarmup refuses to send a request if a router is not configured") {
intercept[IllegalStateException] {
val tw = new ThriftWarmup(new ThriftRouter(null, null))
val tw = new ThriftWarmup(new ThriftRouter(null, null, null))
tw.send(Uppercase, Uppercase.Args("hithere"), 1)()
}
}
Expand Down

0 comments on commit a96312d

Please sign in to comment.