Skip to content

Commit

Permalink
finagle-core: integrate and expose Dtab.limited
Browse files Browse the repository at this point in the history
Problem

We want to introduce a limited Dtab that does not
propagate via request broadcast and stays local
to the process. We need to make sure that's integrated
where Dtab.local is used and assert that we are not
leaking this information out along with the Dtab.local.

Solution

Expose the `Dtab.limited` API publicly, integrate with the
`Namer` and `BindingFactory`, as well as test the expected
behavior. We expect that `Dtab.local`s that are set for a
request will take precedence over the `Dtab.limited`, but
both will override values set in the `Dtab.base`.

JIRA Issues: CSL-10977

Differential Revision: https://phabricator.twitter.biz/D677860
  • Loading branch information
enbnt authored and jenkins committed Jun 1, 2021
1 parent d868efc commit 2e06c66
Show file tree
Hide file tree
Showing 15 changed files with 252 additions and 47 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ Note that ``PHAB_ID=#`` and ``RB_ID=#`` correspond to associated messages in com
Unreleased
----------

New Features
~~~~~~~~~~~~

* finagle-core: Introduce `Dtab.limited`, which is a process-local `Dtab` that will
NOT be remotely broadcast for protocols for any protocol, where `Dtab.local` will
be broadcast for propagation on supported protocols. For path name resolution, the
`Dtab.local` will take precedence over the `Dtab.limited`, if the same path is
defined in both, and both take precedence over the `Dtab.base`. The existing
`Dtab.local` request propagation behavior remains unchanged. ``PHAB_ID=D677860``

Runtime Behavior Changes
~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,23 @@ class DtabFilterTest extends AnyFunSuite with AssertionsForJUnit {
assert(receivedDtab == Some(origDtab))
}
}

