Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

KEST-272: proper config inheritance; log queue/alias config on reload

RB_ID=87497
  • Loading branch information...
commit bf4ac4da5957d4de4c76be36b6d8634e6128379f 1 parent ea9d4da
@zuercher zuercher authored
View
10 ChangeLog
@@ -1,3 +1,13 @@
+2.3.4
+-----
+- modify configuration to allow inheritance by fanouts to masters, masters
+ to default.
+
+2.3.3
+-----
+- change kestrel.sh to use old PID file so that daemon-based installs can
+ be halted
+
2.3.2
-----
release: 23 Aug 2012
View
2  README.md
@@ -136,7 +136,7 @@ Configuration
Queue configuration is described in detail in `docs/guide.md` (an operational
guide). Scala docs for the config variables are
-[here](http://robey.github.com/kestrel/doc/main/api/net/lag/kestrel/config/KestrelConfig.html).
+[here](http://robey.github.com/kestrel/api/main/api/net/lag/kestrel/config/KestrelConfig.html).
Performance
View
63 docs/guide.md
@@ -40,6 +40,9 @@ deleted with the "delete" command.
Configuration
-------------
+**NOTE:** Kestrel 2.3.4 introduces inheritance for queue configurations. For more
+information, see below.
+
The config files for kestrel are scala expressions loaded at runtime, usually
from `production.scala`, although you can use `development.scala` by passing
`-Dstage=development` to the java command line.
@@ -48,7 +51,7 @@ The config file evaluates to a `KestrelConfig` object that's used to configure
the server as a whole, a default queue, and any overrides for specific named
queues. The fields on `KestrelConfig` are documented here with their default
values:
-[KestrelConfig.html](http://robey.github.com/kestrel/api/main/api/net/lag/kestrel/config/KestrelConfig.html)
+[KestrelConfig](http://robey.github.com/kestrel/api/main/api/net/lag/kestrel/config/KestrelConfig.html)
To confirm the current configuration of each queue, send "dump_config" to
a server (which can be done over telnet).
@@ -62,11 +65,61 @@ Logging is configured according to `util-logging`. The logging configuration
syntax is described here:
[util-logging](https://github.com/twitter/util/blob/master/util-logging/README.markdown)
-Per-queue configuration is documented here:
-[QueueBuilder.html](http://robey.github.com/kestrel/api/main/api/net/lag/kestrel/config/QueueBuilder.html)
+Per-queue configuration options are documented here:
+[QueueBuilder](http://robey.github.com/kestrel/api/main/api/net/lag/kestrel/config/QueueBuilder.html)
+
+Queue alias configuration options are documented here:
+[AliasBuilder](http://robey.github.com/kestrel/api/main/api/net/lag/kestrel/config/AliasBuilder.html)
+
+Configuration Changes Starting in Kestrel 2.3.4
+-----------------------------------------------
+
+Starting with Kestrel 2.3.4, queue configurations are inherited:
+
+* Any queue with no explict configuration (see `queues` in `KestrelConfig`) uses the default
+ queue configuration (see `default` in `KestrelConfig`). This behavior is unchanged from
+ previous versions.
+* Any master (e.g. not fanout) queue with a queue configuration overrides the default queue
+ configuration. For example, if `default.maxMemorySize` is set, all explicitly configured
+ queues will inherit that setting *unless* explicitly overridden in the queue's configuration.
+ Older versions of Kestrel *did not* apply values from the default queue configuration to any
+ explicitly configured queue.
+* Any fanout queue (e.g., a queue with a `+` in its name), inherits its master queue's
+ configuration, unless explicitly overridden (see `queues` in `KestrelConfig`). Older versions
+ of Kestrel silently ignored explicit fanout queue configurations.
+
+### Example Configuration
+-------------------------
+
+Existing configurations should continue to load, but the resulting configuration may
+differ. As an example, the following configuration file and table illustrate the differences
+between a configuration loaded by Kestrel 2.3.3 and Kestrel 2.3.4 (and later).
+
+ new KestrelConfig {
+ default.maxMemorySize = 8.megabytes
+
+ queues = new QueueBuilder() {
+ name = "q"
+ maxItems = 500
+ } :: new QueueBuilder() {
+ name = "q+fanout"
+ maxAge = 1.minute
+ } :: new QueueBuilder() {
+ name = "x"
+ maxMemorySize = 16.megabytes
+ }
+ }
+
-Queue alias configuration is documented here:
-[AliasBuilder.html](http://robey.github.com/kestrel/api/main/api/net/lag/kestrel/config/AliasBuilder.html)
+<table>
+ <tr><th>Queue</th> <th>Setting</th> <th>Kestrel <= 2.3.3</th> <th>Kestrel >= 2.3.4</th> </tr>
+ <tr><td>q</td> <td>maxMemorySize</td> <td>128.megabytes</td> <td>8.megabytes</td> </tr>
+ <tr><td>q+fanout</td> <td>maxMemorySize</td> <td>128.megabytes</td> <td>8.megabytes</td> </tr>
+ <tr><td>x</td> <td>maxMemorySize</td> <td>16.megabytes</td> <td>16.megabytes</td> </tr>
+ <tr><td>q</td> <td>maxItems</td> <td>500</td> <td>500</td> </tr>
+ <tr><td>q+fanout</td> <td>maxItems</td> <td>500</td> <td>500</td> </tr>
+ <tr><td>q+fanout</td> <td>maxAge</td> <td>None</td> <td>Some(1.minute)</td> </tr>
+</table>
Full queues
View
37 src/main/scala/net/lag/kestrel/QueueCollection.scala
@@ -48,24 +48,34 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
private val aliases = new mutable.HashMap[String, AliasedQueue]
@volatile private var shuttingDown = false
- @volatile private var queueConfigMap = Map(queueBuilders.map { builder => (builder.name, builder()) }: _*)
+ @volatile private var queueBuilderMap = Map(queueBuilders.map { builder => (builder.name, builder) }: _*)
@volatile private var aliasConfigMap = Map(aliasBuilders.map { builder => (builder.name, builder()) }: _*)
private def checkNames {
- val duplicates = queueConfigMap.keySet & aliasConfigMap.keySet
+ val duplicates = queueBuilderMap.keySet & aliasConfigMap.keySet
if (!duplicates.isEmpty) {
log.warning("queue name(s) masked by alias(es): %s".format(duplicates.toList.sorted.mkString(", ")))
}
}
- private def buildQueue(name: String, realName: String, path: String) = {
- if ((realName contains ".") || (realName contains "/") || (realName contains "~")) {
+ private def getQueueConfig(name: String, masterName: Option[String] = None): QueueConfig = {
+ masterName match {
+ case Some(master) =>
+ val masterConfig = getQueueConfig(master)
+ queueBuilderMap.get(name).map { _.apply(Some(masterConfig)) }.getOrElse(masterConfig)
+ case None =>
+ queueBuilderMap.get(name).map { _.apply(Some(defaultQueueConfig)) }.getOrElse(defaultQueueConfig)
+ }
+ }
+
+ private def buildQueue(name: String, masterName: Option[String], path: String) = {
+ if ((name contains ".") || (name contains "/") || (name contains "~")) {
throw new Exception("Queue name contains illegal characters (one of: ~ . /).")
}
- val config = queueConfigMap.getOrElse(name, defaultQueueConfig)
- log.info("Setting up queue %s: %s", realName, config)
+ val config = getQueueConfig(name, masterName)
+ log.info("Setting up queue %s: %s", name, config)
Stats.incr("queue_creates")
- new PersistentQueue(realName, path, config, timer, journalSyncScheduler, Some(this.apply))
+ new PersistentQueue(name, path, config, timer, journalSyncScheduler, Some(this.apply))
}
// preload any queues
@@ -80,6 +90,7 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
aliases.get(name) match {
case Some(alias) =>
alias.config = config
+ log.info("Reloaded alias config %s: %s", name, config)
case None =>
log.info("Setting up alias %s: %s", name, config)
val alias = new AliasedQueue(name, config, this)
@@ -113,10 +124,12 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
newAliasBuilders: List[AliasBuilder]) {
defaultQueueConfig = newDefaultQueueConfig
queueBuilders = newQueueBuilders
- queueConfigMap = Map(queueBuilders.map { builder => (builder.name, builder()) }: _*)
+ queueBuilderMap = Map(queueBuilders.map { builder => (builder.name, builder) }: _*)
queues.foreach { case (name, queue) =>
- val configName = if (name contains '+') name.split('+')(0) else name
- queue.config = queueConfigMap.get(configName).getOrElse(defaultQueueConfig)
+ val masterName = if (name contains '+') Some(name.split('+')(0)) else None
+ val config = getQueueConfig(name, masterName)
+ queue.config = config
+ log.info("Reloaded queue config %s: %s", name, config)
}
aliasBuilders = newAliasBuilders
aliasConfigMap = Map(aliasBuilders.map { builder => (builder.name, builder()) }: _*)
@@ -139,12 +152,12 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
// only happens when creating a queue for the first time.
val q = if (name contains '+') {
val master = name.split('+')(0)
- val fanoutQ = buildQueue(master, name, path.getPath)
+ val fanoutQ = buildQueue(name, Some(master), path.getPath)
fanout_queues.getOrElseUpdate(master, new mutable.HashSet[String]) += name
log.info("Fanout queue %s added to %s", name, master)
fanoutQ
} else {
- buildQueue(name, name, path.getPath)
+ buildQueue(name, None, path.getPath)
}
q.setup
queues(name) = q
View
44 src/main/scala/net/lag/kestrel/config/AliasConfig.scala
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009 Twitter, Inc.
+ * Copyright 2009 Robey Pointer <robeypointer@gmail.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.lag.kestrel
+package config
+
+case class AliasConfig(
+ destinationQueues: List[String]
+) {
+ override def toString() = {
+ ("destinationQueues=[%s]").format(destinationQueues.mkString(", "))
+ }
+}
+
+class AliasBuilder {
+ /**
+ * Name of the alias being configured.
+ */
+ var name: String = null
+
+ /**
+ * List of queues which receive items added to this alias.
+ */
+ var destinationQueues: List[String] = Nil
+
+ def apply() = {
+ AliasConfig(destinationQueues)
+ }
+}
+
View
59 src/main/scala/net/lag/kestrel/config/ConfigValue.scala
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.lag.kestrel
+package config
+
+/**
+ * ConfigValue represents a value that may either be a default or a
+ * configuration-specified value. A ConfigValue may be resolved against
+ * another value.
+ */
+sealed abstract class ConfigValue[A](x: A) {
+ def isSpecified: Boolean
+ def get = x
+ def resolve(default: Option[A]): A = {
+ default match {
+ case Some(_) if isSpecified => get
+ case Some(v) => v
+ case None => get
+ }
+ }
+}
+
+/*
+ * SpecifiedValue is a ConfigValue specified in a configuration.
+ */
+case class SpecifiedValue[A](x: A) extends ConfigValue[A](x) {
+ def isSpecified = true
+}
+
+/*
+ * Default is a ConfigValue specified as a default.
+ */
+case class Default[A] private[config] (x: A) extends ConfigValue[A](x) {
+ def isSpecified = false
+}
+
+/**
+ * The ConfigValue companion object provides implicit methods to convert
+ * bare objects into SpecifiedValue instances.
+ */
+object ConfigValue {
+ implicit def wrap[A](x: A): ConfigValue[A] = SpecifiedValue(x)
+ implicit def wrapToOption[A](x: A): ConfigValue[Option[A]] = SpecifiedValue(Some(x))
+ implicit def wrapNone[A](x: None.type): ConfigValue[Option[A]] = SpecifiedValue(None)
+}
View
165 src/main/scala/net/lag/kestrel/config/KestrelConfig.scala
@@ -18,174 +18,25 @@
package net.lag.kestrel
package config
-import com.twitter.conversions.storage._
-import com.twitter.conversions.time._
import com.twitter.logging.Logger
import com.twitter.logging.config._
import com.twitter.ostrich.admin.{RuntimeEnvironment, ServiceTracker}
import com.twitter.ostrich.admin.config._
-import com.twitter.util.{Config, Duration, StorageUnit}
-
-case class QueueConfig(
- maxItems: Int,
- maxSize: StorageUnit,
- maxItemSize: StorageUnit,
- maxAge: Option[Duration],
- defaultJournalSize: StorageUnit,
- maxMemorySize: StorageUnit,
- maxJournalSize: StorageUnit,
- discardOldWhenFull: Boolean,
- keepJournal: Boolean,
- syncJournal: Duration,
- expireToQueue: Option[String],
- maxExpireSweep: Int,
- fanoutOnly: Boolean,
- maxQueueAge: Option[Duration]
-) {
- override def toString() = {
- ("maxItems=%d maxSize=%s maxItemSize=%s maxAge=%s defaultJournalSize=%s maxMemorySize=%s " +
- "maxJournalSize=%s discardOldWhenFull=%s keepJournal=%s syncJournal=%s " +
- "expireToQueue=%s maxExpireSweep=%d fanoutOnly=%s maxQueueAge=%s").format(maxItems, maxSize,
- maxItemSize, maxAge, defaultJournalSize, maxMemorySize, maxJournalSize, discardOldWhenFull,
- keepJournal, syncJournal, expireToQueue, maxExpireSweep, fanoutOnly, maxQueueAge)
- }
-}
-
-class QueueBuilder extends Config[QueueConfig] {
- /**
- * Name of the queue being configured.
- */
- var name: String = null
-
- /**
- * Set a hard limit on the number of items this queue can hold. When the queue is full,
- * `discardOldWhenFull` dictates the behavior when a client attempts to add another item.
- */
- var maxItems: Int = Int.MaxValue
-
- /**
- * Set a hard limit on the number of bytes (of data in queued items) this queue can hold.
- * When the queue is full, discardOldWhenFull dictates the behavior when a client attempts
- * to add another item.
- */
- var maxSize: StorageUnit = Long.MaxValue.bytes
-
- /**
- * Set a hard limit on the number of bytes a single queued item can contain.
- * An add request for an item larger than this will be rejected.
- */
- var maxItemSize: StorageUnit = Long.MaxValue.bytes
-
- /**
- * Expiration time for items on this queue. Any item that has been sitting on the queue longer
- * than this duration will be discarded. Clients may also attach an expiration time when adding
- * items to a queue, but if the expiration time is longer than `maxAge`, `max_Age` will be
- * used instead.
- */
- var maxAge: Option[Duration] = None
-
- /**
- * If the queue is empty, truncate the journal when it reaches this size.
- */
- var defaultJournalSize: StorageUnit = 16.megabytes
-
- /**
- * Keep only this much of the queue in memory. The journal will be used to store backlogged
- * items, and they'll be read back into memory as the queue is drained. This setting is a release
- * valve to keep a backed-up queue from consuming all memory.
- */
- var maxMemorySize: StorageUnit = 128.megabytes
-
- /**
- * If the queue fits entirely in memory (see maxMemorySize) and the journal files get larger than
- * this, rebuild the journal.
- */
- var maxJournalSize: StorageUnit = 1.gigabyte
-
- /**
- * If this is false, when a queue is full, clients attempting to add another item will get an
- * error. No new items will be accepted. If this is true, old items will be discarded to make
- * room for the new one. This settting has no effect unless at least one of `maxItems` or
- * `maxSize` is set.
- */
- var discardOldWhenFull: Boolean = false
-
- /**
- * If false, don't keep a journal file for this queue. When kestrel exits, any remaining contents
- * in the queue will be lost.
- */
- var keepJournal: Boolean = true
-
- /**
- * How often to sync the journal file. To sync after every write, set this to `0.milliseconds`.
- * To never sync, set it to `Duration.MaxValue`. Syncing the journal will reduce the maximum
- * throughput of the server in exchange for a lower chance of losing data.
- */
- var syncJournal: Duration = Duration.MaxValue
-
- /**
- * Name of a queue to add expired items to. If set, expired items are added to the requested
- * queue as if by a `SET` command. This can be used to implement special processing for expired
- * items, or to implement a simple "delayed processing" queue.
- */
- var expireToQueue: Option[String] = None
-
- /**
- * Maximum number of expired items to move into the `expireToQueue` at once.
- */
- var maxExpireSweep: Int = Int.MaxValue
-
- /**
- * If true, don't actually store any items in this queue. Only deliver them to fanout client
- * queues.
- */
- var fanoutOnly: Boolean = false
-
- /**
- * Expiration time for the queue itself. If the queue is empty and older
- * than this value then we should delete it.
- */
- var maxQueueAge: Option[Duration] = None
-
- def apply() = {
- QueueConfig(maxItems, maxSize, maxItemSize, maxAge, defaultJournalSize, maxMemorySize,
- maxJournalSize, discardOldWhenFull, keepJournal, syncJournal,
- expireToQueue, maxExpireSweep, fanoutOnly, maxQueueAge)
- }
-}
-
-case class AliasConfig(
- destinationQueues: List[String]
-) {
- override def toString() = {
- ("destinationQueues=[%s]").format(destinationQueues.mkString(", "))
- }
-}
-
-class AliasBuilder extends Config[AliasConfig] {
- /**
- * Name of the alias being configured.
- */
- var name: String = null
-
- /**
- * List of queues which receive items added to this alias.
- */
- var destinationQueues: List[String] = Nil
-
- def apply() = {
- AliasConfig(destinationQueues)
- }
-}
+import com.twitter.util.{Config, Duration}
+/**
+ * KestrelConfig is the main point of configuration for Kestrel.
+ */
trait KestrelConfig extends ServerConfig[Kestrel] {
/**
- * Settings for a queue that isn't explicitly listed in `queues`.
+ * Default queue settings. Starting with Kestrel 2.3.4, queue settings are
+ * inherited. See QueueBuilder for more information.
*/
val default: QueueBuilder = new QueueBuilder
/**
- * Specific per-queue config.
+ * Specific per-queue config. Starting with Kestrel 2.3.4, queue settings are
+ * inherited. See QueueBuilder for more information.
*/
var queues: List[QueueBuilder] = Nil
View
187 src/main/scala/net/lag/kestrel/config/QueueConfig.scala
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2009 Twitter, Inc.
+ * Copyright 2009 Robey Pointer <robeypointer@gmail.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.lag.kestrel
+package config
+
+import com.twitter.conversions.storage._
+import com.twitter.util.{Duration, StorageUnit}
+
+case class QueueConfig(
+ maxItems: Int,
+ maxSize: StorageUnit,
+ maxItemSize: StorageUnit,
+ maxAge: Option[Duration],
+ defaultJournalSize: StorageUnit,
+ maxMemorySize: StorageUnit,
+ maxJournalSize: StorageUnit,
+ discardOldWhenFull: Boolean,
+ keepJournal: Boolean,
+ syncJournal: Duration,
+ expireToQueue: Option[String],
+ maxExpireSweep: Int,
+ fanoutOnly: Boolean,
+ maxQueueAge: Option[Duration]
+) {
+ override def toString() = {
+ ("maxItems=%d maxSize=%s maxItemSize=%s maxAge=%s defaultJournalSize=%s maxMemorySize=%s " +
+ "maxJournalSize=%s discardOldWhenFull=%s keepJournal=%s syncJournal=%s " +
+ "expireToQueue=%s maxExpireSweep=%d fanoutOnly=%s maxQueueAge=%s").format(maxItems, maxSize,
+ maxItemSize, maxAge, defaultJournalSize, maxMemorySize, maxJournalSize, discardOldWhenFull,
+ keepJournal, syncJournal, expireToQueue, maxExpireSweep, fanoutOnly, maxQueueAge)
+ }
+}
+
+/**
+ * QueueBuilder produces QueueConfig objects and can resolve the QueueConfig against
+ * a parent QueueConfig, inheriting the parent's values wherever the QueueBuilder's
+ * defaults are left unmodified. The default queue (see KestrelConfig) inherits
+ * exclusively from the defaults in QueueBuilder. Master queues inherit from the
+ * default queue. Fanout queues inherit from their master queue.
+ *
+ * When constructing a QueueBuilder, values may be overridden as follows:
+ * <code>
+ * import com.twitter.conversions.time._
+ *
+ * new QueueBuilder() {
+ * name = "the_queue_name"
+ * maxItems = 100
+ * maxAge = 1.day
+ * }
+ * </code>
+ *
+ * Objects are implicitly converted into SpecifiedValue instances. In the case of
+ * <code>maxAge</code> the Duration is also wrapped in an option (e.g.,
+ * <code>Some(1.day)</code>).
+ */
+class QueueBuilder {
+ import ConfigValue._
+
+ /**
+ * Name of the queue being configured.
+ */
+ var name: String = null
+
+ /**
+ * Set a hard limit on the number of items this queue can hold. When the queue is full,
+ * `discardOldWhenFull` dictates the behavior when a client attempts to add another item.
+ */
+ var maxItems: ConfigValue[Int] = Default(Int.MaxValue)
+
+ /**
+ * Set a hard limit on the number of bytes (of data in queued items) this queue can hold.
+ * When the queue is full, discardOldWhenFull dictates the behavior when a client attempts
+ * to add another item.
+ */
+ var maxSize: ConfigValue[StorageUnit] = Default(Long.MaxValue.bytes)
+
+ /**
+ * Set a hard limit on the number of bytes a single queued item can contain.
+ * An add request for an item larger than this will be rejected.
+ */
+ var maxItemSize: ConfigValue[StorageUnit] = Default(Long.MaxValue.bytes)
+
+ /**
+ * Expiration time for items on this queue. Any item that has been sitting on the queue longer
+ * than this duration will be discarded. Clients may also attach an expiration time when adding
+ * items to a queue, but if the expiration time is longer than `maxAge`, `max_Age` will be
+ * used instead.
+ */
+ var maxAge: ConfigValue[Option[Duration]] = Default(None)
+
+ /**
+ * If the queue is empty, truncate the journal when it reaches this size.
+ */
+ var defaultJournalSize: ConfigValue[StorageUnit] = Default(16.megabytes)
+
+ /**
+ * Keep only this much of the queue in memory. The journal will be used to store backlogged
+ * items, and they'll be read back into memory as the queue is drained. This setting is a release
+ * valve to keep a backed-up queue from consuming all memory.
+ */
+ var maxMemorySize: ConfigValue[StorageUnit] = Default(128.megabytes)
+
+ /**
+ * If the queue fits entirely in memory (see maxMemorySize) and the journal files get larger than
+ * this, rebuild the journal.
+ */
+ var maxJournalSize: ConfigValue[StorageUnit] = Default(1.gigabyte)
+
+ /**
+ * If this is false, when a queue is full, clients attempting to add another item will get an
+ * error. No new items will be accepted. If this is true, old items will be discarded to make
+ * room for the new one. This settting has no effect unless at least one of `maxItems` or
+ * `maxSize` is set.
+ */
+ var discardOldWhenFull: ConfigValue[Boolean] = Default(false)
+
+ /**
+ * If false, don't keep a journal file for this queue. When kestrel exits, any remaining contents
+ * in the queue will be lost.
+ */
+ var keepJournal: ConfigValue[Boolean] = Default(true)
+
+ /**
+ * How often to sync the journal file. To sync after every write, set this to `0.milliseconds`.
+ * To never sync, set it to `Duration.MaxValue`. Syncing the journal will reduce the maximum
+ * throughput of the server in exchange for a lower chance of losing data.
+ */
+ var syncJournal: ConfigValue[Duration] = Default(Duration.MaxValue)
+
+ /**
+ * Name of a queue to add expired items to. If set, expired items are added to the requested
+ * queue as if by a `SET` command. This can be used to implement special processing for expired
+ * items, or to implement a simple "delayed processing" queue.
+ */
+ var expireToQueue: ConfigValue[Option[String]] = Default(None)
+
+ /**
+ * Maximum number of expired items to move into the `expireToQueue` at once.
+ */
+ var maxExpireSweep: ConfigValue[Int] = Default(Int.MaxValue)
+
+ /**
+ * If true, don't actually store any items in this queue. Only deliver them to fanout client
+ * queues.
+ */
+ var fanoutOnly: ConfigValue[Boolean] = Default(false)
+
+ /**
+ * Expiration time for the queue itself. If the queue is empty and older
+ * than this value then we should delete it.
+ */
+ var maxQueueAge: ConfigValue[Option[Duration]] = Default(None)
+
+ def apply(): QueueConfig = apply(None)
+
+ def apply(parent: Option[QueueConfig]) = {
+ QueueConfig(maxItems.resolve(parent.map { _.maxItems }),
+ maxSize.resolve(parent.map { _.maxSize }),
+ maxItemSize.resolve(parent.map { _.maxItemSize }),
+ maxAge.resolve(parent.map { _.maxAge }),
+ defaultJournalSize.resolve(parent.map { _.defaultJournalSize }),
+ maxMemorySize.resolve(parent.map { _.maxMemorySize }),
+ maxJournalSize.resolve(parent.map { _.maxJournalSize }),
+ discardOldWhenFull.resolve(parent.map { _.discardOldWhenFull }),
+ keepJournal.resolve(parent.map { _.keepJournal }),
+ syncJournal.resolve(parent.map { _.syncJournal }),
+ expireToQueue.resolve(parent.map { _.expireToQueue }),
+ maxExpireSweep.resolve(parent.map { _.maxExpireSweep }),
+ fanoutOnly.resolve(parent.map { _.fanoutOnly }),
+ maxQueueAge.resolve(parent.map { _.maxQueueAge }))
+ }
+}
View
4 src/test/scala/net/lag/kestrel/PersistentQueueSpec.scala
@@ -263,14 +263,14 @@ class PersistentQueueSpec extends Specification
maxMemorySize = 123.bytes
}.apply()
val q1 = new PersistentQueue("test1", folderName, config1, timer, scheduler)
- q1.config.maxJournalSize mustEqual new QueueBuilder().maxJournalSize
+ q1.config.maxJournalSize mustEqual new QueueBuilder().apply().maxJournalSize
q1.config.maxMemorySize mustEqual 123.bytes
val config2 = new QueueBuilder {
maxJournalSize = 123.bytes
}.apply()
val q2 = new PersistentQueue("test1", folderName, config2, timer, scheduler)
q2.config.maxJournalSize mustEqual 123.bytes
- q2.config.maxMemorySize mustEqual new QueueBuilder().maxMemorySize
+ q2.config.maxMemorySize mustEqual new QueueBuilder().apply().maxMemorySize
}
}
View
67 src/test/scala/net/lag/kestrel/QueueCollectionSpec.scala
@@ -413,5 +413,72 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
}
}
}
+
+ "configuration hierarchy" in {
+ "default queue should inherit from/override built-in default" in {
+ withTempFolder {
+ val builtInDefaultConfig = new QueueBuilder().apply()
+ val defaultQueueConfig = new QueueBuilder() {
+ maxItems = 100
+ }.apply()
+
+ qc = new QueueCollection(folderName, timer, scheduler, defaultQueueConfig, Nil, Nil)
+ val q = qc.queue("foo").get
+ q.config mustEqual builtInDefaultConfig.copy(maxItems = 100)
+ }
+ }
+
+ "queue should override inherit from/override default queue" in {
+ withTempFolder {
+ val builtInDefaultConfig = new QueueBuilder().apply()
+ val defaultQueueConfig = new QueueBuilder() {
+ maxItems = 100
+ maxSize = 100.bytes
+ }.apply()
+ val queueBuilder = new QueueBuilder() {
+ name = "foo"
+ maxItems = 200
+ maxItemSize = 200.bytes
+ }
+
+ qc = new QueueCollection(folderName, timer, scheduler, defaultQueueConfig, List(queueBuilder), Nil)
+ val q = qc.queue("foo").get
+ q.config mustEqual builtInDefaultConfig.copy(maxItems = 200, // override default queue
+ maxSize = 100.bytes, // inherit default queue
+ maxItemSize = 200.bytes) // override built-in
+ }
+ }
+
+ "fanout queue should inherit from/override master queue" in {
+ withTempFolder {
+ val builtInDefaultConfig = new QueueBuilder().apply()
+ val defaultQueueConfig = new QueueBuilder() {
+ maxItems = 100
+ maxSize = 100.bytes
+ maxJournalSize = 100.bytes
+ }.apply()
+ val queueBuilders = List(
+ new QueueBuilder() {
+ name = "foo"
+ maxItems = 200
+ maxItemSize = 200.bytes
+ },
+ new QueueBuilder() {
+ name = "foo+fanout"
+ maxItems = 300
+ defaultJournalSize = 300.bytes
+ maxJournalSize = 300.bytes
+ })
+
+ qc = new QueueCollection(folderName, timer, scheduler, defaultQueueConfig, queueBuilders, Nil)
+ val q = qc.queue("foo+fanout").get
+ q.config mustEqual builtInDefaultConfig.copy(maxItems = 300, // override master queue
+ maxItemSize = 200.bytes, // inherit master queue
+ maxJournalSize = 300.bytes, // override default queue
+ maxSize = 100.bytes, // inherit default queue
+ defaultJournalSize = 300.bytes) // override built-in
+ }
+ }
+ }
}
}
View
58 src/test/scala/net/lag/kestrel/config/ConfigValueSpec.scala
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.lag.kestrel.config
+
+import org.specs.Specification
+
+class ConfigValueSpec extends Specification {
+ "Default" should {
+ "return a default value" in {
+ Default(1).get mustEqual 1
+ }
+
+ "indicate a default value" in {
+ Default(1).isSpecified mustEqual false
+ }
+ }
+
+ "SpecifiedValue" should {
+ "return a specified value" in {
+ SpecifiedValue(10).get mustEqual 10
+ }
+
+ "indicate a specified value" in {
+ SpecifiedValue(10).isSpecified mustEqual true
+ }
+ }
+
+ "ConfigValue" should {
+ "resolve against a sequence of ancestors" in {
+ "when value is specified" in {
+ SpecifiedValue(10).resolve(Some(1)) mustEqual 10
+ }
+
+ "when value is not specified" in {
+ Default(10).resolve(Some(1)) mustEqual 1
+ }
+
+ "when no default is given" in {
+ SpecifiedValue(10).resolve(None) mustEqual 10
+ Default(10).resolve(None) mustEqual 10
+ }
+ }
+ }
+}
View
71 src/test/scala/net/lag/kestrel/config/QueueConfigSpec.scala
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.lag.kestrel.config
+
+import com.twitter.conversions.time._
+import com.twitter.conversions.storage._
+import org.specs.Specification
+
+class QueueConfigSpec extends Specification {
+ "QueueBuilder" should {
+ val sampleConfig =
+ QueueConfig(1, 1.byte, 1.byte, None, 1.byte, 1.byte, 1.byte, false, false, 0.seconds, None, 1, false, None)
+
+ val unconfiguredBuilder = new QueueBuilder()
+ val configSettings = unconfiguredBuilder.getClass.getMethods.filter {
+ _.getReturnType() == classOf[ConfigValue[_]]
+ }.map { m => (m.getName, m) }
+
+ "have the correct number of settings" in {
+ // e.g. one per field in QueueConfig
+ configSettings.size mustEqual sampleConfig.productArity
+ }
+
+ // Guarantees that the implicit functions which convert an object to a SpecifiedValue
+ // are not invoked by the QueueBuilder itself (only configs should use the implicit
+ // conversions).
+ configSettings.foreach { case (name, method) =>
+ "unconfigured QueueBuilder returns default value for %s".format(name) in {
+ val value = method.invoke(unconfiguredBuilder)
+ (value.getClass == classOf[Default[_]]) mustEqual true
+ }
+ }
+
+ "implicitly convert overridden values directly to SpecifiedValue[_] instances" in {
+ val builder = new QueueBuilder() {
+ maxItems = 100
+ maxSize = 1000.bytes
+ maxAge = Some(1.day)
+ }
+
+ builder.maxItems mustEqual SpecifiedValue(100)
+ builder.maxSize mustEqual SpecifiedValue(1000.bytes)
+ builder.maxAge mustEqual SpecifiedValue(Some(1.day))
+ }
+
+ "implicitly convert overridden values to SpecifiedValue[Option[_]] instances" in {
+ val builder = new QueueBuilder() {
+ maxAge = 1.day
+ maxQueueAge = None
+ }
+
+ builder.maxAge mustEqual SpecifiedValue(Some(1.day))
+ builder.maxQueueAge mustEqual SpecifiedValue(None)
+ }
+ }
+
+}
Please sign in to comment.
Something went wrong with that request. Please try again.