Skip to content

Commit

Permalink
finagle-core: newClient closed service lifecycle
Browse files Browse the repository at this point in the history
Problem:
Closing a session created by newClient's `ServiceFactory` does not
prevent its reuse, as the connection remains alive in the connection pool

Solution / Result:
New client stack module that sits above `DefaultPool`. If FactoryToService is
not enabled, the returned service from DefaultPool is wrapped in a
`ClosableService` that ensures closed sessions are not reused.

JIRA Issues: CSL-9151

Differential Revision: https://phabricator.twitter.biz/D407805
  • Loading branch information
hamdiallam authored and jenkins committed Dec 5, 2019
1 parent bcfdbd3 commit c64bea0
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 82 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ Breaking API Changes
deprecated, so to construct it, you must call one of the RichClientParam.apply
methods. ``PHAB_ID=D400382``

Bug Fixes
~~~~~~~~~

* finagle-core: `ClosableService` client stack module that prevents the reuse of closed services
when `FactoryToService` is not set. This is important for clients making use of the `newClient`
api. ``PHAB_ID=D407805``

19.11.0
-------

Expand Down
15 changes: 15 additions & 0 deletions finagle-core/src/main/scala/com/twitter/finagle/Exceptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,21 @@ trait ServiceException extends Exception with SourcedException
*/
class ServiceClosedException extends ServiceException

/**
* Indicates that this service was closed and returned to the underlying pool.
*/
class ServiceReturnedToPoolException(val flags: Long = FailureFlags.NonRetryable)
extends IllegalStateException
with ServiceException
with HasLogLevel
with FailureFlags[ServiceReturnedToPoolException] {

protected def copyWithFlags(newFlags: Long): ServiceReturnedToPoolException =
new ServiceReturnedToPoolException(newFlags)

def logLevel: Level = Level.WARNING
}

/**
* Indicates that a request was applied to a [[com.twitter.finagle.Service]]
* that is unavailable. This constitutes a fail-stop condition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ object StackClient {
* @see [[com.twitter.finagle.service.FailFastFactory]]
* @see [[com.twitter.finagle.service.PendingRequestFilter]]
* @see [[com.twitter.finagle.client.DefaultPool]]
* @see [[com.twitter.finagle.service.ClosableService]]
* @see [[com.twitter.finagle.service.TimeoutFilter]]
* @see [[com.twitter.finagle.liveness.FailureAccrualFactory]]
* @see [[com.twitter.finagle.service.StatsServiceFactory]]
Expand Down Expand Up @@ -139,8 +140,13 @@ object StackClient {
* `DefaultPool` configures connection pooling. Like the `LoadBalancerFactory`
* module it is a potentially aggregate [[ServiceFactory]] composed of multiple
* [[Service Services]] which represent a distinct session to the same endpoint.
*
* When the service lifecycle is managed independently of the stack. `ClosableService`
* ensures a closed service cannot be reused. Typically a closed service releases
* its connection into the configured pool.
*/
stk.push(DefaultPool.module)
stk.push(ClosableService.client)

/**
* `TimeoutFilter` enforces static request timeouts and broadcast request deadlines,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.twitter.finagle.service

import com.twitter.finagle._
import com.twitter.util.{Future, Time}
import java.util.concurrent.atomic.AtomicBoolean

private[finagle] object ClosableService {
val role = Stack.Role("ClosableService")

/**
* Client stack module that creates a [[ClosableService]] wrapper when services are managed
* independently of the client stack.
*/
def client[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] =
new Stack.Module1[FactoryToService.Enabled, ServiceFactory[Req, Rep]] {
val role = ClosableService.role
val description = "Explictly prevent reuse of a closed session"
def make(
factoryToService: FactoryToService.Enabled,
next: ServiceFactory[Req, Rep]
): ServiceFactory[Req, Rep] = {
if (factoryToService.enabled) next
else {
// session lifecycle is managed independently. Connections can be pooled so we must
// make sure a closed session is not reused
next.map(new ClosableService(_) {
def closedException = new ServiceReturnedToPoolException
})
}
}
}
}

