Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
introduce queue aliases (including stats)
Browse files Browse the repository at this point in the history
RB_ID=75040
  • Loading branch information
Stephan Zuercher committed Jul 12, 2012
1 parent 886a997 commit ff976b0
Show file tree
Hide file tree
Showing 14 changed files with 345 additions and 92 deletions.
8 changes: 8 additions & 0 deletions config/development.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ new KestrelConfig {
syncJournal = 10.milliseconds
}

aliases = new AliasBuilder {
name = "wx_updates"
destinationQueues = List("weather_updates")
} :: new AliasBuilder {
name = "spam_all"
destinationQueues = List("spam", "spam0")
}

loggers = new LoggerConfig {
level = Level.INFO
handlers = new FileHandlerConfig {
Expand Down
8 changes: 8 additions & 0 deletions docs/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,14 @@ is created, and it will start receiving new items written to the parent queue.
Existing items are not copied over. A fanout queue can be deleted to stop it
from receiving new items.

Queue Aliases
-------------

Queue aliases are somewhat similar to fanout queues, but without a required
naming convention or implicit creation of child queues. A queue alias can
only be used in set operations. Kestrel responds to attempts to retrieve
items from the alias as if it were an empty queue. Delete and flush requests
are also ignored.

Thrift protocol
---------------
Expand Down
66 changes: 66 additions & 0 deletions src/main/scala/net/lag/kestrel/AliasedQueue.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2012 Twitter, Inc.
* Copyright 2012 Robey Pointer <robeypointer@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.lag.kestrel

import com.twitter.ostrich.stats.Stats
import com.twitter.util.Time
import java.util.concurrent.atomic.AtomicLong
import config._

class AliasedQueue(val name: String, @volatile var config: AliasConfig,
queueLookup: (String => Option[PersistentQueue])) {

def statNamed(statName: String) = "q/" + name + "/" + statName

// # of items EVER added to the alias:
val putItems = new AtomicLong(0)
Stats.removeCounter(statNamed("put_items"))
Stats.makeCounter(statNamed("put_items"), putItems)

// # of bytes EVER added to the alias:
val putBytes = new AtomicLong(0)
Stats.removeCounter(statNamed("put_bytes"))
Stats.makeCounter(statNamed("put_bytes"), putBytes)

val createTime: Long = Time.now.inSeconds
Stats.addGauge(statNamed("create_time"))(createTime)

/**
* Add a value to the end of the aliased queue(s).
*/
def add(value: Array[Byte], expiry: Option[Time], addTime: Time): Boolean = {
putItems.getAndIncrement()
putBytes.getAndAdd(value.length)

config.destinationQueues.foldLeft(true) { case (result, name) =>
val thisResult = queueLookup(name) match {
case Some(q) => q.add(value, expiry, None, addTime)
case None => true
}
result && thisResult
}
}

def dumpStats(): Array[(String, String)] = synchronized {
Array(
("put_items", putItems.toString),
("put_bytes", putBytes.toString),
("children", config.destinationQueues.size.toString)
)
}
}
9 changes: 5 additions & 4 deletions src/main/scala/net/lag/kestrel/Kestrel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import org.jboss.netty.util.{HashedWheelTimer, Timer => NettyTimer}
import scala.collection.{immutable, mutable}
import config._

class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder],
class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], aliases: List[AliasBuilder],
listenAddress: String, memcacheListenPort: Option[Int], textListenPort: Option[Int],
thriftListenPort: Option[Int], queuePath: String,
expirationTimerFrequency: Option[Duration], clientTimeout: Option[Duration],
Expand Down Expand Up @@ -128,7 +128,7 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder],

try {
queueCollection = new QueueCollection(queuePath, new FinagleTimer(timer), journalSyncScheduler,
defaultQueueConfig, builders)
defaultQueueConfig, builders, aliases)
queueCollection.loadQueues()
} catch {
case e: InaccessibleQueuePath =>
Expand Down Expand Up @@ -216,8 +216,9 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder],
}
}

def reload(newDefaultQueueConfig: QueueConfig, newQueueBuilders: List[QueueBuilder]) {
queueCollection.reload(newDefaultQueueConfig, newQueueBuilders)
def reload(newDefaultQueueConfig: QueueConfig, newQueueBuilders: List[QueueBuilder],
newAliasBuilders: List[AliasBuilder]) {
queueCollection.reload(newDefaultQueueConfig, newQueueBuilders, newAliasBuilders)
}
}

Expand Down
115 changes: 91 additions & 24 deletions src/main/scala/net/lag/kestrel/QueueCollection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ import com.twitter.util.{Duration, Future, Time, Timer}
import config._

class InaccessibleQueuePath extends Exception("Inaccessible queue path: Must be a directory and writable")
class UndefinedAlias(name: String) extends Exception("Undefined alias: %s".format(name))

