Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
kestrel: server status & zookeeper server sets
Browse files Browse the repository at this point in the history
RB_ID=81686
  • Loading branch information
Stephan Zuercher committed Aug 24, 2012
1 parent 8c95afc commit 1ae7657
Show file tree
Hide file tree
Showing 22 changed files with 2,342 additions and 136 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Expand Up @@ -15,6 +15,9 @@ ignore/
go go
kestrel.sublime-* kestrel.sublime-*
kcluster-*.gem kcluster-*.gem
*~
[#]*[#]
.[#]*
.ensime .ensime
.ensime/ .ensime/
.ensime_lucene/ .ensime_lucene/
8 changes: 8 additions & 0 deletions ChangeLog
@@ -1,3 +1,11 @@
2.4.0
-----
release: TBD

- Server status: up, read-only, quiescent
- Support for service discovery via ZooKeeper
- Kestrel now ignores directories in its queue path.

2.3.2 2.3.2
----- -----
release: 23 Aug 2012 release: 23 Aug 2012
Expand Down
97 changes: 91 additions & 6 deletions docs/guide.md
Expand Up @@ -2,7 +2,8 @@
A working guide to kestrel A working guide to kestrel
========================== ==========================


Kestrel is a very simple message queue that runs on the JVM. It supports multiple protocols: Kestrel is a very simple message queue that runs on the JVM. It supports
multiple protocols:


- memcache: the memcache protocol, with some extensions - memcache: the memcache protocol, with some extensions
- thrift: Apache Thrift-based RPC - thrift: Apache Thrift-based RPC
Expand All @@ -26,9 +27,10 @@ case-sensitive.


A cluster of kestrel servers is like a memcache cluster: the servers don't A cluster of kestrel servers is like a memcache cluster: the servers don't
know about each other, and don't do any cross-communication, so you can add as know about each other, and don't do any cross-communication, so you can add as
many as you like. Clients have a list of all servers in the cluster, and pick many as you like. The simplest clients have a list of all servers in the
one at random for each operation. In this way, each queue appears to be spread cluster, and pick one at random for each operation. In this way, each queue
out across every server, with items in a loose ordering. appears to be spread out across every server, with items in a loose ordering.
More advanced clients can find kestrel servers via ZooKeeper.


When kestrel starts up, it scans the journal folder and creates queues based When kestrel starts up, it scans the journal folder and creates queues based
on any journal files it finds there, to restore state to the way it was when on any journal files it finds there, to restore state to the way it was when
Expand All @@ -55,8 +57,8 @@ a server (which can be done over telnet).


To reload the config file on a running server, send "reload" the same way. To reload the config file on a running server, send "reload" the same way.
You should immediately see the changes in "dump_config", to confirm. Reloading You should immediately see the changes in "dump_config", to confirm. Reloading
will only affect queue configuration, not global server configuration. To will only affect queue and alias configuration, not global server configuration.
change the server configuration, restart the server. To change the server configuration, restart the server.


Logging is configured according to `util-logging`. The logging configuration Logging is configured according to `util-logging`. The logging configuration
syntax is described here: syntax is described here:
Expand Down Expand Up @@ -322,6 +324,16 @@ The kestrel implementation of the memcache protocol commands is described below.
to a `MONITOR` command, to confirm the items that arrived during the monitor to a `MONITOR` command, to confirm the items that arrived during the monitor
period. period.


- `STATUS`

Displays the kestrel server's current status (see section on Server Status,
below).

- `STATUS <new-status>`

Switches the kestrel server's current status to the given status (see section
on Server Status, below).



#### Reliable reads #### Reliable reads
------------------- -------------------
Expand Down Expand Up @@ -375,6 +387,79 @@ memcache protocol instead.
The text protocol does not support reliable reads. The text protocol does not support reliable reads.




Server Status
-------------

Each kestrel server maintains its current status. Normal statuses are

- `Up`: the server is available for all operations
- `ReadOnly`: the server is available for non-modifying operations only;
commands that modify queues (set, delete, flush) are rejected as
errors.
- `Quiescent`: the server rejects as an error operations on any queue. One
notable exception is transactions begun before the server entered
the quiesecent state may still be confirmed.

One additional status is `Down`, which is only used transiently when kestrel is
in the process of shutting down.

