Skip to content

Commit

Permalink
Introduce ClientSession configuration section that provides ways to c…
Browse files Browse the repository at this point in the history
…ontrol idleTime and lifeTime of client connections.

Signed-off-by: Nikolay Pshenichnyy <nikolay.pshenichny@gmail.com>
  • Loading branch information
nikolay-pshenichny committed Apr 18, 2018
1 parent a41b8ce commit 30a36d2
Show file tree
Hide file tree
Showing 5 changed files with 419 additions and 8 deletions.
17 changes: 17 additions & 0 deletions linkerd/core/src/main/scala/io/buoyant/linkerd/ClientConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ trait ClientConfig {
var failureAccrual: Option[FailureAccrualConfig] = None
var requestAttemptTimeoutMs: Option[Int] = None
var requeueBudget: Option[RetryBudgetConfig] = None
var clientSession: Option[ClientSessionConfig] = None

@JsonIgnore
def params(vars: Map[String, String]): Stack.Params = Stack.Params.empty
Expand All @@ -33,6 +34,7 @@ trait ClientConfig {
.maybeWith(failFast.map(FailFastFactory.FailFast(_)))
.maybeWith(requeueBudget)
.maybeWith(failureAccrual.map(FailureAccrualConfig.param))
.maybeWith(clientSession.map(_.param))
}

case class TlsClientConfig(
Expand Down Expand Up @@ -74,3 +76,18 @@ case class HostConnectionPool(
maxWaiters = maxWaiters.getOrElse(default.maxWaiters)
)
}

case class ClientSessionConfig(
lifeTimeMs: Option[Int],
idleTimeMs: Option[Int]
) {
@JsonIgnore
private[this] val default = ExpiringService.Param.param.default

@JsonIgnore
def param = ExpiringService.Param(
lifeTime = lifeTimeMs.map(_.millis).getOrElse(default.lifeTime),
idleTime = idleTimeMs.map(_.millis).getOrElse(default.idleTime)
)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.buoyant.linkerd

import com.twitter.conversions.time._
import com.twitter.finagle.Path
import com.twitter.finagle.service.ExpiringService
import com.twitter.util.Duration
import io.buoyant.linkerd.Linker.Initializers
import io.buoyant.router.StackRouter.Client.PerClientParams
import org.scalatest.FunSuite

class ClientSessionConfigTest extends FunSuite {

test("expiring service params") {
// Arrange
val config = """
|routers:
|- protocol: plain
| client:
| clientSession:
| idleTimeMs: 5000
| lifeTimeMs: 7000
| servers:
| - {}
|""".stripMargin

// Act
val linker = Linker.load(config, Initializers(protocol = Seq(TestProtocol.Plain)))
val params = linker.routers.head.params[PerClientParams].paramsFor(Path.read("/foo"))
val result = params[ExpiringService.Param]

// Assert
assert(result.idleTime == 5.seconds)
assert(result.lifeTime == 7.seconds)
}

test("expiring service empty idle param") {
// Arrange
val config = """
|routers:
|- protocol: plain
| client:
| clientSession:
| lifeTimeMs: 7000
| servers:
| - {}
|""".stripMargin

// Act
val linker = Linker.load(config, Initializers(protocol = Seq(TestProtocol.Plain)))
val params = linker.routers.head.params[PerClientParams].paramsFor(Path.read("/foo"))
val result = params[ExpiringService.Param]

// Assert
assert(result.idleTime == Duration.Top)
assert(result.lifeTime == 7.seconds)
}

test("expiring service empty lifetime param") {
// Arrange
val config = """
|routers:
|- protocol: plain
| client:
| clientSession:
| idleTimeMs: 5000
| servers:
| - {}
|""".stripMargin

// Act
val linker = Linker.load(config, Initializers(protocol = Seq(TestProtocol.Plain)))
val params = linker.routers.head.params[PerClientParams].paramsFor(Path.read("/foo"))
val result = params[ExpiringService.Param]

// Assert
assert(result.idleTime == 5.seconds)
assert(result.lifeTime == Duration.Top)
}

}
15 changes: 14 additions & 1 deletion linkerd/docs/routers.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,13 @@ failFast | `false` | If `true`, connection failures are punished more aggressive
requeueBudget | see [retry budget](#retry-budget-parameters) | A [requeue budget](#retry-budget-parameters) for connection-level retries.
failureAccrual | 5 consecutive failures | a [failure accrual policy](#failure-accrual) for all clients created by this router.
requestAttemptTimeoutMs | no timeout | The timeout, in milliseconds, for each attempt (original or retry) of the request made by this client.
clientSession | An empty object | see [clientSession](#client-session)

#### Host Connection Pool

This section defines the behavior of [watermark connection pools](https://twitter.github.io/finagle/docs/com/twitter/finagle/client/DefaultPool) on which most of the protocols are relying.
Note that Http2 protocol uses [SingletonPool](https://github.com/twitter/finagle/blob/master/finagle-core/src/main/scala/com/twitter/finagle/pool/SingletonPool.scala) that maintains a single connection per endpoint and will not be affected by the settings in this section.

```yaml
client:
hostConnectionPool:
Expand All @@ -269,5 +273,14 @@ Key | Default Value | Description
--- | ------------- | -----------
minSize | `0` | The minimum number of connections to maintain to each host.
maxSize | Int.MaxValue | The maximum number of connections to maintain to each host.
idleTimeMs | forever | The amount of idle time for which a connection is cached in milliseconds.
idleTimeMs | forever | The amount of idle time for which a connection is cached in milliseconds. Only applied to connections that number greater than minSize, but fewer than maxSize.
maxWaiters | Int.MaxValue | The maximum number of connection requests that are queued when the connection concurrency exceeds maxSize.

#### Client Session

Configures the behavior of established client sessions.

Key | Default Value | Description
--- | ------------- | -----------
idleTimeMs | forever | The max amount of time for which a connection is allowed to be idle. When this time exceeded the connection will close itself.
lifeTimeMs | forever | Max lifetime of a connection.
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package io.buoyant.linkerd.protocol.h2

import com.twitter.concurrent.AsyncQueue
import com.twitter.conversions.time._
import com.twitter.finagle.buoyant.h2._
import com.twitter.finagle.stats.InMemoryStatsReceiver
import com.twitter.finagle.param.Stats
import io.buoyant.linkerd.Linker
import com.twitter.finagle.Path
import com.twitter.finagle.service.ExpiringService
import com.twitter.finagle.stats.InMemoryStatsReceiver
import com.twitter.util.Duration
import io.buoyant.linkerd.{Linker, Router}
import io.buoyant.linkerd.protocol.H2Initializer
import io.buoyant.router.StackRouter.Client.PerClientParams
import io.buoyant.test.FunSuite
import io.buoyant.test.h2.StreamTestUtils._
import java.io.File
import org.scalatest.time.{Millis, Seconds, Span}
import scala.io.Source
import scala.util.Random

Expand Down Expand Up @@ -226,4 +232,147 @@ class H2EndToEndTest extends FunSuite {
routers.foreach { router => await(router.close()) }
}
}

test("clientSession idleTimeMs should close client connections") {
val config =
s"""|routers:
|- protocol: h2
| experimental: true
| dtab: |
| /svc/dog => /$$/inet/127.1/{dog.port} ;
| servers:
| - port: 0
| client:
| clientSession:
| idleTimeMs: 1500
|""".stripMargin

idleTimeMsBaseTest(config){ (router:Router.Initialized, stats:InMemoryStatsReceiver, dogPort:Int) =>

// Assert
def activeConnectionsCount = stats.gauges(Seq("rt", "h2", "client", s"$$/inet/127.1/${dogPort}", "connections"))

// An incoming request through the H2.Router will establish an active connection; We expect to see it here
assert(activeConnectionsCount() == 1.0)

eventually(timeout(Span(5, Seconds)), interval(Span(250, Millis))) {
val cnt = activeConnectionsCount()
assert(cnt == 0.0)
}

val clientSessionParams = router.params[PerClientParams].paramsFor(Path.read("/svc/dog"))[ExpiringService.Param]
assert(clientSessionParams.idleTime == 1500.milliseconds)
assert(clientSessionParams.lifeTime == Duration.Top)

()
}
}

test("clientSession idleTimeMs should close client connections for static client") {
val config =
s"""|routers:
|- protocol: h2
| experimental: true
| dtab: |
| /svc/dog => /$$/inet/127.1/{dog.port} ;
| servers:
| - port: 0
| client:
| kind: io.l5d.static
| configs:
| - prefix: /*
| clientSession:
| idleTimeMs: 1500
|""".stripMargin

idleTimeMsBaseTest(config) { (router: Router.Initialized, stats: InMemoryStatsReceiver, dogPort: Int) =>

// Assert
def activeConnectionsCount = stats.gauges(Seq("rt", "h2", "client", s"$$/inet/127.1/${dogPort}", "connections"))

// An incoming request through the H2.Router will establish an active connection; We expect to see it here
assert(activeConnectionsCount() == 1.0)

eventually(timeout(Span(5, Seconds)), interval(Span(250, Millis))) {
val cnt = activeConnectionsCount()
assert(cnt == 0.0)
}

val clientSessionParams = router.params[PerClientParams].paramsFor(Path.read("/svc/dog"))[ExpiringService.Param]
assert(clientSessionParams.idleTime == 1500.milliseconds)
assert(clientSessionParams.lifeTime == Duration.Top)

()
}
}

test("clientSession idleTimeMs should not close client connections when isn't specified") {
val config =
s"""|routers:
|- protocol: h2
| experimental: true
| dtab: |
| /svc/dog => /$$/inet/127.1/{dog.port} ;
| servers:
| - port: 0
| client:
| forwardClientCert: false
|""".stripMargin

idleTimeMsBaseTest(config) { (router: Router.Initialized, stats: InMemoryStatsReceiver, dogPort: Int) =>
// Assert
def activeConnectionsCount = stats.gauges(Seq("rt", "h2", "client", s"$$/inet/127.1/${dogPort}", "connections"))

// An incoming request through the H2.Router will establish an active connection; We expect to see it here
assert(activeConnectionsCount() == 1.0)

val clientSessionParams = router.params[PerClientParams].paramsFor(Path.read("/svc/dog"))[ExpiringService.Param]
assert(clientSessionParams.idleTime == Duration.Top)
assert(clientSessionParams.lifeTime == Duration.Top)

()
}
}

def idleTimeMsBaseTest(config:String)(assertionsF: (Router.Initialized, InMemoryStatsReceiver, Int) => Unit): Unit = {
// Arrange
val stats = new InMemoryStatsReceiver

val dog = Downstream.const("dog", "woof")

val configWithPort = config.replace("{dog.port}", dog.port.toString)

val linker = Linker.Initializers(Seq(H2Initializer)).load(configWithPort)
.configured(Stats(stats))
val router = linker.routers.head.initialize()
val server = router.servers.head.serve()

val client = Upstream.mk(server)
def get(host: String, path: String = "/")(f: Response => Unit) = {
val req = Request("http", Method.Get, host, path, Stream.empty())
val rsp = await(client(req))
f(rsp)
}

// Act
try {
// This will force linkerd to open a connection to the `dog` service and hold it
get("dog") { rsp =>
assert(rsp.status == Status.Ok)
assert(await(rsp.stream.readDataString) == "woof")
()
}

// Assert
assertionsF(router, stats, dog.port)

} finally {
await(client.close())
await(dog.server.close())
await(server.close())
await(router.close())
}
}


}
Loading

0 comments on commit 30a36d2

Please sign in to comment.