class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: ScheduledExecutorService,
@volatile private var defaultQueueConfig: QueueConfig,
@volatile var queueBuilders: List[QueueBuilder]) {
@volatile var queueBuilders: List[QueueBuilder],
@volatile var aliasBuilders: List[AliasBuilder]) {
private val log = Logger.get(getClass.getName)

private val path = new File(queueFolder)
Expand All @@ -44,9 +46,18 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S

private val queues = new mutable.HashMap[String, PersistentQueue]
private val fanout_queues = new mutable.HashMap[String, mutable.HashSet[String]]
private val aliases = new mutable.HashMap[String, AliasedQueue]
@volatile private var shuttingDown = false

@volatile private var queueConfigMap = Map(queueBuilders.map { builder => (builder.name, builder()) }: _*)
@volatile private var aliasConfigMap = Map(aliasBuilders.map { builder => (builder.name, builder()) }: _*)

private def checkNames {
val duplicates = queueConfigMap.keySet & aliasConfigMap.keySet
if (!duplicates.isEmpty) {
log.warning("queue name(s) masked by alias(es): %s".format(duplicates.toList.sorted.mkString(", ")))
}
}

private def buildQueue(name: String, realName: String, path: String) = {
if ((realName contains ".") || (realName contains "/") || (realName contains "~")) {
Expand All @@ -61,12 +72,36 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
// preload any queues
def loadQueues() {
Journal.getQueueNamesFromFolder(path) map { queue(_) }
createAliases()
}

def createAliases(): Unit = synchronized {
checkNames
aliasConfigMap.foreach { case (name, config) =>
aliases.get(name) match {
case Some(alias) =>
alias.config = config
case None =>
log.info("Setting up alias %s: %s", name, config)
val alias = new AliasedQueue(name, config, this.apply)
aliases(name) = alias
}
}
}

def queueNames: List[String] = synchronized {
queues.keys.toList
def queueNames(excludeAliases: Boolean): List[String] = {
val names = synchronized {
if (excludeAliases) {
queues.keys
} else {
queues.keys ++ aliases.keys
}
}
names.toList
}

def queueNames: List[String] = queueNames(false)

def currentItems = queues.values.foldLeft(0L) { _ + _.length }
def currentBytes = queues.values.foldLeft(0L) { _ + _.bytes }
def reservedMemoryRatio = {
Expand All @@ -75,14 +110,18 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
}
lazy val systemMaxHeapBytes = Runtime.getRuntime.maxMemory

def reload(newDefaultQueueConfig: QueueConfig, newQueueBuilders: List[QueueBuilder]) {
def reload(newDefaultQueueConfig: QueueConfig, newQueueBuilders: List[QueueBuilder],
newAliasBuilders: List[AliasBuilder]) {
defaultQueueConfig = newDefaultQueueConfig
queueBuilders = newQueueBuilders
queueConfigMap = Map(queueBuilders.map { builder => (builder.name, builder()) }: _*)
queues.foreach { case (name, queue) =>
val configName = if (name contains '+') name.split('+')(0) else name
queue.config = queueConfigMap.get(configName).getOrElse(defaultQueueConfig)
}
aliasBuilders = newAliasBuilders
aliasConfigMap = Map(aliasBuilders.map { builder => (builder.name, builder()) }: _*)
createAliases()
}

/**
Expand All @@ -97,7 +136,7 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
if (shuttingDown) {
None
} else if (create) {
Some(queues.get(name) getOrElse {
queues.get(name) orElse {
// only happens when creating a queue for the first time.
val q = if (name contains '+') {
val master = name.split('+')(0)
Expand All @@ -110,32 +149,48 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
}
q.setup
queues(name) = q
q
})
Some(q)
}
} else {
queues.get(name)
}
}

def apply(name: String) = queue(name)

/**
* Get an alias, creating it if necessary.
*/
def alias(name: String): Option[AliasedQueue] = synchronized {
if (shuttingDown) {
None
} else {
aliases.get(name)
}
}

/**
* Add an item to a named queue. Will not return until the item has been synchronously added
* and written to the queue journal file.
*
* @return true if the item was added; false if the server is shutting down
*/
def add(key: String, item: Array[Byte], expiry: Option[Time], addTime: Time): Boolean = {
for (fanouts <- fanout_queues.get(key); name <- fanouts) {
add(name, item, expiry, addTime)
}
alias(key) match {
case Some(alias) =>
alias.add(item, expiry, addTime)
case None =>
for (fanouts <- fanout_queues.get(key); name <- fanouts) {
add(name, item, expiry, addTime)
}

queue(key) match {
case None => false
case Some(q) =>
val result = q.add(item, expiry, None, addTime)
if (result) Stats.incr("total_items")
result
queue(key) match {
case None => false
case Some(q) =>
val result = q.add(item, expiry, None, addTime)
if (result) Stats.incr("total_items")
result
}
}
}

Expand All @@ -147,6 +202,11 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
* the requested time, or the server is shutting down, None is passed.
*/
def remove(key: String, deadline: Option[Time] = None, transaction: Boolean = false, peek: Boolean = false): Future[Option[QItem]] = {
if (alias(key).isDefined) {
// make remove from alias return "no items"
return Future.value(None)
}

queue(key) match {
case None =>
Future.value(None)
Expand Down Expand Up @@ -205,7 +265,7 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
}
}

def expireQueue(name: String): Unit = {
def expireQueue(name: String): Unit = synchronized {
if (!shuttingDown) {
queues.get(name) map { q =>
if (q.isReadyForExpiration) {
Expand All @@ -218,18 +278,25 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
}

def flushAllExpired(limit: Boolean = false): Int = {
queueNames.foldLeft(0) { (sum, qName) => sum + flushExpired(qName, limit) }
queueNames(true).foldLeft(0) { (sum, qName) => sum + flushExpired(qName, limit) }
}

def deleteExpiredQueues(): Unit = {
queueNames.map { qName => expireQueue(qName) }
queueNames(true).map { qName => expireQueue(qName) }
}

def stats(key: String): Array[(String, String)] = queue(key, false) match {
case None => Array[(String, String)]()
case Some(q) =>
q.dumpStats() ++
fanout_queues.get(key).map { qset => ("children", qset.mkString(",")) }.toList
def stats(key: String): Array[(String, String)] = {
queue(key, false) match {
case Some(q) =>
q.dumpStats() ++
fanout_queues.get(key).map { qset => ("children", qset.mkString(",")) }.toList
case None =>
// check for alias under this name
alias(key) match {
case Some(a) => a.dumpStats()
case None => Array[(String, String)]()
}
}
}

/**
Expand Down
Loading

0 comments on commit ff976b0

Please sign in to comment.