Skip to content

Commit

Permalink
fixed connection counter leak in WatermarkPool when unable to establi…
Browse files Browse the repository at this point in the history
…sh a connection to the server (e.g., if it's down)
  • Loading branch information
Nick Kallen committed Mar 3, 2011
1 parent 2b00e8f commit 0a71dc7
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 8 deletions.
Expand Up @@ -85,7 +85,9 @@ class WatermarkPool[Req, Rep](
Future.value(service)
case None if numServices < highWatermark =>
numServices += 1
factory.make() map { new ServiceWrapper(_) }
factory.make() map { new ServiceWrapper(_) } onFailure { f =>
numServices -= 1
}
case None =>
val promise = new Promise[Service[Req, Rep]]
waiters += promise
Expand Down
Expand Up @@ -3,8 +3,8 @@ package com.twitter.finagle.pool
import org.specs.Specification
import org.specs.mock.Mockito
import org.mockito.Matchers

import com.twitter.util.{Future, Promise, Return}
import com.twitter.conversions.time._
import com.twitter.util.{Future, Promise, Return, Throw}

import com.twitter.finagle._

Expand Down Expand Up @@ -99,7 +99,7 @@ object WatermarkPoolSpec extends Specification with Mockito {
"maintain at all times up to 100 items" in {
val mocks = 0 until 100 map { _ => mock[Service[Int, Int]] }

val services = 0 until 100 map { i =>
val services = 0 until 100 map { i =>
factory.make() returns Future.value(mocks(i))
pool.make()()
}
Expand Down Expand Up @@ -135,12 +135,28 @@ object WatermarkPoolSpec extends Specification with Mockito {
"service lifecycle" should {
val factory = mock[ServiceFactory[Int, Int]]
val service = mock[Service[Int, Int]]
val promise = new Promise[Service[Int, Int]]
service.isAvailable returns true
factory.make() returns promise
val highWaterMark = 5
val pool = new WatermarkPool(factory, 1, highWaterMark)

"not leak services when they are born unhealthy" in {
(0 until highWaterMark) foreach { _ =>
val promise = new Promise[Service[Int, Int]]
factory.make() returns promise
promise() = Throw(new Exception)
pool.make().isThrow mustBe true
}

val promise = new Promise[Service[Int, Int]]
factory.make() returns promise
promise() = Return(service)
pool.make()(1.second).isAvailable mustBe true
}

"release unhealthy services that have been queued" in {
val pool = new WatermarkPool(factory, 5)
val promise = new Promise[Service[Int, Int]]
factory.make() returns promise

promise() = Return(service)

val f = pool.make()
Expand All @@ -153,7 +169,7 @@ object WatermarkPoolSpec extends Specification with Mockito {
there was no(service).release()

factory.make() returns new Promise[Service[Int, Int]]
service.isAvailable returns false
service.isAvailable returns false

// The service is now unhealty, so it should be discarded, and a
// new one should be made.
Expand Down

0 comments on commit 0a71dc7

Please sign in to comment.