The server's current status is persisted (specified in
[KestrelConfig](http://robey.github.com/kestrel/api/main/api/net/lag/kestrel/config/KestrelConfig.html)).
When kestrel is restarted it automatically returns to it's previous status,
based on the value in the status file. If the status file does not exist or
cannot be read, kestrel uses a default status, also configured in KestrelConfig.

When changing from a less restrictive status to a more restrictive status
(e.g., from `Up` to `ReadOnly` or from `ReadOnly` to `Quiescent`), the
config option `statusChangeGracePeriod` determines how long kestrel will
continue to allow restricted operations to continue before it begins rejecting
them. This allows clients that are aware of the kestrel server's status a
grace period to learn the new status and cease the forbidden operations before
beginning to encounter errors.

### ZooKeeper Server Sets
-------------------------

Kestrel uses Twitter's ServerSet library to support discovery of kestrel
servers allowing a given operation. The ServerSet class is documented here:
[ServerSet](http://twitter.github.com/commons/apidocs/index.html#com.twitter.common.zookeeper.ServerSet)

If the optional `zookeeper` field of `KestrelConfig` is specified, kestrel will
attempt to use the given configuration to join a logical set of kestrel servers.
The ZooKeeper host, port and other connection options are documented here:
[ZooKeeperBuilder](http://robey.github.com/kestrel/api/main/api/net/lag/kestrel/config/ZooKeeperBuilder.html)

Kestrel servers will join 0, 1, or 2 server sets depending on their current
status. When `Up`, the server joins two server sets: one for writes and one for
reads. When `ReadOnly`, the server joins only the read set. When `Quiescent`,
the server joins no sets. ZooKeeper-aware kestrel clients can watch the
server set for changes and adjust their connections accordingly. The
`statusChangeGracePeriod` configuration option may be used to allow clients
time to detect and react to the status change before they begin receiving
errors from kestrel.

The ZooKeeper path used to register the server set
is based on the `pathPrefix` option. Kestrel automatically appends `/write` and
`/read` to distinguish the write and read sets.

Kestrel advertises all of its endpoints in each server set that it joins.
The default endpoint is memcache, if configured. The default endpoint falls
back to the thrift endpoint and then the text protocol endpoint. All three
endpoints are advertised as additional endpoints under the names `memcache`,
`thrift` and `text`.

Consider setting the `defaultStatus` option to `Quiescent` to prevent kestrel
from prematurely advertising its status via ZooKeeper.

Installations that require additional customization of ZooKeeper credentials,
or other site-specific ZooKeeper initialization can override the
`clientInitializer` and `serverSetInitializer` options to invoke the
necessary site-specific code. The recommended implementation is to place
the site-specific code in its own JAR file, take the necessary steps to
include the JAR in kestrel's class path, and place as little logic as possible
in the kestrel configuration file.


Server stats Server stats
------------ ------------


Expand Down
3 changes: 2 additions & 1 deletion project/Build.scala
Expand Up @@ -28,8 +28,9 @@ object Kestrel extends Build {
"com.twitter" %% "finagle-core" % finagleVersion, "com.twitter" %% "finagle-core" % finagleVersion,
"com.twitter" %% "finagle-ostrich4" % finagleVersion, "com.twitter" %% "finagle-ostrich4" % finagleVersion,
"com.twitter" %% "finagle-thrift" % finagleVersion, // override scrooge's version "com.twitter" %% "finagle-thrift" % finagleVersion, // override scrooge's version
"org.jboss.netty" % "netty" % "3.2.6.Final", "commons-codec" % "commons-codec" % "1.6", // override scrooge/util-codec's version
"com.twitter" %% "scrooge-runtime" % "1.1.3", "com.twitter" %% "scrooge-runtime" % "1.1.3",
"com.twitter.common.zookeeper" % "server-set" % "1.0.14",


// for tests only: // for tests only:
"org.scala-tools.testing" %% "specs" % "1.6.9" % "test", "org.scala-tools.testing" %% "specs" % "1.6.9" % "test",
Expand Down
6 changes: 5 additions & 1 deletion src/main/scala/net/lag/kestrel/Journal.scala
Expand Up @@ -503,7 +503,11 @@ class Journal(queuePath: File, queueName: String, syncScheduler: ScheduledExecut


object Journal { object Journal {
def getQueueNamesFromFolder(path: File): Set[String] = { def getQueueNamesFromFolder(path: File): Set[String] = {
path.list().filter { name => path.listFiles().filter { file =>
!file.isDirectory()
}.map { file =>
file.getName
}.filter { name =>
!(name contains "~~") !(name contains "~~")
}.map { name => }.map { name =>
name.split('.')(0) name.split('.')(0)
Expand Down
57 changes: 45 additions & 12 deletions src/main/scala/net/lag/kestrel/Kestrel.scala
Expand Up @@ -27,11 +27,11 @@ import com.twitter.finagle.util.{Timer => FinagleTimer}
import com.twitter.logging.Logger import com.twitter.logging.Logger
import com.twitter.naggati.Codec import com.twitter.naggati.Codec
import com.twitter.naggati.codec.{MemcacheResponse, MemcacheRequest, MemcacheCodec} import com.twitter.naggati.codec.{MemcacheResponse, MemcacheRequest, MemcacheCodec}
import com.twitter.ostrich.admin.{PeriodicBackgroundProcess, RuntimeEnvironment, Service, import com.twitter.ostrich.admin.{PeriodicBackgroundProcess, RuntimeEnvironment, Service, ServiceTracker}
ServiceTracker}
import com.twitter.ostrich.stats.Stats import com.twitter.ostrich.stats.Stats
import com.twitter.util.{Duration, Eval, Future, Time} import com.twitter.util.{Duration, Eval, Future, Time}
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.Collections._
import java.util.concurrent._ import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import org.apache.thrift.protocol.TBinaryProtocol import org.apache.thrift.protocol.TBinaryProtocol
Expand All @@ -43,7 +43,9 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
listenAddress: String, memcacheListenPort: Option[Int], textListenPort: Option[Int], listenAddress: String, memcacheListenPort: Option[Int], textListenPort: Option[Int],
thriftListenPort: Option[Int], queuePath: String, thriftListenPort: Option[Int], queuePath: String,
expirationTimerFrequency: Option[Duration], clientTimeout: Option[Duration], expirationTimerFrequency: Option[Duration], clientTimeout: Option[Duration],
maxOpenTransactions: Int, connectionBacklog: Option[Int]) maxOpenTransactions: Int, connectionBacklog: Option[Int], statusFile: String,
defaultStatus: Status, statusChangeGracePeriod: Duration,
zkConfig: Option[ZooKeeperConfig])
extends Service { extends Service {
private val log = Logger.get(getClass.getName) private val log = Logger.get(getClass.getName)


Expand All @@ -55,6 +57,8 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
var thriftService: Option[Server] = None var thriftService: Option[Server] = None
var expirationBackgroundProcess: Option[PeriodicBackgroundProcess] = None var expirationBackgroundProcess: Option[PeriodicBackgroundProcess] = None


var serverStatus: ServerStatus = null

def thriftCodec = ThriftServerFramedCodec() def thriftCodec = ThriftServerFramedCodec()


private def finagledCodec[Req, Resp](codec: => Codec[Resp]) = { private def finagledCodec[Req, Resp](codec: => Codec[Resp]) = {
Expand Down Expand Up @@ -82,7 +86,8 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali


def startThriftServer( def startThriftServer(
name: String, name: String,
port: Int port: Int,
fTimer: FinagleTimer
): Server = { ): Server = {
val address = new InetSocketAddress(listenAddress, port) val address = new InetSocketAddress(listenAddress, port)
var builder = ServerBuilder() var builder = ServerBuilder()
Expand All @@ -94,8 +99,7 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
clientTimeout.foreach { timeout => builder = builder.readTimeout(timeout) } clientTimeout.foreach { timeout => builder = builder.readTimeout(timeout) }
// calling build() is equivalent to calling start() in finagle. // calling build() is equivalent to calling start() in finagle.
builder.build(connection => { builder.build(connection => {
val handler = new ThriftHandler(connection, queueCollection, maxOpenTransactions, val handler = new ThriftHandler(connection, queueCollection, maxOpenTransactions, fTimer, Some(serverStatus))
new FinagleTimer(timer))
new ThriftFinagledService(handler, new TBinaryProtocol.Factory()) new ThriftFinagledService(handler, new TBinaryProtocol.Factory())
}) })
} }
Expand All @@ -110,9 +114,11 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali


def start() { def start() {
log.info("Kestrel config: listenAddress=%s memcachePort=%s textPort=%s queuePath=%s " + log.info("Kestrel config: listenAddress=%s memcachePort=%s textPort=%s queuePath=%s " +
"expirationTimerFrequency=%s clientTimeout=%s maxOpenTransactions=%d connectionBacklog=%s", "expirationTimerFrequency=%s clientTimeout=%s maxOpenTransactions=%d connectionBacklog=%s " +
"statusFile=%s defaultStatus=%s statusChangeGracePeriod=%s zookeeper=<%s>",
listenAddress, memcacheListenPort, textListenPort, queuePath, listenAddress, memcacheListenPort, textListenPort, queuePath,
expirationTimerFrequency, clientTimeout, maxOpenTransactions, connectionBacklog) expirationTimerFrequency, clientTimeout, maxOpenTransactions, connectionBacklog,
statusFile, defaultStatus, statusChangeGracePeriod, zkConfig)


Stats.setLabel("version", Kestrel.runtime.jarVersion) Stats.setLabel("version", Kestrel.runtime.jarVersion)


Expand All @@ -129,8 +135,9 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
} }
}) })


val finagleTimer = new FinagleTimer(timer)
try { try {
queueCollection = new QueueCollection(queuePath, new FinagleTimer(timer), journalSyncScheduler, queueCollection = new QueueCollection(queuePath, finagleTimer, journalSyncScheduler,
defaultQueueConfig, builders, aliases) defaultQueueConfig, builders, aliases)
queueCollection.loadQueues() queueCollection.loadQueues()
} catch { } catch {
Expand All @@ -143,13 +150,22 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
Stats.addGauge("bytes") { queueCollection.currentBytes.toDouble } Stats.addGauge("bytes") { queueCollection.currentBytes.toDouble }
Stats.addGauge("reserved_memory_ratio") { queueCollection.reservedMemoryRatio } Stats.addGauge("reserved_memory_ratio") { queueCollection.reservedMemoryRatio }


serverStatus =
zkConfig.map { cfg =>
new ZooKeeperServerStatus(cfg, statusFile, finagleTimer, defaultStatus,
statusChangeGracePeriod)
} getOrElse {
new ServerStatus(statusFile, finagleTimer, defaultStatus, statusChangeGracePeriod)
}
serverStatus.start()

// finagle setup: // finagle setup:
val memcachePipelineFactoryCodec = finagledCodec[MemcacheRequest, MemcacheResponse] { val memcachePipelineFactoryCodec = finagledCodec[MemcacheRequest, MemcacheResponse] {
MemcacheCodec.asciiCodec(bytesRead, bytesWritten) MemcacheCodec.asciiCodec(bytesRead, bytesWritten)
} }
memcacheService = memcacheListenPort.map { port => memcacheService = memcacheListenPort.map { port =>
startFinagleServer("kestrel-memcache", port, memcachePipelineFactoryCodec) { connection => startFinagleServer("kestrel-memcache", port, memcachePipelineFactoryCodec) { connection =>
new MemcacheHandler(connection, queueCollection, maxOpenTransactions) new MemcacheHandler(connection, queueCollection, maxOpenTransactions, Some(serverStatus))
} }
} }


Expand All @@ -158,11 +174,13 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
} }
textService = textListenPort.map { port => textService = textListenPort.map { port =>
startFinagleServer("kestrel-text", port, textPipelineFactory) { connection => startFinagleServer("kestrel-text", port, textPipelineFactory) { connection =>
new TextHandler(connection, queueCollection, maxOpenTransactions) new TextHandler(connection, queueCollection, maxOpenTransactions, Some(serverStatus))
} }
} }


thriftService = thriftListenPort.map { port => startThriftServer("kestrel-thrift", port) } thriftService = thriftListenPort.map { port =>
startThriftServer("kestrel-thrift", port, finagleTimer)
}


// optionally, start a periodic timer to clean out expired items. // optionally, start a periodic timer to clean out expired items.
expirationBackgroundProcess = expirationTimerFrequency.map { period => expirationBackgroundProcess = expirationTimerFrequency.map { period =>
Expand All @@ -181,6 +199,21 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
proc.start() proc.start()
proc proc
} }

// Order is important: the main endpoint published in zookeeper is the
// first configured protocol in the list: memcache, thrift, text.
val endpoints =
memcacheService.map { s => "memcache" -> s.localAddress } ++
thriftService.map { s => "thrift" -> s.localAddress } ++
textService.map { s => "text" -> s.localAddress }
if (endpoints.nonEmpty) {
val mainEndpoint = endpoints.head._1
val inetEndpoints =
endpoints.map { case (name, addr) => (name, addr.asInstanceOf[InetSocketAddress]) }
serverStatus.addEndpoints(mainEndpoint, inetEndpoints.toMap)
} else {
log.error("No protocols configured; set a listener port for at least one protocol.")
}
} }


def shutdown() { def shutdown() {
Expand Down

0 comments on commit 1ae7657

Please sign in to comment.