From a03db42b8dddbb9deaf8e781be2fe189f3a32e64 Mon Sep 17 00:00:00 2001 From: "marius a. eriksen" Date: Fri, 18 Mar 2011 13:23:32 -0700 Subject: [PATCH 1/5] fix kestrel regression from memcache decoding changes. --- .../protocol/text/AbstractDecoder.scala | 22 +++++++------- .../memcached/util/ChannelBufferUtils.scala | 30 +++++++++++++++---- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/protocol/text/AbstractDecoder.scala b/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/protocol/text/AbstractDecoder.scala index ed717509e9..3d7471ccbb 100644 --- a/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/protocol/text/AbstractDecoder.scala +++ b/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/protocol/text/AbstractDecoder.scala @@ -8,18 +8,18 @@ import com.twitter.finagle.memcached.util.ParserUtils import com.twitter.finagle.memcached.util.ChannelBufferUtils._ object AbstractDecoder { - private val DELIMITER = ChannelBuffers.wrappedBuffer("\r\n".getBytes) - private val SKIP_SPACE = 1 - private val FIND_CRLF = new ChannelBufferIndexFinder() { + private val Delimiter = ChannelBuffers.wrappedBuffer("\r\n".getBytes) + private val DelimiterLength = Delimiter.capacity + private val FindCRLF = new ChannelBufferIndexFinder() { def find(buffer: ChannelBuffer, guessedIndex: Int): Boolean = { - val enoughBytesForDelimeter = guessedIndex + DELIMITER.readableBytes + val enoughBytesForDelimeter = guessedIndex + Delimiter.readableBytes if (buffer.writerIndex < enoughBytesForDelimeter) return false val cr = buffer.getByte(guessedIndex) val lf = buffer.getByte(guessedIndex + 1) cr == '\r' && lf == '\n' } -} + } } abstract class AbstractDecoder extends FrameDecoder { @@ -38,12 +38,12 @@ abstract class AbstractDecoder extends FrameDecoder { } protected def decodeLine(buffer: ChannelBuffer, needsData: Seq[ChannelBuffer] => Option[Int])(continue: Seq[ChannelBuffer] => Decoding): Decoding = { - val frameLength = buffer.bytesBefore(FIND_CRLF) + val frameLength = buffer.bytesBefore(FindCRLF) if (frameLength < 0) { needMoreData } else { val frame = buffer.slice(buffer.readerIndex, frameLength) - buffer.skipBytes(frameLength + DELIMITER.capacity) + buffer.skipBytes(frameLength + DelimiterLength) val tokens = frame.split val bytesNeeded = needsData(tokens) @@ -58,14 +58,14 @@ abstract class AbstractDecoder extends FrameDecoder { } protected def decodeData(bytesNeeded: Int, buffer: ChannelBuffer)(continue: ChannelBuffer => Decoding): Decoding = { - if (buffer.readableBytes < (bytesNeeded + DELIMITER.capacity)) + if (buffer.readableBytes < (bytesNeeded + DelimiterLength)) needMoreData else { - val lastTwoBytesInFrame = buffer.slice(bytesNeeded + buffer.readerIndex, DELIMITER.capacity) - if (!lastTwoBytesInFrame.equals(DELIMITER)) throw new ClientError("Missing delimiter") + val lastTwoBytesInFrame = buffer.slice(bytesNeeded + buffer.readerIndex, DelimiterLength) + if (!lastTwoBytesInFrame.equals(Delimiter)) throw new ClientError("Missing delimiter") val data = buffer.slice(buffer.readerIndex, bytesNeeded) - buffer.skipBytes(bytesNeeded + DELIMITER.capacity) + buffer.skipBytes(bytesNeeded + DelimiterLength) start() // Copied rather than wrapped to avoid caching data outside the reader/writer mark. diff --git a/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/util/ChannelBufferUtils.scala b/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/util/ChannelBufferUtils.scala index 55a96208ab..9d367ad56f 100644 --- a/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/util/ChannelBufferUtils.scala +++ b/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/util/ChannelBufferUtils.scala @@ -22,12 +22,17 @@ object ChannelBufferUtils { override def toString = buffer.toString(CharsetUtil.UTF_8) def size = buffer.writerIndex() - buffer.readerIndex() - def split = { + def split: Seq[ChannelBuffer] = + split(FIND_SPACE, 1) + + def split(delimiter: String): Seq[ChannelBuffer] = + split(stringToChannelBufferIndexFinder(delimiter), delimiter.size) + + def split(indexFinder: ChannelBufferIndexFinder, delimiterLength: Int): Seq[ChannelBuffer] = { val tokens = new ArrayBuffer[ChannelBuffer] - val skipDelimiter = 1 var scratch = buffer while (scratch.capacity > 0) { - val tokenLength = scratch.bytesBefore(FIND_SPACE) + val tokenLength = scratch.bytesBefore(indexFinder) if (tokenLength < 0) { tokens += scratch.copy @@ -35,12 +40,13 @@ object ChannelBufferUtils { } else { tokens += scratch.slice(0, tokenLength).copy scratch = scratch.slice( - tokenLength + skipDelimiter, - scratch.capacity - tokenLength - skipDelimiter) + tokenLength + delimiterLength, + scratch.capacity - tokenLength - delimiterLength) } } tokens } + } implicit def channelBufferToRichChannelBuffer(buffer: ChannelBuffer) = @@ -54,4 +60,18 @@ object ChannelBufferUtils { implicit def stringToByteArray(string: String) = string.getBytes + + implicit def stringToChannelBufferIndexFinder(string: String): ChannelBufferIndexFinder = + new ChannelBufferIndexFinder { + def find(buffer: ChannelBuffer, guessedIndex: Int): Boolean = { + val array = string.toArray + var i: Int = 0 + while (i < string.size) { + if (buffer.getByte(guessedIndex + i) != array(i).toByte) + return false + i += 1 + } + return true + } + } } From ee1349965afcbf6d1b59b7d2d8ee0a38d589c1cd Mon Sep 17 00:00:00 2001 From: "marius a. eriksen" Date: Fri, 18 Mar 2011 13:30:43 -0700 Subject: [PATCH 2/5] 1.2.3 --- project/build.properties | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/project/build.properties b/project/build.properties index 1ba7adac7e..1fa25f9f8c 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1,8 +1,8 @@ #Project properties -#Wed Mar 16 10:34:26 PDT 2011 +#Fri Mar 18 13:24:17 PDT 2011 project.organization=com.twitter project.name=finagle sbt.version=0.7.4 -project.version=1.2.3-SNAPSHOT +project.version=1.2.3 build.scala.versions=2.8.1 project.initialize=false From e0070fbb59b2f571f145291ed5b69dfc1eccc8e7 Mon Sep 17 00:00:00 2001 From: "marius a. eriksen" Date: Fri, 18 Mar 2011 13:30:44 -0700 Subject: [PATCH 3/5] 1.2.4-SNAPSHOT --- project/build.properties | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/project/build.properties b/project/build.properties index 1fa25f9f8c..02103dc48b 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1,8 +1,8 @@ #Project properties -#Fri Mar 18 13:24:17 PDT 2011 +#Fri Mar 18 13:30:44 PDT 2011 project.organization=com.twitter project.name=finagle sbt.version=0.7.4 -project.version=1.2.3 +project.version=1.2.4-SNAPSHOT build.scala.versions=2.8.1 project.initialize=false From d3f0050d57a75baf7c1f0d6915c788376a7cba24 Mon Sep 17 00:00:00 2001 From: "marius a. eriksen" Date: Fri, 18 Mar 2011 13:51:05 -0700 Subject: [PATCH 4/5] include changelog in README --- README.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/README.md b/README.md index 3a2fcb85b2..41e6e7fe4e 100644 --- a/README.md +++ b/README.md @@ -375,3 +375,14 @@ Here's how to use it: .build() That's it! + +# Changes + +## 1.2.3 (2011-03-18) + +- A faster memcache codec (joint work with Arya from profiling rooster usage) +- Fix a bug in memcache decoding of big responses (due to Arya) +- More exported pool & connection statistics +- Fix a bug in the CachingPool wherein connection accounting broke +- Speed up leastqueued loadbalancer strategy +- finagle-serversets is now included in the opensource distribution, instructions at https://github.com/twitter/finagle/blob/master/README.md From b74ddb0c050f624bed1aef6bc4f99f98d9fce1d2 Mon Sep 17 00:00:00 2001 From: Wanli Yang Date: Mon, 21 Mar 2011 11:04:21 -0700 Subject: [PATCH 5/5] add lifeTimer for ExpiringService. --- .../finagle/builder/ClientBuilder.scala | 8 +- .../finagle/builder/ServerBuilder.scala | 10 +- .../finagle/service/ExpiringService.scala | 57 ++++-- .../finagle/service/ExpiringServiceSpec.scala | 166 +++++++++++++----- 4 files changed, 187 insertions(+), 54 deletions(-) diff --git a/finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala b/finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala index 85306e1400..16a5cecf84 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala @@ -51,6 +51,7 @@ case class ClientBuilder[Req, Rep]( _hostConnectionLimit: Option[Int], _hostConnectionIdleTime: Option[Duration], _hostConnectionMaxIdleTime: Option[Duration], + _hostConnectionMaxLifeTime: Option[Duration], _sendBufferSize: Option[Int], _recvBufferSize: Option[Int], _retries: Option[Int], @@ -71,6 +72,7 @@ case class ClientBuilder[Req, Rep]( None, // hostConnectionLimit None, // hostConnectionIdleTime None, // hostConnectionMaxIdleTime + None, // hostConnectionMaxLifeTime None, // sendBufferSize None, // recvBufferSize None, // retries @@ -92,6 +94,7 @@ case class ClientBuilder[Req, Rep]( "hostConnectionCoresize" -> Some(_hostConnectionCoresize), "hostConnectionIdleTime" -> Some(_hostConnectionIdleTime), "hostConnectionMaxIdleTime" -> Some(_hostConnectionMaxIdleTime), + "hostConnectionMaxLifeTime" -> Some(_hostConnectionMaxLifeTime), "sendBufferSize" -> _sendBufferSize, "recvBufferSize" -> _recvBufferSize, "retries" -> _retries, @@ -244,8 +247,9 @@ case class ClientBuilder[Req, Rep]( future = protocol.codec.prepareClientChannel(service) future = future.flatMap { protocol.prepareChannel(_) } - _hostConnectionMaxIdleTime.foreach { idleTime => - future = future.map { new ExpiringService(_, idleTime) } + + if (_hostConnectionMaxIdleTime.isDefined || _hostConnectionMaxLifeTime.isDefined) { + future = future.map { new ExpiringService(_, _hostConnectionMaxIdleTime, _hostConnectionMaxLifeTime) } } future diff --git a/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala b/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala index e2dcb54770..45cb101493 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala @@ -69,6 +69,7 @@ case class ServerBuilder[Req, Rep]( _channelFactory: Option[ReferenceCountedChannelFactory], _maxConcurrentRequests: Option[Int], _hostConnectionMaxIdleTime: Option[Duration], + _hostConnectionMaxLifeTime: Option[Duration], _requestTimeout: Option[Duration], _readTimeout: Option[Duration], _writeCompletionTimeout: Option[Duration], @@ -89,6 +90,7 @@ case class ServerBuilder[Req, Rep]( None, // channelFactory None, // maxConcurrentRequests None, // hostConnectionMaxIdleTime + None, // hostConnectionMaxLifeTime None, // requestTimeout None, // readTimeout None, // writeCompletionTimeout @@ -108,6 +110,7 @@ case class ServerBuilder[Req, Rep]( "channelFactory" -> _channelFactory, "maxConcurrentRequests" -> _maxConcurrentRequests, "hostConnectionMaxIdleTime" -> _hostConnectionMaxIdleTime, + "hostConnectionMaxLifeTime" -> _hostConnectionMaxLifeTime, "requestTimeout" -> _requestTimeout, "readTimeout" -> _readTimeout, "writeCompletionTimeout" -> _writeCompletionTimeout, @@ -153,6 +156,9 @@ case class ServerBuilder[Req, Rep]( def hostConnectionMaxIdleTime(howlong: Duration) = copy(_hostConnectionMaxIdleTime = Some(howlong)) + def hostConnectionMaxLifeTime(howlong: Duration) = + copy(_hostConnectionMaxLifeTime = Some(howlong)) + def requestTimeout(howlong: Duration) = copy(_requestTimeout = Some(howlong)) @@ -292,8 +298,8 @@ case class ServerBuilder[Req, Rep]( val closingHandler = new ChannelClosingHandler pipeline.addLast("closingHandler", closingHandler) - _hostConnectionMaxIdleTime foreach { duration => - service = new ExpiringService(service, duration) { + if (_hostConnectionMaxIdleTime.isDefined || _hostConnectionMaxLifeTime.isDefined) { + service = new ExpiringService(service, _hostConnectionMaxIdleTime, _hostConnectionMaxLifeTime) { override def didExpire() { closingHandler.close() } } } diff --git a/finagle-core/src/main/scala/com/twitter/finagle/service/ExpiringService.scala b/finagle-core/src/main/scala/com/twitter/finagle/service/ExpiringService.scala index 2ef0ae4e74..3e5a9d5666 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/service/ExpiringService.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/service/ExpiringService.scala @@ -15,14 +15,28 @@ import com.twitter.finagle.{Service, WriteException, ChannelClosedException} class ExpiringService[Req, Rep]( underlying: Service[Req, Rep], - maxIdleTime: Duration, + maxIdleTime: Option[Duration], + maxLifeTime: Option[Duration], timer: util.Timer = Timer.default) extends Service[Req, Rep] { private[this] var requestCount = 0 private[this] var expired = false - private[this] var task: Option[com.twitter.util.TimerTask] = - Some(timer.schedule(maxIdleTime.fromNow) { maybeExpire() }) + private[this] var idleTimeTask: Option[com.twitter.util.TimerTask] = + maxIdleTime match { + case Some(idleTime: Duration) => + Some(timer.schedule(idleTime.fromNow) { maybeIdleExpire() }) + case _ => + None + } + + private[this] var lifeTimeTask: Option[com.twitter.util.TimerTask] = + maxLifeTime match { + case Some(lifeTime: Duration) => + Some(timer.schedule(lifeTime.fromNow) { maybeLifeTimeExpire() }) + case _ => + None + } private[this] def maybeExpire() = { val justExpired = synchronized { @@ -37,6 +51,19 @@ class ExpiringService[Req, Rep]( } if (justExpired) didExpire() + justExpired + } + + private[this] def maybeIdleExpire() { + if (maybeExpire()) cancelLifeTimer() + } + + private[this] def maybeLifeTimeExpire() { + if (maybeExpire()) { + cancelIdleTimer() + } else { + expired = true + } } // May be overriden to provide your own expiration action. @@ -44,6 +71,18 @@ class ExpiringService[Req, Rep]( underlying.release() } + protected def cancelIdleTimer() { + // Cancel the existing timer. + idleTimeTask foreach { _.cancel() } + idleTimeTask = None + } + + protected def cancelLifeTimer() { + // Cancel the existing timer. + lifeTimeTask foreach { _.cancel() } + lifeTimeTask = None + } + def apply(request: Req): Future[Rep] = synchronized { if (expired) { return Future.exception( @@ -51,18 +90,16 @@ class ExpiringService[Req, Rep]( } requestCount += 1 - if (requestCount == 1) { - // Cancel the existing timer. - task foreach { _.cancel() } - task = None - } + if (requestCount == 1) cancelIdleTimer() underlying(request) ensure { synchronized { requestCount -= 1 if (requestCount == 0) { - require(!task.isDefined) - task = Some(timer.schedule(maxIdleTime.fromNow) { maybeExpire() }) + require(!idleTimeTask.isDefined) + maxIdleTime foreach { time => + idleTimeTask = Some(timer.schedule(time.fromNow) { maybeIdleExpire() }) + } } } } diff --git a/finagle-core/src/test/scala/com/twitter/finagle/service/ExpiringServiceSpec.scala b/finagle-core/src/test/scala/com/twitter/finagle/service/ExpiringServiceSpec.scala index f106932a7b..2666915c90 100644 --- a/finagle-core/src/test/scala/com/twitter/finagle/service/ExpiringServiceSpec.scala +++ b/finagle-core/src/test/scala/com/twitter/finagle/service/ExpiringServiceSpec.scala @@ -18,53 +18,139 @@ object ExpiringServiceSpec extends Specification with Mockito { underlying(123) returns promise underlying.isAvailable returns true - val service = new ExpiringService[Any, Any](underlying, 10.seconds, timer) - timer.tasks must haveSize(1) - - "expire after the given idle time" in { - // For some reason, this complains of different types: - // timer.tasks.head.when must be_==(Time.now + 10.seconds) - service.isAvailable must beTrue - - timeControl.advance(10.seconds) - timer.tick() - - service.isAvailable must beFalse - there was one(underlying).release() - - timer.tasks must beEmpty - } - "cancel the timer when a request is issued" in { - service(123) + "idle time between requests" in { + val service = new ExpiringService[Any, Any](underlying, Some(10.seconds), None, timer) timer.tasks must haveSize(1) - timer.tasks.head.isCancelled must beTrue - } + "expire after the given idle time" in { + // For some reason, this complains of different types: + // timer.tasks.head.when must be_==(Time.now + 10.seconds) + service.isAvailable must beTrue - "restart the timer when the request finished" in { - service(123) - timer.tasks must haveSize(1) - timer.tasks.head.isCancelled must beTrue - - timeControl.advance(10.seconds) - timer.tick() - - timer.tasks must beEmpty - promise() = Return(321) + timeControl.advance(10.seconds) + timer.tick() + + service.isAvailable must beFalse + there was one(underlying).release() + + timer.tasks must beEmpty + } + + "cancel the timer when a request is issued" in { + service(123) + timer.tasks must haveSize(1) + timer.tasks.head.isCancelled must beTrue + } + + "restart the timer when the request finished" in { + service(123) + timer.tasks must haveSize(1) + timer.tasks.head.isCancelled must beTrue + + timeControl.advance(10.seconds) + timer.tick() + + timer.tasks must beEmpty + promise() = Return(321) + timer.tasks must haveSize(1) + + there was no(underlying).release() + timeControl.advance(10.seconds) + timer.tick() + + there was one(underlying).release() + } + + "throw an write exception if we attempt to use an expired service" in { + timeControl.advance(10.seconds) + timer.tick() + + service(132)() must throwA[WriteException] + } + } + + "life time of a connection" in { + val service = new ExpiringService[Any, Any](underlying, None, Some(10.seconds), timer) timer.tasks must haveSize(1) - - there was no(underlying).release() - timeControl.advance(10.seconds) - timer.tick() - - there was one(underlying).release() + "expire after the given idle time" in { + // For some reason, this complains of different types: + // timer.tasks.head.when must be_==(Time.now + 10.seconds) + service.isAvailable must beTrue + + timeControl.advance(10.seconds) + timer.tick() + + service.isAvailable must beFalse + there was one(underlying).release() + + timer.tasks must beEmpty + } + + "does not cancel the timer when a request is issued" in { + service(123) + timer.tasks must haveSize(1) + timer.tasks.head.isCancelled must beFalse + } + + "throw an write exception if we attempt to use an expired service" in { + timeControl.advance(10.seconds) + timer.tick() + + service(132)() must throwA[WriteException] + } + } + + "idle timer fires before life timer fires" in { + val service = new ExpiringService[Any, Any](underlying, Some(10.seconds), Some(1.minute), timer) + timer.tasks must haveSize(2) + + "expire after the given idle time" in { + // For some reason, this complains of different types: + // timer.tasks.head.when must be_==(Time.now + 10.seconds) + service.isAvailable must beTrue + + timeControl.advance(10.seconds) + timer.tick() + + service.isAvailable must beFalse + there was one(underlying).release() + + timer.tasks must haveSize(1) + timer.tasks.head.isCancelled must beTrue + } } + + "life timer fires before idle timer fires" in { + val service = new ExpiringService[Any, Any](underlying, Some(10.seconds), Some(15.seconds), timer) + timer.tasks must haveSize(2) + timer.tasks forall(!_.isCancelled) must beTrue + + "expire after the given life time" in { + service(123) + timer.tasks must haveSize(2) + timer.tasks(0).isCancelled must beTrue + timer.tasks(1).isCancelled must beFalse + + timeControl.advance(8.seconds) + timer.tick() + + timer.tasks must haveSize(2) + promise() = Return(321) + + timer.tasks must haveSize(3) + timer.tasks(0).isCancelled must beTrue + timer.tasks(1).isCancelled must beFalse + timer.tasks(2).isCancelled must beFalse - "throw an write exception if we attempt to use an expired service" in { - timeControl.advance(10.seconds) - timer.tick() + there was no(underlying).release() + timeControl.advance(8.seconds) + timer.tick() - service(132)() must throwA[WriteException] + timer.tasks must haveSize(1) + timer.tasks forall(_.isCancelled) must beTrue + service.isAvailable must beFalse + there was one(underlying).release() + } } } }