Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

add lifeTimer for ExpiringService.

  • Loading branch information...
commit b74ddb0c050f624bed1aef6bc4f99f98d9fce1d2 1 parent d3f0050
Wanli Yang authored
View
8 finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala
@@ -51,6 +51,7 @@ case class ClientBuilder[Req, Rep](
_hostConnectionLimit: Option[Int],
_hostConnectionIdleTime: Option[Duration],
_hostConnectionMaxIdleTime: Option[Duration],
+ _hostConnectionMaxLifeTime: Option[Duration],
_sendBufferSize: Option[Int],
_recvBufferSize: Option[Int],
_retries: Option[Int],
@@ -71,6 +72,7 @@ case class ClientBuilder[Req, Rep](
None, // hostConnectionLimit
None, // hostConnectionIdleTime
None, // hostConnectionMaxIdleTime
+ None, // hostConnectionMaxLifeTime
None, // sendBufferSize
None, // recvBufferSize
None, // retries
@@ -92,6 +94,7 @@ case class ClientBuilder[Req, Rep](
"hostConnectionCoresize" -> Some(_hostConnectionCoresize),
"hostConnectionIdleTime" -> Some(_hostConnectionIdleTime),
"hostConnectionMaxIdleTime" -> Some(_hostConnectionMaxIdleTime),
+ "hostConnectionMaxLifeTime" -> Some(_hostConnectionMaxLifeTime),
"sendBufferSize" -> _sendBufferSize,
"recvBufferSize" -> _recvBufferSize,
"retries" -> _retries,
@@ -244,8 +247,9 @@ case class ClientBuilder[Req, Rep](
future = protocol.codec.prepareClientChannel(service)
future = future.flatMap { protocol.prepareChannel(_) }
- _hostConnectionMaxIdleTime.foreach { idleTime =>
- future = future.map { new ExpiringService(_, idleTime) }
+
+ if (_hostConnectionMaxIdleTime.isDefined || _hostConnectionMaxLifeTime.isDefined) {
+ future = future.map { new ExpiringService(_, _hostConnectionMaxIdleTime, _hostConnectionMaxLifeTime) }
}
future
View
10 finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala
@@ -69,6 +69,7 @@ case class ServerBuilder[Req, Rep](
_channelFactory: Option[ReferenceCountedChannelFactory],
_maxConcurrentRequests: Option[Int],
_hostConnectionMaxIdleTime: Option[Duration],
+ _hostConnectionMaxLifeTime: Option[Duration],
_requestTimeout: Option[Duration],
_readTimeout: Option[Duration],
_writeCompletionTimeout: Option[Duration],
@@ -89,6 +90,7 @@ case class ServerBuilder[Req, Rep](
None, // channelFactory
None, // maxConcurrentRequests
None, // hostConnectionMaxIdleTime
+ None, // hostConnectionMaxLifeTime
None, // requestTimeout
None, // readTimeout
None, // writeCompletionTimeout
@@ -108,6 +110,7 @@ case class ServerBuilder[Req, Rep](
"channelFactory" -> _channelFactory,
"maxConcurrentRequests" -> _maxConcurrentRequests,
"hostConnectionMaxIdleTime" -> _hostConnectionMaxIdleTime,
+ "hostConnectionMaxLifeTime" -> _hostConnectionMaxLifeTime,
"requestTimeout" -> _requestTimeout,
"readTimeout" -> _readTimeout,
"writeCompletionTimeout" -> _writeCompletionTimeout,
@@ -153,6 +156,9 @@ case class ServerBuilder[Req, Rep](
def hostConnectionMaxIdleTime(howlong: Duration) =
copy(_hostConnectionMaxIdleTime = Some(howlong))
+ def hostConnectionMaxLifeTime(howlong: Duration) =
+ copy(_hostConnectionMaxLifeTime = Some(howlong))
+
def requestTimeout(howlong: Duration) =
copy(_requestTimeout = Some(howlong))
@@ -292,8 +298,8 @@ case class ServerBuilder[Req, Rep](
val closingHandler = new ChannelClosingHandler
pipeline.addLast("closingHandler", closingHandler)
- _hostConnectionMaxIdleTime foreach { duration =>
- service = new ExpiringService(service, duration) {
+ if (_hostConnectionMaxIdleTime.isDefined || _hostConnectionMaxLifeTime.isDefined) {
+ service = new ExpiringService(service, _hostConnectionMaxIdleTime, _hostConnectionMaxLifeTime) {
override def didExpire() { closingHandler.close() }
}
}
View
57 finagle-core/src/main/scala/com/twitter/finagle/service/ExpiringService.scala
@@ -15,14 +15,28 @@ import com.twitter.finagle.{Service, WriteException, ChannelClosedException}
class ExpiringService[Req, Rep](
underlying: Service[Req, Rep],
- maxIdleTime: Duration,
+ maxIdleTime: Option[Duration],
+ maxLifeTime: Option[Duration],
timer: util.Timer = Timer.default)
extends Service[Req, Rep]
{
private[this] var requestCount = 0
private[this] var expired = false
- private[this] var task: Option[com.twitter.util.TimerTask] =
- Some(timer.schedule(maxIdleTime.fromNow) { maybeExpire() })
+ private[this] var idleTimeTask: Option[com.twitter.util.TimerTask] =
+ maxIdleTime match {
+ case Some(idleTime: Duration) =>
+ Some(timer.schedule(idleTime.fromNow) { maybeIdleExpire() })
+ case _ =>
+ None
+ }
+
+ private[this] var lifeTimeTask: Option[com.twitter.util.TimerTask] =
+ maxLifeTime match {
+ case Some(lifeTime: Duration) =>
+ Some(timer.schedule(lifeTime.fromNow) { maybeLifeTimeExpire() })
+ case _ =>
+ None
+ }
private[this] def maybeExpire() = {
val justExpired = synchronized {
@@ -37,6 +51,19 @@ class ExpiringService[Req, Rep](
}
if (justExpired) didExpire()
+ justExpired
+ }
+
+ private[this] def maybeIdleExpire() {
+ if (maybeExpire()) cancelLifeTimer()
+ }
+
+ private[this] def maybeLifeTimeExpire() {
+ if (maybeExpire()) {
+ cancelIdleTimer()
+ } else {
+ expired = true
+ }
}
// May be overriden to provide your own expiration action.
@@ -44,6 +71,18 @@ class ExpiringService[Req, Rep](
underlying.release()
}
+ protected def cancelIdleTimer() {
+ // Cancel the existing timer.
+ idleTimeTask foreach { _.cancel() }
+ idleTimeTask = None
+ }
+
+ protected def cancelLifeTimer() {
+ // Cancel the existing timer.
+ lifeTimeTask foreach { _.cancel() }
+ lifeTimeTask = None
+ }
+
def apply(request: Req): Future[Rep] = synchronized {
if (expired) {
return Future.exception(
@@ -51,18 +90,16 @@ class ExpiringService[Req, Rep](
}
requestCount += 1
- if (requestCount == 1) {
- // Cancel the existing timer.
- task foreach { _.cancel() }
- task = None
- }
+ if (requestCount == 1) cancelIdleTimer()
underlying(request) ensure {
synchronized {
requestCount -= 1
if (requestCount == 0) {
- require(!task.isDefined)
- task = Some(timer.schedule(maxIdleTime.fromNow) { maybeExpire() })
+ require(!idleTimeTask.isDefined)
+ maxIdleTime foreach { time =>
+ idleTimeTask = Some(timer.schedule(time.fromNow) { maybeIdleExpire() })
+ }
}
}
}
View
166 finagle-core/src/test/scala/com/twitter/finagle/service/ExpiringServiceSpec.scala
@@ -18,53 +18,139 @@ object ExpiringServiceSpec extends Specification with Mockito {
underlying(123) returns promise
underlying.isAvailable returns true
- val service = new ExpiringService[Any, Any](underlying, 10.seconds, timer)
- timer.tasks must haveSize(1)
-
- "expire after the given idle time" in {
- // For some reason, this complains of different types:
- // timer.tasks.head.when must be_==(Time.now + 10.seconds)
- service.isAvailable must beTrue
-
- timeControl.advance(10.seconds)
- timer.tick()
-
- service.isAvailable must beFalse
- there was one(underlying).release()
-
- timer.tasks must beEmpty
- }
- "cancel the timer when a request is issued" in {
- service(123)
+ "idle time between requests" in {
+ val service = new ExpiringService[Any, Any](underlying, Some(10.seconds), None, timer)
timer.tasks must haveSize(1)
- timer.tasks.head.isCancelled must beTrue
- }
+ "expire after the given idle time" in {
+ // For some reason, this complains of different types:
+ // timer.tasks.head.when must be_==(Time.now + 10.seconds)
+ service.isAvailable must beTrue
- "restart the timer when the request finished" in {
- service(123)
- timer.tasks must haveSize(1)
- timer.tasks.head.isCancelled must beTrue
-
- timeControl.advance(10.seconds)
- timer.tick()
-
- timer.tasks must beEmpty
- promise() = Return(321)
+ timeControl.advance(10.seconds)
+ timer.tick()
+
+ service.isAvailable must beFalse
+ there was one(underlying).release()
+
+ timer.tasks must beEmpty
+ }
+
+ "cancel the timer when a request is issued" in {
+ service(123)
+ timer.tasks must haveSize(1)
+ timer.tasks.head.isCancelled must beTrue
+ }
+
+ "restart the timer when the request finished" in {
+ service(123)
+ timer.tasks must haveSize(1)
+ timer.tasks.head.isCancelled must beTrue
+
+ timeControl.advance(10.seconds)
+ timer.tick()
+
+ timer.tasks must beEmpty
+ promise() = Return(321)
+ timer.tasks must haveSize(1)
+
+ there was no(underlying).release()
+ timeControl.advance(10.seconds)
+ timer.tick()
+
+ there was one(underlying).release()
+ }
+
+ "throw an write exception if we attempt to use an expired service" in {
+ timeControl.advance(10.seconds)
+ timer.tick()
+
+ service(132)() must throwA[WriteException]
+ }
+ }
+
+ "life time of a connection" in {
+ val service = new ExpiringService[Any, Any](underlying, None, Some(10.seconds), timer)
timer.tasks must haveSize(1)
-
- there was no(underlying).release()
- timeControl.advance(10.seconds)
- timer.tick()
-
- there was one(underlying).release()
+ "expire after the given idle time" in {
+ // For some reason, this complains of different types:
+ // timer.tasks.head.when must be_==(Time.now + 10.seconds)
+ service.isAvailable must beTrue
+
+ timeControl.advance(10.seconds)
+ timer.tick()
+
+ service.isAvailable must beFalse
+ there was one(underlying).release()
+
+ timer.tasks must beEmpty
+ }
+
+ "does not cancel the timer when a request is issued" in {
+ service(123)
+ timer.tasks must haveSize(1)
+ timer.tasks.head.isCancelled must beFalse
+ }
+
+ "throw an write exception if we attempt to use an expired service" in {
+ timeControl.advance(10.seconds)
+ timer.tick()
+
+ service(132)() must throwA[WriteException]
+ }
+ }
+
+ "idle timer fires before life timer fires" in {
+ val service = new ExpiringService[Any, Any](underlying, Some(10.seconds), Some(1.minute), timer)
+ timer.tasks must haveSize(2)
+
+ "expire after the given idle time" in {
+ // For some reason, this complains of different types:
+ // timer.tasks.head.when must be_==(Time.now + 10.seconds)
+ service.isAvailable must beTrue
+
+ timeControl.advance(10.seconds)
+ timer.tick()
+
+ service.isAvailable must beFalse
+ there was one(underlying).release()
+
+ timer.tasks must haveSize(1)
+ timer.tasks.head.isCancelled must beTrue
+ }
}
+
+ "life timer fires before idle timer fires" in {
+ val service = new ExpiringService[Any, Any](underlying, Some(10.seconds), Some(15.seconds), timer)
+ timer.tasks must haveSize(2)
+ timer.tasks forall(!_.isCancelled) must beTrue
+
+ "expire after the given life time" in {
+ service(123)
+ timer.tasks must haveSize(2)
+ timer.tasks(0).isCancelled must beTrue
+ timer.tasks(1).isCancelled must beFalse
+
+ timeControl.advance(8.seconds)
+ timer.tick()
+
+ timer.tasks must haveSize(2)
+ promise() = Return(321)
+
+ timer.tasks must haveSize(3)
+ timer.tasks(0).isCancelled must beTrue
+ timer.tasks(1).isCancelled must beFalse
+ timer.tasks(2).isCancelled must beFalse
- "throw an write exception if we attempt to use an expired service" in {
- timeControl.advance(10.seconds)
- timer.tick()
+ there was no(underlying).release()
+ timeControl.advance(8.seconds)
+ timer.tick()
- service(132)() must throwA[WriteException]
+ timer.tasks must haveSize(1)
+ timer.tasks forall(_.isCancelled) must beTrue
+ service.isAvailable must beFalse
+ there was one(underlying).release()
+ }
}
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.