Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Merge branch 'release-2.3.4'
Browse files Browse the repository at this point in the history
Conflicts:
	ChangeLog
	project/Build.scala
	src/main/scala/net/lag/kestrel/KestrelHandler.scala
	src/main/scala/net/lag/kestrel/config/KestrelConfig.scala
	src/test/scala/net/lag/kestrel/ThriftHandlerSpec.scala
  • Loading branch information
Stephan Zuercher committed Sep 21, 2012
2 parents 939af21 + 3e1c887 commit f935206
Show file tree
Hide file tree
Showing 19 changed files with 736 additions and 285 deletions.
10 changes: 10 additions & 0 deletions ChangeLog
Expand Up @@ -7,6 +7,16 @@ release: TBD
- Kestrel now ignores directories in its queue path.
- Fix load test scripts to use correct JAR name [Bryan English]

2.3.4
-----
- modify configuration to allow inheritance by fanouts to masters, masters
to default.

2.3.3
-----
- change kestrel.sh to use old PID file so that daemon-based installs can
be halted

2.3.2
-----
release: 23 Aug 2012
Expand Down
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -136,7 +136,7 @@ Configuration

Queue configuration is described in detail in `docs/guide.md` (an operational
guide). Scala docs for the config variables are
[here](http://robey.github.com/kestrel/doc/main/api/net/lag/kestrel/config/KestrelConfig.html).
[here](http://robey.github.com/kestrel/api/main/api/net/lag/kestrel/config/KestrelConfig.html).


Performance
Expand Down
63 changes: 58 additions & 5 deletions docs/guide.md
Expand Up @@ -42,6 +42,9 @@ deleted with the "delete" command.
Configuration
-------------

**NOTE:** Kestrel 2.3.4 introduces inheritance for queue configurations. For more
information, see below.

The config files for kestrel are scala expressions loaded at runtime, usually
from `production.scala`, although you can use `development.scala` by passing
`-Dstage=development` to the java command line.
Expand All @@ -50,7 +53,7 @@ The config file evaluates to a `KestrelConfig` object that's used to configure
the server as a whole, a default queue, and any overrides for specific named
queues. The fields on `KestrelConfig` are documented here with their default
values:
[KestrelConfig.html](http://robey.github.com/kestrel/api/main/api/net/lag/kestrel/config/KestrelConfig.html)
[KestrelConfig](http://robey.github.com/kestrel/api/main/api/net/lag/kestrel/config/KestrelConfig.html)

To confirm the current configuration of each queue, send "dump_config" to
a server (which can be done over telnet).
Expand All @@ -64,11 +67,61 @@ Logging is configured according to `util-logging`. The logging configuration
syntax is described here:
[util-logging](https://github.com/twitter/util/blob/master/util-logging/README.markdown)

Per-queue configuration is documented here:
[QueueBuilder.html](http://robey.github.com/kestrel/api/main/api/net/lag/kestrel/config/QueueBuilder.html)
Per-queue configuration options are documented here:
[QueueBuilder](http://robey.github.com/kestrel/api/main/api/net/lag/kestrel/config/QueueBuilder.html)

Queue alias configuration options are documented here:
[AliasBuilder](http://robey.github.com/kestrel/api/main/api/net/lag/kestrel/config/AliasBuilder.html)

Configuration Changes Starting in Kestrel 2.3.4
-----------------------------------------------

Starting with Kestrel 2.3.4, queue configurations are inherited:

* Any queue with no explict configuration (see `queues` in `KestrelConfig`) uses the default
queue configuration (see `default` in `KestrelConfig`). This behavior is unchanged from
previous versions.
* Any master (e.g. not fanout) queue with a queue configuration overrides the default queue
configuration. For example, if `default.maxMemorySize` is set, all explicitly configured
queues will inherit that setting *unless* explicitly overridden in the queue's configuration.
Older versions of Kestrel *did not* apply values from the default queue configuration to any
explicitly configured queue.
* Any fanout queue (e.g., a queue with a `+` in its name), inherits its master queue's
configuration, unless explicitly overridden (see `queues` in `KestrelConfig`). Older versions
of Kestrel silently ignored explicit fanout queue configurations.

### Example Configuration
-------------------------

Existing configurations should continue to load, but the resulting configuration may
differ. As an example, the following configuration file and table illustrate the differences
between a configuration loaded by Kestrel 2.3.3 and Kestrel 2.3.4 (and later).

new KestrelConfig {
default.maxMemorySize = 8.megabytes

queues = new QueueBuilder() {
name = "q"
maxItems = 500
} :: new QueueBuilder() {
name = "q+fanout"
maxAge = 1.minute
} :: new QueueBuilder() {
name = "x"
maxMemorySize = 16.megabytes
}
}


Queue alias configuration is documented here:
[AliasBuilder.html](http://robey.github.com/kestrel/api/main/api/net/lag/kestrel/config/AliasBuilder.html)
<table>
<tr><th>Queue</th> <th>Setting</th> <th>Kestrel <= 2.3.3</th> <th>Kestrel >= 2.3.4</th> </tr>
<tr><td>q</td> <td>maxMemorySize</td> <td>128.megabytes</td> <td>8.megabytes</td> </tr>
<tr><td>q+fanout</td> <td>maxMemorySize</td> <td>128.megabytes</td> <td>8.megabytes</td> </tr>
<tr><td>x</td> <td>maxMemorySize</td> <td>16.megabytes</td> <td>16.megabytes</td> </tr>
<tr><td>q</td> <td>maxItems</td> <td>500</td> <td>500</td> </tr>
<tr><td>q+fanout</td> <td>maxItems</td> <td>500</td> <td>500</td> </tr>
<tr><td>q+fanout</td> <td>maxAge</td> <td>None</td> <td>Some(1.minute)</td> </tr>
</table>


Full queues
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/net/lag/kestrel/AliasedQueue.scala
Expand Up @@ -42,12 +42,12 @@ class AliasedQueue(val name: String, @volatile var config: AliasConfig, queueCol
/**
* Add a value to the end of the aliased queue(s).
*/
def add(value: Array[Byte], expiry: Option[Time], addTime: Time): Boolean = {
def add(value: Array[Byte], expiry: Option[Time], addTime: Time, clientDescription: Option[() => String]): Boolean = {
putItems.getAndIncrement()
putBytes.getAndAdd(value.length)

config.destinationQueues.foldLeft(true) { case (result, name) =>
val thisResult = queueCollection.add(name, value, expiry, addTime)
val thisResult = queueCollection.add(name, value, expiry, addTime, clientDescription)
result && thisResult
}
}
Expand Down
7 changes: 3 additions & 4 deletions src/main/scala/net/lag/kestrel/Kestrel.scala
Expand Up @@ -185,12 +185,11 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder], ali
// optionally, start a periodic timer to clean out expired items.
expirationBackgroundProcess = expirationTimerFrequency.map { period =>
log.info("Starting up background expiration task.")
val taskDesc = Some(() => "<background expiration task>")
val proc = new PeriodicBackgroundProcess("background-expiration", period) {
def periodic() {
val expired = Kestrel.this.queueCollection.flushAllExpired(true)
if (expired > 0) {
log.info("Expired %d item(s) from queues automatically.", expired)
}
Kestrel.this.queueCollection.flushAllExpired(true, taskDesc)

// Now that we've cleaned out the queue, lets see if any of them are
// ready to be expired.
Kestrel.this.queueCollection.deleteExpiredQueues()
Expand Down
14 changes: 7 additions & 7 deletions src/main/scala/net/lag/kestrel/KestrelHandler.scala
Expand Up @@ -150,7 +150,7 @@ abstract class KestrelHandler(

def flushAllQueues() {
checkBlockWrites("flushAll", "<all>")
queues.queueNames.foreach { qName => queues.flush(qName) }
queues.queueNames.foreach { qName => queues.flush(qName, Some(clientDescription)) }
}

protected def countPendingReads(key: String): Int
Expand All @@ -175,7 +175,7 @@ abstract class KestrelHandler(
f(None, None)
} else {
Stats.incr("cmd_monitor_get")
queues.remove(key, timeLimit, opening, false).onSuccess {
queues.remove(key, timeLimit, opening, false, Some(clientDescription)).onSuccess {
case None =>
f(None, None)
case x @ Some(item) =>
Expand Down Expand Up @@ -205,7 +205,7 @@ abstract class KestrelHandler(
Stats.incr("cmd_get")
}
val startTime = Time.now
val future = queues.remove(key, timeout, opening, peeking)
val future = queues.remove(key, timeout, opening, peeking, Some(clientDescription))
waitingFor = Some(future)
future.map { itemOption =>
waitingFor = None
Expand All @@ -231,7 +231,7 @@ abstract class KestrelHandler(
log.debug("set -> q=%s flags=%d expiry=%s size=%d", key, flags, expiry, data.length)
Stats.incr("cmd_set")
val (rv, nsec) = Duration.inNanoseconds {
queues.add(key, data, expiry, Time.now)
queues.add(key, data, expiry, Time.now, Some(clientDescription))
}
Stats.addMetric("set_latency_usec", nsec.inMicroseconds.toInt)
Stats.addMetric("q/" + key + "/set_latency_usec", nsec.inMicroseconds.toInt)
Expand All @@ -241,19 +241,19 @@ abstract class KestrelHandler(
def flush(key: String) {
checkBlockWrites("flush", key)
log.debug("flush -> q=%s", key)
queues.flush(key)
queues.flush(key, Some(clientDescription))
}

def delete(key: String) {
checkBlockWrites("delete", key)
log.debug("delete -> q=%s", key)
queues.delete(key)
queues.delete(key, Some(clientDescription))
}

def flushExpired(key: String) = {
checkBlockWrites("flushExpired", key)
log.debug("flush_expired -> q=%s", key)
queues.flushExpired(key)
queues.flushExpired(key, clientDescription = Some(clientDescription))
}

private def withServerStatus[T](f: (ServerStatus) => T): T = {
Expand Down

0 comments on commit f935206

Please sign in to comment.