test("Injector does not transmit dtab.limited") {
var receivedDtab: Option[Dtab] = None
val svc = new DtabFilter.Injector().andThen(Service.mk[Request, Response] { req =>
receivedDtab = HttpDtab.read(req).toOption
Future.value(Response())
})

Dtab.unwind {
val newDtab = Dtab.read("/s => /srv/smf1")
Dtab.limited = newDtab

// prepare a request and add a correct looking new-style dtab header
val req = Request()
Await.result(svc(req), timeout)

assert(receivedDtab == Some(Dtab.empty))
}
}
}
12 changes: 7 additions & 5 deletions finagle-core/src/main/scala/com/twitter/finagle/DtabBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -309,32 +309,34 @@ private[finagle] abstract class DtabCompanionBase {
/**
* The limited, or "non-propagated, per-request", delegation table applies to the
* current [[com.twitter.util.Local Local]] scope which is usually
* propagated from the upstream.
* propagated from the upstream. Finagle uses the Dtab
* `Dtab.base ++ Dtab.limited ++ Dtab.local` to bind [[com.twitter.finagle.Name.Path
* Paths]] via a [[com.twitter.finagle.naming.NameInterpreter]].
*
* Limited's scope is dictated by [[com.twitter.util.Local Local]].
*
* Unlike `local`, `limited` is not propagated to the entire request graph.
*
*/
private[finagle] def limited: Dtab = _limited() match {
def limited: Dtab = _limited() match {
case Some(dtab) => dtab
case None => Dtab.empty
}

private[finagle] def limited_=(dtab: Dtab): Unit =
def limited_=(dtab: Dtab): Unit =
_limited() = dtab

/**
* Java API for `limited_=`
*/
private[finagle] def setLimited(dtab: Dtab): Unit =
def setLimited(dtab: Dtab): Unit =
limited = dtab

/**
* The local, or "per-request", delegation table applies to the
* current [[com.twitter.util.Local Local]] scope which is usually
* defined on a per-request basis. Finagle uses the Dtab
* `Dtab.base (++ Dtab.limited) ++ Dtab.local` to bind [[com.twitter.finagle.Name.Path
* `Dtab.base ++ Dtab.limited ++ Dtab.local` to bind [[com.twitter.finagle.Name.Path
* Paths]] via a [[com.twitter.finagle.naming.NameInterpreter]].
*
* Local's scope is dictated by [[com.twitter.util.Local Local]].
Expand Down
13 changes: 8 additions & 5 deletions finagle-core/src/main/scala/com/twitter/finagle/Exceptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -156,21 +156,24 @@ class GlobalRequestTimeoutException(timeout: Duration)
class NoBrokersAvailableException(
val name: String,
val baseDtabFn: () => Dtab,
val localDtabFn: () => Dtab)
val localDtabFn: () => Dtab,
val limitedDtabFn: () => Dtab)
extends RequestException
with SourcedException {

// backwards compatibility constructor
def this(name: String, baseDtab: Dtab, localDtab: Dtab) =
this(name, () => baseDtab, () => localDtab)
def this(name: String, baseDtab: Dtab, localDtab: Dtab, limitedDtab: Dtab) =
this(name, () => baseDtab, () => localDtab, () => limitedDtab)

def this(name: String = "unknown") = this(name, () => Dtab.base, () => Dtab.local)
def this(name: String = "unknown") =
this(name, () => Dtab.base, () => Dtab.local, () => Dtab.limited)

def baseDtab: Dtab = baseDtabFn()
def localDtab: Dtab = localDtabFn()
def limitedDtab: Dtab = limitedDtabFn()

override def exceptionMessage: String =
s"No hosts are available for $name, Dtab.base=[${baseDtab.show}], Dtab.local=[${localDtab.show}]"
s"No hosts are available for $name, Dtab.base=[${baseDtab.show}], Dtab.limited=[${limitedDtab.show}], Dtab.local=[${localDtab.show}]"

serviceName = name
}
Expand Down
8 changes: 6 additions & 2 deletions finagle-core/src/main/scala/com/twitter/finagle/Namer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,13 @@ object Namer {
}

/**
* Resolve a path to an address set (taking [[Dtab.local]] into account).
* Resolve a path to an address set (taking [[Dtab.limited]] and [[Dtab.local]] into account).
*
* @note The [[Path path]] resolution order will have the [[Dtab.local]] will take precedence,
* followed by the [[Dtab.limited]], and lastly the [[Dtab.base]]. This ensures that the
* [[Dtab.local]] remote request propagation behavior is retained.
*/
def resolve(path: Path): Var[Addr] = resolve(Dtab.base ++ Dtab.local, path)
def resolve(path: Path): Var[Addr] = resolve(Dtab.base ++ Dtab.limited ++ Dtab.local, path)

/**
* Resolve a path to an address set (taking [[Dtab.local]] into account).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ private[finagle] object AddrLifecycle {
Activity(dest.map {
case Addr.Bound(set, _) => Activity.Ok(set)
case Addr.Neg =>
log.info(s"$label: name resolution is negative (local dtab: ${Dtab.local})")
log.info(
s"$label: name resolution is negative (limited dtab: ${Dtab.limited} local dtab: ${Dtab.local})")
Activity.Ok(Set.empty[Address])
case Addr.Failed(e) =>
log.info(s"$label: name resolution failed (local dtab: ${Dtab.local})", e)
log.info(
s"$label: name resolution failed (limited dtab: ${Dtab.limited} local dtab: ${Dtab.local})",
e)
Activity.Failed(e)
case Addr.Pending =>
log.debug(s"$label: name resolution is pending")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ class BindingFactory[Req, Rep] private[naming] (

private[this] val dtabCache = {
val newFactory: Dtabs => ServiceFactory[Req, Rep] = {
case Dtabs(baseDtab, localDtab) =>
case Dtabs(baseDtab, localDtab, limitedDtab) =>
val dynFactory = new DynNameFactory(
NameInterpreter.bind(baseDtab ++ localDtab, path),
NameInterpreter.bind(baseDtab ++ limitedDtab ++ localDtab, path),
nameTreeCache,
statsReceiver = statsReceiver
)
Expand All @@ -124,7 +124,8 @@ class BindingFactory[Req, Rep] private[naming] (
// we don't have the dtabs handy at the point we throw
// the exception; fill them in on the way out
case e: NoBrokersAvailableException =>
Future.exception(new NoBrokersAvailableException(e.name, baseDtab, localDtab))
Future.exception(
new NoBrokersAvailableException(e.name, baseDtab, localDtab, limitedDtab))
}
}
}
Expand All @@ -140,12 +141,12 @@ class BindingFactory[Req, Rep] private[naming] (
}

def apply(conn: ClientConnection): Future[Service[Req, Rep]] =
dtabCache(Dtabs(baseDtab(), Dtab.local), conn)
dtabCache(Dtabs(baseDtab(), Dtab.local, Dtab.limited), conn)

def close(deadline: Time): Future[Unit] =
Closable.sequence(dtabCache, nameTreeCache, nameCache).close(deadline)

override def status: Status = dtabCache.status(Dtabs(baseDtab(), Dtab.local))
override def status: Status = dtabCache.status(Dtabs(baseDtab(), Dtab.local, Dtab.limited))
}

object BindingFactory {
Expand All @@ -155,10 +156,11 @@ object BindingFactory {
private[finagle] val NamerPathAnnotationKey = "clnt/namer.path"
private[finagle] val DtabBaseAnnotationKey = "clnt/namer.dtab.base"

private case class Dtabs(base: Dtab, local: Dtab) extends CachedHashCode.ForClass {
private case class Dtabs(base: Dtab, local: Dtab, limited: Dtab) extends CachedHashCode.ForClass {
override protected def computeHashCode: Int = {
var hash = 1
hash = 31 * hash + base.hashCode
hash = 31 * hash + limited.hashCode
hash = 31 * hash + local.hashCode
hash
}
Expand Down Expand Up @@ -283,7 +285,7 @@ object BindingFactory {
// resulting in wasteful connections. The second condition applies only if
// EagerConnectionsType.ForceWithDtab is false.
val finalParams =
if (!eagerlyConnect || (!forceWithDtab && !Dtab.local.isEmpty))
if (!eagerlyConnect || (!forceWithDtab && !(Dtab.local.isEmpty && Dtab.limited.isEmpty)))
updatedParams + EagerConnections(false)
else updatedParams

Expand All @@ -292,7 +294,7 @@ object BindingFactory {
}

val factory = dest match {
case bound @ Name.Bound(addr) => newStack(label, bound)
case bound @ Name.Bound(_) => newStack(label, bound)

case Name.Path(path) =>
val BaseDtab(baseDtab) = params[BaseDtab]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ private[finagle] object BalancingPool {
extends NoBrokersAvailableException(
name = "balancing_pool",
baseDtabFn = () => Dtab.empty,
localDtabFn = () => Dtab.empty
localDtabFn = () => Dtab.empty,
limitedDtabFn = () => Dtab.empty
) {
override def exceptionMessage: String = "PoolNodes should be non-empty!"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@ public class DtabCompilationTest {
public void testCompilation() {
Dtab d = Dtab.emptyDtab();
d = Dtab.local();
d = Dtab.limited();
d = Dtab.base();
Dtab.setLocal(d);
Dtab.setLimited(d);
Dtab base = Dtab.base();
Dtab.setBase(Dtab.emptyDtab());
Dtab.setBase(base);
d = Dtab.local().concat(Dtab.base());
d = Dtab.local().append(Dentry.read("/foo=>/bar"));
d = Dtab.limited().concat(Dtab.base());
d = Dtab.limited().append(Dentry.read("/baz=>/foo"));
Dentry dentry = new Dentry(Dentry.readPrefix("/s/*"), new NameTree.Leaf<Path>(Path.read("/a")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,33 @@ class ExceptionsTest extends AnyFunSuite {
val ex = new NoBrokersAvailableException(
"/s/cool/story",
Dtab.base,
Dtab.read("/foo=>/$/com.twitter.butt")
Dtab.read("/foo=>/$/com.twitter.butt"),
Dtab.empty
)

assert(
ex.getMessage ==
"No hosts are available for /s/cool/story, " +
s"Dtab.base=[${Dtab.base.show}], " +
"Dtab.limited=[], " +
"Dtab.local=[/foo=>/$/com.twitter.butt]. " +
"Remote Info: Not Available"
)

val ex2 = new NoBrokersAvailableException(
"/s/cool/story",
Dtab.base,
Dtab.empty,
Dtab.read("/foo=>/$/com.twitter.butt")
)

assert(
ex2.getMessage ==
"No hosts are available for /s/cool/story, " +
s"Dtab.base=[${Dtab.base.show}], " +
"Dtab.limited=[/foo=>/$/com.twitter.butt], " +
"Dtab.local=[]. " +
"Remote Info: Not Available"
)
}
}
52 changes: 52 additions & 0 deletions finagle-core/src/test/scala/com/twitter/finagle/NamerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,58 @@ class NamerTest extends AnyFunSuite with AssertionsForJUnit {
})
}

test("Namer.resolve: No resolution for empty Dtab.base") {
Dtab.unwind {
Namer.resolve(Path.read("/s/foo")).sample() match {
case Addr.Neg =>
() // pass
case _ => fail()
}
}
}

test("Namer.resolve: resolve when Dtab.local is set") {
Dtab.unwind {
Dtab.limited = Dtab.empty
Dtab.local = Dtab.read("/s/foo => /$/inet/5678")

Namer.resolve(Path.read("/s/foo")).sample() match {
case Addr.Bound(addr, _) =>
// pass
assert(addr == Set(Address(5678)))
case e => fail(e.toString)
}
}
}

test("Namer.resolve: resolve when Dtab.limited is set") {
Dtab.unwind {
Dtab.limited = Dtab.read("/s/foo => /$/inet/1234")
Dtab.local = Dtab.empty

Namer.resolve(Path.read("/s/foo")).sample() match {
case Addr.Bound(addr, _) =>
assert(addr == Set(Address(1234)))
case _ => fail()
}
}
}

test(
"Namer.resolve: resolve prefers Dtab.local when both Dtab.local and Dtab.limited" +
" are set for same Path") {
Dtab.unwind {
Dtab.limited = Dtab.read("/s/foo => /$/inet/1234")
Dtab.local = Dtab.read("/s/foo => /$/inet/5678")

Namer.resolve(Path.read("/s/foo")).sample() match {
case Addr.Bound(addr, _) =>
assert(addr == Set(Address(5678)))
case _ => fail()
}
}
}

test("Namer.bind: max recursion level reached") {
namerMaxDepth.let(2) {
assert(Namer.resolve(Dtab.read("/s => /s/srv"), Path.read("/s/foo/bar")).sample() match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ class BindingFactoryTest extends AnyFunSuite with MockitoSugar with BeforeAndAft

assert(noBrokers.name == "/foo/bar")
assert(noBrokers.localDtab == localDtab)
assert(noBrokers.limitedDtab == Dtab.empty)
})

test("Includes path and Dtab.local in NoBrokersAvailableException from service creation") {
Expand Down Expand Up @@ -237,6 +238,7 @@ class BindingFactoryTest extends AnyFunSuite with MockitoSugar with BeforeAndAft

assert(noBrokers.name == "/foo/bar")
assert(noBrokers.localDtab == localDtab)
assert(noBrokers.limitedDtab == Dtab.empty)
}

test("Traces name information per request")(new Ctx {
Expand Down Expand Up @@ -499,6 +501,16 @@ class BindingFactoryTest extends AnyFunSuite with MockitoSugar with BeforeAndAft
await(factory())
assert(endpoint.total == 5)

// With the flag set, we should still get an extra connection when a new Dtab.limited is defined
Dtab.unwind {
Dtab.limited = Dtab.read("/foo=>/$/inet/3")
await(factory())
assert(endpoint.total == 7)
}

await(factory())
assert(endpoint.total == 8)

}

test(
Expand Down Expand Up @@ -539,5 +551,12 @@ class BindingFactoryTest extends AnyFunSuite with MockitoSugar with BeforeAndAft

await(factory())
assert(endpoint.total == 4)

Dtab.unwind {
Dtab.local = Dtab.empty
Dtab.limited = Dtab.read("/foo=>/$/inet/3")
await(factory())
assert(endpoint.total == 5)
}
}
}
Loading

0 comments on commit 2e06c66

Please sign in to comment.