Skip to content

Commit

Permalink
finagle-core: Requeue module can leak sessions
Browse files Browse the repository at this point in the history
Summary: Problem

Finagle's requeue module will swallow sessions depending on their status without
closing the session. This causes a resource leak.

Solution

Close the sessions before swallowing them.

JIRA Issues: CSL-6100

Differential Revision: https://phabricator.twitter.biz/D142457
  • Loading branch information
Daniel Schobel authored and jenkins committed Feb 28, 2018
1 parent 0fe5a0a commit f5cdda1
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 37 deletions.
3 changes: 3 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ Bug Fixes
* finagle-core: `c.t.f.pool.CachingPool` service wrapper instances are resilient to multiple closes.
``PHAB_ID=D136781``

* finagle-core: Requeue module now closes sessions it prevented from propagating up the stack.
``PHAB_ID=D142457``

* finagle-base-http: `c.t.f.http.Netty4CookieCodec.encode` now wraps Cookie values that would
be wrapped in `c.t.f.http.Netty3CookieCodec.encode`. ``PHAB_ID=D134566``

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,11 @@ object Retries {
// to point out when a connection is dead for a reason that shouldn't
// trigger circuit breaking
case Return(deadSvc) if deadSvc.status == Status.Closed && n > 0 =>
// Since we're stopping `deadSvc` from propagating up the stack to either an application
// or FactoryToService wrapper and since it's not part of the Service contract that Status
// can only be Closed when the close method was called, we must manually close the session
// to forestall resource leaks.
deadSvc.close()
requeuesCounter.incr()
applySelf(conn, n - 1)
case t => Future.const(t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ abstract class AbstractStackClientTest
val client = baseClient.configured(param.Stats(sr))
}

def await[T](awaitable: Awaitable[T]): T = Await.result(awaitable, 5.seconds)

after {
NameInterpreter.global = DefaultInterpreter
}
Expand Down Expand Up @@ -160,17 +162,17 @@ abstract class AbstractStackClientTest

def testClient(name: String, failFastOn: Option[Boolean]): Unit = {
val svc = newClient(name, failFastOn)
val e = intercept[RuntimeException] { Await.result(svc("hi")) }
val e = intercept[RuntimeException] { await(svc("hi")) }
assert(e == ex)
failFastOn match {
case Some(on) if !on =>
assert(ctx.sr.counters.get(Seq(name, "failfast", "marked_dead")) == None)
intercept[RuntimeException] { Await.result(svc("hi2")) }
intercept[RuntimeException] { await(svc("hi2")) }
case _ =>
eventually {
assert(ctx.sr.counters(Seq(name, "failfast", "marked_dead")) == 1)
}
intercept[FailedFastException] { Await.result(svc("hi2")) }
intercept[FailedFastException] { await(svc("hi2")) }
}
}

Expand Down Expand Up @@ -215,7 +217,7 @@ abstract class AbstractStackClientTest
)

val service = new FactoryToService(factory)
Await.result(service(()))
await(service(()))

assert(closed)
}
Expand Down Expand Up @@ -268,33 +270,42 @@ abstract class AbstractStackClientTest
)

val service = new FactoryToService(factory)
Await.result(service(()))
await(service(()))

assert(!closed)
}

trait RequeueCtx {
var count = 0
var _status: Status = Status.Open
var sessionDispatchCount = 0
var sessionCloseCount = 0
var _svcFacStatus: Status = Status.Open
var _sessionStatus: Status = Status.Open

var runSideEffect = (_: Int) => false
var sideEffect = () => ()
var closeSideEffect = () => ()

val stubLB = new ServiceFactory[String, String] {
def apply(conn: ClientConnection) =
Future.value(new Service[String, String] {
def apply(request: String): Future[String] = {
count += 1
if (runSideEffect(count)) sideEffect()
sessionDispatchCount += 1
if (runSideEffect(sessionDispatchCount)) sideEffect()
Future.exception(WriteException(new Exception("boom")))
}

override def close(deadline: Time) = Future.Done
override def close(deadline: Time) = {
sessionCloseCount += 1
closeSideEffect()
Future.Done
}

override def status: Status = _sessionStatus
})

def close(deadline: Time) = Future.Done

override def status = _status
override def status = _svcFacStatus
}

val sr = new InMemoryStatsReceiver
Expand All @@ -321,26 +332,26 @@ abstract class AbstractStackClientTest
val session = cl()
val b = budget
// failing request and Open load balancer => max requeues
Await.ready(session.map(_("hi")), 5.seconds)
await(session.map(_("hi")))
assert(requeues == Some(DefaultRequeues))
assert(budget == b - DefaultRequeues)
})

for (status <- Seq(Status.Busy, Status.Closed)) {
test(s"don't requeue failing requests when the stack is $status")(new RequeueCtx {
// failing request and Busy | Closed load balancer => zero requeues
_status = status
Await.ready(cl().map(_("hi")), 5.seconds)
_svcFacStatus = status
await(cl().map(_("hi")))
assert(requeues.isEmpty)
})
}

test("dynamically stop requeuing")(new RequeueCtx {
// load balancer begins Open, becomes Busy after 10 requeues => 10 requeues
_status = Status.Open
_svcFacStatus = Status.Open
runSideEffect = _ > DefaultRequeues
sideEffect = () => _status = Status.Busy
Await.ready(cl().map(_("hi")), 5.seconds)
sideEffect = () => _svcFacStatus = Status.Busy
await(cl().map(_("hi")))
assert(requeues == Some(DefaultRequeues))
})

