Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

introduce queue aliases (including stats)

RB_ID=75040
  • Loading branch information...
commit ff976b0f6a35841b3a72de72819c9c267ade50f7 1 parent 886a997
Stephan Zuercher authored
View
8 config/development.scala
@@ -65,6 +65,14 @@ new KestrelConfig {
syncJournal = 10.milliseconds
}
+ aliases = new AliasBuilder {
+ name = "wx_updates"
+ destinationQueues = List("weather_updates")
+ } :: new AliasBuilder {
+ name = "spam_all"
+ destinationQueues = List("spam", "spam0")
+ }
+
loggers = new LoggerConfig {
level = Level.INFO
handlers = new FileHandlerConfig {
View
8 docs/guide.md
@@ -175,6 +175,14 @@ is created, and it will start receiving new items written to the parent queue.
Existing items are not copied over. A fanout queue can be deleted to stop it
from receiving new items.
+Queue Aliases
+-------------
+
+Queue aliases are somewhat similar to fanout queues, but without a required
+naming convention or implicit creation of child queues. A queue alias can
+only be used in set operations. Kestrel responds to attempts to retrieve
+items from the alias as if it were an empty queue. Delete and flush requests
+are also ignored.
Thrift protocol
---------------
View
66 src/main/scala/net/lag/kestrel/AliasedQueue.scala
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2012 Twitter, Inc.
+ * Copyright 2012 Robey Pointer <robeypointer@gmail.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.lag.kestrel
+
+import com.twitter.ostrich.stats.Stats
+import com.twitter.util.Time
+import java.util.concurrent.atomic.AtomicLong
+import config._
+
+class AliasedQueue(val name: String, @volatile var config: AliasConfig,
+ queueLookup: (String => Option[PersistentQueue])) {
+
+ def statNamed(statName: String) = "q/" + name + "/" + statName
+
+ // # of items EVER added to the alias:
+ val putItems = new AtomicLong(0)
+ Stats.removeCounter(statNamed("put_items"))
+ Stats.makeCounter(statNamed("put_items"), putItems)
+
+ // # of bytes EVER added to the alias:
+ val putBytes = new AtomicLong(0)
+ Stats.removeCounter(statNamed("put_bytes"))
+ Stats.makeCounter(statNamed("put_bytes"), putBytes)
+
+ val createTime: Long = Time.now.inSeconds
+ Stats.addGauge(statNamed("create_time"))(createTime)
+
+ /**
+ * Add a value to the end of the aliased queue(s).
+ */
+ def add(value: Array[Byte], expiry: Option[Time], addTime: Time): Boolean = {
+ putItems.getAndIncrement()
+ putBytes.getAndAdd(value.length)
+
+ config.destinationQueues.foldLeft(true) { case (result, name) =>
+ val thisResult = queueLookup(name) match {
+ case Some(q) => q.add(value, expiry, None, addTime)
+ case None => true
+ }
+ result && thisResult
+ }
+ }
+
+ def dumpStats(): Array[(String, String)] = synchronized {
+ Array(
+ ("put_items", putItems.toString),
+ ("put_bytes", putBytes.toString),
+ ("children", config.destinationQueues.size.toString)
+ )
+ }
+}
View
9 src/main/scala/net/lag/kestrel/Kestrel.scala
@@ -39,7 +39,7 @@ import org.jboss.netty.util.{HashedWheelTimer, Timer => NettyTimer}
import scala.collection.{immutable, mutable}
import config._
-class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder],
+class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], aliases: List[AliasBuilder],
listenAddress: String, memcacheListenPort: Option[Int], textListenPort: Option[Int],
thriftListenPort: Option[Int], queuePath: String,
expirationTimerFrequency: Option[Duration], clientTimeout: Option[Duration],
@@ -128,7 +128,7 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder],
try {
queueCollection = new QueueCollection(queuePath, new FinagleTimer(timer), journalSyncScheduler,
- defaultQueueConfig, builders)
+ defaultQueueConfig, builders, aliases)
queueCollection.loadQueues()
} catch {
case e: InaccessibleQueuePath =>
@@ -216,8 +216,9 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder],
}
}
- def reload(newDefaultQueueConfig: QueueConfig, newQueueBuilders: List[QueueBuilder]) {
- queueCollection.reload(newDefaultQueueConfig, newQueueBuilders)
+ def reload(newDefaultQueueConfig: QueueConfig, newQueueBuilders: List[QueueBuilder],
+ newAliasBuilders: List[AliasBuilder]) {
+ queueCollection.reload(newDefaultQueueConfig, newQueueBuilders, newAliasBuilders)
}
}
View
115 src/main/scala/net/lag/kestrel/QueueCollection.scala
@@ -27,10 +27,12 @@ import com.twitter.util.{Duration, Future, Time, Timer}
import config._
class InaccessibleQueuePath extends Exception("Inaccessible queue path: Must be a directory and writable")
+class UndefinedAlias(name: String) extends Exception("Undefined alias: %s".format(name))
class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: ScheduledExecutorService,
@volatile private var defaultQueueConfig: QueueConfig,
- @volatile var queueBuilders: List[QueueBuilder]) {
+ @volatile var queueBuilders: List[QueueBuilder],
+ @volatile var aliasBuilders: List[AliasBuilder]) {
private val log = Logger.get(getClass.getName)
private val path = new File(queueFolder)
@@ -44,9 +46,18 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
private val queues = new mutable.HashMap[String, PersistentQueue]
private val fanout_queues = new mutable.HashMap[String, mutable.HashSet[String]]
+ private val aliases = new mutable.HashMap[String, AliasedQueue]
@volatile private var shuttingDown = false
@volatile private var queueConfigMap = Map(queueBuilders.map { builder => (builder.name, builder()) }: _*)
+ @volatile private var aliasConfigMap = Map(aliasBuilders.map { builder => (builder.name, builder()) }: _*)
+
+ private def checkNames {
+ val duplicates = queueConfigMap.keySet & aliasConfigMap.keySet
+ if (!duplicates.isEmpty) {
+ log.warning("queue name(s) masked by alias(es): %s".format(duplicates.toList.sorted.mkString(", ")))
+ }
+ }
private def buildQueue(name: String, realName: String, path: String) = {
if ((realName contains ".") || (realName contains "/") || (realName contains "~")) {
@@ -61,12 +72,36 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
// preload any queues
def loadQueues() {
Journal.getQueueNamesFromFolder(path) map { queue(_) }
+ createAliases()
+ }
+
+ def createAliases(): Unit = synchronized {
+ checkNames
+ aliasConfigMap.foreach { case (name, config) =>
+ aliases.get(name) match {
+ case Some(alias) =>
+ alias.config = config
+ case None =>
+ log.info("Setting up alias %s: %s", name, config)
+ val alias = new AliasedQueue(name, config, this.apply)
+ aliases(name) = alias
+ }
+ }
}
- def queueNames: List[String] = synchronized {
- queues.keys.toList
+ def queueNames(excludeAliases: Boolean): List[String] = {
+ val names = synchronized {
+ if (excludeAliases) {
+ queues.keys
+ } else {
+ queues.keys ++ aliases.keys
+ }
+ }
+ names.toList
}
+ def queueNames: List[String] = queueNames(false)
+
def currentItems = queues.values.foldLeft(0L) { _ + _.length }
def currentBytes = queues.values.foldLeft(0L) { _ + _.bytes }
def reservedMemoryRatio = {
@@ -75,7 +110,8 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
}
lazy val systemMaxHeapBytes = Runtime.getRuntime.maxMemory
- def reload(newDefaultQueueConfig: QueueConfig, newQueueBuilders: List[QueueBuilder]) {
+ def reload(newDefaultQueueConfig: QueueConfig, newQueueBuilders: List[QueueBuilder],
+ newAliasBuilders: List[AliasBuilder]) {
defaultQueueConfig = newDefaultQueueConfig
queueBuilders = newQueueBuilders
queueConfigMap = Map(queueBuilders.map { builder => (builder.name, builder()) }: _*)
@@ -83,6 +119,9 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
val configName = if (name contains '+') name.split('+')(0) else name
queue.config = queueConfigMap.get(configName).getOrElse(defaultQueueConfig)
}
+ aliasBuilders = newAliasBuilders
+ aliasConfigMap = Map(aliasBuilders.map { builder => (builder.name, builder()) }: _*)
+ createAliases()
}
/**
@@ -97,7 +136,7 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
if (shuttingDown) {
None
} else if (create) {
- Some(queues.get(name) getOrElse {
+ queues.get(name) orElse {
// only happens when creating a queue for the first time.
val q = if (name contains '+') {
val master = name.split('+')(0)
@@ -110,8 +149,8 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
}
q.setup
queues(name) = q
- q
- })
+ Some(q)
+ }
} else {
queues.get(name)
}
@@ -120,22 +159,38 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
def apply(name: String) = queue(name)
/**
+ * Get an alias, creating it if necessary.
+ */
+ def alias(name: String): Option[AliasedQueue] = synchronized {
+ if (shuttingDown) {
+ None
+ } else {
+ aliases.get(name)
+ }
+ }
+
+ /**
* Add an item to a named queue. Will not return until the item has been synchronously added
* and written to the queue journal file.
*
* @return true if the item was added; false if the server is shutting down
*/
def add(key: String, item: Array[Byte], expiry: Option[Time], addTime: Time): Boolean = {
- for (fanouts <- fanout_queues.get(key); name <- fanouts) {
- add(name, item, expiry, addTime)
- }
+ alias(key) match {
+ case Some(alias) =>
+ alias.add(item, expiry, addTime)
+ case None =>
+ for (fanouts <- fanout_queues.get(key); name <- fanouts) {
+ add(name, item, expiry, addTime)
+ }
- queue(key) match {
- case None => false
- case Some(q) =>
- val result = q.add(item, expiry, None, addTime)
- if (result) Stats.incr("total_items")
- result
+ queue(key) match {
+ case None => false
+ case Some(q) =>
+ val result = q.add(item, expiry, None, addTime)
+ if (result) Stats.incr("total_items")
+ result
+ }
}
}
@@ -147,6 +202,11 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
* the requested time, or the server is shutting down, None is passed.
*/
def remove(key: String, deadline: Option[Time] = None, transaction: Boolean = false, peek: Boolean = false): Future[Option[QItem]] = {
+ if (alias(key).isDefined) {
+ // make remove from alias return "no items"
+ return Future.value(None)
+ }
+
queue(key) match {
case None =>
Future.value(None)
@@ -205,7 +265,7 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
}
}
- def expireQueue(name: String): Unit = {
+ def expireQueue(name: String): Unit = synchronized {
if (!shuttingDown) {
queues.get(name) map { q =>
if (q.isReadyForExpiration) {
@@ -218,18 +278,25 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
}
def flushAllExpired(limit: Boolean = false): Int = {
- queueNames.foldLeft(0) { (sum, qName) => sum + flushExpired(qName, limit) }
+ queueNames(true).foldLeft(0) { (sum, qName) => sum + flushExpired(qName, limit) }
}
def deleteExpiredQueues(): Unit = {
- queueNames.map { qName => expireQueue(qName) }
+ queueNames(true).map { qName => expireQueue(qName) }
}
- def stats(key: String): Array[(String, String)] = queue(key, false) match {
- case None => Array[(String, String)]()
- case Some(q) =>
- q.dumpStats() ++
- fanout_queues.get(key).map { qset => ("children", qset.mkString(",")) }.toList
+ def stats(key: String): Array[(String, String)] = {
+ queue(key, false) match {
+ case Some(q) =>
+ q.dumpStats() ++
+ fanout_queues.get(key).map { qset => ("children", qset.mkString(",")) }.toList
+ case None =>
+ // check for alias under this name
+ alias(key) match {
+ case Some(a) => a.dumpStats()
+ case None => Array[(String, String)]()
+ }
+ }
}
/**
View
33 src/main/scala/net/lag/kestrel/config/KestrelConfig.scala
@@ -154,6 +154,30 @@ class QueueBuilder extends Config[QueueConfig] {
}
}
+case class AliasConfig(
+ destinationQueues: List[String]
+) {
+ override def toString() = {
+ ("destinationQueues=[%s]").format(destinationQueues.mkString(", "))
+ }
+}
+
+class AliasBuilder extends Config[AliasConfig] {
+ /**
+ * Name of the alias being configured.
+ */
+ var name: String = null
+
+ /**
+ * List of queues which receive items added to this alias.
+ */
+ var destinationQueues: List[String] = Nil
+
+ def apply() = {
+ AliasConfig(destinationQueues)
+ }
+}
+
trait KestrelConfig extends ServerConfig[Kestrel] {
/**
* Settings for a queue that isn't explicitly listed in `queues`.
@@ -165,6 +189,11 @@ trait KestrelConfig extends ServerConfig[Kestrel] {
*/
var queues: List[QueueBuilder] = Nil
+ /*
+ * Alias configurations.
+ */
+ var aliases: List[AliasBuilder] = Nil
+
/**
* Address to listen for client connections. By default, accept from any interface.
*/
@@ -215,7 +244,7 @@ trait KestrelConfig extends ServerConfig[Kestrel] {
def apply(runtime: RuntimeEnvironment) = {
new Kestrel(
- default(), queues, listenAddress, memcacheListenPort, textListenPort, thriftListenPort,
+ default(), queues, aliases, listenAddress, memcacheListenPort, textListenPort, thriftListenPort,
queuePath, expirationTimerFrequency, clientTimeout, maxOpenTransactions, connectionBacklog
)
}
@@ -223,6 +252,6 @@ trait KestrelConfig extends ServerConfig[Kestrel] {
def reload(kestrel: Kestrel) {
Logger.configure(loggers)
// only the queue configs can be changed.
- kestrel.reload(default(), queues)
+ kestrel.reload(default(), queues, aliases)
}
}
View
1  src/test/scala/net/lag/kestrel/JournalSpec.scala
@@ -19,6 +19,7 @@ package net.lag.kestrel
import java.io._
import org.specs.Specification
+import com.twitter.logging.TestLogging
import com.twitter.util.{Duration, TempFolder, Time}
class JournalSpec extends Specification with TempFolder with TestLogging with DumpJournal {
View
21 src/test/scala/net/lag/kestrel/KestrelHandlerSpec.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor
import scala.collection.mutable
import scala.util.Sorting
import com.twitter.conversions.time._
+import com.twitter.logging.TestLogging
import com.twitter.ostrich.stats.Stats
import com.twitter.util.{TempFolder, Time, Timer}
import org.specs.Specification
@@ -82,7 +83,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
"set and get" in {
withTempFolder {
- queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
val handler = new FakeKestrelHandler(queues, 10)
handler.setItem("test", 0, None, "one".getBytes)
handler.setItem("test", 0, None, "two".getBytes)
@@ -94,7 +95,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
"track stats" in {
withTempFolder {
Stats.clearAll()
- queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
val handler = new FakeKestrelHandler(queues, 10)
Stats.getCounter("cmd_get")() mustEqual 0
@@ -127,7 +128,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
"track monitor stats" in {
withTempFolder {
Stats.clearAll()
- queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
val handler = new FakeKestrelHandler(queues, 10)
handler.setItem("test", 0, None, "one".getBytes)
@@ -182,7 +183,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
"abort and confirm a read" in {
withTempFolder {
- queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
val handler = new FakeKestrelHandler(queues, 10)
handler.setItem("test", 0, None, "one".getBytes)
handler.getItem("test", None, true, false)() must beString("one")
@@ -196,7 +197,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
"abort reads on a deleted queue without resurrecting the queue" in {
withTempFolder {
- queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
val handler = new FakeKestrelHandler(queues, 10)
handler.setItem("test", 0, None, "one".getBytes)
handler.getItem("test", None, true, false)() must beString("one")
@@ -211,7 +212,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
"open several reads" in {
"on one queue" in {
withTempFolder {
- queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
val handler = new FakeKestrelHandler(queues, 10)
handler.setItem("test", 0, None, "one".getBytes)
handler.setItem("test", 0, None, "two".getBytes)
@@ -229,7 +230,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
"on several queues" in {
withTempFolder {
- queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
val handler = new FakeKestrelHandler(queues, 10)
handler.setItem("red", 0, None, "red1".getBytes)
handler.setItem("red", 0, None, "red2".getBytes)
@@ -258,7 +259,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
"but not if open reads are limited" in {
withTempFolder {
- queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
val handler = new FakeKestrelHandler(queues, 1)
handler.setItem("red", 0, None, "red1".getBytes)
handler.setItem("red", 0, None, "red2".getBytes)
@@ -269,7 +270,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
"obey maxItems" in {
withTempFolder {
- queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
val handler = new FakeKestrelHandler(queues, 5)
val got = new mutable.ListBuffer[QItem]()
handler.setItem("red", 0, None, "red1".getBytes)
@@ -284,7 +285,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
"close all reads" in {
withTempFolder {
- queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
val handler = new FakeKestrelHandler(queues, 2)
handler.setItem("red", 0, None, "red1".getBytes)
handler.setItem("red", 0, None, "red2".getBytes)
View
1  src/test/scala/net/lag/kestrel/PeriodicSyncFileSpec.scala
@@ -18,6 +18,7 @@
package net.lag.kestrel
import com.twitter.conversions.time._
+import com.twitter.logging.TestLogging
import com.twitter.util.Duration
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
View
1  src/test/scala/net/lag/kestrel/PersistentQueueSpec.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable
import com.twitter.conversions.storage._
import com.twitter.conversions.time._
import com.twitter.ostrich.stats.Stats
+import com.twitter.logging.TestLogging
import com.twitter.util.{Duration, TempFolder, Time, Timer, TimerTask}
import org.specs.Specification
import org.specs.matcher.Matcher
View
101 src/test/scala/net/lag/kestrel/QueueCollectionSpec.scala
@@ -20,6 +20,7 @@ package net.lag.kestrel
import java.io.{File, FileInputStream}
import java.util.concurrent.ScheduledThreadPoolExecutor
import scala.util.Sorting
+import com.twitter.logging.{Level, TestLogging}
import com.twitter.util.{TempFolder, Time, Timer}
import com.twitter.conversions.time._
import com.twitter.conversions.storage._
@@ -45,7 +46,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
"create a queue" in {
withTempFolder {
Stats.clearAll()
- qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
qc.queueNames mustEqual Nil
Stats.getCounter("queue_creates")() mustEqual 0
Stats.getCounter("queue_deletes")() mustEqual 0
@@ -73,14 +74,14 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
"refuse to create a bad queue" in {
withTempFolder {
- qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
qc.queue("hello.there") must throwA[Exception]
}
}
"refuse to create a bad fanout queue and not break the master queue" in {
withTempFolder {
- qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
qc.queue("the_queue")
qc.queue("the_queue+fanout/open/close") must throwA[Exception]
@@ -88,11 +89,10 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
}
}
-
"report reserved memory usage as a fraction of max heap" in {
withTempFolder {
val maxHeapBytes = config.maxMemorySize.inBytes * 4
- qc = new QueueCollection(folderName, timer, scheduler, config, Nil) {
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil) {
override lazy val systemMaxHeapBytes = maxHeapBytes
}
@@ -106,7 +106,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
"load from journal" in {
withTempFolder {
- qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
qc.add("ducklings", "huey".getBytes)
qc.add("ducklings", "dewey".getBytes)
qc.add("ducklings", "louie".getBytes)
@@ -115,7 +115,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
qc.currentItems mustEqual 3
qc.shutdown
- qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
qc.queueNames mustEqual Nil
qc.remove("ducklings")() must beSomeQItem("huey")
// now the queue should be suddenly instantiated:
@@ -127,7 +127,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
"queue hit/miss tracking" in {
withTempFolder {
Stats.clearAll()
- qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
qc.add("ducklings", "ugly1".getBytes)
qc.add("ducklings", "ugly2".getBytes)
Stats.getCounter("get_hits")() mustEqual 0
@@ -157,7 +157,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
new File(folderName + "/apples").createNewFile()
new File(folderName + "/oranges.101").createNewFile()
new File(folderName + "/oranges.133").createNewFile()
- qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
qc.loadQueues()
qc.queueNames.sorted mustEqual List("apples", "oranges")
}
@@ -168,7 +168,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
new File(folderName + "/apples").createNewFile()
new File(folderName + "/oranges").createNewFile()
new File(folderName + "/oranges~~900").createNewFile()
- qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
qc.loadQueues()
qc.queueNames.sorted mustEqual List("apples", "oranges")
}
@@ -178,7 +178,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
withTempFolder {
new File(folderName + "/apples").createNewFile()
new File(folderName + "/oranges").createNewFile()
- qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
Stats.getCounter("queue_deletes")() mustEqual 0
qc.loadQueues()
qc.delete("oranges")
@@ -193,7 +193,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
"fanout queues" in {
"generate on the fly" in {
withTempFolder {
- qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
qc.add("jobs", "job1".getBytes)
qc.remove("jobs+client1")() mustEqual None
qc.add("jobs", "job2".getBytes)
@@ -208,7 +208,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
withTempFolder {
new File(folderName + "/jobs").createNewFile()
new File(folderName + "/jobs+client1").createNewFile()
- qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
qc.loadQueues()
qc.add("jobs", "job1".getBytes)
qc.remove("jobs+client1")() must beSomeQItem("job1")
@@ -224,7 +224,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
withTempFolder {
new File(folderName + "/jobs").createNewFile()
new File(folderName + "/jobs+client1").createNewFile()
- qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
qc.loadQueues()
qc.add("jobs", "job1".getBytes)
@@ -246,7 +246,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
name = "jobs"
fanoutOnly = true
}
- qc = new QueueCollection(folderName, timer, scheduler, config, List(jobConfig))
+ qc = new QueueCollection(folderName, timer, scheduler, config, List(jobConfig), Nil)
qc.loadQueues()
qc.add("jobs", "job1".getBytes)
qc.remove("jobs")() mustEqual None
@@ -260,7 +260,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
withTempFolder {
Time.withCurrentTimeFrozen { time =>
new File(folderName + "/expired").createNewFile()
- qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
qc.loadQueues()
qc.add("expired", "hello".getBytes, Some(5.seconds.fromNow))
@@ -282,7 +282,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
name = "jobs"
expireToQueue = "expired"
}
- qc = new QueueCollection(folderName, timer, scheduler, config, List(expireConfig))
+ qc = new QueueCollection(folderName, timer, scheduler, config, List(expireConfig), Nil)
qc.loadQueues()
qc.add("jobs", "hello".getBytes, Some(1.second.fromNow))
qc.queue("jobs").get.length mustEqual 1
@@ -300,6 +300,71 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
}
}
+ "aliases" in {
+ "alias to a single queue" in {
+ withTempFolder {
+ val aliasConfig = new AliasBuilder() {
+ name = "nom-de-guerre"
+ destinationQueues = List("fromage")
+ }
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil, List(aliasConfig))
+ qc.loadQueues()
+ qc.add("nom-de-guerre", "brie".getBytes)
+ qc.remove("fromage")() must beSomeQItem("brie")
+ }
+ }
+
+ "alias to multiple queues" in {
+ withTempFolder {
+ val aliasConfig = new AliasBuilder() {
+ name = "nom-de-guerre"
+ destinationQueues = List("fromage", "formaggio")
+ }
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil, List(aliasConfig))
+ qc.loadQueues()
+ qc.add("nom-de-guerre", "brie".getBytes)
+ qc.remove("fromage")() must beSomeQItem("brie")
+ qc.remove("formaggio")() must beSomeQItem("brie")
+ }
+ }
+
+ "alias reads always return None" in {
+ withTempFolder {
+ val aliasConfig = new AliasBuilder() {
+ name = "nom-de-guerre"
+ destinationQueues = List("fromage")
+ }
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil, List(aliasConfig))
+ qc.loadQueues()
+ qc.add("nom-de-guerre", "brie".getBytes)
+ qc.remove("nom-de-guerre")() must beNone
+ }
+ }
+
+ "log queue name/alias duplicates" in {
+ withTempFolder {
+ traceLogger(Level.WARNING)
+
+ val queueConfigs = List("q1", "q2", "q3") map { q =>
+ new QueueBuilder {
+ name = q
+ }
+ }
+ val aliasConfigs = List("a1", "q1", "q2") map { a =>
+ new AliasBuilder() {
+ name = a
+ destinationQueues = List("fromage")
+ }
+ }
+
+ qc = new QueueCollection(folderName, timer, scheduler, config, queueConfigs, aliasConfigs)
+ qc.loadQueues()
+
+ mustLog("queue name(s) masked by alias(es): q1, q2")
+ }
+ }
+ }
+
"non-existent queues" in {
val tests: Map[String, (QueueCollection, String) => Unit] =
Map("unremove" -> { (qc: QueueCollection, name: String) => qc.unremove(name, 100) },
@@ -311,7 +376,7 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
tests foreach { case (op, test) =>
"%s should not cause a queue to be created".format(op) in {
withTempFolder {
- qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ qc = new QueueCollection(folderName, timer, scheduler, config, Nil, Nil)
test(qc, "some_queue")
qc.queueNames mustEqual Nil
}
View
1  src/test/scala/net/lag/kestrel/ReadBehindSpec.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{CountDownLatch, ScheduledThreadPoolExecutor}
import scala.collection.mutable
import com.twitter.conversions.storage._
import com.twitter.conversions.time._
+import com.twitter.logging.TestLogging
import com.twitter.util.{Duration, TempFolder, Time, Timer, TimerTask}
import org.specs.Specification
import config._
View
41 src/test/scala/net/lag/kestrel/ServerSpec.scala
@@ -23,7 +23,7 @@ import scala.collection.Map
import scala.util.Random
import com.twitter.conversions.storage._
import com.twitter.conversions.time._
-import com.twitter.logging.Logger
+import com.twitter.logging.{Logger, TestLogging}
import com.twitter.ostrich.admin.RuntimeEnvironment
import com.twitter.util.{TempFolder, Time}
import org.specs.Specification
@@ -46,8 +46,13 @@ class ServerSpec extends Specification with TempFolder with TestLogging {
maxItems = 1500000
maxAge = 1800.seconds
}
- kestrel = new Kestrel(defaultConfig, List(weatherUpdatesConfig), "localhost",
- Some(PORT), None, None, canonicalFolderName, None, None, 1, None)
+ val aliasWeatherUpdatesConfig = new AliasBuilder() {
+ name = "wx_updates"
+ destinationQueues = List("weather_updates")
+ }
+
+ kestrel = new Kestrel(defaultConfig, List(weatherUpdatesConfig), List(aliasWeatherUpdatesConfig),
+ "localhost", Some(PORT), None, None, canonicalFolderName, None, None, 1, None)
kestrel.start()
}
@@ -440,5 +445,35 @@ class ServerSpec extends Specification with TempFolder with TestLogging {
}
}
}
+
+ "use configured aliases" in {
+ withTempFolder {
+ makeServer()
+ val client = new TestClient("localhost", PORT)
+ client.set("wx_updates", "sunny and mild", 1)
+ client.get("weather_updates") mustEqual "sunny and mild"
+ }
+ }
+
+ "reload aliases" in {
+ withTempFolder {
+ makeServer()
+ new KestrelConfig {
+ queues = new QueueBuilder {
+ name = "weather_updates"
+ } :: new QueueBuilder {
+ name = "starship"
+ }
+ aliases = new AliasBuilder {
+ name = "wx_updates"
+ destinationQueues = List("weather_updates", "starship")
+ }
+ }.reload(kestrel)
+ val client = new TestClient("localhost", PORT)
+ client.set("wx_updates", "dark and stormy", 1)
+ client.get("weather_updates") mustEqual "dark and stormy"
+ client.get("starship") mustEqual "dark and stormy"
+ }
+ }
}
}
View
31 src/test/scala/net/lag/kestrel/TestLogging.scala
@@ -1,31 +0,0 @@
-/*
- * Copyright 2009 Twitter, Inc.
- * Copyright 2009 Robey Pointer <robeypointer@gmail.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License. You may obtain
- * a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package net.lag.kestrel
-
-import org.specs.Specification
-import com.twitter.logging.{Level, Logger}
-
-trait TestLogging { self: Specification =>
- val logLevel = Logger.levelNames(Option[String](System.getenv("log")).getOrElse("FATAL").toUpperCase)
-
- new SpecContext {
- beforeSpec {
- Logger.get("").setLevel(logLevel)
- }
- }
-}
Please sign in to comment.
Something went wrong with that request. Please try again.