/**
* A service wrapper that prevents reuse of the `underlying` service after the first call to
* `.close`.
*/
private[service] abstract class ClosableService[Req, Rep](underlying: Service[Req, Rep])
extends Service[Req, Rep] {
private val closed = new AtomicBoolean(false)

protected def closedException: Exception

override def apply(req: Req): Future[Rep] = {
if (closed.get) Future.exception(closedException)
else underlying(req)
}

override def close(deadline: Time): Future[Unit] = {
val closeUnderlying: Boolean = closed.compareAndSet(false, true)

if (closeUnderlying) underlying.close(deadline)
else Future.Done
}

override def status: Status = {
if (closed.get) Status.Closed
else underlying.status
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ object ExpiringService {
}
}

// scoped this way for testing
private[service] def closeOnReleaseSvc[Req, Rep](
service: Service[Req, Rep]
): Service[Req, Rep] = {
new ClosableService(service) {
def closedException = WriteException(new ServiceClosedException)
}
}

/**
* Creates a [[com.twitter.finagle.Stackable]] [[com.twitter.finagle.service.ExpiringService]]
* which simply extracts the service from the underlying `ServiceFactory` and calls
Expand All @@ -84,7 +93,8 @@ object ExpiringService {
case (None, None) => next
case _ =>
next.map { service =>
val closeOnRelease = new CloseOnReleaseService(service)
val closeOnRelease = closeOnReleaseSvc[Req, Rep](service)

new ExpiringService(closeOnRelease, idle, life, timer, statsReceiver) {
def onExpire(): Unit = { closeOnRelease.close() }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,37 @@ abstract class AbstractStackClientTest
}
}

test("closed services from newClient respect close()")(new Ctx {
val nilModule = new Module0[ServiceFactory[String, String]] {
val role = Stack.Role("yo")
val description = "yoo"

def make(next: ServiceFactory[String, String]): ServiceFactory[String, String] = {
ServiceFactory.const[String, String](
Service.mk[String, String](_ => Future.value(""))
)
}
}

val nilStack = new StackBuilder(stack.nilStack[String, String])
.push(nilModule)
.result

val svcFac = client
.withStack(_.concat(nilStack))
.newClient("/$/inet/localhost/0")

val svc = await(svcFac())
assert(svc.status == Status.Open)

await(svc.close())
assert(svc.status == Status.Closed)

intercept[ServiceReturnedToPoolException] {
await(svc(""))
}
})

test("FactoryToService close propagated to underlying service") {
/*
* This test ensures that the following one doesn't succeed vacuously.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.twitter.finagle.service

import com.twitter.conversions.DurationOps._
import com.twitter.finagle.{Service, ServiceClosedException, Status}
import com.twitter.util.{Await, Future, Time}
import org.scalatest.FunSuite

class ClosableServiceTest extends FunSuite {
def await[A](f: Future[A]): A = Await.result(f, 5.seconds)

trait Ctx {
var numClosed = 0
private val noOpService = new Service[Unit, Unit] {
def apply(req: Unit): Future[Unit] = Future.Done
override def close(time: Time) = {
numClosed += 1
Future.Done
}
}

val svc = new ClosableService(noOpService) {
val closedException = new ServiceClosedException
}
}

test("cannot reuse a closed session")(new Ctx {
await(svc())

assert(svc.status == Status.Open)
assert(numClosed == 0)

await(svc.close())
intercept[Exception] {
await(svc())
}

assert(svc.status == Status.Closed)
assert(numClosed == 1)
})

test("only closes the underlying session once")(new Ctx {
await(svc.close())
await(svc.close())

assert(numClosed == 1)
})
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package com.twitter.finagle.service

import com.twitter.conversions.DurationOps._
import com.twitter.finagle.stats.{Counter, StatsReceiver, NullStatsReceiver}
import com.twitter.finagle.{Service, Status}
import com.twitter.util.{Future, Time, MockTimer, Promise, Return, Duration, Timer}
import com.twitter.finagle.{Service, Status, WriteException}
import com.twitter.util.{Await, Future, Time, MockTimer, Promise, Return, Duration, Timer}
import org.mockito.Matchers.any
import org.mockito.Mockito.{never, verify, when}
import org.scalatest.FunSuite
import org.scalatestplus.mockito.MockitoSugar

class ExpiringServiceTest extends FunSuite with MockitoSugar {

def await[A](f: Future[A]): A = Await.result(f, 5.seconds)
val frozenNow = Time.now

class ReleasingExpiringService[Req, Rep](
Expand Down Expand Up @@ -38,6 +39,16 @@ class ExpiringServiceTest extends FunSuite with MockitoSugar {
when(underlying.status).thenReturn(Status.Open)
}

test("throws a write exception if we attempt to use a released service") {
val noOpSvc = Service.const(Future.value(""))
val svc = ExpiringService.closeOnReleaseSvc(noOpSvc)

await(svc.close()) // action taken on 'release'
intercept[WriteException] {
await(svc())
}
}

test("cancelling timers on release") {
Time.withTimeAt(frozenNow) { _ =>
val ctx = newCtx()
Expand Down

0 comments on commit c64bea0

Please sign in to comment.