Expand All @@ -352,7 +363,7 @@ abstract class AbstractStackClientTest
def close(deadline: Time) = Future.Done
}

intercept[Failure] { Await.result(cl(), 5.seconds) }
intercept[Failure] { await(cl()) }
assert(requeues.isDefined)
assert(budget > 0)
})
Expand All @@ -365,19 +376,28 @@ abstract class AbstractStackClientTest
def close(deadline: Time) = Future.Done
}

intercept[Failure] { Await.result(cl(), 5.seconds) }
intercept[Failure] { await(cl()) }

assert(requeues.isEmpty)
assert(budget > 0)
})

test("service acquisition requeues respect Status.Open")(new RequeueCtx {
_status = Status.Closed
Await.result(cl(), 5.seconds)
_svcFacStatus = Status.Closed
await(cl())
assert(requeues.isEmpty)
assert(budget > 0)
})

test("service acquisition requeues will close Status.Closed sessions") {
val ctx = new RequeueCtx { }
ctx._svcFacStatus = Status.Open
ctx._sessionStatus = Status.Closed
ctx.closeSideEffect = () => ctx._sessionStatus = Status.Open
await(ctx.cl())
assert(ctx.sessionCloseCount == 1)
}

test("Requeues all go to the same cluster in a Union") {
/*
* Once we have distributed a request to a particular cluster (in
Expand Down Expand Up @@ -454,7 +474,7 @@ abstract class AbstractStackClientTest
)

intercept[Failure] {
Await.result(service(()), 5.seconds)
await(service(()))
}

val requeues = sr.counters(Seq("retries", "requeues"))
Expand Down Expand Up @@ -493,7 +513,7 @@ abstract class AbstractStackClientTest
val name = Name.bound(addr)
val service = baseClient.newService(name, "sfsa-test")
val forward = "a man a plan a canal: panama"
val reversed = Await.result(service(forward), 1.second)
val reversed = await(service(forward))
assert(reversed == forward.reverse)
}

Expand All @@ -511,7 +531,7 @@ abstract class AbstractStackClientTest
}

val svc = baseClient.filtered(reverseFilter).newService(name, "test_client")
assert(Await.result(svc("ping"), 1.second) == "ping".reverse)
assert(await(svc("ping")) == "ping".reverse)
}

test("endpointer clears Contexts") {
Expand All @@ -527,7 +547,7 @@ abstract class AbstractStackClientTest
val client = new LocalCheckingStringClient(key)
.newService(Name.bound(Address(ia)), "a-label")

val result = Await.result(client("abc"), 5.seconds)
val result = await(client("abc"))
assert("abc" == result)
}
}
Expand Down Expand Up @@ -583,7 +603,7 @@ abstract class AbstractStackClientTest
BindingFactory.Dest(Name.Path(Path.read("/$/inet/localhost/0")))

val svcFac = stack.make(params)
val session1 = Await.result(svcFac(), 3.seconds)
val session1 = await(svcFac())

// pending
val e1r1 = session1(())
Expand All @@ -592,17 +612,17 @@ abstract class AbstractStackClientTest
// rejected
val e1r3 = session1(())

val e1rejected = intercept[Failure] { Await.result(e1r3, 3.seconds) }
val e1rejected = intercept[Failure] { await(e1r3) }

val session2 = Await.result(svcFac(), 3.seconds)
val session2 = await(svcFac())
// pending
val e2r1 = session2(())
// pending
val e2r2 = session2(())
// rejected
val e2r3 = session2(())

val e2rejected = intercept[Failure] { Await.result(e2r3, 3.seconds) }
val e2rejected = intercept[Failure] { await(e2r3) }

// endpoint1 and endpoint2 both only see the first two requests,
// meaning they get distinct pending request limits
Expand All @@ -626,9 +646,9 @@ abstract class AbstractStackClientTest
val e2r5 = session2(())
val e2r6 = session2(())

Await.result(e2r4, 3.seconds)
Await.result(e2r5, 3.seconds)
Await.result(e2r6, 3.seconds)
await(e2r4)
await(e2r5)
await(e2r6)

assert(endpoint2.satisfied.get() == 5)
}
Expand All @@ -642,9 +662,9 @@ abstract class AbstractStackClientTest
val svc = baseClient.newService(Name.bound(Address(boundAddress)), label)

val registry = new SimpleRegistry
Await.result(GlobalRegistry.withRegistry(registry) {
await(GlobalRegistry.withRegistry(registry) {
svc("hello world")
}, 5.seconds)
})

val expectedEntry = Entry(
key = Seq("client", StringClient.protocolLibrary, label, "Transporter"),
Expand All @@ -653,8 +673,8 @@ abstract class AbstractStackClientTest

assert(registry.iterator.contains(expectedEntry))

Await.result(listeningServer.close(), 5.seconds)
Await.result(svc.close(), 5.seconds)
await(listeningServer.close())
await(svc.close())
}

test("Sources exceptions") {
Expand Down Expand Up @@ -687,7 +707,7 @@ abstract class AbstractStackClientTest
.newService(Name.bound(Address(boundAddress)), label)

val failure = intercept[Failure] {
Await.result(svc("hello"), 5.seconds)
await(svc("hello"))
}

assert(failure.toString == "Failure(boom!, flags=0x00) with Service -> stringClient")
Expand Down

0 comments on commit f5cdda1

Please sign in to comment.