Skip to content

Commit

Permalink
Merge branch 'connection_ttl'
Browse files Browse the repository at this point in the history
  • Loading branch information
Wanli Yang authored and Wanli Yang committed Mar 21, 2011
2 parents 77dd110 + b74ddb0 commit d2883a5
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 72 deletions.
11 changes: 11 additions & 0 deletions README.md
Expand Up @@ -375,3 +375,14 @@ Here's how to use it:
.build()

That's it!

# Changes

## 1.2.3 (2011-03-18)

- A faster memcache codec (joint work with Arya from profiling rooster usage)
- Fix a bug in memcache decoding of big responses (due to Arya)
- More exported pool & connection statistics
- Fix a bug in the CachingPool wherein connection accounting broke
- Speed up leastqueued loadbalancer strategy
- finagle-serversets is now included in the opensource distribution, instructions at https://github.com/twitter/finagle/blob/master/README.md
Expand Up @@ -51,6 +51,7 @@ case class ClientBuilder[Req, Rep](
_hostConnectionLimit: Option[Int],
_hostConnectionIdleTime: Option[Duration],
_hostConnectionMaxIdleTime: Option[Duration],
_hostConnectionMaxLifeTime: Option[Duration],
_sendBufferSize: Option[Int],
_recvBufferSize: Option[Int],
_retries: Option[Int],
Expand All @@ -71,6 +72,7 @@ case class ClientBuilder[Req, Rep](
None, // hostConnectionLimit
None, // hostConnectionIdleTime
None, // hostConnectionMaxIdleTime
None, // hostConnectionMaxLifeTime
None, // sendBufferSize
None, // recvBufferSize
None, // retries
Expand All @@ -92,6 +94,7 @@ case class ClientBuilder[Req, Rep](
"hostConnectionCoresize" -> Some(_hostConnectionCoresize),
"hostConnectionIdleTime" -> Some(_hostConnectionIdleTime),
"hostConnectionMaxIdleTime" -> Some(_hostConnectionMaxIdleTime),
"hostConnectionMaxLifeTime" -> Some(_hostConnectionMaxLifeTime),
"sendBufferSize" -> _sendBufferSize,
"recvBufferSize" -> _recvBufferSize,
"retries" -> _retries,
Expand Down Expand Up @@ -244,8 +247,9 @@ case class ClientBuilder[Req, Rep](

future = protocol.codec.prepareClientChannel(service)
future = future.flatMap { protocol.prepareChannel(_) }
_hostConnectionMaxIdleTime.foreach { idleTime =>
future = future.map { new ExpiringService(_, idleTime) }

if (_hostConnectionMaxIdleTime.isDefined || _hostConnectionMaxLifeTime.isDefined) {
future = future.map { new ExpiringService(_, _hostConnectionMaxIdleTime, _hostConnectionMaxLifeTime) }
}

future
Expand Down
Expand Up @@ -69,6 +69,7 @@ case class ServerBuilder[Req, Rep](
_channelFactory: Option[ReferenceCountedChannelFactory],
_maxConcurrentRequests: Option[Int],
_hostConnectionMaxIdleTime: Option[Duration],
_hostConnectionMaxLifeTime: Option[Duration],
_requestTimeout: Option[Duration],
_readTimeout: Option[Duration],
_writeCompletionTimeout: Option[Duration],
Expand All @@ -89,6 +90,7 @@ case class ServerBuilder[Req, Rep](
None, // channelFactory
None, // maxConcurrentRequests
None, // hostConnectionMaxIdleTime
None, // hostConnectionMaxLifeTime
None, // requestTimeout
None, // readTimeout
None, // writeCompletionTimeout
Expand All @@ -108,6 +110,7 @@ case class ServerBuilder[Req, Rep](
"channelFactory" -> _channelFactory,
"maxConcurrentRequests" -> _maxConcurrentRequests,
"hostConnectionMaxIdleTime" -> _hostConnectionMaxIdleTime,
"hostConnectionMaxLifeTime" -> _hostConnectionMaxLifeTime,
"requestTimeout" -> _requestTimeout,
"readTimeout" -> _readTimeout,
"writeCompletionTimeout" -> _writeCompletionTimeout,
Expand Down Expand Up @@ -153,6 +156,9 @@ case class ServerBuilder[Req, Rep](
def hostConnectionMaxIdleTime(howlong: Duration) =
copy(_hostConnectionMaxIdleTime = Some(howlong))

def hostConnectionMaxLifeTime(howlong: Duration) =
copy(_hostConnectionMaxLifeTime = Some(howlong))

def requestTimeout(howlong: Duration) =
copy(_requestTimeout = Some(howlong))

Expand Down Expand Up @@ -292,8 +298,8 @@ case class ServerBuilder[Req, Rep](

val closingHandler = new ChannelClosingHandler
pipeline.addLast("closingHandler", closingHandler)
_hostConnectionMaxIdleTime foreach { duration =>
service = new ExpiringService(service, duration) {
if (_hostConnectionMaxIdleTime.isDefined || _hostConnectionMaxLifeTime.isDefined) {
service = new ExpiringService(service, _hostConnectionMaxIdleTime, _hostConnectionMaxLifeTime) {
override def didExpire() { closingHandler.close() }
}
}
Expand Down
Expand Up @@ -15,14 +15,28 @@ import com.twitter.finagle.{Service, WriteException, ChannelClosedException}

class ExpiringService[Req, Rep](
underlying: Service[Req, Rep],
maxIdleTime: Duration,
maxIdleTime: Option[Duration],
maxLifeTime: Option[Duration],
timer: util.Timer = Timer.default)
extends Service[Req, Rep]
{
private[this] var requestCount = 0
private[this] var expired = false
private[this] var task: Option[com.twitter.util.TimerTask] =
Some(timer.schedule(maxIdleTime.fromNow) { maybeExpire() })
private[this] var idleTimeTask: Option[com.twitter.util.TimerTask] =
maxIdleTime match {
case Some(idleTime: Duration) =>
Some(timer.schedule(idleTime.fromNow) { maybeIdleExpire() })
case _ =>
None
}

private[this] var lifeTimeTask: Option[com.twitter.util.TimerTask] =
maxLifeTime match {
case Some(lifeTime: Duration) =>
Some(timer.schedule(lifeTime.fromNow) { maybeLifeTimeExpire() })
case _ =>
None
}

private[this] def maybeExpire() = {
val justExpired = synchronized {
Expand All @@ -37,32 +51,55 @@ class ExpiringService[Req, Rep](
}

if (justExpired) didExpire()
justExpired
}

private[this] def maybeIdleExpire() {
if (maybeExpire()) cancelLifeTimer()
}

private[this] def maybeLifeTimeExpire() {
if (maybeExpire()) {
cancelIdleTimer()
} else {
expired = true
}
}

// May be overriden to provide your own expiration action.
protected def didExpire() {
underlying.release()
}

protected def cancelIdleTimer() {
// Cancel the existing timer.
idleTimeTask foreach { _.cancel() }
idleTimeTask = None
}

protected def cancelLifeTimer() {
// Cancel the existing timer.
lifeTimeTask foreach { _.cancel() }
lifeTimeTask = None
}

def apply(request: Req): Future[Rep] = synchronized {
if (expired) {
return Future.exception(
new WriteException(new ChannelClosedException))
}

requestCount += 1
if (requestCount == 1) {
// Cancel the existing timer.
task foreach { _.cancel() }
task = None
}
if (requestCount == 1) cancelIdleTimer()

underlying(request) ensure {
synchronized {
requestCount -= 1
if (requestCount == 0) {
require(!task.isDefined)
task = Some(timer.schedule(maxIdleTime.fromNow) { maybeExpire() })
require(!idleTimeTask.isDefined)
maxIdleTime foreach { time =>
idleTimeTask = Some(timer.schedule(time.fromNow) { maybeIdleExpire() })
}
}
}
}
Expand Down
Expand Up @@ -18,53 +18,139 @@ object ExpiringServiceSpec extends Specification with Mockito {
underlying(123) returns promise
underlying.isAvailable returns true

val service = new ExpiringService[Any, Any](underlying, 10.seconds, timer)
timer.tasks must haveSize(1)

"expire after the given idle time" in {
// For some reason, this complains of different types:
// timer.tasks.head.when must be_==(Time.now + 10.seconds)
service.isAvailable must beTrue

timeControl.advance(10.seconds)
timer.tick()

service.isAvailable must beFalse
there was one(underlying).release()

timer.tasks must beEmpty
}

"cancel the timer when a request is issued" in {
service(123)
"idle time between requests" in {
val service = new ExpiringService[Any, Any](underlying, Some(10.seconds), None, timer)
timer.tasks must haveSize(1)
timer.tasks.head.isCancelled must beTrue
}
"expire after the given idle time" in {
// For some reason, this complains of different types:
// timer.tasks.head.when must be_==(Time.now + 10.seconds)
service.isAvailable must beTrue

"restart the timer when the request finished" in {
service(123)
timer.tasks must haveSize(1)
timer.tasks.head.isCancelled must beTrue

timeControl.advance(10.seconds)
timer.tick()

timer.tasks must beEmpty
promise() = Return(321)
timeControl.advance(10.seconds)
timer.tick()

service.isAvailable must beFalse
there was one(underlying).release()

timer.tasks must beEmpty
}

"cancel the timer when a request is issued" in {
service(123)
timer.tasks must haveSize(1)
timer.tasks.head.isCancelled must beTrue
}

"restart the timer when the request finished" in {
service(123)
timer.tasks must haveSize(1)
timer.tasks.head.isCancelled must beTrue

timeControl.advance(10.seconds)
timer.tick()

timer.tasks must beEmpty
promise() = Return(321)
timer.tasks must haveSize(1)

there was no(underlying).release()
timeControl.advance(10.seconds)
timer.tick()

there was one(underlying).release()
}

"throw an write exception if we attempt to use an expired service" in {
timeControl.advance(10.seconds)
timer.tick()

service(132)() must throwA[WriteException]
}
}

"life time of a connection" in {
val service = new ExpiringService[Any, Any](underlying, None, Some(10.seconds), timer)
timer.tasks must haveSize(1)

there was no(underlying).release()
timeControl.advance(10.seconds)
timer.tick()

there was one(underlying).release()
"expire after the given idle time" in {
// For some reason, this complains of different types:
// timer.tasks.head.when must be_==(Time.now + 10.seconds)
service.isAvailable must beTrue

timeControl.advance(10.seconds)
timer.tick()

service.isAvailable must beFalse
there was one(underlying).release()

timer.tasks must beEmpty
}

"does not cancel the timer when a request is issued" in {
service(123)
timer.tasks must haveSize(1)
timer.tasks.head.isCancelled must beFalse
}

"throw an write exception if we attempt to use an expired service" in {
timeControl.advance(10.seconds)
timer.tick()

service(132)() must throwA[WriteException]
}
}

"idle timer fires before life timer fires" in {
val service = new ExpiringService[Any, Any](underlying, Some(10.seconds), Some(1.minute), timer)
timer.tasks must haveSize(2)

"expire after the given idle time" in {
// For some reason, this complains of different types:
// timer.tasks.head.when must be_==(Time.now + 10.seconds)
service.isAvailable must beTrue

timeControl.advance(10.seconds)
timer.tick()

service.isAvailable must beFalse
there was one(underlying).release()

timer.tasks must haveSize(1)
timer.tasks.head.isCancelled must beTrue
}
}

"life timer fires before idle timer fires" in {
val service = new ExpiringService[Any, Any](underlying, Some(10.seconds), Some(15.seconds), timer)
timer.tasks must haveSize(2)
timer.tasks forall(!_.isCancelled) must beTrue

"expire after the given life time" in {
service(123)
timer.tasks must haveSize(2)
timer.tasks(0).isCancelled must beTrue
timer.tasks(1).isCancelled must beFalse

timeControl.advance(8.seconds)
timer.tick()

timer.tasks must haveSize(2)
promise() = Return(321)

timer.tasks must haveSize(3)
timer.tasks(0).isCancelled must beTrue
timer.tasks(1).isCancelled must beFalse
timer.tasks(2).isCancelled must beFalse

"throw an write exception if we attempt to use an expired service" in {
timeControl.advance(10.seconds)
timer.tick()
there was no(underlying).release()
timeControl.advance(8.seconds)
timer.tick()

service(132)() must throwA[WriteException]
timer.tasks must haveSize(1)
timer.tasks forall(_.isCancelled) must beTrue
service.isAvailable must beFalse
there was one(underlying).release()
}
}
}
}
Expand Down

0 comments on commit d2883a5

Please sign in to comment.