modernize kestrel libraries
RB_ID=88730
Stephan Zuercher committed Sep 27, 2012
Commit 59e3d33 (parent: 962c65c)
Showing 14 changed files with 91 additions and 52 deletions.
1 change: 1 addition & 0 deletions ChangeLog
@@ -6,6 +6,7 @@ release: TBD
- Support for service discovery via ZooKeeper
- Kestrel now ignores directories in its queue path.
- Fix load test scripts to use correct JAR name [Bryan English]
- Scala 2.9.2, finagle 5.3.x

2.3.4
-----
2 changes: 1 addition & 1 deletion docs/guide.md
@@ -369,7 +369,7 @@ The kestrel implementation of the memcache protocol commands is described below.
per-item, so many clients may monitor a queue at once. After the given
timeout, a separate `END` response will signal the end of the monitor
period. Any fetched items are open transactions (see "Reliable Reads"
below), and should be closed with `CONFIRM`.
below), and should be closed with `CONFIRM`.

- `CONFIRM <queue-name> <count>`

21 changes: 10 additions & 11 deletions project/Build.scala
@@ -3,7 +3,7 @@ import Keys._
import com.twitter.sbt._

object Kestrel extends Build {
val finagleVersion = "1.11.1"
val finagleVersion = "5.3.17"

lazy val root = Project(
id = "kestrel",
@@ -16,21 +16,20 @@ object Kestrel extends Build {
name := "kestrel",
organization := "net.lag",
version := "2.4.0-SNAPSHOT",
scalaVersion := "2.9.1",
scalaVersion := "2.9.2",

// time-based tests cannot be run in parallel
logBuffered in Test := false,
parallelExecution in Test := false,

libraryDependencies ++= Seq(
"com.twitter" %% "ostrich" % "4.10.6",
"com.twitter" %% "naggati" % "2.2.3" intransitive(), // allow custom netty
"com.twitter" %% "finagle-core" % finagleVersion,
"com.twitter" %% "finagle-ostrich4" % finagleVersion,
"com.twitter" %% "finagle-thrift" % finagleVersion, // override scrooge's version
"commons-codec" % "commons-codec" % "1.6", // override scrooge/util-codec's version
"com.twitter" %% "scrooge-runtime" % "1.1.3",
"com.twitter.common.zookeeper" % "server-set" % "1.0.15",
"com.twitter" % "ostrich" % "8.2.6",
"com.twitter" %% "naggati" % "4.1.0",
"com.twitter" % "finagle-core" % finagleVersion,
"com.twitter" % "finagle-ostrich4" % finagleVersion,
"com.twitter" % "finagle-thrift" % finagleVersion, // override scrooge's version
"com.twitter" %% "scrooge-runtime" % "3.0.1",
"com.twitter.common.zookeeper" % "server-set" % "1.0.16",

// for tests only:
"org.scala-tools.testing" %% "specs" % "1.6.9" % "test",
@@ -43,7 +42,7 @@ object Kestrel extends Build {

mainClass in Compile := Some("net.lag.kestrel.Kestrel"),

CompileThriftScrooge.scroogeVersion := "1.1.7",
CompileThriftScrooge.scroogeVersion := "3.0.1",
PackageDist.packageDistConfigFilesValidationRegex := Some(".*"),
SubversionPublisher.subversionRepository := Some("https://svn.twitter.biz/maven-public"),
publishArtifact in Test := true
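For context on the dependency changes above: several Twitter artifacts move from sbt's cross-versioned `%%` operator to the plain `%` operator, while others stay cross-versioned. A minimal illustrative sbt fragment (the group/artifact/version strings come from the diff; the behaviour described in the comments is standard sbt, not specific to this commit):

    libraryDependencies ++= Seq(
      // "%%" appends the Scala version to the artifact name at resolution time,
      // so with scalaVersion := "2.9.2" this resolves naggati_2.9.2
      "com.twitter" %% "naggati"      % "4.1.0",
      // "%" uses the artifact name verbatim, with no Scala-version suffix
      "com.twitter" %  "finagle-core" % "5.3.17"
    )
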
5 changes: 2 additions & 3 deletions project/plugins.sbt
@@ -19,7 +19,6 @@ resolvers <<= (resolvers) { r =>

externalResolvers <<= (resolvers) map identity

addSbtPlugin("com.twitter" %% "sbt-package-dist" % "1.0.5")

addSbtPlugin("com.twitter" %% "sbt11-scrooge" % "1.0.0")
addSbtPlugin("com.twitter" %% "sbt-package-dist" % "1.0.6")

addSbtPlugin("com.twitter" %% "sbt11-scrooge" % "3.0.0")
8 changes: 4 additions & 4 deletions src/main/scala/net/lag/kestrel/Kestrel.scala
@@ -23,13 +23,13 @@ import com.twitter.finagle.{ClientConnection, Codec => FinagleCodec, Service =>
import com.twitter.finagle.builder.{Server, ServerBuilder}
import com.twitter.finagle.stats.OstrichStatsReceiver
import com.twitter.finagle.thrift._
import com.twitter.finagle.util.{Timer => FinagleTimer}
import com.twitter.finagle.util.TimerFromNettyTimer
import com.twitter.logging.Logger
import com.twitter.naggati.Codec
import com.twitter.naggati.codec.{MemcacheResponse, MemcacheRequest, MemcacheCodec}
import com.twitter.ostrich.admin.{PeriodicBackgroundProcess, RuntimeEnvironment, Service, ServiceTracker}
import com.twitter.ostrich.stats.Stats
import com.twitter.util.{Duration, Eval, Future, Time}
import com.twitter.util.{Duration, Eval, Future, Time, Timer}
import java.net.InetSocketAddress
import java.util.Collections._
import java.util.concurrent._
@@ -87,7 +87,7 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
def startThriftServer(
name: String,
port: Int,
fTimer: FinagleTimer
fTimer: Timer
): Server = {
val address = new InetSocketAddress(listenAddress, port)
var builder = ServerBuilder()
@@ -135,7 +135,7 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
}
})

val finagleTimer = new FinagleTimer(timer)
val finagleTimer = new TimerFromNettyTimer(timer)
try {
queueCollection = new QueueCollection(queuePath, finagleTimer, journalSyncScheduler,
defaultQueueConfig, builders, aliases)
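For reference, a condensed sketch of the timer adaptation used above, runnable on its own (it assumes finagle-core 5.x and netty 3.x on the classpath; the HashedWheelTimer and the scheduled task are illustrative, not taken from this commit):

    import com.twitter.conversions.time._
    import com.twitter.finagle.util.TimerFromNettyTimer
    import com.twitter.util.Timer
    import org.jboss.netty.util.HashedWheelTimer

    // TimerFromNettyTimer adapts a netty Timer into a com.twitter.util.Timer,
    // which is the type the handlers and QueueCollection now expect.
    val nettyTimer = new HashedWheelTimer()
    val timer: Timer = new TimerFromNettyTimer(nettyTimer)

    val task = timer.schedule(1.second) { println("tick") }  // periodic task
    task.cancel()
    timer.stop()
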
11 changes: 7 additions & 4 deletions src/main/scala/net/lag/kestrel/MemcacheHandler.scala
@@ -18,8 +18,8 @@
package net.lag.kestrel

import java.net.InetSocketAddress
import java.nio.ByteBuffer
import scala.collection.mutable
import com.twitter.concurrent.ChannelSource
import com.twitter.conversions.time._
import com.twitter.finagle.{ClientConnection, Service}
import com.twitter.logging.Logger
@@ -90,8 +90,11 @@ class MemcacheHandler(
} else {
Some(Time.epoch + expiry.seconds)
}
val buffer = request.data.get
val data = new Array[Byte](buffer.remaining())
buffer.get(data)
try {
if (handler.setItem(request.line(1), request.line(2).toInt, normalizedExpiry, request.data.get)) {
if (handler.setItem(request.line(1), request.line(2).toInt, normalizedExpiry, data)) {
Future(new MemcacheResponse("STORED"))
} else {
Future(new MemcacheResponse("NOT_STORED"))
@@ -191,7 +194,7 @@ class MemcacheHandler(
case None =>
new MemcacheResponse("END")
case Some(item) =>
new MemcacheResponse("VALUE %s 0 %d".format(key, item.data.length), Some(item.data))
new MemcacheResponse("VALUE %s 0 %d".format(key, item.data.length), Some(ByteBuffer.wrap(item.data)))
}
}
} catch {
@@ -211,7 +214,7 @@ class MemcacheHandler(
case None =>
channel.send(new MemcacheResponse("END") then Codec.EndStream)
case Some(item) =>
channel.send(new MemcacheResponse("VALUE %s 0 %d".format(key, item.data.length), Some(item.data)))
channel.send(new MemcacheResponse("VALUE %s 0 %d".format(key, item.data.length), Some(ByteBuffer.wrap(item.data))))
}
}
new MemcacheResponse("") then Codec.Stream(channel)
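The handler changes above revolve around two small conversions: the updated naggati codec appears to expose request and response payloads as java.nio.ByteBuffer, while kestrel's queue items store Array[Byte]. A self-contained sketch of both directions (plain JDK NIO, nothing kestrel-specific):

    import java.nio.ByteBuffer

    // ByteBuffer -> Array[Byte]: copy out the readable bytes (get() advances the
    // buffer's position, so the same buffer should not be read again afterwards)
    def toBytes(buffer: ByteBuffer): Array[Byte] = {
      val data = new Array[Byte](buffer.remaining())
      buffer.get(data)
      data
    }

    // Array[Byte] -> ByteBuffer: wrap() makes no copy; the buffer shares the array
    def toBuffer(data: Array[Byte]): ByteBuffer = ByteBuffer.wrap(data)
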
1 change: 0 additions & 1 deletion src/main/scala/net/lag/kestrel/TextHandler.scala
@@ -18,7 +18,6 @@
package net.lag.kestrel

import scala.collection.mutable
import com.twitter.concurrent.ChannelSource
import com.twitter.conversions.time._
import com.twitter.logging.Logger
import com.twitter.naggati._
6 changes: 3 additions & 3 deletions src/main/scala/net/lag/kestrel/ThriftHandler.scala
@@ -29,7 +29,7 @@ import org.apache.thrift.protocol.TProtocolFactory
import scala.collection.mutable
import scala.collection.Set

class ThriftFinagledService(val handler: ThriftHandler, override val protocolFactory: TProtocolFactory)
class ThriftFinagledService(val handler: ThriftHandler, val protocolFactory: TProtocolFactory)
extends thrift.Kestrel.FinagledService(handler, protocolFactory) {

override def release() {
@@ -187,7 +187,7 @@ class ThriftHandler (
}
case Some(item) => {
val externalXid = externalXidOption.getOrElse(0L)
rv += new thrift.Item(ByteBuffer.wrap(item.data), externalXid)
rv += thrift.Item(ByteBuffer.wrap(item.data), externalXid)
}
}
}
@@ -219,7 +219,7 @@ class ThriftHandler (
handler.getItem(queueName, None, false, true).map { itemOption =>
val data = itemOption.map { item => ByteBuffer.wrap(item.data) }
val stats = queueCollection.stats(queueName).toMap
new thrift.QueueInfo(data, stats("items").toLong, stats("bytes").toLong,
thrift.QueueInfo(data, stats("items").toLong, stats("bytes").toLong,
stats("logsize").toLong, stats("age").toLong, stats("waiters").toInt,
stats("open_transactions").toInt)
}
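The `new thrift.Item(...)` to `thrift.Item(...)` and `new thrift.QueueInfo(...)` to `thrift.QueueInfo(...)` edits here and in the tests reflect the Scrooge upgrade; the assumption is that Scrooge 3 generates case-class-like structs with a companion `apply`, so `new` is no longer required. An illustrative construction (payload bytes and counts are made up, matching the shapes used in the tests):

    import java.nio.ByteBuffer

    val item  = thrift.Item(ByteBuffer.wrap("payload".getBytes), 0L)
    val qinfo = thrift.QueueInfo(Some(ByteBuffer.wrap("head item".getBytes)),
                                 10, 10240, 29999, 500, 2, 1)
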
2 changes: 1 addition & 1 deletion src/scripts/kestrel.sh
@@ -10,7 +10,7 @@
APP_NAME="kestrel"
ADMIN_PORT="2223"
VERSION="@VERSION@"
SCALA_VERSION="2.9.1"
SCALA_VERSION="2.9.2"
APP_HOME="/usr/local/$APP_NAME/current"
INITIAL_SLEEP=15

12 changes: 8 additions & 4 deletions src/test/scala/net/lag/kestrel/MemcacheHandlerSpec.scala
@@ -23,6 +23,7 @@ import com.twitter.naggati.codec.{MemcacheRequest, MemcacheResponse}
import com.twitter.ostrich.admin.RuntimeEnvironment
import com.twitter.util.{Future, Promise, Time}
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import org.specs.Specification
import org.specs.mock.{ClassMocker, JMocker}
import scala.collection.mutable
@@ -36,14 +36,17 @@ class MemcacheHandlerSpec extends Specification with JMocker with ClassMocker {
val address = new InetSocketAddress("", 0)
val qitem = QItem(Time.now, None, "state shirt".getBytes, 23)

def toReq(command: String, data: Option[Array[Byte]] = None): MemcacheRequest = {
def toReq(command: String, payload: Option[String] = None): MemcacheRequest = {
val parts = command.split(" ").toList
val dataLength = data map { _.length + 2 } orElse { Some(0) }
val data = payload map { s =>
ByteBuffer.wrap(s.getBytes)
}
val dataLength = data map { _.remaining + 2 } orElse { Some(0) }
MemcacheRequest(parts, data, command.length + 2 + dataLength.get)
}

def toResp(queue: String, qItem: QItem): MemcacheResponse = {
MemcacheResponse("VALUE %s 0 %d".format(queue, qItem.data.length), Some(qItem.data))
MemcacheResponse("VALUE %s 0 %d".format(queue, qItem.data.length), Some(ByteBuffer.wrap(qItem.data)))
}

val endResponse = MemcacheResponse("END", None)
@@ -372,7 +376,7 @@ class MemcacheHandlerSpec extends Specification with JMocker with ClassMocker {
}

val memcacheHandler = new MemcacheHandler(connection, queueCollection, 10)
memcacheHandler(toReq("set test 0 0 5", Some("hello".getBytes)))() mustEqual MemcacheResponse("STORED", None)
memcacheHandler(toReq("set test 0 0 5", Some("hello")))() mustEqual MemcacheResponse("STORED", None)
}
}

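A worked example of the length bookkeeping in the updated toReq helper above (assumption: the final MemcacheRequest argument is the total number of bytes the command occupied on the wire, and each `+ 2` accounts for a trailing CRLF):

    val command = "set test 0 0 5"   // 14 characters, followed by "\r\n" on the wire
    val payload = "hello"            //  5 bytes,      followed by "\r\n" on the wire

    val dataLength = payload.length + 2               // 7
    val totalBytes = command.length + 2 + dataLength  // 14 + 2 + 7 = 23
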
2 changes: 1 addition & 1 deletion src/test/scala/net/lag/kestrel/ThriftHandlerSpec.scala
@@ -357,7 +357,7 @@ class ThriftHandlerSpec extends Specification with JMocker with ClassMocker {
).toArray
}

val qinfo = new thrift.QueueInfo(Some(ByteBuffer.wrap(item1)), 10, 10240, 29999, 500, 2, 1)
val qinfo = thrift.QueueInfo(Some(ByteBuffer.wrap(item1)), 10, 10240, 29999, 500, 2, 1)
thriftHandler.peek("test")() mustEqual qinfo
}
}
14 changes: 7 additions & 7 deletions src/test/scala/net/lag/kestrel/load/Client.scala
@@ -145,28 +145,28 @@ object ThriftClient extends Client {
withProtocol { p =>
p.writeMessageBegin(new TMessage("put", TMessageType.CALL, 0))
val item = data.map { item => ByteBuffer.wrap(item.getBytes) }
(new thrift.Kestrel.put_args(queueName, item, 0)).write(p)
(thrift.Kestrel.put_args(queueName, item, 0)).write(p)
}
}

def putNSuccess(count: Int) = {
withProtocol { p =>
p.writeMessageBegin(new TMessage("put", TMessageType.REPLY, 0))
(new thrift.Kestrel.put_result(success = Some(count))).write(p)
(thrift.Kestrel.put_result(success = Some(count))).write(p)
}
}

def flush(queueName: String) = {
withProtocol { p =>
p.writeMessageBegin(new TMessage("flush_queue", TMessageType.CALL, 0))
(new thrift.Kestrel.flushQueue_args(queueName)).write(p)
(thrift.Kestrel.flushQueue_args(queueName)).write(p)
}
}

def flushSuccess() = {
withProtocol { p =>
p.writeMessageBegin(new TMessage("flush_queue", TMessageType.REPLY, 0))
(new thrift.Kestrel.flushQueue_result()).write(p)
(thrift.Kestrel.flushQueue_result()).write(p)
}
}

@@ -186,15 +186,15 @@ object ThriftClient extends Client {
def monitor(queueName: String, timeoutMsec: Int, maxItems: Int) = {
withProtocol { p =>
p.writeMessageBegin(new TMessage("get", TMessageType.CALL, 0))
(new thrift.Kestrel.get_args(queueName, maxItems, timeoutMsec, 0)).write(p)
(thrift.Kestrel.get_args(queueName, maxItems, timeoutMsec, 0)).write(p)
}
}

def monitorSuccess(queueName: String, data: Seq[String]) = {
withProtocol { p =>
p.writeMessageBegin(new TMessage("get", TMessageType.REPLY, 0))
val items = data.map { item => new thrift.Item(ByteBuffer.wrap(item.getBytes), 0) }
(new thrift.Kestrel.get_result(success = Some(items))).write(p)
val items = data.map { item => thrift.Item(ByteBuffer.wrap(item.getBytes), 0) }
(thrift.Kestrel.get_result(success = Some(items))).write(p)
}
}

32 changes: 28 additions & 4 deletions src/test/scala/net/lag/kestrel/load/Flood.scala
@@ -221,8 +221,32 @@ object Flood extends LoadTesting {
put(socket, queueName, prefillItems, data)
}

println("flood: producers=%d consumers=%d each sending %d items of %dkB through %s".format(
producerThreadCount, consumerThreadCount, totalItems, kilobytes, queueName))
// round up itemsPerProducer (we may over-produce), but not itemsPerConsumer (we will not over-consume)
// (consuming more than producing will result in the test hanging)
val itemsPerProducer = {
val base = ((totalItems + producerThreadCount - 1) / producerThreadCount) max 1
if (rollup > 1 && base % rollup > 0) {
// bump up to the next multiple of rollup
rollup - (base % rollup) + base
} else {
base
}
}

val itemsPerConsumer = {
val base = (totalItems / consumerThreadCount) max 1
if (rollup > 1 && base % rollup > 0) {
// bump up to the next multiple of rollup
rollup - (base % rollup) + base
} else {
base
}
}

println("flood: producers=%d each sending %d items (in chunks of %d) of %dkB to %s".format(
producerThreadCount, itemsPerProducer, rollup, kilobytes, queueName))
println("flood: consumers=%d each reading %d items (in chunks of %d) from %s".format(
consumerThreadCount, itemsPerConsumer, rollup, queueName))

var threadList: List[Thread] = Nil
val misses = new AtomicInteger
@@ -231,7 +255,7 @@
val producerThread = new Thread {
override def run = {
val socket = tryHard { SocketChannel.open(new InetSocketAddress(hostname, port)) }
put(socket, queueName, totalItems, data)
put(socket, queueName, itemsPerProducer, data)
}
}
threadList = producerThread :: threadList
@@ -241,7 +265,7 @@
val consumerThread = new Thread {
override def run = {
val socket = tryHard { SocketChannel.open(new InetSocketAddress(hostname, port)) }
val n = get(socket, queueName, totalItems, data)
val n = get(socket, queueName, itemsPerConsumer, data)
socket.close()
misses.addAndGet(n)
}
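A worked sketch of the per-thread rounding introduced above (pure arithmetic, no kestrel dependencies; the totals and thread counts are illustrative):

    // Bump a per-thread item count up to the next multiple of `rollup`, so that
    // every chunked request a thread issues is full-sized.
    def roundUpToMultiple(base: Int, rollup: Int): Int =
      if (rollup > 1 && base % rollup > 0) rollup - (base % rollup) + base else base

    val totalItems = 10000
    val producerThreadCount = 3
    val consumerThreadCount = 4
    val rollup = 8

    // producers: ceiling division (may over-produce), then round up to a rollup multiple
    val itemsPerProducer =
      roundUpToMultiple(((totalItems + producerThreadCount - 1) / producerThreadCount) max 1, rollup)  // 3336
    // consumers: floor division (will not over-consume), then round up to a rollup multiple
    val itemsPerConsumer =
      roundUpToMultiple((totalItems / consumerThreadCount) max 1, rollup)                              // 2504
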
26 changes: 18 additions & 8 deletions src/test/scala/net/lag/kestrel/load/LoadTesting.scala
@@ -96,30 +96,40 @@ trait LoadTesting {

val failedConnects = new AtomicInteger(0)

@tailrec
final def tryHard[A](f: => A): A = {
try {
f
val result = try {
Right(f)
} catch {
case e: java.io.IOException =>
failedConnects.incrementAndGet()
tryHard(f)
val failures = failedConnects.incrementAndGet()
if (failures % 1000 == 0) println("Failed to connect after %d attempts".format(failures))
Left(e)
}
result match {
case Right(a) => a
case Left(_) => tryHard(f)
}
}

def monitorQueue(hostname: String, queueName: String) {
val client = new TestClient(hostname, 22133)
client.connect()
val stats = client.stats()
val baseTotalItems = stats.getOrElse("queue_" + queueName + "_total_items", "0").toInt

val t = new Thread("monitor-queue") {
override def run() {
val client = new TestClient(hostname, 22133)
client.connect()
while (true) {
val stats = client.stats()
val items = stats.getOrElse("queue_" + queueName + "_items", "0").toInt
val totalItems = stats.getOrElse("queue_" + queueName + "_total_items", "0").toInt
val bytes = stats.getOrElse("queue_" + queueName + "_bytes", "0").toInt
val memItems = stats.getOrElse("queue_" + queueName + "_mem_items", "0").toInt
val memBytes = stats.getOrElse("queue_" + queueName + "_mem_bytes", "0").toInt
val journalBytes = stats.getOrElse("queue_" + queueName + "_logsize", "0").toInt
println("%s: items=%d bytes=%d mem_items=%d mem_bytes=%d journal_bytes=%d".format(
queueName, items, bytes, memItems, memBytes, journalBytes
println("%s: items=%d total_items=%d (%d) bytes=%d mem_items=%d mem_bytes=%d journal_bytes=%d".format(
queueName, items, totalItems, totalItems - baseTotalItems, bytes, memItems, memBytes, journalBytes
))
Thread.sleep(1000)
}
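The tryHard rewrite above exists because a recursive call inside a catch block is not in tail position, so scalac cannot turn it into a loop and a long run of connection failures could overflow the stack. Capturing the outcome as an Either first moves the retry into tail position, which @tailrec then verifies at compile time. A condensed, standalone restatement of the same pattern:

    import scala.annotation.tailrec

    @tailrec
    def retryForever[A](f: => A): A = {
      val result =
        try Right(f)
        catch { case e: java.io.IOException => Left(e) }

      result match {
        case Right(a) => a
        case Left(_)  => retryForever(f)  // tail call: the try/catch has already completed
      }
    }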