Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'master' of github.com:twitter/finagle

Conflicts:
	finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala
  • Loading branch information...
commit 7c94f5b18508242c02e9a604f71e2e3b71dd073e 2 parents 62f760f + eb1b626
Nick Kallen authored
11 finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala
View
@@ -53,6 +53,7 @@ case class ClientBuilder[Req, Rep](
private val _hostConnectionLimit: Option[Int],
private val _hostConnectionIdleTime: Option[Duration],
private val _hostConnectionMaxIdleTime: Option[Duration],
+ private val _hostConnectionMaxLifeTime: Option[Duration],
private val _sendBufferSize: Option[Int],
private val _recvBufferSize: Option[Int],
private val _retries: Option[Int],
@@ -73,6 +74,7 @@ case class ClientBuilder[Req, Rep](
None, // hostConnectionLimit
None, // hostConnectionIdleTime
None, // hostConnectionMaxIdleTime
+ None, // hostConnectionMaxLifeTime
None, // sendBufferSize
None, // recvBufferSize
None, // retries
@@ -94,6 +96,7 @@ case class ClientBuilder[Req, Rep](
"hostConnectionCoresize" -> Some(_hostConnectionCoresize),
"hostConnectionIdleTime" -> Some(_hostConnectionIdleTime),
"hostConnectionMaxIdleTime" -> Some(_hostConnectionMaxIdleTime),
+ "hostConnectionMaxLifeTime" -> Some(_hostConnectionMaxLifeTime),
"sendBufferSize" -> _sendBufferSize,
"recvBufferSize" -> _recvBufferSize,
"retries" -> _retries,
@@ -168,6 +171,9 @@ case class ClientBuilder[Req, Rep](
def hostConnectionMaxIdleTime(timeout: Duration) =
copy(_hostConnectionMaxIdleTime = Some(timeout))
+ def hostConnectionMaxLifeTime(timeout: Duration) =
+ copy(_hostConnectionMaxLifeTime = Some(timeout))
+
def retries(value: Int) =
copy(_retries = Some(value))
@@ -246,8 +252,9 @@ case class ClientBuilder[Req, Rep](
future = protocol.codec.prepareClientChannel(service)
future = future.flatMap { protocol.prepareChannel(_) }
- _hostConnectionMaxIdleTime.foreach { idleTime =>
- future = future.map { new ExpiringService(_, idleTime) }
+
+ if (_hostConnectionMaxIdleTime.isDefined || _hostConnectionMaxLifeTime.isDefined) {
+ future = future.map { new ExpiringService(_, _hostConnectionMaxIdleTime, _hostConnectionMaxLifeTime) }
}
future
11 finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala
View
@@ -67,6 +67,7 @@ final case class ServerConfig[Req, Rep](
private val _channelFactory: ReferenceCountedChannelFactory = ServerBuilder.defaultChannelFactory,
private val _maxConcurrentRequests: Option[Int] = None,
private val _hostConnectionMaxIdleTime: Option[Duration] = None,
+ private val _hostConnectionMaxLifeTime: Option[Duration] = None,
private val _requestTimeout: Option[Duration] = None,
private val _readTimeout: Option[Duration] = None,
private val _writeCompletionTimeout: Option[Duration] = None,
@@ -89,6 +90,7 @@ final case class ServerConfig[Req, Rep](
val channelFactory = _channelFactory
val maxConcurrentRequests = _maxConcurrentRequests
val hostConnectionMaxIdleTime = _hostConnectionMaxIdleTime
+ val hostConnectionMaxLifeTime = _hostConnectionMaxIdleTime
val requestTimeout = _requestTimeout
val readTimeout = _readTimeout
val writeCompletionTimeout = _writeCompletionTimeout
@@ -107,6 +109,7 @@ final case class ServerConfig[Req, Rep](
"channelFactory" -> Some(_channelFactory),
"maxConcurrentRequests" -> _maxConcurrentRequests,
"hostConnectionMaxIdleTime" -> _hostConnectionMaxIdleTime,
+ "hostConnectionMaxLifeTime" -> _hostConnectionMaxLifeTime,
"requestTimeout" -> _requestTimeout,
"readTimeout" -> _readTimeout,
"writeCompletionTimeout" -> _writeCompletionTimeout,
@@ -184,6 +187,9 @@ class ServerBuilder[Req, Rep](val config: ServerConfig[Req, Rep]) {
def hostConnectionMaxIdleTime(howlong: Duration) =
copy(config.copy(_hostConnectionMaxIdleTime = Some(howlong)))
+ def hostConnectionMaxLifeTime(howlong: Duration) =
+ copy(config.copy(_hostConnectionMaxLifeTime = Some(howlong)))
+
def requestTimeout(howlong: Duration) =
copy(config.copy(_requestTimeout = Some(howlong)))
@@ -327,8 +333,9 @@ class ServerBuilder[Req, Rep](val config: ServerConfig[Req, Rep]) {
val closingHandler = new ChannelClosingHandler
pipeline.addLast("closingHandler", closingHandler)
- config.hostConnectionMaxIdleTime foreach { duration =>
- service = new ExpiringService(service, duration) {
+
+ if (config.hostConnectionMaxIdleTime.isDefined || config.hostConnectionMaxLifeTime.isDefined) {
+ service = new ExpiringService(service, config.hostConnectionMaxIdleTime, config.hostConnectionMaxLifeTime) {
override def didExpire() { closingHandler.close() }
}
}
47 finagle-core/src/main/scala/com/twitter/finagle/service/ExpiringService.scala
View
@@ -15,16 +15,17 @@ import com.twitter.finagle.{Service, WriteException, ChannelClosedException}
*/
class ExpiringService[Req, Rep](
underlying: Service[Req, Rep],
- maxIdleTime: Duration,
+ maxIdleTime: Option[Duration],
+ maxLifeTime: Option[Duration],
timer: util.Timer = Timer.default)
extends Service[Req, Rep]
{
private[this] var requestCount = 0
private[this] var expired = false
- private[this] var task: Option[com.twitter.util.TimerTask] =
- Some(timer.schedule(maxIdleTime.fromNow) { maybeExpire() })
+ private[this] var idleTimeTask = maxIdleTime map { idleTime => timer.schedule(idleTime.fromNow) { maybeIdleExpire() } }
+ private[this] var lifeTimeTask = maxLifeTime map { lifeTime => timer.schedule(lifeTime.fromNow) { maybeLifeTimeExpire() } }
- private[this] def maybeExpire() = {
+ private[this] def maybeExpire(forceExpire: Boolean) = {
val justExpired = synchronized {
// We check requestCount == 0 here to avoid the race between
// cancellation & running of the timer.
@@ -32,11 +33,21 @@ class ExpiringService[Req, Rep](
expired = true
true
} else {
+ if (forceExpire) expired = true
false
}
}
if (justExpired) didExpire()
+ justExpired
+ }
+
+ private[this] def maybeIdleExpire() {
+ if (maybeExpire(false)) cancelLifeTimer()
+ }
+
+ private[this] def maybeLifeTimeExpire() {
+ if (maybeExpire(true)) cancelIdleTimer()
}
// May be overriden to provide your own expiration action.
@@ -44,6 +55,18 @@ class ExpiringService[Req, Rep](
underlying.release()
}
+ protected def cancelIdleTimer() {
+ // Cancel the existing timer.
+ idleTimeTask foreach { _.cancel() }
+ idleTimeTask = None
+ }
+
+ protected def cancelLifeTimer() {
+ // Cancel the existing timer.
+ lifeTimeTask foreach { _.cancel() }
+ lifeTimeTask = None
+ }
+
def apply(request: Req): Future[Rep] = synchronized {
if (expired) {
return Future.exception(
@@ -51,18 +74,20 @@ class ExpiringService[Req, Rep](
}
requestCount += 1
- if (requestCount == 1) {
- // Cancel the existing timer.
- task foreach { _.cancel() }
- task = None
- }
+ if (requestCount == 1) cancelIdleTimer()
underlying(request) ensure {
synchronized {
requestCount -= 1
if (requestCount == 0) {
- require(!task.isDefined)
- task = Some(timer.schedule(maxIdleTime.fromNow) { maybeExpire() })
+ require(!idleTimeTask.isDefined)
+ if (expired) {
+ didExpire()
+ } else {
+ maxIdleTime foreach { time =>
+ idleTimeTask = Some(timer.schedule(time.fromNow) { maybeIdleExpire() })
+ }
+ }
}
}
}
4 finagle-core/src/test/scala/com/twitter/finagle/Timer.scala
View
@@ -5,11 +5,11 @@ import collection.mutable.ArrayBuffer
import com.twitter.util.{Time, Duration}
class MockTimer extends com.twitter.util.Timer {
- case class Task(when: Time, runner: Function0[Unit])
+ case class Task(var when: Time, runner: Function0[Unit])
extends com.twitter.util.TimerTask
{
var isCancelled = false
- def cancel() { isCancelled = true }
+ def cancel() { isCancelled = true; when = Time.now; tick() }
}
var isStopped = false
186 finagle-core/src/test/scala/com/twitter/finagle/service/ExpiringServiceSpec.scala
View
@@ -18,53 +18,159 @@ object ExpiringServiceSpec extends Specification with Mockito {
underlying(123) returns promise
underlying.isAvailable returns true
- val service = new ExpiringService[Any, Any](underlying, 10.seconds, timer)
- timer.tasks must haveSize(1)
-
- "expire after the given idle time" in {
- // For some reason, this complains of different types:
- // timer.tasks.head.when must be_==(Time.now + 10.seconds)
- service.isAvailable must beTrue
-
- timeControl.advance(10.seconds)
- timer.tick()
-
- service.isAvailable must beFalse
- there was one(underlying).release()
-
- timer.tasks must beEmpty
- }
- "cancel the timer when a request is issued" in {
- service(123)
+ "idle time between requests" in {
+ val service = new ExpiringService[Any, Any](underlying, Some(10.seconds), None, timer)
timer.tasks must haveSize(1)
- timer.tasks.head.isCancelled must beTrue
- }
+ "expire after the given idle time" in {
+ // For some reason, this complains of different types:
+ // timer.tasks.head.when must be_==(Time.now + 10.seconds)
+ service.isAvailable must beTrue
- "restart the timer when the request finished" in {
- service(123)
- timer.tasks must haveSize(1)
- timer.tasks.head.isCancelled must beTrue
-
- timeControl.advance(10.seconds)
- timer.tick()
-
- timer.tasks must beEmpty
- promise() = Return(321)
+ timeControl.advance(10.seconds)
+ timer.tick()
+
+ service.isAvailable must beFalse
+ there was one(underlying).release()
+
+ timer.tasks must beEmpty
+ }
+
+ "cancel the timer when a request is issued" in {
+ service(123)
+ timer.tasks must beEmpty
+ }
+
+ "restart the timer when the request finished" in {
+ service(123)
+ timer.tasks must beEmpty
+
+ timeControl.advance(10.seconds)
+ timer.tick()
+
+ timer.tasks must beEmpty
+ promise() = Return(321)
+ timer.tasks must haveSize(1)
+
+ there was no(underlying).release()
+ timeControl.advance(10.seconds)
+ timer.tick()
+
+ there was one(underlying).release()
+ }
+
+ "throw an write exception if we attempt to use an expired service" in {
+ timeControl.advance(10.seconds)
+ timer.tick()
+
+ service(132)() must throwA[WriteException]
+ }
+ }
+
+ "life time of a connection" in {
+ val service = new ExpiringService[Any, Any](underlying, None, Some(10.seconds), timer)
timer.tasks must haveSize(1)
-
- there was no(underlying).release()
- timeControl.advance(10.seconds)
- timer.tick()
-
- there was one(underlying).release()
+ "expire after the given idle time" in {
+ // For some reason, this complains of different types:
+ // timer.tasks.head.when must be_==(Time.now + 10.seconds)
+ service.isAvailable must beTrue
+
+ timeControl.advance(10.seconds)
+ timer.tick()
+
+ service.isAvailable must beFalse
+ there was one(underlying).release()
+
+ timer.tasks must beEmpty
+ }
+
+ "does not cancel the timer when a request is issued" in {
+ service(123)
+ timer.tasks must haveSize(1)
+ timer.tasks.head.isCancelled must beFalse
+ }
+
+ "throw an write exception if we attempt to use an expired service" in {
+ timeControl.advance(10.seconds)
+ timer.tick()
+
+ service(132)() must throwA[WriteException]
+ }
}
+
+ "idle timer fires before life timer fires" in {
+ val service = new ExpiringService[Any, Any](underlying, Some(10.seconds), Some(1.minute), timer)
+ timer.tasks must haveSize(2)
+
+ "expire after the given idle time" in {
+ // For some reason, this complains of different types:
+ // timer.tasks.head.when must be_==(Time.now + 10.seconds)
+ service.isAvailable must beTrue
+
+ timeControl.advance(10.seconds)
+ timer.tick()
- "throw an write exception if we attempt to use an expired service" in {
- timeControl.advance(10.seconds)
- timer.tick()
+ service.isAvailable must beFalse
+ there was one(underlying).release()
+
+ timer.tasks must beEmpty
+ }
+ }
+
+ "life timer fires before idle timer fires" in {
+ val service = new ExpiringService[Any, Any](underlying, Some(10.seconds), Some(15.seconds), timer)
+ timer.tasks must haveSize(2)
+ timer.tasks forall(!_.isCancelled) must beTrue
+
+ "expire after the given life time" in {
+ service(123)
+ timer.tasks must haveSize(1)
+ timer.tasks.head.isCancelled must beFalse
+
+ timeControl.advance(8.seconds)
+ timer.tick()
+
+ timer.tasks must haveSize(1)
+ timer.tasks.head.isCancelled must beFalse
+
+ promise() = Return(321)
+ timer.tasks must haveSize(2)
+ timer.tasks forall(!_.isCancelled) must beTrue
+
+ there was no(underlying).release()
+ timeControl.advance(8.seconds)
+ timer.tick()
+
+ timer.tasks must beEmpty
+ service.isAvailable must beFalse
+ there was one(underlying).release()
+ }
+ }
+
+ "life timer fires while there are requests" in {
+ val service = new ExpiringService[Any, Any](underlying, Some(10.seconds), Some(5.seconds), timer)
+ timer.tasks must haveSize(2)
+ timer.tasks forall(!_.isCancelled) must beTrue
+
+ "expire after the given life time" in {
+ service(123)
+ timer.tasks must haveSize(1)
+ timer.tasks.head.isCancelled must beFalse
+
+ timeControl.advance(8.seconds)
+ timer.tick()
- service(132)() must throwA[WriteException]
+ timer.tasks must beEmpty
+ service.isAvailable must beFalse
+ there was no(underlying).release()
+
+ promise() = Return(321)
+ timer.tasks must beEmpty
+ service.isAvailable must beFalse
+ there was one(underlying).release()
+
+ service(132)() must throwA[WriteException]
+ }
}
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.