kestrel: server status & zookeeper server sets

RB_ID=81686
commit 1ae7657e2e272e8ee2dae8f93697ca607f228bfd 1 parent 8c95afc
@zuercher authored
Showing with 2,342 additions and 136 deletions.
  1. +4 −1 .gitignore
  2. +8 −0 ChangeLog
  3. +91 −6 docs/guide.md
  4. +2 −1  project/Build.scala
  5. +5 −1 src/main/scala/net/lag/kestrel/Journal.scala
  6. +45 −12 src/main/scala/net/lag/kestrel/Kestrel.scala
  7. +67 −1 src/main/scala/net/lag/kestrel/KestrelHandler.scala
  8. +25 −2 src/main/scala/net/lag/kestrel/MemcacheHandler.scala
  9. +269 −0 src/main/scala/net/lag/kestrel/ServerStatus.scala
  10. +25 −3 src/main/scala/net/lag/kestrel/TextHandler.scala
  11. +20 −2 src/main/scala/net/lag/kestrel/ThriftHandler.scala
  12. +211 −0 src/main/scala/net/lag/kestrel/ZooKeeperServerStatus.scala
  13. +151 −4 src/main/scala/net/lag/kestrel/config/KestrelConfig.scala
  14. +25 −1 src/main/thrift/kestrel.thrift
  15. +38 −0 src/test/scala/net/lag/kestrel/JournalSpec.scala
  16. +36 −3 src/test/scala/net/lag/kestrel/KestrelHandlerSpec.scala
  17. +473 −0 src/test/scala/net/lag/kestrel/MemcacheHandlerSpec.scala
  18. +3 −2 src/test/scala/net/lag/kestrel/ServerSpec.scala
  19. +383 −0 src/test/scala/net/lag/kestrel/ServerStatusSpec.scala
  20. +43 −1 src/test/scala/net/lag/kestrel/TextHandlerSpec.scala
  21. +206 −96 src/test/scala/net/lag/kestrel/ThriftHandlerSpec.scala
  22. +212 −0 src/test/scala/net/lag/kestrel/ZooKeeperServerStatusSpec.scala
5 .gitignore
@@ -15,6 +15,9 @@ ignore/
go
kestrel.sublime-*
kcluster-*.gem
+*~
+[#]*[#]
+.[#]*
.ensime
.ensime/
-.ensime_lucene/
+.ensime_lucene/
8 ChangeLog
@@ -1,3 +1,11 @@
+2.4.0
+-----
+release: TBD
+
+- Server status: up, read-only, quiescent
+- Support for service discovery via ZooKeeper
+- Kestrel now ignores directories in its queue path.
+
2.3.2
-----
release: 23 Aug 2012
97 docs/guide.md
@@ -2,7 +2,8 @@
A working guide to kestrel
==========================
-Kestrel is a very simple message queue that runs on the JVM. It supports multiple protocols:
+Kestrel is a very simple message queue that runs on the JVM. It supports
+multiple protocols:
- memcache: the memcache protocol, with some extensions
- thrift: Apache Thrift-based RPC
@@ -26,9 +27,10 @@ case-sensitive.
A cluster of kestrel servers is like a memcache cluster: the servers don't
know about each other, and don't do any cross-communication, so you can add as
-many as you like. Clients have a list of all servers in the cluster, and pick
-one at random for each operation. In this way, each queue appears to be spread
-out across every server, with items in a loose ordering.
+many as you like. The simplest clients have a list of all servers in the
+cluster, and pick one at random for each operation. In this way, each queue
+appears to be spread out across every server, with items in a loose ordering.
+More advanced clients can find kestrel servers via ZooKeeper.
When kestrel starts up, it scans the journal folder and creates queues based
on any journal files it finds there, to restore state to the way it was when
@@ -55,8 +57,8 @@ a server (which can be done over telnet).
To reload the config file on a running server, send "reload" the same way.
You should immediately see the changes in "dump_config", to confirm. Reloading
-will only affect queue configuration, not global server configuration. To
-change the server configuration, restart the server.
+will only affect queue and alias configuration, not global server configuration.
+To change the server configuration, restart the server.
Logging is configured according to `util-logging`. The logging configuration
syntax is described here:
@@ -322,6 +324,16 @@ The kestrel implementation of the memcache protocol commands is described below.
to a `MONITOR` command, to confirm the items that arrived during the monitor
period.
+- `STATUS`
+
+Displays the kestrel server's current status (see section on Server Status,
+below).
+
+- `STATUS <new-status>`
+
+Switches the kestrel server's current status to the given status (see section
+on Server Status, below).
+
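For example, here is a hypothetical telnet session against the memcache port
(responses follow the handler code in this commit: a bare `STATUS` returns the
current status name, a successful change returns `END`, and status names are
case-insensitive):

    STATUS
    UP
    STATUS readonly
    END
    STATUS
    READONLY
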
#### Reliable reads
-------------------
@@ -375,6 +387,79 @@ memcache protocol instead.
The text protocol does not support reliable reads.
+Server Status
+-------------
+
+Each kestrel server maintains its current status. Normal statuses are:
+
+- `Up`: the server is available for all operations.
+- `ReadOnly`: the server is available for non-modifying operations only;
+  commands that modify queues (set, delete, flush) are rejected as
+  errors.
+- `Quiescent`: the server rejects operations on any queue as errors. One
+  notable exception: transactions begun before the server entered the
+  quiescent state may still be confirmed.
+
+One additional status is `Down`, which is only used transiently when kestrel is
+in the process of shutting down.
+
+The server's current status is persisted in a status file whose location is
+specified in
+[KestrelConfig](http://robey.github.com/kestrel/api/main/api/net/lag/kestrel/config/KestrelConfig.html).
+When kestrel is restarted, it automatically returns to its previous status,
+based on the value in the status file. If the status file does not exist or
+cannot be read, kestrel uses a default status, also configured in KestrelConfig.
+
+When changing from a less restrictive status to a more restrictive one
+(e.g., from `Up` to `ReadOnly` or from `ReadOnly` to `Quiescent`), the
+config option `statusChangeGracePeriod` determines how long kestrel will
+continue to allow the newly restricted operations before it begins rejecting
+them. This gives clients that track the kestrel server's status a grace
+period to learn the new status and cease the forbidden operations before
+they start encountering errors.
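
As a sketch, the new status options might be set like this in a kestrel config
file (the path and values are hypothetical):

    statusFile = "/var/spool/kestrel/status"
    defaultStatus = Quiescent            // used when the status file is unreadable
    statusChangeGracePeriod = 30.seconds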
+
+### ZooKeeper Server Sets
+-------------------------
+
+Kestrel uses Twitter's ServerSet library to support discovery of the kestrel
+servers that allow a given operation. The ServerSet class is documented here:
+[ServerSet](http://twitter.github.com/commons/apidocs/index.html#com.twitter.common.zookeeper.ServerSet)
+
+If the optional `zookeeper` field of `KestrelConfig` is specified, kestrel will
+attempt to use the given configuration to join a logical set of kestrel servers.
+The ZooKeeper host, port and other connection options are documented here:
+[ZooKeeperBuilder](http://robey.github.com/kestrel/api/main/api/net/lag/kestrel/config/ZooKeeperBuilder.html)
+
+Kestrel servers will join 0, 1, or 2 server sets depending on their current
+status. When `Up`, the server joins two server sets: one for writes and one for
+reads. When `ReadOnly`, the server joins only the read set. When `Quiescent`,
+the server joins no sets. ZooKeeper-aware kestrel clients can watch the
+server set for changes and adjust their connections accordingly. The
+`statusChangeGracePeriod` configuration option may be used to allow clients
+time to detect and react to the status change before they begin receiving
+errors from kestrel.
+
+The ZooKeeper paths used to register the server sets are based on the
+`pathPrefix` option. Kestrel automatically appends `/write` and `/read` to
+distinguish the write and read sets.
+
+Kestrel advertises all of its endpoints in each server set that it joins.
+The default endpoint is memcache, if configured; otherwise it falls back to
+the thrift endpoint and then to the text protocol endpoint. All configured
+endpoints are also advertised as additional endpoints under the names
+`memcache`, `thrift` and `text`.
+
+Consider setting the `defaultStatus` option to `Quiescent` to prevent kestrel
+from prematurely advertising its status via ZooKeeper.
+
+Installations that require additional customization of ZooKeeper credentials
+or other site-specific ZooKeeper initialization can override the
+`clientInitializer` and `serverSetInitializer` options to invoke the
+necessary site-specific code. The recommended approach is to place the
+site-specific code in its own JAR file, take the necessary steps to include
+the JAR in kestrel's class path, and place as little logic as possible in
+the kestrel configuration file.
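
As a sketch, a `zookeeper` block in the config file might look like this (the
host, path, and credentials are illustrative):

    zookeeper = Some(new ZooKeeperBuilder {
      host = "zookeeper.domain.com"
      port = 2181
      pathPrefix = "/kestrel/production"  // kestrel appends "/read" and "/write"
      sessionTimeout = 10.seconds
      credentials = Some(("kestrel", "secret"))
      acl = EveryoneReadCreatorAllACL     // requires credentials
    })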
+
+
Server stats
------------
3  project/Build.scala
@@ -28,8 +28,9 @@ object Kestrel extends Build {
"com.twitter" %% "finagle-core" % finagleVersion,
"com.twitter" %% "finagle-ostrich4" % finagleVersion,
"com.twitter" %% "finagle-thrift" % finagleVersion, // override scrooge's version
- "org.jboss.netty" % "netty" % "3.2.6.Final",
+ "commons-codec" % "commons-codec" % "1.6", // override scrooge/util-codec's version
"com.twitter" %% "scrooge-runtime" % "1.1.3",
+ "com.twitter.common.zookeeper" % "server-set" % "1.0.14",
// for tests only:
"org.scala-tools.testing" %% "specs" % "1.6.9" % "test",
6 src/main/scala/net/lag/kestrel/Journal.scala
@@ -503,7 +503,11 @@ class Journal(queuePath: File, queueName: String, syncScheduler: ScheduledExecut
object Journal {
def getQueueNamesFromFolder(path: File): Set[String] = {
- path.list().filter { name =>
+ path.listFiles().filter { file =>
+ !file.isDirectory()
+ }.map { file =>
+ file.getName
+ }.filter { name =>
!(name contains "~~")
}.map { name =>
name.split('.')(0)
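
As a quick sketch of the new behavior (folder contents hypothetical): a queue
path containing plain journals, an archived journal, a packing temp file, and a
subdirectory now yields only the queue names.

    // folder contents: j1, j1.1000 (archived), j2, j2~~ (being packed), subdir/
    Journal.getQueueNamesFromFolder(new java.io.File("/var/spool/kestrel"))
    // => Set("j1", "j2") -- the subdirectory and the "~~" file are skipped,
    //    and "j1.1000" collapses to queue name "j1"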
57 src/main/scala/net/lag/kestrel/Kestrel.scala
@@ -27,11 +27,11 @@ import com.twitter.finagle.util.{Timer => FinagleTimer}
import com.twitter.logging.Logger
import com.twitter.naggati.Codec
import com.twitter.naggati.codec.{MemcacheResponse, MemcacheRequest, MemcacheCodec}
-import com.twitter.ostrich.admin.{PeriodicBackgroundProcess, RuntimeEnvironment, Service,
- ServiceTracker}
+import com.twitter.ostrich.admin.{PeriodicBackgroundProcess, RuntimeEnvironment, Service, ServiceTracker}
import com.twitter.ostrich.stats.Stats
import com.twitter.util.{Duration, Eval, Future, Time}
import java.net.InetSocketAddress
+import java.util.Collections._
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import org.apache.thrift.protocol.TBinaryProtocol
@@ -43,7 +43,9 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
listenAddress: String, memcacheListenPort: Option[Int], textListenPort: Option[Int],
thriftListenPort: Option[Int], queuePath: String,
expirationTimerFrequency: Option[Duration], clientTimeout: Option[Duration],
- maxOpenTransactions: Int, connectionBacklog: Option[Int])
+ maxOpenTransactions: Int, connectionBacklog: Option[Int], statusFile: String,
+ defaultStatus: Status, statusChangeGracePeriod: Duration,
+ zkConfig: Option[ZooKeeperConfig])
extends Service {
private val log = Logger.get(getClass.getName)
@@ -55,6 +57,8 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
var thriftService: Option[Server] = None
var expirationBackgroundProcess: Option[PeriodicBackgroundProcess] = None
+ var serverStatus: ServerStatus = null
+
def thriftCodec = ThriftServerFramedCodec()
private def finagledCodec[Req, Resp](codec: => Codec[Resp]) = {
@@ -82,7 +86,8 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
def startThriftServer(
name: String,
- port: Int
+ port: Int,
+ fTimer: FinagleTimer
): Server = {
val address = new InetSocketAddress(listenAddress, port)
var builder = ServerBuilder()
@@ -94,8 +99,7 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
clientTimeout.foreach { timeout => builder = builder.readTimeout(timeout) }
// calling build() is equivalent to calling start() in finagle.
builder.build(connection => {
- val handler = new ThriftHandler(connection, queueCollection, maxOpenTransactions,
- new FinagleTimer(timer))
+ val handler = new ThriftHandler(connection, queueCollection, maxOpenTransactions, fTimer, Some(serverStatus))
new ThriftFinagledService(handler, new TBinaryProtocol.Factory())
})
}
@@ -110,9 +114,11 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
def start() {
log.info("Kestrel config: listenAddress=%s memcachePort=%s textPort=%s queuePath=%s " +
- "expirationTimerFrequency=%s clientTimeout=%s maxOpenTransactions=%d connectionBacklog=%s",
+ "expirationTimerFrequency=%s clientTimeout=%s maxOpenTransactions=%d connectionBacklog=%s " +
+ "statusFile=%s defaultStatus=%s statusChangeGracePeriod=%s zookeeper=<%s>",
listenAddress, memcacheListenPort, textListenPort, queuePath,
- expirationTimerFrequency, clientTimeout, maxOpenTransactions, connectionBacklog)
+ expirationTimerFrequency, clientTimeout, maxOpenTransactions, connectionBacklog,
+ statusFile, defaultStatus, statusChangeGracePeriod, zkConfig)
Stats.setLabel("version", Kestrel.runtime.jarVersion)
@@ -129,8 +135,9 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
}
})
+ val finagleTimer = new FinagleTimer(timer)
try {
- queueCollection = new QueueCollection(queuePath, new FinagleTimer(timer), journalSyncScheduler,
+ queueCollection = new QueueCollection(queuePath, finagleTimer, journalSyncScheduler,
defaultQueueConfig, builders, aliases)
queueCollection.loadQueues()
} catch {
@@ -143,13 +150,22 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
Stats.addGauge("bytes") { queueCollection.currentBytes.toDouble }
Stats.addGauge("reserved_memory_ratio") { queueCollection.reservedMemoryRatio }
+ serverStatus =
+ zkConfig.map { cfg =>
+ new ZooKeeperServerStatus(cfg, statusFile, finagleTimer, defaultStatus,
+ statusChangeGracePeriod)
+ } getOrElse {
+ new ServerStatus(statusFile, finagleTimer, defaultStatus, statusChangeGracePeriod)
+ }
+ serverStatus.start()
+
// finagle setup:
val memcachePipelineFactoryCodec = finagledCodec[MemcacheRequest, MemcacheResponse] {
MemcacheCodec.asciiCodec(bytesRead, bytesWritten)
}
memcacheService = memcacheListenPort.map { port =>
startFinagleServer("kestrel-memcache", port, memcachePipelineFactoryCodec) { connection =>
- new MemcacheHandler(connection, queueCollection, maxOpenTransactions)
+ new MemcacheHandler(connection, queueCollection, maxOpenTransactions, Some(serverStatus))
}
}
@@ -158,11 +174,13 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
}
textService = textListenPort.map { port =>
startFinagleServer("kestrel-text", port, textPipelineFactory) { connection =>
- new TextHandler(connection, queueCollection, maxOpenTransactions)
+ new TextHandler(connection, queueCollection, maxOpenTransactions, Some(serverStatus))
}
}
- thriftService = thriftListenPort.map { port => startThriftServer("kestrel-thrift", port) }
+ thriftService = thriftListenPort.map { port =>
+ startThriftServer("kestrel-thrift", port, finagleTimer)
+ }
// optionally, start a periodic timer to clean out expired items.
expirationBackgroundProcess = expirationTimerFrequency.map { period =>
@@ -181,6 +199,21 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
proc.start()
proc
}
+
+ // Order is important: the main endpoint published in zookeeper is the
+ // first configured protocol in the list: memcache, thrift, text.
+ val endpoints =
+ memcacheService.map { s => "memcache" -> s.localAddress } ++
+ thriftService.map { s => "thrift" -> s.localAddress } ++
+ textService.map { s => "text" -> s.localAddress }
+ if (endpoints.nonEmpty) {
+ val mainEndpoint = endpoints.head._1
+ val inetEndpoints =
+ endpoints.map { case (name, addr) => (name, addr.asInstanceOf[InetSocketAddress]) }
+ serverStatus.addEndpoints(mainEndpoint, inetEndpoints.toMap)
+ } else {
+ log.error("No protocols configured; set a listener port for at least one protocol.")
+ }
}
def shutdown() {
68 src/main/scala/net/lag/kestrel/KestrelHandler.scala
@@ -29,6 +29,12 @@ import scala.collection.Set
class TooManyOpenReadsException extends Exception("Too many open reads.")
object TooManyOpenReadsException extends TooManyOpenReadsException
+class ServerStatusNotConfiguredException
+extends Exception("Server status not configured.")
+
+class AvailabilityException(op: String)
+extends Exception("Server not available for operation %s".format(op))
+
trait SimplePendingReads {
def queues: QueueCollection
protected def log: Logger
@@ -120,7 +126,8 @@ abstract class KestrelHandler(
val queues: QueueCollection,
val maxOpenReads: Int,
val clientDescription: () => String,
- val sessionId: Int
+ val sessionId: Int,
+ val serverStatus: Option[ServerStatus]
) {
protected val log = Logger.get(getClass.getName)
@@ -142,6 +149,7 @@ abstract class KestrelHandler(
}
def flushAllQueues() {
+ checkBlockWrites("flushAll", "<all>")
queues.queueNames.foreach { qName => queues.flush(qName) }
}
@@ -151,10 +159,17 @@ abstract class KestrelHandler(
// will do a continuous fetch on a queue until time runs out or read buffer is full.
final def monitorUntil(key: String, timeLimit: Option[Time], maxItems: Int, opening: Boolean)(f: (Option[QItem], Option[Long]) => Unit) {
+ checkBlockReads("monitorUntil", key)
+
log.debug("monitor -> q=%s t=%s max=%d open=%s", key, timeLimit, maxItems, opening)
Stats.incr("cmd_monitor")
def monitorLoop(maxItems: Int) {
+ if (safeCheckBlockReads) {
+ f(None, None)
+ return
+ }
+
log.debug("monitor loop -> q=%s t=%s max=%d open=%s", key, timeLimit, maxItems, opening)
if (maxItems == 0 || (timeLimit.isDefined && timeLimit.get <= Time.now) || countPendingReads(key) >= maxOpenReads) {
f(None, None)
@@ -175,6 +190,8 @@ abstract class KestrelHandler(
}
def getItem(key: String, timeout: Option[Time], opening: Boolean, peeking: Boolean): Future[Option[QItem]] = {
+ checkBlockReads("getItem", key)
+
if (opening && countPendingReads(key) >= maxOpenReads) {
log.warning("Attempt to open too many reads on '%s' (sid %d, %s)", key, sessionId,
clientDescription)
@@ -210,6 +227,7 @@ abstract class KestrelHandler(
}
def setItem(key: String, flags: Int, expiry: Option[Time], data: Array[Byte]) = {
+ checkBlockWrites("setItem", key)
log.debug("set -> q=%s flags=%d expiry=%s size=%d", key, flags, expiry, data.length)
Stats.incr("cmd_set")
val (rv, nsec) = Duration.inNanoseconds {
@@ -221,20 +239,68 @@ abstract class KestrelHandler(
}
def flush(key: String) {
+ checkBlockWrites("flush", key)
log.debug("flush -> q=%s", key)
queues.flush(key)
}
def delete(key: String) {
+ checkBlockWrites("delete", key)
log.debug("delete -> q=%s", key)
queues.delete(key)
}
def flushExpired(key: String) = {
+ checkBlockWrites("flushExpired", key)
log.debug("flush_expired -> q=%s", key)
queues.flushExpired(key)
}
+ private def withServerStatus[T](f: (ServerStatus) => T): T = {
+ serverStatus match {
+ case Some(s) => f(s)
+ case None => throw new ServerStatusNotConfiguredException
+ }
+ }
+
+ def safeCheckBlockReads: Boolean = serverStatus map { _.blockReads } getOrElse(false)
+
+ def checkBlockReads(op: String, key: String) {
+ if (safeCheckBlockReads) {
+ log.debug("Blocking %s on '%s' (sid %d, %s)", op, key, sessionId, clientDescription)
+ throw new AvailabilityException(op)
+ }
+ }
+
+ def checkBlockWrites(op: String, key: String) {
+ if (serverStatus map { _.blockWrites } getOrElse(false)) {
+ log.debug("Blocking %s on '%s' (sid %d, %s)", op, key, sessionId, clientDescription)
+ throw new AvailabilityException(op)
+ }
+ }
+
+ def currentStatus: String = {
+ log.debug("read status")
+ withServerStatus(_.status.toString)
+ }
+
+ def setStatus(status: String) {
+ log.debug("status to %s", status)
+ withServerStatus(_.setStatus(status))
+ }
+
+  def markQuiescent() {
+ withServerStatus(_.markQuiescent)
+ }
+
+ def markReadOnly() {
+ withServerStatus(_.markReadOnly)
+ }
+
+ def markUp() {
+ withServerStatus(_.markUp)
+ }
+
def shutdown() {
BackgroundProcess {
Thread.sleep(100)
27 src/main/scala/net/lag/kestrel/MemcacheHandler.scala
@@ -34,12 +34,14 @@ import com.twitter.util.{Future, Duration, Time}
class MemcacheHandler(
connection: ClientConnection,
queueCollection: QueueCollection,
- maxOpenReads: Int
+ maxOpenReads: Int,
+ serverStatus: Option[ServerStatus] = None
) extends Service[MemcacheRequest, MemcacheResponse] {
val log = Logger.get(getClass.getName)
val sessionId = Kestrel.sessionId.incrementAndGet()
- protected val handler = new KestrelHandler(queueCollection, maxOpenReads, clientDescription _, sessionId) with SimplePendingReads
+ val handler = new KestrelHandler(queueCollection, maxOpenReads, clientDescription _, sessionId,
+ serverStatus) with SimplePendingReads
log.debug("New session %d from %s", sessionId, clientDescription)
override def release() {
@@ -57,6 +59,15 @@ class MemcacheHandler(
}
final def apply(request: MemcacheRequest): Future[MemcacheResponse] = {
+ try {
+ handle(request)
+ } catch {
+ case e: AvailabilityException =>
+ Future(new MemcacheResponse("ERROR") then Codec.Disconnect)
+ }
+ }
+
+ private def handle(request: MemcacheRequest): Future[MemcacheResponse] = {
request.line(0) match {
case "get" | "gets" =>
get(request.line(1))
@@ -113,6 +124,18 @@ class MemcacheHandler(
case "flush_all_expired" =>
val flushed = queueCollection.flushAllExpired()
Future(new MemcacheResponse(flushed.toString))
+ case "status" =>
+ Future {
+ if (request.line.size == 1) {
+          new MemcacheResponse(handler.currentStatus.toUpperCase)
+ } else {
+ handler.setStatus(request.line(1))
+ new MemcacheResponse("END")
+ }
+ } rescue {
+ case e: ServerStatusNotConfiguredException => Future(new MemcacheResponse("ERROR") then Codec.Disconnect)
+ case e => Future(new MemcacheResponse("CLIENT_ERROR") then Codec.Disconnect)
+ }
case "version" =>
Future(version())
case "quit" =>
269 src/main/scala/net/lag/kestrel/ServerStatus.scala
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2012 Twitter, Inc.
+ * Copyright 2012 Robey Pointer <robeypointer@gmail.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.lag.kestrel
+
+import com.twitter.conversions.time._
+import com.twitter.logging.Logger
+import com.twitter.ostrich.stats.Stats
+import com.twitter.util.{Duration, FutureTask, Timer, TimerTask}
+import java.io._
+import java.net.InetSocketAddress
+
+abstract sealed class Status(val strictness: Int) {
+ def stricterThan(that: Status) = (this.strictness - that.strictness) > 0
+ def blocksReads: Boolean
+ def blocksWrites: Boolean
+}
+
+case object Down extends Status(100) {
+ val blocksReads = true
+ val blocksWrites = true
+}
+
+case object Quiescent extends Status(3) {
+ val blocksReads = true
+ val blocksWrites = true
+}
+
+case object ReadOnly extends Status(2) {
+ val blocksReads = false
+ val blocksWrites = true
+}
+
+case object Up extends Status(1) {
+ val blocksReads = false
+ val blocksWrites = false
+}
+
+object Status {
+ def unapply(name: String): Option[Status] = {
+ Option(name).map(_.toUpperCase) match {
+ case Some("DOWN") => Some(Down)
+ case Some("QUIESCENT") => Some(Quiescent)
+ case Some("READONLY") => Some(ReadOnly)
+ case Some("UP") => Some(Up)
+ case _ => None
+ }
+ }
+}
+
+class ForbiddenStatusException extends Exception("Status forbidden.")
+class UnknownStatusException extends Exception("Unknown status.")
+
+class ServerStatus(val statusFile: String, val timer: Timer, val defaultStatus: Status = Quiescent,
+ val statusChangeGracePeriod: Duration = 30.seconds) {
+ private val log = Logger.get(getClass.getName)
+
+ private[this] var currentStatus: Status = defaultStatus
+ @volatile private[this] var currentOperationStatus: Status = defaultStatus
+
+ private[this] var timerTask: Option[TimerTask] = None
+
+ private val statusStore = new File(statusFile)
+ if (!statusStore.getParentFile.isDirectory) statusStore.getParentFile.mkdirs()
+
+ private[this] val startTask = new FutureTask {
+ loadStatus()
+
+ Stats.addGauge("status/readable") {
+ if (status.blocksReads) 0.0 else 1.0
+ }
+
+ Stats.addGauge("status/writeable") {
+ if (status.blocksWrites) 0.0 else 1.0
+ }
+ }
+
+ def start() {
+ startTask.run()
+ }
+
+ def addEndpoints(mainEndpoint: String, endpoints: Map[String, InetSocketAddress]) { }
+
+ def shutdown() {
+ synchronized {
+      // Cancel any pending grace-period task.
+      timerTask.foreach { t =>
+        t.cancel()
+        log.debug("status change grace period canceled due to shutdown")
+ }
+ timerTask = None
+
+ if (currentStatus != Down) {
+ // do not persist this change
+ setStatus(Down, persistStatus = false, immediate = true)
+ }
+ }
+ }
+
+ /**
+ * Mark this host with an arbitrary status level. Returns the new status of the server,
+ * which may be unchanged if a pending change is in progress.
+ *
+ * Note: cannot be used to mark this server as Down. Use Quiescent.
+ */
+ def setStatus(newStatus: Status): Status = {
+ if (newStatus eq null) throw new UnknownStatusException
+ if (newStatus == Down) throw new ForbiddenStatusException
+
+ setStatus(newStatus, persistStatus = true, immediate = false)
+ }
+
+ private def setStatus(newStatus: Status, persistStatus: Boolean, immediate: Boolean): Status = synchronized {
+ val oldStatus = currentStatus
+ try {
+ if (proposeStatusChange(oldStatus, newStatus)) {
+ log.debug("Status change from %s to %s accepted", oldStatus, newStatus)
+ currentStatus = newStatus
+ statusChanged(oldStatus, newStatus, immediate)
+ if (persistStatus) {
+ storeStatus()
+ }
+
+ Stats.setLabel("status", currentStatus.toString)
+
+ log.info("switched to status '%s' (previously '%s')", currentStatus, oldStatus)
+ } else {
+ log.warning("status change from '%s' to '%s' rejected", oldStatus, newStatus)
+ }
+ } catch { case e =>
+ log.error(e, "unable to update server status from '%s' to '%s'", oldStatus, newStatus)
+ timerTask.foreach { task =>
+ task.cancel()
+ log.warning("status change grace period canceled due to error")
+ }
+ timerTask = None
+ currentStatus = oldStatus
+ currentOperationStatus = oldStatus
+ Stats.setLabel("status", currentStatus.toString)
+ throw e
+ }
+
+ currentStatus
+ }
+
+ def status: Status = synchronized { currentStatus }
+
+ /**
+ * Invoked just before the status of the server changes.
+ */
+ protected def proposeStatusChange(oldStatus: Status, newStatus: Status): Boolean = {
+ // ok to change if loosening restrictions OR no pending change
+ (oldStatus stricterThan newStatus) || !timerTask.isDefined
+ }
+
+ /**
+ * Invoked after the status of the server has changed.
+ */
+ protected def statusChanged(oldStatus: Status, newStatus: Status, immediate: Boolean) {
+ if ((newStatus stricterThan oldStatus) && !immediate) {
+ // e.g., Up -> ReadOnly (more strict)
+ if (statusChangeGracePeriod > 0.seconds) {
+ val task = timer.schedule(statusChangeGracePeriod.fromNow) {
+ currentOperationStatus = newStatus
+ timerTask = None
+ log.info("status change grace period expired; status '%s' now enforced", newStatus)
+ }
+ timerTask = Some(task)
+ } else {
+ currentOperationStatus = newStatus
+ }
+ } else {
+ // less or same strictness
+ timerTask.foreach { task =>
+ task.cancel()
+ log.debug("status change grace period canceled due to subsequent status change")
+ }
+ timerTask = None
+ currentOperationStatus = newStatus
+ }
+ }
+
+ def blockReads = currentOperationStatus.blocksReads
+ def blockWrites = currentOperationStatus.blocksWrites
+
+ /**
+ * Mark this host as stopped.
+ */
+ def markQuiescent() {
+ setStatus(Quiescent)
+ }
+
+ /**
+ * Mark this host read-only.
+ */
+ def markReadOnly() {
+ setStatus(ReadOnly)
+ }
+
+ /**
+ * Mark this host as up (available for reads & writes)
+ */
+ def markUp() {
+ setStatus(Up)
+ }
+
+ /**
+ * Mark this host with an arbitrary status level name. Names are case insensitive.
+ *
+ * Note: cannot be used to mark this server as Down. Use Quiescent.
+ */
+ def setStatus(statusName: String) {
+ statusName match {
+ case Status(newStatus) => setStatus(newStatus)
+ case _ => throw new UnknownStatusException
+ }
+ }
+
+ private def loadStatus() {
+ if (statusStore.exists()) {
+ val reader =
+ new BufferedReader(new InputStreamReader(new FileInputStream(statusStore), "UTF-8"))
+ val statusName =
+ try {
+ reader.readLine
+ } catch { case e =>
+ log.error(e, "unable to read stored status at '%s'; status remains '%s'", statusStore, currentStatus)
+ defaultStatus.toString
+ } finally {
+ reader.close()
+ }
+
+ statusName match {
+ case Status(newStatus) =>
+ setStatus(newStatus, persistStatus = false, immediate = true)
+ case _ =>
+ log.error("unable to parse stored status '%s'; status remains '%s'", statusName, defaultStatus)
+ }
+ } else {
+ log.info("no status stored at '%s'; status remains '%s'", statusStore, defaultStatus)
+ }
+ }
+
+ private def storeStatus() {
+ val writer = new OutputStreamWriter(new FileOutputStream(statusStore), "UTF-8")
+ try {
+ writer.write(status.toString + "\n")
+ log.debug("stored status '%s' in '%s'", status, statusStore)
+ } catch { case e =>
+      log.error(e, "unable to store status at '%s'", statusStore)
+ } finally {
+ writer.close()
+ }
+ }
+}
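
A minimal usage sketch of this class (the timer wrapping mirrors how
Kestrel.scala wraps its netty timer; the status file path is hypothetical):

    import com.twitter.conversions.time._
    import com.twitter.finagle.util.{Timer => FinagleTimer}
    import org.jboss.netty.util.HashedWheelTimer

    val timer = new FinagleTimer(new HashedWheelTimer())
    val status = new ServerStatus("/tmp/kestrel-status-demo", timer,
      defaultStatus = Up, statusChangeGracePeriod = 5.seconds)
    status.start()         // loads any persisted status, registers status gauges
    status.markReadOnly()  // accepted immediately, but...
    status.blockWrites     // ...still false until the 5-second grace period expires
    status.shutdown()      // transiently Down; the Down status is not persisted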
28 src/main/scala/net/lag/kestrel/TextHandler.scala
@@ -103,12 +103,14 @@ case class StringResponse(message: String) extends TextResponse {
class TextHandler(
connection: ClientConnection,
queueCollection: QueueCollection,
- maxOpenReads: Int
+ maxOpenReads: Int,
+ serverStatus: Option[ServerStatus] = None
) extends Service[TextRequest, TextResponse] {
val log = Logger.get(getClass)
val sessionId = Kestrel.sessionId.incrementAndGet()
- val handler = new KestrelHandler(queueCollection, maxOpenReads, clientDescription _, sessionId) with SimplePendingReads
+ val handler = new KestrelHandler(queueCollection, maxOpenReads, clientDescription _, sessionId,
+ serverStatus) with SimplePendingReads
log.debug("New text session %d from %s", sessionId, clientDescription)
protected def clientDescription: String = {
@@ -121,7 +123,16 @@ class TextHandler(
super.release()
}
- def apply(request: TextRequest) = {
+ def apply(request: TextRequest): Future[TextResponse] = {
+ try {
+ handle(request)
+ } catch {
+ case e: AvailabilityException =>
+ Future(ErrorResponse(e.getMessage))
+ }
+ }
+
+ def handle(request: TextRequest): Future[TextResponse] = {
request.command match {
case "put" =>
// put <queue> [expiry]:
@@ -223,6 +234,17 @@ class TextHandler(
handler.delete(request.args(0))
Future(CountResponse(0))
}
+ case "status" =>
+ Future {
+ if (request.args.size < 1) {
+ StringResponse(handler.currentStatus.toUpperCase)
+ } else {
+ handler.setStatus(request.args(0))
+ CountResponse(0)
+ }
+ } rescue {
+ case e => Future(ErrorResponse(e.getMessage))
+ }
case "quit" =>
connection.close()
Future(CountResponse(0))
22 src/main/scala/net/lag/kestrel/ThriftHandler.scala
@@ -149,12 +149,14 @@ class ThriftHandler (
connection: ClientConnection,
queueCollection: QueueCollection,
maxOpenReads: Int,
- timer: Timer
+ timer: Timer,
+ serverStatus: Option[ServerStatus] = None
) extends thrift.Kestrel.FutureIface {
val log = Logger.get(getClass.getName)
val sessionId = Kestrel.sessionId.incrementAndGet()
- val handler = new KestrelHandler(queueCollection, maxOpenReads, clientDescription _, sessionId) with ThriftPendingReads
+ val handler = new KestrelHandler(queueCollection, maxOpenReads, clientDescription _, sessionId,
+ serverStatus) with ThriftPendingReads
log.debug("New thrift session %d from %s", sessionId, clientDescription)
protected def clientDescription: String = {
@@ -238,6 +240,22 @@ class ThriftHandler (
Future.Unit
}
+ def currentStatus(): Future[thrift.Status] = {
+ Future(
+ handler.serverStatus map { _ =>
+ thrift.Status.valueOf(handler.currentStatus) match {
+ case Some(thriftStatus) => thriftStatus
+ case None => thrift.Status.NotConfigured // e.g., down
+ }
+ } getOrElse {
+ thrift.Status.NotConfigured
+ })
+ }
+
+ def setStatus(thriftStatus: thrift.Status): Future[Unit] = {
+ Future(handler.setStatus(thriftStatus.name))
+ }
+
def getVersion(): Future[String] = Future(Kestrel.runtime.jarVersion)
def release() {
211 src/main/scala/net/lag/kestrel/ZooKeeperServerStatus.scala
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2012 Twitter, Inc.
+ * Copyright 2012 Robey Pointer <robeypointer@gmail.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.lag.kestrel
+
+import com.twitter.common.quantity.{Amount, Time}
+import com.twitter.common.zookeeper.{ServerSet, ServerSets, ZooKeeperClient, ZooKeeperUtils}
+import com.twitter.common.zookeeper.ServerSet.EndpointStatus
+import com.twitter.conversions.time._
+import com.twitter.logging.Logger
+import com.twitter.thrift.{Status => TStatus}
+import com.twitter.util.{Duration, Timer}
+import java.net.{InetAddress, InetSocketAddress}
+import scala.collection.JavaConversions
+import config.ZooKeeperConfig
+
+object ZooKeeperServerStatus {
+ /**
+ * Default mechanism for creating a ZooKeeperClient from kestrel's ZooKeeperConfig.
+ *
+ * If credentials are given, they are passed as digest credentials along with the configured session timeout
+ * and host/port. In the absence of credentials, an unauthorized connection is attempted.
+ */
+ def createClient(zkConfig: ZooKeeperConfig): ZooKeeperClient = {
+ val address = new InetSocketAddress(zkConfig.host, zkConfig.port)
+ val timeout = Amount.of(zkConfig.sessionTimeout.inMilliseconds.toInt, Time.MILLISECONDS)
+ zkConfig.credentials match {
+ case Some((username, password)) =>
+ val credentials = ZooKeeperClient.digestCredentials(username, password)
+ new ZooKeeperClient(timeout, credentials, address)
+ case None =>
+ new ZooKeeperClient(timeout, address)
+ }
+ }
+
+ /**
+ * Default mechanism for creating a ServerSet from kestrel's ZooKeeperConfig, a previously created
+ * ZooKeeperClient, and the node type (always "read" or "write").
+ *
+ * The ZooKeeper node is determined by taking the configured path prefix and appending a slash and
+ * the node type. The configured ACL is used to create the node. If the ACL is not OpenUnsafeACL,
+ * credentials must have been provided during creation of the ZooKeeperClient.
+ */
+ def createServerSet(zkConfig: ZooKeeperConfig, zkClient: ZooKeeperClient, nodeType: String): ServerSet = {
+ val node = "%s/%s".format(zkConfig.pathPrefix, nodeType)
+ ServerSets.create(zkClient, JavaConversions.asJavaIterable(zkConfig.acl.asList), node)
+ }
+
+ def statusToReadStatus(status: Status): TStatus =
+ status match {
+ case Down => TStatus.DEAD
+ case Quiescent => TStatus.DEAD
+ case ReadOnly => TStatus.ALIVE
+ case Up => TStatus.ALIVE
+ }
+
+ def statusToWriteStatus(status: Status): TStatus =
+ status match {
+ case Down => TStatus.DEAD
+ case Quiescent => TStatus.DEAD
+ case ReadOnly => TStatus.DEAD
+ case Up => TStatus.ALIVE
+ }
+}
+
+class ZooKeeperServerStatus(val zkConfig: ZooKeeperConfig, statusFile: String, timer: Timer,
+ defaultStatus: Status = Quiescent,
+ statusChangeGracePeriod: Duration = 30.seconds)
+extends ServerStatus(statusFile, timer, defaultStatus, statusChangeGracePeriod) {
+
+ import ZooKeeperServerStatus._
+
+ private val log = Logger.get(getClass.getName)
+
+ protected val zkClient: ZooKeeperClient =
+ zkConfig.clientInitializer.getOrElse(ZooKeeperServerStatus.createClient _)(zkConfig)
+
+ private var readEndpointStatus: Option[EndpointStatus] = None
+ private var writeEndpointStatus: Option[EndpointStatus] = None
+
+ override def shutdown() {
+ synchronized {
+ super.shutdown()
+
+ try {
+ updateWriteMembership(Down)
+ } catch { case e =>
+ log.error(e, "error updating write server set to Down on shutdown")
+ }
+ writeEndpointStatus = None
+
+ try {
+ updateReadMembership(Down)
+ } catch { case e =>
+ log.error(e, "error updating read server set to Down on shutdown")
+ }
+ readEndpointStatus = None
+
+ zkClient.close()
+ }
+ }
+
+ protected def createServerSet(nodeType: String): ServerSet = {
+ zkConfig.serverSetInitializer.getOrElse(ZooKeeperServerStatus.createServerSet _)(zkConfig,
+ zkClient,
+ nodeType)
+ }
+
+ private def join(nodeType: String,
+ mainAddr: InetSocketAddress,
+ endpoints: Map[String, InetSocketAddress],
+ status: TStatus): Option[EndpointStatus] = {
+ try {
+ val set = createServerSet(nodeType)
+ val endpointStatus = set.join(mainAddr, JavaConversions.asJavaMap(endpoints), status)
+ Some(endpointStatus)
+ } catch { case e =>
+ // join will auto-retry the retryable set of errors -- anything we catch
+ // here is not retryable
+ log.error(e, "error joining %s server set for endpoint '%s'".format(nodeType, mainAddr))
+ throw e
+ }
+ }
+
+ override def addEndpoints(mainEndpoint: String, endpoints: Map[String, InetSocketAddress]) {
+ val externalEndpoints = endpoints.map { case (name, givenAddress) =>
+ val givenInetAddress = givenAddress.getAddress
+ val address =
+ if (givenInetAddress.isAnyLocalAddress || givenInetAddress.isLoopbackAddress) {
+        // wildcard (e.g., 0.0.0.0) or loopback (e.g., 127.0.0.1) address: replace it
+ // with one external address for this machine
+ val external = InetAddress.getLocalHost.getHostAddress
+ new InetSocketAddress(external, givenAddress.getPort)
+ } else {
+ givenAddress
+ }
+ (name, address)
+ }
+
+ val mainAddress = externalEndpoints(mainEndpoint)
+
+    // join the read set first, then the write set, so a partial failure leaves the server read-only at worst
+ val readStatus = statusToReadStatus(status)
+ readEndpointStatus = join("read", mainAddress, externalEndpoints, readStatus)
+ log.info("joined read server set with status '%s'".format(status))
+
+ val writeStatus = statusToWriteStatus(status)
+ writeEndpointStatus = join("write", mainAddress, externalEndpoints, writeStatus)
+ log.info("joined write server set with status '%s'".format(status))
+ }
+
+ override protected def proposeStatusChange(oldStatus: Status, newStatus: Status): Boolean = {
+ if (!super.proposeStatusChange(oldStatus, newStatus)) return false
+
+ if (newStatus stricterThan oldStatus) {
+ // e.g. Up -> ReadOnly: update zk first, then allow change
+ updateWriteMembership(newStatus)
+ updateReadMembership(newStatus)
+ true
+ } else {
+ // looser or same strictness; go ahead
+ true
+ }
+ }
+
+ override protected def statusChanged(oldStatus: Status, newStatus: Status, immediate: Boolean) {
+ if (oldStatus stricterThan newStatus) {
+ // looser or same strictness; update zk now
+ updateReadMembership(newStatus)
+ updateWriteMembership(newStatus)
+ }
+
+ super.statusChanged(oldStatus, newStatus, immediate)
+ }
+
+ private def updateWriteMembership(newStatus: Status) {
+ writeEndpointStatus match {
+ case Some(endpointStatus) =>
+ val writeStatus = statusToWriteStatus(newStatus)
+ endpointStatus.update(writeStatus)
+ log.info("updated write server set with status '%s'", writeStatus)
+ case None =>
+ ()
+ }
+ }
+
+ private def updateReadMembership(newStatus: Status) {
+ readEndpointStatus match {
+ case Some(endpointStatus) =>
+ val readStatus = statusToReadStatus(newStatus)
+ endpointStatus.update(readStatus)
+ log.info("updated read server set with status '%s'".format(readStatus))
+ case None =>
+ ()
+ }
+ }
+}
155 src/main/scala/net/lag/kestrel/config/KestrelConfig.scala
@@ -18,6 +18,7 @@
package net.lag.kestrel
package config
+import com.twitter.common.zookeeper.{ServerSet, ZooKeeperClient, ZooKeeperUtils}
import com.twitter.conversions.storage._
import com.twitter.conversions.time._
import com.twitter.logging.Logger
@@ -25,6 +26,8 @@ import com.twitter.logging.config._
import com.twitter.ostrich.admin.{RuntimeEnvironment, ServiceTracker}
import com.twitter.ostrich.admin.config._
import com.twitter.util.{Config, Duration, StorageUnit}
+import org.apache.zookeeper.data.ACL
+import scala.collection.JavaConversions
case class QueueConfig(
maxItems: Int,
@@ -79,8 +82,7 @@ class QueueBuilder extends Config[QueueConfig] {
/**
* Expiration time for items on this queue. Any item that has been sitting on the queue longer
* than this duration will be discarded. Clients may also attach an expiration time when adding
- * items to a queue, but if the expiration time is longer than `maxAge`, `max_Age` will be
- * used instead.
+ * items to a queue, in which case the item expires at the earlier of the two expiration times.
*/
var maxAge: Option[Duration] = None
@@ -178,6 +180,121 @@ class AliasBuilder extends Config[AliasConfig] {
}
}
+case class ZooKeeperConfig(
+ host: String,
+ port: Int,
+ pathPrefix: String,
+ sessionTimeout: Duration,
+ credentials: Option[(String, String)],
+ acl: ZooKeeperACL,
+ clientInitializer: Option[(ZooKeeperConfig) => ZooKeeperClient],
+ serverSetInitializer: Option[(ZooKeeperConfig, ZooKeeperClient, String) => ServerSet]) {
+ override def toString() = {
+ val creds = credentials.map { _ => "<set>" }.getOrElse("<none>")
+ val clientInit = clientInitializer.map { _ => "<custom>" }.getOrElse("<default>")
+ val serverSetInit = serverSetInitializer.map { _ => "<custom>" }.getOrElse("<default>")
+
+ ("host=%s port=%d pathPrefix=%s sessionTimeout=%s credentials=%s acl=%s " +
+ "clientInitializer=%s serverSetInitializer=%s").format(
+ host, port, pathPrefix, sessionTimeout, creds, acl, clientInit, serverSetInit)
+ }
+}
+
+sealed abstract class ZooKeeperACL {
+ def asList: List[ACL]
+}
+
+/**
+ * Predefined, "open" ZooKeeper ACL. This allows any ZooKeeper session to modify or delete the server set and
+ * is therefore unsafe for production use. It does, however, work with unauthenticated sessions.
+ */
+case object OpenUnsafeACL extends ZooKeeperACL {
+ def asList = List[ACL]() ++ JavaConversions.asScalaBuffer(ZooKeeperUtils.OPEN_ACL_UNSAFE)
+ override def toString() = "<open-unsafe>"
+}
+
+/**
+ * Predefined ZooKeeper ACL. This allows any ZooKeeper session to read the server set, but only sessions with
+ * the creator's credentials may modify it. May only be used with properly configured credentials.
+ */
+case object EveryoneReadCreatorAllACL extends ZooKeeperACL {
+ def asList = List[ACL]() ++ JavaConversions.asScalaBuffer(ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL)
+ override def toString() = "<everyone-read/creator-all>"
+}
+
+/**
+ * Custom ZooKeeper ACL.
+ */
+case class CustomACL(val asList: List[ACL]) extends ZooKeeperACL {
+ override def toString() = "<custom>"
+}
+
+class ZooKeeperBuilder {
+ /**
+ * Hostname for an Apache ZooKeeper cluster to be used for tracking Kestrel
+ * server availability. Required.
+ *
+ * Valid values include "zookeeper.domain.com" and "10.1.2.3".
+ */
+ var host: String = null
+
+ /**
+ * Port for Apache ZooKeeper cluster. Defaults to 2181.
+ */
+ var port: Int = 2181
+
+ /**
+ * Path prefix used to publish Kestrel server availability to the Apache ZooKeeper cluster.
+ * Kestrel will append an additional level of hierarchy for the type of operations accepted
+ * (e.g., "/read" or "/write"). Required.
+ *
+ * Example: "/kestrel/production"
+ */
+ var pathPrefix: String = null
+
+ /**
+ * ZooKeeper session timeout. Defaults to 10 seconds.
+ */
+ var sessionTimeout = 10.seconds
+
+ /**
+ * ZooKeeper client connection credentials (username, password). Defaults to unauthenticated
+ * connections.
+ */
+ var credentials: Option[(String, String)] = None
+
+ /**
+ * A ZooKeeper ACL to apply to nodes created in the paths created by Kestrel.
+ */
+ var acl: ZooKeeperACL = OpenUnsafeACL
+
+ /**
+   * Overrides ZooKeeperClient initialization. The default implementation uses the configuration
+   * options above in a straightforward way. If your environment requires obtaining server set
+   * configuration information (e.g., credentials) via another mechanism, you can provide a custom
+   * initializer method here. The default implementation is ZooKeeperServerStatus.createClient.
+   *
+   * It is strongly recommended that you reference an object method provided in an external JAR
+   * rather than placing arbitrary code in this configuration file.
+ */
+ var clientInitializer: Option[(ZooKeeperConfig) => ZooKeeperClient] = None
+
+ /**
+   * Overrides ServerSet initialization. The default implementation uses a ZooKeeperClient, the
+   * configured pathPrefix and "read" or "write" to produce a ServerSet. The default implementation
+   * is ZooKeeperServerStatus.createServerSet.
+   *
+   * It is strongly recommended that you reference an object method provided in an external JAR
+   * rather than placing arbitrary code in this configuration file.
+ */
+ var serverSetInitializer: Option[(ZooKeeperConfig, ZooKeeperClient, String) => ServerSet] = None
+
+ def apply() = {
+ ZooKeeperConfig(host, port, pathPrefix, sessionTimeout, credentials, acl, clientInitializer,
+ serverSetInitializer)
+ }
+}
+
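As a sketch of the recommended pattern for `clientInitializer` (every name below
is hypothetical and would live in a site-specific JAR on kestrel's class path):

    import com.twitter.common.quantity.{Amount, Time}
    import com.twitter.common.zookeeper.ZooKeeperClient
    import java.net.InetSocketAddress
    import net.lag.kestrel.config.ZooKeeperConfig

    object SiteZooKeeperInit {
      // stands in for a real site-specific secret store
      private def fetchCredentials(): (String, String) = ("kestrel", "secret")

      def createClient(zkConfig: ZooKeeperConfig): ZooKeeperClient = {
        val (user, pass) = fetchCredentials()
        val timeout = Amount.of(zkConfig.sessionTimeout.inMilliseconds.toInt, Time.MILLISECONDS)
        new ZooKeeperClient(timeout, ZooKeeperClient.digestCredentials(user, pass),
                            new InetSocketAddress(zkConfig.host, zkConfig.port))
      }
    }

The config file would then set `clientInitializer = Some(SiteZooKeeperInit.createClient _)`.
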
trait KestrelConfig extends ServerConfig[Kestrel] {
/**
* Settings for a queue that isn't explicitly listed in `queues`.
@@ -242,16 +359,46 @@ trait KestrelConfig extends ServerConfig[Kestrel] {
*/
var connectionBacklog: Option[Int] = None
+ /**
+ * Path to a file where Kestrel can store information about its current status.
+ * When restarted, the server will come up with the same status that it had at shutdown,
+   * provided the data in this file can be accessed.
+ *
+ * Kestrel will attempt to create the parent directories of this file if they do not already
+ * exist.
+ */
+ var statusFile: String = "/tmp/.kestrel-status"
+
+ /**
+ * In the absence of a readable status file, Kestrel will default to this status.
+ */
+ var defaultStatus: Status = Up
+
+ /**
+ * When changing to a more restricted status (e.g., from Up to ReadOnly), Kestrel will wait
+ * until this duration expires before beginning to reject operations. Non-zero settings
+   * are useful when using ZooKeeper-based server status. They allow clients to gracefully
+ * cease operations without incurring errors.
+ */
+ var statusChangeGracePeriod = 0.seconds
+
+ /**
+   * Optional Apache ZooKeeper configuration used to publish ServerSet-based availability of Kestrel
+ * instances. By default no such information is published.
+ */
+ var zookeeper: Option[ZooKeeperBuilder] = None
+
def apply(runtime: RuntimeEnvironment) = {
new Kestrel(
default(), queues, aliases, listenAddress, memcacheListenPort, textListenPort, thriftListenPort,
- queuePath, expirationTimerFrequency, clientTimeout, maxOpenTransactions, connectionBacklog
+ queuePath, expirationTimerFrequency, clientTimeout, maxOpenTransactions, connectionBacklog,
+ statusFile, defaultStatus, statusChangeGracePeriod, zookeeper.map { _() }
)
}
def reload(kestrel: Kestrel) {
Logger.configure(loggers)
- // only the queue configs can be changed.
+ // only the queue and alias configs can be changed.
kestrel.reload(default(), queues, aliases)
}
}
26 src/main/thrift/kestrel.thrift
@@ -31,6 +31,20 @@ struct QueueInfo {
7: i32 open_transactions
}
+enum Status {
+ /* Server status not configured, status levels not in use. */
+ NOT_CONFIGURED = 0,
+
+ /* Server marked quiescent -- clients should stop reading and writing. */
+ QUIESCENT = 1,
+
+ /* Server marked read only -- clients should stop writing. */
+ READ_ONLY = 2,
+
+ /* Server marked up -- clients may read or write. */
+ UP = 3,
+}
+
service Kestrel {
/*
* Put one or more items into a queue.
@@ -107,8 +121,18 @@ service Kestrel {
void delete_queue(1: string queue_name)
/*
+ * Retrieve server's current advertised status.
+ */
+ Status current_status()
+
+ /*
+ * Set the server's current status. Throws an exception if server status
+ * is not configured.
+ */
+ void set_status(1: Status status)
+
+ /*
* Return a string form of the version of this kestrel server.
*/
string get_version()
}
-
38 src/test/scala/net/lag/kestrel/JournalSpec.scala
@@ -59,6 +59,44 @@ class JournalSpec extends Specification with TempFolder with TestLogging with Du
}
}
+ "identify valid queue names" in {
+ "simple" in {
+ withTempFolder {
+ new FileOutputStream(folderName + "/j1").close()
+ new FileOutputStream(folderName + "/j2").close()
+ Journal.getQueueNamesFromFolder(new File(folderName)) mustEqual Set("j1", "j2")
+ }
+ }
+
+ "handle queues with archived journals" in {
+ withTempFolder {
+ new FileOutputStream(folderName + "/j1").close()
+ new FileOutputStream(folderName + "/j1.1000").close()
+ new FileOutputStream(folderName + "/j1.2000").close()
+ new FileOutputStream(folderName + "/j2").close()
+ Journal.getQueueNamesFromFolder(new File(folderName)) mustEqual Set("j1", "j2")
+ }
+ }
+
+ "ignore queues with journals being packed" in {
+ withTempFolder {
+ new FileOutputStream(folderName + "/j1").close()
+ new FileOutputStream(folderName + "/j2").close()
+ new FileOutputStream(folderName + "/j2~~").close()
+ Journal.getQueueNamesFromFolder(new File(folderName)) mustEqual Set("j1", "j2")
+ }
+ }
+
+ "ignore subdirectories" in {
+ withTempFolder {
+ new FileOutputStream(folderName + "/j1").close()
+ new FileOutputStream(folderName + "/j2").close()
+ new File(folderName, "subdir").mkdirs()
+ Journal.getQueueNamesFromFolder(new File(folderName)) mustEqual Set("j1", "j2")
+ }
+ }
+ }
+
"identify valid journal files" in {
"simple" in {
withTempFolder {
39 src/test/scala/net/lag/kestrel/KestrelHandlerSpec.scala
@@ -27,10 +27,12 @@ import com.twitter.ostrich.stats.Stats
import com.twitter.util.{TempFolder, Time, Timer}
import org.specs.Specification
import org.specs.matcher.Matcher
+import org.specs.mock.{ClassMocker, JMocker}
import config._
-class FakeKestrelHandler(queues: QueueCollection, maxOpenTransactions: Int)
- extends KestrelHandler(queues, maxOpenTransactions, () => "none", 0) with SimplePendingReads
+class FakeKestrelHandler(queues: QueueCollection, maxOpenTransactions: Int,
+ serverStatus: Option[ServerStatus] = None)
+ extends KestrelHandler(queues, maxOpenTransactions, () => "none", 0, serverStatus) with SimplePendingReads
class ItemIdListSpec extends Specification with TestLogging {
"ItemIdList" should {
@@ -62,7 +64,7 @@ class ItemIdListSpec extends Specification with TestLogging {
}
}
-class KestrelHandlerSpec extends Specification with TempFolder with TestLogging {
+class KestrelHandlerSpec extends Specification with JMocker with ClassMocker with TempFolder with TestLogging {
val config = new QueueBuilder().apply()
case class beString(expected: String) extends Matcher[Option[QItem]]() {
@@ -297,5 +299,36 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
}
}
}
+
+ "manage server status" in {
+ "by updating server status, if configured" in {
+ withTempFolder {
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
+ val serverStatus = mock[ServerStatus]
+ val handler = new FakeKestrelHandler(queues, 10, Some(serverStatus))
+
+ expect {
+ one(serverStatus).markQuiescent()
+ one(serverStatus).status willReturn Quiescent
+ one(serverStatus).setStatus("ReadOnly")
+ one(serverStatus).markUp()
+ }
+
+      handler.markQuiescent()
+ handler.currentStatus mustEqual "Quiescent"
+ handler.setStatus("ReadOnly")
+ handler.markUp()
+ }
+ }
+
+ "by throwing an exception if server status not configured" in {
+ withTempFolder {
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
+ val handler = new FakeKestrelHandler(queues, 10, None)
+
+        handler.markQuiescent() must throwA[ServerStatusNotConfiguredException]
+ }
+ }
+ }
}
}
473 src/test/scala/net/lag/kestrel/MemcacheHandlerSpec.scala
@@ -0,0 +1,473 @@
+/*
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.lag.kestrel
+
+import com.twitter.conversions.time._
+import com.twitter.finagle.ClientConnection
+import com.twitter.naggati.{Codec, LatchedChannelSource}
+import com.twitter.naggati.codec.{MemcacheRequest, MemcacheResponse}
+import com.twitter.ostrich.admin.RuntimeEnvironment
+import com.twitter.util.{Future, Promise, Time}
+import java.net.InetSocketAddress
+import org.specs.Specification
+import org.specs.mock.{ClassMocker, JMocker}
+import scala.collection.mutable
+
+class MemcacheHandlerSpec extends Specification with JMocker with ClassMocker {
+ "MemcacheHandler" should {
+ val queueCollection = mock[QueueCollection]
+ val connection = mock[ClientConnection]
+ val address = new InetSocketAddress("", 0)
+ val qitem = QItem(Time.now, None, "state shirt".getBytes, 23)
+
+ def toReq(command: String, data: Option[Array[Byte]] = None): MemcacheRequest = {
+ val parts = command.split(" ").toList
+ val dataLength = data map { _.length + 2 } orElse { Some(0) }
+ MemcacheRequest(parts, data, command.length + 2 + dataLength.get)
+ }
+
+ def toResp(queue: String, qItem: QItem): MemcacheResponse = {
+ MemcacheResponse("VALUE %s 0 %d".format(queue, qItem.data.length), Some(qItem.data))
+ }
+
+ val endResponse = MemcacheResponse("END", None)
+ val errorResponse = MemcacheResponse("ERROR", None)
+ val clientErrorResponse = MemcacheResponse("CLIENT_ERROR", None)
+
+ "get request (transactional)" in {
+ expect {
+ 1.atLeastOf(connection).remoteAddress willReturn address
+ }
+
+ val memcacheHandler = new MemcacheHandler(connection, queueCollection, 10)
+
+ "closes transactions" in {
+ expect {
+ one(queueCollection).confirmRemove("test", 100)
+ one(queueCollection).remove("test", None, true, false) willReturn Future.value(Some(qitem))
+ }
+
+ memcacheHandler.handler.pendingReads.add("test", 100)
+ memcacheHandler.handler.pendingReads.peek("test") mustEqual List(100)
+ memcacheHandler(toReq("get test/close/open"))() mustEqual toResp("test", qitem)
+ memcacheHandler.handler.pendingReads.peek("test") mustEqual List(qitem.xid)
+ }
+
+ "with timeout" in {
+ "value ready immediately" in {
+ Time.withCurrentTimeFrozen { time =>
+ expect {
+ one(queueCollection).remove("test", Some(500.milliseconds.fromNow), true, false) willReturn Future.value(Some(qitem))
+ }
+
+ memcacheHandler(toReq("get test/t=500/close/open"))() mustEqual toResp("test", qitem)
+ memcacheHandler.handler.pendingReads.peek("test") mustEqual List(qitem.xid)
+ }
+ }
+
+ "value ready eventually" in {
+ Time.withCurrentTimeFrozen { time =>
+ val promise = new Promise[Option[QItem]]
+
+ expect {
+ one(queueCollection).remove("test", Some(500.milliseconds.fromNow), true, false) willReturn promise
+ }
+
+ val future = memcacheHandler(toReq("get test/t=500/close/open"))
+
+ promise.setValue(Some(qitem))
+ future() mustEqual toResp("test", qitem)
+ memcacheHandler.handler.pendingReads.peek("test") mustEqual List(qitem.xid)
+ }
+ }
+
+ "timed out" in {
+ Time.withCurrentTimeFrozen { time =>
+ val promise = new Promise[Option[QItem]]
+
+ expect {
+ one(queueCollection).confirmRemove("test", 100)
+ one(queueCollection).remove("test", Some(500.milliseconds.fromNow), true, false) willReturn promise
+ }
+
+ memcacheHandler.handler.pendingReads.add("test", 100)
+ memcacheHandler.handler.pendingReads.peek("test") mustEqual List(100)
+
+ val future = memcacheHandler(toReq("get test/t=500/close/open"))
+
+ promise.setValue(None)
+ future() mustEqual endResponse
+ memcacheHandler.handler.pendingReads.peek("test") mustEqual List()
+ }
+ }
+ }
+
+ "empty queue" in {
+ expect {
+ one(queueCollection).confirmRemove("test", 100)
+ one(queueCollection).remove("test", None, true, false) willReturn Future.value(None)
+ }
+
+ memcacheHandler.handler.pendingReads.add("test", 100)
+ memcacheHandler.handler.pendingReads.peek("test") mustEqual List(100)
+ memcacheHandler(toReq("get test/close/open"))() mustEqual endResponse
+ memcacheHandler.handler.pendingReads.peek("test") mustEqual List()
+ }
+
+ "item ready" in {
+ expect {
+ one(queueCollection).remove("test", None, true, false) willReturn Future.value(Some(qitem))
+ }
+
+ memcacheHandler(toReq("get test/close/open"))() mustEqual toResp("test", qitem)
+ memcacheHandler.handler.pendingReads.peek("test") mustEqual List(qitem.xid)
+ }
+
+ "aborting" in {
+ memcacheHandler.handler.pendingReads.add("test", 100)
+ memcacheHandler.handler.pendingReads.peek("test") mustEqual List(100)
+
+ expect {
+ one(queueCollection).unremove("test", 100)
+ }
+
+ memcacheHandler(toReq("get test/abort"))() mustEqual endResponse
+ memcacheHandler.handler.pendingReads.peek("test") mustEqual List()
+ }
+
+ "forbidden option combinations" in {
+ memcacheHandler(toReq("get test/open/peek"))() mustEqual clientErrorResponse
+ memcacheHandler(toReq("get test/close/peek"))() mustEqual clientErrorResponse
+ memcacheHandler(toReq("get test/open/abort"))() mustEqual clientErrorResponse
+ memcacheHandler(toReq("get test/close/abort"))() mustEqual clientErrorResponse
+ }
+
+ "queue name required" in {
+ memcacheHandler(toReq("get /close/open"))() mustEqual clientErrorResponse
+ }
+ }
+
+ "get request (non-transactional)" in {
+ expect {
+ 1.atLeastOf(connection).remoteAddress willReturn address
+ }
+
+ val memcacheHandler = new MemcacheHandler(connection, queueCollection, 10)
+
+ "with timeout" in {
+ "value ready immediately" in {
+ Time.withCurrentTimeFrozen { time =>
+ expect {
+ one(queueCollection).remove("test", Some(500.milliseconds.fromNow), false, false) willReturn Future.value(Some(qitem))
+ }
+
+ memcacheHandler(toReq("get test/t=500"))() mustEqual toResp("test", qitem)
+ memcacheHandler.handler.pendingReads.peek("test") mustEqual List()
+ }
+ }
+
+ "value ready eventually" in {
+ Time.withCurrentTimeFrozen { time =>
+ val promise = new Promise[Option[QItem]]
+
+ expect {
+ one(queueCollection).remove("test", Some(500.milliseconds.fromNow), false, false) willReturn promise
+ }
+
+ val future = memcacheHandler(toReq("get test/t=500"))
+
+ promise.setValue(Some(qitem))
+ future() mustEqual toResp("test", qitem)
+ memcacheHandler.handler.pendingReads.peek("test") mustEqual List()
+ }
+ }
+ }
+
+ "item ready" in {
+ expect {
+ one(queueCollection).remove("test", None, false, false) willReturn Future.value(Some(qitem))
+ }
+
+ memcacheHandler(toReq("get test"))() mustEqual toResp("test", qitem)
+ memcacheHandler.handler.pendingReads.peek("test") mustEqual List()
+ }
+
+ "peek" in {
+ expect {
+ one(queueCollection).remove("test", None, false, true) willReturn Future.value(Some(qitem))
+ }
+
+ memcacheHandler(toReq("get test/peek"))() mustEqual toResp("test", qitem)
+ }
+ }
+
+ "monitor request" in {
+ val qitem2 = QItem(Time.now, None, "homunculus".getBytes, 24)
+ val qitem3 = QItem(Time.now, None, "automaton".getBytes, 25)
+
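+    // Collects every response delivered through the stream signal attached to the initial
+    // response; these tests expect exactly one Codec.Stream signal per monitor response.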
+ def responseStreamToList(response: MemcacheResponse): List[MemcacheResponse] = {
+ val received = new mutable.ListBuffer[MemcacheResponse]
+ val signals = response.signals
+ signals.length mustEqual 1
+ signals.head must haveClass[Codec.Stream[MemcacheResponse]]
+
+ val codecStream = signals.head.asInstanceOf[Codec.Stream[MemcacheResponse]]
+ codecStream.stream respond { r =>
+ received += r
+ Future.Done
+ }
+ received.toList
+ }
+
+ "items ready" in {
+ Time.withCurrentTimeFrozen { tc =>
+ val timeLimit = Some(Time.now + 100.seconds)
+ expect {
+ one(connection).remoteAddress willReturn address
+ one(queueCollection).remove("test", timeLimit, true, false) willReturn Future.value(Some(qitem))
+ one(queueCollection).remove("test", timeLimit, true, false) willReturn Future.value(Some(qitem2))
+ one(queueCollection).remove("test", timeLimit, true, false) willReturn Future.value(Some(qitem3))
+ }
+ val memcacheHandler = new MemcacheHandler(connection, queueCollection, 10)
+
+ val response = memcacheHandler(toReq("monitor test 100 3"))()
+ val received = responseStreamToList(response)
+ received mustEqual List(qitem, qitem2, qitem3).map { i => toResp("test", i) } ++ List(endResponse)
+ }
+ }
+
+ "items eventually ready" in {
+ Time.withCurrentTimeFrozen { tc =>
+ val timeLimit = Some(Time.now + 100.seconds)
+ val promise = new Promise[Option[QItem]]
+ val promise2 = new Promise[Option[QItem]]
+ expect {
+ one(connection).remoteAddress willReturn address
+ one(queueCollection).remove("test", timeLimit, true, false) willReturn promise
+ one(queueCollection).remove("test", timeLimit, true, false) willReturn promise2
+ }
+ val memcacheHandler = new MemcacheHandler(connection, queueCollection, 10)
+ val response = memcacheHandler(toReq("monitor test 100 2"))
+
+ tc.advance(1.second)
+ promise.setValue(Some(qitem))
+ tc.advance(1.second)
+ promise2.setValue(Some(qitem2))
+
+ val received = responseStreamToList(response())
+ received mustEqual List(qitem, qitem2).map { i => toResp("test", i) } ++ List(endResponse)
+ }
+ }
+
+ "some items ready" in {
+ Time.withCurrentTimeFrozen { tc =>
+ val timeLimit = Some(Time.now + 100.seconds)
+ expect {
+ one(connection).remoteAddress willReturn address
+ one(queueCollection).remove("test", timeLimit, true, false) willReturn Future.value(Some(qitem))
+ one(queueCollection).remove("test", timeLimit, true, false) willReturn Future.value(Some(qitem2))
+ one(queueCollection).remove("test", timeLimit, true, false) willReturn Future.value(Some(qitem3))
+ one(queueCollection).remove("test", timeLimit, true, false) willReturn Future.value(None)
+ }
+ val memcacheHandler = new MemcacheHandler(connection, queueCollection, 10)
+
+ val response = memcacheHandler(toReq("monitor test 100 5"))()
+ val received = responseStreamToList(response)
+ received mustEqual List(qitem, qitem2, qitem3).map { i => toResp("test", i) } ++ List(endResponse)
+ }
+ }
+
+ "timeout" in {
+ Time.withCurrentTimeFrozen { tc =>
+ val timeLimit = Some(Time.now + 100.seconds)
+ val promise = new Promise[Option[QItem]]
+
+ expect {
+ one(connection).remoteAddress willReturn address
+ one(queueCollection).remove("test", timeLimit, true, false) willReturn promise
+ }
+ val memcacheHandler = new MemcacheHandler(connection, queueCollection, 10)
+ val response = memcacheHandler(toReq("monitor test 100 2"))
+
+ tc.advance(101.seconds)
+ promise.setValue(Some(qitem))
+
+ val received = responseStreamToList(response())
+ received mustEqual List(toResp("test", qitem), endResponse)
+ }
+ }
+
+ "max open reads" in {
+ Time.withCurrentTimeFrozen { tc =>
+ val timeLimit = Some(Time.now + 100.seconds)
+
+ expect {
+ one(connection).remoteAddress willReturn address
+ one(queueCollection).remove("test", timeLimit, true, false) willReturn Future(Some(qitem))
+ }
+ val memcacheHandler = new MemcacheHandler(connection, queueCollection, 10)
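+          // Occupy 9 of the 10 permitted open reads; the monitor can open only one more.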
+ (100 until 109).foreach { xid =>
+ memcacheHandler.handler.pendingReads.add("test", xid)
+ }
+ memcacheHandler.handler.pendingReads.size("test") mustEqual 9
+
+ val response = memcacheHandler(toReq("monitor test 100 2"))
+
+ val received = responseStreamToList(response())
+ received mustEqual List(toResp("test", qitem), endResponse)
+ }
+ }
+ }
+
+ "confirm" in {
+ "single" in {
+ expect {
+ one(connection).remoteAddress willReturn address
+ one(queueCollection).confirmRemove("test", 100)
+ }
+
+ val memcacheHandler = new MemcacheHandler(connection, queueCollection, 10)
+ memcacheHandler.handler.pendingReads.add("test", 100)
+ memcacheHandler(toReq("confirm test 1"))() mustEqual MemcacheResponse("END", None)
+ memcacheHandler.handler.pendingReads.size("test") mustEqual 0
+ }
+
+ "multiple" in {
+ expect {
+ one(connection).remoteAddress willReturn address
+ one(queueCollection).confirmRemove("test", 100)
+ one(queueCollection).confirmRemove("test", 101)
+ }
+
+ val memcacheHandler = new MemcacheHandler(connection, queueCollection, 10)
+ memcacheHandler.handler.pendingReads.add("test", 100)
+ memcacheHandler.handler.pendingReads.add("test", 101)
+ memcacheHandler.handler.pendingReads.add("test", 102)
+ memcacheHandler(toReq("confirm test 2"))() mustEqual MemcacheResponse("END", None)
+ memcacheHandler.handler.pendingReads.peek("test") mustEqual List(102)
+ }
+ }
+
+ "put request" in {
+ Time.withCurrentTimeFrozen { timeMutator =>
+ expect {
+ one(connection).remoteAddress willReturn address
+ one(queueCollection).add("test", "hello".getBytes, None, Time.now) willReturn true
+ }
+
+ val memcacheHandler = new MemcacheHandler(connection, queueCollection, 10)
+ memcacheHandler(toReq("set test 0 0 5", Some("hello".getBytes)))() mustEqual MemcacheResponse("STORED", None)
+ }
+ }
+
+ "delete request" in {
+ expect {
+ one(connection).remoteAddress willReturn address
+ one(queueCollection).delete("test")
+ }
+
+ val memcacheHandler = new MemcacheHandler(connection, queueCollection, 10)
+ memcacheHandler(toReq("delete test"))() mustEqual MemcacheResponse("DELETED", None)
+ }
+
+ "flush request" in {
+ expect {
+ one(connection).remoteAddress willReturn address
+ one(queueCollection).flush("test")
+ }
+
+ val memcacheHandler = new MemcacheHandler(connection, queueCollection, 10)
+ memcacheHandler(toReq("flush test"))() mustEqual MemcacheResponse("END", None)
+ }
+
+ "version request" in {
+ expect {
+ one(connection).remoteAddress willReturn address
+ }
+
+ val runtime = RuntimeEnvironment(this, Array())
+ Kestrel.runtime = runtime
+
+ val memcacheHandler = new MemcacheHandler(connection, queueCollection, 10)
+ val response = memcacheHandler(toReq("version"))()
+ response.line mustEqual ("VERSION " + runtime.jarVersion)
+ }
+
+ "status request" in {
+ "without server status" in {
+ expect {
+ one(connection).remoteAddress willReturn address
+ }
+
+ val memcacheHandler = new MemcacheHandler(connection, queueCollection, 10)
+
+ "check status should return an error" in {
+ memcacheHandler(toReq("status"))() mustEqual errorResponse
+ }
+
+ "set status should return an error" in {
+ memcacheHandler(toReq("status up"))() mustEqual errorResponse
+ }
+ }
+
+ "with server status" in {
+ val serverStatus = mock[ServerStatus]
+
+ expect {
+ one(connection).remoteAddress willReturn address
+ }
+
+ val memcacheHandler = new MemcacheHandler(connection, queueCollection, 10, Some(serverStatus))
+
+ "check status should return current status" in {
+ expect {
+ one(serverStatus).status willReturn Up
+ }
+
+ memcacheHandler(toReq("status"))() mustEqual MemcacheResponse("UP", None)
+ }
+
+ "set status should set current status" in {
+ expect {
+ one(serverStatus).setStatus("readonly")
+ }
+
+ memcacheHandler(toReq("status readonly"))() mustEqual endResponse
+ }
+
+ "set status should report client error on invalid status" in {
+ expect {
+ one(serverStatus).setStatus("spongebob") willThrow new UnknownStatusException
+ }
+
+ memcacheHandler(toReq("status spongebob"))() mustEqual clientErrorResponse
+ }
+ }
+ }
+
+ "unknown command" in {
+ expect {
+ one(connection).remoteAddress willReturn address
+ }
+
+ val memcacheHandler = new MemcacheHandler(connection, queueCollection, 10)
+ memcacheHandler(toReq("die in a fire"))() mustEqual clientErrorResponse
+ }
+
+ // FIXME: shutdown
+ }
+}
View
5 src/test/scala/net/lag/kestrel/ServerSpec.scala
@@ -50,9 +50,10 @@ class ServerSpec extends Specification with TempFolder with TestLogging {
name = "wx_updates"
destinationQueues = List("weather_updates")
}
-
+ val statusFile = new File(canonicalFolderName, ".status")
kestrel = new Kestrel(defaultConfig, List(weatherUpdatesConfig), List(aliasWeatherUpdatesConfig),
- "localhost", Some(PORT), None, None, canonicalFolderName, None, None, 1, None)
+ "localhost", Some(PORT), None, None, canonicalFolderName, None, None, 1, None, statusFile.getPath,
+ Up, 30.seconds, None)
kestrel.start()
}
View
383 src/test/scala/net/lag/kestrel/ServerStatusSpec.scala
@@ -0,0 +1,383 @@
+/*
+ * Copyright 2012 Twitter, Inc.
+ * Copyright 2012 Robey Pointer <robeypointer@gmail.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.lag.kestrel
+
+import com.twitter.conversions.time._
+import com.twitter.logging.TestLogging
+import com.twitter.ostrich.stats.Stats
+import com.twitter.util.{Duration, MockTimer, TempFolder, Time, TimeControl}
+import java.io._
+import org.specs.Specification
+import org.specs.mock.{ClassMocker, JMocker}
+
+class ServerStatusSpec extends Specification with JMocker with ClassMocker with TempFolder
+with TestLogging {
+ val mockTimer: MockTimer = new MockTimer
+
+ def statusFile = canonicalFolderName + "/state"
+
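+  // Builds and starts a ServerStatus whose proposeStatusChange/statusChanged hooks
+  // defer to the optional test-supplied implementations.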
+ def makeServerStatus(
+ proposeStatusChangeImpl: Option[(Status, Status) => Boolean] = None,
+ statusChangedImpl: Option[(Status, Status, Boolean) => Unit] = None,
+ statusChangeGracePeriod: Option[Duration] = None): ServerStatus = {
+
+ val gracePeriod = statusChangeGracePeriod getOrElse { 30.seconds }
+ val serverStatus = new ServerStatus(statusFile, mockTimer, statusChangeGracePeriod = gracePeriod) {
+ override def proposeStatusChange(oldStatus: Status, newStatus: Status): Boolean = {
+ super.proposeStatusChange(oldStatus, newStatus) &&
+ proposeStatusChangeImpl.map { _(oldStatus, newStatus) }.getOrElse(true)
+ }
+
+ override def statusChanged(oldStatus: Status, newStatus: Status, immediate: Boolean) {
+ super.statusChanged(oldStatus, newStatus, immediate)
+ statusChangedImpl.foreach { _(oldStatus, newStatus, immediate) }
+ }
+ }
+ serverStatus.start()
+ serverStatus
+ }
+
+ def withServerStatus(f: (ServerStatus, TimeControl) => Unit) {
+ withCustomizedServerStatus(None, None, None)(f)
+ }
+
+ def withCustomizedServerStatus(
+ proposeStatusChangeImpl: Option[(Status, Status) => Boolean] = None,
+ statusChangedImpl: Option[(Status, Status, Boolean) => Unit] = None,
+ statusChangeGracePeriod: Option[Duration] = None)
+ (f: (ServerStatus, TimeControl) => Unit) {
+ withTempFolder {
+ Time.withCurrentTimeFrozen { mutator =>
+ val serverStatus = makeServerStatus(proposeStatusChangeImpl, statusChangedImpl, statusChangeGracePeriod)
+ f(serverStatus, mutator)
+ }
+ }
+ }
+
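+  // Reads back the single line persisted to the status file.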
+ def storedStatus(): String = {
+ val reader = new BufferedReader(new InputStreamReader(new FileInputStream(statusFile), "UTF-8"))
+ try {
+ reader.readLine
+ } finally {
+ reader.close()
+ }
+ }
+
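+  // Pre-populates the status file so a newly created ServerStatus can load it.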
+ def writeStatus(status: Status) {
+ new File(statusFile).getParentFile.mkdirs()
+ val writer = new OutputStreamWriter(new FileOutputStream(statusFile), "UTF-8")
+ try {
+ writer.write(status.toString)
+ } finally {
+ writer.close()
+ }
+ }
+
+ "Status" should {
+ "parse from strings to objects" in {
+ Map("Up" -> Up,
+ "Down" -> Down,
+ "ReadOnly" -> ReadOnly,
+ "Quiescent" -> Quiescent,
+ "up" -> Up,
+ "DOWN" -> Down,
+ "readOnly" -> ReadOnly,
+ "QuIeScEnT" -> Quiescent).foreach {
+ case (name, expected) =>
+ Status.unapply(name) mustEqual Some(expected)
+ }
+ }
+
+ "handle unknown status" in {
+ Status.unapply("wut?") mustEqual None
+ }
+ }
+
+ "ServerStatus" should {
+ "start only once" in {
+ withServerStatus { (serverStatus, _) =>
+ serverStatus.start() must throwA[Exception]
+ }
+ }
+
+ "status" in {
+ "default to quiescent" in {
+ withServerStatus { (serverStatus, _) =>
+ serverStatus.status mustEqual Quiescent
+ }
+ }
+
+ "load previously stored status" in {
+ withTempFolder {
+ writeStatus(ReadOnly)
+ val serverStatus = makeServerStatus()
+ serverStatus.status mustEqual ReadOnly
+ }
+ }
+
+ "switch to other statuses" in {
+ withServerStatus { (serverStatus, tc) =>
+ serverStatus.markUp()
+ serverStatus.status mustEqual Up
+
+ serverStatus.markReadOnly()
+ serverStatus.status mustEqual ReadOnly
+
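+          // Advance past the 30-second status-change grace period so the next transition is allowed.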
+ tc.advance(31.seconds)
+ mockTimer.tick()
+
+ serverStatus.markQuiescent()
+ serverStatus.status mustEqual Quiescent
+ }
+ }
+
+ "switch to other statuses by values" in {
+ withServerStatus { (serverStatus, tc) =>
+ serverStatus.setStatus(Up)
+ serverStatus.status mustEqual Up
+
+ serverStatus.setStatus(ReadOnly)
+ serverStatus.status mustEqual ReadOnly
+
+ tc.advance(31.seconds)
+ mockTimer.tick()
+
+ serverStatus.setStatus(Quiescent)
+ serverStatus.status mustEqual Quiescent
+ }
+ }
+
+ "switch to other statuses by names" in {
+ withServerStatus { (serverStatus, tc) =>
+ serverStatus.setStatus("up")
+ serverStatus.status mustEqual Up
+
+ serverStatus.setStatus("readonly")
+ serverStatus.status mustEqual ReadOnly
+
+ tc.advance(31.seconds)
+ mockTimer.tick()
+
+ serverStatus.setStatus("quiescent")
+ serverStatus.status mustEqual Quiescent
+ }
+ }
+
+ "not switch to invalid statuses" in {
+ withServerStatus { (serverStatus, _) =>
+ serverStatus.setStatus(Down) must throwA[ForbiddenStatusException]
+ serverStatus.setStatus("down") must throwA[ForbiddenStatusException]
+
+ serverStatus.setStatus(null: Status) must throwA[UnknownStatusException]
+ serverStatus.setStatus(null: String) must throwA[UnknownStatusException]
+
+ serverStatus.setStatus("trolling") must throwA[UnknownStatusException]
+ }
+ }
+
+ "switch to dead on shutdown" in {
+ withServerStatus { (serverStatus, _) =>
+ serverStatus.markUp()
+ serverStatus.status mustEqual Up
+
+ serverStatus.shutdown()
+ serverStatus.status mustEqual Down
+ }
+ }
+
+ "store status after successful change" in {
+ withServerStatus { (serverStatus, _) =>
+ new File(statusFile).exists() mustEqual false
+ serverStatus.markUp()
+ storedStatus() mustEqual "Up"
+ }
+ }
+
+ "not change status if previous change has not yet completed" in {
+ withServerStatus { (serverStatus, tc) =>
+ serverStatus.setStatus(Up) mustEqual Up
+ serverStatus.setStatus(ReadOnly) mustEqual ReadOnly
+ serverStatus.setStatus(Quiescent) mustEqual ReadOnly // rejected change
+
+ tc.advance(31.seconds)
+ mockTimer.tick()
+
+ serverStatus.setStatus(Quiescent) mustEqual Quiescent
+ }
+ }
+
+ "not change status if proposed change is rejected" in {
+ var changes = 0
+ val pscImpl = (oldStatus: Status, newStatus: Status) => {
+ changes += 1
+ changes <= 1
+ }
+ withCustomizedServerStatus(proposeStatusChangeImpl = Some(pscImpl)) { (serverStatus, tc) =>
+ serverStatus.markQuiescent()
+ storedStatus() mustEqual "Quiescent"
+
+ tc.advance(31.seconds)
+ mockTimer.tick()
+
+ serverStatus.markUp()
+ serverStatus.status mustEqual Quiescent
+ storedStatus() mustEqual "Quiescent"
+ }
+ }
+
+ "not store status after unsuccessful change" in {
+ var changes = 0
+ val scImpl = (oldStatus: Status, newStatus: Status, immediate: Boolean) => {
+ changes += 1
+ if (changes > 1) throw new RuntimeException("boom")
+ }
+ withCustomizedServerStatus(statusChangedImpl = Some(scImpl)) { (serverStatus, tc) =>
+ serverStatus.markQuiescent()
+ storedStatus() mustEqual "Quiescent"
+
+ tc.advance(31.seconds)
+ mockTimer.tick()
+
+ serverStatus.markUp() must throwA[RuntimeException]
+ storedStatus() mustEqual "Quiescent"
+ }
+ }
+
+ "not store dead status on shutdown" in {
+ withServerStatus { (serverStatus, _) =>
+ serverStatus.markUp()
+ storedStatus() mustEqual "Up"
+
+ serverStatus.shutdown()
+ serverStatus.status mustEqual Down
+ storedStatus() mustEqual "Up"
+ }
+ }
+
+ "update stats on status change" in {
+ Stats.clearAll()
+ withCustomizedServerStatus(statusChangeGracePeriod = Some(0.seconds)) { (serverStatus, _) =>
+ serverStatus.setStatus(Up)
+ serverStatus.status mustEqual Up
+ Stats.getLabel("status") mustEqual Some("Up")
+ Stats.getGauge("status/readable") mustEqual Some(1.0)
+ Stats.getGauge("status/writeable") mustEqual Some(1.0)
+
+ serverStatus.setStatus(ReadOnly)
+ serverStatus.status mustEqual ReadOnly
+ Stats.getLabel("status") mustEqual Some("ReadOnly")
+ Stats.getGauge("status/readable") mustEqual Some(1.0)
+ Stats.getGauge("status/writeable") mustEqual Some(0.0)
+
+ serverStatus.setStatus(Quiescent)