Permalink
Browse files

modernize kestrel libraries

RB_ID=88730
  • Loading branch information...
Stephan Zuercher
Stephan Zuercher committed Sep 27, 2012
1 parent 962c65c commit 59e3d333d3a7dab586428aa9fcac055cb85cf1ed
View
@@ -6,6 +6,7 @@ release: TBD
- Support for service discovery via ZooKeeper
- Kestrel now ignores directories in its queue path.
- Fix load test scripts to use correct JAR name [Bryan English]
+- Scala 2.9.2, finagle 5.3.x
2.3.4
-----
View
@@ -369,7 +369,7 @@ The kestrel implementation of the memcache protocol commands is described below.
per-item, so many clients may monitor a queue at once. After the given
timeout, a separate `END` response will signal the end of the monitor
period. Any fetched items are open transactions (see "Reliable Reads"
- below), and should be closed with `CONFIRM`.
+ below), and should be closed with `CONFIRM`.
- `CONFIRM <queue-name> <count>`
View
@@ -3,7 +3,7 @@ import Keys._
import com.twitter.sbt._
object Kestrel extends Build {
- val finagleVersion = "1.11.1"
+ val finagleVersion = "5.3.17"
lazy val root = Project(
id = "kestrel",
@@ -16,21 +16,20 @@ object Kestrel extends Build {
name := "kestrel",
organization := "net.lag",
version := "2.4.0-SNAPSHOT",
- scalaVersion := "2.9.1",
+ scalaVersion := "2.9.2",
// time-based tests cannot be run in parallel
logBuffered in Test := false,
parallelExecution in Test := false,
libraryDependencies ++= Seq(
- "com.twitter" %% "ostrich" % "4.10.6",
- "com.twitter" %% "naggati" % "2.2.3" intransitive(), // allow custom netty
- "com.twitter" %% "finagle-core" % finagleVersion,
- "com.twitter" %% "finagle-ostrich4" % finagleVersion,
- "com.twitter" %% "finagle-thrift" % finagleVersion, // override scrooge's version
- "commons-codec" % "commons-codec" % "1.6", // override scrooge/util-codec's version
- "com.twitter" %% "scrooge-runtime" % "1.1.3",
- "com.twitter.common.zookeeper" % "server-set" % "1.0.15",
+ "com.twitter" % "ostrich" % "8.2.6",
+ "com.twitter" %% "naggati" % "4.1.0",
+ "com.twitter" % "finagle-core" % finagleVersion,
+ "com.twitter" % "finagle-ostrich4" % finagleVersion,
+ "com.twitter" % "finagle-thrift" % finagleVersion, // override scrooge's version
+ "com.twitter" %% "scrooge-runtime" % "3.0.1",
+ "com.twitter.common.zookeeper" % "server-set" % "1.0.16",
// for tests only:
"org.scala-tools.testing" %% "specs" % "1.6.9" % "test",
@@ -43,7 +42,7 @@ object Kestrel extends Build {
mainClass in Compile := Some("net.lag.kestrel.Kestrel"),
- CompileThriftScrooge.scroogeVersion := "1.1.7",
+ CompileThriftScrooge.scroogeVersion := "3.0.1",
PackageDist.packageDistConfigFilesValidationRegex := Some(".*"),
SubversionPublisher.subversionRepository := Some("https://svn.twitter.biz/maven-public"),
publishArtifact in Test := true
View
@@ -19,7 +19,6 @@ resolvers <<= (resolvers) { r =>
externalResolvers <<= (resolvers) map identity
-addSbtPlugin("com.twitter" %% "sbt-package-dist" % "1.0.5")
-
-addSbtPlugin("com.twitter" %% "sbt11-scrooge" % "1.0.0")
+addSbtPlugin("com.twitter" %% "sbt-package-dist" % "1.0.6")
+addSbtPlugin("com.twitter" %% "sbt11-scrooge" % "3.0.0")
@@ -23,13 +23,13 @@ import com.twitter.finagle.{ClientConnection, Codec => FinagleCodec, Service =>
import com.twitter.finagle.builder.{Server, ServerBuilder}
import com.twitter.finagle.stats.OstrichStatsReceiver
import com.twitter.finagle.thrift._
-import com.twitter.finagle.util.{Timer => FinagleTimer}
+import com.twitter.finagle.util.TimerFromNettyTimer
import com.twitter.logging.Logger
import com.twitter.naggati.Codec
import com.twitter.naggati.codec.{MemcacheResponse, MemcacheRequest, MemcacheCodec}
import com.twitter.ostrich.admin.{PeriodicBackgroundProcess, RuntimeEnvironment, Service, ServiceTracker}
import com.twitter.ostrich.stats.Stats
-import com.twitter.util.{Duration, Eval, Future, Time}
+import com.twitter.util.{Duration, Eval, Future, Time, Timer}
import java.net.InetSocketAddress
import java.util.Collections._
import java.util.concurrent._
@@ -87,7 +87,7 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
def startThriftServer(
name: String,
port: Int,
- fTimer: FinagleTimer
+ fTimer: Timer
): Server = {
val address = new InetSocketAddress(listenAddress, port)
var builder = ServerBuilder()
@@ -135,7 +135,7 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
}
})
- val finagleTimer = new FinagleTimer(timer)
+ val finagleTimer = new TimerFromNettyTimer(timer)
try {
queueCollection = new QueueCollection(queuePath, finagleTimer, journalSyncScheduler,
defaultQueueConfig, builders, aliases)
@@ -18,8 +18,8 @@
package net.lag.kestrel
import java.net.InetSocketAddress
+import java.nio.ByteBuffer
import scala.collection.mutable
-import com.twitter.concurrent.ChannelSource
import com.twitter.conversions.time._
import com.twitter.finagle.{ClientConnection, Service}
import com.twitter.logging.Logger
@@ -90,8 +90,11 @@ class MemcacheHandler(
} else {
Some(Time.epoch + expiry.seconds)
}
+ val buffer = request.data.get
+ val data = new Array[Byte](buffer.remaining())
+ buffer.get(data)
try {
- if (handler.setItem(request.line(1), request.line(2).toInt, normalizedExpiry, request.data.get)) {
+ if (handler.setItem(request.line(1), request.line(2).toInt, normalizedExpiry, data)) {
Future(new MemcacheResponse("STORED"))
} else {
Future(new MemcacheResponse("NOT_STORED"))
@@ -191,7 +194,7 @@ class MemcacheHandler(
case None =>
new MemcacheResponse("END")
case Some(item) =>
- new MemcacheResponse("VALUE %s 0 %d".format(key, item.data.length), Some(item.data))
+ new MemcacheResponse("VALUE %s 0 %d".format(key, item.data.length), Some(ByteBuffer.wrap(item.data)))
}
}
} catch {
@@ -211,7 +214,7 @@ class MemcacheHandler(
case None =>
channel.send(new MemcacheResponse("END") then Codec.EndStream)
case Some(item) =>
- channel.send(new MemcacheResponse("VALUE %s 0 %d".format(key, item.data.length), Some(item.data)))
+ channel.send(new MemcacheResponse("VALUE %s 0 %d".format(key, item.data.length), Some(ByteBuffer.wrap(item.data))))
}
}
new MemcacheResponse("") then Codec.Stream(channel)
@@ -18,7 +18,6 @@
package net.lag.kestrel
import scala.collection.mutable
-import com.twitter.concurrent.ChannelSource
import com.twitter.conversions.time._
import com.twitter.logging.Logger
import com.twitter.naggati._
@@ -29,7 +29,7 @@ import org.apache.thrift.protocol.TProtocolFactory
import scala.collection.mutable
import scala.collection.Set
-class ThriftFinagledService(val handler: ThriftHandler, override val protocolFactory: TProtocolFactory)
+class ThriftFinagledService(val handler: ThriftHandler, val protocolFactory: TProtocolFactory)
extends thrift.Kestrel.FinagledService(handler, protocolFactory) {
override def release() {
@@ -187,7 +187,7 @@ class ThriftHandler (
}
case Some(item) => {
val externalXid = externalXidOption.getOrElse(0L)
- rv += new thrift.Item(ByteBuffer.wrap(item.data), externalXid)
+ rv += thrift.Item(ByteBuffer.wrap(item.data), externalXid)
}
}
}
@@ -219,7 +219,7 @@ class ThriftHandler (
handler.getItem(queueName, None, false, true).map { itemOption =>
val data = itemOption.map { item => ByteBuffer.wrap(item.data) }
val stats = queueCollection.stats(queueName).toMap
- new thrift.QueueInfo(data, stats("items").toLong, stats("bytes").toLong,
+ thrift.QueueInfo(data, stats("items").toLong, stats("bytes").toLong,
stats("logsize").toLong, stats("age").toLong, stats("waiters").toInt,
stats("open_transactions").toInt)
}
View
@@ -10,7 +10,7 @@
APP_NAME="kestrel"
ADMIN_PORT="2223"
VERSION="@VERSION@"
-SCALA_VERSION="2.9.1"
+SCALA_VERSION="2.9.2"
APP_HOME="/usr/local/$APP_NAME/current"
INITIAL_SLEEP=15
@@ -23,6 +23,7 @@ import com.twitter.naggati.codec.{MemcacheRequest, MemcacheResponse}
import com.twitter.ostrich.admin.RuntimeEnvironment
import com.twitter.util.{Future, Promise, Time}
import java.net.InetSocketAddress
+import java.nio.ByteBuffer
import org.specs.Specification
import org.specs.mock.{ClassMocker, JMocker}
import scala.collection.mutable
@@ -36,14 +37,17 @@ class MemcacheHandlerSpec extends Specification with JMocker with ClassMocker {
val address = new InetSocketAddress("", 0)
val qitem = QItem(Time.now, None, "state shirt".getBytes, 23)
- def toReq(command: String, data: Option[Array[Byte]] = None): MemcacheRequest = {
+ def toReq(command: String, payload: Option[String] = None): MemcacheRequest = {
val parts = command.split(" ").toList
- val dataLength = data map { _.length + 2 } orElse { Some(0) }
+ val data = payload map { s =>
+ ByteBuffer.wrap(s.getBytes)
+ }
+ val dataLength = data map { _.remaining + 2 } orElse { Some(0) }
MemcacheRequest(parts, data, command.length + 2 + dataLength.get)
}
def toResp(queue: String, qItem: QItem): MemcacheResponse = {
- MemcacheResponse("VALUE %s 0 %d".format(queue, qItem.data.length), Some(qItem.data))
+ MemcacheResponse("VALUE %s 0 %d".format(queue, qItem.data.length), Some(ByteBuffer.wrap(qItem.data)))
}
val endResponse = MemcacheResponse("END", None)
@@ -372,7 +376,7 @@ class MemcacheHandlerSpec extends Specification with JMocker with ClassMocker {
}
val memcacheHandler = new MemcacheHandler(connection, queueCollection, 10)
- memcacheHandler(toReq("set test 0 0 5", Some("hello".getBytes)))() mustEqual MemcacheResponse("STORED", None)
+ memcacheHandler(toReq("set test 0 0 5", Some("hello")))() mustEqual MemcacheResponse("STORED", None)
}
}
@@ -357,7 +357,7 @@ class ThriftHandlerSpec extends Specification with JMocker with ClassMocker {
).toArray
}
- val qinfo = new thrift.QueueInfo(Some(ByteBuffer.wrap(item1)), 10, 10240, 29999, 500, 2, 1)
+ val qinfo = thrift.QueueInfo(Some(ByteBuffer.wrap(item1)), 10, 10240, 29999, 500, 2, 1)
thriftHandler.peek("test")() mustEqual qinfo
}
}
@@ -145,28 +145,28 @@ object ThriftClient extends Client {
withProtocol { p =>
p.writeMessageBegin(new TMessage("put", TMessageType.CALL, 0))
val item = data.map { item => ByteBuffer.wrap(item.getBytes) }
- (new thrift.Kestrel.put_args(queueName, item, 0)).write(p)
+ (thrift.Kestrel.put_args(queueName, item, 0)).write(p)
}
}
def putNSuccess(count: Int) = {
withProtocol { p =>
p.writeMessageBegin(new TMessage("put", TMessageType.REPLY, 0))
- (new thrift.Kestrel.put_result(success = Some(count))).write(p)
+ (thrift.Kestrel.put_result(success = Some(count))).write(p)
}
}
def flush(queueName: String) = {
withProtocol { p =>
p.writeMessageBegin(new TMessage("flush_queue", TMessageType.CALL, 0))
- (new thrift.Kestrel.flushQueue_args(queueName)).write(p)
+ (thrift.Kestrel.flushQueue_args(queueName)).write(p)
}
}
def flushSuccess() = {
withProtocol { p =>
p.writeMessageBegin(new TMessage("flush_queue", TMessageType.REPLY, 0))
- (new thrift.Kestrel.flushQueue_result()).write(p)
+ (thrift.Kestrel.flushQueue_result()).write(p)
}
}
@@ -186,15 +186,15 @@ object ThriftClient extends Client {
def monitor(queueName: String, timeoutMsec: Int, maxItems: Int) = {
withProtocol { p =>
p.writeMessageBegin(new TMessage("get", TMessageType.CALL, 0))
- (new thrift.Kestrel.get_args(queueName, maxItems, timeoutMsec, 0)).write(p)
+ (thrift.Kestrel.get_args(queueName, maxItems, timeoutMsec, 0)).write(p)
}
}
def monitorSuccess(queueName: String, data: Seq[String]) = {
withProtocol { p =>
p.writeMessageBegin(new TMessage("get", TMessageType.REPLY, 0))
- val items = data.map { item => new thrift.Item(ByteBuffer.wrap(item.getBytes), 0) }
- (new thrift.Kestrel.get_result(success = Some(items))).write(p)
+ val items = data.map { item => thrift.Item(ByteBuffer.wrap(item.getBytes), 0) }
+ (thrift.Kestrel.get_result(success = Some(items))).write(p)
}
}
@@ -221,8 +221,32 @@ object Flood extends LoadTesting {
put(socket, queueName, prefillItems, data)
}
- println("flood: producers=%d consumers=%d each sending %d items of %dkB through %s".format(
- producerThreadCount, consumerThreadCount, totalItems, kilobytes, queueName))
+ // round up itemsPerProducer (we may over-produce), but not itemsPerConsumer (we will not over-consume)
+ // (consuming more than producing will result in the test hanging)
+ val itemsPerProducer = {
+ val base = ((totalItems + producerThreadCount - 1) / producerThreadCount) max 1
+ if (rollup > 1 && base % rollup > 0) {
+ // bump up to the next multiple of rollup
+ rollup - (base % rollup) + base
+ } else {
+ base
+ }
+ }
+
+ val itemsPerConsumer = {
+ val base = (totalItems / consumerThreadCount) max 1
+ if (rollup > 1 && base % rollup > 0) {
+ // bump up to the next multiple of rollup
+ rollup - (base % rollup) + base
+ } else {
+ base
+ }
+ }
+
+ println("flood: producers=%d each sending %d items (in chunks of %d) of %dkB to %s".format(
+ producerThreadCount, itemsPerProducer, rollup, kilobytes, queueName))
+ println("flood: consumers=%d each reading %d items (in chunks of %d) from %s".format(
+ consumerThreadCount, itemsPerConsumer, rollup, queueName))
var threadList: List[Thread] = Nil
val misses = new AtomicInteger
@@ -231,7 +255,7 @@ object Flood extends LoadTesting {
val producerThread = new Thread {
override def run = {
val socket = tryHard { SocketChannel.open(new InetSocketAddress(hostname, port)) }
- put(socket, queueName, totalItems, data)
+ put(socket, queueName, itemsPerProducer, data)
}
}
threadList = producerThread :: threadList
@@ -241,7 +265,7 @@ object Flood extends LoadTesting {
val consumerThread = new Thread {
override def run = {
val socket = tryHard { SocketChannel.open(new InetSocketAddress(hostname, port)) }
- val n = get(socket, queueName, totalItems, data)
+ val n = get(socket, queueName, itemsPerConsumer, data)
socket.close()
misses.addAndGet(n)
}
@@ -96,30 +96,40 @@ trait LoadTesting {
val failedConnects = new AtomicInteger(0)
+ @tailrec
final def tryHard[A](f: => A): A = {
- try {
- f
+ val result = try {
+ Right(f)
} catch {
case e: java.io.IOException =>
- failedConnects.incrementAndGet()
- tryHard(f)
+ val failures = failedConnects.incrementAndGet()
+ if (failures % 1000 == 0) println("Failed to connect after %d attempts".format(failures))
+ Left(e)
+ }
+ result match {
+ case Right(a) => a
+ case Left(_) => tryHard(f)
}
}
def monitorQueue(hostname: String, queueName: String) {
+ val client = new TestClient(hostname, 22133)
+ client.connect()
+ val stats = client.stats()
+ val baseTotalItems = stats.getOrElse("queue_" + queueName + "_total_items", "0").toInt
+
val t = new Thread("monitor-queue") {
override def run() {
- val client = new TestClient(hostname, 22133)
- client.connect()
while (true) {
val stats = client.stats()
val items = stats.getOrElse("queue_" + queueName + "_items", "0").toInt
+ val totalItems = stats.getOrElse("queue_" + queueName + "_total_items", "0").toInt
val bytes = stats.getOrElse("queue_" + queueName + "_bytes", "0").toInt
val memItems = stats.getOrElse("queue_" + queueName + "_mem_items", "0").toInt
val memBytes = stats.getOrElse("queue_" + queueName + "_mem_bytes", "0").toInt
val journalBytes = stats.getOrElse("queue_" + queueName + "_logsize", "0").toInt
- println("%s: items=%d bytes=%d mem_items=%d mem_bytes=%d journal_bytes=%d".format(
- queueName, items, bytes, memItems, memBytes, journalBytes
+ println("%s: items=%d total_items=%d (%d) bytes=%d mem_items=%d mem_bytes=%d journal_bytes=%d".format(
+ queueName, items, totalItems, totalItems - baseTotalItems, bytes, memItems, memBytes, journalBytes
))
Thread.sleep(1000)
}

0 comments on commit 59e3d33

Please sign in to comment.