Permalink
Browse files

introduce queue aliases (including stats)

RB_ID=75040
  • Loading branch information...
1 parent 886a997 commit ff976b0f6a35841b3a72de72819c9c267ade50f7 Stephan Zuercher committed Jul 12, 2012
View
@@ -65,6 +65,14 @@ new KestrelConfig {
syncJournal = 10.milliseconds
}
+ aliases = new AliasBuilder {
+ name = "wx_updates"
+ destinationQueues = List("weather_updates")
+ } :: new AliasBuilder {
+ name = "spam_all"
+ destinationQueues = List("spam", "spam0")
+ }
+
loggers = new LoggerConfig {
level = Level.INFO
handlers = new FileHandlerConfig {
View
@@ -175,6 +175,14 @@ is created, and it will start receiving new items written to the parent queue.
Existing items are not copied over. A fanout queue can be deleted to stop it
from receiving new items.
+Queue Aliases
+-------------
+
+Queue aliases are somewhat similar to fanout queues, but without a required
+naming convention or implicit creation of child queues. A queue alias can
+only be used in set operations. Kestrel responds to attempts to retrieve
+items from the alias as if it were an empty queue. Delete and flush requests
+are also ignored.
Thrift protocol
---------------
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2012 Twitter, Inc.
+ * Copyright 2012 Robey Pointer <robeypointer@gmail.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.lag.kestrel
+
+import com.twitter.ostrich.stats.Stats
+import com.twitter.util.Time
+import java.util.concurrent.atomic.AtomicLong
+import config._
+
+class AliasedQueue(val name: String, @volatile var config: AliasConfig,
+ queueLookup: (String => Option[PersistentQueue])) {
+
+ def statNamed(statName: String) = "q/" + name + "/" + statName
+
+ // # of items EVER added to the alias:
+ val putItems = new AtomicLong(0)
+ Stats.removeCounter(statNamed("put_items"))
+ Stats.makeCounter(statNamed("put_items"), putItems)
+
+ // # of bytes EVER added to the alias:
+ val putBytes = new AtomicLong(0)
+ Stats.removeCounter(statNamed("put_bytes"))
+ Stats.makeCounter(statNamed("put_bytes"), putBytes)
+
+ val createTime: Long = Time.now.inSeconds
+ Stats.addGauge(statNamed("create_time"))(createTime)
+
+ /**
+ * Add a value to the end of the aliased queue(s).
+ */
+ def add(value: Array[Byte], expiry: Option[Time], addTime: Time): Boolean = {
+ putItems.getAndIncrement()
+ putBytes.getAndAdd(value.length)
+
+ config.destinationQueues.foldLeft(true) { case (result, name) =>
+ val thisResult = queueLookup(name) match {
+ case Some(q) => q.add(value, expiry, None, addTime)
+ case None => true
+ }
+ result && thisResult
+ }
+ }
+
+ def dumpStats(): Array[(String, String)] = synchronized {
+ Array(
+ ("put_items", putItems.toString),
+ ("put_bytes", putBytes.toString),
+ ("children", config.destinationQueues.size.toString)
+ )
+ }
+}
@@ -39,7 +39,7 @@ import org.jboss.netty.util.{HashedWheelTimer, Timer => NettyTimer}
import scala.collection.{immutable, mutable}
import config._
-class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder],
+class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], aliases: List[AliasBuilder],
listenAddress: String, memcacheListenPort: Option[Int], textListenPort: Option[Int],
thriftListenPort: Option[Int], queuePath: String,
expirationTimerFrequency: Option[Duration], clientTimeout: Option[Duration],
@@ -128,7 +128,7 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder],
try {
queueCollection = new QueueCollection(queuePath, new FinagleTimer(timer), journalSyncScheduler,
- defaultQueueConfig, builders)
+ defaultQueueConfig, builders, aliases)
queueCollection.loadQueues()
} catch {
case e: InaccessibleQueuePath =>
@@ -216,8 +216,9 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder],
}
}
- def reload(newDefaultQueueConfig: QueueConfig, newQueueBuilders: List[QueueBuilder]) {
- queueCollection.reload(newDefaultQueueConfig, newQueueBuilders)
+ def reload(newDefaultQueueConfig: QueueConfig, newQueueBuilders: List[QueueBuilder],
+ newAliasBuilders: List[AliasBuilder]) {
+ queueCollection.reload(newDefaultQueueConfig, newQueueBuilders, newAliasBuilders)
}
}
@@ -27,10 +27,12 @@ import com.twitter.util.{Duration, Future, Time, Timer}
import config._
class InaccessibleQueuePath extends Exception("Inaccessible queue path: Must be a directory and writable")
+class UndefinedAlias(name: String) extends Exception("Undefined alias: %s".format(name))
class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: ScheduledExecutorService,
@volatile private var defaultQueueConfig: QueueConfig,
- @volatile var queueBuilders: List[QueueBuilder]) {
+ @volatile var queueBuilders: List[QueueBuilder],
+ @volatile var aliasBuilders: List[AliasBuilder]) {
private val log = Logger.get(getClass.getName)
private val path = new File(queueFolder)
@@ -44,9 +46,18 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
private val queues = new mutable.HashMap[String, PersistentQueue]
private val fanout_queues = new mutable.HashMap[String, mutable.HashSet[String]]
+ private val aliases = new mutable.HashMap[String, AliasedQueue]
@volatile private var shuttingDown = false
@volatile private var queueConfigMap = Map(queueBuilders.map { builder => (builder.name, builder()) }: _*)
+ @volatile private var aliasConfigMap = Map(aliasBuilders.map { builder => (builder.name, builder()) }: _*)
+
+ private def checkNames {
+ val duplicates = queueConfigMap.keySet & aliasConfigMap.keySet
+ if (!duplicates.isEmpty) {
+ log.warning("queue name(s) masked by alias(es): %s".format(duplicates.toList.sorted.mkString(", ")))
+ }
+ }
private def buildQueue(name: String, realName: String, path: String) = {
if ((realName contains ".") || (realName contains "/") || (realName contains "~")) {
@@ -61,12 +72,36 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
// preload any queues
def loadQueues() {
Journal.getQueueNamesFromFolder(path) map { queue(_) }
+ createAliases()
+ }
+
+ def createAliases(): Unit = synchronized {
+ checkNames
+ aliasConfigMap.foreach { case (name, config) =>
+ aliases.get(name) match {
+ case Some(alias) =>
+ alias.config = config
+ case None =>
+ log.info("Setting up alias %s: %s", name, config)
+ val alias = new AliasedQueue(name, config, this.apply)
+ aliases(name) = alias
+ }
+ }
}
- def queueNames: List[String] = synchronized {
- queues.keys.toList
+ def queueNames(excludeAliases: Boolean): List[String] = {
+ val names = synchronized {
+ if (excludeAliases) {
+ queues.keys
+ } else {
+ queues.keys ++ aliases.keys
+ }
+ }
+ names.toList
}
+ def queueNames: List[String] = queueNames(false)
+
def currentItems = queues.values.foldLeft(0L) { _ + _.length }
def currentBytes = queues.values.foldLeft(0L) { _ + _.bytes }
def reservedMemoryRatio = {
@@ -75,14 +110,18 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
}
lazy val systemMaxHeapBytes = Runtime.getRuntime.maxMemory
- def reload(newDefaultQueueConfig: QueueConfig, newQueueBuilders: List[QueueBuilder]) {
+ def reload(newDefaultQueueConfig: QueueConfig, newQueueBuilders: List[QueueBuilder],
+ newAliasBuilders: List[AliasBuilder]) {
defaultQueueConfig = newDefaultQueueConfig
queueBuilders = newQueueBuilders
queueConfigMap = Map(queueBuilders.map { builder => (builder.name, builder()) }: _*)
queues.foreach { case (name, queue) =>
val configName = if (name contains '+') name.split('+')(0) else name
queue.config = queueConfigMap.get(configName).getOrElse(defaultQueueConfig)
}
+ aliasBuilders = newAliasBuilders
+ aliasConfigMap = Map(aliasBuilders.map { builder => (builder.name, builder()) }: _*)
+ createAliases()
}
/**
@@ -97,7 +136,7 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
if (shuttingDown) {
None
} else if (create) {
- Some(queues.get(name) getOrElse {
+ queues.get(name) orElse {
// only happens when creating a queue for the first time.
val q = if (name contains '+') {
val master = name.split('+')(0)
@@ -110,8 +149,8 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
}
q.setup
queues(name) = q
- q
- })
+ Some(q)
+ }
} else {
queues.get(name)
}
@@ -120,22 +159,38 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
def apply(name: String) = queue(name)
/**
+ * Get an alias, creating it if necessary.
+ */
+ def alias(name: String): Option[AliasedQueue] = synchronized {
+ if (shuttingDown) {
+ None
+ } else {
+ aliases.get(name)
+ }
+ }
+
+ /**
* Add an item to a named queue. Will not return until the item has been synchronously added
* and written to the queue journal file.
*
* @return true if the item was added; false if the server is shutting down
*/
def add(key: String, item: Array[Byte], expiry: Option[Time], addTime: Time): Boolean = {
- for (fanouts <- fanout_queues.get(key); name <- fanouts) {
- add(name, item, expiry, addTime)
- }
+ alias(key) match {
+ case Some(alias) =>
+ alias.add(item, expiry, addTime)
+ case None =>
+ for (fanouts <- fanout_queues.get(key); name <- fanouts) {
+ add(name, item, expiry, addTime)
+ }
- queue(key) match {
- case None => false
- case Some(q) =>
- val result = q.add(item, expiry, None, addTime)
- if (result) Stats.incr("total_items")
- result
+ queue(key) match {
+ case None => false
+ case Some(q) =>
+ val result = q.add(item, expiry, None, addTime)
+ if (result) Stats.incr("total_items")
+ result
+ }
}
}
@@ -147,6 +202,11 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
* the requested time, or the server is shutting down, None is passed.
*/
def remove(key: String, deadline: Option[Time] = None, transaction: Boolean = false, peek: Boolean = false): Future[Option[QItem]] = {
+ if (alias(key).isDefined) {
+ // make remove from alias return "no items"
+ return Future.value(None)
+ }
+
queue(key) match {
case None =>
Future.value(None)
@@ -205,7 +265,7 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
}
}
- def expireQueue(name: String): Unit = {
+ def expireQueue(name: String): Unit = synchronized {
if (!shuttingDown) {
queues.get(name) map { q =>
if (q.isReadyForExpiration) {
@@ -218,18 +278,25 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
}
def flushAllExpired(limit: Boolean = false): Int = {
- queueNames.foldLeft(0) { (sum, qName) => sum + flushExpired(qName, limit) }
+ queueNames(true).foldLeft(0) { (sum, qName) => sum + flushExpired(qName, limit) }
}
def deleteExpiredQueues(): Unit = {
- queueNames.map { qName => expireQueue(qName) }
+ queueNames(true).map { qName => expireQueue(qName) }
}
- def stats(key: String): Array[(String, String)] = queue(key, false) match {
- case None => Array[(String, String)]()
- case Some(q) =>
- q.dumpStats() ++
- fanout_queues.get(key).map { qset => ("children", qset.mkString(",")) }.toList
+ def stats(key: String): Array[(String, String)] = {
+ queue(key, false) match {
+ case Some(q) =>
+ q.dumpStats() ++
+ fanout_queues.get(key).map { qset => ("children", qset.mkString(",")) }.toList
+ case None =>
+ // check for alias under this name
+ alias(key) match {
+ case Some(a) => a.dumpStats()
+ case None => Array[(String, String)]()
+ }
+ }
}
/**
Oops, something went wrong.

0 comments on commit ff976b0

Please sign in to comment.