diff --git a/ChangeLog b/ChangeLog index 463f3da8..63866726 100644 --- a/ChangeLog +++ b/ChangeLog @@ -6,6 +6,7 @@ release: TBD - Support for service discovery via ZooKeeper - Kestrel now ignores directories in its queue path. - Fix load test scripts to use correct JAR name [Bryan English] +- Scala 2.9.2, finagle 5.3.x 2.3.4 ----- diff --git a/docs/guide.md b/docs/guide.md index 6d9c7a30..6d206b02 100644 --- a/docs/guide.md +++ b/docs/guide.md @@ -369,7 +369,7 @@ The kestrel implementation of the memcache protocol commands is described below. per-item, so many clients may monitor a queue at once. After the given timeout, a separate `END` response will signal the end of the monitor period. Any fetched items are open transactions (see "Reliable Reads" - below), and should be closed with `CONFIRM`. + below), and should be closed with `CONFIRM`. - `CONFIRM ` diff --git a/project/Build.scala b/project/Build.scala index e782e505..55eb983c 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -3,7 +3,7 @@ import Keys._ import com.twitter.sbt._ object Kestrel extends Build { - val finagleVersion = "1.11.1" + val finagleVersion = "5.3.17" lazy val root = Project( id = "kestrel", @@ -16,21 +16,20 @@ object Kestrel extends Build { name := "kestrel", organization := "net.lag", version := "2.4.0-SNAPSHOT", - scalaVersion := "2.9.1", + scalaVersion := "2.9.2", // time-based tests cannot be run in parallel logBuffered in Test := false, parallelExecution in Test := false, libraryDependencies ++= Seq( - "com.twitter" %% "ostrich" % "4.10.6", - "com.twitter" %% "naggati" % "2.2.3" intransitive(), // allow custom netty - "com.twitter" %% "finagle-core" % finagleVersion, - "com.twitter" %% "finagle-ostrich4" % finagleVersion, - "com.twitter" %% "finagle-thrift" % finagleVersion, // override scrooge's version - "commons-codec" % "commons-codec" % "1.6", // override scrooge/util-codec's version - "com.twitter" %% "scrooge-runtime" % "1.1.3", - "com.twitter.common.zookeeper" % "server-set" % "1.0.15", + "com.twitter" % "ostrich" % "8.2.6", + "com.twitter" %% "naggati" % "4.1.0", + "com.twitter" % "finagle-core" % finagleVersion, + "com.twitter" % "finagle-ostrich4" % finagleVersion, + "com.twitter" % "finagle-thrift" % finagleVersion, // override scrooge's version + "com.twitter" %% "scrooge-runtime" % "3.0.1", + "com.twitter.common.zookeeper" % "server-set" % "1.0.16", // for tests only: "org.scala-tools.testing" %% "specs" % "1.6.9" % "test", @@ -43,7 +42,7 @@ object Kestrel extends Build { mainClass in Compile := Some("net.lag.kestrel.Kestrel"), - CompileThriftScrooge.scroogeVersion := "1.1.7", + CompileThriftScrooge.scroogeVersion := "3.0.1", PackageDist.packageDistConfigFilesValidationRegex := Some(".*"), SubversionPublisher.subversionRepository := Some("https://svn.twitter.biz/maven-public"), publishArtifact in Test := true diff --git a/project/plugins.sbt b/project/plugins.sbt index 4de65363..c6ec0db0 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -19,7 +19,6 @@ resolvers <<= (resolvers) { r => externalResolvers <<= (resolvers) map identity -addSbtPlugin("com.twitter" %% "sbt-package-dist" % "1.0.5") - -addSbtPlugin("com.twitter" %% "sbt11-scrooge" % "1.0.0") +addSbtPlugin("com.twitter" %% "sbt-package-dist" % "1.0.6") +addSbtPlugin("com.twitter" %% "sbt11-scrooge" % "3.0.0") diff --git a/src/main/scala/net/lag/kestrel/Kestrel.scala b/src/main/scala/net/lag/kestrel/Kestrel.scala index e9a56180..eb46c67e 100644 --- a/src/main/scala/net/lag/kestrel/Kestrel.scala +++ b/src/main/scala/net/lag/kestrel/Kestrel.scala @@ -23,13 +23,13 @@ import com.twitter.finagle.{ClientConnection, Codec => FinagleCodec, Service => import com.twitter.finagle.builder.{Server, ServerBuilder} import com.twitter.finagle.stats.OstrichStatsReceiver import com.twitter.finagle.thrift._ -import com.twitter.finagle.util.{Timer => FinagleTimer} +import com.twitter.finagle.util.TimerFromNettyTimer import com.twitter.logging.Logger import com.twitter.naggati.Codec import com.twitter.naggati.codec.{MemcacheResponse, MemcacheRequest, MemcacheCodec} import com.twitter.ostrich.admin.{PeriodicBackgroundProcess, RuntimeEnvironment, Service, ServiceTracker} import com.twitter.ostrich.stats.Stats -import com.twitter.util.{Duration, Eval, Future, Time} +import com.twitter.util.{Duration, Eval, Future, Time, Timer} import java.net.InetSocketAddress import java.util.Collections._ import java.util.concurrent._ @@ -87,7 +87,7 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali def startThriftServer( name: String, port: Int, - fTimer: FinagleTimer + fTimer: Timer ): Server = { val address = new InetSocketAddress(listenAddress, port) var builder = ServerBuilder() @@ -135,7 +135,7 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali } }) - val finagleTimer = new FinagleTimer(timer) + val finagleTimer = new TimerFromNettyTimer(timer) try { queueCollection = new QueueCollection(queuePath, finagleTimer, journalSyncScheduler, defaultQueueConfig, builders, aliases) diff --git a/src/main/scala/net/lag/kestrel/MemcacheHandler.scala b/src/main/scala/net/lag/kestrel/MemcacheHandler.scala index ee29970c..58398891 100644 --- a/src/main/scala/net/lag/kestrel/MemcacheHandler.scala +++ b/src/main/scala/net/lag/kestrel/MemcacheHandler.scala @@ -18,8 +18,8 @@ package net.lag.kestrel import java.net.InetSocketAddress +import java.nio.ByteBuffer import scala.collection.mutable -import com.twitter.concurrent.ChannelSource import com.twitter.conversions.time._ import com.twitter.finagle.{ClientConnection, Service} import com.twitter.logging.Logger @@ -90,8 +90,11 @@ class MemcacheHandler( } else { Some(Time.epoch + expiry.seconds) } + val buffer = request.data.get + val data = new Array[Byte](buffer.remaining()) + buffer.get(data) try { - if (handler.setItem(request.line(1), request.line(2).toInt, normalizedExpiry, request.data.get)) { + if (handler.setItem(request.line(1), request.line(2).toInt, normalizedExpiry, data)) { Future(new MemcacheResponse("STORED")) } else { Future(new MemcacheResponse("NOT_STORED")) @@ -191,7 +194,7 @@ class MemcacheHandler( case None => new MemcacheResponse("END") case Some(item) => - new MemcacheResponse("VALUE %s 0 %d".format(key, item.data.length), Some(item.data)) + new MemcacheResponse("VALUE %s 0 %d".format(key, item.data.length), Some(ByteBuffer.wrap(item.data))) } } } catch { @@ -211,7 +214,7 @@ class MemcacheHandler( case None => channel.send(new MemcacheResponse("END") then Codec.EndStream) case Some(item) => - channel.send(new MemcacheResponse("VALUE %s 0 %d".format(key, item.data.length), Some(item.data))) + channel.send(new MemcacheResponse("VALUE %s 0 %d".format(key, item.data.length), Some(ByteBuffer.wrap(item.data)))) } } new MemcacheResponse("") then Codec.Stream(channel) diff --git a/src/main/scala/net/lag/kestrel/TextHandler.scala b/src/main/scala/net/lag/kestrel/TextHandler.scala index b62ecf74..136cb891 100644 --- a/src/main/scala/net/lag/kestrel/TextHandler.scala +++ b/src/main/scala/net/lag/kestrel/TextHandler.scala @@ -18,7 +18,6 @@ package net.lag.kestrel import scala.collection.mutable -import com.twitter.concurrent.ChannelSource import com.twitter.conversions.time._ import com.twitter.logging.Logger import com.twitter.naggati._ diff --git a/src/main/scala/net/lag/kestrel/ThriftHandler.scala b/src/main/scala/net/lag/kestrel/ThriftHandler.scala index 9a004502..200d6ba2 100644 --- a/src/main/scala/net/lag/kestrel/ThriftHandler.scala +++ b/src/main/scala/net/lag/kestrel/ThriftHandler.scala @@ -29,7 +29,7 @@ import org.apache.thrift.protocol.TProtocolFactory import scala.collection.mutable import scala.collection.Set -class ThriftFinagledService(val handler: ThriftHandler, override val protocolFactory: TProtocolFactory) +class ThriftFinagledService(val handler: ThriftHandler, val protocolFactory: TProtocolFactory) extends thrift.Kestrel.FinagledService(handler, protocolFactory) { override def release() { @@ -187,7 +187,7 @@ class ThriftHandler ( } case Some(item) => { val externalXid = externalXidOption.getOrElse(0L) - rv += new thrift.Item(ByteBuffer.wrap(item.data), externalXid) + rv += thrift.Item(ByteBuffer.wrap(item.data), externalXid) } } } @@ -219,7 +219,7 @@ class ThriftHandler ( handler.getItem(queueName, None, false, true).map { itemOption => val data = itemOption.map { item => ByteBuffer.wrap(item.data) } val stats = queueCollection.stats(queueName).toMap - new thrift.QueueInfo(data, stats("items").toLong, stats("bytes").toLong, + thrift.QueueInfo(data, stats("items").toLong, stats("bytes").toLong, stats("logsize").toLong, stats("age").toLong, stats("waiters").toInt, stats("open_transactions").toInt) } diff --git a/src/scripts/kestrel.sh b/src/scripts/kestrel.sh index 88cc71db..3710bc1a 100644 --- a/src/scripts/kestrel.sh +++ b/src/scripts/kestrel.sh @@ -10,7 +10,7 @@ APP_NAME="kestrel" ADMIN_PORT="2223" VERSION="@VERSION@" -SCALA_VERSION="2.9.1" +SCALA_VERSION="2.9.2" APP_HOME="/usr/local/$APP_NAME/current" INITIAL_SLEEP=15 diff --git a/src/test/scala/net/lag/kestrel/MemcacheHandlerSpec.scala b/src/test/scala/net/lag/kestrel/MemcacheHandlerSpec.scala index 494ad9d1..b875e7b4 100644 --- a/src/test/scala/net/lag/kestrel/MemcacheHandlerSpec.scala +++ b/src/test/scala/net/lag/kestrel/MemcacheHandlerSpec.scala @@ -23,6 +23,7 @@ import com.twitter.naggati.codec.{MemcacheRequest, MemcacheResponse} import com.twitter.ostrich.admin.RuntimeEnvironment import com.twitter.util.{Future, Promise, Time} import java.net.InetSocketAddress +import java.nio.ByteBuffer import org.specs.Specification import org.specs.mock.{ClassMocker, JMocker} import scala.collection.mutable @@ -36,14 +37,17 @@ class MemcacheHandlerSpec extends Specification with JMocker with ClassMocker { val address = new InetSocketAddress("", 0) val qitem = QItem(Time.now, None, "state shirt".getBytes, 23) - def toReq(command: String, data: Option[Array[Byte]] = None): MemcacheRequest = { + def toReq(command: String, payload: Option[String] = None): MemcacheRequest = { val parts = command.split(" ").toList - val dataLength = data map { _.length + 2 } orElse { Some(0) } + val data = payload map { s => + ByteBuffer.wrap(s.getBytes) + } + val dataLength = data map { _.remaining + 2 } orElse { Some(0) } MemcacheRequest(parts, data, command.length + 2 + dataLength.get) } def toResp(queue: String, qItem: QItem): MemcacheResponse = { - MemcacheResponse("VALUE %s 0 %d".format(queue, qItem.data.length), Some(qItem.data)) + MemcacheResponse("VALUE %s 0 %d".format(queue, qItem.data.length), Some(ByteBuffer.wrap(qItem.data))) } val endResponse = MemcacheResponse("END", None) @@ -372,7 +376,7 @@ class MemcacheHandlerSpec extends Specification with JMocker with ClassMocker { } val memcacheHandler = new MemcacheHandler(connection, queueCollection, 10) - memcacheHandler(toReq("set test 0 0 5", Some("hello".getBytes)))() mustEqual MemcacheResponse("STORED", None) + memcacheHandler(toReq("set test 0 0 5", Some("hello")))() mustEqual MemcacheResponse("STORED", None) } } diff --git a/src/test/scala/net/lag/kestrel/ThriftHandlerSpec.scala b/src/test/scala/net/lag/kestrel/ThriftHandlerSpec.scala index 06590081..54849d15 100644 --- a/src/test/scala/net/lag/kestrel/ThriftHandlerSpec.scala +++ b/src/test/scala/net/lag/kestrel/ThriftHandlerSpec.scala @@ -357,7 +357,7 @@ class ThriftHandlerSpec extends Specification with JMocker with ClassMocker { ).toArray } - val qinfo = new thrift.QueueInfo(Some(ByteBuffer.wrap(item1)), 10, 10240, 29999, 500, 2, 1) + val qinfo = thrift.QueueInfo(Some(ByteBuffer.wrap(item1)), 10, 10240, 29999, 500, 2, 1) thriftHandler.peek("test")() mustEqual qinfo } } diff --git a/src/test/scala/net/lag/kestrel/load/Client.scala b/src/test/scala/net/lag/kestrel/load/Client.scala index 15447ac6..fc3734af 100644 --- a/src/test/scala/net/lag/kestrel/load/Client.scala +++ b/src/test/scala/net/lag/kestrel/load/Client.scala @@ -145,28 +145,28 @@ object ThriftClient extends Client { withProtocol { p => p.writeMessageBegin(new TMessage("put", TMessageType.CALL, 0)) val item = data.map { item => ByteBuffer.wrap(item.getBytes) } - (new thrift.Kestrel.put_args(queueName, item, 0)).write(p) + (thrift.Kestrel.put_args(queueName, item, 0)).write(p) } } def putNSuccess(count: Int) = { withProtocol { p => p.writeMessageBegin(new TMessage("put", TMessageType.REPLY, 0)) - (new thrift.Kestrel.put_result(success = Some(count))).write(p) + (thrift.Kestrel.put_result(success = Some(count))).write(p) } } def flush(queueName: String) = { withProtocol { p => p.writeMessageBegin(new TMessage("flush_queue", TMessageType.CALL, 0)) - (new thrift.Kestrel.flushQueue_args(queueName)).write(p) + (thrift.Kestrel.flushQueue_args(queueName)).write(p) } } def flushSuccess() = { withProtocol { p => p.writeMessageBegin(new TMessage("flush_queue", TMessageType.REPLY, 0)) - (new thrift.Kestrel.flushQueue_result()).write(p) + (thrift.Kestrel.flushQueue_result()).write(p) } } @@ -186,15 +186,15 @@ object ThriftClient extends Client { def monitor(queueName: String, timeoutMsec: Int, maxItems: Int) = { withProtocol { p => p.writeMessageBegin(new TMessage("get", TMessageType.CALL, 0)) - (new thrift.Kestrel.get_args(queueName, maxItems, timeoutMsec, 0)).write(p) + (thrift.Kestrel.get_args(queueName, maxItems, timeoutMsec, 0)).write(p) } } def monitorSuccess(queueName: String, data: Seq[String]) = { withProtocol { p => p.writeMessageBegin(new TMessage("get", TMessageType.REPLY, 0)) - val items = data.map { item => new thrift.Item(ByteBuffer.wrap(item.getBytes), 0) } - (new thrift.Kestrel.get_result(success = Some(items))).write(p) + val items = data.map { item => thrift.Item(ByteBuffer.wrap(item.getBytes), 0) } + (thrift.Kestrel.get_result(success = Some(items))).write(p) } } diff --git a/src/test/scala/net/lag/kestrel/load/Flood.scala b/src/test/scala/net/lag/kestrel/load/Flood.scala index e2719e1e..e929bb64 100644 --- a/src/test/scala/net/lag/kestrel/load/Flood.scala +++ b/src/test/scala/net/lag/kestrel/load/Flood.scala @@ -221,8 +221,32 @@ object Flood extends LoadTesting { put(socket, queueName, prefillItems, data) } - println("flood: producers=%d consumers=%d each sending %d items of %dkB through %s".format( - producerThreadCount, consumerThreadCount, totalItems, kilobytes, queueName)) + // round up itemsPerProducer (we may over-produce), but not itemsPerConsumer (we will not over-consume) + // (consuming more than producing will result in the test hanging) + val itemsPerProducer = { + val base = ((totalItems + producerThreadCount - 1) / producerThreadCount) max 1 + if (rollup > 1 && base % rollup > 0) { + // bump up to the next multiple of rollup + rollup - (base % rollup) + base + } else { + base + } + } + + val itemsPerConsumer = { + val base = (totalItems / consumerThreadCount) max 1 + if (rollup > 1 && base % rollup > 0) { + // bump up to the next multiple of rollup + rollup - (base % rollup) + base + } else { + base + } + } + + println("flood: producers=%d each sending %d items (in chunks of %d) of %dkB to %s".format( + producerThreadCount, itemsPerProducer, rollup, kilobytes, queueName)) + println("flood: consumers=%d each reading %d items (in chunks of %d) from %s".format( + consumerThreadCount, itemsPerConsumer, rollup, queueName)) var threadList: List[Thread] = Nil val misses = new AtomicInteger @@ -231,7 +255,7 @@ object Flood extends LoadTesting { val producerThread = new Thread { override def run = { val socket = tryHard { SocketChannel.open(new InetSocketAddress(hostname, port)) } - put(socket, queueName, totalItems, data) + put(socket, queueName, itemsPerProducer, data) } } threadList = producerThread :: threadList @@ -241,7 +265,7 @@ object Flood extends LoadTesting { val consumerThread = new Thread { override def run = { val socket = tryHard { SocketChannel.open(new InetSocketAddress(hostname, port)) } - val n = get(socket, queueName, totalItems, data) + val n = get(socket, queueName, itemsPerConsumer, data) socket.close() misses.addAndGet(n) } diff --git a/src/test/scala/net/lag/kestrel/load/LoadTesting.scala b/src/test/scala/net/lag/kestrel/load/LoadTesting.scala index b654efcf..fb4b7352 100644 --- a/src/test/scala/net/lag/kestrel/load/LoadTesting.scala +++ b/src/test/scala/net/lag/kestrel/load/LoadTesting.scala @@ -96,30 +96,40 @@ trait LoadTesting { val failedConnects = new AtomicInteger(0) + @tailrec final def tryHard[A](f: => A): A = { - try { - f + val result = try { + Right(f) } catch { case e: java.io.IOException => - failedConnects.incrementAndGet() - tryHard(f) + val failures = failedConnects.incrementAndGet() + if (failures % 1000 == 0) println("Failed to connect after %d attempts".format(failures)) + Left(e) + } + result match { + case Right(a) => a + case Left(_) => tryHard(f) } } def monitorQueue(hostname: String, queueName: String) { + val client = new TestClient(hostname, 22133) + client.connect() + val stats = client.stats() + val baseTotalItems = stats.getOrElse("queue_" + queueName + "_total_items", "0").toInt + val t = new Thread("monitor-queue") { override def run() { - val client = new TestClient(hostname, 22133) - client.connect() while (true) { val stats = client.stats() val items = stats.getOrElse("queue_" + queueName + "_items", "0").toInt + val totalItems = stats.getOrElse("queue_" + queueName + "_total_items", "0").toInt val bytes = stats.getOrElse("queue_" + queueName + "_bytes", "0").toInt val memItems = stats.getOrElse("queue_" + queueName + "_mem_items", "0").toInt val memBytes = stats.getOrElse("queue_" + queueName + "_mem_bytes", "0").toInt val journalBytes = stats.getOrElse("queue_" + queueName + "_logsize", "0").toInt - println("%s: items=%d bytes=%d mem_items=%d mem_bytes=%d journal_bytes=%d".format( - queueName, items, bytes, memItems, memBytes, journalBytes + println("%s: items=%d total_items=%d (%d) bytes=%d mem_items=%d mem_bytes=%d journal_bytes=%d".format( + queueName, items, totalItems, totalItems - baseTotalItems, bytes, memItems, memBytes, journalBytes )) Thread.sleep(1000) }